clear the cancel requests if livy doesnt't support cancellation and modified testcase

This commit is contained in:
Benoy Antony 2017-04-11 20:55:36 -07:00
parent 75fe5746fd
commit 244e6d3466
2 changed files with 60 additions and 41 deletions

View file

@ -276,24 +276,25 @@ public abstract class BaseLivyInterprereter extends Interpreter {
} finally {
if (paragraphId != null) {
paragraphId2StmtProgressMap.remove(paragraphId);
paragraphsToCancel.remove(paragraphId);
}
}
}
private void cancel(int id, String paragraphId) {
try {
if (livyVersion.isCancelSupported()) {
try {
LOGGER.info("Cancelling statement " + id);
cancelStatement(id);
} catch (LivyException e) {
LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
}
} else {
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
if (livyVersion.isCancelSupported()) {
try {
LOGGER.info("Cancelling statement " + id);
cancelStatement(id);
} catch (LivyException e) {
LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
}
} finally {
paragraphsToCancel.remove(paragraphId);
finally {
paragraphsToCancel.remove(paragraphId);
}
} else {
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
paragraphsToCancel.clear();
}
}

View file

@ -314,58 +314,76 @@ public class LivyInterpreterIT {
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
new LivySparkInterpreter(properties));
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();
final LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
try {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
"title", "text", authInfo, null, null, null, null, null, output);
InterpreterResult result = sqlInterpreter.interpret("show tables", context);
// detect spark version
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertTrue(result.message().get(0).getData().contains("tableName"));
assertEquals(1, result.message().size());
boolean isSpark2 = isSpark2(sparkInterpreter, context);
// test DataFrame api
if (!isSpark2) {
result = sparkInterpreter.interpret(
"val df=sqlContext.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
} else {
result = sparkInterpreter.interpret(
"val df=spark.createDataFrame(Seq((\"hello\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
}
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// cancel
if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
Thread cancelThread = new Thread() {
@Override
public void run() {
// invoke cancel after 1 millisecond to wait job starting
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
sqlInterpreter.cancel(context);
}
};
cancelThread.start();
boolean bCancelled = false;
for (int i=0; i <1000; i++) {
result = sqlInterpreter
.interpret("show tables", context);
if (result.code().equals(InterpreterResult.Code.ERROR)) {
String message = result.message().get(0).getData();
// 2 possibilities, sometimes livy doesn't return the real cancel exception
assertTrue(message.contains("cancelled part of cancelled job group") ||
message.contains("Job is cancelled"));
bCancelled = true;
break;
}
//sleep so that cancelThread performs a cancel.
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
result = sqlInterpreter
.interpret("select count(1) from df", context);
if (result.code().equals(InterpreterResult.Code.ERROR)) {
String message = result.message().get(0).getData();
// 2 possibilities, sometimes livy doesn't return the real cancel exception
assertTrue(message.contains("cancelled part of cancelled job group") ||
message.contains("Job is cancelled"));
}
assertTrue(bCancelled);
}
} catch (LivyException e) {
} finally {
sparkInterpreter.close();
sqlInterpreter.close();
}
}