Merge remote-tracking branch 'upstream/master'

This commit is contained in:
Jesang Yoon 2016-01-19 10:57:24 +09:00
commit eba0315380
16 changed files with 841 additions and 3 deletions

View file

@ -45,6 +45,7 @@ sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/local/bin/mvn
_Notes:_
- Ensure node is installed by running `node --version`
- Ensure maven is running version 3.1.x or higher with `mvn -version`
- Configure maven to use more memory than usual by ```export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=1024m"```
### Build
If you want to build Zeppelin from the source, please first clone this repository, then:

View file

@ -67,6 +67,9 @@ if [[ -d "${ZEPPELIN_HOME}/zeppelin-server/target/classes" ]]; then
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-server/target/classes"
fi
# Add jdbc connector jar
# ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/jdbc/jars/jdbc-connector-jar"
addJarInDir "${ZEPPELIN_HOME}"
addJarInDir "${ZEPPELIN_HOME}/lib"
addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"

View file

@ -105,7 +105,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

View file

@ -37,6 +37,7 @@
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Interpreter <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="{{BASE_PATH}}/manual/interpreters.html">Overview</a></li>
<li><a href="{{BASE_PATH}}/manual/dynamicinterpreterload.html">Dynamic Interpreter Loading</a></li>
<li role="separator" class="divider"></li>
<li><a href="{{BASE_PATH}}/interpreter/cassandra.html">Cassandra</a></li>
<li><a href="{{BASE_PATH}}/interpreter/elasticsearch.html">Elasticsearch</a></li>

Binary file not shown.

After

Width:  |  Height:  |  Size: 322 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 298 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 396 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 146 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 257 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 325 KiB

View file

@ -0,0 +1,124 @@
---
layout: page
title: "Dynamic Interpreter Loading"
description: ""
group: manual
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
{% include JB/setup %}
## Dynamic Interpreter Loading using REST API
Zeppelin provides pluggable interpreter architecture which results in a wide and variety of the supported backend system. In this section, we will introduce **Dynamic interpreter loading** using **REST API**. This concept actually comes from [Zeppelin Helium Proposal](https://cwiki.apache.org/confluence/display/ZEPPELIN/Helium+proposal).
Before we start, if you are not familiar with the concept of **Zeppelin interpreter**, you can check out [Overview of Zeppelin interpreter](../manual/interpreters.html) first.
<br/>
## Overview
In the past, Zeppelin was loading interpreter binaries from `/interpreter/[interpreter_name]` directory. They were configured by `zeppelin.interpreters` property in `conf/zeppelin-site.xml` or `ZEPPELIN_INTERPRETERS` env variables in `conf/zeppelin-env.sh`. They were loaded on Zeppelin server startup and stayed alive until the server was stopped.
In order to simplify using 3rd party interpreters, we changed this way to **dynamically** load interpreters from **Maven Repository** using **REST API**. Hopefully, the picture below will help you to understand the process.
<center><img src="../assets/themes/zeppelin/img/docs-img/zeppelin_user.png" height="85%" width="85%"></center>
## Load & Unload Interpreters Using REST API
### 1. Load
You can **load** interpreters located in Maven repository using REST API, like this:
( Maybe, you are unfamiliar with `[interpreter_group_name]` or `[interpreter_name]`. If so, please checkout [Interpreters in Zeppelin](../manual/interpreter.html) again. )
```
http://[zeppelin-server]:[zeppelin-port]/api/interpreter/load/[interpreter_group_name]/[interpreter_name]
```
The Restful method will be <code>**POST**</code>. And the parameters you need are:
1. **Artifact:** Maven artifact ( groupId:artifactId:version )
2. **Class Name:** Package name + Interpreter class name
3. **Repository ( optional ):** Additional maven repository address
For example, if you want to load `markdown` interpreter to your Zeppelin, the parameters and URL you need may look like:
```
http://127.0.0.1:8080/api/interpreter/load/md/markdown
```
```
{
"artifact": "org.apache.zeppelin:zeppelin-markdown:0.6.0-incubating-SNAPSHOT",
"className": "org.apache.zeppelin.markdown.Markdown",
"repository": {
"url": "http://dl.bintray.com/spark-packages/maven",
"snapshot": false
}
}
```
The meaning of each parameters is:
1. **Artifact**
- groupId: org.apache.zeppelin
- artifactId: zeppelin-markdown
- version: 0.6.0-incubating-SNAPSHOT
2. **Class Name**
- Package Name: org.apache.zeppelin
- Interpreter Class Name: markdown.Markdown
3. **Repository ( optional )**
- Url: http://dl.bintray.com/spark-packages/maven
- Snapshot: false
> <b>Please note: </b>The interpreters you downloaded need to be **reload**, when your Zeppelin server is down.
### 2. Unload
If you want to **unload** the interpreters using REST API,
```
http://[zeppelin-server]:[zeppelin-port]/api/interpreter/unload/[interpreter_group_name]/[interpreter_name]
```
In this case, the Restful method will be <code>**DELETE**</code>.
<br/>
## What is the next step after Loading ?
### Q1. Where is the location of interpreters you downloaded ?
Actually, the answer about this question is in the above picture. Once the REST API is called, the `.jar` files of interpreters you get are saved under `ZEPPELIN_HOME/local-repo` first. Then, they will be copied to `ZEPPELIN_HOME/interpreter` directory. So, please checkout your `ZEPPELIN_HOME/interpreter`.
### Q2. Then, how can I use this interpreter ?
After loading an interpreter, you can use it by creating and configuring it in Zeppelin's **Interpreter tab**.
Oh, you don't need to restart your Zeppelin server. Because it is **Dynamic Loading**, you can configure and load it **at runtime** !
1. After Zeppelin server up, browse Zeppelin home and click **Interpreter tab**.
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_setting_1.png" height="85%" width="85%"></center>
2. At the **Interpreter** section, click **+Create** button.
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_setting_2.png" height="85%" width="85%"></center>
3. Then, you can verify the interpreter list that you loaded.
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_setting_3.png" height="85%" width="85%"></center>
4. After choosing an interpreter, you can configure and use it. Don't forget to save it.
5. Create a new notebook in the **Notebook** section, then you can bind the interpreters from your interpreter list. Just drag and drop !
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_binding_1.png" height="85%" width="85%"></center>
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_binding_2.png" height="85%" width="85%"></center>
6. At last, you can use your interpreter !
If you want to get the specific information about respective interpreters, please checkout each interpreter documentation.

169
jdbc/pom.xml Normal file
View file

@ -0,0 +1,169 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.6.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-jdbc</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin: JDBC interpreter</name>
<url>http://www.apache.org</url>
<properties>
<postgresql.version>9.4-1201-jdbc41</postgresql.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgresql.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.190</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.mockrunner</groupId>
<artifactId>mockrunner-jdbc</artifactId>
<version>1.0.8</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/jdbc</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/jdbc</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,413 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.zeppelin.jdbc;
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
/**
* JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ,
* GreenplumDB, MariaDB, MySQL, Postgres and Redshit.
*
* <ul>
* <li>{@code default.url} - JDBC URL to connect to.</li>
* <li>{@code default.user} - JDBC user name..</li>
* <li>{@code default.password} - JDBC password..</li>
* <li>{@code default.driver.name} - JDBC driver name.</li>
* <li>{@code common.max.result} - Max number of SQL result to display.</li>
* </ul>
*
* <p>
* How to use: <br/>
* {@code %jdbc.sql} <br/>
* {@code
* SELECT store_id, count(*)
* FROM retail_demo.order_lineitems_pxf
* GROUP BY store_id;
* }
* </p>
*
*/
public class JDBCInterpreter extends Interpreter {
private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class);
static final String COMMON_KEY = "common";
static final String MAX_LINE_KEY = "max_count";
static final String MAX_LINE_DEFAULT = "1000";
static final String DEFAULT_KEY = "default";
static final String DRIVER_KEY = "driver";
static final String URL_KEY = "url";
static final String USER_KEY = "user";
static final String PASSWORD_KEY = "password";
static final String DOT = ".";
private static final char WHITESPACE = ' ';
private static final char NEWLINE = '\n';
private static final char TAB = '\t';
private static final String TABLE_MAGIC_TAG = "%table ";
private static final String EXPLAIN_PREDICATE = "EXPLAIN ";
private static final String UPDATE_COUNT_HEADER = "Update Count";
static final String COMMON_MAX_LINE = COMMON_KEY + DOT + MAX_LINE_KEY;
static final String DEFAULT_DRIVER = DEFAULT_KEY + DOT + DRIVER_KEY;
static final String DEFAULT_URL = DEFAULT_KEY + DOT + URL_KEY;
static final String DEFAULT_USER = DEFAULT_KEY + DOT + USER_KEY;
static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY;
static final String EMPTY_COLUMN_VALUE = "";
private final HashMap<String, Properties> propertiesMap;
private final Map<String, Statement> paragraphIdStatementMap;
private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap;
private final Map<String, Connection> paragraphIdConnectionMap;
static {
Interpreter.register(
"sql",
"jdbc",
JDBCInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(DEFAULT_URL, "jdbc:postgresql://localhost:5432/", "The URL for JDBC.")
.add(DEFAULT_USER, "gpadmin", "The JDBC user name")
.add(DEFAULT_PASSWORD, "",
"The JDBC user password")
.add(DEFAULT_DRIVER, "org.postgresql.Driver", "JDBC Driver Name")
.add(COMMON_MAX_LINE, MAX_LINE_DEFAULT,
"Max number of SQL result to display.").build());
}
public JDBCInterpreter(Properties property) {
super(property);
propertiesMap = new HashMap<>();
propertyKeyUnusedConnectionListMap = new HashMap<>();
paragraphIdStatementMap = new HashMap<>();
paragraphIdConnectionMap = new HashMap<>();
}
public HashMap<String, Properties> getPropertiesMap() {
return propertiesMap;
}
@Override
public void open() {
for (String propertyKey : property.stringPropertyNames()) {
logger.debug("propertyKey: {}", propertyKey);
String[] keyValue = propertyKey.split("\\.", 2);
if (2 == keyValue.length) {
logger.info("key: {}, value: {}", keyValue[0], keyValue[1]);
Properties prefixProperties;
if (propertiesMap.containsKey(keyValue[0])) {
prefixProperties = propertiesMap.get(keyValue[0]);
} else {
prefixProperties = new Properties();
propertiesMap.put(keyValue[0], prefixProperties);
}
prefixProperties.put(keyValue[1], property.getProperty(propertyKey));
}
}
Set<String> removeKeySet = new HashSet<>();
for (String key : propertiesMap.keySet()) {
if (!COMMON_KEY.equals(key)) {
Properties properties = propertiesMap.get(key);
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
key, DRIVER_KEY, key, key, URL_KEY);
removeKeySet.add(key);
}
}
}
for (String key : removeKeySet) {
propertiesMap.remove(key);
}
logger.debug("propertiesMap: {}", propertiesMap);
}
public Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException {
Connection connection = null;
if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey);
if (0 != connectionList.size()) {
connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0);
if (null != connection && connection.isClosed()) {
connection.close();
connection = null;
}
}
}
if (null == connection) {
Properties properties = propertiesMap.get(propertyKey);
logger.info(properties.getProperty(DRIVER_KEY));
Class.forName(properties.getProperty(DRIVER_KEY));
String url = properties.getProperty(URL_KEY);
String user = properties.getProperty(USER_KEY);
String password = properties.getProperty(PASSWORD_KEY);
if (null != user && null != password) {
connection = DriverManager.getConnection(url, user, password);
} else {
connection = DriverManager.getConnection(url, properties);
}
}
return connection;
}
public Statement getStatement(String propertyKey, String paragraphId)
throws SQLException, ClassNotFoundException {
Connection connection;
if (paragraphIdConnectionMap.containsKey(paragraphId)) {
connection = paragraphIdConnectionMap.get(paragraphId);
} else {
connection = getConnection(propertyKey);
}
Statement statement = connection.createStatement();
if (isStatementClosed(statement)) {
connection = getConnection(propertyKey);
statement = connection.createStatement();
}
paragraphIdConnectionMap.put(paragraphId, connection);
paragraphIdStatementMap.put(paragraphId, statement);
return statement;
}
private boolean isStatementClosed(Statement statement) {
try {
return statement.isClosed();
} catch (Throwable t) {
logger.debug("{} doesn't support isClosed method", statement);
return false;
}
}
@Override
public void close() {
try {
for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values()) {
for (Connection c : connectionList) {
c.close();
}
}
for (Statement statement : paragraphIdStatementMap.values()) {
statement.close();
}
paragraphIdStatementMap.clear();
for (Connection connection : paragraphIdConnectionMap.values()) {
connection.close();
}
paragraphIdConnectionMap.clear();
} catch (SQLException e) {
logger.error("Error while closing...", e);
}
}
private InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {
String paragraphId = interpreterContext.getParagraphId();
try {
Statement statement = getStatement(propertyKey, paragraphId);
statement.setMaxRows(getMaxResult());
StringBuilder msg = null;
boolean isTableType = false;
if (containsIgnoreCase(sql, EXPLAIN_PREDICATE)) {
msg = new StringBuilder();
} else {
msg = new StringBuilder(TABLE_MAGIC_TAG);
isTableType = true;
}
ResultSet resultSet = null;
try {
boolean isResultSetAvailable = statement.execute(sql);
if (isResultSetAvailable) {
resultSet = statement.getResultSet();
ResultSetMetaData md = resultSet.getMetaData();
for (int i = 1; i < md.getColumnCount() + 1; i++) {
if (i > 1) {
msg.append(TAB);
}
msg.append(replaceReservedChars(isTableType, md.getColumnName(i)));
}
msg.append(NEWLINE);
int displayRowCount = 0;
while (resultSet.next() && displayRowCount < getMaxResult()) {
for (int i = 1; i < md.getColumnCount() + 1; i++) {
msg.append(replaceReservedChars(isTableType, resultSet.getString(i)));
if (i != md.getColumnCount()) {
msg.append(TAB);
}
}
msg.append(NEWLINE);
displayRowCount++;
}
} else {
// Response contains either an update count or there are no results.
int updateCount = statement.getUpdateCount();
msg.append(UPDATE_COUNT_HEADER).append(NEWLINE);
msg.append(updateCount).append(NEWLINE);
}
} finally {
try {
if (resultSet != null) {
resultSet.close();
}
statement.close();
} finally {
statement = null;
}
}
return new InterpreterResult(Code.SUCCESS, msg.toString());
} catch (SQLException ex) {
logger.error("Cannot run " + sql, ex);
return new InterpreterResult(Code.ERROR, ex.getMessage());
} catch (ClassNotFoundException e) {
logger.error("Cannot run " + sql, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
}
/**
* For %table response replace Tab and Newline characters from the content.
*/
private String replaceReservedChars(boolean isTableResponseType, String str) {
if (str == null) {
return EMPTY_COLUMN_VALUE;
}
return (!isTableResponseType) ? str : str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.info("Run SQL command '{}'", cmd);
String propertyKey = getPropertyKey(cmd);
if (null != propertyKey) {
cmd = cmd.substring(propertyKey.length() + 2);
} else {
propertyKey = DEFAULT_KEY;
}
cmd = cmd.trim();
logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
return executeSql(propertyKey, cmd, contextInterpreter);
}
@Override
public void cancel(InterpreterContext context) {
logger.info("Cancel current query statement.");
String paragraphId = context.getParagraphId();
try {
paragraphIdStatementMap.get(paragraphId).cancel();
} catch (SQLException e) {
logger.error("Error while cancelling...", e);
}
}
public String getPropertyKey(String cmd) {
int firstLineIndex = cmd.indexOf("\n");
if (-1 == firstLineIndex) {
firstLineIndex = cmd.length();
}
int configStartIndex = cmd.indexOf("(");
int configLastIndex = cmd.indexOf(")");
if (configStartIndex != -1 && configLastIndex != -1
&& configLastIndex < firstLineIndex && configLastIndex < firstLineIndex) {
return cmd.substring(configStartIndex + 1, configLastIndex);
}
return null;
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
JDBCInterpreter.class.getName() + this.hashCode());
}
@Override
public List<String> completion(String buf, int cursor) {
return null;
}
public int getMaxResult() {
return Integer.valueOf(
propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT));
}
}

View file

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.zeppelin.jdbc;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_KEY;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_DRIVER;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_PASSWORD;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.jdbc.JDBCInterpreter;
import org.junit.Before;
import org.junit.Test;
import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
/**
* JDBC interpreter unit tests
*/
public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
static String jdbcConnection;
private static String getJdbcConnection() throws IOException {
if(null == jdbcConnection) {
Path tmpDir = Files.createTempDirectory("h2-test-");
tmpDir.toFile().deleteOnExit();
jdbcConnection = format("jdbc:h2:%s", tmpDir);
}
return jdbcConnection;
}
@Before
public void setUp() throws Exception {
Class.forName("org.h2.Driver");
Connection connection = DriverManager.getConnection(getJdbcConnection());
Statement statement = connection.createStatement();
statement.execute(
"DROP TABLE IF EXISTS test_table; " +
"CREATE TABLE test_table(id varchar(255), name varchar(255));");
statement.execute(
"insert into test_table(id, name) values ('a', 'a_name'),('b', 'b_name');"
);
}
@Test
public void testDefaultProperties() throws SQLException {
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(new Properties());
assertEquals("org.postgresql.Driver", jdbcInterpreter.getProperty(DEFAULT_DRIVER));
assertEquals("jdbc:postgresql://localhost:5432/", jdbcInterpreter.getProperty(DEFAULT_URL));
assertEquals("gpadmin", jdbcInterpreter.getProperty(DEFAULT_USER));
assertEquals("", jdbcInterpreter.getProperty(DEFAULT_PASSWORD));
assertEquals("1000", jdbcInterpreter.getProperty(COMMON_MAX_LINE));
}
@Test
public void testSelectQuery() throws SQLException, IOException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
JDBCInterpreter t = new JDBCInterpreter(properties);
t.open();
String sqlQuery = "select * from test_table";
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null));
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
}
@Test
public void testSelectQueryMaxResult() throws SQLException, IOException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
properties.setProperty("default.user", "");
properties.setProperty("default.password", "");
JDBCInterpreter t = new JDBCInterpreter(properties);
t.open();
String sqlQuery = "select * from test_table";
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null));
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message());
}
}

View file

@ -94,6 +94,7 @@
<module>hive</module>
<module>phoenix</module>
<module>postgresql</module>
<module>jdbc</module>
<module>tajo</module>
<module>flink</module>
<module>ignite</module>

View file

@ -402,6 +402,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
+ "org.apache.zeppelin.phoenix.PhoenixInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.tajo.TajoInterpreter,"
+ "org.apache.zeppelin.flink.FlinkInterpreter,"
+ "org.apache.zeppelin.ignite.IgniteInterpreter,"
@ -409,10 +410,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.lens.LensInterpreter,"
+ "org.apache.zeppelin.cassandra.CassandraInterpreter,"
+ "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.kylin.KylinInterpreter,"
+ "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
+ "org.apache.zeppelin.scalding.ScaldingInterpreter"),
+ "org.apache.zeppelin.scalding.ScaldingInterpreter,"
+ "org.apache.zeppelin.jdbc.JDBCInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),