mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
initial rest api impl
This commit is contained in:
parent
134bbe640a
commit
94b490d76c
10 changed files with 257 additions and 19 deletions
|
|
@ -136,8 +136,7 @@ public class ApplicationLoader {
|
|||
}
|
||||
|
||||
private ResourceSet findRequiredResourceSet(
|
||||
String [][] requiredResources, String noteId, String paragraphId)
|
||||
throws ApplicationException {
|
||||
String [][] requiredResources, String noteId, String paragraphId) {
|
||||
if (requiredResources == null || requiredResources.length == 0) {
|
||||
return new ResourceSet();
|
||||
}
|
||||
|
|
@ -151,7 +150,19 @@ public class ApplicationLoader {
|
|||
allResources = resourcePool.getAll();
|
||||
}
|
||||
|
||||
allResources = allResources.filterByNoteId(noteId).filterByParagraphId(paragraphId);
|
||||
return findRequiredResourceSet(requiredResources, noteId, paragraphId, allResources);
|
||||
}
|
||||
|
||||
static ResourceSet findRequiredResourceSet(String [][] requiredResources,
|
||||
String noteId,
|
||||
String paragraphId,
|
||||
ResourceSet resources) {
|
||||
ResourceSet args = new ResourceSet();
|
||||
if (requiredResources == null || requiredResources.length == 0) {
|
||||
return args;
|
||||
}
|
||||
|
||||
resources = resources.filterByNoteId(noteId).filterByParagraphId(paragraphId);
|
||||
|
||||
for (String [] requires : requiredResources) {
|
||||
args.clear();
|
||||
|
|
@ -159,7 +170,7 @@ public class ApplicationLoader {
|
|||
for (String require : requires) {
|
||||
boolean found = false;
|
||||
|
||||
for (Resource r : allResources) {
|
||||
for (Resource r : resources) {
|
||||
if (r.getClassName().equals(require)) {
|
||||
args.add(r);
|
||||
found = true;
|
||||
|
|
@ -177,7 +188,7 @@ public class ApplicationLoader {
|
|||
}
|
||||
}
|
||||
|
||||
throw new ApplicationException("Can not find available resources");
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -81,6 +81,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.applicationEventListener = appListener;
|
||||
}
|
||||
|
||||
// VisibleForTesting
|
||||
public RemoteInterpreter(Properties property,
|
||||
String noteId,
|
||||
String className,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.rest;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.zeppelin.helium.Helium;
|
||||
import org.apache.zeppelin.helium.HeliumApplicationFactory;
|
||||
import org.apache.zeppelin.helium.HeliumPackage;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.server.JsonResponse;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.ws.rs.*;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
||||
/**
|
||||
* Helium Rest Api
|
||||
*/
|
||||
@Path("/helium")
|
||||
@Produces("application/json")
|
||||
public class HeliumRestApi {
|
||||
Logger logger = LoggerFactory.getLogger(HeliumRestApi.class);
|
||||
|
||||
private Helium helium;
|
||||
private HeliumApplicationFactory applicationFactory;
|
||||
private Notebook notebook;
|
||||
private Gson gson = new Gson();
|
||||
|
||||
public HeliumRestApi(Helium helium,
|
||||
HeliumApplicationFactory heliumApplicationFactory,
|
||||
Notebook notebook) {
|
||||
this.helium = helium;
|
||||
this.applicationFactory = heliumApplicationFactory;
|
||||
this.notebook = notebook;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all packages
|
||||
* @return
|
||||
*/
|
||||
@GET
|
||||
@Path("all")
|
||||
public Response getAll() {
|
||||
return new JsonResponse(Response.Status.OK, "", helium.getAllPackageInfo()).build();
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("suggest/{noteId}/{paragraphId}")
|
||||
public Response suggest(@PathParam("noteId") String noteId,
|
||||
@PathParam("paragraphId") String paragraphId) {
|
||||
Note note = notebook.getNote(noteId);
|
||||
if (note == null) {
|
||||
return new JsonResponse(Response.Status.NOT_FOUND, "Note " + noteId + " not found").build();
|
||||
}
|
||||
|
||||
Paragraph paragraph = note.getParagraph(paragraphId);
|
||||
if (paragraph == null) {
|
||||
return new JsonResponse(Response.Status.NOT_FOUND, "Paragraph " + paragraphId + " not found")
|
||||
.build();
|
||||
}
|
||||
|
||||
return new JsonResponse(Response.Status.OK, "", helium.suggestApp(paragraph)).build();
|
||||
}
|
||||
|
||||
@POST
|
||||
@Path("load/{noteId}/{paragraphId}")
|
||||
public Response suggest(@PathParam("noteId") String noteId,
|
||||
@PathParam("paragraphId") String paragraphId,
|
||||
String heliumPackage) {
|
||||
|
||||
Note note = notebook.getNote(noteId);
|
||||
if (note == null) {
|
||||
return new JsonResponse(Response.Status.NOT_FOUND, "Note " + noteId + " not found").build();
|
||||
}
|
||||
|
||||
Paragraph paragraph = note.getParagraph(paragraphId);
|
||||
if (paragraph == null) {
|
||||
return new JsonResponse(Response.Status.NOT_FOUND, "Paragraph " + paragraphId + " not found")
|
||||
.build();
|
||||
}
|
||||
HeliumPackage pkg = gson.fromJson(heliumPackage, HeliumPackage.class);
|
||||
|
||||
String appId = applicationFactory.loadAndRun(pkg, paragraph);
|
||||
return new JsonResponse(Response.Status.OK, "", appId).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -70,6 +70,8 @@ public class ZeppelinServer extends Application {
|
|||
public static Notebook notebook;
|
||||
public static Server jettyWebServer;
|
||||
public static NotebookServer notebookWsServer;
|
||||
public static Helium helium;
|
||||
public static HeliumApplicationFactory heliumApplicationFactory;
|
||||
|
||||
private SchedulerFactory schedulerFactory;
|
||||
private InterpreterFactory replFactory;
|
||||
|
|
@ -77,8 +79,6 @@ public class ZeppelinServer extends Application {
|
|||
private SearchService notebookIndex;
|
||||
private NotebookAuthorization notebookAuthorization;
|
||||
private DependencyResolver depResolver;
|
||||
private Helium helium;
|
||||
private HeliumApplicationFactory heliumApplicationFactory;
|
||||
|
||||
public ZeppelinServer() throws Exception {
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
|
|
@ -298,6 +298,9 @@ public class ZeppelinServer extends Application {
|
|||
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer, notebookIndex);
|
||||
singletons.add(notebookApi);
|
||||
|
||||
HeliumRestApi heliumApi = new HeliumRestApi(helium, heliumApplicationFactory, notebook);
|
||||
singletons.add(heliumApi);
|
||||
|
||||
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);
|
||||
singletons.add(interpreterApi);
|
||||
|
||||
|
|
|
|||
|
|
@ -19,11 +19,19 @@ package org.apache.zeppelin.helium;
|
|||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.resource.DistributedResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourcePoolUtils;
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
|
@ -76,7 +84,12 @@ public class Helium {
|
|||
File heliumConfFile = new File(path);
|
||||
if (!heliumConfFile.isFile()) {
|
||||
logger.warn("{} does not exists", path);
|
||||
return new HeliumConf();
|
||||
HeliumConf conf = new HeliumConf();
|
||||
LinkedList<HeliumRegistry> defaultRegistry = new LinkedList<HeliumRegistry>();
|
||||
defaultRegistry.add(new HeliumLocalRegistry("local", "../helium"));
|
||||
conf.setRegistry(defaultRegistry);
|
||||
this.registry = conf.getRegistry();
|
||||
return conf;
|
||||
} else {
|
||||
String jsonString = FileUtils.readFileToString(heliumConfFile);
|
||||
HeliumConf conf = gson.fromJson(jsonString, HeliumConf.class);
|
||||
|
|
@ -115,4 +128,40 @@ public class Helium {
|
|||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
public HeliumPackageSuggestion suggestApp(Paragraph paragraph) {
|
||||
HeliumPackageSuggestion suggestion = new HeliumPackageSuggestion();
|
||||
|
||||
Interpreter intp = paragraph.getCurrentRepl();
|
||||
if (intp == null) {
|
||||
return suggestion;
|
||||
}
|
||||
|
||||
ResourcePool resourcePool = intp.getInterpreterGroup().getResourcePool();
|
||||
ResourceSet allResources;
|
||||
|
||||
if (resourcePool == null) {
|
||||
allResources = new ResourceSet();
|
||||
} else if (resourcePool instanceof DistributedResourcePool) {
|
||||
allResources = ((DistributedResourcePool) resourcePool).getAll(false);
|
||||
} else {
|
||||
allResources = resourcePool.getAll();
|
||||
}
|
||||
|
||||
for (HeliumPackageSearchResult pkg : getAllPackageInfo()) {
|
||||
ResourceSet resources = ApplicationLoader.findRequiredResourceSet(
|
||||
pkg.getPkg().getResources(),
|
||||
paragraph.getNote().getId(),
|
||||
paragraph.getId(),
|
||||
allResources);
|
||||
if (resources == null) {
|
||||
continue;
|
||||
} else {
|
||||
suggestion.addAvailablePackage(pkg);
|
||||
}
|
||||
}
|
||||
|
||||
suggestion.sort();
|
||||
return suggestion;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
|
|||
/**
|
||||
* HeliumApplicationFactory
|
||||
*
|
||||
* 2. unload on interpreter restart
|
||||
* 3. front-end job
|
||||
* 4. example app
|
||||
* 5. dev mode
|
||||
|
|
@ -211,8 +212,15 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
"Can't unload application status " + appsToUnload.getStatus());
|
||||
}
|
||||
appsToUnload.setStatus(ApplicationState.Status.UNLOADING);
|
||||
RemoteInterpreterProcess intpProcess =
|
||||
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
Interpreter intp = paragraph.getCurrentRepl();
|
||||
if (intp == null) {
|
||||
throw new ApplicationException("No interpreter found");
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess intpProcess = intp.getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
if (intpProcess == null) {
|
||||
throw new ApplicationException("Target interpreter process is not running");
|
||||
}
|
||||
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
|
|
@ -289,9 +297,16 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
throw new ApplicationException(
|
||||
"Can't run application status " + app.getStatus());
|
||||
}
|
||||
RemoteInterpreterProcess intpProcess =
|
||||
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
|
||||
Interpreter intp = paragraph.getCurrentRepl();
|
||||
if (intp == null) {
|
||||
throw new ApplicationException("No interpreter found");
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess intpProcess = intp.getInterpreterGroup().getRemoteInterpreterProcess();
|
||||
if (intpProcess == null) {
|
||||
throw new ApplicationException("Target interpreter process is not running");
|
||||
}
|
||||
RemoteInterpreterService.Client client;
|
||||
try {
|
||||
client = intpProcess.getClient();
|
||||
|
|
@ -411,7 +426,8 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
|
|||
public void onParagraphRemove(Paragraph paragraph) {
|
||||
List<ApplicationState> appStates = paragraph.getAllApplicationStates();
|
||||
for (ApplicationState app : appStates) {
|
||||
unload(paragraph, app.getId());
|
||||
UnloadApplication unloadJob = new UnloadApplication(paragraph, app.getId());
|
||||
unloadJob.run();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Suggested apps
|
||||
*/
|
||||
public class HeliumPackageSuggestion {
|
||||
private final List<HeliumPackageSearchResult> available =
|
||||
new LinkedList<HeliumPackageSearchResult>();
|
||||
|
||||
/*
|
||||
* possible future improvement
|
||||
* provides n - 'favorite' list, based on occurrence of apps in notebook
|
||||
*/
|
||||
|
||||
public HeliumPackageSuggestion() {
|
||||
|
||||
}
|
||||
|
||||
public void addAvailablePackage(HeliumPackageSearchResult r) {
|
||||
available.add(r);
|
||||
|
||||
}
|
||||
|
||||
public void sort() {
|
||||
Collections.sort(available, new Comparator<HeliumPackageSearchResult>() {
|
||||
@Override
|
||||
public int compare(HeliumPackageSearchResult o1, HeliumPackageSearchResult o2) {
|
||||
return o1.getPkg().getName().compareTo(o2.getPkg().getName());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -78,7 +78,7 @@ public class InterpreterFactory {
|
|||
|
||||
private DependencyResolver depResolver;
|
||||
|
||||
private Map<String, String> env;
|
||||
private Map<String, String> env = new HashMap<String, String>();
|
||||
|
||||
public InterpreterFactory(ZeppelinConfiguration conf,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
|
|
|
|||
|
|
@ -175,6 +175,9 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
}
|
||||
|
||||
public Interpreter getRepl(String name) {
|
||||
if (replLoader == null) {
|
||||
return null;
|
||||
}
|
||||
return replLoader.get(name);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -181,9 +181,6 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
|
||||
// when remove paragraph
|
||||
note1.removeParagraph(p1.getId());
|
||||
while (app.getStatus() != ApplicationState.Status.UNLOADED) {
|
||||
Thread.yield();
|
||||
}
|
||||
|
||||
// then
|
||||
assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
|
||||
|
|
@ -204,7 +201,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
new String[][]{});
|
||||
|
||||
Note note1 = notebook.createNote();
|
||||
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
notebook.bindInterpretersToNote(note1.id(), factory.getDefaultInterpreterSettingList());
|
||||
|
||||
Paragraph p1 = note1.addParagraph();
|
||||
|
||||
|
|
@ -221,7 +218,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
|
|||
}
|
||||
|
||||
// when unbind interpreter
|
||||
note1.getNoteReplLoader().setInterpreters(new LinkedList<String>());
|
||||
notebook.bindInterpretersToNote(note1.id(), new LinkedList<String>());
|
||||
|
||||
// then
|
||||
assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
|
||||
|
|
|
|||
Loading…
Reference in a new issue