mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge remote-tracking branch 'upstream/master'
This commit is contained in:
commit
eba0315380
16 changed files with 841 additions and 3 deletions
|
|
@ -45,6 +45,7 @@ sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/local/bin/mvn
|
|||
_Notes:_
|
||||
- Ensure node is installed by running `node --version`
|
||||
- Ensure maven is running version 3.1.x or higher with `mvn -version`
|
||||
- Configure maven to use more memory than usual by ```export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=1024m"```
|
||||
|
||||
### Build
|
||||
If you want to build Zeppelin from the source, please first clone this repository, then:
|
||||
|
|
|
|||
|
|
@ -67,6 +67,9 @@ if [[ -d "${ZEPPELIN_HOME}/zeppelin-server/target/classes" ]]; then
|
|||
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-server/target/classes"
|
||||
fi
|
||||
|
||||
# Add jdbc connector jar
|
||||
# ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/jdbc/jars/jdbc-connector-jar"
|
||||
|
||||
addJarInDir "${ZEPPELIN_HOME}"
|
||||
addJarInDir "${ZEPPELIN_HOME}/lib"
|
||||
addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@
|
|||
|
||||
<property>
|
||||
<name>zeppelin.interpreters</name>
|
||||
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value>
|
||||
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value>
|
||||
<description>Comma separated interpreter configurations. First interpreter become a default</description>
|
||||
</property>
|
||||
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@
|
|||
<a href="#" data-toggle="dropdown" class="dropdown-toggle">Interpreter <b class="caret"></b></a>
|
||||
<ul class="dropdown-menu">
|
||||
<li><a href="{{BASE_PATH}}/manual/interpreters.html">Overview</a></li>
|
||||
<li><a href="{{BASE_PATH}}/manual/dynamicinterpreterload.html">Dynamic Interpreter Loading</a></li>
|
||||
<li role="separator" class="divider"></li>
|
||||
<li><a href="{{BASE_PATH}}/interpreter/cassandra.html">Cassandra</a></li>
|
||||
<li><a href="{{BASE_PATH}}/interpreter/elasticsearch.html">Elasticsearch</a></li>
|
||||
|
|
|
|||
Binary file not shown.
|
After Width: | Height: | Size: 322 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 298 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 396 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 146 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 257 KiB |
BIN
docs/assets/themes/zeppelin/img/docs-img/zeppelin_user.png
Normal file
BIN
docs/assets/themes/zeppelin/img/docs-img/zeppelin_user.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 325 KiB |
124
docs/manual/dynamicinterpreterload.md
Normal file
124
docs/manual/dynamicinterpreterload.md
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
---
|
||||
layout: page
|
||||
title: "Dynamic Interpreter Loading"
|
||||
description: ""
|
||||
group: manual
|
||||
---
|
||||
<!--
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
{% include JB/setup %}
|
||||
|
||||
## Dynamic Interpreter Loading using REST API
|
||||
|
||||
Zeppelin provides pluggable interpreter architecture which results in a wide and variety of the supported backend system. In this section, we will introduce **Dynamic interpreter loading** using **REST API**. This concept actually comes from [Zeppelin Helium Proposal](https://cwiki.apache.org/confluence/display/ZEPPELIN/Helium+proposal).
|
||||
Before we start, if you are not familiar with the concept of **Zeppelin interpreter**, you can check out [Overview of Zeppelin interpreter](../manual/interpreters.html) first.
|
||||
|
||||
<br/>
|
||||
## Overview
|
||||
In the past, Zeppelin was loading interpreter binaries from `/interpreter/[interpreter_name]` directory. They were configured by `zeppelin.interpreters` property in `conf/zeppelin-site.xml` or `ZEPPELIN_INTERPRETERS` env variables in `conf/zeppelin-env.sh`. They were loaded on Zeppelin server startup and stayed alive until the server was stopped.
|
||||
In order to simplify using 3rd party interpreters, we changed this way to **dynamically** load interpreters from **Maven Repository** using **REST API**. Hopefully, the picture below will help you to understand the process.
|
||||
<center><img src="../assets/themes/zeppelin/img/docs-img/zeppelin_user.png" height="85%" width="85%"></center>
|
||||
|
||||
## Load & Unload Interpreters Using REST API
|
||||
|
||||
### 1. Load
|
||||
You can **load** interpreters located in Maven repository using REST API, like this:
|
||||
|
||||
( Maybe, you are unfamiliar with `[interpreter_group_name]` or `[interpreter_name]`. If so, please checkout [Interpreters in Zeppelin](../manual/interpreter.html) again. )
|
||||
|
||||
```
|
||||
http://[zeppelin-server]:[zeppelin-port]/api/interpreter/load/[interpreter_group_name]/[interpreter_name]
|
||||
```
|
||||
The Restful method will be <code>**POST**</code>. And the parameters you need are:
|
||||
|
||||
1. **Artifact:** Maven artifact ( groupId:artifactId:version )
|
||||
|
||||
2. **Class Name:** Package name + Interpreter class name
|
||||
|
||||
3. **Repository ( optional ):** Additional maven repository address
|
||||
|
||||
For example, if you want to load `markdown` interpreter to your Zeppelin, the parameters and URL you need may look like:
|
||||
|
||||
```
|
||||
http://127.0.0.1:8080/api/interpreter/load/md/markdown
|
||||
```
|
||||
|
||||
```
|
||||
{
|
||||
"artifact": "org.apache.zeppelin:zeppelin-markdown:0.6.0-incubating-SNAPSHOT",
|
||||
"className": "org.apache.zeppelin.markdown.Markdown",
|
||||
"repository": {
|
||||
"url": "http://dl.bintray.com/spark-packages/maven",
|
||||
"snapshot": false
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
The meaning of each parameters is:
|
||||
|
||||
1. **Artifact**
|
||||
- groupId: org.apache.zeppelin
|
||||
- artifactId: zeppelin-markdown
|
||||
- version: 0.6.0-incubating-SNAPSHOT
|
||||
|
||||
2. **Class Name**
|
||||
- Package Name: org.apache.zeppelin
|
||||
- Interpreter Class Name: markdown.Markdown
|
||||
|
||||
3. **Repository ( optional )**
|
||||
- Url: http://dl.bintray.com/spark-packages/maven
|
||||
- Snapshot: false
|
||||
|
||||
> <b>Please note: </b>The interpreters you downloaded need to be **reload**, when your Zeppelin server is down.
|
||||
|
||||
### 2. Unload
|
||||
If you want to **unload** the interpreters using REST API,
|
||||
|
||||
```
|
||||
http://[zeppelin-server]:[zeppelin-port]/api/interpreter/unload/[interpreter_group_name]/[interpreter_name]
|
||||
```
|
||||
In this case, the Restful method will be <code>**DELETE**</code>.
|
||||
|
||||
<br/>
|
||||
## What is the next step after Loading ?
|
||||
|
||||
### Q1. Where is the location of interpreters you downloaded ?
|
||||
|
||||
Actually, the answer about this question is in the above picture. Once the REST API is called, the `.jar` files of interpreters you get are saved under `ZEPPELIN_HOME/local-repo` first. Then, they will be copied to `ZEPPELIN_HOME/interpreter` directory. So, please checkout your `ZEPPELIN_HOME/interpreter`.
|
||||
|
||||
### Q2. Then, how can I use this interpreter ?
|
||||
|
||||
After loading an interpreter, you can use it by creating and configuring it in Zeppelin's **Interpreter tab**.
|
||||
|
||||
Oh, you don't need to restart your Zeppelin server. Because it is **Dynamic Loading**, you can configure and load it **at runtime** !
|
||||
|
||||
1. After Zeppelin server up, browse Zeppelin home and click **Interpreter tab**.
|
||||
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_setting_1.png" height="85%" width="85%"></center>
|
||||
|
||||
2. At the **Interpreter** section, click **+Create** button.
|
||||
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_setting_2.png" height="85%" width="85%"></center>
|
||||
|
||||
3. Then, you can verify the interpreter list that you loaded.
|
||||
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_setting_3.png" height="85%" width="85%"></center>
|
||||
|
||||
4. After choosing an interpreter, you can configure and use it. Don't forget to save it.
|
||||
|
||||
5. Create a new notebook in the **Notebook** section, then you can bind the interpreters from your interpreter list. Just drag and drop !
|
||||
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_binding_1.png" height="85%" width="85%"></center>
|
||||
<center><img src="../assets/themes/zeppelin/img/docs-img/interpreter_binding_2.png" height="85%" width="85%"></center>
|
||||
|
||||
6. At last, you can use your interpreter !
|
||||
|
||||
If you want to get the specific information about respective interpreters, please checkout each interpreter documentation.
|
||||
169
jdbc/pom.xml
Normal file
169
jdbc/pom.xml
Normal file
|
|
@ -0,0 +1,169 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-jdbc</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<name>Zeppelin: JDBC interpreter</name>
|
||||
<url>http://www.apache.org</url>
|
||||
|
||||
<properties>
|
||||
<postgresql.version>9.4-1201-jdbc41</postgresql.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<version>${postgresql.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>jline</groupId>
|
||||
<artifactId>jline</artifactId>
|
||||
<version>2.12.1</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.h2database</groupId>
|
||||
<artifactId>h2</artifactId>
|
||||
<version>1.4.190</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<version>1.9.5</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.mockrunner</groupId>
|
||||
<artifactId>mockrunner-jdbc</artifactId>
|
||||
<version>1.0.8</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/jdbc</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/jdbc</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
413
jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
Normal file
413
jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
Normal file
|
|
@ -0,0 +1,413 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.zeppelin.jdbc;
|
||||
|
||||
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.collect.Sets.SetView;
|
||||
|
||||
/**
|
||||
* JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ,
|
||||
* GreenplumDB, MariaDB, MySQL, Postgres and Redshit.
|
||||
*
|
||||
* <ul>
|
||||
* <li>{@code default.url} - JDBC URL to connect to.</li>
|
||||
* <li>{@code default.user} - JDBC user name..</li>
|
||||
* <li>{@code default.password} - JDBC password..</li>
|
||||
* <li>{@code default.driver.name} - JDBC driver name.</li>
|
||||
* <li>{@code common.max.result} - Max number of SQL result to display.</li>
|
||||
* </ul>
|
||||
*
|
||||
* <p>
|
||||
* How to use: <br/>
|
||||
* {@code %jdbc.sql} <br/>
|
||||
* {@code
|
||||
* SELECT store_id, count(*)
|
||||
* FROM retail_demo.order_lineitems_pxf
|
||||
* GROUP BY store_id;
|
||||
* }
|
||||
* </p>
|
||||
*
|
||||
*/
|
||||
public class JDBCInterpreter extends Interpreter {
|
||||
|
||||
private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class);
|
||||
|
||||
static final String COMMON_KEY = "common";
|
||||
static final String MAX_LINE_KEY = "max_count";
|
||||
static final String MAX_LINE_DEFAULT = "1000";
|
||||
|
||||
static final String DEFAULT_KEY = "default";
|
||||
static final String DRIVER_KEY = "driver";
|
||||
static final String URL_KEY = "url";
|
||||
static final String USER_KEY = "user";
|
||||
static final String PASSWORD_KEY = "password";
|
||||
static final String DOT = ".";
|
||||
|
||||
private static final char WHITESPACE = ' ';
|
||||
private static final char NEWLINE = '\n';
|
||||
private static final char TAB = '\t';
|
||||
private static final String TABLE_MAGIC_TAG = "%table ";
|
||||
private static final String EXPLAIN_PREDICATE = "EXPLAIN ";
|
||||
private static final String UPDATE_COUNT_HEADER = "Update Count";
|
||||
|
||||
static final String COMMON_MAX_LINE = COMMON_KEY + DOT + MAX_LINE_KEY;
|
||||
|
||||
static final String DEFAULT_DRIVER = DEFAULT_KEY + DOT + DRIVER_KEY;
|
||||
static final String DEFAULT_URL = DEFAULT_KEY + DOT + URL_KEY;
|
||||
static final String DEFAULT_USER = DEFAULT_KEY + DOT + USER_KEY;
|
||||
static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY;
|
||||
|
||||
static final String EMPTY_COLUMN_VALUE = "";
|
||||
|
||||
private final HashMap<String, Properties> propertiesMap;
|
||||
private final Map<String, Statement> paragraphIdStatementMap;
|
||||
|
||||
private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap;
|
||||
private final Map<String, Connection> paragraphIdConnectionMap;
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"sql",
|
||||
"jdbc",
|
||||
JDBCInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add(DEFAULT_URL, "jdbc:postgresql://localhost:5432/", "The URL for JDBC.")
|
||||
.add(DEFAULT_USER, "gpadmin", "The JDBC user name")
|
||||
.add(DEFAULT_PASSWORD, "",
|
||||
"The JDBC user password")
|
||||
.add(DEFAULT_DRIVER, "org.postgresql.Driver", "JDBC Driver Name")
|
||||
.add(COMMON_MAX_LINE, MAX_LINE_DEFAULT,
|
||||
"Max number of SQL result to display.").build());
|
||||
}
|
||||
|
||||
public JDBCInterpreter(Properties property) {
|
||||
super(property);
|
||||
propertiesMap = new HashMap<>();
|
||||
propertyKeyUnusedConnectionListMap = new HashMap<>();
|
||||
paragraphIdStatementMap = new HashMap<>();
|
||||
paragraphIdConnectionMap = new HashMap<>();
|
||||
}
|
||||
|
||||
public HashMap<String, Properties> getPropertiesMap() {
|
||||
return propertiesMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
for (String propertyKey : property.stringPropertyNames()) {
|
||||
logger.debug("propertyKey: {}", propertyKey);
|
||||
String[] keyValue = propertyKey.split("\\.", 2);
|
||||
if (2 == keyValue.length) {
|
||||
logger.info("key: {}, value: {}", keyValue[0], keyValue[1]);
|
||||
Properties prefixProperties;
|
||||
if (propertiesMap.containsKey(keyValue[0])) {
|
||||
prefixProperties = propertiesMap.get(keyValue[0]);
|
||||
} else {
|
||||
prefixProperties = new Properties();
|
||||
propertiesMap.put(keyValue[0], prefixProperties);
|
||||
}
|
||||
prefixProperties.put(keyValue[1], property.getProperty(propertyKey));
|
||||
}
|
||||
}
|
||||
|
||||
Set<String> removeKeySet = new HashSet<>();
|
||||
for (String key : propertiesMap.keySet()) {
|
||||
if (!COMMON_KEY.equals(key)) {
|
||||
Properties properties = propertiesMap.get(key);
|
||||
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
|
||||
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
|
||||
key, DRIVER_KEY, key, key, URL_KEY);
|
||||
removeKeySet.add(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (String key : removeKeySet) {
|
||||
propertiesMap.remove(key);
|
||||
}
|
||||
|
||||
logger.debug("propertiesMap: {}", propertiesMap);
|
||||
}
|
||||
|
||||
public Connection getConnection(String propertyKey) throws ClassNotFoundException, SQLException {
|
||||
Connection connection = null;
|
||||
if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
|
||||
ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey);
|
||||
if (0 != connectionList.size()) {
|
||||
connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0);
|
||||
if (null != connection && connection.isClosed()) {
|
||||
connection.close();
|
||||
connection = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (null == connection) {
|
||||
Properties properties = propertiesMap.get(propertyKey);
|
||||
logger.info(properties.getProperty(DRIVER_KEY));
|
||||
Class.forName(properties.getProperty(DRIVER_KEY));
|
||||
String url = properties.getProperty(URL_KEY);
|
||||
String user = properties.getProperty(USER_KEY);
|
||||
String password = properties.getProperty(PASSWORD_KEY);
|
||||
if (null != user && null != password) {
|
||||
connection = DriverManager.getConnection(url, user, password);
|
||||
} else {
|
||||
connection = DriverManager.getConnection(url, properties);
|
||||
}
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
|
||||
public Statement getStatement(String propertyKey, String paragraphId)
|
||||
throws SQLException, ClassNotFoundException {
|
||||
Connection connection;
|
||||
if (paragraphIdConnectionMap.containsKey(paragraphId)) {
|
||||
connection = paragraphIdConnectionMap.get(paragraphId);
|
||||
} else {
|
||||
connection = getConnection(propertyKey);
|
||||
}
|
||||
|
||||
Statement statement = connection.createStatement();
|
||||
if (isStatementClosed(statement)) {
|
||||
connection = getConnection(propertyKey);
|
||||
statement = connection.createStatement();
|
||||
}
|
||||
paragraphIdConnectionMap.put(paragraphId, connection);
|
||||
paragraphIdStatementMap.put(paragraphId, statement);
|
||||
|
||||
return statement;
|
||||
}
|
||||
|
||||
private boolean isStatementClosed(Statement statement) {
|
||||
try {
|
||||
return statement.isClosed();
|
||||
} catch (Throwable t) {
|
||||
logger.debug("{} doesn't support isClosed method", statement);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
try {
|
||||
for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values()) {
|
||||
for (Connection c : connectionList) {
|
||||
c.close();
|
||||
}
|
||||
}
|
||||
|
||||
for (Statement statement : paragraphIdStatementMap.values()) {
|
||||
statement.close();
|
||||
}
|
||||
paragraphIdStatementMap.clear();
|
||||
|
||||
for (Connection connection : paragraphIdConnectionMap.values()) {
|
||||
connection.close();
|
||||
}
|
||||
paragraphIdConnectionMap.clear();
|
||||
|
||||
} catch (SQLException e) {
|
||||
logger.error("Error while closing...", e);
|
||||
}
|
||||
}
|
||||
|
||||
private InterpreterResult executeSql(String propertyKey, String sql,
|
||||
InterpreterContext interpreterContext) {
|
||||
|
||||
String paragraphId = interpreterContext.getParagraphId();
|
||||
|
||||
try {
|
||||
|
||||
Statement statement = getStatement(propertyKey, paragraphId);
|
||||
statement.setMaxRows(getMaxResult());
|
||||
|
||||
StringBuilder msg = null;
|
||||
boolean isTableType = false;
|
||||
|
||||
if (containsIgnoreCase(sql, EXPLAIN_PREDICATE)) {
|
||||
msg = new StringBuilder();
|
||||
} else {
|
||||
msg = new StringBuilder(TABLE_MAGIC_TAG);
|
||||
isTableType = true;
|
||||
}
|
||||
|
||||
ResultSet resultSet = null;
|
||||
try {
|
||||
|
||||
boolean isResultSetAvailable = statement.execute(sql);
|
||||
|
||||
if (isResultSetAvailable) {
|
||||
resultSet = statement.getResultSet();
|
||||
|
||||
ResultSetMetaData md = resultSet.getMetaData();
|
||||
|
||||
for (int i = 1; i < md.getColumnCount() + 1; i++) {
|
||||
if (i > 1) {
|
||||
msg.append(TAB);
|
||||
}
|
||||
msg.append(replaceReservedChars(isTableType, md.getColumnName(i)));
|
||||
}
|
||||
msg.append(NEWLINE);
|
||||
|
||||
int displayRowCount = 0;
|
||||
while (resultSet.next() && displayRowCount < getMaxResult()) {
|
||||
for (int i = 1; i < md.getColumnCount() + 1; i++) {
|
||||
msg.append(replaceReservedChars(isTableType, resultSet.getString(i)));
|
||||
if (i != md.getColumnCount()) {
|
||||
msg.append(TAB);
|
||||
}
|
||||
}
|
||||
msg.append(NEWLINE);
|
||||
displayRowCount++;
|
||||
}
|
||||
} else {
|
||||
// Response contains either an update count or there are no results.
|
||||
int updateCount = statement.getUpdateCount();
|
||||
msg.append(UPDATE_COUNT_HEADER).append(NEWLINE);
|
||||
msg.append(updateCount).append(NEWLINE);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if (resultSet != null) {
|
||||
resultSet.close();
|
||||
}
|
||||
statement.close();
|
||||
} finally {
|
||||
statement = null;
|
||||
}
|
||||
}
|
||||
|
||||
return new InterpreterResult(Code.SUCCESS, msg.toString());
|
||||
|
||||
} catch (SQLException ex) {
|
||||
logger.error("Cannot run " + sql, ex);
|
||||
return new InterpreterResult(Code.ERROR, ex.getMessage());
|
||||
} catch (ClassNotFoundException e) {
|
||||
logger.error("Cannot run " + sql, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For %table response replace Tab and Newline characters from the content.
|
||||
*/
|
||||
private String replaceReservedChars(boolean isTableResponseType, String str) {
|
||||
if (str == null) {
|
||||
return EMPTY_COLUMN_VALUE;
|
||||
}
|
||||
return (!isTableResponseType) ? str : str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run SQL command '{}'", cmd);
|
||||
String propertyKey = getPropertyKey(cmd);
|
||||
|
||||
if (null != propertyKey) {
|
||||
cmd = cmd.substring(propertyKey.length() + 2);
|
||||
} else {
|
||||
propertyKey = DEFAULT_KEY;
|
||||
}
|
||||
|
||||
cmd = cmd.trim();
|
||||
|
||||
logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
|
||||
|
||||
return executeSql(propertyKey, cmd, contextInterpreter);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
|
||||
logger.info("Cancel current query statement.");
|
||||
|
||||
String paragraphId = context.getParagraphId();
|
||||
try {
|
||||
paragraphIdStatementMap.get(paragraphId).cancel();
|
||||
} catch (SQLException e) {
|
||||
logger.error("Error while cancelling...", e);
|
||||
}
|
||||
}
|
||||
|
||||
public String getPropertyKey(String cmd) {
|
||||
int firstLineIndex = cmd.indexOf("\n");
|
||||
if (-1 == firstLineIndex) {
|
||||
firstLineIndex = cmd.length();
|
||||
}
|
||||
int configStartIndex = cmd.indexOf("(");
|
||||
int configLastIndex = cmd.indexOf(")");
|
||||
if (configStartIndex != -1 && configLastIndex != -1
|
||||
&& configLastIndex < firstLineIndex && configLastIndex < firstLineIndex) {
|
||||
return cmd.substring(configStartIndex + 1, configLastIndex);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
JDBCInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
public int getMaxResult() {
|
||||
return Integer.valueOf(
|
||||
propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,125 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||
* copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
* or implied. See the License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.zeppelin.jdbc;
|
||||
|
||||
import static java.lang.String.format;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_KEY;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_DRIVER;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_PASSWORD;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.jdbc.JDBCInterpreter;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.mockrunner.jdbc.BasicJDBCTestCaseAdapter;
|
||||
/**
|
||||
* JDBC interpreter unit tests
|
||||
*/
|
||||
public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
||||
|
||||
static String jdbcConnection;
|
||||
|
||||
private static String getJdbcConnection() throws IOException {
|
||||
if(null == jdbcConnection) {
|
||||
Path tmpDir = Files.createTempDirectory("h2-test-");
|
||||
tmpDir.toFile().deleteOnExit();
|
||||
jdbcConnection = format("jdbc:h2:%s", tmpDir);
|
||||
}
|
||||
return jdbcConnection;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
||||
Class.forName("org.h2.Driver");
|
||||
Connection connection = DriverManager.getConnection(getJdbcConnection());
|
||||
Statement statement = connection.createStatement();
|
||||
statement.execute(
|
||||
"DROP TABLE IF EXISTS test_table; " +
|
||||
"CREATE TABLE test_table(id varchar(255), name varchar(255));");
|
||||
statement.execute(
|
||||
"insert into test_table(id, name) values ('a', 'a_name'),('b', 'b_name');"
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultProperties() throws SQLException {
|
||||
JDBCInterpreter jdbcInterpreter = new JDBCInterpreter(new Properties());
|
||||
|
||||
assertEquals("org.postgresql.Driver", jdbcInterpreter.getProperty(DEFAULT_DRIVER));
|
||||
assertEquals("jdbc:postgresql://localhost:5432/", jdbcInterpreter.getProperty(DEFAULT_URL));
|
||||
assertEquals("gpadmin", jdbcInterpreter.getProperty(DEFAULT_USER));
|
||||
assertEquals("", jdbcInterpreter.getProperty(DEFAULT_PASSWORD));
|
||||
assertEquals("1000", jdbcInterpreter.getProperty(COMMON_MAX_LINE));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectQuery() throws SQLException, IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
properties.setProperty("default.password", "");
|
||||
JDBCInterpreter t = new JDBCInterpreter(properties);
|
||||
t.open();
|
||||
|
||||
String sqlQuery = "select * from test_table";
|
||||
|
||||
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null));
|
||||
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
|
||||
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectQueryMaxResult() throws SQLException, IOException {
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
properties.setProperty("default.password", "");
|
||||
JDBCInterpreter t = new JDBCInterpreter(properties);
|
||||
t.open();
|
||||
|
||||
String sqlQuery = "select * from test_table";
|
||||
|
||||
InterpreterResult interpreterResult = t.interpret(sqlQuery, new InterpreterContext("", "1", "","", null,null,null,null));
|
||||
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
|
||||
assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message());
|
||||
}
|
||||
}
|
||||
1
pom.xml
1
pom.xml
|
|
@ -94,6 +94,7 @@
|
|||
<module>hive</module>
|
||||
<module>phoenix</module>
|
||||
<module>postgresql</module>
|
||||
<module>jdbc</module>
|
||||
<module>tajo</module>
|
||||
<module>flink</module>
|
||||
<module>ignite</module>
|
||||
|
|
|
|||
|
|
@ -402,6 +402,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.shell.ShellInterpreter,"
|
||||
+ "org.apache.zeppelin.hive.HiveInterpreter,"
|
||||
+ "org.apache.zeppelin.phoenix.PhoenixInterpreter,"
|
||||
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
|
||||
+ "org.apache.zeppelin.tajo.TajoInterpreter,"
|
||||
+ "org.apache.zeppelin.flink.FlinkInterpreter,"
|
||||
+ "org.apache.zeppelin.ignite.IgniteInterpreter,"
|
||||
|
|
@ -409,10 +410,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.lens.LensInterpreter,"
|
||||
+ "org.apache.zeppelin.cassandra.CassandraInterpreter,"
|
||||
+ "org.apache.zeppelin.geode.GeodeOqlInterpreter,"
|
||||
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
|
||||
+ "org.apache.zeppelin.kylin.KylinInterpreter,"
|
||||
+ "org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,"
|
||||
+ "org.apache.zeppelin.scalding.ScaldingInterpreter"),
|
||||
+ "org.apache.zeppelin.scalding.ScaldingInterpreter,"
|
||||
+ "org.apache.zeppelin.jdbc.JDBCInterpreter"),
|
||||
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
|
||||
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
|
|
|
|||
Loading…
Reference in a new issue