mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Added Interpreter Hooks to Interpreter Process
This commit is contained in:
parent
c717daf655
commit
8fad936744
6 changed files with 390 additions and 1 deletions
|
|
@ -27,6 +27,7 @@ import java.util.Properties;
|
|||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.zeppelin.annotation.ZeppelinApi;
|
||||
import org.apache.zeppelin.annotation.Experimental;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
|
|
@ -203,6 +204,71 @@ public abstract class Interpreter {
|
|||
this.classloaderUrls = classloaderUrls;
|
||||
}
|
||||
|
||||
/**
|
||||
* General function to register hook event
|
||||
* @param noteId - Note to bind hook to
|
||||
* @param event The type of event to hook to (pre_exec, post_exec)
|
||||
* @param cmd The code to be executed by the interpreter on given event
|
||||
*/
|
||||
@Experimental
|
||||
public void registerHook(String noteId, String event, String cmd) {
|
||||
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
|
||||
String className = getClassName();
|
||||
hooks.register(noteId, className, event, cmd);
|
||||
}
|
||||
|
||||
/**
|
||||
* registerHook() wrapper for global scope
|
||||
* @param event The type of event to hook to (pre_exec, post_exec)
|
||||
* @param cmd The code to be executed by the interpreter on given event
|
||||
*/
|
||||
@Experimental
|
||||
public void registerHook(String event, String cmd) {
|
||||
registerHook(null, event, cmd);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the hook code
|
||||
* @param noteId - Note to bind hook to
|
||||
* @param event The type of event to hook to (pre_exec, post_exec)
|
||||
*/
|
||||
@Experimental
|
||||
public String getHook(String noteId, String event) {
|
||||
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
|
||||
String className = getClassName();
|
||||
return hooks.get(noteId, className, event);
|
||||
}
|
||||
|
||||
/**
|
||||
* getHook() wrapper for global scope
|
||||
* @param event The type of event to hook to (pre_exec, post_exec)
|
||||
*/
|
||||
@Experimental
|
||||
public String getHook(String event) {
|
||||
return getHook(null, event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Unbind code from given hook event
|
||||
* @param noteId - Note to bind hook to
|
||||
* @param event The type of event to hook to (pre_exec, post_exec)
|
||||
*/
|
||||
@Experimental
|
||||
public void unregisterHook(String noteId, String event) {
|
||||
InterpreterHookRegistry hooks = interpreterGroup.getInterpreterHookRegistry();
|
||||
String className = getClassName();
|
||||
hooks.unregister(noteId, className, event);
|
||||
}
|
||||
|
||||
/**
|
||||
* unregisterHook() wrapper for global scope
|
||||
* @param event The type of event to hook to (pre_exec, post_exec)
|
||||
*/
|
||||
@Experimental
|
||||
public void unregisterHook(String event) {
|
||||
unregisterHook(null, event);
|
||||
}
|
||||
|
||||
@ZeppelinApi
|
||||
public Interpreter getInterpreterInTheSameSessionByClassName(String className) {
|
||||
synchronized (interpreterGroup) {
|
||||
|
|
|
|||
|
|
@ -45,6 +45,7 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
Logger LOGGER = Logger.getLogger(InterpreterGroup.class);
|
||||
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
InterpreterHookRegistry hookRegistry;
|
||||
RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
|
||||
ResourcePool resourcePool;
|
||||
boolean angularRegistryPushed = false;
|
||||
|
|
@ -118,10 +119,18 @@ public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter
|
|||
public AngularObjectRegistry getAngularObjectRegistry() {
|
||||
return angularObjectRegistry;
|
||||
}
|
||||
|
||||
|
||||
public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) {
|
||||
this.angularObjectRegistry = angularObjectRegistry;
|
||||
}
|
||||
|
||||
public InterpreterHookRegistry getInterpreterHookRegistry() {
|
||||
return hookRegistry;
|
||||
}
|
||||
|
||||
public void setInterpreterHookRegistry(InterpreterHookRegistry hookRegistry) {
|
||||
this.hookRegistry = hookRegistry;
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcess getRemoteInterpreterProcess() {
|
||||
return remoteInterpreterProcess;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
/**
|
||||
* An interface for processing custom callback code into the interpreter.
|
||||
*/
|
||||
public interface InterpreterHookListener {
|
||||
/**
|
||||
* Prepends pre-execute hook code to the script that will be interpreted
|
||||
*/
|
||||
public void onPreExecute(String script);
|
||||
|
||||
/**
|
||||
* Prepends pre-execute hook code to the script that will be interpreted
|
||||
*/
|
||||
public void onPostExecute(String script);
|
||||
}
|
||||
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* The InterpreterinterpreterHookRegistry specifies code to be conditionally executed by an
|
||||
* interpreter. The constants defined in this class denote currently
|
||||
* supported events. Each instance is bound to a single InterpreterGroup.
|
||||
* Scope is determined on a per-note basis (except when null for global scope).
|
||||
*/
|
||||
public class InterpreterHookRegistry {
|
||||
public static final String GLOBAL_KEY = "_GLOBAL_";
|
||||
private String interpreterId;
|
||||
private Map<String, Map<String, Map<String, String>>> registry =
|
||||
new HashMap<String, Map<String, Map<String, String>>>();
|
||||
|
||||
/**
|
||||
* hookRegistry constructor.
|
||||
*
|
||||
* @param interpreterId The Id of the InterpreterGroup instance to bind to
|
||||
*/
|
||||
public InterpreterHookRegistry(final String interpreterId) {
|
||||
this.interpreterId = interpreterId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the interpreterGroup id this instance is bound to
|
||||
*/
|
||||
public String getInterpreterId() {
|
||||
return interpreterId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a note to the registry
|
||||
*
|
||||
* @param noteId The Id of the Note instance to add
|
||||
*/
|
||||
public void addNote(String noteId) {
|
||||
synchronized (registry) {
|
||||
if (registry.get(noteId) == null) {
|
||||
registry.put(noteId, new HashMap<String, Map<String, String>>());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a className to the registry
|
||||
*
|
||||
* @param noteId The note id
|
||||
* @param className The name of the interpreter repl to map the hooks to
|
||||
*/
|
||||
public void addRepl(String noteId, String className) {
|
||||
synchronized (registry) {
|
||||
addNote(noteId);
|
||||
if (registry.get(noteId).get(className) == null) {
|
||||
registry.get(noteId).put(className, new HashMap<String, String>());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register a hook for a specific event.
|
||||
*
|
||||
* @param noteId Denotes the note this instance belongs to
|
||||
* @param className The name of the interpreter repl to map the hooks to
|
||||
* @param event hook event (see constants defined in this class)
|
||||
* @param cmd Code to be executed by the interpreter
|
||||
*/
|
||||
public void register(String noteId, String className,
|
||||
String event, String cmd) throws IllegalArgumentException {
|
||||
synchronized (registry) {
|
||||
if (noteId == null) {
|
||||
noteId = GLOBAL_KEY;
|
||||
}
|
||||
addRepl(noteId, className);
|
||||
if (!event.equals(HookType.POST_EXEC) && !event.equals(HookType.PRE_EXEC) &&
|
||||
!event.equals(HookType.POST_EXEC_DEV) && !event.equals(HookType.PRE_EXEC_DEV)) {
|
||||
throw new IllegalArgumentException("Must be " + HookType.POST_EXEC + ", " +
|
||||
HookType.POST_EXEC_DEV + ", " +
|
||||
HookType.PRE_EXEC + " or " +
|
||||
HookType.PRE_EXEC_DEV);
|
||||
}
|
||||
registry.get(noteId).get(className).put(event, cmd);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unregister a hook for a specific event.
|
||||
*
|
||||
* @param noteId Denotes the note this instance belongs to
|
||||
* @param className The name of the interpreter repl to map the hooks to
|
||||
* @param event hook event (see constants defined in this class)
|
||||
*/
|
||||
public void unregister(String noteId, String className, String event) {
|
||||
synchronized (registry) {
|
||||
if (noteId == null) {
|
||||
noteId = GLOBAL_KEY;
|
||||
}
|
||||
addRepl(noteId, className);
|
||||
registry.get(noteId).get(className).remove(event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a hook for a specific event.
|
||||
*
|
||||
* @param noteId Denotes the note this instance belongs to
|
||||
* @param className The name of the interpreter repl to map the hooks to
|
||||
* @param event hook event (see constants defined in this class)
|
||||
*/
|
||||
public String get(String noteId, String className, String event) {
|
||||
synchronized (registry) {
|
||||
if (noteId == null) {
|
||||
noteId = GLOBAL_KEY;
|
||||
}
|
||||
addRepl(noteId, className);
|
||||
return registry.get(noteId).get(className).get(event);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Container for hook event type constants
|
||||
*/
|
||||
public static final class HookType {
|
||||
// Execute the hook code PRIOR to main paragraph code execution
|
||||
public static final String PRE_EXEC = "pre_exec";
|
||||
|
||||
// Execute the hook code AFTER main paragraph code execution
|
||||
public static final String POST_EXEC = "post_exec";
|
||||
|
||||
// Same as above but reserved for interpreter developers, in order to allow
|
||||
// notebook users to use the above without overwriting registry settings
|
||||
// that are initialized directly in subclasses of Interpreter.
|
||||
public static final String PRE_EXEC_DEV = "pre_exec_dev";
|
||||
public static final String POST_EXEC_DEV = "post_exec_dev";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -33,6 +33,8 @@ import org.apache.zeppelin.dep.DependencyResolver;
|
|||
import org.apache.zeppelin.display.*;
|
||||
import org.apache.zeppelin.helium.*;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookRegistry.HookType;
|
||||
import org.apache.zeppelin.interpreter.InterpreterHookListener;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
|
||||
import org.apache.zeppelin.interpreter.thrift.*;
|
||||
|
|
@ -60,6 +62,7 @@ public class RemoteInterpreterServer
|
|||
|
||||
InterpreterGroup interpreterGroup;
|
||||
AngularObjectRegistry angularObjectRegistry;
|
||||
InterpreterHookRegistry hookRegistry;
|
||||
DistributedResourcePool resourcePool;
|
||||
private ApplicationLoader appLoader;
|
||||
|
||||
|
|
@ -152,7 +155,9 @@ public class RemoteInterpreterServer
|
|||
if (interpreterGroup == null) {
|
||||
interpreterGroup = new InterpreterGroup(interpreterGroupId);
|
||||
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
|
||||
hookRegistry = new InterpreterHookRegistry(interpreterGroup.getId());
|
||||
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
|
||||
interpreterGroup.setInterpreterHookRegistry(hookRegistry);
|
||||
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
|
||||
interpreterGroup.setResourcePool(resourcePool);
|
||||
|
||||
|
|
@ -383,10 +388,53 @@ public class RemoteInterpreterServer
|
|||
return infos;
|
||||
}
|
||||
|
||||
private void processInterpreterHooks(final String noteId) {
|
||||
InterpreterHookListener hookListener = new InterpreterHookListener() {
|
||||
@Override
|
||||
public void onPreExecute(String script) {
|
||||
String cmdDev = interpreter.getHook(noteId, HookType.PRE_EXEC_DEV);
|
||||
String cmdUser = interpreter.getHook(noteId, HookType.PRE_EXEC);
|
||||
|
||||
// User defined hook should be executed before dev hook
|
||||
List<String> cmds = Arrays.asList(cmdDev, cmdUser);
|
||||
for (String cmd : cmds) {
|
||||
if (cmd != null) {
|
||||
script = cmd + '\n' + script;
|
||||
}
|
||||
}
|
||||
|
||||
InterpretJob.this.script = script;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPostExecute(String script) {
|
||||
String cmdDev = interpreter.getHook(noteId, HookType.POST_EXEC_DEV);
|
||||
String cmdUser = interpreter.getHook(noteId, HookType.POST_EXEC);
|
||||
|
||||
// User defined hook should be executed after dev hook
|
||||
List<String> cmds = Arrays.asList(cmdUser, cmdDev);
|
||||
for (String cmd : cmds) {
|
||||
if (cmd != null) {
|
||||
script += '\n' + cmd;
|
||||
}
|
||||
}
|
||||
|
||||
InterpretJob.this.script = script;
|
||||
}
|
||||
};
|
||||
hookListener.onPreExecute(script);
|
||||
hookListener.onPostExecute(script);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Object jobRun() throws Throwable {
|
||||
try {
|
||||
InterpreterContext.set(context);
|
||||
|
||||
// Add hooks to script from registry.
|
||||
// Global scope first, followed by notebook scope
|
||||
processInterpreterHooks(null);
|
||||
processInterpreterHooks(context.getNoteId());
|
||||
InterpreterResult result = interpreter.interpret(script, context);
|
||||
|
||||
// data from context.out is prepended to InterpreterResult if both defined
|
||||
|
|
|
|||
|
|
@ -0,0 +1,77 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
public class InterpreterHookRegistryTest {
|
||||
|
||||
@Test
|
||||
public void testBasic() {
|
||||
final String PRE_EXEC = InterpreterHookRegistry.HookType.PRE_EXEC;
|
||||
final String POST_EXEC = InterpreterHookRegistry.HookType.POST_EXEC;
|
||||
final String PRE_EXEC_DEV = InterpreterHookRegistry.HookType.PRE_EXEC_DEV;
|
||||
final String POST_EXEC_DEV = InterpreterHookRegistry.HookType.POST_EXEC_DEV;
|
||||
final String GLOBAL_KEY = InterpreterHookRegistry.GLOBAL_KEY;
|
||||
final String noteId = "note";
|
||||
final String className = "class";
|
||||
final String preExecHook = "pre";
|
||||
final String postExecHook = "post";
|
||||
InterpreterHookRegistry registry = new InterpreterHookRegistry("intpId");
|
||||
|
||||
// Test register()
|
||||
registry.register(noteId, className, PRE_EXEC, preExecHook);
|
||||
registry.register(noteId, className, POST_EXEC, postExecHook);
|
||||
registry.register(noteId, className, PRE_EXEC_DEV, preExecHook);
|
||||
registry.register(noteId, className, POST_EXEC_DEV, postExecHook);
|
||||
|
||||
// Test get()
|
||||
assertEquals(registry.get(noteId, className, PRE_EXEC), preExecHook);
|
||||
assertEquals(registry.get(noteId, className, POST_EXEC), postExecHook);
|
||||
assertEquals(registry.get(noteId, className, PRE_EXEC_DEV), preExecHook);
|
||||
assertEquals(registry.get(noteId, className, POST_EXEC_DEV), postExecHook);
|
||||
|
||||
// Test Unregister
|
||||
registry.unregister(noteId, className, PRE_EXEC);
|
||||
registry.unregister(noteId, className, POST_EXEC);
|
||||
registry.unregister(noteId, className, PRE_EXEC_DEV);
|
||||
registry.unregister(noteId, className, POST_EXEC_DEV);
|
||||
assertNull(registry.get(noteId, className, PRE_EXEC));
|
||||
assertNull(registry.get(noteId, className, POST_EXEC));
|
||||
assertNull(registry.get(noteId, className, PRE_EXEC_DEV));
|
||||
assertNull(registry.get(noteId, className, POST_EXEC_DEV));
|
||||
|
||||
// Test Global Scope
|
||||
registry.register(null, className, PRE_EXEC, preExecHook);
|
||||
assertEquals(registry.get(GLOBAL_KEY, className, PRE_EXEC), preExecHook);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testValidEventCode() {
|
||||
InterpreterHookRegistry registry = new InterpreterHookRegistry("intpId");
|
||||
|
||||
// Test that only valid event codes ("pre_exec", "post_exec") are accepted
|
||||
registry.register("foo", "bar", "baz", "whatever");
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue