Merge branch 'master' into py4jPythonInterpreter

This commit is contained in:
astroshim 2017-03-09 02:27:03 +09:00
commit a48df58c6a
6 changed files with 49 additions and 36 deletions

View file

@ -38,6 +38,7 @@ import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
import org.apache.spark.SecurityManager;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.repl.SparkILoop;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
@ -126,6 +127,7 @@ public class SparkInterpreter extends Interpreter {
private SparkVersion sparkVersion;
private static File outputDir; // class outputdir for scala 2.11
private Object classServer; // classserver for scala 2.11
private JavaSparkContext jsc;
public SparkInterpreter(Properties property) {
@ -152,6 +154,15 @@ public class SparkInterpreter extends Interpreter {
}
}
public JavaSparkContext getJavaSparkContext() {
synchronized (sharedInterpreterLock) {
if (jsc == null) {
jsc = JavaSparkContext.fromSparkContext(sc);
}
return jsc;
}
}
public boolean isSparkContextInitialized() {
synchronized (sharedInterpreterLock) {
return sc != null;
@ -1422,6 +1433,7 @@ public class SparkInterpreter extends Interpreter {
}
sparkSession = null;
sc = null;
jsc = null;
if (classServer != null) {
Utils.invokeMethod(classServer, "stop");
classServer = null;

View file

@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkRBackend;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
@ -45,6 +46,7 @@ public class SparkRInterpreter extends Interpreter {
private SparkInterpreter sparkInterpreter;
private ZeppelinR zeppelinR;
private SparkContext sc;
private JavaSparkContext jsc;
public SparkRInterpreter(Properties property) {
super(property);
@ -73,8 +75,10 @@ public class SparkRInterpreter extends Interpreter {
this.sparkInterpreter = getSparkInterpreter();
this.sc = sparkInterpreter.getSparkContext();
this.jsc = sparkInterpreter.getJavaSparkContext();
SparkVersion sparkVersion = new SparkVersion(sc.version());
ZeppelinRContext.setSparkContext(sc);
ZeppelinRContext.setJavaSparkContext(jsc);
if (Utils.isSpark2()) {
ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
}

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.spark;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
/**
@ -28,6 +29,7 @@ public class ZeppelinRContext {
private static SQLContext sqlContext;
private static ZeppelinContext zeppelinContext;
private static Object sparkSession;
private static JavaSparkContext javaSparkContext;
public static void setSparkContext(SparkContext sparkContext) {
ZeppelinRContext.sparkContext = sparkContext;
@ -60,4 +62,8 @@ public class ZeppelinRContext {
public static Object getSparkSession() {
return sparkSession;
}
public static void setJavaSparkContext(JavaSparkContext jsc) { javaSparkContext = jsc; }
public static JavaSparkContext getJavaSparkContext() { return javaSparkContext; }
}

View file

@ -45,7 +45,7 @@ assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv)
if (version >= 20000) {
assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv)
assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)
assign(".sparkRjsc", get(".sc", envir = SparkR:::.sparkREnv), envir=SparkR:::.sparkREnv)
assign(".sparkRjsc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getJavaSparkContext"), envir = SparkR:::.sparkREnv)
}
assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv)
assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv)

View file

@ -182,22 +182,16 @@ public abstract class Job {
this.exception = null;
errorMessage = null;
dateFinished = new Date();
progressUpdator.terminate();
} catch (NullPointerException e) {
LOGGER.error("Job failed", e);
progressUpdator.terminate();
this.exception = e;
setResult(e.getMessage());
errorMessage = getStack(e);
dateFinished = new Date();
} catch (Throwable e) {
LOGGER.error("Job failed", e);
progressUpdator.terminate();
this.exception = e;
setResult(e.getMessage());
errorMessage = getStack(e);
dateFinished = new Date();
} finally {
if (progressUpdator != null) {
progressUpdator.interrupt();
}
//aborted = false;
}
}

View file

@ -21,48 +21,45 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Polls job progress with given interval
*
* @see Job#progress()
* @see JobListener#onProgressUpdate(org.apache.zeppelin.scheduler.Job, int)
*
* TODO(moon) : add description.
*/
public class JobProgressPoller extends Thread {
public static final long DEFAULT_INTERVAL_MSEC = 500;
Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
private static final Logger logger = LoggerFactory.getLogger(JobProgressPoller.class);
private Job job;
private long intervalMs;
boolean terminate = false;
public JobProgressPoller(Job job, long intervalMs) {
super("JobProgressPoller, jobId=" + job.getId());
this.job = job;
this.intervalMs = intervalMs;
if (intervalMs < 0) {
throw new IllegalArgumentException("polling interval can't be " + intervalMs);
}
this.intervalMs = intervalMs == 0 ? DEFAULT_INTERVAL_MSEC : intervalMs;
}
@Override
public void run() {
if (intervalMs < 0) {
return;
} else if (intervalMs == 0) {
intervalMs = DEFAULT_INTERVAL_MSEC;
}
while (terminate == false) {
JobListener listener = job.getListener();
if (listener != null) {
try {
if (job.isRunning()) {
listener.onProgressUpdate(job, job.progress());
try {
while (!Thread.interrupted()) {
JobListener listener = job.getListener();
if (listener != null) {
try {
if (job.isRunning()) {
listener.onProgressUpdate(job, job.progress());
}
} catch (Exception e) {
logger.error("Can not get or update progress", e);
}
} catch (Exception e) {
logger.error("Can not get or update progress", e);
}
}
try {
Thread.sleep(intervalMs);
} catch (InterruptedException e) {
logger.error("Exception in JobProgressPoller while run Thread.sleep", e);
}
}
}
public void terminate() {
terminate = true;
} catch (InterruptedException ignored) {}
}
}