Merge branch 'master' into extends-zrun-remote-transaction

# Conflicts:
#	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
#	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java
#	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java
#	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java
#	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java
#	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java
#	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java
This commit is contained in:
CloverHearts 2016-11-25 23:07:25 +09:00
commit e6cd82c790
72 changed files with 4053 additions and 363 deletions

2484
_tools/maven-4.0.0.xsd Normal file

File diff suppressed because it is too large Load diff

View file

@ -43,7 +43,7 @@ public class BeamInterpreterTest {
Properties p = new Properties();
beam = new BeamInterpreter(p);
beam.open();
context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null,
context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null,
null);
}

View file

@ -190,7 +190,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.python.PythonInterpreterPandasSql,org.apache.zeppelin.python.PythonCondaInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter,org.apache.zeppelin.pig.PigQueryInterpreter,org.apache.zeppelin.scio.ScioInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.python.PythonInterpreterPandasSql,org.apache.zeppelin.python.PythonCondaInterpreter,org.apache.zeppelin.python.PythonDockerInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter,org.apache.zeppelin.pig.PigQueryInterpreter,org.apache.zeppelin.scio.ScioInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 164 KiB

After

Width:  |  Height:  |  Size: 56 KiB

View file

@ -95,7 +95,7 @@ Congratulations, you have successfully installed Apache Zeppelin! Here are few s
* Check [JDBC Interpreter](../interpreter/jdbc.html) to know more about configure and uses multiple JDBC data sources.
#### Zeppelin with Python ...
* Check [Python interpreter](../interpreter/python.html) to know more about Matplotlib, Pandas, Conda integration.
* Check [Python interpreter](../interpreter/python.html) to know more about Matplotlib, Pandas, Conda/Docker environment integration.
#### Multi-user environment ...

View file

@ -116,7 +116,11 @@ The JDBC interpreter properties are defined by default like below.
</tr>
</table>
If you want to connect other databases such as `Mysql`, `Redshift` and `Hive`, you need to edit the property values.
If you want to connect other databases such as `Mysql`, `Redshift` and `Hive`, you need to edit the property values.
You can also use [Credential](../security/datasource_authorization.html) for JDBC authentication.
If `default.user` and `default.password` properties are deleted(using X button) for database connection in the interpreter setting page,
the JDBC interpreter will get the account information from [Credential](../security/datasource_authorization.html).
The below example is for `Mysql` connection.
<img src="../assets/themes/zeppelin/img/docs-img/edit_properties.png" width="600px" />

View file

@ -56,10 +56,13 @@ The interpreter can only work if you already have python installed (the interpre
To access the help, type **help()**
## Python modules
## Python environments
### Default
By default, PythonInterpreter will use python command defined in `zeppelin.python` property to run python process.
The interpreter can use all modules already installed (with pip, easy_install...)
## Conda
### Conda
[Conda](http://conda.pydata.org/) is an package management system and environment management system for python.
`%python.conda` interpreter lets you change between environments.
@ -83,6 +86,32 @@ Deactivate
%python.conda deactivate
```
### Docker
`%python.docker` interpreter allows PythonInterpreter creates python process in a specified docker container.
#### Usage
Activate an environment
```
%python.docker activate [Repository]
%python.docker activate [Repository:Tag]
%python.docker activate [Image Id]
```
Deactivate
```
%python.docker deactivate
```
Example
```
# activate latest tensorflow image as a python environment
%python.docker activate gcr.io/tensorflow/tensorflow:latest
```
## Using Zeppelin Dynamic Forms
You can leverage [Zeppelin Dynamic Form]({{BASE_PATH}}/manual/dynamicform.html) inside your Python code.

View file

@ -37,7 +37,10 @@ You can add new credentials in the dropdown menu for your data source which can
<img class="img-responsive" src="../assets/themes/zeppelin/img/docs-img/credential_tab.png" width="180px"/>
**Entity** can be the key that distinguishes each credential sets. Type **Username & Password** for your own credentials. ex) user & password of Mysql
**Entity** can be the key that distinguishes each credential sets.(We suggest that the convention of the **Entity** is `[Interpreter Group].[Interpreter Name]`.)
Please see [what is interpreter group](../manual/interpreters.html#what-is-interpreter-group) for the detailed information.
Type **Username & Password** for your own credentials. ex) Mysql user & password of the JDBC Interpreter.
<img class="img-responsive" src="../assets/themes/zeppelin/img/docs-img/add_credential.png" />

View file

@ -40,7 +40,7 @@ public class FlinkInterpreterTest {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null);
context = new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null);
}
@AfterClass

View file

@ -40,7 +40,7 @@ public class IgniteInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";
private static final InterpreterContext INTP_CONTEXT =
new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null);
new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null);
private IgniteInterpreter intp;
private Ignite ignite;

View file

@ -44,7 +44,7 @@ public class IgniteSqlInterpreterTest {
private static final String HOST = "127.0.0.1:47500..47509";
private static final InterpreterContext INTP_CONTEXT =
new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null);
new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null);
private Ignite ignite;
private IgniteSqlInterpreter intp;

View file

@ -36,6 +36,8 @@ import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.user.UserCredentials;
import org.apache.zeppelin.user.UsernamePassword;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -70,6 +72,9 @@ public class JDBCInterpreter extends Interpreter {
private Logger logger = LoggerFactory.getLogger(JDBCInterpreter.class);
static final String INTERPRETER_NAME = "jdbc";
static final String JDBC_DEFAULT_USER_KEY = "default.user";
static final String JDBC_DEFAULT_PASSWORD_KEY = "default.password";
static final String COMMON_KEY = "common";
static final String MAX_LINE_KEY = "max_count";
static final String MAX_LINE_DEFAULT = "1000";
@ -97,16 +102,12 @@ public class JDBCInterpreter extends Interpreter {
static final String EMPTY_COLUMN_VALUE = "";
private final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use";
private final String CONCURRENT_EXECUTION_COUNT = "zeppelin.jdbc.concurrent.max_connection";
private final String DBCP_STRING = "jdbc:apache:commons:dbcp:";
private final HashMap<String, Properties> propertiesMap;
private final Map<String, Statement> paragraphIdStatementMap;
private final Map<String, PoolingDriver> poolingDriverMap;
private final HashMap<String, Properties> basePropretiesMap;
private final HashMap<String, JDBCUserConfigurations> jdbcUserConfigurationsMap;
private final Map<String, SqlCompleter> propertyKeySqlCompleterMap;
private static final Function<CharSequence, InterpreterCompletion> sequenceToStringTransformer =
@ -120,14 +121,13 @@ public class JDBCInterpreter extends Interpreter {
public JDBCInterpreter(Properties property) {
super(property);
propertiesMap = new HashMap<>();
paragraphIdStatementMap = new HashMap<>();
poolingDriverMap = new HashMap<>();
jdbcUserConfigurationsMap = new HashMap<>();
propertyKeySqlCompleterMap = new HashMap<>();
basePropretiesMap = new HashMap<>();
}
public HashMap<String, Properties> getPropertiesMap() {
return propertiesMap;
return basePropretiesMap;
}
@Override
@ -137,21 +137,22 @@ public class JDBCInterpreter extends Interpreter {
String[] keyValue = propertyKey.split("\\.", 2);
if (2 == keyValue.length) {
logger.info("key: {}, value: {}", keyValue[0], keyValue[1]);
Properties prefixProperties;
if (propertiesMap.containsKey(keyValue[0])) {
prefixProperties = propertiesMap.get(keyValue[0]);
if (basePropretiesMap.containsKey(keyValue[0])) {
prefixProperties = basePropretiesMap.get(keyValue[0]);
} else {
prefixProperties = new Properties();
propertiesMap.put(keyValue[0], prefixProperties);
basePropretiesMap.put(keyValue[0], prefixProperties);
}
prefixProperties.put(keyValue[1], property.getProperty(propertyKey));
}
}
Set<String> removeKeySet = new HashSet<>();
for (String key : propertiesMap.keySet()) {
for (String key : basePropretiesMap.keySet()) {
if (!COMMON_KEY.equals(key)) {
Properties properties = propertiesMap.get(key);
Properties properties = basePropretiesMap.get(key);
if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) {
logger.error("{} will be ignored. {}.{} and {}.{} is mandatory.",
key, DRIVER_KEY, key, key, URL_KEY);
@ -161,15 +162,14 @@ public class JDBCInterpreter extends Interpreter {
}
for (String key : removeKeySet) {
propertiesMap.remove(key);
basePropretiesMap.remove(key);
}
logger.debug("propertiesMap: {}", propertiesMap);
logger.debug("JDBC PropretiesMap: {}", basePropretiesMap);
if (!StringUtils.isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
JDBCSecurityImpl.createSecureConfiguration(property);
}
for (String propertyKey : propertiesMap.keySet()) {
for (String propertyKey : basePropretiesMap.keySet()) {
propertyKeySqlCompleterMap.put(propertyKey, createSqlCompleter(null));
}
}
@ -191,128 +191,24 @@ public class JDBCInterpreter extends Interpreter {
return completer;
}
private boolean isConnectionInPool(String driverName) {
if (poolingDriverMap.containsKey(driverName)) return true;
return false;
}
private void createConnectionPool(String url, String propertyKey, Properties properties) {
ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(url, properties);
PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory, null);
ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);
poolableConnectionFactory.setPool(connectionPool);
PoolingDriver driver = new PoolingDriver();
driver.registerPool(propertyKey, connectionPool);
poolingDriverMap.put(propertyKey, driver);
}
private Connection getConnectionFromPool(String url, String propertyKey, Properties properties)
throws SQLException {
if (!isConnectionInPool(propertyKey)) {
createConnectionPool(url, propertyKey, properties);
}
return DriverManager.getConnection(DBCP_STRING + propertyKey);
}
public Connection getConnection(String propertyKey, String user)
throws ClassNotFoundException, SQLException, InterpreterException {
Connection connection = null;
if (propertyKey == null || propertiesMap.get(propertyKey) == null) {
return null;
}
if (null == connection) {
final Properties properties = (Properties) propertiesMap.get(propertyKey).clone();
logger.info(properties.getProperty(DRIVER_KEY));
Class.forName(properties.getProperty(DRIVER_KEY));
final String url = properties.getProperty(URL_KEY);
if (StringUtils.isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
connection = DriverManager.getConnection(url, properties);
} else {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
switch (authType) {
case KERBEROS:
if (user == null) {
connection = getConnectionFromPool(url, propertyKey, properties);
} else {
if ("hive".equalsIgnoreCase(propertyKey)) {
connection = getConnectionFromPool(url + ";hive.server2.proxy.user=" + user,
propertyKey, properties);
} else {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getCurrentUser());
} catch (Exception e) {
logger.error("Error in createProxyUser", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}
final String poolKey = propertyKey;
try {
connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
@Override
public Connection run() throws Exception {
return getConnectionFromPool(url, poolKey, properties);
}
});
} catch (Exception e) {
logger.error("Error in doAs", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}
}
}
break;
default:
connection = getConnectionFromPool(url, propertyKey, properties);
}
}
}
propertyKeySqlCompleterMap.put(propertyKey, createSqlCompleter(connection));
return connection;
}
private void initStatementMap() {
for (Statement statement : paragraphIdStatementMap.values()) {
for (JDBCUserConfigurations configurations : jdbcUserConfigurationsMap.values()) {
try {
statement.close();
configurations.initStatementMap();
} catch (Exception e) {
logger.error("Error while closing paragraphIdStatementMap statement...", e);
}
}
paragraphIdStatementMap.clear();
}
private void initConnectionPoolMap() throws SQLException {
Iterator<String> it = poolingDriverMap.keySet().iterator();
while (it.hasNext()) {
String driverName = it.next();
poolingDriverMap.get(driverName).closePool(driverName);
it.remove();
private void initConnectionPoolMap() {
for (JDBCUserConfigurations configurations : jdbcUserConfigurationsMap.values()) {
try {
configurations.initConnectionPoolMap();
} catch (Exception e) {
logger.error("Error while closing initConnectionPoolMap...", e);
}
}
poolingDriverMap.clear();
}
private void saveStatement(String key, Statement statement) throws SQLException {
paragraphIdStatementMap.put(key, statement);
statement.setMaxRows(getMaxResult());
}
private void removeStatement(String key) {
paragraphIdStatementMap.remove(key);
}
@Override
@ -325,15 +221,182 @@ public class JDBCInterpreter extends Interpreter {
}
}
private String getEntityName(String replName) {
StringBuffer entityName = new StringBuffer();
entityName.append(INTERPRETER_NAME);
entityName.append(".");
entityName.append(replName);
return entityName.toString();
}
private String getJDBCDriverName(String user, String propertyKey) {
StringBuffer driverName = new StringBuffer();
driverName.append(DBCP_STRING);
driverName.append(propertyKey);
driverName.append(user);
return driverName.toString();
}
private boolean existAccountInBaseProperty() {
return property.containsKey(JDBC_DEFAULT_USER_KEY) &&
property.containsKey(JDBC_DEFAULT_PASSWORD_KEY);
}
private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext,
String replName) {
UserCredentials uc = interpreterContext.getAuthenticationInfo().getUserCredentials();
if (uc != null) {
return uc.getUsernamePassword(replName);
}
return null;
}
public JDBCUserConfigurations getJDBCConfiguration(String user) {
JDBCUserConfigurations jdbcUserConfigurations =
jdbcUserConfigurationsMap.get(user);
if (jdbcUserConfigurations == null) {
jdbcUserConfigurations = new JDBCUserConfigurations();
jdbcUserConfigurationsMap.put(user, jdbcUserConfigurations);
}
return jdbcUserConfigurations;
}
private void closeDBPool(String user, String propertyKey) throws SQLException {
PoolingDriver poolingDriver = getJDBCConfiguration(user).removeDBDriverPool(propertyKey);
if (poolingDriver != null) {
poolingDriver.closePool(propertyKey + user);
}
}
private void setUserProperty(String propertyKey, InterpreterContext interpreterContext)
throws SQLException {
String user = interpreterContext.getAuthenticationInfo().getUser();
JDBCUserConfigurations jdbcUserConfigurations =
getJDBCConfiguration(user);
jdbcUserConfigurations.setPropertyMap(propertyKey, basePropretiesMap.get(propertyKey));
if (existAccountInBaseProperty()) {
return;
}
jdbcUserConfigurations.cleanUserProperty(propertyKey);
UsernamePassword usernamePassword = getUsernamePassword(interpreterContext,
getEntityName(interpreterContext.getReplName()));
if (usernamePassword != null) {
jdbcUserConfigurations.setUserProperty(propertyKey, usernamePassword);
} else {
closeDBPool(user, propertyKey);
}
}
private void createConnectionPool(String url, String user, String propertyKey,
Properties properties) throws SQLException, ClassNotFoundException {
ConnectionFactory connectionFactory =
new DriverManagerConnectionFactory(url, properties);
PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
connectionFactory, null);
ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);
poolableConnectionFactory.setPool(connectionPool);
Class.forName(properties.getProperty(DRIVER_KEY));
PoolingDriver driver = new PoolingDriver();
driver.registerPool(propertyKey + user, connectionPool);
getJDBCConfiguration(user).saveDBDriverPool(propertyKey, driver);
}
private Connection getConnectionFromPool(String url, String user, String propertyKey,
Properties properties) throws SQLException, ClassNotFoundException {
String jdbcDriver = getJDBCDriverName(user, propertyKey);
if (!getJDBCConfiguration(user).isConnectionInDBDriverPool(propertyKey)) {
createConnectionPool(url, user, propertyKey, properties);
}
return DriverManager.getConnection(jdbcDriver);
}
public Connection getConnection(String propertyKey, InterpreterContext interpreterContext)
throws ClassNotFoundException, SQLException, InterpreterException {
final String user = interpreterContext.getAuthenticationInfo().getUser();
Connection connection;
if (propertyKey == null || basePropretiesMap.get(propertyKey) == null) {
return null;
}
JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user);
setUserProperty(propertyKey, interpreterContext);
final Properties properties = jdbcUserConfigurations.getPropertyMap(propertyKey);
final String url = properties.getProperty(URL_KEY);
if (StringUtils.isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
connection = getConnectionFromPool(url, user, propertyKey, properties);
} else {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
switch (authType) {
case KERBEROS:
if (user == null) {
connection = getConnectionFromPool(url, user, propertyKey, properties);
} else {
if ("hive".equalsIgnoreCase(propertyKey)) {
connection = getConnectionFromPool(url + ";hive.server2.proxy.user=" + user,
user, propertyKey, properties);
} else {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.createProxyUser(user,
UserGroupInformation.getCurrentUser());
} catch (Exception e) {
logger.error("Error in createProxyUser", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}
final String poolKey = propertyKey;
try {
connection = ugi.doAs(new PrivilegedExceptionAction<Connection>() {
@Override
public Connection run() throws Exception {
return getConnectionFromPool(url, user, poolKey, properties);
}
});
} catch (Exception e) {
logger.error("Error in doAs", e);
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(e.getMessage()).append("\n");
stringBuilder.append(e.getCause());
throw new InterpreterException(stringBuilder.toString());
}
}
}
break;
default:
connection = getConnectionFromPool(url, user, propertyKey, properties);
}
}
propertyKeySqlCompleterMap.put(propertyKey, createSqlCompleter(connection));
return connection;
}
private InterpreterResult executeSql(String propertyKey, String sql,
InterpreterContext interpreterContext) {
String paragraphId = interpreterContext.getParagraphId();
Connection connection;
Statement statement;
ResultSet resultSet = null;
String paragraphId = interpreterContext.getParagraphId();
String user = interpreterContext.getAuthenticationInfo().getUser();
try {
connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser());
connection = getConnection(propertyKey, interpreterContext);
if (connection == null) {
return new InterpreterResult(Code.ERROR, "Prefix not found.");
}
@ -354,8 +417,7 @@ public class JDBCInterpreter extends Interpreter {
}
try {
saveStatement(paragraphId +
interpreterContext.getAuthenticationInfo().getUser(), statement);
getJDBCConfiguration(user).saveStatement(paragraphId, statement);
boolean isResultSetAvailable = statement.execute(sql);
@ -413,8 +475,7 @@ public class JDBCInterpreter extends Interpreter {
connection.close();
} catch (SQLException e) { /*ignored*/ }
}
removeStatement(paragraphId +
interpreterContext.getAuthenticationInfo().getUser());
getJDBCConfiguration(user).removeStatement(paragraphId);
}
return new InterpreterResult(Code.SUCCESS, msg.toString());
@ -424,6 +485,13 @@ public class JDBCInterpreter extends Interpreter {
PrintStream ps = new PrintStream(baos);
e.printStackTrace(ps);
String errorMsg = new String(baos.toByteArray(), StandardCharsets.UTF_8);
try {
closeDBPool(user, propertyKey);
} catch (SQLException e1) {
e1.printStackTrace();
}
return new InterpreterResult(Code.ERROR, errorMsg);
}
}
@ -455,12 +523,12 @@ public class JDBCInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
logger.info("Cancel current query statement.");
String paragraphId = context.getParagraphId();
JDBCUserConfigurations jdbcUserConfigurations =
getJDBCConfiguration(context.getAuthenticationInfo().getUser());
try {
paragraphIdStatementMap.get(paragraphId + context.getAuthenticationInfo().getUser()).cancel();
jdbcUserConfigurations.cancelStatement(paragraphId);
} catch (SQLException e) {
logger.error("Error while cancelling...", e);
}
@ -517,7 +585,7 @@ public class JDBCInterpreter extends Interpreter {
public int getMaxResult() {
return Integer.valueOf(
propertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT));
basePropretiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY, MAX_LINE_DEFAULT));
}
boolean isConcurrentExecution() {

View file

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.zeppelin.jdbc;
import org.apache.commons.dbcp2.PoolingDriver;
import org.apache.zeppelin.user.UsernamePassword;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
/**
* UserConfigurations for JDBC impersonation.
*/
public class JDBCUserConfigurations {
private final Map<String, Statement> paragraphIdStatementMap;
private final Map<String, PoolingDriver> poolingDriverMap;
private final HashMap<String, Properties> propertiesMap;
public JDBCUserConfigurations() {
paragraphIdStatementMap = new HashMap<>();
poolingDriverMap = new HashMap<>();
propertiesMap = new HashMap<>();
}
public void initStatementMap() throws SQLException {
for (Statement statement : paragraphIdStatementMap.values()) {
statement.close();
}
paragraphIdStatementMap.clear();
}
public void initConnectionPoolMap() throws SQLException {
Iterator<String> it = poolingDriverMap.keySet().iterator();
while (it.hasNext()) {
String driverName = it.next();
poolingDriverMap.get(driverName).closePool(driverName);
it.remove();
}
poolingDriverMap.clear();
}
public void setPropertyMap(String key, Properties properties) {
Properties p = (Properties) properties.clone();
propertiesMap.put(key, p);
}
public Properties getPropertyMap(String key) {
return propertiesMap.get(key);
}
public void cleanUserProperty(String propertyKey) {
propertiesMap.get(propertyKey).remove("user");
propertiesMap.get(propertyKey).remove("password");
}
public void setUserProperty(String propertyKey, UsernamePassword usernamePassword) {
propertiesMap.get(propertyKey).setProperty("user", usernamePassword.getUsername());
propertiesMap.get(propertyKey).setProperty("password", usernamePassword.getPassword());
}
public void saveStatement(String key, Statement statement) throws SQLException {
paragraphIdStatementMap.put(key, statement);
}
public void cancelStatement(String key) throws SQLException {
paragraphIdStatementMap.get(key).cancel();
}
public void removeStatement(String key) {
paragraphIdStatementMap.remove(key);
}
public void saveDBDriverPool(String key, PoolingDriver driver) throws SQLException {
poolingDriverMap.put(key, driver);
}
public PoolingDriver removeDBDriverPool(String key) throws SQLException {
return poolingDriverMap.remove(key);
}
public boolean isConnectionInDBDriverPool(String key) {
return poolingDriverMap.containsKey(key);
}
}

View file

@ -16,20 +16,20 @@ package org.apache.zeppelin.jdbc;
import static java.lang.String.format;
import static org.apache.zeppelin.interpreter.Interpreter.logger;
import static org.junit.Assert.assertEquals;
import static org.apache.zeppelin.interpreter.Interpreter.register;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_KEY;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_DRIVER;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_PASSWORD;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_USER;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.DEFAULT_URL;
import static org.apache.zeppelin.jdbc.JDBCInterpreter.COMMON_MAX_LINE;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
@ -41,6 +41,9 @@ import org.apache.zeppelin.scheduler.FIFOScheduler;
import org.apache.zeppelin.scheduler.ParallelScheduler;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.user.Credentials;
import org.apache.zeppelin.user.UserCredentials;
import org.apache.zeppelin.user.UsernamePassword;
import org.junit.Before;
import org.junit.Test;
@ -75,7 +78,6 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
@Before
public void setUp() throws Exception {
Class.forName("org.h2.Driver");
Connection connection = DriverManager.getConnection(getJdbcConnection());
Statement statement = connection.createStatement();
@ -86,7 +88,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
PreparedStatement insertStatement = connection.prepareStatement("insert into test_table(id, name) values ('a', 'a_name'),('b', 'b_name'),('c', ?);");
insertStatement.setString(1, null);
insertStatement.execute();
interpreterContext = new InterpreterContext("", "1", "", "", new AuthenticationInfo(), null, null, null, null,
interpreterContext = new InterpreterContext("", "1", null, "", "", new AuthenticationInfo(), null, null, null, null,
null, null);
}
@ -251,7 +253,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
jdbcInterpreter.interpret("", interpreterContext);
List<InterpreterCompletion> completionList = jdbcInterpreter.completion("SEL", 0);
InterpreterCompletion correctCompletionKeyword = new InterpreterCompletion("SELECT", "SELECT");
assertEquals(2, completionList.size());
@ -259,4 +261,92 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
assertEquals(0, jdbcInterpreter.completion("SEL", 100).size());
}
}
private Properties getDBProperty(String dbUser, String dbPassowrd) throws IOException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");
properties.setProperty("default.driver", "org.h2.Driver");
properties.setProperty("default.url", getJdbcConnection());
if (dbUser != null) {
properties.setProperty("default.user", dbUser);
}
if (dbPassowrd != null) {
properties.setProperty("default.password", dbPassowrd);
}
return properties;
}
private AuthenticationInfo getUserAuth(String user, String entityName, String dbUser, String dbPassword){
UserCredentials userCredentials = new UserCredentials();
if (entityName != null && dbUser != null && dbPassword != null) {
UsernamePassword up = new UsernamePassword(dbUser, dbPassword);
userCredentials.putUsernamePassword(entityName, up);
}
AuthenticationInfo authInfo = new AuthenticationInfo();
authInfo.setUserCredentials(userCredentials);
authInfo.setUser(user);
return authInfo;
}
@Test
public void testMultiTenant() throws SQLException, IOException {
/**
* assume that the database user is 'dbuser' and password is 'dbpassword'
* 'jdbc1' interpreter has user('dbuser')/password('dbpassword') property
* 'jdbc2' interpreter doesn't have user/password property
* 'user1' doesn't have Credential information.
* 'user2' has 'jdbc2' Credential information that is same with database account.
*/
JDBCInterpreter jdbc1 = new JDBCInterpreter(getDBProperty("dbuser", "dbpassword"));
JDBCInterpreter jdbc2 = new JDBCInterpreter(getDBProperty(null, null));
AuthenticationInfo user1Credential = getUserAuth("user1", null, null, null);
AuthenticationInfo user2Credential = getUserAuth("user2", "jdbc.jdbc2", "dbuser", "dbpassword");
// user1 runs jdbc1
jdbc1.open();
InterpreterContext ctx1 = new InterpreterContext("", "1", "jdbc.jdbc1", "", "", user1Credential,
null, null, null, null, null, null);
jdbc1.interpret("", ctx1);
JDBCUserConfigurations user1JDBC1Conf = jdbc1.getJDBCConfiguration("user1");
assertEquals("dbuser", user1JDBC1Conf.getPropertyMap("default").get("user"));
assertEquals("dbpassword", user1JDBC1Conf.getPropertyMap("default").get("password"));
jdbc1.close();
// user1 runs jdbc2
jdbc2.open();
InterpreterContext ctx2 = new InterpreterContext("", "1", "jdbc.jdbc2", "", "", user1Credential,
null, null, null, null, null, null);
jdbc2.interpret("", ctx2);
JDBCUserConfigurations user1JDBC2Conf = jdbc2.getJDBCConfiguration("user1");
assertNull(user1JDBC2Conf.getPropertyMap("default").get("user"));
assertNull(user1JDBC2Conf.getPropertyMap("default").get("password"));
jdbc2.close();
// user2 runs jdbc1
jdbc1.open();
InterpreterContext ctx3 = new InterpreterContext("", "1", "jdbc.jdbc1", "", "", user2Credential,
null, null, null, null, null, null);
jdbc1.interpret("", ctx3);
JDBCUserConfigurations user2JDBC1Conf = jdbc1.getJDBCConfiguration("user2");
assertEquals("dbuser", user2JDBC1Conf.getPropertyMap("default").get("user"));
assertEquals("dbpassword", user2JDBC1Conf.getPropertyMap("default").get("password"));
jdbc1.close();
// user2 runs jdbc2
jdbc2.open();
InterpreterContext ctx4 = new InterpreterContext("", "1", "jdbc.jdbc2", "", "", user2Credential,
null, null, null, null, null, null);
jdbc2.interpret("", ctx4);
JDBCUserConfigurations user2JDBC2Conf = jdbc2.getJDBCConfiguration("user2");
assertNull(user2JDBC2Conf.getPropertyMap("default").get("user"));
assertNull(user2JDBC2Conf.getPropertyMap("default").get("password"));
jdbc2.close();
}
}

View file

@ -85,21 +85,7 @@ public class LensInterpreter extends Interpreter {
private LensClient m_lensClient;
static {
Interpreter.register(
"lens",
"lens",
LensInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION, "true", "Run concurrent Lens Sessions")
.add(ZEPPELIN_LENS_CONCURRENT_SESSIONS, "10",
"If concurrency is true then how many threads?")
.add(ZEPPELIN_MAX_ROWS, "1000", "max number of rows to display")
.add(LENS_SERVER_URL, "http://<hostname>:<port>/lensapi", "The URL for Lens Server")
.add(LENS_CLIENT_DBNAME, "default", "The database schema name")
.add(LENS_PERSIST_RESULTSET, "false", "Apache Lens to persist result in HDFS?")
.add(LENS_SESSION_CLUSTER_USER, "default", "Hadoop cluster username").build());
}
public LensInterpreter(Properties property) {
super(property);

View file

@ -0,0 +1,51 @@
[
{
"group": "lens",
"name": "lens",
"className": "org.apache.zeppelin.lens.LensInterpreter",
"properties": {
"zeppelin.lens.run.concurrent": {
"envName": null,
"propertyName": "zeppelin.lens.run.concurrent",
"defaultValue": "true",
"description": "Run concurrent Lens Sessions"
},
"zeppelin.lens.maxThreads": {
"envName": null,
"propertyName": "zeppelin.lens.maxThreads",
"defaultValue": "10",
"description": "If concurrency is true then how many threads?"
},
"zeppelin.lens.maxResults": {
"envName": null,
"propertyName": "zeppelin.lens.maxResults",
"defaultValue": "1000",
"description": "max number of rows to display"
},
"lens.server.base.url": {
"envName": null,
"propertyName": "lens.server.base.url",
"defaultValue": "http://<hostname>:<port>/lensapi",
"description": "The URL for Lens Server"
},
"lens.client.dbname": {
"envName": null,
"propertyName": "lens.client.dbname",
"defaultValue": "default",
"description": "The database schema name"
},
"lens.query.enable.persistent.resultset": {
"envName": null,
"propertyName": "lens.query.enable.persistent.resultset",
"defaultValue": "false",
"description": "Apache Lens to persist result in HDFS?"
},
"lens.session.cluster.user": {
"envName": null,
"propertyName": "lens.session.cluster.user",
"defaultValue": "default",
"description": "Hadoop cluster username"
}
}
}
]

View file

@ -83,7 +83,7 @@ public class LivyIntegrationTest {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "title",
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", null, "title",
"text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
@ -177,7 +177,7 @@ public class LivyIntegrationTest {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "title",
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", null, "title",
"text", authInfo, null, null, null, null, null, output);
pysparkInterpreter.open();
InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);

View file

@ -47,7 +47,7 @@ public class PigInterpreterTest {
properties.put("zeppelin.pig.execType", "local");
pigInterpreter = new PigInterpreter(properties);
pigInterpreter.open();
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null,
null, null);
}

View file

@ -65,7 +65,7 @@ public class PigQueryInterpreterTest {
pigInterpreter.open();
pigQueryInterpreter.open();
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null,
context = new InterpreterContext(null, "paragraph_id", null, null, null, null, null, null, null, null,
null, null);
}

38
pom.xml
View file

@ -385,6 +385,11 @@
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>xml-maven-plugin</artifactId>
</plugin>
<!--TODO(alex): make part of the build and reconcile conflicts
<plugin>
<groupId>com.ning.maven.plugins</groupId>
@ -422,11 +427,8 @@
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration combine.children="append">
<configuration>
<argLine>-Xmx2g -Xms1g -Dfile.encoding=UTF-8</argLine>
<encoding>UTF-8</encoding>
<inputEncoding>UTF-8</inputEncoding>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
<!-- <excludes> <exclude>**/itest/**</exclude> </excludes> <executions>
<execution> <id>surefire-itest</id> <phase>integration-test</phase> <goals>
@ -469,6 +471,33 @@
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>xml-maven-plugin</artifactId>
<version>1.0.1</version>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>validate</goal>
</goals>
</execution>
</executions>
<configuration>
<validationSets>
<validationSet>
<dir>${project.basedir}</dir>
<includes>
<include>
pom.xml
</include>
</includes>
<systemId>_tools/maven-4.0.0.xsd</systemId>
</validationSet>
</validationSets>
</configuration>
</plugin>
<!--This plugin's configuration is used to store Eclipse m2e settings
only. It has no influence on the Maven build itself. -->
<plugin>
@ -750,6 +779,7 @@
<exclude>**/null/**</exclude>
<exclude>**/notebook/**</exclude>
<exclude>_tools/site/css/*</exclude>
<exclude>_tools/maven-4.0.0.xsd</exclude>
<exclude>**/README.md</exclude>
<exclude>DEPENDENCIES</exclude>
<exclude>DEPLOY.md</exclude>

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.python;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,7 +37,6 @@ public class PythonCondaInterpreter extends Interpreter {
Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
Pattern deactivatePattern = Pattern.compile("deactivate");
Pattern helpPattern = Pattern.compile("help");
String pythonCommand = null;
public PythonCondaInterpreter(Properties property) {
super(property);
@ -66,11 +66,11 @@ public class PythonCondaInterpreter extends Interpreter {
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
} else if (activateMatcher.matches()) {
String envName = activateMatcher.group(1);
pythonCommand = "conda run -n " + envName + " \"python -iu\"";
setPythonCommand("conda run -n " + envName + " \"python -iu\"");
restartPythonProcess();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + envName + "\" activated");
} else if (deactivateMatcher.matches()) {
pythonCommand = null;
setPythonCommand(null);
restartPythonProcess();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Deactivated");
} else if (helpMatcher.matches()) {
@ -81,6 +81,11 @@ public class PythonCondaInterpreter extends Interpreter {
}
}
public void setPythonCommand(String cmd) {
PythonInterpreter python = getPythonInterpreter();
python.setPythonCommand(cmd);
}
private void restartPythonProcess() {
PythonInterpreter python = getPythonInterpreter();
python.close();
@ -106,10 +111,6 @@ public class PythonCondaInterpreter extends Interpreter {
return python;
}
public String getPythonCommand() {
return pythonCommand;
}
private void listEnv(InterpreterOutput out) {
StringBuilder sb = createStringBuilder();
try {
@ -149,7 +150,7 @@ public class PythonCondaInterpreter extends Interpreter {
private void printUsage(InterpreterOutput out) {
try {
out.setType(InterpreterResult.Type.HTML);
out.writeResource("output_templates/usage.html");
out.writeResource("output_templates/conda_usage.html");
} catch (IOException e) {
logger.error("Can't print usage", e);
}
@ -170,6 +171,21 @@ public class PythonCondaInterpreter extends Interpreter {
return 0;
}
/**
* Use python interpreter's scheduler.
* To make sure %python.conda paragraph and %python paragraph runs sequentially
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
}
protected int runCommand(StringBuilder sb, String ... command)
throws IOException, InterruptedException {
ProcessBuilder builder = new ProcessBuilder(command);

View file

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Helps run python interpreter on a docker container
*/
public class PythonDockerInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(PythonDockerInterpreter.class);
Pattern activatePattern = Pattern.compile("activate\\s*(.*)");
Pattern deactivatePattern = Pattern.compile("deactivate");
Pattern helpPattern = Pattern.compile("help");
public PythonDockerInterpreter(Properties property) {
super(property);
}
@Override
public void open() {
}
@Override
public void close() {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterOutput out = context.out;
Matcher activateMatcher = activatePattern.matcher(st);
Matcher deactivateMatcher = deactivatePattern.matcher(st);
Matcher helpMatcher = helpPattern.matcher(st);
if (st == null || st.isEmpty() || helpMatcher.matches()) {
printUsage(out);
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
} else if (activateMatcher.matches()) {
String image = activateMatcher.group(1);
pull(out, image);
setPythonCommand("docker run -i --rm " + image + " python -iu");
restartPythonProcess();
out.clear();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "\"" + image + "\" activated");
} else if (deactivateMatcher.matches()) {
setPythonCommand(null);
restartPythonProcess();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "Deactivated");
} else {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Not supported command: " + st);
}
}
public void setPythonCommand(String cmd) {
PythonInterpreter python = getPythonInterpreter();
python.setPythonCommand(cmd);
}
private void printUsage(InterpreterOutput out) {
try {
out.setType(InterpreterResult.Type.HTML);
out.writeResource("output_templates/docker_usage.html");
} catch (IOException e) {
logger.error("Can't print usage", e);
}
}
@Override
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {
return FormType.NONE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
/**
* Use python interpreter's scheduler.
* To make sure %python.docker paragraph and %python paragraph runs sequentially
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
}
private void restartPythonProcess() {
PythonInterpreter python = getPythonInterpreter();
python.close();
python.open();
}
protected PythonInterpreter getPythonInterpreter() {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
python = (PythonInterpreter) p;
if (lazy != null) {
lazy.open();
}
return python;
}
public boolean pull(InterpreterOutput out, String image) {
int exit = 0;
try {
exit = runCommand(out, "docker", "pull", image);
} catch (IOException | InterruptedException e) {
logger.error(e.getMessage(), e);
throw new InterpreterException(e);
}
return exit == 0;
}
protected int runCommand(InterpreterOutput out, String... command)
throws IOException, InterruptedException {
ProcessBuilder builder = new ProcessBuilder(command);
builder.redirectErrorStream(true);
Process process = builder.start();
InputStream stdout = process.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(stdout));
String line;
while ((line = br.readLine()) != null) {
out.write(line + "\n");
}
int r = process.waitFor(); // Let the process finish.
return r;
}
}

View file

@ -60,6 +60,7 @@ public class PythonInterpreter extends Interpreter {
private int maxResult;
PythonProcess process = null;
private String pythonCommand = null;
public PythonInterpreter(Properties property) {
super(property);
@ -199,10 +200,9 @@ public class PythonInterpreter extends Interpreter {
public PythonProcess getPythonProcess() {
if (process == null) {
PythonCondaInterpreter conda = getCondaInterpreter();
String binPath = getProperty(ZEPPELIN_PYTHON);
if (conda != null && conda.getPythonCommand() != null) {
binPath = conda.getPythonCommand();
if (pythonCommand != null) {
binPath = pythonCommand;
}
return new PythonProcess(binPath);
} else {
@ -210,6 +210,14 @@ public class PythonInterpreter extends Interpreter {
}
}
public void setPythonCommand(String cmd) {
pythonCommand = cmd;
}
public String getPythonCommand() {
return pythonCommand;
}
private Job getRunningJob(String paragraphId) {
Job foundJob = null;
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
@ -284,25 +292,4 @@ public class PythonInterpreter extends Interpreter {
public int getMaxResult() {
return maxResult;
}
private PythonCondaInterpreter getCondaInterpreter() {
LazyOpenInterpreter lazy = null;
PythonCondaInterpreter conda = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(
PythonCondaInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
conda = (PythonCondaInterpreter) p;
if (lazy != null) {
lazy.open();
}
return conda;
}
}

View file

@ -0,0 +1,27 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<h4>Usage</h4>
<div>
Activate an docker environment (python interpreter will be restarted)
<pre>%python.docker activate [Repository]
%python.docker activate [Repository:Tag]
%python.docker activate [Image Id]</pre>
</div>
<div>
Deactivate
<pre>%python.docker deactivate</pre>
</div>
<div>
Example
<pre># Run python interpreter with latest tensorflow image
%python.docker activate gcr.io/tensorflow/tensorflow:latest</pre>
</div>

View file

@ -77,7 +77,7 @@ public class PythonCondaInterpreterTest implements InterpreterOutputListener {
conda.interpret("activate env", context);
verify(python, times(1)).open();
verify(python, times(1)).close();
assertEquals("conda run -n env \"python -iu\"", conda.getPythonCommand());
verify(python).setPythonCommand("conda run -n env \"python -iu\"");
}
@Test
@ -86,13 +86,14 @@ public class PythonCondaInterpreterTest implements InterpreterOutputListener {
conda.interpret("deactivate", context);
verify(python, times(1)).open();
verify(python, times(1)).close();
assertEquals(null, conda.getPythonCommand());
verify(python).setPythonCommand(null);
}
private InterpreterContext getInterpreterContext() {
return new InterpreterContext(
"noteId",
"paragraphId",
null,
"paragraphTitle",
"paragraphText",
new AuthenticationInfo(),

View file

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
public class PythonDockerInterpreterTest implements InterpreterOutputListener {
private PythonDockerInterpreter docker;
private PythonInterpreter python;
@Before
public void setUp() {
docker = spy(new PythonDockerInterpreter(new Properties()));
python = mock(PythonInterpreter.class);
InterpreterGroup group = new InterpreterGroup();
group.put("note", Arrays.asList(python, docker));
python.setInterpreterGroup(group);
docker.setInterpreterGroup(group);
doReturn(true).when(docker).pull(any(InterpreterOutput.class), anyString());
doReturn(python).when(docker).getPythonInterpreter();
}
@Test
public void testActivateEnv() {
InterpreterContext context = getInterpreterContext();
docker.interpret("activate env", context);
verify(python, times(1)).open();
verify(python, times(1)).close();
verify(docker, times(1)).pull(any(InterpreterOutput.class), anyString());
verify(python).setPythonCommand("docker run -i --rm env python -iu");
}
@Test
public void testDeactivate() {
InterpreterContext context = getInterpreterContext();
docker.interpret("deactivate", context);
verify(python, times(1)).open();
verify(python, times(1)).close();
verify(python).setPythonCommand(null);
}
private InterpreterContext getInterpreterContext() {
return new InterpreterContext(
"noteId",
"paragraphId",
"replName",
"paragraphTitle",
"paragraphText",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),
null,
null,
null,
new InterpreterOutput(this));
}
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}

View file

@ -76,7 +76,7 @@ public class PythonInterpreterMatplotlibTest {
interpreters.add(python);
intpGroup.put("note", interpreters);
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(

View file

@ -78,7 +78,7 @@ public class PythonInterpreterPandasSqlTest {
intpGroup.put("note", Arrays.asList(python, sql));
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(

View file

@ -64,7 +64,7 @@ public class ScaldingInterpreterTest {
}
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), null);

View file

@ -40,7 +40,7 @@ public class ScioInterpreterTest {
private final String newline = "\n";
private InterpreterContext getNewContext() {
return new InterpreterContext("note", "id", "title", "text",
return new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),

View file

@ -47,7 +47,7 @@ public class ShellInterpreterTest {
@Test
public void test() {
shell.open();
InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null);
InterpreterContext context = new InterpreterContext("", "1", null, "", "", null, null, null, null, null, null, null);
InterpreterResult result = new InterpreterResult(Code.ERROR);
if (System.getProperty("os.name").startsWith("Windows")) {
result = shell.interpret("dir", context);
@ -64,7 +64,7 @@ public class ShellInterpreterTest {
@Test
public void testInvalidCommand(){
shell.open();
InterpreterContext context = new InterpreterContext("","1","","",null,null,null,null,null,null,null);
InterpreterContext context = new InterpreterContext("","1",null,"","",null,null,null,null,null,null,null);
InterpreterResult result = new InterpreterResult(Code.ERROR);
if (System.getProperty("os.name").startsWith("Windows")) {
result = shell.interpret("invalid_command\ndir",context);

View file

@ -64,7 +64,7 @@ public class DepInterpreterTest {
intpGroup.get("note").add(dep);
dep.setInterpreterGroup(intpGroup);
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
null,

View file

@ -100,7 +100,7 @@ public class PySparkInterpreterTest {
pySparkInterpreter.open();
}
context = new InterpreterContext("note", "id", "title", "text",
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),

View file

@ -92,7 +92,7 @@ public class SparkInterpreterTest {
repl.open();
}
context = new InterpreterContext("note", "id", "title", "text",
context = new InterpreterContext("note", "id", null, "title", "text",
new AuthenticationInfo(),
new HashMap<String, Object>(),
new GUI(),

View file

@ -75,7 +75,7 @@ public class SparkSqlInterpreterTest {
sql.setInterpreterGroup(intpGroup);
sql.open();
}
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),

View file

@ -33,7 +33,7 @@ trait AbstractAngularElemTest
override def beforeEach() {
val intpGroup = new InterpreterGroup()
val context = new InterpreterContext("note", "paragraph", "title", "text",
val context = new InterpreterContext("note", "paragraph", null, "title", "text",
new AuthenticationInfo(), new util.HashMap[String, Object](), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
null,

View file

@ -29,7 +29,7 @@ trait AbstractAngularModelTest extends FlatSpec
with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers {
override def beforeEach() {
val intpGroup = new InterpreterGroup()
val context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
val context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(),
new java.util.HashMap[String, Object](), new GUI(), new AngularObjectRegistry(
intpGroup.getId(), null),
null,

View file

@ -49,6 +49,7 @@ public class InterpreterContext {
}
private final String noteId;
private final String replName;
private final String paragraphTitle;
private final String paragraphId;
private final String paragraphText;
@ -64,6 +65,7 @@ public class InterpreterContext {
public InterpreterContext(String noteId,
String paragraphId,
String replName,
String paragraphTitle,
String paragraphText,
AuthenticationInfo authenticationInfo,
@ -74,12 +76,13 @@ public class InterpreterContext {
List<InterpreterContextRunner> runners,
InterpreterOutput out
) {
this(noteId, paragraphId, paragraphTitle, paragraphText, authenticationInfo, config, gui,
angularObjectRegistry, resourcePool, runners, out, null);
this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
config, gui, angularObjectRegistry, resourcePool, runners, out, null);
}
public InterpreterContext(String noteId,
String paragraphId,
String replName,
String paragraphTitle,
String paragraphText,
AuthenticationInfo authenticationInfo,
@ -93,6 +96,7 @@ public class InterpreterContext {
) {
this.noteId = noteId;
this.paragraphId = paragraphId;
this.replName = replName;
this.paragraphTitle = paragraphTitle;
this.paragraphText = paragraphText;
this.authenticationInfo = authenticationInfo;
@ -107,6 +111,7 @@ public class InterpreterContext {
public InterpreterContext(String noteId,
String paragraphId,
String replName,
String paragraphTitle,
String paragraphText,
AuthenticationInfo authenticationInfo,
@ -118,8 +123,9 @@ public class InterpreterContext {
InterpreterOutput output,
RemoteWorksController remoteWorksController,
RemoteInterpreterEventClient eventClient) {
this(noteId, paragraphId, paragraphTitle, paragraphText, authenticationInfo, config, gui,
angularObjectRegistry, resourcePool, contextRunners, output, remoteWorksController);
this(noteId, paragraphId, replName, paragraphTitle, paragraphText, authenticationInfo,
config, gui, angularObjectRegistry, resourcePool, contextRunners, output,
remoteWorksController);
this.client = new RemoteEventClient(eventClient);
}
@ -127,6 +133,10 @@ public class InterpreterContext {
return noteId;
}
public String getReplName() {
return replName;
}
public String getParagraphId() {
return paragraphId;
}

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter;
import java.util.ArrayList;
import java.util.List;
/**
@ -96,6 +97,21 @@ public class InterpreterOption {
this.perNote = perNote;
}
public static InterpreterOption fromInterpreterOption(InterpreterOption other) {
InterpreterOption option = new InterpreterOption();
option.remote = other.remote;
option.host = other.host;
option.port = other.port;
option.perNote = other.perNote;
option.perUser = other.perUser;
option.isExistingProcess = other.isExistingProcess;
option.setPermission = other.setPermission;
option.users = (null == other.users) ?
new ArrayList<String>() : new ArrayList<>(other.users);
return option;
}
public boolean isRemote() {
return remote;
}

View file

@ -494,6 +494,7 @@ public class RemoteInterpreter extends Interpreter {
return new RemoteInterpreterContext(
ic.getNoteId(),
ic.getParagraphId(),
ic.getReplName(),
ic.getParagraphTitle(),
ic.getParagraphText(),
gson.toJson(ic.getAuthenticationInfo()),

View file

@ -17,18 +17,14 @@
package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
import org.apache.commons.exec.*;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Properties;
/**
* Abstract class for interpreter process
@ -36,14 +32,11 @@ import java.util.Properties;
public abstract class RemoteInterpreterProcess {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
private final AtomicInteger referenceCount;
private ExecuteWatchdog watchdog;
private GenericObjectPool<Client> clientPool;
private final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;
String host = "localhost";
boolean isInterpreterAlreadyExecuting = false;
public RemoteInterpreterProcess(
int connectTimeout,

View file

@ -72,7 +72,6 @@ public class RemoteInterpreterServer
Gson gson = new Gson();
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
RemoteInterpreterServer handler;
private int port;
private TThreadPoolServer server;
@ -587,6 +586,7 @@ public class RemoteInterpreterServer
return new InterpreterContext(
ric.getNoteId(),
ric.getParagraphId(),
ric.getReplName(),
ric.getParagraphTitle(),
ric.getParagraphText(),
gson.fromJson(ric.getAuthenticationInfo(), AuthenticationInfo.class),

View file

@ -57,12 +57,13 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField PARAGRAPH_TITLE_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphTitle", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField PARAGRAPH_TEXT_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphText", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField AUTHENTICATION_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("authenticationInfo", org.apache.thrift.protocol.TType.STRING, (short)5);
private static final org.apache.thrift.protocol.TField CONFIG_FIELD_DESC = new org.apache.thrift.protocol.TField("config", org.apache.thrift.protocol.TType.STRING, (short)6);
private static final org.apache.thrift.protocol.TField GUI_FIELD_DESC = new org.apache.thrift.protocol.TField("gui", org.apache.thrift.protocol.TType.STRING, (short)7);
private static final org.apache.thrift.protocol.TField RUNNERS_FIELD_DESC = new org.apache.thrift.protocol.TField("runners", org.apache.thrift.protocol.TType.STRING, (short)8);
private static final org.apache.thrift.protocol.TField REPL_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("replName", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField PARAGRAPH_TITLE_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphTitle", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField PARAGRAPH_TEXT_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphText", org.apache.thrift.protocol.TType.STRING, (short)5);
private static final org.apache.thrift.protocol.TField AUTHENTICATION_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("authenticationInfo", org.apache.thrift.protocol.TType.STRING, (short)6);
private static final org.apache.thrift.protocol.TField CONFIG_FIELD_DESC = new org.apache.thrift.protocol.TField("config", org.apache.thrift.protocol.TType.STRING, (short)7);
private static final org.apache.thrift.protocol.TField GUI_FIELD_DESC = new org.apache.thrift.protocol.TField("gui", org.apache.thrift.protocol.TType.STRING, (short)8);
private static final org.apache.thrift.protocol.TField RUNNERS_FIELD_DESC = new org.apache.thrift.protocol.TField("runners", org.apache.thrift.protocol.TType.STRING, (short)9);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@ -72,6 +73,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
public String noteId; // required
public String paragraphId; // required
public String replName; // required
public String paragraphTitle; // required
public String paragraphText; // required
public String authenticationInfo; // required
@ -83,12 +85,13 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
NOTE_ID((short)1, "noteId"),
PARAGRAPH_ID((short)2, "paragraphId"),
PARAGRAPH_TITLE((short)3, "paragraphTitle"),
PARAGRAPH_TEXT((short)4, "paragraphText"),
AUTHENTICATION_INFO((short)5, "authenticationInfo"),
CONFIG((short)6, "config"),
GUI((short)7, "gui"),
RUNNERS((short)8, "runners");
REPL_NAME((short)3, "replName"),
PARAGRAPH_TITLE((short)4, "paragraphTitle"),
PARAGRAPH_TEXT((short)5, "paragraphText"),
AUTHENTICATION_INFO((short)6, "authenticationInfo"),
CONFIG((short)7, "config"),
GUI((short)8, "gui"),
RUNNERS((short)9, "runners");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@ -107,17 +110,19 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return NOTE_ID;
case 2: // PARAGRAPH_ID
return PARAGRAPH_ID;
case 3: // PARAGRAPH_TITLE
case 3: // REPL_NAME
return REPL_NAME;
case 4: // PARAGRAPH_TITLE
return PARAGRAPH_TITLE;
case 4: // PARAGRAPH_TEXT
case 5: // PARAGRAPH_TEXT
return PARAGRAPH_TEXT;
case 5: // AUTHENTICATION_INFO
case 6: // AUTHENTICATION_INFO
return AUTHENTICATION_INFO;
case 6: // CONFIG
case 7: // CONFIG
return CONFIG;
case 7: // GUI
case 8: // GUI
return GUI;
case 8: // RUNNERS
case 9: // RUNNERS
return RUNNERS;
default:
return null;
@ -166,6 +171,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.REPL_NAME, new org.apache.thrift.meta_data.FieldMetaData("replName", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.PARAGRAPH_TITLE, new org.apache.thrift.meta_data.FieldMetaData("paragraphTitle", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.PARAGRAPH_TEXT, new org.apache.thrift.meta_data.FieldMetaData("paragraphText", org.apache.thrift.TFieldRequirementType.DEFAULT,
@ -188,6 +195,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
public RemoteInterpreterContext(
String noteId,
String paragraphId,
String replName,
String paragraphTitle,
String paragraphText,
String authenticationInfo,
@ -198,6 +206,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
this();
this.noteId = noteId;
this.paragraphId = paragraphId;
this.replName = replName;
this.paragraphTitle = paragraphTitle;
this.paragraphText = paragraphText;
this.authenticationInfo = authenticationInfo;
@ -216,6 +225,9 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (other.isSetParagraphId()) {
this.paragraphId = other.paragraphId;
}
if (other.isSetReplName()) {
this.replName = other.replName;
}
if (other.isSetParagraphTitle()) {
this.paragraphTitle = other.paragraphTitle;
}
@ -244,6 +256,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
public void clear() {
this.noteId = null;
this.paragraphId = null;
this.replName = null;
this.paragraphTitle = null;
this.paragraphText = null;
this.authenticationInfo = null;
@ -300,6 +313,30 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
}
}
public String getReplName() {
return this.replName;
}
public RemoteInterpreterContext setReplName(String replName) {
this.replName = replName;
return this;
}
public void unsetReplName() {
this.replName = null;
}
/** Returns true if field replName is set (has been assigned a value) and false otherwise */
public boolean isSetReplName() {
return this.replName != null;
}
public void setReplNameIsSet(boolean value) {
if (!value) {
this.replName = null;
}
}
public String getParagraphTitle() {
return this.paragraphTitle;
}
@ -462,6 +499,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
}
break;
case REPL_NAME:
if (value == null) {
unsetReplName();
} else {
setReplName((String)value);
}
break;
case PARAGRAPH_TITLE:
if (value == null) {
unsetParagraphTitle();
@ -521,6 +566,9 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
case PARAGRAPH_ID:
return getParagraphId();
case REPL_NAME:
return getReplName();
case PARAGRAPH_TITLE:
return getParagraphTitle();
@ -554,6 +602,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return isSetNoteId();
case PARAGRAPH_ID:
return isSetParagraphId();
case REPL_NAME:
return isSetReplName();
case PARAGRAPH_TITLE:
return isSetParagraphTitle();
case PARAGRAPH_TEXT:
@ -601,6 +651,15 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return false;
}
boolean this_present_replName = true && this.isSetReplName();
boolean that_present_replName = true && that.isSetReplName();
if (this_present_replName || that_present_replName) {
if (!(this_present_replName && that_present_replName))
return false;
if (!this.replName.equals(that.replName))
return false;
}
boolean this_present_paragraphTitle = true && this.isSetParagraphTitle();
boolean that_present_paragraphTitle = true && that.isSetParagraphTitle();
if (this_present_paragraphTitle || that_present_paragraphTitle) {
@ -672,6 +731,11 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (present_paragraphId)
list.add(paragraphId);
boolean present_replName = true && (isSetReplName());
list.add(present_replName);
if (present_replName)
list.add(replName);
boolean present_paragraphTitle = true && (isSetParagraphTitle());
list.add(present_paragraphTitle);
if (present_paragraphTitle)
@ -733,6 +797,16 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetReplName()).compareTo(other.isSetReplName());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetReplName()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replName, other.replName);
if (lastComparison != 0) {
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetParagraphTitle()).compareTo(other.isSetParagraphTitle());
if (lastComparison != 0) {
return lastComparison;
@ -829,6 +903,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
}
first = false;
if (!first) sb.append(", ");
sb.append("replName:");
if (this.replName == null) {
sb.append("null");
} else {
sb.append(this.replName);
}
first = false;
if (!first) sb.append(", ");
sb.append("paragraphTitle:");
if (this.paragraphTitle == null) {
sb.append("null");
@ -935,7 +1017,15 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 3: // PARAGRAPH_TITLE
case 3: // REPL_NAME
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.replName = iprot.readString();
struct.setReplNameIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 4: // PARAGRAPH_TITLE
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.paragraphTitle = iprot.readString();
struct.setParagraphTitleIsSet(true);
@ -943,7 +1033,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 4: // PARAGRAPH_TEXT
case 5: // PARAGRAPH_TEXT
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.paragraphText = iprot.readString();
struct.setParagraphTextIsSet(true);
@ -951,7 +1041,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 5: // AUTHENTICATION_INFO
case 6: // AUTHENTICATION_INFO
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.authenticationInfo = iprot.readString();
struct.setAuthenticationInfoIsSet(true);
@ -959,7 +1049,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 6: // CONFIG
case 7: // CONFIG
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.config = iprot.readString();
struct.setConfigIsSet(true);
@ -967,7 +1057,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 7: // GUI
case 8: // GUI
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.gui = iprot.readString();
struct.setGuiIsSet(true);
@ -975,7 +1065,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 8: // RUNNERS
case 9: // RUNNERS
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.runners = iprot.readString();
struct.setRunnersIsSet(true);
@ -1008,6 +1098,11 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
oprot.writeString(struct.paragraphId);
oprot.writeFieldEnd();
}
if (struct.replName != null) {
oprot.writeFieldBegin(REPL_NAME_FIELD_DESC);
oprot.writeString(struct.replName);
oprot.writeFieldEnd();
}
if (struct.paragraphTitle != null) {
oprot.writeFieldBegin(PARAGRAPH_TITLE_FIELD_DESC);
oprot.writeString(struct.paragraphTitle);
@ -1062,31 +1157,37 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (struct.isSetParagraphId()) {
optionals.set(1);
}
if (struct.isSetParagraphTitle()) {
if (struct.isSetReplName()) {
optionals.set(2);
}
if (struct.isSetParagraphText()) {
if (struct.isSetParagraphTitle()) {
optionals.set(3);
}
if (struct.isSetAuthenticationInfo()) {
if (struct.isSetParagraphText()) {
optionals.set(4);
}
if (struct.isSetConfig()) {
if (struct.isSetAuthenticationInfo()) {
optionals.set(5);
}
if (struct.isSetGui()) {
if (struct.isSetConfig()) {
optionals.set(6);
}
if (struct.isSetRunners()) {
if (struct.isSetGui()) {
optionals.set(7);
}
oprot.writeBitSet(optionals, 8);
if (struct.isSetRunners()) {
optionals.set(8);
}
oprot.writeBitSet(optionals, 9);
if (struct.isSetNoteId()) {
oprot.writeString(struct.noteId);
}
if (struct.isSetParagraphId()) {
oprot.writeString(struct.paragraphId);
}
if (struct.isSetReplName()) {
oprot.writeString(struct.replName);
}
if (struct.isSetParagraphTitle()) {
oprot.writeString(struct.paragraphTitle);
}
@ -1110,7 +1211,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterContext struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(8);
BitSet incoming = iprot.readBitSet(9);
if (incoming.get(0)) {
struct.noteId = iprot.readString();
struct.setNoteIdIsSet(true);
@ -1120,26 +1221,30 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
struct.setParagraphIdIsSet(true);
}
if (incoming.get(2)) {
struct.replName = iprot.readString();
struct.setReplNameIsSet(true);
}
if (incoming.get(3)) {
struct.paragraphTitle = iprot.readString();
struct.setParagraphTitleIsSet(true);
}
if (incoming.get(3)) {
if (incoming.get(4)) {
struct.paragraphText = iprot.readString();
struct.setParagraphTextIsSet(true);
}
if (incoming.get(4)) {
if (incoming.get(5)) {
struct.authenticationInfo = iprot.readString();
struct.setAuthenticationInfoIsSet(true);
}
if (incoming.get(5)) {
if (incoming.get(6)) {
struct.config = iprot.readString();
struct.setConfigIsSet(true);
}
if (incoming.get(6)) {
if (incoming.get(7)) {
struct.gui = iprot.readString();
struct.setGuiIsSet(true);
}
if (incoming.get(7)) {
if (incoming.get(8)) {
struct.runners = iprot.readString();
struct.setRunnersIsSet(true);
}

View file

@ -18,16 +18,16 @@
namespace java org.apache.zeppelin.interpreter.thrift
struct RemoteInterpreterContext {
1: string noteId,
2: string paragraphId,
3: string paragraphTitle,
4: string paragraphText,
5: string authenticationInfo,
6: string config, // json serialized config
7: string gui, // json serialized gui
8: string runners // json serialized runner
3: string replName,
4: string paragraphTitle,
5: string paragraphText,
6: string authenticationInfo,
7: string config, // json serialized config
8: string gui, // json serialized gui
9: string runners // json serialized runner
}
struct RemoteInterpreterResult {

View file

@ -27,7 +27,7 @@ public class InterpreterContextTest {
public void testThreadLocal() {
assertNull(InterpreterContext.get());
InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null));
InterpreterContext.set(new InterpreterContext(null, null, null, null, null, null, null, null, null, null, null, null));
assertNotNull(InterpreterContext.get());
InterpreterContext.remove();

View file

@ -36,7 +36,7 @@ public class LazyOpenInterpreterTest {
assertFalse("Interpreter is not open", lazyOpenInterpreter.isOpen());
InterpreterContext interpreterContext =
new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null);
new InterpreterContext("note", "id", null, "title", "text", null, null, null, null, null, null, null);
lazyOpenInterpreter.interpret("intp 1", interpreterContext);
assertTrue("Interpeter is open", lazyOpenInterpreter.isOpen());
}

View file

@ -86,6 +86,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
context = new InterpreterContext(
"note",
"id",
null,
"title",
"text",
new AuthenticationInfo(),

View file

@ -85,6 +85,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
return new InterpreterContext(
"noteId",
"id",
null,
"title",
"text",
new AuthenticationInfo(),

View file

@ -148,6 +148,7 @@ public class RemoteInterpreterTest {
new InterpreterContext(
"note",
"id",
null,
"title",
"text",
new AuthenticationInfo(),
@ -184,6 +185,7 @@ public class RemoteInterpreterTest {
new InterpreterContext(
"noteId",
"id",
null,
"title",
"text",
new AuthenticationInfo(),
@ -243,6 +245,7 @@ public class RemoteInterpreterTest {
new InterpreterContext(
"note",
"id",
null,
"title",
"text",
new AuthenticationInfo(),
@ -257,6 +260,7 @@ public class RemoteInterpreterTest {
new InterpreterContext(
"note",
"id",
null,
"title",
"text",
new AuthenticationInfo(),
@ -311,6 +315,7 @@ public class RemoteInterpreterTest {
new InterpreterContext(
"note",
"jobA",
null,
"title",
"text",
new AuthenticationInfo(),
@ -347,6 +352,7 @@ public class RemoteInterpreterTest {
new InterpreterContext(
"note",
"jobB",
null,
"title",
"text",
new AuthenticationInfo(),
@ -413,6 +419,7 @@ public class RemoteInterpreterTest {
InterpreterResult ret = intpA.interpret(getJobName(), new InterpreterContext(
"note",
jobId,
null,
"title",
"text",
new AuthenticationInfo(),
@ -493,6 +500,7 @@ public class RemoteInterpreterTest {
InterpreterResult ret = intpA.interpret(stmt, new InterpreterContext(
"note",
jobId,
null,
"title",
"text",
new AuthenticationInfo(),
@ -595,6 +603,7 @@ public class RemoteInterpreterTest {
new InterpreterContext(
"note",
"jobA",
null,
"title",
"text",
new AuthenticationInfo(),
@ -754,6 +763,7 @@ public class RemoteInterpreterTest {
InterpreterContext context = new InterpreterContext(
"noteId",
"id",
null,
"title",
"text",
new AuthenticationInfo(),

View file

@ -103,6 +103,7 @@ public class DistributedResourcePoolTest {
context = new InterpreterContext(
"note",
"id",
null,
"title",
"text",
new AuthenticationInfo(),

View file

@ -112,6 +112,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
intpA.interpret("1000", new InterpreterContext(
"note",
"jobId",
null,
"title",
"text",
new AuthenticationInfo(),
@ -190,6 +191,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
InterpreterContext context = new InterpreterContext(
"note",
"jobId1",
null,
"title",
"text",
new AuthenticationInfo(),
@ -228,6 +230,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
InterpreterContext context = new InterpreterContext(
"note",
"jobId2",
null,
"title",
"text",
new AuthenticationInfo(),

View file

@ -37,6 +37,7 @@ import org.apache.shiro.authc.UsernamePasswordToken;
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
import org.apache.zeppelin.server.ZeppelinServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,6 +57,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
private static final String USER_LOGIN_API_ENDPOINT = "api/v1/users/login";
private static final String JSON_CONTENT_TYPE = "application/json";
private static final String UTF_8_ENCODING = "UTF-8";
private static final String USER_SESSION_HEADER = "X-session";
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();
private final HttpClient httpClient;
@ -126,6 +128,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
protected User authenticateUser(String requestBody) {
PutMethod put = new PutMethod(Joiner.on("/").join(zeppelinhubUrl, USER_LOGIN_API_ENDPOINT));
String responseBody = StringUtils.EMPTY;
String userSession = StringUtils.EMPTY;
try {
put.setRequestEntity(new StringRequestEntity(requestBody, JSON_CONTENT_TYPE, UTF_8_ENCODING));
int statusCode = httpClient.executeMethod(put);
@ -136,6 +139,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
+ "Login or password incorrect");
}
responseBody = put.getResponseBodyAsString();
userSession = put.getResponseHeader(USER_SESSION_HEADER).getValue();
put.releaseConnection();
} catch (IOException e) {
@ -150,13 +154,16 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
LOG.error("Cannot deserialize ZeppelinHub response to User instance", e);
throw new AuthenticationException("Cannot login to ZeppelinHub");
}
// Add ZeppelinHub user_session token this singleton map, this will help ZeppelinHubRepo
// to get specific information about the current user.
UserSessionContainer.instance.setSession(account.login, userSession);
/* TODO(khalid): add proper roles and add listener */
HashSet<String> userAndRoles = new HashSet<String>();
userAndRoles.add(account.login);
ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);
return account;
}

View file

@ -193,7 +193,8 @@ public class NotebookServer extends WebSocketServlet implements
if (StringUtils.isEmpty(conn.getUser())) {
addUserConnection(messagereceived.principal, conn);
}
AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
AuthenticationInfo subject =
new AuthenticationInfo(messagereceived.principal, messagereceived.ticket);
/** Lets be elegant here */
switch (messagereceived.op) {

View file

@ -18,11 +18,14 @@
package org.apache.zeppelin.rest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
@ -44,6 +47,7 @@ import org.junit.runners.MethodSorters;
import com.google.gson.Gson;
import static org.junit.Assert.*;
import static org.hamcrest.MatcherAssert.assertThat;
/**
* Zeppelin interpreter rest api tests
@ -146,6 +150,70 @@ public class InterpreterRestApiTest extends AbstractTestRestApi {
delete.releaseConnection();
}
@Test
public void testCreatedInterpreterDependencies() throws IOException {
// when: Create 2 interpreter settings `md1` and `md2` which have different dep.
String md1Name = "md1";
String md2Name = "md2";
String md1Dep = "org.apache.drill.exec:drill-jdbc:jar:1.7.0";
String md2Dep = "org.apache.drill.exec:drill-jdbc:jar:1.6.0";
String reqBody1 = "{\"name\":\"" + md1Name + "\",\"group\":\"md\",\"properties\":{\"propname\":\"propvalue\"}," +
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
"\"dependencies\":[ {\n" +
" \"groupArtifactVersion\": \"" + md1Dep + "\",\n" +
" \"exclusions\":[]\n" +
" }]," +
"\"option\": { \"remote\": true, \"session\": false }}";
PostMethod post = httpPost("/interpreter/setting", reqBody1);
assertThat("test create method:", post, isCreated());
post.releaseConnection();
String reqBody2 = "{\"name\":\"" + md2Name + "\",\"group\":\"md\",\"properties\":{\"propname\":\"propvalue\"}," +
"\"interpreterGroup\":[{\"class\":\"org.apache.zeppelin.markdown.Markdown\",\"name\":\"md\"}]," +
"\"dependencies\":[ {\n" +
" \"groupArtifactVersion\": \"" + md2Dep + "\",\n" +
" \"exclusions\":[]\n" +
" }]," +
"\"option\": { \"remote\": true, \"session\": false }}";
post = httpPost("/interpreter/setting", reqBody2);
assertThat("test create method:", post, isCreated());
post.releaseConnection();
// 1. Call settings API
GetMethod get = httpGet("/interpreter/setting");
String rawResponse = get.getResponseBodyAsString();
get.releaseConnection();
// 2. Parsing to List<InterpreterSettings>
JsonObject responseJson = gson.fromJson(rawResponse, JsonElement.class).getAsJsonObject();
JsonArray bodyArr = responseJson.getAsJsonArray("body");
List<InterpreterSetting> settings = new Gson().fromJson(bodyArr,
new TypeToken<ArrayList<InterpreterSetting>>() {
}.getType());
// 3. Filter interpreters out we have just created
InterpreterSetting md1 = null;
InterpreterSetting md2 = null;
for (InterpreterSetting setting : settings) {
if (md1Name.equals(setting.getName())) {
md1 = setting;
} else if (md2Name.equals(setting.getName())) {
md2 = setting;
}
}
// then: should get created interpreters which have different dependencies
// 4. Validate each md interpreter has its own dependencies
assertEquals(1, md1.getDependencies().size());
assertEquals(1, md2.getDependencies().size());
assertEquals(md1Dep, md1.getDependencies().get(0).getGroupArtifactVersion());
assertEquals(md2Dep, md2.getDependencies().get(0).getGroupArtifactVersion());
}
@Test
public void testSettingsCreateWithEmptyJson() throws IOException {
// Call Create Setting REST API

View file

@ -2,7 +2,13 @@
"env": {
"browser": true,
"jasmine": true,
"node": true
"node": true,
"es6": true
},
"parserOptions": {
"ecmaFeatures": {
"experimentalObjectRestSpread": true
}
},
"globals": {
"angular": false,

View file

@ -44,6 +44,30 @@ module.exports = function(grunt) {
// Project settings
yeoman: appConfig,
babel: {
options: {
sourceMap: true,
presets: ['es2015'],
plugins: ['transform-object-rest-spread']
},
dev: {
files: [{
expand: true,
cwd: './src/',
src: ['**/*.js'],
dest: '.tmp',
}]
},
dist: {
files: [{
expand: true,
cwd: '.tmp/concat/scripts',
src: ['scripts.js'],
dest: '.tmp/concat/scripts',
}]
}
},
// use ngAnnotate instead og ngMin
ngAnnotate: {
dist: {
@ -138,7 +162,7 @@ module.exports = function(grunt) {
'<%= yeoman.app %>/app/**/*.js',
'<%= yeoman.app %>/components/**/*.js'
],
tasks: ['newer:eslint:all', 'newer:jscs:all'],
tasks: ['newer:eslint:all', 'newer:jscs:all', 'newer:babel:dev'],
options: {
livereload: '<%= connect.options.livereload %>'
}
@ -147,11 +171,16 @@ module.exports = function(grunt) {
files: [
'<%= yeoman.app %>/**/*.html'
],
tasks: ['newer:htmlhint']
tasks: ['newer:htmlhint', 'newer:copy:dev']
},
jsTest: {
files: ['test/spec/{,*/}*.js'],
tasks: ['newer:eslint:test', 'newer:jscs:test', 'karma']
tasks: [
'newer:eslint:test',
'newer:jscs:test',
'newer:babel:dev',
'karma'
]
},
styles: {
files: [
@ -185,11 +214,12 @@ module.exports = function(grunt) {
port: 9000,
// Change this to '0.0.0.0' to access the server from outside.
hostname: 'localhost',
livereload: 35729
livereload: 35729,
base: '.tmp',
},
livereload: {
options: {
open: true,
open: false,
middleware: function(connect) {
return [
connect.static('.tmp'),
@ -197,7 +227,6 @@ module.exports = function(grunt) {
'/bower_components',
connect.static('./bower_components')
),
connect.static(appConfig.app)
];
}
}
@ -220,7 +249,7 @@ module.exports = function(grunt) {
},
dist: {
options: {
open: true,
open: false,
base: '<%= yeoman.dist %>'
}
}
@ -382,9 +411,6 @@ module.exports = function(grunt) {
}
}
},
// concat: {
// dist: {}
// },
svgmin: {
dist: {
@ -421,6 +447,60 @@ module.exports = function(grunt) {
// Copies remaining files to places other tasks can use
copy: {
dev: {
files: [{
expand: true,
dot: true,
cwd: '<%= yeoman.app %>',
dest: '.tmp',
src: [
'*.{ico,png,txt}',
'.htaccess',
'*.html',
'**/*.css',
'assets/styles/**/*',
'assets/images/**/*',
'WEB-INF/*'
]
}, {
// copy fonts
expand: true,
cwd: '<%= yeoman.app %>',
dest: '.tmp',
src: ['fonts/**/*.{eot,svg,ttf,woff}']
}, {
expand: true,
cwd: '<%= yeoman.app %>',
dest: '.tmp',
src: ['app/**/*.html', 'components/**/*.html']
}, {
expand: true,
cwd: 'bower_components/datatables/media/images',
src: '{,*/}*.{png,jpg,jpeg,gif}',
dest: '.tmp/images'
}, {
expand: true,
cwd: '.tmp/images',
dest: '.tmp/images',
src: ['generated/*']
}, {
expand: true,
cwd: 'bower_components/bootstrap/dist',
src: 'fonts/*',
dest: '.tmp'
}, {
expand: true,
cwd: 'bower_components/jquery-ui/themes/base/images',
src: '{,*/}*.{png,jpg,jpeg,gif}',
dest: '.tmp/styles/images'
}, {
expand: true,
cwd: 'bower_components/MathJax',
src: [
'extensions/**', 'jax/**', 'fonts/**'],
dest: '.tmp'
}]
},
dist: {
files: [{
expand: true,
@ -486,10 +566,10 @@ module.exports = function(grunt) {
// Run some tasks in parallel to speed up the build process
concurrent: {
server: [
'copy:styles'
'copy:dev'
],
test: [
'copy:styles'
'copy:dev',
],
dist: [
'copy:styles',
@ -516,6 +596,7 @@ module.exports = function(grunt) {
'wiredep',
'concurrent:server',
'postcss',
'babel:dev',
'connect:livereload',
'watch'
]);
@ -528,9 +609,11 @@ module.exports = function(grunt) {
grunt.registerTask('test', [
'clean:server',
'babel',
'wiredep',
'concurrent:test',
'postcss',
'babel:dev',
'connect:test',
'karma'
]);
@ -546,6 +629,7 @@ module.exports = function(grunt) {
'concurrent:dist',
'postcss',
'concat',
'babel:dist',
'ngAnnotate',
'copy:dist',
'cssmin',

View file

@ -7,7 +7,8 @@
"postinstall": "node_modules/.bin/bower install --silent",
"build": "./node_modules/.bin/grunt build",
"start": "./node_modules/.bin/grunt serve",
"test": "./node_modules/.bin/grunt test"
"test": "./node_modules/.bin/grunt test",
"pretest": "./node_modules/.bin/npm install karma-phantomjs-launcher"
},
"dependencies": {
"grunt-angular-templates": "^0.5.7",
@ -16,7 +17,10 @@
"devDependencies": {
"autoprefixer": "^6.1.0",
"bower": "^1.8.0",
"babel-plugin-transform-object-rest-spread": "^6.16.0",
"babel-preset-es2015": "^6.18.0",
"grunt": "^0.4.1",
"grunt-babel": "^6.0.0",
"grunt-cache-bust": "1.3.0",
"grunt-cli": "^0.1.13",
"grunt-concurrent": "^0.5.0",
@ -44,7 +48,6 @@
"karma": "~1.3.0",
"karma-coverage": "^1.1.1",
"karma-jasmine": "~1.0.2",
"karma-phantomjs-launcher": "~1.0.2",
"load-grunt-tasks": "^0.4.0",
"time-grunt": "^0.3.1"
},

View file

@ -24,6 +24,7 @@
$scope.credentialInfo = [];
$scope.showAddNewCredentialInfo = false;
$scope.availableInterpreters = [];
var getCredentialInfo = function() {
$http.get(baseUrlSrv.getRestApiBase() + '/credential').
@ -87,6 +88,33 @@
});
};
var getAvailableInterpreters = function() {
$http.get(baseUrlSrv.getRestApiBase() + '/interpreter/setting')
.success(function(data, status, headers, config) {
for (var setting = 0; setting < data.body.length; setting++) {
$scope.availableInterpreters.push(
data.body[setting].group + '.' + data.body[setting].name);
}
angular.element('#entityname').autocomplete({
source: $scope.availableInterpreters,
select: function(event, selected) {
$scope.entity = selected.item.value;
return false;
}
});
}).error(function(data, status, headers, config) {
console.log('Error %o %o', status, data.message);
});
};
$scope.toggleAddNewCredentialInfo = function() {
if ($scope.showAddNewCredentialInfo) {
$scope.showAddNewCredentialInfo = false;
} else {
$scope.showAddNewCredentialInfo = true;
}
};
$scope.cancelCredentialInfo = function() {
$scope.showAddNewCredentialInfo = false;
resetCredentialInfo();
@ -155,6 +183,7 @@
};
var init = function() {
getAvailableInterpreters();
getCredentialInfo();
};

View file

@ -26,7 +26,7 @@ limitations under the License.
<i class="icon-question" ng-style="{color: showRepositoryInfo ? '#3071A9' : 'black' }"></i>
</a>
<button class="btn btn-default btn-sm"
ng-click="showAddNewCredentialInfo = !showAddNewCredentialInfo">
ng-click="toggleAddNewCredentialInfo()">
<i class="fa fa-plus"></i>
Add
</button>
@ -59,7 +59,7 @@ limitations under the License.
</thead>
<tr>
<td>
<textarea msd-elastic ng-model="entity"></textarea>
<input id="entityname" ng-model="entity" placeholder="[Interpreter Group].[Interpreter Name]"/>
</td>
<td>
<textarea msd-elastic ng-model="username"></textarea>

View file

@ -68,17 +68,17 @@ module.exports = function(config) {
'bower_components/MathJax/MathJax.js',
'bower_components/angular-mocks/angular-mocks.js',
// endbower
'src/app/app.js',
'src/app/app.controller.js',
'src/app/tabledata/transformation.js',
'src/app/**/*.js',
'src/components/**/*.js',
'.tmp/app/app.js',
'.tmp/app/app.controller.js',
'.tmp/app/tabledata/transformation.js',
'.tmp/app/**/*.js',
'.tmp/components/**/*.js',
'test/spec/**/*.js'
],
// list of files / patterns to exclude
exclude: [
'src/app/visualization/builtins/*.js'
'.tmp/app/visualization/builtins/*.js'
],
// web server port

View file

@ -250,7 +250,7 @@
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration combine.children="append">
<configuration>
<forkMode>always</forkMode>
</configuration>
</plugin>

View file

@ -538,6 +538,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.python.PythonInterpreter,"
+ "org.apache.zeppelin.python.PythonInterpreterPandasSql,"
+ "org.apache.zeppelin.python.PythonCondaInterpreter,"
+ "org.apache.zeppelin.python.PythonDockerInterpreter,"
+ "org.apache.zeppelin.ignite.IgniteInterpreter,"
+ "org.apache.zeppelin.ignite.IgniteSqlInterpreter,"
+ "org.apache.zeppelin.lens.LensInterpreter,"

View file

@ -284,10 +284,17 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
private InterpreterSetting createFromInterpreterSettingRef(InterpreterSetting o) {
InterpreterSetting setting =
new InterpreterSetting(o.getName(), o.getName(), o.getInterpreterInfos(),
convertInterpreterProperties((Map<String, InterpreterProperty>) o.getProperties()),
o.getDependencies(), o.getOption(), o.getPath());
// should return immutable objects
List<InterpreterInfo> infos = (null == o.getInterpreterInfos()) ?
new ArrayList<InterpreterInfo>() : new ArrayList<>(o.getInterpreterInfos());
List<Dependency> deps = (null == o.getDependencies()) ?
new ArrayList<Dependency>() : new ArrayList<>(o.getDependencies());
Properties props =
convertInterpreterProperties((Map<String, InterpreterProperty>) o.getProperties());
InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption());
InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(),
infos, props, deps, option, o.getPath());
setting.setInterpreterGroupFactory(this);
return setting;
}

View file

@ -473,6 +473,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
InterpreterContext interpreterContext = new InterpreterContext(
note.getId(),
getId(),
getRequiredReplName(),
this.getTitle(),
this.getText(),
this.getAuthenticationInfo(),

View file

@ -22,6 +22,8 @@ import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -29,8 +31,9 @@ import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -51,20 +54,27 @@ public class ZeppelinHubRepo implements NotebookRepo {
public static final String TOKEN_HEADER = "X-Zeppelin-Token";
private static final Gson GSON = new Gson();
private static final Note EMPTY_NOTE = new Note();
private final Client websocketClient;
//private final Client websocketClient;
private String token;
private ZeppelinhubRestApiHandler restApiClient;
private final ZeppelinConfiguration conf;
// In order to avoid too many call to ZeppelinHub backend, we save a map of user -> session.
private ConcurrentMap<String, String> usersToken = new ConcurrentHashMap<String, String>();
public ZeppelinHubRepo(ZeppelinConfiguration conf) {
this.conf = conf;
String zeppelinHubUrl = getZeppelinHubUrl(conf);
LOG.info("Initializing ZeppelinHub integration module");
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
getZeppelinhubWebsocketUri(conf), token, conf);
websocketClient.start();
// TODO(xxx): refactor this in the next itaration
//websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
// getZeppelinhubWebsocketUri(conf), token, conf);
//websocketClient.start();
}
private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
@ -144,10 +154,56 @@ public class ZeppelinHubRepo implements NotebookRepo {
}
return zeppelinhubUrl;
}
/**
* Get Token directly from Zeppelinhub.
* This will avoid and remove the needs of setting up token in zeppelin-env.sh.
*/
private String getUserZeppelinInstanceToken(String ticket) throws IOException {
if (StringUtils.isBlank(ticket)) {
return "";
}
List<Instance> instances = restApiClient.getInstances(ticket);
// TODO(anthony): Implement NotebookRepo Setting to let user switch token at runtime.
token = instances.isEmpty() ? StringUtils.EMPTY : instances.get(0).token;
return token;
}
/**
* For a given user logged in is zeppelin (via zeppelinhub notebook repo), get default token.
* */
private String getUserToken(String principal) {
String token = usersToken.get(principal);
if (StringUtils.isBlank(token)) {
String ticket = UserSessionContainer.instance.getSession(principal);
try {
token = getUserZeppelinInstanceToken(ticket);
usersToken.putIfAbsent(principal, token);
} catch (IOException e) {
LOG.error("Cannot get user token", e);
token = StringUtils.EMPTY;
}
}
return token;
}
private boolean isSubjectValid(AuthenticationInfo subject) {
if (subject == null) {
return false;
}
return (subject.isAnonymous() && !conf.isAnonymousAllowed()) ? false : true;
}
@Override
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
String response = restApiClient.asyncGet("");
if (!isSubjectValid(subject)) {
return Collections.emptyList();
}
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, StringUtils.EMPTY);
List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
if (notes == null) {
return Collections.emptyList();
@ -158,11 +214,11 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public Note get(String noteId, AuthenticationInfo subject) throws IOException {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return EMPTY_NOTE;
}
//String response = zeppelinhubHandler.get(noteId);
String response = restApiClient.asyncGet(noteId);
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, noteId);
Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
@ -173,45 +229,55 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public void save(Note note, AuthenticationInfo subject) throws IOException {
if (note == null) {
throw new IOException("Zeppelinhub failed to save empty note");
if (note == null || !isSubjectValid(subject)) {
throw new IOException("Zeppelinhub failed to save note");
}
String notebook = GSON.toJson(note);
restApiClient.asyncPut(notebook);
LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
String jsonNote = GSON.toJson(note);
String token = getUserToken(subject.getUser());
LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
restApiClient.put(token, jsonNote);
}
@Override
public void remove(String noteId, AuthenticationInfo subject) throws IOException {
restApiClient.asyncDel(noteId);
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
throw new IOException("Zeppelinhub failed to remove note");
}
String token = getUserToken(subject.getUser());
LOG.info("ZeppelinHub REST API removing note {} ", noteId);
restApiClient.del(token, noteId);
}
@Override
public void close() {
websocketClient.stop();
//websocketClient.stop();
}
@Override
public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
throws IOException {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return null;
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint");
String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg));
String response = restApiClient.asyncPutWithResponseBody(endpoint, content);
String token = getUserToken(subject.getUser());
String response = restApiClient.putWithResponseBody(token, endpoint, content);
return GSON.fromJson(response, Revision.class);
}
@Override
public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId)) {
if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId) || !isSubjectValid(subject)) {
return EMPTY_NOTE;
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId);
String response = restApiClient.asyncGet(endpoint);
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, endpoint);
Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
@ -222,13 +288,14 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return Collections.emptyList();
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint");
List<Revision> history = Collections.emptyList();
try {
String response = restApiClient.asyncGet(endpoint);
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, endpoint);
history = GSON.fromJson(response, new TypeToken<List<Revision>>(){}.getType());
} catch (IOException e) {
LOG.error("Cannot get note history", e);

View file

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.model;
/**
* ZeppelinHub Instance structure.
*
*/
public class Instance {
public int id;
public String name;
public String token;
}

View file

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.model;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
/**
* Simple and yet dummy container for zeppelinhub session.
*
*/
public class UserSessionContainer {
private static class Entity {
public final String userSession;
Entity(String userSession) {
this.userSession = userSession;
}
}
private Map<String, Entity> sessions = new ConcurrentHashMap<>();
public static final UserSessionContainer instance = new UserSessionContainer();
public synchronized String getSession(String principal) {
Entity entry = sessions.get(principal);
if (entry == null) {
return StringUtils.EMPTY;
}
return entry.userSession;
}
public synchronized String setSession(String principal, String userSession) {
Entity entry = new Entity(userSession);
sessions.put(principal, entry);
return entry.userSession;
}
}

View file

@ -18,12 +18,16 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -35,6 +39,9 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
* REST API handler.
*
@ -42,6 +49,7 @@ import org.slf4j.LoggerFactory;
public class ZeppelinhubRestApiHandler {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
private static final String USER_SESSION_HEADER = "X-User-Session";
private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
private static boolean PROXY_ON = false;
private static String PROXY_HOST;
@ -49,16 +57,13 @@ public class ZeppelinhubRestApiHandler {
private final HttpClient client;
private final String zepelinhubUrl;
private final String token;
public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl,
String token) {
public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl, String token) {
return new ZeppelinhubRestApiHandler(zeppelinhubUrl, token);
}
private ZeppelinhubRestApiHandler(String zeppelinhubUrl, String token) {
this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
this.token = token;
//TODO(khalid):to make proxy conf consistent with Zeppelin confs
//readProxyConf();
@ -114,35 +119,75 @@ public class ZeppelinhubRestApiHandler {
return httpClient;
}
public String asyncGet(String argument) throws IOException {
return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument, StringUtils.EMPTY, true);
/**
* Fetch zeppelin instances for a given user.
* @param ticket
* @return
* @throws IOException
*/
public List<Instance> getInstances(String ticket) throws IOException {
InputStreamResponseListener listener = new InputStreamResponseListener();
Response response;
String url = zepelinhubUrl + "instances";
String data;
Request request = client.newRequest(url).header(USER_SESSION_HEADER, ticket);
request.send(listener);
try {
response = listener.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
LOG.error("Cannot perform request to ZeppelinHub", e);
throw new IOException("Cannot perform GET request to ZeppelinHub", e);
}
int code = response.getStatus();
if (code == 200) {
try (InputStream responseContent = listener.getInputStream()) {
data = IOUtils.toString(responseContent, "UTF-8");
}
} else {
LOG.error("ZeppelinHub GET {} returned with status {} ", url, code);
throw new IOException("Cannot perform GET request to ZeppelinHub");
}
Type listType = new TypeToken<ArrayList<Instance>>() {}.getType();
return new Gson().fromJson(data, listType);
}
public String get(String token, String argument) throws IOException {
String url = zepelinhubUrl + argument;
return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
}
public String asyncPutWithResponseBody(String url, String json) throws IOException {
public String putWithResponseBody(String token, String url, String json) throws IOException {
if (StringUtils.isBlank(url) || StringUtils.isBlank(json)) {
LOG.error("Empty note, cannot send it to zeppelinHub");
throw new IOException("Cannot send emtpy note to zeppelinHub");
}
return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, true);
return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
}
public void asyncPut(String jsonNote) throws IOException {
public void put(String token, String jsonNote) throws IOException {
if (StringUtils.isBlank(jsonNote)) {
LOG.error("Cannot save empty note/string to ZeppelinHub");
return;
}
sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, false);
sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
}
public void asyncDel(String argument) throws IOException {
public void del(String token, String argument) throws IOException {
if (StringUtils.isBlank(argument)) {
LOG.error("Cannot delete empty note from ZeppelinHub");
return;
}
sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, false);
sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false);
}
private String sendToZeppelinHub(HttpMethod method, String url, String json, boolean withResponse)
private String sendToZeppelinHub(HttpMethod method,
String url,
String json,
String token,
boolean withResponse)
throws IOException {
Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token);
if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST))

View file

@ -95,7 +95,7 @@ public class InterpreterFactoryTest {
schedulerFactory = new SchedulerFactory();
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null, null, depResolver, false);
context = new InterpreterContext("note", "id", "title", "text", null, null, null, null, null, null, null);
context = new InterpreterContext("note", "id", null, "title", "text", null, null, null, null, null, null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);

View file

@ -58,8 +58,7 @@ public class ParagraphTest {
assertEquals("md", Paragraph.getRequiredReplName(text));
assertEquals("", Paragraph.getScriptBody(text));
}
@Test
public void replSingleCharName() {
String text = "%r a";

View file

@ -13,6 +13,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
@ -20,8 +21,9 @@ import com.google.common.io.Files;
public class ZeppelinHubRepoTest {
final String TOKEN = "AAA-BBB-CCC-00";
final String token = "AAA-BBB-CCC-00";
final String testAddr = "http://zeppelinhub.ltd";
final AuthenticationInfo auth = new AuthenticationInfo("anthony");
private ZeppelinHubRepo repo;
private File pathOfNotebooks = new File(System.getProperty("user.dir") + "/src/test/resources/list_of_notes");
@ -30,7 +32,7 @@ public class ZeppelinHubRepoTest {
@Before
public void setUp() throws Exception {
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr);
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, "AAA-BBB-CCC-00");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, token);
ZeppelinConfiguration conf = new ZeppelinConfiguration();
repo = new ZeppelinHubRepo(conf);
@ -41,10 +43,10 @@ public class ZeppelinHubRepoTest {
ZeppelinhubRestApiHandler mockedZeppelinhubHandler = mock(ZeppelinhubRestApiHandler.class);
byte[] response = Files.toByteArray(pathOfNotebooks);
when(mockedZeppelinhubHandler.asyncGet("")).thenReturn(new String(response));
when(mockedZeppelinhubHandler.get("", "")).thenReturn(new String(response));
response = Files.toByteArray(pathOfNotebook);
when(mockedZeppelinhubHandler.asyncGet("AAAAA")).thenReturn(new String(response));
when(mockedZeppelinhubHandler.get("", "AAAAA")).thenReturn(new String(response));
return mockedZeppelinhubHandler;
}
@ -123,14 +125,14 @@ public class ZeppelinHubRepoTest {
@Test
public void testGetAllNotes() throws IOException {
List<NoteInfo> notebooks = repo.list(null);
List<NoteInfo> notebooks = repo.list(auth);
assertThat(notebooks).isNotEmpty();
assertThat(notebooks.size()).isEqualTo(3);
}
@Test
public void testGetNote() throws IOException {
Note notebook = repo.get("AAAAA", null);
Note notebook = repo.get("AAAAA", auth);
assertThat(notebook).isNotNull();
assertThat(notebook.getId()).isEqualTo("2A94M5J1Z");
}
@ -138,13 +140,13 @@ public class ZeppelinHubRepoTest {
@Test
public void testRemoveNote() throws IOException {
// not suppose to throw
repo.remove("AAAAA", null);
repo.remove("AAAAA", auth);
}
@Test
public void testRemoveNoteError() throws IOException {
// not suppose to throw
repo.remove("BBBBB", null);
repo.remove("BBBBB", auth);
}
}