ZEPPELIN-1432. Support cancellation of paragraph execution

This commit is contained in:
Jeff Zhang 2016-12-26 10:06:01 +08:00
parent f2d51d95f3
commit 070fea05ca
4 changed files with 244 additions and 48 deletions

View file

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
/**
* APINotFoundException happens because we may introduce new apis in new livy version.
*/
public class APINotFoundException extends LivyException {
public APINotFoundException() {
}
public APINotFoundException(String message) {
super(message);
}
public APINotFoundException(String message, Throwable cause) {
super(message, cause);
}
public APINotFoundException(Throwable cause) {
super(cause);
}
public APINotFoundException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View file

@ -37,6 +37,7 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
/**
* Base class for livy interpreters.
@ -51,6 +52,11 @@ public abstract class BaseLivyInterprereter extends Interpreter {
private long sessionCreationTimeout;
protected boolean displayAppInfo;
private AtomicBoolean sessionExpired = new AtomicBoolean(false);
private LivyVersion livyVersion;
// keep tracking the mapping between paragraphId and statementId, so that we can cancel the
// statement after we execute it.
private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMapping = new ConcurrentHashMap<>();
public BaseLivyInterprereter(Properties property) {
super(property);
@ -89,17 +95,17 @@ public abstract class BaseLivyInterprereter extends Interpreter {
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
// explicitly by ourselves.
sessionInfo.appId = extractStatementResult(
interpret("sc.applicationId", false, false).message()
interpret("sc.applicationId", null, false, false).message()
.get(0).getData());
}
interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
false, false);
null, false, false);
if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
sessionInfo.webUIAddress = extractStatementResult(
interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false, false)
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", null, false, false)
.message().get(0).getData());
} else {
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
@ -107,6 +113,14 @@ public abstract class BaseLivyInterprereter extends Interpreter {
LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}",
sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress);
}
// check livy version
try {
this.livyVersion = getLivyVersion();
LOGGER.info("Use livy " + livyVersion);
} catch (APINotFoundException e) {
this.livyVersion = new LivyVersion("0.2.0");
LOGGER.info("Use livy 0.2.0");
}
}
public SessionInfo getSessionInfo() {
@ -120,7 +134,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
try {
return interpret(st, this.displayAppInfo, true);
return interpret(st, context.getParagraphId(), this.displayAppInfo, true);
} catch (LivyException e) {
LOGGER.error("Fail to interpret:" + st, e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
@ -148,7 +162,18 @@ public abstract class BaseLivyInterprereter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
//TODO(zjffdu). Use livy cancel api which is available in livy 0.3
if (livyVersion.isCancelSupported()) {
try {
cancelStatement(paragraphId2StmtIdMapping.get(context.getParagraphId()));
} catch (LivyException e) {
LOGGER.warn("Fail to cancel statement {} for paragraph {} ",
paragraphId2StmtIdMapping.get(context.getParagraphId()), context.getParagraphId());
} finally {
paragraphId2StmtIdMapping.remove(context.getParagraphId());
}
} else {
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
}
}
@Override
@ -206,44 +231,58 @@ public abstract class BaseLivyInterprereter extends Interpreter {
return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
}
public InterpreterResult interpret(String code, boolean displayAppInfo,
boolean appendSessionExpired)
throws LivyException {
public InterpreterResult interpret(String code,
String paragraphId,
boolean displayAppInfo,
boolean appendSessionExpired) throws LivyException {
StatementInfo stmtInfo = null;
boolean sessionExpired = false;
try {
stmtInfo = executeStatement(new ExecuteRequest(code));
} catch (SessionNotFoundException e) {
LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
sessionExpired = true;
// we don't want to create multiple sessions because it is possible to have multiple thread
// to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
// to check session status again in this sync block
synchronized (this) {
if (isSessionExpired()) {
initLivySession();
}
}
stmtInfo = executeStatement(new ExecuteRequest(code));
}
// pull the statement status
while (!stmtInfo.isAvailable()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error("InterruptedException when pulling statement status.", e);
throw new LivyException(e);
stmtInfo = executeStatement(new ExecuteRequest(code));
} catch (SessionNotFoundException e) {
LOGGER.warn("Livy session {} is expired, new session will be created.", sessionInfo.id);
sessionExpired = true;
// we don't want to create multiple sessions because it is possible to have multiple thread
// to call this method, like LivySparkSQLInterpreter which use ParallelScheduler. So we need
// to check session status again in this sync block
synchronized (this) {
if (isSessionExpired()) {
initLivySession();
}
}
stmtInfo = executeStatement(new ExecuteRequest(code));
}
if (paragraphId != null) {
paragraphId2StmtIdMapping.put(paragraphId, stmtInfo.id);
}
// pull the statement status
while (!stmtInfo.isAvailable()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOGGER.error("InterruptedException when pulling statement status.", e);
throw new LivyException(e);
}
stmtInfo = getStatementInfo(stmtInfo.id);
}
if (appendSessionExpired) {
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
sessionExpired);
} else {
return getResultFromStatementInfo(stmtInfo, displayAppInfo);
}
} finally {
if (paragraphId != null) {
paragraphId2StmtIdMapping.remove(paragraphId);
}
stmtInfo = getStatementInfo(stmtInfo.id);
}
if (appendSessionExpired) {
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
sessionExpired);
} else {
return getResultFromStatementInfo(stmtInfo, displayAppInfo);
}
}
private LivyVersion getLivyVersion() throws LivyException {
return new LivyVersion((LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version));
}
private boolean isSessionExpired() throws LivyException {
try {
getSessionInfo(sessionInfo.id);
@ -270,6 +309,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
}
private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
boolean displayAppInfo) {
if (stmtInfo.output.isError()) {
@ -341,6 +381,10 @@ public abstract class BaseLivyInterprereter extends Interpreter {
callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId, "GET"));
}
private void cancelStatement(int statementId) throws LivyException {
callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId + "/cancel", "POST");
}
private RestTemplate getRestTemplate() {
String keytabLocation = property.getProperty("zeppelin.livy.keytab");
String principal = property.getProperty("zeppelin.livy.principal");
@ -385,21 +429,20 @@ public abstract class BaseLivyInterprereter extends Interpreter {
LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(),
response.getBody());
if (response.getStatusCode().value() == 200
|| response.getStatusCode().value() == 201
|| response.getStatusCode().value() == 404) {
String responseBody = response.getBody();
if (responseBody.matches("\"Session '\\d+' not found.\"")) {
throw new SessionNotFoundException(responseBody);
|| response.getStatusCode().value() == 201) {
return response.getBody();
} else if (response.getStatusCode().value() == 404) {
if (response.getBody().matches("Session '\\d+' not found.")) {
throw new SessionNotFoundException(response.getBody());
} else {
return responseBody;
throw new APINotFoundException("No rest api found for " + targetURL +
", " + response.getStatusCode());
}
} else {
String responseString = response.getBody();
if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
return responseString;
}
LOGGER.error(String.format("Error with %s StatusCode: %s",
response.getStatusCode().value(), responseString));
throw new LivyException(String.format("Error with %s StatusCode: %s",
response.getStatusCode().value(), responseString));
}
@ -502,7 +545,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
public boolean isAvailable() {
return state.equals("available");
return state.equals("available") || state.equals("cancelled");
}
private static class StatementOutput {
@ -543,4 +586,17 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
}
private static class LivyVersionResponse {
public String url;
public String branch;
public String revision;
public String version;
public String date;
public String user;
public static LivyVersionResponse fromJson(String json) {
return gson.fromJson(json, LivyVersionResponse.class);
}
}
}

View file

@ -51,7 +51,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
// As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
// to judge whether it is using spark2.
try {
InterpreterResult result = sparkInterpreter.interpret("spark", false, false);
InterpreterResult result = sparkInterpreter.interpret("spark", null, false, false);
if (result.code() == InterpreterResult.Code.SUCCESS &&
result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
@ -59,7 +59,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
isSpark2 = true;
} else {
// spark 1.x
result = sparkInterpreter.interpret("sqlContext", false, false);
result = sparkInterpreter.interpret("sqlContext", null, false, false);
if (result.code() == InterpreterResult.Code.SUCCESS) {
LOGGER.info("sqlContext is detected.");
} else if (result.code() == InterpreterResult.Code.ERROR) {
@ -68,7 +68,7 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves");
result = sparkInterpreter.interpret(
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", false, false);
+ "import sqlContext.implicits._", null, false, false);
if (result.code() == InterpreterResult.Code.ERROR) {
throw new LivyException("Fail to create SQLContext," +
result.message().get(0).getData());
@ -113,7 +113,8 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
} else {
sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
}
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo, true);
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
this.displayAppInfo, true);
if (result.code() == InterpreterResult.Code.SUCCESS) {
InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS);

View file

@ -0,0 +1,96 @@
package org.apache.zeppelin.livy;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provide reading comparing capability of livy version
*/
public class LivyVersion {
private static final Logger logger = LoggerFactory.getLogger(LivyVersion.class);
private static final LivyVersion LIVY_0_2_0 = LivyVersion.fromVersionString("0.2.0");
private static final LivyVersion LIVY_0_3_0 = LivyVersion.fromVersionString("0.3.0");
private int version;
private String versionString;
LivyVersion(String versionString) {
this.versionString = versionString;
try {
int pos = versionString.indexOf('-');
String numberPart = versionString;
if (pos > 0) {
numberPart = versionString.substring(0, pos);
}
String versions[] = numberPart.split("\\.");
int major = Integer.parseInt(versions[0]);
int minor = Integer.parseInt(versions[1]);
int patch = Integer.parseInt(versions[2]);
// version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602)
version = Integer.parseInt(String.format("%d%02d%02d", major, minor, patch));
} catch (Exception e) {
logger.error("Can not recognize Livy version " + versionString +
". Assume it's a future release", e);
// assume it is future release
version = 99999;
}
}
public int toNumber() {
return version;
}
public String toString() {
return versionString;
}
public static LivyVersion fromVersionString(String versionString) {
return new LivyVersion(versionString);
}
public boolean isCancelSupported() {
return this.newerThanEquals(LIVY_0_3_0);
}
public boolean equals(Object versionToCompare) {
return version == ((LivyVersion) versionToCompare).version;
}
public boolean newerThan(LivyVersion versionToCompare) {
return version > versionToCompare.version;
}
public boolean newerThanEquals(LivyVersion versionToCompare) {
return version >= versionToCompare.version;
}
public boolean olderThan(LivyVersion versionToCompare) {
return version < versionToCompare.version;
}
public boolean olderThanEquals(LivyVersion versionToCompare) {
return version <= versionToCompare.version;
}
}