[ZEPPELIN-2216] general solution to precode. refactoring jdbc precode

This commit is contained in:
Tinkoff DWH 2017-04-04 11:42:54 +05:00
parent 20b72758b4
commit c0436a248e
8 changed files with 57 additions and 25 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.1 KiB

View file

@ -121,7 +121,7 @@ The JDBC interpreter properties are defined by default like below.
<tr>
<td>default.precode</td>
<td></td>
<td>Some SQL which executes while opening connection</td>
<td>Some SQL which executes every time after initialization of the interpreter (see [Binding mode](../manual/interpreters.md#interpreter-binding-mode))</td>
</tr>
</table>

View file

@ -113,3 +113,11 @@ interpreter.start()
The above code will start interpreter thread inside your process. Once the interpreter is started you can configure zeppelin to connect to RemoteInterpreter by checking **Connect to existing process** checkbox and then provide **Host** and **Port** on which interpreter process is listening as shown in the image below:
<img src="../assets/themes/zeppelin/img/screenshots/existing_interpreter.png" width="450px">
## Precode
Snippet of code (language of interpreter) that executes after initialization of the interpreter depends on [Binding mode](#interpreter-binding-mode). To configure add parameter with class of interpreter (`zeppelin.<ClassName>.precode`) except JDBCInterpreter ([JDBC precode](../interpreter/jdbc.md#usage-precode)).
<img src="../assets/themes/zeppelin/img/screenshots/interpreter_precode.png" width="800px">

View file

@ -343,9 +343,6 @@ public class JDBCInterpreter extends Interpreter {
if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
createConnectionPool(url, user, propertyKey, properties);
try (Connection connection = DriverManager.getConnection(jdbcDriver)) {
executePrecode(connection, propertyKey);
}
}
return DriverManager.getConnection(jdbcDriver);
}
@ -550,18 +547,19 @@ public class JDBCInterpreter extends Interpreter {
return queries;
}
private void executePrecode(Connection connection, String propertyKey) throws SQLException {
String precode = getProperty(String.format(PRECODE_KEY_TEMPLATE, propertyKey));
if (StringUtils.isNotBlank(precode)) {
precode = StringUtils.trim(precode);
logger.info("Run SQL precode '{}'", precode);
try (Statement statement = connection.createStatement()) {
statement.execute(precode);
if (!connection.getAutoCommit()) {
connection.commit();
public InterpreterResult executePrecode(InterpreterContext interpreterContext) {
InterpreterResult interpreterResult = null;
for (String propertyKey : basePropretiesMap.keySet()) {
String precode = getProperty(String.format("%s.precode", propertyKey));
if (StringUtils.isNotBlank(precode)) {
interpreterResult = executeSql(propertyKey, precode, interpreterContext);
if (interpreterResult.code() != Code.SUCCESS) {
break;
}
}
}
return interpreterResult;
}
private InterpreterResult executeSql(String propertyKey, String sql,

View file

@ -400,6 +400,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
properties.setProperty(DEFAULT_PRECODE, "SET @testVariable=1");
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
jdbcInterpreter.open();
jdbcInterpreter.executePrecode(interpreterContext);
String sqlQuery = "select @testVariable";
@ -417,13 +418,15 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
properties.setProperty(DEFAULT_PRECODE, "incorrect command");
properties.setProperty(DEFAULT_PRECODE, "select 1");
properties.setProperty("incorrect.driver", "org.h2.Driver");
properties.setProperty("incorrect.url", getJdbcConnection());
properties.setProperty("incorrect.user", "");
properties.setProperty("incorrect.password", "");
properties.setProperty(String.format(PRECODE_KEY_TEMPLATE, "incorrect"), "incorrect command");
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
jdbcInterpreter.open();
String sqlQuery = "select 1";
InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);
InterpreterResult interpreterResult = jdbcInterpreter.executePrecode(interpreterContext);
assertEquals(InterpreterResult.Code.ERROR, interpreterResult.code());
assertEquals(InterpreterResult.Type.TEXT, interpreterResult.message().get(0).getType());
@ -439,6 +442,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
properties.setProperty(String.format(PRECODE_KEY_TEMPLATE, "anotherPrefix"), "SET @testVariable=2");
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
jdbcInterpreter.open();
jdbcInterpreter.executePrecode(interpreterContext);
String sqlQuery = "(anotherPrefix) select @testVariable";

View file

@ -64,6 +64,19 @@ public abstract class Interpreter {
@ZeppelinApi
public abstract void close();
/**
* Run precode if exists.
*/
@ZeppelinApi
public InterpreterResult executePrecode(InterpreterContext interpreterContext) {
String simpleName = this.getClass().getSimpleName();
String precode = getProperty(String.format("zeppelin.%s.precode", simpleName));
if (StringUtils.isNotBlank(precode)) {
return interpret(precode, interpreterContext);
}
return null;
}
/**
* Run code and return result, in synchronous way.
*

View file

@ -21,7 +21,6 @@ import java.net.URL;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -73,6 +72,11 @@ public class LazyOpenInterpreter
}
}
@Override
public InterpreterResult executePrecode(InterpreterContext interpreterContext) {
return intp.executePrecode(interpreterContext);
}
@Override
public void close() {
synchronized (intp) {
@ -151,7 +155,7 @@ public class LazyOpenInterpreter
public void setClassloaderUrls(URL [] urls) {
intp.setClassloaderUrls(urls);
}
@Override
public void registerHook(String noteId, String event, String cmd) {
intp.registerHook(noteId, event, cmd);

View file

@ -481,19 +481,24 @@ public class RemoteInterpreterServer
try {
InterpreterContext.set(context);
InterpreterResult result = null;
// Open the interpreter instance prior to calling interpret().
// This is necessary because the earliest we can register a hook
// is from within the open() method.
LazyOpenInterpreter lazy = (LazyOpenInterpreter) interpreter;
if (!lazy.isOpen()) {
lazy.open();
result = lazy.executePrecode(context);
}
// Add hooks to script from registry.
// Global scope first, followed by notebook scope
processInterpreterHooks(null);
processInterpreterHooks(context.getNoteId());
InterpreterResult result = interpreter.interpret(script, context);
if (result == null || result.code() == Code.SUCCESS) {
// Add hooks to script from registry.
// Global scope first, followed by notebook scope
processInterpreterHooks(null);
processInterpreterHooks(context.getNoteId());
result = interpreter.interpret(script, context);
}
// data from context.out is prepended to InterpreterResult if both defined
context.out.flush();