ZEPPELIN-25 implement watcher

This commit is contained in:
Lee moon soo 2015-04-05 17:29:37 +09:00
parent 6df7f23e7c
commit 6ce8f364d2
8 changed files with 226 additions and 23 deletions

View file

@ -462,6 +462,16 @@ public class SparkInterpreter extends Interpreter {
}
}
}
// add some implicit conversion
intp.interpret("implicit def watcherConversion(watcher: (Object, Object) => Unit):"
+ "com.nflabs.zeppelin.display.AngularObjectWatcher = {"
+ " new com.nflabs.zeppelin.display.AngularObjectWatcher() {"
+ " def watch(before:Object, after:Object) = {"
+ " watcher(before, after)"
+ " }"
+ " }"
+ "}");
}
private List<File> currentClassPath() {

View file

@ -29,6 +29,9 @@ import java.util.Iterator;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import com.nflabs.zeppelin.display.AngularObject;
import com.nflabs.zeppelin.display.AngularObjectRegistry;
import com.nflabs.zeppelin.display.AngularObjectWatcher;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input.ParamOption;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -251,22 +254,58 @@ public class ZeppelinContext extends HashMap<String, Object> {
this.interpreterContext = interpreterContext;
}
/*
public void angularBind(String name, sObject o) {
public Object angular(String name) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
AngularObject ao = registry.get(name);
if (ao == null) {
return null;
} else {
return ao.get();
}
}
public void angularBind(String name, sObject o, sWatcher w) {
public void angularBind(String name, Object o) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name) == null) {
registry.add(name, o);
} else {
registry.get(name).set(o);
}
}
public void angularBind(String name, Function f) {
public void angularBind(String name, Object o, AngularObjectWatcher w) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name) == null) {
registry.add(name, o);
} else {
registry.get(name).set(o);
}
angularWatch(name, w);
}
public void angularUnbind(sString name) {
public void angularWatch(String name, AngularObjectWatcher w) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name) != null) {
registry.get(name).addWatcher(w);
}
}
public void angularUnwatch(String name, AngularObjectWatcher w) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name) != null) {
registry.get(name).removeWatcher(w);
}
}
public void angularUnwatch(String name) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
if (registry.get(name) != null) {
registry.get(name).clearAllWatchers();
}
}
public void angularUnbind(String name) {
AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry();
registry.remove(name);
}
*/
}

View file

@ -17,7 +17,15 @@
package com.nflabs.zeppelin.display;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nflabs.zeppelin.scheduler.ExecutorFactory;
/**
*
@ -38,6 +46,8 @@ public class AngularObject<T> {
private String name;
private T object;
private transient AngularObjectListener listener;
private transient List<AngularObjectWatcher> watchers
= new LinkedList<AngularObjectWatcher>();
private AngularObjectType type;
protected AngularObject(String name, T o,
@ -91,10 +101,28 @@ public class AngularObject<T> {
}
public void set(T o, boolean emit) {
final T before = object;
final T after = o;
object = o;
if (emit) {
emit();
}
List<AngularObjectWatcher> ws = new LinkedList<AngularObjectWatcher>();
synchronized (watchers) {
ws.addAll(watchers);
}
ExecutorService executor = ExecutorFactory.singleton().createOrGet("angularObjectWatcher", 50);
for (final AngularObjectWatcher w : ws) {
Logger logger = LoggerFactory.getLogger(AngularObject.class);
executor.submit(new Runnable() {
@Override
public void run() {
w.watch(before, after);
}
});
}
}
public void setListener(AngularObjectListener listener) {
@ -104,4 +132,23 @@ public class AngularObject<T> {
public AngularObjectListener getListener() {
return listener;
}
public void addWatcher(AngularObjectWatcher watcher) {
synchronized (watchers) {
watchers.add(watcher);
}
}
public void removeWatcher(AngularObjectWatcher watcher) {
synchronized (watchers) {
watchers.remove(watcher);
}
}
public void clearAllWatchers() {
synchronized (watchers) {
watchers.clear();
}
}
}

View file

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.display;
/**
*
*/
public interface AngularObjectWatcher {
public void watch(Object oldObject, Object newObject);
}

View file

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.nflabs.zeppelin.scheduler;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
*
*/
public class ExecutorFactory {
private static ExecutorFactory _executor;
private static Long _executorLock = new Long(0);
Map<String, ExecutorService> executor = new HashMap<String, ExecutorService>();
public ExecutorFactory() {
}
public static ExecutorFactory singleton() {
if (_executor == null) {
synchronized (_executorLock) {
if (_executor == null) {
_executor = new ExecutorFactory();
}
}
}
return _executor;
}
public ExecutorService getDefaultExecutor() {
return createOrGet("default");
}
public ExecutorService createOrGet(String name) {
return createOrGet(name, 100);
}
public ExecutorService createOrGet(String name, int numThread) {
synchronized (executor) {
if (!executor.containsKey(name)) {
executor.put(name, Executors.newScheduledThreadPool(numThread));
}
return executor.get(name);
}
}
public void shutdown(String name) {
synchronized (executor) {
if (executor.containsKey(name)) {
ExecutorService e = executor.get(name);
e.shutdown();
executor.remove(name);
}
}
}
public void shutdownAll() {
synchronized (executor) {
for (String name : executor.keySet()){
shutdown(name);
}
}
}
}

View file

@ -22,8 +22,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ExecutorService;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.slf4j.Logger;
@ -37,7 +36,7 @@ import org.slf4j.LoggerFactory;
*/
public class SchedulerFactory implements SchedulerListener {
private final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);
ScheduledExecutorService executor;
ExecutorService executor;
Map<String, Scheduler> schedulers = new LinkedHashMap<String, Scheduler>();
private static SchedulerFactory singleton;
@ -59,11 +58,11 @@ public class SchedulerFactory implements SchedulerListener {
}
public SchedulerFactory() throws Exception {
executor = Executors.newScheduledThreadPool(100);
executor = ExecutorFactory.singleton().createOrGet("schedulerFactory", 100);
}
public void destroy() {
executor.shutdown();
ExecutorFactory.singleton().shutdown("schedulerFactory");
}
public Scheduler createOrGetFIFOScheduler(String name) {

View file

@ -241,12 +241,14 @@ public class NotebookServer extends WebSocketServer implements
}
private void broadcast(String noteId, Message m) {
LOG.info("SEND >> " + m.op);
synchronized (noteSocketMap) {
List<WebSocket> socketLists = noteSocketMap.get(noteId);
if (socketLists == null || socketLists.size() == 0) {
return;
}
LOG.info("SEND >> " + m.op);
for (WebSocket conn : socketLists) {
conn.send(serializeMessage(m));
}
@ -563,7 +565,7 @@ public class NotebookServer extends WebSocketServer implements
private void sendAllAngularObjects(Note note, WebSocket conn) {
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
if (settings == null || settings.size() ==0) {
if (settings == null || settings.size() == 0) {
return;
}
@ -572,9 +574,9 @@ public class NotebookServer extends WebSocketServer implements
List<AngularObject> objects = registry.getAll();
for (AngularObject object : objects) {
conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", object)
.put("interpreterGroupId", intpSetting.getInterpreterGroup().getId())
.put("noteId", note.id())));
.put("angularObject", object)
.put("interpreterGroupId", intpSetting.getInterpreterGroup().getId())
.put("noteId", note.id())));
}
}
}
@ -596,7 +598,6 @@ public class NotebookServer extends WebSocketServer implements
if (intpSettings.isEmpty()) continue;
for (InterpreterSetting setting : intpSettings) {
if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) {
broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
.put("angularObject", object)

View file

@ -261,7 +261,6 @@ public class Notebook {
public List<Note> getAllNotes() {
synchronized (notes) {
List<Note> noteList = new ArrayList<Note>(notes.values());
logger.info("" + noteList.size());
Collections.sort(noteList, new Comparator() {
@Override
public int compare(Object one, Object two) {