Spark 2.0 support

This commit is contained in:
Lee moon soo 2016-07-16 08:51:16 +09:00
parent 483a89705b
commit f06a2fa811
8 changed files with 267 additions and 57 deletions

View file

@ -33,13 +33,17 @@ addons:
matrix:
include:
# Test all modules with spark-2.0.0-preview and scala 2.11
- jdk: "oraclejdk7"
env: SCALA_VER="2.11" SPARK_VER="2.0.0-preview" HADOOP_VER="2.3" PROFILE="-Pspark-2.0 -Dspark.version=2.0.0-preview -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS="-Dpython.test.exclude=''"
# Test all modules with scala 2.10
- jdk: "oraclejdk7"
env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Dscala-2.10 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
env: SCALA_VER="2.10" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
# Test all modules with scala 2.11
- jdk: "oraclejdk7"
env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Dscala-2.11 -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
env: SCALA_VER="2.11" SPARK_VER="1.6.1" HADOOP_VER="2.3" PROFILE="-Pspark-1.6 -Pr -Phadoop-2.3 -Ppyspark -Psparkr -Pscalding -Pexamples -Pscala-2.11" BUILD_FLAG="package -Pbuild-distr" TEST_FLAG="verify -Pusing-packaged-distr" TEST_PROJECTS=""
# Test spark module for 1.5.2
- jdk: "oraclejdk7"

View file

@ -283,12 +283,6 @@
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
@ -517,9 +511,6 @@
<profile>
<id>spark-1.6</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.version>1.6.1</spark.version>
<py4j.version>0.9</py4j.version>
@ -529,6 +520,19 @@
</properties>
</profile>
<profile>
<id>spark-2.0</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<spark.version>2.0.0</spark.version>
<protobuf.version>2.5.0</protobuf.version>
<py4j.version>0.10.1</py4j.version>
<scala.version>2.11.8</scala.version>
</properties>
</profile>
<profile>
<id>hadoop-0.23</id>
<!-- SPARK-1121: Adds an explicit dependency on Avro to work around a

View file

@ -528,6 +528,15 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
}
}
/**
 * Hands out the session object owned by the SparkInterpreter this interpreter is paired with.
 *
 * @return whatever SparkInterpreter#getSparkSession() returns, or null when
 *         getSparkInterpreter() yields no interpreter
 */
public Object getSparkSession() {
  SparkInterpreter delegate = getSparkInterpreter();
  return delegate != null ? delegate.getSparkSession() : null;
}
public SparkConf getSparkConf() {
JavaSparkContext sc = getJavaSparkContext();
if (sc == null) {

View file

@ -32,9 +32,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Joiner;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import org.apache.spark.HttpServer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.SparkEnv;
@ -99,9 +96,11 @@ public class SparkInterpreter extends Interpreter {
* intp - scala.tools.nsc.interpreter.IMain; (scala 2.11)
*/
private Object intp;
private SparkConf conf;
private static SparkContext sc;
private static SQLContext sqlc;
private static SparkEnv env;
private static Object sparkSession; // spark 2.x
private static JobProgressListener sparkListener;
private static AbstractFile classOutputDir;
private static Integer sharedInterpreterLock = new Integer(0);
@ -118,7 +117,7 @@ public class SparkInterpreter extends Interpreter {
private Map<String, Object> binder;
private SparkVersion sparkVersion;
private File outputDir; // class outputdir for scala 2.11
private HttpServer classServer; // classserver for scala 2.11
private Object classServer; // classserver for scala 2.11
public SparkInterpreter(Properties property) {
@ -194,35 +193,80 @@ public class SparkInterpreter extends Interpreter {
return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext"));
}
/**
 * Checks whether the classes required for Hive support can be loaded.
 * See org.apache.spark.sql.SparkSession.hiveClassesArePresent
 *
 * @return true when all three classes load; false as soon as one lookup raises
 *         ClassNotFoundException or NoClassDefFoundError
 */
private boolean hiveClassesArePresent() {
  try {
    // Class.forName is a static method: invoke it on Class, not through an instance.
    Class.forName("org.apache.spark.sql.hive.HiveSessionState");
    Class.forName("org.apache.spark.sql.hive.HiveSharedState");
    Class.forName("org.apache.hadoop.hive.conf.HiveConf");
    return true;
  } catch (ClassNotFoundException | NoClassDefFoundError e) {
    return false;
  }
}
private boolean importImplicit() {
return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.importImplicit"));
}
/**
 * Returns the shared session, creating it on first use.
 * The check and the creation both happen while holding sharedInterpreterLock.
 *
 * @return the value of the static sparkSession field after the optional creation step
 */
public Object getSparkSession() {
  synchronized (sharedInterpreterLock) {
    if (sparkSession != null) {
      return sparkSession;
    }
    createSparkSession();
    return sparkSession;
  }
}
/**
 * Returns the SQLContext, dispatching on Utils.isSpark2() to the 2.x or 1.x variant.
 * The dispatch runs while holding sharedInterpreterLock.
 */
public SQLContext getSQLContext() {
  synchronized (sharedInterpreterLock) {
    return Utils.isSpark2() ? getSQLContext_2() : getSQLContext_1();
  }
}
/**
* Get SQLContext for spark 2.x
*/
private SQLContext getSQLContext_2() {
if (sqlc == null) {
sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "wrapped");
if (sqlc == null) {
if (useHiveContext()) {
String name = "org.apache.spark.sql.hive.HiveContext";
Constructor<?> hc;
try {
hc = getClass().getClassLoader().loadClass(name)
.getConstructor(SparkContext.class);
sqlc = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
sqlc = new SQLContext(getSparkContext());
}
} else {
sqlc = (SQLContext) Utils.invokeMethod(sparkSession, "sqlContext");
}
}
return sqlc;
}
/**
 * Get SQLContext for spark 1.x (getSQLContext() calls this when Utils.isSpark2() is false).
 *
 * Creates the static sqlc on first call. When useHiveContext() is true it tries to build
 * org.apache.spark.sql.hive.HiveContext reflectively and falls back to a plain SQLContext
 * if that fails; otherwise it builds a plain SQLContext directly.
 *
 * @return the static sqlc instance
 */
public SQLContext getSQLContext_1() {
if (sqlc == null) {
if (useHiveContext()) {
// HiveContext is loaded by name so this class does not reference it at compile time
String name = "org.apache.spark.sql.hive.HiveContext";
Constructor<?> hc;
try {
hc = getClass().getClassLoader().loadClass(name)
.getConstructor(SparkContext.class);
sqlc = (SQLContext) hc.newInstance(getSparkContext());
} catch (NoSuchMethodException | SecurityException
| ClassNotFoundException | InstantiationException
| IllegalAccessException | IllegalArgumentException
| InvocationTargetException e) {
logger.warn("Can't create HiveContext. Fallback to SQLContext", e);
// when hive dependency is not loaded, it'll fail.
// in this case SQLContext can be used.
sqlc = new SQLContext(getSparkContext());
}
} else {
sqlc = new SQLContext(getSparkContext());
}
// NOTE(review): this return duplicates the one after the if-block
return sqlc;
}
return sqlc;
}
@ -250,7 +294,80 @@ public class SparkInterpreter extends Interpreter {
return (DepInterpreter) p;
}
/**
 * Spark 2.x
 * Create SparkSession
 *
 * Fills the interpreter's SparkConf from environment variables and interpreter properties,
 * then builds the session reflectively via SparkSession.builder() and stores it in the
 * static sparkSession field.
 *
 * @return the newly obtained session (same object as the sparkSession field)
 */
public Object createSparkSession() {
  logger.info("------ Create new SparkContext {} -------", getProperty("master"));
  String execUri = System.getenv("SPARK_EXECUTOR_URI");
  conf.setAppName(getProperty("spark.app.name"));

  conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath());

  if (execUri != null) {
    conf.set("spark.executor.uri", execUri);
  }
  String sparkHome = System.getenv("SPARK_HOME");
  if (sparkHome != null) {
    conf.setSparkHome(sparkHome);
  }
  conf.set("spark.scheduler.mode", "FAIR");
  conf.setMaster(getProperty("master"));

  // Copy interpreter properties into the conf; only empty-valued "spark." keys are skipped.
  Properties intpProperty = getProperty();
  for (Object k : intpProperty.keySet()) {
    String key = (String) k;
    String val = toString(intpProperty.get(key));
    if (!key.startsWith("spark.") || !val.trim().isEmpty()) {
      logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
      conf.set(key, val);
    }
  }

  // SparkSession is looked up by name, so everything below goes through reflection.
  Class<?> sparkSessionClass = Utils.findClass("org.apache.spark.sql.SparkSession");
  Object builder = Utils.invokeStaticMethod(sparkSessionClass, "builder");
  Utils.invokeMethod(builder, "config", new Class[]{ SparkConf.class }, new Object[]{ conf });

  if (useHiveContext()) {
    if (hiveClassesArePresent()) {
      Utils.invokeMethod(builder, "enableHiveSupport");
      sparkSession = Utils.invokeMethod(builder, "getOrCreate");
      logger.info("Created Spark session with Hive support");
    } else {
      // Hive was requested but its classes cannot be loaded: use the in-memory catalog.
      Utils.invokeMethod(builder, "config",
          new Class[]{ String.class, String.class},
          new Object[]{ "spark.sql.catalogImplementation", "in-memory"});
      sparkSession = Utils.invokeMethod(builder, "getOrCreate");
      // The previous message claimed Hive support even though this branch runs without it.
      logger.info("Created Spark session without Hive support"
          + " (Hive classes not found, using in-memory catalog)");
    }
  } else {
    sparkSession = Utils.invokeMethod(builder, "getOrCreate");
    logger.info("Created Spark session");
  }

  return sparkSession;
}
/**
 * Creates the SparkContext through the variant matching the Spark major version
 * reported by Utils.isSpark2().
 */
public SparkContext createSparkContext() {
  return Utils.isSpark2() ? createSparkContext_2() : createSparkContext_1();
}
/**
 * Create SparkContext for spark 2.x
 *
 * @return the result of reflectively calling "sparkContext" on the static sparkSession
 */
private SparkContext createSparkContext_2() {
  Object context = Utils.invokeMethod(sparkSession, "sparkContext");
  return (SparkContext) context;
}
public SparkContext createSparkContext_1() {
logger.info("------ Create new SparkContext {} -------", getProperty("master"));
String execUri = System.getenv("SPARK_EXECUTOR_URI");
@ -267,8 +384,8 @@ public class SparkInterpreter extends Interpreter {
try { // in case of spark 1.1x, spark 1.2x
Method classServer = intp.getClass().getMethod("classServer");
HttpServer httpServer = (HttpServer) classServer.invoke(intp);
classServerUri = httpServer.uri();
Object httpServer = classServer.invoke(intp);
classServerUri = (String) Utils.invokeMethod(httpServer, "uri");
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
// continue
@ -290,14 +407,12 @@ public class SparkInterpreter extends Interpreter {
if (Utils.isScala2_11()) {
classServer = createHttpServer(outputDir);
classServer.start();
classServerUri = classServer.uri();
Utils.invokeMethod(classServer, "start");
classServerUri = (String) Utils.invokeMethod(classServer, "uri");
}
SparkConf conf =
new SparkConf()
.setMaster(getProperty("master"))
.setAppName(getProperty("spark.app.name"));
conf.setMaster(getProperty("master"))
.setAppName(getProperty("spark.app.name"));
if (classServerUri != null) {
conf.set("spark.repl.class.uri", classServerUri);
@ -409,6 +524,7 @@ public class SparkInterpreter extends Interpreter {
@Override
public void open() {
conf = new SparkConf();
URL[] urls = getClassloaderUrls();
// Very nice discussion about how scala compiler handle classpath
@ -535,7 +651,19 @@ public class SparkInterpreter extends Interpreter {
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
System.setProperty("scala.repl.name.line", "line" + this.hashCode() + "$");
/* Required for scoped mode.
* In scoped mode, multiple scala compilers (repl) generate classes in the same directory.
* Class names are not randomly generated and look like '$line12.$read$$iw$$iw'.
* Therefore a class generated by one repl can conflict with (overwrite) a class
* generated by another repl.
*
* To prevent generated class name conflicts,
* change the prefix of generated class names for each scala compiler (repl) instance.
*
* In Spark 2.x, the REPL generated wrapper class name should be compatible with the pattern
* ^(\$line(?:\d+)\.\$read)(?:\$\$iw)+$
*/
System.setProperty("scala.repl.name.line", "$line" + this.hashCode());
// To prevent 'File name too long' error on some file system.
MutableSettings.IntSetting numClassFileSetting = settings.maxClassfileName();
@ -582,6 +710,9 @@ public class SparkInterpreter extends Interpreter {
new Object[]{intp});
}
if (Utils.isSpark2()) {
sparkSession = getSparkSession();
}
sc = getSparkContext();
if (sc.getPoolForName("fair").isEmpty()) {
Value schedulingMode = org.apache.spark.scheduler.SchedulingMode.FAIR();
@ -611,6 +742,10 @@ public class SparkInterpreter extends Interpreter {
binder.put("sqlc", sqlc);
binder.put("z", z);
if (Utils.isSpark2()) {
binder.put("spark", sparkSession);
}
interpret("@transient val z = "
+ "_binder.get(\"z\").asInstanceOf[org.apache.zeppelin.spark.ZeppelinContext]");
interpret("@transient val sc = "
@ -619,15 +754,27 @@ public class SparkInterpreter extends Interpreter {
+ "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
interpret("@transient val sqlContext = "
+ "_binder.get(\"sqlc\").asInstanceOf[org.apache.spark.sql.SQLContext]");
if (Utils.isSpark2()) {
interpret("@transient val spark = "
+ "_binder.get(\"spark\").asInstanceOf[org.apache.spark.sql.SparkSession]");
}
interpret("import org.apache.spark.SparkContext._");
if (importImplicit()) {
if (sparkVersion.oldSqlContextImplicits()) {
interpret("import sqlContext._");
} else {
interpret("import sqlContext.implicits._");
interpret("import sqlContext.sql");
if (Utils.isSpark2()) {
interpret("import spark.implicits._");
interpret("import spark.sql");
interpret("import org.apache.spark.sql.functions._");
} else {
if (sparkVersion.oldSqlContextImplicits()) {
interpret("import sqlContext._");
} else {
interpret("import sqlContext.implicits._");
interpret("import sqlContext.sql");
interpret("import org.apache.spark.sql.functions._");
}
}
}
}
@ -825,6 +972,9 @@ public class SparkInterpreter extends Interpreter {
public Object getLastObject() {
IMain.Request r = (IMain.Request) Utils.invokeMethod(intp, "lastRequest");
if (r == null || r.lineRep() == null) {
return null;
}
Object obj = r.lineRep().call("$result",
JavaConversions.asScalaBuffer(new LinkedList<Object>()));
return obj;
@ -955,7 +1105,18 @@ public class SparkInterpreter extends Interpreter {
return;
}
Object lastObj = getValue(varName);
Object lastObj = null;
try {
if (Utils.isScala2_10()) {
lastObj = getValue(varName);
} else {
lastObj = getLastObject();
}
} catch (NullPointerException e) {
// Some case, scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call throws an NPE
logger.error(e.getMessage(), e);
}
if (lastObj != null) {
ResourcePool resourcePool = context.getResourcePool();
resourcePool.put(context.getNoteId(), context.getParagraphId(),
@ -1100,7 +1261,7 @@ public class SparkInterpreter extends Interpreter {
sc.stop();
sc = null;
if (classServer != null) {
classServer.stop();
Utils.invokeMethod(classServer, "stop");
classServer = null;
}
}
@ -1153,16 +1314,16 @@ public class SparkInterpreter extends Interpreter {
return file;
}
private HttpServer createHttpServer(File outputDir) {
private Object createHttpServer(File outputDir) {
SparkConf conf = new SparkConf();
try {
// try to create HttpServer
Constructor<?> constructor = getClass().getClassLoader()
.loadClass(HttpServer.class.getName())
.loadClass("org.apache.spark.HttpServer")
.getConstructor(new Class[]{
SparkConf.class, File.class, SecurityManager.class, int.class, String.class});
return (HttpServer) constructor.newInstance(new Object[] {
return constructor.newInstance(new Object[] {
conf, outputDir, new SecurityManager(conf), 0, "HTTP Server"});
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
InstantiationException | InvocationTargetException e) {
@ -1170,10 +1331,10 @@ public class SparkInterpreter extends Interpreter {
Constructor<?> constructor = null;
try {
constructor = getClass().getClassLoader()
.loadClass(HttpServer.class.getName())
.loadClass("org.apache.spark.HttpServer")
.getConstructor(new Class[]{
File.class, SecurityManager.class, int.class, String.class});
return (HttpServer) constructor.newInstance(new Object[] {
return constructor.newInstance(new Object[] {
outputDir, new SecurityManager(conf), 0, "HTTP Server"});
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
InstantiationException | InvocationTargetException e1) {

View file

@ -89,4 +89,13 @@ class Utils {
/** Reports Scala 2.11 whenever isScala2_10() does not report Scala 2.10. */
static boolean isScala2_11() {
  boolean scala210 = isScala2_10();
  return !scala210;
}
/**
 * Detects Spark 2.x by probing for org.apache.spark.sql.SparkSession.
 *
 * @return true when the class can be loaded, false when the lookup raises
 *         ClassNotFoundException
 */
static boolean isSpark2() {
  try {
    // Class.forName is a static method: invoke it on Class, not via a class literal.
    Class.forName("org.apache.spark.sql.SparkSession");
    return true;
  } catch (ClassNotFoundException e) {
    return false;
  }
}
}

View file

@ -31,7 +31,6 @@ import java.util.List;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
@ -70,7 +69,6 @@ public class ZeppelinContext {
public SparkContext sc;
public SQLContext sqlContext;
public HiveContext hiveContext;
private GUI gui;
@ZeppelinApi

View file

@ -29,7 +29,7 @@ from pyspark.broadcast import Broadcast
from pyspark.serializers import MarshalSerializer, PickleSerializer
# for back compatibility
from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row
from pyspark.sql import SQLContext, HiveContext, Row
class Logger(object):
def __init__(self):
@ -107,6 +107,7 @@ class PyZeppelinContext(dict):
class SparkVersion(object):
SPARK_1_4_0 = 140
SPARK_1_3_0 = 130
SPARK_2_0_0 = 200
def __init__(self, versionNumber):
self.version = versionNumber
@ -117,6 +118,9 @@ class SparkVersion(object):
def isImportAllPackageUnderSparkSql(self):
return self.version >= self.SPARK_1_3_0
# True when this object's version number is at least SPARK_2_0_0.
def isSpark2(self):
return self.version >= self.SPARK_2_0_0
class PySparkCompletion:
def __init__(self, interpreterObject):
self.interpreterObject = interpreterObject
@ -175,6 +179,12 @@ sys.stderr = output
client = GatewayClient(port=int(sys.argv[1]))
sparkVersion = SparkVersion(int(sys.argv[2]))
if sparkVersion.isSpark2():
from pyspark.sql import SparkSession
else:
from pyspark.sql import SchemaRDD
if sparkVersion.isAutoConvertEnabled():
gateway = JavaGateway(client, auto_convert = True)
else:
@ -209,6 +219,9 @@ sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
sqlc = SQLContext(sc, intp.getSQLContext())
sqlContext = sqlc
if sparkVersion.isSpark2():
spark = SparkSession(sc, intp.getSparkSession())
completion = PySparkCompletion(intp)
z = PyZeppelinContext(intp.getZeppelinContext())

View file

@ -30,6 +30,7 @@ import org.apache.spark.SparkContext;
import org.apache.spark.repl.SparkILoop;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.resource.WellKnownResourceName;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.*;
@ -176,6 +177,17 @@ public class SparkInterpreterTest {
assertNotNull(SparkInterpreter.setupListeners(sc));
}
/**
 * Builds a DataFrame from an RDD of case-class instances in the REPL and checks that the
 * row count (4) is stored in the resource pool as the paragraph's REPL result.
 */
@Test
public void testCreateDataFrame() {
  repl.interpret("case class Person(name:String, age:Int)\n", context);
  repl.interpret("val people = sc.parallelize(Seq(Person(\"moon\", 33), Person(\"jobs\", 51), Person(\"gates\", 51), Person(\"park\", 34)))\n", context);
  repl.interpret("people.toDF.count", context);
  // Long.valueOf instead of the boxed-primitive constructor
  assertEquals(Long.valueOf(4L), context.getResourcePool().get(
      context.getNoteId(),
      context.getParagraphId(),
      WellKnownResourceName.ZeppelinReplResult.toString()).get());
}
@Test
public void testSparkSql(){
repl.interpret("case class Person(name:String, age:Int)\n", context);