ZEPPELIN-2355 Fix race conditions while cancelling a paragraph

This commit is contained in:
Benoy Antony 2017-04-04 20:27:30 -07:00
parent e7d41c3497
commit 8673acfd5d
3 changed files with 55 additions and 42 deletions

View file

@ -17,15 +17,30 @@
package org.apache.zeppelin.livy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang.StringUtils;
import org.apache.http.client.HttpClient;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.impl.client.HttpClients;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
@ -38,16 +53,9 @@ import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import javax.net.ssl.SSLContext;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
/**
* Base class for livy interpreters.
@ -67,9 +75,8 @@ public abstract class BaseLivyInterprereter extends Interpreter {
protected LivyVersion livyVersion;
private RestTemplate restTemplate;
// keep tracking the mapping between paragraphId and statementId, so that we can cancel the
// statement after we execute it.
private ConcurrentHashMap<String, Integer> paragraphId2StmtIdMap = new ConcurrentHashMap<>();
// Ids of paragraphs for which a cancel has been requested but not yet carried out.
// cancel(InterpreterContext) adds the paragraph id here, the status-polling loop checks the
// set on each iteration, and cancel(int, String) removes the id again once the statement has
// been cancelled. The set is backed by a ConcurrentHashMap; presumably the add and the
// check/remove happen on different threads -- verify against callers.
Set<Object> paragraphsToCancel = Collections.newSetFromMap(
new ConcurrentHashMap<Object, Boolean>());
private ConcurrentHashMap<String, Integer> paragraphId2StmtProgressMap =
new ConcurrentHashMap<>();
@ -162,21 +169,8 @@ public abstract class BaseLivyInterprereter extends Interpreter {
/**
 * Handles a cancel request for the paragraph identified by {@code context}.
 *
 * <p>NOTE(review): this span reads like a diff hunk whose +/- markers were lost. The if/else
 * block appears to be the previous implementation (look up the statement id in
 * paragraphId2StmtIdMap and cancel it right away, then drop the mapping); the last two
 * statements appear to be its replacement, which only records the paragraph id in
 * paragraphsToCancel and logs it. Confirm against the actual file before relying on either
 * half.
 *
 * @param context supplies the id of the paragraph to cancel via getParagraphId()
 */
@Override
public void cancel(InterpreterContext context) {
if (livyVersion.isCancelSupported()) {
String paraId = context.getParagraphId();
Integer stmtId = paragraphId2StmtIdMap.get(paraId);
try {
// a null stmtId means no statement is recorded for this paragraph, so nothing is cancelled
if (stmtId != null) {
cancelStatement(stmtId);
}
} catch (LivyException e) {
LOGGER.error("Fail to cancel statement " + stmtId + " for paragraph " + paraId, e);
} finally {
// the mapping is dropped whether or not the cancel call succeeded
paragraphId2StmtIdMap.remove(paraId);
}
} else {
LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
}
// record the request only; no statement is cancelled by these two lines
paragraphsToCancel.add(context.getParagraphId());
LOGGER.info("Added paragraph " + context.getParagraphId() + " for cancellation.");
}
@Override
@ -260,11 +254,12 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
stmtInfo = executeStatement(new ExecuteRequest(code));
}
if (paragraphId != null) {
paragraphId2StmtIdMap.put(paragraphId, stmtInfo.id);
}
// pull the statement status
while (!stmtInfo.isAvailable()) {
if (paragraphId != null && paragraphsToCancel.contains(paragraphId)) {
cancel(stmtInfo.id, paragraphId);
return new InterpreterResult(InterpreterResult.Code.ERROR, "Job is cancelled");
}
try {
Thread.sleep(pullStatusInterval);
} catch (InterruptedException e) {
@ -284,12 +279,25 @@ public abstract class BaseLivyInterprereter extends Interpreter {
}
} finally {
if (paragraphId != null) {
paragraphId2StmtIdMap.remove(paragraphId);
paragraphId2StmtProgressMap.remove(paragraphId);
}
}
}
/**
 * Cancels the given livy statement on behalf of a paragraph and clears that paragraph's
 * pending cancel request.
 *
 * <p>The request is removed from {@code paragraphsToCancel} on every path: after a successful
 * cancel, after a failed cancel, and when the livy version does not support cancel at all.
 * The polling loop that calls this method reports "Job is cancelled" and stops regardless of
 * the outcome here, so a request left behind in the set would abort the next run of the same
 * paragraph as soon as it starts polling.
 *
 * @param id id of the livy statement to cancel
 * @param paragraphId id of the paragraph whose cancellation was requested; the caller only
 *     passes non-null ids
 */
private void cancel(int id, String paragraphId) {
  if (livyVersion.isCancelSupported()) {
    try {
      LOGGER.info("Cancelling statement " + id);
      cancelStatement(id);
    } catch (LivyException e) {
      LOGGER.error("Fail to cancel statement " + id + " for paragraph " + paragraphId, e);
    } finally {
      // The request has been acted on (successfully or not); never leave it pending.
      paragraphsToCancel.remove(paragraphId);
    }
  } else {
    LOGGER.warn("cancel is not supported for this version of livy: " + livyVersion);
    // Nothing can be cancelled on this version, so the request must not linger either.
    paragraphsToCancel.remove(paragraphId);
  }
}
/**
 * Issues a GET request for "/version" through callRestAPI, parses the response with
 * LivyVersionResponse.fromJson and wraps its {@code version} field in a LivyVersion.
 *
 * @return a LivyVersion built from the version field of the parsed response
 * @throws LivyException declared by this method; presumably raised when the REST call
 *     fails -- confirm against callRestAPI
 */
protected LivyVersion getLivyVersion() throws LivyException {
  return new LivyVersion(
      LivyVersionResponse.fromJson(callRestAPI("/version", "GET")).version);
}

View file

@ -229,6 +229,11 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
}
}
/**
 * Delegates the cancel request to {@code sparkInterpreter}.
 *
 * @param context interpreter context of the paragraph to cancel, passed through unchanged
 */
@Override
public void cancel(InterpreterContext context) {
  this.sparkInterpreter.cancel(context);
}
@Override
public void close() {
this.sparkInterpreter.close();

View file

@ -162,9 +162,9 @@ public class LivyInterpreterIT {
Thread cancelThread = new Thread() {
@Override
public void run() {
// invoke cancel after 3 seconds to wait job starting
// invoke cancel almost immediately (after 1 millisecond), possibly before the job has started
try {
Thread.sleep(3000);
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
@ -495,9 +495,9 @@ public class LivyInterpreterIT {
Thread cancelThread = new Thread() {
@Override
public void run() {
// invoke cancel after 3 seconds to wait job starting
// invoke cancel almost immediately (after 1 millisecond), possibly before the job has started
try {
Thread.sleep(3000);
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
@ -586,9 +586,9 @@ public class LivyInterpreterIT {
Thread cancelThread = new Thread() {
@Override
public void run() {
// invoke cancel after 3 seconds to wait job starting
// invoke cancel almost immediately (after 1 millisecond), possibly before the job has started
try {
Thread.sleep(3000);
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}