ZEPPELIN-440 HiveInterpreter with multiple configuration

- Enable multiple connection properties
This commit is contained in:
Jongyoul Lee 2015-11-21 00:59:51 +09:00
parent f7deadbaf3
commit 97bfa65fd8
7 changed files with 299 additions and 103 deletions

View file

@ -15,14 +15,14 @@
# limitations under the License.
#
log4j.rootLogger = INFO, dailyfile
log4j.rootLogger = DEBUG, dailyfile
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
log4j.appender.dailyfile.DatePattern=.yyyy-MM-dd
log4j.appender.dailyfile.Threshold = INFO
log4j.appender.dailyfile.Threshold = DEBUG
log4j.appender.dailyfile = org.apache.log4j.DailyRollingFileAppender
log4j.appender.dailyfile.File = ${zeppelin.log.file}
log4j.appender.dailyfile.layout = org.apache.log4j.PatternLayout

View file

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -17,15 +17,10 @@
*/
package org.apache.zeppelin.hive;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import java.sql.*;
import java.util.*;
import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -37,6 +32,8 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.String.format;
/**
* Hive interpreter for Zeppelin.
*/
@ -44,40 +41,95 @@ public class HiveInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(HiveInterpreter.class);
int commandTimeOut = 600000;
static final String HIVESERVER_URL = "hive.hiveserver2.url";
static final String HIVESERVER_USER = "hive.hiveserver2.user";
static final String HIVESERVER_PASSWORD = "hive.hiveserver2.password";
static final String DEFAULT_KEY = "default";
static final String DRIVER_KEY = "driver";
static final String URL_KEY = "url";
static final String USER_KEY = "user";
static final String PASSWORD_KEY = "password";
static final String DOT = ".";
static final char TSV_KEY = '\t';
static final String DEFAULT_DRIVER = DEFAULT_KEY + DOT + DRIVER_KEY;
static final String DEFAULT_URL = DEFAULT_KEY + DOT + URL_KEY;
static final String DEFAULT_USER = DEFAULT_KEY + DOT + USER_KEY;
static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY;
private final HashMap<String, Properties> propertiesMap = new HashMap<>();
private final Map<String, Connection> keyConnectionMap = new HashMap<>();
private final Map<String, Statement> paragraphIdStatementMap = new HashMap<>();
static {
Interpreter.register(
"hql",
"hive",
HiveInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(HIVESERVER_URL, "jdbc:hive2://localhost:10000", "The URL for HiveServer2.")
.add(HIVESERVER_USER, "hive", "The hive user")
.add(HIVESERVER_PASSWORD, "", "The password for the hive user").build());
"hql",
"hive",
HiveInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(DEFAULT_DRIVER, "org.apache.hive.jdbc.HiveDriver", "Hive JDBC driver")
.add(DEFAULT_URL, "jdbc:hive2://localhost:10000", "The URL for HiveServer2.")
.add(DEFAULT_USER, "hive", "The hive user")
.add(DEFAULT_PASSWORD, "", "The password for the hive user").build());
}
public HiveInterpreter(Properties property) {
super(property);
}
Connection jdbcConnection;
Exception exceptionOnConnect;
public HashMap<String, Properties> getPropertiesMap() {
return propertiesMap;
}
// Connection jdbcConnection;
// Exception exceptionOnConnect;
//Test only method
public Connection getJdbcConnection()
/* public Connection getJdbcConnection()
throws SQLException {
String url = getProperty(HIVESERVER_URL);
String user = getProperty(HIVESERVER_USER);
String password = getProperty(HIVESERVER_PASSWORD);
String url = getProperty(DEFAULT_URL);
String user = getProperty(DEFAULT_USER);
String password = getProperty(DEFAULT_PASSWORD);
return DriverManager.getConnection(url, user, password);
}
}*/
@Override
public void open() {
logger.debug("property: {}", property);
for (String propertyKey : property.stringPropertyNames()) {
logger.debug("propertyKey: {}", propertyKey);
String[] keyValue = propertyKey.split("\\.", 2);
if (2 == keyValue.length) {
logger.debug("key: {}, value: {}", keyValue[0], keyValue[1]);
Properties prefixProperties;
if (propertiesMap.containsKey(keyValue[0])) {
prefixProperties = propertiesMap.get(keyValue[0]);
} else {
prefixProperties = new Properties();
propertiesMap.put(keyValue[0], prefixProperties);
}
prefixProperties.put(keyValue[1], property.getProperty(propertyKey));
}
}
Set<String> removeKeySet = new HashSet<>();
for (String key : propertiesMap.keySet()) {
Properties properties = propertiesMap.get(key);
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
logger.error("{} will be ignored. {}.driver and {}.uri is mandatory.", key, key, key);
removeKeySet.add(key);
}
}
for (String key : removeKeySet) {
propertiesMap.remove(key);
}
logger.debug("propertiesMap: {}", propertiesMap);
// old below
/*
logger.info("Jdbc open connection called!");
try {
String driverName = "org.apache.hive.jdbc.HiveDriver";
@ -95,12 +147,23 @@ public class HiveInterpreter extends Interpreter {
catch (SQLException e) {
logger.error("Cannot open connection", e);
exceptionOnConnect = e;
}
}*/
}
@Override
public void close() {
try {
for (Statement statement : paragraphIdStatementMap.values()) {
statement.close();
}
for (Connection connection : keyConnectionMap.values()) {
connection.close();
}
} catch (SQLException e) {
logger.error("Error while closing...", e);
}
/* try {
if (jdbcConnection != null) {
jdbcConnection.close();
}
@ -111,78 +174,133 @@ public class HiveInterpreter extends Interpreter {
finally {
jdbcConnection = null;
exceptionOnConnect = null;
}
}*/
}
Statement currentStatement;
private InterpreterResult executeSql(String sql) {
try {
if (exceptionOnConnect != null) {
return new InterpreterResult(Code.ERROR, exceptionOnConnect.getMessage());
private Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException {
Connection connection = null;
if (keyConnectionMap.containsKey(propertyKey)) {
connection = keyConnectionMap.get(propertyKey);
if (connection.isClosed() || connection.isValid(10)) {
connection.close();
connection = null;
keyConnectionMap.remove(propertyKey);
}
currentStatement = jdbcConnection.createStatement();
StringBuilder msg = null;
if (StringUtils.containsIgnoreCase(sql, "EXPLAIN ")) {
//return the explain as text, make this visual explain later
msg = new StringBuilder();
}
else {
msg = new StringBuilder("%table ");
}
ResultSet res = currentStatement.executeQuery(sql);
try {
ResultSetMetaData md = res.getMetaData();
for (int i = 1; i < md.getColumnCount() + 1; i++) {
if (i == 1) {
msg.append(md.getColumnName(i));
} else {
msg.append("\t" + md.getColumnName(i));
}
}
msg.append("\n");
while (res.next()) {
for (int i = 1; i < md.getColumnCount() + 1; i++) {
msg.append(res.getString(i) + "\t");
}
msg.append("\n");
}
}
finally {
try {
res.close();
currentStatement.close();
}
finally {
currentStatement = null;
}
}
if (null == connection) {
Properties properties = propertiesMap.get(propertyKey);
Class.forName(properties.getProperty(DRIVER_KEY));
String url = properties.getProperty(URL_KEY);
String user = properties.getProperty(USER_KEY);
String password = properties.getProperty(PASSWORD_KEY);
if (null != user && null != password) {
connection = DriverManager.getConnection(url, user, password);
} else {
connection = DriverManager.getConnection(url, properties);
}
keyConnectionMap.put(propertyKey, connection);
}
return connection;
}
InterpreterResult rett = new InterpreterResult(Code.SUCCESS, msg.toString());
return rett;
private Statement getStatement(String propertyKey, String paragraphId)
throws SQLException, ClassNotFoundException {
Statement statement = null;
if (paragraphIdStatementMap.containsKey(paragraphId)) {
statement = paragraphIdStatementMap.get(paragraphId);
if (statement.isClosed()) {
statement = null;
paragraphIdStatementMap.remove(paragraphId);
}
}
catch (SQLException ex) {
logger.error("Can not run " + sql, ex);
return new InterpreterResult(Code.ERROR, ex.getMessage());
if (null == statement) {
statement = getConnection(propertyKey).createStatement();
paragraphIdStatementMap.put(paragraphId, statement);
}
return statement;
}
private ResultSet executeSql(String propertyKey,
String sql,
InterpreterContext interpreterContext)
throws SQLException, ClassNotFoundException {
String paragraphId = interpreterContext.getParagraphId();
Statement statement = getStatement(propertyKey, paragraphId);
ResultSet resultSet = statement.executeQuery(sql);
return resultSet;
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Run SQL command '" + cmd + "'");
return executeSql(cmd);
String propertyKey = getPropertyKey(cmd);
if (null != propertyKey) {
cmd = cmd.substring(propertyKey.length() + 2);
} else {
propertyKey = DEFAULT_KEY;
}
cmd = cmd.trim();
logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
try {
ResultSet resultSet = executeSql(propertyKey, cmd, contextInterpreter);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
StringBuilder sb = new StringBuilder();
if (!StringUtils.containsIgnoreCase(cmd, "explain")) {
sb.append("%table");
}
int columnCount = resultSetMetaData.getColumnCount();
ArrayList<String> fields = new ArrayList<>();
for (int i = 0; i < columnCount; i++) {
fields.add(resultSetMetaData.getColumnName(i + 1));
}
sb.append(Joiner.on(TSV_KEY).join(fields));
sb.append("\n");
while (resultSet.next()) {
fields.clear();
for (int i = 0; i < columnCount; i++) {
fields.add(resultSet.getString(i + 1));
}
sb.append(Joiner.on(TSV_KEY).join(fields));
sb.append("\n");
}
return new InterpreterResult(Code.SUCCESS, sb.toString());
} catch (ClassNotFoundException | SQLException e) {
return new InterpreterResult(Code.ERROR,
format("%s\n%s", e.getClass().getName(), e.getMessage()));
}
}
public String getPropertyKey(String cmd) {
int firstLineIndex = cmd.indexOf("\n");
if (-1 == firstLineIndex) {
firstLineIndex = cmd.length();
}
int configStartIndex = cmd.indexOf("(");
int configLastIndex = cmd.indexOf(")");
if (configStartIndex != -1 && configLastIndex != -1
&& configLastIndex < firstLineIndex && configLastIndex < firstLineIndex) {
return cmd.substring(configStartIndex + 1, configLastIndex);
}
return null;
}
@Override
public void cancel(InterpreterContext context) {
if (currentStatement != null) {
try {
currentStatement.cancel();
}
catch (SQLException ex) {
}
finally {
currentStatement = null;
}
String paragraphId = context.getParagraphId();
try {
paragraphIdStatementMap.get(paragraphId).cancel();
} catch (SQLException e) {
logger.error("Error while cancelling...", e);
}
}
@ -198,8 +316,8 @@ public class HiveInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
HiveInterpreter.class.getName() + this.hashCode());
return SchedulerFactory.singleton().createOrGetParallelScheduler(
HiveInterpreter.class.getName() + this.hashCode(), 10);
}
@Override

View file

@ -17,24 +17,26 @@
*/
package org.apache.zeppelin.hive;
import static org.junit.Assert.assertEquals;
import java.io.InputStream;
import java.io.Reader;
import java.math.BigDecimal;
import java.net.URL;
import java.sql.*;
import java.util.Calendar;
import java.util.Map;
import java.util.Properties;
import java.sql.Date;
import java.util.*;
import java.util.concurrent.Executor;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Hive interpreter unit tests
*/
@ -61,6 +63,55 @@ public class HiveInterpreterTest {
assertEquals(result.type(), InterpreterResult.Type.TEXT);
t.close();
}
@Test
public void parseMultiplePropertiesMap() {
Properties properties = new Properties();
properties.setProperty("default.driver", "defaultDriver");
properties.setProperty("default.url", "defaultUri");
properties.setProperty("default.user", "defaultUser");
HiveInterpreter hi = new HiveInterpreter(properties);
assertNotNull("propertiesMap is not null", hi.getPropertiesMap());
assertNotNull("propertiesMap.get(default) is not null", hi.getPropertiesMap().get("default"));
assertTrue("default exists", "defaultDriver".equals(hi.getPropertiesMap().get("default").getProperty("driver")));
}
@Test
public void ignoreInvalidSettings() {
Properties properties = new Properties();
properties.setProperty("default.driver", "defaultDriver");
properties.setProperty("default.url", "defaultUri");
properties.setProperty("default.user", "defaultUser");
properties.setProperty("presto.driver", "com.facebook.presto.jdbc.PrestoDriver");
HiveInterpreter hi = new HiveInterpreter(properties);
assertTrue("default exists", hi.getPropertiesMap().containsKey("default"));
assertFalse("presto doesn't exists", hi.getPropertiesMap().containsKey("presto"));
}
@Test
public void getPropertyKey() {
HiveInterpreter hi = new HiveInterpreter(new Properties());
String testCommand = "(default)\nshow tables";
assertEquals("get key of default", "default", hi.getPropertyKey(testCommand));
testCommand = "(default) show tables";
assertEquals("get key of default", "default", hi.getPropertyKey(testCommand));
}
@Test
public void prestoTest() {
InterpreterContext interpreterContext = new InterpreterContext("", "a", "", "", new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry("", null), new ArrayList<InterpreterContextRunner>());
Properties properties = new Properties();
properties.setProperty("default.driver", "defaultDriver");
properties.setProperty("default.url", "defaultUri");
properties.setProperty("default.user", "defaultUser");
properties.setProperty("presto.driver", "com.facebook.presto.jdbc.PrestoDriver");
properties.setProperty("presto.url", "jdbc:presto://10.10.36.191:8080/hive");
HiveInterpreter hi = new HiveInterpreter(properties);
hi.open();
InterpreterResult interpreterResult = hi.interpret("(presto)\nshow catalogs", interpreterContext);
System.out.println(interpreterResult.message());
}
}
class MockHiveInterpreter extends HiveInterpreter {
@ -69,11 +120,6 @@ class MockHiveInterpreter extends HiveInterpreter {
super(property);
}
@Override
public Connection getJdbcConnection()
throws SQLException {
return new MockConnection();
}
}
class MockResultSetMetadata implements ResultSetMetaData {

View file

@ -193,6 +193,7 @@ public class RemoteInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
logger.debug("st: {}", st);
FormType form = getFormType();
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;

View file

@ -203,6 +203,7 @@ public class RemoteInterpreterServer
@Override
public RemoteInterpreterResult interpret(String className, String st,
RemoteInterpreterContext interpreterContext) throws TException {
logger.debug("st: {}", st);
Interpreter intp = getInterpreter(className);
InterpreterContext context = convert(interpreterContext);

View file

@ -103,7 +103,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
int scriptHeadIndex = 0;
for (int i = 0; i < text.length(); i++) {
char ch = text.charAt(i);
if (ch == ' ' || ch == '\n') {
if (ch == ' ' || ch == '\n' || ch == '(') {
scriptHeadIndex = i;
break;
}
@ -132,10 +132,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
if (magic == null) {
return text;
}
if (magic.length() + 2 >= text.length()) {
if (magic.length() + 1 >= text.length()) {
return "";
}
return text.substring(magic.length() + 2);
return text.substring(magic.length() + 1);
}
public NoteInterpreterLoader getNoteReplLoader() {

View file

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class ParagraphTest {
@Test
public void scriptBody() {
String text = "%spark(1234567";
assertEquals("(1234567", Paragraph.getScriptBody(text));
}
}