mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-2355 Fix race conditions while cancelling a paragraph
This commit is contained in:
parent
e7d41c3497
commit
8673acfd5d
3 changed files with 55 additions and 42 deletions
|
|
@ -17,15 +17,30 @@
|
|||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.KeyStore;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
|
||||
import org.apache.http.conn.ssl.SSLContexts;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpEntity;
|
||||
|
|
@ -38,16 +53,9 @@ import org.springframework.web.client.HttpClientErrorException;
|
|||
import org.springframework.web.client.RestClientException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.security.KeyStore;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
/**
|
||||
* Base class for livy interpreters.
|
||||
|
|
@ -67,9 +75,8 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
protected LivyVersion livyVersion;
|
||||
private RestTemplate restTemplate;
|
||||
|
||||
// keep tracking the mapping between paragraphId and statementId, so that we can cancel the
|
||||
// statement after we execute it.
|
||||
private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMap = new ConcurrentHashMap<>();
|
||||
Set<Object> paragraphsToCancel = Collections.newSetFromMap(
|
||||
new ConcurrentHashMap<Object, Boolean>());
|
||||
private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
|
|
@ -162,21 +169,8 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
if (livyVersion.isCancelSupported()) {
|
||||
String paraId = context.getParagraphId();
|
||||
Integer stmtId = paragraphId2StmtIdMap.get(paraId);
|
||||
try {
|
||||
if (stmtId != null) {
|
||||
cancelStatement(stmtId);
|
||||
}
|
||||
} catch (LivyException e) {
|
||||
LOGGER.error("Fail to cancel statement " + stmtId + " for paragraph " + paraId, e);
|
||||
} finally {
|
||||
paragraphId2StmtIdMap.remove(paraId);
|
||||
}
|
||||
} else {
|
||||
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
|
||||
}
|
||||
paragraphsToCancel.add(context.getParagraphId());
|
||||
LOGGER.info("Added paragraph " + context.getParagraphId() + " for cancellation.");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -260,11 +254,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
stmtInfo = executeStatement(new ExecuteRequest(code));
|
||||
}
|
||||
if (paragraphId != null) {
|
||||
paragraphId2StmtIdMap.put(paragraphId, stmtInfo.id);
|
||||
}
|
||||
// pull the statement status
|
||||
while (!stmtInfo.isAvailable()) {
|
||||
if (paragraphId != null && paragraphsToCancel.contains(paragraphId)) {
|
||||
cancel(stmtInfo.id, paragraphId);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled");
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pullStatusInterval);
|
||||
} catch (InterruptedException e) {
|
||||
|
|
@ -284,12 +279,25 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
}
|
||||
} finally {
|
||||
if (paragraphId != null) {
|
||||
paragraphId2StmtIdMap.remove(paragraphId);
|
||||
paragraphId2StmtProgressMap.remove(paragraphId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void cancel(int id, String paragraphId) {
|
||||
if (livyVersion.isCancelSupported()) {
|
||||
try {
|
||||
LOGGER.info("Cancelling statement " + id);
|
||||
cancelStatement(id);
|
||||
paragraphsToCancel.remove(paragraphId);
|
||||
} catch (LivyException e) {
|
||||
LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
|
||||
}
|
||||
} else {
|
||||
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
|
||||
}
|
||||
}
|
||||
|
||||
protected LivyVersion getLivyVersion() throws LivyException {
|
||||
return new LivyVersion((LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -229,6 +229,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
sparkInterpreter.cancel(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.sparkInterpreter.close();
|
||||
|
|
|
|||
|
|
@ -162,9 +162,9 @@ public class LivyInterpreterIT {
|
|||
Thread cancelThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
// invoke cancel after 3 seconds to wait job starting
|
||||
// invoke cancel after 1 millisecond to wait job starting
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
@ -495,9 +495,9 @@ public class LivyInterpreterIT {
|
|||
Thread cancelThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
// invoke cancel after 3 seconds to wait job starting
|
||||
// invoke cancel after 1 millisecond to wait job starting
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
@ -586,9 +586,9 @@ public class LivyInterpreterIT {
|
|||
Thread cancelThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
// invoke cancel after 3 seconds to wait job starting
|
||||
// invoke cancel after 1 millisecond to wait job starting
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
Thread.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue