Fix for 1st request failing

This commit is contained in:
Prabhjyot Singh 2016-02-21 11:59:14 +05:30
parent 8f4ec479d8
commit 9cb081993d
3 changed files with 32 additions and 6 deletions

View file

@ -59,13 +59,35 @@ public class LivyHelper {
Map jsonMap = (Map<Object, Object>) gson.fromJson(json,
new TypeToken<Map<Object, Object>>() {
}.getType());
return ((Double) jsonMap.get("id")).intValue();
Integer sessionId = ((Double) jsonMap.get("id")).intValue();
if (!jsonMap.get("state").equals("idle")) {
while (true) {
LOGGER.error(String.format("sessionId:%s state is %s",
jsonMap.get("id"), jsonMap.get("state")));
Thread.sleep(1000);
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
"GET", null);
jsonMap = (Map<Object, Object>) gson.fromJson(json,
new TypeToken<Map<Object, Object>>() {
}.getType());
if (jsonMap.get("state").equals("idle")) {
break;
}
}
}
return sessionId;
} catch (Exception e) {
LOGGER.error("Error getting session for user", e);
throw e;
}
}
protected void initializeSpark(final InterpreterContext context,
final Map<String, Integer> userSessionMap) {
interpretInput("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
"import sqlContext.implicits._", context, userSessionMap);
}
public InterpreterResult interpretInput(String stringLines,
final InterpreterContext context,
final Map<String, Integer> userSessionMap) {
@ -130,8 +152,12 @@ public class LivyHelper {
+ "/statements",
"POST",
"{\"code\": \"" + lines + "\" }");
if (json.equals("Session not found")) {
throw new Exception("Exception: Session not found, Livy server would have restarted, " +
"or lost session.");
}
try {
LOGGER.error(json);
Map jsonMap = gson.fromJson(json,
new TypeToken<Map>() {
}.getType());
@ -159,11 +185,8 @@ public class LivyHelper {
public String executeHTTP(String targetURL, String method, String jsonData)
throws Exception {
HttpClient client = HttpClientBuilder.create().build();
HttpResponse response;
LOGGER.error(jsonData);
if (method.equals("POST")) {
HttpPost request = new HttpPost(targetURL);
request.addHeader("Content-Type", "application/json");
@ -178,7 +201,8 @@ public class LivyHelper {
if (response.getStatusLine().getStatusCode() == 200
|| response.getStatusLine().getStatusCode() == 201) {
|| response.getStatusLine().getStatusCode() == 201
|| response.getStatusLine().getStatusCode() == 404) {
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));

View file

@ -83,6 +83,7 @@ public class LivySparkInterpreter extends Interpreter {
interpreterContext.getAuthenticationInfo().getUser(),
"spark")
);
livyHelper.initializeSpark(interpreterContext, userSessionMap);
} catch (Exception e) {
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}

View file

@ -77,6 +77,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
interpreterContext.getAuthenticationInfo().getUser(),
"spark")
);
livyHelper.initializeSpark(interpreterContext, userSessionMap);
} catch (Exception e) {
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}