ZEPPELIN-440 HiveInterpreter with multiple configuration

- Revised executeSql from Postgresql.executeSql
This commit is contained in:
Jongyoul Lee 2015-11-25 13:47:21 +09:00
parent f2b61c0aa7
commit dcb65ae1e6
3 changed files with 192 additions and 54 deletions

View file

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -23,7 +23,6 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -31,8 +30,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import com.google.common.base.Joiner;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
@ -43,7 +40,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.lang.String.format;
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
/**
* Hive interpreter for Zeppelin.
@ -51,6 +48,12 @@ import static java.lang.String.format;
public class HiveInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(HiveInterpreter.class);
static final String COMMON_KEY = "common";
static final String MAX_LINE_KEY = "max_count";
static final String MAX_LINE_DEFAULT = "1000";
static final String MAX_RETRY_KEY = "max_retry";
static final String MAX_RETRY_DEFAULT = "3";
static final String DEFAULT_KEY = "default";
static final String DRIVER_KEY = "driver";
static final String URL_KEY = "url";
@ -58,8 +61,14 @@ public class HiveInterpreter extends Interpreter {
static final String PASSWORD_KEY = "password";
static final String DOT = ".";
static final char TSV = '\t';
static final char LINEFEED = '\n';
static final char TAB = '\t';
static final char NEWLINE = '\n';
static final String EXPLAIN_PREDICATE = "EXPLAIN ";
static final String TABLE_MAGIC_TAG = "%table ";
static final String UPDATE_COUNT_HEADER = "Update Count";
static final String COMMON_MAX_LINE = COMMON_KEY + DOT + MAX_LINE_KEY;
static final String COMMON_MAX_RETRY = COMMON_KEY + DOT + MAX_RETRY_KEY;
static final String DEFAULT_DRIVER = DEFAULT_KEY + DOT + DRIVER_KEY;
static final String DEFAULT_URL = DEFAULT_KEY + DOT + URL_KEY;
@ -76,6 +85,8 @@ public class HiveInterpreter extends Interpreter {
"hive",
HiveInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(COMMON_MAX_LINE, MAX_LINE_DEFAULT, "Maximum line of results")
.add(COMMON_MAX_RETRY, MAX_RETRY_DEFAULT, "Maximum number of retry while error")
.add(DEFAULT_DRIVER, "org.apache.hive.jdbc.HiveDriver", "Hive JDBC driver")
.add(DEFAULT_URL, "jdbc:hive2://localhost:10000", "The URL for HiveServer2.")
.add(DEFAULT_USER, "hive", "The hive user")
@ -115,11 +126,13 @@ public class HiveInterpreter extends Interpreter {
Set<String> removeKeySet = new HashSet<>();
for (String key : propertiesMap.keySet()) {
Properties properties = propertiesMap.get(key);
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
key, DRIVER_KEY, key, key, URL_KEY);
removeKeySet.add(key);
if (!COMMON_KEY.equals(key)) {
Properties properties = propertiesMap.get(key);
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
key, DRIVER_KEY, key, key, URL_KEY);
removeKeySet.add(key);
}
}
}
@ -136,10 +149,12 @@ public class HiveInterpreter extends Interpreter {
for (Statement statement : paragraphIdStatementMap.values()) {
statement.close();
}
paragraphIdStatementMap.clear();
for (Connection connection : keyConnectionMap.values()) {
connection.close();
}
keyConnectionMap.clear();
} catch (SQLException e) {
logger.error("Error while closing...", e);
}
@ -149,7 +164,7 @@ public class HiveInterpreter extends Interpreter {
Connection connection = null;
if (keyConnectionMap.containsKey(propertyKey)) {
connection = keyConnectionMap.get(propertyKey);
if (connection.isClosed() || connection.isValid(10)) {
if (connection.isClosed() || !connection.isValid(10)) {
connection.close();
connection = null;
keyConnectionMap.remove(propertyKey);
@ -188,15 +203,81 @@ public class HiveInterpreter extends Interpreter {
return statement;
}
public ResultSet executeSql(String propertyKey,
String sql,
InterpreterContext interpreterContext)
throws SQLException, ClassNotFoundException {
public InterpreterResult executeSql(String propertyKey,
String sql,
InterpreterContext interpreterContext) {
String paragraphId = interpreterContext.getParagraphId();
Statement statement = getStatement(propertyKey, paragraphId);
ResultSet resultSet = statement.executeQuery(sql);
return resultSet;
try {
Statement statement = getStatement(propertyKey, paragraphId);
statement.setMaxRows(getMaxResult());
StringBuilder msg = null;
if (containsIgnoreCase(sql, EXPLAIN_PREDICATE)) {
msg = new StringBuilder();
} else {
msg = new StringBuilder(TABLE_MAGIC_TAG);
}
ResultSet resultSet = null;
try {
boolean isResultSetAvailable = statement.execute(sql);
if (isResultSetAvailable) {
resultSet = statement.getResultSet();
ResultSetMetaData md = resultSet.getMetaData();
for (int i = 1; i < md.getColumnCount() + 1; i++) {
if (i > 1) {
msg.append(TAB);
}
msg.append(md.getColumnName(i));
}
msg.append(NEWLINE);
int displayRowCount = 0;
while (resultSet.next() && displayRowCount < getMaxResult()) {
for (int i = 1; i < md.getColumnCount() + 1; i++) {
msg.append(resultSet.getString(i));
if (i != md.getColumnCount()) {
msg.append(TAB);
}
}
msg.append(NEWLINE);
displayRowCount++;
}
} else {
// Response contains either an update count or there are no results.
int updateCount = statement.getUpdateCount();
msg.append(UPDATE_COUNT_HEADER).append(NEWLINE);
msg.append(updateCount).append(NEWLINE);
}
} finally {
try {
if (resultSet != null) {
resultSet.close();
}
statement.close();
} finally {
removeStatement(paragraphId);
}
}
return new InterpreterResult(Code.SUCCESS, msg.toString());
} catch (SQLException | ClassNotFoundException ex) {
logger.error("Cannot run " + sql, ex);
return new InterpreterResult(Code.ERROR, ex.getMessage());
}
}
private void removeStatement(String paragraphId) {
paragraphIdStatementMap.remove(paragraphId);
}
@Override
@ -213,39 +294,17 @@ public class HiveInterpreter extends Interpreter {
logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
try {
ResultSet resultSet = executeSql(propertyKey, cmd, contextInterpreter);
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
return executeSql(propertyKey, cmd, contextInterpreter);
}
StringBuilder sb = new StringBuilder();
private int getMaxResult() {
return Integer.valueOf(
propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT));
}
if (!StringUtils.containsIgnoreCase(cmd, "explain")) {
sb.append("%table");
}
int columnCount = resultSetMetaData.getColumnCount();
ArrayList<String> fields = new ArrayList<>();
for (int i = 0; i < columnCount; i++) {
fields.add(resultSetMetaData.getColumnName(i + 1));
}
sb.append(Joiner.on(TSV).join(fields));
sb.append(LINEFEED);
while (resultSet.next()) {
fields.clear();
for (int i = 0; i < columnCount; i++) {
fields.add(resultSet.getString(i + 1));
}
sb.append(Joiner.on(TSV).join(fields));
sb.append(LINEFEED);
}
return new InterpreterResult(Code.SUCCESS, sb.toString());
} catch (ClassNotFoundException | SQLException e) {
return new InterpreterResult(Code.ERROR,
format("%s%c%s", e.getClass().getName(), LINEFEED, e.getMessage()));
}
private int getMaxRetry() {
return Integer.valueOf(
propertiesMap.get(COMMON_KEY).getProperty(MAX_RETRY_KEY, MAX_RETRY_DEFAULT));
}
public String getPropertyKey(String cmd) {

View file

@ -34,6 +34,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.*;
import static java.lang.String.format;
@ -70,9 +72,73 @@ public class HiveInterpreterTest {
public void tearDown() throws Exception {
}
@Test
public void readTest() throws IOException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
assertEquals("SCHEMA_NAME\nINFORMATION_SCHEMA\nPUBLIC\n",
t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message());
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
}
@Test
public void readTestWithConfiguration() throws IOException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "wrong.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
properties.setProperty("h2.driver", "org.h2.Driver");
properties.setProperty("h2.url", getJdbcConnection());
properties.setProperty("h2.user", "");
properties.setProperty("h2.password", "");
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
assertEquals("SCHEMA_NAME\nINFORMATION_SCHEMA\nPUBLIC\n",
t.interpret("(h2) show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message());
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
}
@Test
public void jdbcRestart() throws IOException, SQLException, ClassNotFoundException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
HiveInterpreter t = new HiveInterpreter(properties);
t.open();
assertEquals("SCHEMA_NAME\nINFORMATION_SCHEMA\nPUBLIC\n",
t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message());
t.getConnection("default").close();
InterpreterResult interpreterResult =
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
}
@Test
public void test() throws IOException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
@ -95,6 +161,8 @@ public class HiveInterpreterTest {
@Test
public void parseMultiplePropertiesMap() {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "defaultDriver");
properties.setProperty("default.url", "defaultUri");
properties.setProperty("default.user", "defaultUser");
@ -109,6 +177,8 @@ public class HiveInterpreterTest {
@Test
public void ignoreInvalidSettings() {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "defaultDriver");
properties.setProperty("default.url", "defaultUri");
properties.setProperty("default.user", "defaultUser");
@ -123,6 +193,7 @@ public class HiveInterpreterTest {
@Test
public void getPropertyKey() {
HiveInterpreter hi = new HiveInterpreter(new Properties());
hi.open();
String testCommand = "(default)\nshow tables";
assertEquals("get key of default", "default", hi.getPropertyKey(testCommand));
testCommand = "(default) show tables";

View file

@ -23,8 +23,16 @@ import static org.junit.Assert.assertEquals;
public class ParagraphTest {
@Test
public void scriptBody() {
public void scriptBodyWithReplName() {
String text = "%spark(1234567";
assertEquals("(1234567", Paragraph.getScriptBody(text));
text = "%table 1234567";
assertEquals("1234567", Paragraph.getScriptBody(text));
}
@Test
public void scriptBodyWithoutReplName() {
String text = "12345678";
assertEquals(text, Paragraph.getScriptBody(text));
}
}