mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
test livy
This commit is contained in:
parent
0709b9c51c
commit
40534974ae
5 changed files with 465 additions and 0 deletions
140
livy/pom.xml
Normal file
140
livy/pom.xml
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-livy</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<name>Zeppelin: Livy interpreter</name>
|
||||
<url>http://zeppelin.incubator.apache.org</url>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-exec</artifactId>
|
||||
<version>1.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
<version>4.3.4</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/livy</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/livy</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
219
livy/src/main/java/org/apache/zeppelin/livy/LivyInterpreter.java
Normal file
219
livy/src/main/java/org/apache/zeppelin/livy/LivyInterpreter.java
Normal file
|
|
@ -0,0 +1,219 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.apache.zeppelin.livy.RequestHelper.executeHTTP;
|
||||
|
||||
/**
|
||||
* Livy interpreter for Zeppelin.
|
||||
*/
|
||||
public class LivyInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(LivyInterpreter.class);
|
||||
private static final String EXECUTOR_KEY = "executor";
|
||||
int commandTimeOut = 600000;
|
||||
|
||||
|
||||
static final String DEFAULT_URL = "http://localhost:8998";
|
||||
static final String DEFAULT_USER = "livy";
|
||||
|
||||
private Map<String, Integer> userSessionMap;
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"livy",
|
||||
"livy",
|
||||
LivyInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add("zeppelin.livy.url", DEFAULT_URL, "The URL for Livy Server.")
|
||||
.add("zeppelin.livy.user", DEFAULT_USER, "The livy user")
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public LivyInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
userSessionMap = new HashMap<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String line, InterpreterContext context) {
|
||||
if (userSessionMap.get(context.getAuthenticationInfo().getUser()) == null) {
|
||||
userSessionMap.put(context.getAuthenticationInfo().getUser(), createSession(context.getAuthenticationInfo().getUser()));
|
||||
}
|
||||
if (line == null || line.trim().length() == 0) {
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
return interpret(line.split("\n"), context);
|
||||
}
|
||||
|
||||
private Integer createSession(String user) {
|
||||
try {
|
||||
String json = executeHTTP(DEFAULT_URL + "/sessions", "POST", "{\"kind\": \"pyspark\", \"proxyUser\": \"" + user + "\"}");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public InterpreterResult interpret(String[] lines, InterpreterContext context) {
|
||||
synchronized (this) {
|
||||
InterpreterResult res = interpretInput(lines, context);
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
public InterpreterResult interpretInput(String[] lines, InterpreterContext context) {
|
||||
// logger.debug("Run livy command '" + cmd + "'");
|
||||
// CommandLine cmdLine = CommandLine.parse("bash");
|
||||
// cmdLine.addArgument("-c", false);
|
||||
// cmdLine.addArgument(cmd, false);
|
||||
// DefaultExecutor executor = new DefaultExecutor();
|
||||
// ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
// ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
// executor.setStreamHandler(new PumpStreamHandler(contextInterpreter.out, errorStream));
|
||||
// executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
|
||||
//
|
||||
// Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
|
||||
// Map<String, Object> info = runningJob.info();
|
||||
// info.put(EXECUTOR_KEY, executor);
|
||||
// try {
|
||||
// int exitVal = executor.execute(cmdLine);
|
||||
// logger.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
// + "return with exit value: " + exitVal);
|
||||
// return new InterpreterResult(InterpreterResult.Code.SUCCESS, null);
|
||||
// } catch (ExecuteException e) {
|
||||
// int exitValue = e.getExitValue();
|
||||
// logger.error("Can not run " + cmd, e);
|
||||
// Code code = Code.ERROR;
|
||||
// String msg = errorStream.toString();
|
||||
// if (exitValue == 143) {
|
||||
// code = Code.INCOMPLETE;
|
||||
// msg = msg + "Paragraph received a SIGTERM.\n";
|
||||
// logger.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
// + " stopped executing: " + msg);
|
||||
// }
|
||||
// msg += "ExitValue: " + exitValue;
|
||||
// return new InterpreterResult(code, msg);
|
||||
// } catch (IOException e) {
|
||||
// logger.error("Can not run " + cmd, e);
|
||||
// return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
// }
|
||||
|
||||
|
||||
String[] linesToRun = new String[lines.length + 1];
|
||||
for (int i = 0; i < lines.length; i++) {
|
||||
linesToRun[i] = lines[i];
|
||||
}
|
||||
linesToRun[lines.length] = "print(\"\")";
|
||||
|
||||
String incomplete = "";
|
||||
Code r = null;
|
||||
|
||||
for (int l = 0; l < linesToRun.length; l++) {
|
||||
String s = linesToRun[l];
|
||||
// check if next line starts with "." (but not ".." or "./") it is treated as an invocation
|
||||
if (l + 1 < linesToRun.length) {
|
||||
String nextLine = linesToRun[l + 1].trim();
|
||||
if (nextLine.startsWith(".") && !nextLine.startsWith("..") && !nextLine.startsWith("./")) {
|
||||
incomplete += s + "\n";
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
executeHTTP()
|
||||
// incomplete + s;
|
||||
// return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
if (r == Code.INCOMPLETE) {
|
||||
return new InterpreterResult(r, "Incomplete expression");
|
||||
} else {
|
||||
return new InterpreterResult(Code.SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
private Job getRunningJob(String paragraphId) {
|
||||
Job foundJob = null;
|
||||
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
|
||||
for (Job job : jobsRunning) {
|
||||
if (job.getId().equals(paragraphId)) {
|
||||
foundJob = job;
|
||||
}
|
||||
}
|
||||
return foundJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(EXECUTOR_KEY);
|
||||
if (object != null) {
|
||||
Executor executor = (Executor) object;
|
||||
ExecuteWatchdog watchdog = executor.getWatchdog();
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
LivyInterpreter.class.getName() + this.hashCode(), 10);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
104
livy/src/main/java/org/apache/zeppelin/livy/RequestHelper.java
Normal file
104
livy/src/main/java/org/apache/zeppelin/livy/RequestHelper.java
Normal file
|
|
@ -0,0 +1,104 @@
|
|||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.http.params.HttpConnectionParams;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
|
||||
/**
|
||||
* Created by prabhjyot.singh on 17/02/16.
|
||||
*/
|
||||
public class RequestHelper {
|
||||
|
||||
public static String executeHTTP(String targetURL, String method, String jsonData)
|
||||
throws Exception {
|
||||
|
||||
HttpClient client = HttpClientBuilder.create().build();
|
||||
|
||||
HttpResponse response;
|
||||
if (method.equals("POST")) {
|
||||
HttpPost request = new HttpPost(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
StringEntity se = new StringEntity(jsonData);
|
||||
request.setEntity(se);
|
||||
response = client.execute(request);
|
||||
} else {
|
||||
HttpGet request = new HttpGet(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
response = client.execute(request);
|
||||
}
|
||||
|
||||
|
||||
if (response.getStatusLine().getStatusCode() == 200 || response.getStatusLine().getStatusCode() == 201 ) {
|
||||
return null;
|
||||
}
|
||||
BufferedReader rd = new BufferedReader(
|
||||
new InputStreamReader(response.getEntity().getContent()));
|
||||
|
||||
StringBuffer result = new StringBuffer();
|
||||
String line = "";
|
||||
while ((line = rd.readLine()) != null) {
|
||||
result.append(line);
|
||||
}
|
||||
return result.toString();
|
||||
// HttpURLConnection connection = null;
|
||||
// try {
|
||||
// //Create connection
|
||||
// URL url = new URL(targetURL);
|
||||
// connection = (HttpURLConnection) url.openConnection();
|
||||
// connection.setRequestProperty("Content-Type",
|
||||
// "application/json; charset=UTF-8");
|
||||
// connection.setDoOutput(true);
|
||||
// connection.setDoInput(true);
|
||||
// connection.setRequestMethod(method);
|
||||
//
|
||||
// connection.setRequestProperty("Content-Length",
|
||||
// Integer.toString(urlParameters.getBytes().length));
|
||||
// connection.setRequestProperty("Content-Language", "en-US");
|
||||
//
|
||||
// connection.setUseCaches(false);
|
||||
// connection.setDoOutput(true);
|
||||
//
|
||||
// //Send request
|
||||
// OutputStreamWriter wr = new OutputStreamWriter(connection.getOutputStream());
|
||||
// if (jsonData != null) {
|
||||
// wr.write(jsonData);
|
||||
// }
|
||||
// wr.close();
|
||||
//
|
||||
// //Get Response
|
||||
// InputStream is = connection.getInputStream();
|
||||
// BufferedReader rd = new BufferedReader(new InputStreamReader(is));
|
||||
// StringBuilder response = new StringBuilder(); // or StringBuffer if not Java 5+
|
||||
// String line;
|
||||
// while ((line = rd.readLine()) != null) {
|
||||
// response.append(line);
|
||||
// response.append('\r');
|
||||
// }
|
||||
// rd.close();
|
||||
// return response.toString();
|
||||
// } catch (Exception e) {
|
||||
// e.printStackTrace();
|
||||
// throw e;
|
||||
// } finally {
|
||||
// if (connection != null) {
|
||||
// connection.disconnect();
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
public static void main(String[] asd) throws Exception {
|
||||
String host = "http://192.168.0.106";
|
||||
String sessionCreate = executeHTTP(host + ":8998/sessions", "POST", "{\"kind\": \"pyspark\", \"proxyUser\": \"bob222\"}");
|
||||
System.out.println(sessionCreate);
|
||||
|
||||
String countSession = executeHTTP(host + ":8998/sessions", "GET", null);
|
||||
System.out.println(countSession);
|
||||
}
|
||||
}
|
||||
1
pom.xml
1
pom.xml
|
|
@ -92,6 +92,7 @@
|
|||
<module>markdown</module>
|
||||
<module>angular</module>
|
||||
<module>shell</module>
|
||||
<module>livy</module>
|
||||
<module>hive</module>
|
||||
<module>hbase</module>
|
||||
<module>phoenix</module>
|
||||
|
|
|
|||
|
|
@ -444,6 +444,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.markdown.Markdown,"
|
||||
+ "org.apache.zeppelin.angular.AngularInterpreter,"
|
||||
+ "org.apache.zeppelin.shell.ShellInterpreter,"
|
||||
+ "org.apache.zeppelin.livy.LivyInterpreter,"
|
||||
+ "org.apache.zeppelin.hive.HiveInterpreter,"
|
||||
+ "org.apache.zeppelin.tachyon.TachyonInterpreter,"
|
||||
+ "org.apache.zeppelin.phoenix.PhoenixInterpreter,"
|
||||
|
|
|
|||
Loading…
Reference in a new issue