mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Share a SparkContext across ScalaCompilers
This commit is contained in:
parent
cc33c25843
commit
4299b876e2
2 changed files with 49 additions and 11 deletions
|
|
@ -18,9 +18,9 @@
|
|||
package org.apache.zeppelin.spark;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URL;
|
||||
|
|
@ -29,7 +29,6 @@ import java.util.*;
|
|||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.spark.HttpServer;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.SparkContext;
|
||||
|
|
@ -46,7 +45,6 @@ import org.apache.spark.ui.jobs.JobProgressListener;
|
|||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
|
|
@ -59,17 +57,15 @@ import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import scala.Console;
|
||||
import scala.*;
|
||||
import scala.Enumeration.Value;
|
||||
import scala.None;
|
||||
import scala.Some;
|
||||
import scala.Tuple2;
|
||||
import scala.collection.Iterator;
|
||||
import scala.collection.JavaConversions;
|
||||
import scala.collection.JavaConverters;
|
||||
import scala.collection.Seq;
|
||||
import scala.collection.mutable.HashMap;
|
||||
import scala.collection.mutable.HashSet;
|
||||
import scala.reflect.io.AbstractFile;
|
||||
import scala.tools.nsc.Settings;
|
||||
import scala.tools.nsc.interpreter.Completion.Candidates;
|
||||
import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
|
||||
|
|
@ -114,15 +110,18 @@ public class SparkInterpreter extends Interpreter {
|
|||
private SparkILoop interpreter;
|
||||
private SparkIMain intp;
|
||||
private static SparkContext sc;
|
||||
private SparkOutputStream out;
|
||||
private static SQLContext sqlc;
|
||||
private static SparkEnv env;
|
||||
private static AbstractFile classOutputDir;
|
||||
private static Integer sharedInterpreterLock = new Integer(0);
|
||||
|
||||
private SparkOutputStream out;
|
||||
private SparkDependencyResolver dep;
|
||||
private SparkJLineCompletion completor;
|
||||
|
||||
private JobProgressListener sparkListener;
|
||||
|
||||
private Map<String, Object> binder;
|
||||
private SparkEnv env;
|
||||
private SparkVersion sparkVersion;
|
||||
|
||||
|
||||
|
|
@ -192,7 +191,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
private boolean useHiveContext() {
|
||||
return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
|
||||
return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
|
||||
}
|
||||
|
||||
public SQLContext getSQLContext() {
|
||||
|
|
@ -464,7 +463,6 @@ public class SparkInterpreter extends Interpreter {
|
|||
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
|
||||
|
||||
System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$");
|
||||
|
||||
|
||||
/* create scala repl */
|
||||
this.interpreter = new SparkILoop(null, new PrintWriter(out));
|
||||
|
|
@ -477,6 +475,25 @@ public class SparkInterpreter extends Interpreter {
|
|||
intp.setContextClassLoader();
|
||||
intp.initializeSynchronous();
|
||||
|
||||
|
||||
synchronized (sharedInterpreterLock) {
|
||||
if (classOutputDir == null) {
|
||||
classOutputDir = settings.outputDirs().getSingleOutput().get();
|
||||
} else {
|
||||
// change SparkIMain class output dir
|
||||
settings.outputDirs().setSingleOutput(classOutputDir);
|
||||
ClassLoader cl = intp.classLoader();
|
||||
|
||||
try {
|
||||
Field rootField = cl.getClass().getSuperclass().getDeclaredField("root");
|
||||
rootField.setAccessible(true);
|
||||
rootField.set(cl, classOutputDir);
|
||||
} catch (NoSuchFieldException | IllegalAccessException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
completor = new SparkJLineCompletion(intp);
|
||||
|
||||
sc = getSparkContext();
|
||||
|
|
|
|||
|
|
@ -24,6 +24,8 @@ import java.util.HashMap;
|
|||
import java.util.LinkedList;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.spark.HttpServer;
|
||||
import org.apache.spark.SecurityManager;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
|
|
@ -70,8 +72,10 @@ public class SparkInterpreterTest {
|
|||
if (repl == null) {
|
||||
Properties p = new Properties();
|
||||
intpGroup = new InterpreterGroup();
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
repl = new SparkInterpreter(p);
|
||||
repl.setInterpreterGroup(intpGroup);
|
||||
intpGroup.get("note").add(repl);
|
||||
repl.open();
|
||||
}
|
||||
|
||||
|
|
@ -186,4 +190,21 @@ public class SparkInterpreterTest {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shareSingleSparkContext() throws InterruptedException {
|
||||
// create another SparkInterpreter
|
||||
Properties p = new Properties();
|
||||
SparkInterpreter repl2 = new SparkInterpreter(p);
|
||||
repl2.setInterpreterGroup(intpGroup);
|
||||
intpGroup.get("note").add(repl2);
|
||||
repl2.open();
|
||||
|
||||
assertEquals(Code.SUCCESS,
|
||||
repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
|
||||
assertEquals(Code.SUCCESS,
|
||||
repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
|
||||
|
||||
repl2.close();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue