Make external libraries to be added to interpreter process classpath

This commit is contained in:
Mina Lee 2016-01-19 18:02:11 -08:00
parent 11a45e2e22
commit 848d931916
12 changed files with 156 additions and 75 deletions

View file

@ -21,10 +21,10 @@ bin=$(cd "${bin}">/dev/null; pwd)
function usage() {
echo "usage) $0 -p <port> -d <directory to load>"
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local repo dir to load>"
}
while getopts "hp:d:" o; do
while getopts "hp:d:l:" o; do
case ${o} in
h)
usage
@ -36,6 +36,9 @@ while getopts "hp:d:" o; do
p)
PORT=${OPTARG}
;;
l)
LOCAL_REPO_DIR=${OPTARG}
;;
esac
done
@ -128,6 +131,8 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
fi
fi
addJarInDir "${LOCAL_REPO_DIR}"
CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
if [[ -n "${SPARK_SUBMIT}" ]]; then

View file

@ -59,9 +59,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import py4j.GatewayServer;
@ -124,12 +121,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
// load libraries from Dependency Interpreter
URL [] urls = new URL[0];
List<URL> urlList = new LinkedList<URL>();
if (depInterpreter != null) {
SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {
List<File> files = depc.getFiles();
List<URL> urlList = new LinkedList<URL>();
if (files != null) {
for (File f : files) {
try {
@ -138,12 +135,29 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
logger.error("Error", e);
}
}
urls = urlList.toArray(urls);
}
}
}
String localRepo = getProperty("zeppelin.interpreter.localRepo");
if (localRepo != null) {
File localRepoDir = new File(localRepo);
if (localRepoDir.exists()) {
File[] files = localRepoDir.listFiles();
if (files != null) {
for (File f : files) {
try {
urlList.add(f.toURI().toURL());
} catch (MalformedURLException e) {
logger.error("Error", e);
}
}
}
}
}
urls = urlList.toArray(urls);
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
try {
URLClassLoader newCl = new URLClassLoader(urls, oldCl);

View file

@ -438,6 +438,23 @@ public class SparkInterpreter extends Interpreter {
}
}
// add dependency from local repo
String localRepo = getProperty("zeppelin.interpreter.localRepo");
if (localRepo != null) {
File localRepoDir = new File(localRepo);
if (localRepoDir.exists()) {
File[] files = localRepoDir.listFiles();
if (files != null) {
for (File f : files) {
if (classpath.length() > 0) {
classpath += File.pathSeparator;
}
classpath += f.getAbsolutePath();
}
}
}
}
pathSettings.v_$eq(classpath);
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
@ -529,7 +546,7 @@ public class SparkInterpreter extends Interpreter {
throw new InterpreterException(e);
}
// add jar
// add jar from DepInterpreter
if (depInterpreter != null) {
SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {
@ -547,6 +564,25 @@ public class SparkInterpreter extends Interpreter {
}
}
}
// add jar from local repo
if (localRepo != null) {
File localRepoDir = new File(localRepo);
if (localRepoDir.exists()) {
File[] files = localRepoDir.listFiles();
if (files != null) {
for (File f : files) {
if (f.getName().toLowerCase().endsWith(".jar")) {
sc.addJar(f.getAbsolutePath());
logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
} else {
sc.addFile(f.getAbsolutePath());
logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
}
}
}
}
}
}
private List<File> currentClassPath() {

View file

@ -53,6 +53,7 @@ public class RemoteInterpreter extends Interpreter {
Gson gson = new Gson();
private String interpreterRunner;
private String interpreterPath;
private String localRepoPath;
private String className;
FormType formType;
boolean initialized;
@ -61,32 +62,36 @@ public class RemoteInterpreter extends Interpreter {
private int connectTimeout;
public RemoteInterpreter(Properties property,
String className,
String interpreterRunner,
String interpreterPath,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
String className,
String interpreterRunner,
String interpreterPath,
String localRepoPath,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
initialized = false;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.localRepoPath = localRepoPath;
env = new HashMap<String, String>();
this.connectTimeout = connectTimeout;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
public RemoteInterpreter(Properties property,
String className,
String interpreterRunner,
String interpreterPath,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
String className,
String interpreterRunner,
String interpreterPath,
String localRepoPath,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.localRepoPath = localRepoPath;
this.env = env;
this.connectTimeout = connectTimeout;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
@ -107,8 +112,8 @@ public class RemoteInterpreter extends Interpreter {
if (intpGroup.getRemoteInterpreterProcess() == null) {
// create new remote process
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
interpreterRunner, interpreterPath, env, connectTimeout,
remoteInterpreterProcessListener);
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
remoteInterpreterProcessListener);
intpGroup.setRemoteInterpreterProcess(remoteProcess);
}
@ -140,6 +145,7 @@ public class RemoteInterpreter extends Interpreter {
try {
for (Interpreter intp : this.getInterpreterGroup()) {
logger.info("Create remote interpreter {}", intp.getClassName());
property.put("zeppelin.interpreter.localRepo", localRepoPath);
client.createInterpreter(intp.getClassName(), (Map) property);
}

View file

@ -45,6 +45,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private int port = -1;
private final String interpreterRunner;
private final String interpreterDir;
private final String localRepoDir;
private GenericObjectPool<Client> clientPool;
private Map<String, String> env;
@ -53,20 +54,28 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
private int connectTimeout;
public RemoteInterpreterProcess(String intpRunner,
String intpDir,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener listener) {
this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout);
String intpDir,
String localRepoDir,
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener listener) {
this(intpRunner,
intpDir,
localRepoDir,
env,
new RemoteInterpreterEventPoller(listener),
connectTimeout);
}
RemoteInterpreterProcess(String intpRunner,
String intpDir,
String localRepoDir,
Map<String, String> env,
RemoteInterpreterEventPoller remoteInterpreterEventPoller,
int connectTimeout) {
this.interpreterRunner = intpRunner;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
this.env = env;
this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
referenceCount = new AtomicInteger(0);
@ -89,12 +98,13 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
throw new InterpreterException(e1);
}
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
cmdLine.addArgument("-d", false);
cmdLine.addArgument(interpreterDir, false);
cmdLine.addArgument("-p", false);
cmdLine.addArgument(Integer.toString(port), false);
cmdLine.addArgument("-l", false);
cmdLine.addArgument(localRepoDir, false);
executor = new DefaultExecutor();

View file

@ -64,14 +64,15 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
Properties p = new Properties();
intp = new RemoteInterpreter(
p,
MockInterpreterAngular.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
p,
MockInterpreterAngular.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null
);
intpGroup.add(intp);
intp.setInterpreterGroup(intpGroup);

View file

@ -33,7 +33,7 @@ public class RemoteInterpreterProcessTest {
public void testStartStop() {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
"../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
10 * 1000, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
@ -50,7 +50,7 @@ public class RemoteInterpreterProcessTest {
public void testClientFactory() throws Exception {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
"../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
mock(RemoteInterpreterEventPoller.class), 10 * 1000);
rip.reference(intpGroup);
assertEquals(0, rip.getNumActiveClient());

View file

@ -69,6 +69,7 @@ public class RemoteInterpreterTest {
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null);
@ -80,6 +81,7 @@ public class RemoteInterpreterTest {
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null);
@ -164,27 +166,27 @@ public class RemoteInterpreterTest {
Properties p = new Properties();
RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
RemoteInterpreter intpB = new RemoteInterpreter(
p,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
p,
MockInterpreterB.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null);
intpGroup.add(intpB);
intpB.setInterpreterGroup(intpGroup);

View file

@ -64,14 +64,14 @@ public class RemoteSchedulerTest {
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);
@ -148,14 +148,14 @@ public class RemoteSchedulerTest {
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
final RemoteInterpreter intpA = new RemoteInterpreter(
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
env,
10 * 1000,
null
);
p,
MockInterpreterA.class.getName(),
new File("../bin/interpreter.sh").getAbsolutePath(),
"fake",
"fakeRepo",
env,
10 * 1000,
null);
intpGroup.add(intpA);
intpA.setInterpreterGroup(intpGroup);

View file

@ -76,7 +76,8 @@ public class ZeppelinServer extends Application {
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO));
this.depResolver = new DependencyResolver(
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
notebookWsServer, depResolver);

View file

@ -346,6 +346,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
}
public String getInterpreterLocalRepoPath() {
return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO);
}
public String getRelativeDir(ConfVars c) {
return getRelativeDir(getString(c));
}
@ -455,6 +459,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.scalding.ScaldingInterpreter,"
+ "org.apache.zeppelin.jdbc.JDBCInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),

View file

@ -393,7 +393,8 @@ public class InterpreterFactory {
if (option.isRemote()) {
intp = createRemoteRepl(info.getPath(),
info.getClassName(),
properties);
properties,
interpreterGroup.id);
} else {
intp = createRepl(info.getPath(),
info.getClassName(),
@ -661,12 +662,12 @@ public class InterpreterFactory {
private Interpreter createRemoteRepl(String interpreterPath, String className,
Properties property) {
Properties property, String interpreterId) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId;
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, connectTimeout, remoteInterpreterProcessListener));
interpreterPath, localRepoPath, connectTimeout, remoteInterpreterProcessListener));
return intp;
}