mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Protect SparkContext creation
This commit is contained in:
parent
f2299d6c74
commit
72b22354fe
2 changed files with 61 additions and 46 deletions
|
|
@ -26,6 +26,7 @@ import java.lang.reflect.Method;
|
|||
import java.net.URL;
|
||||
import java.net.URLClassLoader;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
|
|
@ -112,15 +113,15 @@ public class SparkInterpreter extends Interpreter {
|
|||
private static SparkContext sc;
|
||||
private static SQLContext sqlc;
|
||||
private static SparkEnv env;
|
||||
private static JobProgressListener sparkListener;
|
||||
private static AbstractFile classOutputDir;
|
||||
private static Integer sharedInterpreterLock = new Integer(0);
|
||||
private static AtomicInteger numReferenceOfSparkContext = new AtomicInteger(0);
|
||||
|
||||
private SparkOutputStream out;
|
||||
private SparkDependencyResolver dep;
|
||||
private SparkJLineCompletion completor;
|
||||
|
||||
private JobProgressListener sparkListener;
|
||||
|
||||
private Map<String, Object> binder;
|
||||
private SparkVersion sparkVersion;
|
||||
|
||||
|
|
@ -138,17 +139,21 @@ public class SparkInterpreter extends Interpreter {
|
|||
sparkListener = setupListeners(this.sc);
|
||||
}
|
||||
|
||||
public synchronized SparkContext getSparkContext() {
|
||||
if (sc == null) {
|
||||
sc = createSparkContext();
|
||||
env = SparkEnv.get();
|
||||
sparkListener = setupListeners(sc);
|
||||
public SparkContext getSparkContext() {
|
||||
synchronized (sharedInterpreterLock) {
|
||||
if (sc == null) {
|
||||
sc = createSparkContext();
|
||||
env = SparkEnv.get();
|
||||
sparkListener = setupListeners(sc);
|
||||
}
|
||||
return sc;
|
||||
}
|
||||
return sc;
|
||||
}
|
||||
|
||||
public boolean isSparkContextInitialized() {
|
||||
return sc != null;
|
||||
synchronized (sharedInterpreterLock) {
|
||||
return sc != null;
|
||||
}
|
||||
}
|
||||
|
||||
static JobProgressListener setupListeners(SparkContext context) {
|
||||
|
|
@ -195,29 +200,30 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
public SQLContext getSQLContext() {
|
||||
if (sqlc == null) {
|
||||
if (useHiveContext()) {
|
||||
String name = "org.apache.spark.sql.hive.HiveContext";
|
||||
Constructor<?> hc;
|
||||
try {
|
||||
hc = getClass().getClassLoader().loadClass(name)
|
||||
.getConstructor(SparkContext.class);
|
||||
sqlc = (SQLContext) hc.newInstance(getSparkContext());
|
||||
} catch (NoSuchMethodException | SecurityException
|
||||
| ClassNotFoundException | InstantiationException
|
||||
| IllegalAccessException | IllegalArgumentException
|
||||
| InvocationTargetException e) {
|
||||
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
|
||||
// when hive dependency is not loaded, it'll fail.
|
||||
// in this case SQLContext can be used.
|
||||
synchronized (sharedInterpreterLock) {
|
||||
if (sqlc == null) {
|
||||
if (useHiveContext()) {
|
||||
String name = "org.apache.spark.sql.hive.HiveContext";
|
||||
Constructor<?> hc;
|
||||
try {
|
||||
hc = getClass().getClassLoader().loadClass(name)
|
||||
.getConstructor(SparkContext.class);
|
||||
sqlc = (SQLContext) hc.newInstance(getSparkContext());
|
||||
} catch (NoSuchMethodException | SecurityException
|
||||
| ClassNotFoundException | InstantiationException
|
||||
| IllegalAccessException | IllegalArgumentException
|
||||
| InvocationTargetException e) {
|
||||
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
|
||||
// when hive dependency is not loaded, it'll fail.
|
||||
// in this case SQLContext can be used.
|
||||
sqlc = new SQLContext(getSparkContext());
|
||||
}
|
||||
} else {
|
||||
sqlc = new SQLContext(getSparkContext());
|
||||
}
|
||||
} else {
|
||||
sqlc = new SQLContext(getSparkContext());
|
||||
}
|
||||
return sqlc;
|
||||
}
|
||||
|
||||
return sqlc;
|
||||
}
|
||||
|
||||
public SparkDependencyResolver getDependencyResolver() {
|
||||
|
|
@ -475,6 +481,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
intp.setContextClassLoader();
|
||||
intp.initializeSynchronous();
|
||||
|
||||
numReferenceOfSparkContext.incrementAndGet();
|
||||
|
||||
synchronized (sharedInterpreterLock) {
|
||||
if (classOutputDir == null) {
|
||||
|
|
@ -492,23 +499,23 @@ public class SparkInterpreter extends Interpreter {
|
|||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
completor = new SparkJLineCompletion(intp);
|
||||
|
||||
sc = getSparkContext();
|
||||
if (sc.getPoolForName("fair").isEmpty()) {
|
||||
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
|
||||
int minimumShare = 0;
|
||||
int weight = 1;
|
||||
Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
|
||||
sc.taskScheduler().rootPool().addSchedulable(pool);
|
||||
}
|
||||
|
||||
sparkVersion = SparkVersion.fromVersionString(sc.version());
|
||||
|
||||
sqlc = getSQLContext();
|
||||
}
|
||||
|
||||
completor = new SparkJLineCompletion(intp);
|
||||
|
||||
sc = getSparkContext();
|
||||
if (sc.getPoolForName("fair").isEmpty()) {
|
||||
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
|
||||
int minimumShare = 0;
|
||||
int weight = 1;
|
||||
Pool pool = new Pool("fair", schedulingMode, minimumShare, weight);
|
||||
sc.taskScheduler().rootPool().addSchedulable(pool);
|
||||
}
|
||||
|
||||
sparkVersion = SparkVersion.fromVersionString(sc.version());
|
||||
|
||||
sqlc = getSQLContext();
|
||||
|
||||
dep = getDependencyResolver();
|
||||
|
||||
z = new ZeppelinContext(sc, sqlc, null, dep,
|
||||
|
|
@ -922,8 +929,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
sc.stop();
|
||||
sc = null;
|
||||
if (numReferenceOfSparkContext.decrementAndGet() == 0) {
|
||||
sc.stop();
|
||||
sc = null;
|
||||
}
|
||||
|
||||
intp.close();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
|||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -246,7 +247,12 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
@Override
|
||||
protected boolean jobAbort() {
|
||||
Interpreter repl = getRepl(getRequiredReplName());
|
||||
Job job = repl.getScheduler().removeFromWaitingQueue(getId());
|
||||
Scheduler scheduler = repl.getScheduler();
|
||||
if (scheduler == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
Job job = scheduler.removeFromWaitingQueue(getId());
|
||||
if (job != null) {
|
||||
job.setStatus(Status.ABORT);
|
||||
} else {
|
||||
|
|
|
|||
Loading…
Reference in a new issue