ZEPPELIN-3051. Support Interpreter Process Recovery

This commit is contained in:
Jeff Zhang 2017-11-14 21:48:06 +08:00
parent 13f8e6cc65
commit da7cbb90bc
25 changed files with 789 additions and 164 deletions

View file

@ -61,7 +61,7 @@
"description": "Spark master uri. ex) spark://masterhost:7077",
"type": "string"
},
"zeppelin.spark.unSupportedVersionCheck": {
"zeppelin.spark.enableSupportedVersionCheck": {
"envName": null,
"propertyName": "zeppelin.spark.enableSupportedVersionCheck",
"defaultValue": true,

View file

@ -355,6 +355,14 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
}
public String getRecoveryDir() {
return getRelativeDir(ConfVars.ZEPPELIN_RECOVERY_DIR);
}
public boolean isRecoveryEnabled() {
return getBoolean(ConfVars.ZEPPELIN_RECOVERY_ENABLED);
}
public String getUser() {
return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER);
}
@ -658,6 +666,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
ZEPPELIN_RECOVERY_ENABLED("zeppelin.recovery.enabled", "false"),
// use specified notebook (id) as homescreen
ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
// whether homescreen notebook will be hidden from notebook list or not

View file

@ -19,8 +19,20 @@ package org.apache.zeppelin.interpreter.launcher;
/**
* Interface to InterpreterClient which is created by InterpreterLauncher. This is the component
* that is used to for the communication fromzeppelin-server process to zeppelin interpreter process
* that is used to for the communication from zeppelin-server process to zeppelin interpreter
* process.
*/
public interface InterpreterClient {
String getInterpreterSettingName();
void start(String userName, Boolean isUserImpersonate);
void stop();
String getHost();
int getPort();
boolean isRunning();
}

View file

@ -30,6 +30,7 @@ public class InterpreterLaunchContext {
private Properties properties;
private InterpreterOption option;
private InterpreterRunner runner;
private String interpreterGroupId;
private String interpreterSettingId;
private String interpreterSettingGroup;
private String interpreterSettingName;
@ -37,12 +38,14 @@ public class InterpreterLaunchContext {
public InterpreterLaunchContext(Properties properties,
InterpreterOption option,
InterpreterRunner runner,
String interpreterGroupId,
String interpreterSettingId,
String interpreterSettingGroup,
String interpreterSettingName) {
this.properties = properties;
this.option = option;
this.runner = runner;
this.interpreterGroupId = interpreterGroupId;
this.interpreterSettingId = interpreterSettingId;
this.interpreterSettingGroup = interpreterSettingGroup;
this.interpreterSettingName = interpreterSettingName;
@ -60,6 +63,10 @@ public class InterpreterLaunchContext {
return runner;
}
public String getInterpreterGroupId() {
return interpreterGroupId;
}
public String getInterpreterSettingId() {
return interpreterSettingId;
}

View file

@ -18,6 +18,7 @@
package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import java.io.IOException;
import java.util.Properties;
@ -29,9 +30,11 @@ public abstract class InterpreterLauncher {
protected ZeppelinConfiguration zConf;
protected Properties properties;
protected RecoveryStorage recoveryStorage;
public InterpreterLauncher(ZeppelinConfiguration zConf) {
public InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
this.zConf = zConf;
this.recoveryStorage = recoveryStorage;
}
public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException;

View file

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.recovery;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import java.io.IOException;
import java.util.Map;
/**
* Interface for Interpreter Process Recovery.
*
*/
public abstract class RecoveryStorage {
/**
* Update RecoveryStorage when new InterpreterClient is started
* @param client
* @throws IOException
*/
public abstract void onInterpreterClientStart(InterpreterClient client) throws IOException;
/**
* Update RecoveryStorage when InterpreterClient is stopped
* @param client
* @throws IOException
*/
public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException;
/**
* Restore InterpreterClient when Zeppelin Server is restarted
* @return
* @throws IOException
*/
public abstract Map<String, InterpreterClient> restore() throws IOException;
}

View file

@ -162,7 +162,7 @@ public class ZeppelinServer extends Application {
public static void main(String[] args) throws InterruptedException {
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
final ZeppelinConfiguration conf = ZeppelinConfiguration.create();
conf.setProperty("args", args);
jettyWebServer = setupJettyServer(conf);
@ -199,7 +199,6 @@ public class ZeppelinServer extends Application {
LOG.info("Shutting down Zeppelin Server ... ");
try {
jettyWebServer.stop();
notebook.getInterpreterSettingManager().close();
notebook.close();
Thread.sleep(3000);
} catch (Exception e) {
@ -222,7 +221,9 @@ public class ZeppelinServer extends Application {
}
jettyWebServer.join();
ZeppelinServer.notebook.getInterpreterSettingManager().close();
if (!conf.isRecoveryEnabled()) {
ZeppelinServer.notebook.getInterpreterSettingManager().close();
}
}
private static Server setupJettyServer(ZeppelinConfiguration conf) {

View file

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.recovery;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.rest.AbstractTestRestApi;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.server.ZeppelinServer;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
public class RecoveryTest extends AbstractTestRestApi {
Gson gson = new Gson();
@BeforeClass
public static void init() throws Exception {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_ENABLED.getVarName(), "true");
startUp(RecoveryTest.class.getSimpleName());
}
@AfterClass
public static void destroy() throws Exception {
shutDown();
}
@Test
public void testRecovery() throws Exception {
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
// run python interpreter and create new variable `user`
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("%python user='abc'");
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
assertThat(post, isAllowed());
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(Job.Status.FINISHED, p1.getStatus());
// shutdown zeppelin and restart it
shutDown();
startUp(RecoveryTest.class.getSimpleName());
// run the paragraph again, but change the text to print variable `user`
note1 = ZeppelinServer.notebook.getNote(note1.getId());
p1 = note1.getParagraph(p1.getId());
p1.setText("%python print(user)");
post = httpPost("/notebook/job/" + note1.getId(), "");
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(Job.Status.FINISHED, p1.getStatus());
assertEquals("abc\n", p1.getResult().message().get(0).getData());
}
@Test
public void testRecovery_2() throws Exception {
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
// run python interpreter and create new variable `user`
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
p1.setText("%python user='abc'");
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
assertThat(post, isAllowed());
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
}.getType());
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(Job.Status.FINISHED, p1.getStatus());
// restart the python interpreter
ZeppelinServer.notebook.getInterpreterSettingManager().restart(
((ManagedInterpreterGroup) p1.getBindedInterpreter().getInterpreterGroup())
.getInterpreterSetting().getId()
);
// shutdown zeppelin and restart it
shutDown();
startUp(RecoveryTest.class.getSimpleName());
// run the paragraph again, but change the text to print variable `user`.
// can not recover the python interpreter, because it has been shutdown.
note1 = ZeppelinServer.notebook.getNote(note1.getId());
p1 = note1.getParagraph(p1.getId());
p1.setText("%python print(user)");
post = httpPost("/notebook/job/" + note1.getId(), "");
assertEquals(resp.get("status"), "OK");
post.releaseConnection();
assertEquals(Job.Status.ERROR, p1.getStatus());
}
}

View file

@ -318,8 +318,10 @@ public abstract class AbstractTestRestApi {
if (!wasRunning) {
// restart interpreter to stop all interpreter processes
List<InterpreterSetting> settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get();
for (InterpreterSetting setting : settingList) {
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {
for (InterpreterSetting setting : settingList) {
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
}
}
if (shiroIni != null) {
FileUtils.deleteQuietly(shiroIni);
@ -350,7 +352,12 @@ public abstract class AbstractTestRestApi {
.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName());
}
FileUtils.deleteDirectory(confDir);
if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {
// don't delete interpreter.json when recovery is enabled. otherwise the interpreter setting
// id will change after zeppelin restart, then we can not recover interpreter process
// properly
FileUtils.deleteDirectory(confDir);
}
}
}

View file

@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
@ -144,6 +146,9 @@ public class InterpreterSetting {
private transient RecoveryStorage recoveryStorage;
///////////////////////////////////////////////////////////////////////////////////////////
/**
* Builder class for InterpreterSetting
*/
@ -242,6 +247,11 @@ public class InterpreterSetting {
return this;
}
public Builder setRecoveryStorage(RecoveryStorage recoveryStorage) {
interpreterSetting.recoveryStorage = recoveryStorage;
return this;
}
public InterpreterSetting create() {
// post processing
interpreterSetting.postProcessing();
@ -261,6 +271,9 @@ public class InterpreterSetting {
if (this.lifecycleManager == null) {
this.lifecycleManager = new NullLifecycleManager(conf);
}
if (this.recoveryStorage == null) {
this.recoveryStorage = new NullRecoveryStorage();
}
}
/**
@ -285,9 +298,9 @@ public class InterpreterSetting {
private void createLauncher() {
if (group.equals("spark")) {
this.launcher = new SparkInterpreterLauncher(this.conf);
this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
} else {
this.launcher = new ShellScriptLauncher(this.conf);
this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
}
}
@ -344,6 +357,15 @@ public class InterpreterSetting {
return this;
}
public InterpreterSetting setRecoveryStorage(RecoveryStorage recoveryStorage) {
this.recoveryStorage = recoveryStorage;
return this;
}
public RecoveryStorage getRecoveryStorage() {
return recoveryStorage;
}
public LifecycleManager getLifecycleManager() {
return lifecycleManager;
}
@ -408,7 +430,12 @@ public class InterpreterSetting {
}
void removeInterpreterGroup(String groupId) {
this.interpreterGroups.remove(groupId);
try {
interpreterGroupWriteLock.lock();
this.interpreterGroups.remove(groupId);
} finally {
interpreterGroupWriteLock.unlock();
}
}
public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
@ -425,7 +452,6 @@ public class InterpreterSetting {
return interpreterGroups.get(groupId);
}
@VisibleForTesting
public ArrayList<ManagedInterpreterGroup> getAllInterpreterGroups() {
try {
interpreterGroupReadLock.lock();
@ -668,16 +694,19 @@ public class InterpreterSetting {
return interpreters;
}
synchronized RemoteInterpreterProcess createInterpreterProcess(Properties properties)
synchronized RemoteInterpreterProcess createInterpreterProcess(String interpreterGroupId,
Properties properties)
throws IOException {
if (launcher == null) {
createLauncher();
}
InterpreterLaunchContext launchContext = new
InterpreterLaunchContext(properties, option, interpreterRunner, id, group, name);
InterpreterLaunchContext(properties, option, interpreterRunner,
interpreterGroupId, id, group, name);
RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
process.setRemoteInterpreterEventPoller(
new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
recoveryStorage.onInterpreterClientStart(process);
return process;
}

View file

@ -34,6 +34,9 @@ import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObjectRegistryListener;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
@ -118,6 +121,7 @@ public class InterpreterSettingManager {
private ApplicationEventListener appEventListener;
private DependencyResolver dependencyResolver;
private LifecycleManager lifecycleManager;
private RecoveryStorage recoveryStorage;
public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
AngularObjectRegistryListener angularObjectRegistryListener,
@ -154,6 +158,11 @@ public class InterpreterSettingManager {
this.angularObjectRegistryListener = angularObjectRegistryListener;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.appEventListener = appEventListener;
if (conf.isRecoveryEnabled()) {
this.recoveryStorage = new FileSystemRecoveryStorage(conf, this);
} else {
this.recoveryStorage = new NullRecoveryStorage();
}
try {
this.lifecycleManager = (LifecycleManager)
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
@ -174,6 +183,7 @@ public class InterpreterSettingManager {
.setAppEventListener(appEventListener)
.setDependencyResolver(dependencyResolver)
.setLifecycleManager(lifecycleManager)
.setRecoveryStorage(recoveryStorage)
.postProcessing();
}
@ -507,6 +517,10 @@ public class InterpreterSettingManager {
return resourceSet;
}
public RecoveryStorage getRecoveryStorage() {
return recoveryStorage;
}
public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) {
ResourceSet resourceSet = new ResourceSet();

View file

@ -55,15 +55,31 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
return interpreterSetting;
}
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(Properties properties)
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName,
Properties properties)
throws IOException {
if (remoteInterpreterProcess == null) {
LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId());
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(properties);
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, properties);
synchronized (remoteInterpreterProcess) {
if (!remoteInterpreterProcess.isRunning()) {
remoteInterpreterProcess.start(userName, false);
remoteInterpreterProcess.getRemoteInterpreterEventPoller()
.setInterpreterProcess(remoteInterpreterProcess);
remoteInterpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(this);
remoteInterpreterProcess.getRemoteInterpreterEventPoller().start();
getInterpreterSetting().getRecoveryStorage()
.onInterpreterClientStart(remoteInterpreterProcess);
}
}
}
return remoteInterpreterProcess;
}
public RemoteInterpreterProcess getInterpreterProcess() {
return remoteInterpreterProcess;
}
public RemoteInterpreterProcess getRemoteInterpreterProcess() {
return remoteInterpreterProcess;
}
@ -94,6 +110,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
if (remoteInterpreterProcess != null) {
LOGGER.info("Kill RemoteInterpreterProcess");
remoteInterpreterProcess.stop();
try {
interpreterSetting.getRecoveryStorage().onInterpreterClientStop(remoteInterpreterProcess);
} catch (IOException e) {
LOGGER.error("Fail to store recovery data", e);
}
remoteInterpreterProcess = null;
}
}

View file

@ -21,50 +21,67 @@ package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Interpreter Launcher which use shell script to launch the interpreter process.
*
*/
public class ShellScriptLauncher extends InterpreterLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class);
public ShellScriptLauncher(ZeppelinConfiguration zConf) {
super(zConf);
public ShellScriptLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
super(zConf, recoveryStorage);
}
@Override
public InterpreterClient launch(InterpreterLaunchContext context) {
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
this.properties = context.getProperties();
InterpreterOption option = context.getOption();
InterpreterRunner runner = context.getRunner();
String groupName = context.getInterpreterSettingGroup();
String name = context.getInterpreterSettingName();
int connectTimeout =
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
if (option.isExistingProcess()) {
return new RemoteInterpreterRunningProcess(
context.getInterpreterSettingName(),
connectTimeout,
option.getHost(),
option.getPort());
} else {
// try to recover it first
if (zConf.isRecoveryEnabled()) {
Map<String, InterpreterClient> clients = recoveryStorage.restore();
if (clients.containsKey(context.getInterpreterGroupId())) {
InterpreterClient client = clients.get(context.getInterpreterGroupId());
if (client.isRunning()) {
LOGGER.info("Recover InterpreterProcess: " + client.getHost() + ":" + client.getPort());
return (RemoteInterpreterRunningProcess) client;
} else {
LOGGER.warn("Can not recovery interpreter process: " + client.getHost() + ":"
+ client.getPort() + ", as it is already terminated.");
}
}
}
// create new remote process
String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+ context.getInterpreterSettingId();
return new RemoteInterpreterManagedProcess(
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
buildEnvFromProperties(), connectTimeout, name);
}

View file

@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -35,8 +36,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
public SparkInterpreterLauncher(ZeppelinConfiguration zConf) {
super(zConf);
public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
super(zConf, recoveryStorage);
}
@Override

View file

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.recovery;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
import org.apache.zeppelin.notebook.FileSystemStorage;
import org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Hadoop compatible FileSystem based RecoveryStorage implementation.
*
* Save InterpreterProcess in the format of:
* InterpreterGroupId host:port
*/
public class FileSystemRecoveryStorage extends RecoveryStorage {
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class);
private InterpreterSettingManager interpreterSettingManager;
private ZeppelinConfiguration zConf;
private FileSystemStorage fs;
private Path recoveryDir;
public FileSystemRecoveryStorage(ZeppelinConfiguration zConf,
InterpreterSettingManager interpreterSettingManager)
throws IOException {
this.interpreterSettingManager = interpreterSettingManager;
this.zConf = zConf;
this.fs = FileSystemStorage.get(zConf);
this.recoveryDir = this.fs.makeQualified(new Path(zConf.getNotebookDir()));
LOGGER.info("Using folder {} to store recovery data", recoveryDir);
this.fs.tryMkDir(recoveryDir);
}
@Override
public void onInterpreterClientStart(InterpreterClient client) throws IOException {
save(client.getInterpreterSettingName());
}
@Override
public void onInterpreterClientStop(InterpreterClient client) throws IOException {
save(client.getInterpreterSettingName());
}
private void save(String interpreterSettingName) throws IOException {
InterpreterSetting interpreterSetting =
interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName);
List<String> recoveryContent = new ArrayList<>();
for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) {
RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess();
if (interpreterProcess != null) {
recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" +
interpreterProcess.getPort());
}
}
LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName);
LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, "\n"));
Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery");
fs.writeFile(StringUtils.join(recoveryContent, "\n"), recoveryFile, true);
}
@Override
public Map<String, InterpreterClient> restore() throws IOException {
Map<String, InterpreterClient> clients = new HashMap<>();
List<Path> paths = fs.list(new Path(recoveryDir + "/*.recovery"));
for (Path path : paths) {
String fileName = path.getName();
String interpreterSettingName = fileName.substring(0,
fileName.length() - ".recovery".length());
String recoveryContent = fs.readFile(path);
if (!StringUtils.isBlank(recoveryContent)) {
for (String line : recoveryContent.split("\n")) {
String[] tokens = line.split("\t");
String groupId = tokens[0];
String[] hostPort = tokens[1].split(":");
int connectTimeout =
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
InterpreterClient client = new RemoteInterpreterRunningProcess(interpreterSettingName,
connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
clients.put(groupId, client);
}
}
}
return clients;
}
}

View file

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.recovery;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import java.io.IOException;
import java.util.Map;
/**
* RecoveryStorage that do nothing, used when recovery is not enabled.
*
*/
public class NullRecoveryStorage extends RecoveryStorage {
@Override
public void onInterpreterClientStart(InterpreterClient client) throws IOException {
}
@Override
public void onInterpreterClientStop(InterpreterClient client) throws IOException {
}
@Override
public Map<String, InterpreterClient> restore() throws IOException {
return null;
}
}

View file

@ -102,16 +102,7 @@ public class RemoteInterpreter extends Interpreter {
return this.interpreterProcess;
}
ManagedInterpreterGroup intpGroup = getInterpreterGroup();
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(properties);
synchronized (interpreterProcess) {
if (!interpreterProcess.isRunning()) {
interpreterProcess.start(this.getUserName(), false);
interpreterProcess.getRemoteInterpreterEventPoller()
.setInterpreterProcess(interpreterProcess);
interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
interpreterProcess.getRemoteInterpreterEventPoller().start();
}
}
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(getUserName(), properties);
return interpreterProcess;
}

View file

@ -263,7 +263,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
return interpreterDir;
}
@VisibleForTesting
public String getInterpreterSettingName() {
return interpreterSettingName;
}

View file

@ -51,12 +51,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
this.remoteInterpreterEventPoller = eventPoller;
}
public abstract String getHost();
public abstract int getPort();
public abstract void start(String userName, Boolean isUserImpersonate);
public abstract void stop();
public abstract boolean isRunning();
public int getConnectTimeout() {
return connectTimeout;
}

View file

@ -27,13 +27,16 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
private final String host;
private final int port;
private final String interpreterSettingName;
public RemoteInterpreterRunningProcess(
String interpreterSettingName,
int connectTimeout,
String host,
int port
) {
super(connectTimeout);
this.interpreterSettingName = interpreterSettingName;
this.host = host;
this.port = port;
}
@ -48,6 +51,11 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
return port;
}
@Override
public String getInterpreterSettingName() {
return interpreterSettingName;
}
@Override
public void start(String userName, Boolean isUserImpersonate) {
// assume process is externally managed. nothing to do

View file

@ -0,0 +1,166 @@
package org.apache.zeppelin.notebook;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
/**
* Hadoop FileSystem wrapper. Support both secure and no-secure mode
*/
public class FileSystemStorage {
private static Logger LOGGER = LoggerFactory.getLogger(FileSystemStorage.class);
private static FileSystemStorage instance;
private ZeppelinConfiguration zConf;
private Configuration hadoopConf;
private boolean isSecurityEnabled = false;
private FileSystem fs;
private FileSystemStorage(ZeppelinConfiguration zConf) throws IOException {
this.zConf = zConf;
this.hadoopConf = new Configuration();
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
if (isSecurityEnabled) {
String keytab = zConf.getString(
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
String principal = zConf.getString(
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
throw new IOException("keytab and principal can not be empty, keytab: " + keytab
+ ", principal: " + principal);
}
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}
try {
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
public static synchronized FileSystemStorage get(ZeppelinConfiguration zConf) throws IOException {
if (instance == null) {
instance = new FileSystemStorage(zConf);
}
return instance;
}
public Path makeQualified(Path path) {
return fs.makeQualified(path);
}
public void tryMkDir(final Path dir) throws IOException {
callHdfsOperation(new HdfsOperation<Void>() {
@Override
public Void call() throws IOException {
if (!fs.exists(dir)) {
fs.mkdirs(dir);
LOGGER.info("Create dir {} in hdfs", dir.toString());
}
if (fs.isFile(dir)) {
throw new IOException("{} is file instead of directory, please remove it or " +
"specify another directory");
}
fs.mkdirs(dir);
return null;
}
});
}
public List<Path> list(final Path path) throws IOException {
return callHdfsOperation(new HdfsOperation<List<Path>>() {
@Override
public List<Path> call() throws IOException {
List<Path> paths = new ArrayList<>();
for (FileStatus status : fs.globStatus(path)) {
paths.add(status.getPath());
}
return paths;
}
});
}
public boolean delete(final Path path) throws IOException {
return callHdfsOperation(new HdfsOperation<Boolean>() {
@Override
public Boolean call() throws IOException {
return fs.delete(path, true);
}
});
}
public String readFile(final Path file) throws IOException {
return callHdfsOperation(new HdfsOperation<String>() {
@Override
public String call() throws IOException {
LOGGER.debug("Read from file: " + file);
ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
IOUtils.copyBytes(fs.open(file), noteBytes, hadoopConf);
return new String(noteBytes.toString(
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
}
});
}
public void writeFile(final String content, final Path file, boolean writeTempFileFirst)
throws IOException {
callHdfsOperation(new HdfsOperation<Void>() {
@Override
public Void call() throws IOException {
InputStream in = new ByteArrayInputStream(content.getBytes(
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
Path tmpFile = new Path(file.toString() + ".tmp");
IOUtils.copyBytes(in, fs.create(tmpFile), hadoopConf);
fs.delete(file, true);
fs.rename(tmpFile, file);
return null;
}
});
}
private interface HdfsOperation<T> {
T call() throws IOException;
}
public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
if (isSecurityEnabled) {
UserGroupInformation.getLoginUser().reloginFromKeytab();
try {
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return func.call();
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
} else {
return func.call();
}
}
}

View file

@ -8,6 +8,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.FileSystemStorage;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.user.AuthenticationInfo;
@ -37,108 +38,45 @@ import java.util.Map;
public class FileSystemNotebookRepo implements NotebookRepo {
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemNotebookRepo.class);
private Configuration hadoopConf;
private ZeppelinConfiguration zConf;
private boolean isSecurityEnabled = false;
private FileSystem fs;
private FileSystemStorage fs;
private Path notebookDir;
public FileSystemNotebookRepo(ZeppelinConfiguration zConf) throws IOException {
this.zConf = zConf;
this.hadoopConf = new Configuration();
this.fs = FileSystemStorage.get(zConf);
this.notebookDir = this.fs.makeQualified(new Path(zConf.getNotebookDir()));
LOGGER.info("Using folder {} to store notebook", notebookDir);
this.fs.tryMkDir(notebookDir);
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
if (isSecurityEnabled) {
String keytab = zConf.getString(
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
String principal = zConf.getString(
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
throw new IOException("keytab and principal can not be empty, keytab: " + keytab
+ ", principal: " + principal);
}
UserGroupInformation.loginUserFromKeytab(principal, keytab);
}
try {
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
this.notebookDir = fs.makeQualified(new Path(zConf.getNotebookDir()));
LOGGER.info("Using folder {} to store notebook", notebookDir);
} catch (URISyntaxException e) {
throw new IOException(e);
}
if (!fs.exists(notebookDir)) {
fs.mkdirs(notebookDir);
LOGGER.info("Create notebook dir {} in hdfs", notebookDir.toString());
}
if (fs.isFile(notebookDir)) {
throw new IOException("notebookDir {} is file instead of directory, please remove it or " +
"specify another directory");
}
}
@Override
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
return callHdfsOperation(new HdfsOperation<List<NoteInfo>>() {
@Override
public List<NoteInfo> call() throws IOException {
List<NoteInfo> noteInfos = new ArrayList<>();
for (FileStatus status : fs.globStatus(new Path(notebookDir, "*/note.json"))) {
NoteInfo noteInfo = new NoteInfo(status.getPath().getParent().getName(), "", null);
noteInfos.add(noteInfo);
}
return noteInfos;
}
});
List<Path> notePaths = fs.list(new Path(notebookDir, "*/note.json"));
List<NoteInfo> noteInfos = new ArrayList<>();
for (Path path : notePaths) {
NoteInfo noteInfo = new NoteInfo(path.getParent().getName(), "", null);
noteInfos.add(noteInfo);
}
return noteInfos;
}
@Override
public Note get(final String noteId, AuthenticationInfo subject) throws IOException {
return callHdfsOperation(new HdfsOperation<Note>() {
@Override
public Note call() throws IOException {
Path notePath = new Path(notebookDir.toString() + "/" + noteId + "/note.json");
LOGGER.debug("Read note from file: " + notePath);
ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
IOUtils.copyBytes(fs.open(notePath), noteBytes, hadoopConf);
return Note.fromJson(new String(noteBytes.toString(
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))));
}
});
String content = this.fs.readFile(
new Path(notebookDir.toString() + "/" + noteId + "/note.json"));
return Note.fromJson(content);
}
@Override
public void save(final Note note, AuthenticationInfo subject) throws IOException {
callHdfsOperation(new HdfsOperation<Void>() {
@Override
public Void call() throws IOException {
Path notePath = new Path(notebookDir.toString() + "/" + note.getId() + "/note.json");
Path tmpNotePath = new Path(notebookDir.toString() + "/" + note.getId() + "/.note.json");
LOGGER.debug("Saving note to file: " + notePath);
if (fs.exists(tmpNotePath)) {
fs.delete(tmpNotePath, true);
}
InputStream in = new ByteArrayInputStream(note.toJson().getBytes(
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
IOUtils.copyBytes(in, fs.create(tmpNotePath), hadoopConf);
fs.delete(notePath, true);
fs.rename(tmpNotePath, notePath);
return null;
}
});
this.fs.writeFile(note.toJson(),
new Path(notebookDir.toString() + "/" + note.getId() + "/note.json"),
true);
}
@Override
public void remove(final String noteId, AuthenticationInfo subject) throws IOException {
callHdfsOperation(new HdfsOperation<Void>() {
@Override
public Void call() throws IOException {
Path noteFolder = new Path(notebookDir.toString() + "/" + noteId);
fs.delete(noteFolder, true);
return null;
}
});
this.fs.delete(new Path(notebookDir.toString() + "/" + noteId));
}
@Override
@ -182,26 +120,4 @@ public class FileSystemNotebookRepo implements NotebookRepo {
public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) {
LOGGER.warn("updateSettings is not implemented for HdfsNotebookRepo");
}
private interface HdfsOperation<T> {
T call() throws IOException;
}
public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
if (isSecurityEnabled) {
UserGroupInformation.getLoginUser().reloginFromKeytab();
try {
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
@Override
public T run() throws Exception {
return func.call();
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
} else {
return func.call();
}
}
}

View file

@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.junit.Test;
import java.io.IOException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@ -30,14 +31,14 @@ import static org.junit.Assert.assertTrue;
public class ShellScriptLauncherTest {
@Test
public void testLauncher() {
public void testLauncher() throws IOException {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
ShellScriptLauncher launcher = new ShellScriptLauncher(zConf);
ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("ENV_1", "VALUE_1");
properties.setProperty("property_1", "value_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "groupName", "name");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "groupName", "name");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;

View file

@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.junit.Test;
import java.io.IOException;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
@ -30,9 +31,9 @@ import static org.junit.Assert.assertTrue;
public class SparkInterpreterLauncherTest {
@Test
public void testLocalMode() {
public void testLocalMode() throws IOException {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
@ -41,7 +42,7 @@ public class SparkInterpreterLauncherTest {
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
@ -55,9 +56,9 @@ public class SparkInterpreterLauncherTest {
}
@Test
public void testYarnClientMode_1() {
public void testYarnClientMode_1() throws IOException {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
@ -66,7 +67,7 @@ public class SparkInterpreterLauncherTest {
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
@ -80,9 +81,9 @@ public class SparkInterpreterLauncherTest {
}
@Test
public void testYarnClientMode_2() {
public void testYarnClientMode_2() throws IOException {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
@ -92,7 +93,7 @@ public class SparkInterpreterLauncherTest {
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
@ -106,9 +107,9 @@ public class SparkInterpreterLauncherTest {
}
@Test
public void testYarnClusterMode_1() {
public void testYarnClusterMode_1() throws IOException {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
@ -117,7 +118,7 @@ public class SparkInterpreterLauncherTest {
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
@ -132,9 +133,9 @@ public class SparkInterpreterLauncherTest {
}
@Test
public void testYarnClusterMode_2() {
public void testYarnClusterMode_2() throws IOException {
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
Properties properties = new Properties();
properties.setProperty("SPARK_HOME", "/user/spark");
properties.setProperty("property_1", "value_1");
@ -144,7 +145,7 @@ public class SparkInterpreterLauncherTest {
properties.setProperty("spark.jars", "jar_1");
InterpreterOption option = new InterpreterOption();
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
InterpreterClient client = launcher.launch(context);
assertTrue( client instanceof RemoteInterpreterManagedProcess);
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;

View file

@ -0,0 +1,78 @@
package org.apache.zeppelin.interpreter.recovery;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import static org.junit.Assert.assertEquals;
public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
@Before
public void setUp() throws Exception {
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_ENABLED.getVarName(), "true");
super.setUp();
}
@Test
public void testSingleInterpreterProcess() throws InterpreterException, IOException {
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
remoteInterpreter1.interpret("hello", context1);
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
interpreterSetting.close();
assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size());
}
@Test
public void testMultipleInterpreterProcess() throws InterpreterException, IOException {
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED);
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
remoteInterpreter1.interpret("hello", context1);
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note2");
RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
InterpreterContext context2 = new InterpreterContext("noteId", "paragraphId", "repl",
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
remoteInterpreter2.interpret("hello", context2);
assertEquals(2, interpreterSettingManager.getRecoveryStorage().restore().size());
interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1");
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
interpreterSetting.close();
assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size());
}
}