Merge pull request #3 from apache/master

ZEPPELIN-546 Enables interpreter library loading from maven repository
This commit is contained in:
swakrish 2016-01-12 19:47:52 -08:00
commit ec1403490f
26 changed files with 717 additions and 157 deletions

View file

@ -40,7 +40,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.spark.dep.DependencyContext;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonatype.aether.resolution.ArtifactResolutionException;
@ -69,7 +69,9 @@ public class DepInterpreter extends Interpreter {
"spark",
DepInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("zeppelin.dep.localrepo", "local-repo", "local repository for dependency loader")
.add("zeppelin.dep.localrepo",
getSystemDefault("ZEPPELIN_DEP_LOCALREPO", null, "local-repo"),
"local repository for dependency loader")
.add("zeppelin.dep.additionalRemoteRepository",
"spark-packages,http://dl.bintray.com/spark-packages/maven,false;",
"A list of 'id,remote-repository-URL,is-snapshot;' for each remote repository.")
@ -79,7 +81,7 @@ public class DepInterpreter extends Interpreter {
private SparkIMain intp;
private ByteArrayOutputStream out;
private DependencyContext depc;
private SparkDependencyContext depc;
private SparkJLineCompletion completor;
private SparkILoop interpreter;
static final Logger LOGGER = LoggerFactory.getLogger(DepInterpreter.class);
@ -88,10 +90,30 @@ public class DepInterpreter extends Interpreter {
super(property);
}
public DependencyContext getDependencyContext() {
public SparkDependencyContext getDependencyContext() {
return depc;
}
public static String getSystemDefault(
String envName,
String propertyName,
String defaultValue) {
if (envName != null && !envName.isEmpty()) {
String envValue = System.getenv().get(envName);
if (envValue != null) {
return envValue;
}
}
if (propertyName != null && !propertyName.isEmpty()) {
String propValue = System.getProperty(propertyName);
if (propValue != null) {
return propValue;
}
}
return defaultValue;
}
@Override
public void close() {
@ -152,16 +174,16 @@ public class DepInterpreter extends Interpreter {
intp.setContextClassLoader();
intp.initializeSynchronous();
depc = new DependencyContext(getProperty("zeppelin.dep.localrepo"),
depc = new SparkDependencyContext(getProperty("zeppelin.dep.localrepo"),
getProperty("zeppelin.dep.additionalRemoteRepository"));
completor = new SparkJLineCompletion(intp);
intp.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
Map<String, Object> binder = (Map<String, Object>) getValue("_binder");
binder.put("depc", depc);
intp.interpret("@transient val z = "
+ "_binder.get(\"depc\").asInstanceOf[org.apache.zeppelin.spark.dep.DependencyContext]");
+ "_binder.get(\"depc\")"
+ ".asInstanceOf[org.apache.zeppelin.spark.dep.SparkDependencyContext]");
}

View file

@ -54,7 +54,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.spark.dep.DependencyContext;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -127,7 +127,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand
URL [] urls = new URL[0];
if (depInterpreter != null) {
DependencyContext depc = depInterpreter.getDependencyContext();
SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {
List<File> files = depc.getFiles();
List<URL> urlList = new LinkedList<URL>();

View file

@ -55,8 +55,8 @@ import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.spark.dep.DependencyContext;
import org.apache.zeppelin.spark.dep.DependencyResolver;
import org.apache.zeppelin.spark.dep.SparkDependencyContext;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -117,7 +117,7 @@ public class SparkInterpreter extends Interpreter {
private SparkContext sc;
private ByteArrayOutputStream out;
private SQLContext sqlc;
private DependencyResolver dep;
private SparkDependencyResolver dep;
private SparkJLineCompletion completor;
private JobProgressListener sparkListener;
@ -222,9 +222,9 @@ public class SparkInterpreter extends Interpreter {
return sqlc;
}
public DependencyResolver getDependencyResolver() {
public SparkDependencyResolver getDependencyResolver() {
if (dep == null) {
dep = new DependencyResolver(intp,
dep = new SparkDependencyResolver(intp,
sc,
getProperty("zeppelin.dep.localrepo"),
getProperty("zeppelin.dep.additionalRemoteRepository"));
@ -427,7 +427,7 @@ public class SparkInterpreter extends Interpreter {
// add dependency from DepInterpreter
DepInterpreter depInterpreter = getDepInterpreter();
if (depInterpreter != null) {
DependencyContext depc = depInterpreter.getDependencyContext();
SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {
List<File> files = depc.getFiles();
if (files != null) {
@ -536,7 +536,7 @@ public class SparkInterpreter extends Interpreter {
// add jar
if (depInterpreter != null) {
DependencyContext depc = depInterpreter.getDependencyContext();
SparkDependencyContext depc = depInterpreter.getDependencyContext();
if (depc != null) {
List<File> files = depc.getFilesDist();
if (files != null) {

View file

@ -43,7 +43,7 @@ import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.spark.dep.DependencyResolver;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import scala.Tuple2;
import scala.Unit;
@ -53,14 +53,14 @@ import scala.collection.Iterable;
* Spark context for zeppelin.
*/
public class ZeppelinContext extends HashMap<String, Object> {
private DependencyResolver dep;
private SparkDependencyResolver dep;
private PrintStream out;
private InterpreterContext interpreterContext;
private int maxResult;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
DependencyResolver dep, PrintStream printStream,
SparkDependencyResolver dep, PrintStream printStream,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;

View file

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark.dep;
import java.io.File;
import java.net.MalformedURLException;
import java.util.LinkedList;
import java.util.List;
import org.apache.zeppelin.dep.Booter;
import org.apache.zeppelin.dep.Dependency;
import org.apache.zeppelin.dep.Repository;
import org.sonatype.aether.RepositorySystem;
import org.sonatype.aether.RepositorySystemSession;
import org.sonatype.aether.artifact.Artifact;
import org.sonatype.aether.collection.CollectRequest;
import org.sonatype.aether.graph.DependencyFilter;
import org.sonatype.aether.repository.RemoteRepository;
import org.sonatype.aether.repository.Authentication;
import org.sonatype.aether.resolution.ArtifactResolutionException;
import org.sonatype.aether.resolution.ArtifactResult;
import org.sonatype.aether.resolution.DependencyRequest;
import org.sonatype.aether.resolution.DependencyResolutionException;
import org.sonatype.aether.util.artifact.DefaultArtifact;
import org.sonatype.aether.util.artifact.JavaScopes;
import org.sonatype.aether.util.filter.DependencyFilterUtils;
import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
/**
*
*/
public class SparkDependencyContext {
List<Dependency> dependencies = new LinkedList<Dependency>();
List<Repository> repositories = new LinkedList<Repository>();
List<File> files = new LinkedList<File>();
List<File> filesDist = new LinkedList<File>();
private RepositorySystem system = Booter.newRepositorySystem();
private RepositorySystemSession session;
private RemoteRepository mavenCentral = Booter.newCentralRepository();
private RemoteRepository mavenLocal = Booter.newLocalRepository();
private List<RemoteRepository> additionalRepos = new LinkedList<RemoteRepository>();
public SparkDependencyContext(String localRepoPath, String additionalRemoteRepository) {
session = Booter.newRepositorySystemSession(system, localRepoPath);
addRepoFromProperty(additionalRemoteRepository);
}
public Dependency load(String lib) {
Dependency dep = new Dependency(lib);
if (dependencies.contains(dep)) {
dependencies.remove(dep);
}
dependencies.add(dep);
return dep;
}
public Repository addRepo(String name) {
Repository rep = new Repository(name);
repositories.add(rep);
return rep;
}
public void reset() {
dependencies = new LinkedList<Dependency>();
repositories = new LinkedList<Repository>();
files = new LinkedList<File>();
filesDist = new LinkedList<File>();
}
private void addRepoFromProperty(String listOfRepo) {
if (listOfRepo != null) {
String[] repos = listOfRepo.split(";");
for (String repo : repos) {
String[] parts = repo.split(",");
if (parts.length == 3) {
String id = parts[0].trim();
String url = parts[1].trim();
boolean isSnapshot = Boolean.parseBoolean(parts[2].trim());
if (id.length() > 1 && url.length() > 1) {
RemoteRepository rr = new RemoteRepository(id, "default", url);
rr.setPolicy(isSnapshot, null);
additionalRepos.add(rr);
}
}
}
}
}
/**
* fetch all artifacts
* @return
* @throws MalformedURLException
* @throws ArtifactResolutionException
* @throws DependencyResolutionException
*/
public List<File> fetch() throws MalformedURLException,
DependencyResolutionException, ArtifactResolutionException {
for (Dependency dep : dependencies) {
if (!dep.isLocalFsArtifact()) {
List<ArtifactResult> artifacts = fetchArtifactWithDep(dep);
for (ArtifactResult artifact : artifacts) {
if (dep.isDist()) {
filesDist.add(artifact.getArtifact().getFile());
}
files.add(artifact.getArtifact().getFile());
}
} else {
if (dep.isDist()) {
filesDist.add(new File(dep.getGroupArtifactVersion()));
}
files.add(new File(dep.getGroupArtifactVersion()));
}
}
return files;
}
private List<ArtifactResult> fetchArtifactWithDep(Dependency dep)
throws DependencyResolutionException, ArtifactResolutionException {
Artifact artifact = new DefaultArtifact(
SparkDependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion()));
DependencyFilter classpathFlter = DependencyFilterUtils
.classpathFilter(JavaScopes.COMPILE);
PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter(
SparkDependencyResolver.inferScalaVersion(dep.getExclusions()));
CollectRequest collectRequest = new CollectRequest();
collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact,
JavaScopes.COMPILE));
collectRequest.addRepository(mavenCentral);
collectRequest.addRepository(mavenLocal);
for (RemoteRepository repo : additionalRepos) {
collectRequest.addRepository(repo);
}
for (Repository repo : repositories) {
RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl());
rr.setPolicy(repo.isSnapshot(), null);
Authentication auth = repo.getAuthentication();
if (auth != null) {
rr.setAuthentication(auth);
}
collectRequest.addRepository(rr);
}
DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
}
public List<File> getFiles() {
return files;
}
public List<File> getFilesDist() {
return filesDist;
}
}

View file

@ -30,10 +30,9 @@ import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkContext;
import org.apache.spark.repl.SparkIMain;
import org.apache.zeppelin.dep.AbstractDependencyResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonatype.aether.RepositorySystem;
import org.sonatype.aether.RepositorySystemSession;
import org.sonatype.aether.artifact.Artifact;
import org.sonatype.aether.collection.CollectRequest;
import org.sonatype.aether.graph.Dependency;
@ -56,21 +55,13 @@ import scala.tools.nsc.util.MergedClassPath;
/**
* Deps resolver.
* Add new dependencies from mvn repo (at runetime) to Zeppelin.
* Add new dependencies from mvn repo (at runtime) to Spark interpreter group.
*/
public class DependencyResolver {
Logger logger = LoggerFactory.getLogger(DependencyResolver.class);
public class SparkDependencyResolver extends AbstractDependencyResolver {
Logger logger = LoggerFactory.getLogger(SparkDependencyResolver.class);
private Global global;
private SparkIMain intp;
private SparkContext sc;
private RepositorySystem system = Booter.newRepositorySystem();
private List<RemoteRepository> repos = new LinkedList<RemoteRepository>();
private RepositorySystemSession session;
private DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter(
JavaScopes.COMPILE,
JavaScopes.PROVIDED,
JavaScopes.RUNTIME,
JavaScopes.SYSTEM);
private final String[] exclusions = new String[] {"org.scala-lang:scala-library",
"org.scala-lang:scala-compiler",
@ -80,40 +71,15 @@ public class DependencyResolver {
"org.apache.zeppelin:zeppelin-spark",
"org.apache.zeppelin:zeppelin-server"};
public DependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath,
public SparkDependencyResolver(SparkIMain intp, SparkContext sc, String localRepoPath,
String additionalRemoteRepository) {
super(localRepoPath);
this.intp = intp;
this.global = intp.global();
this.sc = sc;
session = Booter.newRepositorySystemSession(system, localRepoPath);
repos.add(Booter.newCentralRepository()); // add maven central
repos.add(Booter.newLocalRepository());
addRepoFromProperty(additionalRemoteRepository);
}
public void addRepo(String id, String url, boolean snapshot) {
synchronized (repos) {
delRepo(id);
RemoteRepository rr = new RemoteRepository(id, "default", url);
rr.setPolicy(snapshot, null);
repos.add(rr);
}
}
public RemoteRepository delRepo(String id) {
synchronized (repos) {
Iterator<RemoteRepository> it = repos.iterator();
if (it.hasNext()) {
RemoteRepository repo = it.next();
if (repo.getId().equals(id)) {
it.remove();
return repo;
}
}
}
return null;
}
private void addRepoFromProperty(String listOfRepo) {
if (listOfRepo != null) {
String[] repos = listOfRepo.split(";");
@ -309,16 +275,16 @@ public class DependencyResolver {
}
/**
*
* @param dependency
* @param excludes list of pattern can either be of the form groupId:artifactId
* @return
* @throws Exception
*/
@Override
public List<ArtifactResult> getArtifactsWithDep(String dependency,
Collection<String> excludes) throws Exception {
Artifact artifact = new DefaultArtifact(inferScalaVersion(dependency));
DependencyFilter classpathFlter = DependencyFilterUtils.classpathFilter( JavaScopes.COMPILE );
DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE);
PatternExclusionsDependencyFilter exclusionFilter =
new PatternExclusionsDependencyFilter(inferScalaVersion(excludes));
@ -331,7 +297,7 @@ public class DependencyResolver {
}
}
DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
DependencyFilterUtils.andFilter(exclusionFilter, classpathFlter));
DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter));
return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
}

View file

@ -19,10 +19,10 @@ package org.apache.zeppelin.spark.dep;
import static org.junit.Assert.assertEquals;
import org.apache.zeppelin.spark.dep.DependencyResolver;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.junit.Test;
public class DependencyResolverTest {
public class SparkDependencyResolverTest {
@Test
public void testInferScalaVersion() {
@ -30,23 +30,23 @@ public class DependencyResolverTest {
String scalaVersion = version[0] + "." + version[1];
assertEquals("groupId:artifactId:version",
DependencyResolver.inferScalaVersion("groupId:artifactId:version"));
SparkDependencyResolver.inferScalaVersion("groupId:artifactId:version"));
assertEquals("groupId:artifactId_" + scalaVersion + ":version",
DependencyResolver.inferScalaVersion("groupId::artifactId:version"));
SparkDependencyResolver.inferScalaVersion("groupId::artifactId:version"));
assertEquals("groupId:artifactId:version::test",
DependencyResolver.inferScalaVersion("groupId:artifactId:version::test"));
SparkDependencyResolver.inferScalaVersion("groupId:artifactId:version::test"));
assertEquals("*",
DependencyResolver.inferScalaVersion("*"));
SparkDependencyResolver.inferScalaVersion("*"));
assertEquals("groupId:*",
DependencyResolver.inferScalaVersion("groupId:*"));
SparkDependencyResolver.inferScalaVersion("groupId:*"));
assertEquals("groupId:artifactId*",
DependencyResolver.inferScalaVersion("groupId:artifactId*"));
SparkDependencyResolver.inferScalaVersion("groupId:artifactId*"));
assertEquals("groupId:artifactId_" + scalaVersion,
DependencyResolver.inferScalaVersion("groupId::artifactId"));
SparkDependencyResolver.inferScalaVersion("groupId::artifactId"));
assertEquals("groupId:artifactId_" + scalaVersion + "*",
DependencyResolver.inferScalaVersion("groupId::artifactId*"));
SparkDependencyResolver.inferScalaVersion("groupId::artifactId*"));
assertEquals("groupId:artifactId_" + scalaVersion + ":*",
DependencyResolver.inferScalaVersion("groupId::artifactId:*"));
SparkDependencyResolver.inferScalaVersion("groupId::artifactId:*"));
}
}

View file

@ -97,6 +97,123 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<!-- Aether :: maven dependency resolution -->
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-plugin-api</artifactId>
<version>3.0</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
<exclusion>
<groupId>org.sonatype.sisu</groupId>
<artifactId>sisu-inject-plexus</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven</groupId>
<artifactId>maven-model</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-api</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-util</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-impl</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-aether-provider</artifactId>
<version>3.0.3</version>
<exclusions>
<exclusion>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-spi</artifactId>
</exclusion>
<exclusion>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-connector-file</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-connector-wagon</artifactId>
<version>1.12</version>
<exclusions>
<exclusion>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-provider-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-provider-api</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-http-lightweight</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-http-shared</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-http</artifactId>
<version>1.0</version>
<exclusions>
</exclusions>
</dependency>
</dependencies>
<build>

View file

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.dep;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.sonatype.aether.RepositorySystem;
import org.sonatype.aether.RepositorySystemSession;
import org.sonatype.aether.repository.RemoteRepository;
import org.sonatype.aether.resolution.ArtifactResult;
/**
* Abstract dependency resolver.
* Add new dependencies from mvn repo (at runtime) Zeppelin.
*/
public abstract class AbstractDependencyResolver {
protected RepositorySystem system = Booter.newRepositorySystem();
protected List<RemoteRepository> repos = new LinkedList<RemoteRepository>();
protected RepositorySystemSession session;
public AbstractDependencyResolver(String localRepoPath) {
session = Booter.newRepositorySystemSession(system, localRepoPath);
repos.add(Booter.newCentralRepository()); // add maven central
repos.add(Booter.newLocalRepository());
}
public void addRepo(String id, String url, boolean snapshot) {
synchronized (repos) {
delRepo(id);
RemoteRepository rr = new RemoteRepository(id, "default", url);
rr.setPolicy(snapshot, null);
repos.add(rr);
}
}
public RemoteRepository delRepo(String id) {
synchronized (repos) {
Iterator<RemoteRepository> it = repos.iterator();
if (it.hasNext()) {
RemoteRepository repo = it.next();
if (repo.getId().equals(id)) {
it.remove();
return repo;
}
}
}
return null;
}
public abstract List<ArtifactResult> getArtifactsWithDep(String dependency,
Collection<String> excludes) throws Exception;
}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.zeppelin.spark.dep;
package org.apache.zeppelin.dep;
import java.io.File;
@ -64,7 +64,7 @@ public class Booter {
public static RemoteRepository newCentralRepository() {
return new RemoteRepository("central", "default", "http://repo1.maven.org/maven2/");
}
public static RemoteRepository newLocalRepository() {
return new RemoteRepository("local",
"default", "file://" + System.getProperty("user.home") + "/.m2/repository");

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.zeppelin.spark.dep;
package org.apache.zeppelin.dep;
import java.util.LinkedList;
import java.util.List;

View file

@ -15,14 +15,12 @@
* limitations under the License.
*/
package org.apache.zeppelin.spark.dep;
package org.apache.zeppelin.dep;
import java.io.File;
import java.net.MalformedURLException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import org.sonatype.aether.RepositorySystem;
import org.sonatype.aether.RepositorySystemSession;
@ -30,7 +28,6 @@ import org.sonatype.aether.artifact.Artifact;
import org.sonatype.aether.collection.CollectRequest;
import org.sonatype.aether.graph.DependencyFilter;
import org.sonatype.aether.repository.RemoteRepository;
import org.sonatype.aether.repository.Authentication;
import org.sonatype.aether.resolution.ArtifactResolutionException;
import org.sonatype.aether.resolution.ArtifactResult;
import org.sonatype.aether.resolution.DependencyRequest;
@ -54,11 +51,9 @@ public class DependencyContext {
private RepositorySystemSession session;
private RemoteRepository mavenCentral = Booter.newCentralRepository();
private RemoteRepository mavenLocal = Booter.newLocalRepository();
private List<RemoteRepository> additionalRepos = new LinkedList<RemoteRepository>();
public DependencyContext(String localRepoPath, String additionalRemoteRepository) {
session = Booter.newRepositorySystemSession(system, localRepoPath);
addRepoFromProperty(additionalRemoteRepository);
public DependencyContext(String localRepoPath) {
session = Booter.newRepositorySystemSession(system, localRepoPath);
}
public Dependency load(String lib) {
@ -85,24 +80,6 @@ public class DependencyContext {
filesDist = new LinkedList<File>();
}
private void addRepoFromProperty(String listOfRepo) {
if (listOfRepo != null) {
String[] repos = listOfRepo.split(";");
for (String repo : repos) {
String[] parts = repo.split(",");
if (parts.length == 3) {
String id = parts[0].trim();
String url = parts[1].trim();
boolean isSnapshot = Boolean.parseBoolean(parts[2].trim());
if (id.length() > 1 && url.length() > 1) {
RemoteRepository rr = new RemoteRepository(id, "default", url);
rr.setPolicy(isSnapshot, null);
additionalRepos.add(rr);
}
}
}
}
}
/**
* fetch all artifacts
@ -136,13 +113,12 @@ public class DependencyContext {
private List<ArtifactResult> fetchArtifactWithDep(Dependency dep)
throws DependencyResolutionException, ArtifactResolutionException {
Artifact artifact = new DefaultArtifact(
DependencyResolver.inferScalaVersion(dep.getGroupArtifactVersion()));
Artifact artifact = new DefaultArtifact(dep.getGroupArtifactVersion());
DependencyFilter classpathFlter = DependencyFilterUtils
.classpathFilter(JavaScopes.COMPILE);
PatternExclusionsDependencyFilter exclusionFilter = new PatternExclusionsDependencyFilter(
DependencyResolver.inferScalaVersion(dep.getExclusions()));
dep.getExclusions());
CollectRequest collectRequest = new CollectRequest();
collectRequest.setRoot(new org.sonatype.aether.graph.Dependency(artifact,
@ -150,16 +126,9 @@ public class DependencyContext {
collectRequest.addRepository(mavenCentral);
collectRequest.addRepository(mavenLocal);
for (RemoteRepository repo : additionalRepos) {
collectRequest.addRepository(repo);
}
for (Repository repo : repositories) {
RemoteRepository rr = new RemoteRepository(repo.getName(), "default", repo.getUrl());
rr.setPolicy(repo.isSnapshot(), null);
Authentication auth = repo.getAuthentication();
if (auth != null) {
rr.setAuthentication(auth);
}
collectRequest.addRepository(rr);
}

View file

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.dep;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonatype.aether.artifact.Artifact;
import org.sonatype.aether.collection.CollectRequest;
import org.sonatype.aether.graph.Dependency;
import org.sonatype.aether.graph.DependencyFilter;
import org.sonatype.aether.repository.RemoteRepository;
import org.sonatype.aether.resolution.ArtifactResult;
import org.sonatype.aether.resolution.DependencyRequest;
import org.sonatype.aether.util.artifact.DefaultArtifact;
import org.sonatype.aether.util.artifact.JavaScopes;
import org.sonatype.aether.util.filter.DependencyFilterUtils;
import org.sonatype.aether.util.filter.PatternExclusionsDependencyFilter;
/**
* Deps resolver.
* Add new dependencies from mvn repo (at runtime) to Zeppelin.
*/
public class DependencyResolver extends AbstractDependencyResolver {
Logger logger = LoggerFactory.getLogger(DependencyResolver.class);
private final String[] exclusions = new String[] {"org.apache.zeppelin:zeppelin-zengine",
"org.apache.zeppelin:zeppelin-interpreter",
"org.apache.zeppelin:zeppelin-server"};
public DependencyResolver(String localRepoPath) {
super(localRepoPath);
}
public List<File> load(String artifact) throws Exception {
return load(artifact, new LinkedList<String>());
}
public List<File> load(String artifact, String destPath) throws Exception {
return load(artifact, new LinkedList<String>(), destPath);
}
public synchronized List<File> load(String artifact, Collection<String> excludes)
throws Exception {
if (StringUtils.isBlank(artifact)) {
// Should throw here
throw new RuntimeException("Invalid artifact to load");
}
// <groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
int numSplits = artifact.split(":").length;
if (numSplits >= 3 && numSplits <= 6) {
return loadFromMvn(artifact, excludes);
} else {
LinkedList<File> libs = new LinkedList<File>();
libs.add(new File(artifact));
return libs;
}
}
public List<File> load(String artifact, Collection<String> excludes, String destPath)
throws Exception {
List<File> libs = load(artifact, excludes);
// find home dir
String home = System.getenv("ZEPPELIN_HOME");
if (home == null) {
home = System.getProperty("zeppelin.home");
}
if (home == null) {
home = "..";
}
for (File srcFile: libs) {
File destFile = new File(home + "/" + destPath, srcFile.getName());
if (!destFile.exists() || !FileUtils.contentEquals(srcFile, destFile)) {
FileUtils.copyFile(srcFile, destFile);
logger.info("copy {} to {}", srcFile.getAbsolutePath(), destPath);
}
}
return libs;
}
private List<File> loadFromMvn(String artifact, Collection<String> excludes) throws Exception {
Collection<String> allExclusions = new LinkedList<String>();
allExclusions.addAll(excludes);
allExclusions.addAll(Arrays.asList(exclusions));
List<ArtifactResult> listOfArtifact;
listOfArtifact = getArtifactsWithDep(artifact, allExclusions);
Iterator<ArtifactResult> it = listOfArtifact.iterator();
while (it.hasNext()) {
Artifact a = it.next().getArtifact();
String gav = a.getGroupId() + ":" + a.getArtifactId() + ":" + a.getVersion();
for (String exclude : allExclusions) {
if (gav.startsWith(exclude)) {
it.remove();
break;
}
}
}
List<File> files = new LinkedList<File>();
for (ArtifactResult artifactResult : listOfArtifact) {
files.add(artifactResult.getArtifact().getFile());
logger.info("load {}", artifactResult.getArtifact().getFile().getAbsolutePath());
}
return files;
}
/**
* @param dependency
* @param excludes list of pattern can either be of the form groupId:artifactId
* @return
* @throws Exception
*/
@Override
public List<ArtifactResult> getArtifactsWithDep(String dependency,
Collection<String> excludes) throws Exception {
Artifact artifact = new DefaultArtifact(dependency);
DependencyFilter classpathFilter = DependencyFilterUtils.classpathFilter(JavaScopes.COMPILE);
PatternExclusionsDependencyFilter exclusionFilter =
new PatternExclusionsDependencyFilter(excludes);
CollectRequest collectRequest = new CollectRequest();
collectRequest.setRoot(new Dependency(artifact, JavaScopes.COMPILE));
synchronized (repos) {
for (RemoteRepository repo : repos) {
collectRequest.addRepository(repo);
}
}
DependencyRequest dependencyRequest = new DependencyRequest(collectRequest,
DependencyFilterUtils.andFilter(exclusionFilter, classpathFilter));
return system.resolveDependencies(session, dependencyRequest).getArtifactResults();
}
}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.zeppelin.spark.dep;
package org.apache.zeppelin.dep;
import org.sonatype.aether.repository.Authentication;
/**
*
@ -70,7 +70,7 @@ public class Repository {
return this;
}
protected Authentication getAuthentication() {
public Authentication getAuthentication() {
Authentication auth = null;
if (this.username != null && this.password != null) {
auth = new Authentication(this.username, this.password);

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.zeppelin.spark.dep;
package org.apache.zeppelin.dep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.zeppelin.spark.dep;
package org.apache.zeppelin.dep;
import org.apache.maven.repository.internal.DefaultServiceLocator;
import org.apache.maven.wagon.Wagon;

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.zeppelin.spark.dep;
package org.apache.zeppelin.dep;
import java.io.PrintStream;
import java.text.DecimalFormat;

View file

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.dep;
import static org.junit.Assert.assertTrue;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class DependencyResolverTest {
private static DependencyResolver resolver;
private static String testPath;
private static String testCopyPath;
private static String home;
@BeforeClass
public static void setUp() throws Exception {
testPath = "test-repo";
testCopyPath = "test-copy-repo";
resolver = new DependencyResolver(testPath);
home = System.getenv("ZEPPELIN_HOME");
if (home == null) {
home = System.getProperty("zeppelin.home");
}
if (home == null) {
home = "..";
}
}
@AfterClass
public static void tearDown() throws Exception {
FileUtils.deleteDirectory(new File(home + "/" + testPath));
FileUtils.deleteDirectory(new File(home + "/" + testCopyPath));
}
@Test
public void testLoad() throws Exception {
resolver.load("org.apache.commons:commons-lang3:3.4", testCopyPath);
assertTrue(new File(home + "/" + testPath + "/org/apache/commons/commons-lang3/3.4/").exists());
assertTrue(new File(home + "/" + testCopyPath + "/commons-lang3-3.4.jar").exists());
}
}

View file

@ -30,6 +30,7 @@ import javax.ws.rs.core.Application;
import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
@ -73,12 +74,14 @@ public class ZeppelinServer extends Application {
private InterpreterFactory replFactory;
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
private DependencyResolver depResolver;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
this.depResolver = new DependencyResolver(conf.getString(ConfVars.ZEPPELIN_DEP_LOCALREPO));
this.schedulerFactory = new SchedulerFactory();
this.replFactory = new InterpreterFactory(conf, notebookWsServer);
this.replFactory = new InterpreterFactory(conf, notebookWsServer, depResolver);
this.notebookRepo = new NotebookRepoSync(conf);
this.notebookIndex = new LuceneSearch();

View file

@ -428,6 +428,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
// Decide when new note is created, interpreter settings will be binded automatically or not.
ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true),
ZEPPELIN_CONF_DIR("zeppelin.conf.dir", "conf"),
ZEPPELIN_DEP_LOCALREPO("zeppelin.dep.localrepo", "local-repo"),
// Allows a way to specify a ',' separated list of allowed origins for rest and websockets
// i.e. http://localhost:8080
ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"),

View file

@ -23,6 +23,7 @@ import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.NullArgumentException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
@ -65,19 +66,24 @@ public class InterpreterFactory {
AngularObjectRegistryListener angularObjectRegistryListener;
DependencyResolver depResolver;
public InterpreterFactory(ZeppelinConfiguration conf,
AngularObjectRegistryListener angularObjectRegistryListener)
AngularObjectRegistryListener angularObjectRegistryListener,
DependencyResolver depResolver)
throws InterpreterException, IOException {
this(conf, new InterpreterOption(true), angularObjectRegistryListener);
this(conf, new InterpreterOption(true), angularObjectRegistryListener, depResolver);
}
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
AngularObjectRegistryListener angularObjectRegistryListener)
AngularObjectRegistryListener angularObjectRegistryListener,
DependencyResolver depResolver)
throws InterpreterException, IOException {
this.conf = conf;
this.defaultOption = defaultOption;
this.angularObjectRegistryListener = angularObjectRegistryListener;
this.depResolver = depResolver;
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
interpreterClassList = replsConf.split(",");

View file

@ -38,32 +38,32 @@ import org.junit.Test;
public class InterpreterFactoryTest {
private InterpreterFactory factory;
private InterpreterFactory factory;
private File tmpDir;
private ZeppelinConfiguration conf;
private InterpreterContext context;
@Before
public void setUp() throws Exception {
public void setUp() throws Exception {
tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis());
tmpDir.mkdirs();
new File(tmpDir, "conf").mkdirs();
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = new ZeppelinConfiguration();
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
context = new InterpreterContext("note", "id", "title", "text", null, null, null, null);
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
conf = new ZeppelinConfiguration();
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
context = new InterpreterContext("note", "id", "title", "text", null, null, null, null);
}
}
@After
public void tearDown() throws Exception {
delete(tmpDir);
}
@After
public void tearDown() throws Exception {
delete(tmpDir);
}
private void delete(File file){
if(file.isFile()) file.delete();
@ -78,24 +78,24 @@ public class InterpreterFactoryTest {
}
}
@Test
public void testBasic() {
List<String> all = factory.getDefaultInterpreterSettingList();
@Test
public void testBasic() {
List<String> all = factory.getDefaultInterpreterSettingList();
// get interpreter
Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
assertFalse(((LazyOpenInterpreter) repl1).isOpen());
repl1.interpret("repl1", context);
assertTrue(((LazyOpenInterpreter) repl1).isOpen());
// get interpreter
Interpreter repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
assertFalse(((LazyOpenInterpreter) repl1).isOpen());
repl1.interpret("repl1", context);
assertTrue(((LazyOpenInterpreter) repl1).isOpen());
// try to get unavailable interpreter
assertNull(factory.get("unknown"));
// try to get unavailable interpreter
assertNull(factory.get("unknown"));
// restart interpreter
factory.restart(all.get(0));
repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
assertFalse(((LazyOpenInterpreter) repl1).isOpen());
}
// restart interpreter
factory.restart(all.get(0));
repl1 = factory.get(all.get(0)).getInterpreterGroup().getFirst();
assertFalse(((LazyOpenInterpreter) repl1).isOpen());
}
@Test
public void testFactoryDefaultList() throws IOException {
@ -119,8 +119,8 @@ public class InterpreterFactoryTest {
try {
factory.add("a mock", "mock2", null, new Properties());
} catch(NullArgumentException e) {
assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage());
}
assertEquals("Test null option" , e.getMessage(),new NullArgumentException("option").getMessage());
}
try {
factory.add("a mock" , "mock2" , new InterpreterOption(false),null);
} catch (NullArgumentException e){
@ -140,7 +140,7 @@ public class InterpreterFactoryTest {
factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties());
assertEquals(3, factory.get().size());
InterpreterFactory factory2 = new InterpreterFactory(conf, null);
InterpreterFactory factory2 = new InterpreterFactory(conf, null, null);
assertEquals(3, factory2.get().size());
}
}

View file

@ -58,7 +58,7 @@ public class NoteInterpreterLoaderTest {
MockInterpreter11.register("mock11", "group1", "org.apache.zeppelin.interpreter.mock.MockInterpreter11");
MockInterpreter2.register("mock2", "group2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
}
@After

View file

@ -85,7 +85,7 @@ public class NotebookTest implements JobListenerFactory{
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);
@ -172,7 +172,7 @@ public class NotebookTest implements JobListenerFactory{
note.persist();
Notebook notebook2 = new Notebook(
conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this, null);
conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null, null), this, null);
assertEquals(1, notebook2.getAllNotes().size());
}

View file

@ -87,7 +87,7 @@ public class NotebookRepoSyncTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
SearchService search = mock(SearchService.class);
notebookRepoSync = new NotebookRepoSync(conf);

View file

@ -76,7 +76,7 @@ public class VFSNotebookRepoTest implements JobListenerFactory {
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
this.schedulerFactory = new SchedulerFactory();
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
factory = new InterpreterFactory(conf, new InterpreterOption(false), null, null);
SearchService search = mock(SearchService.class);
notebookRepo = new VFSNotebookRepo(conf);