mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-25 save/restore angular object registry snapshot to the notebook file
This commit is contained in:
parent
c2881982ed
commit
5954e292bf
8 changed files with 141 additions and 24 deletions
|
|
@ -43,11 +43,15 @@ public class AngularObjectRegistry implements AngularObjectListener {
|
|||
}
|
||||
|
||||
public AngularObject add(String name, Object o) {
|
||||
return add(name, o, true);
|
||||
}
|
||||
|
||||
public AngularObject add(String name, Object o, boolean emit) {
|
||||
AngularObject ao = createNewAngularObject(name, o);
|
||||
|
||||
synchronized (registry) {
|
||||
registry.put(name, ao);
|
||||
if (listener != null) {
|
||||
if (listener != null && emit) {
|
||||
listener.onAdd(interpreterId, ao);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,8 +32,12 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
public InterpreterGroup(String id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public InterpreterGroup() {
|
||||
this.id = getId();
|
||||
getId();
|
||||
}
|
||||
|
||||
private static String generateId() {
|
||||
|
|
@ -50,7 +54,6 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public Properties getProperty() {
|
||||
Properties p = new Properties();
|
||||
for (Interpreter intp : this) {
|
||||
|
|
|
|||
|
|
@ -226,6 +226,9 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
Client client = null;
|
||||
try {
|
||||
client = getClient();
|
||||
} catch (NullPointerException e) {
|
||||
// remote process not started
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
logger.error("Can't update angular object", e);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -597,6 +597,7 @@ public class NotebookServer extends WebSocketServer implements
|
|||
@Override
|
||||
public void onUpdate(String interpreterGroupId, AngularObject object) {
|
||||
Notebook notebook = notebook();
|
||||
|
||||
List<Note> notes = notebook.getAllNotes();
|
||||
for (Note note : notes) {
|
||||
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
|
||||
|
|
|
|||
|
|
@ -226,17 +226,20 @@ public class InterpreterFactory {
|
|||
// previously created setting should turn this feature on here.
|
||||
setting.getOption().setRemote(true);
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
setting.getGroup(),
|
||||
setting.getOption(),
|
||||
setting.getProperties());
|
||||
|
||||
|
||||
InterpreterSetting intpSetting = new InterpreterSetting(
|
||||
setting.id(),
|
||||
setting.getName(),
|
||||
setting.getGroup(),
|
||||
setting.getOption());
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
setting.id(),
|
||||
setting.getGroup(),
|
||||
setting.getOption(),
|
||||
interpreterGroup);
|
||||
setting.getProperties());
|
||||
intpSetting.setInterpreterGroup(interpreterGroup);
|
||||
|
||||
interpreterSettings.put(k, intpSetting);
|
||||
}
|
||||
|
|
@ -329,37 +332,41 @@ public class InterpreterFactory {
|
|||
InterpreterOption option, Properties properties)
|
||||
throws InterpreterException, IOException {
|
||||
synchronized (interpreterSettings) {
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(groupName, option, properties);
|
||||
|
||||
InterpreterSetting intpSetting = new InterpreterSetting(
|
||||
name,
|
||||
groupName,
|
||||
option,
|
||||
interpreterGroup);
|
||||
interpreterSettings.put(intpSetting.id(), intpSetting);
|
||||
option);
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
intpSetting.id(), groupName, option, properties);
|
||||
|
||||
intpSetting.setInterpreterGroup(interpreterGroup);
|
||||
|
||||
interpreterSettings.put(intpSetting.id(), intpSetting);
|
||||
saveToFile();
|
||||
return interpreterGroup;
|
||||
}
|
||||
}
|
||||
|
||||
private InterpreterGroup createInterpreterGroup(String groupName,
|
||||
private InterpreterGroup createInterpreterGroup(String id,
|
||||
String groupName,
|
||||
InterpreterOption option,
|
||||
Properties properties)
|
||||
throws InterpreterException {
|
||||
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup();
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup(id);
|
||||
if (option.isRemote()) {
|
||||
angularObjectRegistry = new RemoteAngularObjectRegistry(
|
||||
interpreterGroup.getId(),
|
||||
id,
|
||||
angularObjectRegistryListener,
|
||||
interpreterGroup
|
||||
);
|
||||
} else {
|
||||
angularObjectRegistry = new AngularObjectRegistry(
|
||||
interpreterGroup.getId(),
|
||||
id,
|
||||
angularObjectRegistryListener);
|
||||
}
|
||||
|
||||
|
|
@ -506,6 +513,7 @@ public class InterpreterFactory {
|
|||
intpsetting.setOption(option);
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
intpsetting.id(),
|
||||
intpsetting.getGroup(), option, properties);
|
||||
intpsetting.setInterpreterGroup(interpreterGroup);
|
||||
saveToFile();
|
||||
|
|
@ -525,6 +533,7 @@ public class InterpreterFactory {
|
|||
intpsetting.getInterpreterGroup().destroy();
|
||||
|
||||
InterpreterGroup interpreterGroup = createInterpreterGroup(
|
||||
intpsetting.id(),
|
||||
intpsetting.getGroup(), intpsetting.getOption(), intpsetting.getProperties());
|
||||
intpsetting.setInterpreterGroup(interpreterGroup);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -36,21 +36,17 @@ public class InterpreterSetting {
|
|||
|
||||
public InterpreterSetting(String id, String name,
|
||||
String group,
|
||||
InterpreterOption option,
|
||||
InterpreterGroup interpreterGroup) {
|
||||
InterpreterOption option) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
this.group = group;
|
||||
this.properties = interpreterGroup.getProperty();
|
||||
this.option = option;
|
||||
this.interpreterGroup = interpreterGroup;
|
||||
}
|
||||
|
||||
public InterpreterSetting(String name,
|
||||
String group,
|
||||
InterpreterOption option,
|
||||
InterpreterGroup interpreterGroup) {
|
||||
this(generateId(), name, group, option, interpreterGroup);
|
||||
InterpreterOption option) {
|
||||
this(generateId(), name, group, option);
|
||||
}
|
||||
|
||||
public String id() {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,11 @@ import org.apache.commons.io.FileUtils;
|
|||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.notebook.utility.IdHashes;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
|
|
@ -54,6 +58,8 @@ public class Note implements Serializable, JobListener {
|
|||
private String name;
|
||||
private String id;
|
||||
|
||||
Map<String, List<AngularObject>> angularObjects = new HashMap<String, List<AngularObject>>();
|
||||
|
||||
private transient NoteInterpreterLoader replLoader;
|
||||
private transient ZeppelinConfiguration conf;
|
||||
private transient JobListenerFactory jobListenerFactory;
|
||||
|
|
@ -110,6 +116,10 @@ public class Note implements Serializable, JobListener {
|
|||
this.conf = conf;
|
||||
}
|
||||
|
||||
public Map<String, List<AngularObject>> getAngularObjects() {
|
||||
return angularObjects;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add paragraph last.
|
||||
*
|
||||
|
|
@ -268,6 +278,21 @@ public class Note implements Serializable, JobListener {
|
|||
}
|
||||
}
|
||||
|
||||
private void snapshotAngularObjectRegistry() {
|
||||
angularObjects = new HashMap<String, List<AngularObject>>();
|
||||
|
||||
List<InterpreterSetting> settings = replLoader.getInterpreterSettings();
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup();
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
angularObjects.put(intpGroup.getId(), registry.getAll());
|
||||
}
|
||||
}
|
||||
|
||||
public void persist() throws IOException {
|
||||
GsonBuilder gsonBuilder = new GsonBuilder();
|
||||
gsonBuilder.setPrettyPrinting();
|
||||
|
|
@ -283,6 +308,7 @@ public class Note implements Serializable, JobListener {
|
|||
File file = new File(conf.getNotebookDir() + "/" + id + "/note.json");
|
||||
logger().info("Persist note {} into {}", id, file.getAbsolutePath());
|
||||
|
||||
snapshotAngularObjectRegistry();
|
||||
String json = gson.toJson(this);
|
||||
FileOutputStream out = new FileOutputStream(file);
|
||||
out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING)));
|
||||
|
|
@ -320,7 +346,6 @@ public class Note implements Serializable, JobListener {
|
|||
p.setStatus(Status.ABORT);
|
||||
}
|
||||
}
|
||||
|
||||
return note;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
|
@ -29,7 +31,10 @@ import java.util.Map;
|
|||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.InterpreterFactory;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -160,6 +165,10 @@ public class Notebook {
|
|||
if (dirs == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<String, SnapshotAngularObject> angularObjectSnapshot =
|
||||
new HashMap<String, SnapshotAngularObject>();
|
||||
|
||||
for (File f : dirs) {
|
||||
boolean isHidden = f.getName().startsWith(".");
|
||||
if (f.isDirectory() && !isHidden) {
|
||||
|
|
@ -174,12 +183,79 @@ public class Notebook {
|
|||
jobListenerFactory, quartzSched);
|
||||
noteInterpreterLoader.setNoteId(note.id());
|
||||
|
||||
// restore angular object --------------
|
||||
Date lastUpdatedDate = new Date(0);
|
||||
for (Paragraph p : note.getParagraphs()) {
|
||||
if (p.getDateFinished() != null &&
|
||||
lastUpdatedDate.before(p.getDateFinished())) {
|
||||
lastUpdatedDate = p.getDateFinished();
|
||||
}
|
||||
}
|
||||
|
||||
Map<String, List<AngularObject>> savedObjects = note.getAngularObjects();
|
||||
|
||||
if (savedObjects != null) {
|
||||
for (String intpGroupName : savedObjects.keySet()) {
|
||||
List<AngularObject> objectList = savedObjects.get(intpGroupName);
|
||||
|
||||
for (AngularObject savedObject : objectList) {
|
||||
SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName());
|
||||
if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) {
|
||||
angularObjectSnapshot.put(
|
||||
savedObject.getName(),
|
||||
new SnapshotAngularObject(
|
||||
intpGroupName,
|
||||
savedObject,
|
||||
lastUpdatedDate));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (notes) {
|
||||
notes.put(note.id(), note);
|
||||
refreshCron(note.id());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (String name : angularObjectSnapshot.keySet()) {
|
||||
SnapshotAngularObject snapshot = angularObjectSnapshot.get(name);
|
||||
List<InterpreterSetting> settings = replFactory.get();
|
||||
for (InterpreterSetting setting : settings) {
|
||||
InterpreterGroup intpGroup = setting.getInterpreterGroup();
|
||||
if (intpGroup.getId().equals(snapshot.getIntpGroupId())) {
|
||||
AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry();
|
||||
if (registry.get(name) == null) {
|
||||
registry.add(name, snapshot.getAngularObject().get(), false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class SnapshotAngularObject {
|
||||
String intpGroupId;
|
||||
AngularObject angularObject;
|
||||
Date lastUpdate;
|
||||
|
||||
public SnapshotAngularObject(String intpGroupId,
|
||||
AngularObject angularObject, Date lastUpdate) {
|
||||
super();
|
||||
this.intpGroupId = intpGroupId;
|
||||
this.angularObject = angularObject;
|
||||
this.lastUpdate = lastUpdate;
|
||||
}
|
||||
|
||||
public String getIntpGroupId() {
|
||||
return intpGroupId;
|
||||
}
|
||||
public AngularObject getAngularObject() {
|
||||
return angularObject;
|
||||
}
|
||||
public Date getLastUpdate() {
|
||||
return lastUpdate;
|
||||
}
|
||||
}
|
||||
|
||||
public List<Note> getAllNotes() {
|
||||
|
|
|
|||
Loading…
Reference in a new issue