mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' into zeppelin-3092-remote-github-integration
This commit is contained in:
commit
90de14ccdf
60 changed files with 1929 additions and 277 deletions
|
|
@ -220,8 +220,8 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUB
|
|||
fi
|
||||
|
||||
eval $INTERPRETER_RUN_COMMAND &
|
||||
|
||||
pid=$!
|
||||
|
||||
if [[ -z "${pid}" ]]; then
|
||||
exit 1;
|
||||
else
|
||||
|
|
|
|||
47
bin/stop-interpreter.sh
Executable file
47
bin/stop-interpreter.sh
Executable file
|
|
@ -0,0 +1,47 @@
|
|||
#!/bin/bash
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# Stop Zeppelin Interpreter Processes
|
||||
#
|
||||
|
||||
bin=$(dirname "${BASH_SOURCE-$0}")
|
||||
bin=$(cd "${bin}">/dev/null; pwd)
|
||||
|
||||
. "${bin}/common.sh"
|
||||
|
||||
export ZEPPELIN_FORCE_STOP=1
|
||||
|
||||
ZEPPELIN_STOP_INTERPRETER_MAIN=org.apache.zeppelin.interpreter.recovery.StopInterpreter
|
||||
ZEPPELIN_LOGFILE="${ZEPPELIN_LOG_DIR}/stop-interpreter.log"
|
||||
JAVA_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
|
||||
|
||||
if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/classes" ]]; then
|
||||
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/classes"
|
||||
fi
|
||||
|
||||
if [[ -d "${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes" ]]; then
|
||||
ZEPPELIN_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-interpreter/target/classes"
|
||||
fi
|
||||
|
||||
addJarInDir "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
|
||||
addJarInDir "${ZEPPELIN_HOME}/zeppelin-server/target/lib"
|
||||
addJarInDir "${ZEPPELIN_HOME}/lib"
|
||||
addJarInDir "${ZEPPELIN_HOME}/lib/interpreter"
|
||||
|
||||
CLASSPATH+=":${ZEPPELIN_CLASSPATH}"
|
||||
$ZEPPELIN_RUNNER $JAVA_OPTS -cp $CLASSPATH $ZEPPELIN_STOP_INTERPRETER_MAIN ${@}
|
||||
|
|
@ -217,18 +217,6 @@ function stop() {
|
|||
action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}"
|
||||
fi
|
||||
fi
|
||||
|
||||
# list all pid that used in remote interpreter and kill them
|
||||
for f in ${ZEPPELIN_PID_DIR}/*.pid; do
|
||||
if [[ ! -f ${f} ]]; then
|
||||
continue;
|
||||
fi
|
||||
|
||||
pid=$(cat ${f})
|
||||
wait_for_zeppelin_to_die $pid 20
|
||||
$(rm -f ${f})
|
||||
done
|
||||
|
||||
}
|
||||
|
||||
function find_zeppelin_process() {
|
||||
|
|
|
|||
|
|
@ -481,6 +481,45 @@
|
|||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.class</name>
|
||||
<value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name>
|
||||
<value>6000</value>
|
||||
<description>Check interval of interpreter expiration in seconds</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name>
|
||||
<value>3600000</value>
|
||||
<description>Threshold of interpreter idle time in seconds, interpeter exceed this threshold will be killed</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.recovery.storage.class</name>
|
||||
<value>org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage</value>
|
||||
<description>ReoveryStorage implementation</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!--
|
||||
<property>
|
||||
<name>zeppelin.recovery.dir</name>
|
||||
<value>recovery</value>
|
||||
<description>Location where recovery metadata is stored</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
<!-- GitHub configurations
|
||||
<property>
|
||||
<name>zeppelin.notebook.git.remote.url</name>
|
||||
|
|
@ -506,5 +545,4 @@
|
|||
<description>Git repository remote</description>
|
||||
</property>
|
||||
-->
|
||||
|
||||
</configuration>
|
||||
|
|
|
|||
|
|
@ -44,7 +44,7 @@
|
|||
<li><a href="{{BASE_PATH}}/usage/display_system/basic.html#text">Text Display</a></li>
|
||||
<li><a href="{{BASE_PATH}}/usage/display_system/basic.html#html">HTML Display</a></li>
|
||||
<li><a href="{{BASE_PATH}}/usage/display_system/basic.html#table">Table Display</a></li>
|
||||
<li><a href="{{BASE_PATH}}/usage/display_system/basic.html#network">Network</a></li>
|
||||
<li><a href="{{BASE_PATH}}/usage/display_system/basic.html#network">Network Display</a></li>
|
||||
<li><a href="{{BASE_PATH}}/usage/display_system/angular_backend.html">Angular Display using Backend API</a></li>
|
||||
<li><a href="{{BASE_PATH}}/usage/display_system/angular_frontend.html">Angular Display using Frontend API</a></li>
|
||||
<li role="separator" class="divider"></li>
|
||||
|
|
|
|||
BIN
docs/assets/themes/zeppelin/img/screenshots/conf_interpreter.png
Normal file
BIN
docs/assets/themes/zeppelin/img/screenshots/conf_interpreter.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 38 KiB |
|
|
@ -59,6 +59,7 @@ limitations under the License.
|
|||
* [Text Display (`%text`)](./usage/display_system/basic.html#text)
|
||||
* [HTML Display (`%html`)](./usage/display_system/basic.html#html)
|
||||
* [Table Display (`%table`)](./usage/display_system/basic.html#table)
|
||||
* [Network Display (`%network`)](./usage/display_system/basic.html#network)
|
||||
* [Angular Display using Backend API (`%angular`)](./usage/display_system/angular_backend.html)
|
||||
* [Angular Display using Frontend API (`%angular`)](./usage/display_system/angular_frontend.html)
|
||||
* Interpreter
|
||||
|
|
|
|||
|
|
@ -131,3 +131,24 @@ Before 0.8.0, Zeppelin don't have lifecycle management on interpreter. User have
|
|||
`NullLifecycleManager` will do nothing,
|
||||
user need to control the lifecycle of interpreter by themselves as before. `TimeoutLifecycleManager` will shutdown interpreters after interpreter idle for a while. By default, the idle threshold is 1 hour.
|
||||
User can change it via `zeppelin.interpreter.lifecyclemanager.timeout.threshold`. `TimeoutLifecycleManager` is the default lifecycle manager, user can change it via `zeppelin.interpreter.lifecyclemanager.class`.
|
||||
|
||||
|
||||
## Generic ConfInterpreter
|
||||
|
||||
Zeppelin's interpreter setting is shared by all users and notes, if you want to have different setting you have to create new interpreter, e.g. you can create `spark_jar1` for running spark with dependency jar1 and `spark_jar2` for running spark with dependency jar2.
|
||||
This approach works, but not so convenient. `ConfInterpreter` can provide more fine-grained control on interpreter setting and more flexibility.
|
||||
|
||||
`ConfInterpreter` is a generic interpreter that could be used by any interpreters. The input format should be property file format.
|
||||
It can be used to make custom setting for any interpreter. But it requires to run before interpreter process launched. And when interpreter process is launched is determined by interpreter mode setting.
|
||||
So users needs to understand the ([interpreter mode setting ](../usage/interpreter/interpreter_bindings_mode.html) of Zeppelin and be aware when interpreter process is launched. E.g. If we set spark interpreter setting as isolated per note. Under this setting, each note will launch one interpreter process.
|
||||
In this scenario, user need to put `ConfInterpreter` as the first paragraph as the below example. Otherwise the customized setting can not be applied (Actually it would report ERROR)
|
||||
<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/screenshots/conf_interpreter.png" width="500px">
|
||||
|
||||
|
||||
## Interpreter Process Recovery
|
||||
|
||||
Before 0.8.0, shutting down Zeppelin also mean to shutdown all the running interpreter processes. Usually admin will shutdown Zeppelin server for maintenance or upgrade, but don't want to shut down the running interpreter processes.
|
||||
In such cases, interpreter process recovery is necessary. Starting from 0.8.0, user can enable interpreter process recovering via setting `zeppelin.recovery.storage.class` as
|
||||
`org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage` or other implementations if available in future, by default it is `org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage`
|
||||
which means recovery is not enabled. Enable recover means shutting down Zeppelin would not terminating interpreter process,
|
||||
and when Zeppelin is restarted, it would try to reconnect to the existing running interpreter processes. If you want to kill all the interpreter processes after terminating Zeppelin even when recovery is enabled, you can run `bin/stop-interpreter.sh`
|
||||
|
|
|
|||
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.kylin;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.JsonSyntaxException;
|
||||
import org.apache.zeppelin.common.JsonSerializable;
|
||||
|
||||
/**
|
||||
* class for Kylin Error Response.
|
||||
*/
|
||||
class KylinErrorResponse implements JsonSerializable {
|
||||
private static final Gson gson = new Gson();
|
||||
|
||||
private String stacktrace;
|
||||
private String exception;
|
||||
private String url;
|
||||
private String code;
|
||||
private Object data;
|
||||
private String msg;
|
||||
|
||||
public KylinErrorResponse(String stacktrace, String exception, String url,
|
||||
String code, Object data, String msg) {
|
||||
this.stacktrace = stacktrace;
|
||||
this.exception = exception;
|
||||
this.url = url;
|
||||
this.code = code;
|
||||
this.data = data;
|
||||
this.msg = msg;
|
||||
}
|
||||
|
||||
public String getException() {
|
||||
return exception;
|
||||
}
|
||||
|
||||
public String toJson() {
|
||||
return gson.toJson(this);
|
||||
}
|
||||
|
||||
public static KylinErrorResponse fromJson(String json) {
|
||||
try {
|
||||
return gson.fromJson(json, KylinErrorResponse.class);
|
||||
} catch (JsonSyntaxException ex) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.kylin;
|
||||
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.client.HttpClient;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
|
|
@ -30,9 +31,7 @@ import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Matcher;
|
||||
|
|
@ -166,28 +165,42 @@ public class KylinInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
private InterpreterResult executeQuery(String sql) throws IOException {
|
||||
|
||||
HttpResponse response = prepareRequest(sql);
|
||||
String result;
|
||||
|
||||
if (response.getStatusLine().getStatusCode() != 200) {
|
||||
logger.error("failed to execute query: " + response.getEntity().getContent().toString());
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR,
|
||||
"Failed : HTTP error code " + response.getStatusLine().getStatusCode());
|
||||
try {
|
||||
int code = response.getStatusLine().getStatusCode();
|
||||
result = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
|
||||
|
||||
if (code != 200) {
|
||||
StringBuilder errorMessage = new StringBuilder("Failed : HTTP error code " + code + " .");
|
||||
logger.error("Failed to execute query: " + result);
|
||||
|
||||
KylinErrorResponse kylinErrorResponse = KylinErrorResponse.fromJson(result);
|
||||
if (kylinErrorResponse == null) {
|
||||
logger.error("Cannot get json from string: " + result);
|
||||
// when code is 401, the response is html, not json
|
||||
if (code == 401) {
|
||||
errorMessage.append(" Error message: Unauthorized. This request requires "
|
||||
+ "HTTP authentication. Please make sure your have set your credentials "
|
||||
+ "correctly.");
|
||||
} else {
|
||||
errorMessage.append(" Error message: " + result + " .");
|
||||
}
|
||||
} else {
|
||||
String exception = kylinErrorResponse.getException();
|
||||
logger.error("The exception is " + exception);
|
||||
errorMessage.append(" Error message: " + exception + " .");
|
||||
}
|
||||
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, errorMessage.toString());
|
||||
}
|
||||
} catch (NullPointerException | IOException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
BufferedReader br = new BufferedReader(
|
||||
new InputStreamReader((response.getEntity().getContent())));
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
String output;
|
||||
logger.info("Output from Server .... \n");
|
||||
while ((output = br.readLine()) != null) {
|
||||
logger.info(output);
|
||||
sb.append(output).append('\n');
|
||||
}
|
||||
InterpreterResult rett = new InterpreterResult(InterpreterResult.Code.SUCCESS,
|
||||
formatResult(sb.toString()));
|
||||
return rett;
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS,
|
||||
formatResult(result));
|
||||
}
|
||||
|
||||
String formatResult(String msg) {
|
||||
|
|
@ -205,16 +218,18 @@ public class KylinInterpreter extends Interpreter {
|
|||
table = mr.group(1);
|
||||
}
|
||||
|
||||
String[] row = table.split("],\\[");
|
||||
for (int i = 0; i < row.length; i++) {
|
||||
String[] col = row[i].split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
|
||||
for (int j = 0; j < col.length; j++) {
|
||||
if (col[j] != null) {
|
||||
col[j] = col[j].replaceAll("^\"|\"$", "");
|
||||
if (table != null && !table.isEmpty()) {
|
||||
String[] row = table.split("],\\[");
|
||||
for (int i = 0; i < row.length; i++) {
|
||||
String[] col = row[i].split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1);
|
||||
for (int j = 0; j < col.length; j++) {
|
||||
if (col[j] != null) {
|
||||
col[j] = col[j].replaceAll("^\"|\"$", "");
|
||||
}
|
||||
res.append(col[j] + " \t");
|
||||
}
|
||||
res.append(col[j] + " \t");
|
||||
res.append(" \n");
|
||||
}
|
||||
res.append(" \n");
|
||||
}
|
||||
return res.toString();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,6 +108,30 @@ public class KylinInterpreterTest {
|
|||
Assert.assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseEmptyResult() {
|
||||
String msg = "{\"columnMetas\":[{\"isNullable\":1,\"displaySize\":256,\"label\":\"COUNTRY\",\"name\":\"COUNTRY\","
|
||||
+ "\"schemaName\":\"DEFAULT\",\"catelogName\":null,\"tableName\":\"SALES_TABLE\",\"precision\":256,"
|
||||
+ "\"scale\":0,\"columnType\":12,\"columnTypeName\":\"VARCHAR\",\"writable\":false,\"readOnly\":true,"
|
||||
+ "\"definitelyWritable\":false,\"autoIncrement\":false,\"caseSensitive\":true,\"searchable\":false,"
|
||||
+ "\"currency\":false,\"signed\":true},{\"isNullable\":1,\"displaySize\":256,\"label\":\"CURRENCY\","
|
||||
+ "\"name\":\"CURRENCY\",\"schemaName\":\"DEFAULT\",\"catelogName\":null,\"tableName\":\"SALES_TABLE\","
|
||||
+ "\"precision\":256,\"scale\":0,\"columnType\":12,\"columnTypeName\":\"VARCHAR\",\"writable\":false,"
|
||||
+ "\"readOnly\":true,\"definitelyWritable\":false,\"autoIncrement\":false,\"caseSensitive\":true,"
|
||||
+ "\"searchable\":false,\"currency\":false,\"signed\":true},{\"isNullable\":0,\"displaySize\":19,"
|
||||
+ "\"label\":\"COUNT__\",\"name\":\"COUNT__\",\"schemaName\":\"DEFAULT\",\"catelogName\":null,"
|
||||
+ "\"tableName\":\"SALES_TABLE\",\"precision\":19,\"scale\":0,\"columnType\":-5,\"columnTypeName\":"
|
||||
+ "\"BIGINT\",\"writable\":false,\"readOnly\":true,\"definitelyWritable\":false,\"autoIncrement\":false,"
|
||||
+ "\"caseSensitive\":true,\"searchable\":false,\"currency\":false,\"signed\":true}],\"results\":"
|
||||
+ "[]," + "\"cube\":\"Sample_Cube\",\"affectedRowCount\":0,\"isException\":false,\"exceptionMessage\":null,"
|
||||
+ "\"duration\":134,\"totalScanCount\":1,\"hitExceptionCache\":false,\"storageCacheUsed\":false,"
|
||||
+ "\"partial\":false}";
|
||||
String expected="%table COUNTRY \tCURRENCY \tCOUNT__ \t \n";
|
||||
KylinInterpreter t = new MockKylinInterpreter(getDefaultProperties());
|
||||
String actual = t.formatResult(msg);
|
||||
Assert.assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
private Properties getDefaultProperties() {
|
||||
Properties prop = new Properties();
|
||||
prop.put("kylin.api.username", "ADMIN");
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@
|
|||
"description": "Spark master uri. ex) spark://masterhost:7077",
|
||||
"type": "string"
|
||||
},
|
||||
"zeppelin.spark.unSupportedVersionCheck": {
|
||||
"zeppelin.spark.enableSupportedVersionCheck": {
|
||||
"envName": null,
|
||||
"propertyName": "zeppelin.spark.enableSupportedVersionCheck",
|
||||
"defaultValue": true,
|
||||
|
|
|
|||
|
|
@ -44,6 +44,6 @@ if [[ -n "$PYTHON" ]] ; then
|
|||
conda update -q conda
|
||||
conda info -a
|
||||
conda config --add channels conda-forge
|
||||
conda install -q matplotlib pandasql ipython jupyter_client ipykernel matplotlib bokeh=0.12.6
|
||||
conda install -q matplotlib pandasql ipython=5.4.1 jupyter_client ipykernel matplotlib bokeh=0.12.6
|
||||
pip install -q grpcio ggplot
|
||||
fi
|
||||
|
|
|
|||
|
|
@ -311,7 +311,7 @@ The text of each license is also included at licenses/LICENSE-[project]-[version
|
|||
|
||||
The following components are provided under the BSD-style License.
|
||||
|
||||
(New BSD License) JGit (org.eclipse.jgit:org.eclipse.jgit:jar:4.1.1.201511131810-r - https://eclipse.org/jgit/)
|
||||
(New BSD License) JGit (org.eclipse.jgit:org.eclipse.jgit:jar:4.5.4.201711221230-r - https://eclipse.org/jgit/)
|
||||
(New BSD License) Kryo (com.esotericsoftware.kryo:kryo:3.0.3 - http://code.google.com/p/kryo/)
|
||||
(New BSD License) MinLog (com.esotericsoftware.minlog:minlog:1.3 - http://code.google.com/p/minlog/)
|
||||
(New BSD License) ReflectASM (com.esotericsoftware.reflectasm:reflectasm:1.07 - http://code.google.com/p/reflectasm/)
|
||||
|
|
|
|||
|
|
@ -355,6 +355,19 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR);
|
||||
}
|
||||
|
||||
public String getRecoveryDir() {
|
||||
return getRelativeDir(ConfVars.ZEPPELIN_RECOVERY_DIR);
|
||||
}
|
||||
|
||||
public String getRecoveryStorageClass() {
|
||||
return getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS);
|
||||
}
|
||||
|
||||
public boolean isRecoveryEnabled() {
|
||||
return !getString(ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS).equals(
|
||||
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage");
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return getString(ConfVars.ZEPPELIN_NOTEBOOK_S3_USER);
|
||||
}
|
||||
|
|
@ -674,6 +687,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_INTERPRETER_OUTPUT_LIMIT("zeppelin.interpreter.output.limit", 1024 * 100),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
ZEPPELIN_RECOVERY_DIR("zeppelin.recovery.dir", "recovery"),
|
||||
ZEPPELIN_RECOVERY_STORAGE_CLASS("zeppelin.recovery.storage.class",
|
||||
"org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"),
|
||||
|
||||
// use specified notebook (id) as homescreen
|
||||
ZEPPELIN_NOTEBOOK_HOMESCREEN("zeppelin.notebook.homescreen", null),
|
||||
// whether homescreen notebook will be hidden from notebook list or not
|
||||
|
|
|
|||
|
|
@ -19,8 +19,20 @@ package org.apache.zeppelin.interpreter.launcher;
|
|||
|
||||
/**
|
||||
* Interface to InterpreterClient which is created by InterpreterLauncher. This is the component
|
||||
* that is used to for the communication fromzeppelin-server process to zeppelin interpreter process
|
||||
* that is used to for the communication from zeppelin-server process to zeppelin interpreter
|
||||
* process.
|
||||
*/
|
||||
public interface InterpreterClient {
|
||||
|
||||
String getInterpreterSettingName();
|
||||
|
||||
void start(String userName, Boolean isUserImpersonate);
|
||||
|
||||
void stop();
|
||||
|
||||
String getHost();
|
||||
|
||||
int getPort();
|
||||
|
||||
boolean isRunning();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ public class InterpreterLaunchContext {
|
|||
private Properties properties;
|
||||
private InterpreterOption option;
|
||||
private InterpreterRunner runner;
|
||||
private String interpreterGroupId;
|
||||
private String interpreterSettingId;
|
||||
private String interpreterSettingGroup;
|
||||
private String interpreterSettingName;
|
||||
|
|
@ -37,12 +38,14 @@ public class InterpreterLaunchContext {
|
|||
public InterpreterLaunchContext(Properties properties,
|
||||
InterpreterOption option,
|
||||
InterpreterRunner runner,
|
||||
String interpreterGroupId,
|
||||
String interpreterSettingId,
|
||||
String interpreterSettingGroup,
|
||||
String interpreterSettingName) {
|
||||
this.properties = properties;
|
||||
this.option = option;
|
||||
this.runner = runner;
|
||||
this.interpreterGroupId = interpreterGroupId;
|
||||
this.interpreterSettingId = interpreterSettingId;
|
||||
this.interpreterSettingGroup = interpreterSettingGroup;
|
||||
this.interpreterSettingName = interpreterSettingName;
|
||||
|
|
@ -60,6 +63,10 @@ public class InterpreterLaunchContext {
|
|||
return runner;
|
||||
}
|
||||
|
||||
public String getInterpreterGroupId() {
|
||||
return interpreterGroupId;
|
||||
}
|
||||
|
||||
public String getInterpreterSettingId() {
|
||||
return interpreterSettingId;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.zeppelin.interpreter.launcher;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
|
@ -29,9 +30,11 @@ public abstract class InterpreterLauncher {
|
|||
|
||||
protected ZeppelinConfiguration zConf;
|
||||
protected Properties properties;
|
||||
protected RecoveryStorage recoveryStorage;
|
||||
|
||||
public InterpreterLauncher(ZeppelinConfiguration zConf) {
|
||||
public InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
|
||||
this.zConf = zConf;
|
||||
this.recoveryStorage = recoveryStorage;
|
||||
}
|
||||
|
||||
public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* Interface for storing interpreter process recovery metadata.
|
||||
*
|
||||
*/
|
||||
public abstract class RecoveryStorage {
|
||||
|
||||
protected ZeppelinConfiguration zConf;
|
||||
protected Map<String, InterpreterClient> restoredClients;
|
||||
|
||||
public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException {
|
||||
this.zConf = zConf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update RecoveryStorage when new InterpreterClient is started
|
||||
* @param client
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void onInterpreterClientStart(InterpreterClient client) throws IOException;
|
||||
|
||||
/**
|
||||
* Update RecoveryStorage when InterpreterClient is stopped
|
||||
* @param client
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
* It is only called when Zeppelin Server is started.
|
||||
*
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract Map<String, InterpreterClient> restore() throws IOException;
|
||||
|
||||
|
||||
/**
|
||||
* It is called after constructor
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public void init() throws IOException {
|
||||
this.restoredClients = restore();
|
||||
}
|
||||
|
||||
public InterpreterClient getInterpreterClient(String interpreterGroupId) {
|
||||
if (restoredClients.containsKey(interpreterGroupId)) {
|
||||
return restoredClients.get(interpreterGroupId);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
BIN
zeppelin-server/notebook/.python.recovery.crc
Normal file
BIN
zeppelin-server/notebook/.python.recovery.crc
Normal file
Binary file not shown.
1
zeppelin-server/notebook/python.recovery
Normal file
1
zeppelin-server/notebook/python.recovery
Normal file
|
|
@ -0,0 +1 @@
|
|||
2CZA1DVUG:shared_process 192.168.3.2:55410
|
||||
|
|
@ -349,6 +349,21 @@
|
|||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>${plugin.surefire.version}</version>
|
||||
<configuration combine.children="append">
|
||||
<argLine>-Xmx2g -Xms1g -Dfile.encoding=UTF-8</argLine>
|
||||
<excludes>
|
||||
<exclude>${tests.to.exclude}</exclude>
|
||||
</excludes>
|
||||
<environmentVariables>
|
||||
<ZEPPELIN_FORCE_STOP>1</ZEPPELIN_FORCE_STOP>
|
||||
</environmentVariables>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
|
||||
<plugin>
|
||||
<groupId>org.scala-tools</groupId>
|
||||
<artifactId>maven-scala-plugin</artifactId>
|
||||
|
|
|
|||
|
|
@ -162,7 +162,7 @@ public class ZeppelinServer extends Application {
|
|||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
|
||||
ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
final ZeppelinConfiguration conf = ZeppelinConfiguration.create();
|
||||
conf.setProperty("args", args);
|
||||
|
||||
jettyWebServer = setupJettyServer(conf);
|
||||
|
|
@ -199,7 +199,9 @@ public class ZeppelinServer extends Application {
|
|||
LOG.info("Shutting down Zeppelin Server ... ");
|
||||
try {
|
||||
jettyWebServer.stop();
|
||||
notebook.getInterpreterSettingManager().close();
|
||||
if (!conf.isRecoveryEnabled()) {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().close();
|
||||
}
|
||||
notebook.close();
|
||||
Thread.sleep(3000);
|
||||
} catch (Exception e) {
|
||||
|
|
@ -222,7 +224,9 @@ public class ZeppelinServer extends Application {
|
|||
}
|
||||
|
||||
jettyWebServer.join();
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().close();
|
||||
if (!conf.isRecoveryEnabled()) {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().close();
|
||||
}
|
||||
}
|
||||
|
||||
private static Server setupJettyServer(ZeppelinConfiguration conf) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,162 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.recovery;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import org.apache.commons.httpclient.methods.PostMethod;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.recovery.StopInterpreter;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.Paragraph;
|
||||
import org.apache.zeppelin.rest.AbstractTestRestApi;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.server.ZeppelinServer;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
public class RecoveryTest extends AbstractTestRestApi {
|
||||
|
||||
private Gson gson = new Gson();
|
||||
private static File recoveryDir = null;
|
||||
|
||||
@BeforeClass
|
||||
public static void init() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
|
||||
FileSystemRecoveryStorage.class.getName());
|
||||
recoveryDir = Files.createTempDir();
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath());
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void destroy() throws Exception {
|
||||
shutDown();
|
||||
FileUtils.deleteDirectory(recoveryDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecovery() throws Exception {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
|
||||
|
||||
// run python interpreter and create new variable `user`
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("%python user='abc'");
|
||||
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertThat(post, isAllowed());
|
||||
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
|
||||
// shutdown zeppelin and restart it
|
||||
shutDown();
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
|
||||
// run the paragraph again, but change the text to print variable `user`
|
||||
note1 = ZeppelinServer.notebook.getNote(note1.getId());
|
||||
p1 = note1.getParagraph(p1.getId());
|
||||
p1.setText("%python print(user)");
|
||||
post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
assertEquals("abc\n", p1.getResult().message().get(0).getData());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecovery_2() throws Exception {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
|
||||
|
||||
// run python interpreter and create new variable `user`
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("%python user='abc'");
|
||||
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertThat(post, isAllowed());
|
||||
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
|
||||
// restart the python interpreter
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().restart(
|
||||
((ManagedInterpreterGroup) p1.getBindedInterpreter().getInterpreterGroup())
|
||||
.getInterpreterSetting().getId()
|
||||
);
|
||||
|
||||
// shutdown zeppelin and restart it
|
||||
shutDown();
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
|
||||
// run the paragraph again, but change the text to print variable `user`.
|
||||
// can not recover the python interpreter, because it has been shutdown.
|
||||
note1 = ZeppelinServer.notebook.getNote(note1.getId());
|
||||
p1 = note1.getParagraph(p1.getId());
|
||||
p1.setText("%python print(user)");
|
||||
post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.ERROR, p1.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecovery_3() throws Exception {
|
||||
Note note1 = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
|
||||
|
||||
// run python interpreter and create new variable `user`
|
||||
Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setText("%python user='abc'");
|
||||
PostMethod post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertThat(post, isAllowed());
|
||||
Map<String, Object> resp = gson.fromJson(post.getResponseBodyAsString(), new TypeToken<Map<String, Object>>() {
|
||||
}.getType());
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.FINISHED, p1.getStatus());
|
||||
|
||||
// shutdown zeppelin and restart it
|
||||
shutDown();
|
||||
StopInterpreter.main(new String[]{});
|
||||
|
||||
startUp(RecoveryTest.class.getSimpleName());
|
||||
|
||||
// run the paragraph again, but change the text to print variable `user`.
|
||||
// can not recover the python interpreter, because it has been shutdown.
|
||||
note1 = ZeppelinServer.notebook.getNote(note1.getId());
|
||||
p1 = note1.getParagraph(p1.getId());
|
||||
p1.setText("%python print(user)");
|
||||
post = httpPost("/notebook/job/" + note1.getId(), "");
|
||||
assertEquals(resp.get("status"), "OK");
|
||||
post.releaseConnection();
|
||||
assertEquals(Job.Status.ERROR, p1.getStatus());
|
||||
}
|
||||
}
|
||||
|
|
@ -318,8 +318,10 @@ public abstract class AbstractTestRestApi {
|
|||
if (!wasRunning) {
|
||||
// restart interpreter to stop all interpreter processes
|
||||
List<InterpreterSetting> settingList = ZeppelinServer.notebook.getInterpreterSettingManager().get();
|
||||
for (InterpreterSetting setting : settingList) {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
|
||||
if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {
|
||||
for (InterpreterSetting setting : settingList) {
|
||||
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
|
||||
}
|
||||
}
|
||||
if (shiroIni != null) {
|
||||
FileUtils.deleteQuietly(shiroIni);
|
||||
|
|
@ -350,7 +352,12 @@ public abstract class AbstractTestRestApi {
|
|||
.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName());
|
||||
}
|
||||
|
||||
FileUtils.deleteDirectory(confDir);
|
||||
if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {
|
||||
// don't delete interpreter.json when recovery is enabled. otherwise the interpreter setting
|
||||
// id will change after zeppelin restart, then we can not recover interpreter process
|
||||
// properly
|
||||
FileUtils.deleteDirectory(confDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -535,6 +535,8 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
assertEquals("1", result[1]);
|
||||
assertEquals("items: Seq[Object] = Buffer(2)", result[2]);
|
||||
assertEquals("2", result[3]);
|
||||
|
||||
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -568,5 +570,33 @@ public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
|
|||
assertEquals("default_name", result[0]);
|
||||
assertEquals("1", result[1]);
|
||||
assertEquals("2", result[2]);
|
||||
|
||||
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfInterpreter() throws IOException {
|
||||
Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
|
||||
Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
Map config = p.getConfig();
|
||||
config.put("enabled", true);
|
||||
p.setConfig(config);
|
||||
p.setText("%spark.conf spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0");
|
||||
p.setAuthenticationInfo(anonymous);
|
||||
note.run(p.getId());
|
||||
waitForFinish(p);
|
||||
assertEquals(Status.FINISHED, p.getStatus());
|
||||
|
||||
Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
|
||||
p1.setConfig(config);
|
||||
p1.setText("%spark\nimport com.databricks.spark.csv._");
|
||||
p1.setAuthenticationInfo(anonymous);
|
||||
note.run(p1.getId());
|
||||
|
||||
waitForFinish(p1);
|
||||
assertEquals(Status.FINISHED, p1.getStatus());
|
||||
|
||||
ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -164,7 +164,7 @@ function NotebookCtrl ($scope, $route, $routeParams, $location, $rootScope,
|
|||
for (let i = 0; i < $scope.note.paragraphs.length; i++) {
|
||||
let paragraphId = $scope.note.paragraphs[i].id
|
||||
if (jQuery.contains(angular.element('#' + paragraphId + '_container')[0], clickEvent.target)) {
|
||||
$scope.$broadcast('focusParagraph', paragraphId, 0, true)
|
||||
$scope.$broadcast('focusParagraph', paragraphId, 0, null, true)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
@ -512,7 +512,7 @@ function NotebookCtrl ($scope, $route, $routeParams, $location, $rootScope,
|
|||
para.focus = true
|
||||
|
||||
// we need `$timeout` since angular DOM might not be initialized
|
||||
$timeout(() => { $scope.$broadcast('focusParagraph', para.id, 0, false) })
|
||||
$timeout(() => { $scope.$broadcast('focusParagraph', para.id, 0, null, false) })
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
@ -1188,6 +1188,92 @@ function NotebookCtrl ($scope, $route, $routeParams, $location, $rootScope,
|
|||
** $scope.$on functions below
|
||||
*/
|
||||
|
||||
$scope.$on('runAllAbove', function (event, paragraph, isNeedConfirm) {
|
||||
let allParagraphs = $scope.note.paragraphs
|
||||
let toRunParagraphs = []
|
||||
|
||||
for (let i = 0; allParagraphs[i] !== paragraph; i++) {
|
||||
if (i === allParagraphs.length - 1) { return } // if paragraph not in array of all paragraphs
|
||||
toRunParagraphs.push(allParagraphs[i])
|
||||
}
|
||||
|
||||
const paragraphs = toRunParagraphs.map(p => {
|
||||
return {
|
||||
id: p.id,
|
||||
title: p.title,
|
||||
paragraph: p.text,
|
||||
config: p.config,
|
||||
params: p.settings.params
|
||||
}
|
||||
})
|
||||
|
||||
if (!isNeedConfirm) {
|
||||
websocketMsgSrv.runAllParagraphs($scope.note.id, paragraphs)
|
||||
} else {
|
||||
BootstrapDialog.confirm({
|
||||
closable: true,
|
||||
title: '',
|
||||
message: 'Run all above?',
|
||||
callback: function (result) {
|
||||
if (result) {
|
||||
websocketMsgSrv.runAllParagraphs($scope.note.id, paragraphs)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
$scope.saveCursorPosition(paragraph)
|
||||
})
|
||||
|
||||
$scope.$on('runAllBelowAndCurrent', function (event, paragraph, isNeedConfirm) {
|
||||
let allParagraphs = $scope.note.paragraphs
|
||||
let toRunParagraphs = []
|
||||
|
||||
for (let i = allParagraphs.length - 1; allParagraphs[i] !== paragraph; i--) {
|
||||
if (i < 0) { return } // if paragraph not in array of all paragraphs
|
||||
toRunParagraphs.push(allParagraphs[i])
|
||||
}
|
||||
|
||||
toRunParagraphs.push(paragraph)
|
||||
toRunParagraphs.reverse()
|
||||
|
||||
const paragraphs = toRunParagraphs.map(p => {
|
||||
return {
|
||||
id: p.id,
|
||||
title: p.title,
|
||||
paragraph: p.text,
|
||||
config: p.config,
|
||||
params: p.settings.params
|
||||
}
|
||||
})
|
||||
|
||||
if (!isNeedConfirm) {
|
||||
websocketMsgSrv.runAllParagraphs($scope.note.id, paragraphs)
|
||||
} else {
|
||||
BootstrapDialog.confirm({
|
||||
closable: true,
|
||||
title: '',
|
||||
message: 'Run current and all below?',
|
||||
callback: function (result) {
|
||||
if (result) {
|
||||
websocketMsgSrv.runAllParagraphs($scope.note.id, paragraphs)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
$scope.saveCursorPosition(paragraph)
|
||||
})
|
||||
|
||||
$scope.saveCursorPosition = function (paragraph) {
|
||||
let angParagEditor = angular
|
||||
.element('#' + paragraph.id + '_paragraphColumn_main')
|
||||
.scope().editor
|
||||
let col = angParagEditor.selection.lead.column
|
||||
let row = angParagEditor.selection.lead.row
|
||||
$scope.$broadcast('focusParagraph', paragraph.id, row + 1, col)
|
||||
}
|
||||
|
||||
$scope.$on('setConnectedStatus', function (event, param) {
|
||||
if (connectedOnce && param) {
|
||||
initNotebook()
|
||||
|
|
|
|||
|
|
@ -130,8 +130,8 @@
|
|||
}
|
||||
|
||||
.noteAction {
|
||||
margin-left: -10px;
|
||||
margin-right: -10px;
|
||||
margin-left: 0px;
|
||||
margin-right: 0px;
|
||||
font-family: 'Roboto', sans-serif;
|
||||
background: white;
|
||||
position: fixed;
|
||||
|
|
|
|||
|
|
@ -140,6 +140,24 @@ limitations under the License.
|
|||
Insert new
|
||||
</a>
|
||||
</li>
|
||||
<li>
|
||||
<a ng-click="runAllToThis(paragraph)" ng-hide="$first">
|
||||
<span class="icon-action-redo shortcut-icon"
|
||||
style="position: relative; transform: rotate(-90deg); left: -4px;">
|
||||
</span>
|
||||
<span class="shortcut-keys">Ctrl+Shift+Enter</span>
|
||||
Run all above
|
||||
</a>
|
||||
</li>
|
||||
<li>
|
||||
<a ng-click="runAllFromThis(paragraph)" ng-hide="$last">
|
||||
<span class="icon-action-undo shortcut-icon"
|
||||
style="position: relative; transform: rotate(-90deg); left: -4px;">
|
||||
</span>
|
||||
<span class="shortcut-keys">Ctrl+Shift+Enter</span>
|
||||
Run all below
|
||||
</a>
|
||||
</li>
|
||||
<li>
|
||||
<a ng-click="copyParagraph(getEditorValue())"><span class="fa fa-copy shortcut-icon"></span>
|
||||
<span class="shortcut-keys">Ctrl+Shift+C</span>
|
||||
|
|
|
|||
|
|
@ -471,6 +471,43 @@ function ParagraphCtrl ($scope, $rootScope, $route, $window, $routeParams, $loca
|
|||
$scope.runParagraph($scope.getEditorValue(), false, false)
|
||||
}
|
||||
|
||||
$scope.runAllToThis = function(paragraph) {
|
||||
$scope.$emit('runAllAbove', paragraph, true)
|
||||
}
|
||||
|
||||
$scope.runAllFromThis = function(paragraph) {
|
||||
$scope.$emit('runAllBelowAndCurrent', paragraph, true)
|
||||
}
|
||||
|
||||
$scope.runAllToOrFromThis = function (paragraph) {
|
||||
BootstrapDialog.show({
|
||||
message: 'Run paragraphs:',
|
||||
title: '',
|
||||
buttons: [{
|
||||
label: 'Close',
|
||||
action: function(dialog) {
|
||||
dialog.close()
|
||||
}
|
||||
},
|
||||
{
|
||||
label: 'Run all above',
|
||||
cssClass: 'btn-primary',
|
||||
action: function(dialog) {
|
||||
$scope.$emit('runAllAbove', paragraph, false)
|
||||
dialog.close()
|
||||
}
|
||||
},
|
||||
{
|
||||
label: 'Run current and all below',
|
||||
cssClass: 'btn-primary',
|
||||
action: function(dialog) {
|
||||
$scope.$emit('runAllBelowAndCurrent', paragraph, false)
|
||||
dialog.close()
|
||||
}
|
||||
}]
|
||||
})
|
||||
}
|
||||
|
||||
$scope.turnOnAutoRun = function (paragraph) {
|
||||
paragraph.config.runOnSelectionChange = !paragraph.config.runOnSelectionChange
|
||||
commitParagraph(paragraph)
|
||||
|
|
@ -1446,8 +1483,10 @@ function ParagraphCtrl ($scope, $rootScope, $route, $window, $routeParams, $loca
|
|||
// move focus to next paragraph
|
||||
// $timeout stops chaining effect of focus propogation
|
||||
$timeout(() => $scope.$emit('moveFocusToNextParagraph', paragraphId))
|
||||
} else if (keyEvent.shiftKey && keyCode === 13) { // Shift + Enter
|
||||
} else if (!keyEvent.ctrlKey && keyEvent.shiftKey && keyCode === 13) { // Shift + Enter
|
||||
$scope.runParagraphFromShortcut($scope.getEditorValue())
|
||||
} else if (keyEvent.ctrlKey && keyEvent.shiftKey && keyCode === 13) { // Ctrl + Shift + Enter
|
||||
$scope.runAllToOrFromThis($scope.paragraph)
|
||||
} else if (keyEvent.ctrlKey && keyEvent.altKey && keyCode === 67) { // Ctrl + Alt + c
|
||||
$scope.cancelParagraph($scope.paragraph)
|
||||
} else if (keyEvent.ctrlKey && keyEvent.altKey && keyCode === 68) { // Ctrl + Alt + d
|
||||
|
|
@ -1500,7 +1539,10 @@ function ParagraphCtrl ($scope, $rootScope, $route, $window, $routeParams, $loca
|
|||
}
|
||||
})
|
||||
|
||||
$scope.$on('focusParagraph', function (event, paragraphId, cursorPos, mouseEvent) {
|
||||
$scope.$on('focusParagraph', function (event, paragraphId, cursorPosRow, cursorPosCol, mouseEvent) {
|
||||
if (cursorPosCol === null || cursorPosCol === undefined) {
|
||||
cursorPosCol = 0
|
||||
}
|
||||
if ($scope.paragraph.id === paragraphId) {
|
||||
// focus editor
|
||||
if (!$scope.paragraph.config.editorHide) {
|
||||
|
|
@ -1508,14 +1550,14 @@ function ParagraphCtrl ($scope, $rootScope, $route, $window, $routeParams, $loca
|
|||
$scope.editor.focus()
|
||||
// move cursor to the first row (or the last row)
|
||||
let row
|
||||
if (cursorPos >= 0) {
|
||||
row = cursorPos
|
||||
$scope.editor.gotoLine(row, 0)
|
||||
if (cursorPosRow >= 0) {
|
||||
row = cursorPosRow
|
||||
$scope.editor.gotoLine(row, cursorPosCol)
|
||||
} else {
|
||||
row = $scope.editor.session.getLength()
|
||||
$scope.editor.gotoLine(row, 0)
|
||||
$scope.editor.gotoLine(row, cursorPosCol)
|
||||
}
|
||||
$scope.scrollToCursor($scope.paragraph.id, 0)
|
||||
$scope.scrollToCursor($scope.paragraph.id, cursorPosCol)
|
||||
}
|
||||
}
|
||||
handleFocus(true)
|
||||
|
|
|
|||
|
|
@ -37,6 +37,17 @@ limitations under the License.
|
|||
</td>
|
||||
</tr>
|
||||
|
||||
<tr>
|
||||
<td>
|
||||
<div class="col-md-8">Run all above/below paragraphs</div>
|
||||
</td>
|
||||
<td>
|
||||
<div class="keys">
|
||||
<kbd class="kbd-default">Ctrl</kbd> + <kbd class="kbd-default">Shift</kbd> + <kbd class="kbd-default">Enter</kbd>
|
||||
</div>
|
||||
</td>
|
||||
</tr>
|
||||
|
||||
<tr>
|
||||
<td>
|
||||
<div class="col-md-8">Cancel</div>
|
||||
|
|
|
|||
|
|
@ -89,6 +89,7 @@ export default class PivotTransformation extends Transformation {
|
|||
for (let j = i + 1; j < list.length; j++) {
|
||||
if (angular.equals(list[i], list[j])) {
|
||||
list.splice(j, 1)
|
||||
j--
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -137,8 +138,8 @@ export default class PivotTransformation extends Transformation {
|
|||
pivot (data, keys, groups, values) {
|
||||
let aggrFunc = {
|
||||
sum: function (a, b) {
|
||||
let varA = (a !== undefined) ? (isNaN(a) ? 1 : parseFloat(a)) : 0
|
||||
let varB = (b !== undefined) ? (isNaN(b) ? 1 : parseFloat(b)) : 0
|
||||
let varA = (a !== undefined) ? (isNaN(a) ? 0 : parseFloat(a)) : 0
|
||||
let varB = (b !== undefined) ? (isNaN(b) ? 0 : parseFloat(b)) : 0
|
||||
return varA + varB
|
||||
},
|
||||
count: function (a, b) {
|
||||
|
|
@ -147,22 +148,38 @@ export default class PivotTransformation extends Transformation {
|
|||
return varA + varB
|
||||
},
|
||||
min: function (a, b) {
|
||||
let varA = (a !== undefined) ? (isNaN(a) ? 1 : parseFloat(a)) : 0
|
||||
let varB = (b !== undefined) ? (isNaN(b) ? 1 : parseFloat(b)) : 0
|
||||
return Math.min(varA, varB)
|
||||
let aIsValid = isValidNumber(a)
|
||||
let bIsValid = isValidNumber(b)
|
||||
if (!aIsValid) {
|
||||
return parseFloat(b)
|
||||
} else if (!bIsValid) {
|
||||
return parseFloat(a)
|
||||
} else {
|
||||
return Math.min(parseFloat(a), parseFloat(b))
|
||||
}
|
||||
},
|
||||
max: function (a, b) {
|
||||
let varA = (a !== undefined) ? (isNaN(a) ? 1 : parseFloat(a)) : 0
|
||||
let varB = (b !== undefined) ? (isNaN(b) ? 1 : parseFloat(b)) : 0
|
||||
return Math.max(varA, varB)
|
||||
let aIsValid = isValidNumber(a)
|
||||
let bIsValid = isValidNumber(b)
|
||||
if (!aIsValid) {
|
||||
return parseFloat(b)
|
||||
} else if (!bIsValid) {
|
||||
return parseFloat(a)
|
||||
} else {
|
||||
return Math.max(parseFloat(a), parseFloat(b))
|
||||
}
|
||||
},
|
||||
avg: function (a, b, c) {
|
||||
let varA = (a !== undefined) ? (isNaN(a) ? 1 : parseFloat(a)) : 0
|
||||
let varB = (b !== undefined) ? (isNaN(b) ? 1 : parseFloat(b)) : 0
|
||||
let varA = (a !== undefined) ? (isNaN(a) ? 0 : parseFloat(a)) : 0
|
||||
let varB = (b !== undefined) ? (isNaN(b) ? 0 : parseFloat(b)) : 0
|
||||
return varA + varB
|
||||
}
|
||||
}
|
||||
|
||||
let isValidNumber = function(num) {
|
||||
return num !== undefined && !isNaN(num)
|
||||
}
|
||||
|
||||
let aggrFuncDiv = {
|
||||
sum: false,
|
||||
count: false,
|
||||
|
|
|
|||
|
|
@ -49,7 +49,11 @@ limitations under the License.
|
|||
data-drop="true" jqyoui-droppable="{multiple:true, onDrop:'save()'}"
|
||||
class="list-unstyled"
|
||||
style="border-radius: 6px; margin-top: 7px;">
|
||||
<li ng-repeat="item in config.keys">
|
||||
<li ng-repeat="item in config.keys track by $index"
|
||||
ng-model="config.keys"
|
||||
data-drag="true"
|
||||
jqyoui-draggable="{index: {{$index}}, animate: false}"
|
||||
data-jqyoui-options="{revert: 'invalid', placeholder: 'keep', helper: 'clone'}">
|
||||
<div class="btn btn-default btn-xs"
|
||||
style="background-color: #EFEFEF; margin: 2px 0px 0px 2px;">
|
||||
{{item.name}}
|
||||
|
|
@ -69,7 +73,11 @@ limitations under the License.
|
|||
jqyoui-droppable="{multiple:true, onDrop:'save()'}"
|
||||
class="list-unstyled"
|
||||
style="border-radius: 6px; margin-top: 7px;">
|
||||
<li ng-repeat="item in config.groups">
|
||||
<li ng-repeat="item in config.groups track by $index"
|
||||
ng-model="config.groups"
|
||||
data-drag="true"
|
||||
jqyoui-draggable="{index: {{$index}}, animate: false}"
|
||||
data-jqyoui-options="{revert: 'invalid', placeholder: 'keep', helper: 'clone'}">
|
||||
<div class="btn btn-default btn-xs"
|
||||
style="background-color: #EFEFEF; margin: 2px 0px 0px 2px;">
|
||||
{{item.name}}
|
||||
|
|
@ -89,7 +97,11 @@ limitations under the License.
|
|||
jqyoui-droppable="{multiple:true, onDrop:'save()'}"
|
||||
class="list-unstyled"
|
||||
style="border-radius: 6px; margin-top: 7px;">
|
||||
<li ng-repeat="item in config.values">
|
||||
<li ng-repeat="item in config.values track by $index"
|
||||
ng-model="config.values"
|
||||
data-drag="true"
|
||||
jqyoui-draggable="{index: {{$index}}, animate: false}"
|
||||
data-jqyoui-options="{revert: 'invalid', placeholder: 'keep', helper: 'clone'}">
|
||||
<div class="btn-group">
|
||||
<div class="btn btn-default btn-xs dropdown-toggle"
|
||||
style="background-color: #EFEFEF;"
|
||||
|
|
@ -112,3 +124,4 @@ limitations under the License.
|
|||
</div>
|
||||
</div> <!-- panel-body -->
|
||||
</div> <!-- panel -->
|
||||
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@
|
|||
*/
|
||||
|
||||
import TableData from './tabledata.js'
|
||||
import PivotTransformation from './pivot.js'
|
||||
|
||||
describe('TableData build', function () {
|
||||
let td
|
||||
|
|
@ -39,3 +40,92 @@ describe('TableData build', function () {
|
|||
expect(td.comment).toBe('hello')
|
||||
})
|
||||
})
|
||||
|
||||
describe('PivotTransformation build', function() {
|
||||
let pt
|
||||
|
||||
beforeEach(function () {
|
||||
console.log(PivotTransformation)
|
||||
pt = new PivotTransformation()
|
||||
})
|
||||
|
||||
it('check the result of keys, groups and values unique', function() {
|
||||
// set inited mock data
|
||||
let config = {
|
||||
common: {
|
||||
pivot: {
|
||||
keys: [{index: 4, name: '4'},
|
||||
{index: 3, name: '3'},
|
||||
{index: 4, name: '4'},
|
||||
{index: 3, name: '3'},
|
||||
{index: 3, name: '3'},
|
||||
{index: 3, name: '3'},
|
||||
{index: 3, name: '3'},
|
||||
{index: 5, name: '5'}],
|
||||
groups: [],
|
||||
values: []
|
||||
}
|
||||
}
|
||||
}
|
||||
pt.tableDataColumns = [
|
||||
{index: 1, name: '1'},
|
||||
{index: 2, name: '2'},
|
||||
{index: 3, name: '3'},
|
||||
{index: 4, name: '4'},
|
||||
{index: 5, name: '5'}]
|
||||
|
||||
pt.setConfig(config)
|
||||
|
||||
pt.removeUnknown()
|
||||
|
||||
expect(config.common.pivot.keys.length).toBe(3)
|
||||
expect(config.common.pivot.keys[0].index).toBe(4)
|
||||
expect(config.common.pivot.keys[1].index).toBe(3)
|
||||
expect(config.common.pivot.keys[2].index).toBe(5)
|
||||
})
|
||||
|
||||
it('should aggregate values correctly', function() {
|
||||
let td = new TableData()
|
||||
td.loadParagraphResult({
|
||||
type: 'TABLE',
|
||||
msg: 'key\tvalue\na\t10\na\tnull\na\t0\na\t1\n'
|
||||
})
|
||||
|
||||
let config = {
|
||||
common: {
|
||||
pivot: {
|
||||
keys: [
|
||||
{
|
||||
'name': 'key',
|
||||
'index': 0.0,
|
||||
}
|
||||
],
|
||||
groups: [],
|
||||
values: [
|
||||
{
|
||||
'name': 'value',
|
||||
'index': 1.0,
|
||||
'aggr': 'sum'
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pt.setConfig(config)
|
||||
let transformed = pt.transform(td)
|
||||
expect(transformed.rows['a']['value(sum)'].value).toBe(11)
|
||||
|
||||
pt.config.common.pivot.values[0].aggr = 'max'
|
||||
transformed = pt.transform(td)
|
||||
expect(transformed.rows['a']['value(max)'].value).toBe(10)
|
||||
|
||||
pt.config.common.pivot.values[0].aggr = 'min'
|
||||
transformed = pt.transform(td)
|
||||
expect(transformed.rows['a']['value(min)'].value).toBe(0)
|
||||
|
||||
pt.config.common.pivot.values[0].aggr = 'count'
|
||||
transformed = pt.transform(td)
|
||||
expect(transformed.rows['a']['value(count)'].value).toBe(4)
|
||||
})
|
||||
})
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ limitations under the License.
|
|||
<div class="form-group">
|
||||
<label for="userName">User Name</label>
|
||||
<input placeholder="User Name" type="text" class="form-control" id="userName"
|
||||
ng-enter="login()"
|
||||
ng-keypress="loginParams.errorText = ''"
|
||||
ng-model="loginParams.userName" />
|
||||
</div>
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@
|
|||
<lucene.version>5.3.1</lucene.version>
|
||||
<org.reflections.version>0.9.8</org.reflections.version>
|
||||
<xml.apis.version>1.4.01</xml.apis.version>
|
||||
<eclipse.jgit.version>4.1.1.201511131810-r</eclipse.jgit.version>
|
||||
<eclipse.jgit.version>4.5.4.201711221230-r</eclipse.jgit.version>
|
||||
<frontend.maven.plugin.version>1.3</frontend.maven.plugin.version>
|
||||
|
||||
<!--test library versions-->
|
||||
|
|
|
|||
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* Special Interpreter for Interpreter Configuration customization. It is attached to each
|
||||
* InterpreterGroup implicitly by Zeppelin.
|
||||
*/
|
||||
public class ConfInterpreter extends Interpreter {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(ConfInterpreter.class);
|
||||
|
||||
private String interpreterGroupId;
|
||||
private InterpreterSetting interpreterSetting;
|
||||
|
||||
|
||||
public ConfInterpreter(Properties properties,
|
||||
String interpreterGroupId,
|
||||
InterpreterSetting interpreterSetting) {
|
||||
super(properties);
|
||||
this.interpreterGroupId = interpreterGroupId;
|
||||
this.interpreterSetting = interpreterSetting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() throws InterpreterException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws InterpreterException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context)
|
||||
throws InterpreterException {
|
||||
|
||||
try {
|
||||
Properties finalProperties = new Properties();
|
||||
finalProperties.putAll(getProperties());
|
||||
Properties newProperties = new Properties();
|
||||
newProperties.load(new StringReader(st));
|
||||
finalProperties.putAll(newProperties);
|
||||
LOGGER.debug("Properties for InterpreterGroup: " + interpreterGroupId + " is "
|
||||
+ finalProperties);
|
||||
interpreterSetting.setInterpreterGroupProperties(interpreterGroupId, finalProperties);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Fail to update interpreter setting", e);
|
||||
return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) throws InterpreterException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() throws InterpreterException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) throws InterpreterException {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
@ -38,6 +38,8 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
|
|||
import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
|
||||
import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
|
||||
import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
|
||||
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
|
||||
|
|
@ -138,9 +140,14 @@ public class InterpreterSetting {
|
|||
// launcher in future when we have other launcher implementation. e.g. third party launcher
|
||||
// service like livy
|
||||
private transient InterpreterLauncher launcher;
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
private transient LifecycleManager lifecycleManager;
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
|
||||
private transient RecoveryStorage recoveryStorage;
|
||||
///////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Builder class for InterpreterSetting
|
||||
|
|
@ -240,6 +247,11 @@ public class InterpreterSetting {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder setRecoveryStorage(RecoveryStorage recoveryStorage) {
|
||||
interpreterSetting.recoveryStorage = recoveryStorage;
|
||||
return this;
|
||||
}
|
||||
|
||||
public InterpreterSetting create() {
|
||||
// post processing
|
||||
interpreterSetting.postProcessing();
|
||||
|
|
@ -259,6 +271,13 @@ public class InterpreterSetting {
|
|||
if (this.lifecycleManager == null) {
|
||||
this.lifecycleManager = new NullLifecycleManager(conf);
|
||||
}
|
||||
if (this.recoveryStorage == null) {
|
||||
try {
|
||||
this.recoveryStorage = new NullRecoveryStorage(conf, interpreterSettingManager);
|
||||
} catch (IOException e) {
|
||||
// ignore this exception as NullRecoveryStorage will do nothing.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -283,9 +302,9 @@ public class InterpreterSetting {
|
|||
|
||||
private void createLauncher() {
|
||||
if (group.equals("spark")) {
|
||||
this.launcher = new SparkInterpreterLauncher(this.conf);
|
||||
this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
|
||||
} else {
|
||||
this.launcher = new ShellScriptLauncher(this.conf);
|
||||
this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -342,6 +361,15 @@ public class InterpreterSetting {
|
|||
return this;
|
||||
}
|
||||
|
||||
public InterpreterSetting setRecoveryStorage(RecoveryStorage recoveryStorage) {
|
||||
this.recoveryStorage = recoveryStorage;
|
||||
return this;
|
||||
}
|
||||
|
||||
public RecoveryStorage getRecoveryStorage() {
|
||||
return recoveryStorage;
|
||||
}
|
||||
|
||||
public LifecycleManager getLifecycleManager() {
|
||||
return lifecycleManager;
|
||||
}
|
||||
|
|
@ -406,7 +434,12 @@ public class InterpreterSetting {
|
|||
}
|
||||
|
||||
void removeInterpreterGroup(String groupId) {
|
||||
this.interpreterGroups.remove(groupId);
|
||||
try {
|
||||
interpreterGroupWriteLock.lock();
|
||||
this.interpreterGroups.remove(groupId);
|
||||
} finally {
|
||||
interpreterGroupWriteLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) {
|
||||
|
|
@ -423,7 +456,6 @@ public class InterpreterSetting {
|
|||
return interpreterGroups.get(groupId);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ArrayList<ManagedInterpreterGroup> getAllInterpreterGroups() {
|
||||
try {
|
||||
interpreterGroupReadLock.lock();
|
||||
|
|
@ -648,12 +680,11 @@ public class InterpreterSetting {
|
|||
///////////////////////////////////////////////////////////////////////////////////////
|
||||
// This is the only place to create interpreters. For now we always create multiple interpreter
|
||||
// together (one session). We don't support to create single interpreter yet.
|
||||
List<Interpreter> createInterpreters(String user, String sessionId) {
|
||||
List<Interpreter> createInterpreters(String user, String interpreterGroupId, String sessionId) {
|
||||
List<Interpreter> interpreters = new ArrayList<>();
|
||||
List<InterpreterInfo> interpreterInfos = getInterpreterInfos();
|
||||
for (InterpreterInfo info : interpreterInfos) {
|
||||
Interpreter interpreter = null;
|
||||
interpreter = new RemoteInterpreter(getJavaProperties(), sessionId,
|
||||
Interpreter interpreter = new RemoteInterpreter(getJavaProperties(), sessionId,
|
||||
info.getClassName(), user, lifecycleManager);
|
||||
if (info.isDefaultInterpreter()) {
|
||||
interpreters.add(0, interpreter);
|
||||
|
|
@ -663,18 +694,23 @@ public class InterpreterSetting {
|
|||
LOGGER.info("Interpreter {} created for user: {}, sessionId: {}",
|
||||
interpreter.getClassName(), user, sessionId);
|
||||
}
|
||||
interpreters.add(new ConfInterpreter(getJavaProperties(), interpreterGroupId, this));
|
||||
return interpreters;
|
||||
}
|
||||
|
||||
synchronized RemoteInterpreterProcess createInterpreterProcess() throws IOException {
|
||||
synchronized RemoteInterpreterProcess createInterpreterProcess(String interpreterGroupId,
|
||||
Properties properties)
|
||||
throws IOException {
|
||||
if (launcher == null) {
|
||||
createLauncher();
|
||||
}
|
||||
InterpreterLaunchContext launchContext = new
|
||||
InterpreterLaunchContext(getJavaProperties(), option, interpreterRunner, id, group, name);
|
||||
InterpreterLaunchContext(properties, option, interpreterRunner,
|
||||
interpreterGroupId, id, group, name);
|
||||
RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext);
|
||||
process.setRemoteInterpreterEventPoller(
|
||||
new RemoteInterpreterEventPoller(remoteInterpreterProcessListener, appEventListener));
|
||||
recoveryStorage.onInterpreterClientStart(process);
|
||||
return process;
|
||||
}
|
||||
|
||||
|
|
@ -716,6 +752,11 @@ public class InterpreterSetting {
|
|||
return info.getClassName();
|
||||
}
|
||||
}
|
||||
//TODO(zjffdu) It requires user can not create interpreter with name `conf`,
|
||||
// conf is a reserved word of interpreter name
|
||||
if (replName.equals("conf")) {
|
||||
return ConfInterpreter.class.getName();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
@ -728,6 +769,29 @@ public class InterpreterSetting {
|
|||
return interpreterGroup;
|
||||
}
|
||||
|
||||
/**
|
||||
* Throw exception when interpreter process has already launched
|
||||
*
|
||||
* @param interpreterGroupId
|
||||
* @param properties
|
||||
* @throws IOException
|
||||
*/
|
||||
public void setInterpreterGroupProperties(String interpreterGroupId, Properties properties)
|
||||
throws IOException {
|
||||
ManagedInterpreterGroup interpreterGroup = this.interpreterGroups.get(interpreterGroupId);
|
||||
for (List<Interpreter> session : interpreterGroup.sessions.values()) {
|
||||
for (Interpreter intp : session) {
|
||||
if (!intp.getProperties().equals(properties) &&
|
||||
interpreterGroup.getRemoteInterpreterProcess() != null &&
|
||||
interpreterGroup.getRemoteInterpreterProcess().isRunning()) {
|
||||
throw new IOException("Can not change interpreter properties when interpreter process " +
|
||||
"has already been launched");
|
||||
}
|
||||
intp.setProperties(properties);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void loadInterpreterDependencies() {
|
||||
setStatus(Status.DOWNLOADING_DEPENDENCIES);
|
||||
setErrorReason(null);
|
||||
|
|
|
|||
|
|
@ -34,12 +34,16 @@ import org.apache.zeppelin.dep.DependencyResolver;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistryListener;
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
|
||||
import org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.apache.zeppelin.resource.Resource;
|
||||
import org.apache.zeppelin.resource.ResourcePool;
|
||||
import org.apache.zeppelin.resource.ResourceSet;
|
||||
import org.apache.zeppelin.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.sonatype.aether.repository.Authentication;
|
||||
|
|
@ -118,6 +122,7 @@ public class InterpreterSettingManager {
|
|||
private ApplicationEventListener appEventListener;
|
||||
private DependencyResolver dependencyResolver;
|
||||
private LifecycleManager lifecycleManager;
|
||||
private RecoveryStorage recoveryStorage;
|
||||
|
||||
public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration,
|
||||
AngularObjectRegistryListener angularObjectRegistryListener,
|
||||
|
|
@ -154,13 +159,17 @@ public class InterpreterSettingManager {
|
|||
this.angularObjectRegistryListener = angularObjectRegistryListener;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
this.appEventListener = appEventListener;
|
||||
try {
|
||||
this.lifecycleManager = (LifecycleManager)
|
||||
Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class)
|
||||
.newInstance(conf);
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Fail to create LifecycleManager", e);
|
||||
}
|
||||
|
||||
this.recoveryStorage = ReflectionUtils.createClazzInstance(conf.getRecoveryStorageClass(),
|
||||
new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
|
||||
new Object[] {conf, this});
|
||||
this.recoveryStorage.init();
|
||||
LOGGER.info("Using RecoveryStorage: " + this.recoveryStorage.getClass().getName());
|
||||
|
||||
this.lifecycleManager = ReflectionUtils.createClazzInstance(conf.getLifecycleManagerClass(),
|
||||
new Class[] {ZeppelinConfiguration.class},
|
||||
new Object[] {conf});
|
||||
LOGGER.info("Using LifecycleManager: " + this.lifecycleManager.getClass().getName());
|
||||
|
||||
init();
|
||||
}
|
||||
|
|
@ -174,6 +183,7 @@ public class InterpreterSettingManager {
|
|||
.setAppEventListener(appEventListener)
|
||||
.setDependencyResolver(dependencyResolver)
|
||||
.setLifecycleManager(lifecycleManager)
|
||||
.setRecoveryStorage(recoveryStorage)
|
||||
.postProcessing();
|
||||
}
|
||||
|
||||
|
|
@ -307,8 +317,16 @@ public class InterpreterSettingManager {
|
|||
saveToFile();
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcessListener getRemoteInterpreterProcessListener() {
|
||||
return remoteInterpreterProcessListener;
|
||||
}
|
||||
|
||||
public ApplicationEventListener getAppEventListener() {
|
||||
return appEventListener;
|
||||
}
|
||||
|
||||
private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir,
|
||||
String interpreterJson) throws IOException {
|
||||
String interpreterJson) throws IOException {
|
||||
URL[] urls = recursiveBuildLibList(new File(interpreterDir));
|
||||
ClassLoader tempClassLoader = new URLClassLoader(urls, null);
|
||||
|
||||
|
|
@ -507,6 +525,10 @@ public class InterpreterSettingManager {
|
|||
return resourceSet;
|
||||
}
|
||||
|
||||
public RecoveryStorage getRecoveryStorage() {
|
||||
return recoveryStorage;
|
||||
}
|
||||
|
||||
public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) {
|
||||
for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) {
|
||||
ResourceSet resourceSet = new ResourceSet();
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ import org.slf4j.LoggerFactory;
|
|||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
/**
|
||||
* ManagedInterpreterGroup runs under zeppelin server
|
||||
|
|
@ -54,14 +55,31 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
return interpreterSetting;
|
||||
}
|
||||
|
||||
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
|
||||
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName,
|
||||
Properties properties)
|
||||
throws IOException {
|
||||
if (remoteInterpreterProcess == null) {
|
||||
LOGGER.info("Create InterpreterProcess for InterpreterGroup: " + getId());
|
||||
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess();
|
||||
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, properties);
|
||||
synchronized (remoteInterpreterProcess) {
|
||||
if (!remoteInterpreterProcess.isRunning()) {
|
||||
remoteInterpreterProcess.start(userName, false);
|
||||
remoteInterpreterProcess.getRemoteInterpreterEventPoller()
|
||||
.setInterpreterProcess(remoteInterpreterProcess);
|
||||
remoteInterpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(this);
|
||||
remoteInterpreterProcess.getRemoteInterpreterEventPoller().start();
|
||||
getInterpreterSetting().getRecoveryStorage()
|
||||
.onInterpreterClientStart(remoteInterpreterProcess);
|
||||
}
|
||||
}
|
||||
}
|
||||
return remoteInterpreterProcess;
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcess getInterpreterProcess() {
|
||||
return remoteInterpreterProcess;
|
||||
}
|
||||
|
||||
public RemoteInterpreterProcess getRemoteInterpreterProcess() {
|
||||
return remoteInterpreterProcess;
|
||||
}
|
||||
|
|
@ -92,6 +110,11 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
if (remoteInterpreterProcess != null) {
|
||||
LOGGER.info("Kill RemoteInterpreterProcess");
|
||||
remoteInterpreterProcess.stop();
|
||||
try {
|
||||
interpreterSetting.getRecoveryStorage().onInterpreterClientStop(remoteInterpreterProcess);
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Fail to store recovery data", e);
|
||||
}
|
||||
remoteInterpreterProcess = null;
|
||||
}
|
||||
}
|
||||
|
|
@ -131,7 +154,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup {
|
|||
if (sessions.containsKey(sessionId)) {
|
||||
return sessions.get(sessionId);
|
||||
} else {
|
||||
List<Interpreter> interpreters = interpreterSetting.createInterpreters(user, sessionId);
|
||||
List<Interpreter> interpreters = interpreterSetting.createInterpreters(user, id, sessionId);
|
||||
for (Interpreter interpreter : interpreters) {
|
||||
interpreter.setInterpreterGroup(this);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,50 +21,68 @@ package org.apache.zeppelin.interpreter.launcher;
|
|||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterRunner;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Interpreter Launcher which use shell script to launch the interpreter process.
|
||||
*
|
||||
*/
|
||||
public class ShellScriptLauncher extends InterpreterLauncher {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ShellScriptLauncher.class);
|
||||
|
||||
public ShellScriptLauncher(ZeppelinConfiguration zConf) {
|
||||
super(zConf);
|
||||
public ShellScriptLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
|
||||
super(zConf, recoveryStorage);
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterClient launch(InterpreterLaunchContext context) {
|
||||
public InterpreterClient launch(InterpreterLaunchContext context) throws IOException {
|
||||
LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup());
|
||||
this.properties = context.getProperties();
|
||||
InterpreterOption option = context.getOption();
|
||||
InterpreterRunner runner = context.getRunner();
|
||||
String groupName = context.getInterpreterSettingGroup();
|
||||
String name = context.getInterpreterSettingName();
|
||||
|
||||
int connectTimeout =
|
||||
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
|
||||
if (option.isExistingProcess()) {
|
||||
return new RemoteInterpreterRunningProcess(
|
||||
context.getInterpreterSettingName(),
|
||||
connectTimeout,
|
||||
option.getHost(),
|
||||
option.getPort());
|
||||
} else {
|
||||
// try to recover it first
|
||||
if (zConf.isRecoveryEnabled()) {
|
||||
InterpreterClient recoveredClient =
|
||||
recoveryStorage.getInterpreterClient(context.getInterpreterGroupId());
|
||||
if (recoveredClient != null) {
|
||||
if (recoveredClient.isRunning()) {
|
||||
LOGGER.info("Recover interpreter process: " + recoveredClient.getHost() + ":" +
|
||||
recoveredClient.getPort());
|
||||
return recoveredClient;
|
||||
} else {
|
||||
LOGGER.warn("Cannot recover interpreter process: " + recoveredClient.getHost() + ":"
|
||||
+ recoveredClient.getPort() + ", as it is already terminated.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// create new remote process
|
||||
String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
|
||||
+ context.getInterpreterSettingId();
|
||||
return new RemoteInterpreterManagedProcess(
|
||||
runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(),
|
||||
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
|
||||
zConf.getCallbackPortRange(), zConf.getInterpreterPortRange(),
|
||||
zConf.getInterpreterDir() + "/" + groupName, localRepoPath,
|
||||
buildEnvFromProperties(), connectTimeout, name);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
|
|||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
|
@ -35,8 +36,8 @@ public class SparkInterpreterLauncher extends ShellScriptLauncher {
|
|||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncher.class);
|
||||
|
||||
public SparkInterpreterLauncher(ZeppelinConfiguration zConf) {
|
||||
super(zConf);
|
||||
public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) {
|
||||
super(zConf, recoveryStorage);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -0,0 +1,139 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.PathFilter;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
|
||||
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
|
||||
import org.apache.zeppelin.notebook.FileSystemStorage;
|
||||
import org.apache.zeppelin.notebook.repo.FileSystemNotebookRepo;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* Hadoop compatible FileSystem based RecoveryStorage implementation.
|
||||
*
|
||||
* Save InterpreterProcess in the format of:
|
||||
* InterpreterGroupId host:port
|
||||
*/
|
||||
public class FileSystemRecoveryStorage extends RecoveryStorage {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class);
|
||||
|
||||
private InterpreterSettingManager interpreterSettingManager;
|
||||
private FileSystemStorage fs;
|
||||
private Path recoveryDir;
|
||||
|
||||
public FileSystemRecoveryStorage(ZeppelinConfiguration zConf,
|
||||
InterpreterSettingManager interpreterSettingManager)
|
||||
throws IOException {
|
||||
super(zConf);
|
||||
this.interpreterSettingManager = interpreterSettingManager;
|
||||
this.zConf = zConf;
|
||||
this.fs = FileSystemStorage.get(zConf);
|
||||
this.recoveryDir = this.fs.makeQualified(new Path(zConf.getRecoveryDir()));
|
||||
LOGGER.info("Using folder {} to store recovery data", recoveryDir);
|
||||
this.fs.tryMkDir(recoveryDir);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterClientStart(InterpreterClient client) throws IOException {
|
||||
save(client.getInterpreterSettingName());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterClientStop(InterpreterClient client) throws IOException {
|
||||
save(client.getInterpreterSettingName());
|
||||
}
|
||||
|
||||
private void save(String interpreterSettingName) throws IOException {
|
||||
InterpreterSetting interpreterSetting =
|
||||
interpreterSettingManager.getInterpreterSettingByName(interpreterSettingName);
|
||||
List<String> recoveryContent = new ArrayList<>();
|
||||
for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) {
|
||||
RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess();
|
||||
if (interpreterProcess != null) {
|
||||
recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" +
|
||||
interpreterProcess.getPort());
|
||||
}
|
||||
}
|
||||
LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName);
|
||||
LOGGER.debug("Recovery Data: " + StringUtils.join(recoveryContent, System.lineSeparator()));
|
||||
Path recoveryFile = new Path(recoveryDir, interpreterSettingName + ".recovery");
|
||||
fs.writeFile(StringUtils.join(recoveryContent, System.lineSeparator()), recoveryFile, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, InterpreterClient> restore() throws IOException {
|
||||
Map<String, InterpreterClient> clients = new HashMap<>();
|
||||
List<Path> paths = fs.list(new Path(recoveryDir + "/*.recovery"));
|
||||
|
||||
for (Path path : paths) {
|
||||
String fileName = path.getName();
|
||||
String interpreterSettingName = fileName.substring(0,
|
||||
fileName.length() - ".recovery".length());
|
||||
String recoveryContent = fs.readFile(path);
|
||||
if (!StringUtils.isBlank(recoveryContent)) {
|
||||
for (String line : recoveryContent.split(System.lineSeparator())) {
|
||||
String[] tokens = line.split("\t");
|
||||
String groupId = tokens[0];
|
||||
String[] hostPort = tokens[1].split(":");
|
||||
int connectTimeout =
|
||||
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess(
|
||||
interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1]));
|
||||
// interpreterSettingManager may be null when this class is used when it is used
|
||||
// stop-interpreter.sh
|
||||
if (interpreterSettingManager != null) {
|
||||
client.setRemoteInterpreterEventPoller(new RemoteInterpreterEventPoller(
|
||||
interpreterSettingManager.getRemoteInterpreterProcessListener(),
|
||||
interpreterSettingManager.getAppEventListener()));
|
||||
}
|
||||
clients.put(groupId, client);
|
||||
LOGGER.info("Recovering Interpreter Process: " + hostPort[0] + ":" + hostPort[1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return clients;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* RecoveryStorage that do nothing, used when recovery is not enabled.
|
||||
*
|
||||
*/
|
||||
public class NullRecoveryStorage extends RecoveryStorage {
|
||||
|
||||
public NullRecoveryStorage(ZeppelinConfiguration zConf,
|
||||
InterpreterSettingManager interpreterSettingManager)
|
||||
throws IOException {
|
||||
super(zConf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterClientStart(InterpreterClient client) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onInterpreterClientStop(InterpreterClient client) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, InterpreterClient> restore() throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,40 @@
|
|||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
|
||||
import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
|
||||
import org.apache.zeppelin.util.ReflectionUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
/**
|
||||
* Utility class for stopping interpreter in the case that you want to stop all the
|
||||
* interpreter process even when you enable recovery, or you want to kill interpreter process
|
||||
* to avoid orphan process.
|
||||
*/
|
||||
public class StopInterpreter {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(StopInterpreter.class);
|
||||
|
||||
public static void main(String[] args) throws IOException {
|
||||
ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
|
||||
RecoveryStorage recoveryStorage = null;
|
||||
|
||||
recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(),
|
||||
new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class},
|
||||
new Object[] {zConf, null});
|
||||
|
||||
LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName());
|
||||
Map<String, InterpreterClient> restoredClients = recoveryStorage.restore();
|
||||
if (restoredClients != null) {
|
||||
for (InterpreterClient client : restoredClients.values()) {
|
||||
LOGGER.info("Stop Interpreter Process: " + client.getHost() + ":" + client.getPort());
|
||||
client.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -25,6 +25,7 @@ import org.apache.zeppelin.display.AngularObject;
|
|||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.display.Input;
|
||||
import org.apache.zeppelin.interpreter.ConfInterpreter;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
|
|
@ -101,16 +102,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
return this.interpreterProcess;
|
||||
}
|
||||
ManagedInterpreterGroup intpGroup = getInterpreterGroup();
|
||||
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess();
|
||||
synchronized (interpreterProcess) {
|
||||
if (!interpreterProcess.isRunning()) {
|
||||
interpreterProcess.start(this.getUserName(), false);
|
||||
interpreterProcess.getRemoteInterpreterEventPoller()
|
||||
.setInterpreterProcess(interpreterProcess);
|
||||
interpreterProcess.getRemoteInterpreterEventPoller().setInterpreterGroup(intpGroup);
|
||||
interpreterProcess.getRemoteInterpreterEventPoller().start();
|
||||
}
|
||||
}
|
||||
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(getUserName(), properties);
|
||||
return interpreterProcess;
|
||||
}
|
||||
|
||||
|
|
@ -130,7 +122,9 @@ public class RemoteInterpreter extends Interpreter {
|
|||
for (Interpreter interpreter : getInterpreterGroup()
|
||||
.getOrCreateSession(this.getUserName(), sessionId)) {
|
||||
try {
|
||||
((RemoteInterpreter) interpreter).internal_create();
|
||||
if (!(interpreter instanceof ConfInterpreter)) {
|
||||
((RemoteInterpreter) interpreter).internal_create();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
|
@ -346,8 +340,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
final InterpreterContext interpreterContext)
|
||||
throws InterpreterException {
|
||||
if (!isOpened) {
|
||||
LOGGER.warn("completion is called when RemoterInterpreter is not opened for " + className);
|
||||
return new ArrayList<>();
|
||||
open();
|
||||
}
|
||||
RemoteInterpreterProcess interpreterProcess = null;
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -214,7 +214,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
callbackServer.stop();
|
||||
}
|
||||
if (isRunning()) {
|
||||
logger.info("kill interpreter process");
|
||||
logger.info("Kill interpreter process");
|
||||
try {
|
||||
callRemoteFunction(new RemoteFunction<Void>() {
|
||||
@Override
|
||||
|
|
@ -263,7 +263,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess
|
|||
return interpreterDir;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getInterpreterSettingName() {
|
||||
return interpreterSettingName;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,12 +51,6 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient {
|
|||
this.remoteInterpreterEventPoller = eventPoller;
|
||||
}
|
||||
|
||||
public abstract String getHost();
|
||||
public abstract int getPort();
|
||||
public abstract void start(String userName, Boolean isUserImpersonate);
|
||||
public abstract void stop();
|
||||
public abstract boolean isRunning();
|
||||
|
||||
public int getConnectTimeout() {
|
||||
return connectTimeout;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
package org.apache.zeppelin.interpreter.remote;
|
||||
|
||||
import org.apache.zeppelin.helium.ApplicationEventListener;
|
||||
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
|
@ -27,13 +28,16 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class);
|
||||
private final String host;
|
||||
private final int port;
|
||||
private final String interpreterSettingName;
|
||||
|
||||
public RemoteInterpreterRunningProcess(
|
||||
String interpreterSettingName,
|
||||
int connectTimeout,
|
||||
String host,
|
||||
int port
|
||||
) {
|
||||
super(connectTimeout);
|
||||
this.interpreterSettingName = interpreterSettingName;
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
|
@ -48,6 +52,11 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
return port;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInterpreterSettingName() {
|
||||
return interpreterSettingName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(String userName, Boolean isUserImpersonate) {
|
||||
// assume process is externally managed. nothing to do
|
||||
|
|
@ -55,7 +64,24 @@ public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess {
|
|||
|
||||
@Override
|
||||
public void stop() {
|
||||
// assume process is externally managed. nothing to do
|
||||
// assume process is externally managed. nothing to do. But will kill it
|
||||
// when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that.
|
||||
if (System.getenv("ZEPPELIN_FORCE_STOP") != null) {
|
||||
if (isRunning()) {
|
||||
logger.info("Kill interpreter process");
|
||||
try {
|
||||
callRemoteFunction(new RemoteFunction<Void>() {
|
||||
@Override
|
||||
public Void call(RemoteInterpreterService.Client client) throws Exception {
|
||||
client.shutdown();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.warn("ignore the exception when shutting down interpreter process.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -0,0 +1,168 @@
|
|||
package org.apache.zeppelin.notebook;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RawLocalFileSystem;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
/**
|
||||
* Hadoop FileSystem wrapper. Support both secure and no-secure mode
|
||||
*/
|
||||
public class FileSystemStorage {
|
||||
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(FileSystemStorage.class);
|
||||
|
||||
private static FileSystemStorage instance;
|
||||
|
||||
private ZeppelinConfiguration zConf;
|
||||
private Configuration hadoopConf;
|
||||
private boolean isSecurityEnabled = false;
|
||||
private FileSystem fs;
|
||||
|
||||
private FileSystemStorage(ZeppelinConfiguration zConf) throws IOException {
|
||||
this.zConf = zConf;
|
||||
this.hadoopConf = new Configuration();
|
||||
this.hadoopConf.set("fs.file.impl", RawLocalFileSystem.class.getName());
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
|
||||
if (isSecurityEnabled) {
|
||||
String keytab = zConf.getString(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
|
||||
String principal = zConf.getString(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
|
||||
if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
|
||||
throw new IOException("keytab and principal can not be empty, keytab: " + keytab
|
||||
+ ", principal: " + principal);
|
||||
}
|
||||
UserGroupInformation.loginUserFromKeytab(principal, keytab);
|
||||
}
|
||||
|
||||
try {
|
||||
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), this.hadoopConf);
|
||||
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
public static synchronized FileSystemStorage get(ZeppelinConfiguration zConf) throws IOException {
|
||||
if (instance == null) {
|
||||
instance = new FileSystemStorage(zConf);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public Path makeQualified(Path path) {
|
||||
return fs.makeQualified(path);
|
||||
}
|
||||
|
||||
public void tryMkDir(final Path dir) throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
if (!fs.exists(dir)) {
|
||||
fs.mkdirs(dir);
|
||||
LOGGER.info("Create dir {} in hdfs", dir.toString());
|
||||
}
|
||||
if (fs.isFile(dir)) {
|
||||
throw new IOException(dir.toString() + " is file instead of directory, please remove " +
|
||||
"it or specify another directory");
|
||||
}
|
||||
fs.mkdirs(dir);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public List<Path> list(final Path path) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<List<Path>>() {
|
||||
@Override
|
||||
public List<Path> call() throws IOException {
|
||||
List<Path> paths = new ArrayList<>();
|
||||
for (FileStatus status : fs.globStatus(path)) {
|
||||
paths.add(status.getPath());
|
||||
}
|
||||
return paths;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public boolean delete(final Path path) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() throws IOException {
|
||||
return fs.delete(path, true);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public String readFile(final Path file) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<String>() {
|
||||
@Override
|
||||
public String call() throws IOException {
|
||||
LOGGER.debug("Read from file: " + file);
|
||||
ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
|
||||
IOUtils.copyBytes(fs.open(file), noteBytes, hadoopConf);
|
||||
return new String(noteBytes.toString(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void writeFile(final String content, final Path file, boolean writeTempFileFirst)
|
||||
throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
InputStream in = new ByteArrayInputStream(content.getBytes(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
|
||||
Path tmpFile = new Path(file.toString() + ".tmp");
|
||||
IOUtils.copyBytes(in, fs.create(tmpFile), hadoopConf);
|
||||
fs.delete(file, true);
|
||||
fs.rename(tmpFile, file);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private interface HdfsOperation<T> {
|
||||
T call() throws IOException;
|
||||
}
|
||||
|
||||
public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
|
||||
if (isSecurityEnabled) {
|
||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||
try {
|
||||
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
|
||||
@Override
|
||||
public T run() throws Exception {
|
||||
return func.call();
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
} else {
|
||||
return func.call();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -191,7 +191,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
|
|||
this.scriptText = this.text.substring(headingSpace.length() + intpText.length() + 1).trim();
|
||||
} else {
|
||||
this.intpText = "";
|
||||
this.scriptText = this.text;
|
||||
this.scriptText = this.text.trim();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -250,14 +250,17 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
|
|||
return note.getInterpreterCompletion();
|
||||
}
|
||||
}
|
||||
String trimmedBuffer = buffer != null ? buffer.trim() : null;
|
||||
cursor = calculateCursorPosition(buffer, trimmedBuffer, cursor);
|
||||
this.interpreter = getBindedInterpreter();
|
||||
|
||||
setText(buffer);
|
||||
|
||||
cursor = calculateCursorPosition(buffer, cursor);
|
||||
|
||||
InterpreterContext interpreterContext = getInterpreterContextWithoutRunner(null);
|
||||
|
||||
try {
|
||||
if (this.interpreter != null) {
|
||||
return this.interpreter.completion(scriptText, cursor, interpreterContext);
|
||||
return this.interpreter.completion(this.scriptText, cursor, interpreterContext);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
|
@ -266,24 +269,15 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
|
|||
}
|
||||
}
|
||||
|
||||
public int calculateCursorPosition(String buffer, String trimmedBuffer, int cursor) {
|
||||
int countWhitespacesAtStart = buffer.indexOf(trimmedBuffer);
|
||||
if (countWhitespacesAtStart > 0) {
|
||||
cursor -= countWhitespacesAtStart;
|
||||
}
|
||||
public int calculateCursorPosition(String buffer, int cursor) {
|
||||
// scriptText trimmed
|
||||
|
||||
// parse text to get interpreter component
|
||||
String repl = null;
|
||||
if (trimmedBuffer != null) {
|
||||
Matcher matcher = REPL_PATTERN.matcher(trimmedBuffer);
|
||||
if (matcher.matches()) {
|
||||
repl = matcher.group(2);
|
||||
}
|
||||
if (this.scriptText.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (repl != null && cursor > repl.length()) {
|
||||
String body = trimmedBuffer.substring(repl.length() + 1);
|
||||
cursor -= repl.length() + 1 + body.indexOf(body.trim());
|
||||
int countCharactersBeforeScript = buffer.indexOf(this.scriptText);
|
||||
if (countCharactersBeforeScript > 0) {
|
||||
cursor -= countCharactersBeforeScript;
|
||||
}
|
||||
|
||||
return cursor;
|
||||
|
|
@ -357,6 +351,7 @@ public class Paragraph extends Job implements Cloneable, JsonSerializable {
|
|||
setStatus(Job.Status.ERROR);
|
||||
throw intpException;
|
||||
}
|
||||
setStatus(Status.READY);
|
||||
if (getConfig().get("enabled") == null || (Boolean) getConfig().get("enabled")) {
|
||||
setAuthenticationInfo(getAuthenticationInfo());
|
||||
interpreter.getScheduler().submit(this);
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.notebook.FileSystemStorage;
|
||||
import org.apache.zeppelin.notebook.Note;
|
||||
import org.apache.zeppelin.notebook.NoteInfo;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
|
|
@ -37,108 +38,45 @@ import java.util.Map;
|
|||
public class FileSystemNotebookRepo implements NotebookRepo {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemNotebookRepo.class);
|
||||
|
||||
private Configuration hadoopConf;
|
||||
private ZeppelinConfiguration zConf;
|
||||
private boolean isSecurityEnabled = false;
|
||||
private FileSystem fs;
|
||||
private FileSystemStorage fs;
|
||||
private Path notebookDir;
|
||||
|
||||
public FileSystemNotebookRepo(ZeppelinConfiguration zConf) throws IOException {
|
||||
this.zConf = zConf;
|
||||
this.hadoopConf = new Configuration();
|
||||
this.fs = FileSystemStorage.get(zConf);
|
||||
this.notebookDir = this.fs.makeQualified(new Path(zConf.getNotebookDir()));
|
||||
LOGGER.info("Using folder {} to store notebook", notebookDir);
|
||||
this.fs.tryMkDir(notebookDir);
|
||||
|
||||
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||
if (isSecurityEnabled) {
|
||||
String keytab = zConf.getString(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB);
|
||||
String principal = zConf.getString(
|
||||
ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL);
|
||||
if (StringUtils.isBlank(keytab) || StringUtils.isBlank(principal)) {
|
||||
throw new IOException("keytab and principal can not be empty, keytab: " + keytab
|
||||
+ ", principal: " + principal);
|
||||
}
|
||||
UserGroupInformation.loginUserFromKeytab(principal, keytab);
|
||||
}
|
||||
|
||||
try {
|
||||
this.fs = FileSystem.get(new URI(zConf.getNotebookDir()), new Configuration());
|
||||
LOGGER.info("Creating FileSystem: " + this.fs.getClass().getCanonicalName());
|
||||
this.notebookDir = fs.makeQualified(new Path(zConf.getNotebookDir()));
|
||||
LOGGER.info("Using folder {} to store notebook", notebookDir);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
if (!fs.exists(notebookDir)) {
|
||||
fs.mkdirs(notebookDir);
|
||||
LOGGER.info("Create notebook dir {} in hdfs", notebookDir.toString());
|
||||
}
|
||||
if (fs.isFile(notebookDir)) {
|
||||
throw new IOException("notebookDir {} is file instead of directory, please remove it or " +
|
||||
"specify another directory");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<List<NoteInfo>>() {
|
||||
@Override
|
||||
public List<NoteInfo> call() throws IOException {
|
||||
List<NoteInfo> noteInfos = new ArrayList<>();
|
||||
for (FileStatus status : fs.globStatus(new Path(notebookDir, "*/note.json"))) {
|
||||
NoteInfo noteInfo = new NoteInfo(status.getPath().getParent().getName(), "", null);
|
||||
noteInfos.add(noteInfo);
|
||||
}
|
||||
return noteInfos;
|
||||
}
|
||||
});
|
||||
List<Path> notePaths = fs.list(new Path(notebookDir, "*/note.json"));
|
||||
List<NoteInfo> noteInfos = new ArrayList<>();
|
||||
for (Path path : notePaths) {
|
||||
NoteInfo noteInfo = new NoteInfo(path.getParent().getName(), "", null);
|
||||
noteInfos.add(noteInfo);
|
||||
}
|
||||
return noteInfos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Note get(final String noteId, AuthenticationInfo subject) throws IOException {
|
||||
return callHdfsOperation(new HdfsOperation<Note>() {
|
||||
@Override
|
||||
public Note call() throws IOException {
|
||||
Path notePath = new Path(notebookDir.toString() + "/" + noteId + "/note.json");
|
||||
LOGGER.debug("Read note from file: " + notePath);
|
||||
ByteArrayOutputStream noteBytes = new ByteArrayOutputStream();
|
||||
IOUtils.copyBytes(fs.open(notePath), noteBytes, hadoopConf);
|
||||
return Note.fromJson(new String(noteBytes.toString(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING))));
|
||||
}
|
||||
});
|
||||
String content = this.fs.readFile(
|
||||
new Path(notebookDir.toString() + "/" + noteId + "/note.json"));
|
||||
return Note.fromJson(content);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void save(final Note note, AuthenticationInfo subject) throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
Path notePath = new Path(notebookDir.toString() + "/" + note.getId() + "/note.json");
|
||||
Path tmpNotePath = new Path(notebookDir.toString() + "/" + note.getId() + "/.note.json");
|
||||
LOGGER.debug("Saving note to file: " + notePath);
|
||||
if (fs.exists(tmpNotePath)) {
|
||||
fs.delete(tmpNotePath, true);
|
||||
}
|
||||
InputStream in = new ByteArrayInputStream(note.toJson().getBytes(
|
||||
zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)));
|
||||
IOUtils.copyBytes(in, fs.create(tmpNotePath), hadoopConf);
|
||||
fs.delete(notePath, true);
|
||||
fs.rename(tmpNotePath, notePath);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
this.fs.writeFile(note.toJson(),
|
||||
new Path(notebookDir.toString() + "/" + note.getId() + "/note.json"),
|
||||
true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove(final String noteId, AuthenticationInfo subject) throws IOException {
|
||||
callHdfsOperation(new HdfsOperation<Void>() {
|
||||
@Override
|
||||
public Void call() throws IOException {
|
||||
Path noteFolder = new Path(notebookDir.toString() + "/" + noteId);
|
||||
fs.delete(noteFolder, true);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
this.fs.delete(new Path(notebookDir.toString() + "/" + noteId));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -182,26 +120,4 @@ public class FileSystemNotebookRepo implements NotebookRepo {
|
|||
public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) {
|
||||
LOGGER.warn("updateSettings is not implemented for HdfsNotebookRepo");
|
||||
}
|
||||
|
||||
private interface HdfsOperation<T> {
|
||||
T call() throws IOException;
|
||||
}
|
||||
|
||||
public synchronized <T> T callHdfsOperation(final HdfsOperation<T> func) throws IOException {
|
||||
if (isSecurityEnabled) {
|
||||
UserGroupInformation.getLoginUser().reloginFromKeytab();
|
||||
try {
|
||||
return UserGroupInformation.getCurrentUser().doAs(new PrivilegedExceptionAction<T>() {
|
||||
@Override
|
||||
public T run() throws Exception {
|
||||
return func.call();
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
} else {
|
||||
return func.call();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,99 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.util;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
||||
|
||||
/**
|
||||
* Utility class for creating instances via java reflection.
|
||||
*
|
||||
*/
|
||||
public class ReflectionUtils {
|
||||
|
||||
public static Class<?> getClazz(String className) throws IOException {
|
||||
Class clazz = null;
|
||||
try {
|
||||
clazz = Class.forName(className, true, Thread.currentThread().getContextClassLoader());
|
||||
} catch (ClassNotFoundException e) {
|
||||
throw new IOException("Unable to load class: " + className, e);
|
||||
}
|
||||
|
||||
return clazz;
|
||||
}
|
||||
|
||||
private static <T> T getNewInstance(Class<T> clazz) throws IOException {
|
||||
T instance;
|
||||
try {
|
||||
instance = clazz.newInstance();
|
||||
} catch (InstantiationException e) {
|
||||
throw new IOException(
|
||||
"Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new IOException(
|
||||
"Unable to instantiate class with 0 arguments: " + clazz.getName(), e);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
private static <T> T getNewInstance(Class<T> clazz,
|
||||
Class<?>[] parameterTypes,
|
||||
Object[] parameters)
|
||||
throws IOException {
|
||||
T instance;
|
||||
try {
|
||||
Constructor<T> constructor = clazz.getConstructor(parameterTypes);
|
||||
instance = constructor.newInstance(parameters);
|
||||
} catch (InstantiationException e) {
|
||||
throw new IOException(
|
||||
"Unable to instantiate class with " + parameters.length + " arguments: " +
|
||||
clazz.getName(), e);
|
||||
} catch (IllegalAccessException e) {
|
||||
throw new IOException(
|
||||
"Unable to instantiate class with " + parameters.length + " arguments: " +
|
||||
clazz.getName(), e);
|
||||
} catch (NoSuchMethodException e) {
|
||||
throw new IOException(
|
||||
"Unable to instantiate class with " + parameters.length + " arguments: " +
|
||||
clazz.getName(), e);
|
||||
} catch (InvocationTargetException e) {
|
||||
throw new IOException(
|
||||
"Unable to instantiate class with " + parameters.length + " arguments: " +
|
||||
clazz.getName(), e);
|
||||
}
|
||||
return instance;
|
||||
}
|
||||
|
||||
public static <T> T createClazzInstance(String className) throws IOException {
|
||||
Class<?> clazz = getClazz(className);
|
||||
@SuppressWarnings("unchecked")
|
||||
T instance = (T) getNewInstance(clazz);
|
||||
return instance;
|
||||
}
|
||||
|
||||
public static <T> T createClazzInstance(String className,
|
||||
Class<?>[] parameterTypes,
|
||||
Object[] parameters) throws IOException {
|
||||
Class<?> clazz = getClazz(className);
|
||||
T instance = (T) getNewInstance(clazz, parameterTypes, parameters);
|
||||
return instance;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -33,7 +33,7 @@ public abstract class AbstractInterpreterTest {
|
|||
protected File interpreterDir;
|
||||
protected File confDir;
|
||||
protected File notebookDir;
|
||||
protected ZeppelinConfiguration conf = new ZeppelinConfiguration();
|
||||
protected ZeppelinConfiguration conf;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,102 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.interpreter;
|
||||
|
||||
import com.sun.net.httpserver.Authenticator;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class ConfInterpreterTest extends AbstractInterpreterTest {
|
||||
|
||||
@Test
|
||||
public void testCorrectConf() throws IOException, InterpreterException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter);
|
||||
ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf");
|
||||
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), new GUI(),
|
||||
null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
InterpreterResult result = confInterpreter.interpret("property_1\tnew_value\nnew_property\tdummy_value", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code);
|
||||
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test") instanceof RemoteInterpreter);
|
||||
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test");
|
||||
remoteInterpreter.interpret("hello world", context);
|
||||
assertEquals(7, remoteInterpreter.getProperties().size());
|
||||
assertEquals("new_value", remoteInterpreter.getProperty("property_1"));
|
||||
assertEquals("dummy_value", remoteInterpreter.getProperty("new_property"));
|
||||
assertEquals("value_3", remoteInterpreter.getProperty("property_3"));
|
||||
|
||||
// rerun the paragraph with the same properties would result in SUCCESS
|
||||
result = confInterpreter.interpret("property_1\tnew_value\nnew_property\tdummy_value", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code);
|
||||
|
||||
// run the paragraph with the same properties would result in ERROR
|
||||
result = confInterpreter.interpret("property_1\tnew_value_2\nnew_property\tdummy_value", context);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyConf() throws IOException, InterpreterException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter);
|
||||
ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf");
|
||||
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), new GUI(),
|
||||
null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
InterpreterResult result = confInterpreter.interpret("", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code);
|
||||
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test") instanceof RemoteInterpreter);
|
||||
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test");
|
||||
assertEquals(6, remoteInterpreter.getProperties().size());
|
||||
assertEquals("value_1", remoteInterpreter.getProperty("property_1"));
|
||||
assertEquals("value_3", remoteInterpreter.getProperty("property_3"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testRunningAfterOtherInterpreter() throws IOException, InterpreterException {
|
||||
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds());
|
||||
assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.conf") instanceof ConfInterpreter);
|
||||
ConfInterpreter confInterpreter = (ConfInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.conf");
|
||||
|
||||
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), new GUI(),
|
||||
null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test");
|
||||
InterpreterResult result = remoteInterpreter.interpret("hello world", context);
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code);
|
||||
|
||||
result = confInterpreter.interpret("property_1\tnew_value\nnew_property\tdummy_value", context);
|
||||
assertEquals(InterpreterResult.Code.ERROR, result.code);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -62,7 +62,7 @@ public class ManagedInterpreterGroupTest {
|
|||
|
||||
// create session_1
|
||||
List<Interpreter> interpreters = interpreterGroup.getOrCreateSession("user1", "session_1");
|
||||
assertEquals(2, interpreters.size());
|
||||
assertEquals(3, interpreters.size());
|
||||
assertEquals(EchoInterpreter.class.getName(), interpreters.get(0).getClassName());
|
||||
assertEquals(DoubleEchoInterpreter.class.getName(), interpreters.get(1).getClassName());
|
||||
assertEquals(1, interpreterGroup.getSessionNum());
|
||||
|
|
@ -73,7 +73,7 @@ public class ManagedInterpreterGroupTest {
|
|||
|
||||
// create session_2
|
||||
List<Interpreter> interpreters2 = interpreterGroup.getOrCreateSession("user1", "session_2");
|
||||
assertEquals(2, interpreters2.size());
|
||||
assertEquals(3, interpreters2.size());
|
||||
assertEquals(EchoInterpreter.class.getName(), interpreters2.get(0).getClassName());
|
||||
assertEquals(DoubleEchoInterpreter.class.getName(), interpreters2.get(1).getClassName());
|
||||
assertEquals(2, interpreterGroup.getSessionNum());
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -30,14 +31,14 @@ import static org.junit.Assert.assertTrue;
|
|||
public class ShellScriptLauncherTest {
|
||||
|
||||
@Test
|
||||
public void testLauncher() {
|
||||
public void testLauncher() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
ShellScriptLauncher launcher = new ShellScriptLauncher(zConf);
|
||||
ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("ENV_1", "VALUE_1");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "groupName", "name");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "groupName", "name");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.InterpreterOption;
|
|||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -30,9 +31,9 @@ import static org.junit.Assert.assertTrue;
|
|||
public class SparkInterpreterLauncherTest {
|
||||
|
||||
@Test
|
||||
public void testLocalMode() {
|
||||
public void testLocalMode() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -41,7 +42,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
@ -55,9 +56,9 @@ public class SparkInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClientMode_1() {
|
||||
public void testYarnClientMode_1() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -66,7 +67,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
@ -80,9 +81,9 @@ public class SparkInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClientMode_2() {
|
||||
public void testYarnClientMode_2() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -92,7 +93,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
@ -106,9 +107,9 @@ public class SparkInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClusterMode_1() {
|
||||
public void testYarnClusterMode_1() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -117,7 +118,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
@ -132,9 +133,9 @@ public class SparkInterpreterLauncherTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testYarnClusterMode_2() {
|
||||
public void testYarnClusterMode_2() throws IOException {
|
||||
ZeppelinConfiguration zConf = new ZeppelinConfiguration();
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf);
|
||||
SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null);
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty("SPARK_HOME", "/user/spark");
|
||||
properties.setProperty("property_1", "value_1");
|
||||
|
|
@ -144,7 +145,7 @@ public class SparkInterpreterLauncherTest {
|
|||
properties.setProperty("spark.jars", "jar_1");
|
||||
|
||||
InterpreterOption option = new InterpreterOption();
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "groupId", "spark", "spark");
|
||||
InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "intpGroupId", "groupId", "spark", "spark");
|
||||
InterpreterClient client = launcher.launch(context);
|
||||
assertTrue( client instanceof RemoteInterpreterManagedProcess);
|
||||
RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,92 @@
|
|||
package org.apache.zeppelin.interpreter.recovery;
|
||||
|
||||
import com.google.common.io.Files;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.display.GUI;
|
||||
import org.apache.zeppelin.interpreter.AbstractInterpreterTest;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterOption;
|
||||
import org.apache.zeppelin.interpreter.InterpreterSetting;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
|
||||
import org.apache.zeppelin.user.AuthenticationInfo;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class FileSystemRecoveryStorageTest extends AbstractInterpreterTest {
|
||||
|
||||
private File recoveryDir = null;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_STORAGE_CLASS.getVarName(),
|
||||
FileSystemRecoveryStorage.class.getName());
|
||||
recoveryDir = Files.createTempDir();
|
||||
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR.getVarName(), recoveryDir.getAbsolutePath());
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
FileUtils.deleteDirectory(recoveryDir);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSingleInterpreterProcess() throws InterpreterException, IOException {
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
|
||||
interpreterSetting.getOption().setPerUser(InterpreterOption.SHARED);
|
||||
|
||||
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
|
||||
RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
|
||||
InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
remoteInterpreter1.interpret("hello", context1);
|
||||
|
||||
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
|
||||
interpreterSetting.close();
|
||||
assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleInterpreterProcess() throws InterpreterException, IOException {
|
||||
InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test");
|
||||
interpreterSetting.getOption().setPerUser(InterpreterOption.ISOLATED);
|
||||
|
||||
Interpreter interpreter1 = interpreterSetting.getDefaultInterpreter("user1", "note1");
|
||||
RemoteInterpreter remoteInterpreter1 = (RemoteInterpreter) interpreter1;
|
||||
InterpreterContext context1 = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
remoteInterpreter1.interpret("hello", context1);
|
||||
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
|
||||
Interpreter interpreter2 = interpreterSetting.getDefaultInterpreter("user2", "note2");
|
||||
RemoteInterpreter remoteInterpreter2 = (RemoteInterpreter) interpreter2;
|
||||
InterpreterContext context2 = new InterpreterContext("noteId", "paragraphId", "repl",
|
||||
"title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(),
|
||||
new GUI(), null, null, new ArrayList<InterpreterContextRunner>(), null);
|
||||
remoteInterpreter2.interpret("hello", context2);
|
||||
|
||||
assertEquals(2, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
|
||||
interpreterSettingManager.restart(interpreterSetting.getId(), "note1", "user1");
|
||||
assertEquals(1, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
|
||||
interpreterSetting.close();
|
||||
assertEquals(0, interpreterSettingManager.getRecoveryStorage().restore().size());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -271,7 +271,6 @@ public class ParagraphTest extends AbstractInterpreterTest {
|
|||
@Test
|
||||
public void testCursorPosition() {
|
||||
Paragraph paragraph = spy(new Paragraph());
|
||||
doReturn(null).when(paragraph).getIntpText();
|
||||
// left = buffer, middle = cursor position into source code, right = cursor position after parse
|
||||
List<Triple<String, Integer, Integer>> dataSet = Arrays.asList(
|
||||
Triple.of("%jdbc schema.", 13, 7),
|
||||
|
|
@ -294,7 +293,8 @@ public class ParagraphTest extends AbstractInterpreterTest {
|
|||
);
|
||||
|
||||
for (Triple<String, Integer, Integer> data : dataSet) {
|
||||
Integer actual = paragraph.calculateCursorPosition(data.getLeft(), data.getLeft().trim(), data.getMiddle());
|
||||
paragraph.setText(data.getLeft());
|
||||
Integer actual = paragraph.calculateCursorPosition(data.getLeft(), data.getMiddle());
|
||||
assertEquals(data.getRight(), actual);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue