Fix appId and webui extraction for pyspark

This commit is contained in:
Jeff Zhang 2017-01-26 22:47:56 +08:00
parent 3f86bff293
commit 16a18de970
9 changed files with 123 additions and 37 deletions

View file

@ -98,20 +98,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
if (sessionInfo.appId == null) {
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
// explicitly by ourselves.
sessionInfo.appId = extractStatementResult(
interpret("sc.applicationId", null, false, false).message()
.get(0).getData());
sessionInfo.appId = extractAppId();
}
if (sessionInfo.appInfo == null ||
StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
null, false, false);
sessionInfo.webUIAddress = extractStatementResult(
interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
.message().get(0).getData());
sessionInfo.webUIAddress = extractWebUIAddress();
} else {
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
}
@ -130,6 +122,10 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
}
protected abstract String extractAppId() throws LivyException;
protected abstract String extractWebUIAddress() throws LivyException;
public SessionInfo getSessionInfo() {
return sessionInfo;
}
@ -148,25 +144,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
InterpreterUtils.getMostRelevantMessage(e));
}
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* res0: String = application_1473129941656_0048
*
* @param result
* @return
*/
private String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
@Override
public void cancel(InterpreterContext context) {
if (livyVersion.isCancelSupported()) {

View file

@ -33,7 +33,7 @@ import java.util.Properties;
/**
* Livy PySpark interpreter for Zeppelin.
*/
public class LivyPySpark3Interpreter extends BaseLivyInterprereter {
public class LivyPySpark3Interpreter extends LivyPySparkBaseInterpreter {
public LivyPySpark3Interpreter(Properties property) {
super(property);
@ -43,4 +43,5 @@ public class LivyPySpark3Interpreter extends BaseLivyInterprereter {
public String getSessionKind() {
return "pyspark3";
}
}

View file

@ -0,0 +1,46 @@
package org.apache.zeppelin.livy;
import java.util.Properties;
/**
* Base class for PySpark Interpreter
*/
public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterprereter {
public LivyPySparkBaseInterpreter(Properties property) {
super(property);
}
@Override
protected String extractAppId() throws LivyException {
return extractStatementResult(
interpret("sc.applicationId", null, false, false).message()
.get(0).getData());
}
@Override
protected String extractWebUIAddress() throws LivyException {
return extractStatementResult(
interpret(
"sc._jsc.sc().ui().get().appUIAddress()", null, false, false)
.message().get(0).getData());
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* u'application_1473129941656_0048'
*
* @param result
* @return
*/
private String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("'")) >= 0) {
return result.substring(pos + 1, result.length() - 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
}

View file

@ -33,7 +33,7 @@ import java.util.Properties;
/**
* Livy PySpark interpreter for Zeppelin.
*/
public class LivyPySparkInterpreter extends BaseLivyInterprereter {
public class LivyPySparkInterpreter extends LivyPySparkBaseInterpreter {
public LivyPySparkInterpreter(Properties property) {
super(property);
@ -43,4 +43,6 @@ public class LivyPySparkInterpreter extends BaseLivyInterprereter {
public String getSessionKind() {
return "pyspark";
}
}

View file

@ -44,4 +44,39 @@ public class LivySparkInterpreter extends BaseLivyInterprereter {
return "spark";
}
@Override
protected String extractAppId() throws LivyException {
return extractStatementResult(
interpret("sc.applicationId", null, false, false).message()
.get(0).getData());
}
@Override
protected String extractWebUIAddress() throws LivyException {
interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
null, false, false);
return extractStatementResult(
interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
.message().get(0).getData());
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* res0: String = application_1473129941656_0048
*
* @param result
* @return
*/
private String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
}

View file

@ -43,4 +43,16 @@ public class LivySparkRInterpreter extends BaseLivyInterprereter {
public String getSessionKind() {
return "sparkr";
}
@Override
protected String extractAppId() throws LivyException {
//TODO(zjffdu) depends on SparkR
return null;
}
@Override
protected String extractWebUIAddress() throws LivyException {
//TODO(zjffdu) depends on SparkR
return null;
}
}

View file

@ -218,4 +218,16 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
public void close() {
this.sparkInterpreter.close();
}
@Override
protected String extractAppId() throws LivyException {
// it wont' be called because it would delegate to LivySparkInterpreter
throw new UnsupportedOperationException();
}
@Override
protected String extractWebUIAddress() throws LivyException {
// it wont' be called because it would delegate to LivySparkInterpreter
throw new UnsupportedOperationException();
}
}

View file

@ -75,7 +75,7 @@ public class LivyInterpreterIT {
return true;
}
// @Test
@Test
public void testSparkInterpreterRDD() {
if (!checkPreCondition()) {
return;
@ -157,7 +157,7 @@ public class LivyInterpreterIT {
}
}
// @Test
@Test
public void testSparkInterpreterDataFrame() {
if (!checkPreCondition()) {
return;
@ -225,7 +225,7 @@ public class LivyInterpreterIT {
}
}
// @Test
@Test
public void testSparkSQLInterpreter() {
if (!checkPreCondition()) {
return;
@ -311,7 +311,7 @@ public class LivyInterpreterIT {
}
}
// @Test
@Test
public void testSparkInterpreterWithDisplayAppInfo() {
if (!checkPreCondition()) {
return;
@ -351,7 +351,7 @@ public class LivyInterpreterIT {
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
}
// @Test
@Test
public void testLivyTutorialNote() throws IOException {
if (!checkPreCondition()) {
return;

View file

@ -42,7 +42,7 @@ export default class TableData {
continue;
}
if (textRow === '<!--TABLE_COMMENT-->') {
if (textRow === '') {
if (rows.length > 0) {
commentRow = true;
}