address comments

This commit is contained in:
Jeff Zhang 2017-11-16 08:26:11 +08:00
parent da7cbb90bc
commit 02b118f4f9
7 changed files with 58 additions and 18 deletions

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter.recovery;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import java.io.IOException;
@ -29,6 +30,13 @@ import java.util.Map;
*/
public abstract class RecoveryStorage {
protected ZeppelinConfiguration zConf;
protected Map<String, InterpreterClient> restoredClients;
public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
this.zConf = zConf;
}
/**
* Update RecoveryStorage when new InterpreterClient is started
* @param client
@ -44,9 +52,29 @@ public abstract class RecoveryStorage {
public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException;
/**
* Restore InterpreterClient when Zeppelin Server is restarted
*
* It is only called one time when Zeppelin Server is started.
*
* @return
* @throws IOException
*/
public abstract Map<String, InterpreterClient> restore() throws IOException;
/**
* It is called after constructor
*
* @throws IOException
*/
public void init() throws IOException {
this.restoredClients = restore();
}
public InterpreterClient getInterpreterClient(String interpreterGroupId) {
if (restoredClients.containsKey(interpreterGroupId)) {
return restoredClients.get(interpreterGroupId);
} else {
return null;
}
}
}

View file

@ -272,7 +272,11 @@ public class InterpreterSetting {
this.lifecycleManager = new NullLifecycleManager(conf);
}
if (this.recoveryStorage == null) {
this.recoveryStorage = new NullRecoveryStorage();
try {
this.recoveryStorage = new NullRecoveryStorage(conf);
} catch (IOException e) {
// ignore this exception as NullRecoveryStorage will do nothing.
}
}
}

View file

@ -161,8 +161,10 @@ public class InterpreterSettingManager {
if (conf.isRecoveryEnabled()) {
this.recoveryStorage = new FileSystemRecoveryStorage(conf, this);
} else {
this.recoveryStorage = new NullRecoveryStorage();
this.recoveryStorage = new NullRecoveryStorage(conf);
}
this.recoveryStorage.init();
try {
this.lifecycleManager = (LifecycleManager)
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)

View file

@ -63,15 +63,16 @@ public class ShellScriptLauncher extends InterpreterLauncher {
} else {
// try to recover it first
if (zConf.isRecoveryEnabled()) {
Map<String, InterpreterClient> clients = recoveryStorage.restore();
if (clients.containsKey(context.getInterpreterGroupId())) {
InterpreterClient client = clients.get(context.getInterpreterGroupId());
if (client.isRunning()) {
LOGGER.info("Recover InterpreterProcess: " + client.getHost() + ":" + client.getPort());
return (RemoteInterpreterRunningProcess) client;
InterpreterClient recoveredClient =
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
if (recoveredClient != null) {
if (recoveredClient.isRunning()) {
LOGGER.info("Recover InterpreterProcess: " + recoveredClient.getHost() + ":" +
recoveredClient.getPort());
return recoveredClient;
} else {
LOGGER.warn("Can not recovery interpreter process: " + client.getHost() + ":"
+ client.getPort() + ", as it is already terminated.");
LOGGER.warn("Cannot recovery interpreter process: " + recoveredClient.getHost() + ":"
+ recoveredClient.getPort() + ", as it is already terminated.");
}
}
}

View file

@ -59,14 +59,13 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class);
private InterpreterSettingManager interpreterSettingManager;
private ZeppelinConfiguration zConf;
private FileSystemStorage fs;
private Path recoveryDir;
public FileSystemRecoveryStorage(ZeppelinConfiguration zConf,
InterpreterSettingManager interpreterSettingManager)
throws IOException {
super(zConf);
this.interpreterSettingManager = interpreterSettingManager;
this.zConf = zConf;
this.fs = FileSystemStorage.get(zConf);
@ -97,9 +96,9 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
}
}
LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName);
LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, "\n"));
LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, System.lineSeparator()));
Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery");
fs.writeFile(StringUtils.join(recoveryContent, "\n"), recoveryFile, true);
fs.writeFile(StringUtils.join(recoveryContent, System.lineSeparator()), recoveryFile, true);
}
@Override
@ -113,7 +112,7 @@ public class FileSystemRecoveryStorage extends RecoveryStorage {
fileName.length() - ".recovery".length());
String recoveryContent = fs.readFile(path);
if (!StringUtils.isBlank(recoveryContent)) {
for (String line : recoveryContent.split("\n")) {
for (String line : recoveryContent.split(System.lineSeparator())) {
String[] tokens = line.split("\t");
String groupId = tokens[0];
String[] hostPort = tokens[1].split(":");

View file

@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter.recovery;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import java.io.IOException;
@ -28,6 +29,11 @@ import java.util.Map;
*
*/
public class NullRecoveryStorage extends RecoveryStorage {
public NullRecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
super(zConf);
}
@Override
public void onInterpreterClientStart(InterpreterClient client) throws IOException {

View file

@ -81,8 +81,8 @@ public class FileSystemStorage {
LOGGER.info("Create dir {} in hdfs", dir.toString());
}
if (fs.isFile(dir)) {
throw new IOException("{} is file instead of directory, please remove it or " +
"specify another directory");
throw new IOException(dir.toString() + " is file instead of directory, please remove " +
"it or specify another directory");
}
fs.mkdirs(dir);
return null;