ZEPPELIN-25 save/restore angular object registry snapshot to the notebook file

This commit is contained in:
Lee moon soo 2015-04-05 11:48:24 +09:00
parent c2881982ed
commit 5954e292bf
8 changed files with 141 additions and 24 deletions

View file

@ -43,11 +43,15 @@ public class AngularObjectRegistry implements AngularObjectListener {
}
public AngularObject add(String name, Object o) {
return add(name, o, true);
}
public AngularObject add(String name, Object o, boolean emit) {
AngularObject ao = createNewAngularObject(name, o);
synchronized (registry) {
registry.put(name, ao);
if (listener != null) {
if (listener != null && emit) {
listener.onAdd(interpreterId, ao);
}
}

View file

@ -32,8 +32,12 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
AngularObjectRegistry angularObjectRegistry;
public InterpreterGroup(String id) {
this.id = id;
}
public InterpreterGroup() {
this.id = getId();
getId();
}
private static String generateId() {
@ -50,7 +54,6 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
}
}
public Properties getProperty() {
Properties p = new Properties();
for (Interpreter intp : this) {

View file

@ -226,6 +226,9 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
Client client = null;
try {
client = getClient();
} catch (NullPointerException e) {
// remote process not started
return;
} catch (Exception e) {
logger.error("Can't update angular object", e);
}

View file

@ -597,6 +597,7 @@ public class NotebookServer extends WebSocketServer implements
@Override
public void onUpdate(String interpreterGroupId, AngularObject object) {
Notebook notebook = notebook();
List<Note> notes = notebook.getAllNotes();
for (Note note : notes) {
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()

View file

@ -226,17 +226,20 @@ public class InterpreterFactory {
// previously created setting should turn this feature on here.
setting.getOption().setRemote(true);
InterpreterGroup interpreterGroup = createInterpreterGroup(
setting.getGroup(),
setting.getOption(),
setting.getProperties());
InterpreterSetting intpSetting = new InterpreterSetting(
setting.id(),
setting.getName(),
setting.getGroup(),
setting.getOption());
InterpreterGroup interpreterGroup = createInterpreterGroup(
setting.id(),
setting.getGroup(),
setting.getOption(),
interpreterGroup);
setting.getProperties());
intpSetting.setInterpreterGroup(interpreterGroup);
interpreterSettings.put(k, intpSetting);
}
@ -329,37 +332,41 @@ public class InterpreterFactory {
InterpreterOption option, Properties properties)
throws InterpreterException, IOException {
synchronized (interpreterSettings) {
InterpreterGroup interpreterGroup = createInterpreterGroup(groupName, option, properties);
InterpreterSetting intpSetting = new InterpreterSetting(
name,
groupName,
option,
interpreterGroup);
interpreterSettings.put(intpSetting.id(), intpSetting);
option);
InterpreterGroup interpreterGroup = createInterpreterGroup(
intpSetting.id(), groupName, option, properties);
intpSetting.setInterpreterGroup(interpreterGroup);
interpreterSettings.put(intpSetting.id(), intpSetting);
saveToFile();
return interpreterGroup;
}
}
private InterpreterGroup createInterpreterGroup(String groupName,
private InterpreterGroup createInterpreterGroup(String id,
String groupName,
InterpreterOption option,
Properties properties)
throws InterpreterException {
AngularObjectRegistry angularObjectRegistry;
InterpreterGroup interpreterGroup = new InterpreterGroup();
InterpreterGroup interpreterGroup = new InterpreterGroup(id);
if (option.isRemote()) {
angularObjectRegistry = new RemoteAngularObjectRegistry(
interpreterGroup.getId(),
id,
angularObjectRegistryListener,
interpreterGroup
);
} else {
angularObjectRegistry = new AngularObjectRegistry(
interpreterGroup.getId(),
id,
angularObjectRegistryListener);
}
@ -506,6 +513,7 @@ public class InterpreterFactory {
intpsetting.setOption(option);
InterpreterGroup interpreterGroup = createInterpreterGroup(
intpsetting.id(),
intpsetting.getGroup(), option, properties);
intpsetting.setInterpreterGroup(interpreterGroup);
saveToFile();
@ -525,6 +533,7 @@ public class InterpreterFactory {
intpsetting.getInterpreterGroup().destroy();
InterpreterGroup interpreterGroup = createInterpreterGroup(
intpsetting.id(),
intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties());
intpsetting.setInterpreterGroup(interpreterGroup);
} else {

View file

@ -36,21 +36,17 @@ public class InterpreterSetting {
public InterpreterSetting(String id, String name,
String group,
InterpreterOption option,
InterpreterGroup interpreterGroup) {
InterpreterOption option) {
this.id = id;
this.name = name;
this.group = group;
this.properties = interpreterGroup.getProperty();
this.option = option;
this.interpreterGroup = interpreterGroup;
}
public InterpreterSetting(String name,
String group,
InterpreterOption option,
InterpreterGroup interpreterGroup) {
this(generateId(), name, group, option, interpreterGroup);
InterpreterOption option) {
this(generateId(), name, group, option);
}
public String id() {

View file

@ -33,7 +33,11 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.notebook.utility.IdHashes;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.JobListener;
@ -54,6 +58,8 @@ public class Note implements Serializable, JobListener {
private String name;
private String id;
Map<String, List<AngularObject>> angularObjects = new HashMap<String, List<AngularObject>>();
private transient NoteInterpreterLoader replLoader;
private transient ZeppelinConfiguration conf;
private transient JobListenerFactory jobListenerFactory;
@ -110,6 +116,10 @@ public class Note implements Serializable, JobListener {
this.conf = conf;
}
public Map<String, List<AngularObject>> getAngularObjects() {
return angularObjects;
}
/**
* Add paragraph last.
*
@ -268,6 +278,21 @@ public class Note implements Serializable, JobListener {
}
}
private void snapshotAngularObjectRegistry() {
angularObjects = new HashMap<String, List<AngularObject>>();
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
if (settings == null || settings.size() == 0) {
return;
}
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getInterpreterGroup();
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
angularObjects.put(intpGroup.getId(), registry.getAll());
}
}
public void persist() throws IOException {
GsonBuilder gsonBuilder = new GsonBuilder();
gsonBuilder.setPrettyPrinting();
@ -283,6 +308,7 @@ public class Note implements Serializable, JobListener {
File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
logger().info("Persist note {} into {}", id, file.getAbsolutePath());
snapshotAngularObjectRegistry();
String json = gson.toJson(this);
FileOutputStream out = new FileOutputStream(file);
out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING)));
@ -320,7 +346,6 @@ public class Note implements Serializable, JobListener {
p.setStatus(Status.ABORT);
}
}
return note;
}

View file

@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@ -29,7 +31,10 @@ import java.util.Map;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -160,6 +165,10 @@ public class Notebook {
if (dirs == null) {
return;
}
Map<String, SnapshotAngularObject> angularObjectSnapshot =
new HashMap<String, SnapshotAngularObject>();
for (File f : dirs) {
boolean isHidden = f.getName().startsWith(".");
if (f.isDirectory() && !isHidden) {
@ -174,12 +183,79 @@ public class Notebook {
jobListenerFactory, quartzSched);
noteInterpreterLoader.setNoteId(note.id());
// restore angular object --------------
Date lastUpdatedDate = new Date(0);
for (Paragraph p : note.getParagraphs()) {
if (p.getDateFinished() != null &&
lastUpdatedDate.before(p.getDateFinished())) {
lastUpdatedDate = p.getDateFinished();
}
}
Map<String, List<AngularObject>> savedObjects = note.getAngularObjects();
if (savedObjects != null) {
for (String intpGroupName : savedObjects.keySet()) {
List<AngularObject> objectList = savedObjects.get(intpGroupName);
for (AngularObject savedObject : objectList) {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName());
if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) {
angularObjectSnapshot.put(
savedObject.getName(),
new SnapshotAngularObject(
intpGroupName,
savedObject,
lastUpdatedDate));
}
}
}
}
synchronized (notes) {
notes.put(note.id(), note);
refreshCron(note.id());
}
}
}
for (String name : angularObjectSnapshot.keySet()) {
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
List<InterpreterSetting> settings = replFactory.get();
for (InterpreterSetting setting : settings) {
InterpreterGroup intpGroup = setting.getInterpreterGroup();
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
if (registry.get(name) == null) {
registry.add(name, snapshot.getAngularObject().get(), false);
}
}
}
}
}
class SnapshotAngularObject {
String intpGroupId;
AngularObject angularObject;
Date lastUpdate;
public SnapshotAngularObject(String intpGroupId,
AngularObject angularObject, Date lastUpdate) {
super();
this.intpGroupId = intpGroupId;
this.angularObject = angularObject;
this.lastUpdate = lastUpdate;
}
public String getIntpGroupId() {
return intpGroupId;
}
public AngularObject getAngularObject() {
return angularObject;
}
public Date getLastUpdate() {
return lastUpdate;
}
}
public List<Note> getAllNotes() {