mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge remote-tracking branch 'origin/master' into livyInterperter
This commit is contained in:
commit
ca06e916cf
25 changed files with 1787 additions and 48 deletions
|
|
@ -24,7 +24,7 @@
|
|||
# export ZEPPELIN_INTP_JAVA_OPTS # zeppelin interpreter process jvm options. Default = ZEPPELIN_JAVA_OPTS
|
||||
|
||||
# export ZEPPELIN_LOG_DIR # Where log files are stored. PWD by default.
|
||||
# export ZEPPELIN_PID_DIR # The pid files are stored. /tmp by default.
|
||||
# export ZEPPELIN_PID_DIR # The pid files are stored. ${ZEPPELIN_HOME}/run by default.
|
||||
# export ZEPPELIN_WAR_TEMPDIR # The location of jetty temporary directory.
|
||||
# export ZEPPELIN_NOTEBOOK_DIR # Where notebook saved
|
||||
# export ZEPPELIN_NOTEBOOK_HOMESCREEN # Id of notebook to be displayed in homescreen. ex) 2A94M5J1Z
|
||||
|
|
|
|||
|
|
@ -93,7 +93,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
getSystemDefault("MASTER", "spark.master", "local[*]"),
|
||||
"Spark master uri. ex) spark://masterhost:7077")
|
||||
.add("spark.executor.memory",
|
||||
getSystemDefault(null, "spark.executor.memory", "512m"),
|
||||
getSystemDefault(null, "spark.executor.memory", ""),
|
||||
"Executor memory per worker instance. ex) 512m, 32g")
|
||||
.add("spark.cores.max",
|
||||
getSystemDefault(null, "spark.cores.max", ""),
|
||||
|
|
|
|||
|
|
@ -260,7 +260,10 @@ public class RemoteInterpreterEventPoller extends Thread {
|
|||
boolean broken = false;
|
||||
try {
|
||||
client = remoteInterpreterProcess.getClient();
|
||||
ByteBuffer res = client.resourceGet(resourceId.getName());
|
||||
ByteBuffer res = client.resourceGet(
|
||||
resourceId.getNoteId(),
|
||||
resourceId.getParagraphId(),
|
||||
resourceId.getName());
|
||||
Object o = Resource.deserializeObject(res);
|
||||
return o;
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -343,12 +343,22 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
String interpreterResultMessage = result.message();
|
||||
|
||||
InterpreterResult combinedResult;
|
||||
if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
|
||||
message += interpreterResultMessage;
|
||||
return new InterpreterResult(result.code(), result.type(), message);
|
||||
combinedResult = new InterpreterResult(result.code(), result.type(), message);
|
||||
} else {
|
||||
return new InterpreterResult(result.code(), outputType, message);
|
||||
combinedResult = new InterpreterResult(result.code(), outputType, message);
|
||||
}
|
||||
|
||||
// put result into resource pool
|
||||
context.getResourcePool().put(
|
||||
context.getNoteId(),
|
||||
context.getParagraphId(),
|
||||
WellKnownResourceName.ParagraphResult.toString(),
|
||||
combinedResult);
|
||||
return combinedResult;
|
||||
} finally {
|
||||
InterpreterContext.remove();
|
||||
}
|
||||
|
|
@ -651,9 +661,17 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer resourceGet(String resourceName) throws TException {
|
||||
public boolean resourceRemove(String noteId, String paragraphId, String resourceName)
|
||||
throws TException {
|
||||
Resource resource = resourcePool.remove(noteId, paragraphId, resourceName);
|
||||
return resource != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer resourceGet(String noteId, String paragraphId, String resourceName)
|
||||
throws TException {
|
||||
logger.debug("Request resourceGet {} from ZeppelinServer", resourceName);
|
||||
Resource resource = resourcePool.get(resourceName, false);
|
||||
Resource resource = resourcePool.get(noteId, paragraphId, resourceName, false);
|
||||
|
||||
if (resource == null || resource.get() == null || !resource.isSerializable()) {
|
||||
return ByteBuffer.allocate(0);
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.3)
|
||||
* Autogenerated by Thrift Compiler (0.9.2)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
|
||||
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.3)
|
||||
* Autogenerated by Thrift Compiler (0.9.2)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
|
||||
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.3)
|
||||
* Autogenerated by Thrift Compiler (0.9.2)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.3)
|
||||
* Autogenerated by Thrift Compiler (0.9.2)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
|
|
@ -51,7 +51,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
|
||||
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
|
||||
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");
|
||||
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -33,6 +33,11 @@ public class DistributedResourcePool extends LocalResourcePool {
|
|||
return get(name, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource get(String noteId, String paragraphId, String name) {
|
||||
return get(noteId, paragraphId, name, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* get resource by name.
|
||||
* @param name
|
||||
|
|
@ -58,6 +63,35 @@ public class DistributedResourcePool extends LocalResourcePool {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get resource by name.
|
||||
* @param name
|
||||
* @param remote false only return from local resource
|
||||
* @return null if resource not found.
|
||||
*/
|
||||
public Resource get(String noteId, String paragraphId, String name, boolean remote) {
|
||||
// try local first
|
||||
Resource resource = super.get(noteId, paragraphId, name);
|
||||
if (resource != null) {
|
||||
return resource;
|
||||
}
|
||||
|
||||
if (remote) {
|
||||
ResourceSet resources = connector.getAllResources()
|
||||
.filterByNoteId(noteId)
|
||||
.filterByParagraphId(paragraphId)
|
||||
.filterByName(name);
|
||||
|
||||
if (resources.isEmpty()) {
|
||||
return null;
|
||||
} else {
|
||||
return resources.get(0);
|
||||
}
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceSet getAll() {
|
||||
return getAll(true);
|
||||
|
|
|
|||
|
|
@ -52,6 +52,12 @@ public class LocalResourcePool implements ResourcePool {
|
|||
return resources.get(resourceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource get(String noteId, String paragraphId, String name) {
|
||||
ResourceId resourceId = new ResourceId(resourcePoolId, noteId, paragraphId, name);
|
||||
return resources.get(resourceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceSet getAll() {
|
||||
return new ResourceSet(resources.values());
|
||||
|
|
@ -70,8 +76,21 @@ public class LocalResourcePool implements ResourcePool {
|
|||
resources.put(resourceId, resource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(String noteId, String paragraphId, String name, Object object) {
|
||||
ResourceId resourceId = new ResourceId(resourcePoolId, noteId, paragraphId, name);
|
||||
|
||||
Resource resource = new Resource(resourceId, object);
|
||||
resources.put(resourceId, resource);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource remove(String name) {
|
||||
return resources.remove(new ResourceId(resourcePoolId, name));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource remove(String noteId, String paragraphId, String name) {
|
||||
return resources.remove(new ResourceId(resourcePoolId, noteId, paragraphId, name));
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,9 +22,20 @@ package org.apache.zeppelin.resource;
|
|||
public class ResourceId {
|
||||
private final String resourcePoolId;
|
||||
private final String name;
|
||||
private final String noteId;
|
||||
private final String paragraphId;
|
||||
|
||||
ResourceId(String resourcePoolId, String name) {
|
||||
this.resourcePoolId = resourcePoolId;
|
||||
this.noteId = null;
|
||||
this.paragraphId = null;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
ResourceId(String resourcePoolId, String noteId, String paragraphId, String name) {
|
||||
this.resourcePoolId = resourcePoolId;
|
||||
this.noteId = noteId;
|
||||
this.paragraphId = paragraphId;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
|
|
@ -36,16 +47,35 @@ public class ResourceId {
|
|||
return name;
|
||||
}
|
||||
|
||||
public String getNoteId() {
|
||||
return noteId;
|
||||
}
|
||||
|
||||
public String getParagraphId() {
|
||||
return paragraphId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return (resourcePoolId + name).hashCode();
|
||||
return (resourcePoolId + noteId + paragraphId + name).hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o instanceof ResourceId) {
|
||||
ResourceId r = (ResourceId) o;
|
||||
return (r.name.equals(name) && r.resourcePoolId.equals(resourcePoolId));
|
||||
return equals(r.name, name) && equals(r.resourcePoolId, resourcePoolId) &&
|
||||
equals(r.noteId, noteId) && equals(r.paragraphId, paragraphId);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private boolean equals(String a, String b) {
|
||||
if (a == null && b == null) {
|
||||
return true;
|
||||
} else if (a != null && b != null) {
|
||||
return a.equals(b);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,6 +33,15 @@ public interface ResourcePool {
|
|||
*/
|
||||
public Resource get(String name);
|
||||
|
||||
/**
|
||||
* Get resource from name
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
* @param name Resource name
|
||||
* @return null if resource not found
|
||||
*/
|
||||
public Resource get(String noteId, String paragraphId, String name);
|
||||
|
||||
/**
|
||||
* Get all resources
|
||||
* @return
|
||||
|
|
@ -46,10 +55,31 @@ public interface ResourcePool {
|
|||
*/
|
||||
public void put(String name, Object object);
|
||||
|
||||
/**
|
||||
* Put an object into resource pool
|
||||
* Given noteId and paragraphId is identifying resource along with name.
|
||||
* Object will be automatically removed on related note or paragraph removal.
|
||||
*
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
* @param name
|
||||
* @param object
|
||||
*/
|
||||
public void put(String noteId, String paragraphId, String name, Object object);
|
||||
|
||||
/**
|
||||
* Remove object
|
||||
* @param name Resource name to remove
|
||||
* @return removed Resource. null if resource not found
|
||||
*/
|
||||
public Resource remove(String name);
|
||||
|
||||
/**
|
||||
* Remove object
|
||||
* @param noteId
|
||||
* @param paragraphId
|
||||
* @param name Resource name to remove
|
||||
* @return removed Resource. null if resource not found
|
||||
*/
|
||||
public Resource remove(String noteId, String paragraphId, String name);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,136 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.resource;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Utilities for ResourcePool
|
||||
*/
|
||||
public class ResourcePoolUtils {
|
||||
static Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class);
|
||||
|
||||
public static ResourceSet getAllResources() {
|
||||
return getAllResourcesExcept(null);
|
||||
}
|
||||
|
||||
public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
|
||||
ResourceSet resourceSet = new ResourceSet();
|
||||
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
|
||||
if (interpreterGroupExcludsion != null &&
|
||||
intpGroup.getId().equals(interpreterGroupExcludsion)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
|
||||
if (remoteInterpreterProcess == null) {
|
||||
ResourcePool localPool = intpGroup.getResourcePool();
|
||||
if (localPool != null) {
|
||||
resourceSet.addAll(localPool.getAll());
|
||||
}
|
||||
} else if (remoteInterpreterProcess.isRunning()) {
|
||||
RemoteInterpreterService.Client client = null;
|
||||
boolean broken = false;
|
||||
try {
|
||||
client = remoteInterpreterProcess.getClient();
|
||||
List<String> resourceList = client.resoucePoolGetAll();
|
||||
Gson gson = new Gson();
|
||||
for (String res : resourceList) {
|
||||
resourceSet.add(gson.fromJson(res, Resource.class));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
broken = true;
|
||||
} finally {
|
||||
if (client != null) {
|
||||
intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return resourceSet;
|
||||
}
|
||||
|
||||
public static void removeResourcesBelongsToNote(String noteId) {
|
||||
removeResourcesBelongsToParagraph(noteId, null);
|
||||
}
|
||||
|
||||
public static void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
|
||||
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
|
||||
ResourceSet resourceSet = new ResourceSet();
|
||||
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
|
||||
if (remoteInterpreterProcess == null) {
|
||||
ResourcePool localPool = intpGroup.getResourcePool();
|
||||
if (localPool != null) {
|
||||
resourceSet.addAll(localPool.getAll());
|
||||
}
|
||||
if (noteId != null) {
|
||||
resourceSet = resourceSet.filterByNoteId(noteId);
|
||||
}
|
||||
if (paragraphId != null) {
|
||||
resourceSet = resourceSet.filterByParagraphId(paragraphId);
|
||||
}
|
||||
|
||||
for (Resource r : resourceSet) {
|
||||
localPool.remove(
|
||||
r.getResourceId().getNoteId(),
|
||||
r.getResourceId().getParagraphId(),
|
||||
r.getResourceId().getName());
|
||||
}
|
||||
} else if (remoteInterpreterProcess.isRunning()) {
|
||||
RemoteInterpreterService.Client client = null;
|
||||
boolean broken = false;
|
||||
try {
|
||||
client = remoteInterpreterProcess.getClient();
|
||||
List<String> resourceList = client.resoucePoolGetAll();
|
||||
Gson gson = new Gson();
|
||||
for (String res : resourceList) {
|
||||
resourceSet.add(gson.fromJson(res, Resource.class));
|
||||
}
|
||||
|
||||
if (noteId != null) {
|
||||
resourceSet = resourceSet.filterByNoteId(noteId);
|
||||
}
|
||||
if (paragraphId != null) {
|
||||
resourceSet = resourceSet.filterByParagraphId(paragraphId);
|
||||
}
|
||||
|
||||
for (Resource r : resourceSet) {
|
||||
client.resourceRemove(
|
||||
r.getResourceId().getNoteId(),
|
||||
r.getResourceId().getParagraphId(),
|
||||
r.getResourceId().getName());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error(e.getMessage(), e);
|
||||
broken = true;
|
||||
} finally {
|
||||
if (client != null) {
|
||||
intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -72,4 +72,34 @@ public class ResourceSet extends LinkedList<Resource> {
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public ResourceSet filterByNoteId(String noteId) {
|
||||
ResourceSet result = new ResourceSet();
|
||||
for (Resource r : this) {
|
||||
if (equals(r.getResourceId().getNoteId(), noteId)) {
|
||||
result.add(r);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public ResourceSet filterByParagraphId(String paragraphId) {
|
||||
ResourceSet result = new ResourceSet();
|
||||
for (Resource r : this) {
|
||||
if (equals(r.getResourceId().getParagraphId(), paragraphId)) {
|
||||
result.add(r);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private boolean equals(String a, String b) {
|
||||
if (a == null && b == null) {
|
||||
return true;
|
||||
} else if (a != null && b != null) {
|
||||
return a.equals(b);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.resource;
|
||||
|
||||
/**
|
||||
* Well known resource names in ResourcePool
|
||||
*/
|
||||
public enum WellKnownResourceName {
|
||||
ParagraphResult("zeppelin.paragraph.result"); // paragraph run result
|
||||
|
||||
String name;
|
||||
WellKnownResourceName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return name;
|
||||
}
|
||||
}
|
||||
|
|
@ -78,7 +78,9 @@ service RemoteInterpreterService {
|
|||
// get all resources in the interpreter process
|
||||
list<string> resoucePoolGetAll();
|
||||
// get value of resource
|
||||
binary resourceGet(1: string resourceName);
|
||||
binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
|
||||
// remove resource
|
||||
bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
|
||||
|
||||
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
|
||||
object);
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
|
|||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
|
||||
public class MockInterpreterResourcePool extends Interpreter {
|
||||
|
|
@ -61,9 +62,18 @@ public class MockInterpreterResourcePool extends Interpreter {
|
|||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] stmt = st.split(" ");
|
||||
String cmd = stmt[0];
|
||||
String noteId = null;
|
||||
String paragraphId = null;
|
||||
String name = null;
|
||||
if (stmt.length >= 2) {
|
||||
name = stmt[1];
|
||||
String[] npn = stmt[1].split(":");
|
||||
if (npn.length == 3) {
|
||||
noteId = npn[0];
|
||||
paragraphId = npn[1];
|
||||
name = npn[2];
|
||||
} else {
|
||||
name = stmt[1];
|
||||
}
|
||||
}
|
||||
String value = null;
|
||||
if (stmt.length == 3) {
|
||||
|
|
@ -73,11 +83,16 @@ public class MockInterpreterResourcePool extends Interpreter {
|
|||
ResourcePool resourcePool = context.getResourcePool();
|
||||
Object ret = null;
|
||||
if (cmd.equals("put")) {
|
||||
resourcePool.put(name, value);
|
||||
resourcePool.put(noteId, paragraphId, name, value);
|
||||
} else if (cmd.equalsIgnoreCase("get")) {
|
||||
ret = resourcePool.get(name).get();
|
||||
Resource resource = resourcePool.get(noteId, paragraphId, name);
|
||||
if (resource != null) {
|
||||
ret = resourcePool.get(noteId, paragraphId, name).get();
|
||||
} else {
|
||||
ret = "";
|
||||
}
|
||||
} else if (cmd.equals("remove")) {
|
||||
ret = resourcePool.remove(name);
|
||||
ret = resourcePool.remove(noteId, paragraphId, name);
|
||||
} else if (cmd.equals("getAll")) {
|
||||
ret = resourcePool.getAll();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -136,12 +136,13 @@ public class DistributedResourcePoolTest {
|
|||
InterpreterResult ret;
|
||||
intp1.interpret("put key1 value1", context);
|
||||
intp2.interpret("put key2 value2", context);
|
||||
int numInterpreterResult = 2;
|
||||
|
||||
ret = intp1.interpret("getAll", context);
|
||||
assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
|
||||
assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size());
|
||||
|
||||
ret = intp2.interpret("getAll", context);
|
||||
assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
|
||||
assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size());
|
||||
|
||||
ret = intp1.interpret("get key1", context);
|
||||
assertEquals("value1", gson.fromJson(ret.message(), String.class));
|
||||
|
|
@ -201,4 +202,44 @@ public class DistributedResourcePoolTest {
|
|||
assertEquals("value1", pool1.getAll().get(0).get());
|
||||
assertEquals("value2", pool1.getAll().get(1).get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourcePoolUtils() {
|
||||
Gson gson = new Gson();
|
||||
InterpreterResult ret;
|
||||
|
||||
// when create some resources
|
||||
intp1.interpret("put note1:paragraph1:key1 value1", context);
|
||||
intp1.interpret("put note1:paragraph2:key1 value2", context);
|
||||
intp2.interpret("put note2:paragraph1:key1 value1", context);
|
||||
intp2.interpret("put note2:paragraph2:key2 value2", context);
|
||||
|
||||
int numInterpreterResult = 2;
|
||||
|
||||
// then get all resources.
|
||||
assertEquals(numInterpreterResult + 4, ResourcePoolUtils.getAllResources().size());
|
||||
|
||||
// when remove all resources from note1
|
||||
ResourcePoolUtils.removeResourcesBelongsToNote("note1");
|
||||
|
||||
// then resources should be removed.
|
||||
assertEquals(numInterpreterResult + 2, ResourcePoolUtils.getAllResources().size());
|
||||
assertEquals("", gson.fromJson(
|
||||
intp1.interpret("get note1:paragraph1:key1", context).message(),
|
||||
String.class));
|
||||
assertEquals("", gson.fromJson(
|
||||
intp1.interpret("get note1:paragraph2:key1", context).message(),
|
||||
String.class));
|
||||
|
||||
|
||||
// when remove all resources from note2:paragraph1
|
||||
ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
|
||||
|
||||
// then 1
|
||||
assertEquals(numInterpreterResult + 1, ResourcePoolUtils.getAllResources().size());
|
||||
assertEquals("value2", gson.fromJson(
|
||||
intp1.interpret("get note2:paragraph2:key2", context).message(),
|
||||
String.class));
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -434,6 +434,8 @@ public class InterpreterFactory {
|
|||
angularObjectRegistry = new AngularObjectRegistry(
|
||||
id,
|
||||
angularObjectRegistryListener);
|
||||
|
||||
// TODO(moon) : create distributed resource pool for local interpreters and set
|
||||
}
|
||||
|
||||
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@ import org.apache.zeppelin.interpreter.*;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.apache.zeppelin.notebook.utility.IdHashes;
|
||||
import org.apache.zeppelin.resource.ResourcePoolUtils;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
|
|
@ -208,6 +209,8 @@ public class Note implements Serializable, JobListener {
|
|||
* @return a paragraph that was deleted, or <code>null</code> otherwise
|
||||
*/
|
||||
public Paragraph removeParagraph(String paragraphId) {
|
||||
removeAllAngularObjectInParagraph(paragraphId);
|
||||
ResourcePoolUtils.removeResourcesBelongsToParagraph(id(), paragraphId);
|
||||
synchronized (paragraphs) {
|
||||
Iterator<Paragraph> i = paragraphs.iterator();
|
||||
while (i.hasNext()) {
|
||||
|
|
@ -220,7 +223,7 @@ public class Note implements Serializable, JobListener {
|
|||
}
|
||||
}
|
||||
|
||||
removeAllAngularObjectInParagraph(paragraphId);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
|
||||
import org.apache.zeppelin.resource.ResourcePoolUtils;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.apache.zeppelin.search.SearchService;
|
||||
import org.quartz.CronScheduleBuilder;
|
||||
|
|
@ -307,6 +308,8 @@ public class Notebook {
|
|||
}
|
||||
}
|
||||
|
||||
ResourcePoolUtils.removeResourcesBelongsToNote(id);
|
||||
|
||||
try {
|
||||
note.unpersist();
|
||||
} catch (IOException e) {
|
||||
|
|
|
|||
|
|
@ -45,13 +45,20 @@ public class MockInterpreter1 extends Interpreter{
|
|||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
InterpreterResult result;
|
||||
|
||||
if ("getId".equals(st)) {
|
||||
// get unique id of this interpreter instance
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
|
||||
} else {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
|
||||
}
|
||||
|
||||
if (context.getResourcePool() != null) {
|
||||
context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -45,7 +45,19 @@ public class MockInterpreter2 extends Interpreter{
|
|||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: "+st);
|
||||
InterpreterResult result;
|
||||
|
||||
if ("getId".equals(st)) {
|
||||
// get unique id of this interpreter instance
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
|
||||
} else {
|
||||
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
|
||||
}
|
||||
|
||||
if (context.getResourcePool() != null) {
|
||||
context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
|
|||
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
|
||||
import org.apache.zeppelin.notebook.repo.NotebookRepo;
|
||||
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
|
||||
import org.apache.zeppelin.resource.LocalResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourcePoolUtils;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Job.Status;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
|
|
@ -325,6 +327,33 @@ public class NotebookTest implements JobListenerFactory{
|
|||
assertEquals(cp.getResult().message(), p.getResult().message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourceRemovealOnParagraphNoteRemove() throws IOException {
|
||||
Note note = notebook.createNote();
|
||||
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
|
||||
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
|
||||
intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
|
||||
}
|
||||
Paragraph p1 = note.addParagraph();
|
||||
p1.setText("hello");
|
||||
Paragraph p2 = note.addParagraph();
|
||||
p2.setText("%mock2 world");
|
||||
|
||||
note.runAll();
|
||||
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
|
||||
while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield();
|
||||
|
||||
assertEquals(2, ResourcePoolUtils.getAllResources().size());
|
||||
|
||||
// remove a paragraph
|
||||
note.removeParagraph(p1.getId());
|
||||
assertEquals(1, ResourcePoolUtils.getAllResources().size());
|
||||
|
||||
// remove note
|
||||
notebook.removeNote(note.id());
|
||||
assertEquals(0, ResourcePoolUtils.getAllResources().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAngularObjectRemovalOnNotebookRemove() throws InterruptedException,
|
||||
IOException {
|
||||
|
|
|
|||
Loading…
Reference in a new issue