added testcase for cancellation support on LivySparkSQLInterpreter and moved the removal to finally block

This commit is contained in:
Benoy Antony 2017-04-10 17:13:41 -07:00
parent 9fc6dbf695
commit 75fe5746fd
2 changed files with 76 additions and 9 deletions

View file

@ -281,16 +281,19 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
private void cancel(int id, String paragraphId) {
if (livyVersion.isCancelSupported()) {
try {
LOGGER.info("Cancelling statement " + id);
cancelStatement(id);
paragraphsToCancel.remove(paragraphId);
} catch (LivyException e) {
LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
try {
if (livyVersion.isCancelSupported()) {
try {
LOGGER.info("Cancelling statement " + id);
cancelStatement(id);
} catch (LivyException e) {
LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
}
} else {
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
}
} else {
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
} finally {
paragraphsToCancel.remove(paragraphId);
}
}

View file

@ -306,6 +306,70 @@ public class LivyInterpreterIT {
}
}
@Test
public void testSparkSQLCancellation() {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
new LivySparkInterpreter(properties));
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
final LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
try {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
"title", "text", authInfo, null, null, null, null, null, output);
InterpreterResult result = sqlInterpreter.interpret("show tables", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertTrue(result.message().get(0).getData().contains("tableName"));
// cancel
if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
Thread cancelThread = new Thread() {
@Override
public void run() {
// invoke cancel after 1 millisecond to wait job starting
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
sqlInterpreter.cancel(context);
}
};
cancelThread.start();
boolean bCancelled = false;
for (int i=0; i <1000; i++) {
result = sqlInterpreter
.interpret("show tables", context);
if (result.code().equals(InterpreterResult.Code.ERROR)) {
String message = result.message().get(0).getData();
// 2 possibilities, sometimes livy doesn't return the real cancel exception
assertTrue(message.contains("cancelled part of cancelled job group") ||
message.contains("Job is cancelled"));
bCancelled = true;
break;
}
}
assertTrue(bCancelled);
}
} catch (LivyException e) {
} finally {
sqlInterpreter.close();
}
}
@Test
public void testStringWithTruncation() {
if (!checkPreCondition()) {