mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Implement z.run()
This commit is contained in:
parent
08990115dc
commit
dac416d26c
22 changed files with 544 additions and 83 deletions
|
|
@ -25,6 +25,8 @@ import java.io.PrintStream;
|
|||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
|
|
@ -35,6 +37,8 @@ import org.apache.zeppelin.display.AngularObjectWatcher;
|
|||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input.ParamOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.spark.dep.DependencyResolver;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
|
@ -66,12 +70,6 @@ public class ZeppelinContext extends HashMap<String, Object> {
|
|||
public HiveContext hiveContext;
|
||||
private GUI gui;
|
||||
|
||||
/* spark-1.3
|
||||
public SchemaRDD sql(String sql) {
|
||||
return sqlContext.sql(sql);
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Load dependency for interpreter and runtime (driver).
|
||||
* And distribute them to spark cluster (sc.add())
|
||||
|
|
@ -224,25 +222,6 @@ public class ZeppelinContext extends HashMap<String, Object> {
|
|||
this.gui = o;
|
||||
}
|
||||
|
||||
public void run(String lines) {
|
||||
/*
|
||||
String intpName = Paragraph.getRequiredReplName(lines);
|
||||
String scriptBody = Paragraph.getScriptBody(lines);
|
||||
Interpreter intp = interpreterContext.getParagraph().getRepl(intpName);
|
||||
InterpreterResult ret = intp.interpret(scriptBody, interpreterContext);
|
||||
if (ret.code() == InterpreterResult.Code.SUCCESS) {
|
||||
out.println("%" + ret.type().toString().toLowerCase() + " " + ret.message());
|
||||
} else if (ret.code() == InterpreterResult.Code.ERROR) {
|
||||
out.println("Error: " + ret.message());
|
||||
} else if (ret.code() == InterpreterResult.Code.INCOMPLETE) {
|
||||
out.println("Incomplete");
|
||||
} else {
|
||||
out.println("Unknown error");
|
||||
}
|
||||
*/
|
||||
throw new RuntimeException("Missing implementation");
|
||||
}
|
||||
|
||||
private void restartInterpreter() {
|
||||
}
|
||||
|
||||
|
|
@ -254,6 +233,86 @@ public class ZeppelinContext extends HashMap<String, Object> {
|
|||
this.interpreterContext = interpreterContext;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph by id
|
||||
* @param id
|
||||
*/
|
||||
public void run(String id) {
|
||||
if (id.equals(interpreterContext.getParagraphId())) {
|
||||
throw new InterpreterException("Can not run current Paragraph");
|
||||
}
|
||||
|
||||
for (InterpreterContextRunner r : interpreterContext.getRunners()) {
|
||||
if (id.equals(r.getParagraphId())) {
|
||||
r.run();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
throw new InterpreterException("Paragraph " + id + " not found");
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph at index
|
||||
* @param idx index starting from 0
|
||||
*/
|
||||
public void run(int idx) {
|
||||
if (idx >= interpreterContext.getRunners().size()) {
|
||||
throw new InterpreterException("Index out of bound");
|
||||
}
|
||||
|
||||
InterpreterContextRunner runner = interpreterContext.getRunners().get(idx);
|
||||
if (runner.getParagraphId().equals(interpreterContext.getParagraphId())) {
|
||||
throw new InterpreterException("Can not run current Paragraph");
|
||||
}
|
||||
|
||||
runner.run();
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraphs
|
||||
* @param paragraphIdOrIdxs list of paragraph id or idx
|
||||
*/
|
||||
public void run(List<Object> paragraphIdOrIdx) {
|
||||
for (Object idOrIdx : paragraphIdOrIdx) {
|
||||
if (idOrIdx instanceof String) {
|
||||
String id = (String) idOrIdx;
|
||||
run(id);
|
||||
} else if (idOrIdx instanceof Integer) {
|
||||
Integer idx = (Integer) idOrIdx;
|
||||
run(idx);
|
||||
} else {
|
||||
throw new InterpreterException("Paragraph " + idOrIdx + " not found");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Run all paragraphs. except this.
|
||||
*/
|
||||
public void runAll() {
|
||||
for (InterpreterContextRunner r : interpreterContext.getRunners()) {
|
||||
if (r.getParagraphId().equals(interpreterContext.getParagraphId())) {
|
||||
// skip itself
|
||||
continue;
|
||||
}
|
||||
r.run();
|
||||
}
|
||||
}
|
||||
|
||||
public List<String> listParagraphs() {
|
||||
List<String> paragraphs = new LinkedList<String>();
|
||||
|
||||
for (InterpreterContextRunner r : interpreterContext.getRunners()) {
|
||||
paragraphs.add(r.getParagraphId());
|
||||
}
|
||||
|
||||
return paragraphs;
|
||||
}
|
||||
|
||||
|
||||
|
||||
public Object angular(String name) {
|
||||
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
|
||||
AngularObject ao = registry.get(name);
|
||||
|
|
|
|||
|
|
@ -21,16 +21,16 @@ import static org.junit.Assert.assertEquals;
|
|||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.spark.DepInterpreter;
|
||||
import org.apache.zeppelin.spark.SparkInterpreter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
@ -59,7 +59,8 @@ public class DepInterpreterTest {
|
|||
dep.setInterpreterGroup(intpGroup);
|
||||
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>());
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
|
|
@ -22,15 +22,16 @@ import static org.junit.Assert.assertTrue;
|
|||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.spark.SparkInterpreter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.FixMethodOrder;
|
||||
|
|
@ -59,7 +60,8 @@ public class SparkInterpreterTest {
|
|||
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>());
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
|
|
@ -20,16 +20,16 @@ package org.apache.zeppelin.spark;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.spark.SparkInterpreter;
|
||||
import org.apache.zeppelin.spark.SparkSqlInterpreter;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
@ -64,7 +64,8 @@ public class SparkSqlInterpreterTest {
|
|||
sql.open();
|
||||
}
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>());
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
|
|
@ -32,14 +33,15 @@ public class InterpreterContext {
|
|||
private final Map<String, Object> config;
|
||||
private GUI gui;
|
||||
private AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
private List<InterpreterContextRunner> runners;
|
||||
|
||||
public InterpreterContext(String paragraphId,
|
||||
String paragraphTitle,
|
||||
String paragraphText,
|
||||
Map<String, Object> config,
|
||||
GUI gui,
|
||||
AngularObjectRegistry angularObjectRegistry
|
||||
AngularObjectRegistry angularObjectRegistry,
|
||||
List<InterpreterContextRunner> runners
|
||||
) {
|
||||
this.paragraphId = paragraphId;
|
||||
this.paragraphTitle = paragraphTitle;
|
||||
|
|
@ -47,6 +49,7 @@ public class InterpreterContext {
|
|||
this.config = config;
|
||||
this.gui = gui;
|
||||
this.angularObjectRegistry = angularObjectRegistry;
|
||||
this.runners = runners;
|
||||
}
|
||||
|
||||
public String getParagraphId() {
|
||||
|
|
@ -73,4 +76,8 @@ public class InterpreterContext {
|
|||
return angularObjectRegistry;
|
||||
}
|
||||
|
||||
public List<InterpreterContextRunner> getRunners() {
|
||||
return runners;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class InterpreterContextRunner implements Runnable {
|
||||
String noteId;
|
||||
private String paragraphId;
|
||||
|
||||
public InterpreterContextRunner(String noteId, String paragraphId) {
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o instanceof InterpreterContextRunner) {
|
||||
InterpreterContextRunner io = ((InterpreterContextRunner) o);
|
||||
if (io.getParagraphId().equals(paragraphId) &&
|
||||
io.getNoteId().equals(noteId)) {
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract void run();
|
||||
|
||||
public String getNoteId() {
|
||||
return noteId;
|
||||
}
|
||||
|
||||
public String getParagraphId() {
|
||||
return paragraphId;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class InterpreterContextRunnerPool {
|
||||
Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class);
|
||||
private Map<String, List<InterpreterContextRunner>> interpreterContextRunners;
|
||||
|
||||
public InterpreterContextRunnerPool() {
|
||||
interpreterContextRunners = new HashMap<String, List<InterpreterContextRunner>>();
|
||||
|
||||
}
|
||||
|
||||
// add runner
|
||||
public void add(String noteId, InterpreterContextRunner runner) {
|
||||
synchronized (interpreterContextRunners) {
|
||||
if (!interpreterContextRunners.containsKey(noteId)) {
|
||||
interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
|
||||
}
|
||||
|
||||
interpreterContextRunners.get(noteId).add(runner);
|
||||
}
|
||||
}
|
||||
|
||||
// replace all runners to noteId
|
||||
public void addAll(String noteId, List<InterpreterContextRunner> runners) {
|
||||
synchronized (interpreterContextRunners) {
|
||||
if (!interpreterContextRunners.containsKey(noteId)) {
|
||||
interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
|
||||
}
|
||||
|
||||
interpreterContextRunners.get(noteId).addAll(runners);
|
||||
}
|
||||
}
|
||||
|
||||
public void clear(String noteId) {
|
||||
synchronized (interpreterContextRunners) {
|
||||
interpreterContextRunners.remove(noteId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void run(String noteId, String paragraphId) {
|
||||
synchronized (interpreterContextRunners) {
|
||||
List<InterpreterContextRunner> list = interpreterContextRunners.get(noteId);
|
||||
if (list != null) {
|
||||
for (InterpreterContextRunner r : list) {
|
||||
if (noteId.equals(r.getNoteId()) && paragraphId.equals(r.getParagraphId())) {
|
||||
logger.info("run paragraph {} on note {} from InterpreterContext",
|
||||
r.getParagraphId(), r.getNoteId());
|
||||
r.run();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -26,6 +26,7 @@ import org.apache.thrift.TException;
|
|||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
|
|
@ -56,6 +57,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
static Map<String, RemoteInterpreterProcess> interpreterGroupReference
|
||||
= new HashMap<String, RemoteInterpreterProcess>();
|
||||
|
||||
private InterpreterContextRunnerPool interpreterContextRunnerPool;
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
|
|
@ -67,6 +70,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.interpreterRunner = interpreterRunner;
|
||||
this.interpreterPath = interpreterPath;
|
||||
env = new HashMap<String, String>();
|
||||
interpreterContextRunnerPool = new InterpreterContextRunnerPool();
|
||||
}
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
|
|
@ -187,6 +191,15 @@ public class RemoteInterpreter extends Interpreter {
|
|||
throw new InterpreterException(e1);
|
||||
}
|
||||
|
||||
List<InterpreterContextRunner> runners = context.getRunners();
|
||||
if (runners != null && runners.size() != 0) {
|
||||
// assume all runners in this InterpreterContext have the same note id
|
||||
String noteId = runners.get(0).getNoteId();
|
||||
|
||||
interpreterContextRunnerPool.clear(noteId);
|
||||
interpreterContextRunnerPool.addAll(noteId, runners);
|
||||
}
|
||||
|
||||
try {
|
||||
GUI settings = context.getGui();
|
||||
RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
|
||||
|
|
@ -316,7 +329,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
.containsKey(getInterpreterGroupKey(interpreterGroup))) {
|
||||
interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup),
|
||||
new RemoteInterpreterProcess(interpreterRunner,
|
||||
interpreterPath, env));
|
||||
interpreterPath, env, interpreterContextRunnerPool));
|
||||
|
||||
logger.info("setInterpreterGroup = "
|
||||
+ getInterpreterGroupKey(interpreterGroup) + " class=" + className
|
||||
|
|
@ -335,7 +348,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
ic.getParagraphTitle(),
|
||||
ic.getParagraphText(),
|
||||
gson.toJson(ic.getConfig()),
|
||||
gson.toJson(ic.getGui()));
|
||||
gson.toJson(ic.getGui()),
|
||||
gson.toJson(ic.getRunners()));
|
||||
}
|
||||
|
||||
private InterpreterResult convert(RemoteInterpreterResult result) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class RemoteInterpreterContextRunner extends InterpreterContextRunner {
|
||||
|
||||
public RemoteInterpreterContextRunner(String noteId, String paragraphId) {
|
||||
super(noteId, paragraphId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// this class should be used only for gson deserialize abstract class
|
||||
// code should not reach here
|
||||
throw new InterpreterException("Assert");
|
||||
}
|
||||
}
|
||||
|
|
@ -18,16 +18,17 @@
|
|||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -103,6 +104,12 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
|
||||
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
|
||||
angularObjectRegistry.remove(angularObject.getName());
|
||||
} else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
|
||||
InterpreterContextRunner runnerFromRemote = gson.fromJson(
|
||||
event.getData(), RemoteInterpreterContextRunner.class);
|
||||
|
||||
interpreterProcess.getInterpreterContextRunnerPool().run(
|
||||
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Can't handle event " + event, e);
|
||||
|
|
|
|||
|
|
@ -28,10 +28,10 @@ import org.apache.commons.exec.ExecuteResultHandler;
|
|||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPool;
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.apache.thrift.TException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -53,11 +53,16 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
private GenericObjectPool<Client> clientPool;
|
||||
private Map<String, String> env;
|
||||
private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
|
||||
private InterpreterContextRunnerPool interpreterContextRunnerPool;
|
||||
|
||||
public RemoteInterpreterProcess(String intpRunner, String intpDir, Map<String, String> env) {
|
||||
public RemoteInterpreterProcess(String intpRunner,
|
||||
String intpDir,
|
||||
Map<String, String> env,
|
||||
InterpreterContextRunnerPool interpreterContextRunnerPool) {
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.interpreterDir = intpDir;
|
||||
this.env = env;
|
||||
this.interpreterContextRunnerPool = interpreterContextRunnerPool;
|
||||
referenceCount = new AtomicInteger(0);
|
||||
}
|
||||
|
||||
|
|
@ -242,4 +247,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
releaseClient(client);
|
||||
}
|
||||
}
|
||||
|
||||
public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
|
||||
return interpreterContextRunnerPool;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,22 +37,23 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
|||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.FormType;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.FormType;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
import org.apache.zeppelin.scheduler.JobProgressPoller;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -69,7 +70,6 @@ public class RemoteInterpreterServer
|
|||
|
||||
InterpreterGroup interpreterGroup;
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
|
||||
|
|
@ -314,6 +314,14 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
private InterpreterContext convert(RemoteInterpreterContext ric) {
|
||||
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
|
||||
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
|
||||
new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
|
||||
|
||||
for (InterpreterContextRunner r : runners) {
|
||||
contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
|
||||
}
|
||||
|
||||
return new InterpreterContext(
|
||||
ric.getParagraphId(),
|
||||
ric.getParagraphTitle(),
|
||||
|
|
@ -321,7 +329,27 @@ public class RemoteInterpreterServer
|
|||
(Map<String, Object>) gson.fromJson(ric.getConfig(),
|
||||
new TypeToken<Map<String, Object>>() {}.getType()),
|
||||
gson.fromJson(ric.getGui(), GUI.class),
|
||||
interpreterGroup.getAngularObjectRegistry());
|
||||
interpreterGroup.getAngularObjectRegistry(),
|
||||
contextRunners);
|
||||
}
|
||||
|
||||
|
||||
static class ParagraphRunner extends InterpreterContextRunner {
|
||||
|
||||
private transient RemoteInterpreterServer server;
|
||||
|
||||
public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) {
|
||||
super(noteId, paragraphId);
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Gson gson = new Gson();
|
||||
server.sendEvent(new RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
|
||||
gson.toJson(this)));
|
||||
}
|
||||
}
|
||||
|
||||
private RemoteInterpreterResult convert(InterpreterResult result,
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
private static final org.apache.thrift.protocol.TField PARAGRAPH_TEXT_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphText", org.apache.thrift.protocol.TType.STRING, (short)3);
|
||||
private static final org.apache.thrift.protocol.TField CONFIG_FIELD_DESC = new org.apache.thrift.protocol.TField("config", org.apache.thrift.protocol.TType.STRING, (short)4);
|
||||
private static final org.apache.thrift.protocol.TField GUI_FIELD_DESC = new org.apache.thrift.protocol.TField("gui", org.apache.thrift.protocol.TType.STRING, (short)5);
|
||||
private static final org.apache.thrift.protocol.TField RUNNERS_FIELD_DESC = new org.apache.thrift.protocol.TField("runners", org.apache.thrift.protocol.TType.STRING, (short)6);
|
||||
|
||||
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
|
||||
static {
|
||||
|
|
@ -50,6 +51,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
public String paragraphText; // required
|
||||
public String config; // required
|
||||
public String gui; // required
|
||||
public String runners; // required
|
||||
|
||||
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
|
||||
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
|
||||
|
|
@ -57,7 +59,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
PARAGRAPH_TITLE((short)2, "paragraphTitle"),
|
||||
PARAGRAPH_TEXT((short)3, "paragraphText"),
|
||||
CONFIG((short)4, "config"),
|
||||
GUI((short)5, "gui");
|
||||
GUI((short)5, "gui"),
|
||||
RUNNERS((short)6, "runners");
|
||||
|
||||
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
|
||||
|
||||
|
|
@ -82,6 +85,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
return CONFIG;
|
||||
case 5: // GUI
|
||||
return GUI;
|
||||
case 6: // RUNNERS
|
||||
return RUNNERS;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
|
@ -135,6 +140,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
tmpMap.put(_Fields.GUI, new org.apache.thrift.meta_data.FieldMetaData("gui", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
tmpMap.put(_Fields.RUNNERS, new org.apache.thrift.meta_data.FieldMetaData("runners", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
metaDataMap = Collections.unmodifiableMap(tmpMap);
|
||||
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteInterpreterContext.class, metaDataMap);
|
||||
}
|
||||
|
|
@ -147,7 +154,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
String paragraphTitle,
|
||||
String paragraphText,
|
||||
String config,
|
||||
String gui)
|
||||
String gui,
|
||||
String runners)
|
||||
{
|
||||
this();
|
||||
this.paragraphId = paragraphId;
|
||||
|
|
@ -155,6 +163,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
this.paragraphText = paragraphText;
|
||||
this.config = config;
|
||||
this.gui = gui;
|
||||
this.runners = runners;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -176,6 +185,9 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
if (other.isSetGui()) {
|
||||
this.gui = other.gui;
|
||||
}
|
||||
if (other.isSetRunners()) {
|
||||
this.runners = other.runners;
|
||||
}
|
||||
}
|
||||
|
||||
public RemoteInterpreterContext deepCopy() {
|
||||
|
|
@ -189,6 +201,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
this.paragraphText = null;
|
||||
this.config = null;
|
||||
this.gui = null;
|
||||
this.runners = null;
|
||||
}
|
||||
|
||||
public String getParagraphId() {
|
||||
|
|
@ -311,6 +324,30 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
}
|
||||
}
|
||||
|
||||
public String getRunners() {
|
||||
return this.runners;
|
||||
}
|
||||
|
||||
public RemoteInterpreterContext setRunners(String runners) {
|
||||
this.runners = runners;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetRunners() {
|
||||
this.runners = null;
|
||||
}
|
||||
|
||||
/** Returns true if field runners is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetRunners() {
|
||||
return this.runners != null;
|
||||
}
|
||||
|
||||
public void setRunnersIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.runners = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setFieldValue(_Fields field, Object value) {
|
||||
switch (field) {
|
||||
case PARAGRAPH_ID:
|
||||
|
|
@ -353,6 +390,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
}
|
||||
break;
|
||||
|
||||
case RUNNERS:
|
||||
if (value == null) {
|
||||
unsetRunners();
|
||||
} else {
|
||||
setRunners((String)value);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -373,6 +418,9 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
case GUI:
|
||||
return getGui();
|
||||
|
||||
case RUNNERS:
|
||||
return getRunners();
|
||||
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
|
@ -394,6 +442,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
return isSetConfig();
|
||||
case GUI:
|
||||
return isSetGui();
|
||||
case RUNNERS:
|
||||
return isSetRunners();
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
|
@ -456,6 +506,15 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
return false;
|
||||
}
|
||||
|
||||
boolean this_present_runners = true && this.isSetRunners();
|
||||
boolean that_present_runners = true && that.isSetRunners();
|
||||
if (this_present_runners || that_present_runners) {
|
||||
if (!(this_present_runners && that_present_runners))
|
||||
return false;
|
||||
if (!this.runners.equals(that.runners))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -522,6 +581,16 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
return lastComparison;
|
||||
}
|
||||
}
|
||||
lastComparison = Boolean.valueOf(isSetRunners()).compareTo(typedOther.isSetRunners());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetRunners()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.runners, typedOther.runners);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -581,6 +650,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
sb.append(this.gui);
|
||||
}
|
||||
first = false;
|
||||
if (!first) sb.append(", ");
|
||||
sb.append("runners:");
|
||||
if (this.runners == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.runners);
|
||||
}
|
||||
first = false;
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
|
@ -664,6 +741,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
case 6: // RUNNERS
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
|
||||
struct.runners = iprot.readString();
|
||||
struct.setRunnersIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
|
|
@ -704,6 +789,11 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
oprot.writeString(struct.gui);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
if (struct.runners != null) {
|
||||
oprot.writeFieldBegin(RUNNERS_FIELD_DESC);
|
||||
oprot.writeString(struct.runners);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
oprot.writeFieldStop();
|
||||
oprot.writeStructEnd();
|
||||
}
|
||||
|
|
@ -737,7 +827,10 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
if (struct.isSetGui()) {
|
||||
optionals.set(4);
|
||||
}
|
||||
oprot.writeBitSet(optionals, 5);
|
||||
if (struct.isSetRunners()) {
|
||||
optionals.set(5);
|
||||
}
|
||||
oprot.writeBitSet(optionals, 6);
|
||||
if (struct.isSetParagraphId()) {
|
||||
oprot.writeString(struct.paragraphId);
|
||||
}
|
||||
|
|
@ -753,12 +846,15 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
if (struct.isSetGui()) {
|
||||
oprot.writeString(struct.gui);
|
||||
}
|
||||
if (struct.isSetRunners()) {
|
||||
oprot.writeString(struct.runners);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterContext struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol iprot = (TTupleProtocol) prot;
|
||||
BitSet incoming = iprot.readBitSet(5);
|
||||
BitSet incoming = iprot.readBitSet(6);
|
||||
if (incoming.get(0)) {
|
||||
struct.paragraphId = iprot.readString();
|
||||
struct.setParagraphIdIsSet(true);
|
||||
|
|
@ -779,6 +875,10 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
|
|||
struct.gui = iprot.readString();
|
||||
struct.setGuiIsSet(true);
|
||||
}
|
||||
if (incoming.get(5)) {
|
||||
struct.runners = iprot.readString();
|
||||
struct.setRunnersIsSet(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
|||
NO_OP(1),
|
||||
ANGULAR_OBJECT_ADD(2),
|
||||
ANGULAR_OBJECT_UPDATE(3),
|
||||
ANGULAR_OBJECT_REMOVE(4);
|
||||
ANGULAR_OBJECT_REMOVE(4),
|
||||
RUN_INTERPRETER_CONTEXT_RUNNER(5);
|
||||
|
||||
private final int value;
|
||||
|
||||
|
|
@ -44,6 +45,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
|||
return ANGULAR_OBJECT_UPDATE;
|
||||
case 4:
|
||||
return ANGULAR_OBJECT_REMOVE;
|
||||
case 5:
|
||||
return RUN_INTERPRETER_CONTEXT_RUNNER;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,8 @@ struct RemoteInterpreterContext {
|
|||
2: string paragraphTitle,
|
||||
3: string paragraphText,
|
||||
4: string config, // json serialized config
|
||||
5: string gui // json serialized gui
|
||||
5: string gui, // json serialized gui
|
||||
6: string runners // json serialized runner
|
||||
}
|
||||
|
||||
struct RemoteInterpreterResult {
|
||||
|
|
@ -39,7 +40,8 @@ enum RemoteInterpreterEventType {
|
|||
NO_OP = 1,
|
||||
ANGULAR_OBJECT_ADD = 2,
|
||||
ANGULAR_OBJECT_UPDATE = 3,
|
||||
ANGULAR_OBJECT_REMOVE = 4
|
||||
ANGULAR_OBJECT_REMOVE = 4,
|
||||
RUN_INTERPRETER_CONTEXT_RUNNER = 5
|
||||
}
|
||||
|
||||
struct RemoteInterpreterEvent {
|
||||
|
|
|
|||
|
|
@ -21,21 +21,22 @@ import static org.junit.Assert.assertEquals;
|
|||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
|
||||
private InterpreterGroup intpGroup;
|
||||
|
|
@ -79,7 +80,8 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>());
|
||||
|
||||
intp.open();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
|
|||
import java.util.HashMap;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.junit.Test;
|
||||
|
||||
|
|
@ -32,7 +31,8 @@ public class RemoteInterpreterProcessTest {
|
|||
@Test
|
||||
public void testStartStop() {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
|
||||
new InterpreterContextRunnerPool());
|
||||
assertFalse(rip.isRunning());
|
||||
assertEquals(0, rip.referenceCount());
|
||||
assertEquals(1, rip.reference(intpGroup));
|
||||
|
|
@ -47,7 +47,8 @@ public class RemoteInterpreterProcessTest {
|
|||
@Test
|
||||
public void testClientFactory() throws Exception {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
|
||||
new InterpreterContextRunnerPool());
|
||||
rip.reference(intpGroup);
|
||||
assertEquals(0, rip.getNumActiveClient());
|
||||
assertEquals(0, rip.getNumIdleClient());
|
||||
|
|
|
|||
|
|
@ -33,10 +33,9 @@ import org.apache.thrift.transport.TTransportException;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
|
|
@ -111,7 +110,8 @@ public class RemoteInterpreterTest {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>()));
|
||||
|
||||
intpB.open();
|
||||
assertEquals(2, process.referenceCount());
|
||||
|
|
@ -162,7 +162,8 @@ public class RemoteInterpreterTest {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>()));
|
||||
assertEquals("500", ret.message());
|
||||
|
||||
ret = intpB.interpret("500",
|
||||
|
|
@ -172,7 +173,8 @@ public class RemoteInterpreterTest {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>()));
|
||||
assertEquals("1000", ret.message());
|
||||
long end = System.currentTimeMillis();
|
||||
assertTrue(end - start >= 1000);
|
||||
|
|
@ -236,7 +238,8 @@ public class RemoteInterpreterTest {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -268,7 +271,8 @@ public class RemoteInterpreterTest {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>()));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -340,7 +344,8 @@ public class RemoteInterpreterTest {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>()));
|
||||
|
||||
synchronized (results) {
|
||||
results.add(ret.message());
|
||||
|
|
@ -421,7 +426,8 @@ public class RemoteInterpreterTest {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>()));
|
||||
|
||||
synchronized (results) {
|
||||
results.add(ret.message());
|
||||
|
|
|
|||
|
|
@ -21,18 +21,17 @@ import static org.junit.Assert.assertEquals;
|
|||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
|
@ -95,7 +94,8 @@ public class RemoteSchedulerTest {
|
|||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
new AngularObjectRegistry(intpGroup.getId(), null),
|
||||
new LinkedList<InterpreterContextRunner>()));
|
||||
return "1000";
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
|
|
@ -36,13 +35,14 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
|||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.notebook.utility.IdHashes;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -126,7 +126,7 @@ public class Note implements Serializable, JobListener {
|
|||
* @param p
|
||||
*/
|
||||
public Paragraph addParagraph() {
|
||||
Paragraph p = new Paragraph(this, replLoader);
|
||||
Paragraph p = new Paragraph(this, this, replLoader);
|
||||
synchronized (paragraphs) {
|
||||
paragraphs.add(p);
|
||||
}
|
||||
|
|
@ -140,7 +140,7 @@ public class Note implements Serializable, JobListener {
|
|||
* @param p
|
||||
*/
|
||||
public Paragraph insertParagraph(int index) {
|
||||
Paragraph p = new Paragraph(this, replLoader);
|
||||
Paragraph p = new Paragraph(this, this, replLoader);
|
||||
synchronized (paragraphs) {
|
||||
paragraphs.add(index, p);
|
||||
}
|
||||
|
|
@ -342,6 +342,8 @@ public class Note implements Serializable, JobListener {
|
|||
note.setReplLoader(replLoader);
|
||||
note.jobListenerFactory = jobListenerFactory;
|
||||
for (Paragraph p : note.paragraphs) {
|
||||
p.setNote(note);
|
||||
|
||||
if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
|
||||
p.setStatus(Status.ABORT);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.notebook;
|
|||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
|
@ -27,10 +28,11 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
|
|||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.FormType;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.FormType;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -44,14 +46,16 @@ import org.slf4j.LoggerFactory;
|
|||
public class Paragraph extends Job implements Serializable {
|
||||
private static final transient long serialVersionUID = -6328572073497992016L;
|
||||
private transient NoteInterpreterLoader replLoader;
|
||||
private transient Note note;
|
||||
|
||||
String title;
|
||||
String text;
|
||||
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
|
||||
public final GUI settings; // form and parameter settings
|
||||
|
||||
public Paragraph(JobListener listener, NoteInterpreterLoader replLoader) {
|
||||
public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) {
|
||||
super(generateId(), listener);
|
||||
this.note = note;
|
||||
this.replLoader = replLoader;
|
||||
title = null;
|
||||
text = null;
|
||||
|
|
@ -81,6 +85,14 @@ public class Paragraph extends Job implements Serializable {
|
|||
this.title = title;
|
||||
}
|
||||
|
||||
public void setNote(Note note) {
|
||||
this.note = note;
|
||||
}
|
||||
|
||||
public Note getNote() {
|
||||
return note;
|
||||
}
|
||||
|
||||
public String getRequiredReplName() {
|
||||
return getRequiredReplName(text);
|
||||
}
|
||||
|
|
@ -216,15 +228,36 @@ public class Paragraph extends Job implements Serializable {
|
|||
registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry();
|
||||
}
|
||||
|
||||
List<InterpreterContextRunner> runners = new LinkedList<InterpreterContextRunner>();
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
runners.add(new ParagraphRunner(note, note.id(), p.getId()));
|
||||
}
|
||||
|
||||
InterpreterContext interpreterContext = new InterpreterContext(getId(),
|
||||
this.getTitle(),
|
||||
this.getText(),
|
||||
this.getConfig(),
|
||||
this.settings,
|
||||
registry);
|
||||
registry,
|
||||
runners);
|
||||
return interpreterContext;
|
||||
}
|
||||
|
||||
static class ParagraphRunner extends InterpreterContextRunner {
|
||||
private Note note;
|
||||
|
||||
public ParagraphRunner(Note note, String noteId, String paragraphId) {
|
||||
super(noteId, paragraphId);
|
||||
this.note = note;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
note.run(getParagraphId());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private Logger logger() {
|
||||
Logger logger = LoggerFactory.getLogger(Paragraph.class);
|
||||
return logger;
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ public class InterpreterFactoryTest {
|
|||
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
conf = new ZeppelinConfiguration();
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
|
||||
context = new InterpreterContext("id", "title", "text", null, null, null);
|
||||
context = new InterpreterContext("id", "title", "text", null, null, null, null);
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue