ZEPPELIN-2685. Improvement on Interpreter class

This commit is contained in:
Jeff Zhang 2017-08-31 20:21:04 +08:00
parent a424f5c655
commit e545cc3c44
93 changed files with 1331 additions and 460 deletions

View file

@ -148,6 +148,7 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
if [[ -n "${HADOOP_CONF_DIR}" ]] && [[ -d "${HADOOP_CONF_DIR}" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${HADOOP_CONF_DIR}"
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR}
else
# autodetect HADOOP_CONF_HOME by heuristic
if [[ -n "${HADOOP_HOME}" ]] && [[ -z "${HADOOP_CONF_DIR}" ]]; then

View file

@ -181,6 +181,7 @@ For example,
* **local[*]** in local mode
* **spark://master:7077** in standalone cluster
* **yarn-client** in Yarn client mode
* **yarn-cluster** in Yarn cluster mode
* **mesos://host:5050** in Mesos cluster
That's it. Zeppelin will work with any version of Spark and any deployment type without rebuilding Zeppelin in this way.
@ -188,6 +189,11 @@ For the further information about Spark & Zeppelin version compatibility, please
> Note that without exporting `SPARK_HOME`, it's running in local mode with included version of Spark. The included version may vary depending on the build profile.
### 3. Yarn mode
Zeppelin support both yarn client and yarn cluster mode (yarn cluster mode is supported from 0.8.0). For yarn mode, you must specify `SPARK_HOME` & `HADOOP_CONF_DIR`.
You can either specify them in `zeppelin-env.sh`, or in interpreter setting page. Specifying them in `zeppelin-env.sh` means you can use only one version of `spark` & `hadoop`. Specifying them
in interpreter setting page means you can use multiple versions of `spark` & `hadoop` in one zeppelin instance.
## SparkContext, SQLContext, SparkSession, ZeppelinContext
SparkContext, SQLContext and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext` and `z`, respectively, in Scala, Python and R environments.
Staring from 0.6.1 SparkSession is available as variable `spark` when you are using Spark 2.x.

View file

@ -112,7 +112,7 @@ public class ElasticsearchInterpreter extends Interpreter {
@Override
public void open() {
logger.info("Properties: {}", getProperty());
logger.info("Properties: {}", getProperties());
String clientType = getProperty(ELASTICSEARCH_CLIENT_TYPE);
clientType = clientType == null ? null : clientType.toLowerCase();
@ -123,15 +123,15 @@ public class ElasticsearchInterpreter extends Interpreter {
catch (final NumberFormatException e) {
this.resultSize = 10;
logger.error("Unable to parse " + ELASTICSEARCH_RESULT_SIZE + " : " +
property.get(ELASTICSEARCH_RESULT_SIZE), e);
getProperty(ELASTICSEARCH_RESULT_SIZE), e);
}
try {
if (StringUtils.isEmpty(clientType) || "transport".equals(clientType)) {
elsClient = new TransportBasedClient(getProperty());
elsClient = new TransportBasedClient(getProperties());
}
else if ("http".equals(clientType)) {
elsClient = new HttpBasedClient(getProperty());
elsClient = new HttpBasedClient(getProperties());
}
else {
logger.error("Unknown type of Elasticsearch client: " + clientType);

View file

@ -20,6 +20,7 @@ package org.apache.zeppelin.file;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
@ -86,7 +87,7 @@ public abstract class FileInterpreter extends Interpreter {
// Functions that each file system implementation must override
public abstract String listAll(String path);
public abstract String listAll(String path) throws InterpreterException;
public abstract boolean isDirectory(String path);

View file

@ -202,7 +202,7 @@ public class HDFSFileInterpreter extends FileInterpreter {
return "No such File or directory";
}
public String listAll(String path) {
public String listAll(String path) throws InterpreterException {
String all = "";
if (exceptionOnConnect != null)
return "Error connecting to provided endpoint.";

View file

@ -17,7 +17,6 @@
*/
package org.apache.zeppelin.flink;
import java.lang.reflect.InvocationTargetException;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
@ -34,10 +33,8 @@ import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
@ -46,11 +43,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;
import scala.None;
import scala.Option;
import scala.Some;
import scala.collection.JavaConversions;
import scala.collection.immutable.Nil;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.AbstractFunction0;
import scala.tools.nsc.Settings;
@ -80,7 +74,7 @@ public class FlinkInterpreter extends Interpreter {
public void open() {
out = new ByteArrayOutputStream();
flinkConf = new org.apache.flink.configuration.Configuration();
Properties intpProperty = getProperty();
Properties intpProperty = getProperties();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));

View file

@ -316,12 +316,12 @@ public class GObject extends groovy.lang.GroovyObjectSupport {
@ZeppelinApi
public void run(String noteId, String paragraphId, InterpreterContext context) {
if (paragraphId.equals(context.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
throw new RuntimeException("Can not run current Paragraph");
}
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, paragraphId,
context);
if (runners.size() <= 0) {
throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
throw new RuntimeException("Paragraph " + paragraphId + " not found " + runners.size());
}
for (InterpreterContextRunner r : runners) {
r.run();
@ -338,7 +338,7 @@ public class GObject extends groovy.lang.GroovyObjectSupport {
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
if (runners.size() <= 0) {
throw new InterpreterException("Note " + noteId + " not found " + runners.size());
throw new RuntimeException("Note " + noteId + " not found " + runners.size());
}
for (InterpreterContextRunner r : runners) {

View file

@ -17,8 +17,6 @@
package org.apache.zeppelin.groovy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.io.PrintWriter;
import java.io.File;
@ -26,10 +24,8 @@ import java.util.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
@ -40,7 +36,6 @@ import org.slf4j.LoggerFactory;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.runtime.ResourceGroovyMethods;
import org.codehaus.groovy.runtime.StackTraceUtils;
import java.util.concurrent.ConcurrentHashMap;
@ -167,7 +162,7 @@ public class GroovyInterpreter extends Interpreter {
//put shared bindings evaluated in this interpreter
bindings.putAll(sharedBindings);
//put predefined bindings
bindings.put("g", new GObject(log, out, this.getProperty(), contextInterpreter, bindings));
bindings.put("g", new GObject(log, out, this.getProperties(), contextInterpreter, bindings));
bindings.put("out", new PrintWriter(out, true));
script.run();

View file

@ -68,7 +68,7 @@ public class HbaseInterpreter extends Interpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
this.scriptingContainer = new ScriptingContainer(LocalContextScope.SINGLETON);
this.writer = new StringWriter();
scriptingContainer.setOutput(this.writer);
@ -88,7 +88,7 @@ public class HbaseInterpreter extends Interpreter {
}
logger.info("Absolute Ruby Source:" + abs_ruby_src.toString());
// hirb.rb:41 requires the following system property to be set.
// hirb.rb:41 requires the following system properties to be set.
Properties sysProps = System.getProperties();
sysProps.setProperty(HBASE_RUBY_SRC, abs_ruby_src.toString());

View file

@ -15,6 +15,7 @@
package org.apache.zeppelin.hbase;
import org.apache.log4j.BasicConfigurator;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.BeforeClass;
import org.junit.Test;
@ -35,7 +36,7 @@ public class HbaseInterpreterTest {
private static HbaseInterpreter hbaseInterpreter;
@BeforeClass
public static void setUp() throws NullPointerException {
public static void setUp() throws NullPointerException, InterpreterException {
BasicConfigurator.configure();
Properties properties = new Properties();
properties.put("hbase.home", "");

View file

@ -74,7 +74,8 @@ public class DevInterpreter extends Interpreter {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
this.context = context;
try {
return interpreterEvent.interpret(st, context);

View file

@ -93,7 +93,7 @@ public class IgniteSqlInterpreter extends Interpreter {
}
@Override
public void close() {
public void close() throws InterpreterException {
try {
if (conn != null) {
conn.close();

View file

@ -27,6 +27,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
@ -82,7 +83,7 @@ public class IgniteSqlInterpreterTest {
}
@After
public void tearDown() {
public void tearDown() throws InterpreterException {
intp.close();
ignite.close();
}

View file

@ -45,8 +45,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
@ -172,7 +170,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
@Override
public void open() {
super.open();
for (String propertyKey : property.stringPropertyNames()) {
for (String propertyKey : properties.stringPropertyNames()) {
logger.debug("propertyKey: {}", propertyKey);
String[] keyValue = propertyKey.split("\\.", 2);
if (2 == keyValue.length) {
@ -185,7 +183,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
prefixProperties = new Properties();
basePropretiesMap.put(keyValue[0].trim(), prefixProperties);
}
prefixProperties.put(keyValue[1].trim(), property.getProperty(propertyKey));
prefixProperties.put(keyValue[1].trim(), getProperty(propertyKey));
}
}
@ -211,8 +209,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
protected boolean isKerboseEnabled() {
if (!isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
if (!isEmpty(getProperty("zeppelin.jdbc.auth.type"))) {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(properties);
if (authType.equals(KERBEROS)) {
return true;
}
@ -356,7 +354,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
}
private void setUserProperty(String propertyKey, InterpreterContext interpreterContext)
throws SQLException, IOException {
throws SQLException, IOException, InterpreterException {
String user = interpreterContext.getAuthenticationInfo().getUser();
@ -424,18 +422,19 @@ public class JDBCInterpreter extends KerberosInterpreter {
final Properties properties = jdbcUserConfigurations.getPropertyMap(propertyKey);
final String url = properties.getProperty(URL_KEY);
if (isEmpty(property.getProperty("zeppelin.jdbc.auth.type"))) {
if (isEmpty(getProperty("zeppelin.jdbc.auth.type"))) {
connection = getConnectionFromPool(url, user, propertyKey, properties);
} else {
UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthtype(property);
UserGroupInformation.AuthenticationMethod authType =
JDBCSecurityImpl.getAuthtype(getProperties());
final String connectionUrl = appendProxyUserToURL(url, user, propertyKey);
JDBCSecurityImpl.createSecureConfiguration(property, authType);
JDBCSecurityImpl.createSecureConfiguration(getProperties(), authType);
switch (authType) {
case KERBEROS:
if (user == null || "false".equalsIgnoreCase(
property.getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) {
getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable"))) {
connection = getConnectionFromPool(connectionUrl, user, propertyKey, properties);
} else {
if (basePropretiesMap.get(propertyKey).containsKey("proxy.user.property")) {
@ -497,7 +496,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
return connectionUrl.toString();
}
private String getPassword(Properties properties) throws IOException {
private String getPassword(Properties properties) throws IOException, InterpreterException {
if (isNotEmpty(properties.getProperty(PASSWORD_KEY))) {
return properties.getProperty(PASSWORD_KEY);
} else if (isNotEmpty(properties.getProperty(JDBC_JCEKS_FILE))
@ -850,7 +849,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
InterpreterContext interpreterContext) throws InterpreterException {
List<InterpreterCompletion> candidates = new ArrayList<>();
String propertyKey = getPropertyKey(buf);
String sqlCompleterKey =

View file

@ -37,6 +37,7 @@ import java.util.Properties;
import org.apache.zeppelin.completer.CompletionType;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.FIFOScheduler;
@ -349,7 +350,7 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
}
@Test
public void testAutoCompletion() throws SQLException, IOException {
public void testAutoCompletion() throws SQLException, IOException, InterpreterException {
Properties properties = new Properties();
properties.setProperty("common.max_count", "1000");
properties.setProperty("common.max_retry", "3");

View file

@ -101,7 +101,7 @@ public abstract class BaseLivyInterpreter extends Interpreter {
public abstract String getSessionKind();
@Override
public void open() {
public void open() throws InterpreterException {
try {
initLivySession();
} catch (LivyException e) {
@ -198,7 +198,7 @@ public abstract class BaseLivyInterpreter extends Interpreter {
throws LivyException {
try {
Map<String, String> conf = new HashMap<>();
for (Map.Entry<Object, Object> entry : property.entrySet()) {
for (Map.Entry<Object, Object> entry : getProperties().entrySet()) {
if (entry.getKey().toString().startsWith("livy.spark.") &&
!entry.getValue().toString().isEmpty())
conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
@ -428,15 +428,15 @@ public abstract class BaseLivyInterpreter extends Interpreter {
private RestTemplate createRestTemplate() {
String keytabLocation = property.getProperty("zeppelin.livy.keytab");
String principal = property.getProperty("zeppelin.livy.principal");
String keytabLocation = getProperty("zeppelin.livy.keytab");
String principal = getProperty("zeppelin.livy.principal");
boolean isSpnegoEnabled = StringUtils.isNotEmpty(keytabLocation) &&
StringUtils.isNotEmpty(principal);
HttpClient httpClient = null;
if (livyURL.startsWith("https:")) {
String keystoreFile = property.getProperty("zeppelin.livy.ssl.trustStore");
String password = property.getProperty("zeppelin.livy.ssl.trustStorePassword");
String keystoreFile = getProperty("zeppelin.livy.ssl.trustStore");
String password = getProperty("zeppelin.livy.ssl.trustStorePassword");
if (StringUtils.isBlank(keystoreFile)) {
throw new RuntimeException("No zeppelin.livy.ssl.trustStore specified for livy ssl");
}

View file

@ -59,7 +59,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
this.sparkInterpreter = getSparkInterpreter();
// As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
// to judge whether it is using spark2.
@ -93,7 +93,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
}
}
private LivySparkInterpreter getSparkInterpreter() {
private LivySparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
LivySparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());

View file

@ -75,8 +75,9 @@ public class LivyInterpreterIT {
return true;
}
// @Test
public void testSparkInterpreterRDD() {
public void testSparkInterpreterRDD() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -195,8 +196,9 @@ public class LivyInterpreterIT {
}
}
// @Test
public void testSparkInterpreterDataFrame() {
public void testSparkInterpreterDataFrame() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -284,7 +286,7 @@ public class LivyInterpreterIT {
}
// @Test
public void testSparkSQLInterpreter() {
public void testSparkSQLInterpreter() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -319,7 +321,7 @@ public class LivyInterpreterIT {
// @Test
public void testSparkSQLCancellation() {
public void testSparkSQLCancellation() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -400,7 +402,7 @@ public class LivyInterpreterIT {
}
// @Test
public void testStringWithTruncation() {
public void testStringWithTruncation() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -459,8 +461,9 @@ public class LivyInterpreterIT {
}
}
// @Test
public void testStringWithoutTruncation() {
public void testStringWithoutTruncation() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -525,7 +528,7 @@ public class LivyInterpreterIT {
}
@Test
public void testPySparkInterpreter() throws LivyException {
public void testPySparkInterpreter() throws LivyException, InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -645,7 +648,7 @@ public class LivyInterpreterIT {
}
// @Test
public void testSparkInterpreterWithDisplayAppInfo() {
public void testSparkInterpreterWithDisplayAppInfo() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -684,7 +687,7 @@ public class LivyInterpreterIT {
}
// @Test
public void testSparkRInterpreter() throws LivyException {
public void testSparkRInterpreter() throws LivyException, InterpreterException {
if (!checkPreCondition()) {
return;
}
@ -756,7 +759,7 @@ public class LivyInterpreterIT {
}
// @Test
public void testLivyTutorialNote() throws IOException {
public void testLivyTutorialNote() throws IOException, InterpreterException {
if (!checkPreCondition()) {
return;
}

View file

@ -18,7 +18,6 @@
package org.apache.zeppelin.pig;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.pig.PigServer;
import org.apache.pig.impl.logicalLayer.FrontendException;
@ -60,7 +59,7 @@ public class PigInterpreter extends BasePigInterpreter {
}
try {
pigServer = new PigServer(execType);
for (Map.Entry entry : getProperty().entrySet()) {
for (Map.Entry entry : getProperties().entrySet()) {
if (!entry.getKey().toString().startsWith("zeppelin.")) {
pigServer.getPigContext().getProperties().setProperty(entry.getKey().toString(),
entry.getValue().toString());

View file

@ -55,7 +55,7 @@ public class PigQueryInterpreter extends BasePigInterpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
pigServer = getPigInterpreter().getPigServer();
maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
}
@ -159,7 +159,7 @@ public class PigQueryInterpreter extends BasePigInterpreter {
return this.pigServer;
}
private PigInterpreter getPigInterpreter() {
private PigInterpreter getPigInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
PigInterpreter pig = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PigInterpreter.class.getName());

View file

@ -21,6 +21,7 @@ package org.apache.zeppelin.pig;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.After;
@ -48,7 +49,7 @@ public class PigQueryInterpreterTest {
private InterpreterContext context;
@Before
public void setUp() {
public void setUp() throws InterpreterException {
Properties properties = new Properties();
properties.put("zeppelin.pig.execType", "local");
properties.put("zeppelin.pig.maxResult", "20");

View file

@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -112,19 +113,20 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
@Override
public void open() {
public void open() throws InterpreterException {
try {
if (ipythonClient != null) {
// IPythonInterpreter might already been opened by PythonInterpreter
return;
}
pythonExecutable = getProperty().getProperty("zeppelin.python", "python");
pythonExecutable = getProperty("zeppelin.python", "python");
LOGGER.info("Python Exec: " + pythonExecutable);
ipythonLaunchTimeout = Long.parseLong(
getProperty().getProperty("zeppelin.ipython.launch.timeout", "30000"));
getProperty("zeppelin.ipython.launch.timeout", "30000"));
this.zeppelinContext = new PythonZeppelinContext(
getInterpreterGroup().getInterpreterHookRegistry(),
Integer.parseInt(getProperty().getProperty("zeppelin.python.maxResult", "1000")));
Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000")));
int ipythonPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
int jvmGatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
LOGGER.info("Launching IPython Kernel at port: " + ipythonPort);
@ -243,16 +245,7 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
}
Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
if (envs.containsKey("PYTHONPATH")) {
if (additionalPythonPath != null) {
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
}
} else {
envs.put("PYTHONPATH", additionalPythonPath);
}
LOGGER.info("PYTHONPATH: " + envs.get("PYTHONPATH"));
Map<String, String> envs = setupIPythonEnv();
executor.execute(cmd, envs, this);
// wait until IPython kernel is started or timeout
@ -284,6 +277,18 @@ public class IPythonInterpreter extends Interpreter implements ExecuteResultHand
}
}
protected Map<String, String> setupIPythonEnv() throws IOException {
Map<String, String> envs = EnvironmentUtils.getProcEnvironment();
if (envs.containsKey("PYTHONPATH")) {
if (additionalPythonPath != null) {
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
}
} else {
envs.put("PYTHONPATH", additionalPythonPath);
}
return envs;
}
@Override
public void close() {
if (watchDog != null) {

View file

@ -67,7 +67,8 @@ public class PythonCondaInterpreter extends Interpreter {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
InterpreterOutput out = context.out;
Matcher activateMatcher = PATTERN_COMMAND_ACTIVATE.matcher(st);
Matcher createMatcher = PATTERN_COMMAND_CREATE.matcher(st);
@ -126,7 +127,7 @@ public class PythonCondaInterpreter extends Interpreter {
}
private void changePythonEnvironment(String envName)
throws IOException, InterruptedException {
throws IOException, InterruptedException, InterpreterException {
PythonInterpreter python = getPythonInterpreter();
String binPath = null;
if (envName == null) {
@ -147,13 +148,13 @@ public class PythonCondaInterpreter extends Interpreter {
python.setPythonCommand(binPath);
}
private void restartPythonProcess() {
private void restartPythonProcess() throws InterpreterException {
PythonInterpreter python = getPythonInterpreter();
python.close();
python.open();
}
protected PythonInterpreter getPythonInterpreter() {
protected PythonInterpreter getPythonInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p =
@ -213,7 +214,7 @@ public class PythonCondaInterpreter extends Interpreter {
}
private InterpreterResult runCondaActivate(String envName)
throws IOException, InterruptedException {
throws IOException, InterruptedException, InterpreterException {
if (null == envName || envName.isEmpty()) {
return new InterpreterResult(Code.ERROR, "Env name should be specified");
@ -226,7 +227,7 @@ public class PythonCondaInterpreter extends Interpreter {
}
private InterpreterResult runCondaDeactivate()
throws IOException, InterruptedException {
throws IOException, InterruptedException, InterpreterException {
changePythonEnvironment(null);
restartPythonProcess();
@ -375,10 +376,16 @@ public class PythonCondaInterpreter extends Interpreter {
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
PythonInterpreter pythonInterpreter = null;
try {
pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
} catch (InterpreterException e) {
e.printStackTrace();
return null;
}
}

View file

@ -56,7 +56,8 @@ public class PythonDockerInterpreter extends Interpreter {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
File pythonScript = new File(getPythonInterpreter().getScriptPath());
InterpreterOutput out = context.out;
@ -105,7 +106,7 @@ public class PythonDockerInterpreter extends Interpreter {
}
public void setPythonCommand(String cmd) {
public void setPythonCommand(String cmd) throws InterpreterException {
PythonInterpreter python = getPythonInterpreter();
python.setPythonCommand(cmd);
}
@ -140,21 +141,27 @@ public class PythonDockerInterpreter extends Interpreter {
*/
@Override
public Scheduler getScheduler() {
PythonInterpreter pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
PythonInterpreter pythonInterpreter = null;
try {
pythonInterpreter = getPythonInterpreter();
if (pythonInterpreter != null) {
return pythonInterpreter.getScheduler();
} else {
return null;
}
} catch (InterpreterException e) {
e.printStackTrace();
return null;
}
}
private void restartPythonProcess() {
private void restartPythonProcess() throws InterpreterException {
PythonInterpreter python = getPythonInterpreter();
python.close();
python.open();
}
protected PythonInterpreter getPythonInterpreter() {
protected PythonInterpreter getPythonInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
@ -173,7 +180,7 @@ public class PythonDockerInterpreter extends Interpreter {
return python;
}
public boolean pull(InterpreterOutput out, String image) {
public boolean pull(InterpreterOutput out, String image) throws InterpreterException {
int exit = 0;
try {
exit = runCommand(out, "docker", "pull", image);

View file

@ -57,7 +57,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
import py4j.commands.Command;
/**
* Python interpreter for Zeppelin.
@ -101,7 +100,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
File scriptFile = File.createTempFile("zeppelin_python-", ".py", new File("/tmp"));
scriptPath = scriptFile.getAbsolutePath();
} catch (IOException e) {
throw new InterpreterException(e);
throw new RuntimeException(e);
}
}
@ -116,7 +115,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
return path;
}
private void createPythonScript() {
private void createPythonScript() throws InterpreterException {
File out = new File(scriptPath);
if (out.exists() && out.isDirectory()) {
@ -131,7 +130,7 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
return scriptPath;
}
private void copyFile(File out, String sourceFile) {
private void copyFile(File out, String sourceFile) throws InterpreterException {
ClassLoader classLoader = getClass().getClassLoader();
try {
FileOutputStream outStream = new FileOutputStream(out);
@ -144,7 +143,8 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
}
private void createGatewayServerAndStartScript() throws UnknownHostException {
private void createGatewayServerAndStartScript()
throws UnknownHostException, InterpreterException {
createPythonScript();
if (System.getenv("ZEPPELIN_HOME") != null) {
py4jLibPath = System.getenv("ZEPPELIN_HOME") + File.separator + ZEPPELIN_PY4JPATH;
@ -219,11 +219,11 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
@Override
public void open() {
public void open() throws InterpreterException {
// try IPythonInterpreter first. If it is not available, we will fallback to the original
// python interpreter implementation.
iPythonInterpreter = getIPythonInterpreter();
if (getProperty().getProperty("zeppelin.python.useIPython", "true").equals("true") &&
if (getProperty("zeppelin.python.useIPython", "true").equals("true") &&
iPythonInterpreter.checkIPythonPrerequisite()) {
try {
iPythonInterpreter.open();
@ -369,7 +369,8 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
}
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter)
throws InterpreterException {
if (iPythonInterpreter != null) {
return iPythonInterpreter.interpret(cmd, contextInterpreter);
}
@ -551,7 +552,11 @@ public class PythonInterpreter extends Interpreter implements ExecuteResultHandl
bootstrapCode += line + "\n";
}
interpret(bootstrapCode, context);
try {
interpret(bootstrapCode, context);
} catch (InterpreterException e) {
throw new IOException(e);
}
}
public GUI getGui() {

View file

@ -22,6 +22,7 @@ import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
@ -42,7 +43,7 @@ public class PythonInterpreterPandasSql extends Interpreter {
super(property);
}
PythonInterpreter getPythonInterpreter() {
PythonInterpreter getPythonInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
PythonInterpreter python = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(PythonInterpreter.class.getName());
@ -62,7 +63,7 @@ public class PythonInterpreterPandasSql extends Interpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
LOG.info("Open Python SQL interpreter instance: {}", this.toString());
try {
@ -76,14 +77,15 @@ public class PythonInterpreterPandasSql extends Interpreter {
}
@Override
public void close() {
public void close() throws InterpreterException {
LOG.info("Close Python SQL interpreter instance: {}", this.toString());
Interpreter python = getPythonInterpreter();
python.close();
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
LOG.info("Running SQL query: '{}' over Pandas DataFrame", st);
Interpreter python = getPythonInterpreter();

View file

@ -23,6 +23,7 @@ import org.apache.zeppelin.display.ui.Select;
import org.apache.zeppelin.display.ui.TextBox;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@ -56,7 +57,7 @@ public class IPythonInterpreterTest {
private IPythonInterpreter interpreter;
@Before
public void setUp() {
public void setUp() throws InterpreterException {
Properties properties = new Properties();
interpreter = new IPythonInterpreter(properties);
InterpreterGroup mockInterpreterGroup = mock(InterpreterGroup.class);
@ -71,11 +72,11 @@ public class IPythonInterpreterTest {
@Test
public void testIPython() throws IOException, InterruptedException {
public void testIPython() throws IOException, InterruptedException, InterpreterException {
testInterpreter(interpreter);
}
public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException {
public static void testInterpreter(final Interpreter interpreter) throws IOException, InterruptedException, InterpreterException {
// to make this test can run under both python2 and python3
InterpreterResult result = interpreter.interpret("from __future__ import print_function", getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@ -261,7 +262,11 @@ public class IPythonInterpreterTest {
} catch (InterruptedException e) {
e.printStackTrace();
}
interpreter.cancel(context2);
try {
interpreter.cancel(context2);
} catch (InterpreterException e) {
e.printStackTrace();
}
}
}.start();
result = interpreter.interpret("import time\ntime.sleep(10)", context2);

View file

@ -37,7 +37,7 @@ public class PythonCondaInterpreterTest {
private PythonInterpreter python;
@Before
public void setUp() {
public void setUp() throws InterpreterException {
conda = spy(new PythonCondaInterpreter(new Properties()));
python = mock(PythonInterpreter.class);
@ -57,7 +57,7 @@ public class PythonCondaInterpreterTest {
}
@Test
public void testListEnv() throws IOException, InterruptedException {
public void testListEnv() throws IOException, InterruptedException, InterpreterException {
setMockCondaEnvList();
// list available env
@ -72,7 +72,7 @@ public class PythonCondaInterpreterTest {
}
@Test
public void testActivateEnv() throws IOException, InterruptedException {
public void testActivateEnv() throws IOException, InterruptedException, InterpreterException {
setMockCondaEnvList();
String envname = "env1";
InterpreterContext context = getInterpreterContext();
@ -84,7 +84,7 @@ public class PythonCondaInterpreterTest {
}
@Test
public void testDeactivate() {
public void testDeactivate() throws InterpreterException {
InterpreterContext context = getInterpreterContext();
conda.interpret("deactivate", context);
verify(python, times(1)).open();

View file

@ -41,7 +41,7 @@ public class PythonDockerInterpreterTest {
private PythonInterpreter python;
@Before
public void setUp() {
public void setUp() throws InterpreterException {
docker = spy(new PythonDockerInterpreter(new Properties()));
python = mock(PythonInterpreter.class);
@ -58,7 +58,7 @@ public class PythonDockerInterpreterTest {
}
@Test
public void testActivateEnv() {
public void testActivateEnv() throws InterpreterException {
InterpreterContext context = getInterpreterContext();
docker.interpret("activate env", context);
verify(python, times(1)).open();
@ -68,7 +68,7 @@ public class PythonDockerInterpreterTest {
}
@Test
public void testDeactivate() {
public void testDeactivate() throws InterpreterException {
InterpreterContext context = getInterpreterContext();
docker.interpret("deactivate", context);
verify(python, times(1)).open();

View file

@ -22,6 +22,7 @@ import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@ -83,7 +84,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene
}
@Test
public void dependenciesAreInstalled() {
public void dependenciesAreInstalled() throws InterpreterException {
// matplotlib
InterpreterResult ret = python.interpret("import matplotlib", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
@ -94,7 +95,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene
}
@Test
public void showPlot() throws IOException {
public void showPlot() throws IOException, InterpreterException {
// Simple plot test
InterpreterResult ret;
ret = python.interpret("import matplotlib.pyplot as plt", context);
@ -111,7 +112,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene
@Test
// Test for when configuration is set to auto-close figures after show().
public void testClose() throws IOException {
public void testClose() throws IOException, InterpreterException {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
@ -145,7 +146,7 @@ public class PythonInterpreterMatplotlibTest implements InterpreterOutputListene
@Test
// Test for when configuration is set to not auto-close figures after show().
public void testNoClose() throws IOException {
public void testNoClose() throws IOException, InterpreterException {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;

View file

@ -33,6 +33,7 @@ import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@ -107,18 +108,18 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
}
@After
public void afterTest() throws IOException {
public void afterTest() throws IOException, InterpreterException {
sql.close();
}
@Test
public void dependenciesAreInstalled() {
public void dependenciesAreInstalled() throws InterpreterException {
InterpreterResult ret = python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
public void errorMessageIfDependenciesNotInstalled() {
public void errorMessageIfDependenciesNotInstalled() throws InterpreterException {
InterpreterResult ret;
ret = sql.interpret("SELECT * from something", context);
@ -128,7 +129,7 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
}
@Test
public void sqlOverTestDataPrintsTable() throws IOException {
public void sqlOverTestDataPrintsTable() throws IOException, InterpreterException {
InterpreterResult ret;
// given
//String expectedTable = "name\tage\n\nmoon\t33\n\npark\t34";
@ -152,7 +153,7 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
}
@Test
public void badSqlSyntaxFails() throws IOException {
public void badSqlSyntaxFails() throws IOException, InterpreterException {
//when
InterpreterResult ret = sql.interpret("select wrong syntax", context);
@ -162,7 +163,7 @@ public class PythonInterpreterPandasSqlTest implements InterpreterOutputListener
}
@Test
public void showDataFrame() throws IOException {
public void showDataFrame() throws IOException, InterpreterException {
InterpreterResult ret;
ret = python.interpret("import pandas as pd", context);
ret = python.interpret("import numpy as np", context);

View file

@ -38,6 +38,7 @@ import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@ -64,7 +65,7 @@ public class PythonInterpreterTest implements InterpreterOutputListener {
}
@Before
public void beforeTest() throws IOException {
public void beforeTest() throws IOException, InterpreterException {
cmdHistory = "";
// python interpreter
@ -96,20 +97,20 @@ public class PythonInterpreterTest implements InterpreterOutputListener {
}
@Test
public void testInterpret() throws InterruptedException, IOException {
public void testInterpret() throws InterruptedException, IOException, InterpreterException {
InterpreterResult result = pythonInterpreter.interpret("print (\"hi\")", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
@Test
public void testInterpretInvalidSyntax() throws IOException {
public void testInterpretInvalidSyntax() throws IOException, InterpreterException {
InterpreterResult result = pythonInterpreter.interpret("for x in range(0,3): print (\"hi\")\n", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(new String(out.getOutputAt(0).toByteArray()).contains("hi\nhi\nhi"));
}
@Test
public void testRedefinitionZeppelinContext() {
public void testRedefinitionZeppelinContext() throws InterpreterException {
String pyRedefinitionCode = "z = 1\n";
String pyRestoreCode = "z = __zeppelin__\n";
String pyValidCode = "z.input(\"test\")\n";

View file

@ -69,7 +69,7 @@ public class ScaldingInterpreter extends Interpreter {
@Override
public void open() {
numOpenInstances = numOpenInstances + 1;
String maxOpenInstancesStr = property.getProperty(MAX_OPEN_INSTANCES,
String maxOpenInstancesStr = getProperty(MAX_OPEN_INSTANCES,
MAX_OPEN_INSTANCES_DEFAULT);
int maxOpenInstances = 50;
try {
@ -83,8 +83,8 @@ public class ScaldingInterpreter extends Interpreter {
return;
}
logger.info("Opening instance {}", numOpenInstances);
logger.info("property: {}", property);
String argsString = property.getProperty(ARGS_STRING, ARGS_STRING_DEFAULT);
logger.info("property: {}", getProperties());
String argsString = getProperty(ARGS_STRING, ARGS_STRING_DEFAULT);
String[] args;
if (argsString == null) {
args = new String[0];
@ -121,7 +121,7 @@ public class ScaldingInterpreter extends Interpreter {
return new InterpreterResult(Code.SUCCESS);
}
InterpreterResult interpreterResult = new InterpreterResult(Code.ERROR);
if (property.getProperty(ARGS_STRING).contains("hdfs")) {
if (getProperty(ARGS_STRING).contains("hdfs")) {
UserGroupInformation ugi = null;
try {
ugi = UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());

View file

@ -174,8 +174,8 @@ public class ShellInterpreter extends KerberosInterpreter {
return false;
}
public void createSecureConfiguration() {
Properties properties = getProperty();
public void createSecureConfiguration() throws InterpreterException {
Properties properties = getProperties();
CommandLine cmdLine = CommandLine.parse(shell);
cmdLine.addArgument("-c", false);
String kinitCommand = String.format("kinit -k -t %s %s",

View file

@ -21,12 +21,15 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.python.IPythonInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
/**
@ -43,8 +46,9 @@ public class IPySparkInterpreter extends IPythonInterpreter {
}
@Override
public void open() {
property.setProperty("zeppelin.python", PySparkInterpreter.getPythonExec(property));
public void open() throws InterpreterException {
setProperty("zeppelin.python",
PySparkInterpreter.getPythonExec(getProperties()));
sparkInterpreter = getSparkInterpreter();
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
// only set PYTHONPATH in local or yarn-client mode.
@ -57,7 +61,18 @@ public class IPySparkInterpreter extends IPythonInterpreter {
super.open();
}
private SparkInterpreter getSparkInterpreter() {
@Override
protected Map<String, String> setupIPythonEnv() throws IOException {
Map<String, String> env = super.setupIPythonEnv();
// set PYSPARK_PYTHON
SparkConf conf = sparkInterpreter.getSparkContext().getConf();
if (conf.contains("spark.pyspark.python")) {
env.put("PYSPARK_PYTHON", conf.get("spark.pyspark.python"));
}
return env;
}
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());

View file

@ -86,11 +86,11 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py");
scriptPath = scriptFile.getAbsolutePath();
} catch (IOException e) {
throw new InterpreterException(e);
throw new RuntimeException(e);
}
}
private void createPythonScript() {
private void createPythonScript() throws InterpreterException {
ClassLoader classLoader = getClass().getClassLoader();
File out = new File(scriptPath);
@ -112,10 +112,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
@Override
public void open() {
public void open() throws InterpreterException {
// try IPySparkInterpreter first
iPySparkInterpreter = getIPySparkInterpreter();
if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true") &&
iPySparkInterpreter.checkIPythonPrerequisite()) {
try {
iPySparkInterpreter.open();
@ -132,8 +132,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
iPySparkInterpreter = null;
if (property.getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) {
if (getProperty("zeppelin.pyspark.useIPython", "true").equals("true")) {
// don't print it when it is in testing, just for easy output check in test.
try {
InterpreterContext.get().out.write(("IPython is not available, " +
@ -202,7 +201,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
private Map setupPySparkEnv() throws IOException {
private Map setupPySparkEnv() throws IOException, InterpreterException {
Map env = EnvironmentUtils.getProcEnvironment();
// only set PYTHONPATH in local or yarn-client mode.
@ -229,6 +228,11 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
LOGGER.info("PYTHONPATH: " + env.get("PYTHONPATH"));
// set PYSPARK_PYTHON
if (getSparkConf().contains("spark.pyspark.python")) {
env.put("PYSPARK_PYTHON", getSparkConf().get("spark.pyspark.python"));
}
return env;
}
@ -246,7 +250,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
return pythonExec;
}
private void createGatewayServerAndStartScript() {
private void createGatewayServerAndStartScript() throws InterpreterException {
// create python script
createPythonScript();
@ -255,7 +259,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
gatewayServer = new GatewayServer(this, port);
gatewayServer.start();
String pythonExec = getPythonExec(property);
String pythonExec = getPythonExec(getProperties());
LOGGER.info("pythonExec: " + pythonExec);
CommandLine cmd = CommandLine.parse(pythonExec);
cmd.addArgument(scriptPath, false);
@ -295,7 +299,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
private int findRandomOpenPortOnAllLocalInterfaces() {
private int findRandomOpenPortOnAllLocalInterfaces() throws InterpreterException {
int port;
try (ServerSocket socket = new ServerSocket(0);) {
port = socket.getLocalPort();
@ -394,7 +398,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.populateSparkWebUrl(context);
if (sparkInterpreter.isUnsupportedSparkVersion()) {
@ -500,7 +505,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
@Override
public void cancel(InterpreterContext context) {
public void cancel(InterpreterContext context) throws InterpreterException {
if (iPySparkInterpreter != null) {
iPySparkInterpreter.cancel(context);
return;
@ -520,7 +525,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
@Override
public int getProgress(InterpreterContext context) {
public int getProgress(InterpreterContext context) throws InterpreterException {
if (iPySparkInterpreter != null) {
return iPySparkInterpreter.getProgress(context);
}
@ -531,7 +536,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
InterpreterContext interpreterContext) throws InterpreterException {
if (iPySparkInterpreter != null) {
return iPySparkInterpreter.completion(buf, cursor, interpreterContext);
}
@ -632,7 +637,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
private SparkInterpreter getSparkInterpreter() {
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
@ -666,7 +671,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
return iPySpark;
}
public SparkZeppelinContext getZeppelinContext() {
public SparkZeppelinContext getZeppelinContext() throws InterpreterException {
SparkInterpreter sparkIntp = getSparkInterpreter();
if (sparkIntp != null) {
return getSparkInterpreter().getZeppelinContext();
@ -675,7 +680,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
public JavaSparkContext getJavaSparkContext() {
public JavaSparkContext getJavaSparkContext() throws InterpreterException {
SparkInterpreter intp = getSparkInterpreter();
if (intp == null) {
return null;
@ -684,7 +689,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
public Object getSparkSession() {
public Object getSparkSession() throws InterpreterException {
SparkInterpreter intp = getSparkInterpreter();
if (intp == null) {
return null;
@ -693,7 +698,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
public SparkConf getSparkConf() {
public SparkConf getSparkConf() throws InterpreterException {
JavaSparkContext sc = getJavaSparkContext();
if (sc == null) {
return null;
@ -702,7 +707,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
public SQLContext getSQLContext() {
public SQLContext getSQLContext() throws InterpreterException {
SparkInterpreter intp = getSparkInterpreter();
if (intp == null) {
return null;

View file

@ -353,7 +353,7 @@ public class SparkInterpreter extends Interpreter {
public boolean isYarnMode() {
String master = getProperty("master");
if (master == null) {
master = getProperty().getProperty("spark.master", "local[*]");
master = getProperty("spark.master", "local[*]");
}
return master.startsWith("yarn");
}
@ -376,7 +376,7 @@ public class SparkInterpreter extends Interpreter {
}
conf.set("spark.scheduler.mode", "FAIR");
Properties intpProperty = getProperty();
Properties intpProperty = getProperties();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));
@ -509,7 +509,7 @@ public class SparkInterpreter extends Interpreter {
}
conf.set("spark.scheduler.mode", "FAIR");
Properties intpProperty = getProperty();
Properties intpProperty = getProperties();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));
@ -543,19 +543,19 @@ public class SparkInterpreter extends Interpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
this.enableSupportedVersionCheck = java.lang.Boolean.parseBoolean(
property.getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
getProperty("zeppelin.spark.enableSupportedVersionCheck", "true"));
// set properties and do login before creating any spark stuff for secured cluster
if (isYarnMode()) {
System.setProperty("SPARK_YARN_MODE", "true");
}
if (getProperty().containsKey("spark.yarn.keytab") &&
getProperty().containsKey("spark.yarn.principal")) {
if (getProperties().containsKey("spark.yarn.keytab") &&
getProperties().containsKey("spark.yarn.principal")) {
try {
String keytab = getProperty().getProperty("spark.yarn.keytab");
String principal = getProperty().getProperty("spark.yarn.principal");
String keytab = getProperties().getProperty("spark.yarn.keytab");
String principal = getProperties().getProperty("spark.yarn.principal");
UserGroupInformation.loginUserFromKeytab(principal, keytab);
} catch (IOException e) {
throw new RuntimeException("Can not pass kerberos authentication", e);
@ -963,7 +963,7 @@ public class SparkInterpreter extends Interpreter {
sparkUrl = getSparkUIUrl();
Map<String, String> infos = new java.util.HashMap<>();
infos.put("url", sparkUrl);
String uiEnabledProp = property.getProperty("spark.ui.enabled", "true");
String uiEnabledProp = getProperty("spark.ui.enabled", "true");
java.lang.Boolean uiEnabled = java.lang.Boolean.parseBoolean(
uiEnabledProp.trim());
if (!uiEnabled) {

View file

@ -54,7 +54,7 @@ public class SparkRInterpreter extends Interpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
String rCmdPath = getProperty("zeppelin.R.cmd");
String sparkRLibPath;
@ -105,7 +105,8 @@ public class SparkRInterpreter extends Interpreter {
}
@Override
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) {
public InterpreterResult interpret(String lines, InterpreterContext interpreterContext)
throws InterpreterException {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
sparkInterpreter.populateSparkWebUrl(interpreterContext);
@ -220,7 +221,7 @@ public class SparkRInterpreter extends Interpreter {
return new ArrayList<>();
}
private SparkInterpreter getSparkInterpreter() {
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());

View file

@ -59,7 +59,7 @@ public class SparkSqlInterpreter extends Interpreter {
this.maxResult = Integer.parseInt(getProperty(MAX_RESULTS));
}
private SparkInterpreter getSparkInterpreter() {
private SparkInterpreter getSparkInterpreter() throws InterpreterException {
LazyOpenInterpreter lazy = null;
SparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class.getName());
@ -86,7 +86,8 @@ public class SparkSqlInterpreter extends Interpreter {
public void close() {}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
SQLContext sqlc = null;
SparkInterpreter sparkInterpreter = getSparkInterpreter();
@ -134,7 +135,7 @@ public class SparkSqlInterpreter extends Interpreter {
}
@Override
public void cancel(InterpreterContext context) {
public void cancel(InterpreterContext context) throws InterpreterException {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
SQLContext sqlc = sparkInterpreter.getSQLContext();
SparkContext sc = sqlc.sparkContext();
@ -149,7 +150,7 @@ public class SparkSqlInterpreter extends Interpreter {
@Override
public int getProgress(InterpreterContext context) {
public int getProgress(InterpreterContext context) throws InterpreterException {
SparkInterpreter sparkInterpreter = getSparkInterpreter();
return sparkInterpreter.getProgress(context);
}

View file

@ -79,7 +79,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext {
}
if (supportedClasses.isEmpty()) {
throw new InterpreterException("Can not load Dataset/DataFrame/SchemaRDD class");
throw new RuntimeException("Can not load Dataset/DataFrame/SchemaRDD class");
}
}
@ -112,7 +112,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext {
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException | ClassCastException e) {
sc.clearJobGroup();
throw new InterpreterException(e);
throw new RuntimeException(e);
}
List<Attribute> columns = null;
@ -129,7 +129,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext {
.asJava();
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
throw new RuntimeException(e);
}
StringBuilder msg = new StringBuilder();
@ -165,7 +165,7 @@ public class SparkZeppelinContext extends BaseZeppelinContext {
}
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
throw new RuntimeException(e);
}
if (rows.length > maxResult) {

View file

@ -117,7 +117,7 @@ public class ZeppelinR implements ExecuteResultHandler {
File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R");
scriptPath = scriptFile.getAbsolutePath();
} catch (IOException e) {
throw new InterpreterException(e);
throw new RuntimeException(e);
}
}
@ -125,7 +125,7 @@ public class ZeppelinR implements ExecuteResultHandler {
* Start R repl
* @throws IOException
*/
public void open() throws IOException {
public void open() throws IOException, InterpreterException {
createRScript();
zeppelinR.put(hashCode(), this);
@ -170,7 +170,7 @@ public class ZeppelinR implements ExecuteResultHandler {
* @param expr
* @return
*/
public Object eval(String expr) {
public Object eval(String expr) throws InterpreterException {
synchronized (this) {
rRequestObject = new Request("eval", expr, null);
return request();
@ -182,7 +182,7 @@ public class ZeppelinR implements ExecuteResultHandler {
* @param key
* @param value
*/
public void set(String key, Object value) {
public void set(String key, Object value) throws InterpreterException {
synchronized (this) {
rRequestObject = new Request("set", key, value);
request();
@ -194,7 +194,7 @@ public class ZeppelinR implements ExecuteResultHandler {
* @param key
* @return
*/
public Object get(String key) {
public Object get(String key) throws InterpreterException {
synchronized (this) {
rRequestObject = new Request("get", key, null);
return request();
@ -206,7 +206,7 @@ public class ZeppelinR implements ExecuteResultHandler {
* @param key
* @return
*/
public String getS0(String key) {
public String getS0(String key) throws InterpreterException {
synchronized (this) {
rRequestObject = new Request("getS", key, null);
return (String) request();
@ -217,7 +217,7 @@ public class ZeppelinR implements ExecuteResultHandler {
* Send request to r repl and return response
* @return responseValue
*/
private Object request() throws RuntimeException {
private Object request() throws RuntimeException, InterpreterException {
if (!rScriptRunning) {
throw new RuntimeException("r repl is not running");
}
@ -332,7 +332,7 @@ public class ZeppelinR implements ExecuteResultHandler {
/**
* Create R script in tmp dir
*/
private void createRScript() {
private void createRScript() throws InterpreterException {
ClassLoader classLoader = getClass().getClassLoader();
File out = new File(scriptPath);

View file

@ -24,6 +24,7 @@ import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
@ -55,7 +56,7 @@ public class IPySparkInterpreterTest {
private InterpreterGroup intpGroup;
@Before
public void setup() {
public void setup() throws InterpreterException {
Properties p = new Properties();
p.setProperty("spark.master", "local[4]");
p.setProperty("master", "local[4]");
@ -90,7 +91,7 @@ public class IPySparkInterpreterTest {
}
@Test
public void testBasics() throws InterruptedException, IOException {
public void testBasics() throws InterruptedException, IOException, InterpreterException {
// all the ipython test should pass too.
IPythonInterpreterTest.testInterpreter(iPySparkInterpreter);

View file

@ -64,7 +64,7 @@ public class PySparkInterpreterMatplotlibTest {
* normally handles this in real use cases.
*/
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context) throws InterpreterException {
context.out.clear();
InterpreterResult result = super.interpret(st, context);
List<InterpreterResultMessage> resultMessages = null;
@ -140,7 +140,7 @@ public class PySparkInterpreterMatplotlibTest {
}
@Test
public void dependenciesAreInstalled() {
public void dependenciesAreInstalled() throws InterpreterException {
// matplotlib
InterpreterResult ret = pyspark.interpret("import matplotlib", context);
assertEquals(ret.message().toString(), InterpreterResult.Code.SUCCESS, ret.code());
@ -151,7 +151,7 @@ public class PySparkInterpreterMatplotlibTest {
}
@Test
public void showPlot() {
public void showPlot() throws InterpreterException {
// Simple plot test
InterpreterResult ret;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
@ -168,7 +168,7 @@ public class PySparkInterpreterMatplotlibTest {
@Test
// Test for when configuration is set to auto-close figures after show().
public void testClose() {
public void testClose() throws InterpreterException {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
@ -195,7 +195,7 @@ public class PySparkInterpreterMatplotlibTest {
@Test
// Test for when configuration is set to not auto-close figures after show().
public void testNoClose() {
public void testNoClose() throws InterpreterException {
InterpreterResult ret;
InterpreterResult ret1;
InterpreterResult ret2;
@ -222,7 +222,7 @@ public class PySparkInterpreterMatplotlibTest {
@Test
// Test angular mode
public void testAngular() {
public void testAngular() throws InterpreterException {
InterpreterResult ret;
ret = pyspark.interpret("import matplotlib.pyplot as plt", context);
ret = pyspark.interpret("plt.close()", context);

View file

@ -112,7 +112,7 @@ public class PySparkInterpreterTest {
}
@Test
public void testBasicIntp() {
public void testBasicIntp() throws InterpreterException {
if (getSparkVersionNumber() > 11) {
assertEquals(InterpreterResult.Code.SUCCESS,
pySparkInterpreter.interpret("a = 1\n", context).code());
@ -136,7 +136,7 @@ public class PySparkInterpreterTest {
}
@Test
public void testCompletion() {
public void testCompletion() throws InterpreterException {
if (getSparkVersionNumber() > 11) {
List<InterpreterCompletion> completions = pySparkInterpreter.completion("sc.", "sc.".length(), null);
assertTrue(completions.size() > 0);
@ -144,7 +144,7 @@ public class PySparkInterpreterTest {
}
@Test
public void testRedefinitionZeppelinContext() {
public void testRedefinitionZeppelinContext() throws InterpreterException {
if (getSparkVersionNumber() > 11) {
String redefinitionCode = "z = 1\n";
String restoreCode = "z = __zeppelin__\n";
@ -162,7 +162,12 @@ public class PySparkInterpreterTest {
@Override
public void run() {
String code = "import time\nwhile True:\n time.sleep(1)" ;
InterpreterResult ret = pySparkInterpreter.interpret(code, context);
InterpreterResult ret = null;
try {
ret = pySparkInterpreter.interpret(code, context);
} catch (InterpreterException e) {
e.printStackTrace();
}
assertNotNull(ret);
Pattern expectedMessage = Pattern.compile("KeyboardInterrupt");
Matcher m = expectedMessage.matcher(ret.message().toString());
@ -171,7 +176,7 @@ public class PySparkInterpreterTest {
}
@Test
public void testCancelIntp() throws InterruptedException {
public void testCancelIntp() throws InterruptedException, InterpreterException {
if (getSparkVersionNumber() > 11) {
assertEquals(InterpreterResult.Code.SUCCESS,
pySparkInterpreter.interpret("a = 1\n", context).code());

View file

@ -213,7 +213,7 @@ public class SparkInterpreterTest {
}
@Test
public void testSparkSql() throws IOException {
public void testSparkSql() throws IOException, InterpreterException {
repl.interpret("case class Person(name:String, age:Int)\n", context);
repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
assertEquals(Code.SUCCESS, repl.interpret("people.take(3)", context).code());
@ -243,7 +243,7 @@ public class SparkInterpreterTest {
@Test
public void emptyConfigurationVariablesOnlyForNonSparkProperties() {
Properties intpProperty = repl.getProperty();
Properties intpProperty = repl.getProperties();
SparkConf sparkConf = repl.getSparkContext().getConf();
for (Object oKey : intpProperty.keySet()) {
String key = (String) oKey;
@ -256,7 +256,7 @@ public class SparkInterpreterTest {
}
@Test
public void shareSingleSparkContext() throws InterruptedException, IOException {
public void shareSingleSparkContext() throws InterruptedException, IOException, InterpreterException {
// create another SparkInterpreter
SparkInterpreter repl2 = new SparkInterpreter(getSparkTestProperties(tmpDir));
repl2.setInterpreterGroup(intpGroup);
@ -272,7 +272,7 @@ public class SparkInterpreterTest {
}
@Test
public void testEnableImplicitImport() throws IOException {
public void testEnableImplicitImport() throws IOException, InterpreterException {
if (getSparkVersionNumber(repl) >= 13) {
// Set option of importing implicits to "true", and initialize new Spark repl
Properties p = getSparkTestProperties(tmpDir);
@ -289,7 +289,7 @@ public class SparkInterpreterTest {
}
@Test
public void testDisableImplicitImport() throws IOException {
public void testDisableImplicitImport() throws IOException, InterpreterException {
if (getSparkVersionNumber(repl) >= 13) {
// Set option of importing implicits to "false", and initialize new Spark repl
// this test should return error status when creating DataFrame from sequence

View file

@ -85,7 +85,7 @@ public class SparkSqlInterpreterTest {
}
@Test
public void test() {
public void test() throws InterpreterException {
repl.interpret("case class Test(name:String, age:Int)", context);
repl.interpret("val test = sc.parallelize(Seq(Test(\"moon\", 33), Test(\"jobs\", 51), Test(\"gates\", 51), Test(\"park\", 34)))", context);
if (isDataFrameSupported()) {
@ -107,7 +107,7 @@ public class SparkSqlInterpreterTest {
}
@Test
public void testStruct() {
public void testStruct() throws InterpreterException {
repl.interpret("case class Person(name:String, age:Int)", context);
repl.interpret("case class People(group:String, person:Person)", context);
repl.interpret(
@ -124,7 +124,7 @@ public class SparkSqlInterpreterTest {
}
@Test
public void test_null_value_in_row() {
public void test_null_value_in_row() throws InterpreterException {
repl.interpret("import org.apache.spark.sql._", context);
if (isDataFrameSupported()) {
repl.interpret(
@ -162,7 +162,7 @@ public class SparkSqlInterpreterTest {
}
@Test
public void testMaxResults() {
public void testMaxResults() throws InterpreterException {
repl.interpret("case class P(age:Int)", context);
repl.interpret(
"val gr = sc.parallelize(Seq(P(1),P(2),P(3),P(4),P(5),P(6),P(7),P(8),P(9),P(10),P(11)))",

View file

@ -66,6 +66,11 @@
<artifactId>gson-extras</artifactId>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>

View file

@ -168,7 +168,7 @@ public abstract class BaseZeppelinContext {
interpreterContext.out.write(o.toString());
}
} catch (IOException e) {
throw new InterpreterException(e);
throw new RuntimeException(e);
}
}
@ -229,14 +229,14 @@ public abstract class BaseZeppelinContext {
public void run(String noteId, String paragraphId, InterpreterContext context,
boolean checkCurrentParagraph) {
if (paragraphId.equals(context.getParagraphId()) && checkCurrentParagraph) {
throw new InterpreterException("Can not run current Paragraph");
throw new RuntimeException("Can not run current Paragraph");
}
List<InterpreterContextRunner> runners =
getInterpreterContextRunner(noteId, paragraphId, context);
if (runners.size() <= 0) {
throw new InterpreterException("Paragraph " + paragraphId + " not found " + runners.size());
throw new RuntimeException("Paragraph " + paragraphId + " not found " + runners.size());
}
for (InterpreterContextRunner r : runners) {
@ -255,7 +255,7 @@ public abstract class BaseZeppelinContext {
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
if (runners.size() <= 0) {
throw new InterpreterException("Note " + noteId + " not found " + runners.size());
throw new RuntimeException("Note " + noteId + " not found " + runners.size());
}
for (InterpreterContextRunner r : runners) {
@ -346,12 +346,12 @@ public abstract class BaseZeppelinContext {
boolean checkCurrentParagraph) {
List<InterpreterContextRunner> runners = getInterpreterContextRunner(noteId, context);
if (idx >= runners.size()) {
throw new InterpreterException("Index out of bound");
throw new RuntimeException("Index out of bound");
}
InterpreterContextRunner runner = runners.get(idx);
if (runner.getParagraphId().equals(context.getParagraphId()) && checkCurrentParagraph) {
throw new InterpreterException("Can not run current Paragraph: " + runner.getParagraphId());
throw new RuntimeException("Can not run current Paragraph: " + runner.getParagraphId());
}
runner.run();
@ -377,7 +377,7 @@ public abstract class BaseZeppelinContext {
Integer idx = (Integer) idOrIdx;
run(noteId, idx, context);
} else {
throw new InterpreterException("Paragraph " + idOrIdx + " not found");
throw new RuntimeException("Paragraph " + idOrIdx + " not found");
}
}
}

View file

@ -51,7 +51,8 @@ public class ClassloaderInterpreter
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
@ -68,7 +69,7 @@ public class ClassloaderInterpreter
@Override
public void open() {
public void open() throws InterpreterException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
@ -82,7 +83,7 @@ public class ClassloaderInterpreter
}
@Override
public void close() {
public void close() throws InterpreterException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
@ -96,7 +97,7 @@ public class ClassloaderInterpreter
}
@Override
public void cancel(InterpreterContext context) {
public void cancel(InterpreterContext context) throws InterpreterException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
@ -110,13 +111,11 @@ public class ClassloaderInterpreter
}
@Override
public FormType getFormType() {
public FormType getFormType() throws InterpreterException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
return intp.getFormType();
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -124,7 +123,7 @@ public class ClassloaderInterpreter
}
@Override
public int getProgress(InterpreterContext context) {
public int getProgress(InterpreterContext context) throws InterpreterException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
@ -143,8 +142,6 @@ public class ClassloaderInterpreter
Thread.currentThread().setContextClassLoader(cl);
try {
return intp.getScheduler();
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -153,14 +150,12 @@ public class ClassloaderInterpreter
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
InterpreterContext interpreterContext) throws InterpreterException {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
List completion = intp.completion(buf, cursor, interpreterContext);
return completion;
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -174,8 +169,6 @@ public class ClassloaderInterpreter
Thread.currentThread().setContextClassLoader(cl);
try {
return intp.getClassName();
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -188,8 +181,6 @@ public class ClassloaderInterpreter
Thread.currentThread().setContextClassLoader(cl);
try {
intp.setInterpreterGroup(interpreterGroup);
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -202,8 +193,6 @@ public class ClassloaderInterpreter
Thread.currentThread().setContextClassLoader(cl);
try {
return intp.getInterpreterGroup();
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -216,8 +205,6 @@ public class ClassloaderInterpreter
Thread.currentThread().setContextClassLoader(cl);
try {
intp.setClassloaderUrls(urls);
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -230,8 +217,6 @@ public class ClassloaderInterpreter
Thread.currentThread().setContextClassLoader(cl);
try {
return intp.getClassloaderUrls();
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -239,13 +224,11 @@ public class ClassloaderInterpreter
}
@Override
public void setProperty(Properties property) {
public void setProperties(Properties properties) {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
intp.setProperty(property);
} catch (Exception e) {
throw new InterpreterException(e);
intp.setProperties(properties);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -253,13 +236,11 @@ public class ClassloaderInterpreter
}
@Override
public Properties getProperty() {
public Properties getProperties() {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
return intp.getProperty();
} catch (Exception e) {
throw new InterpreterException(e);
return intp.getProperties();
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
@ -272,8 +253,6 @@ public class ClassloaderInterpreter
Thread.currentThread().setContextClassLoader(cl);
try {
return intp.getProperty(key);
} catch (Exception e) {
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);

View file

@ -31,6 +31,7 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -55,20 +56,21 @@ public abstract class Interpreter {
* open() is called only once
*/
@ZeppelinApi
public abstract void open();
public abstract void open() throws InterpreterException;
/**
* Closes interpreter. You may want to free your resources up here.
* close() is called only once
*/
@ZeppelinApi
public abstract void close();
public abstract void close() throws InterpreterException;
/**
* Run precode if exists.
*/
@ZeppelinApi
public InterpreterResult executePrecode(InterpreterContext interpreterContext) {
public InterpreterResult executePrecode(InterpreterContext interpreterContext)
throws InterpreterException {
String simpleName = this.getClass().getSimpleName();
String precode = getProperty(String.format("zeppelin.%s.precode", simpleName));
if (StringUtils.isNotBlank(precode)) {
@ -83,13 +85,15 @@ public abstract class Interpreter {
* @param st statements to run
*/
@ZeppelinApi
public abstract InterpreterResult interpret(String st, InterpreterContext context);
public abstract InterpreterResult interpret(String st,
InterpreterContext context)
throws InterpreterException;
/**
* Optionally implement the canceling routine to abort interpret() method
*/
@ZeppelinApi
public abstract void cancel(InterpreterContext context);
public abstract void cancel(InterpreterContext context) throws InterpreterException;
/**
* Dynamic form handling
@ -99,7 +103,7 @@ public abstract class Interpreter {
* FormType.NATIVE handles form in API
*/
@ZeppelinApi
public abstract FormType getFormType();
public abstract FormType getFormType() throws InterpreterException;
/**
* get interpret() method running process in percentage.
@ -107,7 +111,7 @@ public abstract class Interpreter {
* @return number between 0-100
*/
@ZeppelinApi
public abstract int getProgress(InterpreterContext context);
public abstract int getProgress(InterpreterContext context) throws InterpreterException;
/**
* Get completion list based on cursor position.
@ -120,7 +124,7 @@ public abstract class Interpreter {
*/
@ZeppelinApi
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
InterpreterContext interpreterContext) throws InterpreterException {
return null;
}
@ -144,22 +148,22 @@ public abstract class Interpreter {
public static Logger logger = LoggerFactory.getLogger(Interpreter.class);
private InterpreterGroup interpreterGroup;
private URL[] classloaderUrls;
protected Properties property;
private String userName;
protected Properties properties;
protected String userName;
@ZeppelinApi
public Interpreter(Properties property) {
this.property = property;
public Interpreter(Properties properties) {
this.properties = properties;
}
public void setProperty(Properties property) {
this.property = property;
public void setProperties(Properties properties) {
this.properties = properties;
}
@ZeppelinApi
public Properties getProperty() {
public Properties getProperties() {
Properties p = new Properties();
p.putAll(property);
p.putAll(properties);
RegisteredInterpreter registeredInterpreter = Interpreter.findRegisteredInterpreterByClassName(
getClassName());
@ -183,11 +187,22 @@ public abstract class Interpreter {
@ZeppelinApi
public String getProperty(String key) {
logger.debug("key: {}, value: {}", key, getProperty().getProperty(key));
logger.debug("key: {}, value: {}", key, getProperties().getProperty(key));
return getProperty().getProperty(key);
return getProperties().getProperty(key);
}
@ZeppelinApi
public String getProperty(String key, String defaultValue) {
logger.debug("key: {}, value: {}", key, getProperties().getProperty(key, defaultValue));
return getProperties().getProperty(key, defaultValue);
}
@ZeppelinApi
public void setProperty(String key, String value) {
properties.setProperty(key, value);
}
public String getClassName() {
return this.getClass().getName();

View file

@ -17,11 +17,12 @@
package org.apache.zeppelin.interpreter;
/**
* Runtime Exception for interpreters.
*
*/
public class InterpreterException extends RuntimeException {
public class InterpreterException extends Exception {
public InterpreterException(Throwable e) {
super(e);

View file

@ -44,13 +44,13 @@ public class LazyOpenInterpreter
}
@Override
public void setProperty(Properties property) {
intp.setProperty(property);
public void setProperties(Properties properties) {
intp.setProperties(properties);
}
@Override
public Properties getProperty() {
return intp.getProperty();
public Properties getProperties() {
return intp.getProperties();
}
@Override
@ -59,7 +59,7 @@ public class LazyOpenInterpreter
}
@Override
public synchronized void open() {
public synchronized void open() throws InterpreterException {
if (opened == true) {
return;
}
@ -73,12 +73,13 @@ public class LazyOpenInterpreter
}
@Override
public InterpreterResult executePrecode(InterpreterContext interpreterContext) {
public InterpreterResult executePrecode(InterpreterContext interpreterContext)
throws InterpreterException {
return intp.executePrecode(interpreterContext);
}
@Override
public void close() {
public void close() throws InterpreterException {
synchronized (intp) {
if (opened == true) {
intp.close();
@ -94,7 +95,8 @@ public class LazyOpenInterpreter
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
open();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
@ -105,18 +107,18 @@ public class LazyOpenInterpreter
}
@Override
public void cancel(InterpreterContext context) {
public void cancel(InterpreterContext context) throws InterpreterException {
open();
intp.cancel(context);
}
@Override
public FormType getFormType() {
public FormType getFormType() throws InterpreterException {
return intp.getFormType();
}
@Override
public int getProgress(InterpreterContext context) {
public int getProgress(InterpreterContext context) throws InterpreterException {
if (opened) {
return intp.getProgress(context);
} else {
@ -131,7 +133,7 @@ public class LazyOpenInterpreter
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
InterpreterContext interpreterContext) throws InterpreterException {
open();
List completion = intp.completion(buf, cursor, interpreterContext);
return completion;

View file

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
/**
* Interface to InterpreterClient which is created by InterpreterLauncher. This is the component
* that is used to for the communication fromzeppelin-server process to zeppelin interpreter process
*/
public interface InterpreterClient {
}

View file

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import java.util.Properties;
/**
* Context class for Interpreter Launch
*/
public class InterpreterLaunchContext {
private Properties properties;
private InterpreterOption option;
private InterpreterRunner runner;
private String interpreterGroupId;
private String interpreterGroupName;
public InterpreterLaunchContext(Properties properties,
InterpreterOption option,
InterpreterRunner runner,
String interpreterGroupId,
String interpreterGroupName) {
this.properties = properties;
this.option = option;
this.runner = runner;
this.interpreterGroupId = interpreterGroupId;
this.interpreterGroupName = interpreterGroupName;
}
public Properties getProperties() {
return properties;
}
public InterpreterOption getOption() {
return option;
}
public InterpreterRunner getRunner() {
return runner;
}
public String getInterpreterGroupId() {
return interpreterGroupId;
}
public String getInterpreterGroupName() {
return interpreterGroupName;
}
}

View file

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import java.io.IOException;
import java.util.Properties;
/**
* Component to Launch interpreter process.
*/
public abstract class InterpreterLauncher {
protected ZeppelinConfiguration zConf;
protected Properties properties;
public InterpreterLauncher(ZeppelinConfiguration zConf) {
this.zConf = zConf;
}
public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException;
}

View file

@ -33,6 +33,6 @@ public class RemoteInterpreterContextRunner extends InterpreterContextRunner {
public void run() {
// this class should be used only for gson deserialize abstract class
// code should not reach here
throw new InterpreterException("Assert");
throw new RuntimeException("Assert");
}
}

View file

@ -207,7 +207,11 @@ public class RemoteInterpreterServer
if (interpreterGroup != null) {
for (List<Interpreter> session : interpreterGroup.values()) {
for (Interpreter interpreter : session) {
interpreter.close();
try {
interpreter.close();
} catch (InterpreterException e) {
logger.warn("Fail to close interpreter", e);
}
}
}
}
@ -356,7 +360,11 @@ public class RemoteInterpreterServer
public void open(String sessionId, String className) throws TException {
logger.info(String.format("Open Interpreter %s for session %s ", className, sessionId));
Interpreter intp = getInterpreter(sessionId, className);
intp.open();
try {
intp.open();
} catch (InterpreterException e) {
throw new TException("Fail to open interpreter", e);
}
}
@Override
@ -388,7 +396,11 @@ public class RemoteInterpreterServer
while (it.hasNext()) {
Interpreter inp = it.next();
if (inp.getClassName().equals(className)) {
inp.close();
try {
inp.close();
} catch (InterpreterException e) {
logger.warn("Fail to close interpreter", e);
}
it.remove();
break;
}
@ -655,7 +667,11 @@ public class RemoteInterpreterServer
if (job != null) {
job.setStatus(Status.ABORT);
} else {
intp.cancel(convert(interpreterContext, null));
try {
intp.cancel(convert(interpreterContext, null));
} catch (InterpreterException e) {
throw new TException("Fail to cancel", e);
}
}
}
@ -672,7 +688,11 @@ public class RemoteInterpreterServer
throw new TException("No interpreter {} existed for session {}".format(
className, sessionId));
}
return intp.getProgress(convert(interpreterContext, null));
try {
return intp.getProgress(convert(interpreterContext, null));
} catch (InterpreterException e) {
throw new TException("Fail to getProgress", e);
}
}
}
@ -680,7 +700,11 @@ public class RemoteInterpreterServer
@Override
public String getFormType(String sessionId, String className) throws TException {
Interpreter intp = getInterpreter(sessionId, className);
return intp.getFormType().toString();
try {
return intp.getFormType().toString();
} catch (InterpreterException e) {
throw new TException(e);
}
}
@Override
@ -688,8 +712,11 @@ public class RemoteInterpreterServer
String className, String buf, int cursor, RemoteInterpreterContext remoteInterpreterContext)
throws TException {
Interpreter intp = getInterpreter(sessionId, className);
List completion = intp.completion(buf, cursor, convert(remoteInterpreterContext, null));
return completion;
try {
return intp.completion(buf, cursor, convert(remoteInterpreterContext, null));
} catch (InterpreterException e) {
throw new TException("Fail to get completion", e);
}
}
private InterpreterContext convert(RemoteInterpreterContext ric) {

View file

@ -33,8 +33,8 @@ public class InterpreterTest {
p.put("p1", "v1");
Interpreter intp = new DummyInterpreter(p);
assertEquals(1, intp.getProperty().size());
assertEquals("v1", intp.getProperty().get("p1"));
assertEquals(1, intp.getProperties().size());
assertEquals("v1", intp.getProperties().get("p1"));
assertEquals("v1", intp.getProperty("p1"));
}
@ -45,10 +45,10 @@ public class InterpreterTest {
Interpreter intp = new DummyInterpreter(p);
Properties overriddenProperty = new Properties();
overriddenProperty.put("p1", "v2");
intp.setProperty(overriddenProperty);
intp.setProperties(overriddenProperty);
assertEquals(1, intp.getProperty().size());
assertEquals("v2", intp.getProperty().get("p1"));
assertEquals(1, intp.getProperties().size());
assertEquals("v2", intp.getProperties().get("p1"));
assertEquals("v2", intp.getProperty("p1"));
}

View file

@ -28,7 +28,7 @@ public class LazyOpenInterpreterTest {
Interpreter interpreter = mock(Interpreter.class);
@Test
public void isOpenTest() {
public void isOpenTest() throws InterpreterException {
InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
when(interpreter.interpret(any(String.class), any(InterpreterContext.class))).thenReturn(interpreterResult);

View file

@ -123,7 +123,7 @@ public class InterpreterRestApi {
request.getOption(), request.getProperties());
logger.info("new setting created with {}", interpreterSetting.getId());
return new JsonResponse<>(Status.OK, "", interpreterSetting).build();
} catch (InterpreterException | IOException e) {
} catch (IOException e) {
logger.error("Exception in InterpreterRestApi while creating ", e);
return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e))
.build();

View file

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.Note;
@ -171,7 +172,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
}
@Test
public void sparkRTest() throws IOException {
public void sparkRTest() throws IOException, InterpreterException {
// create new note
Note note = ZeppelinServer.notebook.createNote(anonymous);
int sparkVersion = getSparkVersionNumber(note);
@ -426,7 +427,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
}
@Test
public void pySparkDepLoaderTest() throws IOException {
public void pySparkDepLoaderTest() throws IOException, InterpreterException {
// create new note
Note note = ZeppelinServer.notebook.createNote(anonymous);
int sparkVersionNumber = getSparkVersionNumber(note);

View file

@ -70,11 +70,6 @@
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View file

@ -77,7 +77,7 @@ public class InterpreterFactory {
return interpreter;
}
}
throw new InterpreterException(replName + " interpreter not found");
return null;
} else {
// first assume replName is 'name' of interpreter. ('groupName' is ommitted)

View file

@ -18,7 +18,6 @@
package org.apache.zeppelin.interpreter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.gson.JsonArray;
@ -34,19 +33,22 @@ import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext;
import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FilenameFilter;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
@ -58,7 +60,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -132,6 +133,10 @@ public class InterpreterSetting {
private transient ZeppelinConfiguration conf = new ZeppelinConfiguration();
// TODO(zjffdu) ShellScriptLauncher is the only launcher implemention for now. It could be other
// launcher in future when we have other launcher implementation. e.g. third party launcher
// service like livy
private transient InterpreterLauncher launcher;
///////////////////////////////////////////////////////////////////////////////////////////
@ -243,6 +248,7 @@ public class InterpreterSetting {
}
void postProcessing() {
// createLauncher();
this.status = Status.READY;
}
@ -266,6 +272,14 @@ public class InterpreterSetting {
this.conf = o.getConf();
}
private void createLauncher() {
if (group.equals("spark")) {
this.launcher = new SparkInterpreterLauncher(this.conf);
} else {
this.launcher = new ShellScriptLauncher(this.conf);
}
}
public AngularObjectRegistryListener getAngularObjectRegistryListener() {
return angularObjectRegistryListener;
}
@ -626,6 +640,7 @@ public class InterpreterSetting {
}
return interpreters;
}
<<<<<<< 0c64d9ca676e48a749db9879fa3cebc06eb78b54
RemoteInterpreterProcess createInterpreterProcess() {
RemoteInterpreterProcess remoteInterpreterProcess = null;
@ -771,9 +786,79 @@ public class InterpreterSetting {
return "\"" + value + "\"";
} else {
return "\'" + value + "\'";
=======
// Create Interpreter in ZeppelinServer for non-remote mode
private Interpreter createLocalInterpreter(String className) {
LOGGER.info("Create Local Interpreter {} from {}", className, interpreterDir);
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
try {
URLClassLoader ccl = cleanCl.get(interpreterDir);
if (ccl == null) {
// classloader fallback
ccl = URLClassLoader.newInstance(new URL[]{}, oldcl);
}
boolean separateCL = true;
try { // check if server's classloader has driver already.
Class cls = this.getClass().forName(className);
if (cls != null) {
separateCL = false;
}
} catch (Exception e) {
LOGGER.error("exception checking server classloader driver", e);
}
URLClassLoader cl;
if (separateCL == true) {
cl = URLClassLoader.newInstance(new URL[]{}, ccl);
} else {
cl = ccl;
}
Thread.currentThread().setContextClassLoader(cl);
Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className);
Constructor<Interpreter> constructor =
replClass.getConstructor(new Class[]{Properties.class});
Interpreter repl = constructor.newInstance(getJavaProperties());
repl.setClassloaderUrls(ccl.getURLs());
LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl));
return intp;
} catch (SecurityException e) {
throw new RuntimeException(e);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
} catch (IllegalArgumentException e) {
throw new RuntimeException(e);
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
} catch (InvocationTargetException e) {
throw new RuntimeException(e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
} finally {
Thread.currentThread().setContextClassLoader(oldcl);
}
}
synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException {
if (launcher == null) {
createLauncher();
>>>>>>> ZEPPELIN-2685. Improvement on Interpreter class
}
InterpreterLaunchContext launchContext = new
InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, name);
RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
process.setRemoteInterpreterEventPoller(
new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
return process;
}
private List<Interpreter> getOrCreateSession(String user, String noteId) {
ManagedInterpreterGroup interpreterGroup = getOrCreateInterpreterGroup(user, noteId);
Preconditions.checkNotNull(interpreterGroup, "No InterpreterGroup existed for user {}, " +
@ -815,8 +900,7 @@ public class InterpreterSetting {
return null;
}
private ManagedInterpreterGroup createInterpreterGroup(String groupId)
throws InterpreterException {
private ManagedInterpreterGroup createInterpreterGroup(String groupId) {
AngularObjectRegistry angularObjectRegistry;
ManagedInterpreterGroup interpreterGroup = new ManagedInterpreterGroup(groupId, this);
angularObjectRegistry =
@ -938,7 +1022,8 @@ public class InterpreterSetting {
);
newProperties.put(key, property);
} else {
throw new RuntimeException("Can not convert this type of property: " + value.getClass());
throw new RuntimeException("Can not convert this type of property: " +
value.getClass());
}
}
return newProperties;

View file

@ -737,11 +737,12 @@ public class InterpreterSettingManager {
}
/**
* Change interpreter property and restart
* Change interpreter properties and restart
*/
public void setPropertyAndRestart(String id, InterpreterOption option,
Map<String, InterpreterProperty> properties,
List<Dependency> dependencies) throws IOException {
List<Dependency> dependencies)
throws InterpreterException, IOException {
synchronized (interpreterSettings) {
InterpreterSetting intpSetting = interpreterSettings.get(id);
if (intpSetting != null) {
@ -754,7 +755,7 @@ public class InterpreterSettingManager {
saveToFile();
} catch (Exception e) {
loadFromFile();
throw e;
throw new IOException(e);
}
} else {
throw new InterpreterException("Interpreter setting id " + id + " not found");
@ -763,7 +764,7 @@ public class InterpreterSettingManager {
}
// restart in note page
public void restart(String settingId, String noteId, String user) {
public void restart(String settingId, String noteId, String user) throws InterpreterException {
InterpreterSetting intpSetting = interpreterSettings.get(settingId);
Preconditions.checkNotNull(intpSetting);
synchronized (interpreterSettings) {
@ -787,7 +788,7 @@ public class InterpreterSettingManager {
}
}
public void restart(String id) {
public void restart(String id) throws InterpreterException {
restart(id, "", "anonymous");
}

View file

@ -25,6 +25,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@ -52,7 +53,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
return interpreterSetting;
}
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
if (remoteInterpreterProcess == null) {
LOGGER.info("Create InterperterProcess for InterpreterGroup: " + getId());
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess();
@ -112,7 +113,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
LOGGER.info("Job " + job.getJobName() + " aborted ");
}
interpreter.close();
try {
interpreter.close();
} catch (InterpreterException e) {
LOGGER.warn("Fail to close interpreter " + interpreter.getClassName(), e);
}
//TODO(zjffdu) move the close of schedule to Interpreter
if (null != scheduler) {
SchedulerFactory.singleton().removeScheduler(scheduler.getName());

View file

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Interpreter Launcher which use shell script to launch the interpreter process.
*
*/
public class ShellScriptLauncher extends InterpreterLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class);
public ShellScriptLauncher(ZeppelinConfiguration zConf) {
super(zConf);
}
@Override
public InterpreterClient launch(InterpreterLaunchContext context) {
LOGGER.info("Launching Interpreter: " + context.getInterpreterGroupName());
this.properties = context.getProperties();
InterpreterOption option = context.getOption();
InterpreterRunner runner = context.getRunner();
String groupName = context.getInterpreterGroupName();
int connectTimeout =
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
if (option.isExistingProcess()) {
// TODO(zjffdu) remove the existing process approach seems no one is using this.
// use the existing process
return new RemoteInterpreterRunningProcess(
connectTimeout,
option.getHost(),
option.getPort());
} else {
// create new remote process
String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+ context.getInterpreterGroupId();
return new RemoteInterpreterManagedProcess(
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
zConf.getCallbackPortRange(),
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
buildEnvFromProperties(), connectTimeout, groupName);
}
}
protected Map<String, String> buildEnvFromProperties() {
Map<String, String> env = new HashMap<>();
for (Object key : properties.keySet()) {
if (RemoteInterpreterUtils.isEnvString((String) key)) {
env.put((String) key, properties.getProperty((String) key));
}
}
return env;
}
}

View file

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* Spark specific launcher.
*/
public class SparkInterpreterLauncher extends ShellScriptLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
public SparkInterpreterLauncher(ZeppelinConfiguration zConf) {
super(zConf);
}
@Override
protected Map<String, String> buildEnvFromProperties() {
Map<String, String> env = new HashMap<String, String>();
Properties sparkProperties = new Properties();
String sparkMaster = getSparkMaster(properties);
for (String key : properties.stringPropertyNames()) {
if (RemoteInterpreterUtils.isEnvString(key)) {
env.put(key, properties.getProperty(key));
}
if (isSparkConf(key, properties.getProperty(key))) {
sparkProperties.setProperty(key, toShellFormat(properties.getProperty(key)));
}
}
setupPropertiesForPySpark(sparkProperties);
setupPropertiesForSparkR(sparkProperties, properties.getProperty("SPARK_HOME"));
if (isYarnMode() && getDeployMode().equals("cluster")) {
env.put("SPARK_YARN_CLUSTER", "true");
}
StringBuilder sparkConfBuilder = new StringBuilder();
if (sparkMaster != null) {
sparkConfBuilder.append(" --master " + sparkMaster);
}
if (isYarnMode() && getDeployMode().equals("cluster")) {
sparkConfBuilder.append(" --files " + zConf.getConfDir() + "/log4j_yarn_cluster.properties");
}
for (String name : sparkProperties.stringPropertyNames()) {
sparkConfBuilder.append(" --conf " + name + "=" + sparkProperties.getProperty(name));
}
env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
// use the HADOOP_CONF_DIR defined in zeppelin-env.sh if it is not
// specified in interpreter setting
if (!env.containsKey("HADOOP_CONF_DIR") && System.getenv("HADOOP_CONF_DIR") != null) {
env.put("HADOOP_CONF_DIR", System.getenv("HADOOP_CONF_DIR"));
}
LOGGER.debug("buildEnvFromProperties: " + env);
return env;
}
private boolean isSparkConf(String key, String value) {
return !StringUtils.isEmpty(key) && key.startsWith("spark.") && !StringUtils.isEmpty(value);
}
private void setupPropertiesForPySpark(Properties sparkProperties) {
if (isYarnMode()) {
sparkProperties.setProperty("spark.yarn.isPython", "true");
}
}
private void mergeSparkProperty(Properties sparkProperties, String propertyName,
String propertyValue) {
if (sparkProperties.containsKey(propertyName)) {
String oldPropertyValue = sparkProperties.getProperty(propertyName);
sparkProperties.setProperty(propertyName, oldPropertyValue + "," + propertyValue);
} else {
sparkProperties.setProperty(propertyName, propertyValue);
}
}
private void setupPropertiesForSparkR(Properties sparkProperties,
String sparkHome) {
File sparkRBasePath = null;
if (sparkHome == null) {
if (!getSparkMaster(properties).startsWith("local")) {
throw new RuntimeException("SPARK_HOME is not specified in interpreter-setting" +
" for non-local mode, if you specify it in zeppelin-env.sh, please move that into " +
" interpreter setting");
}
String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME);
sparkRBasePath = new File(zeppelinHome,
"interpreter" + File.separator + "spark" + File.separator + "R");
} else {
sparkRBasePath = new File(sparkHome, "R" + File.separator + "lib");
}
File sparkRPath = new File(sparkRBasePath, "sparkr.zip");
if (sparkRPath.exists() && sparkRPath.isFile()) {
mergeSparkProperty(sparkProperties, "spark.yarn.dist.archives", sparkRPath.getAbsolutePath());
} else {
LOGGER.warn("sparkr.zip is not found, SparkR may not work.");
}
}
/**
* Order to look for spark master
* 1. master in interpreter setting
* 2. spark.master interpreter setting
* 3. SPARK_HOME in zeppelin-env.sh
* 4. use local[*]
* @param properties
* @return
*/
private String getSparkMaster(Properties properties) {
String master = properties.getProperty("master");
if (master == null) {
master = properties.getProperty("spark.master");
if (master == null) {
master = System.getenv("SPARK_HOME");
}
if (master == null) {
master = "local[*]";
}
}
return master;
}
private String getDeployMode() {
String master = getSparkMaster(properties);
if (master.equals("yarn-client")) {
return "client";
} else if (master.equals("yarn-cluster")) {
return "cluster";
} else if (master.startsWith("local")) {
return "client";
} else {
String deployMode = properties.getProperty("spark.submit.deployMode");
if (deployMode == null) {
throw new RuntimeException("master is set as yarn, but spark.submit.deployMode " +
"is not specified");
}
if (!deployMode.equals("client") && !deployMode.equals("cluster")) {
throw new RuntimeException("Invalid value for spark.submit.deployMode: " + deployMode);
}
return deployMode;
}
}
private boolean isYarnMode() {
return getSparkMaster(properties).startsWith("yarn");
}
private String toShellFormat(String value) {
if (value.contains("\'") && value.contains("\"")) {
throw new RuntimeException("Spark property value could not contain both \" and '");
} else if (value.contains("\'")) {
return "\"" + value + "\"";
} else {
return "\'" + value + "\'";
}
}
}

View file

@ -82,7 +82,7 @@ public class InterpreterContextRunnerPool {
}
}
throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
throw new RuntimeException("Can not run paragraph " + paragraphId + " on " + noteId);
}
}
}

View file

@ -28,6 +28,7 @@ import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -42,6 +43,7 @@ import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@ -90,7 +92,7 @@ public class RemoteInterpreter extends Interpreter {
return this.sessionId;
}
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() {
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
if (this.interpreterProcess != null) {
return this.interpreterProcess;
}
@ -113,7 +115,7 @@ public class RemoteInterpreter extends Interpreter {
}
@Override
public void open() {
public void open() throws InterpreterException {
synchronized (this) {
if (!isOpened) {
// create all the interpreters of the same session first, then Open the internal interpreter
@ -123,7 +125,11 @@ public class RemoteInterpreter extends Interpreter {
// also see method Interpreter.getInterpreterInTheSameSessionByClassName
for (Interpreter interpreter : getInterpreterGroup()
.getOrCreateSession(userName, sessionId)) {
((RemoteInterpreter) interpreter).internal_create();
try {
((RemoteInterpreter) interpreter).internal_create();
} catch (IOException e) {
throw new InterpreterException(e);
}
}
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@ -147,7 +153,7 @@ public class RemoteInterpreter extends Interpreter {
}
}
private void internal_create() {
private void internal_create() throws IOException {
synchronized (this) {
if (!isCreated) {
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
@ -156,7 +162,7 @@ public class RemoteInterpreter extends Interpreter {
public Void call(Client client) throws Exception {
LOGGER.info("Create RemoteInterpreter {}", getClassName());
client.createInterpreter(getInterpreterGroup().getId(), sessionId,
className, (Map) property, userName);
className, (Map) getProperties(), userName);
return null;
}
});
@ -167,9 +173,14 @@ public class RemoteInterpreter extends Interpreter {
@Override
public void close() {
public void close() throws InterpreterException {
if (isOpened) {
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
@ -184,13 +195,19 @@ public class RemoteInterpreter extends Interpreter {
}
@Override
public InterpreterResult interpret(final String st, final InterpreterContext context) {
public InterpreterResult interpret(final String st, final InterpreterContext context)
throws InterpreterException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("st:\n{}", st);
}
final FormType form = getFormType();
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
.getInterpreterContextRunnerPool();
List<InterpreterContextRunner> runners = context.getRunners();
@ -238,12 +255,17 @@ public class RemoteInterpreter extends Interpreter {
}
@Override
public void cancel(final InterpreterContext context) {
public void cancel(final InterpreterContext context) throws InterpreterException {
if (!isOpened) {
LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
return;
}
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
@ -254,7 +276,7 @@ public class RemoteInterpreter extends Interpreter {
}
@Override
public FormType getFormType() {
public FormType getFormType() throws InterpreterException {
if (formType != null) {
return formType;
}
@ -265,7 +287,12 @@ public class RemoteInterpreter extends Interpreter {
open();
}
}
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
FormType type = interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<FormType>() {
@Override
@ -277,13 +304,19 @@ public class RemoteInterpreter extends Interpreter {
return type;
}
@Override
public int getProgress(final InterpreterContext context) {
public int getProgress(final InterpreterContext context) throws InterpreterException {
if (!isOpened) {
LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
return 0;
}
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<Integer>() {
@Override
@ -296,12 +329,18 @@ public class RemoteInterpreter extends Interpreter {
@Override
public List<InterpreterCompletion> completion(final String buf, final int cursor,
final InterpreterContext interpreterContext) {
final InterpreterContext interpreterContext)
throws InterpreterException {
if (!isOpened) {
LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
return new ArrayList<>();
}
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
@Override
@ -317,7 +356,12 @@ public class RemoteInterpreter extends Interpreter {
LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
return Job.Status.UNKNOWN.name();
}
RemoteInterpreterProcess interpreterProcess = getOrCreateInterpreterProcess();
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new RuntimeException(e);
}
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<String>() {
@Override
@ -331,7 +375,7 @@ public class RemoteInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
int maxConcurrency = Integer.parseInt(
property.getProperty("zeppelin.interpreter.max.poolsize",
getProperty("zeppelin.interpreter.max.poolsize",
ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + ""));
Scheduler s = new RemoteScheduler(

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter.remote;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
@ -73,11 +74,8 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
String localRepoDir,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener,
String interpreterGroupName) {
super(new RemoteInterpreterEventPoller(listener, appListener),
connectTimeout);
super(connectTimeout);
this.interpreterRunner = intpRunner;
this.portRange = portRange;
this.env = env;
@ -86,23 +84,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
this.interpreterGroupName = interpreterGroupName;
}
RemoteInterpreterManagedProcess(String intpRunner,
String intpDir,
String localRepoDir,
Map<String, String> env,
RemoteInterpreterEventPoller remoteInterpreterEventPoller,
int connectTimeout,
String interpreterGroupName) {
super(remoteInterpreterEventPoller,
connectTimeout);
this.interpreterRunner = intpRunner;
this.portRange = ":";
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
this.interpreterGroupName = interpreterGroupName;
}
@Override
public String getHost() {
return "localhost";
@ -124,7 +105,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
callbackHost = RemoteInterpreterUtils.findAvailableHostAddress();
callbackPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e1) {
throw new InterpreterException(e1);
throw new RuntimeException(e1);
}
logger.info("Thrift server for callback will start. Port: {}", callbackPort);
@ -206,7 +187,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
executor.execute(cmdLine, procEnv, this);
} catch (IOException e) {
running.set(false);
throw new InterpreterException(e);
throw new RuntimeException(e);
}
try {
@ -217,7 +198,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
}
if (!running.get()) {
callbackServer.stop();
throw new InterpreterException("Cannot run interpreter");
throw new RuntimeException(new String(cmdOut.toByteArray()));
}
} catch (InterruptedException e) {
logger.error("Remote interpreter is not accessible");
@ -227,7 +208,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
public void stop() {
// shutdown EventPoller first.
this.remoteInterpreterEventPoller.shutdown();
this.getRemoteInterpreterEventPoller().shutdown();
if (callbackServer.isServing()) {
callbackServer.stop();
}
@ -266,6 +247,31 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
running.set(false);
}
@VisibleForTesting
public Map<String, String> getEnv() {
return env;
}
@VisibleForTesting
public String getLocalRepoDir() {
return localRepoDir;
}
@VisibleForTesting
public String getInterpreterDir() {
return interpreterDir;
}
@VisibleForTesting
public String getInterpreterGroupName() {
return interpreterGroupName;
}
@VisibleForTesting
public String getInterpreterRunner() {
return interpreterRunner;
}
public boolean isRunning() {
return running.get();
}

View file

@ -21,6 +21,7 @@ import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -28,27 +29,17 @@ import org.slf4j.LoggerFactory;
/**
* Abstract class for interpreter process
*/
public abstract class RemoteInterpreterProcess {
public abstract class RemoteInterpreterProcess implements InterpreterClient {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class);
private GenericObjectPool<Client> clientPool;
protected final RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private final InterpreterContextRunnerPool interpreterContextRunnerPool;
private int connectTimeout;
public RemoteInterpreterProcess(
int connectTimeout,
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
this(new RemoteInterpreterEventPoller(listener, appListener),
connectTimeout);
this.remoteInterpreterEventPoller.setInterpreterProcess(this);
}
RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller,
int connectTimeout) {
int connectTimeout) {
this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
this.remoteInterpreterEventPoller = remoteInterpreterEventPoller;
this.connectTimeout = connectTimeout;
}
@ -56,6 +47,10 @@ public abstract class RemoteInterpreterProcess {
return remoteInterpreterEventPoller;
}
public void setRemoteInterpreterEventPoller(RemoteInterpreterEventPoller eventPoller) {
this.remoteInterpreterEventPoller = eventPoller;
}
public abstract String getHost();
public abstract int getPort();
public abstract void start(String userName, Boolean isUserImpersonate);
@ -147,9 +142,9 @@ public abstract class RemoteInterpreterProcess {
}
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
throw new RuntimeException(e);
} catch (Exception e1) {
throw new InterpreterException(e1);
throw new RuntimeException(e1);
} finally {
if (client != null) {
releaseClient(client, broken);

View file

@ -30,12 +30,10 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
public RemoteInterpreterRunningProcess(
int connectTimeout,
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener,
String host,
int port
) {
super(connectTimeout, listener, appListener);
super(connectTimeout);
this.host = host;
this.port = port;
}

View file

@ -627,7 +627,7 @@ public class Note implements ParagraphJobListener, JsonSerializable {
if (intp == null) {
String intpExceptionMsg =
p.getJobName() + "'s Interpreter " + requiredReplName + " not found";
InterpreterException intpException = new InterpreterException(intpExceptionMsg);
RuntimeException intpException = new RuntimeException(intpExceptionMsg);
InterpreterResult intpResult =
new InterpreterResult(InterpreterResult.Code.ERROR, intpException.getMessage());
p.setReturn(intpResult, intpException);

View file

@ -892,7 +892,11 @@ public class Notebook implements NoteEventListener {
if (releaseResource) {
for (InterpreterSetting setting : notebook.getInterpreterSettingManager()
.getInterpreterSettings(note.getId())) {
notebook.getInterpreterSettingManager().restart(setting.getId());
try {
notebook.getInterpreterSettingManager().restart(setting.getId());
} catch (InterpreterException e) {
logger.warn("Fail to resetart interpreter: " + setting.getId(), e);
}
}
}
}

View file

@ -312,15 +312,14 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
String replName = getRequiredReplName(trimmedBuffer);
String body = getScriptBody(trimmedBuffer);
Interpreter repl = getRepl(replName);
if (repl == null) {
return null;
}
InterpreterContext interpreterContext = getInterpreterContextWithoutRunner(null);
List completion = repl.completion(body, cursor, interpreterContext);
return completion;
try {
Interpreter repl = getRepl(replName);
return repl.completion(body, cursor, interpreterContext);
} catch (InterpreterException e) {
throw new RuntimeException("Fail to get completion", e);
}
}
public int calculateCursorPosition(String buffer, String trimmedBuffer, int cursor) {
@ -362,11 +361,15 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
@Override
public int progress() {
String replName = getRequiredReplName();
Interpreter repl = getRepl(replName);
if (repl != null) {
try {
Interpreter repl = getRepl(replName);
if (repl == null) {
return 0;
}
return repl.getProgress(getInterpreterContext(null));
} else {
return 0;
} catch (InterpreterException e) {
throw new RuntimeException("Fail to get progress", e);
}
}
@ -494,10 +497,8 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
protected boolean jobAbort() {
Interpreter repl = getRepl(getRequiredReplName());
if (repl == null) {
// when interpreters are already destroyed
return true;
}
Scheduler scheduler = repl.getScheduler();
if (scheduler == null) {
return true;
@ -507,7 +508,11 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
if (job != null) {
job.setStatus(Status.ABORT);
} else {
repl.cancel(getInterpreterContextWithoutRunner(null));
try {
repl.cancel(getInterpreterContextWithoutRunner(null));
} catch (InterpreterException e) {
throw new RuntimeException(e);
}
}
return true;
}
@ -738,12 +743,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
}
private boolean isValidInterpreter(String replName) {
try {
return factory.getInterpreter(user, note.getId(), replName) != null;
} catch (InterpreterException e) {
// ignore this exception, it would be recaught when running paragraph.
return false;
}
return factory.getInterpreter(user, note.getId(), replName) != null;
}
public void updateRuntimeInfos(String label, String tooltip, Map<String, String> infos,

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.helium;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.ApplicationState;
@ -241,7 +242,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest implem
@Test
public void testUnloadOnInterpreterRestart() throws IOException {
public void testUnloadOnInterpreterRestart() throws IOException, InterpreterException {
// given
HeliumPackage pkg1 = new HeliumPackage(HeliumType.APPLICATION,
"name1",

View file

@ -41,7 +41,7 @@ public class EchoInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
if (Boolean.parseBoolean(property.getProperty("zeppelin.interpreter.echo.fail", "false"))) {
if (Boolean.parseBoolean(getProperty("zeppelin.interpreter.echo.fail", "false"))) {
return new InterpreterResult(InterpreterResult.Code.ERROR);
} else {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, st);

View file

@ -52,10 +52,10 @@ public class InterpreterFactoryTest extends AbstractInterpreterTest {
assertEquals(DoubleEchoInterpreter.class.getName(), remoteInterpreter.getClassName());
}
@Test(expected = InterpreterException.class)
@Test
public void testUnknownRepl1() throws IOException {
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl");
assertNull(interpreterFactory.getInterpreter("user1", "note1", "test.unknown_repl"));
}
@Test

View file

@ -96,7 +96,7 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest {
}
@Test
public void testCreateUpdateRemoveSetting() throws IOException {
public void testCreateUpdateRemoveSetting() throws IOException, InterpreterException {
// create new interpreter setting
InterpreterOption option = new InterpreterOption();
option.setPerNote("scoped");

View file

@ -46,7 +46,7 @@ public class SleepInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
if (Boolean.parseBoolean(property.getProperty("zeppelin.SleepInterpreter.parallel", "false"))) {
if (Boolean.parseBoolean(getProperty("zeppelin.SleepInterpreter.parallel", "false"))) {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
"Parallel-" + SleepInterpreter.class.getName(), 10);
}

View file

@ -44,7 +44,7 @@ public class SparkInterpreterModeTest {
}
}
private void testInterpreterBasics() throws IOException {
private void testInterpreterBasics() throws IOException, InterpreterException {
// test SparkInterpreter
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
@ -75,7 +75,7 @@ public class SparkInterpreterModeTest {
}
@Test
public void testLocalMode() throws IOException, YarnException {
public void testLocalMode() throws IOException, YarnException, InterpreterException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "local[*]");
sparkInterpreterSetting.setProperty("SPARK_HOME", System.getenv("SPARK_HOME"));
@ -94,7 +94,7 @@ public class SparkInterpreterModeTest {
}
@Test
public void testYarnClientMode() throws IOException, YarnException, InterruptedException {
public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "yarn-client");
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
@ -116,7 +116,7 @@ public class SparkInterpreterModeTest {
}
@Test
public void testYarnClusterMode() throws IOException, YarnException, InterruptedException {
public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "yarn-cluster");
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());

View file

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.junit.Test;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ShellScriptLauncherTest {
@Test
public void testLauncher() {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
ShellScriptLauncher launcher = new ShellScriptLauncher(zConf);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "groupName");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
assertEquals("groupName", interpreterProcess.getInterpreterGroupName());
assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(1, interpreterProcess.getEnv().size());
assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1"));
}
}

View file

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.junit.Test;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class SparkInterpreterLauncherTest {
@Test
public void testLocalMode() {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
properties.setProperty("master", "local[*]");
properties.setProperty("spark.files", "file_1");
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterGroupName());
assertEquals(".//interpreter/spark", interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(2, interpreterProcess.getEnv().size());
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals(" --master local[*] --conf spark.files='file_1' --conf spark.jars='jar_1'", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@Test
public void testYarnClientMode_1() {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
properties.setProperty("master", "yarn-client");
properties.setProperty("spark.files", "file_1");
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterGroupName());
assertEquals(".//interpreter/spark", interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(2, interpreterProcess.getEnv().size());
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals(" --master yarn-client --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@Test
public void testYarnClientMode_2() {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
properties.setProperty("master", "yarn");
properties.setProperty("spark.submit.deployMode", "client");
properties.setProperty("spark.files", "file_1");
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterGroupName());
assertEquals(".//interpreter/spark", interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(2, interpreterProcess.getEnv().size());
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals(" --master yarn --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='client' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@Test
public void testYarnClusterMode_1() {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
properties.setProperty("master", "yarn-cluster");
properties.setProperty("spark.files", "file_1");
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterGroupName());
assertEquals(".//interpreter/spark", interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(3, interpreterProcess.getEnv().size());
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals("true", interpreterProcess.getEnv().get("SPARK_YARN_CLUSTER"));
assertEquals(" --master yarn-cluster --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
@Test
public void testYarnClusterMode_2() {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
properties.setProperty("master", "yarn");
properties.setProperty("spark.submit.deployMode", "cluster");
properties.setProperty("spark.files", "file_1");
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
assertEquals("spark", interpreterProcess.getInterpreterGroupName());
assertEquals(".//interpreter/spark", interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());
assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner());
assertEquals(3, interpreterProcess.getEnv().size());
assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
assertEquals("true", interpreterProcess.getEnv().get("SPARK_YARN_CLUSTER"));
assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
}
}

View file

@ -100,7 +100,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
}
@Test
public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
public void testAngularObjectInterpreterSideCRUD() throws InterruptedException, InterpreterException {
InterpreterResult ret = intp.interpret("get", context);
Thread.sleep(500); // waitFor eventpoller pool event
String[] result = ret.message().get(0).getData().split(" ");
@ -133,7 +133,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
}
@Test
public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException {
public void testAngularObjectRemovalOnZeppelinServerSide() throws InterruptedException, InterpreterException {
// test if angularobject removal from server side propagate to interpreter process's registry.
// will happen when notebook is removed.
@ -158,7 +158,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
}
@Test
public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException {
public void testAngularObjectAddOnZeppelinServerSide() throws InterruptedException, InterpreterException {
// test if angularobject add from server side propagate to interpreter process's registry.
// will happen when zeppelin server loads notebook and restore the object into registry

View file

@ -82,7 +82,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
}
@Test
public void testInterpreterResultOnly() {
public void testInterpreterResultOnly() throws InterpreterException {
RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
InterpreterResult ret = intp.interpret("SUCCESS::staticresult", createInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
@ -98,7 +98,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
}
@Test
public void testInterpreterOutputStreamOnly() {
public void testInterpreterOutputStreamOnly() throws InterpreterException {
RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
InterpreterResult ret = intp.interpret("SUCCESS:streamresult:", createInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
@ -110,7 +110,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
}
@Test
public void testInterpreterResultOutputStreamMixed() {
public void testInterpreterResultOutputStreamMixed() throws InterpreterException {
RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
InterpreterResult ret = intp.interpret("SUCCESS:stream:static", createInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
@ -119,7 +119,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
}
@Test
public void testOutputType() {
public void testOutputType() throws InterpreterException {
RemoteInterpreter intp = (RemoteInterpreter) interpreterSetting.getDefaultInterpreter("user1", "note1");
InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());

View file

@ -81,7 +81,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testSharedMode() {
public void testSharedMode() throws InterpreterException, IOException {
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
@ -124,7 +124,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testScopedMode() {
public void testScopedMode() throws InterpreterException, IOException {
interpreterSetting.getOption().setPerUser(InterpreterOption.SCOPED);
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
@ -170,7 +170,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testIsolatedMode() {
public void testIsolatedMode() throws InterpreterException, IOException {
interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED);
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
@ -217,7 +217,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testExecuteIncorrectPrecode() throws TTransportException, IOException {
public void testExecuteIncorrectPrecode() throws TTransportException, IOException, InterpreterException {
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
interpreterSetting.setProperty("zeppelin.SleepInterpreter.precode", "fail test");
Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
@ -228,7 +228,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testExecuteCorrectPrecode() throws TTransportException, IOException {
public void testExecuteCorrectPrecode() throws TTransportException, IOException, InterpreterException {
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
interpreterSetting.setProperty("zeppelin.SleepInterpreter.precode", "1");
Interpreter interpreter1 = interpreterSetting.getInterpreter("user1", "note1", "sleep");
@ -239,7 +239,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testRemoteInterperterErrorStatus() throws TTransportException, IOException {
public void testRemoteInterperterErrorStatus() throws TTransportException, IOException, InterpreterException {
interpreterSetting.setProperty("zeppelin.interpreter.echo.fail", "true");
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
@ -254,7 +254,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testFIFOScheduler() throws InterruptedException {
public void testFIFOScheduler() throws InterruptedException, InterpreterException {
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
// by default SleepInterpreter would use FIFOScheduler
@ -268,13 +268,23 @@ public class RemoteInterpreterTest {
Thread thread1 = new Thread() {
@Override
public void run() {
assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
try {
assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
} catch (InterpreterException e) {
e.printStackTrace();
fail();
}
}
};
Thread thread2 = new Thread() {
@Override
public void run() {
assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
try {
assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
} catch (InterpreterException e) {
e.printStackTrace();
fail();
}
}
};
long start = System.currentTimeMillis();
@ -287,7 +297,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testParallelScheduler() throws InterruptedException {
public void testParallelScheduler() throws InterruptedException, InterpreterException {
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
interpreterSetting.setProperty("zeppelin.SleepInterpreter.parallel", "true");
@ -302,13 +312,23 @@ public class RemoteInterpreterTest {
Thread thread1 = new Thread() {
@Override
public void run() {
assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
try {
assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
} catch (InterpreterException e) {
e.printStackTrace();
fail();
}
}
};
Thread thread2 = new Thread() {
@Override
public void run() {
assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
try {
assertEquals(Code.SUCCESS, interpreter1.interpret("100", context1).code());
} catch (InterpreterException e) {
e.printStackTrace();
fail();
}
}
};
long start = System.currentTimeMillis();
@ -376,7 +396,7 @@ public class RemoteInterpreterTest {
}
@Test
public void testEnvironmentAndProperty() {
public void testEnvironmentAndProperty() throws InterpreterException {
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
interpreterSetting.setProperty("ENV_1", "VALUE_1");
interpreterSetting.setProperty("property_1", "value_1");

View file

@ -51,8 +51,9 @@ public class MockInterpreterA extends Interpreter {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
if (property.containsKey("progress")) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
if (getProperties().containsKey("progress")) {
context.setProgress(Integer.parseInt(getProperty("progress")));
}
try {

View file

@ -52,7 +52,8 @@ public class MockInterpreterOutputStream extends Interpreter {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
public InterpreterResult interpret(String st, InterpreterContext context)
throws InterpreterException {
String[] ret = st.split(":");
try {
if (ret[1] != null) {

View file

@ -23,8 +23,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
@ -32,9 +30,6 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
@ -667,7 +662,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
@Test
public void testAngularObjectRemovalOnInterpreterRestart() throws InterruptedException,
IOException {
IOException, InterpreterException {
// create a note and a paragraph
Note note = notebook.createNote(anonymous);
interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds());
@ -795,7 +790,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
@Test
public void testAbortParagraphStatusOnInterpreterRestart() throws InterruptedException,
IOException {
IOException, InterpreterException {
Note note = notebook.createNote(anonymous);
interpreterSettingManager.setInterpreterBinding(anonymous.getUser(), note.getId(), interpreterSettingManager.getInterpreterSettingIds());
@ -829,7 +824,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
}
@Test
public void testPerSessionInterpreterCloseOnNoteRemoval() throws IOException {
public void testPerSessionInterpreterCloseOnNoteRemoval() throws IOException, InterpreterException {
// create a notes
Note note1 = notebook.createNote(anonymous);
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@ -861,7 +856,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
}
@Test
public void testPerSessionInterpreter() throws IOException {
public void testPerSessionInterpreter() throws IOException, InterpreterException {
// create two notes
Note note1 = notebook.createNote(anonymous);
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@ -905,7 +900,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
@Test
public void testPerNoteSessionInterpreter() throws IOException {
public void testPerNoteSessionInterpreter() throws IOException, InterpreterException {
// create two notes
Note note1 = notebook.createNote(anonymous);
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
@ -964,7 +959,7 @@ public class NotebookTest extends AbstractInterpreterTest implements JobListener
}
@Test
public void testPerSessionInterpreterCloseOnUnbindInterpreterSetting() throws IOException {
public void testPerSessionInterpreterCloseOnUnbindInterpreterSetting() throws IOException, InterpreterException {
// create a notes
Note note1 = notebook.createNote(anonymous);
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);

View file

@ -79,7 +79,7 @@ public class DistributedResourcePoolTest extends AbstractInterpreterTest {
}
@Test
public void testRemoteDistributedResourcePool() {
public void testRemoteDistributedResourcePool() throws InterpreterException {
Gson gson = new Gson();
InterpreterResult ret;
intp1.interpret("put key1 value1", context);
@ -162,7 +162,7 @@ public class DistributedResourcePoolTest extends AbstractInterpreterTest {
}
@Test
public void testResourcePoolUtils() {
public void testResourcePoolUtils() throws InterpreterException {
Gson gson = new Gson();
InterpreterResult ret;
@ -201,7 +201,7 @@ public class DistributedResourcePoolTest extends AbstractInterpreterTest {
}
@Test
public void testResourceInvokeMethod() {
public void testResourceInvokeMethod() throws InterpreterException {
Gson gson = new Gson();
InterpreterResult ret;
intp1.interpret("put key1 hey", context);

View file

@ -20,6 +20,7 @@ package org.apache.zeppelin.scheduler;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterInfo;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterResult;
@ -209,7 +210,11 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
@Override
protected boolean jobAbort() {
if (isRunning()) {
intpA.cancel(context);
try {
intpA.cancel(context);
} catch (InterpreterException e) {
e.printStackTrace();
}
}
return true;
}
@ -259,7 +264,11 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
@Override
protected boolean jobAbort() {
if (isRunning()) {
intpA.cancel(context);
try {
intpA.cancel(context);
} catch (InterpreterException e) {
e.printStackTrace();
}
}
return true;
}