ZEPPELIN-1607. Refactor Livy Interpreter to adapt scope mode

This commit is contained in:
Jeff Zhang 2016-11-03 14:21:33 +08:00
parent 0e4d961683
commit 798de1bb55
10 changed files with 290 additions and 449 deletions

View file

@ -287,13 +287,16 @@
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@ -310,6 +313,14 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<classifier>tests</classifier>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
@ -317,6 +328,14 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<classifier>tests</classifier>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -331,9 +350,17 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<classifier>tests</classifier>
<version>${hadoop.version}</version>
<scope>test</scope>
</dependency>

View file

@ -0,0 +1,123 @@
package org.apache.zeppelin.livy;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
/**
* Base class for livy interpreters.
*/
public abstract class BaseLivyInterprereter extends Interpreter {
protected static final Logger LOGGER = LoggerFactory.getLogger(BaseLivyInterprereter.class);
// -1 means session is not created yet, valid sessionId start from 0
protected int sessionId = -1;
protected String appId;
protected String webUIAddress;
protected boolean displayAppInfo;
protected LivyOutputStream out;
protected LivyHelper livyHelper;
public BaseLivyInterprereter(Properties property) {
super(property);
this.out = new LivyOutputStream();
this.livyHelper = new LivyHelper(property);
}
public abstract String getSessionKind();
@Override
public void open() {
// TODO(zjffdu) move session creation here.
}
@Override
public void close() {
if (sessionId != -1) {
livyHelper.closeSession(sessionId);
}
}
protected void createSession(InterpreterContext context) throws Exception {
sessionId = livyHelper.createSession(context, getSessionKind());
if (displayAppInfo) {
this.appId = extractStatementResult(
livyHelper.interpret("sc.applicationId", context, sessionId).message());
livyHelper.interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
context, sessionId);
this.webUIAddress = extractStatementResult(
livyHelper.interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
context, sessionId).message());
LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
sessionId, appId, webUIAddress);
}
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
try {
// add synchronized, because LivySparkSQLInterperter will use ParallelScheduler
synchronized (this) {
if (sessionId == -1) {
try {
createSession(context);
} catch (Exception e) {
LOGGER.error("Exception while creating livy session", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
}
}
if (st == null || st.trim().length() == 0) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpretInput(st, context, sessionId, out,
appId, webUIAddress, displayAppInfo);
} catch (Exception e) {
LOGGER.error("Exception in LivyInterpreter.", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
InterpreterUtils.getMostRelevantMessage(e));
}
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* res0: String = application_1473129941656_0048
*
* @param result
* @return
*/
private static String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
}

View file

@ -133,7 +133,7 @@ public class LivyHelper {
public InterpreterResult interpretInput(String stringLines,
final InterpreterContext context,
final Map<String, Integer> userSessionMap,
int sessionId,
LivyOutputStream out,
String appId,
String webUI,
@ -185,7 +185,7 @@ public class LivyHelper {
InterpreterResult res;
try {
res = interpret(incomplete + s, context, userSessionMap);
res = interpret(incomplete + s, context, sessionId);
} catch (Exception e) {
LOGGER.error("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
@ -230,12 +230,12 @@ public class LivyHelper {
public InterpreterResult interpret(String stringLines,
final InterpreterContext context,
final Map<String, Integer> userSessionMap)
int sessionId)
throws Exception {
if (stringLines.trim().equals("")) {
return new InterpreterResult(Code.SUCCESS, "");
}
Map jsonMap = executeCommand(stringLines, context, userSessionMap);
Map jsonMap = executeCommand(stringLines, context, sessionId);
Integer id = ((Double) jsonMap.get("id")).intValue();
InterpreterResult res = getResultFromMap(jsonMap);
if (res != null) {
@ -247,7 +247,7 @@ public class LivyHelper {
if (paragraphHttpMap.get(context.getParagraphId()) == null) {
return new InterpreterResult(Code.INCOMPLETE, "");
}
jsonMap = getStatusById(context, userSessionMap, id);
jsonMap = getStatusById(context, sessionId, id);
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
if (interpreterResult != null) {
return interpreterResult;
@ -292,11 +292,10 @@ public class LivyHelper {
return null;
}
private Map executeCommand(String lines, InterpreterContext context,
Map<String, Integer> userSessionMap) throws Exception {
private Map executeCommand(String lines, InterpreterContext context, int sessionId)
throws Exception {
String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/"
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
+ "/statements",
+ sessionId + "/statements",
"POST",
"{\"code\": \"" + StringEscapeUtils.escapeJson(lines) + "\"}",
context.getParagraphId());
@ -316,9 +315,9 @@ public class LivyHelper {
}
private Map getStatusById(InterpreterContext context,
Map<String, Integer> userSessionMap, Integer id) throws Exception {
int sessionId, Integer id) throws Exception {
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
+ sessionId
+ "/statements/" + id,
"GET", null, context.getParagraphId());
LOGGER.debug("statement {} response: {}", id, json);
@ -390,19 +389,18 @@ public class LivyHelper {
}
public void cancelHTTP(String paragraphId) {
// TODO(zjffdu), use cancel rest api of livy
paragraphHttpMap.put(paragraphId, null);
}
public void closeSession(Map<String, Integer> userSessionMap) {
for (Map.Entry<String, Integer> entry : userSessionMap.entrySet()) {
try {
executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
+ entry.getValue(),
"DELETE", null, null);
} catch (Exception e) {
LOGGER.error(String.format("Error closing session for user with session ID: %s",
entry.getValue()), e);
}
public void closeSession(int sessionId) {
try {
executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/" + sessionId,
"DELETE", null, null);
} catch (Exception e) {
LOGGER.error(String.format("Error closing session for user with session ID: %s",
sessionId), e);
}
}
}

View file

@ -33,81 +33,14 @@ import java.util.Properties;
/**
* Livy PySpark interpreter for Zeppelin.
*/
public class LivyPySparkInterpreter extends Interpreter {
Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);
protected Map<String, Integer> userSessionMap;
protected LivyHelper livyHelper;
public class LivyPySparkInterpreter extends BaseLivyInterprereter {
public LivyPySparkInterpreter(Properties property) {
super(property);
userSessionMap = new HashMap<>();
livyHelper = new LivyHelper(property);
}
@Override
public void open() {
public String getSessionKind() {
return "pyspark";
}
@Override
public void close() {
livyHelper.closeSession(userSessionMap);
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext,
"pyspark")
);
} catch (Exception e) {
LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
}
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpret(line, interpreterContext, userSessionMap);
} catch (Exception e) {
LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
InterpreterUtils.getMostRelevantMessage(e));
}
}
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
LivyPySparkInterpreter.class.getName() + this.hashCode());
}
@Override
public List<InterpreterCompletion> completion(String buf, int cursor) {
return null;
}
}

View file

@ -28,136 +28,20 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
/**
* Livy Spark interpreter for Zeppelin.
*/
public class LivySparkInterpreter extends Interpreter {
Logger LOGGER = LoggerFactory.getLogger(LivySparkInterpreter.class);
private LivyOutputStream out;
protected static Map<String, Integer> userSessionMap;
protected static Map<Integer, String> sessionId2AppIdMap;
protected static Map<Integer, String> sessionId2WebUIMap;
private LivyHelper livyHelper;
private boolean displayAppInfo;
public class LivySparkInterpreter extends BaseLivyInterprereter {
public LivySparkInterpreter(Properties property) {
super(property);
userSessionMap = new HashMap<>();
sessionId2AppIdMap = new HashMap<>();
sessionId2WebUIMap = new HashMap<>();
livyHelper = new LivyHelper(property);
out = new LivyOutputStream();
this.displayAppInfo = Boolean.parseBoolean(getProperty("zeppelin.livy.displayAppInfo"));
}
protected static Map<String, Integer> getUserSessionMap() {
return userSessionMap;
}
public void setUserSessionMap(Map<String, Integer> userSessionMap) {
this.userSessionMap = userSessionMap;
}
@Override
public void open() {
}
@Override
public void close() {
livyHelper.closeSession(userSessionMap);
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {
Integer sessionId = null;
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
sessionId = livyHelper.createSession(interpreterContext, "spark");
userSessionMap.put(interpreterContext.getAuthenticationInfo().getUser(), sessionId);
if (displayAppInfo) {
String appId = extractStatementResult(
livyHelper.interpret("sc.applicationId", interpreterContext, userSessionMap)
.message().get(0).getData());
livyHelper.interpret(
"val webui=sc.getClass.getMethod(\"ui\").invoke(sc).asInstanceOf[Some[_]].get",
interpreterContext, userSessionMap);
String webUI = extractStatementResult(
livyHelper.interpret(
"webui.getClass.getMethod(\"appUIAddress\").invoke(webui)",
interpreterContext, userSessionMap).message().get(0).getData());
sessionId2AppIdMap.put(sessionId, appId);
sessionId2WebUIMap.put(sessionId, webUI);
LOGGER.info("Create livy session with sessionId: {}, appId: {}, webUI: {}",
sessionId, appId, webUI);
} else {
LOGGER.info("Create livy session with sessionId: {}", sessionId);
}
} catch (Exception e) {
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
} else {
sessionId = userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser());
}
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out,
sessionId2AppIdMap.get(sessionId), sessionId2WebUIMap.get(sessionId), displayAppInfo);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
InterpreterUtils.getMostRelevantMessage(e));
}
}
/**
* Extract the eval result of spark shell, e.g. extract application_1473129941656_0048
* from following:
* res0: String = application_1473129941656_0048
* @param result
* @return
*/
private static String extractStatementResult(String result) {
int pos = -1;
if ((pos = result.indexOf("=")) >= 0) {
return result.substring(pos + 1).trim();
} else {
throw new RuntimeException("No result can be extracted from '" + result + "', " +
"something must be wrong");
}
}
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
LivySparkInterpreter.class.getName() + this.hashCode());
}
@Override
public List<InterpreterCompletion> completion(String buf, int cursor) {
return null;
public String getSessionKind() {
return "spark";
}
}

View file

@ -33,81 +33,14 @@ import java.util.Properties;
/**
* Livy PySpark interpreter for Zeppelin.
*/
public class LivySparkRInterpreter extends Interpreter {
Logger LOGGER = LoggerFactory.getLogger(LivySparkRInterpreter.class);
protected Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
public class LivySparkRInterpreter extends BaseLivyInterprereter {
public LivySparkRInterpreter(Properties property) {
super(property);
userSessionMap = new HashMap<>();
livyHelper = new LivyHelper(property);
}
@Override
public void open() {
public String getSessionKind() {
return "sparkr";
}
@Override
public void close() {
livyHelper.closeSession(userSessionMap);
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext,
"sparkr")
);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
}
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpret(line, interpreterContext, userSessionMap);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
InterpreterUtils.getMostRelevantMessage(e));
}
}
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
LivySparkRInterpreter.class.getName() + this.hashCode());
}
@Override
public List<InterpreterCompletion> completion(String buf, int cursor) {
return null;
}
}

View file

@ -27,66 +27,67 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
/**
* Livy PySpark interpreter for Zeppelin.
* Livy SparkSQL Interpreter for Zeppelin.
*/
public class LivySparkSQLInterpreter extends Interpreter {
public class LivySparkSQLInterpreter extends BaseLivyInterprereter {
Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class);
private LivySparkInterpreter sparkInterpreter;
protected Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
private boolean sqlContextCreated = false;
public LivySparkSQLInterpreter(Properties property) {
super(property);
livyHelper = new LivyHelper(property);
userSessionMap = LivySparkInterpreter.getUserSessionMap();
}
@Override
public String getSessionKind() {
return "spark";
}
@Override
public void open() {
super.open();
this.sparkInterpreter =
(LivySparkInterpreter) getInterpreterInTheSameSessionByClassName(
LivySparkInterpreter.class.getName());
}
@Override
public void close() {
livyHelper.closeSession(userSessionMap);
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
public InterpreterResult interpret(String line, InterpreterContext context) {
try {
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext,
"spark")
);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
}
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
InterpreterResult res = livyHelper.interpret("sqlContext.sql(\"" +
line.replaceAll("\"", "\\\\\"")
.replaceAll("\\n", " ")
+ "\").show(" +
property.get("zeppelin.livy.spark.sql.maxResult") + ")",
interpreterContext, userSessionMap);
// create sqlContext implicitly, as in livy 0.2 sqlContext is not available.
if (!sqlContextCreated) {
InterpreterResult result = sparkInterpreter.interpret("sqlContext", context);
if (result.code() == InterpreterResult.Code.ERROR) {
result = sparkInterpreter.interpret(
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", context);
if (result.code() == InterpreterResult.Code.ERROR) {
return new InterpreterResult(InterpreterResult.Code.ERROR, "Fail to create sqlContext,"
+ result.message());
}
}
sqlContextCreated = true;
}
// delegate the work to LivySparkInterpreter in the same session.
InterpreterResult res = sparkInterpreter.interpret("sqlContext.sql(\"" +
line.replaceAll("\"", "\\\\\"")
.replaceAll("\\n", " ")
+ "\").show(" +
property.get("zeppelin.livy.spark.sql.maxResult") + ")", context);
if (res.code() == InterpreterResult.Code.SUCCESS) {
StringBuilder resMsg = new StringBuilder();
resMsg.append("%table ");
String[] rows = res.message().get(0).getData().split("\n");
String[] rows = new String(context.out.toByteArray()).split("\n");
String[] headers = rows[1].split("\\|");
for (int head = 1; head < headers.length; head++) {
resMsg.append(headers[head].trim()).append("\t");
@ -114,7 +115,6 @@ public class LivySparkSQLInterpreter extends Interpreter {
return res;
}
} catch (Exception e) {
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
@ -126,21 +126,6 @@ public class LivySparkSQLInterpreter extends Interpreter {
return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
}
@Override
public void cancel(InterpreterContext context) {
livyHelper.cancelHTTP(context.getParagraphId());
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
if (concurrentSQL()) {
@ -158,9 +143,4 @@ public class LivySparkSQLInterpreter extends Interpreter {
}
}
@Override
public List<InterpreterCompletion> completion(String buf, int cursor) {
return null;
}
}

View file

@ -56,9 +56,6 @@ public class LivyHelperTest {
@Before
public void prepareContext() throws Exception {
interpreter.userSessionMap = new HashMap<>();
interpreter.userSessionMap.put(null, 1);
Properties properties = new Properties();
properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
livyHelper.property = properties;
@ -103,10 +100,9 @@ public class LivyHelperTest {
@Test
public void checkInterpret() {
try {
InterpreterResult result = livyHelper.interpret("print(1)", interpreterContext, interpreter.userSessionMap);
collector.checkThat("check sessionId", InterpreterResult.Code.SUCCESS, CoreMatchers.equalTo(result.code()));
InterpreterResult result = livyHelper.interpret("print(1)", interpreterContext, 1);
collector.checkThat("check sessionId", InterpreterResult.Code.SUCCESS,
CoreMatchers.equalTo(result.code()));
} catch (Exception e) {
collector.addError(e);
}

View file

@ -27,15 +27,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class LivyIntegrationTest {
public class LivyInterpreterIT {
private static Logger LOGGER = LoggerFactory.getLogger(LivyIntegrationTest.class);
private static Logger LOGGER = LoggerFactory.getLogger(LivyInterpreterIT.class);
private static Cluster cluster;
private static Properties properties;
@ -76,8 +77,11 @@ public class LivyIntegrationTest {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
@ -86,13 +90,16 @@ public class LivyIntegrationTest {
sparkInterpreter.open();
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
// test RDD api
outputListener.reset();
result = sparkInterpreter.interpret("sc.parallelize(1 to 10).sum()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended().contains("Double = 55.0"));
// test DataFrame api
@ -102,7 +109,8 @@ public class LivyIntegrationTest {
result = sparkInterpreter.interpret("val df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n"
+ "df.collect()" , context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended()
.contains("Array[org.apache.spark.sql.Row] = Array([hello,20])"));
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
@ -110,6 +118,8 @@ public class LivyIntegrationTest {
// test LivySparkSQLInterpreter which share the same SparkContext with LivySparkInterpreter
outputListener.reset();
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
result = sqlInterpreter.interpret("select * from df", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@ -163,6 +173,37 @@ public class LivyIntegrationTest {
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertNull(result.message().get(0).getData());
assertTrue(outputListener.getOutputAppended().contains("defined module Person"));
// error
result = sparkInterpreter.interpret("println(a)", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertTrue(result.message().contains("error: not found: value a"));
sparkInterpreter.close();
}
@Test
public void testSparkSQLInterpreter() {
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "title",
"text", authInfo, null, null, null, null, null, output);
InterpreterResult result = sqlInterpreter.interpret("show tables", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.type());
assertTrue(result.message().contains("tableName"));
}
@Test
@ -180,14 +221,17 @@ public class LivyIntegrationTest {
pysparkInterpreter.open();
InterpreterResult result = pysparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended().contains("1.5.2"));
// test RDD api
outputListener.reset();
result = pysparkInterpreter.interpret("sc.range(1, 10).sum()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals("45", result.message().get(0).getData());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended().contains("45"));
// test DataFrame api
outputListener.reset();
@ -195,9 +239,18 @@ public class LivyIntegrationTest {
+ "sqlContext = SQLContext(sc)", context);
result = pysparkInterpreter.interpret("df=sqlContext.createDataFrame([(\"hello\",20)])\n"
+ "df.collect()" , context);
assertTrue(result.message().contains("[Row(_1=u'hello', _2=20)]"));
assertEquals(InterpreterResult.Type.TEXT, result.message().get(0).getType());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertNull(result.message());
assertTrue(outputListener.getOutputAppended().contains("[Row(_1=u'hello', _2=20)]"));
// error
result = pysparkInterpreter.interpret("print(a)", context);
assertEquals(InterpreterResult.Code.ERROR, result.code());
assertEquals(InterpreterResult.Type.TEXT, result.type());
assertTrue(result.message().contains("name 'a' is not defined"));
pysparkInterpreter.close();
}
@Test
@ -205,7 +258,7 @@ public class LivyIntegrationTest {
if (!checkPreCondition()) {
return;
}
// TODO (zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
// TODO(zjffdu), Livy's SparkRIntepreter has some issue, do it after livy-0.3 release.
}
public static class MyInterpreterOutputListener implements InterpreterOutputListener {

View file

@ -1,86 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ErrorCollector;
import org.junit.runner.RunWith;
import org.mockito.Answers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.HashMap;
import java.util.Properties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
@RunWith(MockitoJUnitRunner.class)
public class LivyInterpreterTest {
@Rule
public ErrorCollector collector = new ErrorCollector();
private static LivyPySparkInterpreter interpreter;
@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private InterpreterContext interpreterContext;
@AfterClass
public static void tearDown() {
interpreter.close();
}
@Before
public void prepareContext() throws Exception {
interpreter = new LivyPySparkInterpreter(new Properties());
interpreter.userSessionMap = new HashMap<>();
interpreter.userSessionMap.put(null, 0);
interpreter.livyHelper = Mockito.mock(LivyHelper.class);
interpreter.open();
doReturn(new InterpreterResult(InterpreterResult.Code.SUCCESS)).when(interpreter.livyHelper)
.interpret("print \"x is 1.\"", interpreterContext, interpreter.userSessionMap);
}
@Test
public void checkInitVariables() throws Exception {
collector.checkThat("Check that, if userSessionMap is made: ",
interpreter.userSessionMap, CoreMatchers.notNullValue());
}
@Test
public void checkBasicInterpreter() throws Exception {
String paragraphString = "print \"x is 1.\"";
final InterpreterResult actual = interpreter.interpret(paragraphString, interpreterContext);
collector.checkThat("Check that, result is computed: ",
actual.code(), CoreMatchers.equalTo(InterpreterResult.Code.SUCCESS));
assertThat(actual).isNotNull();
}
}