ZEPPELIN-1786. Refactor LivyHelper

This commit is contained in:
Jeff Zhang 2016-12-12 12:02:03 +08:00
parent 04c62e41fd
commit 6e5cbb8e36
13 changed files with 632 additions and 749 deletions

View file

@ -92,10 +92,8 @@ install:
before_script:
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
- if [[ -n $LIVY_VER ]]; then travis_retry ./testing/downloadLivy.sh $LIVY_VER; fi
- ./testing/setupLivy.sh
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-server-$LIVY_VER; fi
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
- tail conf/zeppelin-env.sh
script:

View file

@ -17,6 +17,9 @@
package org.apache.zeppelin.livy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@ -24,7 +27,17 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.security.kerberos.client.KerberosRestTemplate;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
@ -33,76 +46,81 @@ import java.util.Properties;
public abstract class BaseLivyInterprereter extends Interpreter {
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create();
// -1 means session is not created yet, valid sessionId start from 0
protected int sessionId = -1;
protected String appId;
protected String webUIAddress;
protected SessionInfo sessionInfo;
private String livyURL;
private long sessionCreationTimeout;
protected boolean displayAppInfo;
protected LivyOutputStream out;
protected LivyHelper livyHelper;
public BaseLivyInterprereter(Properties property) {
super(property);
this.out = new LivyOutputStream();
this.livyHelper = new LivyHelper(property);
this.livyURL = property.getProperty("zeppelin.livy.url");
this.sessionCreationTimeout = Long.parseLong(
property.getProperty("zeppelin.livy.create.session.timeout", 120 + ""));
}
public abstract String getSessionKind();
@Override
public void open() {
// TODO(zjffdu) move session creation here.
try {
initLivySession();
} catch (LivyException e) {
throw new RuntimeException("Fail to create session, please check livy interpreter log and " +
"livy server log", e);
}
}
@Override
public void close() {
if (sessionId != -1) {
livyHelper.closeSession(sessionId);
// reset sessionId to -1
sessionId = -1;
if (sessionInfo != null) {
closeSession(sessionInfo.id);
// reset sessionInfo to null so that we won't close it twice.
sessionInfo = null;
}
}
protected void createSession(InterpreterContext context) throws Exception {
sessionId = livyHelper.createSession(context, getSessionKind());
protected void initLivySession() throws LivyException {
this.sessionInfo = createSession(getUserName(), getSessionKind());
if (displayAppInfo) {
this.appId = extractStatementResult(
livyHelper.interpret("sc.applicationId", context, sessionId).message().get(0).getData());
livyHelper.interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
context, sessionId);
this.webUIAddress = extractStatementResult(
livyHelper.interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
context, sessionId).message().get(0).getData());
LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
sessionId, appId, webUIAddress);
if (sessionInfo.appId == null) {
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
// explicitly by ourselves.
sessionInfo.appId = extractStatementResult(
interpret("sc.applicationId", false).message()
.get(0).getData());
}
interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", false);
if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
sessionInfo.webUIAddress = extractStatementResult(
interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false)
.message().get(0).getData());
} else {
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
}
LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}",
sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress);
}
}
public SessionInfo getSessionInfo() {
return sessionInfo;
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
try {
// add synchronized, because LivySparkSQLInterperter will use ParallelScheduler
synchronized (this) {
if (sessionId == -1) {
try {
createSession(context);
} catch (Exception e) {
LOGGER.error("Exception while creating livy session", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
}
}
if (StringUtils.isEmpty(st)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
if (StringUtils.isEmpty(st)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpretInput(st, context, sessionId, out,
appId, webUIAddress, displayAppInfo);
} catch (Exception e) {
LOGGER.error("Exception in LivyInterpreter.", e);
try {
return interpret(st, this.displayAppInfo);
} catch (LivyException e) {
LOGGER.error("Fail to interpret:" + st, e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
InterpreterUtils.getMostRelevantMessage(e));
}
@ -116,7 +134,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
* @param result
* @return
*/
private static String extractStatementResult(String result) {
private String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
@ -128,7 +146,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
//TODO(zjffdu). Use livy cancel api which is available in livy 0.3
}
@Override
@ -140,4 +158,304 @@ public abstract class BaseLivyInterprereter extends Interpreter {
public int getProgress(InterpreterContext context) {
// Progress is not tracked by this interpreter; a constant 0 is always reported.
return 0;
}
/**
 * Creates a new livy session and blocks until livy reports it as ready.
 *
 * <p>Every non-empty interpreter property whose key starts with {@code livy.spark.} is forwarded
 * to livy as session configuration, with the leading {@code livy.} removed from the key. After
 * the session is requested, its status is polled once per second until it is ready, it reaches
 * a finished state, or {@code sessionCreationTimeout} seconds have elapsed.
 *
 * @param user user the session is created on behalf of (sent as the proxy user)
 * @param kind session kind to request from livy
 * @return the session information as last reported by livy, in ready state
 * @throws LivyException if the rest call fails, the session finishes before becoming ready,
 *                       the timeout elapses, or the waiting thread is interrupted
 */
private SessionInfo createSession(String user, String kind)
    throws LivyException {
  try {
    Map<String, String> conf = new HashMap<>();
    for (Map.Entry<Object, Object> entry : property.entrySet()) {
      if (entry.getKey().toString().startsWith("livy.spark.") &&
          !entry.getValue().toString().isEmpty()) {
        // substring(5) strips the "livy." prefix, leaving the "spark.*" key
        conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
      }
    }
    CreateSessionRequest request = new CreateSessionRequest(kind, user, conf);
    // named newSession so that it does not shadow the sessionInfo field
    SessionInfo newSession = SessionInfo.fromJson(
        callRestAPI("/sessions", "POST", request.toJson()));
    long start = System.currentTimeMillis();
    // pull the session status until it is idle or timeout
    while (!newSession.isReady()) {
      LOGGER.info("Session {} is in state {}, appId {}", newSession.id, newSession.state,
          newSession.appId);
      if (newSession.isFinished()) {
        String msg = "Session " + newSession.id + " is finished, appId: " + newSession.appId
            + ", log: " + newSession.log;
        LOGGER.error(msg);
        throw new LivyException(msg);
      }
      if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) {
        String msg = "The creation of session " + newSession.id + " is timeout within "
            + sessionCreationTimeout + " seconds, appId: " + newSession.appId
            + ", log: " + newSession.log;
        LOGGER.error(msg);
        throw new LivyException(msg);
      }
      Thread.sleep(1000);
      newSession = getSessionInfo(newSession.id);
    }
    return newSession;
  } catch (LivyException e) {
    // already a LivyException: rethrow as-is instead of wrapping it a second time
    LOGGER.error("Error when creating livy session for user " + user, e);
    throw e;
  } catch (InterruptedException e) {
    // restore the interrupt flag so that callers can still observe the interruption
    Thread.currentThread().interrupt();
    LOGGER.error("Error when creating livy session for user " + user, e);
    throw new LivyException(e);
  } catch (Exception e) {
    LOGGER.error("Error when creating livy session for user " + user, e);
    throw new LivyException(e);
  }
}
/**
 * Asks livy for the current information of an existing session.
 *
 * @param sessionId id of the session to look up
 * @return the session information parsed from the rest response
 * @throws LivyException if the rest call fails
 */
private SessionInfo getSessionInfo(int sessionId) throws LivyException {
  String sessionJson = callRestAPI("/sessions/" + sessionId, "GET");
  return SessionInfo.fromJson(sessionJson);
}
/**
 * Submits a piece of code as a livy statement and waits for its result.
 *
 * <p>The statement status is polled once per second until livy reports it as available.
 *
 * @param code           code to execute in the current session
 * @param displayAppInfo whether the application id and web UI address are added to the output
 * @return the interpreter result built from the finished statement
 * @throws LivyException if a rest call fails or the waiting thread is interrupted
 */
public InterpreterResult interpret(String code, boolean displayAppInfo)
    throws LivyException {
  StatementInfo stmtInfo = executeStatement(new ExecuteRequest(code));
  // pull the statement status
  while (!stmtInfo.isAvailable()) {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      // restore the interrupt flag before translating into the checked livy exception
      Thread.currentThread().interrupt();
      throw new LivyException(e);
    }
    stmtInfo = getStatementInfo(stmtInfo.id);
  }
  return getResultFromStatementInfo(stmtInfo, displayAppInfo);
}
/**
 * Converts a finished livy statement into an {@link InterpreterResult}.
 *
 * <p>An error statement yields an ERROR result carrying its {@code evalue}. Otherwise the
 * {@code text/plain} payload is used; when it starts with {@code <link}, {@code <script},
 * {@code <style} or {@code <div} it is prefixed with {@code %html}. With
 * {@code displayAppInfo} the text is wrapped into an {@code %angular} snippet that also shows
 * the spark application id and web UI address of the current session.
 *
 * @param stmtInfo       statement as returned by livy
 * @param displayAppInfo whether application information is appended to the output
 * @return the result to hand back to zeppelin
 */
private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
                                                     boolean displayAppInfo) {
  if (stmtInfo.output.isError()) {
    return new InterpreterResult(InterpreterResult.Code.ERROR, stmtInfo.output.evalue);
  }
  //TODO(zjffdu) support other types of data (like json, image and etc)
  // the data section (or its text/plain entry) may be missing, so guard against null
  String result = stmtInfo.output.data == null ? null : stmtInfo.output.data.plain_text;
  if (result != null) {
    result = result.trim();
    if (result.startsWith("<link")
        || result.startsWith("<script")
        || result.startsWith("<style")
        || result.startsWith("<div")) {
      result = "%html " + result;
    }
  }
  if (!displayAppInfo) {
    return new InterpreterResult(InterpreterResult.Code.SUCCESS, result);
  }
  //TODO(zjffdu), use multiple InterpreterResult to display appInfo
  StringBuilder outputBuilder = new StringBuilder();
  outputBuilder.append("%angular ");
  outputBuilder.append("<pre><code>");
  // append nothing rather than the literal text "null" when there is no plain text output
  outputBuilder.append(result == null ? "" : result);
  outputBuilder.append("</code></pre>");
  outputBuilder.append("<hr/>");
  outputBuilder.append("Spark Application Id:" + sessionInfo.appId + "<br/>");
  outputBuilder.append("Spark WebUI: <a href=" + sessionInfo.webUIAddress + ">"
      + sessionInfo.webUIAddress + "</a>");
  return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputBuilder.toString());
}
/**
 * Posts a new statement to the current livy session.
 *
 * @param executeRequest request holding the code to run
 * @return the statement information parsed from livy's response
 * @throws LivyException if the rest call fails
 */
private StatementInfo executeStatement(ExecuteRequest executeRequest)
    throws LivyException {
  String statementsPath = "/sessions/" + sessionInfo.id + "/statements";
  String responseJson = callRestAPI(statementsPath, "POST", executeRequest.toJson());
  return StatementInfo.fromJson(responseJson);
}
/**
 * Fetches the current information of a statement in the current livy session.
 *
 * @param statementId id of the statement to look up
 * @return the statement information parsed from livy's response
 * @throws LivyException if the rest call fails
 */
private StatementInfo getStatementInfo(int statementId)
    throws LivyException {
  String statementPath = "/sessions/" + sessionInfo.id + "/statements/" + statementId;
  String responseJson = callRestAPI(statementPath, "GET");
  return StatementInfo.fromJson(responseJson);
}
/**
 * Builds the rest client used to talk to livy.
 *
 * @return a {@link KerberosRestTemplate} when both {@code zeppelin.livy.keytab} and
 *         {@code zeppelin.livy.principal} are configured, otherwise a plain
 *         {@link RestTemplate}
 */
private RestTemplate getRestTemplate() {
  String keytabLocation = property.getProperty("zeppelin.livy.keytab");
  String principal = property.getProperty("zeppelin.livy.principal");
  boolean kerberosConfigured =
      !StringUtils.isEmpty(keytabLocation) && !StringUtils.isEmpty(principal);
  if (!kerberosConfigured) {
    return new RestTemplate();
  }
  return new KerberosRestTemplate(keytabLocation, principal);
}
/**
 * Calls the livy rest api with an empty request body.
 *
 * @param targetURL path relative to the livy url
 * @param method    http method name
 * @return the response body
 * @throws LivyException if the call fails
 */
private String callRestAPI(String targetURL, String method) throws LivyException {
  final String emptyBody = "";
  return callRestAPI(targetURL, method, emptyBody);
}
/**
 * Calls the livy rest api and returns the response body.
 *
 * <p>Responses with status 200, 201 and 404 are returned to the caller, except that a body of
 * the form {@code Session '<id>' not found.} (with or without surrounding double quotes) is
 * turned into a {@link SessionNotFoundException}. Any other status raises a
 * {@link LivyException}, unless the body mentions {@code CreateInteractiveRequest["master"]},
 * in which case the body is returned. A missing response body is treated as an empty string.
 *
 * @param targetURL path relative to the configured livy url
 * @param method    one of {@code POST}, {@code GET} or {@code DELETE}
 * @param jsonData  request body, only sent for {@code POST}
 * @return the response body, never {@code null}
 * @throws LivyException if no response is obtained, the session is not found, or livy answers
 *                       with an unexpected status code
 */
private String callRestAPI(String targetURL, String method, String jsonData)
    throws LivyException {
  targetURL = livyURL + targetURL;
  LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
  RestTemplate restTemplate = getRestTemplate();
  HttpHeaders headers = new HttpHeaders();
  headers.add("Content-Type", "application/json");
  headers.add("X-Requested-By", "zeppelin");
  ResponseEntity<String> response = null;
  try {
    if (method.equals("POST")) {
      HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
      response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
    } else if (method.equals("GET")) {
      HttpEntity<String> entity = new HttpEntity<>(headers);
      response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
    } else if (method.equals("DELETE")) {
      HttpEntity<String> entity = new HttpEntity<>(headers);
      response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
    }
  } catch (HttpClientErrorException e) {
    // keep the 4xx answer as a regular response so that it goes through the checks below
    response = new ResponseEntity<>(e.getResponseBodyAsString(), e.getStatusCode());
    LOGGER.error(String.format("Error with %s StatusCode: %s",
        response.getStatusCode().value(), e.getResponseBodyAsString()));
  }
  if (response == null) {
    throw new LivyException("No http response returned");
  }
  LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(),
      response.getBody());
  int statusCode = response.getStatusCode().value();
  // a response without body would otherwise cause a NullPointerException below
  String responseBody = response.getBody() == null ? "" : response.getBody();
  if (statusCode == 200 || statusCode == 201 || statusCode == 404) {
    // the message may arrive as a JSON string, i.e. wrapped in double quotes
    if (responseBody.matches("\"?Session '\\d+' not found.\"?")) {
      throw new SessionNotFoundException(responseBody);
    }
    return responseBody;
  }
  if (responseBody.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
    return responseBody;
  }
  String errorMessage = String.format("Error with %s StatusCode: %s", statusCode, responseBody);
  LOGGER.error(errorMessage);
  throw new LivyException(errorMessage);
}
/**
 * Deletes a livy session on a best-effort basis: a failure is logged and not propagated.
 *
 * @param sessionId id of the session to delete
 */
private void closeSession(int sessionId) {
  String sessionPath = "/sessions/" + sessionId;
  try {
    callRestAPI(sessionPath, "DELETE");
  } catch (Exception e) {
    String failure =
        String.format("Error closing session for user with session ID: %s", sessionId);
    LOGGER.error(failure, e);
  }
}
/*
* We create these POJOs here to accommodate livy 0.3, which is not released yet. The livy rest
* api changes from version to version, so we define these POJOs on the zeppelin side to absorb
* the incompatibilities between versions. Later, when livy becomes more stable, we could just
* depend on the livy client jar.
*/
/**
* Request body used to create a livy session. It is serialized with gson, so the JSON layout
* follows the field declarations below.
*/
private static class CreateSessionRequest {
// session kind requested from livy
public final String kind;
// written to JSON under the name "proxyUser" instead of the field name
@SerializedName("proxyUser")
public final String user;
// session configuration entries sent along with the request
public final Map<String, String> conf;
public CreateSessionRequest(String kind, String user, Map<String, String> conf) {
this.kind = kind;
this.user = user;
this.conf = conf;
}
/**
* @return this request serialized to JSON by the shared gson instance
*/
public String toJson() {
return gson.toJson(this);
}
}
/**
 * Session information as returned by the livy rest api, deserialized with gson.
 *
 * <p>{@code appId} and {@code webUIAddress} are mutable because they may be filled in after
 * the session has been created.
 */
public static class SessionInfo {

  public final int id;
  public String appId;
  public String webUIAddress;
  public final String owner;
  public final String proxyUser;
  public final String state;
  public final String kind;
  public final Map<String, String> appInfo;
  public final List<String> log;

  public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
                     String kind, Map<String, String> appInfo, List<String> log) {
    this.id = id;
    this.appId = appId;
    this.owner = owner;
    this.proxyUser = proxyUser;
    this.state = state;
    this.kind = kind;
    this.appInfo = appInfo;
    this.log = log;
  }

  /**
   * @return {@code true} if the session state is {@code idle}; {@code false} otherwise,
   *         including when no state was reported
   */
  public boolean isReady() {
    // constant-first comparison so that a missing state does not throw
    return "idle".equals(state);
  }

  /**
   * @return {@code true} if the session state is {@code error}, {@code dead} or
   *         {@code success}; {@code false} otherwise, including when no state was reported
   */
  public boolean isFinished() {
    return "error".equals(state) || "dead".equals(state) || "success".equals(state);
  }

  /**
   * Parses session information from the JSON returned by livy.
   *
   * @param json response body of a session rest call
   * @return the parsed session information
   */
  public static SessionInfo fromJson(String json) {
    return gson.fromJson(json, SessionInfo.class);
  }
}
/**
* Request body used to submit a statement to a livy session; serialized with gson.
*/
private static class ExecuteRequest {
// the code to execute
public final String code;
public ExecuteRequest(String code) {
this.code = code;
}
/**
* @return this request serialized to JSON by the shared gson instance
*/
public String toJson() {
return gson.toJson(this);
}
}
/**
 * Statement information as returned by the livy rest api, deserialized with gson.
 */
private static class StatementInfo {

  public Integer id;
  public String state;
  public StatementOutput output;

  public StatementInfo() {
  }

  /**
   * Parses statement information from the JSON returned by livy.
   *
   * @param json response body of a statement rest call
   * @return the parsed statement information
   */
  public static StatementInfo fromJson(String json) {
    return gson.fromJson(json, StatementInfo.class);
  }

  /**
   * @return {@code true} if the statement state is {@code available}; {@code false}
   *         otherwise, including when no state was reported
   */
  public boolean isAvailable() {
    // constant-first comparison so that a missing state does not throw
    return "available".equals(state);
  }

  /**
   * The {@code output} section of a statement.
   */
  private static class StatementOutput {

    public String status;
    public String execution_count;
    public Data data;
    public String ename;
    public String evalue;
    public Object traceback;

    /**
     * @return {@code true} if the output status is {@code error}; {@code false} otherwise,
     *         including when no status was reported
     */
    public boolean isError() {
      return "error".equals(status);
    }

    /**
     * @return this output serialized to JSON by the shared gson instance
     */
    public String toJson() {
      return gson.toJson(this);
    }

    /**
     * The {@code data} section of a statement output; each field is mapped to the JSON
     * entry named by its {@code SerializedName} annotation.
     */
    private static class Data {
      @SerializedName("text/plain")
      public String plain_text;
      @SerializedName("image/png")
      public String image_png;
      @SerializedName("application/json")
      public String application_json;
      @SerializedName("application/vnd.livy.table.v1+json")
      public String application_livy_table_json;
    }
  }
}
}

View file

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
/**
 * Checked exception raised for failures related to the livy api.
 *
 * <p>Each constructor simply delegates to the matching {@link Exception} constructor.
 */
public class LivyException extends Exception {

  /** Creates an exception without message and cause. */
  public LivyException() {
    super();
  }

  /**
   * @param message detail message
   */
  public LivyException(String message) {
    super(message);
  }

  /**
   * @param message detail message
   * @param cause   underlying failure
   */
  public LivyException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * @param cause underlying failure
   */
  public LivyException(Throwable cause) {
    super(cause);
  }

  /**
   * @param message            detail message
   * @param cause              underlying failure
   * @param enableSuppression  whether suppression is enabled
   * @param writableStackTrace whether the stack trace is writable
   */
  public LivyException(String message, Throwable cause, boolean enableSuppression,
                       boolean writableStackTrace) {
    super(message, cause, enableSuppression, writableStackTrace);
  }
}

View file

@ -1,406 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.security.kerberos.client.KerberosRestTemplate;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import java.nio.charset.Charset;
import java.util.*;
import java.util.Map.Entry;
/***
* Livy helper class
*/
public class LivyHelper {
Logger LOGGER = LoggerFactory.getLogger(LivyHelper.class);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
HashMap<String, Object> paragraphHttpMap = new HashMap<>();
Properties property;
// Keeps the interpreter properties; the methods of this class read the livy url, the kerberos
// keytab/principal and the "livy.spark.*" session configuration from them.
LivyHelper(Properties property) {
this.property = property;
}
/**
 * Creates a livy session of the given kind and waits until it is idle.
 *
 * <p>Every non-empty property whose key starts with {@code livy.spark.} is sent as session
 * configuration with the leading {@code livy.} removed. When the session is not idle right
 * away, its state is polled once per second; the number of polls is bounded by
 * {@code zeppelin.livy.create.session.retries} (60 when that property is not a valid number).
 *
 * @param context interpreter context providing the user and the paragraph id
 * @param kind    session kind to request from livy
 * @return the id of the created session
 * @throws Exception if a rest call fails, the session ends up in {@code error} or
 *                   {@code dead} state, or it does not become idle within the retries
 */
public Integer createSession(InterpreterContext context, String kind) throws Exception {
  try {
    Map<String, String> conf = new HashMap<>();
    for (Entry<Object, Object> pair : property.entrySet()) {
      if (pair.getKey().toString().startsWith("livy.spark.") &&
          !pair.getValue().toString().isEmpty()) {
        // substring(5) strips the "livy." prefix, leaving the "spark.*" key
        conf.put(pair.getKey().toString().substring(5), pair.getValue().toString());
      }
    }
    String confData = gson.toJson(conf);
    String user = context.getAuthenticationInfo().getUser();
    // kind and user go through gson so that they are quoted and escaped as JSON strings
    String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions", "POST",
        "{" +
            "\"kind\": " + gson.toJson(kind) + ", " +
            "\"conf\": " + confData + ", " +
            "\"proxyUser\": " + (StringUtils.isEmpty(user) ? null : gson.toJson(user)) +
            "}",
        context.getParagraphId()
    );

    Map<Object, Object> jsonMap = gson.fromJson(json,
        new TypeToken<Map<Object, Object>>() {
        }.getType());
    Integer sessionId = ((Double) jsonMap.get("id")).intValue();
    if (!jsonMap.get("state").equals("idle")) {
      int retryCount = 60;
      try {
        retryCount = Integer.valueOf(
            property.getProperty("zeppelin.livy.create.session.retries"));
      } catch (Exception e) {
        LOGGER.info("zeppelin.livy.create.session.retries property is not configured." +
            " Using default retry count.");
      }

      // Tracked separately from retryCount: a session that turns idle on the very last retry
      // must not be reported as a failure.
      boolean idle = false;
      while (!idle && retryCount >= 0) {
        // a session that is still starting is expected, so this is not logged as an error
        LOGGER.info(String.format("sessionId:%s state is %s",
            jsonMap.get("id"), jsonMap.get("state")));
        Thread.sleep(1000);
        json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
            sessionId, "GET", null, context.getParagraphId());
        jsonMap = gson.fromJson(json,
            new TypeToken<Map<Object, Object>>() {
            }.getType());
        Object state = jsonMap.get("state");
        if (state.equals("idle")) {
          idle = true;
        } else if (state.equals("error") || state.equals("dead")) {
          // fetch the session log so that the failure reason ends up in the exception
          json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
                  sessionId + "/log",
              "GET", null,
              context.getParagraphId());
          jsonMap = gson.fromJson(json,
              new TypeToken<Map<Object, Object>>() {
              }.getType());
          String logs = StringUtils.join((Iterable<?>) jsonMap.get("log"), '\n');
          LOGGER.error(String.format("Cannot start %s.\n%s", kind, logs));
          throw new Exception(String.format("Cannot start %s.\n%s", kind, logs));
        } else {
          retryCount--;
        }
      }
      if (!idle) {
        LOGGER.error("Error getting session for user within the configured number of retries.");
        throw new Exception(String.format("Cannot start %s.", kind));
      }
    }
    return sessionId;
  } catch (Exception e) {
    LOGGER.error("Error getting session for user", e);
    throw e;
  }
}
/**
* Runs a paragraph of code line by line in the given livy session and writes the collected
* output to {@code out}.
*
* The input is split on newlines and an extra {@code print("")} line is appended. A line is
* held back and sent together with the following one(s) when the next line is empty, is a
* comment, starts with "}" or "object", starts with a single "." (method chaining), or lies
* inside a block comment, and also when livy reports the statement as incomplete.
*
* @param stringLines code of the paragraph
* @param context interpreter context; its output is cleared and attached to {@code out}
* @param sessionId livy session to run the code in
* @param out stream the final output is written to
* @param appId application id shown when {@code displayAppInfo} is set
* @param webUI web UI address shown when {@code displayAppInfo} is set
* @param displayAppInfo whether the output is wrapped into an angular snippet with app info
* @return the failing result on error, an INCOMPLETE result when the last statement is
* incomplete, otherwise a SUCCESS result without message (the output goes to {@code out})
*/
public InterpreterResult interpretInput(String stringLines,
final InterpreterContext context,
int sessionId,
LivyOutputStream out,
String appId,
String webUI,
boolean displayAppInfo) {
try {
out.setInterpreterOutput(context.out);
context.out.clear();
// code held back so far that has not been submitted yet
String incomplete = "";
// whether the line being looked ahead at is inside a /* ... */ comment
boolean inComment = false;
String[] lines = stringLines.split("\n");
// one extra slot for the trailing print("") statement
String[] linesToRun = new String[lines.length + 1];
for (int i = 0; i < lines.length; i++) {
linesToRun[i] = lines[i];
}
linesToRun[lines.length] = "print(\"\")";
// code of the most recently executed statement
Code r = null;
StringBuilder outputBuilder = new StringBuilder();
for (int l = 0; l < linesToRun.length; l++) {
String s = linesToRun[l];
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
//for spark
if (l + 1 < linesToRun.length) {
String nextLine = linesToRun[l + 1].trim();
boolean continuation = false;
if (nextLine.isEmpty()
|| nextLine.startsWith("//") // skip empty line or comment
|| nextLine.startsWith("}")
|| nextLine.startsWith("object")) { // include "} object" for Scala companion object
continuation = true;
} else if (!inComment && nextLine.startsWith("/*")) {
inComment = true;
continuation = true;
} else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
inComment = false;
continuation = true;
} else if (nextLine.length() > 1
&& nextLine.charAt(0) == '.'
&& nextLine.charAt(1) != '.' // ".."
&& nextLine.charAt(1) != '/') { // "./"
continuation = true;
} else if (inComment) {
continuation = true;
}
// do not submit yet: keep the current line and send it together with the next one
if (continuation) {
incomplete += s + "\n";
continue;
}
}
InterpreterResult res;
try {
res = interpret(incomplete + s, context, sessionId);
} catch (Exception e) {
LOGGER.error("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
}
r = res.code();
if (r == Code.ERROR) {
// detach the interpreter output before returning the failing result
out.setInterpreterOutput(null);
return res;
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
// statement finished: collect its message and start a fresh statement
outputBuilder.append(res.message() + "\n");
incomplete = "";
}
}
if (r == Code.INCOMPLETE) {
out.setInterpreterOutput(null);
return new InterpreterResult(r, "Incomplete expression");
} else {
if (displayAppInfo) {
out.write("%angular ");
out.write("<pre><code>");
out.write(outputBuilder.toString());
out.write("</code></pre>");
out.write("<hr/>");
out.write("Spark Application Id:" + appId + "<br/>");
out.write("Spark WebUI: <a href=" + webUI + ">" + webUI + "</a>");
} else {
out.write(outputBuilder.toString());
}
out.setInterpreterOutput(null);
return new InterpreterResult(Code.SUCCESS);
}
} catch (Exception e) {
LOGGER.error("error in interpretInput", e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
}
/**
 * Executes one statement in the given livy session and waits for its outcome.
 *
 * <p>Blank input succeeds immediately with an empty message. Otherwise the statement is
 * submitted and its status is polled once per second until a result can be derived from it.
 * Polling stops with an INCOMPLETE result when the paragraph has no entry in
 * {@code paragraphHttpMap} any more.
 *
 * @param stringLines code to execute
 * @param context     interpreter context providing the paragraph id
 * @param sessionId   livy session to run the code in
 * @return the result of the statement
 * @throws Exception if a rest call fails or the wait is interrupted
 */
public InterpreterResult interpret(String stringLines,
                                   final InterpreterContext context,
                                   int sessionId)
    throws Exception {
  if (stringLines.trim().isEmpty()) {
    return new InterpreterResult(Code.SUCCESS, "");
  }
  Map statement = executeCommand(stringLines, context, sessionId);
  Integer statementId = ((Double) statement.get("id")).intValue();

  InterpreterResult outcome = getResultFromMap(statement);
  while (outcome == null) {
    Thread.sleep(1000);
    if (paragraphHttpMap.get(context.getParagraphId()) == null) {
      return new InterpreterResult(Code.INCOMPLETE, "");
    }
    outcome = getResultFromMap(getStatusById(context, sessionId, statementId));
  }
  return outcome;
}
/**
 * Derives an interpreter result from a parsed livy statement response.
 *
 * @param jsonMap statement response parsed into a map
 * @return {@code null} while the statement state is not {@code available} (or its output
 *         status is neither {@code error} nor {@code ok}); an INCOMPLETE result for the error
 *         values "incomplete statement" or anything containing "EOF"; an ERROR result with the
 *         error value and, if present, the traceback; otherwise a SUCCESS result holding the
 *         trimmed {@code text/plain} data, prefixed with {@code %html} when it looks like html
 */
private InterpreterResult getResultFromMap(Map jsonMap) {
  if (!jsonMap.get("state").equals("available")) {
    return null;
  }
  Map output = (Map) jsonMap.get("output");

  if (output.get("status").equals("error")) {
    String evalue = (String) output.get("evalue");
    StringBuilder errorMessage = new StringBuilder(evalue);
    if (evalue.equals("incomplete statement") || evalue.contains("EOF")) {
      return new InterpreterResult(Code.INCOMPLETE, "");
    }
    String traceback = gson.toJson(output.get("traceback"));
    if (!traceback.equals("[]")) {
      errorMessage.append("\n");
      errorMessage.append("traceback: \n");
      errorMessage.append(traceback);
    }
    return new InterpreterResult(Code.ERROR, errorMessage.toString());
  }

  if (output.get("status").equals("ok")) {
    Map data = (Map) output.get("data");
    String result = (String) data.get("text/plain");
    if (result != null) {
      result = result.trim();
      boolean looksLikeHtml = result.startsWith("<link")
          || result.startsWith("<script")
          || result.startsWith("<style")
          || result.startsWith("<div");
      if (looksLikeHtml) {
        result = "%html " + result;
      }
    }
    return new InterpreterResult(Code.SUCCESS, result);
  }
  return null;
}
/**
 * Submits code as a new statement to the given livy session.
 *
 * @param lines     code to execute; it is JSON-escaped before being sent
 * @param context   interpreter context providing the paragraph id
 * @param sessionId livy session to run the code in
 * @return livy's statement response parsed into a map
 * @throws Exception if the session does not exist any more, the rest call fails, or the
 *                   response cannot be parsed
 */
private Map executeCommand(String lines, InterpreterContext context, int sessionId)
    throws Exception {
  String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/"
          + sessionId + "/statements",
      "POST",
      "{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}",
      context.getParagraphId());
  // [0-9]+ rather than [0-9]: session ids are not limited to a single digit
  if (json.matches("^(\")?Session (\'[0-9]+\' )?not found(.?\"?)$")) {
    throw new Exception("Exception: Session not found, Livy server would have restarted, " +
        "or lost session.");
  }
  try {
    Map jsonMap = gson.fromJson(json,
        new TypeToken<Map>() {
        }.getType());
    return jsonMap;
  } catch (Exception e) {
    LOGGER.error("Error executeCommand", e);
    throw e;
  }
}
/**
 * Fetches the current status of a statement from livy.
 *
 * @param context   interpreter context providing the paragraph id
 * @param sessionId livy session the statement belongs to
 * @param id        id of the statement
 * @return livy's statement response parsed into a map
 * @throws Exception if the rest call fails or the response cannot be parsed
 */
private Map getStatusById(InterpreterContext context,
                          int sessionId, Integer id) throws Exception {
  String statementUrl = property.getProperty("zeppelin.livy.url") + "/sessions/"
      + sessionId
      + "/statements/" + id;
  String json = executeHTTP(statementUrl, "GET", null, context.getParagraphId());
  LOGGER.debug("statement {} response: {}", id, json);
  try {
    Map statusMap = gson.fromJson(json,
        new TypeToken<Map>() {
        }.getType());
    return statusMap;
  } catch (Exception e) {
    LOGGER.error("Error getStatusById", e);
    throw e;
  }
}
/**
 * Builds the rest client used to talk to livy.
 *
 * @return a kerberos-enabled template when both {@code zeppelin.livy.keytab} and
 *         {@code zeppelin.livy.principal} are set, otherwise a plain {@link RestTemplate}
 */
private RestTemplate getRestTemplate() {
  String keytabLocation = property.getProperty("zeppelin.livy.keytab");
  String principal = property.getProperty("zeppelin.livy.principal");
  if (StringUtils.isEmpty(keytabLocation) || StringUtils.isEmpty(principal)) {
    return new RestTemplate();
  }
  return new KerberosRestTemplate(keytabLocation, principal);
}
/**
 * Sends an http request to livy and returns the response body.
 *
 * <p>For {@code POST} and {@code GET} the response is also stored in {@code paragraphHttpMap}
 * under the paragraph id. Bodies of responses with status 200, 201 and 404 are returned as
 * they are. Any other status raises an exception, unless the body mentions
 * {@code CreateInteractiveRequest["master"]}, in which case the body is returned.
 *
 * @param targetURL   absolute url to call
 * @param method      one of {@code POST}, {@code GET} or {@code DELETE}
 * @param jsonData    request body, only sent for {@code POST}
 * @param paragraphId id of the paragraph issuing the request
 * @return the response body, or {@code null} when no request was made for the given method
 * @throws Exception if livy answers with an unexpected status code
 */
protected String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
    throws Exception {
  LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
  RestTemplate restTemplate = getRestTemplate();
  HttpHeaders headers = new HttpHeaders();
  headers.add("Content-Type", "application/json");
  headers.add("X-Requested-By", "zeppelin");
  ResponseEntity<String> response = null;
  try {
    if (method.equals("POST")) {
      HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
      response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
      paragraphHttpMap.put(paragraphId, response);
    } else if (method.equals("GET")) {
      HttpEntity<String> entity = new HttpEntity<>(headers);
      response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
      paragraphHttpMap.put(paragraphId, response);
    } else if (method.equals("DELETE")) {
      HttpEntity<String> entity = new HttpEntity<>(headers);
      response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
    }
  } catch (HttpClientErrorException e) {
    // keep the 4xx answer as a regular response so that it goes through the checks below
    response = new ResponseEntity<>(e.getResponseBodyAsString(), e.getStatusCode());
    LOGGER.error(String.format("Error with %s StatusCode: %s",
        response.getStatusCode().value(), e.getResponseBodyAsString()));
  }
  if (response == null) {
    return null;
  }
  int statusCode = response.getStatusCode().value();
  if (statusCode == 200 || statusCode == 201 || statusCode == 404) {
    return response.getBody();
  }
  String responseString = response.getBody();
  // the body may be absent for an unexpected status, so check for null before inspecting it
  if (responseString != null
      && responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
    return responseString;
  }
  String errorMessage =
      String.format("Error with %s StatusCode: %s", statusCode, responseString);
  LOGGER.error(errorMessage);
  throw new Exception(errorMessage);
}
/**
* Marks a paragraph as cancelled by mapping its id to null in paragraphHttpMap; the polling
* loop in interpret(...) returns INCOMPLETE once it sees no entry for the paragraph.
*
* @param paragraphId id of the paragraph to cancel
*/
public void cancelHTTP(String paragraphId) {
// TODO(zjffdu), use cancel rest api of livy
paragraphHttpMap.put(paragraphId, null);
}
/**
 * Deletes a livy session on a best-effort basis: a failure is logged and not propagated.
 *
 * @param sessionId id of the session to delete
 */
public void closeSession(int sessionId) {
  try {
    String sessionUrl = property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId;
    executeHTTP(sessionUrl, "DELETE", null, null);
  } catch (Exception e) {
    String failure =
        String.format("Error closing session for user with session ID: %s", sessionId);
    LOGGER.error(failure, e);
  }
}
}

View file

@ -1,84 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
/**
 * Output stream that forwards to an {@link InterpreterOutput} which can be attached and
 * detached at any time. While no output is attached, every operation is a no-op.
 */
public class LivyOutputStream extends OutputStream {

  private static Logger LOGGER = LoggerFactory.getLogger(LivyOutputStream.class);

  InterpreterOutput interpreterOutput;

  public LivyOutputStream() {
  }

  /**
   * @return the currently attached output, or {@code null} when detached
   */
  public InterpreterOutput getInterpreterOutput() {
    return interpreterOutput;
  }

  /**
   * Attaches the given output; passing {@code null} detaches the current one.
   */
  public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
    this.interpreterOutput = interpreterOutput;
  }

  @Override
  public void write(int b) throws IOException {
    if (interpreterOutput == null) {
      return;
    }
    interpreterOutput.write(b);
  }

  @Override
  public void write(byte[] b) throws IOException {
    if (interpreterOutput == null) {
      return;
    }
    interpreterOutput.write(b);
  }

  /**
   * Logs the text at debug level and writes its UTF-8 bytes.
   */
  public void write(String text) throws IOException {
    LOGGER.debug("livy output:" + text);
    byte[] encoded = text.getBytes("UTF-8");
    write(encoded);
  }

  @Override
  public void write(byte[] b, int offset, int len) throws IOException {
    if (interpreterOutput == null) {
      return;
    }
    interpreterOutput.write(b, offset, len);
  }

  @Override
  public void close() throws IOException {
    if (interpreterOutput == null) {
      return;
    }
    interpreterOutput.close();
  }

  @Override
  public void flush() throws IOException {
    if (interpreterOutput == null) {
      return;
    }
    interpreterOutput.flush();
  }
}

View file

@ -38,10 +38,12 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
private LivySparkInterpreter sparkInterpreter;
private boolean sqlContextCreated = false;
private boolean isSpark2 = false;
private int maxResult = 1000;
public LivySparkSQLInterpreter(Properties property) {
super(property);
this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
}
@Override
@ -51,10 +53,56 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
/**
 * Opens this interpreter, resolves the LivySparkInterpreter of the same session and
 * probes that interpreter to decide how SQL has to be issued.
 *
 * <p>Evaluating {@code spark} successfully with a SparkSession in the output marks the
 * session as Spark 2.x. Otherwise {@code sqlContext} is evaluated, and when that
 * reports an error a SQLContext is created explicitly.
 *
 * @throws RuntimeException wrapping the LivyException raised during detection
 */
@Override
public void open() {
super.open();
// NOTE(review): sparkInterpreter is assigned twice in this view; the second
// assignment (getSparkInterpreter()) is the one that takes effect.
this.sparkInterpreter =
(LivySparkInterpreter) getInterpreterInTheSameSessionByClassName(
LivySparkInterpreter.class.getName());
this.sparkInterpreter = getSparkInterpreter();
// As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
// to judge whether it is using spark2.
try {
InterpreterResult result = sparkInterpreter.interpret("spark", false);
if (result.code() == InterpreterResult.Code.SUCCESS &&
result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
sparkInterpreter.getSessionInfo().id);
isSpark2 = true;
} else {
// spark 1.x
result = sparkInterpreter.interpret("sqlContext", false);
if (result.code() == InterpreterResult.Code.SUCCESS) {
LOGGER.info("sqlContext is detected.");
} else if (result.code() == InterpreterResult.Code.ERROR) {
// create SqlContext if it is not available, as in livy 0.2 sqlContext
// is not available.
LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves");
result = sparkInterpreter.interpret(
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", false);
if (result.code() == InterpreterResult.Code.ERROR) {
// Caught by the handler below and rethrown as a RuntimeException.
throw new LivyException("Fail to create SQLContext," +
result.message().get(0).getData());
}
}
}
} catch (LivyException e) {
throw new RuntimeException("Fail to Detect SparkVersion", e);
}
}
/**
 * Looks up the LivySparkInterpreter registered in the same session and strips every
 * WrappedInterpreter layer around it.
 *
 * <p>If a LazyOpenInterpreter is found among the wrappers, the last one encountered
 * is opened before the unwrapped interpreter is returned.
 *
 * @return the innermost interpreter, cast to LivySparkInterpreter
 */
private LivySparkInterpreter getSparkInterpreter() {
  LazyOpenInterpreter lazyWrapper = null;
  Interpreter current =
      getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
  for (; current instanceof WrappedInterpreter;
      current = ((WrappedInterpreter) current).getInnerInterpreter()) {
    if (current instanceof LazyOpenInterpreter) {
      lazyWrapper = (LazyOpenInterpreter) current;
    }
  }
  // Cast first so a type mismatch surfaces before anything is opened.
  LivySparkInterpreter unwrapped = (LivySparkInterpreter) current;
  if (lazyWrapper != null) {
    lazyWrapper.open();
  }
  return unwrapped;
}
@Override
@ -63,38 +111,20 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
if (StringUtils.isEmpty(line)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
// create sqlContext implicitly if it is not available, as in livy 0.2 sqlContext
// is not available.
synchronized (this) {
if (!sqlContextCreated) {
InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
if (result.code() == InterpreterResult.Code.ERROR) {
result = sparkInterpreter.interpret(
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", context);
if (result.code() == InterpreterResult.Code.ERROR) {
return new InterpreterResult(InterpreterResult.Code.ERROR,
"Fail to create sqlContext," + result.message());
}
}
sqlContextCreated = true;
}
// replace line separator with space
line = line.replace("\n", " ").replace("\r", " ");
String sqlQuery = null;
if (isSpark2) {
sqlQuery = "spark.sql(\"" + line + "\").show(" + maxResult + ")";
} else {
sqlQuery = "sqlContext.sql(\"" + line + "\").show(" + maxResult + ")";
}
// delegate the work to LivySparkInterpreter in the same session.
// TODO(zjffdu), we may create multiple session for the same user here. This can be fixed
// after we move session creation to open()
InterpreterResult res = sparkInterpreter.interpret("sqlContext.sql(\"" +
line.replaceAll("\"", "\\\\\"")
.replaceAll("\\n", " ")
+ "\").show(" +
property.get("zeppelin.livy.spark.sql.maxResult") + ")", context);
InterpreterResult res = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo);
if (res.code() == InterpreterResult.Code.SUCCESS) {
StringBuilder resMsg = new StringBuilder();
resMsg.append("%table ");
String[] rows = new String(context.out.toByteArray()).split("\n");
String[] rows = res.message().get(0).getData().split("\n");
String[] headers = rows[1].split("\\|");
for (int head = 1; head < headers.length; head++) {
resMsg.append(headers[head].trim()).append("\t");
@ -121,7 +151,6 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
} else {
return res;
}
} catch (Exception e) {
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,

View file

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
/**
 * A {@link LivyException} subtype for the "session not found" case.
 *
 * <p>NOTE(review): presumably raised when the Livy server no longer knows a session
 * id - confirm against the callers, which are not visible here.
 */
public class SessionNotFoundException extends LivyException {
/**
 * @param message detail message, passed unchanged to the LivyException constructor
 */
public SessionNotFoundException(String message) {
super(message);
}
}

View file

@ -13,14 +13,9 @@
},
"zeppelin.livy.create.session.retries": {
"envName": "ZEPPELIN_LIVY_CREATE_SESSION_RETRIES",
"propertyName": "zeppelin.livy.create.session.retries",
"propertyName": "zeppelin.livy.create.session.timeout",
"defaultValue": "120",
"description": "Livy Server create session retry count."
},
"livy.spark.master": {
"propertyName": "livy.spark.master",
"defaultValue": "local[*]",
"description": "Spark master uri. ex) spark://masterhost:7077"
"description": "Livy Server create session timeout (seconds)."
},
"livy.spark.driver.cores": {
"propertyName": "livy.spark.driver.cores",

View file

@ -1,111 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
import com.google.gson.GsonBuilder;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Properties;
import static org.mockito.Mockito.doReturn;
/**
 * Unit test for {@link LivyHelper}.
 *
 * <p>The helper is a Mockito mock that calls its real methods; only {@code executeHTTP}
 * is stubbed with canned JSON responses, so no Livy server is contacted.
 */
@RunWith(MockitoJUnitRunner.class)
public class LivyHelperTest {

  @Rule
  public ErrorCollector collector = new ErrorCollector();

  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
  private static LivyPySparkInterpreter interpreter;

  @Mock(answer = Answers.RETURNS_DEEP_STUBS)
  private InterpreterContext interpreterContext;

  @Mock(answer = Answers.CALLS_REAL_METHODS)
  private LivyHelper livyHelper;

  /**
   * Populates the helper's fields by hand (the mock's constructor did not run) and
   * stubs the two HTTP calls the tests rely on.
   */
  @Before
  public void prepareContext() throws Exception {
    Properties properties = new Properties();
    properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
    livyHelper.property = properties;
    livyHelper.paragraphHttpMap = new HashMap<>();
    livyHelper.gson = new GsonBuilder().setPrettyPrinting().create();
    livyHelper.LOGGER = LoggerFactory.getLogger(LivyHelper.class);

    // Session creation answers with an idle session whose id is 1.
    doReturn("{\"id\":1,\"state\":\"idle\",\"kind\":\"spark\",\"proxyUser\":\"null\",\"log\":[]}")
        .when(livyHelper)
        .executeHTTP(
            livyHelper.property.getProperty("zeppelin.livy.url") + "/sessions",
            "POST",
            "{\"kind\": \"spark\", \"conf\": {}, \"proxyUser\": null}",
            null
        );

    // Statement submission answers with an already available "ok" result.
    doReturn("{\"id\":1,\"state\":\"available\",\"output\":{\"status\":\"ok\"," +
        "\"execution_count\":1,\"data\":{\"text/plain\":\"1\"}}}")
        .when(livyHelper)
        .executeHTTP(
            livyHelper.property.getProperty("zeppelin.livy.url") + "/sessions/1/statements",
            "POST",
            "{\"code\": \"print(1)\"}",
            null
        );
  }

  @Test
  public void checkCreateSession() {
    try {
      Integer sessionId = livyHelper.createSession(interpreterContext, "spark");
      // Actual value first, expected value in the matcher, so a failure reads correctly.
      collector.checkThat("check sessionId", sessionId, CoreMatchers.equalTo(1));
    } catch (Exception e) {
      collector.addError(e);
    }
  }

  @Test
  public void checkInterpret() {
    try {
      InterpreterResult result = livyHelper.interpret("print(1)", interpreterContext, 1);
      collector.checkThat("check result code", result.code(),
          CoreMatchers.equalTo(InterpreterResult.Code.SUCCESS));
    } catch (Exception e) {
      collector.addError(e);
    }
  }
}

View file

@ -20,6 +20,8 @@ package org.apache.zeppelin.livy;
import com.cloudera.livy.test.framework.Cluster;
import com.cloudera.livy.test.framework.Cluster$;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.junit.*;
@ -49,13 +51,14 @@ public class LivyInterpreterIT {
LOGGER.info("Starting livy at {}", cluster.livyEndpoint());
properties = new Properties();
properties.setProperty("zeppelin.livy.url", cluster.livyEndpoint());
properties.setProperty("zeppelin.livy.create.session.retries", "120");
properties.setProperty("zeppelin.livy.create.session.timeout", "120");
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "100");
}
/**
 * Cleans up the shared Livy test cluster; does nothing when the cluster field is null.
 */
@AfterClass
public static void tearDown() {
  if (cluster == null) {
    return;
  }
  LOGGER.info("Shutting down livy at {}", cluster.livyEndpoint());
  cluster.cleanUp();
}
@ -92,63 +95,63 @@ public class LivyInterpreterIT {
try {
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData().contains("1.5.2"));
// test RDD api
outputListener.reset();
result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended().contains("Double = 55.0"));
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData().contains("Double = 55.0"));
// single line comment
outputListener.reset();
String singleLineComment = "// my comment";
String singleLineComment = "println(1)// my comment";
result = sparkInterpreter.interpret(singleLineComment, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertEquals(1, result.message().size());
// multiple line comment
outputListener.reset();
String multipleLineComment = "/* multiple \n" + "line \n" + "comment */";
String multipleLineComment = "println(1)/* multiple \n" + "line \n" + "comment */";
result = sparkInterpreter.interpret(multipleLineComment, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertEquals(1, result.message().size());
// multi-line string
outputListener.reset();
String multiLineString = "val str = \"\"\"multiple\n" +
"line\"\"\"\n" +
"println(str)";
result = sparkInterpreter.interpret(multiLineString, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended().contains("multiple\nline"));
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData().contains("multiple\nline"));
// case class
outputListener.reset();
String caseClassCode = "case class Person(id:Int, \n" +
"name:String)\n" +
"val p=Person(1, \"name_a\")";
result = sparkInterpreter.interpret(caseClassCode, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended().contains("defined class Person"));
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData().contains("p: Person = Person(1,name_a)"));
// object class
outputListener.reset();
String objectClassCode = "object Person {}";
result = sparkInterpreter.interpret(objectClassCode, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended().contains("defined module Person"));
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData().contains("defined module Person"));
// error
result = sparkInterpreter.interpret("println(a)", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertTrue(result.message().get(0).getData().contains("error: not found: value a"));
// incomplete code
result = sparkInterpreter.interpret("if(true){", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertTrue(result.message().get(0).getData().contains("incomplete statement"));
} finally {
sparkInterpreter.close();
}
@ -178,19 +181,17 @@ public class LivyInterpreterIT {
try {
// test DataFrame api
outputListener.reset();
sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", context);
InterpreterResult result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended()
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
outputListener.reset();
result = sqlInterpreter.interpret("select * from df", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
@ -209,10 +210,12 @@ public class LivyInterpreterIT {
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
new LivySparkInterpreter(properties));
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
new LivySparkSQLInterpreter(properties));
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
@ -249,25 +252,23 @@ public class LivyInterpreterIT {
try {
InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData().contains("1.5.2"));
// test RDD api
outputListener.reset();
result = pysparkInterpreter.interpret("sc.range(1, 10).sum()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended().contains("45"));
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData().contains("45"));
// test DataFrame api
outputListener.reset();
pysparkInterpreter.interpret("from pyspark.sql import SQLContext\n"
+ "sqlContext = SQLContext(sc)", context);
result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(0, result.message().size());
assertTrue(outputListener.getOutputAppended().contains("[Row(_1=u'hello', _2=20)]"));
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]"));
// error
result = pysparkInterpreter.interpret("print(a)", context);
@ -287,37 +288,52 @@ public class LivyInterpreterIT {
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
}
public static class MyInterpreterOutputListener implements InterpreterOutputListener {
private StringBuilder outputAppended = new StringBuilder();
private StringBuilder outputUpdated = new StringBuilder();
/**
 * Runs the two tutorial paragraphs against Livy: the Scala paragraph through the
 * spark interpreter, then the SQL paragraph through the sql interpreter, and expects
 * the SQL paragraph to produce a TABLE result.
 *
 * <p>Returns immediately, without asserting anything, when checkPreCondition() is false.
 */
@Test
public void testLivyTutorialNote() throws IOException {
if (!checkPreCondition()) {
return;
}
// Both interpreters are registered in the same group under the same session key.
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
new LivySparkInterpreter(properties));
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
new LivySparkSQLInterpreter(properties));
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
// NOTE(review): open() runs outside the try block, so the finally clause below does
// not run if it throws.
sqlInterpreter.open();
try {
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
"title", "text", authInfo, null, null, null, null, null, output);
// NOTE(review): IOUtils.toString(InputStream) is called without a charset (presumably
// the platform default is used) and the resource streams are not closed here -
// confirm whether that matters for these fixtures.
String p1 = IOUtils.toString(getClass().getResourceAsStream("/livy_tutorial_1.scala"));
InterpreterResult result = sparkInterpreter.interpret(p1, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
String p2 = IOUtils.toString(getClass().getResourceAsStream("/livy_tutorial_2.sql"));
result = sqlInterpreter.interpret(p2, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
} finally {
sparkInterpreter.close();
sqlInterpreter.close();
}
}
public static class MyInterpreterOutputListener implements InterpreterOutputListener {
@Override
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
LOGGER.info("onAppend:" + new String(line));
outputAppended.append(new String(line));
}
@Override
public void onUpdate(int index, InterpreterResultMessageOutput out) {
try {
LOGGER.info("onUpdate:" + new String(out.toByteArray()));
outputUpdated.append(new String(out.toByteArray()));
} catch (IOException e) {
e.printStackTrace();
}
}
public String getOutputAppended() {
return outputAppended.toString();
}
public String getOutputUpdated() {
return outputUpdated.toString();
}
public void reset() {
outputAppended = new StringBuilder();
outputUpdated = new StringBuilder();
}
@Override

View file

@ -0,0 +1,24 @@
import org.apache.commons.io.IOUtils
import java.net.URL
import java.nio.charset.Charset
// Zeppelin creates and injects sc (SparkContext) and sqlContext (HiveContext or SqlContext),
// so you don't need to create them manually.
// Load the bank data: download the CSV as one UTF-8 string and turn each line into an RDD element.
val bankText = sc.parallelize(
IOUtils.toString(
new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"),
Charset.forName("utf8")).split("\n"))
// One row of the data set, restricted to the five columns used below.
case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)
// Split each line on ';', drop the header row (first field is the quoted text "age"),
// strip the double quotes and build a Bank from fields 0, 1, 2, 3 and 5.
val bank = bankText.map(s => s.split(";")).filter(s => s(0) != "\"age\"").map(
s => Bank(s(0).toInt,
s(1).replaceAll("\"", ""),
s(2).replaceAll("\"", ""),
s(3).replaceAll("\"", ""),
s(5).replaceAll("\"", "").toInt
)
).toDF()
// Register the DataFrame under the table name "bank".
bank.registerTempTable("bank")

View file

@ -0,0 +1,5 @@
SELECT age, COUNT(1) AS value
FROM bank
WHERE age < 30
GROUP BY age
ORDER BY age

28
testing/setupLivy.sh Executable file
View file

@ -0,0 +1,28 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Downloads Livy and points LIVY_HOME / SPARK_HOME at the local installs when
# LIVY_VER is set; does nothing otherwise.
#
# NOTE: the exports below only reach the calling shell when this file is sourced
# (". ./testing/setupLivy.sh"); when it is executed as a child process they end
# with that process.
set -xe

if [[ -n "$LIVY_VER" ]]; then
  # downloadLivy.sh takes the Livy version as its argument.
  ./testing/downloadLivy.sh "$LIVY_VER"
  export LIVY_HOME="$(pwd)/livy-server-$LIVY_VER"
  export SPARK_HOME="$(pwd)/spark-$SPARK_VER-bin-hadoop$HADOOP_VER"
fi

set +xe