close livy session on connection close/restart interpreter

This commit is contained in:
Prabhjyot Singh 2016-04-20 11:57:33 +05:30
parent 84bd755720
commit 9bfbe47f99
7 changed files with 172 additions and 125 deletions

View file

@ -16,130 +16,155 @@
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.6.0-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-livy</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<name>Zeppelin: Livy interpreter</name>
<url>http://zeppelin.incubator.apache.org</url>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-livy</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin: Livy interpreter</name>
<url>http://zeppelin.incubator.apache.org</url>
<properties>
<!--TEST-->
<junit.version>4.12</junit.version>
<achilles.version>3.2.4-Zeppelin</achilles.version>
<assertj.version>1.7.0</assertj.version>
<mockito.version>1.9.5</mockito.version>
</properties>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.4</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.4</version>
</dependency>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/livy</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/livy</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/livy
</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/livy
</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -23,6 +23,7 @@ import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
@ -300,7 +301,7 @@ public class LivyHelper {
public String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
throws Exception {
HttpClient client = HttpClientBuilder.create().build();
HttpResponse response;
HttpResponse response = null;
if (method.equals("POST")) {
HttpPost request = new HttpPost(targetURL);
request.addHeader("Content-Type", "application/json");
@ -308,11 +309,19 @@ public class LivyHelper {
request.setEntity(se);
response = client.execute(request);
paragraphHttpMap.put(paragraphId, request);
} else {
} else if (method.equals("GET")) {
HttpGet request = new HttpGet(targetURL);
request.addHeader("Content-Type", "application/json");
response = client.execute(request);
paragraphHttpMap.put(paragraphId, request);
} else if (method.equals("DELETE")) {
HttpDelete request = new HttpDelete(targetURL);
request.addHeader("Content-Type", "application/json");
response = client.execute(request);
}
if (response == null) {
return null;
}
if (response.getStatusLine().getStatusCode() == 200
@ -340,4 +349,16 @@ public class LivyHelper {
paragraphHttpMap.put(paragraphId, null);
}
protected void closeSession(Map<String, Integer> userSessionMap, String kind) {
for (Map.Entry<String, Integer> entry : userSessionMap.entrySet()) {
try {
executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
+ entry.getValue(),
"DELETE", null, null);
} catch (Exception e) {
LOGGER.error(String.format("Error closing session for user with session ID: %s, kind: %s",
entry.getValue(), kind), e);
}
}
}
}

View file

@ -46,14 +46,14 @@ public class LivyOutputStream extends OutputStream {
}
@Override
public void write(byte [] b) throws IOException {
public void write(byte[] b) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b);
}
}
@Override
public void write(byte [] b, int offset, int len) throws IOException {
public void write(byte[] b, int offset, int len) throws IOException {
if (interpreterOutput != null) {
interpreterOutput.write(b, offset, len);
}

View file

@ -46,7 +46,7 @@ public class LivyPySparkInterpreter extends Interpreter {
);
}
private Map<String, Integer> userSessionMap;
protected Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
public LivyPySparkInterpreter(Properties property) {
@ -61,6 +61,7 @@ public class LivyPySparkInterpreter extends Interpreter {
@Override
public void close() {
livyHelper.closeSession(userSessionMap, "pyspark");
}
@Override

View file

@ -17,10 +17,7 @@
package org.apache.zeppelin.livy;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.Executor;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -51,7 +48,7 @@ public class LivySparkInterpreter extends Interpreter {
);
}
private static Map<String, Integer> userSessionMap;
protected static Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
public LivySparkInterpreter(Properties property) {
@ -75,6 +72,7 @@ public class LivySparkInterpreter extends Interpreter {
@Override
public void close() {
livyHelper.closeSession(userSessionMap, "spark");
}
@Override

View file

@ -46,7 +46,7 @@ public class LivySparkRInterpreter extends Interpreter {
);
}
private Map<String, Integer> userSessionMap;
protected Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
public LivySparkRInterpreter(Properties property) {
@ -61,6 +61,7 @@ public class LivySparkRInterpreter extends Interpreter {
@Override
public void close() {
livyHelper.closeSession(userSessionMap, "sparkr");
}
@Override

View file

@ -49,7 +49,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
);
}
private Map<String, Integer> userSessionMap;
protected Map<String, Integer> userSessionMap;
private LivyHelper livyHelper;
public LivySparkSQLInterpreter(Properties property) {
@ -64,6 +64,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
@Override
public void close() {
livyHelper.closeSession(userSessionMap, "spark");
}
@Override