mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-1786. Refactor LivyHelper
This commit is contained in:
parent
04c62e41fd
commit
6e5cbb8e36
13 changed files with 632 additions and 749 deletions
|
|
@ -92,10 +92,8 @@ install:
|
|||
|
||||
before_script:
|
||||
- travis_retry ./testing/downloadSpark.sh $SPARK_VER $HADOOP_VER
|
||||
- if [[ -n $LIVY_VER ]]; then travis_retry ./testing/downloadLivy.sh $LIVY_VER; fi
|
||||
- ./testing/setupLivy.sh
|
||||
- echo "export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER" > conf/zeppelin-env.sh
|
||||
- if [[ -n $LIVY_VER ]]; then export LIVY_HOME=`pwd`/livy-server-$LIVY_VER; fi
|
||||
- if [[ -n $LIVY_VER ]]; then export SPARK_HOME=`pwd`/spark-$SPARK_VER-bin-hadoop$HADOOP_VER; fi
|
||||
- tail conf/zeppelin-env.sh
|
||||
|
||||
script:
|
||||
|
|
|
|||
|
|
@ -17,6 +17,9 @@
|
|||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
|
|
@ -24,7 +27,17 @@ import org.apache.zeppelin.interpreter.InterpreterResult;
|
|||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.security.kerberos.client.KerberosRestTemplate;
|
||||
import org.springframework.web.client.HttpClientErrorException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
|
|
@ -33,76 +46,81 @@ import java.util.Properties;
|
|||
public abstract class BaseLivyInterprereter extends Interpreter {
|
||||
|
||||
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
|
||||
private static Gson gson = new GsonBuilder().setPrettyPrinting().disableHtmlEscaping().create();
|
||||
|
||||
// -1 means session is not created yet, valid sessionId start from 0
|
||||
protected int sessionId = -1;
|
||||
protected String appId;
|
||||
protected String webUIAddress;
|
||||
protected SessionInfo sessionInfo;
|
||||
private String livyURL;
|
||||
private long sessionCreationTimeout;
|
||||
protected boolean displayAppInfo;
|
||||
protected LivyOutputStream out;
|
||||
protected LivyHelper livyHelper;
|
||||
|
||||
/**
 * Creates the interpreter and reads the Livy connection settings.
 *
 * @param property interpreter properties; {@code zeppelin.livy.url} is used as the base url of
 *                 every rest call and {@code zeppelin.livy.create.session.timeout} (seconds,
 *                 default 120) bounds how long session creation is polled.
 * @throws NumberFormatException if the configured timeout is not a valid long
 */
public BaseLivyInterprereter(Properties property) {
  super(property);
  this.livyURL = property.getProperty("zeppelin.livy.url");
  this.sessionCreationTimeout = Long.parseLong(
      property.getProperty("zeppelin.livy.create.session.timeout", "120"));
}
|
||||
|
||||
/**
 * Returns the session kind that is sent to Livy as the {@code kind} of the session created
 * for this interpreter.
 */
public abstract String getSessionKind();
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
// TODO(zjffdu) move session creation here.
|
||||
try {
|
||||
initLivySession();
|
||||
} catch (LivyException e) {
|
||||
throw new RuntimeException("Fail to create session, please check livy interpreter log and " +
|
||||
"livy server log", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (sessionId != -1) {
|
||||
livyHelper.closeSession(sessionId);
|
||||
// reset sessionId to -1
|
||||
sessionId = -1;
|
||||
if (sessionInfo != null) {
|
||||
closeSession(sessionInfo.id);
|
||||
// reset sessionInfo to null so that we won't close it twice.
|
||||
sessionInfo = null;
|
||||
}
|
||||
}
|
||||
|
||||
protected void createSession(InterpreterContext context) throws Exception {
|
||||
sessionId = livyHelper.createSession(context, getSessionKind());
|
||||
protected void initLivySession() throws LivyException {
|
||||
this.sessionInfo = createSession(getUserName(), getSessionKind());
|
||||
if (displayAppInfo) {
|
||||
this.appId = extractStatementResult(
|
||||
livyHelper.interpret("sc.applicationId", context, sessionId).message().get(0).getData());
|
||||
livyHelper.interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
context, sessionId);
|
||||
this.webUIAddress = extractStatementResult(
|
||||
livyHelper.interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
|
||||
context, sessionId).message().get(0).getData());
|
||||
LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
|
||||
sessionId, appId, webUIAddress);
|
||||
if (sessionInfo.appId == null) {
|
||||
// livy 0.2 don't return appId and sparkUiUrl in response so that we need to get it
|
||||
// explicitly by ourselves.
|
||||
sessionInfo.appId = extractStatementResult(
|
||||
interpret("sc.applicationId", false).message()
|
||||
.get(0).getData());
|
||||
}
|
||||
|
||||
interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get", false);
|
||||
if (StringUtils.isEmpty(sessionInfo.appInfo.get("sparkUiUrl"))) {
|
||||
sessionInfo.webUIAddress = extractStatementResult(
|
||||
interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)", false)
|
||||
.message().get(0).getData());
|
||||
} else {
|
||||
sessionInfo.webUIAddress = sessionInfo.appInfo.get("sparkUiUrl");
|
||||
}
|
||||
LOGGER.info("Create livy session successfully with sessionId: {}, appId: {}, webUI: {}",
|
||||
sessionInfo.id, sessionInfo.appId, sessionInfo.webUIAddress);
|
||||
}
|
||||
}
|
||||
|
||||
public SessionInfo getSessionInfo() {
|
||||
return sessionInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
try {
|
||||
// add synchronized, because LivySparkSQLInterperter will use ParallelScheduler
|
||||
synchronized (this) {
|
||||
if (sessionId == -1) {
|
||||
try {
|
||||
createSession(context);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception while creating livy session", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (StringUtils.isEmpty(st)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
if (StringUtils.isEmpty(st)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(st, context, sessionId, out,
|
||||
appId, webUIAddress, displayAppInfo);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivyInterpreter.", e);
|
||||
try {
|
||||
return interpret(st, this.displayAppInfo);
|
||||
} catch (LivyException e) {
|
||||
LOGGER.error("Fail to interpret:" + st, e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
|
|
@ -116,7 +134,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private static String extractStatementResult(String result) {
|
||||
private String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("=")) >= 0) {
|
||||
return result.substring(pos + 1).trim();
|
||||
|
|
@ -128,7 +146,7 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
//TODO(zjffdu). Use livy cancel api which is available in livy 0.3
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -140,4 +158,304 @@ public abstract class BaseLivyInterprereter extends Interpreter {
|
|||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
private SessionInfo createSession(String user, String kind)
|
||||
throws LivyException {
|
||||
try {
|
||||
Map<String, String> conf = new HashMap<>();
|
||||
for (Map.Entry<Object, Object> entry : property.entrySet()) {
|
||||
if (entry.getKey().toString().startsWith("livy.spark.") &&
|
||||
!entry.getValue().toString().isEmpty())
|
||||
conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
|
||||
}
|
||||
|
||||
CreateSessionRequest request = new CreateSessionRequest(kind, user, conf);
|
||||
SessionInfo sessionInfo = SessionInfo.fromJson(
|
||||
callRestAPI("/sessions", "POST", request.toJson()));
|
||||
long start = System.currentTimeMillis();
|
||||
// pull the session status until it is idle or timeout
|
||||
while (!sessionInfo.isReady()) {
|
||||
LOGGER.info("Session {} is in state {}, appId {}", sessionInfo.id, sessionInfo.state,
|
||||
sessionInfo.appId);
|
||||
if (sessionInfo.isFinished()) {
|
||||
String msg = "Session " + sessionInfo.id + " is finished, appId: " + sessionInfo.appId
|
||||
+ ", log: " + sessionInfo.log;
|
||||
LOGGER.error(msg);
|
||||
throw new LivyException(msg);
|
||||
}
|
||||
if ((System.currentTimeMillis() - start) / 1000 > sessionCreationTimeout) {
|
||||
String msg = "The creation of session " + sessionInfo.id + " is timeout within "
|
||||
+ sessionCreationTimeout + " seconds, appId: " + sessionInfo.appId
|
||||
+ ", log: " + sessionInfo.log;
|
||||
LOGGER.error(msg);
|
||||
throw new LivyException(msg);
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
sessionInfo = getSessionInfo(sessionInfo.id);
|
||||
}
|
||||
return sessionInfo;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error when creating livy session for user " + user, e);
|
||||
throw new LivyException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private SessionInfo getSessionInfo(int sessionId) throws LivyException {
|
||||
return SessionInfo.fromJson(callRestAPI("/sessions/" + sessionId, "GET"));
|
||||
}
|
||||
|
||||
public InterpreterResult interpret(String code, boolean displayAppInfo)
|
||||
throws LivyException {
|
||||
StatementInfo stmtInfo = executeStatement(new ExecuteRequest(code));
|
||||
// pull the statement status
|
||||
while (!stmtInfo.isAvailable()) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
throw new LivyException(e);
|
||||
}
|
||||
stmtInfo = getStatementInfo(stmtInfo.id);
|
||||
}
|
||||
return getResultFromStatementInfo(stmtInfo, displayAppInfo);
|
||||
}
|
||||
|
||||
private InterpreterResult getResultFromStatementInfo(StatementInfo stmtInfo,
|
||||
boolean displayAppInfo) {
|
||||
if (stmtInfo.output.isError()) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, stmtInfo.output.evalue);
|
||||
} else {
|
||||
//TODO(zjffdu) support other types of data (like json, image and etc)
|
||||
String result = stmtInfo.output.data.plain_text;
|
||||
if (result != null) {
|
||||
result = result.trim();
|
||||
if (result.startsWith("<link")
|
||||
|| result.startsWith("<script")
|
||||
|| result.startsWith("<style")
|
||||
|| result.startsWith("<div")) {
|
||||
result = "%html " + result;
|
||||
}
|
||||
}
|
||||
if (displayAppInfo) {
|
||||
//TODO(zjffdu), use multiple InterpreterResult to display appInfo
|
||||
StringBuilder outputBuilder = new StringBuilder();
|
||||
outputBuilder.append("%angular ");
|
||||
outputBuilder.append("<pre><code>");
|
||||
outputBuilder.append(result);
|
||||
outputBuilder.append("</code></pre>");
|
||||
outputBuilder.append("<hr/>");
|
||||
outputBuilder.append("Spark Application Id:" + sessionInfo.appId + "<br/>");
|
||||
outputBuilder.append("Spark WebUI: <a href=" + sessionInfo.webUIAddress + ">"
|
||||
+ sessionInfo.webUIAddress + "</a>");
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputBuilder.toString());
|
||||
} else {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private StatementInfo executeStatement(ExecuteRequest executeRequest)
|
||||
throws LivyException {
|
||||
return StatementInfo.fromJson(callRestAPI("/sessions/" + sessionInfo.id + "/statements", "POST",
|
||||
executeRequest.toJson()));
|
||||
}
|
||||
|
||||
private StatementInfo getStatementInfo(int statementId)
|
||||
throws LivyException {
|
||||
return StatementInfo.fromJson(
|
||||
callRestAPI("/sessions/" + sessionInfo.id + "/statements/" + statementId, "GET"));
|
||||
}
|
||||
|
||||
private RestTemplate getRestTemplate() {
|
||||
String keytabLocation = property.getProperty("zeppelin.livy.keytab");
|
||||
String principal = property.getProperty("zeppelin.livy.principal");
|
||||
if (StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal)) {
|
||||
return new KerberosRestTemplate(keytabLocation, principal);
|
||||
}
|
||||
return new RestTemplate();
|
||||
}
|
||||
|
||||
private String callRestAPI(String targetURL, String method) throws LivyException {
|
||||
return callRestAPI(targetURL, method, "");
|
||||
}
|
||||
|
||||
private String callRestAPI(String targetURL, String method, String jsonData)
|
||||
throws LivyException {
|
||||
targetURL = livyURL + targetURL;
|
||||
LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
|
||||
RestTemplate restTemplate = getRestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.add("Content-Type", "application/json");
|
||||
headers.add("X-Requested-By", "zeppelin");
|
||||
ResponseEntity<String> response = null;
|
||||
try {
|
||||
if (method.equals("POST")) {
|
||||
HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
|
||||
response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
|
||||
} else if (method.equals("GET")) {
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
|
||||
} else if (method.equals("DELETE")) {
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
|
||||
}
|
||||
} catch (HttpClientErrorException e) {
|
||||
response = new ResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
|
||||
LOGGER.error(String.format("Error with %s StatusCode: %s",
|
||||
response.getStatusCode().value(), e.getResponseBodyAsString()));
|
||||
}
|
||||
if (response == null) {
|
||||
throw new LivyException("No http response returned");
|
||||
}
|
||||
LOGGER.debug("Get response, StatusCode: {}, responseBody: {}", response.getStatusCode(),
|
||||
response.getBody());
|
||||
if (response.getStatusCode().value() == 200
|
||||
|| response.getStatusCode().value() == 201
|
||||
|| response.getStatusCode().value() == 404) {
|
||||
String responseBody = response.getBody();
|
||||
if (responseBody.matches("Session '\\d+' not found.")) {
|
||||
throw new SessionNotFoundException(responseBody);
|
||||
} else {
|
||||
return responseBody;
|
||||
}
|
||||
} else {
|
||||
String responseString = response.getBody();
|
||||
if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
|
||||
return responseString;
|
||||
}
|
||||
LOGGER.error(String.format("Error with %s StatusCode: %s",
|
||||
response.getStatusCode().value(), responseString));
|
||||
throw new LivyException(String.format("Error with %s StatusCode: %s",
|
||||
response.getStatusCode().value(), responseString));
|
||||
}
|
||||
}
|
||||
|
||||
private void closeSession(int sessionId) {
|
||||
try {
|
||||
callRestAPI("/sessions/" + sessionId, "DELETE");
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(String.format("Error closing session for user with session ID: %s",
|
||||
sessionId), e);
|
||||
}
|
||||
}
|
||||
|
||||
/*
 * We create these POJOs here to accommodate livy 0.3, which is not released yet. The livy rest
 * api changes from version to version, so we define these POJOs on the zeppelin side to
 * accommodate the incompatibilities between versions. Later, when livy becomes more stable, we
 * could just depend on the livy client jar.
 */
|
||||
private static class CreateSessionRequest {
|
||||
public final String kind;
|
||||
@SerializedName("proxyUser")
|
||||
public final String user;
|
||||
public final Map<String, String> conf;
|
||||
|
||||
public CreateSessionRequest(String kind, String user, Map<String, String> conf) {
|
||||
this.kind = kind;
|
||||
this.user = user;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return gson.toJson(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public static class SessionInfo {
|
||||
|
||||
public final int id;
|
||||
public String appId;
|
||||
public String webUIAddress;
|
||||
public final String owner;
|
||||
public final String proxyUser;
|
||||
public final String state;
|
||||
public final String kind;
|
||||
public final Map<String, String> appInfo;
|
||||
public final List<String> log;
|
||||
|
||||
public SessionInfo(int id, String appId, String owner, String proxyUser, String state,
|
||||
String kind, Map<String, String> appInfo, List<String> log) {
|
||||
this.id = id;
|
||||
this.appId = appId;
|
||||
this.owner = owner;
|
||||
this.proxyUser = proxyUser;
|
||||
this.state = state;
|
||||
this.kind = kind;
|
||||
this.appInfo = appInfo;
|
||||
this.log = log;
|
||||
}
|
||||
|
||||
public boolean isReady() {
|
||||
return state.equals("idle");
|
||||
}
|
||||
|
||||
public boolean isFinished() {
|
||||
return state.equals("error") || state.equals("dead") || state.equals("success");
|
||||
}
|
||||
|
||||
public static SessionInfo fromJson(String json) {
|
||||
return gson.fromJson(json, SessionInfo.class);
|
||||
}
|
||||
}
|
||||
|
||||
private static class ExecuteRequest {
|
||||
public final String code;
|
||||
|
||||
public ExecuteRequest(String code) {
|
||||
this.code = code;
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return gson.toJson(this);
|
||||
}
|
||||
}
|
||||
|
||||
private static class StatementInfo {
|
||||
public Integer id;
|
||||
public String state;
|
||||
public StatementOutput output;
|
||||
|
||||
public StatementInfo() {
|
||||
}
|
||||
|
||||
public static StatementInfo fromJson(String json) {
|
||||
return gson.fromJson(json, StatementInfo.class);
|
||||
}
|
||||
|
||||
public boolean isAvailable() {
|
||||
return state.equals("available");
|
||||
}
|
||||
|
||||
private static class StatementOutput {
|
||||
public String status;
|
||||
public String execution_count;
|
||||
public Data data;
|
||||
public String ename;
|
||||
public String evalue;
|
||||
public Object traceback;
|
||||
|
||||
public boolean isError() {
|
||||
return status.equals("error");
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return gson.toJson(this);
|
||||
}
|
||||
|
||||
private static class Data {
|
||||
@SerializedName("text/plain")
|
||||
public String plain_text;
|
||||
@SerializedName("image/png")
|
||||
public String image_png;
|
||||
@SerializedName("application/json")
|
||||
public String application_json;
|
||||
@SerializedName("application/vnd.livy.table.v1+json")
|
||||
public String application_livy_table_json;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
/**
 * Checked exception raised for failures related to the Livy api.
 */
public class LivyException extends Exception {

  /** Creates an exception without message and cause. */
  public LivyException() {
    super();
  }

  /**
   * @param message description of the failure
   */
  public LivyException(String message) {
    super(message);
  }

  /**
   * @param message description of the failure
   * @param cause   underlying failure
   */
  public LivyException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * @param cause underlying failure
   */
  public LivyException(Throwable cause) {
    super(cause);
  }

  /**
   * @param message            description of the failure
   * @param cause              underlying failure
   * @param enableSuppression  whether suppression is enabled
   * @param writableStackTrace whether the stack trace is writable
   */
  public LivyException(String message, Throwable cause, boolean enableSuppression,
                       boolean writableStackTrace) {
    super(message, cause, enableSuppression, writableStackTrace);
  }
}
|
||||
|
|
@ -1,406 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.lang3.StringEscapeUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpEntity;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.security.kerberos.client.KerberosRestTemplate;
|
||||
import org.springframework.web.client.HttpClientErrorException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
|
||||
/***
|
||||
* Livy helper class
|
||||
*/
|
||||
public class LivyHelper {
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivyHelper.class);
|
||||
Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
||||
HashMap<String, Object> paragraphHttpMap = new HashMap<>();
|
||||
Properties property;
|
||||
|
||||
/**
 * Creates a helper working on the given interpreter properties.
 *
 * @param interpreterProperties properties the livy settings are read from
 */
LivyHelper(Properties interpreterProperties) {
  property = interpreterProperties;
}
|
||||
|
||||
public Integer createSession(InterpreterContext context, String kind) throws Exception {
|
||||
try {
|
||||
Map<String, String> conf = new HashMap<>();
|
||||
|
||||
Iterator<Entry<Object, Object>> it = property.entrySet().iterator();
|
||||
while (it.hasNext()) {
|
||||
Entry<Object, Object> pair = it.next();
|
||||
if (pair.getKey().toString().startsWith("livy.spark.") &&
|
||||
!pair.getValue().toString().isEmpty())
|
||||
conf.put(pair.getKey().toString().substring(5), pair.getValue().toString());
|
||||
}
|
||||
|
||||
String confData = gson.toJson(conf);
|
||||
String user = context.getAuthenticationInfo().getUser();
|
||||
|
||||
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions", "POST",
|
||||
"{" +
|
||||
"\"kind\": \"" + kind + "\", " +
|
||||
"\"conf\": " + confData + ", " +
|
||||
"\"proxyUser\": " + (StringUtils.isEmpty(user) ? null : "\"" + user + "\"") +
|
||||
"}",
|
||||
context.getParagraphId()
|
||||
);
|
||||
|
||||
Map jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
}.getType());
|
||||
Integer sessionId = ((Double) jsonMap.get("id")).intValue();
|
||||
if (!jsonMap.get("state").equals("idle")) {
|
||||
Integer retryCount = 60;
|
||||
|
||||
try {
|
||||
retryCount = Integer.valueOf(
|
||||
property.getProperty("zeppelin.livy.create.session.retries"));
|
||||
} catch (Exception e) {
|
||||
LOGGER.info("zeppelin.livy.create.session.retries property is not configured." +
|
||||
" Using default retry count.");
|
||||
}
|
||||
|
||||
while (retryCount >= 0) {
|
||||
LOGGER.error(String.format("sessionId:%s state is %s",
|
||||
jsonMap.get("id"), jsonMap.get("state")));
|
||||
Thread.sleep(1000);
|
||||
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
|
||||
sessionId, "GET", null, context.getParagraphId());
|
||||
jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
}.getType());
|
||||
if (jsonMap.get("state").equals("idle")) {
|
||||
break;
|
||||
} else if (jsonMap.get("state").equals("error") || jsonMap.get("state").equals("dead")) {
|
||||
json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" +
|
||||
sessionId + "/log",
|
||||
"GET", null,
|
||||
context.getParagraphId());
|
||||
jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
}.getType());
|
||||
String logs = StringUtils.join((ArrayList<String>) jsonMap.get("log"), '\n');
|
||||
LOGGER.error(String.format("Cannot start %s.\n%s", kind, logs));
|
||||
throw new Exception(String.format("Cannot start %s.\n%s", kind, logs));
|
||||
}
|
||||
retryCount--;
|
||||
}
|
||||
if (retryCount <= 0) {
|
||||
LOGGER.error("Error getting session for user within the configured number of retries.");
|
||||
throw new Exception(String.format("Cannot start %s.", kind));
|
||||
}
|
||||
}
|
||||
return sessionId;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error getting session for user", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public InterpreterResult interpretInput(String stringLines,
|
||||
final InterpreterContext context,
|
||||
int sessionId,
|
||||
LivyOutputStream out,
|
||||
String appId,
|
||||
String webUI,
|
||||
boolean displayAppInfo) {
|
||||
try {
|
||||
out.setInterpreterOutput(context.out);
|
||||
context.out.clear();
|
||||
String incomplete = "";
|
||||
boolean inComment = false;
|
||||
String[] lines = stringLines.split("\n");
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
linesToRun[i] = lines[i];
|
||||
}
|
||||
linesToRun[lines.length] = "print(\"\")";
|
||||
Code r = null;
|
||||
StringBuilder outputBuilder = new StringBuilder();
|
||||
for (int l = 0; l < linesToRun.length; l++) {
|
||||
String s = linesToRun[l];
|
||||
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
|
||||
//for spark
|
||||
if (l + 1 < linesToRun.length) {
|
||||
String nextLine = linesToRun[l + 1].trim();
|
||||
boolean continuation = false;
|
||||
if (nextLine.isEmpty()
|
||||
|| nextLine.startsWith("//") // skip empty line or comment
|
||||
|| nextLine.startsWith("}")
|
||||
|| nextLine.startsWith("object")) { // include "} object" for Scala companion object
|
||||
continuation = true;
|
||||
} else if (!inComment && nextLine.startsWith("/*")) {
|
||||
inComment = true;
|
||||
continuation = true;
|
||||
} else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
|
||||
inComment = false;
|
||||
continuation = true;
|
||||
} else if (nextLine.length() > 1
|
||||
&& nextLine.charAt(0) == '.'
|
||||
&& nextLine.charAt(1) != '.' // ".."
|
||||
&& nextLine.charAt(1) != '/') { // "./"
|
||||
continuation = true;
|
||||
} else if (inComment) {
|
||||
continuation = true;
|
||||
}
|
||||
if (continuation) {
|
||||
incomplete += s + "\n";
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
InterpreterResult res;
|
||||
try {
|
||||
res = interpret(incomplete + s, context, sessionId);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Interpreter exception", e);
|
||||
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
|
||||
r = res.code();
|
||||
|
||||
if (r == Code.ERROR) {
|
||||
out.setInterpreterOutput(null);
|
||||
return res;
|
||||
} else if (r == Code.INCOMPLETE) {
|
||||
incomplete += s + "\n";
|
||||
} else {
|
||||
outputBuilder.append(res.message() + "\n");
|
||||
incomplete = "";
|
||||
}
|
||||
}
|
||||
|
||||
if (r == Code.INCOMPLETE) {
|
||||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(r, "Incomplete expression");
|
||||
} else {
|
||||
if (displayAppInfo) {
|
||||
out.write("%angular ");
|
||||
out.write("<pre><code>");
|
||||
out.write(outputBuilder.toString());
|
||||
out.write("</code></pre>");
|
||||
out.write("<hr/>");
|
||||
out.write("Spark Application Id:" + appId + "<br/>");
|
||||
out.write("Spark WebUI: <a href=" + webUI + ">" + webUI + "</a>");
|
||||
} else {
|
||||
out.write(outputBuilder.toString());
|
||||
}
|
||||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("error in interpretInput", e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public InterpreterResult interpret(String stringLines,
|
||||
final InterpreterContext context,
|
||||
int sessionId)
|
||||
throws Exception {
|
||||
if (stringLines.trim().equals("")) {
|
||||
return new InterpreterResult(Code.SUCCESS, "");
|
||||
}
|
||||
Map jsonMap = executeCommand(stringLines, context, sessionId);
|
||||
Integer id = ((Double) jsonMap.get("id")).intValue();
|
||||
InterpreterResult res = getResultFromMap(jsonMap);
|
||||
if (res != null) {
|
||||
return res;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
if (paragraphHttpMap.get(context.getParagraphId()) == null) {
|
||||
return new InterpreterResult(Code.INCOMPLETE, "");
|
||||
}
|
||||
jsonMap = getStatusById(context, sessionId, id);
|
||||
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
|
||||
if (interpreterResult != null) {
|
||||
return interpreterResult;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private InterpreterResult getResultFromMap(Map jsonMap) {
|
||||
if (jsonMap.get("state").equals("available")) {
|
||||
if (((Map) jsonMap.get("output")).get("status").equals("error")) {
|
||||
StringBuilder errorMessage = new StringBuilder((String) ((Map) jsonMap
|
||||
.get("output")).get("evalue"));
|
||||
if (errorMessage.toString().equals("incomplete statement")
|
||||
|| errorMessage.toString().contains("EOF")) {
|
||||
return new InterpreterResult(Code.INCOMPLETE, "");
|
||||
}
|
||||
String traceback = gson.toJson(((Map) jsonMap.get("output")).get("traceback"));
|
||||
if (!traceback.equals("[]")) {
|
||||
errorMessage
|
||||
.append("\n")
|
||||
.append("traceback: \n")
|
||||
.append(traceback);
|
||||
}
|
||||
|
||||
return new InterpreterResult(Code.ERROR, errorMessage.toString());
|
||||
}
|
||||
if (((Map) jsonMap.get("output")).get("status").equals("ok")) {
|
||||
String result = (String) ((Map) ((Map) jsonMap.get("output"))
|
||||
.get("data")).get("text/plain");
|
||||
if (result != null) {
|
||||
result = result.trim();
|
||||
if (result.startsWith("<link")
|
||||
|| result.startsWith("<script")
|
||||
|| result.startsWith("<style")
|
||||
|| result.startsWith("<div")) {
|
||||
result = "%html " + result;
|
||||
}
|
||||
}
|
||||
return new InterpreterResult(Code.SUCCESS, result);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Map executeCommand(String lines, InterpreterContext context, int sessionId)
|
||||
throws Exception {
|
||||
String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/"
|
||||
+ sessionId + "/statements",
|
||||
"POST",
|
||||
"{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}",
|
||||
context.getParagraphId());
|
||||
if (json.matches("^(\")?Session (\'[0-9]\' )?not found(.?\"?)$")) {
|
||||
throw new Exception("Exception: Session not found, Livy server would have restarted, " +
|
||||
"or lost session.");
|
||||
}
|
||||
try {
|
||||
Map jsonMap = gson.fromJson(json,
|
||||
new TypeToken<Map>() {
|
||||
}.getType());
|
||||
return jsonMap;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error executeCommand", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private Map getStatusById(InterpreterContext context,
|
||||
int sessionId, Integer id) throws Exception {
|
||||
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
|
||||
+ sessionId
|
||||
+ "/statements/" + id,
|
||||
"GET", null, context.getParagraphId());
|
||||
LOGGER.debug("statement {} response: {}", id, json);
|
||||
try {
|
||||
Map jsonMap = gson.fromJson(json,
|
||||
new TypeToken<Map>() {
|
||||
}.getType());
|
||||
return jsonMap;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error getStatusById", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private RestTemplate getRestTemplate() {
|
||||
String keytabLocation = property.getProperty("zeppelin.livy.keytab");
|
||||
String principal = property.getProperty("zeppelin.livy.principal");
|
||||
if (StringUtils.isNotEmpty(keytabLocation) && StringUtils.isNotEmpty(principal)) {
|
||||
return new KerberosRestTemplate(keytabLocation, principal);
|
||||
}
|
||||
return new RestTemplate();
|
||||
}
|
||||
|
||||
protected String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
|
||||
throws Exception {
|
||||
LOGGER.debug("Call rest api in {}, method: {}, jsonData: {}", targetURL, method, jsonData);
|
||||
RestTemplate restTemplate = getRestTemplate();
|
||||
HttpHeaders headers = new HttpHeaders();
|
||||
headers.add("Content-Type", "application/json");
|
||||
headers.add("X-Requested-By", "zeppelin");
|
||||
ResponseEntity<String> response = null;
|
||||
try {
|
||||
if (method.equals("POST")) {
|
||||
HttpEntity<String> entity = new HttpEntity<>(jsonData, headers);
|
||||
|
||||
response = restTemplate.exchange(targetURL, HttpMethod.POST, entity, String.class);
|
||||
paragraphHttpMap.put(paragraphId, response);
|
||||
} else if (method.equals("GET")) {
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
response = restTemplate.exchange(targetURL, HttpMethod.GET, entity, String.class);
|
||||
paragraphHttpMap.put(paragraphId, response);
|
||||
} else if (method.equals("DELETE")) {
|
||||
HttpEntity<String> entity = new HttpEntity<>(headers);
|
||||
response = restTemplate.exchange(targetURL, HttpMethod.DELETE, entity, String.class);
|
||||
}
|
||||
} catch (HttpClientErrorException e) {
|
||||
response = new ResponseEntity(e.getResponseBodyAsString(), e.getStatusCode());
|
||||
LOGGER.error(String.format("Error with %s StatusCode: %s",
|
||||
response.getStatusCode().value(), e.getResponseBodyAsString()));
|
||||
}
|
||||
if (response == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (response.getStatusCode().value() == 200
|
||||
|| response.getStatusCode().value() == 201
|
||||
|| response.getStatusCode().value() == 404) {
|
||||
return response.getBody();
|
||||
} else {
|
||||
String responseString = response.getBody();
|
||||
if (responseString.contains("CreateInteractiveRequest[\\\"master\\\"]")) {
|
||||
return responseString;
|
||||
}
|
||||
LOGGER.error(String.format("Error with %s StatusCode: %s",
|
||||
response.getStatusCode().value(), responseString));
|
||||
throw new Exception(String.format("Error with %s StatusCode: %s",
|
||||
response.getStatusCode().value(), responseString));
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Flags a paragraph as cancelled by storing {@code null} under its id in
 * {@code paragraphHttpMap}.
 *
 * @param paragraphId id of the paragraph whose map entry is set to {@code null}
 */
public void cancelHTTP(String paragraphId) {
|
||||
// TODO(zjffdu), use cancel rest api of livy
|
||||
// No request is sent to Livy here: the statement polling loop earlier in this file checks the
// map entry for null and returns an INCOMPLETE result when it finds one.
paragraphHttpMap.put(paragraphId, null);
|
||||
}
|
||||
|
||||
public void closeSession(int sessionId) {
|
||||
try {
|
||||
executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
|
||||
"DELETE", null, null);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(String.format("Error closing session for user with session ID: %s",
|
||||
sessionId), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,84 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
/**
|
||||
* InterpreterOutput can be attached / detached.
|
||||
*/
|
||||
public class LivyOutputStream extends OutputStream {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(LivyOutputStream.class);
|
||||
InterpreterOutput interpreterOutput;
|
||||
|
||||
public LivyOutputStream() {
|
||||
}
|
||||
|
||||
public InterpreterOutput getInterpreterOutput() {
|
||||
return interpreterOutput;
|
||||
}
|
||||
|
||||
public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
|
||||
this.interpreterOutput = interpreterOutput;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.write(b);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.write(b);
|
||||
}
|
||||
}
|
||||
|
||||
public void write(String text) throws IOException {
|
||||
LOGGER.debug("livy output:" + text);
|
||||
write(text.getBytes("UTF-8"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte[] b, int offset, int len) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.write(b, offset, len);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.flush();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -38,10 +38,12 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
|
||||
private LivySparkInterpreter sparkInterpreter;
|
||||
|
||||
private boolean sqlContextCreated = false;
|
||||
private boolean isSpark2 = false;
|
||||
private int maxResult = 1000;
|
||||
|
||||
public LivySparkSQLInterpreter(Properties property) {
|
||||
super(property);
|
||||
this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -51,10 +53,56 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
super.open();
|
||||
this.sparkInterpreter =
|
||||
(LivySparkInterpreter) getInterpreterInTheSameSessionByClassName(
|
||||
LivySparkInterpreter.class.getName());
|
||||
this.sparkInterpreter = getSparkInterpreter();
|
||||
// As we don't know whether livyserver use spark2 or spark1, so we will detect SparkSession
|
||||
// to judge whether it is using spark2.
|
||||
try {
|
||||
InterpreterResult result = sparkInterpreter.interpret("spark", false);
|
||||
if (result.code() == InterpreterResult.Code.SUCCESS &&
|
||||
result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
|
||||
LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
|
||||
sparkInterpreter.getSessionInfo().id);
|
||||
isSpark2 = true;
|
||||
} else {
|
||||
// spark 1.x
|
||||
result = sparkInterpreter.interpret("sqlContext", false);
|
||||
if (result.code() == InterpreterResult.Code.SUCCESS) {
|
||||
LOGGER.info("sqlContext is detected.");
|
||||
} else if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
// create SqlContext if it is not available, as in livy 0.2 sqlContext
|
||||
// is not available.
|
||||
LOGGER.info("sqlContext is not detected, try to create SQLContext by ourselves");
|
||||
result = sparkInterpreter.interpret(
|
||||
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", false);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
throw new LivyException("Fail to create SQLContext," +
|
||||
result.message().get(0).getData());
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (LivyException e) {
|
||||
throw new RuntimeException("Fail to Detect SparkVersion", e);
|
||||
}
|
||||
}
|
||||
|
||||
private LivySparkInterpreter getSparkInterpreter() {
|
||||
LazyOpenInterpreter lazy = null;
|
||||
LivySparkInterpreter spark = null;
|
||||
Interpreter p = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
|
||||
|
||||
while (p instanceof WrappedInterpreter) {
|
||||
if (p instanceof LazyOpenInterpreter) {
|
||||
lazy = (LazyOpenInterpreter) p;
|
||||
}
|
||||
p = ((WrappedInterpreter) p).getInnerInterpreter();
|
||||
}
|
||||
spark = (LivySparkInterpreter) p;
|
||||
|
||||
if (lazy != null) {
|
||||
lazy.open();
|
||||
}
|
||||
return spark;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -63,38 +111,20 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
if (StringUtils.isEmpty(line)) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
// create sqlContext implicitly if it is not available, as in livy 0.2 sqlContext
|
||||
// is not available.
|
||||
synchronized (this) {
|
||||
if (!sqlContextCreated) {
|
||||
InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
result = sparkInterpreter.interpret(
|
||||
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", context);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Fail to create sqlContext," + result.message());
|
||||
}
|
||||
}
|
||||
sqlContextCreated = true;
|
||||
}
|
||||
// replace line separator with space
|
||||
line = line.replace("\n", " ").replace("\r", " ");
|
||||
String sqlQuery = null;
|
||||
if (isSpark2) {
|
||||
sqlQuery = "spark.sql(\"" + line + "\").show(" + maxResult + ")";
|
||||
} else {
|
||||
sqlQuery = "sqlContext.sql(\"" + line + "\").show(" + maxResult + ")";
|
||||
}
|
||||
|
||||
// delegate the work to LivySparkInterpreter in the same session.
|
||||
// TODO(zjffdu), we may create multiple session for the same user here. This can be fixed
|
||||
// after we move session creation to open()
|
||||
InterpreterResult res = sparkInterpreter.interpret("sqlContext.sql(\"" +
|
||||
line.replaceAll("\"", "\\\\\"")
|
||||
.replaceAll("\\n", " ")
|
||||
+ "\").show(" +
|
||||
property.get("zeppelin.livy.spark.sql.maxResult") + ")", context);
|
||||
InterpreterResult res = sparkInterpreter.interpret(sqlQuery, this.displayAppInfo);
|
||||
|
||||
if (res.code() == InterpreterResult.Code.SUCCESS) {
|
||||
StringBuilder resMsg = new StringBuilder();
|
||||
resMsg.append("%table ");
|
||||
String[] rows = new String(context.out.toByteArray()).split("\n");
|
||||
String[] rows = res.message().get(0).getData().split("\n");
|
||||
String[] headers = rows[1].split("\\|");
|
||||
for (int head = 1; head < headers.length; head++) {
|
||||
resMsg.append(headers[head].trim()).append("\t");
|
||||
|
|
@ -121,7 +151,6 @@ public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
|||
} else {
|
||||
return res;
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
/**
 * Exception of the Livy interpreter module, a subclass of {@link LivyException}.
 * NOTE(review): judging by the name, this presumably signals that the Livy server no longer
 * knows the session being used - confirm against the code that throws it.
 */
|
||||
public class SessionNotFoundException extends LivyException {
|
||||
|
||||
/**
 * @param message detail message, passed unchanged to the {@link LivyException} constructor
 */
public SessionNotFoundException(String message) {
|
||||
super(message);
|
||||
}
|
||||
}
|
||||
|
|
@ -13,14 +13,9 @@
|
|||
},
|
||||
"zeppelin.livy.create.session.retries": {
|
||||
"envName": "ZEPPELIN_LIVY_CREATE_SESSION_RETRIES",
|
||||
"propertyName": "zeppelin.livy.create.session.retries",
|
||||
"propertyName": "zeppelin.livy.create.session.timeout",
|
||||
"defaultValue": "120",
|
||||
"description": "Livy Server create session retry count."
|
||||
},
|
||||
"livy.spark.master": {
|
||||
"propertyName": "livy.spark.master",
|
||||
"defaultValue": "local[*]",
|
||||
"description": "Spark master uri. ex) spark://masterhost:7077"
|
||||
"description": "Livy Server create session timeout (seconds)."
|
||||
},
|
||||
"livy.spark.driver.cores": {
|
||||
"propertyName": "livy.spark.driver.cores",
|
||||
|
|
|
|||
|
|
@ -1,111 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import com.google.gson.GsonBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ErrorCollector;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
|
||||
/**
|
||||
* Created for org.apache.zeppelin.livy on 22/04/16.
|
||||
*/
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LivyHelperTest {
|
||||
|
||||
@Rule
|
||||
public ErrorCollector collector = new ErrorCollector();
|
||||
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private static LivyPySparkInterpreter interpreter;
|
||||
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private InterpreterContext interpreterContext;
|
||||
|
||||
@Mock(answer = Answers.CALLS_REAL_METHODS)
|
||||
private LivyHelper livyHelper;
|
||||
|
||||
@Before
|
||||
public void prepareContext() throws Exception {
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
|
||||
livyHelper.property = properties;
|
||||
livyHelper.paragraphHttpMap = new HashMap<>();
|
||||
livyHelper.gson = new GsonBuilder().setPrettyPrinting().create();
|
||||
livyHelper.LOGGER = LoggerFactory.getLogger(LivyHelper.class);
|
||||
|
||||
doReturn("{\"id\":1,\"state\":\"idle\",\"kind\":\"spark\",\"proxyUser\":\"null\",\"log\":[]}")
|
||||
.when(livyHelper)
|
||||
.executeHTTP(
|
||||
livyHelper.property.getProperty("zeppelin.livy.url") + "/sessions",
|
||||
"POST",
|
||||
"{\"kind\": \"spark\", \"conf\": {}, \"proxyUser\": null}",
|
||||
null
|
||||
);
|
||||
|
||||
doReturn("{\"id\":1,\"state\":\"available\",\"output\":{\"status\":\"ok\"," +
|
||||
"\"execution_count\":1,\"data\":{\"text/plain\":\"1\"}}}")
|
||||
.when(livyHelper)
|
||||
.executeHTTP(
|
||||
livyHelper.property.getProperty("zeppelin.livy.url") + "/sessions/1/statements",
|
||||
"POST",
|
||||
"{\"code\": \"print(1)\"}",
|
||||
null
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void checkCreateSession() {
|
||||
try {
|
||||
Integer sessionId = livyHelper.createSession(interpreterContext, "spark");
|
||||
|
||||
collector.checkThat("check sessionId", 1, CoreMatchers.equalTo(sessionId));
|
||||
|
||||
} catch (Exception e) {
|
||||
collector.addError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkInterpret() {
|
||||
try {
|
||||
InterpreterResult result = livyHelper.interpret("print(1)", interpreterContext, 1);
|
||||
collector.checkThat("check sessionId", InterpreterResult.Code.SUCCESS,
|
||||
CoreMatchers.equalTo(result.code()));
|
||||
} catch (Exception e) {
|
||||
collector.addError(e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -20,6 +20,8 @@ package org.apache.zeppelin.livy;
|
|||
|
||||
import com.cloudera.livy.test.framework.Cluster;
|
||||
import com.cloudera.livy.test.framework.Cluster$;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.*;
|
||||
|
|
@ -49,13 +51,14 @@ public class LivyInterpreterIT {
|
|||
LOGGER.info("Starting livy at {}", cluster.livyEndpoint());
|
||||
properties = new Properties();
|
||||
properties.setProperty("zeppelin.livy.url", cluster.livyEndpoint());
|
||||
properties.setProperty("zeppelin.livy.create.session.retries", "120");
|
||||
properties.setProperty("zeppelin.livy.create.session.timeout", "120");
|
||||
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "100");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
if (cluster != null) {
|
||||
LOGGER.info("Shutting down livy at {}", cluster.livyEndpoint());
|
||||
cluster.cleanUp();
|
||||
}
|
||||
}
|
||||
|
|
@ -92,63 +95,63 @@ public class LivyInterpreterIT {
|
|||
try {
|
||||
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData().contains("1.5.2"));
|
||||
|
||||
// test RDD api
|
||||
outputListener.reset();
|
||||
result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended().contains("Double = 55.0"));
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData().contains("Double = 55.0"));
|
||||
|
||||
// single line comment
|
||||
outputListener.reset();
|
||||
String singleLineComment = "// my comment";
|
||||
String singleLineComment = "println(1)// my comment";
|
||||
result = sparkInterpreter.interpret(singleLineComment, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertEquals(1, result.message().size());
|
||||
|
||||
// multiple line comment
|
||||
outputListener.reset();
|
||||
String multipleLineComment = "/* multiple \n" + "line \n" + "comment */";
|
||||
String multipleLineComment = "println(1)/* multiple \n" + "line \n" + "comment */";
|
||||
result = sparkInterpreter.interpret(multipleLineComment, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertEquals(1, result.message().size());
|
||||
|
||||
// multi-line string
|
||||
outputListener.reset();
|
||||
String multiLineString = "val str = \"\"\"multiple\n" +
|
||||
"line\"\"\"\n" +
|
||||
"println(str)";
|
||||
result = sparkInterpreter.interpret(multiLineString, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended().contains("multiple\nline"));
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData().contains("multiple\nline"));
|
||||
|
||||
// case class
|
||||
outputListener.reset();
|
||||
String caseClassCode = "case class Person(id:Int, \n" +
|
||||
"name:String)\n" +
|
||||
"val p=Person(1, \"name_a\")";
|
||||
result = sparkInterpreter.interpret(caseClassCode, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended().contains("defined class Person"));
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData().contains("p: Person = Person(1,name_a)"));
|
||||
|
||||
// object class
|
||||
outputListener.reset();
|
||||
String objectClassCode = "object Person {}";
|
||||
result = sparkInterpreter.interpret(objectClassCode, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended().contains("defined module Person"));
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData().contains("defined module Person"));
|
||||
|
||||
// error
|
||||
result = sparkInterpreter.interpret("println(a)", context);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertTrue(result.message().get(0).getData().contains("error: not found: value a"));
|
||||
|
||||
// incomplete code
|
||||
result = sparkInterpreter.interpret("if(true){", context);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertTrue(result.message().get(0).getData().contains("incomplete statement"));
|
||||
} finally {
|
||||
sparkInterpreter.close();
|
||||
}
|
||||
|
|
@ -178,19 +181,17 @@ public class LivyInterpreterIT {
|
|||
|
||||
try {
|
||||
// test DataFrame api
|
||||
outputListener.reset();
|
||||
sparkInterpreter.interpret("val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", context);
|
||||
InterpreterResult result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
|
||||
+ "df.collect()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended()
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData()
|
||||
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
|
||||
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
|
||||
|
||||
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
|
||||
outputListener.reset();
|
||||
result = sqlInterpreter.interpret("select * from df", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
|
|
@ -209,10 +210,12 @@ public class LivyInterpreterIT {
|
|||
}
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
|
||||
LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
|
||||
new LivySparkInterpreter(properties));
|
||||
sparkInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
interpreterGroup.get("session_1").add(sparkInterpreter);
|
||||
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
|
||||
new LivySparkSQLInterpreter(properties));
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
|
|
@ -249,25 +252,23 @@ public class LivyInterpreterIT {
|
|||
try {
|
||||
InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData().contains("1.5.2"));
|
||||
|
||||
// test RDD api
|
||||
outputListener.reset();
|
||||
result = pysparkInterpreter.interpret("sc.range(1, 10).sum()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended().contains("45"));
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData().contains("45"));
|
||||
|
||||
// test DataFrame api
|
||||
outputListener.reset();
|
||||
pysparkInterpreter.interpret("from pyspark.sql import SQLContext\n"
|
||||
+ "sqlContext = SQLContext(sc)", context);
|
||||
result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n"
|
||||
+ "df.collect()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(0, result.message().size());
|
||||
assertTrue(outputListener.getOutputAppended().contains("[Row(_1=u'hello', _2=20)]"));
|
||||
assertEquals(1, result.message().size());
|
||||
assertTrue(result.message().get(0).getData().contains("[Row(_1=u'hello', _2=20)]"));
|
||||
|
||||
// error
|
||||
result = pysparkInterpreter.interpret("print(a)", context);
|
||||
|
|
@ -287,37 +288,52 @@ public class LivyInterpreterIT {
|
|||
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
|
||||
}
|
||||
|
||||
public static class MyInterpreterOutputListener implements InterpreterOutputListener {
|
||||
private StringBuilder outputAppended = new StringBuilder();
|
||||
private StringBuilder outputUpdated = new StringBuilder();
|
||||
@Test
|
||||
public void testLivyTutorialNote() throws IOException {
|
||||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
LazyOpenInterpreter sparkInterpreter = new LazyOpenInterpreter(
|
||||
new LivySparkInterpreter(properties));
|
||||
sparkInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
interpreterGroup.get("session_1").add(sparkInterpreter);
|
||||
LazyOpenInterpreter sqlInterpreter = new LazyOpenInterpreter(
|
||||
new LivySparkSQLInterpreter(properties));
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
|
||||
try {
|
||||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.sql",
|
||||
"title", "text", authInfo, null, null, null, null, null, output);
|
||||
|
||||
String p1 = IOUtils.toString(getClass().getResourceAsStream("/livy_tutorial_1.scala"));
|
||||
InterpreterResult result = sparkInterpreter.interpret(p1, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
||||
String p2 = IOUtils.toString(getClass().getResourceAsStream("/livy_tutorial_2.sql"));
|
||||
result = sqlInterpreter.interpret(p2, context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
|
||||
} finally {
|
||||
sparkInterpreter.close();
|
||||
sqlInterpreter.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static class MyInterpreterOutputListener implements InterpreterOutputListener {
|
||||
@Override
|
||||
public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) {
|
||||
LOGGER.info("onAppend:" + new String(line));
|
||||
outputAppended.append(new String(line));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(int index, InterpreterResultMessageOutput out) {
|
||||
try {
|
||||
LOGGER.info("onUpdate:" + new String(out.toByteArray()));
|
||||
outputUpdated.append(new String(out.toByteArray()));
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public String getOutputAppended() {
|
||||
return outputAppended.toString();
|
||||
}
|
||||
|
||||
public String getOutputUpdated() {
|
||||
return outputUpdated.toString();
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
outputAppended = new StringBuilder();
|
||||
outputUpdated = new StringBuilder();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
24
livy/src/test/resources/livy_tutorial_1.scala
Normal file
24
livy/src/test/resources/livy_tutorial_1.scala
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
import org.apache.commons.io.IOUtils
|
||||
import java.net.URL
|
||||
import java.nio.charset.Charset
|
||||
|
||||
// Zeppelin creates and injects sc (SparkContext) and sqlContext (HiveContext or SqlContext)
|
||||
// So you don't need to create them manually
|
||||
|
||||
// load bank data
|
||||
// Download the CSV text, split it into lines and hand them to sc.parallelize.
val bankText = sc.parallelize(
  IOUtils.toString(
    new URL("https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv"),
    Charset.forName("utf8")).split("\n"))

// One record of the bank data; built from columns 0, 1, 2, 3 and 5 of each line.
case class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)

// Split every line on ';', drop the header row (its first column is the quoted
// word "age"), strip the double quotes from the remaining values and convert
// the result to a DataFrame.
val bank = bankText.map(line => line.split(";")).filter(cols => cols(0) != "\"age\"").map(
  cols => Bank(cols(0).toInt,
    cols(1).replaceAll("\"", ""),
    cols(2).replaceAll("\"", ""),
    cols(3).replaceAll("\"", ""),
    cols(5).replaceAll("\"", "").toInt
  )
).toDF()

// Register the DataFrame under the name "bank" so SQL queries can refer to it.
bank.registerTempTable("bank")
|
||||
5
livy/src/test/resources/livy_tutorial_2.sql
Normal file
5
livy/src/test/resources/livy_tutorial_2.sql
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
SELECT age, count(1) AS value
FROM bank
WHERE age < 30
GROUP BY age
ORDER BY age
|
||||
28
testing/setupLivy.sh
Executable file
28
testing/setupLivy.sh
Executable file
|
|
@ -0,0 +1,28 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
|
||||
# Trace every command and stop at the first failure while Livy is being set up.
set -xe

# Livy is only set up when LIVY_VER is defined.
if [[ -n $LIVY_VER ]]; then
  # Pass the version explicitly: the CI step this script replaces invoked
  # the download script as "downloadLivy.sh $LIVY_VER".
  ./testing/downloadLivy.sh "$LIVY_VER"

  # NOTE(review): these exports only reach the calling shell when this file is
  # sourced (". ./testing/setupLivy.sh"); confirm how CI invokes it.
  export LIVY_HOME="$(pwd)/livy-server-$LIVY_VER"
  export SPARK_HOME="$(pwd)/spark-$SPARK_VER-bin-hadoop$HADOOP_VER"
fi

# Turn command tracing and exit-on-error back off.
set +xe
|
||||
Loading…
Reference in a new issue