mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-3051. Support Interpreter Process Recovery
This commit is contained in:
parent
13f8e6cc65
commit
da7cbb90bc
25 changed files with 789 additions and 164 deletions
|
|
@ -61,7 +61,7 @@
|
|||
"description": "Spark master uri. ex) spark://masterhost:7077",
|
||||
"type": "string"
|
||||
},
|
||||
"zeppelin.spark.unSupportedVersionCheck": {
|
||||
"zeppelin.spark.enableSupportedVersionCheck": {
|
||||
"envName": null,
|
||||
"propertyName": "zeppelin.spark.enableSupportedVersionCheck",
|
||||
"defaultValue": true,
|
||||
|
|
|
|||
|
|
@ -355,6 +355,14 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
|
||||
}
|
||||
|
||||
public String getRecoveryDir() {
|
||||
return getRelativeDir(ConfVars.ZEPPELIN_RECOVERY_DIR);
|
||||
}
|
||||
|
||||
public boolean isRecoveryEnabled() {
|
||||
return getBoolean(ConfVars.ZEPPELIN_RECOVERY_ENABLED);
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER);
|
||||
}
|
||||
|
|
@ -658,6 +666,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
|
||||
ZEPPELIN_RECOVERY_ENABLED("zeppelin.recovery.enabled", "false"),
|
||||
|
||||
// use specified notebook (id) as homescreen
|
||||
ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
|
||||
// whether homescreen notebook will be hidden from notebook list or not
|
||||
|
|
|
|||
|
|
@ -19,8 +19,20 @@ package org.apache.zeppelin.interpreter.launcher;
|
|||
|
||||
/**
|
||||
* Interface to InterpreterClient which is created by InterpreterLauncher. This is the component
|
||||
* that is used to for the communication fromzeppelin-server process to zeppelin interpreter process
|
||||
* that is used to for the communication from zeppelin-server process to zeppelin interpreter
|
||||
* process.
|
||||
*/
|
||||
public interface InterpreterClient {
|
||||
|
||||
String getInterpreterSettingName();
|
||||
|
||||
void start(String userName, Boolean isUserImpersonate);
|
||||
|
||||
void stop();
|
||||
|
||||
String getHost();
|
||||
|
||||
int getPort();
|
||||
|
||||
boolean isRunning();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ public class InterpreterLaunchContext {
|
|||
private Properties properties;
|
||||
private InterpreterOption option;
|
||||
private InterpreterRunner runner;
|
||||
private String interpreterGroupId;
|
||||
private String interpreterSettingId;
|
||||
private String interpreterSettingGroup;
|
||||
private String interpreterSettingName;
|
||||
|
|
@ -37,12 +38,14 @@ public class InterpreterLaunchContext {
|
|||
public InterpreterLaunchContext(Properties properties,
|
||||
InterpreterOption option,
|
||||
InterpreterRunner runner,
|
||||
String interpreterGroupId,
|
||||
String interpreterSettingId,
|
||||
String interpreterSettingGroup,
|
||||
String interpreterSettingName) {
|
||||
this.properties = properties;
|
||||
this.option = option;
|
||||
this.runner = runner;
|
||||
this.interpreterGroupId = interpreterGroupId;
|
||||
this.interpreterSettingId = interpreterSettingId;
|
||||
this.interpreterSettingGroup = interpreterSettingGroup;
|
||||
this.interpreterSettingName = interpreterSettingName;
|
||||
|
|
@ -60,6 +63,10 @@ public class InterpreterLaunchContext {
|
|||
return runner;
|
||||
}
|
||||
|
||||
public String getInterpreterGroupId() {
|
||||
return interpreterGroupId;
|
||||
}
|
||||
|
||||
public String getInterpreterSettingId() {
|
||||
return interpreterSettingId;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
|
@ -29,9 +30,11 @@ public abstract class InterpreterLauncher {
|
|||
|
||||
protected ZeppelinConfiguration zConf;
|
||||
protected Properties properties;
|
||||
protected RecoveryStorage recoveryStorage;
|
||||
|
||||
public InterpreterLauncher(ZeppelinConfiguration zConf) {
|
||||
public InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
|
||||
this.zConf = zConf;
|
||||
this.recoveryStorage = recoveryStorage;
|
||||
}
|
||||
|
||||
public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* Interface for Interpreter Process Recovery.
|
||||
*
|
||||
*/
|
||||
public abstract class RecoveryStorage {
|
||||
|
||||
/**
|
||||
* Update RecoveryStorage when new InterpreterClient is started
|
||||
* @param client
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void onInterpreterClientStart(InterpreterClient client) throws IOException;
|
||||
|
||||
/**
|
||||
* Update RecoveryStorage when InterpreterClient is stopped
|
||||
* @param client
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException;
|
||||
|
||||
/**
|
||||
* Restore InterpreterClient when Zeppelin Server is restarted
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract Map<String, InterpreterClient> restore() throws IOException;
|
||||
}
|
||||
|
|
@ -162,7 +162,7 @@ public class ZeppelinServer extends Application {
|
|||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
final ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
conf.setProperty("args", args);
|
||||
|
||||
jettyWebServer = setupJettyServer(conf);
|
||||
|
|
@ -199,7 +199,6 @@ public class ZeppelinServer extends Application {
|
|||
LOG.info("Shutting down Zeppelin Server ... ");
|
||||
try {
|
||||
jettyWebServer.stop();
|
||||
notebook.getInterpreterSettingManager().close();
|
||||
notebook.close();
|
||||
Thread.sleep(3000);
|
||||
} catch (Exception e) {
|
||||
|
|
@ -222,7 +221,9 @@ public class ZeppelinServer extends Application {
|
|||
}
|
||||
|
||||
jettyWebServer.join();
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().close();
|
||||
if (!conf.isRecoveryEnabled()) {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().close();
|
||||
}
|
||||
}
|
||||
|
||||
private static Server setupJettyServer(ZeppelinConfiguration conf) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.recovery;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.rest.AbstractTestRestApi;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.server.ZeppelinServer;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class RecoveryTest extends AbstractTestRestApi {
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_ENABLED.getVarName(), "true");
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destroy() throws Exception {
|
||||
shutDown();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecovery() throws Exception {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
|
||||
|
||||
// run python interpreter and create new variable `user`
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("%python user='abc'");
|
||||
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertThat(post, isAllowed());
|
||||
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
|
||||
// shutdown zeppelin and restart it
|
||||
shutDown();
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
|
||||
// run the paragraph again, but change the text to print variable `user`
|
||||
note1 = ZeppelinServer.notebook.getNote(note1.getId());
|
||||
p1 = note1.getParagraph(p1.getId());
|
||||
p1.setText("%python print(user)");
|
||||
post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
assertEquals("abc\n", p1.getResult().message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecovery_2() throws Exception {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
|
||||
|
||||
// run python interpreter and create new variable `user`
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("%python user='abc'");
|
||||
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertThat(post, isAllowed());
|
||||
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
|
||||
// restart the python interpreter
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().restart(
|
||||
((ManagedInterpreterGroup) p1.getBindedInterpreter().getInterpreterGroup())
|
||||
.getInterpreterSetting().getId()
|
||||
);
|
||||
|
||||
// shutdown zeppelin and restart it
|
||||
shutDown();
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
|
||||
// run the paragraph again, but change the text to print variable `user`.
|
||||
// can not recover the python interpreter, because it has been shutdown.
|
||||
note1 = ZeppelinServer.notebook.getNote(note1.getId());
|
||||
p1 = note1.getParagraph(p1.getId());
|
||||
p1.setText("%python print(user)");
|
||||
post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.ERROR, p1.getStatus());
|
||||
}
|
||||
}
|
||||
|
|
@ -318,8 +318,10 @@ public abstract class AbstractTestRestApi {
|
|||
if (!wasRunning) {
|
||||
// restart interpreter to stop all interpreter processes
|
||||
List<InterpreterSetting> settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get();
|
||||
for (InterpreterSetting setting : settingList) {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
|
||||
if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {
|
||||
for (InterpreterSetting setting : settingList) {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
|
||||
}
|
||||
}
|
||||
if (shiroIni != null) {
|
||||
FileUtils.deleteQuietly(shiroIni);
|
||||
|
|
@ -350,7 +352,12 @@ public abstract class AbstractTestRestApi {
|
|||
.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName());
|
||||
}
|
||||
|
||||
FileUtils.deleteDirectory(confDir);
|
||||
if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {
|
||||
// don't delete interpreter.json when recovery is enabled. otherwise the interpreter setting
|
||||
// id will change after zeppelin restart, then we can not recover interpreter process
|
||||
// properly
|
||||
FileUtils.deleteDirectory(confDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
|
|||
import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
|
||||
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
|
||||
import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
|
||||
|
|
@ -144,6 +146,9 @@ public class InterpreterSetting {
|
|||
|
||||
|
||||
|
||||
private transient RecoveryStorage recoveryStorage;
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Builder class for InterpreterSetting
|
||||
*/
|
||||
|
|
@ -242,6 +247,11 @@ public class InterpreterSetting {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setRecoveryStorage(RecoveryStorage recoveryStorage) {
|
||||
interpreterSetting.recoveryStorage = recoveryStorage;
|
||||
return this;
|
||||
}
|
||||
|
||||
public InterpreterSetting create() {
|
||||
// post processing
|
||||
interpreterSetting.postProcessing();
|
||||
|
|
@ -261,6 +271,9 @@ public class InterpreterSetting {
|
|||
if (this.lifecycleManager == null) {
|
||||
this.lifecycleManager = new NullLifecycleManager(conf);
|
||||
}
|
||||
if (this.recoveryStorage == null) {
|
||||
this.recoveryStorage = new NullRecoveryStorage();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -285,9 +298,9 @@ public class InterpreterSetting {
|
|||
|
||||
private void createLauncher() {
|
||||
if (group.equals("spark")) {
|
||||
this.launcher = new SparkInterpreterLauncher(this.conf);
|
||||
this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
|
||||
} else {
|
||||
this.launcher = new ShellScriptLauncher(this.conf);
|
||||
this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -344,6 +357,15 @@ public class InterpreterSetting {
|
|||
return this;
|
||||
}
|
||||
|
||||
public InterpreterSetting setRecoveryStorage(RecoveryStorage recoveryStorage) {
|
||||
this.recoveryStorage = recoveryStorage;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecoveryStorage getRecoveryStorage() {
|
||||
return recoveryStorage;
|
||||
}
|
||||
|
||||
public LifecycleManager getLifecycleManager() {
|
||||
return lifecycleManager;
|
||||
}
|
||||
|
|
@ -408,7 +430,12 @@ public class InterpreterSetting {
|
|||
}
|
||||
|
||||
void removeInterpreterGroup(String groupId) {
|
||||
this.interpreterGroups.remove(groupId);
|
||||
try {
|
||||
interpreterGroupWriteLock.lock();
|
||||
this.interpreterGroups.remove(groupId);
|
||||
} finally {
|
||||
interpreterGroupWriteLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
|
||||
|
|
@ -425,7 +452,6 @@ public class InterpreterSetting {
|
|||
return interpreterGroups.get(groupId);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ArrayList<ManagedInterpreterGroup> getAllInterpreterGroups() {
|
||||
try {
|
||||
interpreterGroupReadLock.lock();
|
||||
|
|
@ -668,16 +694,19 @@ public class InterpreterSetting {
|
|||
return interpreters;
|
||||
}
|
||||
|
||||
synchronized RemoteInterpreterProcess createInterpreterProcess(Properties properties)
|
||||
synchronized RemoteInterpreterProcess createInterpreterProcess(String interpreterGroupId,
|
||||
Properties properties)
|
||||
throws IOException {
|
||||
if (launcher == null) {
|
||||
createLauncher();
|
||||
}
|
||||
InterpreterLaunchContext launchContext = new
|
||||
InterpreterLaunchContext(properties, option, interpreterRunner, id, group, name);
|
||||
InterpreterLaunchContext(properties, option, interpreterRunner,
|
||||
interpreterGroupId, id, group, name);
|
||||
RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
|
||||
process.setRemoteInterpreterEventPoller(
|
||||
new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
|
||||
recoveryStorage.onInterpreterClientStart(process);
|
||||
return process;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -34,6 +34,9 @@ import org.apache.zeppelin.dep.DependencyResolver;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
|
||||
import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
|
|
@ -118,6 +121,7 @@ public class InterpreterSettingManager {
|
|||
private ApplicationEventListener appEventListener;
|
||||
private DependencyResolver dependencyResolver;
|
||||
private LifecycleManager lifecycleManager;
|
||||
private RecoveryStorage recoveryStorage;
|
||||
|
||||
public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
|
|
@ -154,6 +158,11 @@ public class InterpreterSettingManager {
|
|||
this.angularObjectRegistryListener = angularObjectRegistryListener;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.appEventListener = appEventListener;
|
||||
if (conf.isRecoveryEnabled()) {
|
||||
this.recoveryStorage = new FileSystemRecoveryStorage(conf, this);
|
||||
} else {
|
||||
this.recoveryStorage = new NullRecoveryStorage();
|
||||
}
|
||||
try {
|
||||
this.lifecycleManager = (LifecycleManager)
|
||||
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
|
||||
|
|
@ -174,6 +183,7 @@ public class InterpreterSettingManager {
|
|||
.setAppEventListener(appEventListener)
|
||||
.setDependencyResolver(dependencyResolver)
|
||||
.setLifecycleManager(lifecycleManager)
|
||||
.setRecoveryStorage(recoveryStorage)
|
||||
.postProcessing();
|
||||
}
|
||||
|
||||
|
|
@ -507,6 +517,10 @@ public class InterpreterSettingManager {
|
|||
return resourceSet;
|
||||
}
|
||||
|
||||
public RecoveryStorage getRecoveryStorage() {
|
||||
return recoveryStorage;
|
||||
}
|
||||
|
||||
public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
|
||||
for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) {
|
||||
ResourceSet resourceSet = new ResourceSet();
|
||||
|
|
|
|||
|
|
@ -55,15 +55,31 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
return interpreterSetting;
|
||||
}
|
||||
|
||||
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(Properties properties)
|
||||
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName,
|
||||
Properties properties)
|
||||
throws IOException {
|
||||
if (remoteInterpreterProcess == null) {
|
||||
LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId());
|
||||
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(properties);
|
||||
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, properties);
|
||||
synchronized (remoteInterpreterProcess) {
|
||||
if (!remoteInterpreterProcess.isRunning()) {
|
||||
remoteInterpreterProcess.start(userName, false);
|
||||
remoteInterpreterProcess.getRemoteInterpreterEventPoller()
|
||||
.setInterpreterProcess(remoteInterpreterProcess);
|
||||
remoteInterpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(this);
|
||||
remoteInterpreterProcess.getRemoteInterpreterEventPoller().start();
|
||||
getInterpreterSetting().getRecoveryStorage()
|
||||
.onInterpreterClientStart(remoteInterpreterProcess);
|
||||
}
|
||||
}
|
||||
}
|
||||
return remoteInterpreterProcess;
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcess getInterpreterProcess() {
|
||||
return remoteInterpreterProcess;
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcess getRemoteInterpreterProcess() {
|
||||
return remoteInterpreterProcess;
|
||||
}
|
||||
|
|
@ -94,6 +110,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
if (remoteInterpreterProcess != null) {
|
||||
LOGGER.info("Kill RemoteInterpreterProcess");
|
||||
remoteInterpreterProcess.stop();
|
||||
try {
|
||||
interpreterSetting.getRecoveryStorage().onInterpreterClientStop(remoteInterpreterProcess);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Fail to store recovery data", e);
|
||||
}
|
||||
remoteInterpreterProcess = null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,50 +21,67 @@ package org.apache.zeppelin.interpreter.launcher;
|
|||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterRunner;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interpreter Launcher which use shell script to launch the interpreter process.
|
||||
*
|
||||
*/
|
||||
public class ShellScriptLauncher extends InterpreterLauncher {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class);
|
||||
|
||||
public ShellScriptLauncher(ZeppelinConfiguration zConf) {
|
||||
super(zConf);
|
||||
public ShellScriptLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
|
||||
super(zConf, recoveryStorage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterClient launch(InterpreterLaunchContext context) {
|
||||
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
|
||||
LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
|
||||
this.properties = context.getProperties();
|
||||
InterpreterOption option = context.getOption();
|
||||
InterpreterRunner runner = context.getRunner();
|
||||
String groupName = context.getInterpreterSettingGroup();
|
||||
String name = context.getInterpreterSettingName();
|
||||
|
||||
int connectTimeout =
|
||||
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
|
||||
if (option.isExistingProcess()) {
|
||||
return new RemoteInterpreterRunningProcess(
|
||||
context.getInterpreterSettingName(),
|
||||
connectTimeout,
|
||||
option.getHost(),
|
||||
option.getPort());
|
||||
} else {
|
||||
// try to recover it first
|
||||
if (zConf.isRecoveryEnabled()) {
|
||||
Map<String, InterpreterClient> clients = recoveryStorage.restore();
|
||||
if (clients.containsKey(context.getInterpreterGroupId())) {
|
||||
InterpreterClient client = clients.get(context.getInterpreterGroupId());
|
||||
if (client.isRunning()) {
|
||||
LOGGER.info("Recover InterpreterProcess: " + client.getHost() + ":" + client.getPort());
|
||||
return (RemoteInterpreterRunningProcess) client;
|
||||
} else {
|
||||
LOGGER.warn("Can not recovery interpreter process: " + client.getHost() + ":"
|
||||
+ client.getPort() + ", as it is already terminated.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// create new remote process
|
||||
String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
|
||||
+ context.getInterpreterSettingId();
|
||||
return new RemoteInterpreterManagedProcess(
|
||||
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
|
||||
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
|
||||
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
|
||||
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
|
||||
buildEnvFromProperties(), connectTimeout, name);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
|
|||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -35,8 +36,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
|
|||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
|
||||
|
||||
public SparkInterpreterLauncher(ZeppelinConfiguration zConf) {
|
||||
super(zConf);
|
||||
public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
|
||||
super(zConf, recoveryStorage);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
|
||||
import org.apache.zeppelin.notebook.FileSystemStorage;
|
||||
import org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* Hadoop compatible FileSystem based RecoveryStorage implementation.
|
||||
*
|
||||
* Save InterpreterProcess in the format of:
|
||||
* InterpreterGroupId host:port
|
||||
*/
|
||||
public class FileSystemRecoveryStorage extends RecoveryStorage {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class);
|
||||
|
||||
private InterpreterSettingManager interpreterSettingManager;
|
||||
private ZeppelinConfiguration zConf;
|
||||
private FileSystemStorage fs;
|
||||
private Path recoveryDir;
|
||||
|
||||
public FileSystemRecoveryStorage(ZeppelinConfiguration zConf,
|
||||
InterpreterSettingManager interpreterSettingManager)
|
||||
throws IOException {
|
||||
|
||||
this.interpreterSettingManager = interpreterSettingManager;
|
||||
this.zConf = zConf;
|
||||
this.fs = FileSystemStorage.get(zConf);
|
||||
this.recoveryDir = this.fs.makeQualified(new Path(zConf.getNotebookDir()));
|
||||
LOGGER.info("Using folder {} to store recovery data", recoveryDir);
|
||||
this.fs.tryMkDir(recoveryDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterClientStart(InterpreterClient client) throws IOException {
|
||||
save(client.getInterpreterSettingName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterClientStop(InterpreterClient client) throws IOException {
|
||||
save(client.getInterpreterSettingName());
|
||||
}
|
||||
|
||||
private void save(String interpreterSettingName) throws IOException {
|
||||
InterpreterSetting interpreterSetting =
|
||||
interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName);
|
||||
List<String> recoveryContent = new ArrayList<>();
|
||||
for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) {
|
||||
RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess();
|
||||
if (interpreterProcess != null) {
|
||||
recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" +
|
||||
interpreterProcess.getPort());
|
||||
}
|
||||
}
|
||||
LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName);
|
||||
LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, "\n"));
|
||||
Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery");
|
||||
fs.writeFile(StringUtils.join(recoveryContent, "\n"), recoveryFile, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, InterpreterClient> restore() throws IOException {
|
||||
Map<String, InterpreterClient> clients = new HashMap<>();
|
||||
List<Path> paths = fs.list(new Path(recoveryDir + "/*.recovery"));
|
||||
|
||||
for (Path path : paths) {
|
||||
String fileName = path.getName();
|
||||
String interpreterSettingName = fileName.substring(0,
|
||||
fileName.length() - ".recovery".length());
|
||||
String recoveryContent = fs.readFile(path);
|
||||
if (!StringUtils.isBlank(recoveryContent)) {
|
||||
for (String line : recoveryContent.split("\n")) {
|
||||
String[] tokens = line.split("\t");
|
||||
String groupId = tokens[0];
|
||||
String[] hostPort = tokens[1].split(":");
|
||||
int connectTimeout =
|
||||
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
InterpreterClient client = new RemoteInterpreterRunningProcess(interpreterSettingName,
|
||||
connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
|
||||
clients.put(groupId, client);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return clients;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* RecoveryStorage that do nothing, used when recovery is not enabled.
|
||||
*
|
||||
*/
|
||||
public class NullRecoveryStorage extends RecoveryStorage {
|
||||
@Override
|
||||
public void onInterpreterClientStart(InterpreterClient client) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterClientStop(InterpreterClient client) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, InterpreterClient> restore() throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -102,16 +102,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
return this.interpreterProcess;
|
||||
}
|
||||
ManagedInterpreterGroup intpGroup = getInterpreterGroup();
|
||||
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(properties);
|
||||
synchronized (interpreterProcess) {
|
||||
if (!interpreterProcess.isRunning()) {
|
||||
interpreterProcess.start(this.getUserName(), false);
|
||||
interpreterProcess.getRemoteInterpreterEventPoller()
|
||||
.setInterpreterProcess(interpreterProcess);
|
||||
interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
|
||||
interpreterProcess.getRemoteInterpreterEventPoller().start();
|
||||
}
|
||||
}
|
||||
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(getUserName(), properties);
|
||||
return interpreterProcess;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -263,7 +263,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
return interpreterDir;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getInterpreterSettingName() {
|
||||
return interpreterSettingName;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,12 +51,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
|
|||
this.remoteInterpreterEventPoller = eventPoller;
|
||||
}
|
||||
|
||||
public abstract String getHost();
|
||||
public abstract int getPort();
|
||||
public abstract void start(String userName, Boolean isUserImpersonate);
|
||||
public abstract void stop();
|
||||
public abstract boolean isRunning();
|
||||
|
||||
public int getConnectTimeout() {
|
||||
return connectTimeout;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,13 +27,16 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
|
||||
private final String host;
|
||||
private final int port;
|
||||
private final String interpreterSettingName;
|
||||
|
||||
public RemoteInterpreterRunningProcess(
|
||||
String interpreterSettingName,
|
||||
int connectTimeout,
|
||||
String host,
|
||||
int port
|
||||
) {
|
||||
super(connectTimeout);
|
||||
this.interpreterSettingName = interpreterSettingName;
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
|
@ -48,6 +51,11 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
return port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInterpreterSettingName() {
|
||||
return interpreterSettingName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String userName, Boolean isUserImpersonate) {
|
||||
// assume process is externally managed. nothing to do
|
||||
|
|
|
|||
|
|
@ -0,0 +1,166 @@
|
|||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* Hadoop FileSystem wrapper. Support both secure and no-secure mode
|
||||
*/
|
||||
public class FileSystemStorage {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(FileSystemStorage.class);
|
||||
|
||||
private static FileSystemStorage instance;
|
||||
|
||||
private ZeppelinConfiguration zConf;
|
||||
private Configuration hadoopConf;
|
||||
private boolean isSecurityEnabled = false;
|
||||
private FileSystem fs;
|
||||
|
||||
private FileSystemStorage(ZeppelinConfiguration zConf) throws IOException {
|
||||
this.zConf = zConf;
|
||||
this.hadoopConf = new Configuration();
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
|
||||
if (isSecurityEnabled) {
|
||||
String keytab = zConf.getString(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
|
||||
String principal = zConf.getString(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
|
||||
if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
|
||||
throw new IOException("keytab and principal can not be empty, keytab: " + keytab
|
||||
+ ", principal: " + principal);
|
||||
}
|
||||
UserGroupInformation.loginUserFromKeytab(principal, keytab);
|
||||
}
|
||||
|
||||
try {
|
||||
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
|
||||
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static synchronized FileSystemStorage get(ZeppelinConfiguration zConf) throws IOException {
|
||||
if (instance == null) {
|
||||
instance = new FileSystemStorage(zConf);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public Path makeQualified(Path path) {
|
||||
return fs.makeQualified(path);
|
||||
}
|
||||
|
||||
public void tryMkDir(final Path dir) throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
if (!fs.exists(dir)) {
|
||||
fs.mkdirs(dir);
|
||||
LOGGER.info("Create dir {} in hdfs", dir.toString());
|
||||
}
|
||||
if (fs.isFile(dir)) {
|
||||
throw new IOException("{} is file instead of directory, please remove it or " +
|
||||
"specify another directory");
|
||||
}
|
||||
fs.mkdirs(dir);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public List<Path> list(final Path path) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<List<Path>>() {
|
||||
@Override
|
||||
public List<Path> call() throws IOException {
|
||||
List<Path> paths = new ArrayList<>();
|
||||
for (FileStatus status : fs.globStatus(path)) {
|
||||
paths.add(status.getPath());
|
||||
}
|
||||
return paths;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public boolean delete(final Path path) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws IOException {
|
||||
return fs.delete(path, true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public String readFile(final Path file) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<String>() {
|
||||
@Override
|
||||
public String call() throws IOException {
|
||||
LOGGER.debug("Read from file: " + file);
|
||||
ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
|
||||
IOUtils.copyBytes(fs.open(file), noteBytes, hadoopConf);
|
||||
return new String(noteBytes.toString(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void writeFile(final String content, final Path file, boolean writeTempFileFirst)
|
||||
throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
InputStream in = new ByteArrayInputStream(content.getBytes(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
|
||||
Path tmpFile = new Path(file.toString() + ".tmp");
|
||||
IOUtils.copyBytes(in, fs.create(tmpFile), hadoopConf);
|
||||
fs.delete(file, true);
|
||||
fs.rename(tmpFile, file);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private interface HdfsOperation<T> {
|
||||
T call() throws IOException;
|
||||
}
|
||||
|
||||
public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
|
||||
if (isSecurityEnabled) {
|
||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||
try {
|
||||
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
|
||||
@Override
|
||||
public T run() throws Exception {
|
||||
return func.call();
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
} else {
|
||||
return func.call();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -8,6 +8,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.FileSystemStorage;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.NoteInfo;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
|
|
@ -37,108 +38,45 @@ import java.util.Map;
|
|||
public class FileSystemNotebookRepo implements NotebookRepo {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemNotebookRepo.class);
|
||||
|
||||
private Configuration hadoopConf;
|
||||
private ZeppelinConfiguration zConf;
|
||||
private boolean isSecurityEnabled = false;
|
||||
private FileSystem fs;
|
||||
private FileSystemStorage fs;
|
||||
private Path notebookDir;
|
||||
|
||||
public FileSystemNotebookRepo(ZeppelinConfiguration zConf) throws IOException {
|
||||
this.zConf = zConf;
|
||||
this.hadoopConf = new Configuration();
|
||||
this.fs = FileSystemStorage.get(zConf);
|
||||
this.notebookDir = this.fs.makeQualified(new Path(zConf.getNotebookDir()));
|
||||
LOGGER.info("Using folder {} to store notebook", notebookDir);
|
||||
this.fs.tryMkDir(notebookDir);
|
||||
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
if (isSecurityEnabled) {
|
||||
String keytab = zConf.getString(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
|
||||
String principal = zConf.getString(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
|
||||
if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
|
||||
throw new IOException("keytab and principal can not be empty, keytab: " + keytab
|
||||
+ ", principal: " + principal);
|
||||
}
|
||||
UserGroupInformation.loginUserFromKeytab(principal, keytab);
|
||||
}
|
||||
|
||||
try {
|
||||
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
|
||||
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
|
||||
this.notebookDir = fs.makeQualified(new Path(zConf.getNotebookDir()));
|
||||
LOGGER.info("Using folder {} to store notebook", notebookDir);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
if (!fs.exists(notebookDir)) {
|
||||
fs.mkdirs(notebookDir);
|
||||
LOGGER.info("Create notebook dir {} in hdfs", notebookDir.toString());
|
||||
}
|
||||
if (fs.isFile(notebookDir)) {
|
||||
throw new IOException("notebookDir {} is file instead of directory, please remove it or " +
|
||||
"specify another directory");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<List<NoteInfo>>() {
|
||||
@Override
|
||||
public List<NoteInfo> call() throws IOException {
|
||||
List<NoteInfo> noteInfos = new ArrayList<>();
|
||||
for (FileStatus status : fs.globStatus(new Path(notebookDir, "*/note.json"))) {
|
||||
NoteInfo noteInfo = new NoteInfo(status.getPath().getParent().getName(), "", null);
|
||||
noteInfos.add(noteInfo);
|
||||
}
|
||||
return noteInfos;
|
||||
}
|
||||
});
|
||||
List<Path> notePaths = fs.list(new Path(notebookDir, "*/note.json"));
|
||||
List<NoteInfo> noteInfos = new ArrayList<>();
|
||||
for (Path path : notePaths) {
|
||||
NoteInfo noteInfo = new NoteInfo(path.getParent().getName(), "", null);
|
||||
noteInfos.add(noteInfo);
|
||||
}
|
||||
return noteInfos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Note get(final String noteId, AuthenticationInfo subject) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<Note>() {
|
||||
@Override
|
||||
public Note call() throws IOException {
|
||||
Path notePath = new Path(notebookDir.toString() + "/" + noteId + "/note.json");
|
||||
LOGGER.debug("Read note from file: " + notePath);
|
||||
ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
|
||||
IOUtils.copyBytes(fs.open(notePath), noteBytes, hadoopConf);
|
||||
return Note.fromJson(new String(noteBytes.toString(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))));
|
||||
}
|
||||
});
|
||||
String content = this.fs.readFile(
|
||||
new Path(notebookDir.toString() + "/" + noteId + "/note.json"));
|
||||
return Note.fromJson(content);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void save(final Note note, AuthenticationInfo subject) throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
Path notePath = new Path(notebookDir.toString() + "/" + note.getId() + "/note.json");
|
||||
Path tmpNotePath = new Path(notebookDir.toString() + "/" + note.getId() + "/.note.json");
|
||||
LOGGER.debug("Saving note to file: " + notePath);
|
||||
if (fs.exists(tmpNotePath)) {
|
||||
fs.delete(tmpNotePath, true);
|
||||
}
|
||||
InputStream in = new ByteArrayInputStream(note.toJson().getBytes(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
|
||||
IOUtils.copyBytes(in, fs.create(tmpNotePath), hadoopConf);
|
||||
fs.delete(notePath, true);
|
||||
fs.rename(tmpNotePath, notePath);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
this.fs.writeFile(note.toJson(),
|
||||
new Path(notebookDir.toString() + "/" + note.getId() + "/note.json"),
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(final String noteId, AuthenticationInfo subject) throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
Path noteFolder = new Path(notebookDir.toString() + "/" + noteId);
|
||||
fs.delete(noteFolder, true);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
this.fs.delete(new Path(notebookDir.toString() + "/" + noteId));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -182,26 +120,4 @@ public class FileSystemNotebookRepo implements NotebookRepo {
|
|||
public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) {
|
||||
LOGGER.warn("updateSettings is not implemented for HdfsNotebookRepo");
|
||||
}
|
||||
|
||||
private interface HdfsOperation<T> {
|
||||
T call() throws IOException;
|
||||
}
|
||||
|
||||
public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
|
||||
if (isSecurityEnabled) {
|
||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||
try {
|
||||
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
|
||||
@Override
|
||||
public T run() throws Exception {
|
||||
return func.call();
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
} else {
|
||||
return func.call();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -30,14 +31,14 @@ import static org.junit.Assert.assertTrue;
|
|||
public class ShellScriptLauncherTest {
|
||||
|
||||
@Test
|
||||
public void testLauncher() {
|
||||
public void testLauncher() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
ShellScriptLauncher launcher = new ShellScriptLauncher(zConf);
|
||||
ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("ENV_1", "VALUE_1");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "groupName", "name");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "groupName", "name");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -30,9 +31,9 @@ import static org.junit.Assert.assertTrue;
|
|||
public class SparkInterpreterLauncherTest {
|
||||
|
||||
@Test
|
||||
public void testLocalMode() {
|
||||
public void testLocalMode() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -41,7 +42,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
@ -55,9 +56,9 @@ public class SparkInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClientMode_1() {
|
||||
public void testYarnClientMode_1() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -66,7 +67,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
@ -80,9 +81,9 @@ public class SparkInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClientMode_2() {
|
||||
public void testYarnClientMode_2() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -92,7 +93,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
@ -106,9 +107,9 @@ public class SparkInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClusterMode_1() {
|
||||
public void testYarnClusterMode_1() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -117,7 +118,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
@ -132,9 +133,9 @@ public class SparkInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClusterMode_2() {
|
||||
public void testYarnClusterMode_2() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -144,7 +145,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,78 @@
|
|||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_ENABLED.getVarName(), "true");
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleInterpreterProcess() throws InterpreterException, IOException {
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
|
||||
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
|
||||
|
||||
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
|
||||
RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
|
||||
InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
remoteInterpreter1.interpret("hello", context1);
|
||||
|
||||
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
|
||||
interpreterSetting.close();
|
||||
assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleInterpreterProcess() throws InterpreterException, IOException {
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
|
||||
interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED);
|
||||
|
||||
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
|
||||
RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
|
||||
InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
remoteInterpreter1.interpret("hello", context1);
|
||||
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
|
||||
Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note2");
|
||||
RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
|
||||
InterpreterContext context2 = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
remoteInterpreter2.interpret("hello", context2);
|
||||
|
||||
assertEquals(2, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
|
||||
interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1");
|
||||
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
|
||||
interpreterSetting.close();
|
||||
assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue