mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
rebase
This commit is contained in:
parent
63dfa0f4ca
commit
c069074680
4 changed files with 127 additions and 53 deletions
BIN
docs/assets/themes/zeppelin/img/docs-img/shell-example.png
Normal file
BIN
docs/assets/themes/zeppelin/img/docs-img/shell-example.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 41 KiB |
20
docs/interpreter/shell.md
Normal file
20
docs/interpreter/shell.md
Normal file
|
|
@ -0,0 +1,20 @@
|
|||
---
|
||||
layout: page
|
||||
title: "Shell Interpreter"
|
||||
description: "Shell Interpreter"
|
||||
group: manual
|
||||
---
|
||||
{% include JB/setup %}
|
||||
|
||||
## Shell interpreter for Apache Zeppelin
|
||||
|
||||
### Overview
|
||||
Shell interpreter uses [Apache Commons Exec](https://commons.apache.org/proper/commons-exec) to execute external processes.
|
||||
|
||||
In Zeppelin notebook, you can use ` %sh ` in the beginning of a paragraph to invoke system shell and run commands.
|
||||
Note: Currently each command runs as Zeppelin user.
|
||||
|
||||
### Example
|
||||
The following example demonstrates the basic usage of Shell in a Zeppelin notebook.
|
||||
|
||||
<img src="{{BASE_PATH}}/assets/themes/zeppelin/img/docs-img/shell-example.png" width="70%" />
|
||||
|
|
@ -19,12 +19,16 @@ package org.apache.zeppelin.shell;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.io.OutputStream;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
|
|
@ -33,7 +37,6 @@ import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
|||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -43,14 +46,22 @@ import org.slf4j.LoggerFactory;
|
|||
* Shell interpreter for Zeppelin.
|
||||
*/
|
||||
public class ShellInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
private static final String EXECUTOR_KEY = "executor";
|
||||
public static final String SHELL_COMMAND_TIMEOUT = "shell.command.timeout.millisecs";
|
||||
int commandTimeOut;
|
||||
private static final boolean isWindows = System
|
||||
.getProperty("os.name")
|
||||
.startsWith("Windows");
|
||||
final String shell = isWindows ? "cmd /c" : "bash -c";
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
private static final String SHELL_COMMAND_TIMEOUT = "shell.command.timeout.millisecs";
|
||||
private static final String DEFAULT_COMMAND_TIMEOUT = "600000";
|
||||
private final boolean isWindows = System.getProperty("os.name").startsWith("Windows");
|
||||
private final String shell = isWindows ? "cmd /c" : "bash -c";
|
||||
private Map<String, DefaultExecutor> executors;
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"sh", "sh",
|
||||
ShellInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add(SHELL_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT,
|
||||
"Shell command time out in millisecs. Default = " + DEFAULT_COMMAND_TIMEOUT).build()
|
||||
);
|
||||
}
|
||||
|
||||
public ShellInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
|
@ -58,9 +69,8 @@ public class ShellInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public void open() {
|
||||
logger.info("Command timeout is set as:", SHELL_COMMAND_TIMEOUT);
|
||||
|
||||
commandTimeOut = Integer.valueOf(getProperty(SHELL_COMMAND_TIMEOUT));
|
||||
LOGGER.info("Command timeout is set to: {}", DEFAULT_COMMAND_TIMEOUT);
|
||||
executors = new HashMap<String, DefaultExecutor>();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -69,7 +79,10 @@ public class ShellInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.debug("Run shell command '" + cmd + "'");
|
||||
LOGGER.debug("Run shell command '" + cmd + "'");
|
||||
OutputStream outStream = new ByteArrayOutputStream();
|
||||
OutputStream errStream = new ByteArrayOutputStream();
|
||||
|
||||
CommandLine cmdLine = CommandLine.parse(shell);
|
||||
// the Windows CMD shell doesn't handle multiline statements,
|
||||
// they need to be delimited by '&&' instead
|
||||
|
|
@ -78,62 +91,45 @@ public class ShellInterpreter extends Interpreter {
|
|||
cmd = StringUtils.join(lines, " && ");
|
||||
}
|
||||
cmdLine.addArgument(cmd, false);
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(contextInterpreter.out, errorStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
|
||||
|
||||
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
|
||||
Map<String, Object> info = runningJob.info();
|
||||
info.put(EXECUTOR_KEY, executor);
|
||||
try {
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outStream, errStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(Long.valueOf(getProperty(SHELL_COMMAND_TIMEOUT))));
|
||||
executors.put(contextInterpreter.getParagraphId(), executor);
|
||||
int exitVal = executor.execute(cmdLine);
|
||||
logger.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
+ "return with exit value: " + exitVal);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, null);
|
||||
LOGGER.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " return with exit value: " + exitVal);
|
||||
return new InterpreterResult(Code.SUCCESS, outStream.toString());
|
||||
} catch (ExecuteException e) {
|
||||
int exitValue = e.getExitValue();
|
||||
logger.error("Can not run " + cmd, e);
|
||||
LOGGER.error("Can not run " + cmd, e);
|
||||
Code code = Code.ERROR;
|
||||
String msg = errorStream.toString();
|
||||
String message = errStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
msg = msg + "Paragraph received a SIGTERM.\n";
|
||||
logger.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + msg);
|
||||
message += "Paragraph received a SIGTERM.\n";
|
||||
LOGGER.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + message);
|
||||
}
|
||||
msg += "ExitValue: " + exitValue;
|
||||
return new InterpreterResult(code, msg);
|
||||
message += "ExitValue: " + exitValue;
|
||||
return new InterpreterResult(code, message);
|
||||
} catch (IOException e) {
|
||||
logger.error("Can not run " + cmd, e);
|
||||
LOGGER.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private Job getRunningJob(String paragraphId) {
|
||||
Job foundJob = null;
|
||||
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
|
||||
for (Job job : jobsRunning) {
|
||||
if (job.getId().equals(paragraphId)) {
|
||||
foundJob = job;
|
||||
}
|
||||
}
|
||||
return foundJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(EXECUTOR_KEY);
|
||||
if (object != null) {
|
||||
Executor executor = (Executor) object;
|
||||
ExecuteWatchdog watchdog = executor.getWatchdog();
|
||||
watchdog.destroyProcess();
|
||||
for (String paragraphId : executors.keySet()) {
|
||||
if (paragraphId.equals(context.getParagraphId())) {
|
||||
DefaultExecutor executor = executors.get(paragraphId);
|
||||
executor.getWatchdog().destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.shell;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class ShellInterpreterTest {
|
||||
|
||||
private ShellInterpreter shell;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
shell = new ShellInterpreter(new Properties());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
shell.open();
|
||||
InterpreterContext context = new InterpreterContext("", "1", "", "", null, null, null, null, null, null, null);
|
||||
InterpreterResult result = new InterpreterResult(Code.ERROR);
|
||||
if (System.getProperty("os.name").startsWith("Windows")) {
|
||||
result = shell.interpret("dir", context);
|
||||
} else {
|
||||
result = shell.interpret("ls", context);
|
||||
}
|
||||
// System.out.println(result.message());
|
||||
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue