mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Working livy interpreter with spark, pyspark, R
This commit is contained in:
parent
ace28a8000
commit
d0519d5a02
7 changed files with 315 additions and 280 deletions
|
|
@ -18,98 +18,164 @@
|
|||
package org.apache.zeppelin.livy;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.zeppelin.livy.RequestHelper.executeHTTP;
|
||||
|
||||
/**
|
||||
* Livy interpreter for Zeppelin.
|
||||
/***
|
||||
* Livy helper class
|
||||
*/
|
||||
public class LivyHelper {
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivyHelper.class);
|
||||
Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
||||
Properties property;
|
||||
|
||||
LivyHelper(Properties property) {
|
||||
this.property = property;
|
||||
}
|
||||
|
||||
Gson gson = new Gson();
|
||||
|
||||
protected Integer createSession(String user, String kind) {
|
||||
protected Integer createSession(String user, String kind) throws Exception {
|
||||
try {
|
||||
String json = executeHTTP(System.getProperty("zeppelin.livy.url") + "/sessions",
|
||||
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions",
|
||||
"POST",
|
||||
"{\"kind\": \"" + kind + "\", \"proxyUser\": \"" + user + "\"}"
|
||||
);
|
||||
Map jsonMap = (Map<String, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<String, Object>>() {
|
||||
Map jsonMap = (Map<Object, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<Object, Object>>() {
|
||||
}.getType());
|
||||
return ((Double) jsonMap.get("id")).intValue();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Error getting session for user", e);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public InterpreterResult interpret(String[] lines, InterpreterContext context,
|
||||
Map<String, Integer> userSessionMap) {
|
||||
synchronized (this) {
|
||||
InterpreterResult res = interpretInput(lines, context, userSessionMap);
|
||||
return res;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public InterpreterResult interpretInput(String[] lines, InterpreterContext context,
|
||||
Map<String, Integer> userSessionMap) {
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
linesToRun[i] = lines[i];
|
||||
}
|
||||
linesToRun[lines.length] = "print(\"\")";
|
||||
public InterpreterResult interpretInput(String lines,
|
||||
final InterpreterContext context,
|
||||
final Map<String, Integer> userSessionMap) {
|
||||
try {
|
||||
// LOGGER.error("line i got::" + lines);
|
||||
lines = lines.replaceAll("\\n", "\\\\n").replaceAll("\"", "\\\\\"");
|
||||
// LOGGER.error("line i made::" + lines);
|
||||
Map jsonMap = executeCommand(lines, context, userSessionMap);
|
||||
Integer id = ((Double) jsonMap.get("id")).intValue();
|
||||
|
||||
String incomplete = "";
|
||||
Code r = null;
|
||||
InterpreterResult res = getResultFromMap(jsonMap);
|
||||
if (res != null) {
|
||||
return res;
|
||||
}
|
||||
|
||||
for (int l = 0; l < linesToRun.length; l++) {
|
||||
String s = linesToRun[l];
|
||||
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
|
||||
if (l + 1 < linesToRun.length) {
|
||||
String nextLine = linesToRun[l + 1].trim();
|
||||
if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) {
|
||||
incomplete += s + "\n";
|
||||
continue;
|
||||
while (true) {
|
||||
Thread.sleep(5);
|
||||
jsonMap = getStatusById(context, userSessionMap, id);
|
||||
InterpreterResult interpreterResult = getResultFromMap(jsonMap);
|
||||
if (interpreterResult != null) {
|
||||
return interpreterResult;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS,
|
||||
executeCommand(incomplete + s, context, userSessionMap));
|
||||
} catch (Exception e) {
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
if (r == Code.INCOMPLETE) {
|
||||
return new InterpreterResult(r, "Incomplete expression");
|
||||
} else {
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("error in interpretInput", e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private String executeCommand(String lines, InterpreterContext context,
|
||||
Map<String, Integer> userSessionMap) throws Exception {
|
||||
String json = executeHTTP(System.getProperty("zeppelin.livy.url") + "/sessions/"
|
||||
private InterpreterResult getResultFromMap(Map jsonMap) {
|
||||
if (jsonMap.get("state").equals("available")) {
|
||||
if (((Map) jsonMap.get("output")).get("status").equals("error")) {
|
||||
return new InterpreterResult(Code.ERROR,
|
||||
gson.toJson(((Map) jsonMap.get("output")).get("traceback")));
|
||||
}
|
||||
if (((Map) jsonMap.get("output")).get("status").equals("ok")) {
|
||||
return new InterpreterResult(Code.SUCCESS,
|
||||
(String) ((Map) ((Map) jsonMap.get("output")).get("data")).get("text/plain"));
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Map executeCommand(String lines, InterpreterContext context,
|
||||
Map<String, Integer> userSessionMap) throws Exception {
|
||||
String json = executeHTTP(property.get("zeppelin.livy.url") + "/sessions/"
|
||||
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
|
||||
+ "/statements",
|
||||
"POST",
|
||||
"{\"code\": \"" + lines + "\" }");
|
||||
Map jsonMap = (Map<String, Object>) gson.fromJson(json,
|
||||
new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
try {
|
||||
LOGGER.error(json);
|
||||
Map jsonMap = gson.fromJson(json,
|
||||
new TypeToken<Map>() {
|
||||
}.getType());
|
||||
return jsonMap;
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private Map getStatusById(InterpreterContext context,
|
||||
Map<String, Integer> userSessionMap, Integer id) throws Exception {
|
||||
String json = executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
|
||||
+ userSessionMap.get(context.getAuthenticationInfo().getUser())
|
||||
+ "/statements/" + id,
|
||||
"GET", null);
|
||||
try {
|
||||
Map jsonMap = gson.fromJson(json,
|
||||
new TypeToken<Map>() {
|
||||
}.getType());
|
||||
return jsonMap;
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
public String executeHTTP(String targetURL, String method, String jsonData)
|
||||
throws Exception {
|
||||
|
||||
HttpClient client = HttpClientBuilder.create().build();
|
||||
|
||||
HttpResponse response;
|
||||
LOGGER.error(jsonData);
|
||||
if (method.equals("POST")) {
|
||||
HttpPost request = new HttpPost(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
StringEntity se = new StringEntity(jsonData);
|
||||
request.setEntity(se);
|
||||
response = client.execute(request);
|
||||
} else {
|
||||
HttpGet request = new HttpGet(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
response = client.execute(request);
|
||||
}
|
||||
|
||||
|
||||
if (response.getStatusLine().getStatusCode() == 200
|
||||
|| response.getStatusLine().getStatusCode() == 201) {
|
||||
BufferedReader rd = new BufferedReader(
|
||||
new InputStreamReader(response.getEntity().getContent()));
|
||||
|
||||
StringBuffer result = new StringBuffer();
|
||||
String line = "";
|
||||
while ((line = rd.readLine()) != null) {
|
||||
result.append(line);
|
||||
}
|
||||
return result.toString();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,22 +17,16 @@
|
|||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.io.output.ByteArrayOutputStream;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
/**
|
||||
|
|
@ -40,32 +34,29 @@ import java.util.*;
|
|||
*/
|
||||
public class LivyPySparkInterpreter extends Interpreter {
|
||||
|
||||
static String DEFAULT_URL = "http://192.168.0.106:8080";
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"livy",
|
||||
"pyspark",
|
||||
LivyHelper.class.getName(),
|
||||
"lspark",
|
||||
LivyPySparkInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.")
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
private Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
private static final String EXECUTOR_KEY = "executor";
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);
|
||||
|
||||
public LivyPySparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
userSessionMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
userSessionMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -73,41 +64,35 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
if (userSessionMap.get(context.getAuthenticationInfo().getUser()) == null) {
|
||||
userSessionMap.put(
|
||||
context.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(context.getAuthenticationInfo().getUser(), "pyspark"));
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
|
||||
try {
|
||||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
"pyspark")
|
||||
);
|
||||
} catch (Exception e) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivyPySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
|
||||
}
|
||||
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(context.out, errorStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
|
||||
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
Map<String, Object> info = runningJob.info();
|
||||
info.put(EXECUTOR_KEY, executor);
|
||||
|
||||
return livyHelper.interpret(line.split("\n"), context, userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(EXECUTOR_KEY);
|
||||
if (object != null) {
|
||||
Executor executor = (Executor) object;
|
||||
ExecuteWatchdog watchdog = executor.getWatchdog();
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -123,18 +108,7 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivyHelper.class.getName() + this.hashCode(), 10);
|
||||
}
|
||||
|
||||
private Job getRunningJob(String paragraphId) {
|
||||
Job foundJob = null;
|
||||
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
|
||||
for (Job job : jobsRunning) {
|
||||
if (job.getId().equals(paragraphId)) {
|
||||
foundJob = job;
|
||||
}
|
||||
}
|
||||
return foundJob;
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -17,97 +17,83 @@
|
|||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.io.output.ByteArrayOutputStream;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Livy Spark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivySparkInterpreter extends Interpreter {
|
||||
|
||||
static String DEFAULT_URL = "http://localhost:8998";
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivyPySparkInterpreter.class);
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"livy",
|
||||
"spark",
|
||||
LivyHelper.class.getName(),
|
||||
"lspark",
|
||||
"lspark",
|
||||
LivySparkInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.")
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
private Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
private static final String EXECUTOR_KEY = "executor";
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivySparkInterpreter.class);
|
||||
|
||||
public LivySparkInterpreter(Properties property) {
|
||||
super(property);
|
||||
userSessionMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper(property);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
userSessionMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
if (context.getAuthenticationInfo().getUser() == null) {
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
|
||||
try {
|
||||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
"spark")
|
||||
);
|
||||
} catch (Exception e) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
if (userSessionMap.get(context.getAuthenticationInfo().getUser()) == null) {
|
||||
userSessionMap.put(
|
||||
context.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(context.getAuthenticationInfo().getUser(), "spark"));
|
||||
}
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
|
||||
}
|
||||
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(context.out, errorStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT));
|
||||
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
Map<String, Object> info = runningJob.info();
|
||||
info.put(EXECUTOR_KEY, executor);
|
||||
|
||||
return livyHelper.interpret(line.split("\n"), context, userSessionMap);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(EXECUTOR_KEY);
|
||||
if (object != null) {
|
||||
Executor executor = (Executor) object;
|
||||
ExecuteWatchdog watchdog = executor.getWatchdog();
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -123,22 +109,12 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivyHelper.class.getName() + this.hashCode(), 10);
|
||||
}
|
||||
|
||||
private Job getRunningJob(String paragraphId) {
|
||||
Job foundJob = null;
|
||||
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
|
||||
for (Job job : jobsRunning) {
|
||||
if (job.getId().equals(paragraphId)) {
|
||||
foundJob = job;
|
||||
}
|
||||
}
|
||||
return foundJob;
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
|
||||
/**
|
||||
* Livy PySpark interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivySparkRInterpreter extends Interpreter {
|
||||
|
||||
Logger LOGGER = LoggerFactory.getLogger(LivySparkRInterpreter.class);
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"r",
|
||||
"lspark",
|
||||
LivySparkRInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
private Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
|
||||
public LivySparkRInterpreter(Properties property) {
|
||||
super(property);
|
||||
userSessionMap = new HashMap<>();
|
||||
livyHelper = new LivyHelper(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext interpreterContext) {
|
||||
try {
|
||||
if (userSessionMap.get(interpreterContext.getAuthenticationInfo().getUser()) == null) {
|
||||
try {
|
||||
userSessionMap.put(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
livyHelper.createSession(
|
||||
interpreterContext.getAuthenticationInfo().getUser(),
|
||||
"sparkr")
|
||||
);
|
||||
} catch (Exception e) {
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
|
||||
}
|
||||
|
||||
return livyHelper.interpretInput(line, interpreterContext, userSessionMap);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("Exception in LivySparkRInterpreter while interpret ", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivySparkInterpreter.class.getName() + this.hashCode(), 7);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,101 +0,0 @@
|
|||
package org.apache.zeppelin.livy;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Created by prabhjyot.singh on 17/02/16.
|
||||
*/
|
||||
public class RequestHelper {
|
||||
static Logger LOGGER = LoggerFactory.getLogger(RequestHelper.class);
|
||||
|
||||
public static String executeHTTP(String targetURL, String method, String jsonData)
|
||||
throws Exception {
|
||||
|
||||
HttpClient client = HttpClientBuilder.create().build();
|
||||
|
||||
HttpResponse response;
|
||||
LOGGER.error(jsonData);
|
||||
if (method.equals("POST")) {
|
||||
HttpPost request = new HttpPost(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
StringEntity se = new StringEntity(jsonData);
|
||||
request.setEntity(se);
|
||||
response = client.execute(request);
|
||||
} else {
|
||||
HttpGet request = new HttpGet(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
response = client.execute(request);
|
||||
}
|
||||
|
||||
|
||||
if (response.getStatusLine().getStatusCode() == 200
|
||||
|| response.getStatusLine().getStatusCode() == 201) {
|
||||
BufferedReader rd = new BufferedReader(
|
||||
new InputStreamReader(response.getEntity().getContent()));
|
||||
|
||||
StringBuffer result = new StringBuffer();
|
||||
String line = "";
|
||||
while ((line = rd.readLine()) != null) {
|
||||
result.append(line);
|
||||
}
|
||||
return result.toString();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static void main(String[] asd) throws Exception {
|
||||
// String host = "http://192.168.0.106";
|
||||
// String sessionCreate = executeHTTP(host + ":8998/sessions", "POST",
|
||||
// "{\"kind\": \"pyspark\", \"proxyUser\": \"bob222\"}");
|
||||
// System.out.println(sessionCreate);
|
||||
//
|
||||
// String countSession = executeHTTP(host + ":8998/sessions", "GET", null);
|
||||
// System.out.println(countSession);
|
||||
String user = "user";
|
||||
String DEFAULT_URL = "http://192.168.0.106:8998";
|
||||
// String json = executeHTTP(DEFAULT_URL + "/sessions", "POST",
|
||||
// "{\"kind\": \"pyspark\", \"proxyUser\": \"" + user + "\"}");
|
||||
Gson gson = new Gson();
|
||||
// Map jsonMap = (Map<String, Object>) gson.fromJson(json,
|
||||
// new TypeToken<Map<String, Object>>() {}.getType());
|
||||
|
||||
Map<String, Integer> stringIntegerHashMap = new HashMap<>();
|
||||
stringIntegerHashMap.put(null, 1);
|
||||
System.out.println();
|
||||
// String lines = "import time\\n" +
|
||||
// "REFRAIN = '''\\n" +
|
||||
// "%d bottles of beer on the wall,\\n" +
|
||||
// "%d bottles of beer,\\n" +
|
||||
// "take one down, pass it around,\\n" +
|
||||
// "%d bottles of beer on the wall!\\n" +
|
||||
// "'''\\n" +
|
||||
// "bottles_of_beer = 10\\n" +
|
||||
// "while bottles_of_beer > 1:\\n" +
|
||||
// " time.sleep(1)\\n" +
|
||||
// " print REFRAIN % (bottles_of_beer, bottles_of_beer,\\n" +
|
||||
// " bottles_of_beer - 1)\\n" +
|
||||
// " bottles_of_beer -= 1\\n";
|
||||
// String json = executeHTTP(DEFAULT_URL + "/sessions/"
|
||||
// + 0
|
||||
// + "/statements",
|
||||
// "POST",
|
||||
// "{\"code\": \"" + lines + "\" }");
|
||||
// Map jsonMap = (Map<String, Object>) gson.fromJson(json,
|
||||
// new TypeToken<Map<String, Object>>() {
|
||||
// }.getType());
|
||||
// System.out.println(json);
|
||||
}
|
||||
}
|
||||
|
|
@ -34,7 +34,7 @@ if [ ! -d "${SPARK_HOME}" ]; then
|
|||
echo "${SPARK_VERSION}" | grep "^1.[12].[0-9]" > /dev/null
|
||||
if [ $? -eq 0 ]; then
|
||||
# spark 1.1.x and spark 1.2.x can be downloaded from archive
|
||||
wget -q http://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
|
||||
wget -q http://archive.apache.org/dist/spark/spark-$1.4.1/spark-1.4.1-bin-hadoop2.3.tgz
|
||||
else
|
||||
# spark 1.3.x and later can be downloaded from mirror
|
||||
# get download address from mirror
|
||||
|
|
|
|||
|
|
@ -444,8 +444,9 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.markdown.Markdown,"
|
||||
+ "org.apache.zeppelin.angular.AngularInterpreter,"
|
||||
+ "org.apache.zeppelin.shell.ShellInterpreter,"
|
||||
+ "org.apache.zeppelin.livy.LivyPySparkInterpreter,"
|
||||
+ "org.apache.zeppelin.livy.LivySparkInterpreter,"
|
||||
+ "org.apache.zeppelin.livy.LivyPySparkInterpreter,"
|
||||
+ "org.apache.zeppelin.livy.LivySparkRInterpreter,"
|
||||
+ "org.apache.zeppelin.hive.HiveInterpreter,"
|
||||
+ "org.apache.zeppelin.tachyon.TachyonInterpreter,"
|
||||
+ "org.apache.zeppelin.phoenix.PhoenixInterpreter,"
|
||||
|
|
|
|||
Loading…
Reference in a new issue