mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
have spark streaming
This commit is contained in:
parent
9cb081993d
commit
de2fd3cd60
6 changed files with 190 additions and 32 deletions
|
|
@ -29,11 +29,13 @@ import org.apache.http.impl.client.HttpClientBuilder;
|
|||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
|
|
@ -83,37 +85,69 @@ public class LivyHelper {
|
|||
}
|
||||
|
||||
protected void initializeSpark(final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap) {
|
||||
interpretInput("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
|
||||
final Map<String, Integer> userSessionMap) throws Exception {
|
||||
interpret("val sqlContext= new org.apache.spark.sql.SQLContext(sc)\n" +
|
||||
"import sqlContext.implicits._", context, userSessionMap);
|
||||
}
|
||||
|
||||
public InterpreterResult interpretInput(String stringLines,
|
||||
final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap) {
|
||||
final Map<String, Integer> userSessionMap,
|
||||
LivyOutputStream out) {
|
||||
try {
|
||||
String[] lines = stringLines.split("\n");
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
linesToRun[i] = lines[i];
|
||||
}
|
||||
linesToRun[lines.length] = "print(\"\")";
|
||||
|
||||
stringLines = stringLines
|
||||
.replaceAll("\\\\n", "\\\\\\\\n")
|
||||
.replaceAll("\\n", "\\\\n")
|
||||
.replaceAll("\\\\\"", "\\\\\\\\\"")
|
||||
.replaceAll("\"", "\\\\\"");
|
||||
out.setInterpreterOutput(context.out);
|
||||
context.out.clear();
|
||||
Code r = null;
|
||||
String incomplete = "";
|
||||
|
||||
Map jsonMap = executeCommand(stringLines, context, userSessionMap);
|
||||
Integer id = ((Double) jsonMap.get("id")).intValue();
|
||||
for (int l = 0; l < linesToRun.length; l++) {
|
||||
String s = linesToRun[l];
|
||||
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
|
||||
//for spark
|
||||
if (l + 1 < linesToRun.length) {
|
||||
String nextLine = linesToRun[l + 1].trim();
|
||||
if (nextLine.startsWith(".")
|
||||
&& !nextLine.startsWith("..")
|
||||
&& !nextLine.startsWith("./")) {
|
||||
incomplete += s + "\n";
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
InterpreterResult res = getResultFromMap(jsonMap);
|
||||
if (res != null) {
|
||||
return res;
|
||||
InterpreterResult res;
|
||||
try {
|
||||
res = interpret(incomplete + s, context, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
LOGGER.info("Interpreter exception", e);
|
||||
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
|
||||
r = getResultCode(res);
|
||||
|
||||
if (r == Code.ERROR) {
|
||||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(r, "");
|
||||
} else if (r == Code.INCOMPLETE) {
|
||||
incomplete += s + "\n";
|
||||
} else {
|
||||
out.write((res.message() + "\n").getBytes(Charset.forName("UTF-8")));
|
||||
incomplete = "";
|
||||
}
|
||||
}
|
||||
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
jsonMap = getStatusById(context, userSessionMap, id);
|
||||
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
|
||||
if (interpreterResult != null) {
|
||||
return interpreterResult;
|
||||
}
|
||||
if (r == Code.INCOMPLETE) {
|
||||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(r, "Incomplete expression");
|
||||
} else {
|
||||
out.setInterpreterOutput(null);
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
|
|
@ -122,11 +156,58 @@ public class LivyHelper {
|
|||
}
|
||||
}
|
||||
|
||||
private Code getResultCode(InterpreterResult r) {
|
||||
if (r.code().equals(Code.SUCCESS)) {
|
||||
return Code.SUCCESS;
|
||||
} else if (r.code().equals(Code.INCOMPLETE)) {
|
||||
return Code.INCOMPLETE;
|
||||
} else {
|
||||
return Code.ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
protected InterpreterResult interpret(String stringLines,
|
||||
final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap)
|
||||
throws Exception {
|
||||
stringLines = stringLines
|
||||
.replaceAll("\\\\n", "\\\\\\\\n")
|
||||
.replaceAll("\\n", "\\\\n")
|
||||
.replaceAll("\\\\\"", "\\\\\\\\\"")
|
||||
.replaceAll("\"", "\\\\\"");
|
||||
|
||||
if (stringLines.trim().equals("")) {
|
||||
return new InterpreterResult(Code.SUCCESS, "");
|
||||
}
|
||||
LOGGER.error("stringLines==" + stringLines);
|
||||
Map jsonMap = executeCommand(stringLines, context, userSessionMap);
|
||||
Integer id = ((Double) jsonMap.get("id")).intValue();
|
||||
LOGGER.error("jsonMap==" + jsonMap);
|
||||
InterpreterResult res = getResultFromMap(jsonMap);
|
||||
if (res != null) {
|
||||
return res;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
Thread.sleep(1000);
|
||||
jsonMap = getStatusById(context, userSessionMap, id);
|
||||
LOGGER.error("jsonMap==" + jsonMap);
|
||||
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
|
||||
if (interpreterResult != null) {
|
||||
return interpreterResult;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private InterpreterResult getResultFromMap(Map jsonMap) {
|
||||
if (jsonMap.get("state").equals("available")) {
|
||||
if (((Map) jsonMap.get("output")).get("status").equals("error")) {
|
||||
StringBuilder errorMessage = new StringBuilder((String) ((Map) jsonMap
|
||||
.get("output")).get("evalue"));
|
||||
if (errorMessage.toString().equals("incomplete statement")
|
||||
|| errorMessage.toString().contains("EOF")) {
|
||||
return new InterpreterResult(Code.INCOMPLETE, "");
|
||||
}
|
||||
String traceback = gson.toJson(((Map) jsonMap.get("output")).get("traceback"));
|
||||
if (!traceback.equals("[]")) {
|
||||
errorMessage
|
||||
|
|
|
|||
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
|
||||
/**
|
||||
* InterpreterOutput can be attached / detached.
|
||||
*/
|
||||
public class LivyOutputStream extends OutputStream {
|
||||
InterpreterOutput interpreterOutput;
|
||||
|
||||
public LivyOutputStream() {
|
||||
}
|
||||
|
||||
public InterpreterOutput getInterpreterOutput() {
|
||||
return interpreterOutput;
|
||||
}
|
||||
|
||||
public void setInterpreterOutput(InterpreterOutput interpreterOutput) {
|
||||
this.interpreterOutput = interpreterOutput;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(int b) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.write(b);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte [] b) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.write(b);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte [] b, int offset, int len) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.write(b, offset, len);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flush() throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.flush();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -83,7 +83,7 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
|
||||
return livyHelper.interpret(line, interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
|
|
@ -107,8 +107,8 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
LivyPySparkInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -35,6 +35,7 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
|
||||
static String DEFAULT_URL = "http://localhost:8998";
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);
|
||||
private LivyOutputStream out;
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
|
|
@ -54,6 +55,7 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
super(property);
|
||||
userSessionMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper(property);
|
||||
out = new LivyOutputStream();
|
||||
}
|
||||
|
||||
protected static Map<String, Integer> getUserSessionMap() {
|
||||
|
|
@ -92,7 +94,7 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap, out);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
|
|
@ -116,8 +118,8 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ public class LivySparkRInterpreter extends Interpreter {
|
|||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
|
||||
return livyHelper.interpret(line, interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
|
|
@ -107,8 +107,8 @@ public class LivySparkRInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
LivySparkRInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
InterpreterResult res = livyHelper.interpretInput("sqlContext.sql(\"" + line + "\").show("
|
||||
InterpreterResult res = livyHelper.interpret("sqlContext.sql(\"" + line + "\").show("
|
||||
+ property.get("livy.spark.maxResult") + ")",
|
||||
interpreterContext, userSessionMap);
|
||||
|
||||
|
|
@ -147,8 +147,8 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in a new issue