Implement z.run()

This commit is contained in:
Lee moon soo 2015-04-07 20:13:11 +09:00
parent 08990115dc
commit dac416d26c
22 changed files with 544 additions and 83 deletions

View file

@ -25,6 +25,8 @@ import java.io.PrintStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
@ -35,6 +37,8 @@ import org.apache.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.spark.dep.DependencyResolver;
import scala.Tuple2;
@ -66,12 +70,6 @@ public class ZeppelinContext extends HashMap<String, Object> {
public HiveContext hiveContext;
private GUI gui;
/* spark-1.3
public SchemaRDD sql(String sql) {
return sqlContext.sql(sql);
}
*/
/**
* Load dependency for interpreter and runtime (driver).
* And distribute them to spark cluster (sc.add())
@ -224,25 +222,6 @@ public class ZeppelinContext extends HashMap<String, Object> {
this.gui = o;
}
public void run(String lines) {
/*
String intpName = Paragraph.getRequiredReplName(lines);
String scriptBody = Paragraph.getScriptBody(lines);
Interpreter intp = interpreterContext.getParagraph().getRepl(intpName);
InterpreterResult ret = intp.interpret(scriptBody, interpreterContext);
if (ret.code() == InterpreterResult.Code.SUCCESS) {
out.println("%" + ret.type().toString().toLowerCase() + " " + ret.message());
} else if (ret.code() == InterpreterResult.Code.ERROR) {
out.println("Error: " + ret.message());
} else if (ret.code() == InterpreterResult.Code.INCOMPLETE) {
out.println("Incomplete");
} else {
out.println("Unknown error");
}
*/
throw new RuntimeException("Missing implementation");
}
private void restartInterpreter() {
}
@ -254,6 +233,86 @@ public class ZeppelinContext extends HashMap<String, Object> {
this.interpreterContext = interpreterContext;
}
/**
* Run paragraph by id
* @param id
*/
public void run(String id) {
if (id.equals(interpreterContext.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}
for (InterpreterContextRunner r : interpreterContext.getRunners()) {
if (id.equals(r.getParagraphId())) {
r.run();
return;
}
}
throw new InterpreterException("Paragraph " + id + " not found");
}
/**
* Run paragraph at index
* @param idx index starting from 0
*/
public void run(int idx) {
if (idx >= interpreterContext.getRunners().size()) {
throw new InterpreterException("Index out of bound");
}
InterpreterContextRunner runner = interpreterContext.getRunners().get(idx);
if (runner.getParagraphId().equals(interpreterContext.getParagraphId())) {
throw new InterpreterException("Can not run current Paragraph");
}
runner.run();
}
/**
* Run paragraphs
* @param paragraphIdOrIdxs list of paragraph id or idx
*/
public void run(List<Object> paragraphIdOrIdx) {
for (Object idOrIdx : paragraphIdOrIdx) {
if (idOrIdx instanceof String) {
String id = (String) idOrIdx;
run(id);
} else if (idOrIdx instanceof Integer) {
Integer idx = (Integer) idOrIdx;
run(idx);
} else {
throw new InterpreterException("Paragraph " + idOrIdx + " not found");
}
}
}
/**
* Run all paragraphs. except this.
*/
public void runAll() {
for (InterpreterContextRunner r : interpreterContext.getRunners()) {
if (r.getParagraphId().equals(interpreterContext.getParagraphId())) {
// skip itself
continue;
}
r.run();
}
}
public List<String> listParagraphs() {
List<String> paragraphs = new LinkedList<String>();
for (InterpreterContextRunner r : interpreterContext.getRunners()) {
paragraphs.add(r.getParagraphId());
}
return paragraphs;
}
public Object angular(String name) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
AngularObject ao = registry.get(name);

View file

@ -21,16 +21,16 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.spark.DepInterpreter;
import org.apache.zeppelin.spark.SparkInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -59,7 +59,8 @@ public class DepInterpreterTest {
dep.setInterpreterGroup(intpGroup);
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
}
@After

View file

@ -22,15 +22,16 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.spark.SparkInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.FixMethodOrder;
@ -59,7 +60,8 @@ public class SparkInterpreterTest {
InterpreterGroup intpGroup = new InterpreterGroup();
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
}
@After

View file

@ -20,16 +20,16 @@ package org.apache.zeppelin.spark;
import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.spark.SparkInterpreter;
import org.apache.zeppelin.spark.SparkSqlInterpreter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -64,7 +64,8 @@ public class SparkSqlInterpreterTest {
sql.open();
}
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
}
@After

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter;
import java.util.List;
import java.util.Map;
import org.apache.zeppelin.display.AngularObjectRegistry;
@ -32,14 +33,15 @@ public class InterpreterContext {
private final Map<String, Object> config;
private GUI gui;
private AngularObjectRegistry angularObjectRegistry;
private List<InterpreterContextRunner> runners;
public InterpreterContext(String paragraphId,
String paragraphTitle,
String paragraphText,
Map<String, Object> config,
GUI gui,
AngularObjectRegistry angularObjectRegistry
AngularObjectRegistry angularObjectRegistry,
List<InterpreterContextRunner> runners
) {
this.paragraphId = paragraphId;
this.paragraphTitle = paragraphTitle;
@ -47,6 +49,7 @@ public class InterpreterContext {
this.config = config;
this.gui = gui;
this.angularObjectRegistry = angularObjectRegistry;
this.runners = runners;
}
public String getParagraphId() {
@ -73,4 +76,8 @@ public class InterpreterContext {
return angularObjectRegistry;
}
public List<InterpreterContextRunner> getRunners() {
return runners;
}
}

View file

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
/**
*/
public abstract class InterpreterContextRunner implements Runnable {
String noteId;
private String paragraphId;
public InterpreterContextRunner(String noteId, String paragraphId) {
this.noteId = noteId;
this.paragraphId = paragraphId;
}
@Override
public boolean equals(Object o) {
if (o instanceof InterpreterContextRunner) {
InterpreterContextRunner io = ((InterpreterContextRunner) o);
if (io.getParagraphId().equals(paragraphId) &&
io.getNoteId().equals(noteId)) {
return true;
} else {
return false;
}
} else {
return false;
}
}
@Override
public abstract void run();
public String getNoteId() {
return noteId;
}
public String getParagraphId() {
return paragraphId;
}
}

View file

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class InterpreterContextRunnerPool {
Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class);
private Map<String, List<InterpreterContextRunner>> interpreterContextRunners;
public InterpreterContextRunnerPool() {
interpreterContextRunners = new HashMap<String, List<InterpreterContextRunner>>();
}
// add runner
public void add(String noteId, InterpreterContextRunner runner) {
synchronized (interpreterContextRunners) {
if (!interpreterContextRunners.containsKey(noteId)) {
interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
}
interpreterContextRunners.get(noteId).add(runner);
}
}
// replace all runners to noteId
public void addAll(String noteId, List<InterpreterContextRunner> runners) {
synchronized (interpreterContextRunners) {
if (!interpreterContextRunners.containsKey(noteId)) {
interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>());
}
interpreterContextRunners.get(noteId).addAll(runners);
}
}
public void clear(String noteId) {
synchronized (interpreterContextRunners) {
interpreterContextRunners.remove(noteId);
}
}
public void run(String noteId, String paragraphId) {
synchronized (interpreterContextRunners) {
List<InterpreterContextRunner> list = interpreterContextRunners.get(noteId);
if (list != null) {
for (InterpreterContextRunner r : list) {
if (noteId.equals(r.getNoteId()) && paragraphId.equals(r.getParagraphId())) {
logger.info("run paragraph {} on note {} from InterpreterContext",
r.getParagraphId(), r.getNoteId());
r.run();
return;
}
}
}
throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId);
}
}
}

View file

@ -26,6 +26,7 @@ import org.apache.thrift.TException;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
@ -56,6 +57,8 @@ public class RemoteInterpreter extends Interpreter {
static Map<String, RemoteInterpreterProcess> interpreterGroupReference
= new HashMap<String, RemoteInterpreterProcess>();
private InterpreterContextRunnerPool interpreterContextRunnerPool;
public RemoteInterpreter(Properties property,
String className,
String interpreterRunner,
@ -67,6 +70,7 @@ public class RemoteInterpreter extends Interpreter {
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
env = new HashMap<String, String>();
interpreterContextRunnerPool = new InterpreterContextRunnerPool();
}
public RemoteInterpreter(Properties property,
@ -187,6 +191,15 @@ public class RemoteInterpreter extends Interpreter {
throw new InterpreterException(e1);
}
List<InterpreterContextRunner> runners = context.getRunners();
if (runners != null && runners.size() != 0) {
// assume all runners in this InterpreterContext have the same note id
String noteId = runners.get(0).getNoteId();
interpreterContextRunnerPool.clear(noteId);
interpreterContextRunnerPool.addAll(noteId, runners);
}
try {
GUI settings = context.getGui();
RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context));
@ -316,7 +329,7 @@ public class RemoteInterpreter extends Interpreter {
.containsKey(getInterpreterGroupKey(interpreterGroup))) {
interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup),
new RemoteInterpreterProcess(interpreterRunner,
interpreterPath, env));
interpreterPath, env, interpreterContextRunnerPool));
logger.info("setInterpreterGroup = "
+ getInterpreterGroupKey(interpreterGroup) + " class=" + className
@ -335,7 +348,8 @@ public class RemoteInterpreter extends Interpreter {
ic.getParagraphTitle(),
ic.getParagraphText(),
gson.toJson(ic.getConfig()),
gson.toJson(ic.getGui()));
gson.toJson(ic.getGui()),
gson.toJson(ic.getRunners()));
}
private InterpreterResult convert(RemoteInterpreterResult result) {

View file

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
/**
*
*/
public class RemoteInterpreterContextRunner extends InterpreterContextRunner {
public RemoteInterpreterContextRunner(String noteId, String paragraphId) {
super(noteId, paragraphId);
}
@Override
public void run() {
// this class should be used only for gson deserialize abstract class
// code should not reach here
throw new InterpreterException("Assert");
}
}

View file

@ -18,16 +18,17 @@
package org.apache.zeppelin.interpreter.remote;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
/**
*
@ -103,6 +104,12 @@ public class RemoteInterpreterEventPoller extends Thread {
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
angularObjectRegistry.remove(angularObject.getName());
} else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
InterpreterContextRunner runnerFromRemote = gson.fromJson(
event.getData(), RemoteInterpreterContextRunner.class);
interpreterProcess.getInterpreterContextRunnerPool().run(
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
}
} catch (Exception e) {
logger.error("Can't handle event " + event, e);

View file

@ -28,10 +28,10 @@ import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,11 +53,16 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private GenericObjectPool<Client> clientPool;
private Map<String, String> env;
private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
private InterpreterContextRunnerPool interpreterContextRunnerPool;
public RemoteInterpreterProcess(String intpRunner, String intpDir, Map<String, String> env) {
public RemoteInterpreterProcess(String intpRunner,
String intpDir,
Map<String, String> env,
InterpreterContextRunnerPool interpreterContextRunnerPool) {
this.interpreterRunner = intpRunner;
this.interpreterDir = intpDir;
this.env = env;
this.interpreterContextRunnerPool = interpreterContextRunnerPool;
referenceCount = new AtomicInteger(0);
}
@ -242,4 +247,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
releaseClient(client);
}
}
public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
return interpreterContextRunnerPool;
}
}

View file

@ -37,22 +37,23 @@ import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.JobProgressPoller;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -69,7 +70,6 @@ public class RemoteInterpreterServer
InterpreterGroup interpreterGroup;
AngularObjectRegistry angularObjectRegistry;
Gson gson = new Gson();
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
@ -314,6 +314,14 @@ public class RemoteInterpreterServer
}
private InterpreterContext convert(RemoteInterpreterContext ric) {
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
new TypeToken<List<RemoteInterpreterContextRunner>>(){}.getType());
for (InterpreterContextRunner r : runners) {
contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId()));
}
return new InterpreterContext(
ric.getParagraphId(),
ric.getParagraphTitle(),
@ -321,7 +329,27 @@ public class RemoteInterpreterServer
(Map<String, Object>) gson.fromJson(ric.getConfig(),
new TypeToken<Map<String, Object>>() {}.getType()),
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry());
interpreterGroup.getAngularObjectRegistry(),
contextRunners);
}
static class ParagraphRunner extends InterpreterContextRunner {
private transient RemoteInterpreterServer server;
public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) {
super(noteId, paragraphId);
this.server = server;
}
@Override
public void run() {
Gson gson = new Gson();
server.sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER,
gson.toJson(this)));
}
}
private RemoteInterpreterResult convert(InterpreterResult result,

View file

@ -38,6 +38,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
private static final org.apache.thrift.protocol.TField PARAGRAPH_TEXT_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphText", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField CONFIG_FIELD_DESC = new org.apache.thrift.protocol.TField("config", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField GUI_FIELD_DESC = new org.apache.thrift.protocol.TField("gui", org.apache.thrift.protocol.TType.STRING, (short)5);
private static final org.apache.thrift.protocol.TField RUNNERS_FIELD_DESC = new org.apache.thrift.protocol.TField("runners", org.apache.thrift.protocol.TType.STRING, (short)6);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@ -50,6 +51,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
public String paragraphText; // required
public String config; // required
public String gui; // required
public String runners; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@ -57,7 +59,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
PARAGRAPH_TITLE((short)2, "paragraphTitle"),
PARAGRAPH_TEXT((short)3, "paragraphText"),
CONFIG((short)4, "config"),
GUI((short)5, "gui");
GUI((short)5, "gui"),
RUNNERS((short)6, "runners");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@ -82,6 +85,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return CONFIG;
case 5: // GUI
return GUI;
case 6: // RUNNERS
return RUNNERS;
default:
return null;
}
@ -135,6 +140,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.GUI, new org.apache.thrift.meta_data.FieldMetaData("gui", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.RUNNERS, new org.apache.thrift.meta_data.FieldMetaData("runners", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteInterpreterContext.class, metaDataMap);
}
@ -147,7 +154,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
String paragraphTitle,
String paragraphText,
String config,
String gui)
String gui,
String runners)
{
this();
this.paragraphId = paragraphId;
@ -155,6 +163,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
this.paragraphText = paragraphText;
this.config = config;
this.gui = gui;
this.runners = runners;
}
/**
@ -176,6 +185,9 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (other.isSetGui()) {
this.gui = other.gui;
}
if (other.isSetRunners()) {
this.runners = other.runners;
}
}
public RemoteInterpreterContext deepCopy() {
@ -189,6 +201,7 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
this.paragraphText = null;
this.config = null;
this.gui = null;
this.runners = null;
}
public String getParagraphId() {
@ -311,6 +324,30 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
}
}
public String getRunners() {
return this.runners;
}
public RemoteInterpreterContext setRunners(String runners) {
this.runners = runners;
return this;
}
public void unsetRunners() {
this.runners = null;
}
/** Returns true if field runners is set (has been assigned a value) and false otherwise */
public boolean isSetRunners() {
return this.runners != null;
}
public void setRunnersIsSet(boolean value) {
if (!value) {
this.runners = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case PARAGRAPH_ID:
@ -353,6 +390,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
}
break;
case RUNNERS:
if (value == null) {
unsetRunners();
} else {
setRunners((String)value);
}
break;
}
}
@ -373,6 +418,9 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
case GUI:
return getGui();
case RUNNERS:
return getRunners();
}
throw new IllegalStateException();
}
@ -394,6 +442,8 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return isSetConfig();
case GUI:
return isSetGui();
case RUNNERS:
return isSetRunners();
}
throw new IllegalStateException();
}
@ -456,6 +506,15 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return false;
}
boolean this_present_runners = true && this.isSetRunners();
boolean that_present_runners = true && that.isSetRunners();
if (this_present_runners || that_present_runners) {
if (!(this_present_runners && that_present_runners))
return false;
if (!this.runners.equals(that.runners))
return false;
}
return true;
}
@ -522,6 +581,16 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetRunners()).compareTo(typedOther.isSetRunners());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetRunners()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.runners, typedOther.runners);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
@ -581,6 +650,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
sb.append(this.gui);
}
first = false;
if (!first) sb.append(", ");
sb.append("runners:");
if (this.runners == null) {
sb.append("null");
} else {
sb.append(this.runners);
}
first = false;
sb.append(")");
return sb.toString();
}
@ -664,6 +741,14 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 6: // RUNNERS
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.runners = iprot.readString();
struct.setRunnersIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@ -704,6 +789,11 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
oprot.writeString(struct.gui);
oprot.writeFieldEnd();
}
if (struct.runners != null) {
oprot.writeFieldBegin(RUNNERS_FIELD_DESC);
oprot.writeString(struct.runners);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@ -737,7 +827,10 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (struct.isSetGui()) {
optionals.set(4);
}
oprot.writeBitSet(optionals, 5);
if (struct.isSetRunners()) {
optionals.set(5);
}
oprot.writeBitSet(optionals, 6);
if (struct.isSetParagraphId()) {
oprot.writeString(struct.paragraphId);
}
@ -753,12 +846,15 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
if (struct.isSetGui()) {
oprot.writeString(struct.gui);
}
if (struct.isSetRunners()) {
oprot.writeString(struct.runners);
}
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterContext struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(5);
BitSet incoming = iprot.readBitSet(6);
if (incoming.get(0)) {
struct.paragraphId = iprot.readString();
struct.setParagraphIdIsSet(true);
@ -779,6 +875,10 @@ public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteI
struct.gui = iprot.readString();
struct.setGuiIsSet(true);
}
if (incoming.get(5)) {
struct.runners = iprot.readString();
struct.setRunnersIsSet(true);
}
}
}

View file

@ -15,7 +15,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
NO_OP(1),
ANGULAR_OBJECT_ADD(2),
ANGULAR_OBJECT_UPDATE(3),
ANGULAR_OBJECT_REMOVE(4);
ANGULAR_OBJECT_REMOVE(4),
RUN_INTERPRETER_CONTEXT_RUNNER(5);
private final int value;
@ -44,6 +45,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
return ANGULAR_OBJECT_UPDATE;
case 4:
return ANGULAR_OBJECT_REMOVE;
case 5:
return RUN_INTERPRETER_CONTEXT_RUNNER;
default:
return null;
}

View file

@ -24,7 +24,8 @@ struct RemoteInterpreterContext {
2: string paragraphTitle,
3: string paragraphText,
4: string config, // json serialized config
5: string gui // json serialized gui
5: string gui, // json serialized gui
6: string runners // json serialized runner
}
struct RemoteInterpreterResult {
@ -39,7 +40,8 @@ enum RemoteInterpreterEventType {
NO_OP = 1,
ANGULAR_OBJECT_ADD = 2,
ANGULAR_OBJECT_UPDATE = 3,
ANGULAR_OBJECT_REMOVE = 4
ANGULAR_OBJECT_REMOVE = 4,
RUN_INTERPRETER_CONTEXT_RUNNER = 5
}
struct RemoteInterpreterEvent {

View file

@ -21,21 +21,22 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterAngular;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
private InterpreterGroup intpGroup;
@ -79,7 +80,8 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>());
intp.open();
}

View file

@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
import java.util.HashMap;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.junit.Test;
@ -32,7 +31,8 @@ public class RemoteInterpreterProcessTest {
@Test
public void testStartStop() {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
new InterpreterContextRunnerPool());
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference(intpGroup));
@ -47,7 +47,8 @@ public class RemoteInterpreterProcessTest {
@Test
public void testClientFactory() throws Exception {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
new InterpreterContextRunnerPool());
rip.reference(intpGroup);
assertEquals(0, rip.getNumActiveClient());
assertEquals(0, rip.getNumIdleClient());

View file

@ -33,10 +33,9 @@ import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB;
import org.apache.zeppelin.scheduler.Job;
@ -111,7 +110,8 @@ public class RemoteInterpreterTest {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
intpB.open();
assertEquals(2, process.referenceCount());
@ -162,7 +162,8 @@ public class RemoteInterpreterTest {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
assertEquals("500", ret.message());
ret = intpB.interpret("500",
@ -172,7 +173,8 @@ public class RemoteInterpreterTest {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
assertEquals("1000", ret.message());
long end = System.currentTimeMillis();
assertTrue(end - start >= 1000);
@ -236,7 +238,8 @@ public class RemoteInterpreterTest {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
}
@Override
@ -268,7 +271,8 @@ public class RemoteInterpreterTest {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
}
@Override
@ -340,7 +344,8 @@ public class RemoteInterpreterTest {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
synchronized (results) {
results.add(ret.message());
@ -421,7 +426,8 @@ public class RemoteInterpreterTest {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
synchronized (results) {
results.add(ret.message());

View file

@ -21,18 +21,17 @@ import static org.junit.Assert.assertEquals;
import java.io.File;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Properties;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -95,7 +94,8 @@ public class RemoteSchedulerTest {
"text",
new HashMap<String, Object>(),
new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null)));
new AngularObjectRegistry(intpGroup.getId(), null),
new LinkedList<InterpreterContextRunner>()));
return "1000";
}

View file

@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -36,13 +35,14 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.Job.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -126,7 +126,7 @@ public class Note implements Serializable, JobListener {
* @param p
*/
public Paragraph addParagraph() {
Paragraph p = new Paragraph(this, replLoader);
Paragraph p = new Paragraph(this, this, replLoader);
synchronized (paragraphs) {
paragraphs.add(p);
}
@ -140,7 +140,7 @@ public class Note implements Serializable, JobListener {
* @param p
*/
public Paragraph insertParagraph(int index) {
Paragraph p = new Paragraph(this, replLoader);
Paragraph p = new Paragraph(this, this, replLoader);
synchronized (paragraphs) {
paragraphs.add(index, p);
}
@ -342,6 +342,8 @@ public class Note implements Serializable, JobListener {
note.setReplLoader(replLoader);
note.jobListenerFactory = jobListenerFactory;
for (Paragraph p : note.paragraphs) {
p.setNote(note);
if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
p.setStatus(Status.ABORT);
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.notebook;
import java.io.Serializable;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@ -27,10 +28,11 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.Interpreter.FormType;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
import org.slf4j.Logger;
@ -44,14 +46,16 @@ import org.slf4j.LoggerFactory;
public class Paragraph extends Job implements Serializable {
private static final transient long serialVersionUID = -6328572073497992016L;
private transient NoteInterpreterLoader replLoader;
private transient Note note;
String title;
String text;
private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc
public final GUI settings; // form and parameter settings
public Paragraph(JobListener listener, NoteInterpreterLoader replLoader) {
public Paragraph(Note note, JobListener listener, NoteInterpreterLoader replLoader) {
super(generateId(), listener);
this.note = note;
this.replLoader = replLoader;
title = null;
text = null;
@ -81,6 +85,14 @@ public class Paragraph extends Job implements Serializable {
this.title = title;
}
public void setNote(Note note) {
this.note = note;
}
public Note getNote() {
return note;
}
public String getRequiredReplName() {
return getRequiredReplName(text);
}
@ -216,15 +228,36 @@ public class Paragraph extends Job implements Serializable {
registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry();
}
List<InterpreterContextRunner> runners = new LinkedList<InterpreterContextRunner>();
for (Paragraph p : note.getParagraphs()) {
runners.add(new ParagraphRunner(note, note.id(), p.getId()));
}
InterpreterContext interpreterContext = new InterpreterContext(getId(),
this.getTitle(),
this.getText(),
this.getConfig(),
this.settings,
registry);
registry,
runners);
return interpreterContext;
}
static class ParagraphRunner extends InterpreterContextRunner {
private Note note;
public ParagraphRunner(Note note, String noteId, String paragraphId) {
super(noteId, paragraphId);
this.note = note;
}
@Override
public void run() {
note.run(getParagraphId());
}
}
private Logger logger() {
Logger logger = LoggerFactory.getLogger(Paragraph.class);
return logger;

View file

@ -55,7 +55,7 @@ public class InterpreterFactoryTest {
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = new ZeppelinConfiguration();
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
context = new InterpreterContext("id", "title", "text", null, null, null);
context = new InterpreterContext("id", "title", "text", null, null, null, null);
}