mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Fix appId and webui extraction for pyspark
This commit is contained in:
parent
3f86bff293
commit
16a18de970
9 changed files with 123 additions and 37 deletions
|
|
@ -98,20 +98,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
if (sessionInfo.appId == null) {
|
||||
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
|
||||
// explicitly by ourselves.
|
||||
sessionInfo.appId = extractStatementResult(
|
||||
interpret("sc.applicationId", null, false, false).message()
|
||||
.get(0).getData());
|
||||
sessionInfo.appId = extractAppId();
|
||||
}
|
||||
|
||||
if (sessionInfo.appInfo == null ||
|
||||
StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
|
||||
interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
null, false, false);
|
||||
sessionInfo.webUIAddress = extractStatementResult(
|
||||
interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
|
||||
.message().get(0).getData());
|
||||
sessionInfo.webUIAddress = extractWebUIAddress();
|
||||
} else {
|
||||
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
|
||||
}
|
||||
|
|
@ -130,6 +122,10 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
protected abstract String extractAppId() throws LivyException;
|
||||
|
||||
protected abstract String extractWebUIAddress() throws LivyException;
|
||||
|
||||
public SessionInfo getSessionInfo() {
|
||||
return sessionInfo;
|
||||
}
|
||||
|
|
@ -148,25 +144,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* res0: String = application_1473129941656_0048
|
||||
*
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("=")) >= 0) {
|
||||
return result.substring(pos + 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
if (livyVersion.isCancelSupported()) {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ import java.util.Properties;
|
|||
/**
|
||||
* Livy PySpark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivyPySpark3Interpreter extends BaseLivyInterprereter {
|
||||
public class LivyPySpark3Interpreter extends LivyPySparkBaseInterpreter {
|
||||
|
||||
public LivyPySpark3Interpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -43,4 +43,5 @@ public class LivyPySpark3Interpreter extends BaseLivyInterprereter {
|
|||
public String getSessionKind() {
|
||||
return "pyspark3";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,46 @@
|
|||
package org.apache.zeppelin.livy;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Base class for PySpark Interpreter
|
||||
*/
|
||||
public abstract class LivyPySparkBaseInterpreter extends BaseLivyInterprereter {
|
||||
|
||||
public LivyPySparkBaseInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractAppId() throws LivyException {
|
||||
return extractStatementResult(
|
||||
interpret("sc.applicationId", null, false, false).message()
|
||||
.get(0).getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractWebUIAddress() throws LivyException {
|
||||
return extractStatementResult(
|
||||
interpret(
|
||||
"sc._jsc.sc().ui().get().appUIAddress()", null, false, false)
|
||||
.message().get(0).getData());
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* u'application_1473129941656_0048'
|
||||
*
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("'")) >= 0) {
|
||||
return result.substring(pos + 1, result.length() - 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -33,7 +33,7 @@ import java.util.Properties;
|
|||
/**
|
||||
* Livy PySpark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivyPySparkInterpreter extends BaseLivyInterprereter {
|
||||
public class LivyPySparkInterpreter extends LivyPySparkBaseInterpreter {
|
||||
|
||||
public LivyPySparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -43,4 +43,6 @@ public class LivyPySparkInterpreter extends BaseLivyInterprereter {
|
|||
public String getSessionKind() {
|
||||
return "pyspark";
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,4 +44,39 @@ public class LivySparkInterpreter extends BaseLivyInterprereter {
|
|||
return "spark";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractAppId() throws LivyException {
|
||||
return extractStatementResult(
|
||||
interpret("sc.applicationId", null, false, false).message()
|
||||
.get(0).getData());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractWebUIAddress() throws LivyException {
|
||||
interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
null, false, false);
|
||||
return extractStatementResult(
|
||||
interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
|
||||
.message().get(0).getData());
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* res0: String = application_1473129941656_0048
|
||||
*
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("=")) >= 0) {
|
||||
return result.substring(pos + 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,4 +43,16 @@ public class LivySparkRInterpreter extends BaseLivyInterprereter {
|
|||
public String getSessionKind() {
|
||||
return "sparkr";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractAppId() throws LivyException {
|
||||
//TODO(zjffdu) depends on SparkR
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractWebUIAddress() throws LivyException {
|
||||
//TODO(zjffdu) depends on SparkR
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -218,4 +218,16 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
public void close() {
|
||||
this.sparkInterpreter.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractAppId() throws LivyException {
|
||||
// it wont' be called because it would delegate to LivySparkInterpreter
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String extractWebUIAddress() throws LivyException {
|
||||
// it wont' be called because it would delegate to LivySparkInterpreter
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -75,7 +75,7 @@ public class LivyInterpreterIT {
|
|||
return true;
|
||||
}
|
||||
|
||||
// @Test
|
||||
@Test
|
||||
public void testSparkInterpreterRDD() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -157,7 +157,7 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
// @Test
|
||||
@Test
|
||||
public void testSparkInterpreterDataFrame() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -225,7 +225,7 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
// @Test
|
||||
@Test
|
||||
public void testSparkSQLInterpreter() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -311,7 +311,7 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
// @Test
|
||||
@Test
|
||||
public void testSparkInterpreterWithDisplayAppInfo() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
@ -351,7 +351,7 @@ public class LivyInterpreterIT {
|
|||
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
|
||||
}
|
||||
|
||||
// @Test
|
||||
@Test
|
||||
public void testLivyTutorialNote() throws IOException {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ export default class TableData {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (textRow === '<!--TABLE_COMMENT-->') {
|
||||
if (textRow === '') {
|
||||
if (rows.length > 0) {
|
||||
commentRow = true;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue