mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' of https://github.com/apache/incubator-zeppelin into ZEPPELIN-630
Conflicts: zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
This commit is contained in:
commit
62a75c938e
17 changed files with 1203 additions and 18 deletions
|
|
@ -111,7 +111,7 @@
|
|||
|
||||
<property>
|
||||
<name>zeppelin.interpreters</name>
|
||||
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value>
|
||||
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.tachyon.TachyonInterpreter</value>
|
||||
<description>Comma separated interpreter configurations. First interpreter become a default</description>
|
||||
</property>
|
||||
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@
|
|||
<li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li>
|
||||
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Shell</a></li>
|
||||
<li><a href="{{BASE_PATH}}/interpreter/spark.html">Spark</a></li>
|
||||
<li><a href="{{BASE_PATH}}/interpreter/tachyon.html">Tachyon</a></li>
|
||||
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Tajo</a></li>
|
||||
<li role="separator" class="divider"></li>
|
||||
<li><a href="{{BASE_PATH}}/manual/dynamicinterpreterload.html">Dynamic Interpreter Loading</a></li>
|
||||
|
|
|
|||
BIN
docs/assets/themes/zeppelin/img/docs-img/tachyon-example.png
Normal file
BIN
docs/assets/themes/zeppelin/img/docs-img/tachyon-example.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 43 KiB |
216
docs/interpreter/tachyon.md
Normal file
216
docs/interpreter/tachyon.md
Normal file
|
|
@ -0,0 +1,216 @@
|
|||
---
|
||||
layout: page
|
||||
title: "Tachyon Interpreter"
|
||||
description: "Tachyon Interpreter"
|
||||
group: manual
|
||||
---
|
||||
{% include JB/setup %}
|
||||
|
||||
## Tachyon Interpreter for Apache Zeppelin
|
||||
[Tachyon](http://tachyon-project.org/) is a memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster frameworks.
|
||||
|
||||
## Configuration
|
||||
<table class="table-configuration">
|
||||
<tr>
|
||||
<th>Name</th>
|
||||
<th>Class</th>
|
||||
<th>Description</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>tachyon.master.hostname</td>
|
||||
<td>localhost</td>
|
||||
<td>Tachyon master hostname</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>tachyon.master.port</td>
|
||||
<td>19998</td>
|
||||
<td>Tachyon master port</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
## Enabling Tachyon Interpreter
|
||||
In a notebook, to enable the **Tachyon** interpreter, click on the **Gear** icon and select **Tachyon**.
|
||||
|
||||
## Using the Tachyon Interpreter
|
||||
In a paragraph, use `%tachyon` to select the **Tachyon** interpreter and then input all commands.
|
||||
|
||||
```bash
|
||||
%tachyon
|
||||
help
|
||||
```
|
||||
|
||||
> **Tip :** Use ( Ctrl + . ) for autocompletion.
|
||||
|
||||
## Interpreter Commands
|
||||
The **Tachyon** interpreter accepts the following commands.
|
||||
|
||||
<center>
|
||||
<table class="table-configuration">
|
||||
<tr>
|
||||
<th>Operation</th>
|
||||
<th>Syntax</th>
|
||||
<th>Description</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>copyFromLocal</td>
|
||||
<td>copyFromLocal "source path" "remote path"</td>
|
||||
<td>Copy the specified file specified by "source path" to the path specified by "remote path".
|
||||
This command will fail if "remote path" already exists.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>copyToLocal</td>
|
||||
<td>copyToLocal "remote path" "local path"</td>
|
||||
<td>Copy the specified file from the path specified by "remote source" to a local
|
||||
destination.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>count</td>
|
||||
<td>count "path"</td>
|
||||
<td>Display the number of folders and files matching the specified prefix in "path".</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>du</td>
|
||||
<td>du "path"</td>
|
||||
<td>Display the size of a file or a directory specified by the input path.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>fileinfo</td>
|
||||
<td>fileinfo "path"</td>
|
||||
<td>Print the information of the blocks of a specified file.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>free</td>
|
||||
<td>free "path"</td>
|
||||
<td>Free a file or all files under a directory from Tachyon. If the file/directory is also
|
||||
in under storage, it will still be available there.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>getCapacityBytes</td>
|
||||
<td>getCapacityBytes</td>
|
||||
<td>Get the capacity of the TachyonFS.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>getUsedBytes</td>
|
||||
<td>getUsedBytes</td>
|
||||
<td>Get number of bytes used in the TachyonFS.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>load</td>
|
||||
<td>load "path"</td>
|
||||
<td>Load the data of a file or a directory from under storage into Tachyon.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>loadMetadata</td>
|
||||
<td>loadMetadata "path"</td>
|
||||
<td>Load the metadata of a file or a directory from under storage into Tachyon.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>location</td>
|
||||
<td>location "path"</td>
|
||||
<td>Display a list of hosts that have the file data.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>ls</td>
|
||||
<td>ls "path"</td>
|
||||
<td>List all the files and directories directly under the given path with information such as
|
||||
size.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>lsr</td>
|
||||
<td>lsr "path"</td>
|
||||
<td>Recursively list all the files and directories under the given path with information such
|
||||
as size.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>mkdir</td>
|
||||
<td>mkdir "path"</td>
|
||||
<td>Create a directory under the given path, along with any necessary parent directories. This
|
||||
command will fail if the given path already exists.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>mount</td>
|
||||
<td>mount "path" "uri"</td>
|
||||
<td>Mount the underlying file system path "uri" into the Tachyon namespace as "path". The "path"
|
||||
is assumed not to exist and is created by the operation. No data or metadata is loaded from under
|
||||
storage into Tachyon. After a path is mounted, operations on objects under the mounted path are
|
||||
mirror to the mounted under storage.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>mv</td>
|
||||
<td>mv "source" "destination"</td>
|
||||
<td>Move a file or directory specified by "source" to a new location "destination". This command
|
||||
will fail if "destination" already exists.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>pin</td>
|
||||
<td>pin "path"</td>
|
||||
<td>Pin the given file to avoid evicting it from memory. If the given path is a directory, it
|
||||
recursively pins all the files contained and any new files created within this directory.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>report</td>
|
||||
<td>report "path"</td>
|
||||
<td>Report to the master that a file is lost.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>request</td>
|
||||
<td>request "path" "dependency ID"</td>
|
||||
<td>Request the file for a given dependency ID.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>rm</td>
|
||||
<td>rm "path"</td>
|
||||
<td>Remove a file. This command will fail if the given path is a directory rather than a
|
||||
file.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>rmr</td>
|
||||
<td>rmr "path"</td>
|
||||
<td>Remove a file, or a directory with all the files and sub-directories that this directory
|
||||
contains.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>tail</td>
|
||||
<td>tail "path"</td>
|
||||
<td>Print the last 1KB of the specified file to the console.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>touch</td>
|
||||
<td>touch "path"</td>
|
||||
<td>Create a 0-byte file at the specified location.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>unmount</td>
|
||||
<td>unmount "path"</td>
|
||||
<td>Unmount the underlying file system path mounted in the Tachyon namespace as "path". Tachyon
|
||||
objects under "path" are removed from Tachyon, but they still exist in the previously mounted
|
||||
under storage.</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>unpin</td>
|
||||
<td>unpin "path"</td>
|
||||
<td>Unpin the given file to allow Tachyon to evict this file again. If the given path is a
|
||||
directory, it recursively unpins all files contained and any new files created within this
|
||||
directory.</td>
|
||||
</tr>
|
||||
</table>
|
||||
</center>
|
||||
|
||||
## How to test it's working
|
||||
Be sure to have configured correctly the Tachyon interpreter, then open a new paragraph and type one of the above commands.
|
||||
|
||||
Below a simple example to show how to interact with Tachyon interpreter.
|
||||
Following steps are performed:
|
||||
|
||||
* using sh interpreter a new text file is created on local machine
|
||||
* using Tachyon interpreter:
|
||||
* is listed the content of the tfs (Tachyon File System) root
|
||||
* the file previously created is copied to tfs
|
||||
* is listed again the content of the tfs root to check the existence of the new copied file
|
||||
* is showed the content of the copied file (using the tail command)
|
||||
* the file previously copied to tfs is copied to local machine
|
||||
* using sh interpreter it's checked the existence of the new file copied from Tachyon and its content is showed
|
||||
|
||||
<center>
|
||||

|
||||
</center>
|
||||
|
|
@ -35,6 +35,7 @@
|
|||
{
|
||||
"title": "Load data into table",
|
||||
"text": "import org.apache.commons.io.IOUtils\nimport java.net.URL\nimport java.nio.charset.Charset\n\n// Zeppelin creates and injects sc (SparkContext) and sqlContext (HiveContext or SqlContext)\n// So you don\u0027t need create them manually\n\n// load bank data\nval bankText \u003d sc.parallelize(\n IOUtils.toString(\n new URL(\"https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv\"),\n Charset.forName(\"utf8\")).split(\"\\n\"))\n\ncase class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)\n\nval bank \u003d bankText.map(s \u003d\u003e s.split(\";\")).filter(s \u003d\u003e s(0) !\u003d \"\\\"age\\\"\").map(\n s \u003d\u003e Bank(s(0).toInt, \n s(1).replaceAll(\"\\\"\", \"\"),\n s(2).replaceAll(\"\\\"\", \"\"),\n s(3).replaceAll(\"\\\"\", \"\"),\n s(5).replaceAll(\"\\\"\", \"\").toInt\n )\n).toDF()\nbank.registerTempTable(\"bank\")",
|
||||
"dateUpdated": "Jan 14, 2016 7:58:56 PM",
|
||||
"config": {
|
||||
"colWidth": 12.0,
|
||||
"graph": {
|
||||
|
|
@ -46,7 +47,9 @@
|
|||
"groups": [],
|
||||
"scatter": {}
|
||||
},
|
||||
"title": true
|
||||
"title": true,
|
||||
"enabled": true,
|
||||
"editorMode": "ace/mode/scala"
|
||||
},
|
||||
"settings": {
|
||||
"params": {},
|
||||
|
|
@ -333,7 +336,10 @@
|
|||
],
|
||||
"name": "Zeppelin Tutorial",
|
||||
"id": "2A94M5J1Z",
|
||||
"angularObjects": {},
|
||||
"angularObjects": {
|
||||
"2B6FF8NNU": [],
|
||||
"2B67PH63Z": []
|
||||
},
|
||||
"config": {
|
||||
"looknfeel": "default"
|
||||
},
|
||||
|
|
|
|||
1
pom.xml
1
pom.xml
|
|
@ -102,6 +102,7 @@
|
|||
<module>lens</module>
|
||||
<module>cassandra</module>
|
||||
<module>elasticsearch</module>
|
||||
<module>tachyon</module>
|
||||
<module>zeppelin-web</module>
|
||||
<module>zeppelin-server</module>
|
||||
<module>zeppelin-distribution</module>
|
||||
|
|
|
|||
|
|
@ -19,18 +19,22 @@ package org.apache.zeppelin.shell;
|
|||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.commons.exec.CommandLine;
|
||||
import org.apache.commons.exec.DefaultExecutor;
|
||||
import org.apache.commons.exec.ExecuteException;
|
||||
import org.apache.commons.exec.ExecuteWatchdog;
|
||||
import org.apache.commons.exec.Executor;
|
||||
import org.apache.commons.exec.PumpStreamHandler;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.scheduler.Job;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public class ShellInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
|
||||
private static final String EXECUTOR_KEY = "executor";
|
||||
int commandTimeOut = 600000;
|
||||
|
||||
static {
|
||||
|
|
@ -61,30 +66,66 @@ public class ShellInterpreter extends Interpreter {
|
|||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.debug("Run shell command '" + cmd + "'");
|
||||
long start = System.currentTimeMillis();
|
||||
CommandLine cmdLine = CommandLine.parse("bash");
|
||||
cmdLine.addArgument("-c", false);
|
||||
cmdLine.addArgument(cmd, false);
|
||||
DefaultExecutor executor = new DefaultExecutor();
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outputStream));
|
||||
|
||||
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
|
||||
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
|
||||
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
|
||||
|
||||
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
|
||||
Map<String, Object> info = runningJob.info();
|
||||
info.put(EXECUTOR_KEY, executor);
|
||||
try {
|
||||
int exitValue = executor.execute(cmdLine);
|
||||
int exitVal = executor.execute(cmdLine);
|
||||
logger.info("Paragraph " + contextInterpreter.getParagraphId()
|
||||
+ "return with exit value: " + exitVal);
|
||||
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
|
||||
} catch (ExecuteException e) {
|
||||
int exitValue = e.getExitValue();
|
||||
logger.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
Code code = Code.ERROR;
|
||||
String msg = errorStream.toString();
|
||||
if (exitValue == 143) {
|
||||
code = Code.INCOMPLETE;
|
||||
msg = msg + "Paragraph received a SIGTERM.\n";
|
||||
logger.info("The paragraph " + contextInterpreter.getParagraphId()
|
||||
+ " stopped executing: " + msg);
|
||||
}
|
||||
msg += "Exitvalue: " + exitValue;
|
||||
return new InterpreterResult(code, msg);
|
||||
} catch (IOException e) {
|
||||
logger.error("Can not run " + cmd, e);
|
||||
return new InterpreterResult(Code.ERROR, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {}
|
||||
private Job getRunningJob(String paragraphId) {
|
||||
Job foundJob = null;
|
||||
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
|
||||
for (Job job : jobsRunning) {
|
||||
if (job.getId().equals(paragraphId)) {
|
||||
foundJob = job;
|
||||
}
|
||||
}
|
||||
return foundJob;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
Job runningJob = getRunningJob(context.getParagraphId());
|
||||
if (runningJob != null) {
|
||||
Map<String, Object> info = runningJob.info();
|
||||
Object object = info.get(EXECUTOR_KEY);
|
||||
if (object != null) {
|
||||
Executor executor = (Executor) object;
|
||||
ExecuteWatchdog watchdog = executor.getWatchdog();
|
||||
watchdog.destroyProcess();
|
||||
}
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
|
|
@ -97,8 +138,8 @@ public class ShellInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
ShellInterpreter.class.getName() + this.hashCode());
|
||||
return SchedulerFactory.singleton().createOrGetParallelScheduler(
|
||||
ShellInterpreter.class.getName() + this.hashCode(), 10);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
164
tachyon/pom.xml
Normal file
164
tachyon/pom.xml
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-tachyon</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<name>Zeppelin: Tachyon interpreter</name>
|
||||
<url>http://www.apache.org</url>
|
||||
|
||||
<properties>
|
||||
<tachyon.version>0.8.2</tachyon.version>
|
||||
</properties>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.tachyonproject</groupId>
|
||||
<artifactId>tachyon-shell</artifactId>
|
||||
<version>${tachyon.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- TEST -->
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.tachyonproject</groupId>
|
||||
<artifactId>tachyon-servers</artifactId>
|
||||
<version>${tachyon.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.tachyonproject</groupId>
|
||||
<artifactId>tachyon-minicluster</artifactId>
|
||||
<version>${tachyon.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.tachyonproject</groupId>
|
||||
<artifactId>tachyon-underfs-local</artifactId>
|
||||
<version>${tachyon.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/tachyon</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/tachyon</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
|
|
@ -0,0 +1,251 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.tachyon;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.util.*;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import tachyon.conf.TachyonConf;
|
||||
import tachyon.shell.TfsShell;
|
||||
|
||||
/**
|
||||
* Tachyon interpreter for Zeppelin.
|
||||
*/
|
||||
public class TachyonInterpreter extends Interpreter {
|
||||
|
||||
Logger logger = LoggerFactory.getLogger(TachyonInterpreter.class);
|
||||
|
||||
protected static final String TACHYON_MASTER_HOSTNAME = "tachyon.master.hostname";
|
||||
protected static final String TACHYON_MASTER_PORT = "tachyon.master.port";
|
||||
|
||||
private TfsShell tfs;
|
||||
|
||||
private int totalCommands = 0;
|
||||
private int completedCommands = 0;
|
||||
|
||||
private final String tachyonMasterHostname;
|
||||
private final String tachyonMasterPort;
|
||||
|
||||
protected final List<String> keywords = Arrays.asList("cat", "copyFromLocal",
|
||||
"copyToLocal", "count", "du", "fileinfo", "free", "getUsedBytes",
|
||||
"getCapacityBytes", "load", "loadMetadata", "location", "ls", "lsr",
|
||||
"mkdir", "mount", "mv", "pin", "report", "request", "rm", "rmr",
|
||||
"setTTL", "unsetTTL", "tail", "touch", "unmount", "unpin");
|
||||
|
||||
public TachyonInterpreter(Properties property) {
|
||||
super(property);
|
||||
|
||||
tachyonMasterHostname = property.getProperty(TACHYON_MASTER_HOSTNAME);
|
||||
tachyonMasterPort = property.getProperty(TACHYON_MASTER_PORT);
|
||||
}
|
||||
|
||||
static {
|
||||
Interpreter.register("tachyon", "tachyon",
|
||||
TachyonInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add(TACHYON_MASTER_HOSTNAME, "localhost", "Tachyon master hostname")
|
||||
.add(TACHYON_MASTER_PORT, "19998", "Tachyon master port")
|
||||
.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
logger.info("Starting Tachyon shell to connect to " + tachyonMasterHostname +
|
||||
" on port " + tachyonMasterPort);
|
||||
|
||||
System.setProperty(TACHYON_MASTER_HOSTNAME, tachyonMasterHostname);
|
||||
System.setProperty(TACHYON_MASTER_PORT, tachyonMasterPort);
|
||||
tfs = new TfsShell(new TachyonConf());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
logger.info("Closing Tachyon shell");
|
||||
|
||||
try {
|
||||
tfs.close();
|
||||
} catch (IOException e) {
|
||||
logger.error("Cannot close connection", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String st, InterpreterContext context) {
|
||||
String[] lines = splitAndRemoveEmpty(st, "\n");
|
||||
return interpret(lines, context);
|
||||
}
|
||||
|
||||
private InterpreterResult interpret(String[] commands, InterpreterContext context) {
|
||||
boolean isSuccess = true;
|
||||
totalCommands = commands.length;
|
||||
completedCommands = 0;
|
||||
|
||||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
PrintStream ps = new PrintStream(baos);
|
||||
PrintStream old = System.out;
|
||||
|
||||
System.setOut(ps);
|
||||
|
||||
for (String command : commands) {
|
||||
int commandResuld = 1;
|
||||
String[] args = splitAndRemoveEmpty(command, " ");
|
||||
if (args.length > 0 && args[0].equals("help")) {
|
||||
System.out.println(getCommandList());
|
||||
} else {
|
||||
commandResuld = tfs.run(args);
|
||||
}
|
||||
if (commandResuld != 0) {
|
||||
isSuccess = false;
|
||||
break;
|
||||
} else {
|
||||
completedCommands += 1;
|
||||
}
|
||||
System.out.println();
|
||||
}
|
||||
|
||||
System.out.flush();
|
||||
System.setOut(old);
|
||||
|
||||
if (isSuccess) {
|
||||
return new InterpreterResult(Code.SUCCESS, baos.toString());
|
||||
} else {
|
||||
return new InterpreterResult(Code.ERROR, baos.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private String[] splitAndRemoveEmpty(String st, String splitSeparator) {
|
||||
String[] voices = st.split(splitSeparator);
|
||||
ArrayList<String> result = new ArrayList<String>();
|
||||
for (String voice : voices) {
|
||||
if (!voice.trim().isEmpty()) {
|
||||
result.add(voice);
|
||||
}
|
||||
}
|
||||
return result.toArray(new String[result.size()]);
|
||||
}
|
||||
|
||||
private String[] splitAndRemoveEmpty(String[] sts, String splitSeparator) {
|
||||
ArrayList<String> result = new ArrayList<String>();
|
||||
for (String st : sts) {
|
||||
result.addAll(Arrays.asList(splitAndRemoveEmpty(st, splitSeparator)));
|
||||
}
|
||||
return result.toArray(new String[result.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) { }
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.NATIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return completedCommands * 100 / totalCommands;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
String[] words = splitAndRemoveEmpty(splitAndRemoveEmpty(buf, "\n"), " ");
|
||||
String lastWord = "";
|
||||
if (words.length > 0) {
|
||||
lastWord = words[ words.length - 1 ];
|
||||
}
|
||||
ArrayList<String> voices = new ArrayList<String>();
|
||||
for (String command : keywords) {
|
||||
if (command.startsWith(lastWord)) {
|
||||
voices.add(command);
|
||||
}
|
||||
}
|
||||
return voices;
|
||||
}
|
||||
|
||||
private String getCommandList() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Commands list:");
|
||||
sb.append("\n\t[help] - List all available commands.");
|
||||
sb.append("\n\t[cat <path>] - Print the content of the file to the console.");
|
||||
sb.append("\n\t[copyFromLocal <src> <remoteDst>] - Copy the specified file specified " +
|
||||
"by \"source path\" to the path specified by \"remote path\". " +
|
||||
"This command will fail if \"remote path\" already exists.");
|
||||
sb.append("\n\t[copyToLocal <src> <localDst>] - Copy the specified file from the path " +
|
||||
"specified by \"remote source\" to a local destination.");
|
||||
sb.append("\n\t[count <path>] - Display the number of folders and files matching " +
|
||||
"the specified prefix in \"path\".");
|
||||
sb.append("\n\t[du <path>] - Display the size of a file or a directory specified " +
|
||||
"by the input path.");
|
||||
sb.append("\n\t[fileinfo <path>] - Print the information of the blocks of a specified file.");
|
||||
sb.append("\n\t[free <file path|folder path>] - Free a file or all files under a " +
|
||||
"directory from Tachyon. If the file/directory is also in under storage, " +
|
||||
"it will still be available there.");
|
||||
sb.append("\n\t[getUsedBytes] - Get number of bytes used in the TachyonFS.");
|
||||
sb.append("\n\t[getCapacityBytes] - Get the capacity of the TachyonFS.");
|
||||
sb.append("\n\t[load <path>] - Load the data of a file or a directory from under " +
|
||||
"storage into Tachyon.");
|
||||
sb.append("\n\t[loadMetadata <path>] - Load the metadata of a file or a directory " +
|
||||
"from under storage into Tachyon.");
|
||||
sb.append("\n\t[location <path>] - Display a list of hosts that have the file data.");
|
||||
sb.append("\n\t[ls <path>] - List all the files and directories directly under the " +
|
||||
"given path with information such as size.");
|
||||
sb.append("\n\t[lsr <path>] - Recursively list all the files and directories under " +
|
||||
"the given path with information such as size.");
|
||||
sb.append("\n\t[mkdir <path>] - Create a directory under the given path, along with " +
|
||||
"any necessary parent directories. This command will fail if the given " +
|
||||
"path already exists.");
|
||||
sb.append("\n\t[mount <tachyonPath> <ufsURI>] - Mount the underlying file system " +
|
||||
"path \"uri\" into the Tachyon namespace as \"path\". The \"path\" is assumed " +
|
||||
"not to exist and is created by the operation. No data or metadata is loaded " +
|
||||
"from under storage into Tachyon. After a path is mounted, operations on objects " +
|
||||
"under the mounted path are mirror to the mounted under storage.");
|
||||
sb.append("\n\t[mv <src> <dst>] - Move a file or directory specified by \"source\" " +
|
||||
"to a new location \"destination\". This command will fail if " +
|
||||
"\"destination\" already exists.");
|
||||
sb.append("\n\t[pin <path>] - Pin the given file to avoid evicting it from memory. " +
|
||||
"If the given path is a directory, it recursively pins all the files contained " +
|
||||
"and any new files created within this directory.");
|
||||
sb.append("\n\t[report <path>] - Report to the master that a file is lost.");
|
||||
sb.append("\n\t[request <tachyonaddress> <dependencyId>] - Request the file for " +
|
||||
"a given dependency ID.");
|
||||
sb.append("\n\t[rm <path>] - Remove a file. This command will fail if the given " +
|
||||
"path is a directory rather than a file.");
|
||||
sb.append("\n\t[rmr <path>] - Remove a file, or a directory with all the files and " +
|
||||
"sub-directories that this directory contains.");
|
||||
sb.append("\n\t[tail <path>] - Print the last 1KB of the specified file to the console.");
|
||||
sb.append("\n\t[touch <path>] - Create a 0-byte file at the specified location.");
|
||||
sb.append("\n\t[unmount <tachyonPath>] - Unmount the underlying file system path " +
|
||||
"mounted in the Tachyon namespace as \"path\". Tachyon objects under \"path\" " +
|
||||
"are removed from Tachyon, but they still exist in the previously " +
|
||||
"mounted under storage.");
|
||||
sb.append("\n\t[unpin <path>] - Unpin the given file to allow Tachyon to evict " +
|
||||
"this file again. If the given path is a directory, it recursively unpins " +
|
||||
"all files contained and any new files created within this directory.");
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,484 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.tachyon;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.junit.*;
|
||||
|
||||
import tachyon.Constants;
|
||||
import tachyon.TachyonURI;
|
||||
import tachyon.client.TachyonFSTestUtils;
|
||||
import tachyon.client.TachyonStorageType;
|
||||
import tachyon.client.UnderStorageType;
|
||||
import tachyon.client.file.FileInStream;
|
||||
import tachyon.client.file.TachyonFile;
|
||||
import tachyon.client.file.TachyonFileSystem;
|
||||
import tachyon.client.file.options.InStreamOptions;
|
||||
import tachyon.conf.TachyonConf;
|
||||
import tachyon.exception.ExceptionMessage;
|
||||
import tachyon.exception.TachyonException;
|
||||
import tachyon.master.LocalTachyonCluster;
|
||||
import tachyon.shell.TfsShell;
|
||||
import tachyon.thrift.FileInfo;
|
||||
import tachyon.util.FormatUtils;
|
||||
import tachyon.util.io.BufferUtils;
|
||||
import tachyon.util.io.PathUtils;
|
||||
|
||||
|
||||
public class TachyonInterpreterTest {
|
||||
private TachyonInterpreter tachyonInterpreter;
|
||||
private static final int SIZE_BYTES = Constants.MB * 10;
|
||||
private LocalTachyonCluster mLocalTachyonCluster = null;
|
||||
private TachyonFileSystem mTfs = null;
|
||||
|
||||
@After
|
||||
public final void after() throws Exception {
|
||||
if (tachyonInterpreter != null) {
|
||||
tachyonInterpreter.close();
|
||||
}
|
||||
mLocalTachyonCluster.stop();
|
||||
}
|
||||
|
||||
@Before
|
||||
public final void before() throws Exception {
|
||||
mLocalTachyonCluster = new LocalTachyonCluster(SIZE_BYTES, 1000, Constants.GB);
|
||||
mLocalTachyonCluster.start();
|
||||
mTfs = mLocalTachyonCluster.getClient();
|
||||
|
||||
final Properties props = new Properties();
|
||||
props.put(TachyonInterpreter.TACHYON_MASTER_HOSTNAME, mLocalTachyonCluster.getMasterHostname());
|
||||
props.put(TachyonInterpreter.TACHYON_MASTER_PORT, mLocalTachyonCluster.getMasterPort() + "");
|
||||
tachyonInterpreter = new TachyonInterpreter(props);
|
||||
tachyonInterpreter.open();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCompletion() {
|
||||
List<String> expectedResultOne = Arrays.asList("cat", "copyFromLocal",
|
||||
"copyToLocal", "count");
|
||||
List<String> expectedResultTwo = Arrays.asList("copyFromLocal",
|
||||
"copyToLocal", "count");
|
||||
List<String> expectedResultThree = Arrays.asList("copyFromLocal", "copyToLocal");
|
||||
List<String> expectedResultNone = new ArrayList<String>();
|
||||
|
||||
List<String> resultOne = tachyonInterpreter.completion("c", 0);
|
||||
List<String> resultTwo = tachyonInterpreter.completion("co", 0);
|
||||
List<String> resultThree = tachyonInterpreter.completion("copy", 0);
|
||||
List<String> resultNotMatch = tachyonInterpreter.completion("notMatch", 0);
|
||||
List<String> resultAll = tachyonInterpreter.completion("", 0);
|
||||
|
||||
Assert.assertEquals(expectedResultOne, resultOne);
|
||||
Assert.assertEquals(expectedResultTwo, resultTwo);
|
||||
Assert.assertEquals(expectedResultThree, resultThree);
|
||||
Assert.assertEquals(expectedResultNone, resultNotMatch);
|
||||
Assert.assertEquals(tachyonInterpreter.keywords, resultAll);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void catDirectoryTest() throws IOException {
|
||||
String expected = "Successfully created directory /testDir\n\n" +
|
||||
"/testDir is not a file.\n";
|
||||
|
||||
InterpreterResult output = tachyonInterpreter.interpret("mkdir /testDir" +
|
||||
"\ncat /testDir", null);
|
||||
|
||||
Assert.assertEquals(Code.ERROR, output.code());
|
||||
Assert.assertEquals(expected, output.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void catNotExistTest() throws IOException {
|
||||
InterpreterResult output = tachyonInterpreter.interpret("cat /testFile", null);
|
||||
Assert.assertEquals(Code.ERROR, output.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void catTest() throws IOException {
|
||||
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.STORE,
|
||||
UnderStorageType.NO_PERSIST, 10);
|
||||
InterpreterResult output = tachyonInterpreter.interpret("cat /testFile", null);
|
||||
|
||||
byte[] expected = BufferUtils.getIncreasingByteArray(10);
|
||||
|
||||
Assert.assertEquals(Code.SUCCESS, output.code());
|
||||
Assert.assertArrayEquals(expected,
|
||||
output.message().substring(0, output.message().length() - 1).getBytes());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void copyFromLocalLargeTest() throws IOException, TachyonException {
|
||||
File testFile = new File(mLocalTachyonCluster.getTachyonHome() + "/testFile");
|
||||
testFile.createNewFile();
|
||||
FileOutputStream fos = new FileOutputStream(testFile);
|
||||
byte[] toWrite = BufferUtils.getIncreasingByteArray(SIZE_BYTES);
|
||||
fos.write(toWrite);
|
||||
fos.close();
|
||||
|
||||
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
|
||||
testFile.getAbsolutePath() + " /testFile", null);
|
||||
Assert.assertEquals(
|
||||
"Copied " + testFile.getAbsolutePath() + " to /testFile\n\n",
|
||||
output.message());
|
||||
|
||||
TachyonFile tFile = mTfs.open(new TachyonURI("/testFile"));
|
||||
FileInfo fileInfo = mTfs.getInfo(tFile);
|
||||
Assert.assertNotNull(fileInfo);
|
||||
Assert.assertEquals(SIZE_BYTES, fileInfo.length);
|
||||
|
||||
InStreamOptions options =
|
||||
new InStreamOptions.Builder(new TachyonConf()).setTachyonStorageType(
|
||||
TachyonStorageType.NO_STORE).build();
|
||||
FileInStream tfis = mTfs.getInStream(tFile, options);
|
||||
byte[] read = new byte[SIZE_BYTES];
|
||||
tfis.read(read);
|
||||
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(SIZE_BYTES, read));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadFileTest() throws IOException, TachyonException {
|
||||
TachyonFile file =
|
||||
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.NO_STORE,
|
||||
UnderStorageType.SYNC_PERSIST, 10);
|
||||
FileInfo fileInfo = mTfs.getInfo(file);
|
||||
Assert.assertFalse(fileInfo.getInMemoryPercentage() == 100);
|
||||
|
||||
tachyonInterpreter.interpret("load /testFile", null);
|
||||
|
||||
fileInfo = mTfs.getInfo(file);
|
||||
Assert.assertTrue(fileInfo.getInMemoryPercentage() == 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadDirTest() throws IOException, TachyonException {
|
||||
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
|
||||
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 10);
|
||||
TachyonFile fileB = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileB",
|
||||
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
|
||||
FileInfo fileInfoA = mTfs.getInfo(fileA);
|
||||
FileInfo fileInfoB = mTfs.getInfo(fileB);
|
||||
Assert.assertFalse(fileInfoA.getInMemoryPercentage() == 100);
|
||||
Assert.assertTrue(fileInfoB.getInMemoryPercentage() == 100);
|
||||
|
||||
tachyonInterpreter.interpret("load /testRoot", null);
|
||||
|
||||
fileInfoA = mTfs.getInfo(fileA);
|
||||
fileInfoB = mTfs.getInfo(fileB);
|
||||
Assert.assertTrue(fileInfoA.getInMemoryPercentage() == 100);
|
||||
Assert.assertTrue(fileInfoB.getInMemoryPercentage() == 100);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void copyFromLocalTest() throws IOException, TachyonException {
|
||||
File testDir = new File(mLocalTachyonCluster.getTachyonHome() + "/testDir");
|
||||
testDir.mkdir();
|
||||
File testDirInner = new File(mLocalTachyonCluster.getTachyonHome() + "/testDir/testDirInner");
|
||||
testDirInner.mkdir();
|
||||
File testFile =
|
||||
generateFileContent("/testDir/testFile", BufferUtils.getIncreasingByteArray(10));
|
||||
|
||||
generateFileContent("/testDir/testDirInner/testFile2",
|
||||
BufferUtils.getIncreasingByteArray(10, 20));
|
||||
|
||||
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
|
||||
testFile.getParent() + " /testDir", null);
|
||||
Assert.assertEquals(
|
||||
"Copied " + testFile.getParent() + " to /testDir\n\n",
|
||||
output.message());
|
||||
|
||||
TachyonFile file1 = mTfs.open(new TachyonURI("/testDir/testFile"));
|
||||
TachyonFile file2 = mTfs.open(new TachyonURI("/testDir/testDirInner/testFile2"));
|
||||
FileInfo fileInfo1 = mTfs.getInfo(file1);
|
||||
FileInfo fileInfo2 = mTfs.getInfo(file2);
|
||||
Assert.assertNotNull(fileInfo1);
|
||||
Assert.assertNotNull(fileInfo2);
|
||||
Assert.assertEquals(10, fileInfo1.length);
|
||||
Assert.assertEquals(20, fileInfo2.length);
|
||||
|
||||
byte[] read = readContent(file1, 10);
|
||||
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, read));
|
||||
read = readContent(file2, 20);
|
||||
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, 20, read));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void copyFromLocalTestWithFullURI() throws IOException, TachyonException {
|
||||
File testFile = generateFileContent("/srcFileURI", BufferUtils.getIncreasingByteArray(10));
|
||||
String tachyonURI = "tachyon://" + mLocalTachyonCluster.getMasterHostname() + ":"
|
||||
+ mLocalTachyonCluster.getMasterPort() + "/destFileURI";
|
||||
|
||||
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
|
||||
testFile.getPath() + " " + tachyonURI, null);
|
||||
Assert.assertEquals(
|
||||
"Copied " + testFile.getPath() + " to " + tachyonURI + "\n\n",
|
||||
output.message());
|
||||
|
||||
TachyonFile file = mTfs.open(new TachyonURI("/destFileURI"));
|
||||
FileInfo fileInfo = mTfs.getInfo(file);
|
||||
Assert.assertEquals(10L, fileInfo.length);
|
||||
byte[] read = readContent(file, 10);
|
||||
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, read));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void copyFromLocalFileToDstPathTest() throws IOException, TachyonException {
|
||||
String dataString = "copyFromLocalFileToDstPathTest";
|
||||
byte[] data = dataString.getBytes();
|
||||
File localDir = new File(mLocalTachyonCluster.getTachyonHome() + "/localDir");
|
||||
localDir.mkdir();
|
||||
File localFile = generateFileContent("/localDir/testFile", data);
|
||||
|
||||
tachyonInterpreter.interpret("mkdir /dstDir", null);
|
||||
tachyonInterpreter.interpret("copyFromLocal " + localFile.getPath() + " /dstDir", null);
|
||||
|
||||
TachyonFile file = mTfs.open(new TachyonURI("/dstDir/testFile"));
|
||||
FileInfo fileInfo = mTfs.getInfo(file);
|
||||
Assert.assertNotNull(fileInfo);
|
||||
byte[] read = readContent(file, data.length);
|
||||
Assert.assertEquals(new String(read), dataString);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void copyToLocalLargeTest() throws IOException {
|
||||
copyToLocalWithBytes(SIZE_BYTES);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void copyToLocalTest() throws IOException {
|
||||
copyToLocalWithBytes(10);
|
||||
}
|
||||
|
||||
private void copyToLocalWithBytes(int bytes) throws IOException {
|
||||
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.STORE,
|
||||
UnderStorageType.NO_PERSIST, bytes);
|
||||
|
||||
InterpreterResult output = tachyonInterpreter.interpret("copyToLocal /testFile " +
|
||||
mLocalTachyonCluster.getTachyonHome() + "/testFile", null);
|
||||
|
||||
Assert.assertEquals(
|
||||
"Copied /testFile to " + mLocalTachyonCluster.getTachyonHome() + "/testFile\n\n",
|
||||
output.message());
|
||||
fileReadTest("/testFile", 10);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void countNotExistTest() throws IOException {
|
||||
InterpreterResult output = tachyonInterpreter.interpret("count /NotExistFile", null);
|
||||
Assert.assertEquals(Code.ERROR, output.code());
|
||||
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
|
||||
output.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void countTest() throws IOException {
|
||||
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA", TachyonStorageType.STORE,
|
||||
UnderStorageType.NO_PERSIST, 10);
|
||||
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
|
||||
UnderStorageType.NO_PERSIST, 20);
|
||||
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileB", TachyonStorageType.STORE,
|
||||
UnderStorageType.NO_PERSIST, 30);
|
||||
|
||||
InterpreterResult output = tachyonInterpreter.interpret("count /testRoot", null);
|
||||
|
||||
String expected = "";
|
||||
String format = "%-25s%-25s%-15s\n";
|
||||
expected += String.format(format, "File Count", "Folder Count", "Total Bytes");
|
||||
expected += String.format(format, 3, 2, 60);
|
||||
expected += "\n";
|
||||
Assert.assertEquals(expected, output.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fileinfoNotExistTest() throws IOException {
|
||||
InterpreterResult output = tachyonInterpreter.interpret("fileinfo /NotExistFile", null);
|
||||
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
|
||||
output.message());
|
||||
Assert.assertEquals(Code.ERROR, output.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void locationNotExistTest() throws IOException {
|
||||
InterpreterResult output = tachyonInterpreter.interpret("location /NotExistFile", null);
|
||||
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
|
||||
output.message());
|
||||
Assert.assertEquals(Code.ERROR, output.code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lsTest() throws IOException, TachyonException {
|
||||
FileInfo[] files = new FileInfo[3];
|
||||
|
||||
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
|
||||
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
|
||||
files[0] = mTfs.getInfo(fileA);
|
||||
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
|
||||
UnderStorageType.NO_PERSIST, 20);
|
||||
files[1] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir")));
|
||||
TachyonFile fileC = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileC",
|
||||
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 30);
|
||||
files[2] = mTfs.getInfo(fileC);
|
||||
|
||||
InterpreterResult output = tachyonInterpreter.interpret("ls /testRoot", null);
|
||||
|
||||
String expected = "";
|
||||
String format = "%-10s%-25s%-15s%-5s\n";
|
||||
expected += String.format(format, FormatUtils.getSizeFromBytes(10),
|
||||
TfsShell.convertMsToDate(files[0].getCreationTimeMs()), "In Memory", "/testRoot/testFileA");
|
||||
expected += String.format(format, FormatUtils.getSizeFromBytes(0),
|
||||
TfsShell.convertMsToDate(files[1].getCreationTimeMs()), "", "/testRoot/testDir");
|
||||
expected += String.format(format, FormatUtils.getSizeFromBytes(30),
|
||||
TfsShell.convertMsToDate(files[2].getCreationTimeMs()), "Not In Memory",
|
||||
"/testRoot/testFileC");
|
||||
expected += "\n";
|
||||
|
||||
Assert.assertEquals(Code.SUCCESS, output.code());
|
||||
Assert.assertEquals(expected, output.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void lsrTest() throws IOException, TachyonException {
|
||||
FileInfo[] files = new FileInfo[4];
|
||||
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
|
||||
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
|
||||
files[0] = mTfs.getInfo(fileA);
|
||||
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
|
||||
UnderStorageType.NO_PERSIST, 20);
|
||||
files[1] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir")));
|
||||
files[2] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir/testFileB")));
|
||||
TachyonFile fileC = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileC",
|
||||
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 30);
|
||||
files[3] = mTfs.getInfo(fileC);
|
||||
|
||||
InterpreterResult output = tachyonInterpreter.interpret("lsr /testRoot", null);
|
||||
|
||||
String expected = "";
|
||||
String format = "%-10s%-25s%-15s%-5s\n";
|
||||
expected +=
|
||||
String.format(format, FormatUtils.getSizeFromBytes(10),
|
||||
TfsShell.convertMsToDate(files[0].getCreationTimeMs()), "In Memory",
|
||||
"/testRoot/testFileA");
|
||||
expected +=
|
||||
String.format(format, FormatUtils.getSizeFromBytes(0),
|
||||
TfsShell.convertMsToDate(files[1].getCreationTimeMs()), "", "/testRoot/testDir");
|
||||
expected +=
|
||||
String.format(format, FormatUtils.getSizeFromBytes(20),
|
||||
TfsShell.convertMsToDate(files[2].getCreationTimeMs()), "In Memory",
|
||||
"/testRoot/testDir/testFileB");
|
||||
expected +=
|
||||
String.format(format, FormatUtils.getSizeFromBytes(30),
|
||||
TfsShell.convertMsToDate(files[3].getCreationTimeMs()), "Not In Memory",
|
||||
"/testRoot/testFileC");
|
||||
expected += "\n";
|
||||
Assert.assertEquals(expected, output.message());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mkdirComplexPathTest() throws IOException, TachyonException {
|
||||
InterpreterResult output = tachyonInterpreter.interpret(
|
||||
"mkdir /Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File", null);
|
||||
|
||||
TachyonFile tFile = mTfs.open(new TachyonURI("/Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File"));
|
||||
FileInfo fileInfo = mTfs.getInfo(tFile);
|
||||
Assert.assertNotNull(fileInfo);
|
||||
Assert.assertEquals(
|
||||
"Successfully created directory /Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File\n\n",
|
||||
output.message());
|
||||
Assert.assertTrue(fileInfo.isIsFolder());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mkdirExistingTest() throws IOException {
|
||||
String command = "mkdir /festFile1";
|
||||
Assert.assertEquals(Code.SUCCESS, tachyonInterpreter.interpret(command, null).code());
|
||||
Assert.assertEquals(Code.SUCCESS, tachyonInterpreter.interpret(command, null).code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mkdirInvalidPathTest() throws IOException {
|
||||
Assert.assertEquals(
|
||||
Code.ERROR,
|
||||
tachyonInterpreter.interpret("mkdir /test File Invalid Path", null).code());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mkdirShortPathTest() throws IOException, TachyonException {
|
||||
InterpreterResult output = tachyonInterpreter.interpret("mkdir /root/testFile1", null);
|
||||
TachyonFile tFile = mTfs.open(new TachyonURI("/root/testFile1"));
|
||||
FileInfo fileInfo = mTfs.getInfo(tFile);
|
||||
Assert.assertNotNull(fileInfo);
|
||||
Assert.assertEquals(
|
||||
"Successfully created directory /root/testFile1\n\n",
|
||||
output.message());
|
||||
Assert.assertTrue(fileInfo.isIsFolder());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mkdirTest() throws IOException, TachyonException {
|
||||
String qualifiedPath =
|
||||
"tachyon://" + mLocalTachyonCluster.getMasterHostname() + ":"
|
||||
+ mLocalTachyonCluster.getMasterPort() + "/root/testFile1";
|
||||
InterpreterResult output = tachyonInterpreter.interpret("mkdir " + qualifiedPath, null);
|
||||
TachyonFile tFile = mTfs.open(new TachyonURI("/root/testFile1"));
|
||||
FileInfo fileInfo = mTfs.getInfo(tFile);
|
||||
Assert.assertNotNull(fileInfo);
|
||||
Assert.assertEquals(
|
||||
"Successfully created directory " + qualifiedPath + "\n\n",
|
||||
output.message());
|
||||
Assert.assertTrue(fileInfo.isIsFolder());
|
||||
}
|
||||
|
||||
private File generateFileContent(String path, byte[] toWrite)
|
||||
throws IOException, FileNotFoundException {
|
||||
File testFile = new File(mLocalTachyonCluster.getTachyonHome() + path);
|
||||
testFile.createNewFile();
|
||||
FileOutputStream fos = new FileOutputStream(testFile);
|
||||
fos.write(toWrite);
|
||||
fos.close();
|
||||
return testFile;
|
||||
}
|
||||
|
||||
private byte[] readContent(TachyonFile tFile, int length) throws IOException, TachyonException {
|
||||
InStreamOptions options =
|
||||
new InStreamOptions.Builder(new TachyonConf()).setTachyonStorageType(
|
||||
TachyonStorageType.NO_STORE).build();
|
||||
FileInStream tfis = mTfs.getInStream(tFile, options);
|
||||
byte[] read = new byte[length];
|
||||
tfis.read(read);
|
||||
return read;
|
||||
}
|
||||
|
||||
private void fileReadTest(String fileName, int size) throws IOException {
|
||||
File testFile = new File(PathUtils.concatPath(mLocalTachyonCluster.getTachyonHome(), fileName));
|
||||
FileInputStream fis = new FileInputStream(testFile);
|
||||
byte[] read = new byte[size];
|
||||
fis.read(read);
|
||||
fis.close();
|
||||
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(size, read));
|
||||
}
|
||||
}
|
||||
|
|
@ -94,6 +94,10 @@ The following components are provided under Apache License.
|
|||
(Apache 2.0) Shiro Core (org.apache.shiro:shiro-core:1.2.3 - https://shiro.apache.org)
|
||||
(Apache 2.0) Shiro Web (org.apache.shiro:shiro-web:1.2.3 - https://shiro.apache.org)
|
||||
(Apache 2.0) SnakeYAML (org.yaml:snakeyaml:1.15 - http://www.snakeyaml.org)
|
||||
(Apache 2.0) Tachyon Shell (org.tachyonproject:tachyon-shell:0.8.2 - http://tachyon-project.org)
|
||||
(Apache 2.0) Tachyon Servers (org.tachyonproject:tachyon-servers:0.8.2 - http://tachyon-project.org)
|
||||
(Apache 2.0) Tachyon Minicluster (org.tachyonproject:tachyon-minicluster:0.8.2 - http://tachyon-project.org)
|
||||
(Apache 2.0) Tachyon Underfs Local (org.tachyonproject:tachyon-underfs-local:0.8.2 - http://tachyon-project.org)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -56,8 +56,8 @@ public class RemoteInterpreter extends Interpreter {
|
|||
FormType formType;
|
||||
boolean initialized;
|
||||
private Map<String, String> env;
|
||||
|
||||
private int connectTimeout;
|
||||
private int maxPoolSize;
|
||||
|
||||
public RemoteInterpreter(Properties property,
|
||||
String className,
|
||||
|
|
@ -65,6 +65,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
String interpreterPath,
|
||||
String localRepoPath,
|
||||
int connectTimeout,
|
||||
int maxPoolSize,
|
||||
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
|
||||
super(property);
|
||||
this.className = className;
|
||||
|
|
@ -74,6 +75,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.localRepoPath = localRepoPath;
|
||||
env = new HashMap<String, String>();
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
}
|
||||
|
||||
|
|
@ -92,6 +94,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
this.localRepoPath = localRepoPath;
|
||||
this.env = env;
|
||||
this.connectTimeout = connectTimeout;
|
||||
this.maxPoolSize = 10;
|
||||
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
|
||||
}
|
||||
|
||||
|
|
@ -127,7 +130,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
int rc = interpreterProcess.reference(getInterpreterGroup());
|
||||
|
||||
interpreterProcess.setMaxPoolSize(this.maxPoolSize);
|
||||
synchronized (interpreterProcess) {
|
||||
// when first process created
|
||||
if (rc == 1) {
|
||||
|
|
@ -334,7 +337,7 @@ public class RemoteInterpreter extends Interpreter {
|
|||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
int maxConcurrency = 10;
|
||||
int maxConcurrency = maxPoolSize;
|
||||
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
|
||||
if (interpreterProcess == null) {
|
||||
return null;
|
||||
|
|
|
|||
|
|
@ -271,6 +271,12 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public void setMaxPoolSize(int size) {
|
||||
if (clientPool != null) {
|
||||
//Size + 2 for progress poller , cancel operation
|
||||
clientPool.setMaxTotal(size + 2);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Called when angular object is updated in client side to propagate
|
||||
* change to the remote process
|
||||
|
|
|
|||
|
|
@ -263,6 +263,7 @@ public class RemoteInterpreterServer
|
|||
private Interpreter interpreter;
|
||||
private String script;
|
||||
private InterpreterContext context;
|
||||
private Map<String, Object> infos;
|
||||
|
||||
public InterpretJob(
|
||||
String jobId,
|
||||
|
|
@ -285,7 +286,10 @@ public class RemoteInterpreterServer
|
|||
|
||||
@Override
|
||||
public Map<String, Object> info() {
|
||||
return null;
|
||||
if (infos == null) {
|
||||
infos = new HashMap<>();
|
||||
}
|
||||
return infos;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@
|
|||
},
|
||||
"devDependencies": {
|
||||
"autoprefixer": "^6.1.0",
|
||||
"bower": "",
|
||||
"bower": "1.7.2",
|
||||
"grunt": "^0.4.1",
|
||||
"grunt-concurrent": "^0.5.0",
|
||||
"grunt-contrib-clean": "^0.5.0",
|
||||
|
|
|
|||
|
|
@ -445,6 +445,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
+ "org.apache.zeppelin.angular.AngularInterpreter,"
|
||||
+ "org.apache.zeppelin.shell.ShellInterpreter,"
|
||||
+ "org.apache.zeppelin.hive.HiveInterpreter,"
|
||||
+ "org.apache.zeppelin.tachyon.TachyonInterpreter,"
|
||||
+ "org.apache.zeppelin.phoenix.PhoenixInterpreter,"
|
||||
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
|
||||
+ "org.apache.zeppelin.tajo.TajoInterpreter,"
|
||||
|
|
@ -461,6 +462,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
|
||||
ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
|
||||
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
|
||||
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
|
||||
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
|
||||
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
|
||||
// use specified notebook (id) as homescreen
|
||||
|
|
|
|||
|
|
@ -724,9 +724,11 @@ public class InterpreterFactory {
|
|||
Properties property, String interpreterId) {
|
||||
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
|
||||
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId;
|
||||
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
|
||||
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
|
||||
property, className, conf.getInterpreterRemoteRunnerPath(),
|
||||
interpreterPath, localRepoPath, connectTimeout, remoteInterpreterProcessListener));
|
||||
interpreterPath, localRepoPath, connectTimeout,
|
||||
maxPoolSize, remoteInterpreterProcessListener));
|
||||
return intp;
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue