mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-1607. Refactor Livy Interpreter to adapt scope mode
This commit is contained in:
parent
0e4d961683
commit
798de1bb55
10 changed files with 290 additions and 449 deletions
41
livy/pom.xml
41
livy/pom.xml
|
|
@ -287,13 +287,16 @@
|
|||
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
|
||||
<version>${spark.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.spark</groupId>
|
||||
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
|
||||
<version>${spark.version}</version>
|
||||
<scope>test</scope>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-common</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
@ -310,6 +313,14 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-common</artifactId>
|
||||
<classifier>tests</classifier>
|
||||
<version>${hadoop.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
|
|
@ -317,6 +328,14 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-hdfs</artifactId>
|
||||
<classifier>tests</classifier>
|
||||
<version>${hadoop.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-client</artifactId>
|
||||
|
|
@ -331,9 +350,17 @@
|
|||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-api</artifactId>
|
||||
<version>${hadoop.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-tests</artifactId>
|
||||
<classifier>tests</classifier>
|
||||
<version>${hadoop.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,123 @@
|
|||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Base class for livy interpreters.
|
||||
*/
|
||||
public abstract class BaseLivyInterprereter extends Interpreter {
|
||||
|
||||
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
|
||||
|
||||
// -1 means session is not created yet, valid sessionId start from 0
|
||||
protected int sessionId = -1;
|
||||
protected String appId;
|
||||
protected String webUIAddress;
|
||||
protected boolean displayAppInfo;
|
||||
protected LivyOutputStream out;
|
||||
protected LivyHelper livyHelper;
|
||||
|
||||
public BaseLivyInterprereter(Properties property) {
|
||||
super(property);
|
||||
this.out = new LivyOutputStream();
|
||||
this.livyHelper = new LivyHelper(property);
|
||||
}
|
||||
|
||||
public abstract String getSessionKind();
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
// TODO(zjffdu) move session creation here.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (sessionId != -1) {
|
||||
livyHelper.closeSession(sessionId);
|
||||
}
|
||||
}
|
||||
|
||||
protected void createSession(InterpreterContext context) throws Exception {
|
||||
sessionId = livyHelper.createSession(context, getSessionKind());
|
||||
if (displayAppInfo) {
|
||||
this.appId = extractStatementResult(
|
||||
livyHelper.interpret("sc.applicationId", context, sessionId).message());
|
||||
livyHelper.interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
context, sessionId);
|
||||
this.webUIAddress = extractStatementResult(
|
||||
livyHelper.interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
|
||||
context, sessionId).message());
|
||||
LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
|
||||
sessionId, appId, webUIAddress);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
try {
|
||||
// add synchronized, because LivySparkSQLInterperter will use ParallelScheduler
|
||||
synchronized (this) {
|
||||
if (sessionId == -1) {
|
||||
try {
|
||||
createSession(context);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception while creating livy session", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (st == null || st.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(st, context, sessionId, out,
|
||||
appId, webUIAddress, displayAppInfo);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivyInterpreter.", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* res0: String = application_1473129941656_0048
|
||||
*
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private static String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("=")) >= 0) {
|
||||
return result.substring(pos + 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
@ -133,7 +133,7 @@ public class LivyHelper {
|
|||
|
||||
public InterpreterResult interpretInput(String stringLines,
|
||||
final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap,
|
||||
int sessionId,
|
||||
LivyOutputStream out,
|
||||
String appId,
|
||||
String webUI,
|
||||
|
|
@ -185,7 +185,7 @@ public class LivyHelper {
|
|||
|
||||
InterpreterResult res;
|
||||
try {
|
||||
res = interpret(incomplete + s, context, userSessionMap);
|
||||
res = interpret(incomplete + s, context, sessionId);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Interpreter exception", e);
|
||||
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
|
||||
|
|
@ -230,12 +230,12 @@ public class LivyHelper {
|
|||
|
||||
public InterpreterResult interpret(String stringLines,
|
||||
final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap)
|
||||
int sessionId)
|
||||
throws Exception {
|
||||
if (stringLines.trim().equals("")) {
|
||||
return new InterpreterResult(Code.SUCCESS, "");
|
||||
}
|
||||
Map jsonMap = executeCommand(stringLines, context, userSessionMap);
|
||||
Map jsonMap = executeCommand(stringLines, context, sessionId);
|
||||
Integer id = ((Double) jsonMap.get("id")).intValue();
|
||||
InterpreterResult res = getResultFromMap(jsonMap);
|
||||
if (res != null) {
|
||||
|
|
@ -247,7 +247,7 @@ public class LivyHelper {
|
|||
if (paragraphHttpMap.get(context.getParagraphId()) == null) {
|
||||
return new InterpreterResult(Code.INCOMPLETE, "");
|
||||
}
|
||||
jsonMap = getStatusById(context, userSessionMap, id);
|
||||
jsonMap = getStatusById(context, sessionId, id);
|
||||
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
|
||||
if (interpreterResult != null) {
|
||||
return interpreterResult;
|
||||
|
|
@ -292,11 +292,10 @@ public class LivyHelper {
|
|||
return null;
|
||||
}
|
||||
|
||||
private Map executeCommand(String lines, InterpreterContext context,
|
||||
Map<String, Integer> userSessionMap) throws Exception {
|
||||
private Map executeCommand(String lines, InterpreterContext context, int sessionId)
|
||||
throws Exception {
|
||||
String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/"
|
||||
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
|
||||
+ "/statements",
|
||||
+ sessionId + "/statements",
|
||||
"POST",
|
||||
"{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}",
|
||||
context.getParagraphId());
|
||||
|
|
@ -316,9 +315,9 @@ public class LivyHelper {
|
|||
}
|
||||
|
||||
private Map getStatusById(InterpreterContext context,
|
||||
Map<String, Integer> userSessionMap, Integer id) throws Exception {
|
||||
int sessionId, Integer id) throws Exception {
|
||||
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
|
||||
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
|
||||
+ sessionId
|
||||
+ "/statements/" + id,
|
||||
"GET", null, context.getParagraphId());
|
||||
LOGGER.debug("statement {} response: {}", id, json);
|
||||
|
|
@ -390,19 +389,18 @@ public class LivyHelper {
|
|||
}
|
||||
|
||||
public void cancelHTTP(String paragraphId) {
|
||||
// TODO(zjffdu), use cancel rest api of livy
|
||||
paragraphHttpMap.put(paragraphId, null);
|
||||
}
|
||||
|
||||
public void closeSession(Map<String, Integer> userSessionMap) {
|
||||
for (Map.Entry<String, Integer> entry : userSessionMap.entrySet()) {
|
||||
try {
|
||||
executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
|
||||
+ entry.getValue(),
|
||||
"DELETE", null, null);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(String.format("Error closing session for user with session ID: %s",
|
||||
entry.getValue()), e);
|
||||
}
|
||||
public void closeSession(int sessionId) {
|
||||
try {
|
||||
executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
|
||||
"DELETE", null, null);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(String.format("Error closing session for user with session ID: %s",
|
||||
sessionId), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,81 +33,14 @@ import java.util.Properties;
|
|||
/**
|
||||
* Livy PySpark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivyPySparkInterpreter extends Interpreter {
|
||||
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);
|
||||
|
||||
protected Map<String, Integer> userSessionMap;
|
||||
protected LivyHelper livyHelper;
|
||||
public class LivyPySparkInterpreter extends BaseLivyInterprereter {
|
||||
|
||||
public LivyPySparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
userSessionMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
public String getSessionKind() {
|
||||
return "pyspark";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
|
||||
try {
|
||||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext,
|
||||
"pyspark")
|
||||
);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpret(line, interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
LivyPySparkInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterCompletion> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,136 +28,20 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Livy Spark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivySparkInterpreter extends Interpreter {
|
||||
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivySparkInterpreter.class);
|
||||
private LivyOutputStream out;
|
||||
|
||||
protected static Map<String, Integer> userSessionMap;
|
||||
protected static Map<Integer, String> sessionId2AppIdMap;
|
||||
protected static Map<Integer, String> sessionId2WebUIMap;
|
||||
|
||||
private LivyHelper livyHelper;
|
||||
private boolean displayAppInfo;
|
||||
public class LivySparkInterpreter extends BaseLivyInterprereter {
|
||||
|
||||
public LivySparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
userSessionMap = new HashMap<>();
|
||||
sessionId2AppIdMap = new HashMap<>();
|
||||
sessionId2WebUIMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper(property);
|
||||
out = new LivyOutputStream();
|
||||
this.displayAppInfo = Boolean.parseBoolean(getProperty("zeppelin.livy.displayAppInfo"));
|
||||
}
|
||||
|
||||
protected static Map<String, Integer> getUserSessionMap() {
|
||||
return userSessionMap;
|
||||
}
|
||||
|
||||
public void setUserSessionMap(Map<String, Integer> userSessionMap) {
|
||||
this.userSessionMap = userSessionMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
Integer sessionId = null;
|
||||
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
|
||||
try {
|
||||
sessionId = livyHelper.createSession(interpreterContext, "spark");
|
||||
userSessionMap.put(interpreterContext.getAuthenticationInfo().getUser(), sessionId);
|
||||
if (displayAppInfo) {
|
||||
String appId = extractStatementResult(
|
||||
livyHelper.interpret("sc.applicationId", interpreterContext, userSessionMap)
|
||||
.message().get(0).getData());
|
||||
livyHelper.interpret(
|
||||
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
|
||||
interpreterContext, userSessionMap);
|
||||
String webUI = extractStatementResult(
|
||||
livyHelper.interpret(
|
||||
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
|
||||
interpreterContext, userSessionMap).message().get(0).getData());
|
||||
sessionId2AppIdMap.put(sessionId, appId);
|
||||
sessionId2WebUIMap.put(sessionId, webUI);
|
||||
LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
|
||||
sessionId, appId, webUI);
|
||||
} else {
|
||||
LOGGER.info("Create livy session with sessionId: {}", sessionId);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
} else {
|
||||
sessionId = userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser());
|
||||
}
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out,
|
||||
sessionId2AppIdMap.get(sessionId), sessionId2WebUIMap.get(sessionId), displayAppInfo);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
|
||||
* from following:
|
||||
* res0: String = application_1473129941656_0048
|
||||
* @param result
|
||||
* @return
|
||||
*/
|
||||
private static String extractStatementResult(String result) {
|
||||
int pos = -1;
|
||||
if ((pos = result.indexOf("=")) >= 0) {
|
||||
return result.substring(pos + 1).trim();
|
||||
} else {
|
||||
throw new RuntimeException("No result can be extracted from '" + result + "', " +
|
||||
"something must be wrong");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterCompletion> completion(String buf, int cursor) {
|
||||
return null;
|
||||
public String getSessionKind() {
|
||||
return "spark";
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,81 +33,14 @@ import java.util.Properties;
|
|||
/**
|
||||
* Livy PySpark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivySparkRInterpreter extends Interpreter {
|
||||
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivySparkRInterpreter.class);
|
||||
|
||||
protected Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
public class LivySparkRInterpreter extends BaseLivyInterprereter {
|
||||
|
||||
public LivySparkRInterpreter(Properties property) {
|
||||
super(property);
|
||||
userSessionMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
public String getSessionKind() {
|
||||
return "sparkr";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
|
||||
try {
|
||||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext,
|
||||
"sparkr")
|
||||
);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpret(line, interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
LivySparkRInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterCompletion> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,66 +27,67 @@ import org.slf4j.LoggerFactory;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
/**
|
||||
* Livy PySpark interpreter for Zeppelin.
|
||||
* Livy SparkSQL Interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivySparkSQLInterpreter extends Interpreter {
|
||||
public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
|
||||
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class);
|
||||
private LivySparkInterpreter sparkInterpreter;
|
||||
|
||||
protected Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
private boolean sqlContextCreated = false;
|
||||
|
||||
public LivySparkSQLInterpreter(Properties property) {
|
||||
super(property);
|
||||
livyHelper = new LivyHelper(property);
|
||||
userSessionMap = LivySparkInterpreter.getUserSessionMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getSessionKind() {
|
||||
return "spark";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
super.open();
|
||||
this.sparkInterpreter =
|
||||
(LivySparkInterpreter) getInterpreterInTheSameSessionByClassName(
|
||||
LivySparkInterpreter.class.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
livyHelper.closeSession(userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
try {
|
||||
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
|
||||
try {
|
||||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext,
|
||||
"spark")
|
||||
);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
InterpreterResult res = livyHelper.interpret("sqlContext.sql(\"" +
|
||||
line.replaceAll("\"", "\\\\\"")
|
||||
.replaceAll("\\n", " ")
|
||||
+ "\").show(" +
|
||||
property.get("zeppelin.livy.spark.sql.maxResult") + ")",
|
||||
interpreterContext, userSessionMap);
|
||||
// create sqlContext implicitly, as in livy 0.2 sqlContext is not available.
|
||||
if (!sqlContextCreated) {
|
||||
InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
result = sparkInterpreter.interpret(
|
||||
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
|
||||
+ "import sqlContext.implicits._", context);
|
||||
if (result.code() == InterpreterResult.Code.ERROR) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to create sqlContext,"
|
||||
+ result.message());
|
||||
}
|
||||
}
|
||||
sqlContextCreated = true;
|
||||
}
|
||||
// delegate the work to LivySparkInterpreter in the same session.
|
||||
InterpreterResult res = sparkInterpreter.interpret("sqlContext.sql(\"" +
|
||||
line.replaceAll("\"", "\\\\\"")
|
||||
.replaceAll("\\n", " ")
|
||||
+ "\").show(" +
|
||||
property.get("zeppelin.livy.spark.sql.maxResult") + ")", context);
|
||||
|
||||
if (res.code() == InterpreterResult.Code.SUCCESS) {
|
||||
StringBuilder resMsg = new StringBuilder();
|
||||
resMsg.append("%table ");
|
||||
String[] rows = res.message().get(0).getData().split("\n");
|
||||
|
||||
String[] rows = new String(context.out.toByteArray()).split("\n");
|
||||
String[] headers = rows[1].split("\\|");
|
||||
for (int head = 1; head < headers.length; head++) {
|
||||
resMsg.append(headers[head].trim()).append("\t");
|
||||
|
|
@ -114,7 +115,6 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
return res;
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
|
|
@ -126,21 +126,6 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
livyHelper.cancelHTTP(context.getParagraphId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
if (concurrentSQL()) {
|
||||
|
|
@ -158,9 +143,4 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<InterpreterCompletion> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,9 +56,6 @@ public class LivyHelperTest {
|
|||
|
||||
@Before
|
||||
public void prepareContext() throws Exception {
|
||||
interpreter.userSessionMap = new HashMap<>();
|
||||
interpreter.userSessionMap.put(null, 1);
|
||||
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
|
||||
livyHelper.property = properties;
|
||||
|
|
@ -103,10 +100,9 @@ public class LivyHelperTest {
|
|||
@Test
|
||||
public void checkInterpret() {
|
||||
try {
|
||||
InterpreterResult result = livyHelper.interpret("print(1)", interpreterContext, interpreter.userSessionMap);
|
||||
|
||||
collector.checkThat("check sessionId", InterpreterResult.Code.SUCCESS, CoreMatchers.equalTo(result.code()));
|
||||
|
||||
InterpreterResult result = livyHelper.interpret("print(1)", interpreterContext, 1);
|
||||
collector.checkThat("check sessionId", InterpreterResult.Code.SUCCESS,
|
||||
CoreMatchers.equalTo(result.code()));
|
||||
} catch (Exception e) {
|
||||
collector.addError(e);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,15 +27,16 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class LivyIntegrationTest {
|
||||
public class LivyInterpreterIT {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(LivyIntegrationTest.class);
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(LivyInterpreterIT.class);
|
||||
private static Cluster cluster;
|
||||
private static Properties properties;
|
||||
|
||||
|
|
@ -76,8 +77,11 @@ public class LivyIntegrationTest {
|
|||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
|
||||
sparkInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
interpreterGroup.get("session_1").add(sparkInterpreter);
|
||||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
|
|
@ -86,13 +90,16 @@ public class LivyIntegrationTest {
|
|||
sparkInterpreter.open();
|
||||
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
|
||||
|
||||
// test RDD api
|
||||
outputListener.reset();
|
||||
result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended().contains("Double = 55.0"));
|
||||
|
||||
// test DataFrame api
|
||||
|
|
@ -102,7 +109,8 @@ public class LivyIntegrationTest {
|
|||
result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
|
||||
+ "df.collect()" , context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended()
|
||||
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
|
||||
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
|
||||
|
|
@ -110,6 +118,8 @@ public class LivyIntegrationTest {
|
|||
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
|
||||
outputListener.reset();
|
||||
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
result = sqlInterpreter.interpret("select * from df", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
|
|
@ -163,6 +173,37 @@ public class LivyIntegrationTest {
|
|||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertNull(result.message().get(0).getData());
|
||||
assertTrue(outputListener.getOutputAppended().contains("defined module Person"));
|
||||
|
||||
// error
|
||||
result = sparkInterpreter.interpret("println(a)", context);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertTrue(result.message().contains("error: not found: value a"));
|
||||
|
||||
sparkInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSparkSQLInterpreter() {
|
||||
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
|
||||
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
|
||||
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
|
||||
sparkInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
interpreterGroup.get("session_1").add(sparkInterpreter);
|
||||
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
|
||||
interpreterGroup.get("session_1").add(sqlInterpreter);
|
||||
sqlInterpreter.setInterpreterGroup(interpreterGroup);
|
||||
sqlInterpreter.open();
|
||||
|
||||
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
|
||||
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
|
||||
InterpreterOutput output = new InterpreterOutput(outputListener);
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "title",
|
||||
"text", authInfo, null, null, null, null, null, output);
|
||||
InterpreterResult result = sqlInterpreter.interpret("show tables", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TABLE, result.type());
|
||||
assertTrue(result.message().contains("tableName"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -180,14 +221,17 @@ public class LivyIntegrationTest {
|
|||
pysparkInterpreter.open();
|
||||
InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
|
||||
|
||||
// test RDD api
|
||||
outputListener.reset();
|
||||
result = pysparkInterpreter.interpret("sc.range(1, 10).sum()", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals("45", result.message().get(0).getData());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended().contains("45"));
|
||||
|
||||
// test DataFrame api
|
||||
outputListener.reset();
|
||||
|
|
@ -195,9 +239,18 @@ public class LivyIntegrationTest {
|
|||
+ "sqlContext = SQLContext(sc)", context);
|
||||
result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n"
|
||||
+ "df.collect()" , context);
|
||||
assertTrue(result.message().contains("[Row(_1=u'hello', _2=20)]"));
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
assertNull(result.message());
|
||||
assertTrue(outputListener.getOutputAppended().contains("[Row(_1=u'hello', _2=20)]"));
|
||||
|
||||
// error
|
||||
result = pysparkInterpreter.interpret("print(a)", context);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code());
|
||||
assertEquals(InterpreterResult.Type.TEXT, result.type());
|
||||
assertTrue(result.message().contains("name 'a' is not defined"));
|
||||
|
||||
pysparkInterpreter.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -205,7 +258,7 @@ public class LivyIntegrationTest {
|
|||
if (!checkPreCondition()) {
|
||||
return;
|
||||
}
|
||||
// TODO (zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
|
||||
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
|
||||
}
|
||||
|
||||
public static class MyInterpreterOutputListener implements InterpreterOutputListener {
|
||||
|
|
@ -1,86 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.hamcrest.CoreMatchers;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ErrorCollector;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Answers;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LivyInterpreterTest {
|
||||
|
||||
@Rule
|
||||
public ErrorCollector collector = new ErrorCollector();
|
||||
|
||||
private static LivyPySparkInterpreter interpreter;
|
||||
|
||||
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
|
||||
private InterpreterContext interpreterContext;
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
interpreter.close();
|
||||
}
|
||||
|
||||
@Before
|
||||
public void prepareContext() throws Exception {
|
||||
interpreter = new LivyPySparkInterpreter(new Properties());
|
||||
interpreter.userSessionMap = new HashMap<>();
|
||||
interpreter.userSessionMap.put(null, 0);
|
||||
interpreter.livyHelper = Mockito.mock(LivyHelper.class);
|
||||
interpreter.open();
|
||||
|
||||
doReturn(new InterpreterResult(InterpreterResult.Code.SUCCESS)).when(interpreter.livyHelper)
|
||||
.interpret("print \"x is 1.\"", interpreterContext, interpreter.userSessionMap);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkInitVariables() throws Exception {
|
||||
collector.checkThat("Check that, if userSessionMap is made: ",
|
||||
interpreter.userSessionMap, CoreMatchers.notNullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void checkBasicInterpreter() throws Exception {
|
||||
|
||||
String paragraphString = "print \"x is 1.\"";
|
||||
|
||||
final InterpreterResult actual = interpreter.interpret(paragraphString, interpreterContext);
|
||||
|
||||
collector.checkThat("Check that, result is computed: ",
|
||||
actual.code(), CoreMatchers.equalTo(InterpreterResult.Code.SUCCESS));
|
||||
assertThat(actual).isNotNull();
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue