mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Make external libraries to be added to interpreter process classpath
This commit is contained in:
parent
11a45e2e22
commit
848d931916
12 changed files with 156 additions and 75 deletions
|
|
@ -21,10 +21,10 @@ bin=$(cd "${bin}">/dev/null; pwd)
|
|||
|
||||
|
||||
function usage() {
|
||||
echo "usage) $0 -p <port> -d <directory to load>"
|
||||
echo "usage) $0 -p <port> -d <interpreter dir to load> -l <local repo dir to load>"
|
||||
}
|
||||
|
||||
while getopts "hp:d:" o; do
|
||||
while getopts "hp:d:l:" o; do
|
||||
case ${o} in
|
||||
h)
|
||||
usage
|
||||
|
|
@ -36,6 +36,9 @@ while getopts "hp:d:" o; do
|
|||
p)
|
||||
PORT=${OPTARG}
|
||||
;;
|
||||
l)
|
||||
LOCAL_REPO_DIR=${OPTARG}
|
||||
;;
|
||||
esac
|
||||
done
|
||||
|
||||
|
|
@ -128,6 +131,8 @@ if [[ "${INTERPRETER_ID}" == "spark" ]]; then
|
|||
fi
|
||||
fi
|
||||
|
||||
addJarInDir "${LOCAL_REPO_DIR}"
|
||||
|
||||
CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
|
||||
|
||||
if [[ -n "${SPARK_SUBMIT}" ]]; then
|
||||
|
|
|
|||
|
|
@ -59,9 +59,6 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonElement;
|
||||
import com.google.gson.JsonParseException;
|
||||
import com.google.gson.JsonParser;
|
||||
|
||||
import py4j.GatewayServer;
|
||||
|
||||
|
|
@ -124,12 +121,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
|
||||
// load libraries from Dependency Interpreter
|
||||
URL [] urls = new URL[0];
|
||||
List<URL> urlList = new LinkedList<URL>();
|
||||
|
||||
if (depInterpreter != null) {
|
||||
SparkDependencyContext depc = depInterpreter.getDependencyContext();
|
||||
if (depc != null) {
|
||||
List<File> files = depc.getFiles();
|
||||
List<URL> urlList = new LinkedList<URL>();
|
||||
if (files != null) {
|
||||
for (File f : files) {
|
||||
try {
|
||||
|
|
@ -138,12 +135,29 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
|
|||
logger.error("Error", e);
|
||||
}
|
||||
}
|
||||
|
||||
urls = urlList.toArray(urls);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String localRepo = getProperty("zeppelin.interpreter.localRepo");
|
||||
if (localRepo != null) {
|
||||
File localRepoDir = new File(localRepo);
|
||||
if (localRepoDir.exists()) {
|
||||
File[] files = localRepoDir.listFiles();
|
||||
if (files != null) {
|
||||
for (File f : files) {
|
||||
try {
|
||||
urlList.add(f.toURI().toURL());
|
||||
} catch (MalformedURLException e) {
|
||||
logger.error("Error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
urls = urlList.toArray(urls);
|
||||
|
||||
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
URLClassLoader newCl = new URLClassLoader(urls, oldCl);
|
||||
|
|
|
|||
|
|
@ -438,6 +438,23 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
// add dependency from local repo
|
||||
String localRepo = getProperty("zeppelin.interpreter.localRepo");
|
||||
if (localRepo != null) {
|
||||
File localRepoDir = new File(localRepo);
|
||||
if (localRepoDir.exists()) {
|
||||
File[] files = localRepoDir.listFiles();
|
||||
if (files != null) {
|
||||
for (File f : files) {
|
||||
if (classpath.length() > 0) {
|
||||
classpath += File.pathSeparator;
|
||||
}
|
||||
classpath += f.getAbsolutePath();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pathSettings.v_$eq(classpath);
|
||||
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
|
||||
|
||||
|
|
@ -529,7 +546,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
// add jar
|
||||
// add jar from DepInterpreter
|
||||
if (depInterpreter != null) {
|
||||
SparkDependencyContext depc = depInterpreter.getDependencyContext();
|
||||
if (depc != null) {
|
||||
|
|
@ -547,6 +564,25 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add jar from local repo
|
||||
if (localRepo != null) {
|
||||
File localRepoDir = new File(localRepo);
|
||||
if (localRepoDir.exists()) {
|
||||
File[] files = localRepoDir.listFiles();
|
||||
if (files != null) {
|
||||
for (File f : files) {
|
||||
if (f.getName().toLowerCase().endsWith(".jar")) {
|
||||
sc.addJar(f.getAbsolutePath());
|
||||
logger.info("sc.addJar(" + f.getAbsolutePath() + ")");
|
||||
} else {
|
||||
sc.addFile(f.getAbsolutePath());
|
||||
logger.info("sc.addFile(" + f.getAbsolutePath() + ")");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private List<File> currentClassPath() {
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
Gson gson = new Gson();
|
||||
private String interpreterRunner;
|
||||
private String interpreterPath;
|
||||
private String localRepoPath;
|
||||
private String className;
|
||||
FormType formType;
|
||||
boolean initialized;
|
||||
|
|
@ -61,32 +62,36 @@ public class RemoteInterpreter extends Interpreter {
|
|||
private int connectTimeout;
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
String localRepoPath,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
initialized = false;
|
||||
this.interpreterRunner = interpreterRunner;
|
||||
this.interpreterPath = interpreterPath;
|
||||
this.localRepoPath = localRepoPath;
|
||||
env = new HashMap<String, String>();
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
}
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
String className,
|
||||
String interpreterRunner,
|
||||
String interpreterPath,
|
||||
String localRepoPath,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
this.interpreterRunner = interpreterRunner;
|
||||
this.interpreterPath = interpreterPath;
|
||||
this.localRepoPath = localRepoPath;
|
||||
this.env = env;
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
|
|
@ -107,8 +112,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
if (intpGroup.getRemoteInterpreterProcess() == null) {
|
||||
// create new remote process
|
||||
RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess(
|
||||
interpreterRunner, interpreterPath, env, connectTimeout,
|
||||
remoteInterpreterProcessListener);
|
||||
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
|
||||
remoteInterpreterProcessListener);
|
||||
|
||||
intpGroup.setRemoteInterpreterProcess(remoteProcess);
|
||||
}
|
||||
|
|
@ -140,6 +145,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
try {
|
||||
for (Interpreter intp : this.getInterpreterGroup()) {
|
||||
logger.info("Create remote interpreter {}", intp.getClassName());
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
client.createInterpreter(intp.getClassName(), (Map) property);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
private int port = -1;
|
||||
private final String interpreterRunner;
|
||||
private final String interpreterDir;
|
||||
private final String localRepoDir;
|
||||
|
||||
private GenericObjectPool<Client> clientPool;
|
||||
private Map<String, String> env;
|
||||
|
|
@ -53,20 +54,28 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
private int connectTimeout;
|
||||
|
||||
public RemoteInterpreterProcess(String intpRunner,
|
||||
String intpDir,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener) {
|
||||
this(intpRunner, intpDir, env, new RemoteInterpreterEventPoller(listener), connectTimeout);
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener) {
|
||||
this(intpRunner,
|
||||
intpDir,
|
||||
localRepoDir,
|
||||
env,
|
||||
new RemoteInterpreterEventPoller(listener),
|
||||
connectTimeout);
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess(String intpRunner,
|
||||
String intpDir,
|
||||
String localRepoDir,
|
||||
Map<String, String> env,
|
||||
RemoteInterpreterEventPoller remoteInterpreterEventPoller,
|
||||
int connectTimeout) {
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
this.env = env;
|
||||
this.interpreterContextRunnerPool = new InterpreterContextRunnerPool();
|
||||
referenceCount = new AtomicInteger(0);
|
||||
|
|
@ -89,12 +98,13 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
throw new InterpreterException(e1);
|
||||
}
|
||||
|
||||
|
||||
CommandLine cmdLine = CommandLine.parse(interpreterRunner);
|
||||
cmdLine.addArgument("-d", false);
|
||||
cmdLine.addArgument(interpreterDir, false);
|
||||
cmdLine.addArgument("-p", false);
|
||||
cmdLine.addArgument(Integer.toString(port), false);
|
||||
cmdLine.addArgument("-l", false);
|
||||
cmdLine.addArgument(localRepoDir, false);
|
||||
|
||||
executor = new DefaultExecutor();
|
||||
|
||||
|
|
|
|||
|
|
@ -64,14 +64,15 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
|
|||
Properties p = new Properties();
|
||||
|
||||
intp = new RemoteInterpreter(
|
||||
p,
|
||||
MockInterpreterAngular.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
env,
|
||||
10 * 1000,
|
||||
null
|
||||
);
|
||||
p,
|
||||
MockInterpreterAngular.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null
|
||||
);
|
||||
|
||||
intpGroup.add(intp);
|
||||
intp.setInterpreterGroup(intpGroup);
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ public class RemoteInterpreterProcessTest {
|
|||
public void testStartStop() {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
|
||||
"../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
|
||||
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
|
||||
10 * 1000, null);
|
||||
assertFalse(rip.isRunning());
|
||||
assertEquals(0, rip.referenceCount());
|
||||
|
|
@ -50,7 +50,7 @@ public class RemoteInterpreterProcessTest {
|
|||
public void testClientFactory() throws Exception {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess(
|
||||
"../bin/interpreter.sh", "nonexists", new HashMap<String, String>(),
|
||||
"../bin/interpreter.sh", "nonexists", "fakeRepo", new HashMap<String, String>(),
|
||||
mock(RemoteInterpreterEventPoller.class), 10 * 1000);
|
||||
rip.reference(intpGroup);
|
||||
assertEquals(0, rip.getNumActiveClient());
|
||||
|
|
|
|||
|
|
@ -69,6 +69,7 @@ public class RemoteInterpreterTest {
|
|||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null);
|
||||
|
|
@ -80,6 +81,7 @@ public class RemoteInterpreterTest {
|
|||
MockInterpreterB.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null);
|
||||
|
|
@ -164,27 +166,27 @@ public class RemoteInterpreterTest {
|
|||
Properties p = new Properties();
|
||||
|
||||
RemoteInterpreter intpA = new RemoteInterpreter(
|
||||
p,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
env,
|
||||
10 * 1000,
|
||||
null
|
||||
);
|
||||
p,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
||||
RemoteInterpreter intpB = new RemoteInterpreter(
|
||||
p,
|
||||
MockInterpreterB.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
env,
|
||||
10 * 1000,
|
||||
null
|
||||
);
|
||||
p,
|
||||
MockInterpreterB.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null);
|
||||
|
||||
intpGroup.add(intpB);
|
||||
intpB.setInterpreterGroup(intpGroup);
|
||||
|
|
|
|||
|
|
@ -64,14 +64,14 @@ public class RemoteSchedulerTest {
|
|||
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
|
||||
|
||||
final RemoteInterpreter intpA = new RemoteInterpreter(
|
||||
p,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
env,
|
||||
10 * 1000,
|
||||
null
|
||||
);
|
||||
p,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
|
@ -148,14 +148,14 @@ public class RemoteSchedulerTest {
|
|||
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
|
||||
|
||||
final RemoteInterpreter intpA = new RemoteInterpreter(
|
||||
p,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
env,
|
||||
10 * 1000,
|
||||
null
|
||||
);
|
||||
p,
|
||||
MockInterpreterA.class.getName(),
|
||||
new File("../bin/interpreter.sh").getAbsolutePath(),
|
||||
"fake",
|
||||
"fakeRepo",
|
||||
env,
|
||||
10 * 1000,
|
||||
null);
|
||||
|
||||
intpGroup.add(intpA);
|
||||
intpA.setInterpreterGroup(intpGroup);
|
||||
|
|
|
|||
|
|
@ -76,7 +76,8 @@ public class ZeppelinServer extends Application {
|
|||
public ZeppelinServer() throws Exception {
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
|
||||
this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO));
|
||||
this.depResolver = new DependencyResolver(
|
||||
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO));
|
||||
this.schedulerFactory = new SchedulerFactory();
|
||||
this.replFactory = new InterpreterFactory(conf, notebookWsServer,
|
||||
notebookWsServer, depResolver);
|
||||
|
|
|
|||
|
|
@ -346,6 +346,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_REMOTE_RUNNER);
|
||||
}
|
||||
|
||||
public String getInterpreterLocalRepoPath() {
|
||||
return getRelativeDir(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO);
|
||||
}
|
||||
|
||||
public String getRelativeDir(ConfVars c) {
|
||||
return getRelativeDir(getString(c));
|
||||
}
|
||||
|
|
@ -455,6 +459,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.scalding.ScaldingInterpreter,"
|
||||
+ "org.apache.zeppelin.jdbc.JDBCInterpreter"),
|
||||
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
|
||||
ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
|
||||
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
|
|
|
|||
|
|
@ -393,7 +393,8 @@ public class InterpreterFactory {
|
|||
if (option.isRemote()) {
|
||||
intp = createRemoteRepl(info.getPath(),
|
||||
info.getClassName(),
|
||||
properties);
|
||||
properties,
|
||||
interpreterGroup.id);
|
||||
} else {
|
||||
intp = createRepl(info.getPath(),
|
||||
info.getClassName(),
|
||||
|
|
@ -661,12 +662,12 @@ public class InterpreterFactory {
|
|||
|
||||
|
||||
private Interpreter createRemoteRepl(String interpreterPath, String className,
|
||||
Properties property) {
|
||||
|
||||
Properties property, String interpreterId) {
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId;
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
|
||||
property, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, connectTimeout, remoteInterpreterProcessListener));
|
||||
interpreterPath, localRepoPath, connectTimeout, remoteInterpreterProcessListener));
|
||||
return intp;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue