mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
fix the pid of interpreter process id
This commit is contained in:
parent
02b118f4f9
commit
575b7b9649
24 changed files with 387 additions and 45 deletions
|
|
@ -220,8 +220,8 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUB
|
|||
fi
|
||||
|
||||
eval $INTERPRETER_RUN_COMMAND &
|
||||
|
||||
pid=$!
|
||||
|
||||
if [[ -z "${pid}" ]]; then
|
||||
exit 1;
|
||||
else
|
||||
|
|
|
|||
47
bin/stop-interpreter.sh
Executable file
47
bin/stop-interpreter.sh
Executable file
|
|
@ -0,0 +1,47 @@
|
|||
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Stop the interpreter processes recorded in the recovery storage by running
# org.apache.zeppelin.interpreter.recovery.StopInterpreter.
# Any command line arguments are passed through to that class unchanged.
#

# Resolve the absolute directory this script lives in.
bin=$(dirname "${BASH_SOURCE-$0}")
bin=$(cd "${bin}">/dev/null; pwd)

# NOTE(review): ZEPPELIN_HOME, ZEPPELIN_LOG_DIR, ZEPPELIN_RUNNER and addJarInDir
# are not defined in this script; presumably they come from common.sh - confirm.
. "${bin}/common.sh"

# RemoteInterpreterRunningProcess.stop() only shuts a recovered interpreter
# process down when this environment variable is set.
export ZEPPELIN_FORCE_STOP=1

ZEPPELIN_STOP_INTERPRETER_MAIN=org.apache.zeppelin.interpreter.recovery.StopInterpreter
ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/stop-interpreter.log"
JAVA_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"

# Prefer freshly compiled classes when running from a source checkout.
if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/classes" ]]; then
  ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/classes"
fi

if [[ -d "${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes" ]]; then
  ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes"
fi

addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib"
addJarInDir "${ZEPPELIN_HOME}/lib"
addJarInDir "${ZEPPELIN_HOME}/lib/interpreter"

CLASSPATH+=":${ZEPPELIN_CLASSPATH}"

# JAVA_OPTS (and ZEPPELIN_RUNNER) are intentionally left unquoted so that they
# word-split into separate options. The class path and the user arguments are
# quoted so that entries containing spaces or wildcards reach the JVM intact.
$ZEPPELIN_RUNNER $JAVA_OPTS -cp "${CLASSPATH}" "${ZEPPELIN_STOP_INTERPRETER_MAIN}" "$@"
|
||||
|
|
@ -217,18 +217,6 @@ function stop() {
|
|||
action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
|
||||
fi
|
||||
fi
|
||||
|
||||
# list all pid that used in remote interpreter and kill them
|
||||
for f in ${ZEPPELIN_PID_DIR}/*.pid; do
|
||||
if [[ ! -f ${f} ]]; then
|
||||
continue;
|
||||
fi
|
||||
|
||||
pid=$(cat ${f})
|
||||
wait_for_zeppelin_to_die $pid 20
|
||||
$(rm -f ${f})
|
||||
done
|
||||
|
||||
}
|
||||
|
||||
function find_zeppelin_process() {
|
||||
|
|
|
|||
|
|
@ -480,4 +480,45 @@
|
|||
<value>10000:10010</value>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.class</name>
|
||||
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
|
||||
<value>6000</value>
|
||||
<description>Interval, in milliseconds, at which interpreter expiration is checked</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
|
||||
<value>3600000</value>
|
||||
<description>Threshold of interpreter idle time in milliseconds; an interpreter that exceeds this threshold will be killed</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.recovery.storage.class</name>
|
||||
<value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
|
||||
<description>RecoveryStorage implementation</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.recovery.dir</name>
|
||||
<value>recovery</value>
|
||||
<description>Location where recovery metadata is stored</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
|
||||
</configuration>
|
||||
|
|
|
|||
|
|
@ -144,3 +144,11 @@ So users needs to understand the ([interpreter mode setting ](../usage/interpret
|
|||
In this scenario, the user needs to put `ConfInterpreter` in the first paragraph, as in the example below. Otherwise the customized setting cannot be applied (it actually reports an ERROR).
|
||||
<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/screenshots/conf_interpreter.png" width="500px">
|
||||
|
||||
|
||||
## Interpreter Process Recovery
|
||||
|
||||
Before 0.8.0, shutting down Zeppelin also meant shutting down all the running interpreter processes. Usually an admin shuts down the Zeppelin server for maintenance or an upgrade, but does not want to shut down the running interpreter processes.
|
||||
In such cases, interpreter process recovery is necessary. Starting from 0.8.0, users can enable interpreter process recovery by setting `zeppelin.recovery.storage.class` to
|
||||
`org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage` or another implementation if one becomes available in the future. By default it is `org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage`,
|
||||
which means recovery is not enabled. Enabling recovery means that shutting down Zeppelin does not terminate the interpreter processes,
|
||||
and when Zeppelin is restarted, it tries to reconnect to the interpreter processes that are still running. If you want to kill all the interpreter processes after terminating Zeppelin even when recovery is enabled, you can run `bin/stop-interpreter.sh`.
|
||||
|
|
|
|||
|
|
@ -359,8 +359,13 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getRelativeDir(ConfVars.ZEPPELIN_RECOVERY_DIR);
|
||||
}
|
||||
|
||||
public String getRecoveryStorageClass() {
|
||||
return getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS);
|
||||
}
|
||||
|
||||
public boolean isRecoveryEnabled() {
|
||||
return getBoolean(ConfVars.ZEPPELIN_RECOVERY_ENABLED);
|
||||
return !getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS).equals(
|
||||
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage");
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
|
|
@ -667,7 +672,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
|
||||
ZEPPELIN_RECOVERY_ENABLED("zeppelin.recovery.enabled", "false"),
|
||||
ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class",
|
||||
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"),
|
||||
|
||||
// use specified notebook (id) as homescreen
|
||||
ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import java.util.Map;
|
|||
|
||||
|
||||
/**
|
||||
* Interface for Interpreter Process Recovery.
|
||||
* Interface for storing interpreter process recovery metadata.
|
||||
*
|
||||
*/
|
||||
public abstract class RecoveryStorage {
|
||||
|
|
@ -53,7 +53,7 @@ public abstract class RecoveryStorage {
|
|||
|
||||
/**
|
||||
*
|
||||
* It is only called one time when Zeppelin Server is started.
|
||||
* It is only called when Zeppelin Server is started.
|
||||
*
|
||||
* @return
|
||||
* @throws IOException
|
||||
|
|
|
|||
BIN
zeppelin-server/notebook/.python.recovery.crc
Normal file
BIN
zeppelin-server/notebook/.python.recovery.crc
Normal file
Binary file not shown.
1
zeppelin-server/notebook/python.recovery
Normal file
1
zeppelin-server/notebook/python.recovery
Normal file
|
|
@ -0,0 +1 @@
|
|||
2CZA1DVUG:shared_process 192.168.3.2:55410
|
||||
|
|
@ -349,6 +349,21 @@
|
|||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>${plugin.surefire.version}</version>
|
||||
<configuration combine.children="append">
|
||||
<argLine>-Xmx2g -Xms1g -Dfile.encoding=UTF-8</argLine>
|
||||
<excludes>
|
||||
<exclude>${tests.to.exclude}</exclude>
|
||||
</excludes>
|
||||
<environmentVariables>
|
||||
<ZEPPELIN_FORCE_STOP>1</ZEPPELIN_FORCE_STOP>
|
||||
</environmentVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
|
||||
<plugin>
|
||||
<groupId>org.scala-tools</groupId>
|
||||
<artifactId>maven-scala-plugin</artifactId>
|
||||
|
|
|
|||
|
|
@ -199,6 +199,9 @@ public class ZeppelinServer extends Application {
|
|||
LOG.info("Shutting down Zeppelin Server ... ");
|
||||
try {
|
||||
jettyWebServer.stop();
|
||||
if (!conf.isRecoveryEnabled()) {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().close();
|
||||
}
|
||||
notebook.close();
|
||||
Thread.sleep(3000);
|
||||
} catch (Exception e) {
|
||||
|
|
|
|||
|
|
@ -17,11 +17,15 @@
|
|||
|
||||
package org.apache.zeppelin.recovery;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.recovery.StopInterpreter;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.rest.AbstractTestRestApi;
|
||||
|
|
@ -32,6 +36,7 @@ import org.junit.AfterClass;
|
|||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -39,17 +44,22 @@ import static org.junit.Assert.assertThat;
|
|||
|
||||
public class RecoveryTest extends AbstractTestRestApi {
|
||||
|
||||
Gson gson = new Gson();
|
||||
private Gson gson = new Gson();
|
||||
private static File recoveryDir = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_ENABLED.getVarName(), "true");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
|
||||
FileSystemRecoveryStorage.class.getName());
|
||||
recoveryDir = Files.createTempDir();
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath());
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destroy() throws Exception {
|
||||
shutDown();
|
||||
FileUtils.deleteDirectory(recoveryDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -117,4 +127,36 @@ public class RecoveryTest extends AbstractTestRestApi {
|
|||
post.releaseConnection();
|
||||
assertEquals(Job.Status.ERROR, p1.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecovery_3() throws Exception {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
|
||||
|
||||
// run python interpreter and create new variable `user`
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("%python user='abc'");
|
||||
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertThat(post, isAllowed());
|
||||
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
|
||||
// shutdown zeppelin and restart it
|
||||
shutDown();
|
||||
StopInterpreter.main(new String[]{});
|
||||
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
|
||||
// run the paragraph again, but change the text to print variable `user`.
|
||||
// can not recover the python interpreter, because it has been shutdown.
|
||||
note1 = ZeppelinServer.notebook.getNote(note1.getId());
|
||||
p1 = note1.getParagraph(p1.getId());
|
||||
p1.setText("%python print(user)");
|
||||
post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.ERROR, p1.getStatus());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -273,7 +273,7 @@ public class InterpreterSetting {
|
|||
}
|
||||
if (this.recoveryStorage == null) {
|
||||
try {
|
||||
this.recoveryStorage = new NullRecoveryStorage(conf);
|
||||
this.recoveryStorage = new NullRecoveryStorage(conf, interpreterSettingManager);
|
||||
} catch (IOException e) {
|
||||
// ignore this exception as NullRecoveryStorage will do nothing.
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
|||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
import org.apache.zeppelin.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.sonatype.aether.repository.Authentication;
|
||||
|
|
@ -158,20 +159,17 @@ public class InterpreterSettingManager {
|
|||
this.angularObjectRegistryListener = angularObjectRegistryListener;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.appEventListener = appEventListener;
|
||||
if (conf.isRecoveryEnabled()) {
|
||||
this.recoveryStorage = new FileSystemRecoveryStorage(conf, this);
|
||||
} else {
|
||||
this.recoveryStorage = new NullRecoveryStorage(conf);
|
||||
}
|
||||
this.recoveryStorage.init();
|
||||
|
||||
try {
|
||||
this.lifecycleManager = (LifecycleManager)
|
||||
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
|
||||
.newInstance(conf);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Fail to create LifecycleManager", e);
|
||||
}
|
||||
this.recoveryStorage = ReflectionUtils.createClazzInstance(conf.getRecoveryStorageClass(),
|
||||
new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
|
||||
new Object[] {conf, this});
|
||||
this.recoveryStorage.init();
|
||||
LOGGER.info("Using RecoveryStorage: " + this.recoveryStorage.getClass().getName());
|
||||
|
||||
this.lifecycleManager = ReflectionUtils.createClazzInstance(conf.getLifecycleManagerClass(),
|
||||
new Class[] {ZeppelinConfiguration.class},
|
||||
new Object[] {conf});
|
||||
LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName());
|
||||
|
||||
init();
|
||||
}
|
||||
|
|
@ -319,8 +317,16 @@ public class InterpreterSettingManager {
|
|||
saveToFile();
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() {
|
||||
return remoteInterpreterProcessListener;
|
||||
}
|
||||
|
||||
public ApplicationEventListener getAppEventListener() {
|
||||
return appEventListener;
|
||||
}
|
||||
|
||||
private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
|
||||
String interpreterJson) throws IOException {
|
||||
String interpreterJson) throws IOException {
|
||||
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
|
||||
ClassLoader tempClassLoader = new URLClassLoader(urls, null);
|
||||
|
||||
|
|
|
|||
|
|
@ -67,11 +67,11 @@ public class ShellScriptLauncher extends InterpreterLauncher {
|
|||
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
|
||||
if (recoveredClient != null) {
|
||||
if (recoveredClient.isRunning()) {
|
||||
LOGGER.info("Recover InterpreterProcess: " + recoveredClient.getHost() + ":" +
|
||||
LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" +
|
||||
recoveredClient.getPort());
|
||||
return recoveredClient;
|
||||
} else {
|
||||
LOGGER.warn("Cannot recovery interpreter process: " + recoveredClient.getHost() + ":"
|
||||
LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":"
|
||||
+ recoveredClient.getPort() + ", as it is already terminated.");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting;
|
|||
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
|
||||
import org.apache.zeppelin.notebook.FileSystemStorage;
|
||||
|
|
@ -69,7 +70,7 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
|
|||
this.interpreterSettingManager = interpreterSettingManager;
|
||||
this.zConf = zConf;
|
||||
this.fs = FileSystemStorage.get(zConf);
|
||||
this.recoveryDir = this.fs.makeQualified(new Path(zConf.getNotebookDir()));
|
||||
this.recoveryDir = this.fs.makeQualified(new Path(zConf.getRecoveryDir()));
|
||||
LOGGER.info("Using folder {} to store recovery data", recoveryDir);
|
||||
this.fs.tryMkDir(recoveryDir);
|
||||
}
|
||||
|
|
@ -118,9 +119,17 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
|
|||
String[] hostPort = tokens[1].split(":");
|
||||
int connectTimeout =
|
||||
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
InterpreterClient client = new RemoteInterpreterRunningProcess(interpreterSettingName,
|
||||
connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
|
||||
RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
|
||||
interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
|
||||
// interpreterSettingManager may be null when this class is used by
|
||||
// stop-interpreter.sh
|
||||
if (interpreterSettingManager != null) {
|
||||
client.setRemoteInterpreterEventPoller(new RemoteInterpreterEventPoller(
|
||||
interpreterSettingManager.getRemoteInterpreterProcessListener(),
|
||||
interpreterSettingManager.getAppEventListener()));
|
||||
}
|
||||
clients.put(groupId, client);
|
||||
LOGGER.info("Recovering Interpreter Process: " + hostPort[0] + ":" + hostPort[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -30,7 +31,9 @@ import java.util.Map;
|
|||
*/
|
||||
public class NullRecoveryStorage extends RecoveryStorage {
|
||||
|
||||
public NullRecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
|
||||
public NullRecoveryStorage(ZeppelinConfiguration zConf,
|
||||
InterpreterSettingManager interpreterSettingManager)
|
||||
throws IOException {
|
||||
super(zConf);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,40 @@
|
|||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
import org.apache.zeppelin.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* Utility class for stopping interpreter in the case that you want to stop all the
|
||||
* interpreter process even when you enable recovery, or you want to kill interpreter process
|
||||
* to avoid orphan process.
|
||||
*/
|
||||
public class StopInterpreter {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(StopInterpreter.class);
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
|
||||
RecoveryStorage recoveryStorage = null;
|
||||
|
||||
recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(),
|
||||
new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
|
||||
new Object[] {zConf, null});
|
||||
|
||||
LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName());
|
||||
Map<String, InterpreterClient> restoredClients = recoveryStorage.restore();
|
||||
if (restoredClients != null) {
|
||||
for (InterpreterClient client : restoredClients.values()) {
|
||||
LOGGER.info("Stop Interpreter Process: " + client.getHost() + ":" + client.getPort());
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -214,7 +214,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
callbackServer.stop();
|
||||
}
|
||||
if (isRunning()) {
|
||||
logger.info("kill interpreter process");
|
||||
logger.info("Kill interpreter process");
|
||||
try {
|
||||
callRemoteFunction(new RemoteFunction<Void>() {
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -63,7 +64,24 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
|
||||
@Override
|
||||
public void stop() {
|
||||
// assume process is externally managed. nothing to do
|
||||
// assume process is externally managed. nothing to do. But will kill it
|
||||
// when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that.
|
||||
if (System.getenv("ZEPPELIN_FORCE_STOP") != null) {
|
||||
if (isRunning()) {
|
||||
logger.info("Kill interpreter process");
|
||||
try {
|
||||
callRemoteFunction(new RemoteFunction<Void>() {
|
||||
@Override
|
||||
public Void call(RemoteInterpreterService.Client client) throws Exception {
|
||||
client.shutdown();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.warn("ignore the exception when shutting down");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RawLocalFileSystem;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
|
|
@ -39,6 +40,7 @@ public class FileSystemStorage {
|
|||
private FileSystemStorage(ZeppelinConfiguration zConf) throws IOException {
|
||||
this.zConf = zConf;
|
||||
this.hadoopConf = new Configuration();
|
||||
this.hadoopConf.set("fs.file.impl", RawLocalFileSystem.class.getName());
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
|
||||
if (isSecurityEnabled) {
|
||||
|
|
@ -54,7 +56,7 @@ public class FileSystemStorage {
|
|||
}
|
||||
|
||||
try {
|
||||
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
|
||||
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), this.hadoopConf);
|
||||
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
||||
|
||||
/**
 * Utility class for creating instances via java reflection.
 *
 * <p>Every reflective failure (class not found, no matching constructor, inaccessible or
 * abstract class, or an exception thrown by the constructor itself) is reported as an
 * {@link IOException} whose cause is the original exception.
 */
public class ReflectionUtils {

  /**
   * Loads and initializes the class with the given name.
   *
   * <p>The thread context class loader is used; if the current thread has none, the class
   * loader that loaded this utility class is used instead.
   *
   * @param className fully qualified name of the class to load
   * @return the loaded class
   * @throws IOException if the class cannot be found
   */
  public static Class<?> getClazz(String className) throws IOException {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    if (loader == null) {
      loader = ReflectionUtils.class.getClassLoader();
    }
    try {
      return Class.forName(className, true, loader);
    } catch (ClassNotFoundException e) {
      throw new IOException("Unable to load class: " + className, e);
    }
  }

  /**
   * Instantiates {@code clazz} through its no-argument constructor.
   *
   * <p>{@code getDeclaredConstructor().newInstance()} is used instead of
   * {@code Class.newInstance()} so that an exception thrown by the constructor is wrapped
   * (and then reported as an {@link IOException}) rather than escaping undeclared.
   */
  private static <T> T getNewInstance(Class<T> clazz) throws IOException {
    try {
      return clazz.getDeclaredConstructor().newInstance();
    } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
        | InvocationTargetException e) {
      throw new IOException(
          "Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
    }
  }

  /**
   * Instantiates {@code clazz} through the public constructor whose signature matches
   * {@code parameterTypes}, passing {@code parameters} to it.
   */
  private static <T> T getNewInstance(Class<T> clazz,
                                      Class<?>[] parameterTypes,
                                      Object[] parameters)
      throws IOException {
    // A null argument array is legal for a no-argument constructor; don't NPE on it below.
    int argumentCount = parameters == null ? 0 : parameters.length;
    try {
      Constructor<T> constructor = clazz.getConstructor(parameterTypes);
      return constructor.newInstance(parameters);
    } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
        | InvocationTargetException e) {
      throw new IOException(
          "Unable to instantiate class with " + argumentCount + " arguments: " +
              clazz.getName(), e);
    }
  }

  /**
   * Creates an instance of the named class using its no-argument constructor.
   *
   * @param className fully qualified name of the class to instantiate
   * @param <T> type the caller expects; the cast is unchecked, so a mismatch only shows up
   *            as a {@link ClassCastException} at the call site
   * @return the new instance
   * @throws IOException if the class cannot be loaded or instantiated
   */
  public static <T> T createClazzInstance(String className) throws IOException {
    Class<?> clazz = getClazz(className);
    @SuppressWarnings("unchecked")
    T instance = (T) getNewInstance(clazz);
    return instance;
  }

  /**
   * Creates an instance of the named class using the public constructor that matches
   * {@code parameterTypes}.
   *
   * @param className fully qualified name of the class to instantiate
   * @param parameterTypes formal parameter types of the constructor to call
   * @param parameters arguments passed to that constructor
   * @param <T> type the caller expects; the cast is unchecked, so a mismatch only shows up
   *            as a {@link ClassCastException} at the call site
   * @return the new instance
   * @throws IOException if the class cannot be loaded or instantiated
   */
  public static <T> T createClazzInstance(String className,
                                          Class<?>[] parameterTypes,
                                          Object[] parameters) throws IOException {
    Class<?> clazz = getClazz(className);
    @SuppressWarnings("unchecked")
    T instance = (T) getNewInstance(clazz, parameterTypes, parameters);
    return instance;
  }

}
|
||||
|
|
@ -33,7 +33,7 @@ public abstract class AbstractInterpreterTest {
|
|||
protected File interpreterDir;
|
||||
protected File confDir;
|
||||
protected File notebookDir;
|
||||
protected ZeppelinConfiguration conf = new ZeppelinConfiguration();
|
||||
protected ZeppelinConfiguration conf;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
|
||||
|
|
@ -14,6 +16,7 @@ import org.apache.zeppelin.user.AuthenticationInfo;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -22,12 +25,23 @@ import static org.junit.Assert.assertEquals;
|
|||
|
||||
public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
|
||||
|
||||
private File recoveryDir = null;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_ENABLED.getVarName(), "true");
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
|
||||
FileSystemRecoveryStorage.class.getName());
|
||||
recoveryDir = Files.createTempDir();
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath());
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
FileUtils.deleteDirectory(recoveryDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleInterpreterProcess() throws InterpreterException, IOException {
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
|
||||
|
|
|
|||
Loading…
Reference in a new issue