Unload app on paragraph / note removal as well as interpreter unbind

This commit is contained in:
Lee moon soo 2016-04-03 12:14:23 +09:00
parent 4eaeea7205
commit 7aeb64addf
11 changed files with 304 additions and 48 deletions

View file

@ -61,7 +61,6 @@ public class InterpreterOutput extends OutputStream {
public void setType(InterpreterResult.Type type) {
if (this.type != type) {
clear();
flushListener.onUpdate(this, new byte[]{});
this.type = type;
}
}
@ -74,6 +73,8 @@ public class InterpreterOutput extends OutputStream {
if (watcher != null) {
watcher.clear();
}
flushListener.onUpdate(this, new byte[]{});
}
}

View file

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import javax.net.ssl.SSLContext;
import javax.servlet.DispatcherType;
@ -101,6 +102,8 @@ public class ZeppelinServer extends Application {
heliumApplicationFactory.setNotebook(notebook);
// to update fire websocket event on application event.
heliumApplicationFactory.setApplicationEventListener(notebookWsServer);
notebook.addNotebookEventListener(heliumApplicationFactory);
}
public static void main(String[] args) throws InterruptedException {

View file

@ -20,48 +20,40 @@ import com.google.gson.Gson;
import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.notebook.ApplicationState;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.scheduler.ExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.annotation.ExceptionProxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* HeliumApplicationFactory
*
* 1. sync api -> async api
* 2. unload app when paragraph / note / interpreter remove
* 3. front-end job
* 4. example app
* 5. dev mode
* 6. app launcher
* 7. offline mode. front-end table data / pivot panel access
*/
public class HeliumApplicationFactory implements ApplicationEventListener {
public class HeliumApplicationFactory implements ApplicationEventListener, NotebookEventListener {
private final Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class);
private final ExecutorService executor;
private final Gson gson;
private final Gson gson = new Gson();
private Notebook notebook;
private ApplicationEventListener applicationEventListener;
public HeliumApplicationFactory(ExecutorService executor) {
gson = new Gson();
this.executor = executor;
public HeliumApplicationFactory() {
executor = ExecutorFactory.singleton().createOrGet(
HeliumApplicationFactory.class.getName(), 10);
}
private boolean isRemote(InterpreterGroup group) {
return group.getAngularObjectRegistry() instanceof RemoteAngularObjectRegistry;
}
@ -325,21 +317,6 @@ public class HeliumApplicationFactory implements ApplicationEventListener {
}
}
public void unloadAllInTheInterpreterProcess(RemoteInterpreterProcess process) {
// TODO
}
public void unloadAllInTheNote(Note note) {
// TODO
}
public void unloadAllInTheParagraph(Paragraph paragraph) {
// TODO
}
@Override
public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId);
@ -406,4 +383,40 @@ public class HeliumApplicationFactory implements ApplicationEventListener {
public void setApplicationEventListener(ApplicationEventListener applicationEventListener) {
this.applicationEventListener = applicationEventListener;
}
@Override
public void onNoteRemove(Note note) {
}
@Override
public void onNoteCreate(Note note) {
}
@Override
public void onUnbindInterpreter(Note note, InterpreterSetting setting) {
for (Paragraph p : note.getParagraphs()) {
Interpreter currentInterpreter = p.getCurrentRepl();
List<InterpreterSetting.InterpreterInfo> infos = setting.getInterpreterInfos();
for (InterpreterSetting.InterpreterInfo info : infos) {
if (info.getClassName().equals(currentInterpreter.getClassName())) {
onParagraphRemove(p);
break;
}
}
}
}
@Override
public void onParagraphRemove(Paragraph paragraph) {
List<ApplicationState> appStates = paragraph.getAllApplicationStates();
for (ApplicationState app : appStates) {
unload(paragraph, app.getId());
}
}
@Override
public void onParagraphCreate(Paragraph p) {
}
}

View file

@ -69,6 +69,7 @@ public class Note implements Serializable, JobListener {
private transient NotebookRepo repo;
private transient SearchService index;
private transient ScheduledFuture delayedPersist;
private transient NoteEventListener noteEventListener;
/**
* note configurations.
@ -88,11 +89,12 @@ public class Note implements Serializable, JobListener {
public Note() {}
public Note(NotebookRepo repo, NoteInterpreterLoader replLoader,
JobListenerFactory jlFactory, SearchService noteIndex) {
JobListenerFactory jlFactory, SearchService noteIndex, NoteEventListener noteEventListener) {
this.repo = repo;
this.replLoader = replLoader;
this.jobListenerFactory = jlFactory;
this.index = noteIndex;
this.noteEventListener = noteEventListener;
generateId();
}
@ -158,6 +160,9 @@ public class Note implements Serializable, JobListener {
synchronized (paragraphs) {
paragraphs.add(p);
}
if (noteEventListener != null) {
noteEventListener.onParagraphCreate(p);
}
return p;
}
@ -187,6 +192,9 @@ public class Note implements Serializable, JobListener {
synchronized (paragraphs) {
paragraphs.add(newParagraph);
}
if (noteEventListener != null) {
noteEventListener.onParagraphCreate(newParagraph);
}
}
/**
@ -199,6 +207,9 @@ public class Note implements Serializable, JobListener {
synchronized (paragraphs) {
paragraphs.add(index, p);
}
if (noteEventListener != null) {
noteEventListener.onParagraphCreate(p);
}
return p;
}
@ -218,12 +229,14 @@ public class Note implements Serializable, JobListener {
if (p.getId().equals(paragraphId)) {
index.deleteIndexDoc(this, p);
i.remove();
if (noteEventListener != null) {
noteEventListener.onParagraphRemove(p);
}
return p;
}
}
}
return null;
}
@ -513,4 +526,13 @@ public class Note implements Serializable, JobListener {
@Override
public void onProgressUpdate(Job job, int progress) {}
public NoteEventListener getNoteEventListener() {
return noteEventListener;
}
public void setNoteEventListener(NoteEventListener noteEventListener) {
this.noteEventListener = noteEventListener;
}
}

View file

@ -0,0 +1,9 @@
package org.apache.zeppelin.notebook;
/**
* NoteEventListener
*/
public interface NoteEventListener {
public void onParagraphRemove(Paragraph p);
public void onParagraphCreate(Paragraph p);
}

View file

@ -34,7 +34,6 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
@ -63,7 +62,7 @@ import com.google.gson.stream.JsonReader;
/**
* Collection of Notes.
*/
public class Notebook {
public class Notebook implements NoteEventListener {
static Logger logger = LoggerFactory.getLogger(Notebook.class);
@SuppressWarnings("unused") @Deprecated //TODO(bzz): remove unused
@ -79,6 +78,8 @@ public class Notebook {
private NotebookRepo notebookRepo;
private SearchService notebookIndex;
private NotebookAuthorization notebookAuthorization;
private final List<NotebookEventListener> notebookEventListeners =
Collections.synchronizedList(new LinkedList<NotebookEventListener>());
/**
* Main constructor \w manual Dependency Injection
@ -147,7 +148,7 @@ public class Notebook {
*/
public Note createNote(List<String> interpreterIds) throws IOException {
NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory);
Note note = new Note(notebookRepo, intpLoader, jobListenerFactory, notebookIndex);
Note note = new Note(notebookRepo, intpLoader, jobListenerFactory, notebookIndex, this);
intpLoader.setNoteId(note.id());
synchronized (notes) {
notes.put(note.id(), note);
@ -158,6 +159,7 @@ public class Notebook {
notebookIndex.addIndexDoc(note);
note.persist();
fireNoteCreateEvent(note);
return note;
}
@ -209,7 +211,7 @@ public class Notebook {
logger.error(e.toString(), e);
throw e;
}
return newNote;
}
@ -246,10 +248,17 @@ public class Notebook {
}
public void bindInterpretersToNote(String id,
List<String> interpreterSettingIds) throws IOException {
List<String> newBindings) throws IOException {
Note note = getNote(id);
if (note != null) {
note.getNoteReplLoader().setInterpreters(interpreterSettingIds);
List<InterpreterSetting> currentBindings = note.getNoteReplLoader().getInterpreterSettings();
for (InterpreterSetting setting : currentBindings) {
if (!newBindings.contains(setting.id())) {
fireUnbindInterpreter(note, setting);
}
}
note.getNoteReplLoader().setInterpreters(newBindings);
// comment out while note.getNoteReplLoader().setInterpreters(...) do the same
// replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds);
}
@ -311,6 +320,8 @@ public class Notebook {
ResourcePoolUtils.removeResourcesBelongsToNote(id);
fireNoteRemoveEvent(note);
try {
note.unpersist();
} catch (IOException e) {
@ -372,6 +383,8 @@ public class Notebook {
}
}
note.setNoteEventListener(this);
synchronized (notes) {
notes.put(note.id(), note);
refreshCron(note.id());
@ -395,6 +408,7 @@ public class Notebook {
}
}
}
return note;
}
@ -597,4 +611,39 @@ public class Notebook {
this.notebookIndex.close();
}
public void addNotebookEventListener(NotebookEventListener listener) {
notebookEventListeners.add(listener);
}
private void fireNoteCreateEvent(Note note) {
for (NotebookEventListener listener : notebookEventListeners) {
listener.onNoteCreate(note);
}
}
private void fireNoteRemoveEvent(Note note) {
for (NotebookEventListener listener : notebookEventListeners) {
listener.onNoteRemove(note);
}
}
private void fireUnbindInterpreter(Note note, InterpreterSetting setting) {
for (NotebookEventListener listener : notebookEventListeners) {
listener.onUnbindInterpreter(note, setting);
}
}
@Override
public void onParagraphRemove(Paragraph p) {
for (NotebookEventListener listener : notebookEventListeners) {
listener.onParagraphRemove(p);
}
}
@Override
public void onParagraphCreate(Paragraph p) {
for (NotebookEventListener listener : notebookEventListeners) {
listener.onParagraphCreate(p);
}
}
}

View file

@ -0,0 +1,13 @@
package org.apache.zeppelin.notebook;
import org.apache.zeppelin.interpreter.InterpreterSetting;
/**
* Notebook event
*/
public interface NotebookEventListener extends NoteEventListener {
public void onNoteRemove(Note note);
public void onNoteCreate(Note note);
public void onUnbindInterpreter(Note note, InterpreterSetting setting);
}

View file

@ -37,6 +37,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
@ -74,8 +75,7 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
ExecutorService executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
heliumAppFactory = new HeliumApplicationFactory(executor);
heliumAppFactory = new HeliumApplicationFactory();
depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
factory = new InterpreterFactory(conf,
@ -97,6 +97,8 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
notebookAuthorization);
heliumAppFactory.setNotebook(notebook);
notebook.addNotebookEventListener(heliumAppFactory);
}
@After
@ -133,16 +135,100 @@ public class HeliumApplicationFactoryTest implements JobListenerFactory {
String appId = heliumAppFactory.loadAndRun(pkg1, p1);
assertEquals(1, p1.getAllApplicationStates().size());
ApplicationState app = p1.getApplicationState(appId);
Thread.sleep(1000); // wait for enough time
Thread.sleep(500); // wait for enough time
// then
assertEquals("Hello world", app.getOutput());
assertEquals("Hello world 1", app.getOutput());
// when
heliumAppFactory.run(p1, appId);
Thread.sleep(500); // wait for enough time
// then
assertEquals("Hello world 2", app.getOutput());
// clean
heliumAppFactory.unload(p1, appId);
notebook.removeNote(note1.getId());
}
@Test
public void testUnloadOnParagraphRemove() throws IOException {
// given
HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION,
"name1",
"desc1",
"",
HeliumTestApplication.class.getName(),
new String[][]{});
Note note1 = notebook.createNote();
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addParagraph();
// make sure interpreter process running
p1.setText("job");
note1.run(p1.getId());
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
assertEquals(0, p1.getAllApplicationStates().size());
String appId = heliumAppFactory.loadAndRun(pkg1, p1);
ApplicationState app = p1.getApplicationState(appId);
while (app.getStatus() != ApplicationState.Status.LOADED) {
Thread.yield();
}
// when remove paragraph
note1.removeParagraph(p1.getId());
while (app.getStatus() != ApplicationState.Status.UNLOADED) {
Thread.yield();
}
// then
assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
// clean
notebook.removeNote(note1.getId());
}
@Test
public void testUnloadOnInterpreterUnbind() throws IOException {
// given
HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION,
"name1",
"desc1",
"",
HeliumTestApplication.class.getName(),
new String[][]{});
Note note1 = notebook.createNote();
note1.getNoteReplLoader().setInterpreters(factory.getDefaultInterpreterSettingList());
Paragraph p1 = note1.addParagraph();
// make sure interpreter process running
p1.setText("job");
note1.run(p1.getId());
while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield();
assertEquals(0, p1.getAllApplicationStates().size());
String appId = heliumAppFactory.loadAndRun(pkg1, p1);
ApplicationState app = p1.getApplicationState(appId);
while (app.getStatus() != ApplicationState.Status.LOADED) {
Thread.yield();
}
// when unbind interpreter
note1.getNoteReplLoader().setInterpreters(new LinkedList<String>());
// then
assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
// clean
notebook.removeNote(note1.getId());
}
@Override
public ParagraphJobListener getParagraphJobListener(Note note) {

View file

@ -19,8 +19,10 @@ package org.apache.zeppelin.helium;
import org.apache.zeppelin.resource.ResourceSet;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
public class HeliumTestApplication extends Application {
AtomicInteger numRun = new AtomicInteger(0);
public HeliumTestApplication(ResourceSet args, ApplicationContext context)
throws ApplicationException {
super(args, context);
@ -29,7 +31,8 @@ public class HeliumTestApplication extends Application {
@Override
public void run() throws ApplicationException {
try {
context().out.write("Hello world");
context().out.clear();
context().out.write("Hello world " + numRun.incrementAndGet());
context().out.flush();
} catch (IOException e) {
throw new ApplicationException(e);

View file

@ -27,6 +27,7 @@ import static org.mockito.Mockito.mock;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -625,6 +626,62 @@ public class NotebookTest implements JobListenerFactory{
notebook.removeNote(note1.getId());
}
@Test
public void testNotebookEventListener() throws IOException {
final AtomicInteger onNoteRemove = new AtomicInteger(0);
final AtomicInteger onNoteCreate = new AtomicInteger(0);
final AtomicInteger onParagraphRemove = new AtomicInteger(0);
final AtomicInteger onParagraphCreate = new AtomicInteger(0);
final AtomicInteger unbindInterpreter = new AtomicInteger(0);
notebook.addNotebookEventListener(new NotebookEventListener() {
@Override
public void onNoteRemove(Note note) {
onNoteRemove.incrementAndGet();
}
@Override
public void onNoteCreate(Note note) {
onNoteCreate.incrementAndGet();
}
@Override
public void onUnbindInterpreter(Note note, InterpreterSetting setting) {
unbindInterpreter.incrementAndGet();
}
@Override
public void onParagraphRemove(Paragraph p) {
onParagraphRemove.incrementAndGet();
}
@Override
public void onParagraphCreate(Paragraph p) {
onParagraphCreate.incrementAndGet();
}
});
Note note1 = notebook.createNote();
assertEquals(1, onNoteCreate.get());
Paragraph p1 = note1.addParagraph();
assertEquals(1, onParagraphCreate.get());
note1.addCloneParagraph(p1);
assertEquals(2, onParagraphCreate.get());
note1.removeParagraph(p1.getId());
assertEquals(1, onParagraphRemove.get());
List<String> settings = notebook.getBindedInterpreterSettingsIds(note1.id());
notebook.bindInterpretersToNote(note1.id(), new LinkedList<String>());
assertEquals(settings.size(), unbindInterpreter.get());
notebook.removeNote(note1.getId());
assertEquals(1, onNoteRemove.get());
assertEquals(1, onParagraphRemove.get());
}
private void delete(File file){
if(file.isFile()) file.delete();

View file

@ -251,7 +251,7 @@ public class LuceneSearchTest {
}
private Note newNote(String name) {
Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex);
Note note = new Note(notebookRepoMock, replLoaderMock, null, notebookIndex, null);
note.setName(name);
return note;
}