have spark streaming

This commit is contained in:
Prabhjyot Singh 2016-02-22 20:13:35 +05:30
parent 9cb081993d
commit de2fd3cd60
6 changed files with 190 additions and 32 deletions

View file

@ -29,11 +29,13 @@ import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Properties;
@ -83,37 +85,69 @@ public class LivyHelper {
}
protected void initializeSpark(final InterpreterContext context,
final Map<String, Integer> userSessionMap) {
interpretInput("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
final Map<String, Integer> userSessionMap) throws Exception {
interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
"import sqlContext.implicits._", context, userSessionMap);
}
public InterpreterResult interpretInput(String stringLines,
final InterpreterContext context,
final Map<String, Integer> userSessionMap) {
final Map<String, Integer> userSessionMap,
LivyOutputStream out) {
try {
String[] lines = stringLines.split("\n");
String[] linesToRun = new String[lines.length + 1];
for (int i = 0; i < lines.length; i++) {
linesToRun[i] = lines[i];
}
linesToRun[lines.length] = "print(\"\")";
stringLines = stringLines
.replaceAll("\\\\n", "\\\\\\\\n")
.replaceAll("\\n", "\\\\n")
.replaceAll("\\\\\"", "\\\\\\\\\"")
.replaceAll("\"", "\\\\\"");
out.setInterpreterOutput(context.out);
context.out.clear();
Code r = null;
String incomplete = "";
Map jsonMap = executeCommand(stringLines, context, userSessionMap);
Integer id = ((Double) jsonMap.get("id")).intValue();
for (int l = 0; l < linesToRun.length; l++) {
String s = linesToRun[l];
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
//for spark
if (l + 1 < linesToRun.length) {
String nextLine = linesToRun[l + 1].trim();
if (nextLine.startsWith(".")
&& !nextLine.startsWith("..")
&& !nextLine.startsWith("./")) {
incomplete += s + "\n";
continue;
}
}
InterpreterResult res = getResultFromMap(jsonMap);
if (res != null) {
return res;
InterpreterResult res;
try {
res = interpret(incomplete + s, context, userSessionMap);
} catch (Exception e) {
LOGGER.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
}
r = getResultCode(res);
if (r == Code.ERROR) {
out.setInterpreterOutput(null);
return new InterpreterResult(r, "");
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
out.write((res.message() + "\n").getBytes(Charset.forName("UTF-8")));
incomplete = "";
}
}
while (true) {
Thread.sleep(1000);
jsonMap = getStatusById(context, userSessionMap, id);
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
if (interpreterResult != null) {
return interpreterResult;
}
if (r == Code.INCOMPLETE) {
out.setInterpreterOutput(null);
return new InterpreterResult(r, "Incomplete expression");
} else {
out.setInterpreterOutput(null);
return new InterpreterResult(Code.SUCCESS);
}
} catch (Exception e) {
@ -122,11 +156,58 @@ public class LivyHelper {
}
}
private Code getResultCode(InterpreterResult r) {
if (r.code().equals(Code.SUCCESS)) {
return Code.SUCCESS;
} else if (r.code().equals(Code.INCOMPLETE)) {
return Code.INCOMPLETE;
} else {
return Code.ERROR;
}
}
protected InterpreterResult interpret(String stringLines,
final InterpreterContext context,
final Map<String, Integer> userSessionMap)
throws Exception {
stringLines = stringLines
.replaceAll("\\\\n", "\\\\\\\\n")
.replaceAll("\\n", "\\\\n")
.replaceAll("\\\\\"", "\\\\\\\\\"")
.replaceAll("\"", "\\\\\"");
if (stringLines.trim().equals("")) {
return new InterpreterResult(Code.SUCCESS, "");
}
LOGGER.error("stringLines==" + stringLines);
Map jsonMap = executeCommand(stringLines, context, userSessionMap);
Integer id = ((Double) jsonMap.get("id")).intValue();
LOGGER.error("jsonMap==" + jsonMap);
InterpreterResult res = getResultFromMap(jsonMap);
if (res != null) {
return res;
}
while (true) {
Thread.sleep(1000);
jsonMap = getStatusById(context, userSessionMap, id);
LOGGER.error("jsonMap==" + jsonMap);
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
if (interpreterResult != null) {
return interpreterResult;
}
}
}
private InterpreterResult getResultFromMap(Map jsonMap) {
if (jsonMap.get("state").equals("available")) {
if (((Map) jsonMap.get("output")).get("status").equals("error")) {
StringBuilder errorMessage = new StringBuilder((String) ((Map) jsonMap
.get("output")).get("evalue"));
if (errorMessage.toString().equals("incomplete statement")
|| errorMessage.toString().contains("EOF")) {
return new InterpreterResult(Code.INCOMPLETE, "");
}
String traceback = gson.toJson(((Map) jsonMap.get("output")).get("traceback"));
if (!traceback.equals("[]")) {
errorMessage

View file

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import java.io.IOException;
import java.io.OutputStream;
/**
* InterpreterOutput can be attached / detached.
*/
public class LivyOutputStream extends OutputStream {
InterpreterOutput interpreterOutput;
public LivyOutputStream() {
}
public InterpreterOutput getInterpreterOutput() {
return interpreterOutput;
}
public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
this.interpreterOutput = interpreterOutput;
}
@Override
public void write(int b) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b);
}
}
@Override
public void write(byte [] b) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b);
}
}
@Override
public void write(byte [] b, int offset, int len) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b, offset, len);
}
}
@Override
public void close() throws IOException {
if (interpreterOutput != null) {
interpreterOutput.close();
}
}
@Override
public void flush() throws IOException {
if (interpreterOutput != null) {
interpreterOutput.flush();
}
}
}

View file

@ -83,7 +83,7 @@ public class LivyPySparkInterpreter extends Interpreter {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
return livyHelper.interpret(line, interpreterContext, userSessionMap);
} catch (Exception e) {
LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
@ -107,8 +107,8 @@ public class LivyPySparkInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
LivyPySparkInterpreter.class.getName() + this.hashCode());
}
@Override

View file

@ -35,6 +35,7 @@ public class LivySparkInterpreter extends Interpreter {
static String DEFAULT_URL = "http://localhost:8998";
Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);
private LivyOutputStream out;
static {
Interpreter.register(
@ -54,6 +55,7 @@ public class LivySparkInterpreter extends Interpreter {
super(property);
userSessionMap = new HashMap<>();
livyHelper = new LivyHelper(property);
out = new LivyOutputStream();
}
protected static Map<String, Integer> getUserSessionMap() {
@ -92,7 +94,7 @@ public class LivySparkInterpreter extends Interpreter {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
@ -116,8 +118,8 @@ public class LivySparkInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
LivySparkInterpreter.class.getName() + this.hashCode());
}
@Override

View file

@ -83,7 +83,7 @@ public class LivySparkRInterpreter extends Interpreter {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
return livyHelper.interpret(line, interpreterContext, userSessionMap);
} catch (Exception e) {
LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
@ -107,8 +107,8 @@ public class LivySparkRInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
LivySparkRInterpreter.class.getName() + this.hashCode());
}
@Override

View file

@ -87,7 +87,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
InterpreterResult res = livyHelper.interpretInput("sqlContext.sql(\"" + line + "\").show("
InterpreterResult res = livyHelper.interpret("sqlContext.sql(\"" + line + "\").show("
+ property.get("livy.spark.maxResult") + ")",
interpreterContext, userSessionMap);
@ -147,8 +147,8 @@ public class LivySparkSQLInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
LivySparkInterpreter.class.getName() + this.hashCode());
}
@Override