mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Put last value of scala repl into resource pool
This commit is contained in:
parent
f2ab95dcd3
commit
b4ff52fbcf
2 changed files with 31 additions and 5 deletions
|
|
@ -51,6 +51,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
|
|||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.WellKnownResourceName;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
|
||||
|
|
@ -545,7 +547,9 @@ public class SparkInterpreter extends Interpreter {
|
|||
binder.put("sc", sc);
|
||||
binder.put("sqlc", sqlc);
|
||||
binder.put("z", z);
|
||||
|
||||
binder.put("intp", intp);
|
||||
intp.interpret("@transient val intp = _binder.get(\"intp\").asInstanceOf[org.apache.spark" +
|
||||
".repl.SparkIMain]");
|
||||
intp.interpret("@transient val z = "
|
||||
+ "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
|
||||
intp.interpret("@transient val sc = "
|
||||
|
|
@ -761,13 +765,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
|
||||
SparkEnv.set(env);
|
||||
|
||||
// add print("") to make sure not finishing with comment
|
||||
// see https://github.com/NFLabs/zeppelin/issues/151
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
String[] linesToRun = new String[lines.length];
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
linesToRun[i] = lines[i];
|
||||
}
|
||||
linesToRun[lines.length] = "print(\"\")";
|
||||
|
||||
Console.setOut(context.out);
|
||||
out.setInterpreterOutput(context.out);
|
||||
|
|
@ -809,17 +810,41 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
// make sure code does not finish with comment
|
||||
if (r == Code.INCOMPLETE) {
|
||||
scala.tools.nsc.interpreter.Results.Result res;
|
||||
res = intp.interpret(incomplete + "\nprint(\"\")");
|
||||
r = getResultCode(res);
|
||||
}
|
||||
|
||||
if (r == Code.INCOMPLETE) {
|
||||
sc.clearJobGroup();
|
||||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(r, "Incomplete expression");
|
||||
} else {
|
||||
sc.clearJobGroup();
|
||||
putLatestVarInResourcePool(context);
|
||||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
private void putLatestVarInResourcePool(InterpreterContext context) {
|
||||
String varName = intp.mostRecentVar();
|
||||
if (varName == null || varName.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Object lastObj = getValue(varName);
|
||||
if (lastObj != null) {
|
||||
logger.info("Put LastValue {} {} {} into ResourcePool",
|
||||
varName, lastObj, lastObj.getClass().getName());
|
||||
ResourcePool resourcePool = context.getResourcePool();
|
||||
resourcePool.put(context.getNoteId(), context.getParagraphId(),
|
||||
WellKnownResourceName.ZeppelinReplResult.toString(), lastObj);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.resource;
|
|||
* Well known resource names in ResourcePool
|
||||
*/
|
||||
public enum WellKnownResourceName {
|
||||
ZeppelinReplResult("zeppelin.repl.result"), // last object of repl
|
||||
ZeppelinTableResult("zeppelin.paragraph.result.table"); // paragraph run result
|
||||
|
||||
String name;
|
||||
|
|
|
|||
Loading…
Reference in a new issue