Multiple results

This commit is contained in:
Lee moon soo 2016-11-15 08:45:48 -08:00
parent f4dc390619
commit 95b6037425
53 changed files with 1873 additions and 808 deletions

View file

@ -104,7 +104,7 @@ public class BigQueryInterpreterTest {
InterpreterResult ret = bqInterpreter.interpret(CONSTANTS.getOne(), context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.type(), InterpreterResult.Type.TABLE);
assertEquals(ret.message().get(0).getType(), InterpreterResult.Type.TABLE);
}

View file

@ -165,8 +165,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("ID\tNAME\na\ta_name\nb\tb_name\n", interpreterResult.message().get(0).getData());
}
@Test
@ -186,8 +186,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
assertEquals("ID\tNAME\nc\tnull\n", interpreterResult.message());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("ID\tNAME\nc\tnull\n", interpreterResult.message().get(0).getData());
}
@ -209,8 +209,8 @@ public class JDBCInterpreterTest extends BasicJDBCTestCaseAdapter {
InterpreterResult interpreterResult = t.interpret(sqlQuery, interpreterContext);
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.type());
assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message());
assertEquals(InterpreterResult.Type.TABLE, interpreterResult.message().get(0).getType());
assertEquals("ID\tNAME\na\ta_name\n", interpreterResult.message().get(0).getData());
}
@Test

View file

@ -90,37 +90,37 @@ public class PigQueryInterpreterTest {
+ "a2 = load 'invalid_path' as (name, gender, age);\n"
+ "dump a;";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
assertTrue(result.message().get(0).getData().contains("(andy,male,10)\n(peter,male,20)\n(amy,female,14)"));
// run single line query in PigQueryInterpreter
String query = "foreach a generate name, age;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", result.message());
assertEquals("name\tage\nandy\t10\npeter\t20\namy\t14\n", result.message().get(0).getData());
// run multiple line query in PigQueryInterpreter
query = "b = group a by gender;\nforeach b generate group as gender, COUNT($1) as count;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message());
assertEquals("gender\tcount\nmale\t2\nfemale\t1\n", result.message().get(0).getData());
// syntax error in PigQueryInterpereter
query = "b = group a by invalid_column;\nforeach b generate group as gender, COUNT($1) as count;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertTrue(result.message().contains("Projected field [invalid_column] does not exist in schema"));
assertTrue(result.message().get(0).getData().contains("Projected field [invalid_column] does not exist in schema"));
// execution error in PigQueryInterpreter
query = "foreach a2 generate name, age;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertTrue(result.message().contains("Input path does not exist"));
assertTrue(result.message().get(0).getData().contains("Input path does not exist"));
}
@Test
@ -137,7 +137,7 @@ public class PigQueryInterpreterTest {
// run script in PigInterpreter
String pigscript = "a = load '" + tmpFile.getAbsolutePath() + "' as (id, name);";
InterpreterResult result = pigInterpreter.interpret(pigscript, context);
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// empty output
assertTrue(result.message().isEmpty());
@ -145,9 +145,9 @@ public class PigQueryInterpreterTest {
// run single line query in PigQueryInterpreter
String query = "foreach a generate id;";
result = pigQueryInterpreter.interpret(query, context);
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertTrue(result.message().contains("id\n0\n1\n2"));
assertTrue(result.message().contains("Results are limited by 20"));
assertTrue(result.message().get(0).getData().contains("id\n0\n1\n2"));
assertTrue(result.message().get(0).getData().contains("Results are limited by 20"));
}
}

View file

@ -79,22 +79,18 @@ public class PythonInterpreterMatplotlibTest {
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(
new InterpreterOutputListener() {
@Override public void onAppend(InterpreterOutput out, byte[] line) {}
@Override public void onUpdate(InterpreterOutput out, byte[] output) {}
}));
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
}
@Test
public void dependenciesAreInstalled() {
// matplotlib
InterpreterResult ret = python.interpret("import matplotlib", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
// inline backend
ret = python.interpret("import backend_zinline", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
@ -106,8 +102,8 @@ public class PythonInterpreterMatplotlibTest {
ret = python.interpret("plt.plot([1, 2, 3])", context);
ret = python.interpret("plt.show()", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.HTML, ret.type());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType());
assertTrue(ret.message().contains("data:image/png;base64"));
assertTrue(ret.message().contains("<div>"));
}
@ -128,9 +124,9 @@ public class PythonInterpreterMatplotlibTest {
// of FigureManager, causing show() to return before setting the output
// type to HTML.
ret = python.interpret("plt.show()", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.TEXT, ret.type());
assertTrue(ret.message().equals(""));
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.TEXT, ret.message().get(0).getType());
assertTrue(ret.message().get(0).getData().equals(""));
// Now test that new plot is drawn. It should be identical to the
// previous one.
@ -155,8 +151,8 @@ public class PythonInterpreterMatplotlibTest {
// of FigureManager, causing show() to set the output
// type to HTML even though the figure is inactive.
ret = python.interpret("plt.show()", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.HTML, ret.type());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.HTML, ret.message().get(0).getType());
assertTrue(ret.message().equals(""));
// Now test that plot can be reshown if it is updated. It should be

View file

@ -81,11 +81,7 @@ public class PythonInterpreterPandasSqlTest {
context = new InterpreterContext("note", "id", "title", "text", new AuthenticationInfo(),
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null), null,
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(
new InterpreterOutputListener() {
@Override public void onAppend(InterpreterOutput out, byte[] line) {}
@Override public void onUpdate(InterpreterOutput out, byte[] output) {}
}));
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
//important to be last step
sql.open();
@ -95,7 +91,7 @@ public class PythonInterpreterPandasSqlTest {
@Test
public void dependenciesAreInstalled() {
InterpreterResult ret = python.interpret("import pandas\nimport pandasql\nimport numpy\n", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
}
@Test
@ -105,14 +101,14 @@ public class PythonInterpreterPandasSqlTest {
ret = python.interpret(
"pysqldf = lambda q: print('Can not execute SQL as Python dependency is not installed')",
context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
// when
ret = sql.interpret("SELECT * from something", context);
// then
assertNotNull(ret);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertTrue(ret.message().contains("dependency is not installed"));
}
@ -126,14 +122,14 @@ public class PythonInterpreterPandasSqlTest {
// DataFrame df2 \w test data
ret = python.interpret("df2 = pd.DataFrame({ 'age' : np.array([33, 51, 51, 34]), "+
"'name' : pd.Categorical(['moon','jobs','gates','park'])})", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
//when
ret = sql.interpret("select name, age from df2 where age < 40", context);
//then
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.TABLE, ret.type());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
//assertEquals(expectedTable, ret.message()); //somehow it's same but not equal
assertTrue(ret.message().indexOf("moon\t33") > 0);
assertTrue(ret.message().indexOf("park\t34") > 0);
@ -150,7 +146,7 @@ public class PythonInterpreterPandasSqlTest {
assertNotNull("Interpreter returned 'null'", ret);
//System.out.println("\nInterpreter response: \n" + ret.message());
assertEquals(ret.toString(), InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().length() > 0);
assertTrue(ret.message().get(0).getData().length() > 0);
}
@Test
@ -163,17 +159,17 @@ public class PythonInterpreterPandasSqlTest {
ret = python.interpret("index = pd.Index([10, 11, 12, 13], name='index_name')", context);
ret = python.interpret("d1 = {1 : [np.nan, 1, 2, 3], 'two' : [3., 4., 5., 6.7]}", context);
ret = python.interpret("df1 = pd.DataFrame(d1, index=index)", context);
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
// when
ret = python.interpret("z.show(df1, show_index=True)", context);
// then
assertEquals(ret.message(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message(), Type.TABLE, ret.type());
assertTrue(ret.message().indexOf("index_name") == 0);
assertTrue(ret.message().indexOf("13") > 0);
assertTrue(ret.message().indexOf("nan") > 0);
assertTrue(ret.message().indexOf("6.7") > 0);
assertEquals(ret.message().get(0).getData(), InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(ret.message().get(0).getData(), Type.TABLE, ret.message().get(0).getType());
assertTrue(ret.message().get(0).getData().indexOf("index_name") == 0);
assertTrue(ret.message().get(0).getData().indexOf("13") > 0);
assertTrue(ret.message().get(0).getData().indexOf("nan") > 0);
assertTrue(ret.message().get(0).getData().indexOf("6.7") > 0);
}
}

View file

@ -158,7 +158,7 @@ public class PythonInterpreterTest {
cmdHistory = "";
InterpreterResult result = pythonInterpreter.interpret("print a", null);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("%text print a", result.toString());
assertEquals("%text print a", result.message().get(0).toString());
}
/**
@ -230,13 +230,13 @@ public class PythonInterpreterTest {
assertNotNull("Interpreter result for raise exception is Null", ret);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().length() > 0);
assertTrue(ret.message().get(0).getData().length() > 0);
assertNotNull("Interpreter result for text is Null", ret);
String codePrintText = "print (\"Exception(\\\"test exception\\\")\")";
ret = pythonInterpreter.interpret(codePrintText, null);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertTrue(ret.message().length() > 0);
assertTrue(ret.message().get(0).getData().length() > 0);
}
}

View file

@ -55,7 +55,7 @@ public class PythonInterpreterWithPythonInstalledTest {
assertNotNull("Interpreter returned 'null'", ret);
//System.out.println("\nInterpreter response: \n" + ret.message());
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().length() > 0);
assertTrue(ret.message().get(0).getData().length() > 0);
realPython.close();
}
@ -74,7 +74,7 @@ public class PythonInterpreterWithPythonInstalledTest {
assertNotNull("Interpreter returned 'null'", ret);
//System.out.println("\nInterpreter response: \n" + ret.message());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertTrue(ret.message().length() > 0);
assertTrue(ret.message().get(0).getData().length() > 0);
realPython.close();
}

View file

@ -45,14 +45,8 @@ import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
@ -341,10 +335,10 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
String errorMessage = "";
List<InterpreterResultMessage> errorMessage;
try {
context.out.flush();
errorMessage = new String(context.out.toByteArray());
errorMessage = context.out.toInterpreterResultMessage();
} catch (IOException e) {
throw new InterpreterException(e);
}
@ -352,18 +346,22 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
if (pythonscriptRunning == false) {
// python script failed to initialize and terminated
return new InterpreterResult(Code.ERROR, "failed to start pyspark"
+ errorMessage);
errorMessage.add(new InterpreterResultMessage(
InterpreterResult.Type.TEXT, "failed to start pyspark"));
return new InterpreterResult(Code.ERROR, errorMessage);
}
if (pythonScriptInitialized == false) {
// timeout. didn't get initialized message
return new InterpreterResult(Code.ERROR, "pyspark is not responding "
+ errorMessage);
errorMessage.add(new InterpreterResultMessage(
InterpreterResult.Type.TEXT, "pyspark is not responding"));
return new InterpreterResult(Code.ERROR, errorMessage);
}
if (!sparkInterpreter.getSparkVersion().isPysparkSupported()) {
return new InterpreterResult(Code.ERROR, "pyspark "
+ sparkInterpreter.getSparkContext().version() + " is not supported");
errorMessage.add(new InterpreterResultMessage(
InterpreterResult.Type.TEXT,
"pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported"));
return new InterpreterResult(Code.ERROR, errorMessage);
}
String jobGroup = sparkInterpreter.getJobGroup(context);
ZeppelinContext z = sparkInterpreter.getZeppelinContext();
@ -459,10 +457,8 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
if (statementError) {
return new LinkedList<>();
}
InterpreterResult completionResult;
completionResult = new InterpreterResult(Code.SUCCESS, statementOutput);
Gson gson = new Gson();
completionList = gson.fromJson(completionResult.message(), String[].class);
completionList = gson.fromJson(statementOutput, String[].class);
}
//end code for completion

View file

@ -22,7 +22,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -158,16 +158,7 @@ public class ZeppelinR implements ExecuteResultHandler {
Map env = EnvironmentUtils.getProcEnvironment();
initialOutput = new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
logger.debug(new String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
});
initialOutput = new InterpreterOutput(null);
outputStream.setInterpreterOutput(initialOutput);
executor.execute(cmd, env, this);
rScriptRunning = true;

View file

@ -18,13 +18,7 @@
package org.apache.zeppelin.spark;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.user.AuthenticationInfo;
@ -107,17 +101,7 @@ public class PySparkInterpreterTest {
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}));
new InterpreterOutput(null));
}
@After

View file

@ -99,17 +99,7 @@ public class SparkInterpreterTest {
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(),
new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}));
new InterpreterOutput(null));
}
@After
@ -138,7 +128,7 @@ public class SparkInterpreterTest {
// when interpret incomplete expression
InterpreterResult incomplete = repl.interpret("val a = \"\"\"", context);
assertEquals(InterpreterResult.Code.INCOMPLETE, incomplete.code());
assertTrue(incomplete.message().length() > 0); // expecting some error
assertTrue(incomplete.message().get(0).getData().length() > 0); // expecting some error
// message
/*

View file

@ -79,17 +79,7 @@ public class SparkSqlInterpreterTest {
new HashMap<String, Object>(), new GUI(),
new AngularObjectRegistry(intpGroup.getId(), null),
new LocalResourcePool("id"),
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
}
}));
new LinkedList<InterpreterContextRunner>(), new InterpreterOutput(null));
}
@After
@ -112,12 +102,12 @@ public class SparkSqlInterpreterTest {
InterpreterResult ret = sql.interpret("select name, age from test where age < 40", context);
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(Type.TABLE, ret.type());
assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
assertEquals(Type.TABLE, ret.message().get(0).getType());
assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message().get(0).getData());
ret = sql.interpret("select wrong syntax", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().length() > 0);
assertTrue(ret.message().get(0).getData().length() > 0);
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
}
@ -173,7 +163,7 @@ public class SparkSqlInterpreterTest {
"select name, age from people where name = 'gates'", context);
System.err.println("RET=" + ret.message());
assertEquals(InterpreterResult.Code.SUCCESS, ret.code());
assertEquals(Type.TABLE, ret.type());
assertEquals("name\tage\ngates\tnull\n", ret.message());
assertEquals(Type.TABLE, ret.message().get(0).getType());
assertEquals("name\tage\ngates\tnull\n", ret.message().get(0).getData());
}
}

View file

@ -38,15 +38,7 @@ trait AbstractAngularElemTest
new AngularObjectRegistry(intpGroup.getId(), null),
null,
new util.LinkedList[InterpreterContextRunner](),
new InterpreterOutput(new InterpreterOutputListener() {
override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {
// nothing to do
}
override def onUpdate(out: InterpreterOutput, output: Array[Byte]): Unit = {
// nothing to do
}
}))
new InterpreterOutput(null));
InterpreterContext.set(context)
super.beforeEach() // To be stackable, must call super.beforeEach

View file

@ -34,15 +34,7 @@ with BeforeAndAfter with BeforeAndAfterEach with Eventually with Matchers {
intpGroup.getId(), null),
null,
new java.util.LinkedList[InterpreterContextRunner](),
new InterpreterOutput(new InterpreterOutputListener() {
override def onAppend(out: InterpreterOutput, line: Array[Byte]): Unit = {
// nothing to do
}
override def onUpdate(out: InterpreterOutput, output: Array[Byte]): Unit = {
// nothing to do
}
}))
new InterpreterOutput(null));
InterpreterContext.set(context)
super.beforeEach() // To be stackable, must call super.beforeEach

View file

@ -35,87 +35,196 @@ import java.util.concurrent.ConcurrentHashMap;
public class InterpreterOutput extends OutputStream {
Logger logger = LoggerFactory.getLogger(InterpreterOutput.class);
private final int NEW_LINE_CHAR = '\n';
private List<InterpreterResultMessageOutput> resultMessageOutputs = new LinkedList<>();
private InterpreterResultMessageOutput currentOut;
private List<String> resourceSearchPaths = Collections.synchronizedList(new LinkedList<String>());
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private final List<Object> outList = new LinkedList<>();
private InterpreterOutputChangeWatcher watcher;
private final InterpreterOutputListener flushListener;
private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
private boolean firstWrite = true;
private final InterpreterOutputChangeListener changeListener;
public InterpreterOutput(InterpreterOutputListener flushListener) {
this.flushListener = flushListener;
changeListener = null;
clear();
}
public InterpreterOutput(InterpreterOutputListener flushListener,
InterpreterOutputChangeListener listener) throws IOException {
this.flushListener = flushListener;
this.changeListener = listener;
clear();
watcher = new InterpreterOutputChangeWatcher(listener);
watcher.start();
}
public InterpreterResult.Type getType() {
return type;
public void setType(InterpreterResult.Type type) throws IOException {
InterpreterResultMessageOutput out = null;
synchronized (resultMessageOutputs) {
int index = resultMessageOutputs.size();
InterpreterResultMessageOutputListener listener =
createInterpreterResultMessageOutputListener(index);
if (changeListener == null) {
out = new InterpreterResultMessageOutput(type, listener);
} else {
out = new InterpreterResultMessageOutput(type, listener, changeListener);
}
out.setResourceSearchPaths(resourceSearchPaths);
buffer.reset();
if (currentOut != null) {
currentOut.flush();
}
resultMessageOutputs.add(out);
currentOut = out;
}
}
public void setType(InterpreterResult.Type type) {
if (this.type != type) {
clear();
this.type = type;
public InterpreterResultMessageOutputListener createInterpreterResultMessageOutputListener(
final int index) {
return new InterpreterResultMessageOutputListener() {
final int idx = index;
@Override
public void onAppend(InterpreterResultMessageOutput out, byte[] line) {
if (flushListener != null) {
flushListener.onAppend(idx, out, line);
}
}
@Override
public void onUpdate(InterpreterResultMessageOutput out) {
if (flushListener != null) {
flushListener.onUpdate(idx, out);
}
}
};
}
public InterpreterResultMessageOutput getCurrentOutput() {
synchronized (resultMessageOutputs) {
return currentOut;
}
}
public InterpreterResultMessageOutput getOutputAt(int index) {
synchronized (resultMessageOutputs) {
return resultMessageOutputs.get(index);
}
}
public int size() {
synchronized (resultMessageOutputs) {
return resultMessageOutputs.size();
}
}
public void clear() {
synchronized (outList) {
type = InterpreterResult.Type.TEXT;
buffer.reset();
outList.clear();
if (watcher != null) {
watcher.clear();
buffer.reset();
synchronized (resultMessageOutputs) {
for (InterpreterResultMessageOutput out : resultMessageOutputs) {
out.clear();
try {
out.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
flushListener.onUpdate(this, new byte[]{});
// clear all ResultMessages
resultMessageOutputs.clear();
currentOut = null;
startOfTheNewLine = true;
firstCharIsPercentSign = false;
updateAllResultMessages();
}
}
private void updateAllResultMessages() {
if (flushListener != null) {
flushListener.onUpdateAll(this);
}
}
int previousChar = 0;
boolean startOfTheNewLine = true;
boolean firstCharIsPercentSign = false;
@Override
public void write(int b) throws IOException {
synchronized (outList) {
buffer.write(b);
if (b == NEW_LINE_CHAR) {
// first time use of this outputstream.
if (firstWrite) {
// clear the output on gui
flushListener.onUpdate(this, new byte[]{});
firstWrite = false;
}
InterpreterResultMessageOutput out;
flush();
synchronized (resultMessageOutputs) {
if (startOfTheNewLine) {
startOfTheNewLine = false;
if (b == '%') {
firstCharIsPercentSign = true;
buffer.write(b);
previousChar = b;
return;
}
} else if (b == NEW_LINE_CHAR) {
currentOut = getCurrentOutput();
if (currentOut != null && currentOut.getType() == InterpreterResult.Type.TABLE) {
if (previousChar == NEW_LINE_CHAR) {
startOfTheNewLine = true;
return;
}
} else {
startOfTheNewLine = true;
}
}
boolean flushBuffer = false;
if (firstCharIsPercentSign) {
if (b == ' ' || b == NEW_LINE_CHAR || b == '\t') {
firstCharIsPercentSign = false;
String displaySystem = buffer.toString();
for (InterpreterResult.Type type : InterpreterResult.Type.values()) {
if (displaySystem.equals('%' + type.name().toLowerCase())) {
// new type detected
setType(type);
previousChar = b;
return;
}
}
// not a defined display system
flushBuffer = true;
} else {
buffer.write(b);
previousChar = b;
return;
}
}
out = getCurrentOutputForWriting();
if (flushBuffer) {
out.write(buffer.toByteArray());
buffer.reset();
}
out.write(b);
previousChar = b;
}
}
private byte [] detectTypeFromLine(byte [] byteArray) {
// check output type directive
String line = new String(byteArray);
for (InterpreterResult.Type t : InterpreterResult.Type.values()) {
String typeString = '%' + t.name().toLowerCase();
if ((typeString + "\n").equals(line)) {
setType(t);
byteArray = null;
break;
} else if (line.startsWith(typeString + " ")) {
setType(t);
byteArray = line.substring(typeString.length() + 1).getBytes();
break;
private InterpreterResultMessageOutput getCurrentOutputForWriting() throws IOException {
synchronized (resultMessageOutputs) {
InterpreterResultMessageOutput out = getCurrentOutput();
if (out == null) {
// add text type result message
setType(InterpreterResult.Type.TEXT);
out = getCurrentOutput();
}
return out;
}
return byteArray;
}
@Override
@ -125,10 +234,8 @@ public class InterpreterOutput extends OutputStream {
@Override
public void write(byte [] b, int off, int len) throws IOException {
synchronized (outList) {
for (int i = off; i < len; i++) {
write(b[i]);
}
for (int i = off; i < len; i++) {
write(b[i]);
}
}
@ -138,10 +245,8 @@ public class InterpreterOutput extends OutputStream {
* @throws IOException
*/
public void write(File file) throws IOException {
outList.add(file);
if (watcher != null) {
watcher.watch(file);
}
InterpreterResultMessageOutput out = getCurrentOutputForWriting();
out.write(file);
}
public void write(String string) throws IOException {
@ -154,7 +259,8 @@ public class InterpreterOutput extends OutputStream {
* @throws IOException
*/
public void write(URL url) throws IOException {
outList.add(url);
InterpreterResultMessageOutput out = getCurrentOutputForWriting();
out.write(url);
}
public void addResourceSearchPath(String path) {
@ -162,99 +268,44 @@ public class InterpreterOutput extends OutputStream {
}
public void writeResource(String resourceName) throws IOException {
// search file under provided paths first, for dev mode
for (String path : resourceSearchPaths) {
File res = new File(path + "/" + resourceName);
if (res.isFile()) {
write(res);
return;
InterpreterResultMessageOutput out = getCurrentOutputForWriting();
out.write(resourceName);
}
public List<InterpreterResultMessage> toInterpreterResultMessage() throws IOException {
List<InterpreterResultMessage> list = new LinkedList<>();
synchronized (resultMessageOutputs) {
for (InterpreterResultMessageOutput out : resultMessageOutputs) {
list.add(out.toInterpreterResultMessage());
}
}
return list;
}
// search from classpath
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = this.getClass().getClassLoader();
public void flush() throws IOException {
InterpreterResultMessageOutput out = getCurrentOutput();
if (out != null) {
out.flush();
}
if (cl == null) {
cl = ClassLoader.getSystemClassLoader();
}
write(cl.getResource(resourceName));
}
public byte[] toByteArray() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
List<Object> all = new LinkedList<>();
synchronized (outList) {
all.addAll(outList);
}
for (Object o : all) {
if (o instanceof File) {
File f = (File) o;
FileInputStream fin = new FileInputStream(f);
copyStream(fin, out);
fin.close();
} else if (o instanceof byte[]) {
out.write((byte[]) o);
} else if (o instanceof Integer) {
out.write((int) o);
} else if (o instanceof URL) {
InputStream fin = ((URL) o).openStream();
copyStream(fin, out);
fin.close();
} else {
// can not handle the object
synchronized (resultMessageOutputs) {
for (InterpreterResultMessageOutput m : resultMessageOutputs) {
out.write(m.toByteArray());
}
}
out.close();
return out.toByteArray();
}
private boolean typeShouldBeDetected() {
return getType() == InterpreterResult.Type.TABLE ? false : true;
}
public void flush() throws IOException {
synchronized (outList) {
buffer.flush();
byte[] bytes = buffer.toByteArray();
if (typeShouldBeDetected()) {
bytes = detectTypeFromLine(bytes);
}
if (bytes != null) {
outList.add(bytes);
if (type == InterpreterResult.Type.TEXT) {
flushListener.onAppend(this, bytes);
}
}
buffer.reset();
}
}
private void copyStream(InputStream in, OutputStream out) throws IOException {
int bufferSize = 8192;
byte[] buffer = new byte[bufferSize];
while (true) {
int bytesRead = in.read(buffer);
if (bytesRead == -1) {
break;
} else {
out.write(buffer, 0, bytesRead);
}
}
}
@Override
public void close() throws IOException {
flush();
if (watcher != null) {
watcher.clear();
watcher.shutdown();
synchronized (resultMessageOutputs) {
for (InterpreterResultMessageOutput out : resultMessageOutputs) {
out.close();
}
}
}
}

View file

@ -20,15 +20,28 @@ package org.apache.zeppelin.interpreter;
* Listen InterpreterOutput buffer flush
*/
public interface InterpreterOutputListener {
/**
* update all message outputs
*/
public void onUpdateAll(InterpreterOutput out);
/**
* When more update/append are expected
*/
public void onClose(InterpreterOutput out);
/**
* called when newline is detected
* @param index
* @param out
* @param line
*/
public void onAppend(InterpreterOutput out, byte[] line);
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line);
/**
* when entire output is updated. eg) after detecting new display system
* @param output
* @param index
* @param out
*/
public void onUpdate(InterpreterOutput out, byte[] output);
public void onUpdate(int index, InterpreterResultMessageOutput out);
}

View file

@ -17,15 +17,19 @@
package org.apache.zeppelin.interpreter;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* Interpreter result template.
*/
public class InterpreterResult implements Serializable {
transient Logger logger = LoggerFactory.getLogger(InterpreterResult.class);
/**
* Type of result after code execution.
*/
@ -50,104 +54,70 @@ public class InterpreterResult implements Serializable {
}
Code code;
Type type;
String msg;
List<InterpreterResultMessage> msg = new LinkedList<>();
public InterpreterResult(Code code) {
this.code = code;
this.msg = null;
this.type = Type.TEXT;
}
public InterpreterResult(Code code, List<InterpreterResultMessage> msgs) {
this.code = code;
msg.addAll(msgs);
}
public InterpreterResult(Code code, String msg) {
this.code = code;
this.msg = getData(msg);
this.type = getType(msg);
add(msg);
}
public InterpreterResult(Code code, Type type, String msg) {
this.code = code;
this.msg = msg;
this.type = type;
add(type, msg);
}
/**
* Magic is like %html %text.
*
* Automatically detect %[display_system] directives
* @param msg
* @return
*/
private String getData(String msg) {
if (msg == null) {
return null;
}
Type[] types = type.values();
TreeMap<Integer, Type> typesLastIndexInMsg = buildIndexMap(msg);
if (typesLastIndexInMsg.size() == 0) {
return msg;
} else {
Map.Entry<Integer, Type> lastType = typesLastIndexInMsg.firstEntry();
//add 1 for the % char
int magicLength = lastType.getValue().name().length() + 1;
// 1 for the last \n or space after magic
int subStringPos = magicLength + lastType.getKey() + 1;
return msg.substring(subStringPos);
public void add(String msg) {
InterpreterOutput out = new InterpreterOutput(null);
try {
out.write(msg);
out.flush();
this.msg.addAll(out.toInterpreterResultMessage());
out.close();
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
private Type getType(String msg) {
if (msg == null) {
return Type.TEXT;
}
Type[] types = type.values();
TreeMap<Integer, Type> typesLastIndexInMsg = buildIndexMap(msg);
if (typesLastIndexInMsg.size() == 0) {
return Type.TEXT;
} else {
Map.Entry<Integer, Type> lastType = typesLastIndexInMsg.firstEntry();
return lastType.getValue();
}
}
private int getIndexOfType(String msg, Type t) {
if (msg == null) {
return 0;
}
String typeString = "%" + t.name().toLowerCase();
return StringUtils.indexOf(msg, typeString );
}
private TreeMap<Integer, Type> buildIndexMap(String msg) {
int lastIndexOftypes = 0;
TreeMap<Integer, Type> typesLastIndexInMsg = new TreeMap<>();
Type[] types = Type.values();
for (Type t : types) {
lastIndexOftypes = getIndexOfType(msg, t);
if (lastIndexOftypes >= 0) {
typesLastIndexInMsg.put(lastIndexOftypes, t);
}
}
return typesLastIndexInMsg;
public void add(Type type, String data) {
msg.add(new InterpreterResultMessage(type, data));
}
public Code code() {
return code;
}
public String message() {
public List<InterpreterResultMessage> message() {
return msg;
}
public Type type() {
return type;
}
public InterpreterResult type(Type type) {
this.type = type;
return this;
}
public String toString() {
return "%" + type.name().toLowerCase() + " " + msg;
StringBuilder sb = new StringBuilder();
Type prevType = null;
for (InterpreterResultMessage m : msg) {
if (prevType != null) {
sb.append("\n");
if (prevType == Type.TABLE) {
sb.append("\n");
}
}
sb.append(m.toString());
prevType = m.getType();
}
return sb.toString();
}
}

View file

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
/**
* Interpreter result message
*/
public class InterpreterResultMessage {
InterpreterResult.Type type;
String data;
public InterpreterResultMessage(InterpreterResult.Type type, String data) {
this.type = type;
this.data = data;
}
public InterpreterResult.Type getType() {
return type;
}
public String getData() {
return data;
}
public String toString() {
return "%" + type.name().toLowerCase() + " " + data;
}
}

View file

@ -0,0 +1,250 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
/**
* InterpreterMessageOutputStream
*/
public class InterpreterResultMessageOutput extends OutputStream {
Logger logger = LoggerFactory.getLogger(InterpreterResultMessageOutput.class);
private final int NEW_LINE_CHAR = '\n';
private List<String> resourceSearchPaths;
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private final List<Object> outList = new LinkedList<>();
private InterpreterOutputChangeWatcher watcher;
private final InterpreterResultMessageOutputListener flushListener;
private InterpreterResult.Type type = InterpreterResult.Type.TEXT;
private boolean firstWrite = true;
public InterpreterResultMessageOutput(
InterpreterResult.Type type,
InterpreterResultMessageOutputListener listener) {
this.type = type;
this.flushListener = listener;
}
public InterpreterResultMessageOutput(
InterpreterResult.Type type,
InterpreterResultMessageOutputListener flushListener,
InterpreterOutputChangeListener listener) throws IOException {
this.type = type;
this.flushListener = flushListener;
watcher = new InterpreterOutputChangeWatcher(listener);
watcher.start();
}
public InterpreterResult.Type getType() {
return type;
}
public void setType(InterpreterResult.Type type) {
if (this.type != type) {
clear();
this.type = type;
}
}
public void clear() {
synchronized (outList) {
buffer.reset();
outList.clear();
if (watcher != null) {
watcher.clear();
}
if (flushListener != null) {
flushListener.onUpdate(this);
}
}
}
@Override
public void write(int b) throws IOException {
synchronized (outList) {
buffer.write(b);
if (b == NEW_LINE_CHAR) {
// first time use of this outputstream.
if (firstWrite) {
// clear the output on gui
if (flushListener != null) {
flushListener.onUpdate(this);
}
firstWrite = false;
}
flush();
}
}
}
@Override
public void write(byte [] b) throws IOException {
write(b, 0, b.length);
}
@Override
public void write(byte [] b, int off, int len) throws IOException {
synchronized (outList) {
for (int i = off; i < len; i++) {
write(b[i]);
}
}
}
/**
* In dev mode, it monitors file and update ZeppelinServer
* @param file
* @throws IOException
*/
public void write(File file) throws IOException {
outList.add(file);
if (watcher != null) {
watcher.watch(file);
}
}
public void write(String string) throws IOException {
write(string.getBytes());
}
/**
* write contents in the resource file in the classpath
* @param url
* @throws IOException
*/
public void write(URL url) throws IOException {
outList.add(url);
}
public void setResourceSearchPaths(List<String> resourceSearchPaths) {
resourceSearchPaths = resourceSearchPaths;
}
public void writeResource(String resourceName) throws IOException {
// search file under provided paths first, for dev mode
for (String path : resourceSearchPaths) {
File res = new File(path + "/" + resourceName);
if (res.isFile()) {
write(res);
return;
}
}
// search from classpath
ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (cl == null) {
cl = this.getClass().getClassLoader();
}
if (cl == null) {
cl = ClassLoader.getSystemClassLoader();
}
write(cl.getResource(resourceName));
}
public byte[] toByteArray() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
List<Object> all = new LinkedList<>();
synchronized (outList) {
all.addAll(outList);
}
for (Object o : all) {
if (o instanceof File) {
File f = (File) o;
FileInputStream fin = new FileInputStream(f);
copyStream(fin, out);
fin.close();
} else if (o instanceof byte[]) {
out.write((byte[]) o);
} else if (o instanceof Integer) {
out.write((int) o);
} else if (o instanceof URL) {
InputStream fin = ((URL) o).openStream();
copyStream(fin, out);
fin.close();
} else {
// can not handle the object
}
}
out.close();
return out.toByteArray();
}
public InterpreterResultMessage toInterpreterResultMessage() throws IOException {
return new InterpreterResultMessage(type, new String(toByteArray()));
}
public void flush() throws IOException {
synchronized (outList) {
buffer.flush();
byte[] bytes = buffer.toByteArray();
if (bytes != null && bytes.length > 0) {
outList.add(bytes);
if (type == InterpreterResult.Type.TEXT) {
if (flushListener != null) {
flushListener.onAppend(this, bytes);
}
}
}
buffer.reset();
}
}
private void copyStream(InputStream in, OutputStream out) throws IOException {
int bufferSize = 8192;
byte[] buffer = new byte[bufferSize];
while (true) {
int bytesRead = in.read(buffer);
if (bytesRead == -1) {
break;
} else {
out.write(buffer, 0, bytesRead);
}
}
}
@Override
public void close() throws IOException {
flush();
if (watcher != null) {
watcher.clear();
watcher.shutdown();
}
}
public String toString() {
try {
return "%" + type.name().toLowerCase() + " " + new String(toByteArray());
} catch (IOException e) {
logger.error(e.getMessage(), e);
return "%" + type.name().toLowerCase() + "\n";
}
}
}

View file

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
/**
* InterpreterResultMessage update events
*/
public interface InterpreterResultMessageOutputListener {
/**
* called when newline is detected
* @param line
*/
public void onAppend(InterpreterResultMessageOutput out, byte[] line);
/**
* when entire output is updated. eg) after detecting new display system
*/
public void onUpdate(InterpreterResultMessageOutput out);
}

View file

@ -116,9 +116,12 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
String resultJson = gson.toJson(result);
StringBuffer transferResult = new StringBuffer();
transferResult.append("$z.result = " + resultJson + ";\n");
// TODO
/*
if (result.type() == InterpreterResult.Type.TABLE) {
transferResult.append("$z.scope.loadTableData($z.result);\n");
}
*/
transferResult.append("$z.scope._devmodeResult = $z.result;\n");
app.printStringAsJavascript(transferResult.toString());
}
@ -143,14 +146,31 @@ public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
try {
out = new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
public void onUpdateAll(InterpreterOutput out) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
public void onClose(InterpreterOutput out) {
}
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
eventClient.onInterpreterOutputAppend(noteId, paragraphId,
index, out.getType(), new String(line));
}
@Override
public void onUpdate(int index, InterpreterResultMessageOutput out) {
try {
eventClient.onInterpreterOutputUpdate(noteId, paragraphId,
index, out.getType(), new String(out.toByteArray()));
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}, this);
} catch (IOException e) {
return null;

View file

@ -72,13 +72,29 @@ public class ZeppelinDevServer extends
try {
out = new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
public void onUpdateAll(InterpreterOutput out) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
public void onClose(InterpreterOutput out) {
}
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
eventClient.onInterpreterOutputAppend(noteId, paragraphId,
index, out.getType(), new String(line));
}
@Override
public void onUpdate(int index, InterpreterResultMessageOutput out) {
try {
eventClient.onInterpreterOutputUpdate(noteId, paragraphId,
index, out.getType(), new String(out.toByteArray()));
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
}, this);
} catch (IOException e) {

View file

@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -503,10 +504,14 @@ public class RemoteInterpreter extends Interpreter {
}
private InterpreterResult convert(RemoteInterpreterResult result) {
return new InterpreterResult(
InterpreterResult.Code.valueOf(result.getCode()),
Type.valueOf(result.getType()),
result.getMsg());
InterpreterResult r = new InterpreterResult(
InterpreterResult.Code.valueOf(result.getCode()));
for (RemoteInterpreterResultMessage m : result.getMsg()) {
r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
}
return r;
}
/**

View file

@ -19,6 +19,8 @@ package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.resource.*;
@ -212,10 +214,14 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
}
}
public void onInterpreterOutputAppend(String noteId, String paragraphId, String output) {
Map<String, String> appendOutput = new HashMap<>();
public void onInterpreterOutputAppend(
String noteId, String paragraphId, int outputIndex,
InterpreterResult.Type type, String output) {
Map<String, Object> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("index", outputIndex);
appendOutput.put("type", type.name());
appendOutput.put("data", output);
sendEvent(new RemoteInterpreterEvent(
@ -223,10 +229,14 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
gson.toJson(appendOutput)));
}
public void onInterpreterOutputUpdate(String noteId, String paragraphId, String output) {
Map<String, String> appendOutput = new HashMap<>();
public void onInterpreterOutputUpdate(
String noteId, String paragraphId, int outputIndex,
InterpreterResult.Type type, String output) {
Map<String, Object> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("index", outputIndex);
appendOutput.put("type", type.name());
appendOutput.put("data", output);
sendEvent(new RemoteInterpreterEvent(
@ -234,6 +244,29 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
gson.toJson(appendOutput)));
}
public void onInterpreterOutputUpdateAll(
String noteId, String paragraphId, List<InterpreterResultMessage> messages) {
Map<String, Object> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("messages", messages);
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.OUTPUT_UPDATE_ALL,
gson.toJson(appendOutput)));
}
public void onInterpreterOutputClose(
String noteId, String paragraphId, List<InterpreterResultMessage> messages) {
Map<String, Object> appendOutput = new HashMap<>();
appendOutput.put("noteId", noteId);
appendOutput.put("paragraphId", paragraphId);
appendOutput.put("messages", messages);
sendEvent(new RemoteInterpreterEvent(
RemoteInterpreterEventType.OUTPUT_CLOSE,
gson.toJson(appendOutput)));
}
private void sendEvent(RemoteInterpreterEvent event) {
synchronized (eventQueue) {

View file

@ -25,6 +25,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
@ -158,12 +159,16 @@ public class RemoteInterpreterEventPoller extends Thread {
sendResourceResponseGet(resourceId, o);
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
// on output append
Map<String, String> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, String>>() {}.getType());
String noteId = outputAppend.get("noteId");
String paragraphId = outputAppend.get("paragraphId");
String outputToAppend = outputAppend.get("data");
String appId = outputAppend.get("appId");
Map<String, Object> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
String noteId = (String) outputAppend.get("noteId");
String paragraphId = (String) outputAppend.get("paragraphId");
int index = (int) outputAppend.get("index");
InterpreterResult.Type type =
InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
String outputToAppend = (String) outputAppend.get("data");
String appId = (String) outputAppend.get("appId");
if (appId == null) {
runner.appendBuffer(noteId, paragraphId, outputToAppend);
@ -172,12 +177,15 @@ public class RemoteInterpreterEventPoller extends Thread {
}
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
// on output update
Map<String, String> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, String>>() {}.getType());
String noteId = outputAppend.get("noteId");
String paragraphId = outputAppend.get("paragraphId");
String outputToUpdate = outputAppend.get("data");
String appId = outputAppend.get("appId");
Map<String, Object> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
String noteId = (String) outputAppend.get("noteId");
String paragraphId = (String) outputAppend.get("paragraphId");
int index = (int) outputAppend.get("index");
InterpreterResult.Type type =
InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
String outputToUpdate = (String) outputAppend.get("data");
String appId = (String) outputAppend.get("appId");
if (appId == null) {
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);

View file

@ -447,27 +447,13 @@ public class RemoteInterpreterServer
InterpreterResult result = interpreter.interpret(script, context);
// data from context.out is prepended to InterpreterResult if both defined
String message = "";
context.out.flush();
InterpreterResult.Type outputType = context.out.getType();
byte[] interpreterOutput = context.out.toByteArray();
if (interpreterOutput != null && interpreterOutput.length > 0) {
message = new String(interpreterOutput);
}
String interpreterResultMessage = result.message();
InterpreterResult combinedResult;
if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
message += interpreterResultMessage;
combinedResult = new InterpreterResult(result.code(), result.type(), message);
} else {
combinedResult = new InterpreterResult(result.code(), outputType, message);
}
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
resultMessages.addAll(result.message());
// put result into resource pool
// TODO
/*
if (combinedResult.type() == InterpreterResult.Type.TABLE) {
context.getResourcePool().put(
context.getNoteId(),
@ -475,7 +461,8 @@ public class RemoteInterpreterServer
WellKnownResourceName.ZeppelinTableResult.toString(),
combinedResult);
}
return combinedResult;
*/
return resultMessages;
} finally {
InterpreterContext.remove();
}
@ -560,15 +547,46 @@ public class RemoteInterpreterServer
paragraphId) {
return new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
logger.debug("Output Append:" + new String(line));
eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
public void onUpdateAll(InterpreterOutput out) {
try {
eventClient.onInterpreterOutputUpdateAll(
noteId, paragraphId, out.toInterpreterResultMessage());
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
logger.debug("Output Update:" + new String(output));
eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
public void onClose(InterpreterOutput out) {
// persist data without refreshing front-end
try {
eventClient.onInterpreterOutputClose(
noteId, paragraphId, out.toInterpreterResultMessage());
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
String output = new String(line);
logger.debug("Output Append: {}", output);
eventClient.onInterpreterOutputAppend(
noteId, paragraphId, index, out.getType(), output);
}
@Override
public void onUpdate(int index, InterpreterResultMessageOutput out) {
String output;
try {
output = new String(out.toByteArray());
logger.debug("Output Update: {}", output);
eventClient.onInterpreterOutputUpdate(
noteId, paragraphId, index, out.getType(), output);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
});
}
@ -591,10 +609,17 @@ public class RemoteInterpreterServer
private RemoteInterpreterResult convert(InterpreterResult result,
Map<String, Object> config, GUI gui) {
List<RemoteInterpreterResultMessage> msg = new LinkedList<>();
for (InterpreterResultMessage m : result.message()) {
msg.add(new RemoteInterpreterResultMessage(
m.getType().name(),
m.getData()));
}
return new RemoteInterpreterResult(
result.code().name(),
result.type().name(),
result.message(),
msg,
gson.toJson(config),
gson.toJson(gui));
}
@ -831,17 +856,21 @@ public class RemoteInterpreterServer
protected InterpreterOutput createAppOutput(final String noteId,
final String paragraphId,
final String appId) {
return null;
// TODO
/*
return new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
public void onAppend(InterpreterResultMessageOutput out, byte[] line) {
eventClient.onAppOutputAppend(noteId, paragraphId, appId, new String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
public void onUpdate(InterpreterResultMessageOutput out, byte[] output) {
eventClient.onAppOutputUpdate(noteId, paragraphId, appId, new String(output));
}
});
*/
}
private ApplicationContext getApplicationContext(
@ -925,20 +954,20 @@ public class RemoteInterpreterServer
System.err.println("Resource " + res.get());
}
runningApp.app.run(resource);
String output = new String(context.out.toByteArray());
// TODO
/*
List<InterpreterResultMessage> outputMessage = context.out.toInterpreterResultMessage();
eventClient.onAppOutputUpdate(
context.getNoteId(),
context.getParagraphId(),
applicationInstanceId,
output);
*/
return new RemoteApplicationResult(true, "");
} catch (ApplicationException | IOException e) {
return new RemoteApplicationResult(false, e.getMessage());
}
}
}
private static class RunningApplication {

View file

@ -38,9 +38,11 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
RESOURCE_GET(7),
OUTPUT_APPEND(8),
OUTPUT_UPDATE(9),
ANGULAR_REGISTRY_PUSH(10),
APP_STATUS_UPDATE(11),
META_INFOS(12);
OUTPUT_UPDATE_ALL(10),
OUTPUT_CLOSE(11),
ANGULAR_REGISTRY_PUSH(12),
APP_STATUS_UPDATE(13),
META_INFOS(14);
private final int value;
@ -80,8 +82,12 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
case 9:
return OUTPUT_UPDATE;
case 10:
return ANGULAR_REGISTRY_PUSH;
return OUTPUT_UPDATE_ALL;
case 11:
return OUTPUT_CLOSE;
case 12:
return ANGULAR_REGISTRY_PUSH;
case 13:
return APP_STATUS_UPDATE;
case 12:
return META_INFOS;

View file

@ -56,10 +56,9 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
private static final org.apache.thrift.protocol.TField CODE_FIELD_DESC = new org.apache.thrift.protocol.TField("code", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField CONFIG_FIELD_DESC = new org.apache.thrift.protocol.TField("config", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField GUI_FIELD_DESC = new org.apache.thrift.protocol.TField("gui", org.apache.thrift.protocol.TType.STRING, (short)5);
private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.LIST, (short)2);
private static final org.apache.thrift.protocol.TField CONFIG_FIELD_DESC = new org.apache.thrift.protocol.TField("config", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField GUI_FIELD_DESC = new org.apache.thrift.protocol.TField("gui", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@ -68,18 +67,16 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
}
public String code; // required
public String type; // required
public String msg; // required
public List<RemoteInterpreterResultMessage> msg; // required
public String config; // required
public String gui; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
CODE((short)1, "code"),
TYPE((short)2, "type"),
MSG((short)3, "msg"),
CONFIG((short)4, "config"),
GUI((short)5, "gui");
MSG((short)2, "msg"),
CONFIG((short)3, "config"),
GUI((short)4, "gui");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@ -96,13 +93,11 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
switch(fieldId) {
case 1: // CODE
return CODE;
case 2: // TYPE
return TYPE;
case 3: // MSG
case 2: // MSG
return MSG;
case 4: // CONFIG
case 3: // CONFIG
return CONFIG;
case 5: // GUI
case 4: // GUI
return GUI;
default:
return null;
@ -149,10 +144,9 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.CODE, new org.apache.thrift.meta_data.FieldMetaData("code", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, RemoteInterpreterResultMessage.class))));
tmpMap.put(_Fields.CONFIG, new org.apache.thrift.meta_data.FieldMetaData("config", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.GUI, new org.apache.thrift.meta_data.FieldMetaData("gui", org.apache.thrift.TFieldRequirementType.DEFAULT,
@ -166,14 +160,12 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
public RemoteInterpreterResult(
String code,
String type,
String msg,
List<RemoteInterpreterResultMessage> msg,
String config,
String gui)
{
this();
this.code = code;
this.type = type;
this.msg = msg;
this.config = config;
this.gui = gui;
@ -186,11 +178,12 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
if (other.isSetCode()) {
this.code = other.code;
}
if (other.isSetType()) {
this.type = other.type;
}
if (other.isSetMsg()) {
this.msg = other.msg;
List<RemoteInterpreterResultMessage> __this__msg = new ArrayList<RemoteInterpreterResultMessage>(other.msg.size());
for (RemoteInterpreterResultMessage other_element : other.msg) {
__this__msg.add(new RemoteInterpreterResultMessage(other_element));
}
this.msg = __this__msg;
}
if (other.isSetConfig()) {
this.config = other.config;
@ -207,7 +200,6 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
@Override
public void clear() {
this.code = null;
this.type = null;
this.msg = null;
this.config = null;
this.gui = null;
@ -237,35 +229,26 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
}
}
public String getType() {
return this.type;
public int getMsgSize() {
return (this.msg == null) ? 0 : this.msg.size();
}
public RemoteInterpreterResult setType(String type) {
this.type = type;
return this;
public java.util.Iterator<RemoteInterpreterResultMessage> getMsgIterator() {
return (this.msg == null) ? null : this.msg.iterator();
}
public void unsetType() {
this.type = null;
}
/** Returns true if field type is set (has been assigned a value) and false otherwise */
public boolean isSetType() {
return this.type != null;
}
public void setTypeIsSet(boolean value) {
if (!value) {
this.type = null;
public void addToMsg(RemoteInterpreterResultMessage elem) {
if (this.msg == null) {
this.msg = new ArrayList<RemoteInterpreterResultMessage>();
}
this.msg.add(elem);
}
public String getMsg() {
public List<RemoteInterpreterResultMessage> getMsg() {
return this.msg;
}
public RemoteInterpreterResult setMsg(String msg) {
public RemoteInterpreterResult setMsg(List<RemoteInterpreterResultMessage> msg) {
this.msg = msg;
return this;
}
@ -343,19 +326,11 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
}
break;
case TYPE:
if (value == null) {
unsetType();
} else {
setType((String)value);
}
break;
case MSG:
if (value == null) {
unsetMsg();
} else {
setMsg((String)value);
setMsg((List<RemoteInterpreterResultMessage>)value);
}
break;
@ -383,9 +358,6 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
case CODE:
return getCode();
case TYPE:
return getType();
case MSG:
return getMsg();
@ -408,8 +380,6 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
switch (field) {
case CODE:
return isSetCode();
case TYPE:
return isSetType();
case MSG:
return isSetMsg();
case CONFIG:
@ -442,15 +412,6 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
return false;
}
boolean this_present_type = true && this.isSetType();
boolean that_present_type = true && that.isSetType();
if (this_present_type || that_present_type) {
if (!(this_present_type && that_present_type))
return false;
if (!this.type.equals(that.type))
return false;
}
boolean this_present_msg = true && this.isSetMsg();
boolean that_present_msg = true && that.isSetMsg();
if (this_present_msg || that_present_msg) {
@ -490,11 +451,6 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
if (present_code)
list.add(code);
boolean present_type = true && (isSetType());
list.add(present_type);
if (present_type)
list.add(type);
boolean present_msg = true && (isSetMsg());
list.add(present_msg);
if (present_msg)
@ -531,16 +487,6 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetType()).compareTo(other.isSetType());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetType()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, other.type);
if (lastComparison != 0) {
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetMsg()).compareTo(other.isSetMsg());
if (lastComparison != 0) {
return lastComparison;
@ -599,14 +545,6 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
}
first = false;
if (!first) sb.append(", ");
sb.append("type:");
if (this.type == null) {
sb.append("null");
} else {
sb.append(this.type);
}
first = false;
if (!first) sb.append(", ");
sb.append("msg:");
if (this.msg == null) {
sb.append("null");
@ -681,23 +619,26 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // TYPE
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.type = iprot.readString();
struct.setTypeIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 3: // MSG
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.msg = iprot.readString();
case 2: // MSG
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
struct.msg = new ArrayList<RemoteInterpreterResultMessage>(_list0.size);
RemoteInterpreterResultMessage _elem1;
for (int _i2 = 0; _i2 < _list0.size; ++_i2)
{
_elem1 = new RemoteInterpreterResultMessage();
_elem1.read(iprot);
struct.msg.add(_elem1);
}
iprot.readListEnd();
}
struct.setMsgIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 4: // CONFIG
case 3: // CONFIG
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.config = iprot.readString();
struct.setConfigIsSet(true);
@ -705,7 +646,7 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 5: // GUI
case 4: // GUI
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.gui = iprot.readString();
struct.setGuiIsSet(true);
@ -733,14 +674,16 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
oprot.writeString(struct.code);
oprot.writeFieldEnd();
}
if (struct.type != null) {
oprot.writeFieldBegin(TYPE_FIELD_DESC);
oprot.writeString(struct.type);
oprot.writeFieldEnd();
}
if (struct.msg != null) {
oprot.writeFieldBegin(MSG_FIELD_DESC);
oprot.writeString(struct.msg);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.msg.size()));
for (RemoteInterpreterResultMessage _iter3 : struct.msg)
{
_iter3.write(oprot);
}
oprot.writeListEnd();
}
oprot.writeFieldEnd();
}
if (struct.config != null) {
@ -774,27 +717,27 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
if (struct.isSetCode()) {
optionals.set(0);
}
if (struct.isSetType()) {
if (struct.isSetMsg()) {
optionals.set(1);
}
if (struct.isSetMsg()) {
if (struct.isSetConfig()) {
optionals.set(2);
}
if (struct.isSetConfig()) {
if (struct.isSetGui()) {
optionals.set(3);
}
if (struct.isSetGui()) {
optionals.set(4);
}
oprot.writeBitSet(optionals, 5);
oprot.writeBitSet(optionals, 4);
if (struct.isSetCode()) {
oprot.writeString(struct.code);
}
if (struct.isSetType()) {
oprot.writeString(struct.type);
}
if (struct.isSetMsg()) {
oprot.writeString(struct.msg);
{
oprot.writeI32(struct.msg.size());
for (RemoteInterpreterResultMessage _iter4 : struct.msg)
{
_iter4.write(oprot);
}
}
}
if (struct.isSetConfig()) {
oprot.writeString(struct.config);
@ -807,24 +750,30 @@ public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteIn
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterResult struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(5);
BitSet incoming = iprot.readBitSet(4);
if (incoming.get(0)) {
struct.code = iprot.readString();
struct.setCodeIsSet(true);
}
if (incoming.get(1)) {
struct.type = iprot.readString();
struct.setTypeIsSet(true);
}
if (incoming.get(2)) {
struct.msg = iprot.readString();
{
org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
struct.msg = new ArrayList<RemoteInterpreterResultMessage>(_list5.size);
RemoteInterpreterResultMessage _elem6;
for (int _i7 = 0; _i7 < _list5.size; ++_i7)
{
_elem6 = new RemoteInterpreterResultMessage();
_elem6.read(iprot);
struct.msg.add(_elem6);
}
}
struct.setMsgIsSet(true);
}
if (incoming.get(3)) {
if (incoming.get(2)) {
struct.config = iprot.readString();
struct.setConfigIsSet(true);
}
if (incoming.get(4)) {
if (incoming.get(3)) {
struct.gui = iprot.readString();
struct.setGuiIsSet(true);
}

View file

@ -0,0 +1,520 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
package org.apache.zeppelin.interpreter.thrift;
import org.apache.thrift.scheme.IScheme;
import org.apache.thrift.scheme.SchemeFactory;
import org.apache.thrift.scheme.StandardScheme;
import org.apache.thrift.scheme.TupleScheme;
import org.apache.thrift.protocol.TTupleProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.EncodingUtils;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.server.AbstractNonblockingServer.*;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
import java.util.EnumMap;
import java.util.Set;
import java.util.HashSet;
import java.util.EnumSet;
import java.util.Collections;
import java.util.BitSet;
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.annotation.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase<RemoteInterpreterResultMessage, RemoteInterpreterResultMessage._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResultMessage> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage");
private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
schemes.put(StandardScheme.class, new RemoteInterpreterResultMessageStandardSchemeFactory());
schemes.put(TupleScheme.class, new RemoteInterpreterResultMessageTupleSchemeFactory());
}
public String type; // required
public String data; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
TYPE((short)1, "type"),
DATA((short)2, "data");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
static {
for (_Fields field : EnumSet.allOf(_Fields.class)) {
byName.put(field.getFieldName(), field);
}
}
/**
* Find the _Fields constant that matches fieldId, or null if its not found.
*/
public static _Fields findByThriftId(int fieldId) {
switch(fieldId) {
case 1: // TYPE
return TYPE;
case 2: // DATA
return DATA;
default:
return null;
}
}
/**
* Find the _Fields constant that matches fieldId, throwing an exception
* if it is not found.
*/
public static _Fields findByThriftIdOrThrow(int fieldId) {
_Fields fields = findByThriftId(fieldId);
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
return fields;
}
/**
* Find the _Fields constant that matches name, or null if its not found.
*/
public static _Fields findByName(String name) {
return byName.get(name);
}
private final short _thriftId;
private final String _fieldName;
_Fields(short thriftId, String fieldName) {
_thriftId = thriftId;
_fieldName = fieldName;
}
public short getThriftFieldId() {
return _thriftId;
}
public String getFieldName() {
return _fieldName;
}
}
// isset id assignments
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteInterpreterResultMessage.class, metaDataMap);
}
public RemoteInterpreterResultMessage() {
}
public RemoteInterpreterResultMessage(
String type,
String data)
{
this();
this.type = type;
this.data = data;
}
/**
* Performs a deep copy on <i>other</i>.
*/
public RemoteInterpreterResultMessage(RemoteInterpreterResultMessage other) {
if (other.isSetType()) {
this.type = other.type;
}
if (other.isSetData()) {
this.data = other.data;
}
}
public RemoteInterpreterResultMessage deepCopy() {
return new RemoteInterpreterResultMessage(this);
}
@Override
public void clear() {
this.type = null;
this.data = null;
}
public String getType() {
return this.type;
}
public RemoteInterpreterResultMessage setType(String type) {
this.type = type;
return this;
}
public void unsetType() {
this.type = null;
}
/** Returns true if field type is set (has been assigned a value) and false otherwise */
public boolean isSetType() {
return this.type != null;
}
public void setTypeIsSet(boolean value) {
if (!value) {
this.type = null;
}
}
public String getData() {
return this.data;
}
public RemoteInterpreterResultMessage setData(String data) {
this.data = data;
return this;
}
public void unsetData() {
this.data = null;
}
/** Returns true if field data is set (has been assigned a value) and false otherwise */
public boolean isSetData() {
return this.data != null;
}
public void setDataIsSet(boolean value) {
if (!value) {
this.data = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case TYPE:
if (value == null) {
unsetType();
} else {
setType((String)value);
}
break;
case DATA:
if (value == null) {
unsetData();
} else {
setData((String)value);
}
break;
}
}
public Object getFieldValue(_Fields field) {
switch (field) {
case TYPE:
return getType();
case DATA:
return getData();
}
throw new IllegalStateException();
}
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
public boolean isSet(_Fields field) {
if (field == null) {
throw new IllegalArgumentException();
}
switch (field) {
case TYPE:
return isSetType();
case DATA:
return isSetData();
}
throw new IllegalStateException();
}
@Override
public boolean equals(Object that) {
if (that == null)
return false;
if (that instanceof RemoteInterpreterResultMessage)
return this.equals((RemoteInterpreterResultMessage)that);
return false;
}
public boolean equals(RemoteInterpreterResultMessage that) {
if (that == null)
return false;
boolean this_present_type = true && this.isSetType();
boolean that_present_type = true && that.isSetType();
if (this_present_type || that_present_type) {
if (!(this_present_type && that_present_type))
return false;
if (!this.type.equals(that.type))
return false;
}
boolean this_present_data = true && this.isSetData();
boolean that_present_data = true && that.isSetData();
if (this_present_data || that_present_data) {
if (!(this_present_data && that_present_data))
return false;
if (!this.data.equals(that.data))
return false;
}
return true;
}
@Override
public int hashCode() {
List<Object> list = new ArrayList<Object>();
boolean present_type = true && (isSetType());
list.add(present_type);
if (present_type)
list.add(type);
boolean present_data = true && (isSetData());
list.add(present_data);
if (present_data)
list.add(data);
return list.hashCode();
}
@Override
public int compareTo(RemoteInterpreterResultMessage other) {
if (!getClass().equals(other.getClass())) {
return getClass().getName().compareTo(other.getClass().getName());
}
int lastComparison = 0;
lastComparison = Boolean.valueOf(isSetType()).compareTo(other.isSetType());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetType()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, other.type);
if (lastComparison != 0) {
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetData()).compareTo(other.isSetData());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetData()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.data, other.data);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
public _Fields fieldForId(int fieldId) {
return _Fields.findByThriftId(fieldId);
}
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("RemoteInterpreterResultMessage(");
boolean first = true;
sb.append("type:");
if (this.type == null) {
sb.append("null");
} else {
sb.append(this.type);
}
first = false;
if (!first) sb.append(", ");
sb.append("data:");
if (this.data == null) {
sb.append("null");
} else {
sb.append(this.data);
}
first = false;
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
// check for sub-struct validity
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
}
}
private static class RemoteInterpreterResultMessageStandardSchemeFactory implements SchemeFactory {
public RemoteInterpreterResultMessageStandardScheme getScheme() {
return new RemoteInterpreterResultMessageStandardScheme();
}
}
private static class RemoteInterpreterResultMessageStandardScheme extends StandardScheme<RemoteInterpreterResultMessage> {
public void read(org.apache.thrift.protocol.TProtocol iprot, RemoteInterpreterResultMessage struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
{
schemeField = iprot.readFieldBegin();
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
break;
}
switch (schemeField.id) {
case 1: // TYPE
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.type = iprot.readString();
struct.setTypeIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 2: // DATA
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.data = iprot.readString();
struct.setDataIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
iprot.readFieldEnd();
}
iprot.readStructEnd();
// check for required fields of primitive type, which can't be checked in the validate method
struct.validate();
}
public void write(org.apache.thrift.protocol.TProtocol oprot, RemoteInterpreterResultMessage struct) throws org.apache.thrift.TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
if (struct.type != null) {
oprot.writeFieldBegin(TYPE_FIELD_DESC);
oprot.writeString(struct.type);
oprot.writeFieldEnd();
}
if (struct.data != null) {
oprot.writeFieldBegin(DATA_FIELD_DESC);
oprot.writeString(struct.data);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
}
private static class RemoteInterpreterResultMessageTupleSchemeFactory implements SchemeFactory {
public RemoteInterpreterResultMessageTupleScheme getScheme() {
return new RemoteInterpreterResultMessageTupleScheme();
}
}
private static class RemoteInterpreterResultMessageTupleScheme extends TupleScheme<RemoteInterpreterResultMessage> {
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterResultMessage struct) throws org.apache.thrift.TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
BitSet optionals = new BitSet();
if (struct.isSetType()) {
optionals.set(0);
}
if (struct.isSetData()) {
optionals.set(1);
}
oprot.writeBitSet(optionals, 2);
if (struct.isSetType()) {
oprot.writeString(struct.type);
}
if (struct.isSetData()) {
oprot.writeString(struct.data);
}
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterResultMessage struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
struct.type = iprot.readString();
struct.setTypeIsSet(true);
}
if (incoming.get(1)) {
struct.data = iprot.readString();
struct.setDataIsSet(true);
}
}
}
}

View file

@ -3830,15 +3830,15 @@ public class RemoteInterpreterService {
case 4: // PROPERTIES
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
org.apache.thrift.protocol.TMap _map0 = iprot.readMapBegin();
struct.properties = new HashMap<String,String>(2*_map0.size);
String _key1;
String _val2;
for (int _i3 = 0; _i3 < _map0.size; ++_i3)
org.apache.thrift.protocol.TMap _map8 = iprot.readMapBegin();
struct.properties = new HashMap<String,String>(2*_map8.size);
String _key9;
String _val10;
for (int _i11 = 0; _i11 < _map8.size; ++_i11)
{
_key1 = iprot.readString();
_val2 = iprot.readString();
struct.properties.put(_key1, _val2);
_key9 = iprot.readString();
_val10 = iprot.readString();
struct.properties.put(_key9, _val10);
}
iprot.readMapEnd();
}
@ -3881,10 +3881,10 @@ public class RemoteInterpreterService {
oprot.writeFieldBegin(PROPERTIES_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.properties.size()));
for (Map.Entry<String, String> _iter4 : struct.properties.entrySet())
for (Map.Entry<String, String> _iter12 : struct.properties.entrySet())
{
oprot.writeString(_iter4.getKey());
oprot.writeString(_iter4.getValue());
oprot.writeString(_iter12.getKey());
oprot.writeString(_iter12.getValue());
}
oprot.writeMapEnd();
}
@ -3933,10 +3933,10 @@ public class RemoteInterpreterService {
if (struct.isSetProperties()) {
{
oprot.writeI32(struct.properties.size());
for (Map.Entry<String, String> _iter5 : struct.properties.entrySet())
for (Map.Entry<String, String> _iter13 : struct.properties.entrySet())
{
oprot.writeString(_iter5.getKey());
oprot.writeString(_iter5.getValue());
oprot.writeString(_iter13.getKey());
oprot.writeString(_iter13.getValue());
}
}
}
@ -3960,15 +3960,15 @@ public class RemoteInterpreterService {
}
if (incoming.get(3)) {
{
org.apache.thrift.protocol.TMap _map6 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
struct.properties = new HashMap<String,String>(2*_map6.size);
String _key7;
String _val8;
for (int _i9 = 0; _i9 < _map6.size; ++_i9)
org.apache.thrift.protocol.TMap _map14 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
struct.properties = new HashMap<String,String>(2*_map14.size);
String _key15;
String _val16;
for (int _i17 = 0; _i17 < _map14.size; ++_i17)
{
_key7 = iprot.readString();
_val8 = iprot.readString();
struct.properties.put(_key7, _val8);
_key15 = iprot.readString();
_val16 = iprot.readString();
struct.properties.put(_key15, _val16);
}
}
struct.setPropertiesIsSet(true);
@ -10279,14 +10279,14 @@ public class RemoteInterpreterService {
case 0: // SUCCESS
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
org.apache.thrift.protocol.TList _list10 = iprot.readListBegin();
struct.success = new ArrayList<InterpreterCompletion>(_list10.size);
InterpreterCompletion _elem11;
for (int _i12 = 0; _i12 < _list10.size; ++_i12)
org.apache.thrift.protocol.TList _list18 = iprot.readListBegin();
struct.success = new ArrayList<InterpreterCompletion>(_list18.size);
InterpreterCompletion _elem19;
for (int _i20 = 0; _i20 < _list18.size; ++_i20)
{
_elem11 = new InterpreterCompletion();
_elem11.read(iprot);
struct.success.add(_elem11);
_elem19 = new InterpreterCompletion();
_elem19.read(iprot);
struct.success.add(_elem19);
}
iprot.readListEnd();
}
@ -10314,9 +10314,9 @@ public class RemoteInterpreterService {
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.success.size()));
for (InterpreterCompletion _iter13 : struct.success)
for (InterpreterCompletion _iter21 : struct.success)
{
_iter13.write(oprot);
_iter21.write(oprot);
}
oprot.writeListEnd();
}
@ -10347,9 +10347,9 @@ public class RemoteInterpreterService {
if (struct.isSetSuccess()) {
{
oprot.writeI32(struct.success.size());
for (InterpreterCompletion _iter14 : struct.success)
for (InterpreterCompletion _iter22 : struct.success)
{
_iter14.write(oprot);
_iter22.write(oprot);
}
}
}
@ -10361,14 +10361,14 @@ public class RemoteInterpreterService {
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TList _list15 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
struct.success = new ArrayList<InterpreterCompletion>(_list15.size);
InterpreterCompletion _elem16;
for (int _i17 = 0; _i17 < _list15.size; ++_i17)
org.apache.thrift.protocol.TList _list23 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
struct.success = new ArrayList<InterpreterCompletion>(_list23.size);
InterpreterCompletion _elem24;
for (int _i25 = 0; _i25 < _list23.size; ++_i25)
{
_elem16 = new InterpreterCompletion();
_elem16.read(iprot);
struct.success.add(_elem16);
_elem24 = new InterpreterCompletion();
_elem24.read(iprot);
struct.success.add(_elem24);
}
}
struct.setSuccessIsSet(true);
@ -12628,13 +12628,13 @@ public class RemoteInterpreterService {
case 1: // RESOURCES
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
org.apache.thrift.protocol.TList _list18 = iprot.readListBegin();
struct.resources = new ArrayList<String>(_list18.size);
String _elem19;
for (int _i20 = 0; _i20 < _list18.size; ++_i20)
org.apache.thrift.protocol.TList _list26 = iprot.readListBegin();
struct.resources = new ArrayList<String>(_list26.size);
String _elem27;
for (int _i28 = 0; _i28 < _list26.size; ++_i28)
{
_elem19 = iprot.readString();
struct.resources.add(_elem19);
_elem27 = iprot.readString();
struct.resources.add(_elem27);
}
iprot.readListEnd();
}
@ -12662,9 +12662,9 @@ public class RemoteInterpreterService {
oprot.writeFieldBegin(RESOURCES_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.resources.size()));
for (String _iter21 : struct.resources)
for (String _iter29 : struct.resources)
{
oprot.writeString(_iter21);
oprot.writeString(_iter29);
}
oprot.writeListEnd();
}
@ -12695,9 +12695,9 @@ public class RemoteInterpreterService {
if (struct.isSetResources()) {
{
oprot.writeI32(struct.resources.size());
for (String _iter22 : struct.resources)
for (String _iter30 : struct.resources)
{
oprot.writeString(_iter22);
oprot.writeString(_iter30);
}
}
}
@ -12709,13 +12709,13 @@ public class RemoteInterpreterService {
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TList _list23 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
struct.resources = new ArrayList<String>(_list23.size);
String _elem24;
for (int _i25 = 0; _i25 < _list23.size; ++_i25)
org.apache.thrift.protocol.TList _list31 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
struct.resources = new ArrayList<String>(_list31.size);
String _elem32;
for (int _i33 = 0; _i33 < _list31.size; ++_i33)
{
_elem24 = iprot.readString();
struct.resources.add(_elem24);
_elem32 = iprot.readString();
struct.resources.add(_elem32);
}
}
struct.setResourcesIsSet(true);
@ -14258,13 +14258,13 @@ public class RemoteInterpreterService {
case 0: // SUCCESS
if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
{
org.apache.thrift.protocol.TList _list26 = iprot.readListBegin();
struct.success = new ArrayList<String>(_list26.size);
String _elem27;
for (int _i28 = 0; _i28 < _list26.size; ++_i28)
org.apache.thrift.protocol.TList _list34 = iprot.readListBegin();
struct.success = new ArrayList<String>(_list34.size);
String _elem35;
for (int _i36 = 0; _i36 < _list34.size; ++_i36)
{
_elem27 = iprot.readString();
struct.success.add(_elem27);
_elem35 = iprot.readString();
struct.success.add(_elem35);
}
iprot.readListEnd();
}
@ -14292,9 +14292,9 @@ public class RemoteInterpreterService {
oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.success.size()));
for (String _iter29 : struct.success)
for (String _iter37 : struct.success)
{
oprot.writeString(_iter29);
oprot.writeString(_iter37);
}
oprot.writeListEnd();
}
@ -14325,9 +14325,9 @@ public class RemoteInterpreterService {
if (struct.isSetSuccess()) {
{
oprot.writeI32(struct.success.size());
for (String _iter30 : struct.success)
for (String _iter38 : struct.success)
{
oprot.writeString(_iter30);
oprot.writeString(_iter38);
}
}
}
@ -14339,13 +14339,13 @@ public class RemoteInterpreterService {
BitSet incoming = iprot.readBitSet(1);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TList _list31 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
struct.success = new ArrayList<String>(_list31.size);
String _elem32;
for (int _i33 = 0; _i33 < _list31.size; ++_i33)
org.apache.thrift.protocol.TList _list39 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, iprot.readI32());
struct.success = new ArrayList<String>(_list39.size);
String _elem40;
for (int _i41 = 0; _i41 < _list39.size; ++_i41)
{
_elem32 = iprot.readString();
struct.success.add(_elem32);
_elem40 = iprot.readString();
struct.success.add(_elem40);
}
}
struct.setSuccessIsSet(true);

View file

@ -30,12 +30,15 @@ struct RemoteInterpreterContext {
8: string runners // json serialized runner
}
struct RemoteInterpreterResultMessage {
1: string type,
2: string data
}
struct RemoteInterpreterResult {
1: string code,
2: string type,
3: string msg,
4: string config, // json serialized config
5: string gui // json serialized gui
2: list<RemoteInterpreterResultMessage> msg,
3: string config, // json serialized config
4: string gui // json serialized gui
}
enum RemoteInterpreterEventType {
@ -48,9 +51,11 @@ enum RemoteInterpreterEventType {
RESOURCE_GET = 7
OUTPUT_APPEND = 8,
OUTPUT_UPDATE = 9,
ANGULAR_REGISTRY_PUSH = 10,
APP_STATUS_UPDATE = 11,
META_INFOS = 12
OUTPUT_UPDATE_ALL = 10,
OUTPUT_CLOSE = 11,
ANGULAR_REGISTRY_PUSH = 12,
APP_STATUS_UPDATE = 13,
META_INFOS = 14
}
struct RemoteInterpreterEvent {

View file

@ -21,6 +21,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.junit.After;
import org.junit.Before;
@ -90,12 +91,22 @@ public class ApplicationLoaderTest {
null,
new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
public void onUpdateAll(InterpreterOutput out) {
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
public void onClose(InterpreterOutput out) {
}
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
}
@Override
public void onUpdate(int index, InterpreterResultMessageOutput out) {
}
}));

View file

@ -45,12 +45,14 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
@Test
public void testDetectNewline() throws IOException {
out.write("hello\nworld");
assertEquals("hello\n", new String(out.toByteArray()));
assertEquals(1, out.size());
assertEquals(InterpreterResult.Type.TEXT, out.getOutputAt(0).getType());
assertEquals("hello\n", new String(out.getOutputAt(0).toByteArray()));
assertEquals(1, numAppendEvent);
assertEquals(1, numUpdateEvent);
out.write("\n");
assertEquals("hello\nworld\n", new String(out.toByteArray()));
assertEquals("hello\nworld\n", new String(out.getOutputAt(0).toByteArray()));
assertEquals(2, numAppendEvent);
assertEquals(1, numUpdateEvent);
}
@ -58,77 +60,105 @@ public class InterpreterOutputTest implements InterpreterOutputListener {
@Test
public void testFlush() throws IOException {
out.write("hello\nworld");
assertEquals("hello\n", new String(out.toByteArray()));
assertEquals("hello\n", new String(out.getOutputAt(0).toByteArray()));
assertEquals(1, numAppendEvent);
assertEquals(1, numUpdateEvent);
out.flush();
assertEquals("hello\nworld", new String(out.toByteArray()));
assertEquals("hello\nworld", new String(out.getOutputAt(0).toByteArray()));
assertEquals(2, numAppendEvent);
assertEquals(1, numUpdateEvent);
out.clear();
out.write("%html div");
assertEquals("", new String(out.toByteArray()));
assertEquals(InterpreterResult.Type.TEXT, out.getType());
assertEquals("", new String(out.getOutputAt(0).toByteArray()));
assertEquals(InterpreterResult.Type.HTML, out.getOutputAt(0).getType());
out.flush();
out.write("%html div");
assertEquals("div", new String(out.toByteArray()));
assertEquals(InterpreterResult.Type.HTML, out.getType());
assertEquals("div", new String(out.getOutputAt(0).toByteArray()));
}
@Test
public void testType() throws IOException {
// default output stream type is TEXT
out.write("Text\n");
assertEquals(InterpreterResult.Type.TEXT, out.getType());
assertEquals("Text\n", new String(out.toByteArray()));
assertEquals(InterpreterResult.Type.TEXT, out.getOutputAt(0).getType());
assertEquals("Text\n", new String(out.getOutputAt(0).toByteArray()));
assertEquals(1, numAppendEvent);
assertEquals(1, numUpdateEvent);
// change type
out.write("%html\n");
assertEquals(InterpreterResult.Type.HTML, out.getType());
assertEquals("", new String(out.toByteArray()));
assertEquals(InterpreterResult.Type.HTML, out.getOutputAt(1).getType());
assertEquals("", new String(out.getOutputAt(1).toByteArray()));
assertEquals(1, numAppendEvent);
assertEquals(2, numUpdateEvent);
assertEquals(1, numUpdateEvent);
// none TEXT type output stream does not generate append event
out.write("<div>html</div>\n");
assertEquals(InterpreterResult.Type.HTML, out.getType());
assertEquals(InterpreterResult.Type.HTML, out.getOutputAt(1).getType());
assertEquals(1, numAppendEvent);
assertEquals(2, numUpdateEvent);
assertEquals("<div>html</div>\n", new String(out.toByteArray()));
assertEquals("<div>html</div>\n", new String(out.getOutputAt(1).toByteArray()));
// change type to text again
out.write("%text hello\n");
assertEquals(InterpreterResult.Type.TEXT, out.getType());
assertEquals(InterpreterResult.Type.TEXT, out.getOutputAt(2).getType());
assertEquals(2, numAppendEvent);
assertEquals(3, numUpdateEvent);
assertEquals("hello\n", new String(out.toByteArray()));
assertEquals("hello\n", new String(out.getOutputAt(2).toByteArray()));
}
@Test
public void testType2() throws IOException {
public void testChangeTypeInTheBeginning() throws IOException {
out.write("%html\nHello");
assertEquals(InterpreterResult.Type.HTML, out.getType());
assertEquals(InterpreterResult.Type.HTML, out.getOutputAt(0).getType());
}
@Test
public void testChangeTypeWithoutData() throws IOException {
out.write("%html\n%table\n");
assertEquals(InterpreterResult.Type.HTML, out.getOutputAt(0).getType());
assertEquals(InterpreterResult.Type.TABLE, out.getOutputAt(1).getType());
}
@Test
public void testMagicData() throws IOException {
out.write("%table col1\tcol2\n%html <h3> This is a hack </h3>\t234\n".getBytes());
assertEquals(InterpreterResult.Type.TABLE, out.getType());
assertEquals("col1\tcol2\n%html <h3> This is a hack </h3>\t234\n", new String(out.toByteArray()));
out.write("%table col1\tcol2\n\n%html <h3> This is a hack </h3>\t234\n".getBytes());
assertEquals(InterpreterResult.Type.TABLE, out.getOutputAt(0).getType());
assertEquals(InterpreterResult.Type.HTML, out.getOutputAt(1).getType());
assertEquals("col1\tcol2\n", new String(out.getOutputAt(0).toByteArray()));
assertEquals("<h3> This is a hack </h3>\t234\n", new String(out.getOutputAt(1).toByteArray()));
}
@Test
public void testTableCellFormatting() throws IOException {
out.write("%table col1\tcol2\n%html val1\tval2\n".getBytes());
assertEquals(InterpreterResult.Type.TABLE, out.getOutputAt(0).getType());
assertEquals("col1\tcol2\n%html val1\tval2\n", new String(out.getOutputAt(0).toByteArray()));
}
@Override
public void onUpdateAll(InterpreterOutput out) {
}
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
public void onClose(InterpreterOutput out) {
}
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
numAppendEvent++;
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
public void onUpdate(int index, InterpreterResultMessageOutput out) {
numUpdateEvent++;
}
}

View file

@ -29,11 +29,11 @@ public class InterpreterResultTest {
public void testTextType() {
InterpreterResult result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "this is a TEXT type");
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
assertEquals("No magic", InterpreterResult.Type.TEXT, result.message().get(0).getType());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%this is a TEXT type");
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
assertEquals("No magic", InterpreterResult.Type.TEXT, result.message().get(0).getType());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%\n");
assertEquals("No magic", InterpreterResult.Type.TEXT, result.type());
assertEquals("No magic", InterpreterResult.Type.TEXT, result.message().get(0).getType());
}
@Test
@ -41,24 +41,24 @@ public class InterpreterResultTest {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table\ncol1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word\n%table col1\tcol2\naaa\t123\n");
assertEquals(InterpreterResult.Type.TABLE, result.message().get(1).getType());
}
public void testComplexMagicType() {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before %table col1\tcol2\naaa\t123\n");
assertEquals("some text before magic return magic", InterpreterResult.Type.TABLE, result.type());
assertEquals("some text before magic return magic", InterpreterResult.Type.TABLE, result.message().get(0).getType());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html <h3> This is a hack </h3> %table\n col1\tcol2\naaa\t123\n");
assertEquals("magic A before magic B return magic A", InterpreterResult.Type.HTML, result.type());
assertEquals("magic A before magic B return magic A", InterpreterResult.Type.HTML, result.message().get(0).getType());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
assertEquals("text & magic A before magic B return magic A", InterpreterResult.Type.TABLE, result.type());
assertEquals("text & magic A before magic B return magic A", InterpreterResult.Type.TABLE, result.message().get(0).getType());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
assertEquals("magic A, magic B, magic a' return magic A", InterpreterResult.Type.TABLE, result.type());
assertEquals("magic A, magic B, magic a' return magic A", InterpreterResult.Type.TABLE, result.message().get(0).getType());
}
@Test
@ -67,11 +67,11 @@ public class InterpreterResultTest {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n");
assertEquals("%table col1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
assertEquals("%table col1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message().get(0).getData());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table\ncol1\tcol2\naaa\t123\n");
assertEquals("%table\ncol1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n");
assertEquals("some text before magic word %table col1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message());
assertEquals("%table\ncol1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message().get(0).getData());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word\n%table col1\tcol2\naaa\t123\n");
assertEquals("some text before magic word\n%table col1\tcol2\naaa\t123\n", "col1\tcol2\naaa\t123\n", result.message().get(1).getData());
}
@Test
@ -79,28 +79,19 @@ public class InterpreterResultTest {
InterpreterResult result = null;
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before %table col1\tcol2\naaa\t123\n");
assertEquals("text before %table return %table", "col1\tcol2\naaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html <h3> This is a hack </h3> %table\ncol1\tcol2\naaa\t123\n");
assertEquals("%html before %table return %html", " <h3> This is a hack </h3> %table\n" +
"col1\tcol2\n" +
"aaa\t123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word %table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3>");
assertEquals("text & %table befoe %html return %table", "col1\tcol2\n" +
"aaa\t123\n" +
" %html <h3> This is a hack </h3>", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %html <h3> This is a hack </h3> %table col1\naaa\n123\n");
assertEquals("%table, %html before %table return first %table", "col1\tcol2\n" +
"aaa\t123\n" +
" %html <h3> This is a hack </h3> %table col1\n" +
"aaa\n" +
"123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n %table col1\naaa\n123\n");
assertEquals("%table before %table return first %table", "col1\tcol2\n" +
"aaa\t123\n" +
" %table col1\n" +
"aaa\n" +
"123\n", result.message());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before\n%table col1\tcol2\naaa\t123\n");
assertEquals("text before %table", "some text before\n", result.message().get(0).getData());
assertEquals("text after %table", "col1\tcol2\naaa\t123\n", result.message().get(1).getData());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%html <h3> This is a hack </h3>\n%table\ncol1\tcol2\naaa\t123\n");
assertEquals(" <h3> This is a hack </h3>\n", result.message().get(0).getData());
assertEquals("col1\tcol2\naaa\t123\n", result.message().get(1).getData());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "some text before magic word\n%table col1\tcol2\naaa\t123\n\n%html <h3> This is a hack </h3>");
assertEquals("<h3> This is a hack </h3>", result.message().get(2).getData());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table col1\tcol2\naaa\t123\n\n%html <h3> This is a hack </h3>\n%table col1\naaa\n123\n");
assertEquals("col1\naaa\n123\n", result.message().get(2).getData());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "%table " + "col1\tcol2\naaa\t123\n\n%table col1\naaa\n123\n");
assertEquals("col1\tcol2\naaa\t123\n", result.message().get(0).getData());
assertEquals("col1\naaa\n123\n", result.message().get(1).getData());
}
@Test

View file

@ -109,21 +109,21 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
public void testAngularObjectInterpreterSideCRUD() throws InterruptedException {
InterpreterResult ret = intp.interpret("get", context);
Thread.sleep(500); // waitFor eventpoller pool event
String[] result = ret.message().split(" ");
String[] result = ret.message().get(0).getData().split(" ");
assertEquals("0", result[0]); // size of registry
assertEquals("0", result[1]); // num watcher called
// create object
ret = intp.interpret("add n1 v1", context);
Thread.sleep(500);
result = ret.message().split(" ");
result = ret.message().get(0).getData().split(" ");
assertEquals("1", result[0]); // size of registry
assertEquals("0", result[1]); // num watcher called
assertEquals("v1", localRegistry.get("n1", "note", null).get());
// update object
ret = intp.interpret("update n1 v11", context);
result = ret.message().split(" ");
result = ret.message().get(0).getData().split(" ");
Thread.sleep(500);
assertEquals("1", result[0]); // size of registry
assertEquals("1", result[1]); // num watcher called
@ -131,7 +131,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
// remove object
ret = intp.interpret("remove n1", context);
result = ret.message().split(" ");
result = ret.message().get(0).getData().split(" ");
Thread.sleep(500);
assertEquals("0", result[0]); // size of registry
assertEquals("1", result[1]); // num watcher called
@ -145,13 +145,13 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
InterpreterResult ret = intp.interpret("get", context);
Thread.sleep(500); // waitFor eventpoller pool event
String[] result = ret.message().split(" ");
String[] result = ret.message().get(0).getData().split(" ");
assertEquals("0", result[0]); // size of registry
// create object
ret = intp.interpret("add n1 v1", context);
Thread.sleep(500);
result = ret.message().split(" ");
result = ret.message().get(0).getData().split(" ");
assertEquals("1", result[0]); // size of registry
assertEquals("v1", localRegistry.get("n1", "note", null).get());
@ -159,7 +159,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
localRegistry.removeAndNotifyRemoteProcess("n1", "note", null);
ret = intp.interpret("get", context);
Thread.sleep(500); // waitFor eventpoller pool event
result = ret.message().split(" ");
result = ret.message().get(0).getData().split(" ");
assertEquals("0", result[0]); // size of registry
}
@ -170,7 +170,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
InterpreterResult ret = intp.interpret("get", context);
Thread.sleep(500); // waitFor eventpoller pool event
String[] result = ret.message().split(" ");
String[] result = ret.message().get(0).getData().split(" ");
assertEquals("0", result[0]); // size of registry
// create object
@ -179,7 +179,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
// get from remote registry
ret = intp.interpret("get", context);
Thread.sleep(500); // waitFor eventpoller pool event
result = ret.message().split(" ");
result = ret.message().get(0).getData().split(" ");
assertEquals("1", result[0]); // size of registry
}

View file

@ -136,15 +136,15 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
RemoteInterpreter intp = createMockInterpreter();
InterpreterResult ret = intp.interpret("SUCCESS:%html hello:", createInterpreterContext());
assertEquals(InterpreterResult.Type.HTML, ret.type());
assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
assertEquals("hello", ret.message());
ret = intp.interpret("SUCCESS:%html\nhello:", createInterpreterContext());
assertEquals(InterpreterResult.Type.HTML, ret.type());
assertEquals(InterpreterResult.Type.HTML, ret.message().get(0).getType());
assertEquals("hello", ret.message());
ret = intp.interpret("SUCCESS:%html hello:%angular world", createInterpreterContext());
assertEquals(InterpreterResult.Type.ANGULAR, ret.type());
assertEquals(InterpreterResult.Type.ANGULAR, ret.message().get(0).getType());
assertEquals("helloworld", ret.message());
}

View file

@ -31,6 +31,7 @@ import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterEnv;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
@ -392,7 +393,7 @@ public class RemoteInterpreterTest {
intpA.open();
int concurrency = 3;
final List<String> results = new LinkedList<>();
final List<InterpreterResultMessage> results = new LinkedList<>();
Scheduler scheduler = intpA.getScheduler();
for (int i = 0; i < concurrency; i++) {
@ -424,7 +425,7 @@ public class RemoteInterpreterTest {
new LinkedList<InterpreterContextRunner>(), null));
synchronized (results) {
results.add(ret.message());
results.addAll(ret.message());
results.notify();
}
return null;
@ -446,8 +447,8 @@ public class RemoteInterpreterTest {
}
int i = 0;
for (String result : results) {
assertEquals(Integer.toString(i++), result);
for (InterpreterResultMessage result : results) {
assertEquals(Integer.toString(i++), result.getData());
}
assertEquals(concurrency, i);
@ -470,7 +471,7 @@ public class RemoteInterpreterTest {
int concurrency = 4;
final int timeToSleep = 1000;
final List<String> results = new LinkedList<>();
final List<InterpreterResultMessage> results = new LinkedList<>();
long start = System.currentTimeMillis();
Scheduler scheduler = intpA.getScheduler();
@ -504,7 +505,7 @@ public class RemoteInterpreterTest {
new LinkedList<InterpreterContextRunner>(), null));
synchronized (results) {
results.add(ret.message());
results.addAll(ret.message());
results.notify();
}
return stmt;

View file

@ -148,16 +148,16 @@ public class DistributedResourcePoolTest {
intp2.interpret("put key2 value2", context);
ret = intp1.interpret("getAll", context);
assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
ret = intp2.interpret("getAll", context);
assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
assertEquals(2, gson.fromJson(ret.message().get(0).getData(), ResourceSet.class).size());
ret = intp1.interpret("get key1", context);
assertEquals("value1", gson.fromJson(ret.message(), String.class));
assertEquals("value1", gson.fromJson(ret.message().get(0).getData(), String.class));
ret = intp1.interpret("get key2", context);
assertEquals("value2", gson.fromJson(ret.message(), String.class));
assertEquals("value2", gson.fromJson(ret.message().get(0).getData(), String.class));
}
@Test
@ -233,10 +233,10 @@ public class DistributedResourcePoolTest {
// then resources should be removed.
assertEquals(2, ResourcePoolUtils.getAllResources().size());
assertEquals("", gson.fromJson(
intp1.interpret("get note1:paragraph1:key1", context).message(),
intp1.interpret("get note1:paragraph1:key1", context).message().get(0).getData(),
String.class));
assertEquals("", gson.fromJson(
intp1.interpret("get note1:paragraph2:key1", context).message(),
intp1.interpret("get note1:paragraph2:key1", context).message().get(0).getData(),
String.class));
@ -246,7 +246,7 @@ public class DistributedResourcePoolTest {
// then 1
assertEquals(1, ResourcePoolUtils.getAllResources().size());
assertEquals("value2", gson.fromJson(
intp1.interpret("get note2:paragraph2:key2", context).message(),
intp1.interpret("get note2:paragraph2:key2", context).message().get(0).getData(),
String.class));
}

View file

@ -41,10 +41,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@ -1643,11 +1640,11 @@ public class NotebookServer extends WebSocketServlet implements
/**
* This callback is for paragraph that runs on RemoteInterpreterProcess
* @param paragraph
* @param out
* @param idx
* @param output
*/
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputAppend(Paragraph paragraph, int idx, String output) {
Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT)
.put("noteId", paragraph.getNote().getId())
.put("paragraphId", paragraph.getId())
@ -1659,11 +1656,12 @@ public class NotebookServer extends WebSocketServlet implements
/**
* This callback is for paragraph that runs on RemoteInterpreterProcess
* @param paragraph
* @param out
* @param output
* @param idx
* @param result
*/
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage result) {
String output = result.getData();
Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT)
.put("noteId", paragraph.getNote().getId())
.put("paragraphId", paragraph.getId())
@ -1671,6 +1669,11 @@ public class NotebookServer extends WebSocketServlet implements
notebookServer.broadcast(paragraph.getNote().getId(), msg);
}
@Override
public void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs) {
// TODO
}
}
@Override

View file

@ -89,7 +89,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("55", p.getResult().message());
assertEquals("55", p.getResult().message().get(0).getData());
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}
@ -111,7 +111,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertTrue(p.getResult().message().contains(
assertTrue(p.getResult().message().get(0).getData().contains(
"Array[org.apache.spark.sql.Row] = Array([hello,20])"));
// test display DataFrame
@ -125,8 +125,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
assertEquals("_1\t_2\nhello\t20\n", p.getResult().message());
assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType());
assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(0).getData());
// test display DataSet
if (sparkVersion >= 20) {
@ -140,8 +140,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
assertEquals("_1\t_2\nhello\t20\n", p.getResult().message());
assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType());
assertEquals("_1\t_2\nhello\t20\n", p.getResult().message().get(0).getData());
}
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}
@ -182,7 +182,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
waitForFinish(p);
System.err.println("sparkRTest=" + p.getResult().message());
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[1] 3", p.getResult().message());
assertEquals("[1] 3", p.getResult().message().get(0).getData());
}
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}
@ -205,7 +205,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("55\n", p.getResult().message());
assertEquals("55\n", p.getResult().message().get(0).getData());
if (sparkVersion >= 13) {
// run sqlContext test
p = note.addParagraph();
@ -219,7 +219,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message().get(0).getData());
// test display Dataframe
p = note.addParagraph();
@ -233,9 +233,9 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals(InterpreterResult.Type.TABLE, p.getResult().type());
assertEquals(InterpreterResult.Type.TABLE, p.getResult().message().get(0).getType());
// TODO (zjffdu), one more \n is appended, need to investigate why.
assertEquals("age\tid\n20\t1\n\n", p.getResult().message());
assertEquals("age\tid\n20\t1\n\n", p.getResult().message().get(0).getData());
// test udf
p = note.addParagraph();
@ -248,7 +248,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(len=u'3')]\n", p.getResult().message());
assertEquals("[Row(len=u'3')]\n", p.getResult().message().get(0).getData());
}
if (sparkVersion >= 20) {
// run SparkSession test
@ -263,7 +263,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message());
assertEquals("[Row(age=20, id=1)]\n", p.getResult().message().get(0).getData());
// test udf
p = note.addParagraph();
@ -277,7 +277,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("[Row(len=u'3')]\n", p.getResult().message());
assertEquals("[Row(len=u'3')]\n", p.getResult().message().get(0).getData());
}
}
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
@ -309,7 +309,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
assertEquals("10\n", p.getResult().message());
assertEquals("10\n", p.getResult().message().get(0).getData());
}
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}
@ -344,7 +344,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p2.getId());
waitForFinish(p2);
assertEquals(Status.FINISHED, p2.getStatus());
assertEquals("10", p2.getResult().message());
assertEquals("10", p2.getResult().message().get(0).getData());
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}
@ -399,7 +399,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
waitForFinish(p1);
assertEquals(Status.FINISHED, p1.getStatus());
assertEquals("2\n", p1.getResult().message());
assertEquals("2\n", p1.getResult().message().get(0).getData());
}
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
}
@ -419,7 +419,7 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
note.run(p.getId());
waitForFinish(p);
assertEquals(Status.FINISHED, p.getStatus());
String sparkVersion = p.getResult().message();
String sparkVersion = p.getResult().message().get(0).getData();
System.out.println("Spark version detected " + sparkVersion);
String[] split = sparkVersion.split("\\.");
int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);

View file

@ -52,10 +52,13 @@ limitations under the License.
<div ng-include src="'app/notebook/paragraph/paragraph-parameterizedQueryForm.html'"></div>
<!-- Rendering -->
<div class="tableDisplay" ng-show="!paragraph.config.tableHide">
<div ng-include src="'app/notebook/paragraph/paragraph-chart-selector.html'"></div>
<div ng-include src="'app/notebook/paragraph/paragraph-pivot.html'"></div>
<div ng-include src="'app/notebook/paragraph/paragraph-results.html'"></div>
<div class="tableDisplay"
ng-show="!paragraph.config.tableHide"
ng-controller="ResultCtrl"
ng-repeat="result in paragraph.result"
ng-init="init(result)"
ng-include src="'app/notebook/paragraph/result.html'"
>
</div>
</div>

View file

@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
'use strict';
(function() {
angular.module('zeppelinWebApp').controller('ResultCtrl', ResultCtrl);
ResultCtrl.$inject = [
'$scope',
'$rootScope',
'$route',
'$window',
'$routeParams',
'$location',
'$timeout',
'$compile',
'$http',
'$q',
'websocketMsgSrv',
'baseUrlSrv',
'ngToast',
'saveAsService'
];
function ResultCtrl($scope, $rootScope, $route, $window, $routeParams, $location,
$timeout, $compile, $http, $q, websocketMsgSrv,
baseUrlSrv, ngToast, saveAsService) {
$scope.parentNote = null;
$scope.paragraph = null;
$scope.originalText = '';
$scope.editor = null;
$scope.init = function(results, configs) {
console.log('result controller init %o %o', results, configs);
};
};
})();

View file

@ -0,0 +1,19 @@
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<div>
<div ng-include src="'app/notebook/paragraph/result/result-chart-selector.html'"></div>
<div ng-include src="'app/notebook/paragraph/result/result-pivot.html'"></div>
<div ng-include src="'app/notebook/paragraph/result/result-results.html'"></div>
</div>

View file

@ -31,17 +31,14 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
@ -705,21 +702,31 @@ public class Note implements Serializable, ParagraphJobListener {
}
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputAppend(Paragraph paragraph, int idx, String output) {
if (jobListenerFactory != null) {
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
if (listener != null) {
listener.onOutputAppend(paragraph, out, output);
listener.onOutputAppend(paragraph, idx, output);
}
}
}
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage msg) {
if (jobListenerFactory != null) {
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
if (listener != null) {
listener.onOutputUpdate(paragraph, out, output);
listener.onOutputUpdate(paragraph, idx, msg);
}
}
}
@Override
public void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs) {
if (jobListenerFactory != null) {
ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this);
if (listener != null) {
listener.onOutputUpdateAll(paragraph, msgs);
}
}
}

View file

@ -39,6 +39,7 @@ import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.zeppelin.interpreter.*;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
@ -56,9 +57,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
@ -386,6 +384,53 @@ public class Notebook implements NoteEventListener {
return notebookRepo.get(noteId, revisionId, subject);
}
public void convertFromSingleResultToMultipleResultsFormat(Note note) {
for (Paragraph p : note.paragraphs) {
Object ret = p.getReturn();
try {
if (ret instanceof Map) {
Map r = ((Map) ret);
if (r.containsKey("code") &&
r.containsKey("msg") &&
r.containsKey("type")) { // all three fields exists in sinle result format
InterpreterResult.Code code = InterpreterResult.Code.valueOf((String) r.get("code"));
InterpreterResult.Type type = InterpreterResult.Type.valueOf((String) r.get("type"));
String msg = (String) r.get("msg");
InterpreterResult result = new InterpreterResult(code, msg);
if (result.message().size() == 1) {
result = new InterpreterResult(code);
result.add(type, msg);
}
p.setResult(result);
// convert config
Map<String, Object> config = p.getConfig();
Object graph = config.remove("graph");
Object apps = config.remove("apps");
Object helium = config.remove("helium");
List<Object> results = new LinkedList<>();
for (int i = 0; i < result.message().size(); i++) {
if (i == result.message().size() - 1) {
HashMap<Object, Object> res = new HashMap<>();
res.put("graph", graph);
res.put("apps", apps);
res.put("helium", helium);
results.add(res);
} else {
results.add(new HashMap<>());
}
}
config.put("result", results);
}
}
} catch (Exception e) {
logger.error("Conversion failure", e);
}
}
}
@SuppressWarnings("rawtypes")
private Note loadNoteFromRepo(String id, AuthenticationInfo subject) {
Note note = null;
@ -398,6 +443,8 @@ public class Notebook implements NoteEventListener {
return null;
}
convertFromSingleResultToMultipleResultsFormat(note);
//Manually inject ALL dependencies, as DI constructor was NOT used
note.setIndex(this.noteSearchService);
note.setCredentials(this.credentials);

View file

@ -335,27 +335,11 @@ public class Paragraph extends Job implements Serializable, Cloneable {
return getReturn();
}
String message = "";
context.out.flush();
InterpreterResult.Type outputType = context.out.getType();
byte[] interpreterOutput = context.out.toByteArray();
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
resultMessages.addAll(ret.message());
if (interpreterOutput != null && interpreterOutput.length > 0) {
message = new String(interpreterOutput);
}
if (message.isEmpty()) {
return ret;
} else {
String interpreterResultMessage = ret.message();
if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
message += interpreterResultMessage;
return new InterpreterResult(ret.code(), ret.type(), message);
} else {
return new InterpreterResult(ret.code(), outputType, message);
}
}
return resultMessages;
} finally {
InterpreterContext.remove();
}
@ -416,29 +400,45 @@ public class Paragraph extends Job implements Serializable, Cloneable {
return getInterpreterContext(new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
updateParagraphResult(out);
((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line));
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
((ParagraphJobListener) getListener()).onOutputAppend(self, index, new String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
updateParagraphResult(out);
((ParagraphJobListener) getListener()).onOutputUpdate(self, out,
new String(output));
}
private void updateParagraphResult(InterpreterOutput out) {
// update paragraph result
Throwable t = null;
String message = null;
public void onUpdate(int index, InterpreterResultMessageOutput out) {
try {
message = new String(out.toByteArray());
((ParagraphJobListener) getListener()).onOutputUpdate(
self, index, out.toInterpreterResultMessage());
} catch (IOException e) {
logger.error(e.getMessage(), e);
t = e;
}
setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t);
}
@Override
public void onUpdateAll(InterpreterOutput out) {
try {
List<InterpreterResultMessage> messages = out.toInterpreterResultMessage();
((ParagraphJobListener) getListener()).onOutputUpdateAll(self, messages);
updateParagraphResult(messages);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
@Override
public void onClose(InterpreterOutput out) {
try {
updateParagraphResult(out.toInterpreterResultMessage());
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
}
private void updateParagraphResult(List<InterpreterResultMessage> msgs) {
// update paragraph result
InterpreterResult result = new InterpreterResult(Code.SUCCESS, msgs);
setReturn(result, null);
}
}));
}

View file

@ -18,12 +18,17 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.scheduler.JobListener;
import java.util.List;
/**
* Listen paragraph update
*/
public interface ParagraphJobListener extends JobListener {
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output);
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output);
public void onOutputAppend(Paragraph paragraph, int idx, String output);
public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage msg);
public void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs);
}

View file

@ -323,11 +323,17 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
public ParagraphJobListener getParagraphJobListener(Note note) {
return new ParagraphJobListener() {
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputAppend(Paragraph paragraph, int idx, String output) {
}
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage msg) {
}
@Override
public void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs) {
}

View file

@ -1080,11 +1080,18 @@ public class NotebookTest implements JobListenerFactory{
return new ParagraphJobListener(){
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputAppend(Paragraph paragraph, int idx, String output) {
}
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage msg) {
}
@Override
public void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs) {
}
@Override

View file

@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -35,6 +36,7 @@ import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.notebook.*;
@ -400,11 +402,18 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
return new ParagraphJobListener(){
@Override
public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputAppend(Paragraph paragraph, int idx, String output) {
}
@Override
public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage msg) {
}
@Override
public void onOutputUpdateAll(Paragraph paragraph, List<InterpreterResultMessage> msgs) {
}
@Override