with lspark sql

This commit is contained in:
Prabhjyot Singh 2016-02-21 00:21:35 +05:30
parent 10311d3baf
commit 07f0846d78
4 changed files with 175 additions and 2 deletions

View file

@ -86,7 +86,7 @@ public class LivyHelper {
}
while (true) {
Thread.sleep(5);
Thread.sleep(1000);
jsonMap = getStatusById(context, userSessionMap, id);
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
if (interpreterResult != null) {

View file

@ -47,7 +47,7 @@ public class LivySparkInterpreter extends Interpreter {
);
}
private Map<String, Integer> userSessionMap;
private static Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
public LivySparkInterpreter(Properties property) {
@ -56,6 +56,13 @@ public class LivySparkInterpreter extends Interpreter {
livyHelper = new LivyHelper(property);
}
protected static Map<String, Integer> getUserSessionMap() {
return userSessionMap;
}
public void setUserSessionMap(Map<String, Integer> userSessionMap) {
this.userSessionMap = userSessionMap;
}
@Override
public void open() {

View file

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Livy PySpark interpreter for Zeppelin.
*/
public class LivySparkSQLInterpreter extends Interpreter {
Logger LOGGER = LoggerFactory.getLogger(LivySparkSQLInterpreter.class);
static String DEFAULT_MAX_RESULT = "1000";
static {
Interpreter.register(
"sql",
"lspark",
LivySparkSQLInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("livy.spark.maxResult",
DEFAULT_MAX_RESULT,
"Max number of SparkSQL result to display.")
.build()
);
}
private Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
public LivySparkSQLInterpreter(Properties property) {
super(property);
livyHelper = new LivyHelper(property);
userSessionMap = LivySparkInterpreter.getUserSessionMap();
}
@Override
public void open() {
}
@Override
public void close() {
}
@Override
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
try {
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
try {
userSessionMap.put(
interpreterContext.getAuthenticationInfo().getUser(),
livyHelper.createSession(
interpreterContext.getAuthenticationInfo().getUser(),
"spark")
);
} catch (Exception e) {
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
}
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
InterpreterResult res = livyHelper.interpretInput("sqlContext.sql(\"" + line + "\").show("
+ property.get("livy.spark.maxResult") + ")",
interpreterContext, userSessionMap);
if (res.code() == InterpreterResult.Code.SUCCESS) {
StringBuilder resMsg = new StringBuilder();
resMsg.append("%table ");
LOGGER.error("=====================");
LOGGER.error("=====================");
LOGGER.error("=====================");
LOGGER.error(res.message());
LOGGER.error("=====================");
LOGGER.error("=====================");
LOGGER.error("=====================");
String[] rows = res.message().split("\n");
String[] headers = rows[1].split("\\|");
for (int head = 1; head < headers.length; head++) {
resMsg.append(headers[head].trim()).append("\t");
}
resMsg.append("\n");
if (rows[3].indexOf("+") == 0) {
} else {
for (int cols = 3; cols < rows.length - 1; cols++) {
String[] col = rows[cols].split("\\|");
for (int data = 1; data < col.length; data++) {
resMsg.append(col[data].trim()).append("\t");
}
resMsg.append("\n");
}
}
if (rows[rows.length - 1].indexOf("only") == 0) {
resMsg.append("<font color=red>" + rows[rows.length - 1] + ".</font>");
}
return new InterpreterResult(InterpreterResult.Code.SUCCESS,
resMsg.toString()
);
} else {
return res;
}
} catch (Exception e) {
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
InterpreterUtils.getMostRelevantMessage(e));
}
}
@Override
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
}
@Override
public List<String> completion(String buf, int cursor) {
return null;
}
}

View file

@ -445,6 +445,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.angular.AngularInterpreter,"
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.livy.LivySparkInterpreter,"
+ "org.apache.zeppelin.livy.LivySparkSQLInterpreter,"
+ "org.apache.zeppelin.livy.LivyPySparkInterpreter,"
+ "org.apache.zeppelin.livy.LivySparkRInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"