[ZEPPELIN-18] Running pyspark without deploying python libraries to every yarn node

- Followed Spark's approach to supporting PySpark
- https://issues.apache.org/jira/browse/SPARK-6869
- https://github.com/apache/spark/pull/5580
- https://github.com/apache/spark/pull/5478/files
This commit is contained in:
Jongyoul Lee 2015-06-24 15:08:12 +09:00
parent 1b192f60e4
commit 0ddb4366b9
4 changed files with 49 additions and 25 deletions

View file

@ -73,6 +73,16 @@ if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
$(mkdir -p "${ZEPPELIN_LOG_DIR}")
fi
# Put the bundled PySpark and py4j archives on PYTHONPATH.
# A PYTHONPATH already set by the caller is preserved; the archives are
# appended after it, separated by ':'. Both branches must reference the same
# ${ZEPPELIN_HOME}/python/lib directory, which is where the build places
# pyspark.zip and the py4j source zip.
if [[ -z "${PYTHONPATH}" ]]; then
  export PYTHONPATH="${ZEPPELIN_HOME}/python/lib/pyspark.zip:${ZEPPELIN_HOME}/python/lib/py4j-0.8.2.1-src.zip"
else
  export PYTHONPATH="${PYTHONPATH}:${ZEPPELIN_HOME}/python/lib/pyspark.zip:${ZEPPELIN_HOME}/python/lib/py4j-0.8.2.1-src.zip"
fi
# Default SPARK_HOME to the Zeppelin installation when the caller has not set
# it. -z with a quoted expansion is used instead of comparing against an
# unquoted value, which [[ ]] would interpret as a glob pattern.
if [[ -z "${SPARK_HOME}" ]]; then
  export SPARK_HOME="${ZEPPELIN_HOME}"
fi
# Start the runner in the background ('&') with the options, classpath,
# main-class/server argument and port held in the variables below, then
# remember the background job's PID in 'pid'.
# NOTE(review): the expansions are intentionally left unquoted here, presumably
# so multi-word values such as JAVA_INTP_OPTS split into separate arguments -
# confirm before quoting them.
${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
pid=$!

View file

@ -915,7 +915,7 @@
<executions>
<execution>
<id>download-pyspark-files</id>
<phase>prepare-package</phase>
<phase>validate</phase>
<goals>
<goal>wget</goal>
</goals>
@ -927,6 +927,20 @@
</execution>
</executions>
</plugin>
<plugin>
  <!--
    Extra directories for `mvn clean` to delete in addition to the default
    build output:
      - ../python/build : where the antrun target unzips the py4j sources
      - spark-dist      : located under this module's build directory; the
                          antrun target copies the Spark python files from it
    The second path must use the standard ${project.build.directory} property;
    a misspelled property name is left unresolved by Maven and the fileset
    would never match the real directory.
  -->
  <artifactId>maven-clean-plugin</artifactId>
  <configuration>
    <filesets>
      <fileset>
        <directory>${basedir}/../python/build</directory>
      </fileset>
      <fileset>
        <directory>${project.build.directory}/spark-dist</directory>
      </fileset>
    </filesets>
    <!-- Log every file the plugin removes. -->
    <verbose>true</verbose>
  </configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
@ -934,18 +948,21 @@
<executions>
<execution>
<id>download-and-zip-pyspark-files</id>
<phase>package</phase>
<phase>generate-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<delete dir="${project.build.directory}/../../python" />
<zip destfile="${project.build.directory}/../../python/pyspark.zip"
basedir="${project.build.directory}/spark-dist/spark-${spark.version}/python/pyspark"/>
<copy
file="${project.build.directory}/spark-dist/spark-${spark.version}/python/lib/py4j-0.8.2.1-src.zip"
todir="${project.build.directory}/../../python"/>
<delete dir="../python" />
<copy todir="../python">
<fileset dir="${project.build.directory}/spark-dist/spark-${spark.version}/python"/>
</copy>
<unzip src="../python/lib/py4j-0.8.2.1-src.zip"
dest="../python/build"/>
<zip destfile="${project.build.directory}/../../python/lib/pyspark.zip"
basedir="${project.build.directory}/spark-dist/spark-${spark.version}/python"
includes="pyspark/*.py,pyspark/**/*.py"/>
</target>
</configuration>
</execution>

View file

@ -159,18 +159,6 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
try {
Map env = EnvironmentUtils.getProcEnvironment();
String pythonPath = (String) env.get("PYTHONPATH");
if (pythonPath == null) {
pythonPath = "";
} else {
pythonPath += ":";
}
pythonPath += getSparkHome() + "/python/lib/py4j-0.8.2.1-src.zip:"
+ getSparkHome() + "/python";
env.put("PYTHONPATH", pythonPath);
executor.execute(cmd, env, this);
pythonscriptRunning = true;
} catch (IOException e) {

View file

@ -26,12 +26,9 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.*;
import com.google.common.base.Joiner;
import org.apache.spark.HttpServer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
@ -272,6 +269,18 @@ public class SparkInterpreter extends Interpreter {
conf.set(key, val);
}
}
//TODO(jongyoul): Move these codes into PySparkInterpreter.java
String zeppelinHome = getSystemDefault("ZEPPELIN_HOME", "zeppelin.home", "../");
File zeppelinPythonLibPath = new File(zeppelinHome, "python/lib");
String[] pythonLibs = new String[] {"pyspark.zip", "py4j-0.8.2.1-src.zip"};
ArrayList<String> pythonLibUris = new ArrayList<>();
for (String lib: pythonLibs) {
pythonLibUris.add(new File(zeppelinPythonLibPath, lib).toURI().toString());
}
conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
conf.set("spark.files", conf.get("spark.yarn.dist.files"));
conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
SparkContext sparkContext = new SparkContext(conf);
return sparkContext;