ZEPPELIN-1430. Display appId and webui link in LivyInterpreter's output

This commit is contained in:
Jeff Zhang 2016-09-13 14:07:02 +08:00
parent 1da262829a
commit a087a1d74d
3 changed files with 65 additions and 27 deletions

View file

@ -20,6 +20,7 @@ package org.apache.zeppelin.livy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -133,21 +134,24 @@ public class LivyHelper {
public InterpreterResult interpretInput(String stringLines,
final InterpreterContext context,
final Map<String, Integer> userSessionMap,
LivyOutputStream out) {
LivyOutputStream out,
String appId,
String webUI) {
try {
out.setInterpreterOutput(context.out);
context.out.clear();
out.write("%angular ");
String incomplete = "";
boolean inComment = false;
out.write("<pre><code>");
String[] lines = stringLines.split("\n");
String[] linesToRun = new String[lines.length + 1];
for (int i = 0; i < lines.length; i++) {
linesToRun[i] = lines[i];
}
linesToRun[lines.length] = "print(\"\")";
out.setInterpreterOutput(context.out);
context.out.clear();
Code r = null;
String incomplete = "";
boolean inComment = false;
for (int l = 0; l < linesToRun.length; l++) {
String s = linesToRun[l];
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
@ -196,11 +200,15 @@ public class LivyHelper {
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
out.write((res.message() + "\n").getBytes(Charset.forName("UTF-8")));
out.write((res.message() + "\n"));
incomplete = "";
}
}
out.write("</code></pre>");
out.write("<hr/>");
out.write("Spark Application Id:" + appId + "<br/>");
out.write("Spark WebUI: <a href=" + webUI + ">" + webUI + "</a>");
if (r == Code.INCOMPLETE) {
out.setInterpreterOutput(null);
return new InterpreterResult(r, "Incomplete expression");
@ -208,7 +216,6 @@ public class LivyHelper {
out.setInterpreterOutput(null);
return new InterpreterResult(Code.SUCCESS);
}
} catch (Exception e) {
LOGGER.error("error in interpretInput", e);
return new InterpreterResult(Code.ERROR, e.getMessage());
@ -219,16 +226,6 @@ public class LivyHelper {
final InterpreterContext context,
final Map<String, Integer> userSessionMap)
throws Exception {
stringLines = stringLines
//for "\n" present in string
.replaceAll("\\\\n", "\\\\\\\\n")
//for new line present in string
.replaceAll("\\n", "\\\\n")
// for \" present in string
.replaceAll("\\\\\"", "\\\\\\\\\"")
// for " present in string
.replaceAll("\"", "\\\\\"");
if (stringLines.trim().equals("")) {
return new InterpreterResult(Code.SUCCESS, "");
}
@ -295,7 +292,7 @@ public class LivyHelper {
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
+ "/statements",
"POST",
"{\"code\": \"" + lines + "\" }",
"{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}",
context.getParagraphId());
if (json.matches("^(\")?Session (\'[0-9]\' )?not found(.?\"?)$")) {
throw new Exception("Exception: Session not found, Livy server would have restarted, " +
@ -340,6 +337,7 @@ public class LivyHelper {
protected String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
throws Exception {
LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
RestTemplate restTemplate = getRestTemplate();
HttpHeaders headers = new HttpHeaders();
headers.add("Content-Type", "application/json");

View file

@ -52,6 +52,10 @@ public class LivyOutputStream extends OutputStream {
}
}
public void write(String text) throws IOException {
write(text.getBytes("UTF-8"));
}
@Override
public void write(byte[] b, int offset, int len) throws IOException {
if (interpreterOutput != null) {

View file

@ -38,11 +38,16 @@ public class LivySparkInterpreter extends Interpreter {
private LivyOutputStream out;
protected static Map<String, Integer> userSessionMap;
protected static Map<Integer, String> sessionId2AppIdMap;
protected static Map<Integer, String> sessionId2WebUIMap;
private LivyHelper livyHelper;
public LivySparkInterpreter(Properties property) {
super(property);
userSessionMap = new HashMap<>();
sessionId2AppIdMap = new HashMap<>();
sessionId2WebUIMap = new HashMap<>();
livyHelper = new LivyHelper(property);
out = new LivyOutputStream();
}
@ -67,24 +72,38 @@ public class LivySparkInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {
Integer sessionId = null;
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext,
"spark")
);
sessionId = livyHelper.createSession(interpreterContext, "spark");
userSessionMap.put(interpreterContext.getAuthenticationInfo().getUser(), sessionId);
String appId = extractStatementResult(
livyHelper.interpret("sc.applicationId", interpreterContext, userSessionMap)
.message());
livyHelper.interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
interpreterContext, userSessionMap);
String webUI = extractStatementResult(
livyHelper.interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
interpreterContext, userSessionMap).message());
sessionId2AppIdMap.put(sessionId, appId);
sessionId2WebUIMap.put(sessionId, webUI);
LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
sessionId, appId, webUI);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
} else {
sessionId = userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser());
}
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out);
return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out,
sessionId2AppIdMap.get(sessionId), sessionId2WebUIMap.get(sessionId));
} catch (Exception e) {
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
@ -92,6 +111,23 @@ public class LivySparkInterpreter extends Interpreter {
}
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* res0: String = application_1473129941656_0048
* @param result
* @return
*/
private static String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());