mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into py4jPythonInterpreter
This commit is contained in:
commit
a48df58c6a
6 changed files with 49 additions and 36 deletions
|
|
@ -38,6 +38,7 @@ import org.apache.spark.SparkContext;
|
|||
import org.apache.spark.SparkEnv;
|
||||
|
||||
import org.apache.spark.SecurityManager;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.repl.SparkILoop;
|
||||
import org.apache.spark.scheduler.ActiveJob;
|
||||
import org.apache.spark.scheduler.DAGScheduler;
|
||||
|
|
@ -126,6 +127,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
private SparkVersion sparkVersion;
|
||||
private static File outputDir; // class outputdir for scala 2.11
|
||||
private Object classServer; // classserver for scala 2.11
|
||||
private JavaSparkContext jsc;
|
||||
|
||||
|
||||
public SparkInterpreter(Properties property) {
|
||||
|
|
@ -152,6 +154,15 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
public JavaSparkContext getJavaSparkContext() {
|
||||
synchronized (sharedInterpreterLock) {
|
||||
if (jsc == null) {
|
||||
jsc = JavaSparkContext.fromSparkContext(sc);
|
||||
}
|
||||
return jsc;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSparkContextInitialized() {
|
||||
synchronized (sharedInterpreterLock) {
|
||||
return sc != null;
|
||||
|
|
@ -1422,6 +1433,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
sparkSession = null;
|
||||
sc = null;
|
||||
jsc = null;
|
||||
if (classServer != null) {
|
||||
Utils.invokeMethod(classServer, "stop");
|
||||
classServer = null;
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.SparkRBackend;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
|
|
@ -45,6 +46,7 @@ public class SparkRInterpreter extends Interpreter {
|
|||
private SparkInterpreter sparkInterpreter;
|
||||
private ZeppelinR zeppelinR;
|
||||
private SparkContext sc;
|
||||
private JavaSparkContext jsc;
|
||||
|
||||
public SparkRInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -73,8 +75,10 @@ public class SparkRInterpreter extends Interpreter {
|
|||
|
||||
this.sparkInterpreter = getSparkInterpreter();
|
||||
this.sc = sparkInterpreter.getSparkContext();
|
||||
this.jsc = sparkInterpreter.getJavaSparkContext();
|
||||
SparkVersion sparkVersion = new SparkVersion(sc.version());
|
||||
ZeppelinRContext.setSparkContext(sc);
|
||||
ZeppelinRContext.setJavaSparkContext(jsc);
|
||||
if (Utils.isSpark2()) {
|
||||
ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.spark;
|
||||
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
|
||||
/**
|
||||
|
|
@ -28,6 +29,7 @@ public class ZeppelinRContext {
|
|||
private static SQLContext sqlContext;
|
||||
private static ZeppelinContext zeppelinContext;
|
||||
private static Object sparkSession;
|
||||
private static JavaSparkContext javaSparkContext;
|
||||
|
||||
public static void setSparkContext(SparkContext sparkContext) {
|
||||
ZeppelinRContext.sparkContext = sparkContext;
|
||||
|
|
@ -60,4 +62,8 @@ public class ZeppelinRContext {
|
|||
public static Object getSparkSession() {
|
||||
return sparkSession;
|
||||
}
|
||||
|
||||
public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; }
|
||||
|
||||
public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; }
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
|
|||
if (version >= 20000) {
|
||||
assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv)
|
||||
assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
|
||||
assign(".sparkRjsc", get(".sc", envir = SparkR:::.sparkREnv), envir=SparkR:::.sparkREnv)
|
||||
assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv)
|
||||
}
|
||||
assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
|
||||
assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
|
||||
|
|
|
|||
|
|
@ -182,22 +182,16 @@ public abstract class Job {
|
|||
this.exception = null;
|
||||
errorMessage = null;
|
||||
dateFinished = new Date();
|
||||
progressUpdator.terminate();
|
||||
} catch (NullPointerException e) {
|
||||
LOGGER.error("Job failed", e);
|
||||
progressUpdator.terminate();
|
||||
this.exception = e;
|
||||
setResult(e.getMessage());
|
||||
errorMessage = getStack(e);
|
||||
dateFinished = new Date();
|
||||
} catch (Throwable e) {
|
||||
LOGGER.error("Job failed", e);
|
||||
progressUpdator.terminate();
|
||||
this.exception = e;
|
||||
setResult(e.getMessage());
|
||||
errorMessage = getStack(e);
|
||||
dateFinished = new Date();
|
||||
} finally {
|
||||
if (progressUpdator != null) {
|
||||
progressUpdator.interrupt();
|
||||
}
|
||||
//aborted = false;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,48 +21,45 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Polls job progress with given interval
|
||||
*
|
||||
* @see Job#progress()
|
||||
* @see JobListener#onProgressUpdate(org.apache.zeppelin.scheduler.Job, int)
|
||||
*
|
||||
* TODO(moon) : add description.
|
||||
*/
|
||||
public class JobProgressPoller extends Thread {
|
||||
public static final long DEFAULT_INTERVAL_MSEC = 500;
|
||||
Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
|
||||
|
||||
private Job job;
|
||||
private long intervalMs;
|
||||
boolean terminate = false;
|
||||
|
||||
public JobProgressPoller(Job job, long intervalMs) {
|
||||
super("JobProgressPoller, jobId=" + job.getId());
|
||||
this.job = job;
|
||||
this.intervalMs = intervalMs;
|
||||
if (intervalMs < 0) {
|
||||
throw new IllegalArgumentException("polling interval can't be " + intervalMs);
|
||||
}
|
||||
this.intervalMs = intervalMs == 0 ? DEFAULT_INTERVAL_MSEC : intervalMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (intervalMs < 0) {
|
||||
return;
|
||||
} else if (intervalMs == 0) {
|
||||
intervalMs = DEFAULT_INTERVAL_MSEC;
|
||||
}
|
||||
|
||||
while (terminate == false) {
|
||||
JobListener listener = job.getListener();
|
||||
if (listener != null) {
|
||||
try {
|
||||
if (job.isRunning()) {
|
||||
listener.onProgressUpdate(job, job.progress());
|
||||
try {
|
||||
while (!Thread.interrupted()) {
|
||||
JobListener listener = job.getListener();
|
||||
if (listener != null) {
|
||||
try {
|
||||
if (job.isRunning()) {
|
||||
listener.onProgressUpdate(job, job.progress());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Can not get or update progress", e);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Can not get or update progress", e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(intervalMs);
|
||||
} catch (InterruptedException e) {
|
||||
logger.error("Exception in JobProgressPoller while run Thread.sleep", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void terminate() {
|
||||
terminate = true;
|
||||
} catch (InterruptedException ignored) {}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue