mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Fix for 1st request failing
This commit is contained in:
parent
8f4ec479d8
commit
9cb081993d
3 changed files with 32 additions and 6 deletions
|
|
@ -59,13 +59,35 @@ public class LivyHelper {
|
|||
Map jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
}.getType());
|
||||
return ((Double) jsonMap.get("id")).intValue();
|
||||
Integer sessionId = ((Double) jsonMap.get("id")).intValue();
|
||||
if (!jsonMap.get("state").equals("idle")) {
|
||||
while (true) {
|
||||
LOGGER.error(String.format("sessionId:%s state is %s",
|
||||
jsonMap.get("id"), jsonMap.get("state")));
|
||||
Thread.sleep(1000);
|
||||
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
|
||||
"GET", null);
|
||||
jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
}.getType());
|
||||
if (jsonMap.get("state").equals("idle")) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return sessionId;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error getting session for user", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected void initializeSpark(final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap) {
|
||||
interpretInput("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
|
||||
"import sqlContext.implicits._", context, userSessionMap);
|
||||
}
|
||||
|
||||
public InterpreterResult interpretInput(String stringLines,
|
||||
final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap) {
|
||||
|
|
@ -130,8 +152,12 @@ public class LivyHelper {
|
|||
+ "/statements",
|
||||
"POST",
|
||||
"{\"code\": \"" + lines + "\" }");
|
||||
if (json.equals("Session not found")) {
|
||||
throw new Exception("Exception: Session not found, Livy server would have restarted, " +
|
||||
"or lost session.");
|
||||
}
|
||||
try {
|
||||
LOGGER.error(json);
|
||||
|
||||
Map jsonMap = gson.fromJson(json,
|
||||
new TypeToken<Map>() {
|
||||
}.getType());
|
||||
|
|
@ -159,11 +185,8 @@ public class LivyHelper {
|
|||
|
||||
public String executeHTTP(String targetURL, String method, String jsonData)
|
||||
throws Exception {
|
||||
|
||||
HttpClient client = HttpClientBuilder.create().build();
|
||||
|
||||
HttpResponse response;
|
||||
LOGGER.error(jsonData);
|
||||
if (method.equals("POST")) {
|
||||
HttpPost request = new HttpPost(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
|
|
@ -178,7 +201,8 @@ public class LivyHelper {
|
|||
|
||||
|
||||
if (response.getStatusLine().getStatusCode() == 200
|
||||
|| response.getStatusLine().getStatusCode() == 201) {
|
||||
|| response.getStatusLine().getStatusCode() == 201
|
||||
|| response.getStatusLine().getStatusCode() == 404) {
|
||||
BufferedReader rd = new BufferedReader(
|
||||
new InputStreamReader(response.getEntity().getContent()));
|
||||
|
||||
|
|
|
|||
|
|
@ -83,6 +83,7 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
"spark")
|
||||
);
|
||||
livyHelper.initializeSpark(interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -77,6 +77,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
"spark")
|
||||
);
|
||||
livyHelper.initializeSpark(interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue