Share a SparkContext across ScalaCompilers

This commit is contained in:
Lee moon soo 2016-02-16 00:11:35 -08:00
parent cc33c25843
commit 4299b876e2
2 changed files with 49 additions and 11 deletions

View file

@ -18,9 +18,9 @@
package org.apache.zeppelin.spark;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
@ -29,7 +29,6 @@ import java.util.*;
import com.google.common.base.Joiner;
import org.apache.commons.io.FileUtils;
import org.apache.spark.HttpServer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
@ -46,7 +45,6 @@ import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
@ -59,17 +57,15 @@ import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;
import scala.*;
import scala.Enumeration.Value;
import scala.None;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashSet;
import scala.reflect.io.AbstractFile;
import scala.tools.nsc.Settings;
import scala.tools.nsc.interpreter.Completion.Candidates;
import scala.tools.nsc.interpreter.Completion.ScalaCompleter;
@ -114,15 +110,18 @@ public class SparkInterpreter extends Interpreter {
private SparkILoop interpreter;
private SparkIMain intp;
private static SparkContext sc;
private SparkOutputStream out;
private static SQLContext sqlc;
private static SparkEnv env;
private static AbstractFile classOutputDir;
private static Integer sharedInterpreterLock = new Integer(0);
private SparkOutputStream out;
private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
private JobProgressListener sparkListener;
private Map<String, Object> binder;
private SparkEnv env;
private SparkVersion sparkVersion;
@ -192,7 +191,7 @@ public class SparkInterpreter extends Interpreter {
}
private boolean useHiveContext() {
return Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
}
public SQLContext getSQLContext() {
@ -464,7 +463,6 @@ public class SparkInterpreter extends Interpreter {
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$");
/* create scala repl */
this.interpreter = new SparkILoop(null, new PrintWriter(out));
@ -477,6 +475,25 @@ public class SparkInterpreter extends Interpreter {
intp.setContextClassLoader();
intp.initializeSynchronous();
synchronized (sharedInterpreterLock) {
if (classOutputDir == null) {
classOutputDir = settings.outputDirs().getSingleOutput().get();
} else {
// change SparkIMain class output dir
settings.outputDirs().setSingleOutput(classOutputDir);
ClassLoader cl = intp.classLoader();
try {
Field rootField = cl.getClass().getSuperclass().getDeclaredField("root");
rootField.setAccessible(true);
rootField.set(cl, classOutputDir);
} catch (NoSuchFieldException | IllegalAccessException e) {
logger.error(e.getMessage(), e);
}
}
}
completor = new SparkJLineCompletion(intp);
sc = getSparkContext();

View file

@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Properties;
import org.apache.spark.HttpServer;
import org.apache.spark.SecurityManager;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.zeppelin.display.AngularObjectRegistry;
@ -70,8 +72,10 @@ public class SparkInterpreterTest {
if (repl == null) {
Properties p = new Properties();
intpGroup = new InterpreterGroup();
intpGroup.put("note", new LinkedList<Interpreter>());
repl = new SparkInterpreter(p);
repl.setInterpreterGroup(intpGroup);
intpGroup.get("note").add(repl);
repl.open();
}
@ -186,4 +190,21 @@ public class SparkInterpreterTest {
}
}
}
@Test
public void shareSingleSparkContext() throws InterruptedException {
// create another SparkInterpreter
Properties p = new Properties();
SparkInterpreter repl2 = new SparkInterpreter(p);
repl2.setInterpreterGroup(intpGroup);
intpGroup.get("note").add(repl2);
repl2.open();
assertEquals(Code.SUCCESS,
repl.interpret("print(sc.parallelize(1 to 10).count())", context).code());
assertEquals(Code.SUCCESS,
repl2.interpret("print(sc.parallelize(1 to 10).count())", context).code());
repl2.close();
}
}