fix paragraph abort

This commit is contained in:
Prabhjyot Singh 2016-04-12 10:52:57 +05:30
parent 1b79c071f7
commit eb8706f6f9
5 changed files with 39 additions and 14 deletions

View file

@ -38,6 +38,7 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -48,17 +49,20 @@ import java.util.Properties;
public class LivyHelper {
Logger LOGGER = LoggerFactory.getLogger(LivyHelper.class);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
HashMap<String, Object> paragraphHttpMap = new HashMap<>();
Properties property;
LivyHelper(Properties property) {
this.property = property;
}
protected Integer createSession(String user, String kind) throws Exception {
protected Integer createSession(InterpreterContext context, String kind) throws Exception {
try {
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions",
"POST",
"{\"kind\": \"" + kind + "\", \"proxyUser\": \"" + user + "\"}"
"{\"kind\": \"" + kind + "\", \"proxyUser\": \""
+ context.getAuthenticationInfo().getUser() + "\"}",
context.getParagraphId()
);
Map jsonMap = (Map<Object, Object>) gson.fromJson(json,
new TypeToken<Map<Object, Object>>() {
@ -70,7 +74,8 @@ public class LivyHelper {
jsonMap.get("id"), jsonMap.get("state")));
Thread.sleep(1000);
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
"GET", null);
"GET", null,
context.getParagraphId());
jsonMap = (Map<Object, Object>) gson.fromJson(json,
new TypeToken<Map<Object, Object>>() {
}.getType());
@ -79,7 +84,8 @@ public class LivyHelper {
} else if (jsonMap.get("state").equals("error")) {
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
sessionId + "/log",
"GET", null);
"GET", null,
context.getParagraphId());
jsonMap = (Map<Object, Object>) gson.fromJson(json,
new TypeToken<Map<Object, Object>>() {
}.getType());
@ -204,8 +210,10 @@ public class LivyHelper {
while (true) {
Thread.sleep(1000);
if (paragraphHttpMap.get(context.getParagraphId()) == null) {
return new InterpreterResult(Code.INCOMPLETE, "");
}
jsonMap = getStatusById(context, userSessionMap, id);
LOGGER.error("jsonMap==" + jsonMap);
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
if (interpreterResult != null) {
return interpreterResult;
@ -256,7 +264,8 @@ public class LivyHelper {
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
+ "/statements",
"POST",
"{\"code\": \"" + lines + "\" }");
"{\"code\": \"" + lines + "\" }",
context.getParagraphId());
if (json.equals("Session not found")) {
throw new Exception("Exception: Session not found, Livy server would have restarted, " +
"or lost session.");
@ -277,7 +286,7 @@ public class LivyHelper {
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
+ "/statements/" + id,
"GET", null);
"GET", null, context.getParagraphId());
try {
Map jsonMap = gson.fromJson(json,
new TypeToken<Map>() {
@ -288,7 +297,7 @@ public class LivyHelper {
}
}
public String executeHTTP(String targetURL, String method, String jsonData)
public String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
throws Exception {
HttpClient client = HttpClientBuilder.create().build();
HttpResponse response;
@ -298,14 +307,14 @@ public class LivyHelper {
StringEntity se = new StringEntity(jsonData);
request.setEntity(se);
response = client.execute(request);
paragraphHttpMap.put(paragraphId, request);
} else {
HttpGet request = new HttpGet(targetURL);
request.addHeader("Content-Type", "application/json");
response = client.execute(request);
paragraphHttpMap.put(paragraphId, request);
}
LOGGER.error("response.getStatusLine().getStatusCode()============================="
+ response.getStatusLine().getStatusCode());
if (response.getStatusLine().getStatusCode() == 200
|| response.getStatusLine().getStatusCode() == 201
|| response.getStatusLine().getStatusCode() == 404) {
@ -322,4 +331,13 @@ public class LivyHelper {
return null;
}
public void cancelHTTP(String paragraphId) {
if (paragraphHttpMap.get(paragraphId).getClass().getName().contains("HttpPost")) {
((HttpPost) paragraphHttpMap.get(paragraphId)).abort();
} else {
((HttpGet) paragraphHttpMap.get(paragraphId)).abort();
}
paragraphHttpMap.put(paragraphId, null);
}
}

View file

@ -71,7 +71,7 @@ public class LivyPySparkInterpreter extends Interpreter {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext.getAuthenticationInfo().getUser(),
interpreterContext,
"pyspark")
);
} catch (Exception e) {
@ -93,6 +93,7 @@ public class LivyPySparkInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override

View file

@ -17,7 +17,10 @@
package org.apache.zeppelin.livy;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.Executor;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -82,7 +85,7 @@ public class LivySparkInterpreter extends Interpreter {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext.getAuthenticationInfo().getUser(),
interpreterContext,
"spark")
);
livyHelper.initializeSpark(interpreterContext, userSessionMap);
@ -104,6 +107,7 @@ public class LivySparkInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override

View file

@ -71,7 +71,7 @@ public class LivySparkRInterpreter extends Interpreter {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext.getAuthenticationInfo().getUser(),
interpreterContext,
"sparkr")
);
} catch (Exception e) {
@ -93,6 +93,7 @@ public class LivySparkRInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override

View file

@ -74,7 +74,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext.getAuthenticationInfo().getUser(),
interpreterContext,
"spark")
);
livyHelper.initializeSpark(interpreterContext, userSessionMap);
@ -136,6 +136,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override