mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-25 implement watcher
This commit is contained in:
parent
6df7f23e7c
commit
6ce8f364d2
8 changed files with 226 additions and 23 deletions
|
|
@ -462,6 +462,16 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// add some implicit conversion
|
||||
intp.interpret("implicit def watcherConversion(watcher: (Object, Object) => Unit):"
|
||||
+ "com.nflabs.zeppelin.display.AngularObjectWatcher = {"
|
||||
+ " new com.nflabs.zeppelin.display.AngularObjectWatcher() {"
|
||||
+ " def watch(before:Object, after:Object) = {"
|
||||
+ " watcher(before, after)"
|
||||
+ " }"
|
||||
+ " }"
|
||||
+ "}");
|
||||
}
|
||||
|
||||
private List<File> currentClassPath() {
|
||||
|
|
|
|||
|
|
@ -29,6 +29,9 @@ import java.util.Iterator;
|
|||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.spark.sql.hive.HiveContext;
|
||||
import com.nflabs.zeppelin.display.AngularObject;
|
||||
import com.nflabs.zeppelin.display.AngularObjectRegistry;
|
||||
import com.nflabs.zeppelin.display.AngularObjectWatcher;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input.ParamOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
|
|
@ -251,22 +254,58 @@ public class ZeppelinContext extends HashMap<String, Object> {
|
|||
this.interpreterContext = interpreterContext;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
public void angularBind(String name, sObject o) {
|
||||
|
||||
public Object angular(String name) {
|
||||
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
|
||||
AngularObject ao = registry.get(name);
|
||||
if (ao == null) {
|
||||
return null;
|
||||
} else {
|
||||
return ao.get();
|
||||
}
|
||||
}
|
||||
|
||||
public void angularBind(String name, sObject o, sWatcher w) {
|
||||
|
||||
public void angularBind(String name, Object o) {
|
||||
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
|
||||
if (registry.get(name) == null) {
|
||||
registry.add(name, o);
|
||||
} else {
|
||||
registry.get(name).set(o);
|
||||
}
|
||||
}
|
||||
|
||||
public void angularBind(String name, Function f) {
|
||||
|
||||
public void angularBind(String name, Object o, AngularObjectWatcher w) {
|
||||
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
|
||||
if (registry.get(name) == null) {
|
||||
registry.add(name, o);
|
||||
} else {
|
||||
registry.get(name).set(o);
|
||||
}
|
||||
angularWatch(name, w);
|
||||
}
|
||||
|
||||
public void angularUnbind(sString name) {
|
||||
|
||||
public void angularWatch(String name, AngularObjectWatcher w) {
|
||||
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
|
||||
if (registry.get(name) != null) {
|
||||
registry.get(name).addWatcher(w);
|
||||
}
|
||||
}
|
||||
|
||||
public void angularUnwatch(String name, AngularObjectWatcher w) {
|
||||
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
|
||||
if (registry.get(name) != null) {
|
||||
registry.get(name).removeWatcher(w);
|
||||
}
|
||||
}
|
||||
|
||||
public void angularUnwatch(String name) {
|
||||
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
|
||||
if (registry.get(name) != null) {
|
||||
registry.get(name).clearAllWatchers();
|
||||
}
|
||||
}
|
||||
|
||||
public void angularUnbind(String name) {
|
||||
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
|
||||
registry.remove(name);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,15 @@
|
|||
|
||||
package com.nflabs.zeppelin.display;
|
||||
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.nflabs.zeppelin.scheduler.ExecutorFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
@ -38,6 +46,8 @@ public class AngularObject<T> {
|
|||
private String name;
|
||||
private T object;
|
||||
private transient AngularObjectListener listener;
|
||||
private transient List<AngularObjectWatcher> watchers
|
||||
= new LinkedList<AngularObjectWatcher>();
|
||||
private AngularObjectType type;
|
||||
|
||||
protected AngularObject(String name, T o,
|
||||
|
|
@ -91,10 +101,28 @@ public class AngularObject<T> {
|
|||
}
|
||||
|
||||
public void set(T o, boolean emit) {
|
||||
final T before = object;
|
||||
final T after = o;
|
||||
object = o;
|
||||
if (emit) {
|
||||
emit();
|
||||
}
|
||||
|
||||
List<AngularObjectWatcher> ws = new LinkedList<AngularObjectWatcher>();
|
||||
synchronized (watchers) {
|
||||
ws.addAll(watchers);
|
||||
}
|
||||
|
||||
ExecutorService executor = ExecutorFactory.singleton().createOrGet("angularObjectWatcher", 50);
|
||||
for (final AngularObjectWatcher w : ws) {
|
||||
Logger logger = LoggerFactory.getLogger(AngularObject.class);
|
||||
executor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
w.watch(before, after);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public void setListener(AngularObjectListener listener) {
|
||||
|
|
@ -104,4 +132,23 @@ public class AngularObject<T> {
|
|||
public AngularObjectListener getListener() {
|
||||
return listener;
|
||||
}
|
||||
|
||||
public void addWatcher(AngularObjectWatcher watcher) {
|
||||
synchronized (watchers) {
|
||||
watchers.add(watcher);
|
||||
}
|
||||
}
|
||||
|
||||
public void removeWatcher(AngularObjectWatcher watcher) {
|
||||
synchronized (watchers) {
|
||||
watchers.remove(watcher);
|
||||
}
|
||||
}
|
||||
|
||||
public void clearAllWatchers() {
|
||||
synchronized (watchers) {
|
||||
watchers.clear();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.display;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface AngularObjectWatcher {
|
||||
public void watch(Object oldObject, Object newObject);
|
||||
}
|
||||
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.nflabs.zeppelin.scheduler;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class ExecutorFactory {
|
||||
private static ExecutorFactory _executor;
|
||||
private static Long _executorLock = new Long(0);
|
||||
|
||||
Map<String, ExecutorService> executor = new HashMap<String, ExecutorService>();
|
||||
|
||||
public ExecutorFactory() {
|
||||
|
||||
}
|
||||
|
||||
public static ExecutorFactory singleton() {
|
||||
if (_executor == null) {
|
||||
synchronized (_executorLock) {
|
||||
if (_executor == null) {
|
||||
_executor = new ExecutorFactory();
|
||||
}
|
||||
}
|
||||
}
|
||||
return _executor;
|
||||
}
|
||||
|
||||
public ExecutorService getDefaultExecutor() {
|
||||
return createOrGet("default");
|
||||
}
|
||||
|
||||
public ExecutorService createOrGet(String name) {
|
||||
return createOrGet(name, 100);
|
||||
}
|
||||
|
||||
public ExecutorService createOrGet(String name, int numThread) {
|
||||
synchronized (executor) {
|
||||
if (!executor.containsKey(name)) {
|
||||
executor.put(name, Executors.newScheduledThreadPool(numThread));
|
||||
}
|
||||
return executor.get(name);
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown(String name) {
|
||||
synchronized (executor) {
|
||||
if (executor.containsKey(name)) {
|
||||
ExecutorService e = executor.get(name);
|
||||
e.shutdown();
|
||||
executor.remove(name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void shutdownAll() {
|
||||
synchronized (executor) {
|
||||
for (String name : executor.keySet()){
|
||||
shutdown(name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -22,8 +22,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -37,7 +36,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class SchedulerFactory implements SchedulerListener {
|
||||
private final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
|
||||
ScheduledExecutorService executor;
|
||||
ExecutorService executor;
|
||||
Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>();
|
||||
|
||||
private static SchedulerFactory singleton;
|
||||
|
|
@ -59,11 +58,11 @@ public class SchedulerFactory implements SchedulerListener {
|
|||
}
|
||||
|
||||
public SchedulerFactory() throws Exception {
|
||||
executor = Executors.newScheduledThreadPool(100);
|
||||
executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
|
||||
}
|
||||
|
||||
public void destroy() {
|
||||
executor.shutdown();
|
||||
ExecutorFactory.singleton().shutdown("schedulerFactory");
|
||||
}
|
||||
|
||||
public Scheduler createOrGetFIFOScheduler(String name) {
|
||||
|
|
|
|||
|
|
@ -241,12 +241,14 @@ public class NotebookServer extends WebSocketServer implements
|
|||
}
|
||||
|
||||
private void broadcast(String noteId, Message m) {
|
||||
LOG.info("SEND >> " + m.op);
|
||||
synchronized (noteSocketMap) {
|
||||
List<WebSocket> socketLists = noteSocketMap.get(noteId);
|
||||
if (socketLists == null || socketLists.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG.info("SEND >> " + m.op);
|
||||
|
||||
for (WebSocket conn : socketLists) {
|
||||
conn.send(serializeMessage(m));
|
||||
}
|
||||
|
|
@ -563,7 +565,7 @@ public class NotebookServer extends WebSocketServer implements
|
|||
|
||||
private void sendAllAngularObjects(Note note, WebSocket conn) {
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
|
||||
if (settings == null || settings.size() ==0) {
|
||||
if (settings == null || settings.size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -572,9 +574,9 @@ public class NotebookServer extends WebSocketServer implements
|
|||
List<AngularObject> objects = registry.getAll();
|
||||
for (AngularObject object : objects) {
|
||||
conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", object)
|
||||
.put("interpreterGroupId", intpSetting.getInterpreterGroup().getId())
|
||||
.put("noteId", note.id())));
|
||||
.put("angularObject", object)
|
||||
.put("interpreterGroupId", intpSetting.getInterpreterGroup().getId())
|
||||
.put("noteId", note.id())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -596,7 +598,6 @@ public class NotebookServer extends WebSocketServer implements
|
|||
if (intpSettings.isEmpty()) continue;
|
||||
|
||||
for (InterpreterSetting setting : intpSettings) {
|
||||
|
||||
if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) {
|
||||
broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", object)
|
||||
|
|
|
|||
|
|
@ -261,7 +261,6 @@ public class Notebook {
|
|||
public List<Note> getAllNotes() {
|
||||
synchronized (notes) {
|
||||
List<Note> noteList = new ArrayList<Note>(notes.values());
|
||||
logger.info("" + noteList.size());
|
||||
Collections.sort(noteList, new Comparator() {
|
||||
@Override
|
||||
public int compare(Object one, Object two) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue