mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-440 HiveInterpreter with multiple configuration
- Revised executeSql from Postgresql.executeSql
This commit is contained in:
parent
f2b61c0aa7
commit
dcb65ae1e6
3 changed files with 192 additions and 54 deletions
|
|
@ -6,9 +6,9 @@
|
|||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* <p/>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p/>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
|
@ -23,7 +23,6 @@ import java.sql.ResultSet;
|
|||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
|
@ -31,8 +30,6 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
|
|
@ -43,7 +40,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static java.lang.String.format;
|
||||
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
|
||||
|
||||
/**
|
||||
* Hive interpreter for Zeppelin.
|
||||
|
|
@ -51,6 +48,12 @@ import static java.lang.String.format;
|
|||
public class HiveInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(HiveInterpreter.class);
|
||||
|
||||
static final String COMMON_KEY = "common";
|
||||
static final String MAX_LINE_KEY = "max_count";
|
||||
static final String MAX_LINE_DEFAULT = "1000";
|
||||
static final String MAX_RETRY_KEY = "max_retry";
|
||||
static final String MAX_RETRY_DEFAULT = "3";
|
||||
|
||||
static final String DEFAULT_KEY = "default";
|
||||
static final String DRIVER_KEY = "driver";
|
||||
static final String URL_KEY = "url";
|
||||
|
|
@ -58,8 +61,14 @@ public class HiveInterpreter extends Interpreter {
|
|||
static final String PASSWORD_KEY = "password";
|
||||
static final String DOT = ".";
|
||||
|
||||
static final char TSV = '\t';
|
||||
static final char LINEFEED = '\n';
|
||||
static final char TAB = '\t';
|
||||
static final char NEWLINE = '\n';
|
||||
static final String EXPLAIN_PREDICATE = "EXPLAIN ";
|
||||
static final String TABLE_MAGIC_TAG = "%table ";
|
||||
static final String UPDATE_COUNT_HEADER = "Update Count";
|
||||
|
||||
static final String COMMON_MAX_LINE = COMMON_KEY + DOT + MAX_LINE_KEY;
|
||||
static final String COMMON_MAX_RETRY = COMMON_KEY + DOT + MAX_RETRY_KEY;
|
||||
|
||||
static final String DEFAULT_DRIVER = DEFAULT_KEY + DOT + DRIVER_KEY;
|
||||
static final String DEFAULT_URL = DEFAULT_KEY + DOT + URL_KEY;
|
||||
|
|
@ -76,6 +85,8 @@ public class HiveInterpreter extends Interpreter {
|
|||
"hive",
|
||||
HiveInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add(COMMON_MAX_LINE, MAX_LINE_DEFAULT, "Maximum line of results")
|
||||
.add(COMMON_MAX_RETRY, MAX_RETRY_DEFAULT, "Maximum number of retry while error")
|
||||
.add(DEFAULT_DRIVER, "org.apache.hive.jdbc.HiveDriver", "Hive JDBC driver")
|
||||
.add(DEFAULT_URL, "jdbc:hive2://localhost:10000", "The URL for HiveServer2.")
|
||||
.add(DEFAULT_USER, "hive", "The hive user")
|
||||
|
|
@ -115,11 +126,13 @@ public class HiveInterpreter extends Interpreter {
|
|||
|
||||
Set<String> removeKeySet = new HashSet<>();
|
||||
for (String key : propertiesMap.keySet()) {
|
||||
Properties properties = propertiesMap.get(key);
|
||||
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
|
||||
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
|
||||
key, DRIVER_KEY, key, key, URL_KEY);
|
||||
removeKeySet.add(key);
|
||||
if (!COMMON_KEY.equals(key)) {
|
||||
Properties properties = propertiesMap.get(key);
|
||||
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
|
||||
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
|
||||
key, DRIVER_KEY, key, key, URL_KEY);
|
||||
removeKeySet.add(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -136,10 +149,12 @@ public class HiveInterpreter extends Interpreter {
|
|||
for (Statement statement : paragraphIdStatementMap.values()) {
|
||||
statement.close();
|
||||
}
|
||||
paragraphIdStatementMap.clear();
|
||||
|
||||
for (Connection connection : keyConnectionMap.values()) {
|
||||
connection.close();
|
||||
}
|
||||
keyConnectionMap.clear();
|
||||
} catch (SQLException e) {
|
||||
logger.error("Error while closing...", e);
|
||||
}
|
||||
|
|
@ -149,7 +164,7 @@ public class HiveInterpreter extends Interpreter {
|
|||
Connection connection = null;
|
||||
if (keyConnectionMap.containsKey(propertyKey)) {
|
||||
connection = keyConnectionMap.get(propertyKey);
|
||||
if (connection.isClosed() || connection.isValid(10)) {
|
||||
if (connection.isClosed() || !connection.isValid(10)) {
|
||||
connection.close();
|
||||
connection = null;
|
||||
keyConnectionMap.remove(propertyKey);
|
||||
|
|
@ -188,15 +203,81 @@ public class HiveInterpreter extends Interpreter {
|
|||
return statement;
|
||||
}
|
||||
|
||||
public ResultSet executeSql(String propertyKey,
|
||||
String sql,
|
||||
InterpreterContext interpreterContext)
|
||||
throws SQLException, ClassNotFoundException {
|
||||
public InterpreterResult executeSql(String propertyKey,
|
||||
String sql,
|
||||
InterpreterContext interpreterContext) {
|
||||
String paragraphId = interpreterContext.getParagraphId();
|
||||
|
||||
Statement statement = getStatement(propertyKey, paragraphId);
|
||||
ResultSet resultSet = statement.executeQuery(sql);
|
||||
return resultSet;
|
||||
try {
|
||||
|
||||
Statement statement = getStatement(propertyKey, paragraphId);
|
||||
|
||||
statement.setMaxRows(getMaxResult());
|
||||
|
||||
StringBuilder msg = null;
|
||||
|
||||
if (containsIgnoreCase(sql, EXPLAIN_PREDICATE)) {
|
||||
msg = new StringBuilder();
|
||||
} else {
|
||||
msg = new StringBuilder(TABLE_MAGIC_TAG);
|
||||
}
|
||||
|
||||
ResultSet resultSet = null;
|
||||
|
||||
try {
|
||||
boolean isResultSetAvailable = statement.execute(sql);
|
||||
|
||||
if (isResultSetAvailable) {
|
||||
resultSet = statement.getResultSet();
|
||||
|
||||
ResultSetMetaData md = resultSet.getMetaData();
|
||||
|
||||
for (int i = 1; i < md.getColumnCount() + 1; i++) {
|
||||
if (i > 1) {
|
||||
msg.append(TAB);
|
||||
}
|
||||
msg.append(md.getColumnName(i));
|
||||
}
|
||||
msg.append(NEWLINE);
|
||||
|
||||
int displayRowCount = 0;
|
||||
while (resultSet.next() && displayRowCount < getMaxResult()) {
|
||||
for (int i = 1; i < md.getColumnCount() + 1; i++) {
|
||||
msg.append(resultSet.getString(i));
|
||||
if (i != md.getColumnCount()) {
|
||||
msg.append(TAB);
|
||||
}
|
||||
}
|
||||
msg.append(NEWLINE);
|
||||
displayRowCount++;
|
||||
}
|
||||
} else {
|
||||
// Response contains either an update count or there are no results.
|
||||
int updateCount = statement.getUpdateCount();
|
||||
msg.append(UPDATE_COUNT_HEADER).append(NEWLINE);
|
||||
msg.append(updateCount).append(NEWLINE);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if (resultSet != null) {
|
||||
resultSet.close();
|
||||
}
|
||||
statement.close();
|
||||
} finally {
|
||||
removeStatement(paragraphId);
|
||||
}
|
||||
}
|
||||
|
||||
return new InterpreterResult(Code.SUCCESS, msg.toString());
|
||||
|
||||
} catch (SQLException | ClassNotFoundException ex) {
|
||||
logger.error("Cannot run " + sql, ex);
|
||||
return new InterpreterResult(Code.ERROR, ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private void removeStatement(String paragraphId) {
|
||||
paragraphIdStatementMap.remove(paragraphId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -213,39 +294,17 @@ public class HiveInterpreter extends Interpreter {
|
|||
|
||||
logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
|
||||
|
||||
try {
|
||||
ResultSet resultSet = executeSql(propertyKey, cmd, contextInterpreter);
|
||||
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
|
||||
return executeSql(propertyKey, cmd, contextInterpreter);
|
||||
}
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
private int getMaxResult() {
|
||||
return Integer.valueOf(
|
||||
propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT));
|
||||
}
|
||||
|
||||
if (!StringUtils.containsIgnoreCase(cmd, "explain")) {
|
||||
sb.append("%table");
|
||||
}
|
||||
int columnCount = resultSetMetaData.getColumnCount();
|
||||
ArrayList<String> fields = new ArrayList<>();
|
||||
for (int i = 0; i < columnCount; i++) {
|
||||
fields.add(resultSetMetaData.getColumnName(i + 1));
|
||||
}
|
||||
|
||||
sb.append(Joiner.on(TSV).join(fields));
|
||||
sb.append(LINEFEED);
|
||||
|
||||
while (resultSet.next()) {
|
||||
fields.clear();
|
||||
for (int i = 0; i < columnCount; i++) {
|
||||
fields.add(resultSet.getString(i + 1));
|
||||
}
|
||||
sb.append(Joiner.on(TSV).join(fields));
|
||||
sb.append(LINEFEED);
|
||||
}
|
||||
|
||||
return new InterpreterResult(Code.SUCCESS, sb.toString());
|
||||
|
||||
} catch (ClassNotFoundException | SQLException e) {
|
||||
return new InterpreterResult(Code.ERROR,
|
||||
format("%s%c%s", e.getClass().getName(), LINEFEED, e.getMessage()));
|
||||
}
|
||||
private int getMaxRetry() {
|
||||
return Integer.valueOf(
|
||||
propertiesMap.get(COMMON_KEY).getProperty(MAX_RETRY_KEY, MAX_RETRY_DEFAULT));
|
||||
}
|
||||
|
||||
public String getPropertyKey(String cmd) {
|
||||
|
|
|
|||
|
|
@ -34,6 +34,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
|
|||
import org.junit.After;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static java.lang.String.format;
|
||||
|
|
@ -70,9 +72,73 @@ public class HiveInterpreterTest {
|
|||
public void tearDown() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void readTest() throws IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
properties.setProperty("default.password", "");
|
||||
HiveInterpreter t = new HiveInterpreter(properties);
|
||||
t.open();
|
||||
|
||||
assertEquals("SCHEMA_NAME\nINFORMATION_SCHEMA\nPUBLIC\n",
|
||||
t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message());
|
||||
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
|
||||
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void readTestWithConfiguration() throws IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "wrong.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
properties.setProperty("default.password", "");
|
||||
properties.setProperty("h2.driver", "org.h2.Driver");
|
||||
properties.setProperty("h2.url", getJdbcConnection());
|
||||
properties.setProperty("h2.user", "");
|
||||
properties.setProperty("h2.password", "");
|
||||
HiveInterpreter t = new HiveInterpreter(properties);
|
||||
t.open();
|
||||
|
||||
assertEquals("SCHEMA_NAME\nINFORMATION_SCHEMA\nPUBLIC\n",
|
||||
t.interpret("(h2) show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message());
|
||||
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n",
|
||||
t.interpret("(h2)\n select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null)).message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void jdbcRestart() throws IOException, SQLException, ClassNotFoundException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
properties.setProperty("default.password", "");
|
||||
HiveInterpreter t = new HiveInterpreter(properties);
|
||||
t.open();
|
||||
|
||||
assertEquals("SCHEMA_NAME\nINFORMATION_SCHEMA\nPUBLIC\n",
|
||||
t.interpret("show databases", new InterpreterContext("", "1", "","", null,null,null,null)).message());
|
||||
|
||||
t.getConnection("default").close();
|
||||
|
||||
InterpreterResult interpreterResult =
|
||||
t.interpret("select * from test_table", new InterpreterContext("", "1", "","", null,null,null,null));
|
||||
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() throws IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
|
|
@ -95,6 +161,8 @@ public class HiveInterpreterTest {
|
|||
@Test
|
||||
public void parseMultiplePropertiesMap() {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "defaultDriver");
|
||||
properties.setProperty("default.url", "defaultUri");
|
||||
properties.setProperty("default.user", "defaultUser");
|
||||
|
|
@ -109,6 +177,8 @@ public class HiveInterpreterTest {
|
|||
@Test
|
||||
public void ignoreInvalidSettings() {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "defaultDriver");
|
||||
properties.setProperty("default.url", "defaultUri");
|
||||
properties.setProperty("default.user", "defaultUser");
|
||||
|
|
@ -123,6 +193,7 @@ public class HiveInterpreterTest {
|
|||
@Test
|
||||
public void getPropertyKey() {
|
||||
HiveInterpreter hi = new HiveInterpreter(new Properties());
|
||||
hi.open();
|
||||
String testCommand = "(default)\nshow tables";
|
||||
assertEquals("get key of default", "default", hi.getPropertyKey(testCommand));
|
||||
testCommand = "(default) show tables";
|
||||
|
|
|
|||
|
|
@ -23,8 +23,16 @@ import static org.junit.Assert.assertEquals;
|
|||
|
||||
public class ParagraphTest {
|
||||
@Test
|
||||
public void scriptBody() {
|
||||
public void scriptBodyWithReplName() {
|
||||
String text = "%spark(1234567";
|
||||
assertEquals("(1234567", Paragraph.getScriptBody(text));
|
||||
|
||||
text = "%table 1234567";
|
||||
assertEquals("1234567", Paragraph.getScriptBody(text));
|
||||
}
|
||||
@Test
|
||||
public void scriptBodyWithoutReplName() {
|
||||
String text = "12345678";
|
||||
assertEquals(text, Paragraph.getScriptBody(text));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue