This commit is contained in:
Lee moon soo 2016-04-13 09:37:40 +01:00
parent 98f3872c6a
commit 024d7fc2c5
15 changed files with 496 additions and 45 deletions

View file

@ -16,6 +16,8 @@
*/
package org.apache.zeppelin.helium;
import org.apache.zeppelin.resource.ResourceSet;
/**
* Application wrapper
*/
@ -67,4 +69,14 @@ public class ClassLoaderApplication extends Application {
public Application getInnerApplication() {
return app;
}
@Override
public ResourceSet args() {
return app.args();
}
@Override
public ApplicationContext context() {
return app.context();
}
}

View file

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.dev;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
/**
* Dummy interpreter to support development mode for Zeppelin app
*/
public class DevInterpreter extends Interpreter {
static {
Interpreter.register(
"dev",
"dev",
DevInterpreter.class.getName(),
new InterpreterPropertyBuilder().build());
}
private InterpreterEvent interpreterEvent;
private InterpreterContext context;
public static boolean isInterpreterName(String replName) {
return replName.equals("dev");
}
/**
* event handler for ZeppelinApplicationDevServer
*/
public static interface InterpreterEvent {
public InterpreterResult interpret(String st, InterpreterContext context);
}
public DevInterpreter(Properties property) {
super(property);
}
public DevInterpreter(Properties property, InterpreterEvent interpreterEvent) {
super(property);
this.interpreterEvent = interpreterEvent;
}
@Override
public void open() {
}
@Override
public void close() {
}
public void rerun() {
for (InterpreterContextRunner r : context.getRunners()) {
if (context.getParagraphId().equals(r.getParagraphId())) {
r.run();
}
}
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
this.context = context;
try {
return interpreterEvent.interpret(st, context);
} catch (Exception e) {
throw new InterpreterException(e);
}
}
@Override
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {
return FormType.NATIVE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public List<String> completion(String buf, int cursor) {
return new LinkedList<String>();
}
public InterpreterContext getLastInterpretContext() {
return context;
}
public void setInterpreterEvent(InterpreterEvent event) {
this.interpreterEvent = event;
}
}

View file

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.dev;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.PatternLayout;
import org.apache.zeppelin.helium.*;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.resource.ResourceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Run this server for development mode.
*/
public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
final Logger logger = LoggerFactory.getLogger(ZeppelinApplicationDevServer.class);
private final String className;
private final ResourceSet resourceSet;
private Application app;
private InterpreterOutput out;
public ZeppelinApplicationDevServer(final String className, ResourceSet resourceSet) throws
Exception {
this(ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT, className, resourceSet);
}
public ZeppelinApplicationDevServer(int port, String className, ResourceSet resourceSet) throws
Exception {
super(port);
this.className = className;
this.resourceSet = resourceSet;
setLogger();
};
void setLogger() {
ConsoleAppender console = new ConsoleAppender(); //create appender
//configure the appender
String PATTERN = "%d [%p|%c|%C{1}] %m%n";
console.setLayout(new PatternLayout(PATTERN));
console.setThreshold(Level.DEBUG);
console.activateOptions();
//add appender to any Logger (here is root)
org.apache.log4j.Logger.getRootLogger().addAppender(console);
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
if (app == null) {
logger.info("Create instance " + className);
try {
Class<?> appClass = ClassLoader.getSystemClassLoader().loadClass(className);
Constructor<?> constructor = appClass.getConstructor(
ResourceSet.class, ApplicationContext.class);
app = (Application) constructor.newInstance(resourceSet, getApplicationContext(context));
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
}
try {
logger.info("Run " + className);
app.context().out.setType(InterpreterResult.Type.ANGULAR);
app.run();
} catch (ApplicationException e) {
logger.error(e.getMessage(), e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
return new InterpreterResult(Code.SUCCESS, "");
}
ApplicationContext getApplicationContext(InterpreterContext interpreterContext) {
return new ApplicationContext(
interpreterContext.getNoteId(),
interpreterContext.getParagraphId(),
new HeliumAppAngularObjectRegistry(
interpreterContext.getAngularObjectRegistry(),
interpreterContext.getNoteId(),
interpreterContext.getParagraphId()),
interpreterContext.out);
}
@Override
protected InterpreterOutput createInterpreterOutput(
final String noteId, final String paragraphId) {
if (out == null) {
final RemoteInterpreterEventClient eventClient = getEventClient();
try {
out = new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
}
}, this);
} catch (IOException e) {
return null;
}
}
return out;
}
}

View file

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.dev;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.dev.DevInterpreter.InterpreterEvent;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Interpreter development server
*/
public class ZeppelinDevServer extends
RemoteInterpreterServer implements InterpreterEvent, InterpreterOutputChangeListener {
final Logger logger = LoggerFactory.getLogger(ZeppelinDevServer.class);
public static final int DEFAULT_TEST_INTERPRETER_PORT = 29914;
DevInterpreter interpreter = null;
private InterpreterEvent listener;
InterpreterOutput out;
public ZeppelinDevServer(int port) throws TException {
super(port);
}
@Override
protected Interpreter getInterpreter(String noteId, String className) throws TException {
synchronized (this) {
InterpreterGroup interpreterGroup = getInterpreterGroup();
if (interpreterGroup == null) {
createInterpreter(
"dev",
noteId,
DevInterpreter.class.getName(),
new HashMap<String, String>());
Interpreter intp = super.getInterpreter(noteId, className);
interpreter = (DevInterpreter) (
(ClassloaderInterpreter) ((LazyOpenInterpreter) intp).getInnerInterpreter())
.getInnerInterpreter();
interpreter.setInterpreterEvent(this);
notify();
}
}
return super.getInterpreter(noteId, className);
}
@Override
protected InterpreterOutput createInterpreterOutput(
final String noteId, final String paragraphId) {
if (out == null) {
final RemoteInterpreterEventClient eventClient = getEventClient();
try {
out = new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
}
@Override
public void onUpdate(InterpreterOutput out, byte[] output) {
eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
}
}, this);
} catch (IOException e) {
return null;
}
}
out.clear();
return out;
}
@Override
public void fileChanged(File file) {
refresh();
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
waitForConnected();
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
public void refresh() {
interpreter.rerun();
}
/**
* Wait until %dev paragraph is executed and connected to this process
*/
public void waitForConnected() {
synchronized (this) {
while (!isConnected()) {
try {
this.wait(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public boolean isConnected() {
return !(interpreter == null || interpreter.getLastInterpretContext() == null);
}
}

View file

@ -76,4 +76,9 @@ public class ClientFactory extends BasePooledObjectFactory<Client>{
}
}
}
@Override
public boolean validateObject(PooledObject<Client> p) {
return p.getObject().getOutputProtocol().getTransport().isOpen();
}
}

View file

@ -198,7 +198,9 @@ public class RemoteInterpreter extends Interpreter {
boolean broken = false;
try {
logger.info("Create remote interpreter {}", getClassName());
property.put("zeppelin.interpreter.localRepo", localRepoPath);
if (localRepoPath != null) {
property.put("zeppelin.interpreter.localRepo", localRepoPath);
}
client.createInterpreter(groupId, noteId,
getClassName(), (Map) property);

View file

@ -73,7 +73,17 @@ public class RemoteInterpreterEventPoller extends Thread {
public void run() {
Client client = null;
while (!shutdown && interpreterProcess.isRunning()) {
while (!shutdown) {
// wait and retry
if (!interpreterProcess.isRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// nothing to do
}
continue;
}
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
@ -145,7 +155,7 @@ public class RemoteInterpreterEventPoller extends Thread {
String paragraphId = outputAppend.get("paragraphId");
String outputToAppend = outputAppend.get("data");
String appId = outputAppend.get("appId");
logger.info("Append " + outputToAppend + ", appId = " + appId);
if (appId == null) {
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
} else {
@ -159,7 +169,7 @@ public class RemoteInterpreterEventPoller extends Thread {
String paragraphId = outputAppend.get("paragraphId");
String outputToUpdate = outputAppend.get("data");
String appId = outputAppend.get("appId");
logger.info("Update " + outputToUpdate + ", appId = " + appId);
if (appId == null) {
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
} else {

View file

@ -30,8 +30,10 @@ import java.util.Map;
/**
* This class manages start / stop of remote interpreter process
*/
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess implements ExecuteResultHandler {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterManagedProcess.class);
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
implements ExecuteResultHandler {
private static final Logger logger = LoggerFactory.getLogger(
RemoteInterpreterManagedProcess.class);
private final String interpreterRunner;
private DefaultExecutor executor;

View file

@ -75,7 +75,7 @@ public abstract class RemoteInterpreterProcess {
if (clientPool == null) {
clientPool = new GenericObjectPool<Client>(new ClientFactory(getHost(), getPort()));
clientPool.setTestOnReturn(true);
clientPool.setTestOnBorrow(true);
remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
remoteInterpreterEventPoller.setInterpreterProcess(this);

View file

@ -75,7 +75,6 @@ public class RemoteInterpreterServer
private final Map<String, Application> runningApplications =
Collections.synchronizedMap(new HashMap<String, Application>());
public RemoteInterpreterServer(int port) throws TTransportException {
this.port = port;
@ -188,7 +187,19 @@ public class RemoteInterpreterServer
}
}
private Interpreter getInterpreter(String noteId, String className) throws TException {
protected InterpreterGroup getInterpreterGroup() {
return interpreterGroup;
}
protected ResourcePool getResourcePool() {
return resourcePool;
}
protected RemoteInterpreterEventClient getEventClient() {
return eventClient;
}
protected Interpreter getInterpreter(String noteId, String className) throws TException {
if (interpreterGroup == null) {
throw new TException(
new InterpreterException("Interpreter instance " + className + " not created"));
@ -393,7 +404,7 @@ public class RemoteInterpreterServer
if (job != null) {
job.setStatus(Status.ABORT);
} else {
intp.cancel(convert(interpreterContext));
intp.cancel(convert(interpreterContext, null));
}
}
@ -402,7 +413,7 @@ public class RemoteInterpreterServer
RemoteInterpreterContext interpreterContext)
throws TException {
Interpreter intp = getInterpreter(noteId, className);
return intp.getProgress(convert(interpreterContext));
return intp.getProgress(convert(interpreterContext, null));
}
@ -420,6 +431,10 @@ public class RemoteInterpreterServer
}
private InterpreterContext convert(RemoteInterpreterContext ric) {
return convert(ric, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
}
private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutput output) {
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
new TypeToken<List<RemoteInterpreterContextRunner>>() {
@ -440,11 +455,12 @@ public class RemoteInterpreterServer
gson.fromJson(ric.getGui(), GUI.class),
interpreterGroup.getAngularObjectRegistry(),
interpreterGroup.getResourcePool(),
contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
contextRunners, output);
}
private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
protected InterpreterOutput createInterpreterOutput(final String noteId, final String
paragraphId) {
return new InterpreterOutput(new InterpreterOutputListener() {
@Override
public void onAppend(InterpreterOutput out, byte[] line) {
@ -659,9 +675,14 @@ public class RemoteInterpreterServer
@Override
public List<String> resourcePoolGetAll() throws TException {
logger.debug("Request getAll from ZeppelinServer");
List<String> result = new LinkedList<String>();
if (resourcePool == null) {
return result;
}
ResourceSet resourceSet = resourcePool.getAll(false);
List<String> result = new LinkedList<String>();
Gson gson = new Gson();
for (Resource r : resourceSet) {
@ -708,7 +729,7 @@ public class RemoteInterpreterServer
}
}
private InterpreterOutput createAppOutput(final String noteId,
protected InterpreterOutput createAppOutput(final String noteId,
final String paragraphId,
final String appId) {
return new InterpreterOutput(new InterpreterOutputListener() {
@ -791,6 +812,7 @@ public class RemoteInterpreterServer
return new RemoteApplicationResult(false, "Application instance does not exists");
} else {
try {
app.context().out.clear();
app.run();
return new RemoteApplicationResult(true, "");
} catch (ApplicationException e) {

View file

@ -659,15 +659,10 @@ public class NotebookServer extends WebSocketServlet implements
// propagate change to (Remote) AngularObjectRegistry
Note note = notebook.getNote(noteId);
if (note != null) {
List<InterpreterSetting> settings = note.getNoteReplLoader()
.getInterpreterSettings();
for (InterpreterSetting setting : settings) {
if (setting.getInterpreterGroup() == null) {
continue;
}
if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
AngularObjectRegistry angularObjectRegistry = setting
.getInterpreterGroup().getAngularObjectRegistry();
Collection<InterpreterGroup> interpreterGroups = InterpreterGroup.getAll();
for (InterpreterGroup interpreterGroup : interpreterGroups) {
if (interpreterGroupId.equals(interpreterGroup.getId())) {
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
// first trying to get local registry
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
if (ao == null) {
@ -1164,19 +1159,17 @@ public class NotebookServer extends WebSocketServlet implements
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
.getInterpreterSettings();
if (intpSettings.isEmpty())
if (intpSettings.isEmpty()) {
continue;
for (InterpreterSetting setting : intpSettings) {
if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) {
broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", object)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", note.id())
.put("paragraphId", object.getParagraphId()));
}
}
broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", object)
.put("interpreterGroupId", interpreterGroupId)
.put("noteId", note.id())
.put("paragraphId", object.getParagraphId()));
}
}
@ -1189,15 +1182,10 @@ public class NotebookServer extends WebSocketServlet implements
continue;
}
List<String> ids = note.getNoteReplLoader().getInterpreters();
for (String id : ids) {
if (id.equals(interpreterGroupId)) {
broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
"noteId", noteId).put("paragraphId", paragraphId));
}
}
broadcast(
note.id(),
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
"noteId", noteId).put("paragraphId", paragraphId));
}
}
}

View file

@ -179,11 +179,13 @@ angular.module('zeppelinWebApp')
if (!data.paragraphId || data.paragraphId === $scope.paragraph.id) {
scope = paragraphScope;
registry = angularObjectRegistry;
console.log("paragraph scope");
} else {
var app = _.find($scope.apps, { id: data.paragraphId});
if (app) {
scope = getAppScope(app);
registry = getAppRegistry(app);
console.log("app scope");
} else {
// no matching app in this paragraph
return;

View file

@ -31,6 +31,8 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.zeppelin.interpreter.dev.DevInterpreter;
import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
@ -80,6 +82,8 @@ public class InterpreterFactory {
private Map<String, String> env = new HashMap<String, String>();
private Interpreter devInterpreter;
public InterpreterFactory(ZeppelinConfiguration conf,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
@ -897,4 +901,26 @@ public class InterpreterFactory {
public void setEnv(Map<String, String> env) {
this.env = env;
}
public Interpreter getDevInterpreter() {
if (devInterpreter == null) {
InterpreterOption option = new InterpreterOption();
option.setRemote(true);
InterpreterGroup interpreterGroup = createInterpreterGroup("dev", option);
devInterpreter = connectToRemoteRepl("dev", DevInterpreter.class.getName(),
"localhost",
ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT,
new Properties());
LinkedList<Interpreter> intpList = new LinkedList<Interpreter>();
intpList.add(devInterpreter);
interpreterGroup.put("dev", intpList);
devInterpreter.setInterpreterGroup(interpreterGroup);
}
return devInterpreter;
}
}

View file

@ -62,4 +62,4 @@ public class InterpreterOption {
public int getPort() {
return port;
}
}
}

View file

@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.dev.DevInterpreter;
/**
* Interpreter loader per note.
@ -183,6 +184,11 @@ public class NoteInterpreterLoader {
}
}
// dev interpreter
if (DevInterpreter.isInterpreterName(replName)) {
return factory.getDevInterpreter();
}
throw new InterpreterException(replName + " interpreter not found");
}
}