mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Spark 2.0 support
This commit is contained in:
parent
483a89705b
commit
f06a2fa811
8 changed files with 267 additions and 57 deletions
|
|
@ -33,13 +33,17 @@ addons:
|
|||
|
||||
matrix:
|
||||
include:
|
||||
# Test all modules with spark-2.0.0-preview and scala 2.11
|
||||
- jdk: "oraclejdk7"
|
||||
env: SCALA_VER="2.11" SPARK_VER="2.0.0-preview" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Dspark.version=2.0.0-preview -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''"
|
||||
|
||||
# Test all modules with scala 2.10
|
||||
- jdk: "oraclejdk7"
|
||||
env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Dscala-2.10 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
|
||||
env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
|
||||
|
||||
# Test all modules with scala 2.11
|
||||
- jdk: "oraclejdk7"
|
||||
env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Dscala-2.11 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
|
||||
env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
|
||||
|
||||
# Test spark module for 1.5.2
|
||||
- jdk: "oraclejdk7"
|
||||
|
|
|
|||
|
|
@ -283,12 +283,6 @@
|
|||
<version>${spark.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
|
||||
<version>${spark.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
|
||||
|
|
@ -517,9 +511,6 @@
|
|||
|
||||
<profile>
|
||||
<id>spark-1.6</id>
|
||||
<activation>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<properties>
|
||||
<spark.version>1.6.1</spark.version>
|
||||
<py4j.version>0.9</py4j.version>
|
||||
|
|
@ -529,6 +520,19 @@
|
|||
</properties>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>spark-2.0</id>
|
||||
<activation>
|
||||
<activeByDefault>true</activeByDefault>
|
||||
</activation>
|
||||
<properties>
|
||||
<spark.version>2.0.0</spark.version>
|
||||
<protobuf.version>2.5.0</protobuf.version>
|
||||
<py4j.version>0.10.1</py4j.version>
|
||||
<scala.version>2.11.8</scala.version>
|
||||
</properties>
|
||||
</profile>
|
||||
|
||||
<profile>
|
||||
<id>hadoop-0.23</id>
|
||||
<!-- SPARK-1121: Adds an explicit dependency on Avro to work around a
|
||||
|
|
|
|||
|
|
@ -528,6 +528,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
}
|
||||
}
|
||||
|
||||
public Object getSparkSession() {
|
||||
SparkInterpreter intp = getSparkInterpreter();
|
||||
if (intp == null) {
|
||||
return null;
|
||||
} else {
|
||||
return intp.getSparkSession();
|
||||
}
|
||||
}
|
||||
|
||||
public SparkConf getSparkConf() {
|
||||
JavaSparkContext sc = getJavaSparkContext();
|
||||
if (sc == null) {
|
||||
|
|
|
|||
|
|
@ -32,9 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import com.google.common.base.Joiner;
|
||||
|
||||
import com.google.common.reflect.TypeToken;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.spark.HttpServer;
|
||||
import org.apache.spark.SparkConf;
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.SparkEnv;
|
||||
|
|
@ -99,9 +96,11 @@ public class SparkInterpreter extends Interpreter {
|
|||
* intp - scala.tools.nsc.interpreter.IMain; (scala 2.11)
|
||||
*/
|
||||
private Object intp;
|
||||
private SparkConf conf;
|
||||
private static SparkContext sc;
|
||||
private static SQLContext sqlc;
|
||||
private static SparkEnv env;
|
||||
private static Object sparkSession; // spark 2.x
|
||||
private static JobProgressListener sparkListener;
|
||||
private static AbstractFile classOutputDir;
|
||||
private static Integer sharedInterpreterLock = new Integer(0);
|
||||
|
|
@ -118,7 +117,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
private Map<String, Object> binder;
|
||||
private SparkVersion sparkVersion;
|
||||
private File outputDir; // class outputdir for scala 2.11
|
||||
private HttpServer classServer; // classserver for scala 2.11
|
||||
private Object classServer; // classserver for scala 2.11
|
||||
|
||||
|
||||
public SparkInterpreter(Properties property) {
|
||||
|
|
@ -194,35 +193,80 @@ public class SparkInterpreter extends Interpreter {
|
|||
return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
|
||||
}
|
||||
|
||||
/**
|
||||
* See org.apache.spark.sql.SparkSession.hiveClassesArePresent
|
||||
* @return
|
||||
*/
|
||||
private boolean hiveClassesArePresent() {
|
||||
try {
|
||||
this.getClass().forName("org.apache.spark.sql.hive.HiveSessionState");
|
||||
this.getClass().forName("org.apache.spark.sql.hive.HiveSharedState");
|
||||
this.getClass().forName("org.apache.hadoop.hive.conf.HiveConf");
|
||||
return true;
|
||||
} catch (ClassNotFoundException | NoClassDefFoundError e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean importImplicit() {
|
||||
return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.importImplicit"));
|
||||
}
|
||||
|
||||
public Object getSparkSession() {
|
||||
synchronized (sharedInterpreterLock) {
|
||||
if (sparkSession == null) {
|
||||
createSparkSession();
|
||||
}
|
||||
return sparkSession;
|
||||
}
|
||||
}
|
||||
|
||||
public SQLContext getSQLContext() {
|
||||
synchronized (sharedInterpreterLock) {
|
||||
if (Utils.isSpark2()) {
|
||||
return getSQLContext_2();
|
||||
} else {
|
||||
return getSQLContext_1();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get SQLContext for spark 2.x
|
||||
*/
|
||||
private SQLContext getSQLContext_2() {
|
||||
if (sqlc == null) {
|
||||
sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "wrapped");
|
||||
if (sqlc == null) {
|
||||
if (useHiveContext()) {
|
||||
String name = "org.apache.spark.sql.hive.HiveContext";
|
||||
Constructor<?> hc;
|
||||
try {
|
||||
hc = getClass().getClassLoader().loadClass(name)
|
||||
.getConstructor(SparkContext.class);
|
||||
sqlc = (SQLContext) hc.newInstance(getSparkContext());
|
||||
} catch (NoSuchMethodException | SecurityException
|
||||
| ClassNotFoundException | InstantiationException
|
||||
| IllegalAccessException | IllegalArgumentException
|
||||
| InvocationTargetException e) {
|
||||
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
|
||||
// when hive dependency is not loaded, it'll fail.
|
||||
// in this case SQLContext can be used.
|
||||
sqlc = new SQLContext(getSparkContext());
|
||||
}
|
||||
} else {
|
||||
sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext");
|
||||
}
|
||||
}
|
||||
return sqlc;
|
||||
}
|
||||
|
||||
public SQLContext getSQLContext_1() {
|
||||
if (sqlc == null) {
|
||||
if (useHiveContext()) {
|
||||
String name = "org.apache.spark.sql.hive.HiveContext";
|
||||
Constructor<?> hc;
|
||||
try {
|
||||
hc = getClass().getClassLoader().loadClass(name)
|
||||
.getConstructor(SparkContext.class);
|
||||
sqlc = (SQLContext) hc.newInstance(getSparkContext());
|
||||
} catch (NoSuchMethodException | SecurityException
|
||||
| ClassNotFoundException | InstantiationException
|
||||
| IllegalAccessException | IllegalArgumentException
|
||||
| InvocationTargetException e) {
|
||||
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
|
||||
// when hive dependency is not loaded, it'll fail.
|
||||
// in this case SQLContext can be used.
|
||||
sqlc = new SQLContext(getSparkContext());
|
||||
}
|
||||
} else {
|
||||
sqlc = new SQLContext(getSparkContext());
|
||||
}
|
||||
return sqlc;
|
||||
}
|
||||
return sqlc;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -250,7 +294,80 @@ public class SparkInterpreter extends Interpreter {
|
|||
return (DepInterpreter) p;
|
||||
}
|
||||
|
||||
/**
|
||||
* Spark 2.x
|
||||
* Create SparkSession
|
||||
*/
|
||||
public Object createSparkSession() {
|
||||
logger.info("------ Create new SparkContext {} -------", getProperty("master"));
|
||||
String execUri = System.getenv("SPARK_EXECUTOR_URI");
|
||||
conf.setAppName(getProperty("spark.app.name"));
|
||||
|
||||
conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());
|
||||
|
||||
if (execUri != null) {
|
||||
conf.set("spark.executor.uri", execUri);
|
||||
}
|
||||
|
||||
if (System.getenv("SPARK_HOME") != null) {
|
||||
conf.setSparkHome(System.getenv("SPARK_HOME"));
|
||||
}
|
||||
|
||||
conf.set("spark.scheduler.mode", "FAIR");
|
||||
conf.setMaster(getProperty("master"));
|
||||
|
||||
Properties intpProperty = getProperty();
|
||||
|
||||
for (Object k : intpProperty.keySet()) {
|
||||
String key = (String) k;
|
||||
String val = toString(intpProperty.get(key));
|
||||
if (!key.startsWith("spark.") || !val.trim().isEmpty()) {
|
||||
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
|
||||
conf.set(key, val);
|
||||
}
|
||||
}
|
||||
|
||||
Class SparkSession = Utils.findClass("org.apache.spark.sql.SparkSession");
|
||||
Object builder = Utils.invokeStaticMethod(SparkSession, "builder");
|
||||
Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });
|
||||
|
||||
if (useHiveContext()) {
|
||||
if (hiveClassesArePresent()) {
|
||||
Utils.invokeMethod(builder, "enableHiveSupport");
|
||||
sparkSession = Utils.invokeMethod(builder, "getOrCreate");
|
||||
logger.info("Created Spark session with Hive support");
|
||||
} else {
|
||||
Utils.invokeMethod(builder, "config",
|
||||
new Class[]{ String.class, String.class},
|
||||
new Object[]{ "spark.sql.catalogImplementation", "in-memory"});
|
||||
sparkSession = Utils.invokeMethod(builder, "getOrCreate");
|
||||
logger.info("Created Spark session with Hive support");
|
||||
}
|
||||
} else {
|
||||
sparkSession = Utils.invokeMethod(builder, "getOrCreate");
|
||||
logger.info("Created Spark session");
|
||||
}
|
||||
|
||||
return sparkSession;
|
||||
}
|
||||
|
||||
public SparkContext createSparkContext() {
|
||||
if (Utils.isSpark2()) {
|
||||
return createSparkContext_2();
|
||||
} else {
|
||||
return createSparkContext_1();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create SparkContext for spark 2.x
|
||||
* @return
|
||||
*/
|
||||
private SparkContext createSparkContext_2() {
|
||||
return (SparkContext) Utils.invokeMethod(sparkSession, "sparkContext");
|
||||
}
|
||||
|
||||
public SparkContext createSparkContext_1() {
|
||||
logger.info("------ Create new SparkContext {} -------", getProperty("master"));
|
||||
|
||||
String execUri = System.getenv("SPARK_EXECUTOR_URI");
|
||||
|
|
@ -267,8 +384,8 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
try { // in case of spark 1.1x, spark 1.2x
|
||||
Method classServer = intp.getClass().getMethod("classServer");
|
||||
HttpServer httpServer = (HttpServer) classServer.invoke(intp);
|
||||
classServerUri = httpServer.uri();
|
||||
Object httpServer = classServer.invoke(intp);
|
||||
classServerUri = (String) Utils.invokeMethod(httpServer, "uri");
|
||||
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
|
||||
| IllegalArgumentException | InvocationTargetException e) {
|
||||
// continue
|
||||
|
|
@ -290,14 +407,12 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
if (Utils.isScala2_11()) {
|
||||
classServer = createHttpServer(outputDir);
|
||||
classServer.start();
|
||||
classServerUri = classServer.uri();
|
||||
Utils.invokeMethod(classServer, "start");
|
||||
classServerUri = (String) Utils.invokeMethod(classServer, "uri");
|
||||
}
|
||||
|
||||
SparkConf conf =
|
||||
new SparkConf()
|
||||
.setMaster(getProperty("master"))
|
||||
.setAppName(getProperty("spark.app.name"));
|
||||
conf.setMaster(getProperty("master"))
|
||||
.setAppName(getProperty("spark.app.name"));
|
||||
|
||||
if (classServerUri != null) {
|
||||
conf.set("spark.repl.class.uri", classServerUri);
|
||||
|
|
@ -409,6 +524,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
conf = new SparkConf();
|
||||
URL[] urls = getClassloaderUrls();
|
||||
|
||||
// Very nice discussion about how scala compiler handle classpath
|
||||
|
|
@ -535,7 +651,19 @@ public class SparkInterpreter extends Interpreter {
|
|||
b.v_$eq(true);
|
||||
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
|
||||
|
||||
System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$");
|
||||
/* Required for scoped mode.
|
||||
* In scoped mode multiple scala compilers (repl) generate classes in the same directory.
|
||||
* Class names are not randomly generated and look like '$line12.$read$$iw$$iw'
|
||||
* Therefore it's possible for a generated class to conflict with (overwrite) another repl's generated
|
||||
* class.
|
||||
*
|
||||
* To prevent generated class name conflict,
|
||||
* change prefix of generated class name from each scala compiler (repl) instance.
|
||||
*
|
||||
* In Spark 2.x, the REPL generated wrapper class name should be compatible with the pattern
|
||||
* ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
|
||||
*/
|
||||
System.setProperty("scala.repl.name.line", "$line" + this.hashCode());
|
||||
|
||||
// To prevent 'File name too long' error on some file system.
|
||||
MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
|
||||
|
|
@ -582,6 +710,9 @@ public class SparkInterpreter extends Interpreter {
|
|||
new Object[]{intp});
|
||||
}
|
||||
|
||||
if (Utils.isSpark2()) {
|
||||
sparkSession = getSparkSession();
|
||||
}
|
||||
sc = getSparkContext();
|
||||
if (sc.getPoolForName("fair").isEmpty()) {
|
||||
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
|
||||
|
|
@ -611,6 +742,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
binder.put("sqlc", sqlc);
|
||||
binder.put("z", z);
|
||||
|
||||
if (Utils.isSpark2()) {
|
||||
binder.put("spark", sparkSession);
|
||||
}
|
||||
|
||||
interpret("@transient val z = "
|
||||
+ "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
|
||||
interpret("@transient val sc = "
|
||||
|
|
@ -619,15 +754,27 @@ public class SparkInterpreter extends Interpreter {
|
|||
+ "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
|
||||
interpret("@transient val sqlContext = "
|
||||
+ "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
|
||||
|
||||
if (Utils.isSpark2()) {
|
||||
interpret("@transient val spark = "
|
||||
+ "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]");
|
||||
}
|
||||
|
||||
interpret("import org.apache.spark.SparkContext._");
|
||||
|
||||
if (importImplicit()) {
|
||||
if (sparkVersion.oldSqlContextImplicits()) {
|
||||
interpret("import sqlContext._");
|
||||
} else {
|
||||
interpret("import sqlContext.implicits._");
|
||||
interpret("import sqlContext.sql");
|
||||
if (Utils.isSpark2()) {
|
||||
interpret("import spark.implicits._");
|
||||
interpret("import spark.sql");
|
||||
interpret("import org.apache.spark.sql.functions._");
|
||||
} else {
|
||||
if (sparkVersion.oldSqlContextImplicits()) {
|
||||
interpret("import sqlContext._");
|
||||
} else {
|
||||
interpret("import sqlContext.implicits._");
|
||||
interpret("import sqlContext.sql");
|
||||
interpret("import org.apache.spark.sql.functions._");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -825,6 +972,9 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
public Object getLastObject() {
|
||||
IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
|
||||
if (r == null || r.lineRep() == null) {
|
||||
return null;
|
||||
}
|
||||
Object obj = r.lineRep().call("$result",
|
||||
JavaConversions.asScalaBuffer(new LinkedList<Object>()));
|
||||
return obj;
|
||||
|
|
@ -955,7 +1105,18 @@ public class SparkInterpreter extends Interpreter {
|
|||
return;
|
||||
}
|
||||
|
||||
Object lastObj = getValue(varName);
|
||||
Object lastObj = null;
|
||||
try {
|
||||
if (Utils.isScala2_10()) {
|
||||
lastObj = getValue(varName);
|
||||
} else {
|
||||
lastObj = getLastObject();
|
||||
}
|
||||
} catch (NullPointerException e) {
|
||||
// In some cases, scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call throws an NPE
|
||||
logger.error(e.getMessage(), e);
|
||||
}
|
||||
|
||||
if (lastObj != null) {
|
||||
ResourcePool resourcePool = context.getResourcePool();
|
||||
resourcePool.put(context.getNoteId(), context.getParagraphId(),
|
||||
|
|
@ -1100,7 +1261,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
sc.stop();
|
||||
sc = null;
|
||||
if (classServer != null) {
|
||||
classServer.stop();
|
||||
Utils.invokeMethod(classServer, "stop");
|
||||
classServer = null;
|
||||
}
|
||||
}
|
||||
|
|
@ -1153,16 +1314,16 @@ public class SparkInterpreter extends Interpreter {
|
|||
return file;
|
||||
}
|
||||
|
||||
private HttpServer createHttpServer(File outputDir) {
|
||||
private Object createHttpServer(File outputDir) {
|
||||
SparkConf conf = new SparkConf();
|
||||
try {
|
||||
// try to create HttpServer
|
||||
Constructor<?> constructor = getClass().getClassLoader()
|
||||
.loadClass(HttpServer.class.getName())
|
||||
.loadClass("org.apache.spark.HttpServer")
|
||||
.getConstructor(new Class[]{
|
||||
SparkConf.class, File.class, SecurityManager.class, int.class, String.class});
|
||||
|
||||
return (HttpServer) constructor.newInstance(new Object[] {
|
||||
return constructor.newInstance(new Object[] {
|
||||
conf, outputDir, new SecurityManager(conf), 0, "HTTP Server"});
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
|
||||
InstantiationException | InvocationTargetException e) {
|
||||
|
|
@ -1170,10 +1331,10 @@ public class SparkInterpreter extends Interpreter {
|
|||
Constructor<?> constructor = null;
|
||||
try {
|
||||
constructor = getClass().getClassLoader()
|
||||
.loadClass(HttpServer.class.getName())
|
||||
.loadClass("org.apache.spark.HttpServer")
|
||||
.getConstructor(new Class[]{
|
||||
File.class, SecurityManager.class, int.class, String.class});
|
||||
return (HttpServer) constructor.newInstance(new Object[] {
|
||||
return constructor.newInstance(new Object[] {
|
||||
outputDir, new SecurityManager(conf), 0, "HTTP Server"});
|
||||
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
|
||||
InstantiationException | InvocationTargetException e1) {
|
||||
|
|
|
|||
|
|
@ -89,4 +89,13 @@ class Utils {
|
|||
static boolean isScala2_11() {
|
||||
return !isScala2_10();
|
||||
}
|
||||
|
||||
static boolean isSpark2() {
|
||||
try {
|
||||
Utils.class.forName("org.apache.spark.sql.SparkSession");
|
||||
return true;
|
||||
} catch (ClassNotFoundException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,7 +31,6 @@ import java.util.List;
|
|||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.spark.sql.catalyst.expressions.Attribute;
|
||||
import org.apache.spark.sql.hive.HiveContext;
|
||||
import org.apache.zeppelin.annotation.ZeppelinApi;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
|
|
@ -70,7 +69,6 @@ public class ZeppelinContext {
|
|||
|
||||
public SparkContext sc;
|
||||
public SQLContext sqlContext;
|
||||
public HiveContext hiveContext;
|
||||
private GUI gui;
|
||||
|
||||
@ZeppelinApi
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ from pyspark.broadcast import Broadcast
|
|||
from pyspark.serializers import MarshalSerializer, PickleSerializer
|
||||
|
||||
# for back compatibility
|
||||
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
|
||||
from pyspark.sql import SQLContext, HiveContext, Row
|
||||
|
||||
class Logger(object):
|
||||
def __init__(self):
|
||||
|
|
@ -107,6 +107,7 @@ class PyZeppelinContext(dict):
|
|||
class SparkVersion(object):
|
||||
SPARK_1_4_0 = 140
|
||||
SPARK_1_3_0 = 130
|
||||
SPARK_2_0_0 = 200
|
||||
|
||||
def __init__(self, versionNumber):
|
||||
self.version = versionNumber
|
||||
|
|
@ -117,6 +118,9 @@ class SparkVersion(object):
|
|||
def isImportAllPackageUnderSparkSql(self):
|
||||
return self.version >= self.SPARK_1_3_0
|
||||
|
||||
def isSpark2(self):
|
||||
return self.version >= self.SPARK_2_0_0
|
||||
|
||||
class PySparkCompletion:
|
||||
def __init__(self, interpreterObject):
|
||||
self.interpreterObject = interpreterObject
|
||||
|
|
@ -175,6 +179,12 @@ sys.stderr = output
|
|||
client = GatewayClient(port=int(sys.argv[1]))
|
||||
sparkVersion = SparkVersion(int(sys.argv[2]))
|
||||
|
||||
if sparkVersion.isSpark2():
|
||||
from pyspark.sql import SparkSession
|
||||
else:
|
||||
from pyspark.sql import SchemaRDD
|
||||
|
||||
|
||||
if sparkVersion.isAutoConvertEnabled():
|
||||
gateway = JavaGateway(client, auto_convert = True)
|
||||
else:
|
||||
|
|
@ -209,6 +219,9 @@ sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
|
|||
sqlc = SQLContext(sc, intp.getSQLContext())
|
||||
sqlContext = sqlc
|
||||
|
||||
if sparkVersion.isSpark2():
|
||||
spark = SparkSession(sc, intp.getSparkSession())
|
||||
|
||||
completion = PySparkCompletion(intp)
|
||||
z = PyZeppelinContext(intp.getZeppelinContext())
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import org.apache.spark.SparkContext;
|
|||
import org.apache.spark.repl.SparkILoop;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.resource.LocalResourcePool;
|
||||
import org.apache.zeppelin.resource.WellKnownResourceName;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
|
|
@ -176,6 +177,17 @@ public class SparkInterpreterTest {
|
|||
assertNotNull(SparkInterpreter.setupListeners(sc));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateDataFrame() {
|
||||
repl.interpret("case class Person(name:String, age:Int)\n", context);
|
||||
repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
|
||||
repl.interpret("people.toDF.count", context);
|
||||
assertEquals(new Long(4), context.getResourcePool().get(
|
||||
context.getNoteId(),
|
||||
context.getParagraphId(),
|
||||
WellKnownResourceName.ZeppelinReplResult.toString()).get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSparkSql(){
|
||||
repl.interpret("case class Person(name:String, age:Int)\n", context);
|
||||
|
|
|
|||
Loading…
Reference in a new issue