add remote works controller class and include interpreter factory

This commit is contained in:
CloverHearts 2016-11-14 21:22:51 +09:00
parent 6e1f219993
commit 0570ae804b
29 changed files with 269 additions and 141 deletions

View file

@ -328,6 +328,19 @@ public class ZeppelinContext {
throw new InterpreterException("Paragraph " + id + " not found");
}
/**
* get Zeppelin Paragraph Runner from zeppelin server
*/
@ZeppelinApi
public List<InterpreterContextRunner> getInterpreterContextRunner(
String noteId, String paragraphId) {
List<InterpreterContextRunner> runners = new LinkedList<>();
return runners;
}
/**
* Run paragraph at idx
* @param idx

View file

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import java.util.List;
/**
* zeppelin job for Remote works controller by interpreter
*
*/
public interface RemoteWorksController {
List<InterpreterContextRunner> getRunner(String noteId);
InterpreterContextRunner getRunner(String noteId, String paragraphId);
}

View file

@ -45,6 +45,7 @@ import com.google.gson.reflect.TypeToken;
public class RemoteInterpreter extends Interpreter {
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
private final ApplicationEventListener applicationEventListener;
private final RemoteWorksController remoteWorksController;
Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
Gson gson = new Gson();
private String interpreterRunner;
@ -72,7 +73,8 @@ public class RemoteInterpreter extends Interpreter {
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener) {
ApplicationEventListener appListener,
RemoteWorksController remoteWorksController) {
super(property);
this.noteId = noteId;
this.className = className;
@ -85,6 +87,7 @@ public class RemoteInterpreter extends Interpreter {
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.remoteWorksController = remoteWorksController;
}
@ -100,7 +103,8 @@ public class RemoteInterpreter extends Interpreter {
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener) {
ApplicationEventListener appListener,
RemoteWorksController remoteWorksController) {
super(property);
this.noteId = noteId;
this.className = className;
@ -111,6 +115,7 @@ public class RemoteInterpreter extends Interpreter {
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.remoteWorksController = remoteWorksController;
}
@ -125,7 +130,8 @@ public class RemoteInterpreter extends Interpreter {
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener) {
ApplicationEventListener appListener,
RemoteWorksController remoteWorksController) {
super(property);
this.className = className;
this.noteId = noteId;
@ -138,6 +144,7 @@ public class RemoteInterpreter extends Interpreter {
this.maxPoolSize = 10;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.remoteWorksController = remoteWorksController;
}
private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
@ -181,13 +188,15 @@ public class RemoteInterpreter extends Interpreter {
connectTimeout,
remoteInterpreterProcessListener,
applicationEventListener,
remoteWorksController,
host,
port);
} else {
// create new remote process
remoteProcess = new RemoteInterpreterManagedProcess(
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
remoteInterpreterProcessListener, applicationEventListener);
remoteInterpreterProcessListener, applicationEventListener,
remoteWorksController);
}
intpGroup.setRemoteInterpreterProcess(remoteProcess);

View file

@ -61,6 +61,7 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector {
RemoteZeppelinServerController eventBody = new RemoteZeppelinServerController();
eventBody.setType(RemoteZeppelinServerControlEvent.REQ_RESOURCE_PARAGRAPH_RUN_CONTEXT);
eventBody.setEventOwnerKey(eventOwnerKey);
logger.info("clover gson.toJson(runner) - " + gson.toJson(runner));
eventBody.setMsg(gson.toJson(runner));
sendEvent(new RemoteInterpreterEvent(

View file

@ -25,6 +25,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
@ -61,12 +62,15 @@ public class RemoteInterpreterEventPoller extends Thread {
private RemoteInterpreterProcess interpreterProcess;
private InterpreterGroup interpreterGroup;
private RemoteWorksController remoteWorkController;
public RemoteInterpreterEventPoller(
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
ApplicationEventListener appListener,
RemoteWorksController remoteWorkController) {
this.listener = listener;
this.appListener = appListener;
this.remoteWorkController = remoteWorkController;
shutdown = false;
}
@ -78,6 +82,14 @@ public class RemoteInterpreterEventPoller extends Thread {
this.interpreterGroup = interpreterGroup;
}
public RemoteWorksController getRemoteWorkController() {
return remoteWorkController;
}
public void setRemoteWorkController(RemoteWorksController remoteWorkController) {
this.remoteWorkController = remoteWorkController;
}
@Override
public void run() {
Client client = null;
@ -223,15 +235,37 @@ public class RemoteInterpreterEventPoller extends Thread {
boolean broken = false;
try {
interpreterServer = interpreterProcess.getClient();
List<InterpreterContextRunner> interpreterContextRunners = new LinkedList<>();
List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
if (event.getType() == RemoteZeppelinServerControlEvent.REQ_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
ZeppelinServerResourceParagraphRunner runner = gson.fromJson(
event.getMsg(), ZeppelinServerResourceParagraphRunner.class);
logger.info("clover req note id {} p id {} - msg {}",
runner.getNoteId(), runner.getParagraphId(), event.getMsg());
RemoteZeppelinServerController resResource = new RemoteZeppelinServerController();
resResource.setType(RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT);
resResource.setEventOwnerKey(eventOwnerKey);
resResource.setMsg("test123123");
if (runner.getParagraphId() != null) {
InterpreterContextRunner intpRunner = remoteWorkController.getRunner(
runner.getNoteId(), runner.getParagraphId());
interpreterContextRunners.add(intpRunner);
} else {
interpreterContextRunners = remoteWorkController.getRunner(runner.getNoteId());
}
logger.info("clover remotework count 1 {}", interpreterContextRunners.size());
for (InterpreterContextRunner r : interpreterContextRunners) {
remoteRunners.add(
new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
);
}
logger.info("clover remotework count 2 {}", remoteRunners.size());
resResource.setMsg(gson.toJson(remoteRunners));
interpreterServer.remoteZeppelinServerControlFeedback(resResource);
logger.info("get runner noteid {} paragraphid {}",
runner.getNoteId(), runner.getParagraphId());

View file

@ -21,6 +21,7 @@ import org.apache.commons.exec.*;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -43,6 +44,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
private int port = -1;
private final String interpreterDir;
private final String localRepoDir;
private RemoteWorksController remoteWorksController;
private Map<String, String> env;
@ -53,13 +55,15 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
Map<String, String> env,
int connectTimeout,
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
super(new RemoteInterpreterEventPoller(listener, appListener),
ApplicationEventListener appListener,
RemoteWorksController remoteWorksController) {
super(new RemoteInterpreterEventPoller(listener, appListener, remoteWorksController),
connectTimeout);
this.interpreterRunner = intpRunner;
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
this.remoteWorksController = remoteWorksController;
}
@ -75,6 +79,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
this.env = env;
this.interpreterDir = intpDir;
this.localRepoDir = localRepoDir;
this.remoteWorksController = remoteInterpreterEventPoller.getRemoteWorkController();
}
@Override

View file

@ -24,6 +24,7 @@ import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,8 +49,9 @@ public abstract class RemoteInterpreterProcess {
public RemoteInterpreterProcess(
int connectTimeout,
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
this(new RemoteInterpreterEventPoller(listener, appListener),
ApplicationEventListener appListener,
RemoteWorksController remoteWorksController) {
this(new RemoteInterpreterEventPoller(listener, appListener, remoteWorksController),
connectTimeout);
}

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter.remote;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -32,10 +33,11 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
int connectTimeout,
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener,
RemoteWorksController remoteWorksController,
String host,
int port
) {
super(connectTimeout, listener, appListener);
super(connectTimeout, listener, appListener, remoteWorksController);
this.host = host;
this.port = port;
}

View file

@ -341,6 +341,14 @@ public class RemoteInterpreterServer
RemoteZeppelinServerController response) throws TException {
logger.info("clover remote zeppelin server controller feedback {}", response);
logger.info("clover remote zeppelin server conteroller body {}", response.getMsg());
if (response.getType() == RemoteZeppelinServerControlEvent.RES_RESOURCE_PARAGRAPH_RUN_CONTEXT) {
List<ZeppelinServerResourceParagraphRunner> runners = gson.fromJson(response.getMsg(),
new TypeToken<List<ZeppelinServerResourceParagraphRunner>>() {}.getType());
for (ZeppelinServerResourceParagraphRunner r : runners) {
logger.info("clover runner nid {} pid {}", r.getNoteId(), r.getParagraphId());
}
}
}
class InterpretJobListener implements JobListener {
@ -592,8 +600,8 @@ public class RemoteInterpreterServer
@Override
public void run() {
ZeppelinServerResourceParagraphRunner test = new ZeppelinServerResourceParagraphRunner();
test.setNoteId("TEST NOTE");
test.setParagraphId("TEST PARAGRAPH");
test.setNoteId(getNoteId());
test.setParagraphId(getParagraphId());
server.eventClient.getZeppelinServerNoteRunner("IamOWNER", test);
server.eventClient.run(this);
}

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class InterpreterCompletion implements org.apache.thrift.TBase<InterpreterCompletion, InterpreterCompletion._Fields>, java.io.Serializable, Cloneable, Comparable<InterpreterCompletion> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class RemoteInterpreterService {
public interface Iface {

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class RemoteZeppelinServerController implements org.apache.thrift.TBase<RemoteZeppelinServerController, RemoteZeppelinServerController._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteZeppelinServerController> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteZeppelinServerController");

View file

@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class ZeppelinServerResource implements org.apache.thrift.TBase<ZeppelinServerResource, ZeppelinServerResource._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResource> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResource");

View file

@ -51,13 +51,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-11")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-11-14")
public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.TBase<ZeppelinServerResourceParagraphRunner, ZeppelinServerResourceParagraphRunner._Fields>, java.io.Serializable, Cloneable, Comparable<ZeppelinServerResourceParagraphRunner> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ZeppelinServerResourceParagraphRunner");
private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.thrift.protocol.TField RUNNERS_FIELD_DESC = new org.apache.thrift.protocol.TField("runners", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@ -67,13 +66,11 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
public String noteId; // required
public String paragraphId; // required
public String runners; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
NOTE_ID((short)1, "noteId"),
PARAGRAPH_ID((short)2, "paragraphId"),
RUNNERS((short)3, "runners");
PARAGRAPH_ID((short)2, "paragraphId");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@ -92,8 +89,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
return NOTE_ID;
case 2: // PARAGRAPH_ID
return PARAGRAPH_ID;
case 3: // RUNNERS
return RUNNERS;
default:
return null;
}
@ -141,8 +136,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.RUNNERS, new org.apache.thrift.meta_data.FieldMetaData("runners", org.apache.thrift.TFieldRequirementType.DEFAULT,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ZeppelinServerResourceParagraphRunner.class, metaDataMap);
}
@ -152,13 +145,11 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
public ZeppelinServerResourceParagraphRunner(
String noteId,
String paragraphId,
String runners)
String paragraphId)
{
this();
this.noteId = noteId;
this.paragraphId = paragraphId;
this.runners = runners;
}
/**
@ -171,9 +162,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
if (other.isSetParagraphId()) {
this.paragraphId = other.paragraphId;
}
if (other.isSetRunners()) {
this.runners = other.runners;
}
}
public ZeppelinServerResourceParagraphRunner deepCopy() {
@ -184,7 +172,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
public void clear() {
this.noteId = null;
this.paragraphId = null;
this.runners = null;
}
public String getNoteId() {
@ -235,30 +222,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
}
}
public String getRunners() {
return this.runners;
}
public ZeppelinServerResourceParagraphRunner setRunners(String runners) {
this.runners = runners;
return this;
}
public void unsetRunners() {
this.runners = null;
}
/** Returns true if field runners is set (has been assigned a value) and false otherwise */
public boolean isSetRunners() {
return this.runners != null;
}
public void setRunnersIsSet(boolean value) {
if (!value) {
this.runners = null;
}
}
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case NOTE_ID:
@ -277,14 +240,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
}
break;
case RUNNERS:
if (value == null) {
unsetRunners();
} else {
setRunners((String)value);
}
break;
}
}
@ -296,9 +251,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
case PARAGRAPH_ID:
return getParagraphId();
case RUNNERS:
return getRunners();
}
throw new IllegalStateException();
}
@ -314,8 +266,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
return isSetNoteId();
case PARAGRAPH_ID:
return isSetParagraphId();
case RUNNERS:
return isSetRunners();
}
throw new IllegalStateException();
}
@ -351,15 +301,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
return false;
}
boolean this_present_runners = true && this.isSetRunners();
boolean that_present_runners = true && that.isSetRunners();
if (this_present_runners || that_present_runners) {
if (!(this_present_runners && that_present_runners))
return false;
if (!this.runners.equals(that.runners))
return false;
}
return true;
}
@ -377,11 +318,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
if (present_paragraphId)
list.add(paragraphId);
boolean present_runners = true && (isSetRunners());
list.add(present_runners);
if (present_runners)
list.add(runners);
return list.hashCode();
}
@ -413,16 +349,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
return lastComparison;
}
}
lastComparison = Boolean.valueOf(isSetRunners()).compareTo(other.isSetRunners());
if (lastComparison != 0) {
return lastComparison;
}
if (isSetRunners()) {
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.runners, other.runners);
if (lastComparison != 0) {
return lastComparison;
}
}
return 0;
}
@ -458,14 +384,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
sb.append(this.paragraphId);
}
first = false;
if (!first) sb.append(", ");
sb.append("runners:");
if (this.runners == null) {
sb.append("null");
} else {
sb.append(this.runners);
}
first = false;
sb.append(")");
return sb.toString();
}
@ -525,14 +443,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
case 3: // RUNNERS
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
struct.runners = iprot.readString();
struct.setRunnersIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@ -558,11 +468,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
oprot.writeString(struct.paragraphId);
oprot.writeFieldEnd();
}
if (struct.runners != null) {
oprot.writeFieldBegin(RUNNERS_FIELD_DESC);
oprot.writeString(struct.runners);
oprot.writeFieldEnd();
}
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@ -587,25 +492,19 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
if (struct.isSetParagraphId()) {
optionals.set(1);
}
if (struct.isSetRunners()) {
optionals.set(2);
}
oprot.writeBitSet(optionals, 3);
oprot.writeBitSet(optionals, 2);
if (struct.isSetNoteId()) {
oprot.writeString(struct.noteId);
}
if (struct.isSetParagraphId()) {
oprot.writeString(struct.paragraphId);
}
if (struct.isSetRunners()) {
oprot.writeString(struct.runners);
}
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, ZeppelinServerResourceParagraphRunner struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
BitSet incoming = iprot.readBitSet(3);
BitSet incoming = iprot.readBitSet(2);
if (incoming.get(0)) {
struct.noteId = iprot.readString();
struct.setNoteIdIsSet(true);
@ -614,10 +513,6 @@ public class ZeppelinServerResourceParagraphRunner implements org.apache.thrift.
struct.paragraphId = iprot.readString();
struct.setParagraphIdIsSet(true);
}
if (incoming.get(2)) {
struct.runners = iprot.readString();
struct.setRunnersIsSet(true);
}
}
}

View file

@ -79,8 +79,7 @@ enum RemoteZeppelinServerResourceType {
struct ZeppelinServerResourceParagraphRunner {
1: string noteId,
2: string paragraphId,
3: string runners
2: string paragraphId
}
struct ZeppelinServerResource {

View file

@ -74,6 +74,7 @@ public class RemoteAngularObjectTest implements AngularObjectRegistryListener {
env,
10 * 1000,
null,
null,
null
);

View file

@ -71,6 +71,7 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce
env,
10 * 1000,
this,
null,
null);
intpGroup.get("note").add(intp);

View file

@ -43,7 +43,7 @@ public class RemoteInterpreterProcessTest {
InterpreterGroup intpGroup = new InterpreterGroup();
RemoteInterpreterManagedProcess rip = new RemoteInterpreterManagedProcess(
INTERPRETER_SCRIPT, "nonexists", "fakeRepo", new HashMap<String, String>(),
10 * 1000, null, null);
10 * 1000, null, null, null);
assertFalse(rip.isRunning());
assertEquals(0, rip.referenceCount());
assertEquals(1, rip.reference(intpGroup));

View file

@ -90,6 +90,7 @@ public class RemoteInterpreterTest {
env,
10 * 1000,
null,
null,
null);
}
@ -108,6 +109,7 @@ public class RemoteInterpreterTest {
env,
10 * 1000,
null,
null,
null);
}
@ -207,6 +209,7 @@ public class RemoteInterpreterTest {
env,
10 * 1000,
null,
null,
null);
@ -223,6 +226,7 @@ public class RemoteInterpreterTest {
env,
10 * 1000,
null,
null,
null);
intpGroup.get("note").add(intpB);
@ -687,7 +691,8 @@ public class RemoteInterpreterTest {
//Given
final Client client = Mockito.mock(Client.class);
final RemoteInterpreter intr = new RemoteInterpreter(new Properties(), "noteId",
MockInterpreterA.class.getName(), "runner", "path","localRepo", env, 10 * 1000, null, null);
MockInterpreterA.class.getName(),
"runner", "path","localRepo", env, 10 * 1000, null, null, null);
final AngularObjectRegistry registry = new AngularObjectRegistry("spark", null);
registry.add("name", "DuyHai DOAN", "nodeId", "paragraphId");
final InterpreterGroup interpreterGroup = new InterpreterGroup("groupId");
@ -733,6 +738,7 @@ public class RemoteInterpreterTest {
env,
10 * 1000,
null,
null,
null);
intpGroup.put("note", new LinkedList<Interpreter>());

View file

@ -70,6 +70,7 @@ public class DistributedResourcePoolTest {
env,
10 * 1000,
null,
null,
null
);
@ -88,6 +89,7 @@ public class DistributedResourcePoolTest {
env,
10 * 1000,
null,
null,
null
);
@ -112,11 +114,11 @@ public class DistributedResourcePoolTest {
intp1.open();
intp2.open();
eventPoller1 = new RemoteInterpreterEventPoller(null, null);
eventPoller1 = new RemoteInterpreterEventPoller(null, null, null);
eventPoller1.setInterpreterGroup(intpGroup1);
eventPoller1.setInterpreterProcess(intpGroup1.getRemoteInterpreterProcess());
eventPoller2 = new RemoteInterpreterEventPoller(null, null);
eventPoller2 = new RemoteInterpreterEventPoller(null, null, null);
eventPoller2.setInterpreterGroup(intpGroup2);
eventPoller2.setInterpreterProcess(intpGroup2.getRemoteInterpreterProcess());

View file

@ -81,6 +81,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
env,
10 * 1000,
this,
null,
null);
intpGroup.put("note", new LinkedList<Interpreter>());
@ -170,6 +171,7 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener {
env,
10 * 1000,
this,
null,
null);
intpGroup.put("note", new LinkedList<Interpreter>());

View file

@ -39,6 +39,7 @@ import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.remoteworks.RemoteWorksManager;
import org.apache.zeppelin.rest.ConfigurationsRestApi;
import org.apache.zeppelin.rest.CredentialRestApi;
import org.apache.zeppelin.rest.HeliumRestApi;
@ -91,6 +92,7 @@ public class ZeppelinServer extends Application {
private NotebookAuthorization notebookAuthorization;
private Credentials credentials;
private DependencyResolver depResolver;
private RemoteWorksManager remoteWorksManager;
public ZeppelinServer() throws Exception {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
@ -118,6 +120,9 @@ public class ZeppelinServer extends Application {
notebook.addNotebookEventListener(heliumApplicationFactory);
notebook.addNotebookEventListener(notebookWsServer.getNotebookInformationListener());
remoteWorksManager = new RemoteWorksManager(notebook);
replFactory.setRemoteController(remoteWorksManager.getInstance());
}
public static void main(String[] args) throws InterruptedException {

View file

@ -112,6 +112,8 @@ public class InterpreterFactory implements InterpreterGroupFactory {
private Map<String, List<String>> interpreterBindings = new HashMap<>();
private List<RemoteRepository> interpreterRepositories;
private RemoteWorksController remoteWorksController;
private Gson gson;
private InterpreterOption defaultOption;
@ -141,7 +143,6 @@ public class InterpreterFactory implements InterpreterGroupFactory {
remoteInterpreterProcessListener, appEventListener, depResolver, shiroEnabled);
}
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
AngularObjectRegistryListener angularObjectRegistryListener,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
@ -278,6 +279,14 @@ public class InterpreterFactory implements InterpreterGroupFactory {
}
}
public RemoteWorksController getRemoteWorksController() {
return remoteWorksController;
}
public void setRemoteController(RemoteWorksController remoteController) {
this.remoteWorksController = remoteController;
}
private InterpreterSetting createFromInterpreterSettingRef(String name) {
Preconditions.checkNotNull(name, "reference name should be not null");
InterpreterSetting settingRef = interpreterSettingsRef.get(name);
@ -1103,7 +1112,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(
new RemoteInterpreter(property, noteId, className, host, port, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener));
remoteInterpreterProcessListener, appEventListener, remoteWorksController));
return intp;
}
@ -1116,7 +1125,7 @@ public class InterpreterFactory implements InterpreterGroupFactory {
RemoteInterpreter remoteInterpreter =
new RemoteInterpreter(property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout, maxPoolSize,
remoteInterpreterProcessListener, appEventListener);
remoteInterpreterProcessListener, appEventListener, remoteWorksController);
remoteInterpreter.addEnv(env);
return new LazyOpenInterpreter(remoteInterpreter);

View file

@ -453,6 +453,7 @@ public class Paragraph extends Job implements Serializable, Cloneable {
resourcePool = intpGroup.getInterpreterGroup(getUser(), note.getId()).getResourcePool();
}
//cloverhearts
List<InterpreterContextRunner> runners = new LinkedList<>();
for (Paragraph p : note.getParagraphs()) {
runners.add(new ParagraphRunner(note, note.getId(), p.getId()));
@ -482,6 +483,10 @@ public class Paragraph extends Job implements Serializable, Cloneable {
return interpreterContext;
}
public InterpreterContextRunner getInterpreterContextRunner() {
return new ParagraphRunner(note, note.getId(), getId());
}
static class ParagraphRunner extends InterpreterContextRunner {
private transient Note note;

View file

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.remoteworks;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.RemoteWorksController;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
/**
* Zeppelin Server RemoteWorkController Singleton.
*/
public class RemoteWorksManager {
private static final Logger LOG = LoggerFactory.getLogger(RemoteWorksManager.class);
private static NotebookJobManager instance;
public RemoteWorksManager(Notebook notebook) {
if (RemoteWorksManager.instance == null) {
RemoteWorksManager.instance = new NotebookJobManager(notebook);
}
}
public static NotebookJobManager getInstance() {
return RemoteWorksManager.instance;
}
private class NotebookJobManager implements RemoteWorksController {
private transient Notebook notebook;
public NotebookJobManager(Notebook notebook) {
setNotebook(notebook);
}
private void setNotebook(Notebook notebook) {
this.notebook = notebook;
}
private Notebook getNotebook() throws NullPointerException {
if (notebook == null) {
throw new NullPointerException("Notebook instance is Null");
}
return notebook;
}
public List<InterpreterContextRunner> getRunner(String noteId) {
List<InterpreterContextRunner> runners = new LinkedList<>();
try {
Note note = getNotebook().getNote(noteId);
if (note != null) {
for (Paragraph paragraph : note.getParagraphs()) {
runners.add(paragraph.getInterpreterContextRunner());
}
}
} catch (NullPointerException e) {
LOG.warn(e.getMessage());
}
return runners;
}
public InterpreterContextRunner getRunner(String noteId, String paragraphId) {
InterpreterContextRunner runner = null;
try {
Note note = getNotebook().getNote(noteId);
if (note != null) {
Paragraph paragraph = note.getParagraph(paragraphId);
if (paragraph != null) {
runner = paragraph.getInterpreterContextRunner();
}
}
} catch (NullPointerException e) {
LOG.warn(e.getMessage());
}
return runner;
}
}
}