mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-25 impelemnet JS(angular) -JVM(scala) two-way binding
This commit is contained in:
parent
bb52d7bf39
commit
67f6926c1f
34 changed files with 2915 additions and 37 deletions
|
|
@ -251,4 +251,22 @@ public class ZeppelinContext extends HashMap<String, Object> {
|
|||
this.interpreterContext = interpreterContext;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
public void angularBind(String name, sObject o) {
|
||||
|
||||
}
|
||||
|
||||
public void angularBind(String name, sObject o, sWatcher w) {
|
||||
|
||||
}
|
||||
|
||||
public void angularBind(String name, Function f) {
|
||||
|
||||
}
|
||||
|
||||
public void angularUnbind(sString name) {
|
||||
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import java.io.File;
|
|||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
|
|
@ -57,7 +58,8 @@ public class DepInterpreterTest {
|
|||
intpGroup.add(dep);
|
||||
dep.setInterpreterGroup(intpGroup);
|
||||
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null));
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
|
|
@ -24,8 +24,10 @@ import java.io.File;
|
|||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.spark.SparkInterpreter;
|
||||
|
|
@ -55,7 +57,9 @@ public class SparkInterpreterTest {
|
|||
repl.open();
|
||||
}
|
||||
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null));
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
|
|
@ -38,6 +39,7 @@ public class SparkSqlInterpreterTest {
|
|||
private SparkSqlInterpreter sql;
|
||||
private SparkInterpreter repl;
|
||||
private InterpreterContext context;
|
||||
private InterpreterGroup intpGroup;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
@ -55,13 +57,14 @@ public class SparkSqlInterpreterTest {
|
|||
|
||||
sql = new SparkSqlInterpreter(p);
|
||||
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
intpGroup = new InterpreterGroup();
|
||||
intpGroup.add(repl);
|
||||
intpGroup.add(sql);
|
||||
sql.setInterpreterGroup(intpGroup);
|
||||
sql.open();
|
||||
}
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI());
|
||||
context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null));
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
|
|||
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.display;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
public class AngularObject<T> {
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public static enum AngularObjectType {
|
||||
STRING,
|
||||
MAP,
|
||||
FUNCTION
|
||||
};
|
||||
|
||||
private String name;
|
||||
private T object;
|
||||
private transient AngularObjectListener listener;
|
||||
private AngularObjectType type;
|
||||
|
||||
protected AngularObject(String name, T o,
|
||||
AngularObjectListener listener) {
|
||||
this.name = name;
|
||||
this.listener = listener;
|
||||
object = o;
|
||||
type = checkType();
|
||||
}
|
||||
|
||||
public AngularObjectType getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public AngularObjectType checkType() {
|
||||
if (object == null) {
|
||||
return AngularObjectType.STRING;
|
||||
} else if (object instanceof Map) {
|
||||
return AngularObjectType.MAP;
|
||||
} else if (object instanceof String) {
|
||||
return AngularObjectType.STRING;
|
||||
}
|
||||
return AngularObjectType.STRING;
|
||||
}
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o instanceof AngularObject) {
|
||||
return name.equals(((AngularObject) o).name);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public Object get() {
|
||||
return object;
|
||||
}
|
||||
|
||||
public void emit(){
|
||||
if (listener != null) {
|
||||
listener.updated(this);
|
||||
}
|
||||
}
|
||||
|
||||
public void set(T o) {
|
||||
set(o, true);
|
||||
}
|
||||
|
||||
public void set(T o, boolean emit) {
|
||||
object = o;
|
||||
if (emit) {
|
||||
emit();
|
||||
}
|
||||
}
|
||||
|
||||
public void setListener(AngularObjectListener listener) {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public AngularObjectListener getListener() {
|
||||
return listener;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.display;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public interface AngularObjectListener {
|
||||
public void updated(AngularObject updatedObject);
|
||||
}
|
||||
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.display;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class AngularObjectRegistry implements AngularObjectListener {
|
||||
Map<String, AngularObject> registry = new HashMap<String, AngularObject>();
|
||||
private AngularObjectRegistryListener listener;
|
||||
private String interpreterId;
|
||||
|
||||
public AngularObjectRegistry(String interpreterId,
|
||||
AngularObjectRegistryListener listener) {
|
||||
this.interpreterId = interpreterId;
|
||||
this.listener = listener;
|
||||
|
||||
}
|
||||
|
||||
public AngularObjectRegistryListener getListener() {
|
||||
return listener;
|
||||
}
|
||||
|
||||
public AngularObject add(String name, Object o) {
|
||||
AngularObject ao = createNewAngularObject(name, o);
|
||||
|
||||
synchronized (registry) {
|
||||
registry.put(name, ao);
|
||||
if (listener != null) {
|
||||
listener.onAdd(interpreterId, ao);
|
||||
}
|
||||
}
|
||||
|
||||
return ao;
|
||||
}
|
||||
|
||||
protected AngularObject createNewAngularObject(String name, Object o) {
|
||||
return new AngularObject(name, o, this);
|
||||
}
|
||||
|
||||
public AngularObject remove(String name) {
|
||||
synchronized (registry) {
|
||||
AngularObject o = registry.remove(name);
|
||||
if (listener != null) {
|
||||
listener.onRemove(interpreterId, o);;
|
||||
}
|
||||
return o;
|
||||
}
|
||||
}
|
||||
|
||||
public AngularObject get(String name) {
|
||||
synchronized (registry) {
|
||||
return registry.get(name);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updated(AngularObject updatedObject) {
|
||||
if (listener != null) {
|
||||
listener.onUpdate(interpreterId, updatedObject);
|
||||
}
|
||||
}
|
||||
|
||||
public String getInterpreterGroupId() {
|
||||
return interpreterId;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.display;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
*/
|
||||
public interface AngularObjectRegistryListener {
|
||||
public void onAdd(String interpreterGroupId, AngularObject object);
|
||||
public void onUpdate(String interpreterGroupId, AngularObject object);
|
||||
public void onRemove(String interpreterGroupId, AngularObject object);
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.interpreter.remote;
|
||||
|
||||
import com.nflabs.zeppelin.display.AngularObject;
|
||||
import com.nflabs.zeppelin.display.AngularObjectListener;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class RemoteAngularObject extends AngularObject {
|
||||
|
||||
private transient RemoteInterpreterProcess remoteInterpreterProcess;
|
||||
|
||||
RemoteAngularObject(String name, Object o, String interpreterGroupId,
|
||||
AngularObjectListener listener,
|
||||
RemoteInterpreterProcess remoteInterpreterProcess) {
|
||||
super(name, o, listener);
|
||||
this.remoteInterpreterProcess = remoteInterpreterProcess;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void set(Object o, boolean emit) {
|
||||
super.set(o, emit);
|
||||
|
||||
// send updated value to remote interpreter
|
||||
remoteInterpreterProcess.updateRemoteAngularObject(getName(), o);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.interpreter.remote;
|
||||
|
||||
import com.nflabs.zeppelin.display.AngularObject;
|
||||
import com.nflabs.zeppelin.display.AngularObjectRegistry;
|
||||
import com.nflabs.zeppelin.display.AngularObjectRegistryListener;
|
||||
import com.nflabs.zeppelin.interpreter.Interpreter;
|
||||
import com.nflabs.zeppelin.interpreter.InterpreterGroup;
|
||||
import com.nflabs.zeppelin.interpreter.WrappedInterpreter;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
|
||||
|
||||
private InterpreterGroup interpreterGroup;
|
||||
|
||||
public RemoteAngularObjectRegistry(String interpreterId,
|
||||
AngularObjectRegistryListener listener,
|
||||
InterpreterGroup interpreterGroup) {
|
||||
super(interpreterId, listener);
|
||||
this.interpreterGroup = interpreterGroup;
|
||||
}
|
||||
|
||||
private RemoteInterpreterProcess getRemoteInterpreterProcess() {
|
||||
if (interpreterGroup.size() == 0) {
|
||||
throw new RuntimeException("Can't get remoteInterpreterProcess");
|
||||
}
|
||||
Interpreter p = interpreterGroup.get(0);
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
|
||||
if (p instanceof RemoteInterpreter) {
|
||||
return ((RemoteInterpreter) p).getInterpreterProcess();
|
||||
} else {
|
||||
throw new RuntimeException("Can't get remoteInterpreterProcess");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected AngularObject createNewAngularObject(String name, Object o) {
|
||||
RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess();
|
||||
if (remoteInterpreterProcess == null) {
|
||||
throw new RuntimeException("Remote Interpreter process not found");
|
||||
}
|
||||
return new RemoteAngularObject(name, o, getInterpreterGroupId(), this,
|
||||
getRemoteInterpreterProcess());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,113 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.thrift.TException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.nflabs.zeppelin.display.AngularObject;
|
||||
import com.nflabs.zeppelin.display.AngularObjectRegistry;
|
||||
import com.nflabs.zeppelin.interpreter.InterpreterGroup;
|
||||
import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
|
||||
import com.nflabs.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class RemoteInterpreterEventPoller extends Thread {
|
||||
Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
|
||||
private RemoteInterpreterProcess interpreterProcess;
|
||||
boolean shutdown;
|
||||
private InterpreterGroup interpreterGroup;
|
||||
|
||||
public RemoteInterpreterEventPoller(
|
||||
InterpreterGroup interpreterGroup,
|
||||
RemoteInterpreterProcess interpreterProcess) {
|
||||
this.interpreterGroup = interpreterGroup;
|
||||
this.interpreterProcess = interpreterProcess;
|
||||
shutdown = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Client client = null;
|
||||
|
||||
while (shutdown == false) {
|
||||
try {
|
||||
client = interpreterProcess.getClient();
|
||||
} catch (Exception e1) {
|
||||
logger.error("Can't get RemoteInterpreterEvent", e1);
|
||||
try {
|
||||
synchronized (this) {
|
||||
wait(1000);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
RemoteInterpreterEvent event = null;
|
||||
try {
|
||||
event = client.getEvent();
|
||||
} catch (TException e) {
|
||||
logger.error("Can't get RemoteInterpreterEvent", e);
|
||||
try {
|
||||
synchronized (this) {
|
||||
wait(1000);
|
||||
}
|
||||
} catch (InterruptedException e1) {
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
interpreterProcess.releaseClient(client);
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
|
||||
|
||||
try {
|
||||
if (event.getType() == RemoteInterpreterEventType.NO_OP) {
|
||||
continue;
|
||||
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
|
||||
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
|
||||
angularObjectRegistry.add(angularObject.getName(), angularObject.get());
|
||||
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
|
||||
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
|
||||
AngularObject localAngularObject = angularObjectRegistry.get(angularObject.getName());
|
||||
localAngularObject.set(angularObject.get());
|
||||
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
|
||||
AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
|
||||
angularObjectRegistry.remove(angularObject.getName());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("Can't handle event " + event, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
shutdown = true;
|
||||
synchronized (this) {
|
||||
notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,502 @@
|
|||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.0)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
*/
|
||||
package com.nflabs.zeppelin.interpreter.thrift;
|
||||
|
||||
import org.apache.thrift.scheme.IScheme;
|
||||
import org.apache.thrift.scheme.SchemeFactory;
|
||||
import org.apache.thrift.scheme.StandardScheme;
|
||||
|
||||
import org.apache.thrift.scheme.TupleScheme;
|
||||
import org.apache.thrift.protocol.TTupleProtocol;
|
||||
import org.apache.thrift.protocol.TProtocolException;
|
||||
import org.apache.thrift.EncodingUtils;
|
||||
import org.apache.thrift.TException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.EnumMap;
|
||||
import java.util.Set;
|
||||
import java.util.HashSet;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Collections;
|
||||
import java.util.BitSet;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class RemoteInterpreterEvent implements org.apache.thrift.TBase<RemoteInterpreterEvent, RemoteInterpreterEvent._Fields>, java.io.Serializable, Cloneable {
|
||||
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent");
|
||||
|
||||
private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)1);
|
||||
private static final org.apache.thrift.protocol.TField DATA_FIELD_DESC = new org.apache.thrift.protocol.TField("data", org.apache.thrift.protocol.TType.STRING, (short)2);
|
||||
|
||||
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
|
||||
static {
|
||||
schemes.put(StandardScheme.class, new RemoteInterpreterEventStandardSchemeFactory());
|
||||
schemes.put(TupleScheme.class, new RemoteInterpreterEventTupleSchemeFactory());
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @see RemoteInterpreterEventType
|
||||
*/
|
||||
public RemoteInterpreterEventType type; // required
|
||||
public String data; // required
|
||||
|
||||
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
|
||||
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
|
||||
/**
|
||||
*
|
||||
* @see RemoteInterpreterEventType
|
||||
*/
|
||||
TYPE((short)1, "type"),
|
||||
DATA((short)2, "data");
|
||||
|
||||
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
|
||||
|
||||
static {
|
||||
for (_Fields field : EnumSet.allOf(_Fields.class)) {
|
||||
byName.put(field.getFieldName(), field);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByThriftId(int fieldId) {
|
||||
switch(fieldId) {
|
||||
case 1: // TYPE
|
||||
return TYPE;
|
||||
case 2: // DATA
|
||||
return DATA;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches fieldId, throwing an exception
|
||||
* if it is not found.
|
||||
*/
|
||||
public static _Fields findByThriftIdOrThrow(int fieldId) {
|
||||
_Fields fields = findByThriftId(fieldId);
|
||||
if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
|
||||
return fields;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the _Fields constant that matches name, or null if its not found.
|
||||
*/
|
||||
public static _Fields findByName(String name) {
|
||||
return byName.get(name);
|
||||
}
|
||||
|
||||
private final short _thriftId;
|
||||
private final String _fieldName;
|
||||
|
||||
_Fields(short thriftId, String fieldName) {
|
||||
_thriftId = thriftId;
|
||||
_fieldName = fieldName;
|
||||
}
|
||||
|
||||
public short getThriftFieldId() {
|
||||
return _thriftId;
|
||||
}
|
||||
|
||||
public String getFieldName() {
|
||||
return _fieldName;
|
||||
}
|
||||
}
|
||||
|
||||
// isset id assignments
|
||||
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
|
||||
static {
|
||||
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
|
||||
tmpMap.put(_Fields.TYPE, new org.apache.thrift.meta_data.FieldMetaData("type", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, RemoteInterpreterEventType.class)));
|
||||
tmpMap.put(_Fields.DATA, new org.apache.thrift.meta_data.FieldMetaData("data", org.apache.thrift.TFieldRequirementType.DEFAULT,
|
||||
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
|
||||
metaDataMap = Collections.unmodifiableMap(tmpMap);
|
||||
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteInterpreterEvent.class, metaDataMap);
|
||||
}
|
||||
|
||||
public RemoteInterpreterEvent() {
|
||||
}
|
||||
|
||||
public RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType type,
|
||||
String data)
|
||||
{
|
||||
this();
|
||||
this.type = type;
|
||||
this.data = data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs a deep copy on <i>other</i>.
|
||||
*/
|
||||
public RemoteInterpreterEvent(RemoteInterpreterEvent other) {
|
||||
if (other.isSetType()) {
|
||||
this.type = other.type;
|
||||
}
|
||||
if (other.isSetData()) {
|
||||
this.data = other.data;
|
||||
}
|
||||
}
|
||||
|
||||
public RemoteInterpreterEvent deepCopy() {
|
||||
return new RemoteInterpreterEvent(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear() {
|
||||
this.type = null;
|
||||
this.data = null;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @see RemoteInterpreterEventType
|
||||
*/
|
||||
public RemoteInterpreterEventType getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @see RemoteInterpreterEventType
|
||||
*/
|
||||
public RemoteInterpreterEvent setType(RemoteInterpreterEventType type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetType() {
|
||||
this.type = null;
|
||||
}
|
||||
|
||||
/** Returns true if field type is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetType() {
|
||||
return this.type != null;
|
||||
}
|
||||
|
||||
public void setTypeIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.type = null;
|
||||
}
|
||||
}
|
||||
|
||||
public String getData() {
|
||||
return this.data;
|
||||
}
|
||||
|
||||
public RemoteInterpreterEvent setData(String data) {
|
||||
this.data = data;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void unsetData() {
|
||||
this.data = null;
|
||||
}
|
||||
|
||||
/** Returns true if field data is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSetData() {
|
||||
return this.data != null;
|
||||
}
|
||||
|
||||
public void setDataIsSet(boolean value) {
|
||||
if (!value) {
|
||||
this.data = null;
|
||||
}
|
||||
}
|
||||
|
||||
public void setFieldValue(_Fields field, Object value) {
|
||||
switch (field) {
|
||||
case TYPE:
|
||||
if (value == null) {
|
||||
unsetType();
|
||||
} else {
|
||||
setType((RemoteInterpreterEventType)value);
|
||||
}
|
||||
break;
|
||||
|
||||
case DATA:
|
||||
if (value == null) {
|
||||
unsetData();
|
||||
} else {
|
||||
setData((String)value);
|
||||
}
|
||||
break;
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
public Object getFieldValue(_Fields field) {
|
||||
switch (field) {
|
||||
case TYPE:
|
||||
return getType();
|
||||
|
||||
case DATA:
|
||||
return getData();
|
||||
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
/** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
|
||||
public boolean isSet(_Fields field) {
|
||||
if (field == null) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
switch (field) {
|
||||
case TYPE:
|
||||
return isSetType();
|
||||
case DATA:
|
||||
return isSetData();
|
||||
}
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
if (that instanceof RemoteInterpreterEvent)
|
||||
return this.equals((RemoteInterpreterEvent)that);
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean equals(RemoteInterpreterEvent that) {
|
||||
if (that == null)
|
||||
return false;
|
||||
|
||||
boolean this_present_type = true && this.isSetType();
|
||||
boolean that_present_type = true && that.isSetType();
|
||||
if (this_present_type || that_present_type) {
|
||||
if (!(this_present_type && that_present_type))
|
||||
return false;
|
||||
if (!this.type.equals(that.type))
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean this_present_data = true && this.isSetData();
|
||||
boolean that_present_data = true && that.isSetData();
|
||||
if (this_present_data || that_present_data) {
|
||||
if (!(this_present_data && that_present_data))
|
||||
return false;
|
||||
if (!this.data.equals(that.data))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
public int compareTo(RemoteInterpreterEvent other) {
|
||||
if (!getClass().equals(other.getClass())) {
|
||||
return getClass().getName().compareTo(other.getClass().getName());
|
||||
}
|
||||
|
||||
int lastComparison = 0;
|
||||
RemoteInterpreterEvent typedOther = (RemoteInterpreterEvent)other;
|
||||
|
||||
lastComparison = Boolean.valueOf(isSetType()).compareTo(typedOther.isSetType());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetType()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.type, typedOther.type);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
lastComparison = Boolean.valueOf(isSetData()).compareTo(typedOther.isSetData());
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
if (isSetData()) {
|
||||
lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.data, typedOther.data);
|
||||
if (lastComparison != 0) {
|
||||
return lastComparison;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public _Fields fieldForId(int fieldId) {
|
||||
return _Fields.findByThriftId(fieldId);
|
||||
}
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
|
||||
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
|
||||
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder("RemoteInterpreterEvent(");
|
||||
boolean first = true;
|
||||
|
||||
sb.append("type:");
|
||||
if (this.type == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.type);
|
||||
}
|
||||
first = false;
|
||||
if (!first) sb.append(", ");
|
||||
sb.append("data:");
|
||||
if (this.data == null) {
|
||||
sb.append("null");
|
||||
} else {
|
||||
sb.append(this.data);
|
||||
}
|
||||
first = false;
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
public void validate() throws org.apache.thrift.TException {
|
||||
// check for required fields
|
||||
// check for sub-struct validity
|
||||
}
|
||||
|
||||
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
|
||||
try {
|
||||
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
|
||||
try {
|
||||
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
|
||||
} catch (org.apache.thrift.TException te) {
|
||||
throw new java.io.IOException(te);
|
||||
}
|
||||
}
|
||||
|
||||
private static class RemoteInterpreterEventStandardSchemeFactory implements SchemeFactory {
|
||||
public RemoteInterpreterEventStandardScheme getScheme() {
|
||||
return new RemoteInterpreterEventStandardScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class RemoteInterpreterEventStandardScheme extends StandardScheme<RemoteInterpreterEvent> {
|
||||
|
||||
public void read(org.apache.thrift.protocol.TProtocol iprot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
|
||||
org.apache.thrift.protocol.TField schemeField;
|
||||
iprot.readStructBegin();
|
||||
while (true)
|
||||
{
|
||||
schemeField = iprot.readFieldBegin();
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
|
||||
break;
|
||||
}
|
||||
switch (schemeField.id) {
|
||||
case 1: // TYPE
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
|
||||
struct.type = RemoteInterpreterEventType.findByValue(iprot.readI32());
|
||||
struct.setTypeIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
case 2: // DATA
|
||||
if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
|
||||
struct.data = iprot.readString();
|
||||
struct.setDataIsSet(true);
|
||||
} else {
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
|
||||
}
|
||||
iprot.readFieldEnd();
|
||||
}
|
||||
iprot.readStructEnd();
|
||||
|
||||
// check for required fields of primitive type, which can't be checked in the validate method
|
||||
struct.validate();
|
||||
}
|
||||
|
||||
public void write(org.apache.thrift.protocol.TProtocol oprot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
|
||||
struct.validate();
|
||||
|
||||
oprot.writeStructBegin(STRUCT_DESC);
|
||||
if (struct.type != null) {
|
||||
oprot.writeFieldBegin(TYPE_FIELD_DESC);
|
||||
oprot.writeI32(struct.type.getValue());
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
if (struct.data != null) {
|
||||
oprot.writeFieldBegin(DATA_FIELD_DESC);
|
||||
oprot.writeString(struct.data);
|
||||
oprot.writeFieldEnd();
|
||||
}
|
||||
oprot.writeFieldStop();
|
||||
oprot.writeStructEnd();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static class RemoteInterpreterEventTupleSchemeFactory implements SchemeFactory {
|
||||
public RemoteInterpreterEventTupleScheme getScheme() {
|
||||
return new RemoteInterpreterEventTupleScheme();
|
||||
}
|
||||
}
|
||||
|
||||
private static class RemoteInterpreterEventTupleScheme extends TupleScheme<RemoteInterpreterEvent> {
|
||||
|
||||
@Override
|
||||
public void write(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol oprot = (TTupleProtocol) prot;
|
||||
BitSet optionals = new BitSet();
|
||||
if (struct.isSetType()) {
|
||||
optionals.set(0);
|
||||
}
|
||||
if (struct.isSetData()) {
|
||||
optionals.set(1);
|
||||
}
|
||||
oprot.writeBitSet(optionals, 2);
|
||||
if (struct.isSetType()) {
|
||||
oprot.writeI32(struct.type.getValue());
|
||||
}
|
||||
if (struct.isSetData()) {
|
||||
oprot.writeString(struct.data);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void read(org.apache.thrift.protocol.TProtocol prot, RemoteInterpreterEvent struct) throws org.apache.thrift.TException {
|
||||
TTupleProtocol iprot = (TTupleProtocol) prot;
|
||||
BitSet incoming = iprot.readBitSet(2);
|
||||
if (incoming.get(0)) {
|
||||
struct.type = RemoteInterpreterEventType.findByValue(iprot.readI32());
|
||||
struct.setTypeIsSet(true);
|
||||
}
|
||||
if (incoming.get(1)) {
|
||||
struct.data = iprot.readString();
|
||||
struct.setDataIsSet(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Autogenerated by Thrift Compiler (0.9.0)
|
||||
*
|
||||
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
||||
* @generated
|
||||
*/
|
||||
package com.nflabs.zeppelin.interpreter.thrift;
|
||||
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import org.apache.thrift.TEnum;
|
||||
|
||||
public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum {
|
||||
NO_OP(1),
|
||||
ANGULAR_OBJECT_ADD(2),
|
||||
ANGULAR_OBJECT_UPDATE(3),
|
||||
ANGULAR_OBJECT_REMOVE(4);
|
||||
|
||||
private final int value;
|
||||
|
||||
private RemoteInterpreterEventType(int value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the integer value of this enum value, as defined in the Thrift IDL.
|
||||
*/
|
||||
public int getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a the enum type by its integer value, as defined in the Thrift IDL.
|
||||
* @return null if the value is not found.
|
||||
*/
|
||||
public static RemoteInterpreterEventType findByValue(int value) {
|
||||
switch (value) {
|
||||
case 1:
|
||||
return NO_OP;
|
||||
case 2:
|
||||
return ANGULAR_OBJECT_ADD;
|
||||
case 3:
|
||||
return ANGULAR_OBJECT_UPDATE;
|
||||
case 4:
|
||||
return ANGULAR_OBJECT_REMOVE;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter;
|
|||
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
|
||||
/**
|
||||
|
|
@ -30,19 +31,22 @@ public class InterpreterContext {
|
|||
private final String paragraphText;
|
||||
private final Map<String, Object> config;
|
||||
private GUI gui;
|
||||
private AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
|
||||
public InterpreterContext(String paragraphId,
|
||||
String paragraphTitle,
|
||||
String paragraphText,
|
||||
Map<String, Object> config,
|
||||
GUI gui
|
||||
GUI gui,
|
||||
AngularObjectRegistry angularObjectRegistry
|
||||
) {
|
||||
this.paragraphId = paragraphId;
|
||||
this.paragraphTitle = paragraphTitle;
|
||||
this.paragraphText = paragraphText;
|
||||
this.config = config;
|
||||
this.gui = gui;
|
||||
this.angularObjectRegistry = angularObjectRegistry;
|
||||
}
|
||||
|
||||
public String getParagraphId() {
|
||||
|
|
@ -65,4 +69,8 @@ public class InterpreterContext {
|
|||
return gui;
|
||||
}
|
||||
|
||||
public AngularObjectRegistry getAngularObjectRegistry() {
|
||||
return angularObjectRegistry;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ import java.util.LinkedList;
|
|||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
|
||||
import com.nflabs.zeppelin.display.AngularObjectRegistry;
|
||||
|
||||
/**
|
||||
* InterpreterGroup is list of interpreters in the same group.
|
||||
* And unit of interpreter instantiate, restart, bind, unbind.
|
||||
|
|
@ -28,6 +30,12 @@ import java.util.Random;
|
|||
public class InterpreterGroup extends LinkedList<Interpreter>{
|
||||
String id;
|
||||
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
public InterpreterGroup() {
|
||||
this.id = getId();
|
||||
}
|
||||
|
||||
private static String generateId() {
|
||||
return "InterpreterGroup_" + System.currentTimeMillis() + "_"
|
||||
+ new Random().nextInt();
|
||||
|
|
@ -51,6 +59,14 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
|
|||
return p;
|
||||
}
|
||||
|
||||
public AngularObjectRegistry getAngularObjectRegistry() {
|
||||
return angularObjectRegistry;
|
||||
}
|
||||
|
||||
public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
|
||||
this.angularObjectRegistry = angularObjectRegistry;
|
||||
}
|
||||
|
||||
public void close() {
|
||||
for (Interpreter intp : this) {
|
||||
intp.close();
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
int rc = interpreterProcess.reference();
|
||||
int rc = interpreterProcess.reference(getInterpreterGroup());
|
||||
|
||||
synchronized (interpreterProcess) {
|
||||
// when first process created
|
||||
|
|
|
|||
|
|
@ -29,10 +29,14 @@ import org.apache.commons.exec.ExecuteWatchdog;
|
|||
import org.apache.commons.exec.environment.EnvironmentUtils;
|
||||
import org.apache.commons.pool2.impl.GenericObjectPool;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.apache.thrift.TException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
|
@ -48,6 +52,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
|
||||
private GenericObjectPool<Client> clientPool;
|
||||
private Map<String, String> env;
|
||||
private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
|
||||
|
||||
public RemoteInterpreterProcess(String intpRunner, String intpDir, Map<String, String> env) {
|
||||
this.interpreterRunner = intpRunner;
|
||||
|
|
@ -60,7 +65,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
return port;
|
||||
}
|
||||
|
||||
public int reference() {
|
||||
public int reference(InterpreterGroup interpreterGroup) {
|
||||
synchronized (referenceCount) {
|
||||
if (executor == null) {
|
||||
// start server process
|
||||
|
|
@ -108,6 +113,9 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
}
|
||||
|
||||
clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port));
|
||||
|
||||
remoteInterpreterEventPoller = new RemoteInterpreterEventPoller(interpreterGroup, this);
|
||||
remoteInterpreterEventPoller.start();
|
||||
}
|
||||
return referenceCount.incrementAndGet();
|
||||
}
|
||||
|
|
@ -126,6 +134,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
int r = referenceCount.decrementAndGet();
|
||||
if (r == 0) {
|
||||
logger.info("shutdown interpreter process");
|
||||
remoteInterpreterEventPoller.shutdown();
|
||||
|
||||
// first try shutdown
|
||||
try {
|
||||
Client client = getClient();
|
||||
|
|
@ -205,4 +215,28 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
return clientPool.getNumIdle();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when angular object is updated in client side to propagate
|
||||
* change to the remote process
|
||||
* @param name
|
||||
* @param o
|
||||
*/
|
||||
public void updateRemoteAngularObject(String name, Object o) {
|
||||
Client client = null;
|
||||
try {
|
||||
client = getClient();
|
||||
} catch (Exception e) {
|
||||
logger.error("Can't update angular object", e);
|
||||
}
|
||||
|
||||
try {
|
||||
Gson gson = new Gson();
|
||||
client.angularObjectUpdate(name, gson.toJson(o));
|
||||
} catch (TException e) {
|
||||
logger.error("Can't update angular object", e);
|
||||
} finally {
|
||||
releaseClient(client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ package org.apache.zeppelin.interpreter.remote;
|
|||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.URL;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
|
@ -29,6 +30,10 @@ import org.apache.thrift.TException;
|
|||
import org.apache.thrift.server.TThreadPoolServer;
|
||||
import org.apache.thrift.transport.TServerSocket;
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObject.AngularObjectType;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.ClassloaderInterpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
|
|
@ -39,6 +44,8 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
|
|||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.FormType;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
|
|
@ -52,16 +59,17 @@ import org.slf4j.LoggerFactory;
|
|||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class RemoteInterpreterServer
|
||||
extends Thread
|
||||
implements RemoteInterpreterService.Iface {
|
||||
implements RemoteInterpreterService.Iface, AngularObjectRegistryListener {
|
||||
Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class);
|
||||
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup();
|
||||
InterpreterGroup interpreterGroup;
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
RemoteInterpreterService.Processor<RemoteInterpreterServer> processor;
|
||||
|
|
@ -69,8 +77,14 @@ public class RemoteInterpreterServer
|
|||
private int port;
|
||||
private TThreadPoolServer server;
|
||||
|
||||
List<RemoteInterpreterEvent> eventQueue = new LinkedList<RemoteInterpreterEvent>();
|
||||
|
||||
public RemoteInterpreterServer(int port) throws TTransportException {
|
||||
this.port = port;
|
||||
interpreterGroup = new InterpreterGroup();
|
||||
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
|
||||
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
|
||||
|
||||
processor = new RemoteInterpreterService.Processor<RemoteInterpreterServer>(this);
|
||||
TServerSocket serverTransport = new TServerSocket(port);
|
||||
server = new TThreadPoolServer(
|
||||
|
|
@ -306,7 +320,8 @@ public class RemoteInterpreterServer
|
|||
ric.getParagraphText(),
|
||||
(Map<String, Object>) gson.fromJson(ric.getConfig(),
|
||||
new TypeToken<Map<String, Object>>() {}.getType()),
|
||||
gson.fromJson(ric.getGui(), GUI.class));
|
||||
gson.fromJson(ric.getGui(), GUI.class),
|
||||
interpreterGroup.getAngularObjectRegistry());
|
||||
}
|
||||
|
||||
private RemoteInterpreterResult convert(InterpreterResult result,
|
||||
|
|
@ -339,4 +354,79 @@ public class RemoteInterpreterServer
|
|||
}
|
||||
return "Unknown";
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void onAdd(String interpreterGroupId, AngularObject object) {
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType.ANGULAR_OBJECT_ADD, gson.toJson(object)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(String interpreterGroupId, AngularObject object) {
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE, gson.toJson(object)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemove(String interpreterGroupId, AngularObject object) {
|
||||
sendEvent(new RemoteInterpreterEvent(
|
||||
RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE, gson.toJson(object)));
|
||||
}
|
||||
|
||||
private void sendEvent(RemoteInterpreterEvent event) {
|
||||
synchronized (eventQueue) {
|
||||
eventQueue.add(event);
|
||||
eventQueue.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public RemoteInterpreterEvent getEvent() throws TException {
|
||||
synchronized (eventQueue) {
|
||||
if (eventQueue.isEmpty()) {
|
||||
try {
|
||||
eventQueue.wait(1000);
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
if (eventQueue.isEmpty()) {
|
||||
return new RemoteInterpreterEvent(RemoteInterpreterEventType.NO_OP, "");
|
||||
} else {
|
||||
return eventQueue.remove(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* called when object is updated in client (web) side.
|
||||
* @param className
|
||||
* @param name
|
||||
* @param object
|
||||
* @throws TException
|
||||
*/
|
||||
@Override
|
||||
public void angularObjectUpdate(String name, String object)
|
||||
throws TException {
|
||||
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
|
||||
AngularObject ao = registry.get(name);
|
||||
if (ao == null) {
|
||||
logger.error("Angular object {} not exists", name);
|
||||
return;
|
||||
}
|
||||
|
||||
if (ao.getType() == AngularObjectType.STRING) {
|
||||
String value = gson.fromJson(object, String.class);
|
||||
ao.set(value, false);
|
||||
} else if (ao.getType() == AngularObjectType.MAP) {
|
||||
Map<String, Object> value = gson.fromJson(object,
|
||||
new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
ao.set(value, false);
|
||||
} else {
|
||||
logger.error("Update angular object type {} not supported", ao.getType());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -35,6 +35,18 @@ struct RemoteInterpreterResult {
|
|||
5: string gui // json serialized gui
|
||||
}
|
||||
|
||||
enum RemoteInterpreterEventType {
|
||||
NO_OP = 1,
|
||||
ANGULAR_OBJECT_ADD = 2,
|
||||
ANGULAR_OBJECT_UPDATE = 3,
|
||||
ANGULAR_OBJECT_REMOVE = 4
|
||||
}
|
||||
|
||||
struct RemoteInterpreterEvent {
|
||||
1: RemoteInterpreterEventType type,
|
||||
2: string data // json serialized data
|
||||
}
|
||||
|
||||
service RemoteInterpreterService {
|
||||
void createInterpreter(1: string className, 2: map<string, string> properties);
|
||||
|
||||
|
|
@ -48,4 +60,7 @@ service RemoteInterpreterService {
|
|||
void shutdown();
|
||||
|
||||
string getStatus(1:string jobId);
|
||||
|
||||
RemoteInterpreterEvent getEvent();
|
||||
void angularObjectUpdate(1: string name, 2: string object);
|
||||
}
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.nflabs.zeppelin.display;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class AngularObjectTest {
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse;
|
|||
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
|
||||
import org.junit.Test;
|
||||
|
|
@ -30,11 +31,12 @@ public class RemoteInterpreterProcessTest {
|
|||
|
||||
@Test
|
||||
public void testStartStop() {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
|
||||
assertFalse(rip.isRunning());
|
||||
assertEquals(0, rip.referenceCount());
|
||||
assertEquals(1, rip.reference());
|
||||
assertEquals(2, rip.reference());
|
||||
assertEquals(1, rip.reference(intpGroup));
|
||||
assertEquals(2, rip.reference(intpGroup));
|
||||
assertEquals(true, rip.isRunning());
|
||||
assertEquals(1, rip.dereference());
|
||||
assertEquals(true, rip.isRunning());
|
||||
|
|
@ -44,8 +46,9 @@ public class RemoteInterpreterProcessTest {
|
|||
|
||||
@Test
|
||||
public void testClientFactory() throws Exception {
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap<String, String>());
|
||||
rip.reference();
|
||||
rip.reference(intpGroup);
|
||||
assertEquals(0, rip.getNumActiveClient());
|
||||
assertEquals(0, rip.getNumIdleClient());
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
|
||||
import org.apache.thrift.transport.TTransportException;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
|
|
@ -109,7 +110,8 @@ public class RemoteInterpreterTest {
|
|||
"title",
|
||||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI()));
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
|
||||
intpB.open();
|
||||
assertEquals(2, process.referenceCount());
|
||||
|
|
@ -159,7 +161,8 @@ public class RemoteInterpreterTest {
|
|||
"title",
|
||||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI()));
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
assertEquals("500", ret.message());
|
||||
|
||||
ret = intpB.interpret("500",
|
||||
|
|
@ -168,7 +171,8 @@ public class RemoteInterpreterTest {
|
|||
"title",
|
||||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI()));
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
assertEquals("1000", ret.message());
|
||||
long end = System.currentTimeMillis();
|
||||
assertTrue(end - start >= 1000);
|
||||
|
|
@ -231,7 +235,8 @@ public class RemoteInterpreterTest {
|
|||
"title",
|
||||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI()));
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -262,7 +267,8 @@ public class RemoteInterpreterTest {
|
|||
"title",
|
||||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI()));
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -333,7 +339,8 @@ public class RemoteInterpreterTest {
|
|||
"title",
|
||||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI()));
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
|
||||
synchronized (results) {
|
||||
results.add(ret.message());
|
||||
|
|
@ -413,7 +420,8 @@ public class RemoteInterpreterTest {
|
|||
"title",
|
||||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI()));
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
|
||||
synchronized (results) {
|
||||
results.add(ret.message());
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterGroup;
|
||||
|
|
@ -53,7 +54,7 @@ public class RemoteSchedulerTest {
|
|||
@Test
|
||||
public void test() throws Exception {
|
||||
Properties p = new Properties();
|
||||
InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
final InterpreterGroup intpGroup = new InterpreterGroup();
|
||||
Map<String, String> env = new HashMap<String, String>();
|
||||
env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
|
||||
|
||||
|
|
@ -93,7 +94,8 @@ public class RemoteSchedulerTest {
|
|||
"title",
|
||||
"text",
|
||||
new HashMap<String, Object>(),
|
||||
new GUI()));
|
||||
new GUI(),
|
||||
new AngularObjectRegistry(intpGroup.getId(), null)));
|
||||
return "1000";
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -297,7 +297,7 @@ public class ZeppelinServer extends Application {
|
|||
|
||||
this.schedulerFactory = new SchedulerFactory();
|
||||
|
||||
this.replFactory = new InterpreterFactory(conf);
|
||||
this.replFactory = new InterpreterFactory(conf, notebookServer);
|
||||
notebook = new Notebook(conf, schedulerFactory, replFactory, notebookServer);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -90,6 +90,11 @@ public class Message {
|
|||
// @param notes serialized List<NoteInfo> object
|
||||
|
||||
PARAGRAPH_REMOVE,
|
||||
|
||||
ANGULAR_OBJECT_UPDATE, // [s-c] add/update angular object
|
||||
ANGULAR_OBJECT_REMOVE, // [s-c] add angular object del
|
||||
|
||||
ANGULAR_OBJECT_UPDATED // [c-s] angular object value updated
|
||||
}
|
||||
|
||||
public OP op;
|
||||
|
|
|
|||
|
|
@ -25,7 +25,11 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.notebook.JobListenerFactory;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Notebook;
|
||||
|
|
@ -50,7 +54,8 @@ import com.google.gson.Gson;
|
|||
*
|
||||
* @author anthonycorbacho
|
||||
*/
|
||||
public class NotebookServer extends WebSocketServer implements JobListenerFactory {
|
||||
public class NotebookServer extends WebSocketServer implements
|
||||
JobListenerFactory, AngularObjectRegistryListener {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class);
|
||||
private static final int DEFAULT_PORT = 8282;
|
||||
|
|
@ -130,6 +135,9 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
|
|||
case COMPLETION:
|
||||
completion(conn, notebook, messagereceived);
|
||||
break;
|
||||
case ANGULAR_OBJECT_UPDATED:
|
||||
angularObjectUpdated(conn, notebook, messagereceived);
|
||||
break;
|
||||
default:
|
||||
broadcastNoteList();
|
||||
break;
|
||||
|
|
@ -380,6 +388,41 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
|
|||
conn.send(serializeMessage(resp));
|
||||
}
|
||||
|
||||
/**
|
||||
* When angular object updated from client
|
||||
* @param conn
|
||||
* @param notebook
|
||||
* @param fromMessage
|
||||
*/
|
||||
private void angularObjectUpdated(WebSocket conn, Notebook notebook,
|
||||
Message fromMessage) {
|
||||
String noteId = (String) fromMessage.get("noteId");
|
||||
String interpreterGroupId = (String) fromMessage.get("interpreterGroupId");
|
||||
String varName = (String) fromMessage.get("name");
|
||||
Object varValue = fromMessage.get("value");
|
||||
|
||||
Note note = notebook.getNote(noteId);
|
||||
List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings();
|
||||
for (InterpreterSetting setting : settings) {
|
||||
if (setting.getInterpreterGroup() == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) {
|
||||
AngularObjectRegistry angularObjectRegistry = setting
|
||||
.getInterpreterGroup().getAngularObjectRegistry();
|
||||
AngularObject ao = angularObjectRegistry.get(varName);
|
||||
if (ao == null) {
|
||||
LOG.warn("Object {} is not binded", varName);
|
||||
} else {
|
||||
// path from client -> server
|
||||
ao.set(varValue, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage)
|
||||
throws IOException {
|
||||
final String paragraphId = (String) fromMessage.get("id");
|
||||
|
|
@ -497,4 +540,48 @@ public class NotebookServer extends WebSocketServer implements JobListenerFactor
|
|||
public JobListener getParagraphJobListener(Note note) {
|
||||
return new ParagraphJobListener(this, note);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAdd(String interpreterGroupId, AngularObject object) {
|
||||
onUpdate(interpreterGroupId, object);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(String interpreterGroupId, AngularObject object) {
|
||||
Notebook notebook = notebook();
|
||||
List<Note> notes = notebook.getAllNotes();
|
||||
for (Note note : notes) {
|
||||
List<InterpreterSetting> intpSettings = note.getNoteReplLoader()
|
||||
.getInterpreterSettings();
|
||||
|
||||
if (intpSettings.isEmpty()) continue;
|
||||
|
||||
for (InterpreterSetting setting : intpSettings) {
|
||||
|
||||
if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) {
|
||||
broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE)
|
||||
.put("angularObject", object)
|
||||
.put("interpreterGroupId", interpreterGroupId)
|
||||
.put("noteId", note.id()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRemove(String interpreterGroupId, AngularObject object) {
|
||||
Notebook notebook = notebook();
|
||||
List<Note> notes = notebook.getAllNotes();
|
||||
for (Note note : notes) {
|
||||
List<String> ids = note.getNoteReplLoader().getInterpreters();
|
||||
for (String id : ids) {
|
||||
if (id.equals(interpreterGroupId)) {
|
||||
broadcast(
|
||||
note.id(),
|
||||
new Message(OP.ANGULAR_OBJECT_REMOVE).put("name",
|
||||
object.getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,8 @@
|
|||
*/
|
||||
angular.module('zeppelinWebApp')
|
||||
.controller('MainCtrl', function($scope, WebSocket, $rootScope, $window) {
|
||||
|
||||
$rootScope.compiledScope = $scope.$new(true, $rootScope);
|
||||
$rootScope.angularObjectRegistry = {};
|
||||
$scope.WebSocketWaitingList = [];
|
||||
$scope.connected = false;
|
||||
$scope.looknfeel = 'default';
|
||||
|
|
@ -65,6 +66,8 @@ angular.module('zeppelinWebApp')
|
|||
$scope.$broadcast('updateProgress', data);
|
||||
} else if (op === 'COMPLETION_LIST') {
|
||||
$scope.$broadcast('completionList', data);
|
||||
} else if (op === 'ANGULAR_OBJECT_UPDATE') {
|
||||
$scope.$broadcast('angularObjectUpdate', data);
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -442,4 +442,35 @@ angular.module('zeppelinWebApp').controller('NotebookCtrl', function($scope, $ro
|
|||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
$scope.$on('angularObjectUpdate', function(event, data) {
|
||||
if (data.noteId === $scope.note.id) {
|
||||
var scope = $rootScope.compiledScope;
|
||||
var varName = data.angularObject.name;
|
||||
|
||||
$rootScope.angularObjectRegistry[varName] = {
|
||||
interpreterGroupId : data.interpreterGroupId
|
||||
};
|
||||
scope[varName] = data.angularObject.object;
|
||||
|
||||
scope.$watch(varName, function(newValue, oldValue) {
|
||||
console.log("value updated to %o %o", varName, newValue);
|
||||
$rootScope.$emit('sendNewEvent', {
|
||||
op: 'ANGULAR_OBJECT_UPDATED',
|
||||
data: {
|
||||
noteId: $routeParams.noteId,
|
||||
name:varName,
|
||||
value:newValue,
|
||||
interpreterGroupId:$rootScope.angularObjectRegistry[varName].interpreterGroupId
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
var isFunction = function(functionToCheck) {
|
||||
var getType = {};
|
||||
return functionToCheck && getType.toString.call(functionToCheck) === '[object Function]';
|
||||
}
|
||||
|
||||
});
|
||||
|
|
|
|||
|
|
@ -84,7 +84,8 @@ angular.module('zeppelinWebApp')
|
|||
if ($('#p'+$scope.paragraph.id+'_angular').length) {
|
||||
try {
|
||||
$('#p'+$scope.paragraph.id+'_angular').html($scope.paragraph.result.msg);
|
||||
$compile($('#p'+$scope.paragraph.id+'_angular').contents())($scope);
|
||||
|
||||
$compile($('#p'+$scope.paragraph.id+'_angular').contents())($rootScope.compiledScope);
|
||||
} catch(err) {
|
||||
console.log('ANGULAR rendering error %o', err);
|
||||
}
|
||||
|
|
@ -1615,5 +1616,4 @@ angular.module('zeppelinWebApp')
|
|||
var redirectToUrl = location.protocol + '//' + location.host + '/#/notebook/' + noteId + '/paragraph/' + $scope.paragraph.id+'?asIframe';
|
||||
$window.open(redirectToUrl);
|
||||
};
|
||||
|
||||
});
|
||||
|
|
|
|||
|
|
@ -43,7 +43,10 @@ import java.util.Set;
|
|||
import org.apache.commons.lang.ArrayUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -73,15 +76,21 @@ public class InterpreterFactory {
|
|||
|
||||
private InterpreterOption defaultOption;
|
||||
|
||||
public InterpreterFactory(ZeppelinConfiguration conf) throws InterpreterException, IOException {
|
||||
this(conf, new InterpreterOption(true));
|
||||
AngularObjectRegistryListener angularObjectRegistryListener;
|
||||
|
||||
public InterpreterFactory(ZeppelinConfiguration conf,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener)
|
||||
throws InterpreterException, IOException {
|
||||
this(conf, new InterpreterOption(true), angularObjectRegistryListener);
|
||||
}
|
||||
|
||||
|
||||
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption)
|
||||
public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener)
|
||||
throws InterpreterException, IOException {
|
||||
this.conf = conf;
|
||||
this.defaultOption = defaultOption;
|
||||
this.angularObjectRegistryListener = angularObjectRegistryListener;
|
||||
String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
|
||||
interpreterClassList = replsConf.split(",");
|
||||
|
||||
|
|
@ -338,7 +347,24 @@ public class InterpreterFactory {
|
|||
InterpreterOption option,
|
||||
Properties properties)
|
||||
throws InterpreterException {
|
||||
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup();
|
||||
if (option.isRemote()) {
|
||||
angularObjectRegistry = new RemoteAngularObjectRegistry(
|
||||
interpreterGroup.getId(),
|
||||
angularObjectRegistryListener,
|
||||
interpreterGroup
|
||||
);
|
||||
} else {
|
||||
angularObjectRegistry = new AngularObjectRegistry(
|
||||
interpreterGroup.getId(),
|
||||
angularObjectRegistryListener);
|
||||
}
|
||||
|
||||
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
|
||||
|
||||
|
||||
for (String className : interpreterClassList) {
|
||||
Set<String> keys = Interpreter.registeredInterpreters.keySet();
|
||||
|
|
|
|||
|
|
@ -23,11 +23,13 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.FormType;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.JobListener;
|
||||
|
|
@ -207,11 +209,19 @@ public class Paragraph extends Job implements Serializable {
|
|||
}
|
||||
|
||||
private InterpreterContext getInterpreterContext() {
|
||||
AngularObjectRegistry registry = null;
|
||||
|
||||
if (!getNoteReplLoader().getInterpreterSettings().isEmpty()) {
|
||||
InterpreterSetting intpGroup = getNoteReplLoader().getInterpreterSettings().get(0);
|
||||
registry = intpGroup.getInterpreterGroup().getAngularObjectRegistry();
|
||||
}
|
||||
|
||||
InterpreterContext interpreterContext = new InterpreterContext(getId(),
|
||||
this.getTitle(),
|
||||
this.getText(),
|
||||
this.getConfig(),
|
||||
this.settings);
|
||||
this.settings,
|
||||
registry);
|
||||
return interpreterContext;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -54,8 +54,8 @@ public class InterpreterFactoryTest {
|
|||
System.setProperty(ConfVars.ZEPPELIN_HOME.getVarName(), tmpDir.getAbsolutePath());
|
||||
System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
conf = new ZeppelinConfiguration();
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false));
|
||||
context = new InterpreterContext("id", "title", "text", null, null);
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
|
||||
context = new InterpreterContext("id", "title", "text", null, null, null);
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -122,7 +122,7 @@ public class InterpreterFactoryTest {
|
|||
factory.add("newsetting", "mock1", new InterpreterOption(false), new Properties());
|
||||
assertEquals(3, factory.get().size());
|
||||
|
||||
InterpreterFactory factory2 = new InterpreterFactory(conf);
|
||||
InterpreterFactory factory2 = new InterpreterFactory(conf, null);
|
||||
assertEquals(3, factory2.get().size());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
|
||||
MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
|
||||
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false));
|
||||
factory = new InterpreterFactory(conf, new InterpreterOption(false), null);
|
||||
|
||||
notebook = new Notebook(conf, schedulerFactory, factory, this);
|
||||
}
|
||||
|
|
@ -108,7 +108,7 @@ public class NotebookTest implements JobListenerFactory{
|
|||
p1.setText("hello world");
|
||||
note.persist();
|
||||
|
||||
Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf), this);
|
||||
Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf, null), this);
|
||||
assertEquals(1, notebook2.getAllNotes().size());
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue