Added Interpreter Hooks to Interpreter Process

This commit is contained in:
Alex Goodman 2016-09-28 12:27:12 -07:00
parent c717daf655
commit 8fad936744
6 changed files with 390 additions and 1 deletions

View file

@ -27,6 +27,7 @@ import java.util.Properties;
import com.google.gson.annotations.SerializedName;
import org.apache.zeppelin.annotation.ZeppelinApi;
import org.apache.zeppelin.annotation.Experimental;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@ -203,6 +204,71 @@ public abstract class Interpreter {
this.classloaderUrls = classloaderUrls;
}
/**
* General function to register hook event
* @param noteId - Note to bind hook to
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public void registerHook(String noteId, String event, String cmd) {
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
String className = getClassName();
hooks.register(noteId, className, event, cmd);
}
/**
* registerHook() wrapper for global scope
* @param event The type of event to hook to (pre_exec, post_exec)
* @param cmd The code to be executed by the interpreter on given event
*/
@Experimental
public void registerHook(String event, String cmd) {
registerHook(null, event, cmd);
}
/**
* Get the hook code
* @param noteId - Note to bind hook to
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public String getHook(String noteId, String event) {
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
String className = getClassName();
return hooks.get(noteId, className, event);
}
/**
* getHook() wrapper for global scope
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public String getHook(String event) {
return getHook(null, event);
}
/**
* Unbind code from given hook event
* @param noteId - Note to bind hook to
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public void unregisterHook(String noteId, String event) {
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
String className = getClassName();
hooks.unregister(noteId, className, event);
}
/**
* unregisterHook() wrapper for global scope
* @param event The type of event to hook to (pre_exec, post_exec)
*/
@Experimental
public void unregisterHook(String event) {
unregisterHook(null, event);
}
@ZeppelinApi
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
synchronized (interpreterGroup) {

View file

@ -45,6 +45,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
Logger LOGGER = Logger.getLogger(InterpreterGroup.class);
AngularObjectRegistry angularObjectRegistry;
InterpreterHookRegistry hookRegistry;
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
ResourcePool resourcePool;
boolean angularRegistryPushed = false;
@ -118,10 +119,18 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
public AngularObjectRegistry getAngularObjectRegistry() {
return angularObjectRegistry;
}
public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
this.angularObjectRegistry = angularObjectRegistry;
}
public InterpreterHookRegistry getInterpreterHookRegistry() {
return hookRegistry;
}
public void setInterpreterHookRegistry(InterpreterHookRegistry hookRegistry) {
this.hookRegistry = hookRegistry;
}
public RemoteInterpreterProcess getRemoteInterpreterProcess() {
return remoteInterpreterProcess;

View file

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
/**
* An interface for processing custom callback code into the interpreter.
*/
public interface InterpreterHookListener {
/**
* Prepends pre-execute hook code to the script that will be interpreted
*/
public void onPreExecute(String script);
/**
* Prepends pre-execute hook code to the script that will be interpreted
*/
public void onPostExecute(String script);
}

View file

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import java.util.HashMap;
import java.util.Map;
/**
* The InterpreterinterpreterHookRegistry specifies code to be conditionally executed by an
* interpreter. The constants defined in this class denote currently
* supported events. Each instance is bound to a single InterpreterGroup.
* Scope is determined on a per-note basis (except when null for global scope).
*/
public class InterpreterHookRegistry {
public static final String GLOBAL_KEY = "_GLOBAL_";
private String interpreterId;
private Map<String, Map<String, Map<String, String>>> registry =
new HashMap<String, Map<String, Map<String, String>>>();
/**
* hookRegistry constructor.
*
* @param interpreterId The Id of the InterpreterGroup instance to bind to
*/
public InterpreterHookRegistry(final String interpreterId) {
this.interpreterId = interpreterId;
}
/**
* Get the interpreterGroup id this instance is bound to
*/
public String getInterpreterId() {
return interpreterId;
}
/**
* Adds a note to the registry
*
* @param noteId The Id of the Note instance to add
*/
public void addNote(String noteId) {
synchronized (registry) {
if (registry.get(noteId) == null) {
registry.put(noteId, new HashMap<String, Map<String, String>>());
}
}
}
/**
* Adds a className to the registry
*
* @param noteId The note id
* @param className The name of the interpreter repl to map the hooks to
*/
public void addRepl(String noteId, String className) {
synchronized (registry) {
addNote(noteId);
if (registry.get(noteId).get(className) == null) {
registry.get(noteId).put(className, new HashMap<String, String>());
}
}
}
/**
* Register a hook for a specific event.
*
* @param noteId Denotes the note this instance belongs to
* @param className The name of the interpreter repl to map the hooks to
* @param event hook event (see constants defined in this class)
* @param cmd Code to be executed by the interpreter
*/
public void register(String noteId, String className,
String event, String cmd) throws IllegalArgumentException {
synchronized (registry) {
if (noteId == null) {
noteId = GLOBAL_KEY;
}
addRepl(noteId, className);
if (!event.equals(HookType.POST_EXEC) && !event.equals(HookType.PRE_EXEC) &&
!event.equals(HookType.POST_EXEC_DEV) && !event.equals(HookType.PRE_EXEC_DEV)) {
throw new IllegalArgumentException("Must be " + HookType.POST_EXEC + ", " +
HookType.POST_EXEC_DEV + ", " +
HookType.PRE_EXEC + " or " +
HookType.PRE_EXEC_DEV);
}
registry.get(noteId).get(className).put(event, cmd);
}
}
/**
* Unregister a hook for a specific event.
*
* @param noteId Denotes the note this instance belongs to
* @param className The name of the interpreter repl to map the hooks to
* @param event hook event (see constants defined in this class)
*/
public void unregister(String noteId, String className, String event) {
synchronized (registry) {
if (noteId == null) {
noteId = GLOBAL_KEY;
}
addRepl(noteId, className);
registry.get(noteId).get(className).remove(event);
}
}
/**
* Get a hook for a specific event.
*
* @param noteId Denotes the note this instance belongs to
* @param className The name of the interpreter repl to map the hooks to
* @param event hook event (see constants defined in this class)
*/
public String get(String noteId, String className, String event) {
synchronized (registry) {
if (noteId == null) {
noteId = GLOBAL_KEY;
}
addRepl(noteId, className);
return registry.get(noteId).get(className).get(event);
}
}
/**
* Container for hook event type constants
*/
public static final class HookType {
// Execute the hook code PRIOR to main paragraph code execution
public static final String PRE_EXEC = "pre_exec";
// Execute the hook code AFTER main paragraph code execution
public static final String POST_EXEC = "post_exec";
// Same as above but reserved for interpreter developers, in order to allow
// notebook users to use the above without overwriting registry settings
// that are initialized directly in subclasses of Interpreter.
public static final String PRE_EXEC_DEV = "pre_exec_dev";
public static final String POST_EXEC_DEV = "post_exec_dev";
}
}

View file

@ -33,6 +33,8 @@ import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.*;
import org.apache.zeppelin.helium.*;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
import org.apache.zeppelin.interpreter.InterpreterHookListener;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
import org.apache.zeppelin.interpreter.thrift.*;
@ -60,6 +62,7 @@ public class RemoteInterpreterServer
InterpreterGroup interpreterGroup;
AngularObjectRegistry angularObjectRegistry;
InterpreterHookRegistry hookRegistry;
DistributedResourcePool resourcePool;
private ApplicationLoader appLoader;
@ -152,7 +155,9 @@ public class RemoteInterpreterServer
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId());
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
interpreterGroup.setInterpreterHookRegistry(hookRegistry);
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
interpreterGroup.setResourcePool(resourcePool);
@ -383,10 +388,53 @@ public class RemoteInterpreterServer
return infos;
}
private void processInterpreterHooks(final String noteId) {
InterpreterHookListener hookListener = new InterpreterHookListener() {
@Override
public void onPreExecute(String script) {
String cmdDev = interpreter.getHook(noteId, HookType.PRE_EXEC_DEV);
String cmdUser = interpreter.getHook(noteId, HookType.PRE_EXEC);
// User defined hook should be executed before dev hook
List<String> cmds = Arrays.asList(cmdDev, cmdUser);
for (String cmd : cmds) {
if (cmd != null) {
script = cmd + '\n' + script;
}
}
InterpretJob.this.script = script;
}
@Override
public void onPostExecute(String script) {
String cmdDev = interpreter.getHook(noteId, HookType.POST_EXEC_DEV);
String cmdUser = interpreter.getHook(noteId, HookType.POST_EXEC);
// User defined hook should be executed after dev hook
List<String> cmds = Arrays.asList(cmdUser, cmdDev);
for (String cmd : cmds) {
if (cmd != null) {
script += '\n' + cmd;
}
}
InterpretJob.this.script = script;
}
};
hookListener.onPreExecute(script);
hookListener.onPostExecute(script);
}
@Override
protected Object jobRun() throws Throwable {
try {
InterpreterContext.set(context);
// Add hooks to script from registry.
// Global scope first, followed by notebook scope
processInterpreterHooks(null);
processInterpreterHooks(context.getNoteId());
InterpreterResult result = interpreter.interpret(script, context);
// data from context.out is prepended to InterpreterResult if both defined

View file

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
public class InterpreterHookRegistryTest {
@Test
public void testBasic() {
final String PRE_EXEC = InterpreterHookRegistry.HookType.PRE_EXEC;
final String POST_EXEC = InterpreterHookRegistry.HookType.POST_EXEC;
final String PRE_EXEC_DEV = InterpreterHookRegistry.HookType.PRE_EXEC_DEV;
final String POST_EXEC_DEV = InterpreterHookRegistry.HookType.POST_EXEC_DEV;
final String GLOBAL_KEY = InterpreterHookRegistry.GLOBAL_KEY;
final String noteId = "note";
final String className = "class";
final String preExecHook = "pre";
final String postExecHook = "post";
InterpreterHookRegistry registry = new InterpreterHookRegistry("intpId");
// Test register()
registry.register(noteId, className, PRE_EXEC, preExecHook);
registry.register(noteId, className, POST_EXEC, postExecHook);
registry.register(noteId, className, PRE_EXEC_DEV, preExecHook);
registry.register(noteId, className, POST_EXEC_DEV, postExecHook);
// Test get()
assertEquals(registry.get(noteId, className, PRE_EXEC), preExecHook);
assertEquals(registry.get(noteId, className, POST_EXEC), postExecHook);
assertEquals(registry.get(noteId, className, PRE_EXEC_DEV), preExecHook);
assertEquals(registry.get(noteId, className, POST_EXEC_DEV), postExecHook);
// Test Unregister
registry.unregister(noteId, className, PRE_EXEC);
registry.unregister(noteId, className, POST_EXEC);
registry.unregister(noteId, className, PRE_EXEC_DEV);
registry.unregister(noteId, className, POST_EXEC_DEV);
assertNull(registry.get(noteId, className, PRE_EXEC));
assertNull(registry.get(noteId, className, POST_EXEC));
assertNull(registry.get(noteId, className, PRE_EXEC_DEV));
assertNull(registry.get(noteId, className, POST_EXEC_DEV));
// Test Global Scope
registry.register(null, className, PRE_EXEC, preExecHook);
assertEquals(registry.get(GLOBAL_KEY, className, PRE_EXEC), preExecHook);
}
@Test(expected = IllegalArgumentException.class)
public void testValidEventCode() {
InterpreterHookRegistry registry = new InterpreterHookRegistry("intpId");
// Test that only valid event codes ("pre_exec", "post_exec") are accepted
registry.register("foo", "bar", "baz", "whatever");
}
}