mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
fix paragraph abort
This commit is contained in:
parent
1b79c071f7
commit
eb8706f6f9
5 changed files with 39 additions and 14 deletions
|
|
@ -38,6 +38,7 @@ import java.io.BufferedReader;
|
|||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
|
|
@ -48,17 +49,20 @@ import java.util.Properties;
|
|||
public class LivyHelper {
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivyHelper.class);
|
||||
Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
||||
HashMap<String, Object> paragraphHttpMap = new HashMap<>();
|
||||
Properties property;
|
||||
|
||||
LivyHelper(Properties property) {
|
||||
this.property = property;
|
||||
}
|
||||
|
||||
protected Integer createSession(String user, String kind) throws Exception {
|
||||
protected Integer createSession(InterpreterContext context, String kind) throws Exception {
|
||||
try {
|
||||
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions",
|
||||
"POST",
|
||||
"{\"kind\": \"" + kind + "\", \"proxyUser\": \"" + user + "\"}"
|
||||
"{\"kind\": \"" + kind + "\", \"proxyUser\": \""
|
||||
+ context.getAuthenticationInfo().getUser() + "\"}",
|
||||
context.getParagraphId()
|
||||
);
|
||||
Map jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
|
|
@ -70,7 +74,8 @@ public class LivyHelper {
|
|||
jsonMap.get("id"), jsonMap.get("state")));
|
||||
Thread.sleep(1000);
|
||||
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
|
||||
"GET", null);
|
||||
"GET", null,
|
||||
context.getParagraphId());
|
||||
jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
}.getType());
|
||||
|
|
@ -79,7 +84,8 @@ public class LivyHelper {
|
|||
} else if (jsonMap.get("state").equals("error")) {
|
||||
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
|
||||
sessionId + "/log",
|
||||
"GET", null);
|
||||
"GET", null,
|
||||
context.getParagraphId());
|
||||
jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
}.getType());
|
||||
|
|
@ -204,8 +210,10 @@ public class LivyHelper {
|
|||
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
if (paragraphHttpMap.get(context.getParagraphId()) == null) {
|
||||
return new InterpreterResult(Code.INCOMPLETE, "");
|
||||
}
|
||||
jsonMap = getStatusById(context, userSessionMap, id);
|
||||
LOGGER.error("jsonMap==" + jsonMap);
|
||||
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
|
||||
if (interpreterResult != null) {
|
||||
return interpreterResult;
|
||||
|
|
@ -256,7 +264,8 @@ public class LivyHelper {
|
|||
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
|
||||
+ "/statements",
|
||||
"POST",
|
||||
"{\"code\": \"" + lines + "\" }");
|
||||
"{\"code\": \"" + lines + "\" }",
|
||||
context.getParagraphId());
|
||||
if (json.equals("Session not found")) {
|
||||
throw new Exception("Exception: Session not found, Livy server would have restarted, " +
|
||||
"or lost session.");
|
||||
|
|
@ -277,7 +286,7 @@ public class LivyHelper {
|
|||
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
|
||||
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
|
||||
+ "/statements/" + id,
|
||||
"GET", null);
|
||||
"GET", null, context.getParagraphId());
|
||||
try {
|
||||
Map jsonMap = gson.fromJson(json,
|
||||
new TypeToken<Map>() {
|
||||
|
|
@ -288,7 +297,7 @@ public class LivyHelper {
|
|||
}
|
||||
}
|
||||
|
||||
public String executeHTTP(String targetURL, String method, String jsonData)
|
||||
public String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
|
||||
throws Exception {
|
||||
HttpClient client = HttpClientBuilder.create().build();
|
||||
HttpResponse response;
|
||||
|
|
@ -298,14 +307,14 @@ public class LivyHelper {
|
|||
StringEntity se = new StringEntity(jsonData);
|
||||
request.setEntity(se);
|
||||
response = client.execute(request);
|
||||
paragraphHttpMap.put(paragraphId, request);
|
||||
} else {
|
||||
HttpGet request = new HttpGet(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
response = client.execute(request);
|
||||
paragraphHttpMap.put(paragraphId, request);
|
||||
}
|
||||
|
||||
LOGGER.error("response.getStatusLine().getStatusCode()============================="
|
||||
+ response.getStatusLine().getStatusCode());
|
||||
if (response.getStatusLine().getStatusCode() == 200
|
||||
|| response.getStatusLine().getStatusCode() == 201
|
||||
|| response.getStatusLine().getStatusCode() == 404) {
|
||||
|
|
@ -322,4 +331,13 @@ public class LivyHelper {
|
|||
return null;
|
||||
}
|
||||
|
||||
public void cancelHTTP(String paragraphId) {
|
||||
if (paragraphHttpMap.get(paragraphId).getClass().getName().contains("HttpPost")) {
|
||||
((HttpPost) paragraphHttpMap.get(paragraphId)).abort();
|
||||
} else {
|
||||
((HttpGet) paragraphHttpMap.get(paragraphId)).abort();
|
||||
}
|
||||
paragraphHttpMap.put(paragraphId, null);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
interpreterContext,
|
||||
"pyspark")
|
||||
);
|
||||
} catch (Exception e) {
|
||||
|
|
@ -93,6 +93,7 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -17,7 +17,10 @@
|
|||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -82,7 +85,7 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
interpreterContext,
|
||||
"spark")
|
||||
);
|
||||
livyHelper.initializeSpark(interpreterContext, userSessionMap);
|
||||
|
|
@ -104,6 +107,7 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ public class LivySparkRInterpreter extends Interpreter {
|
|||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
interpreterContext,
|
||||
"sparkr")
|
||||
);
|
||||
} catch (Exception e) {
|
||||
|
|
@ -93,6 +93,7 @@ public class LivySparkRInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
interpreterContext,
|
||||
"spark")
|
||||
);
|
||||
livyHelper.initializeSpark(interpreterContext, userSessionMap);
|
||||
|
|
@ -136,6 +136,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in a new issue