mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-1432. Support cancellation of paragraph execution
This commit is contained in:
parent
f2d51d95f3
commit
070fea05ca
4 changed files with 244 additions and 48 deletions
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
/**
|
||||
* APINotFoundException happens because we may introduce new apis in new livy version.
|
||||
*/
|
||||
public class APINotFoundException extends LivyException {
|
||||
public APINotFoundException() {
|
||||
}
|
||||
|
||||
public APINotFoundException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public APINotFoundException(String message, Throwable cause) {
|
||||
super(message, cause);
|
||||
}
|
||||
|
||||
public APINotFoundException(Throwable cause) {
|
||||
super(cause);
|
||||
}
|
||||
|
||||
public APINotFoundException(String message, Throwable cause, boolean enableSuppression,
|
||||
boolean writableStackTrace) {
|
||||
super(message, cause, enableSuppression, writableStackTrace);
|
||||
}
|
||||
}
|
||||
|
|
@ -37,6 +37,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Base class for livy interpreters.
|
||||
|
|
@ -51,6 +52,11 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
private long sessionCreationTimeout;
|
||||
protected boolean displayAppInfo;
|
||||
private AtomicBoolean sessionExpired = new AtomicBoolean(false);
|
||||
private LivyVersion livyVersion;
|
||||
|
||||
// keep tracking the mapping between paragraphId and statementId, so that we can cancel the
|
||||
// statement after we execute it.
|
||||
private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMapping = new ConcurrentHashMap<>();
|
||||
|
||||
public BaseLivyInterprereter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -89,17 +95,17 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
|
||||
// explicitly by ourselves.
|
||||
sessionInfo.appId = extractStatementResult(
|
||||
interpret("sc.applicationId", false, false).message()
|
||||
interpret("sc.applicationId", null, false, false).message()
|
||||
.get(0).getData());
|
||||
}
|
||||
|
||||
interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
false, false);
|
||||
null, false, false);
|
||||
if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
|
||||
sessionInfo.webUIAddress = extractStatementResult(
|
||||
interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false, false)
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
|
||||
.message().get(0).getData());
|
||||
} else {
|
||||
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
|
||||
|
|
@ -107,6 +113,14 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}",
|
||||
sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress);
|
||||
}
|
||||
// check livy version
|
||||
try {
|
||||
this.livyVersion = getLivyVersion();
|
||||
LOGGER.info("Use livy " + livyVersion);
|
||||
} catch (APINotFoundException e) {
|
||||
this.livyVersion = new LivyVersion("0.2.0");
|
||||
LOGGER.info("Use livy 0.2.0");
|
||||
}
|
||||
}
|
||||
|
||||
public SessionInfo getSessionInfo() {
|
||||
|
|
@ -120,7 +134,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
|
||||
try {
|
||||
return interpret(st, this.displayAppInfo, true);
|
||||
return interpret(st, context.getParagraphId(), this.displayAppInfo, true);
|
||||
} catch (LivyException e) {
|
||||
LOGGER.error("Fail to interpret:" + st, e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
|
|
@ -148,7 +162,18 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
//TODO(zjffdu). Use livy cancel api which is available in livy 0.3
|
||||
if (livyVersion.isCancelSupported()) {
|
||||
try {
|
||||
cancelStatement(paragraphId2StmtIdMapping.get(context.getParagraphId()));
|
||||
} catch (LivyException e) {
|
||||
LOGGER.warn("Fail to cancel statement {} for paragraph {} ",
|
||||
paragraphId2StmtIdMapping.get(context.getParagraphId()), context.getParagraphId());
|
||||
} finally {
|
||||
paragraphId2StmtIdMapping.remove(context.getParagraphId());
|
||||
}
|
||||
} else {
|
||||
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -206,44 +231,58 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
|
||||
}
|
||||
|
||||
public InterpreterResult interpret(String code, boolean displayAppInfo,
|
||||
boolean appendSessionExpired)
|
||||
throws LivyException {
|
||||
public InterpreterResult interpret(String code,
|
||||
String paragraphId,
|
||||
boolean displayAppInfo,
|
||||
boolean appendSessionExpired) throws LivyException {
|
||||
StatementInfo stmtInfo = null;
|
||||
boolean sessionExpired = false;
|
||||
try {
|
||||
stmtInfo = executeStatement(new ExecuteRequest(code));
|
||||
} catch (SessionNotFoundException e) {
|
||||
LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
|
||||
sessionExpired = true;
|
||||
// we don't want to create multiple sessions because it is possible to have multiple thread
|
||||
// to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
|
||||
// to check session status again in this sync block
|
||||
synchronized (this) {
|
||||
if (isSessionExpired()) {
|
||||
initLivySession();
|
||||
}
|
||||
}
|
||||
stmtInfo = executeStatement(new ExecuteRequest(code));
|
||||
}
|
||||
// pull the statement status
|
||||
while (!stmtInfo.isAvailable()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("InterruptedException when pulling statement status.", e);
|
||||
throw new LivyException(e);
|
||||
stmtInfo = executeStatement(new ExecuteRequest(code));
|
||||
} catch (SessionNotFoundException e) {
|
||||
LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
|
||||
sessionExpired = true;
|
||||
// we don't want to create multiple sessions because it is possible to have multiple thread
|
||||
// to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
|
||||
// to check session status again in this sync block
|
||||
synchronized (this) {
|
||||
if (isSessionExpired()) {
|
||||
initLivySession();
|
||||
}
|
||||
}
|
||||
stmtInfo = executeStatement(new ExecuteRequest(code));
|
||||
}
|
||||
if (paragraphId != null) {
|
||||
paragraphId2StmtIdMapping.put(paragraphId, stmtInfo.id);
|
||||
}
|
||||
// pull the statement status
|
||||
while (!stmtInfo.isAvailable()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error("InterruptedException when pulling statement status.", e);
|
||||
throw new LivyException(e);
|
||||
}
|
||||
stmtInfo = getStatementInfo(stmtInfo.id);
|
||||
}
|
||||
if (appendSessionExpired) {
|
||||
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
|
||||
sessionExpired);
|
||||
} else {
|
||||
return getResultFromStatementInfo(stmtInfo, displayAppInfo);
|
||||
}
|
||||
} finally {
|
||||
if (paragraphId != null) {
|
||||
paragraphId2StmtIdMapping.remove(paragraphId);
|
||||
}
|
||||
stmtInfo = getStatementInfo(stmtInfo.id);
|
||||
}
|
||||
if (appendSessionExpired) {
|
||||
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
|
||||
sessionExpired);
|
||||
} else {
|
||||
return getResultFromStatementInfo(stmtInfo, displayAppInfo);
|
||||
}
|
||||
}
|
||||
|
||||
private LivyVersion getLivyVersion() throws LivyException {
|
||||
return new LivyVersion((LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version));
|
||||
}
|
||||
|
||||
private boolean isSessionExpired() throws LivyException {
|
||||
try {
|
||||
getSessionInfo(sessionInfo.id);
|
||||
|
|
@ -270,6 +309,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
|
||||
boolean displayAppInfo) {
|
||||
if (stmtInfo.output.isError()) {
|
||||
|
|
@ -341,6 +381,10 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId, "GET"));
|
||||
}
|
||||
|
||||
private void cancelStatement(int statementId) throws LivyException {
|
||||
callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId + "/cancel", "POST");
|
||||
}
|
||||
|
||||
private RestTemplate getRestTemplate() {
|
||||
String keytabLocation = property.getProperty("zeppelin.livy.keytab");
|
||||
String principal = property.getProperty("zeppelin.livy.principal");
|
||||
|
|
@ -385,21 +429,20 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(),
|
||||
response.getBody());
|
||||
if (response.getStatusCode().value() == 200
|
||||
|| response.getStatusCode().value() == 201
|
||||
|| response.getStatusCode().value() == 404) {
|
||||
String responseBody = response.getBody();
|
||||
if (responseBody.matches("\"Session '\\d+' not found.\"")) {
|
||||
throw new SessionNotFoundException(responseBody);
|
||||
|| response.getStatusCode().value() == 201) {
|
||||
return response.getBody();
|
||||
} else if (response.getStatusCode().value() == 404) {
|
||||
if (response.getBody().matches("Session '\\d+' not found.")) {
|
||||
throw new SessionNotFoundException(response.getBody());
|
||||
} else {
|
||||
return responseBody;
|
||||
throw new APINotFoundException("No rest api found for " + targetURL +
|
||||
", " + response.getStatusCode());
|
||||
}
|
||||
} else {
|
||||
String responseString = response.getBody();
|
||||
if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
|
||||
return responseString;
|
||||
}
|
||||
LOGGER.error(String.format("Error with %s StatusCode: %s",
|
||||
response.getStatusCode().value(), responseString));
|
||||
throw new LivyException(String.format("Error with %s StatusCode: %s",
|
||||
response.getStatusCode().value(), responseString));
|
||||
}
|
||||
|
|
@ -502,7 +545,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
|
||||
public boolean isAvailable() {
|
||||
return state.equals("available");
|
||||
return state.equals("available") || state.equals("cancelled");
|
||||
}
|
||||
|
||||
private static class StatementOutput {
|
||||
|
|
@ -543,4 +586,17 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
private static class LivyVersionResponse {
|
||||
public String url;
|
||||
public String branch;
|
||||
public String revision;
|
||||
public String version;
|
||||
public String date;
|
||||
public String user;
|
||||
|
||||
public static LivyVersionResponse fromJson(String json) {
|
||||
return gson.fromJson(json, LivyVersionResponse.class);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
// As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
|
||||
// to judge whether it is using spark2.
|
||||
try {
|
||||
InterpreterResult result = sparkInterpreter.interpret("spark", false, false);
|
||||
InterpreterResult result = sparkInterpreter.interpret("spark", null, false, false);
|
||||
if (result.code() == InterpreterResult.Code.SUCCESS &&
|
||||
result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
|
||||
LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
|
||||
|
|
@ -59,7 +59,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
isSpark2 = true;
|
||||
} else {
|
||||
// spark 1.x
|
||||
result = sparkInterpreter.interpret("sqlContext", false, false);
|
||||
result = sparkInterpreter.interpret("sqlContext", null, false, false);
|
||||
if (result.code() == InterpreterResult.Code.SUCCESS) {
|
||||
LOGGER.info("sqlContext is detected.");
|
||||
} else if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
|
|
@ -68,7 +68,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves");
|
||||
result = sparkInterpreter.interpret(
|
||||
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", false, false);
|
||||
+ "import sqlContext.implicits._", null, false, false);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
throw new LivyException("Fail to create SQLContext," +
|
||||
result.message().get(0).getData());
|
||||
|
|
@ -113,7 +113,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
} else {
|
||||
sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
|
||||
}
|
||||
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo, true);
|
||||
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
|
||||
this.displayAppInfo, true);
|
||||
|
||||
if (result.code() == InterpreterResult.Code.SUCCESS) {
|
||||
InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS);
|
||||
|
|
|
|||
96
livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
Normal file
96
livy/src/main/java/org/apache/zeppelin/livy/LivyVersion.java
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
package org.apache.zeppelin.livy;
|
||||
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Provide reading comparing capability of livy version
|
||||
*/
|
||||
public class LivyVersion {
|
||||
private static final Logger logger = LoggerFactory.getLogger(LivyVersion.class);
|
||||
|
||||
private static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0");
|
||||
private static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0");
|
||||
|
||||
private int version;
|
||||
private String versionString;
|
||||
|
||||
LivyVersion(String versionString) {
|
||||
this.versionString = versionString;
|
||||
|
||||
try {
|
||||
int pos = versionString.indexOf('-');
|
||||
|
||||
String numberPart = versionString;
|
||||
if (pos > 0) {
|
||||
numberPart = versionString.substring(0, pos);
|
||||
}
|
||||
|
||||
String versions[] = numberPart.split("\\.");
|
||||
int major = Integer.parseInt(versions[0]);
|
||||
int minor = Integer.parseInt(versions[1]);
|
||||
int patch = Integer.parseInt(versions[2]);
|
||||
// version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602)
|
||||
version = Integer.parseInt(String.format("%d%02d%02d", major, minor, patch));
|
||||
} catch (Exception e) {
|
||||
logger.error("Can not recognize Livy version " + versionString +
|
||||
". Assume it's a future release", e);
|
||||
|
||||
// assume it is future release
|
||||
version = 99999;
|
||||
}
|
||||
}
|
||||
|
||||
public int toNumber() {
|
||||
return version;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return versionString;
|
||||
}
|
||||
|
||||
public static LivyVersion fromVersionString(String versionString) {
|
||||
return new LivyVersion(versionString);
|
||||
}
|
||||
|
||||
public boolean isCancelSupported() {
|
||||
return this.newerThanEquals(LIVY_0_3_0);
|
||||
}
|
||||
|
||||
public boolean equals(Object versionToCompare) {
|
||||
return version == ((LivyVersion) versionToCompare).version;
|
||||
}
|
||||
|
||||
public boolean newerThan(LivyVersion versionToCompare) {
|
||||
return version > versionToCompare.version;
|
||||
}
|
||||
|
||||
public boolean newerThanEquals(LivyVersion versionToCompare) {
|
||||
return version >= versionToCompare.version;
|
||||
}
|
||||
|
||||
public boolean olderThan(LivyVersion versionToCompare) {
|
||||
return version < versionToCompare.version;
|
||||
}
|
||||
|
||||
public boolean olderThanEquals(LivyVersion versionToCompare) {
|
||||
return version <= versionToCompare.version;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue