mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add remote works controller class and include interpreter factory
This commit is contained in:
parent
6e1f219993
commit
0570ae804b
29 changed files with 269 additions and 141 deletions
|
|
@ -328,6 +328,19 @@ public class ZeppelinContext {
|
|||
throw new InterpreterException("Paragraph " + id + " not found");
|
||||
}
|
||||
|
||||
/**
|
||||
* get Zeppelin Paragraph Runner from zeppelin server
|
||||
*/
|
||||
@ZeppelinApi
|
||||
public List<InterpreterContextRunner> getInterpreterContextRunner(
|
||||
String noteId, String paragraphId) {
|
||||
List<InterpreterContextRunner> runners = new LinkedList<>();
|
||||
|
||||
|
||||
|
||||
return runners;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph at idx
|
||||
* @param idx
|
||||
|
|
|
|||
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* zeppelin job for Remote works controller by interpreter
|
||||
*
|
||||
*/
|
||||
public interface RemoteWorksController {
|
||||
List<InterpreterContextRunner> getRunner(String noteId);
|
||||
InterpreterContextRunner getRunner(String noteId, String paragraphId);
|
||||
}
|
||||
|
|
@ -45,6 +45,7 @@ import com.google.gson.reflect.TypeToken;
|
|||
public class RemoteInterpreter extends Interpreter {
|
||||
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
|
||||
private final ApplicationEventListener applicationEventListener;
|
||||
private final RemoteWorksController remoteWorksController;
|
||||
Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
|
||||
Gson gson = new Gson();
|
||||
private String interpreterRunner;
|
||||
|
|
@ -72,7 +73,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
int connectTimeout,
|
||||
int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener) {
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.className = className;
|
||||
|
|
@ -85,6 +87,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.maxPoolSize = maxPoolSize;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.applicationEventListener = appListener;
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -100,7 +103,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
int connectTimeout,
|
||||
int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener) {
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController) {
|
||||
super(property);
|
||||
this.noteId = noteId;
|
||||
this.className = className;
|
||||
|
|
@ -111,6 +115,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.maxPoolSize = maxPoolSize;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.applicationEventListener = appListener;
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -125,7 +130,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
ApplicationEventListener appListener) {
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
this.noteId = noteId;
|
||||
|
|
@ -138,6 +144,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.maxPoolSize = 10;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.applicationEventListener = appListener;
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
}
|
||||
|
||||
private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
|
||||
|
|
@ -181,13 +188,15 @@ public class RemoteInterpreter extends Interpreter {
|
|||
connectTimeout,
|
||||
remoteInterpreterProcessListener,
|
||||
applicationEventListener,
|
||||
remoteWorksController,
|
||||
host,
|
||||
port);
|
||||
} else {
|
||||
// create new remote process
|
||||
remoteProcess = new RemoteInterpreterManagedProcess(
|
||||
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
|
||||
remoteInterpreterProcessListener, applicationEventListener);
|
||||
remoteInterpreterProcessListener, applicationEventListener,
|
||||
remoteWorksController);
|
||||
}
|
||||
|
||||
intpGroup.setRemoteInterpreterProcess(remoteProcess);
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
|
|||
RemoteZeppelinServerController eventBody = new RemoteZeppelinServerController();
|
||||
eventBody.setType(RemoteZeppelinServerControlEvent.REQ_RESOURCE_PARAGRAPH_RUN_CONTEXT);
|
||||
eventBody.setEventOwnerKey(eventOwnerKey);
|
||||
logger.info("clover gson.toJson(runner) - " + gson.toJson(runner));
|
||||
eventBody.setMsg(gson.toJson(runner));
|
||||
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
|
|||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
|
|
@ -61,12 +62,15 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
|
||||
private RemoteInterpreterProcess interpreterProcess;
|
||||
private InterpreterGroup interpreterGroup;
|
||||
private RemoteWorksController remoteWorkController;
|
||||
|
||||
public RemoteInterpreterEventPoller(
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener) {
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorkController) {
|
||||
this.listener = listener;
|
||||
this.appListener = appListener;
|
||||
this.remoteWorkController = remoteWorkController;
|
||||
shutdown = false;
|
||||
}
|
||||
|
||||
|
|
@ -78,6 +82,14 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
this.interpreterGroup = interpreterGroup;
|
||||
}
|
||||
|
||||
public RemoteWorksController getRemoteWorkController() {
|
||||
return remoteWorkController;
|
||||
}
|
||||
|
||||
public void setRemoteWorkController(RemoteWorksController remoteWorkController) {
|
||||
this.remoteWorkController = remoteWorkController;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Client client = null;
|
||||
|
|
@ -223,15 +235,37 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
boolean broken = false;
|
||||
try {
|
||||
interpreterServer = interpreterProcess.getClient();
|
||||
|
||||
List<InterpreterContextRunner> interpreterContextRunners = new LinkedList<>();
|
||||
List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
|
||||
if (event.getType() == RemoteZeppelinServerControlEvent.REQ_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
|
||||
ZeppelinServerResourceParagraphRunner runner = gson.fromJson(
|
||||
event.getMsg(), ZeppelinServerResourceParagraphRunner.class);
|
||||
|
||||
logger.info("clover req note id {} p id {} - msg {}",
|
||||
runner.getNoteId(), runner.getParagraphId(), event.getMsg());
|
||||
RemoteZeppelinServerController resResource = new RemoteZeppelinServerController();
|
||||
resResource.setType(RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT);
|
||||
resResource.setEventOwnerKey(eventOwnerKey);
|
||||
resResource.setMsg("test123123");
|
||||
if (runner.getParagraphId() != null) {
|
||||
InterpreterContextRunner intpRunner = remoteWorkController.getRunner(
|
||||
runner.getNoteId(), runner.getParagraphId());
|
||||
interpreterContextRunners.add(intpRunner);
|
||||
} else {
|
||||
interpreterContextRunners = remoteWorkController.getRunner(runner.getNoteId());
|
||||
}
|
||||
|
||||
logger.info("clover remotework count 1 {}", interpreterContextRunners.size());
|
||||
|
||||
for (InterpreterContextRunner r : interpreterContextRunners) {
|
||||
remoteRunners.add(
|
||||
new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
|
||||
);
|
||||
}
|
||||
|
||||
logger.info("clover remotework count 2 {}", remoteRunners.size());
|
||||
|
||||
resResource.setMsg(gson.toJson(remoteRunners));
|
||||
|
||||
interpreterServer.remoteZeppelinServerControlFeedback(resResource);
|
||||
logger.info("get runner noteid {} paragraphid {}",
|
||||
runner.getNoteId(), runner.getParagraphId());
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import org.apache.commons.exec.*;
|
|||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -43,6 +44,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
private int port = -1;
|
||||
private final String interpreterDir;
|
||||
private final String localRepoDir;
|
||||
private RemoteWorksController remoteWorksController;
|
||||
|
||||
private Map<String, String> env;
|
||||
|
||||
|
|
@ -53,13 +55,15 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
Map<String, String> env,
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener) {
|
||||
super(new RemoteInterpreterEventPoller(listener, appListener),
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController) {
|
||||
super(new RemoteInterpreterEventPoller(listener, appListener, remoteWorksController),
|
||||
connectTimeout);
|
||||
this.interpreterRunner = intpRunner;
|
||||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
this.remoteWorksController = remoteWorksController;
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -75,6 +79,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
this.env = env;
|
||||
this.interpreterDir = intpDir;
|
||||
this.localRepoDir = localRepoDir;
|
||||
this.remoteWorksController = remoteInterpreterEventPoller.getRemoteWorkController();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import org.apache.zeppelin.helium.ApplicationEventListener;
|
|||
import org.apache.zeppelin.interpreter.Constants;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -48,8 +49,9 @@ public abstract class RemoteInterpreterProcess {
|
|||
public RemoteInterpreterProcess(
|
||||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener) {
|
||||
this(new RemoteInterpreterEventPoller(listener, appListener),
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController) {
|
||||
this(new RemoteInterpreterEventPoller(listener, appListener, remoteWorksController),
|
||||
connectTimeout);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -32,10 +33,11 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
int connectTimeout,
|
||||
RemoteInterpreterProcessListener listener,
|
||||
ApplicationEventListener appListener,
|
||||
RemoteWorksController remoteWorksController,
|
||||
String host,
|
||||
int port
|
||||
) {
|
||||
super(connectTimeout, listener, appListener);
|
||||
super(connectTimeout, listener, appListener, remoteWorksController);
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -341,6 +341,14 @@ public class RemoteInterpreterServer
|
|||
RemoteZeppelinServerController response) throws TException {
|
||||
logger.info("clover remote zeppelin server controller feedback {}", response);
|
||||
logger.info("clover remote zeppelin server conteroller body {}", response.getMsg());
|
||||
|
||||
if (response.getType() == RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
|
||||
List<ZeppelinServerResourceParagraphRunner> runners = gson.fromJson(response.getMsg(),
|
||||
new TypeToken<List<ZeppelinServerResourceParagraphRunner>>() {}.getType());
|
||||
for (ZeppelinServerResourceParagraphRunner r : runners) {
|
||||
logger.info("clover runner nid {} pid {}", r.getNoteId(), r.getParagraphId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class InterpretJobListener implements JobListener {
|
||||
|
|
@ -592,8 +600,8 @@ public class RemoteInterpreterServer
|
|||
@Override
|
||||
public void run() {
|
||||
ZeppelinServerResourceParagraphRunner test = new ZeppelinServerResourceParagraphRunner();
|
||||
test.setNoteId("TEST NOTE");
|
||||
test.setParagraphId("TEST PARAGRAPH");
|
||||
test.setNoteId(getNoteId());
|
||||
test.setParagraphId(getParagraphId());
|
||||
server.eventClient.getZeppelinServerNoteRunner("IamOWNER", test);
|
||||
server.eventClient.run(this);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class RemoteInterpreterService {
|
||||
|
||||
public interface Iface {
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class RemoteZeppelinServerController implements org.apache.thrift.TBase<RemoteZeppelinServerController, RemoteZeppelinServerController._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteZeppelinServerController> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteZeppelinServerController");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class ZeppelinServerResource implements org.apache.thrift.TBase<ZeppelinServerResource, ZeppelinServerResource._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResource> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResource");
|
||||
|
||||
|
|
|
|||
|
|
@ -51,13 +51,12 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
|
||||
public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase<ZeppelinServerResourceParagraphRunner, ZeppelinServerResourceParagraphRunner._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResourceParagraphRunner> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner");
|
||||
|
||||
private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
|
||||
private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2);
|
||||
private static final org.apache.thrift.protocol.TField RUNNERS_FIELD_DESC = new org.apache.thrift.protocol.TField("runners", org.apache.thrift.protocol.TType.STRING, (short)3);
|
||||
|
||||
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
|
||||
static {
|
||||
|
|
@ -67,13 +66,11 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
|
||||
public String noteId; // required
|
||||
public String paragraphId; // required
|
||||
public String runners; // required
|
||||
|
||||
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
|
||||
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
|
||||
NOTE_ID((short)1, "noteId"),
|
||||
PARAGRAPH_ID((short)2, "paragraphId"),
|
||||
RUNNERS((short)3, "runners");
|
||||
PARAGRAPH_ID((short)2, "paragraphId");
|
||||
|
||||
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
|
||||
|
||||
|
|
@ -92,8 +89,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
return NOTE_ID;
|
||||
case 2: // PARAGRAPH_ID
|
||||
return PARAGRAPH_ID;
|
||||
case 3: // RUNNERS
|
||||
return RUNNERS;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
|
|
@ -141,8 +136,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
tmpMap.put(_Fields.RUNNERS, new org.apache.thrift.meta_data.FieldMetaData("runners", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
metaDataMap = Collections.unmodifiableMap(tmpMap);
|
||||
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ZeppelinServerResourceParagraphRunner.class, metaDataMap);
|
||||
}
|
||||
|
|
@ -152,13 +145,11 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
|
||||
public ZeppelinServerResourceParagraphRunner(
|
||||
String noteId,
|
||||
String paragraphId,
|
||||
String runners)
|
||||
String paragraphId)
|
||||
{
|
||||
this();
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
this.runners = runners;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -171,9 +162,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
if (other.isSetParagraphId()) {
|
||||
this.paragraphId = other.paragraphId;
|
||||
}
|
||||
if (other.isSetRunners()) {
|
||||
this.runners = other.runners;
|
||||
}
|
||||
}
|
||||
|
||||
public ZeppelinServerResourceParagraphRunner deepCopy() {
|
||||
|
|
@ -184,7 +172,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
public void clear() {
|
||||
this.noteId = null;
|
||||
this.paragraphId = null;
|
||||
this.runners = null;
|
||||
}
|
||||
|
||||
public String getNoteId() {
|
||||
|
|
@ -235,30 +222,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
}
|
||||
}
|
||||
|
||||
public String getRunners() {
|
||||
return this.runners;
|
||||
}
|
||||
|
||||
public ZeppelinServerResourceParagraphRunner setRunners(String runners) {
|
||||
this.runners = runners;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetRunners() {
|
||||
this.runners = null;
|
||||
}
|
||||
|
||||
/** Returns true if field runners is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetRunners() {
|
||||
return this.runners != null;
|
||||
}
|
||||
|
||||
public void setRunnersIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.runners = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setFieldValue(_Fields field, Object value) {
|
||||
switch (field) {
|
||||
case NOTE_ID:
|
||||
|
|
@ -277,14 +240,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
}
|
||||
break;
|
||||
|
||||
case RUNNERS:
|
||||
if (value == null) {
|
||||
unsetRunners();
|
||||
} else {
|
||||
setRunners((String)value);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -296,9 +251,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
case PARAGRAPH_ID:
|
||||
return getParagraphId();
|
||||
|
||||
case RUNNERS:
|
||||
return getRunners();
|
||||
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
|
@ -314,8 +266,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
return isSetNoteId();
|
||||
case PARAGRAPH_ID:
|
||||
return isSetParagraphId();
|
||||
case RUNNERS:
|
||||
return isSetRunners();
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
|
@ -351,15 +301,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
return false;
|
||||
}
|
||||
|
||||
boolean this_present_runners = true && this.isSetRunners();
|
||||
boolean that_present_runners = true && that.isSetRunners();
|
||||
if (this_present_runners || that_present_runners) {
|
||||
if (!(this_present_runners && that_present_runners))
|
||||
return false;
|
||||
if (!this.runners.equals(that.runners))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -377,11 +318,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
if (present_paragraphId)
|
||||
list.add(paragraphId);
|
||||
|
||||
boolean present_runners = true && (isSetRunners());
|
||||
list.add(present_runners);
|
||||
if (present_runners)
|
||||
list.add(runners);
|
||||
|
||||
return list.hashCode();
|
||||
}
|
||||
|
||||
|
|
@ -413,16 +349,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
return lastComparison;
|
||||
}
|
||||
}
|
||||
lastComparison = Boolean.valueOf(isSetRunners()).compareTo(other.isSetRunners());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetRunners()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.runners, other.runners);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -458,14 +384,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
sb.append(this.paragraphId);
|
||||
}
|
||||
first = false;
|
||||
if (!first) sb.append(", ");
|
||||
sb.append("runners:");
|
||||
if (this.runners == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.runners);
|
||||
}
|
||||
first = false;
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
|
@ -525,14 +443,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
case 3: // RUNNERS
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
|
||||
struct.runners = iprot.readString();
|
||||
struct.setRunnersIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
|
|
@ -558,11 +468,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
oprot.writeString(struct.paragraphId);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
if (struct.runners != null) {
|
||||
oprot.writeFieldBegin(RUNNERS_FIELD_DESC);
|
||||
oprot.writeString(struct.runners);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
oprot.writeFieldStop();
|
||||
oprot.writeStructEnd();
|
||||
}
|
||||
|
|
@ -587,25 +492,19 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
if (struct.isSetParagraphId()) {
|
||||
optionals.set(1);
|
||||
}
|
||||
if (struct.isSetRunners()) {
|
||||
optionals.set(2);
|
||||
}
|
||||
oprot.writeBitSet(optionals, 3);
|
||||
oprot.writeBitSet(optionals, 2);
|
||||
if (struct.isSetNoteId()) {
|
||||
oprot.writeString(struct.noteId);
|
||||
}
|
||||
if (struct.isSetParagraphId()) {
|
||||
oprot.writeString(struct.paragraphId);
|
||||
}
|
||||
if (struct.isSetRunners()) {
|
||||
oprot.writeString(struct.runners);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(org.apache.thrift.protocol.TProtocol prot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol iprot = (TTupleProtocol) prot;
|
||||
BitSet incoming = iprot.readBitSet(3);
|
||||
BitSet incoming = iprot.readBitSet(2);
|
||||
if (incoming.get(0)) {
|
||||
struct.noteId = iprot.readString();
|
||||
struct.setNoteIdIsSet(true);
|
||||
|
|
@ -614,10 +513,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
|
|||
struct.paragraphId = iprot.readString();
|
||||
struct.setParagraphIdIsSet(true);
|
||||
}
|
||||
if (incoming.get(2)) {
|
||||
struct.runners = iprot.readString();
|
||||
struct.setRunnersIsSet(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -79,8 +79,7 @@ enum RemoteZeppelinServerResourceType {
|
|||
|
||||
struct ZeppelinServerResourceParagraphRunner {
|
||||
1: string noteId,
|
||||
2: string paragraphId,
|
||||
3: string runners
|
||||
2: string paragraphId
|
||||
}
|
||||
|
||||
struct ZeppelinServerResource {
|
||||
|
|
|
|||
|
|
@ -74,6 +74,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
|
|||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -71,6 +71,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
|
|||
env,
|
||||
10 * 1000,
|
||||
this,
|
||||
null,
|
||||
null);
|
||||
|
||||
intpGroup.get("note").add(intp);
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ public class RemoteInterpreterProcessTest {
|
|||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
|
||||
INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
|
||||
10 * 1000, null, null);
|
||||
10 * 1000, null, null, null);
|
||||
assertFalse(rip.isRunning());
|
||||
assertEquals(0, rip.referenceCount());
|
||||
assertEquals(1, rip.reference(intpGroup));
|
||||
|
|
|
|||
|
|
@ -90,6 +90,7 @@ public class RemoteInterpreterTest {
|
|||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
|
||||
|
|
@ -108,6 +109,7 @@ public class RemoteInterpreterTest {
|
|||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
}
|
||||
|
||||
|
|
@ -207,6 +209,7 @@ public class RemoteInterpreterTest {
|
|||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
|
||||
|
||||
|
|
@ -223,6 +226,7 @@ public class RemoteInterpreterTest {
|
|||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
|
||||
intpGroup.get("note").add(intpB);
|
||||
|
|
@ -687,7 +691,8 @@ public class RemoteInterpreterTest {
|
|||
//Given
|
||||
final Client client = Mockito.mock(Client.class);
|
||||
final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
|
||||
MockInterpreterA.class.getName(), "runner", "path","localRepo", env, 10 * 1000, null, null);
|
||||
MockInterpreterA.class.getName(),
|
||||
"runner", "path","localRepo", env, 10 * 1000, null, null, null);
|
||||
final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
|
||||
registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
|
||||
final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
|
||||
|
|
@ -733,6 +738,7 @@ public class RemoteInterpreterTest {
|
|||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
null);
|
||||
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
|
|
|||
|
|
@ -70,6 +70,7 @@ public class DistributedResourcePoolTest {
|
|||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
@ -88,6 +89,7 @@ public class DistributedResourcePoolTest {
|
|||
env,
|
||||
10 * 1000,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
|
||||
|
|
@ -112,11 +114,11 @@ public class DistributedResourcePoolTest {
|
|||
intp1.open();
|
||||
intp2.open();
|
||||
|
||||
eventPoller1 = new RemoteInterpreterEventPoller(null, null);
|
||||
eventPoller1 = new RemoteInterpreterEventPoller(null, null, null);
|
||||
eventPoller1.setInterpreterGroup(intpGroup1);
|
||||
eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
|
||||
|
||||
eventPoller2 = new RemoteInterpreterEventPoller(null, null);
|
||||
eventPoller2 = new RemoteInterpreterEventPoller(null, null, null);
|
||||
eventPoller2.setInterpreterGroup(intpGroup2);
|
||||
eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());
|
||||
|
||||
|
|
|
|||
|
|
@ -81,6 +81,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
env,
|
||||
10 * 1000,
|
||||
this,
|
||||
null,
|
||||
null);
|
||||
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
|
@ -170,6 +171,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
|
|||
env,
|
||||
10 * 1000,
|
||||
this,
|
||||
null,
|
||||
null);
|
||||
|
||||
intpGroup.put("note", new LinkedList<Interpreter>());
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ import org.apache.zeppelin.interpreter.InterpreterFactory;
|
|||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.NotebookAuthorization;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
|
||||
import org.apache.zeppelin.remoteworks.RemoteWorksManager;
|
||||
import org.apache.zeppelin.rest.ConfigurationsRestApi;
|
||||
import org.apache.zeppelin.rest.CredentialRestApi;
|
||||
import org.apache.zeppelin.rest.HeliumRestApi;
|
||||
|
|
@ -91,6 +92,7 @@ public class ZeppelinServer extends Application {
|
|||
private NotebookAuthorization notebookAuthorization;
|
||||
private Credentials credentials;
|
||||
private DependencyResolver depResolver;
|
||||
private RemoteWorksManager remoteWorksManager;
|
||||
|
||||
public ZeppelinServer() throws Exception {
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
|
|
@ -118,6 +120,9 @@ public class ZeppelinServer extends Application {
|
|||
|
||||
notebook.addNotebookEventListener(heliumApplicationFactory);
|
||||
notebook.addNotebookEventListener(notebookWsServer.getNotebookInformationListener());
|
||||
|
||||
remoteWorksManager = new RemoteWorksManager(notebook);
|
||||
replFactory.setRemoteController(remoteWorksManager.getInstance());
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
|
|
|||
|
|
@ -112,6 +112,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
private Map<String, List<String>> interpreterBindings = new HashMap<>();
|
||||
private List<RemoteRepository> interpreterRepositories;
|
||||
|
||||
private RemoteWorksController remoteWorksController;
|
||||
|
||||
private Gson gson;
|
||||
|
||||
private InterpreterOption defaultOption;
|
||||
|
|
@ -141,7 +143,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
remoteInterpreterProcessListener, appEventListener, depResolver, shiroEnabled);
|
||||
}
|
||||
|
||||
|
||||
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
|
||||
|
|
@ -278,6 +279,14 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
}
|
||||
}
|
||||
|
||||
public RemoteWorksController getRemoteWorksController() {
|
||||
return remoteWorksController;
|
||||
}
|
||||
|
||||
public void setRemoteController(RemoteWorksController remoteController) {
|
||||
this.remoteWorksController = remoteController;
|
||||
}
|
||||
|
||||
private InterpreterSetting createFromInterpreterSettingRef(String name) {
|
||||
Preconditions.checkNotNull(name, "reference name should be not null");
|
||||
InterpreterSetting settingRef = interpreterSettingsRef.get(name);
|
||||
|
|
@ -1103,7 +1112,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(
|
||||
new RemoteInterpreter(property, noteId, className, host, port, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener));
|
||||
remoteInterpreterProcessListener, appEventListener, remoteWorksController));
|
||||
return intp;
|
||||
}
|
||||
|
||||
|
|
@ -1116,7 +1125,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
|
|||
RemoteInterpreter remoteInterpreter =
|
||||
new RemoteInterpreter(property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
|
||||
remoteInterpreterProcessListener, appEventListener);
|
||||
remoteInterpreterProcessListener, appEventListener, remoteWorksController);
|
||||
remoteInterpreter.addEnv(env);
|
||||
|
||||
return new LazyOpenInterpreter(remoteInterpreter);
|
||||
|
|
|
|||
|
|
@ -453,6 +453,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
|
||||
}
|
||||
|
||||
//cloverhearts
|
||||
List<InterpreterContextRunner> runners = new LinkedList<>();
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
runners.add(new ParagraphRunner(note, note.getId(), p.getId()));
|
||||
|
|
@ -482,6 +483,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
|
|||
return interpreterContext;
|
||||
}
|
||||
|
||||
public InterpreterContextRunner getInterpreterContextRunner() {
|
||||
return new ParagraphRunner(note, note.getId(), getId());
|
||||
}
|
||||
|
||||
static class ParagraphRunner extends InterpreterContextRunner {
|
||||
private transient Note note;
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,100 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.remoteworks;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.RemoteWorksController;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Zeppelin Server RemoteWorkController Singleton.
|
||||
*/
|
||||
public class RemoteWorksManager {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RemoteWorksManager.class);
|
||||
private static NotebookJobManager instance;
|
||||
|
||||
public RemoteWorksManager(Notebook notebook) {
|
||||
if (RemoteWorksManager.instance == null) {
|
||||
RemoteWorksManager.instance = new NotebookJobManager(notebook);
|
||||
}
|
||||
}
|
||||
|
||||
public static NotebookJobManager getInstance() {
|
||||
return RemoteWorksManager.instance;
|
||||
}
|
||||
|
||||
private class NotebookJobManager implements RemoteWorksController {
|
||||
private transient Notebook notebook;
|
||||
|
||||
public NotebookJobManager(Notebook notebook) {
|
||||
setNotebook(notebook);
|
||||
}
|
||||
|
||||
private void setNotebook(Notebook notebook) {
|
||||
this.notebook = notebook;
|
||||
}
|
||||
|
||||
private Notebook getNotebook() throws NullPointerException {
|
||||
if (notebook == null) {
|
||||
throw new NullPointerException("Notebook instance is Null");
|
||||
}
|
||||
return notebook;
|
||||
}
|
||||
|
||||
public List<InterpreterContextRunner> getRunner(String noteId) {
|
||||
List<InterpreterContextRunner> runners = new LinkedList<>();
|
||||
try {
|
||||
Note note = getNotebook().getNote(noteId);
|
||||
if (note != null) {
|
||||
for (Paragraph paragraph : note.getParagraphs()) {
|
||||
runners.add(paragraph.getInterpreterContextRunner());
|
||||
}
|
||||
}
|
||||
} catch (NullPointerException e) {
|
||||
LOG.warn(e.getMessage());
|
||||
}
|
||||
|
||||
return runners;
|
||||
}
|
||||
|
||||
public InterpreterContextRunner getRunner(String noteId, String paragraphId) {
|
||||
InterpreterContextRunner runner = null;
|
||||
try {
|
||||
Note note = getNotebook().getNote(noteId);
|
||||
if (note != null) {
|
||||
Paragraph paragraph = note.getParagraph(paragraphId);
|
||||
if (paragraph != null) {
|
||||
runner = paragraph.getInterpreterContextRunner();
|
||||
}
|
||||
}
|
||||
} catch (NullPointerException e) {
|
||||
LOG.warn(e.getMessage());
|
||||
}
|
||||
return runner;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue