ZEPPELIN-1852. Use multiple InterpreterResult for displaying appInfo

This commit is contained in:
Jeff Zhang 2016-12-23 11:24:27 +08:00
parent 2fcfaa8c74
commit e1c2eb98f5
3 changed files with 53 additions and 23 deletions

View file

@ -62,8 +62,10 @@ public abstract class BaseLivyInterprereter extends Interpreter {
public BaseLivyInterprereter(Properties property) {
super(property);
this.livyURL = property.getProperty("zeppelin.livy.url");
this.displayAppInfo = Boolean.parseBoolean(
property.getProperty("zeppelin.livy.displayAppInfo", "false"));
this.sessionCreationTimeout = Integer.parseInt(
property.getProperty("zeppelin.livy.create.session.timeout", 120 + ""));
property.getProperty("zeppelin.livy.session.create_timeout", 120 + ""));
this.pullStatusInterval = Integer.parseInt(
property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + ""));
}
@ -77,7 +79,6 @@ public abstract class BaseLivyInterprereter extends Interpreter {
} catch (LivyException e) {
String msg = "Fail to create session, please check livy interpreter log and " +
"livy server log";
LOGGER.error(msg);
throw new RuntimeException(msg, e);
}
}
@ -102,10 +103,11 @@ public abstract class BaseLivyInterprereter extends Interpreter {
.get(0).getData());
}
interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
null, false, false);
if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
if (sessionInfo.appInfo == null ||
StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
null, false, false);
sessionInfo.webUIAddress = extractStatementResult(
interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
@ -215,14 +217,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
if (sessionInfo.isFinished()) {
String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
+ ", log: " + sessionInfo.log;
LOGGER.error(msg);
throw new LivyException(msg);
}
if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) {
String msg = "The creation of session " + sessionInfo.id + " is timeout within "
+ sessionCreationTimeout + " seconds, appId: " + sessionInfo.appId
+ ", log: " + sessionInfo.log;
LOGGER.error(msg);
throw new LivyException(msg);
}
Thread.sleep(pullStatusInterval);
@ -361,16 +361,14 @@ public abstract class BaseLivyInterprereter extends Interpreter {
if (displayAppInfo) {
//TODO(zjffdu), use multiple InterpreterResult to display appInfo
StringBuilder outputBuilder = new StringBuilder();
outputBuilder.append("%angular ");
outputBuilder.append("<pre><code>");
outputBuilder.append(result);
outputBuilder.append("</code></pre>");
outputBuilder.append("<hr/>");
outputBuilder.append("Spark Application Id:" + sessionInfo.appId + "<br/>");
outputBuilder.append("Spark WebUI: <a href=" + sessionInfo.webUIAddress + ">"
+ sessionInfo.webUIAddress + "</a>");
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputBuilder.toString());
InterpreterResult interpreterResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
interpreterResult.add(InterpreterResult.Type.TEXT, result);
String appInfoHtml = "<hr/>Spark Application Id: " + sessionInfo.appId + "<br/>"
+ "Spark WebUI: <a href=\"" + sessionInfo.webUIAddress + "\">"
+ sessionInfo.webUIAddress + "</a>";
LOGGER.info("appInfoHtml:" + appInfoHtml);
interpreterResult.add(InterpreterResult.Type.HTML, appInfoHtml);
return interpreterResult;
} else {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, result);
}

View file

@ -11,9 +11,9 @@
"defaultValue": "http://localhost:8998",
"description": "The URL for Livy Server."
},
"zeppelin.livy.create.session.retries": {
"envName": "ZEPPELIN_LIVY_CREATE_SESSION_RETRIES",
"propertyName": "zeppelin.livy.create.session.timeout",
"zeppelin.livy.session.create_timeout": {
"envName": "ZEPPELIN_LIVY_SESSION_CREATE_TIMEOUT",
"propertyName": "zeppelin.livy.session.create_timeout",
"defaultValue": "120",
"description": "Livy Server create session timeout (seconds)."
},
@ -87,7 +87,7 @@
"defaultValue": "",
"description": "Adding extra libraries to livy interpreter"
},
"livy.spark.displayAppInfo": {
"zeppelin.livy.displayAppInfo": {
"propertyName": "zeppelin.livy.displayAppInfo",
"defaultValue": "false",
"description": "Whether display app info"

View file

@ -51,7 +51,7 @@ public class LivyInterpreterIT {
LOGGER.info("Starting livy at {}", cluster.livyEndpoint());
properties = new Properties();
properties.setProperty("zeppelin.livy.url", cluster.livyEndpoint());
properties.setProperty("zeppelin.livy.create.session.timeout", "120");
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "100");
}
@ -313,6 +313,38 @@ public class LivyInterpreterIT {
}
}
@Test
public void testSparkInterpreterWithDisplayAppInfo() {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
Properties properties2 = new Properties(properties);
properties2.put("zeppelin.livy.displayAppInfo", "true");
// enable spark ui because it is disabled by livy integration test
properties2.put("livy.spark.ui.enabled", "true");
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties2);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();
try {
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(2, result.message().size());
assertTrue(result.message().get(0).getData().contains("1.5.2"));
assertTrue(result.message().get(1).getData().contains("Spark Application Id"));
} finally {
sparkInterpreter.close();
}
}
@Test
public void testSparkRInterpreter() {
if (!checkPreCondition()) {