fix the pid of interpreter process id

This commit is contained in:
Jeff Zhang 2017-11-29 11:18:43 +08:00
parent 02b118f4f9
commit 575b7b9649
24 changed files with 387 additions and 45 deletions

View file

@ -220,8 +220,8 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUB
fi
eval $INTERPRETER_RUN_COMMAND &
pid=$!
if [[ -z "${pid}" ]]; then
exit 1;
else

47
bin/stop-interpreter.sh Executable file
View file

@ -0,0 +1,47 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Run Zeppelin
#
bin=$(dirname "${BASH_SOURCE-$0}")
bin=$(cd "${bin}">/dev/null; pwd)
. "${bin}/common.sh"
export ZEPPELIN_FORCE_STOP=1
ZEPPELIN_STOP_INTERPRETER_MAIN=org.apache.zeppelin.interpreter.recovery.StopInterpreter
ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/stop-interpreter.log"
JAVA_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/classes" ]]; then
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/classes"
fi
if [[ -d "${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes" ]]; then
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes"
fi
addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib"
addJarInDir "${ZEPPELIN_HOME}/lib"
addJarInDir "${ZEPPELIN_HOME}/lib/interpreter"
CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
$ZEPPELIN_RUNNER $JAVA_OPTS -cp $CLASSPATH $ZEPPELIN_STOP_INTERPRETER_MAIN ${@}

View file

@ -217,18 +217,6 @@ function stop() {
action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
fi
fi
# list all pid that used in remote interpreter and kill them
for f in ${ZEPPELIN_PID_DIR}/*.pid; do
if [[ ! -f ${f} ]]; then
continue;
fi
pid=$(cat ${f})
wait_for_zeppelin_to_die $pid 20
$(rm -f ${f})
done
}
function find_zeppelin_process() {

View file

@ -480,4 +480,45 @@
<value>10000:10010</value>
</property>
-->
<!--
<property>
<name>zeppelin.interpreter.lifecyclemanager.class</name>
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
</property>
-->
<!--
<property>
<name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
<value>6000</value>
<description>Check interval of interpreter expiration in seconds</description>
</property>
-->
<!--
<property>
<name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
<value>3600000</value>
<description>Threshold of interpreter idle time in seconds, interpeter exceed this threshold will be killed</description>
</property>
-->
<!--
<property>
<name>zeppelin.recovery.storage.class</name>
<value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
<description>ReoveryStorage implementation</description>
</property>
-->
<!--
<property>
<name>zeppelin.recovery.dir</name>
<value>recovery</value>
<description>Location where recovery metadata is stored</description>
</property>
-->
</configuration>

View file

@ -144,3 +144,11 @@ So users needs to understand the ([interpreter mode setting ](../usage/interpret
In this scenario, user need to put `ConfInterpreter` as the first paragraph as the below example. Otherwise the customized setting can not be applied (Actually it would report ERROR)
<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/screenshots/conf_interpreter.png" width="500px">
## Interpreter Process Recovery
Before 0.8.0, shutting down Zeppelin also mean to shutdown all the running interpreter processes. Usually admin will shutdown zeppelin server for maintenance or upgrade, but don't want to shut down the running interpreter process.
In such cases, interpreter process recovery is necessary. Starting from 0.8.0, user can enable interpreter process recovering via setting `zeppelin.recovery.storage.class` as
`org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage` or other implementations if available in future, by default it is `org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage`
which means recovery is not enabled. Enable recover means shutting down zeppelin would not terminating interpreter process,
and when zeppelin is restarted, it would try to reconnect to the existing running interpreter process. If you want to kill all the interpreter process after terminating zeppelin even when recovery is enabled, you can run `bin/stop-interpreter.sh`

View file

@ -359,8 +359,13 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getRelativeDir(ConfVars.ZEPPELIN_RECOVERY_DIR);
}
public String getRecoveryStorageClass() {
return getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS);
}
public boolean isRecoveryEnabled() {
return getBoolean(ConfVars.ZEPPELIN_RECOVERY_ENABLED);
return !getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS).equals(
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage");
}
public String getUser() {
@ -667,7 +672,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
ZEPPELIN_RECOVERY_ENABLED("zeppelin.recovery.enabled", "false"),
ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class",
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"),
// use specified notebook (id) as homescreen
ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),

View file

@ -25,7 +25,7 @@ import java.util.Map;
/**
* Interface for Interpreter Process Recovery.
* Interface for storing interpreter process recovery metadata.
*
*/
public abstract class RecoveryStorage {
@ -53,7 +53,7 @@ public abstract class RecoveryStorage {
/**
*
* It is only called one time when Zeppelin Server is started.
* It is only called when Zeppelin Server is started.
*
* @return
* @throws IOException

Binary file not shown.

View file

@ -0,0 +1 @@
2CZA1DVUG:shared_process 192.168.3.2:55410

View file

@ -349,6 +349,21 @@
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>${plugin.surefire.version}</version>
<configuration combine.children="append">
<argLine>-Xmx2g -Xms1g -Dfile.encoding=UTF-8</argLine>
<excludes>
<exclude>${tests.to.exclude}</exclude>
</excludes>
<environmentVariables>
<ZEPPELIN_FORCE_STOP>1</ZEPPELIN_FORCE_STOP>
</environmentVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>

View file

@ -199,6 +199,9 @@ public class ZeppelinServer extends Application {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
if (!conf.isRecoveryEnabled()) {
ZeppelinServer.notebook.getInterpreterSettingManager().close();
}
notebook.close();
Thread.sleep(3000);
} catch (Exception e) {

View file

@ -17,11 +17,15 @@
package org.apache.zeppelin.recovery;
import com.google.common.io.Files;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
import org.apache.zeppelin.interpreter.recovery.StopInterpreter;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.AbstractTestRestApi;
@ -32,6 +36,7 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.Map;
import static org.junit.Assert.assertEquals;
@ -39,17 +44,22 @@ import static org.junit.Assert.assertThat;
public class RecoveryTest extends AbstractTestRestApi {
Gson gson = new Gson();
private Gson gson = new Gson();
private static File recoveryDir = null;
@BeforeClass
public static void init() throws Exception {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_ENABLED.getVarName(), "true");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
FileSystemRecoveryStorage.class.getName());
recoveryDir = Files.createTempDir();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath());
startUp(RecoveryTest.class.getSimpleName());
}
@AfterClass
public static void destroy() throws Exception {
shutDown();
FileUtils.deleteDirectory(recoveryDir);
}
@Test
@ -117,4 +127,36 @@ public class RecoveryTest extends AbstractTestRestApi {
post.releaseConnection();
assertEquals(Job.Status.ERROR, p1.getStatus());
}
@Test
public void testRecovery_3() throws Exception {
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
// run python interpreter and create new variable `user`
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("%python user='abc'");
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
assertThat(post, isAllowed());
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(Job.Status.FINISHED, p1.getStatus());
// shutdown zeppelin and restart it
shutDown();
StopInterpreter.main(new String[]{});
startUp(RecoveryTest.class.getSimpleName());
// run the paragraph again, but change the text to print variable `user`.
// can not recover the python interpreter, because it has been shutdown.
note1 = ZeppelinServer.notebook.getNote(note1.getId());
p1 = note1.getParagraph(p1.getId());
p1.setText("%python print(user)");
post = httpPost("/notebook/job/" + note1.getId(), "");
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(Job.Status.ERROR, p1.getStatus());
}
}

View file

@ -273,7 +273,7 @@ public class InterpreterSetting {
}
if (this.recoveryStorage == null) {
try {
this.recoveryStorage = new NullRecoveryStorage(conf);
this.recoveryStorage = new NullRecoveryStorage(conf, interpreterSettingManager);
} catch (IOException e) {
// ignore this exception as NullRecoveryStorage will do nothing.
}

View file

@ -43,6 +43,7 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;
import org.apache.zeppelin.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sonatype.aether.repository.Authentication;
@ -158,20 +159,17 @@ public class InterpreterSettingManager {
this.angularObjectRegistryListener = angularObjectRegistryListener;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.appEventListener = appEventListener;
if (conf.isRecoveryEnabled()) {
this.recoveryStorage = new FileSystemRecoveryStorage(conf, this);
} else {
this.recoveryStorage = new NullRecoveryStorage(conf);
}
this.recoveryStorage.init();
try {
this.lifecycleManager = (LifecycleManager)
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
.newInstance(conf);
} catch (Exception e) {
throw new IOException("Fail to create LifecycleManager", e);
}
this.recoveryStorage = ReflectionUtils.createClazzInstance(conf.getRecoveryStorageClass(),
new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
new Object[] {conf, this});
this.recoveryStorage.init();
LOGGER.info("Using RecoveryStorage: " + this.recoveryStorage.getClass().getName());
this.lifecycleManager = ReflectionUtils.createClazzInstance(conf.getLifecycleManagerClass(),
new Class[] {ZeppelinConfiguration.class},
new Object[] {conf});
LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName());
init();
}
@ -319,8 +317,16 @@ public class InterpreterSettingManager {
saveToFile();
}
public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() {
return remoteInterpreterProcessListener;
}
public ApplicationEventListener getAppEventListener() {
return appEventListener;
}
private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
String interpreterJson) throws IOException {
String interpreterJson) throws IOException {
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
ClassLoader tempClassLoader = new URLClassLoader(urls, null);

View file

@ -67,11 +67,11 @@ public class ShellScriptLauncher extends InterpreterLauncher {
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
if (recoveredClient != null) {
if (recoveredClient.isRunning()) {
LOGGER.info("Recover InterpreterProcess: " + recoveredClient.getHost() + ":" +
LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" +
recoveredClient.getPort());
return recoveredClient;
} else {
LOGGER.warn("Cannot recovery interpreter process: " + recoveredClient.getHost() + ":"
LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":"
+ recoveredClient.getPort() + ", as it is already terminated.");
}
}

View file

@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
import org.apache.zeppelin.notebook.FileSystemStorage;
@ -69,7 +70,7 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
this.interpreterSettingManager = interpreterSettingManager;
this.zConf = zConf;
this.fs = FileSystemStorage.get(zConf);
this.recoveryDir = this.fs.makeQualified(new Path(zConf.getNotebookDir()));
this.recoveryDir = this.fs.makeQualified(new Path(zConf.getRecoveryDir()));
LOGGER.info("Using folder {} to store recovery data", recoveryDir);
this.fs.tryMkDir(recoveryDir);
}
@ -118,9 +119,17 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
String[] hostPort = tokens[1].split(":");
int connectTimeout =
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
InterpreterClient client = new RemoteInterpreterRunningProcess(interpreterSettingName,
connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
// interpreterSettingManager may be null when this class is used when it is used
// stop-interpreter.sh
if (interpreterSettingManager != null) {
client.setRemoteInterpreterEventPoller(new RemoteInterpreterEventPoller(
interpreterSettingManager.getRemoteInterpreterProcessListener(),
interpreterSettingManager.getAppEventListener()));
}
clients.put(groupId, client);
LOGGER.info("Recovering Interpreter Process: " + hostPort[0] + ":" + hostPort[1]);
}
}
}

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.interpreter.recovery;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import java.io.IOException;
@ -30,7 +31,9 @@ import java.util.Map;
*/
public class NullRecoveryStorage extends RecoveryStorage {
public NullRecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
public NullRecoveryStorage(ZeppelinConfiguration zConf,
InterpreterSettingManager interpreterSettingManager)
throws IOException {
super(zConf);
}

View file

@ -0,0 +1,40 @@
package org.apache.zeppelin.interpreter.recovery;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import org.apache.zeppelin.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
/**
* Utility class for stopping interpreter in the case that you want to stop all the
* interpreter process even when you enable recovery, or you want to kill interpreter process
* to avoid orphan process.
*/
public class StopInterpreter {
private static Logger LOGGER = LoggerFactory.getLogger(StopInterpreter.class);
public static void main(String[] args) throws IOException {
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
RecoveryStorage recoveryStorage = null;
recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(),
new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
new Object[] {zConf, null});
LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName());
Map<String, InterpreterClient> restoredClients = recoveryStorage.restore();
if (restoredClients != null) {
for (InterpreterClient client : restoredClients.values()) {
LOGGER.info("Stop Interpreter Process: " + client.getHost() + ":" + client.getPort());
client.stop();
}
}
}
}

View file

@ -214,7 +214,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
callbackServer.stop();
}
if (isRunning()) {
logger.info("kill interpreter process");
logger.info("Kill interpreter process");
try {
callRemoteFunction(new RemoteFunction<Void>() {
@Override

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter.remote;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -63,7 +64,24 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
@Override
public void stop() {
// assume process is externally managed. nothing to do
// assume process is externally managed. nothing to do. But will kill it
// when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that.
if (System.getenv("ZEPPELIN_FORCE_STOP") != null) {
if (isRunning()) {
logger.info("Kill interpreter process");
try {
callRemoteFunction(new RemoteFunction<Void>() {
@Override
public Void call(RemoteInterpreterService.Client client) throws Exception {
client.shutdown();
return null;
}
});
} catch (Exception e) {
logger.warn("ignore the exception when shutting down");
}
}
}
}
@Override

View file

@ -5,6 +5,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
@ -39,6 +40,7 @@ public class FileSystemStorage {
private FileSystemStorage(ZeppelinConfiguration zConf) throws IOException {
this.zConf = zConf;
this.hadoopConf = new Configuration();
this.hadoopConf.set("fs.file.impl", RawLocalFileSystem.class.getName());
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
if (isSecurityEnabled) {
@ -54,7 +56,7 @@ public class FileSystemStorage {
}
try {
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), this.hadoopConf);
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
} catch (URISyntaxException e) {
throw new IOException(e);

View file

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.util;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
/**
* Utility class for creating instances via java reflection.
*
*/
public class ReflectionUtils {
public static Class<?> getClazz(String className) throws IOException {
Class clazz = null;
try {
clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Unable to load class: " + className, e);
}
return clazz;
}
private static <T> T getNewInstance(Class<T> clazz) throws IOException {
T instance;
try {
instance = clazz.newInstance();
} catch (InstantiationException e) {
throw new IOException(
"Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
} catch (IllegalAccessException e) {
throw new IOException(
"Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
}
return instance;
}
private static <T> T getNewInstance(Class<T> clazz,
Class<?>[] parameterTypes,
Object[] parameters)
throws IOException {
T instance;
try {
Constructor<T> constructor = clazz.getConstructor(parameterTypes);
instance = constructor.newInstance(parameters);
} catch (InstantiationException e) {
throw new IOException(
"Unable to instantiate class with " + parameters.length + " arguments: " +
clazz.getName(), e);
} catch (IllegalAccessException e) {
throw new IOException(
"Unable to instantiate class with " + parameters.length + " arguments: " +
clazz.getName(), e);
} catch (NoSuchMethodException e) {
throw new IOException(
"Unable to instantiate class with " + parameters.length + " arguments: " +
clazz.getName(), e);
} catch (InvocationTargetException e) {
throw new IOException(
"Unable to instantiate class with " + parameters.length + " arguments: " +
clazz.getName(), e);
}
return instance;
}
public static <T> T createClazzInstance(String className) throws IOException {
Class<?> clazz = getClazz(className);
@SuppressWarnings("unchecked")
T instance = (T) getNewInstance(clazz);
return instance;
}
public static <T> T createClazzInstance(String className,
Class<?>[] parameterTypes,
Object[] parameters) throws IOException {
Class<?> clazz = getClazz(className);
T instance = (T) getNewInstance(clazz, parameterTypes, parameters);
return instance;
}
}

View file

@ -33,7 +33,7 @@ public abstract class AbstractInterpreterTest {
protected File interpreterDir;
protected File confDir;
protected File notebookDir;
protected ZeppelinConfiguration conf = new ZeppelinConfiguration();
protected ZeppelinConfiguration conf;
@Before
public void setUp() throws Exception {

View file

@ -1,5 +1,7 @@
package org.apache.zeppelin.interpreter.recovery;
import com.google.common.io.Files;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
@ -14,6 +16,7 @@ import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@ -22,12 +25,23 @@ import static org.junit.Assert.assertEquals;
public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
private File recoveryDir = null;
@Before
public void setUp() throws Exception {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_ENABLED.getVarName(), "true");
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
FileSystemRecoveryStorage.class.getName());
recoveryDir = Files.createTempDir();
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath());
super.setUp();
}
@Override
public void tearDown() throws Exception {
super.tearDown();
FileUtils.deleteDirectory(recoveryDir);
}
@Test
public void testSingleInterpreterProcess() throws InterpreterException, IOException {
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");