Protect SparkContext creation

This commit is contained in:
Lee moon soo 2016-02-17 22:05:42 -08:00
parent f2299d6c74
commit 72b22354fe
2 changed files with 61 additions and 46 deletions

View file

@ -26,6 +26,7 @@ import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Joiner;
@ -112,15 +113,15 @@ public class SparkInterpreter extends Interpreter {
private static SparkContext sc;
private static SQLContext sqlc;
private static SparkEnv env;
private static JobProgressListener sparkListener;
private static AbstractFile classOutputDir;
// Monitor object for the synchronized blocks that guard the static sc / sqlc / classOutputDir
// fields. new Integer(0) yields a distinct instance (unlike Integer.valueOf), so the monitor is
// not shared with other code that boxes the value 0.
private static Integer sharedInterpreterLock = new Integer(0);
// Counter paired with the static SparkContext: bumped with incrementAndGet() and consulted
// with decrementAndGet() in close() to decide whether sc should be stopped.
private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
private SparkOutputStream out;
private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
private JobProgressListener sparkListener;
private Map<String, Object> binder;
private SparkVersion sparkVersion;
@ -138,17 +139,21 @@ public class SparkInterpreter extends Interpreter {
sparkListener = setupListeners(this.sc);
}
/**
 * Returns the SparkContext held in the static {@code sc} field, creating it on first use.
 * Creation also stores {@code SparkEnv.get()} in {@code env} and the listener returned by
 * {@code setupListeners(sc)} in {@code sparkListener}.
 *
 * @return the SparkContext stored in {@code sc}
 */
public synchronized SparkContext getSparkContext() {
if (sc == null) {
sc = createSparkContext();
env = SparkEnv.get();
sparkListener = setupListeners(sc);
public SparkContext getSparkContext() {
// sc is a static field, so the check-and-create is guarded by the static
// sharedInterpreterLock rather than by this instance's monitor.
synchronized (sharedInterpreterLock) {
if (sc == null) {
sc = createSparkContext();
env = SparkEnv.get();
sparkListener = setupListeners(sc);
}
return sc;
}
return sc;
}
/**
 * Reports whether the static {@code sc} field currently holds a SparkContext.
 *
 * @return {@code true} if {@code sc} is non-null
 */
public boolean isSparkContextInitialized() {
return sc != null;
// Read sc under the same lock getSparkContext() holds while assigning it.
synchronized (sharedInterpreterLock) {
return sc != null;
}
}
static JobProgressListener setupListeners(SparkContext context) {
@ -195,29 +200,30 @@ public class SparkInterpreter extends Interpreter {
}
/**
 * Returns the SQLContext held in the static {@code sqlc} field, creating it on first use.
 * When {@code useHiveContext()} is true, {@code org.apache.spark.sql.hive.HiveContext} is
 * loaded reflectively through this class's class loader and constructed from the
 * SparkContext; if that fails, a warning is logged and a plain {@code SQLContext} is used.
 *
 * @return the SQLContext stored in {@code sqlc}
 */
public SQLContext getSQLContext() {
if (sqlc == null) {
if (useHiveContext()) {
String name = "org.apache.spark.sql.hive.HiveContext";
Constructor<?> hc;
try {
hc = getClass().getClassLoader().loadClass(name)
.getConstructor(SparkContext.class);
sqlc = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
// getSparkContext() acquires sharedInterpreterLock as well; the nested acquisition on the
// same thread is safe because Java intrinsic locks are reentrant.
synchronized (sharedInterpreterLock) {
if (sqlc == null) {
if (useHiveContext()) {
String name = "org.apache.spark.sql.hive.HiveContext";
Constructor<?> hc;
try {
hc = getClass().getClassLoader().loadClass(name)
.getConstructor(SparkContext.class);
sqlc = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
sqlc = new SQLContext(getSparkContext());
}
} else {
sqlc = new SQLContext(getSparkContext());
}
} else {
sqlc = new SQLContext(getSparkContext());
}
return sqlc;
}
return sqlc;
}
public SparkDependencyResolver getDependencyResolver() {
@ -475,6 +481,7 @@ public class SparkInterpreter extends Interpreter {
intp.setContextClassLoader();
intp.initializeSynchronous();
numReferenceOfSparkContext.incrementAndGet();
synchronized (sharedInterpreterLock) {
if (classOutputDir == null) {
@ -492,23 +499,23 @@ public class SparkInterpreter extends Interpreter {
logger.error(e.getMessage(), e);
}
}
completor = new SparkJLineCompletion(intp);
sc = getSparkContext();
if (sc.getPoolForName("fair").isEmpty()) {
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
int minimumShare = 0;
int weight = 1;
Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
sc.taskScheduler().rootPool().addSchedulable(pool);
}
sparkVersion = SparkVersion.fromVersionString(sc.version());
sqlc = getSQLContext();
}
completor = new SparkJLineCompletion(intp);
sc = getSparkContext();
if (sc.getPoolForName("fair").isEmpty()) {
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
int minimumShare = 0;
int weight = 1;
Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
sc.taskScheduler().rootPool().addSchedulable(pool);
}
sparkVersion = SparkVersion.fromVersionString(sc.version());
sqlc = getSQLContext();
dep = getDependencyResolver();
z = new ZeppelinContext(sc, sqlc, null, dep,
@ -922,8 +929,10 @@ public class SparkInterpreter extends Interpreter {
/**
 * Closes this interpreter: stops the SparkContext in the static {@code sc} field, clears the
 * field, and closes {@code intp}.
 */
@Override
public void close() {
sc.stop();
sc = null;
// Reference-counted shutdown: the context is stopped only when decrementAndGet() brings
// numReferenceOfSparkContext to zero.
// NOTE(review): sc is stopped and cleared here without holding sharedInterpreterLock, while
// getSparkContext() reads and assigns it under that lock — confirm this cannot race.
// NOTE(review): the static sqlc is not reset here, so a later getSQLContext() would return
// the SQLContext built on the stopped context unless it is cleared elsewhere — confirm.
if (numReferenceOfSparkContext.decrementAndGet() == 0) {
sc.stop();
sc = null;
}
intp.close();
}

View file

@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
import org.apache.zeppelin.scheduler.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -246,7 +247,12 @@ public class Paragraph extends Job implements Serializable, Cloneable {
@Override
protected boolean jobAbort() {
Interpreter repl = getRepl(getRequiredReplName());
Job job = repl.getScheduler().removeFromWaitingQueue(getId());
Scheduler scheduler = repl.getScheduler();
if (scheduler == null) {
return true;
}
Job job = scheduler.removeFromWaitingQueue(getId());
if (job != null) {
job.setStatus(Status.ABORT);
} else {