mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
close livy session on connection close/restart interpreter
This commit is contained in:
parent
84bd755720
commit
9bfbe47f99
7 changed files with 172 additions and 125 deletions
253
livy/pom.xml
253
livy/pom.xml
|
|
@ -16,130 +16,155 @@
|
|||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-livy</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
<name>Zeppelin: Livy interpreter</name>
|
||||
<url>http://zeppelin.incubator.apache.org</url>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-livy</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<name>Zeppelin: Livy interpreter</name>
|
||||
<url>http://zeppelin.incubator.apache.org</url>
|
||||
<properties>
|
||||
<!--TEST-->
|
||||
<junit.version>4.12</junit.version>
|
||||
<achilles.version>3.2.4-Zeppelin</achilles.version>
|
||||
<assertj.version>1.7.0</assertj.version>
|
||||
<mockito.version>1.9.5</mockito.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-exec</artifactId>
|
||||
<version>1.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-exec</artifactId>
|
||||
<version>1.3</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
<version>4.3.4</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<dependency>
|
||||
<groupId>org.apache.httpcomponents</groupId>
|
||||
<artifactId>httpclient</artifactId>
|
||||
<version>4.3.4</version>
|
||||
</dependency>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<dependency>
|
||||
<groupId>com.google.code.gson</groupId>
|
||||
<artifactId>gson</artifactId>
|
||||
</dependency>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/livy</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/livy</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<version>${mockito.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/livy
|
||||
</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/livy
|
||||
</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ import com.google.gson.reflect.TypeToken;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpDelete;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
|
|
@ -300,7 +301,7 @@ public class LivyHelper {
|
|||
public String executeHTTP(String targetURL, String method, String jsonData, String paragraphId)
|
||||
throws Exception {
|
||||
HttpClient client = HttpClientBuilder.create().build();
|
||||
HttpResponse response;
|
||||
HttpResponse response = null;
|
||||
if (method.equals("POST")) {
|
||||
HttpPost request = new HttpPost(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
|
|
@ -308,11 +309,19 @@ public class LivyHelper {
|
|||
request.setEntity(se);
|
||||
response = client.execute(request);
|
||||
paragraphHttpMap.put(paragraphId, request);
|
||||
} else {
|
||||
} else if (method.equals("GET")) {
|
||||
HttpGet request = new HttpGet(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
response = client.execute(request);
|
||||
paragraphHttpMap.put(paragraphId, request);
|
||||
} else if (method.equals("DELETE")) {
|
||||
HttpDelete request = new HttpDelete(targetURL);
|
||||
request.addHeader("Content-Type", "application/json");
|
||||
response = client.execute(request);
|
||||
}
|
||||
|
||||
if (response == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (response.getStatusLine().getStatusCode() == 200
|
||||
|
|
@ -340,4 +349,16 @@ public class LivyHelper {
|
|||
paragraphHttpMap.put(paragraphId, null);
|
||||
}
|
||||
|
||||
protected void closeSession(Map<String, Integer> userSessionMap, String kind) {
|
||||
for (Map.Entry<String, Integer> entry : userSessionMap.entrySet()) {
|
||||
try {
|
||||
executeHTTP(property.getProperty("zeppelin.livy.url") + "/sessions/"
|
||||
+ entry.getValue(),
|
||||
"DELETE", null, null);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(String.format("Error closing session for user with session ID: %s, kind: %s",
|
||||
entry.getValue(), kind), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,14 +46,14 @@ public class LivyOutputStream extends OutputStream {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void write(byte [] b) throws IOException {
|
||||
public void write(byte[] b) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.write(b);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(byte [] b, int offset, int len) throws IOException {
|
||||
public void write(byte[] b, int offset, int len) throws IOException {
|
||||
if (interpreterOutput != null) {
|
||||
interpreterOutput.write(b, offset, len);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
);
|
||||
}
|
||||
|
||||
private Map<String, Integer> userSessionMap;
|
||||
protected Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
|
||||
public LivyPySparkInterpreter(Properties property) {
|
||||
|
|
@ -61,6 +61,7 @@ public class LivyPySparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
livyHelper.closeSession(userSessionMap, "pyspark");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -17,10 +17,7 @@
|
|||
|
||||
package org.apache.zeppelin.livy;
|
||||
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.zeppelin.interpreter.*;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -51,7 +48,7 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
);
|
||||
}
|
||||
|
||||
private static Map<String, Integer> userSessionMap;
|
||||
protected static Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
|
||||
public LivySparkInterpreter(Properties property) {
|
||||
|
|
@ -75,6 +72,7 @@ public class LivySparkInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
livyHelper.closeSession(userSessionMap, "spark");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ public class LivySparkRInterpreter extends Interpreter {
|
|||
);
|
||||
}
|
||||
|
||||
private Map<String, Integer> userSessionMap;
|
||||
protected Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
|
||||
public LivySparkRInterpreter(Properties property) {
|
||||
|
|
@ -61,6 +61,7 @@ public class LivySparkRInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
livyHelper.closeSession(userSessionMap, "sparkr");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
);
|
||||
}
|
||||
|
||||
private Map<String, Integer> userSessionMap;
|
||||
protected Map<String, Integer> userSessionMap;
|
||||
private LivyHelper livyHelper;
|
||||
|
||||
public LivySparkSQLInterpreter(Properties property) {
|
||||
|
|
@ -64,6 +64,7 @@ public class LivySparkSQLInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
livyHelper.closeSession(userSessionMap, "spark");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in a new issue