mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
added testcase for cancellation support on LivySparkSQLInterpreter and moved the removal to finally block
This commit is contained in:
parent
9fc6dbf695
commit
75fe5746fd
2 changed files with 76 additions and 9 deletions
|
|
@ -281,16 +281,19 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
|
||||
private void cancel(int id, String paragraphId) {
|
||||
if (livyVersion.isCancelSupported()) {
|
||||
try {
|
||||
LOGGER.info("Cancelling statement " + id);
|
||||
cancelStatement(id);
|
||||
paragraphsToCancel.remove(paragraphId);
|
||||
} catch (LivyException e) {
|
||||
LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
|
||||
try {
|
||||
if (livyVersion.isCancelSupported()) {
|
||||
try {
|
||||
LOGGER.info("Cancelling statement " + id);
|
||||
cancelStatement(id);
|
||||
} catch (LivyException e) {
|
||||
LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
|
||||
}
|
||||
} else {
|
||||
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
|
||||
}
|
||||
} else {
|
||||
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
|
||||
} finally {
|
||||
paragraphsToCancel.remove(paragraphId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -306,6 +306,70 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSparkSQLCancellation() {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
|
||||
new LivySparkInterpreter(properties));
|
||||
sparkInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
interpreterGroup.get("session_1").add(sparkInterpreter);
|
||||
final LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
|
||||
try {
|
||||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
final InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
|
||||
"title", "text", authInfo, null, null, null, null, null, output);
|
||||
InterpreterResult result = sqlInterpreter.interpret("show tables", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
assertTrue(result.message().get(0).getData().contains("tableName"));
|
||||
|
||||
// cancel
|
||||
if (sqlInterpreter.getLivyVersion().newerThanEquals(LivyVersion.LIVY_0_3_0)) {
|
||||
Thread cancelThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
// invoke cancel after 1 millisecond to wait job starting
|
||||
try {
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
sqlInterpreter.cancel(context);
|
||||
}
|
||||
};
|
||||
cancelThread.start();
|
||||
boolean bCancelled = false;
|
||||
for (int i=0; i <1000; i++) {
|
||||
result = sqlInterpreter
|
||||
.interpret("show tables", context);
|
||||
if (result.code().equals(InterpreterResult.Code.ERROR)) {
|
||||
String message = result.message().get(0).getData();
|
||||
// 2 possibilities, sometimes livy doesn't return the real cancel exception
|
||||
assertTrue(message.contains("cancelled part of cancelled job group") ||
|
||||
message.contains("Job is cancelled"));
|
||||
bCancelled = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertTrue(bCancelled);
|
||||
}
|
||||
} catch (LivyException e) {
|
||||
} finally {
|
||||
sqlInterpreter.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStringWithTruncation() {
|
||||
if (!checkPreCondition()) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue