mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-1430. Display appId and webui link in LivyInterpreter's output
This commit is contained in:
parent
1da262829a
commit
a087a1d74d
3 changed files with 65 additions and 27 deletions
|
|
@ -20,6 +20,7 @@ package org.apache.zeppelin.livy;
|
|||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.lang3.StringEscapeUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
|
|
@ -133,21 +134,24 @@ public class LivyHelper {
|
|||
public InterpreterResult interpretInput(String stringLines,
|
||||
final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap,
|
||||
LivyOutputStream out) {
|
||||
LivyOutputStream out,
|
||||
String appId,
|
||||
String webUI) {
|
||||
try {
|
||||
out.setInterpreterOutput(context.out);
|
||||
context.out.clear();
|
||||
out.write("%angular ");
|
||||
String incomplete = "";
|
||||
boolean inComment = false;
|
||||
out.write("<pre><code>");
|
||||
|
||||
String[] lines = stringLines.split("\n");
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
linesToRun[i] = lines[i];
|
||||
}
|
||||
linesToRun[lines.length] = "print(\"\")";
|
||||
|
||||
out.setInterpreterOutput(context.out);
|
||||
context.out.clear();
|
||||
Code r = null;
|
||||
String incomplete = "";
|
||||
boolean inComment = false;
|
||||
|
||||
for (int l = 0; l < linesToRun.length; l++) {
|
||||
String s = linesToRun[l];
|
||||
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
|
||||
|
|
@ -196,11 +200,15 @@ public class LivyHelper {
|
|||
} else if (r == Code.INCOMPLETE) {
|
||||
incomplete += s + "\n";
|
||||
} else {
|
||||
out.write((res.message() + "\n").getBytes(Charset.forName("UTF-8")));
|
||||
out.write((res.message() + "\n"));
|
||||
incomplete = "";
|
||||
}
|
||||
}
|
||||
|
||||
out.write("</code></pre>");
|
||||
out.write("<hr/>");
|
||||
out.write("Spark Application Id:" + appId + "<br/>");
|
||||
out.write("Spark WebUI: <a href=" + webUI + ">" + webUI + "</a>");
|
||||
if (r == Code.INCOMPLETE) {
|
||||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(r, "Incomplete expression");
|
||||
|
|
@ -208,7 +216,6 @@ public class LivyHelper {
|
|||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("error in interpretInput", e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
|
|
@ -219,16 +226,6 @@ public class LivyHelper {
|
|||
final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap)
|
||||
throws Exception {
|
||||
stringLines = stringLines
|
||||
//for "\n" present in string
|
||||
.replaceAll("\\\\n", "\\\\\\\\n")
|
||||
//for new line present in string
|
||||
.replaceAll("\\n", "\\\\n")
|
||||
// for \" present in string
|
||||
.replaceAll("\\\\\"", "\\\\\\\\\"")
|
||||
// for " present in string
|
||||
.replaceAll("\"", "\\\\\"");
|
||||
|
||||
if (stringLines.trim().equals("")) {
|
||||
return new InterpreterResult(Code.SUCCESS, "");
|
||||
}
|
||||
|
|
@ -295,7 +292,7 @@ public class LivyHelper {
|
|||
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
|
||||
+ "/statements",
|
||||
"POST",
|
||||
"{\"code\": \"" + lines + "\" }",
|
||||
"{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}",
|
||||
context.getParagraphId());
|
||||
if (json.matches("^(\")?Session (\'[0-9]\' )?not found(.?\"?)$")) {
|
||||
throw new Exception("Exception: Session not found, Livy server would have restarted, " +
|
||||
|
|
@ -340,6 +337,7 @@ public class LivyHelper {
|
|||
|
||||
protected String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
|
||||
throws Exception {
|
||||
LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
|
||||
RestTemplate restTemplate = getRestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.add("Content-Type", "application/json");
|
||||
|
|
|
|||
|
|
@ -52,6 +52,10 @@ public class LivyOutputStream extends OutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
public void write(String text) throws IOException {
|
||||
write(text.getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int offset, int len) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
|
|
|
|||
|
|
@ -38,11 +38,16 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
private LivyOutputStream out;
|
||||
|
||||
protected static Map<String, Integer> userSessionMap;
|
||||
protected static Map<Integer, String> sessionId2AppIdMap;
|
||||
protected static Map<Integer, String> sessionId2WebUIMap;
|
||||
|
||||
private LivyHelper livyHelper;
|
||||
|
||||
public LivySparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
userSessionMap = new HashMap<>();
|
||||
sessionId2AppIdMap = new HashMap<>();
|
||||
sessionId2WebUIMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper(property);
|
||||
out = new LivyOutputStream();
|
||||
}
|
||||
|
|
@ -67,24 +72,38 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
Integer sessionId = null;
|
||||
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
|
||||
try {
|
||||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext,
|
||||
"spark")
|
||||
);
|
||||
sessionId = livyHelper.createSession(interpreterContext, "spark");
|
||||
userSessionMap.put(interpreterContext.getAuthenticationInfo().getUser(), sessionId);
|
||||
String appId = extractStatementResult(
|
||||
livyHelper.interpret("sc.applicationId", interpreterContext, userSessionMap)
|
||||
.message());
|
||||
livyHelper.interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
interpreterContext, userSessionMap);
|
||||
String webUI = extractStatementResult(
|
||||
livyHelper.interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
|
||||
interpreterContext, userSessionMap).message());
|
||||
sessionId2AppIdMap.put(sessionId, appId);
|
||||
sessionId2WebUIMap.put(sessionId, webUI);
|
||||
LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
|
||||
sessionId, appId, webUI);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
} else {
|
||||
sessionId = userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser());
|
||||
}
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out);
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out,
|
||||
sessionId2AppIdMap.get(sessionId), sessionId2WebUIMap.get(sessionId));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
|
|
@ -92,6 +111,23 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* res0: String = application_1473129941656_0048
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private static String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("=")) >= 0) {
|
||||
return result.substring(pos + 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
|
|
|
|||
Loading…
Reference in a new issue