initial rest api impl

This commit is contained in:
Lee moon soo 2016-04-06 10:35:37 +09:00
parent 134bbe640a
commit 94b490d76c
10 changed files with 257 additions and 19 deletions

View file

@ -136,8 +136,7 @@ public class ApplicationLoader {
}
private ResourceSet findRequiredResourceSet(
String [][] requiredResources, String noteId, String paragraphId)
throws ApplicationException {
String [][] requiredResources, String noteId, String paragraphId) {
if (requiredResources == null || requiredResources.length == 0) {
return new ResourceSet();
}
@ -151,7 +150,19 @@ public class ApplicationLoader {
allResources = resourcePool.getAll();
}
allResources = allResources.filterByNoteId(noteId).filterByParagraphId(paragraphId);
return findRequiredResourceSet(requiredResources, noteId, paragraphId, allResources);
}
static ResourceSet findRequiredResourceSet(String [][] requiredResources,
String noteId,
String paragraphId,
ResourceSet resources) {
ResourceSet args = new ResourceSet();
if (requiredResources == null || requiredResources.length == 0) {
return args;
}
resources = resources.filterByNoteId(noteId).filterByParagraphId(paragraphId);
for (String [] requires : requiredResources) {
args.clear();
@ -159,7 +170,7 @@ public class ApplicationLoader {
for (String require : requires) {
boolean found = false;
for (Resource r : allResources) {
for (Resource r : resources) {
if (r.getClassName().equals(require)) {
args.add(r);
found = true;
@ -177,7 +188,7 @@ public class ApplicationLoader {
}
}
throw new ApplicationException("Can not find available resources");
return null;
}

View file

@ -81,6 +81,7 @@ public class RemoteInterpreter extends Interpreter {
this.applicationEventListener = appListener;
}
// VisibleForTesting
public RemoteInterpreter(Properties property,
String noteId,
String className,

View file

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.rest;
import com.google.gson.Gson;
import org.apache.zeppelin.helium.Helium;
import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.server.JsonResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.Response;
/**
* Helium Rest Api
*/
@Path("/helium")
@Produces("application/json")
public class HeliumRestApi {
Logger logger = LoggerFactory.getLogger(HeliumRestApi.class);
private Helium helium;
private HeliumApplicationFactory applicationFactory;
private Notebook notebook;
private Gson gson = new Gson();
public HeliumRestApi(Helium helium,
HeliumApplicationFactory heliumApplicationFactory,
Notebook notebook) {
this.helium = helium;
this.applicationFactory = heliumApplicationFactory;
this.notebook = notebook;
}
/**
* Get all packages
* @return
*/
@GET
@Path("all")
public Response getAll() {
return new JsonResponse(Response.Status.OK, "", helium.getAllPackageInfo()).build();
}
@GET
@Path("suggest/{noteId}/{paragraphId}")
public Response suggest(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId) {
Note note = notebook.getNote(noteId);
if (note == null) {
return new JsonResponse(Response.Status.NOT_FOUND, "Note " + noteId + " not found").build();
}
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph == null) {
return new JsonResponse(Response.Status.NOT_FOUND, "Paragraph " + paragraphId + " not found")
.build();
}
return new JsonResponse(Response.Status.OK, "", helium.suggestApp(paragraph)).build();
}
@POST
@Path("load/{noteId}/{paragraphId}")
public Response suggest(@PathParam("noteId") String noteId,
@PathParam("paragraphId") String paragraphId,
String heliumPackage) {
Note note = notebook.getNote(noteId);
if (note == null) {
return new JsonResponse(Response.Status.NOT_FOUND, "Note " + noteId + " not found").build();
}
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph == null) {
return new JsonResponse(Response.Status.NOT_FOUND, "Paragraph " + paragraphId + " not found")
.build();
}
HeliumPackage pkg = gson.fromJson(heliumPackage, HeliumPackage.class);
String appId = applicationFactory.loadAndRun(pkg, paragraph);
return new JsonResponse(Response.Status.OK, "", appId).build();
}
}

View file

@ -70,6 +70,8 @@ public class ZeppelinServer extends Application {
public static Notebook notebook;
public static Server jettyWebServer;
public static NotebookServer notebookWsServer;
public static Helium helium;
public static HeliumApplicationFactory heliumApplicationFactory;
private SchedulerFactory schedulerFactory;
private InterpreterFactory replFactory;
@ -77,8 +79,6 @@ public class ZeppelinServer extends Application {
private SearchService notebookIndex;
private NotebookAuthorization notebookAuthorization;
private DependencyResolver depResolver;
private Helium helium;
private HeliumApplicationFactory heliumApplicationFactory;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
@ -298,6 +298,9 @@ public class ZeppelinServer extends Application {
NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer, notebookIndex);
singletons.add(notebookApi);
HeliumRestApi heliumApi = new HeliumRestApi(helium, heliumApplicationFactory, notebook);
singletons.add(heliumApi);
InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory);
singletons.add(interpreterApi);

View file

@ -19,11 +19,19 @@ package org.apache.zeppelin.helium;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.resource.DistributedResourcePool;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.resource.ResourceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -76,7 +84,12 @@ public class Helium {
File heliumConfFile = new File(path);
if (!heliumConfFile.isFile()) {
logger.warn("{} does not exists", path);
return new HeliumConf();
HeliumConf conf = new HeliumConf();
LinkedList<HeliumRegistry> defaultRegistry = new LinkedList<HeliumRegistry>();
defaultRegistry.add(new HeliumLocalRegistry("local", "../helium"));
conf.setRegistry(defaultRegistry);
this.registry = conf.getRegistry();
return conf;
} else {
String jsonString = FileUtils.readFileToString(heliumConfFile);
HeliumConf conf = gson.fromJson(jsonString, HeliumConf.class);
@ -115,4 +128,40 @@ public class Helium {
}
return list;
}
public HeliumPackageSuggestion suggestApp(Paragraph paragraph) {
HeliumPackageSuggestion suggestion = new HeliumPackageSuggestion();
Interpreter intp = paragraph.getCurrentRepl();
if (intp == null) {
return suggestion;
}
ResourcePool resourcePool = intp.getInterpreterGroup().getResourcePool();
ResourceSet allResources;
if (resourcePool == null) {
allResources = new ResourceSet();
} else if (resourcePool instanceof DistributedResourcePool) {
allResources = ((DistributedResourcePool) resourcePool).getAll(false);
} else {
allResources = resourcePool.getAll();
}
for (HeliumPackageSearchResult pkg : getAllPackageInfo()) {
ResourceSet resources = ApplicationLoader.findRequiredResourceSet(
pkg.getPkg().getResources(),
paragraph.getNote().getId(),
paragraph.getId(),
allResources);
if (resources == null) {
continue;
} else {
suggestion.addAvailablePackage(pkg);
}
}
suggestion.sort();
return suggestion;
}
}

View file

@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
/**
* HeliumApplicationFactory
*
* 2. unload on interpreter restart
* 3. front-end job
* 4. example app
* 5. dev mode
@ -211,8 +212,15 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
"Can't unload application status " + appsToUnload.getStatus());
}
appsToUnload.setStatus(ApplicationState.Status.UNLOADING);
RemoteInterpreterProcess intpProcess =
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
Interpreter intp = paragraph.getCurrentRepl();
if (intp == null) {
throw new ApplicationException("No interpreter found");
}
RemoteInterpreterProcess intpProcess = intp.getInterpreterGroup().getRemoteInterpreterProcess();
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
}
RemoteInterpreterService.Client client;
try {
@ -289,9 +297,16 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
throw new ApplicationException(
"Can't run application status " + app.getStatus());
}
RemoteInterpreterProcess intpProcess =
paragraph.getCurrentRepl().getInterpreterGroup().getRemoteInterpreterProcess();
Interpreter intp = paragraph.getCurrentRepl();
if (intp == null) {
throw new ApplicationException("No interpreter found");
}
RemoteInterpreterProcess intpProcess = intp.getInterpreterGroup().getRemoteInterpreterProcess();
if (intpProcess == null) {
throw new ApplicationException("Target interpreter process is not running");
}
RemoteInterpreterService.Client client;
try {
client = intpProcess.getClient();
@ -411,7 +426,8 @@ public class HeliumApplicationFactory implements ApplicationEventListener, Noteb
public void onParagraphRemove(Paragraph paragraph) {
List<ApplicationState> appStates = paragraph.getAllApplicationStates();
for (ApplicationState app : appStates) {
unload(paragraph, app.getId());
UnloadApplication unloadJob = new UnloadApplication(paragraph, app.getId());
unloadJob.run();
}
}

View file

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.helium;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
/**
* Suggested apps
*/
public class HeliumPackageSuggestion {
private final List<HeliumPackageSearchResult> available =
new LinkedList<HeliumPackageSearchResult>();
/*
* possible future improvement
* provides n - 'favorite' list, based on occurrence of apps in notebook
*/
public HeliumPackageSuggestion() {
}
public void addAvailablePackage(HeliumPackageSearchResult r) {
available.add(r);
}
public void sort() {
Collections.sort(available, new Comparator<HeliumPackageSearchResult>() {
@Override
public int compare(HeliumPackageSearchResult o1, HeliumPackageSearchResult o2) {
return o1.getPkg().getName().compareTo(o2.getPkg().getName());
}
});
}
}

View file

@ -78,7 +78,7 @@ public class InterpreterFactory {
private DependencyResolver depResolver;
private Map<String, String> env;
private Map<String, String> env = new HashMap<String, String>();
public InterpreterFactory(ZeppelinConfiguration conf,
AngularObjectRegistryListener angularObjectRegistryListener,

View file

@ -175,6 +175,9 @@ public class Paragraph extends Job implements Serializable, Cloneable {
}
public Interpreter getRepl(String name) {
if (replLoader == null) {
return null;
}
return replLoader.get(name);
}

View file

@ -181,9 +181,6 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
// when remove paragraph
note1.removeParagraph(p1.getId());
while (app.getStatus() != ApplicationState.Status.UNLOADED) {
Thread.yield();
}
// then
assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
@ -204,7 +201,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
new String[][]{});
Note note1 = notebook.createNote();
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
notebook.bindInterpretersToNote(note1.id(), factory.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addParagraph();
@ -221,7 +218,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
}
// when unbind interpreter
note1.getNoteReplLoader().setInterpreters(new LinkedList<String>());
notebook.bindInterpretersToNote(note1.id(), new LinkedList<String>());
// then
assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());