mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Dev mode
This commit is contained in:
parent
98f3872c6a
commit
024d7fc2c5
15 changed files with 496 additions and 45 deletions
|
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.zeppelin.helium;
|
||||
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
|
||||
/**
|
||||
* Application wrapper
|
||||
*/
|
||||
|
|
@ -67,4 +69,14 @@ public class ClassLoaderApplication extends Application {
|
|||
public Application getInnerApplication() {
|
||||
return app;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceSet args() {
|
||||
return app.args();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationContext context() {
|
||||
return app.context();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.dev;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
|
||||
/**
|
||||
* Dummy interpreter to support development mode for Zeppelin app
|
||||
*/
|
||||
public class DevInterpreter extends Interpreter {
|
||||
static {
|
||||
Interpreter.register(
|
||||
"dev",
|
||||
"dev",
|
||||
DevInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder().build());
|
||||
}
|
||||
|
||||
private InterpreterEvent interpreterEvent;
|
||||
private InterpreterContext context;
|
||||
|
||||
public static boolean isInterpreterName(String replName) {
|
||||
return replName.equals("dev");
|
||||
}
|
||||
|
||||
/**
|
||||
* event handler for ZeppelinApplicationDevServer
|
||||
*/
|
||||
public static interface InterpreterEvent {
|
||||
public InterpreterResult interpret(String st, InterpreterContext context);
|
||||
}
|
||||
|
||||
public DevInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
public DevInterpreter(Properties property, InterpreterEvent interpreterEvent) {
|
||||
super(property);
|
||||
this.interpreterEvent = interpreterEvent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
public void rerun() {
|
||||
for (InterpreterContextRunner r : context.getRunners()) {
|
||||
if (context.getParagraphId().equals(r.getParagraphId())) {
|
||||
r.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
this.context = context;
|
||||
try {
|
||||
return interpreterEvent.interpret(st, context);
|
||||
} catch (Exception e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.NATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
return new LinkedList<String>();
|
||||
}
|
||||
|
||||
public InterpreterContext getLastInterpretContext() {
|
||||
return context;
|
||||
}
|
||||
|
||||
public void setInterpreterEvent(InterpreterEvent event) {
|
||||
this.interpreterEvent = event;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.dev;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
||||
import org.apache.log4j.ConsoleAppender;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.PatternLayout;
|
||||
import org.apache.zeppelin.helium.*;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Run this server for development mode.
|
||||
*/
|
||||
public class ZeppelinApplicationDevServer extends ZeppelinDevServer {
|
||||
final Logger logger = LoggerFactory.getLogger(ZeppelinApplicationDevServer.class);
|
||||
|
||||
private final String className;
|
||||
private final ResourceSet resourceSet;
|
||||
private Application app;
|
||||
private InterpreterOutput out;
|
||||
|
||||
public ZeppelinApplicationDevServer(final String className, ResourceSet resourceSet) throws
|
||||
Exception {
|
||||
this(ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT, className, resourceSet);
|
||||
}
|
||||
|
||||
public ZeppelinApplicationDevServer(int port, String className, ResourceSet resourceSet) throws
|
||||
Exception {
|
||||
super(port);
|
||||
this.className = className;
|
||||
this.resourceSet = resourceSet;
|
||||
setLogger();
|
||||
};
|
||||
|
||||
void setLogger() {
|
||||
ConsoleAppender console = new ConsoleAppender(); //create appender
|
||||
//configure the appender
|
||||
String PATTERN = "%d [%p|%c|%C{1}] %m%n";
|
||||
console.setLayout(new PatternLayout(PATTERN));
|
||||
console.setThreshold(Level.DEBUG);
|
||||
console.activateOptions();
|
||||
//add appender to any Logger (here is root)
|
||||
org.apache.log4j.Logger.getRootLogger().addAppender(console);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
if (app == null) {
|
||||
logger.info("Create instance " + className);
|
||||
try {
|
||||
Class<?> appClass = ClassLoader.getSystemClassLoader().loadClass(className);
|
||||
Constructor<?> constructor = appClass.getConstructor(
|
||||
ResourceSet.class, ApplicationContext.class);
|
||||
app = (Application) constructor.newInstance(resourceSet, getApplicationContext(context));
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
logger.info("Run " + className);
|
||||
app.context().out.setType(InterpreterResult.Type.ANGULAR);
|
||||
app.run();
|
||||
} catch (ApplicationException e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
return new InterpreterResult(Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
ApplicationContext getApplicationContext(InterpreterContext interpreterContext) {
|
||||
return new ApplicationContext(
|
||||
interpreterContext.getNoteId(),
|
||||
interpreterContext.getParagraphId(),
|
||||
new HeliumAppAngularObjectRegistry(
|
||||
interpreterContext.getAngularObjectRegistry(),
|
||||
interpreterContext.getNoteId(),
|
||||
interpreterContext.getParagraphId()),
|
||||
interpreterContext.out);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InterpreterOutput createInterpreterOutput(
|
||||
final String noteId, final String paragraphId) {
|
||||
if (out == null) {
|
||||
final RemoteInterpreterEventClient eventClient = getEventClient();
|
||||
try {
|
||||
out = new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(InterpreterOutput out, byte[] output) {
|
||||
eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
|
||||
}
|
||||
}, this);
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
return out;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,128 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.interpreter.dev;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.dev.DevInterpreter.InterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Interpreter development server
|
||||
*/
|
||||
public class ZeppelinDevServer extends
|
||||
RemoteInterpreterServer implements InterpreterEvent, InterpreterOutputChangeListener {
|
||||
final Logger logger = LoggerFactory.getLogger(ZeppelinDevServer.class);
|
||||
public static final int DEFAULT_TEST_INTERPRETER_PORT = 29914;
|
||||
|
||||
DevInterpreter interpreter = null;
|
||||
private InterpreterEvent listener;
|
||||
InterpreterOutput out;
|
||||
public ZeppelinDevServer(int port) throws TException {
|
||||
super(port);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Interpreter getInterpreter(String noteId, String className) throws TException {
|
||||
synchronized (this) {
|
||||
InterpreterGroup interpreterGroup = getInterpreterGroup();
|
||||
if (interpreterGroup == null) {
|
||||
createInterpreter(
|
||||
"dev",
|
||||
noteId,
|
||||
DevInterpreter.class.getName(),
|
||||
new HashMap<String, String>());
|
||||
|
||||
Interpreter intp = super.getInterpreter(noteId, className);
|
||||
interpreter = (DevInterpreter) (
|
||||
(ClassloaderInterpreter) ((LazyOpenInterpreter) intp).getInnerInterpreter())
|
||||
.getInnerInterpreter();
|
||||
interpreter.setInterpreterEvent(this);
|
||||
notify();
|
||||
}
|
||||
}
|
||||
return super.getInterpreter(noteId, className);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected InterpreterOutput createInterpreterOutput(
|
||||
final String noteId, final String paragraphId) {
|
||||
if (out == null) {
|
||||
final RemoteInterpreterEventClient eventClient = getEventClient();
|
||||
try {
|
||||
out = new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(InterpreterOutput out, byte[] output) {
|
||||
eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output));
|
||||
}
|
||||
}, this);
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
out.clear();
|
||||
return out;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fileChanged(File file) {
|
||||
refresh();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
waitForConnected();
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
public void refresh() {
|
||||
interpreter.rerun();
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until %dev paragraph is executed and connected to this process
|
||||
*/
|
||||
public void waitForConnected() {
|
||||
synchronized (this) {
|
||||
while (!isConnected()) {
|
||||
try {
|
||||
this.wait(10 * 1000);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isConnected() {
|
||||
return !(interpreter == null || interpreter.getLastInterpretContext() == null);
|
||||
}
|
||||
}
|
||||
|
|
@ -76,4 +76,9 @@ public class ClientFactory extends BasePooledObjectFactory<Client>{
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean validateObject(PooledObject<Client> p) {
|
||||
return p.getObject().getOutputProtocol().getTransport().isOpen();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -198,7 +198,9 @@ public class RemoteInterpreter extends Interpreter {
|
|||
boolean broken = false;
|
||||
try {
|
||||
logger.info("Create remote interpreter {}", getClassName());
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
if (localRepoPath != null) {
|
||||
property.put("zeppelin.interpreter.localRepo", localRepoPath);
|
||||
}
|
||||
client.createInterpreter(groupId, noteId,
|
||||
getClassName(), (Map) property);
|
||||
|
||||
|
|
|
|||
|
|
@ -73,7 +73,17 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
public void run() {
|
||||
Client client = null;
|
||||
|
||||
while (!shutdown && interpreterProcess.isRunning()) {
|
||||
while (!shutdown) {
|
||||
// wait and retry
|
||||
if (!interpreterProcess.isRunning()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
// nothing to do
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
client = interpreterProcess.getClient();
|
||||
} catch (Exception e1) {
|
||||
|
|
@ -145,7 +155,7 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
String paragraphId = outputAppend.get("paragraphId");
|
||||
String outputToAppend = outputAppend.get("data");
|
||||
String appId = outputAppend.get("appId");
|
||||
logger.info("Append " + outputToAppend + ", appId = " + appId);
|
||||
|
||||
if (appId == null) {
|
||||
listener.onOutputAppend(noteId, paragraphId, outputToAppend);
|
||||
} else {
|
||||
|
|
@ -159,7 +169,7 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
String paragraphId = outputAppend.get("paragraphId");
|
||||
String outputToUpdate = outputAppend.get("data");
|
||||
String appId = outputAppend.get("appId");
|
||||
logger.info("Update " + outputToUpdate + ", appId = " + appId);
|
||||
|
||||
if (appId == null) {
|
||||
listener.onOutputUpdated(noteId, paragraphId, outputToUpdate);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -30,8 +30,10 @@ import java.util.Map;
|
|||
/**
|
||||
* This class manages start / stop of remote interpreter process
|
||||
*/
|
||||
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess implements ExecuteResultHandler {
|
||||
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterManagedProcess.class);
|
||||
public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
||||
implements ExecuteResultHandler {
|
||||
private static final Logger logger = LoggerFactory.getLogger(
|
||||
RemoteInterpreterManagedProcess.class);
|
||||
private final String interpreterRunner;
|
||||
|
||||
private DefaultExecutor executor;
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ public abstract class RemoteInterpreterProcess {
|
|||
|
||||
if (clientPool == null) {
|
||||
clientPool = new GenericObjectPool<Client>(new ClientFactory(getHost(), getPort()));
|
||||
clientPool.setTestOnReturn(true);
|
||||
clientPool.setTestOnBorrow(true);
|
||||
|
||||
remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup);
|
||||
remoteInterpreterEventPoller.setInterpreterProcess(this);
|
||||
|
|
|
|||
|
|
@ -75,7 +75,6 @@ public class RemoteInterpreterServer
|
|||
private final Map<String, Application> runningApplications =
|
||||
Collections.synchronizedMap(new HashMap<String, Application>());
|
||||
|
||||
|
||||
public RemoteInterpreterServer(int port) throws TTransportException {
|
||||
this.port = port;
|
||||
|
||||
|
|
@ -188,7 +187,19 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
}
|
||||
|
||||
private Interpreter getInterpreter(String noteId, String className) throws TException {
|
||||
protected InterpreterGroup getInterpreterGroup() {
|
||||
return interpreterGroup;
|
||||
}
|
||||
|
||||
protected ResourcePool getResourcePool() {
|
||||
return resourcePool;
|
||||
}
|
||||
|
||||
protected RemoteInterpreterEventClient getEventClient() {
|
||||
return eventClient;
|
||||
}
|
||||
|
||||
protected Interpreter getInterpreter(String noteId, String className) throws TException {
|
||||
if (interpreterGroup == null) {
|
||||
throw new TException(
|
||||
new InterpreterException("Interpreter instance " + className + " not created"));
|
||||
|
|
@ -393,7 +404,7 @@ public class RemoteInterpreterServer
|
|||
if (job != null) {
|
||||
job.setStatus(Status.ABORT);
|
||||
} else {
|
||||
intp.cancel(convert(interpreterContext));
|
||||
intp.cancel(convert(interpreterContext, null));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -402,7 +413,7 @@ public class RemoteInterpreterServer
|
|||
RemoteInterpreterContext interpreterContext)
|
||||
throws TException {
|
||||
Interpreter intp = getInterpreter(noteId, className);
|
||||
return intp.getProgress(convert(interpreterContext));
|
||||
return intp.getProgress(convert(interpreterContext, null));
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -420,6 +431,10 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
private InterpreterContext convert(RemoteInterpreterContext ric) {
|
||||
return convert(ric, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
|
||||
}
|
||||
|
||||
private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutput output) {
|
||||
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
|
||||
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
|
||||
new TypeToken<List<RemoteInterpreterContextRunner>>() {
|
||||
|
|
@ -440,11 +455,12 @@ public class RemoteInterpreterServer
|
|||
gson.fromJson(ric.getGui(), GUI.class),
|
||||
interpreterGroup.getAngularObjectRegistry(),
|
||||
interpreterGroup.getResourcePool(),
|
||||
contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId()));
|
||||
contextRunners, output);
|
||||
}
|
||||
|
||||
|
||||
private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) {
|
||||
protected InterpreterOutput createInterpreterOutput(final String noteId, final String
|
||||
paragraphId) {
|
||||
return new InterpreterOutput(new InterpreterOutputListener() {
|
||||
@Override
|
||||
public void onAppend(InterpreterOutput out, byte[] line) {
|
||||
|
|
@ -659,9 +675,14 @@ public class RemoteInterpreterServer
|
|||
@Override
|
||||
public List<String> resourcePoolGetAll() throws TException {
|
||||
logger.debug("Request getAll from ZeppelinServer");
|
||||
List<String> result = new LinkedList<String>();
|
||||
|
||||
if (resourcePool == null) {
|
||||
return result;
|
||||
}
|
||||
|
||||
ResourceSet resourceSet = resourcePool.getAll(false);
|
||||
List<String> result = new LinkedList<String>();
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
for (Resource r : resourceSet) {
|
||||
|
|
@ -708,7 +729,7 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
}
|
||||
|
||||
private InterpreterOutput createAppOutput(final String noteId,
|
||||
protected InterpreterOutput createAppOutput(final String noteId,
|
||||
final String paragraphId,
|
||||
final String appId) {
|
||||
return new InterpreterOutput(new InterpreterOutputListener() {
|
||||
|
|
@ -791,6 +812,7 @@ public class RemoteInterpreterServer
|
|||
return new RemoteApplicationResult(false, "Application instance does not exists");
|
||||
} else {
|
||||
try {
|
||||
app.context().out.clear();
|
||||
app.run();
|
||||
return new RemoteApplicationResult(true, "");
|
||||
} catch (ApplicationException e) {
|
||||
|
|
|
|||
|
|
@ -659,15 +659,10 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
// propagate change to (Remote) AngularObjectRegistry
|
||||
Note note = notebook.getNote(noteId);
|
||||
if (note != null) {
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader()
|
||||
.getInterpreterSettings();
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getInterpreterGroup() == null) {
|
||||
continue;
|
||||
}
|
||||
if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry = setting
|
||||
.getInterpreterGroup().getAngularObjectRegistry();
|
||||
Collection<InterpreterGroup> interpreterGroups = InterpreterGroup.getAll();
|
||||
for (InterpreterGroup interpreterGroup : interpreterGroups) {
|
||||
if (interpreterGroupId.equals(interpreterGroup.getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
|
||||
// first trying to get local registry
|
||||
ao = angularObjectRegistry.get(varName, noteId, paragraphId);
|
||||
if (ao == null) {
|
||||
|
|
@ -1164,19 +1159,17 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
|
||||
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
|
||||
.getInterpreterSettings();
|
||||
if (intpSettings.isEmpty())
|
||||
if (intpSettings.isEmpty()) {
|
||||
continue;
|
||||
for (InterpreterSetting setting : intpSettings) {
|
||||
if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) {
|
||||
broadcast(
|
||||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", object)
|
||||
.put("interpreterGroupId", interpreterGroupId)
|
||||
.put("noteId", note.id())
|
||||
.put("paragraphId", object.getParagraphId()));
|
||||
}
|
||||
}
|
||||
|
||||
broadcast(
|
||||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", object)
|
||||
.put("interpreterGroupId", interpreterGroupId)
|
||||
.put("noteId", note.id())
|
||||
.put("paragraphId", object.getParagraphId()));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1189,15 +1182,10 @@ public class NotebookServer extends WebSocketServlet implements
|
|||
continue;
|
||||
}
|
||||
|
||||
List<String> ids = note.getNoteReplLoader().getInterpreters();
|
||||
for (String id : ids) {
|
||||
if (id.equals(interpreterGroupId)) {
|
||||
broadcast(
|
||||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
|
||||
"noteId", noteId).put("paragraphId", paragraphId));
|
||||
}
|
||||
}
|
||||
broadcast(
|
||||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put(
|
||||
"noteId", noteId).put("paragraphId", paragraphId));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -179,11 +179,13 @@ angular.module('zeppelinWebApp')
|
|||
if (!data.paragraphId || data.paragraphId === $scope.paragraph.id) {
|
||||
scope = paragraphScope;
|
||||
registry = angularObjectRegistry;
|
||||
console.log("paragraph scope");
|
||||
} else {
|
||||
var app = _.find($scope.apps, { id: data.paragraphId});
|
||||
if (app) {
|
||||
scope = getAppScope(app);
|
||||
registry = getAppRegistry(app);
|
||||
console.log("app scope");
|
||||
} else {
|
||||
// no matching app in this paragraph
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -31,6 +31,8 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
|
||||
import org.apache.zeppelin.interpreter.dev.DevInterpreter;
|
||||
import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
|
|
@ -80,6 +82,8 @@ public class InterpreterFactory {
|
|||
|
||||
private Map<String, String> env = new HashMap<String, String>();
|
||||
|
||||
private Interpreter devInterpreter;
|
||||
|
||||
public InterpreterFactory(ZeppelinConfiguration conf,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
|
|
@ -897,4 +901,26 @@ public class InterpreterFactory {
|
|||
public void setEnv(Map<String, String> env) {
|
||||
this.env = env;
|
||||
}
|
||||
|
||||
|
||||
public Interpreter getDevInterpreter() {
|
||||
if (devInterpreter == null) {
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
option.setRemote(true);
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup("dev", option);
|
||||
|
||||
devInterpreter = connectToRemoteRepl("dev", DevInterpreter.class.getName(),
|
||||
"localhost",
|
||||
ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT,
|
||||
new Properties());
|
||||
|
||||
LinkedList<Interpreter> intpList = new LinkedList<Interpreter>();
|
||||
intpList.add(devInterpreter);
|
||||
interpreterGroup.put("dev", intpList);
|
||||
|
||||
devInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
}
|
||||
return devInterpreter;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,4 +62,4 @@ public class InterpreterOption {
|
|||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterException;
|
|||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.dev.DevInterpreter;
|
||||
|
||||
/**
|
||||
* Interpreter loader per note.
|
||||
|
|
@ -183,6 +184,11 @@ public class NoteInterpreterLoader {
|
|||
}
|
||||
}
|
||||
|
||||
// dev interpreter
|
||||
if (DevInterpreter.isInterpreterName(replName)) {
|
||||
return factory.getDevInterpreter();
|
||||
}
|
||||
|
||||
throw new InterpreterException(replName + " interpreter not found");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue