Merge remote-tracking branch 'origin/master' into livyInterperter

This commit is contained in:
Prabhjyot Singh 2016-03-16 11:28:50 +05:30
commit ca06e916cf
25 changed files with 1787 additions and 48 deletions

View file

@ -24,7 +24,7 @@
# export ZEPPELIN_INTP_JAVA_OPTS # zeppelin interpreter process jvm options. Default = ZEPPELIN_JAVA_OPTS
# export ZEPPELIN_LOG_DIR # Where log files are stored. PWD by default.
# export ZEPPELIN_PID_DIR # The pid files are stored. /tmp by default.
# export ZEPPELIN_PID_DIR # The pid files are stored. ${ZEPPELIN_HOME}/run by default.
# export ZEPPELIN_WAR_TEMPDIR # The location of jetty temporary directory.
# export ZEPPELIN_NOTEBOOK_DIR # Where notebook saved
# export ZEPPELIN_NOTEBOOK_HOMESCREEN # Id of notebook to be displayed in homescreen. ex) 2A94M5J1Z

View file

@ -93,7 +93,7 @@ public class SparkInterpreter extends Interpreter {
getSystemDefault("MASTER", "spark.master", "local[*]"),
"Spark master uri. ex) spark://masterhost:7077")
.add("spark.executor.memory",
getSystemDefault(null, "spark.executor.memory", "512m"),
getSystemDefault(null, "spark.executor.memory", ""),
"Executor memory per worker instance. ex) 512m, 32g")
.add("spark.cores.max",
getSystemDefault(null, "spark.cores.max", ""),

View file

@ -260,7 +260,10 @@ public class RemoteInterpreterEventPoller extends Thread {
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
ByteBuffer res = client.resourceGet(resourceId.getName());
ByteBuffer res = client.resourceGet(
resourceId.getNoteId(),
resourceId.getParagraphId(),
resourceId.getName());
Object o = Resource.deserializeObject(res);
return o;
} catch (Exception e) {

View file

@ -343,12 +343,22 @@ public class RemoteInterpreterServer
}
String interpreterResultMessage = result.message();
InterpreterResult combinedResult;
if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) {
message += interpreterResultMessage;
return new InterpreterResult(result.code(), result.type(), message);
combinedResult = new InterpreterResult(result.code(), result.type(), message);
} else {
return new InterpreterResult(result.code(), outputType, message);
combinedResult = new InterpreterResult(result.code(), outputType, message);
}
// put result into resource pool
context.getResourcePool().put(
context.getNoteId(),
context.getParagraphId(),
WellKnownResourceName.ParagraphResult.toString(),
combinedResult);
return combinedResult;
} finally {
InterpreterContext.remove();
}
@ -651,9 +661,17 @@ public class RemoteInterpreterServer
}
@Override
public ByteBuffer resourceGet(String resourceName) throws TException {
public boolean resourceRemove(String noteId, String paragraphId, String resourceName)
throws TException {
Resource resource = resourcePool.remove(noteId, paragraphId, resourceName);
return resource != null;
}
@Override
public ByteBuffer resourceGet(String noteId, String paragraphId, String resourceName)
throws TException {
logger.debug("Request resourceGet {} from ZeppelinServer", resourceName);
Resource resource = resourcePool.get(resourceName, false);
Resource resource = resourcePool.get(noteId, paragraphId, resourceName, false);
if (resource == null || resource.get() == null || !resource.isSerializable()) {
return ByteBuffer.allocate(0);

View file

@ -16,7 +16,7 @@
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.3)
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
public class RemoteInterpreterContext implements org.apache.thrift.TBase<RemoteInterpreterContext, RemoteInterpreterContext._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterContext> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext");

View file

@ -16,7 +16,7 @@
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.3)
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterEvent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");

View file

@ -16,7 +16,7 @@
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.3)
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated

View file

@ -16,7 +16,7 @@
* limitations under the License.
*/
/**
* Autogenerated by Thrift Compiler (0.9.3)
* Autogenerated by Thrift Compiler (0.9.2)
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
@Generated(value = "Autogenerated by Thrift Compiler (0.9.3)", date = "2016-02-16")
@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-3-4")
public class RemoteInterpreterResult implements org.apache.thrift.TBase<RemoteInterpreterResult, RemoteInterpreterResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteInterpreterResult> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult");

View file

@ -33,6 +33,11 @@ public class DistributedResourcePool extends LocalResourcePool {
return get(name, true);
}
@Override
public Resource get(String noteId, String paragraphId, String name) {
return get(noteId, paragraphId, name, true);
}
/**
* get resource by name.
* @param name
@ -58,6 +63,35 @@ public class DistributedResourcePool extends LocalResourcePool {
}
}
/**
* get resource by name.
* @param name
* @param remote false only return from local resource
* @return null if resource not found.
*/
public Resource get(String noteId, String paragraphId, String name, boolean remote) {
// try local first
Resource resource = super.get(noteId, paragraphId, name);
if (resource != null) {
return resource;
}
if (remote) {
ResourceSet resources = connector.getAllResources()
.filterByNoteId(noteId)
.filterByParagraphId(paragraphId)
.filterByName(name);
if (resources.isEmpty()) {
return null;
} else {
return resources.get(0);
}
} else {
return null;
}
}
@Override
public ResourceSet getAll() {
return getAll(true);

View file

@ -52,6 +52,12 @@ public class LocalResourcePool implements ResourcePool {
return resources.get(resourceId);
}
@Override
public Resource get(String noteId, String paragraphId, String name) {
ResourceId resourceId = new ResourceId(resourcePoolId, noteId, paragraphId, name);
return resources.get(resourceId);
}
@Override
public ResourceSet getAll() {
return new ResourceSet(resources.values());
@ -70,8 +76,21 @@ public class LocalResourcePool implements ResourcePool {
resources.put(resourceId, resource);
}
@Override
public void put(String noteId, String paragraphId, String name, Object object) {
ResourceId resourceId = new ResourceId(resourcePoolId, noteId, paragraphId, name);
Resource resource = new Resource(resourceId, object);
resources.put(resourceId, resource);
}
@Override
public Resource remove(String name) {
return resources.remove(new ResourceId(resourcePoolId, name));
}
@Override
public Resource remove(String noteId, String paragraphId, String name) {
return resources.remove(new ResourceId(resourcePoolId, noteId, paragraphId, name));
}
}

View file

@ -22,9 +22,20 @@ package org.apache.zeppelin.resource;
public class ResourceId {
private final String resourcePoolId;
private final String name;
private final String noteId;
private final String paragraphId;
ResourceId(String resourcePoolId, String name) {
this.resourcePoolId = resourcePoolId;
this.noteId = null;
this.paragraphId = null;
this.name = name;
}
ResourceId(String resourcePoolId, String noteId, String paragraphId, String name) {
this.resourcePoolId = resourcePoolId;
this.noteId = noteId;
this.paragraphId = paragraphId;
this.name = name;
}
@ -36,16 +47,35 @@ public class ResourceId {
return name;
}
public String getNoteId() {
return noteId;
}
public String getParagraphId() {
return paragraphId;
}
@Override
public int hashCode() {
return (resourcePoolId + name).hashCode();
return (resourcePoolId + noteId + paragraphId + name).hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof ResourceId) {
ResourceId r = (ResourceId) o;
return (r.name.equals(name) && r.resourcePoolId.equals(resourcePoolId));
return equals(r.name, name) && equals(r.resourcePoolId, resourcePoolId) &&
equals(r.noteId, noteId) && equals(r.paragraphId, paragraphId);
} else {
return false;
}
}
private boolean equals(String a, String b) {
if (a == null && b == null) {
return true;
} else if (a != null && b != null) {
return a.equals(b);
} else {
return false;
}

View file

@ -33,6 +33,15 @@ public interface ResourcePool {
*/
public Resource get(String name);
/**
* Get resource from name
* @param noteId
* @param paragraphId
* @param name Resource name
* @return null if resource not found
*/
public Resource get(String noteId, String paragraphId, String name);
/**
* Get all resources
* @return
@ -46,10 +55,31 @@ public interface ResourcePool {
*/
public void put(String name, Object object);
/**
* Put an object into resource pool
* Given noteId and paragraphId is identifying resource along with name.
* Object will be automatically removed on related note or paragraph removal.
*
* @param noteId
* @param paragraphId
* @param name
* @param object
*/
public void put(String noteId, String paragraphId, String name, Object object);
/**
* Remove object
* @param name Resource name to remove
* @return removed Resource. null if resource not found
*/
public Resource remove(String name);
/**
* Remove object
* @param noteId
* @param paragraphId
* @param name Resource name to remove
* @return removed Resource. null if resource not found
*/
public Resource remove(String noteId, String paragraphId, String name);
}

View file

@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.resource;
import com.google.gson.Gson;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;
import java.util.List;
/**
* Utilities for ResourcePool
*/
public class ResourcePoolUtils {
static Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolUtils.class);
public static ResourceSet getAllResources() {
return getAllResourcesExcept(null);
}
public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) {
ResourceSet resourceSet = new ResourceSet();
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
if (interpreterGroupExcludsion != null &&
intpGroup.getId().equals(interpreterGroupExcludsion)) {
continue;
}
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
ResourcePool localPool = intpGroup.getResourcePool();
if (localPool != null) {
resourceSet.addAll(localPool.getAll());
}
} else if (remoteInterpreterProcess.isRunning()) {
RemoteInterpreterService.Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
List<String> resourceList = client.resoucePoolGetAll();
Gson gson = new Gson();
for (String res : resourceList) {
resourceSet.add(gson.fromJson(res, Resource.class));
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
broken = true;
} finally {
if (client != null) {
intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
}
}
}
}
return resourceSet;
}
public static void removeResourcesBelongsToNote(String noteId) {
removeResourcesBelongsToParagraph(noteId, null);
}
public static void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
ResourceSet resourceSet = new ResourceSet();
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
ResourcePool localPool = intpGroup.getResourcePool();
if (localPool != null) {
resourceSet.addAll(localPool.getAll());
}
if (noteId != null) {
resourceSet = resourceSet.filterByNoteId(noteId);
}
if (paragraphId != null) {
resourceSet = resourceSet.filterByParagraphId(paragraphId);
}
for (Resource r : resourceSet) {
localPool.remove(
r.getResourceId().getNoteId(),
r.getResourceId().getParagraphId(),
r.getResourceId().getName());
}
} else if (remoteInterpreterProcess.isRunning()) {
RemoteInterpreterService.Client client = null;
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
List<String> resourceList = client.resoucePoolGetAll();
Gson gson = new Gson();
for (String res : resourceList) {
resourceSet.add(gson.fromJson(res, Resource.class));
}
if (noteId != null) {
resourceSet = resourceSet.filterByNoteId(noteId);
}
if (paragraphId != null) {
resourceSet = resourceSet.filterByParagraphId(paragraphId);
}
for (Resource r : resourceSet) {
client.resourceRemove(
r.getResourceId().getNoteId(),
r.getResourceId().getParagraphId(),
r.getResourceId().getName());
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
broken = true;
} finally {
if (client != null) {
intpGroup.getRemoteInterpreterProcess().releaseClient(client, broken);
}
}
}
}
}
}

View file

@ -72,4 +72,34 @@ public class ResourceSet extends LinkedList<Resource> {
}
return result;
}
public ResourceSet filterByNoteId(String noteId) {
ResourceSet result = new ResourceSet();
for (Resource r : this) {
if (equals(r.getResourceId().getNoteId(), noteId)) {
result.add(r);
}
}
return result;
}
public ResourceSet filterByParagraphId(String paragraphId) {
ResourceSet result = new ResourceSet();
for (Resource r : this) {
if (equals(r.getResourceId().getParagraphId(), paragraphId)) {
result.add(r);
}
}
return result;
}
private boolean equals(String a, String b) {
if (a == null && b == null) {
return true;
} else if (a != null && b != null) {
return a.equals(b);
} else {
return false;
}
}
}

View file

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.resource;
/**
* Well known resource names in ResourcePool
*/
public enum WellKnownResourceName {
ParagraphResult("zeppelin.paragraph.result"); // paragraph run result
String name;
WellKnownResourceName(String name) {
this.name = name;
}
public String toString() {
return name;
}
}

View file

@ -78,7 +78,9 @@ service RemoteInterpreterService {
// get all resources in the interpreter process
list<string> resoucePoolGetAll();
// get value of resource
binary resourceGet(1: string resourceName);
binary resourceGet(1: string noteId, 2: string paragraphId, 3: string resourceName);
// remove resource
bool resourceRemove(1: string noteId, 2: string paragraphId, 3:string resourceName);
void angularObjectUpdate(1: string name, 2: string noteId, 3: string paragraphId, 4: string
object);

View file

@ -29,6 +29,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
public class MockInterpreterResourcePool extends Interpreter {
@ -61,9 +62,18 @@ public class MockInterpreterResourcePool extends Interpreter {
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] stmt = st.split(" ");
String cmd = stmt[0];
String noteId = null;
String paragraphId = null;
String name = null;
if (stmt.length >= 2) {
name = stmt[1];
String[] npn = stmt[1].split(":");
if (npn.length == 3) {
noteId = npn[0];
paragraphId = npn[1];
name = npn[2];
} else {
name = stmt[1];
}
}
String value = null;
if (stmt.length == 3) {
@ -73,11 +83,16 @@ public class MockInterpreterResourcePool extends Interpreter {
ResourcePool resourcePool = context.getResourcePool();
Object ret = null;
if (cmd.equals("put")) {
resourcePool.put(name, value);
resourcePool.put(noteId, paragraphId, name, value);
} else if (cmd.equalsIgnoreCase("get")) {
ret = resourcePool.get(name).get();
Resource resource = resourcePool.get(noteId, paragraphId, name);
if (resource != null) {
ret = resourcePool.get(noteId, paragraphId, name).get();
} else {
ret = "";
}
} else if (cmd.equals("remove")) {
ret = resourcePool.remove(name);
ret = resourcePool.remove(noteId, paragraphId, name);
} else if (cmd.equals("getAll")) {
ret = resourcePool.getAll();
}

View file

@ -136,12 +136,13 @@ public class DistributedResourcePoolTest {
InterpreterResult ret;
intp1.interpret("put key1 value1", context);
intp2.interpret("put key2 value2", context);
int numInterpreterResult = 2;
ret = intp1.interpret("getAll", context);
assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size());
ret = intp2.interpret("getAll", context);
assertEquals(2, gson.fromJson(ret.message(), ResourceSet.class).size());
assertEquals(numInterpreterResult + 2, gson.fromJson(ret.message(), ResourceSet.class).size());
ret = intp1.interpret("get key1", context);
assertEquals("value1", gson.fromJson(ret.message(), String.class));
@ -201,4 +202,44 @@ public class DistributedResourcePoolTest {
assertEquals("value1", pool1.getAll().get(0).get());
assertEquals("value2", pool1.getAll().get(1).get());
}
@Test
public void testResourcePoolUtils() {
Gson gson = new Gson();
InterpreterResult ret;
// when create some resources
intp1.interpret("put note1:paragraph1:key1 value1", context);
intp1.interpret("put note1:paragraph2:key1 value2", context);
intp2.interpret("put note2:paragraph1:key1 value1", context);
intp2.interpret("put note2:paragraph2:key2 value2", context);
int numInterpreterResult = 2;
// then get all resources.
assertEquals(numInterpreterResult + 4, ResourcePoolUtils.getAllResources().size());
// when remove all resources from note1
ResourcePoolUtils.removeResourcesBelongsToNote("note1");
// then resources should be removed.
assertEquals(numInterpreterResult + 2, ResourcePoolUtils.getAllResources().size());
assertEquals("", gson.fromJson(
intp1.interpret("get note1:paragraph1:key1", context).message(),
String.class));
assertEquals("", gson.fromJson(
intp1.interpret("get note1:paragraph2:key1", context).message(),
String.class));
// when remove all resources from note2:paragraph1
ResourcePoolUtils.removeResourcesBelongsToParagraph("note2", "paragraph1");
// then 1
assertEquals(numInterpreterResult + 1, ResourcePoolUtils.getAllResources().size());
assertEquals("value2", gson.fromJson(
intp1.interpret("get note2:paragraph2:key2", context).message(),
String.class));
}
}

View file

@ -434,6 +434,8 @@ public class InterpreterFactory {
angularObjectRegistry = new AngularObjectRegistry(
id,
angularObjectRegistryListener);
// TODO(moon) : create distributed resource pool for local interpreters and set
}
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);

View file

@ -32,6 +32,7 @@ import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@ -208,6 +209,8 @@ public class Note implements Serializable, JobListener {
* @return a paragraph that was deleted, or <code>null</code> otherwise
*/
public Paragraph removeParagraph(String paragraphId) {
removeAllAngularObjectInParagraph(paragraphId);
ResourcePoolUtils.removeResourcesBelongsToParagraph(id(), paragraphId);
synchronized (paragraphs) {
Iterator<Paragraph> i = paragraphs.iterator();
while (i.hasNext()) {
@ -220,7 +223,7 @@ public class Note implements Serializable, JobListener {
}
}
removeAllAngularObjectInParagraph(paragraphId);
return null;
}

View file

@ -40,6 +40,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.apache.zeppelin.search.SearchService;
import org.quartz.CronScheduleBuilder;
@ -307,6 +308,8 @@ public class Notebook {
}
}
ResourcePoolUtils.removeResourcesBelongsToNote(id);
try {
note.unpersist();
} catch (IOException e) {

View file

@ -45,13 +45,20 @@ public class MockInterpreter1 extends Interpreter{
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
InterpreterResult result;
if ("getId".equals(st)) {
// get unique id of this interpreter instance
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
} else {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl1: " + st);
}
if (context.getResourcePool() != null) {
context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
}
return result;
}
@Override

View file

@ -45,7 +45,19 @@ public class MockInterpreter2 extends Interpreter{
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: "+st);
InterpreterResult result;
if ("getId".equals(st)) {
// get unique id of this interpreter instance
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "" + this.hashCode());
} else {
result = new InterpreterResult(InterpreterResult.Code.SUCCESS, "repl2: " + st);
}
if (context.getResourcePool() != null) {
context.getResourcePool().put(context.getNoteId(), context.getParagraphId(), "result", result);
}
return result;
}
@Override

View file

@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
import org.apache.zeppelin.resource.LocalResourcePool;
import org.apache.zeppelin.resource.ResourcePoolUtils;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Job.Status;
import org.apache.zeppelin.scheduler.JobListener;
@ -325,6 +327,33 @@ public class NotebookTest implements JobListenerFactory{
assertEquals(cp.getResult().message(), p.getResult().message());
}
@Test
public void testResourceRemovealOnParagraphNoteRemove() throws IOException {
Note note = notebook.createNote();
note.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
for (InterpreterGroup intpGroup : InterpreterGroup.getAll()) {
intpGroup.setResourcePool(new LocalResourcePool(intpGroup.getId()));
}
Paragraph p1 = note.addParagraph();
p1.setText("hello");
Paragraph p2 = note.addParagraph();
p2.setText("%mock2 world");
note.runAll();
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
while(p2.isTerminated()==false || p2.getResult()==null) Thread.yield();
assertEquals(2, ResourcePoolUtils.getAllResources().size());
// remove a paragraph
note.removeParagraph(p1.getId());
assertEquals(1, ResourcePoolUtils.getAllResources().size());
// remove note
notebook.removeNote(note.id());
assertEquals(0, ResourcePoolUtils.getAllResources().size());
}
@Test
public void testAngularObjectRemovalOnNotebookRemove() throws InterruptedException,
IOException {