mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
[ZEPPELIN-1988] add property "precode" to JDBCInterpreter
This commit is contained in:
parent
ff7c8d2ac9
commit
ba3477a6af
5 changed files with 96 additions and 12 deletions
|
|
@ -167,6 +167,10 @@ There are more JDBC interpreter properties you can specify like below.
|
|||
<td>default.jceks.credentialKey</td>
|
||||
<td>jceks credential key</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>zeppelin.interpreter.precode</td>
|
||||
<td>Some SQL which executes while opening connection</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
You can also add more properties by using this [method](http://docs.oracle.com/javase/7/docs/api/java/sql/DriverManager.html#getConnection%28java.lang.String,%20java.util.Properties%29).
|
||||
|
|
|
|||
|
|
@ -14,14 +14,7 @@
|
|||
*/
|
||||
package org.apache.zeppelin.jdbc;
|
||||
|
||||
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
|
||||
import static org.apache.commons.lang.StringUtils.isEmpty;
|
||||
import static org.apache.commons.lang.StringUtils.isNotEmpty;
|
||||
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
|
|
@ -37,11 +30,11 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import org.apache.commons.dbcp2.ConnectionFactory;
|
||||
import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
|
||||
import org.apache.commons.dbcp2.PoolableConnectionFactory;
|
||||
import org.apache.commons.dbcp2.PoolingDriver;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.pool2.ObjectPool;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPool;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
|
@ -49,7 +42,10 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.security.alias.CredentialProvider;
|
||||
import org.apache.hadoop.security.alias.CredentialProviderFactory;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
|
||||
|
|
@ -61,9 +57,14 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.Sets.SetView;
|
||||
|
||||
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
|
||||
import static org.apache.commons.lang.StringUtils.isEmpty;
|
||||
import static org.apache.commons.lang.StringUtils.isNotEmpty;
|
||||
import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
|
||||
import static org.apache.zeppelin.interpreter.Constants.ZEPPELIN_PRECODE_PROPERTY_KEY;
|
||||
|
||||
/**
|
||||
* JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ,
|
||||
|
|
@ -340,6 +341,7 @@ public class JDBCInterpreter extends Interpreter {
|
|||
|
||||
if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
|
||||
createConnectionPool(url, user, propertyKey, properties);
|
||||
executePreCode(DriverManager.getConnection(jdbcDriver));
|
||||
}
|
||||
return DriverManager.getConnection(jdbcDriver);
|
||||
}
|
||||
|
|
@ -540,6 +542,33 @@ public class JDBCInterpreter extends Interpreter {
|
|||
return queries;
|
||||
}
|
||||
|
||||
private void executePreCode(Connection connection) {
|
||||
String precode = getProperty(ZEPPELIN_PRECODE_PROPERTY_KEY);
|
||||
if (StringUtils.isNotEmpty(precode)) {
|
||||
logger.info("Run SQL precode '{}'", precode);
|
||||
try {
|
||||
Statement statement = connection.createStatement();
|
||||
statement.execute(precode);
|
||||
if (!connection.getAutoCommit()) {
|
||||
connection.commit();
|
||||
}
|
||||
if (statement != null) {
|
||||
statement.close();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
logger.error("Cannot create precode statement", e);
|
||||
} finally {
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (SQLException e) {
|
||||
logger.error("Cannot close connection of precode", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private InterpreterResult executeSql(String propertyKey, String sql,
|
||||
InterpreterContext interpreterContext) {
|
||||
Connection connection;
|
||||
|
|
@ -761,4 +790,3 @@ public class JDBCInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -63,6 +63,12 @@
|
|||
"propertyName": "zeppelin.jdbc.principal",
|
||||
"defaultValue": "",
|
||||
"description": "Kerberos principal"
|
||||
},
|
||||
"zeppelin.interpreter.precode": {
|
||||
"envName": null,
|
||||
"propertyName": "zeppelin.interpreter.precode",
|
||||
"defaultValue": "",
|
||||
"description": "SQL which executes while opening connection"
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Constants;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
|
|
@ -43,6 +44,9 @@ import org.junit.Before;
|
|||
import org.junit.Test;
|
||||
|
||||
import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
|
||||
|
||||
import static org.apache.zeppelin.interpreter.Constants.ZEPPELIN_PRECODE_PROPERTY_KEY;
|
||||
|
||||
/**
|
||||
* JDBC interpreter unit tests
|
||||
*/
|
||||
|
|
@ -386,4 +390,44 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
assertNull(user2JDBC2Conf.getPropertyMap("default").get("password"));
|
||||
jdbc2.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPrecode() throws SQLException, IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
properties.setProperty("default.password", "");
|
||||
properties.setProperty(ZEPPELIN_PRECODE_PROPERTY_KEY, "SET @testVariable=1");
|
||||
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
|
||||
jdbcInterpreter.open();
|
||||
|
||||
String sqlQuery = "select @testVariable";
|
||||
|
||||
InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);
|
||||
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
|
||||
assertEquals("@TESTVARIABLE\n1\n", interpreterResult.message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncorrectPrecode() throws SQLException, IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
properties.setProperty("default.password", "");
|
||||
properties.setProperty(ZEPPELIN_PRECODE_PROPERTY_KEY, "incorrect command");
|
||||
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(properties);
|
||||
jdbcInterpreter.open();
|
||||
|
||||
String sqlQuery = "select 1";
|
||||
|
||||
InterpreterResult interpreterResult = jdbcInterpreter.interpret(sqlQuery, interpreterContext);
|
||||
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
|
||||
assertEquals("1\n1\n", interpreterResult.message().get(0).getData());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,8 @@ public class Constants {
|
|||
|
||||
public static final String ZEPPELIN_INTERPRETER_HOST = "zeppelin.interpreter.host";
|
||||
|
||||
public static final String ZEPPELIN_PRECODE_PROPERTY_KEY = "zeppelin.interpreter.precode";
|
||||
|
||||
public static final String EXISTING_PROCESS = "existing_process";
|
||||
|
||||
public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;
|
||||
|
|
|
|||
Loading…
Reference in a new issue