[ZEPPELIN-1988] add property "precode" to JDBCInterpreter

This commit is contained in:
Tinkoff DWH 2017-02-27 14:41:58 +05:00
parent ff7c8d2ac9
commit ba3477a6af
5 changed files with 96 additions and 12 deletions

View file

@ -167,6 +167,10 @@ There are more JDBC interpreter properties you can specify like below.
<td>default.jceks.credentialKey</td>
<td>jceks credential key</td>
</tr>
<tr>
<td>zeppelin.interpreter.precode</td>
<td>Some SQL which executes while opening connection</td>
</tr>
</table>
You can also add more properties by using this [method](http://docs.oracle.com/javase/7/docs/api/java/sql/DriverManager.html#getConnection%28java.lang.String,%20java.util.Properties%29).

View file

@ -14,14 +14,7 @@
*/
package org.apache.zeppelin.jdbc;
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import static org.apache.commons.lang.StringUtils.isEmpty;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.DriverManager;
@ -37,11 +30,11 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import com.google.common.base.Throwables;
import org.apache.commons.dbcp2.ConnectionFactory;
import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
import org.apache.commons.dbcp2.PoolableConnectionFactory;
import org.apache.commons.dbcp2.PoolingDriver;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
@ -49,7 +42,10 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
@ -61,9 +57,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import static org.apache.commons.lang.StringUtils.isEmpty;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
import static org.apache.zeppelin.interpreter.Constants.ZEPPELIN_PRECODE_PROPERTY_KEY;
/**
* JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ,
@ -340,6 +341,7 @@ public class JDBCInterpreter extends Interpreter {
if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
createConnectionPool(url, user, propertyKey, properties);
executePreCode(DriverManager.getConnection(jdbcDriver));
}
return DriverManager.getConnection(jdbcDriver);
}
@ -540,6 +542,33 @@ public class JDBCInterpreter extends Interpreter {
return queries;
}
private void executePreCode(Connection connection) {
String precode = getProperty(ZEPPELIN_PRECODE_PROPERTY_KEY);
if (StringUtils.isNotEmpty(precode)) {
logger.info("Run SQL precode '{}'", precode);
try {
Statement statement = connection.createStatement();
statement.execute(precode);
if (!connection.getAutoCommit()) {
connection.commit();
}
if (statement != null) {
statement.close();
}
} catch (SQLException e) {
logger.error("Cannot create precode statement", e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
logger.error("Cannot close connection of precode", e);
}
}
}
}
}
private InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {
Connection connection;
@ -761,4 +790,3 @@ public class JDBCInterpreter extends Interpreter {
}
}
}

View file

@ -63,6 +63,12 @@
"propertyName": "zeppelin.jdbc.principal",
"defaultValue": "",
"description": "Kerberos principal"
},
"zeppelin.interpreter.precode": {
"envName": null,
"propertyName": "zeppelin.interpreter.precode",
"defaultValue": "",
"description": "SQL which executes while opening connection"
}
},
"editor": {

View file

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -43,6 +44,9 @@ import org.junit.Before;
import org.junit.Test;
import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
import static org.apache.zeppelin.interpreter.Constants.ZEPPELIN_PRECODE_PROPERTY_KEY;
/**
* JDBC interpreter unit tests
*/
@ -386,4 +390,44 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
assertNull(user2JDBC2Conf.getPropertyMap("default").get("password"));
jdbc2.close();
}
@Test
public void testPrecode() throws SQLException, IOException {
Properties properties = new Properties();
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
properties.setProperty(ZEPPELIN_PRECODE_PROPERTY_KEY, "SET @testVariable=1");
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
jdbcInterpreter.open();
String sqlQuery = "select @testVariable";
InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("@TESTVARIABLE\n1\n", interpreterResult.message().get(0).getData());
}
@Test
public void testIncorrectPrecode() throws SQLException, IOException {
Properties properties = new Properties();
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
properties.setProperty(ZEPPELIN_PRECODE_PROPERTY_KEY, "incorrect command");
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
jdbcInterpreter.open();
String sqlQuery = "select 1";
InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("1\n1\n", interpreterResult.message().get(0).getData());
}
}

View file

@ -26,6 +26,8 @@ public class Constants {
public static final String ZEPPELIN_INTERPRETER_HOST = "zeppelin.interpreter.host";
public static final String ZEPPELIN_PRECODE_PROPERTY_KEY = "zeppelin.interpreter.precode";
public static final String EXISTING_PROCESS = "existing_process";
public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;