mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into ZEPPELIN-1619-rebased
This commit is contained in:
commit
c7b187f9d2
16 changed files with 331 additions and 128 deletions
10
.travis.yml
10
.travis.yml
|
|
@ -42,19 +42,19 @@ matrix:
|
|||
|
||||
# Test all modules with spark 2.0.2 and scala 2.11
|
||||
- jdk: "oraclejdk7"
|
||||
env: SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
|
||||
env: SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
|
||||
|
||||
# Test all modules with spark 2.1.0 and scala 2.11
|
||||
- jdk: "oraclejdk7"
|
||||
env: SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
|
||||
env: SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" PROFILE="-Pspark-2.0 -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
|
||||
|
||||
# Test all modules with scala 2.10
|
||||
- jdk: "oraclejdk7"
|
||||
env: SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Pbeam -Pexamples -Pscala-2.10" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
|
||||
env: SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Pbeam -Phelium-dev -Pexamples -Pscala-2.10" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
|
||||
|
||||
# Test all modules with scala 2.11
|
||||
- jdk: "oraclejdk7"
|
||||
env: SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
|
||||
env: SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.6 -Ppyspark -Psparkr -Pscalding -Phelium-dev -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify -Pusing-packaged-distr -DskipRat" TEST_PROJECTS=""
|
||||
|
||||
# Test spark module for 1.5.2
|
||||
- jdk: "oraclejdk7"
|
||||
|
|
@ -66,7 +66,7 @@ matrix:
|
|||
|
||||
# Test selenium with spark module for 1.6.3
|
||||
- jdk: "oraclejdk7"
|
||||
env: TEST_SELENIUM="true" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop-2.6 -Ppyspark -Pexamples" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false"
|
||||
env: TEST_SELENIUM="true" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" PROFILE="-Pspark-1.6 -Phadoop-2.6 -Ppyspark -Phelium-dev -Pexamples" BUILD_FLAG="package -DskipTests -DskipRat" TEST_FLAG="verify -DskipRat" TEST_PROJECTS="-pl zeppelin-interpreter,zeppelin-zengine,zeppelin-server,zeppelin-display,spark-dependencies,spark -Dtest=org.apache.zeppelin.AbstractFunctionalSuite -DfailIfNoTests=false"
|
||||
|
||||
# Test python/pyspark with python 2
|
||||
- jdk: "oraclejdk7"
|
||||
|
|
|
|||
|
|
@ -60,7 +60,7 @@ You can check example applications under [./zeppelin-examples](https://github.co
|
|||
|
||||
In the development mode, you can run your Application in your IDE as a normal java application and see the result inside of Zeppelin notebook.
|
||||
|
||||
`org.apache.zeppelin.interpreter.dev.ZeppelinApplicationDevServer` can run Zeppelin Application in development mode.
|
||||
`org.apache.zeppelin.helium.ZeppelinApplicationDevServer` can run Zeppelin Application in development mode.
|
||||
|
||||
```java
|
||||
|
||||
|
|
@ -73,7 +73,7 @@ public static void main(String[] args) throws Exception {
|
|||
|
||||
// run application in devlopment mode with given resource
|
||||
// in this case, Clock.class.getName() will be the application class name
|
||||
ZeppelinApplicationDevServer devServer = new ZeppelinApplicationDevServer(
|
||||
org.apache.zeppelin.helium.ZeppelinApplicationDevServer devServer = new org.apache.zeppelin.helium.ZeppelinApplicationDevServer(
|
||||
Clock.class.getName(), pool.getAll());
|
||||
|
||||
// start development mode
|
||||
|
|
|
|||
89
helium-dev/pom.xml
Normal file
89
helium-dev/pom.xml
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<version>0.7.0-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>helium-dev</artifactId>
|
||||
<version>0.7.0-SNAPSHOT</version>
|
||||
<name>Zeppelin: Helium development interpreter</name>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/helium-dev</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/helium-dev</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
</project>
|
||||
|
|
@ -14,7 +14,8 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.dev;
|
||||
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
|
@ -48,7 +49,7 @@ public class DevInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
/**
|
||||
* event handler for ZeppelinApplicationDevServer
|
||||
* event handler for org.apache.zeppelin.helium.ZeppelinApplicationDevServer
|
||||
*/
|
||||
public static interface InterpreterEvent {
|
||||
public InterpreterResult interpret(String st, InterpreterContext context);
|
||||
|
|
@ -14,23 +14,20 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.dev;
|
||||
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.log4j.ConsoleAppender;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.PatternLayout;
|
||||
import org.apache.zeppelin.helium.*;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
import org.apache.zeppelin.resource.WellKnownResourceName;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -47,7 +44,7 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
|
|||
|
||||
public ZeppelinApplicationDevServer(final String className, ResourceSet resourceSet) throws
|
||||
Exception {
|
||||
this(ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT, className, resourceSet);
|
||||
this(Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT, className, resourceSet);
|
||||
}
|
||||
|
||||
public ZeppelinApplicationDevServer(int port, String className, ResourceSet resourceSet) throws
|
||||
|
|
@ -14,16 +14,16 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.dev;
|
||||
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.helium.DevInterpreter.InterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.dev.DevInterpreter.InterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -34,11 +34,10 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class ZeppelinDevServer extends
|
||||
RemoteInterpreterServer implements InterpreterEvent, InterpreterOutputChangeListener {
|
||||
final Logger logger = LoggerFactory.getLogger(ZeppelinDevServer.class);
|
||||
public static final int DEFAULT_TEST_INTERPRETER_PORT = 29914;
|
||||
private static final Logger logger = LoggerFactory.getLogger(ZeppelinDevServer.class);
|
||||
|
||||
DevInterpreter interpreter = null;
|
||||
InterpreterOutput out;
|
||||
private DevInterpreter interpreter = null;
|
||||
private InterpreterOutput out;
|
||||
public ZeppelinDevServer(int port) throws TException {
|
||||
super(port);
|
||||
}
|
||||
|
|
@ -47,21 +46,21 @@ public class ZeppelinDevServer extends
|
|||
protected Interpreter getInterpreter(String sessionKey, String className) throws TException {
|
||||
synchronized (this) {
|
||||
InterpreterGroup interpreterGroup = getInterpreterGroup();
|
||||
if (interpreterGroup == null) {
|
||||
if (interpreterGroup == null || interpreterGroup.isEmpty()) {
|
||||
createInterpreter(
|
||||
"dev",
|
||||
sessionKey,
|
||||
DevInterpreter.class.getName(),
|
||||
new HashMap<String, String>(),
|
||||
"anonymous");
|
||||
|
||||
Interpreter intp = super.getInterpreter(sessionKey, className);
|
||||
interpreter = (DevInterpreter) (
|
||||
((LazyOpenInterpreter) intp).getInnerInterpreter());
|
||||
interpreter.setInterpreterEvent(this);
|
||||
notify();
|
||||
}
|
||||
}
|
||||
|
||||
Interpreter intp = super.getInterpreter(sessionKey, className);
|
||||
interpreter = (DevInterpreter) (
|
||||
((LazyOpenInterpreter) intp).getInnerInterpreter());
|
||||
interpreter.setInterpreterEvent(this);
|
||||
return super.getInterpreter(sessionKey, className);
|
||||
}
|
||||
|
||||
19
helium-dev/src/main/resources/interpreter-setting.json
Normal file
19
helium-dev/src/main/resources/interpreter-setting.json
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
[
|
||||
{
|
||||
"group": "dev",
|
||||
"name": "dev",
|
||||
"className": "org.apache.zeppelin.helium.DevInterpreter",
|
||||
"properties": {
|
||||
"port": {
|
||||
"envName": "PORT",
|
||||
"propertyName": "port",
|
||||
"defaultValue": "jdbc:postgresql://localhost:5432/",
|
||||
"description": "The URL for JDBC."
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
"language": "helium",
|
||||
"editOnDblClick": false
|
||||
}
|
||||
}
|
||||
]
|
||||
|
|
@ -15,12 +15,26 @@
|
|||
package org.apache.zeppelin.jdbc;
|
||||
|
||||
import static org.apache.commons.lang.StringUtils.containsIgnoreCase;
|
||||
import java.io.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.PrintStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.sql.*;
|
||||
import java.util.*;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.dbcp2.ConnectionFactory;
|
||||
import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
|
||||
|
|
@ -30,10 +44,7 @@ import org.apache.commons.lang3.StringUtils;
|
|||
import org.apache.commons.pool2.ObjectPool;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPool;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
|
||||
|
|
@ -443,6 +454,57 @@ public class JDBCInterpreter extends Interpreter {
|
|||
return updatedCount < 0 && columnCount <= 0 ? true : false;
|
||||
}
|
||||
|
||||
/*
|
||||
inspired from https://github.com/postgres/pgadmin3/blob/794527d97e2e3b01399954f3b79c8e2585b908dd/
|
||||
pgadmin/dlg/dlgProperty.cpp#L999-L1045
|
||||
*/
|
||||
protected ArrayList<String> splitSqlQueries(String sql) {
|
||||
ArrayList<String> queries = new ArrayList<>();
|
||||
StringBuilder query = new StringBuilder();
|
||||
Character character;
|
||||
|
||||
Boolean antiSlash = false;
|
||||
Boolean quoteString = false;
|
||||
Boolean doubleQuoteString = false;
|
||||
|
||||
for (int item = 0; item < sql.length(); item++) {
|
||||
character = sql.charAt(item);
|
||||
|
||||
if (character.equals('\\')) {
|
||||
antiSlash = true;
|
||||
}
|
||||
if (character.equals('\'')) {
|
||||
if (antiSlash) {
|
||||
antiSlash = false;
|
||||
} else if (quoteString) {
|
||||
quoteString = false;
|
||||
} else if (!doubleQuoteString) {
|
||||
quoteString = true;
|
||||
}
|
||||
}
|
||||
if (character.equals('"')) {
|
||||
if (antiSlash) {
|
||||
antiSlash = false;
|
||||
} else if (doubleQuoteString) {
|
||||
doubleQuoteString = false;
|
||||
} else if (!quoteString) {
|
||||
doubleQuoteString = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (character.equals(';') && !antiSlash && !quoteString && !doubleQuoteString) {
|
||||
queries.add(query.toString());
|
||||
query = new StringBuilder();
|
||||
} else if (item == sql.length() - 1) {
|
||||
query.append(character);
|
||||
queries.add(query.toString());
|
||||
} else {
|
||||
query.append(character);
|
||||
}
|
||||
}
|
||||
return queries;
|
||||
}
|
||||
|
||||
private InterpreterResult executeSql(String propertyKey, String sql,
|
||||
InterpreterContext interpreterContext) {
|
||||
Connection connection;
|
||||
|
|
@ -451,60 +513,68 @@ public class JDBCInterpreter extends Interpreter {
|
|||
String paragraphId = interpreterContext.getParagraphId();
|
||||
String user = interpreterContext.getAuthenticationInfo().getUser();
|
||||
|
||||
try {
|
||||
String results = null;
|
||||
connection = getConnection(propertyKey, interpreterContext);
|
||||
InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
|
||||
|
||||
try {
|
||||
connection = getConnection(propertyKey, interpreterContext);
|
||||
if (connection == null) {
|
||||
return new InterpreterResult(Code.ERROR, "Prefix not found.");
|
||||
}
|
||||
|
||||
statement = connection.createStatement();
|
||||
if (statement == null) {
|
||||
return new InterpreterResult(Code.ERROR, "Prefix not found.");
|
||||
}
|
||||
ArrayList<String> multipleSqlArray = splitSqlQueries(sql);
|
||||
for (int i = 0; i < multipleSqlArray.size(); i++) {
|
||||
String sqlToExecute = multipleSqlArray.get(i);
|
||||
statement = connection.createStatement();
|
||||
if (statement == null) {
|
||||
return new InterpreterResult(Code.ERROR, "Prefix not found.");
|
||||
}
|
||||
|
||||
try {
|
||||
getJDBCConfiguration(user).saveStatement(paragraphId, statement);
|
||||
try {
|
||||
getJDBCConfiguration(user).saveStatement(paragraphId, statement);
|
||||
|
||||
boolean isResultSetAvailable = statement.execute(sql);
|
||||
if (isResultSetAvailable) {
|
||||
resultSet = statement.getResultSet();
|
||||
boolean isResultSetAvailable = statement.execute(sqlToExecute);
|
||||
if (isResultSetAvailable) {
|
||||
resultSet = statement.getResultSet();
|
||||
|
||||
// Regards that the command is DDL.
|
||||
if (isDDLCommand(statement.getUpdateCount(), resultSet.getMetaData().getColumnCount())) {
|
||||
results = "Query executed successfully.";
|
||||
// Regards that the command is DDL.
|
||||
if (isDDLCommand(statement.getUpdateCount(),
|
||||
resultSet.getMetaData().getColumnCount())) {
|
||||
interpreterResult.add(InterpreterResult.Type.TEXT,
|
||||
"Query executed successfully.");
|
||||
} else {
|
||||
interpreterResult.add(
|
||||
getResults(resultSet, !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE)));
|
||||
}
|
||||
} else {
|
||||
results = getResults(resultSet, !containsIgnoreCase(sql, EXPLAIN_PREDICATE));
|
||||
// Response contains either an update count or there are no results.
|
||||
int updateCount = statement.getUpdateCount();
|
||||
interpreterResult.add(InterpreterResult.Type.TEXT,
|
||||
"Query executed successfully. Affected rows : " +
|
||||
updateCount);
|
||||
}
|
||||
} finally {
|
||||
if (resultSet != null) {
|
||||
try {
|
||||
resultSet.close();
|
||||
} catch (SQLException e) { /*ignored*/ }
|
||||
}
|
||||
if (statement != null) {
|
||||
try {
|
||||
statement.close();
|
||||
} catch (SQLException e) { /*ignored*/ }
|
||||
}
|
||||
} else {
|
||||
// Response contains either an update count or there are no results.
|
||||
int updateCount = statement.getUpdateCount();
|
||||
results = "Query executed successfully. Affected rows : " + updateCount;
|
||||
}
|
||||
//In case user ran an insert/update/upsert statement
|
||||
if (connection.getAutoCommit() != true) connection.commit();
|
||||
|
||||
} finally {
|
||||
if (resultSet != null) {
|
||||
try {
|
||||
resultSet.close();
|
||||
} catch (SQLException e) { /*ignored*/ }
|
||||
}
|
||||
if (statement != null) {
|
||||
try {
|
||||
statement.close();
|
||||
} catch (SQLException e) { /*ignored*/ }
|
||||
}
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (SQLException e) { /*ignored*/ }
|
||||
}
|
||||
getJDBCConfiguration(user).removeStatement(paragraphId);
|
||||
}
|
||||
return new InterpreterResult(Code.SUCCESS, results);
|
||||
|
||||
//In case user ran an insert/update/upsert statement
|
||||
if (connection != null) {
|
||||
try {
|
||||
if (!connection.getAutoCommit()) {
|
||||
connection.commit();
|
||||
}
|
||||
connection.close();
|
||||
} catch (SQLException e) { /*ignored*/ }
|
||||
}
|
||||
getJDBCConfiguration(user).removeStatement(paragraphId);
|
||||
} catch (Exception e) {
|
||||
logger.error("Cannot run " + sql, e);
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
|
|
@ -517,9 +587,10 @@ public class JDBCInterpreter extends Interpreter {
|
|||
} catch (SQLException e1) {
|
||||
e1.printStackTrace();
|
||||
}
|
||||
|
||||
return new InterpreterResult(Code.ERROR, errorMsg);
|
||||
interpreterResult.add(errorMsg);
|
||||
return new InterpreterResult(Code.ERROR, interpreterResult.message());
|
||||
}
|
||||
return interpreterResult;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -15,9 +15,6 @@
|
|||
package org.apache.zeppelin.jdbc;
|
||||
|
||||
import static java.lang.String.format;
|
||||
import static org.apache.zeppelin.interpreter.Interpreter.logger;
|
||||
import static org.apache.zeppelin.interpreter.Interpreter.register;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_KEY;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_DRIVER;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_PASSWORD;
|
||||
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER;
|
||||
|
|
@ -29,19 +26,17 @@ import java.io.IOException;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.jdbc.JDBCInterpreter;
|
||||
import org.apache.zeppelin.scheduler.FIFOScheduler;
|
||||
import org.apache.zeppelin.scheduler.ParallelScheduler;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.apache.zeppelin.user.Credentials;
|
||||
import org.apache.zeppelin.user.UserCredentials;
|
||||
import org.apache.zeppelin.user.UsernamePassword;
|
||||
import org.junit.Before;
|
||||
|
|
@ -171,6 +166,49 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
|
|||
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSplitSqlQuery() throws SQLException, IOException {
|
||||
String sqlQuery = "insert into test_table(id, name) values ('a', ';\"');" +
|
||||
"select * from test_table;" +
|
||||
"select * from test_table WHERE ID = \";'\";" +
|
||||
"select * from test_table WHERE ID = ';'";
|
||||
|
||||
Properties properties = new Properties();
|
||||
JDBCInterpreter t = new JDBCInterpreter(properties);
|
||||
t.open();
|
||||
ArrayList<String> multipleSqlArray = t.splitSqlQueries(sqlQuery);
|
||||
assertEquals(4, multipleSqlArray.size());
|
||||
assertEquals("insert into test_table(id, name) values ('a', ';\"')", multipleSqlArray.get(0));
|
||||
assertEquals("select * from test_table", multipleSqlArray.get(1));
|
||||
assertEquals("select * from test_table WHERE ID = \";'\"", multipleSqlArray.get(2));
|
||||
assertEquals("select * from test_table WHERE ID = ';'", multipleSqlArray.get(3));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectMultipleQuries() throws SQLException, IOException {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("common.max_count", "1000");
|
||||
properties.setProperty("common.max_retry", "3");
|
||||
properties.setProperty("default.driver", "org.h2.Driver");
|
||||
properties.setProperty("default.url", getJdbcConnection());
|
||||
properties.setProperty("default.user", "");
|
||||
properties.setProperty("default.password", "");
|
||||
JDBCInterpreter t = new JDBCInterpreter(properties);
|
||||
t.open();
|
||||
|
||||
String sqlQuery = "select * from test_table;" +
|
||||
"select * from test_table WHERE ID = ';';";
|
||||
InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
|
||||
assertEquals(2, interpreterResult.message().size());
|
||||
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
|
||||
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\nc\tnull\n", interpreterResult.message().get(0).getData());
|
||||
|
||||
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(1).getType());
|
||||
assertEquals("ID\tNAME\n", interpreterResult.message().get(1).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSelectQueryWithNull() throws SQLException, IOException {
|
||||
Properties properties = new Properties();
|
||||
|
|
|
|||
7
pom.xml
7
pom.xml
|
|
@ -726,6 +726,13 @@
|
|||
</modules>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>helium-dev</id>
|
||||
<modules>
|
||||
<module>helium-dev</module>
|
||||
</modules>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>build-distr</id>
|
||||
<activation>
|
||||
|
|
|
|||
|
|
@ -34,10 +34,16 @@
|
|||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>helium-dev</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ package org.apache.zeppelin.example.app.clock;
|
|||
import org.apache.zeppelin.helium.Application;
|
||||
import org.apache.zeppelin.helium.ApplicationContext;
|
||||
import org.apache.zeppelin.helium.ApplicationException;
|
||||
import org.apache.zeppelin.interpreter.dev.ZeppelinApplicationDevServer;
|
||||
import org.apache.zeppelin.helium.ZeppelinApplicationDevServer;
|
||||
import org.apache.zeppelin.resource.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
|
|||
|
|
@ -38,6 +38,12 @@
|
|||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>helium-dev</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
|
|
|
|||
|
|
@ -28,4 +28,6 @@ public class Constants {
|
|||
|
||||
public static final String EXISTING_PROCESS = "existing_process";
|
||||
|
||||
public static final int ZEPPELIN_INTERPRETER_DEFAUlT_PORT = 29914;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,16 +17,12 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.rmi.server.RemoteServer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
|
|
@ -39,7 +35,6 @@ import org.apache.zeppelin.interpreter.*;
|
|||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
|
||||
import org.apache.zeppelin.interpreter.thrift.*;
|
||||
import org.apache.zeppelin.resource.*;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
|
|
@ -144,7 +139,7 @@ public class RemoteInterpreterServer
|
|||
public static void main(String[] args)
|
||||
throws TTransportException, InterruptedException {
|
||||
|
||||
int port = ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT;
|
||||
int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
|
||||
if (args.length > 0) {
|
||||
port = Integer.parseInt(args[0]);
|
||||
}
|
||||
|
|
@ -442,7 +437,7 @@ public class RemoteInterpreterServer
|
|||
public void onPreExecute(String script) {
|
||||
String cmdDev = interpreter.getHook(noteId, HookType.PRE_EXEC_DEV);
|
||||
String cmdUser = interpreter.getHook(noteId, HookType.PRE_EXEC);
|
||||
|
||||
|
||||
// User defined hook should be executed before dev hook
|
||||
List<String> cmds = Arrays.asList(cmdDev, cmdUser);
|
||||
for (String cmd : cmds) {
|
||||
|
|
@ -450,15 +445,15 @@ public class RemoteInterpreterServer
|
|||
script = cmd + '\n' + script;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
InterpretJob.this.script = script;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onPostExecute(String script) {
|
||||
String cmdDev = interpreter.getHook(noteId, HookType.POST_EXEC_DEV);
|
||||
String cmdUser = interpreter.getHook(noteId, HookType.POST_EXEC);
|
||||
|
||||
|
||||
// User defined hook should be executed after dev hook
|
||||
List<String> cmds = Arrays.asList(cmdUser, cmdDev);
|
||||
for (String cmd : cmds) {
|
||||
|
|
@ -466,7 +461,7 @@ public class RemoteInterpreterServer
|
|||
script += '\n' + cmd;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
InterpretJob.this.script = script;
|
||||
}
|
||||
};
|
||||
|
|
@ -478,7 +473,7 @@ public class RemoteInterpreterServer
|
|||
protected Object jobRun() throws Throwable {
|
||||
try {
|
||||
InterpreterContext.set(context);
|
||||
|
||||
|
||||
// Open the interpreter instance prior to calling interpret().
|
||||
// This is necessary because the earliest we can register a hook
|
||||
// is from within the open() method.
|
||||
|
|
@ -486,7 +481,7 @@ public class RemoteInterpreterServer
|
|||
if (!lazy.isOpen()) {
|
||||
lazy.open();
|
||||
}
|
||||
|
||||
|
||||
// Add hooks to script from registry.
|
||||
// Global scope first, followed by notebook scope
|
||||
processInterpreterHooks(null);
|
||||
|
|
|
|||
|
|
@ -74,8 +74,6 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
|
||||
import org.apache.zeppelin.interpreter.dev.DevInterpreter;
|
||||
import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
|
|
@ -1372,11 +1370,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
// dev interpreter
|
||||
if (DevInterpreter.isInterpreterName(replName)) {
|
||||
return getDevInterpreter();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -1453,24 +1446,4 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
return editor;
|
||||
}
|
||||
|
||||
private Interpreter getDevInterpreter() {
|
||||
if (devInterpreter == null) {
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
option.setRemote(true);
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup("dev", option);
|
||||
|
||||
devInterpreter = connectToRemoteRepl("dev", DevInterpreter.class.getName(), "localhost",
|
||||
ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT, new Properties(), "dev", "anonymous",
|
||||
false);
|
||||
|
||||
LinkedList<Interpreter> intpList = new LinkedList<>();
|
||||
intpList.add(devInterpreter);
|
||||
interpreterGroup.put("dev", intpList);
|
||||
|
||||
devInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
}
|
||||
return devInterpreter;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue