mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
address comments
This commit is contained in:
parent
da7cbb90bc
commit
02b118f4f9
7 changed files with 58 additions and 18 deletions
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -29,6 +30,13 @@ import java.util.Map;
|
|||
*/
|
||||
public abstract class RecoveryStorage {
|
||||
|
||||
protected ZeppelinConfiguration zConf;
|
||||
protected Map<String, InterpreterClient> restoredClients;
|
||||
|
||||
public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
|
||||
this.zConf = zConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update RecoveryStorage when new InterpreterClient is started
|
||||
* @param client
|
||||
|
|
@ -44,9 +52,29 @@ public abstract class RecoveryStorage {
|
|||
public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException;
|
||||
|
||||
/**
|
||||
* Restore InterpreterClient when Zeppelin Server is restarted
|
||||
*
|
||||
* It is only called one time when Zeppelin Server is started.
|
||||
*
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract Map<String, InterpreterClient> restore() throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* It is called after constructor
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void init() throws IOException {
|
||||
this.restoredClients = restore();
|
||||
}
|
||||
|
||||
public InterpreterClient getInterpreterClient(String interpreterGroupId) {
|
||||
if (restoredClients.containsKey(interpreterGroupId)) {
|
||||
return restoredClients.get(interpreterGroupId);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -272,7 +272,11 @@ public class InterpreterSetting {
|
|||
this.lifecycleManager = new NullLifecycleManager(conf);
|
||||
}
|
||||
if (this.recoveryStorage == null) {
|
||||
this.recoveryStorage = new NullRecoveryStorage();
|
||||
try {
|
||||
this.recoveryStorage = new NullRecoveryStorage(conf);
|
||||
} catch (IOException e) {
|
||||
// ignore this exception as NullRecoveryStorage will do nothing.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -161,8 +161,10 @@ public class InterpreterSettingManager {
|
|||
if (conf.isRecoveryEnabled()) {
|
||||
this.recoveryStorage = new FileSystemRecoveryStorage(conf, this);
|
||||
} else {
|
||||
this.recoveryStorage = new NullRecoveryStorage();
|
||||
this.recoveryStorage = new NullRecoveryStorage(conf);
|
||||
}
|
||||
this.recoveryStorage.init();
|
||||
|
||||
try {
|
||||
this.lifecycleManager = (LifecycleManager)
|
||||
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
|
||||
|
|
|
|||
|
|
@ -63,15 +63,16 @@ public class ShellScriptLauncher extends InterpreterLauncher {
|
|||
} else {
|
||||
// try to recover it first
|
||||
if (zConf.isRecoveryEnabled()) {
|
||||
Map<String, InterpreterClient> clients = recoveryStorage.restore();
|
||||
if (clients.containsKey(context.getInterpreterGroupId())) {
|
||||
InterpreterClient client = clients.get(context.getInterpreterGroupId());
|
||||
if (client.isRunning()) {
|
||||
LOGGER.info("Recover InterpreterProcess: " + client.getHost() + ":" + client.getPort());
|
||||
return (RemoteInterpreterRunningProcess) client;
|
||||
InterpreterClient recoveredClient =
|
||||
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
|
||||
if (recoveredClient != null) {
|
||||
if (recoveredClient.isRunning()) {
|
||||
LOGGER.info("Recover InterpreterProcess: " + recoveredClient.getHost() + ":" +
|
||||
recoveredClient.getPort());
|
||||
return recoveredClient;
|
||||
} else {
|
||||
LOGGER.warn("Can not recovery interpreter process: " + client.getHost() + ":"
|
||||
+ client.getPort() + ", as it is already terminated.");
|
||||
LOGGER.warn("Cannot recovery interpreter process: " + recoveredClient.getHost() + ":"
|
||||
+ recoveredClient.getPort() + ", as it is already terminated.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,14 +59,13 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
|
|||
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class);
|
||||
|
||||
private InterpreterSettingManager interpreterSettingManager;
|
||||
private ZeppelinConfiguration zConf;
|
||||
private FileSystemStorage fs;
|
||||
private Path recoveryDir;
|
||||
|
||||
public FileSystemRecoveryStorage(ZeppelinConfiguration zConf,
|
||||
InterpreterSettingManager interpreterSettingManager)
|
||||
throws IOException {
|
||||
|
||||
super(zConf);
|
||||
this.interpreterSettingManager = interpreterSettingManager;
|
||||
this.zConf = zConf;
|
||||
this.fs = FileSystemStorage.get(zConf);
|
||||
|
|
@ -97,9 +96,9 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
|
|||
}
|
||||
}
|
||||
LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName);
|
||||
LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, "\n"));
|
||||
LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, System.lineSeparator()));
|
||||
Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery");
|
||||
fs.writeFile(StringUtils.join(recoveryContent, "\n"), recoveryFile, true);
|
||||
fs.writeFile(StringUtils.join(recoveryContent, System.lineSeparator()), recoveryFile, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -113,7 +112,7 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
|
|||
fileName.length() - ".recovery".length());
|
||||
String recoveryContent = fs.readFile(path);
|
||||
if (!StringUtils.isBlank(recoveryContent)) {
|
||||
for (String line : recoveryContent.split("\n")) {
|
||||
for (String line : recoveryContent.split(System.lineSeparator())) {
|
||||
String[] tokens = line.split("\t");
|
||||
String groupId = tokens[0];
|
||||
String[] hostPort = tokens[1].split(":");
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
|
||||
import java.io.IOException;
|
||||
|
|
@ -28,6 +29,11 @@ import java.util.Map;
|
|||
*
|
||||
*/
|
||||
public class NullRecoveryStorage extends RecoveryStorage {
|
||||
|
||||
public NullRecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
|
||||
super(zConf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterClientStart(InterpreterClient client) throws IOException {
|
||||
|
||||
|
|
|
|||
|
|
@ -81,8 +81,8 @@ public class FileSystemStorage {
|
|||
LOGGER.info("Create dir {} in hdfs", dir.toString());
|
||||
}
|
||||
if (fs.isFile(dir)) {
|
||||
throw new IOException("{} is file instead of directory, please remove it or " +
|
||||
"specify another directory");
|
||||
throw new IOException(dir.toString() + " is file instead of directory, please remove " +
|
||||
"it or specify another directory");
|
||||
}
|
||||
fs.mkdirs(dir);
|
||||
return null;
|
||||
|
|
|
|||
Loading…
Reference in a new issue