Merge branch 'master' of https://github.com/apache/incubator-zeppelin into ZEPPELIN-630

Conflicts:
	zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
	zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
This commit is contained in:
Mina Lee 2016-01-30 08:55:58 -08:00
commit 62a75c938e
17 changed files with 1203 additions and 18 deletions

View file

@ -111,7 +111,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.tachyon.TachyonInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

View file

@ -50,6 +50,7 @@
<li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li>
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Shell</a></li>
<li><a href="{{BASE_PATH}}/interpreter/spark.html">Spark</a></li>
<li><a href="{{BASE_PATH}}/interpreter/tachyon.html">Tachyon</a></li>
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Tajo</a></li>
<li role="separator" class="divider"></li>
<li><a href="{{BASE_PATH}}/manual/dynamicinterpreterload.html">Dynamic Interpreter Loading</a></li>

Binary file not shown.

After

Width:  |  Height:  |  Size: 43 KiB

216
docs/interpreter/tachyon.md Normal file
View file

@ -0,0 +1,216 @@
---
layout: page
title: "Tachyon Interpreter"
description: "Tachyon Interpreter"
group: manual
---
{% include JB/setup %}
## Tachyon Interpreter for Apache Zeppelin
[Tachyon](http://tachyon-project.org/) is a memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster frameworks.
## Configuration
<table class="table-configuration">
<tr>
<th>Name</th>
<th>Class</th>
<th>Description</th>
</tr>
<tr>
<td>tachyon.master.hostname</td>
<td>localhost</td>
<td>Tachyon master hostname</td>
</tr>
<tr>
<td>tachyon.master.port</td>
<td>19998</td>
<td>Tachyon master port</td>
</tr>
</table>
## Enabling Tachyon Interpreter
In a notebook, to enable the **Tachyon** interpreter, click on the **Gear** icon and select **Tachyon**.
## Using the Tachyon Interpreter
In a paragraph, use `%tachyon` to select the **Tachyon** interpreter and then input all commands.
```bash
%tachyon
help
```
> **Tip :** Use ( Ctrl + . ) for autocompletion.
## Interpreter Commands
The **Tachyon** interpreter accepts the following commands.
<center>
<table class="table-configuration">
<tr>
<th>Operation</th>
<th>Syntax</th>
<th>Description</th>
</tr>
<tr>
<td>copyFromLocal</td>
<td>copyFromLocal "source path" "remote path"</td>
<td>Copy the specified file specified by "source path" to the path specified by "remote path".
This command will fail if "remote path" already exists.</td>
</tr>
<tr>
<td>copyToLocal</td>
<td>copyToLocal "remote path" "local path"</td>
<td>Copy the specified file from the path specified by "remote source" to a local
destination.</td>
</tr>
<tr>
<td>count</td>
<td>count "path"</td>
<td>Display the number of folders and files matching the specified prefix in "path".</td>
</tr>
<tr>
<td>du</td>
<td>du "path"</td>
<td>Display the size of a file or a directory specified by the input path.</td>
</tr>
<tr>
<td>fileinfo</td>
<td>fileinfo "path"</td>
<td>Print the information of the blocks of a specified file.</td>
</tr>
<tr>
<td>free</td>
<td>free "path"</td>
<td>Free a file or all files under a directory from Tachyon. If the file/directory is also
in under storage, it will still be available there.</td>
</tr>
<tr>
<td>getCapacityBytes</td>
<td>getCapacityBytes</td>
<td>Get the capacity of the TachyonFS.</td>
</tr>
<tr>
<td>getUsedBytes</td>
<td>getUsedBytes</td>
<td>Get number of bytes used in the TachyonFS.</td>
</tr>
<tr>
<td>load</td>
<td>load "path"</td>
<td>Load the data of a file or a directory from under storage into Tachyon.</td>
</tr>
<tr>
<td>loadMetadata</td>
<td>loadMetadata "path"</td>
<td>Load the metadata of a file or a directory from under storage into Tachyon.</td>
</tr>
<tr>
<td>location</td>
<td>location "path"</td>
<td>Display a list of hosts that have the file data.</td>
</tr>
<tr>
<td>ls</td>
<td>ls "path"</td>
<td>List all the files and directories directly under the given path with information such as
size.</td>
</tr>
<tr>
<td>lsr</td>
<td>lsr "path"</td>
<td>Recursively list all the files and directories under the given path with information such
as size.</td>
</tr>
<tr>
<td>mkdir</td>
<td>mkdir "path"</td>
<td>Create a directory under the given path, along with any necessary parent directories. This
command will fail if the given path already exists.</td>
</tr>
<tr>
<td>mount</td>
<td>mount "path" "uri"</td>
<td>Mount the underlying file system path "uri" into the Tachyon namespace as "path". The "path"
is assumed not to exist and is created by the operation. No data or metadata is loaded from under
storage into Tachyon. After a path is mounted, operations on objects under the mounted path are
mirror to the mounted under storage.</td>
</tr>
<tr>
<td>mv</td>
<td>mv "source" "destination"</td>
<td>Move a file or directory specified by "source" to a new location "destination". This command
will fail if "destination" already exists.</td>
</tr>
<tr>
<td>pin</td>
<td>pin "path"</td>
<td>Pin the given file to avoid evicting it from memory. If the given path is a directory, it
recursively pins all the files contained and any new files created within this directory.</td>
</tr>
<tr>
<td>report</td>
<td>report "path"</td>
<td>Report to the master that a file is lost.</td>
</tr>
<tr>
<td>request</td>
<td>request "path" "dependency ID"</td>
<td>Request the file for a given dependency ID.</td>
</tr>
<tr>
<td>rm</td>
<td>rm "path"</td>
<td>Remove a file. This command will fail if the given path is a directory rather than a
file.</td>
</tr>
<tr>
<td>rmr</td>
<td>rmr "path"</td>
<td>Remove a file, or a directory with all the files and sub-directories that this directory
contains.</td>
</tr>
<tr>
<td>tail</td>
<td>tail "path"</td>
<td>Print the last 1KB of the specified file to the console.</td>
</tr>
<tr>
<td>touch</td>
<td>touch "path"</td>
<td>Create a 0-byte file at the specified location.</td>
</tr>
<tr>
<td>unmount</td>
<td>unmount "path"</td>
<td>Unmount the underlying file system path mounted in the Tachyon namespace as "path". Tachyon
objects under "path" are removed from Tachyon, but they still exist in the previously mounted
under storage.</td>
</tr>
<tr>
<td>unpin</td>
<td>unpin "path"</td>
<td>Unpin the given file to allow Tachyon to evict this file again. If the given path is a
directory, it recursively unpins all files contained and any new files created within this
directory.</td>
</tr>
</table>
</center>
## How to test it's working
Be sure to have configured correctly the Tachyon interpreter, then open a new paragraph and type one of the above commands.
Below a simple example to show how to interact with Tachyon interpreter.
Following steps are performed:
* using sh interpreter a new text file is created on local machine
* using Tachyon interpreter:
* is listed the content of the tfs (Tachyon File System) root
* the file previously created is copied to tfs
* is listed again the content of the tfs root to check the existence of the new copied file
* is showed the content of the copied file (using the tail command)
* the file previously copied to tfs is copied to local machine
* using sh interpreter it's checked the existence of the new file copied from Tachyon and its content is showed
<center>
![Tachyon Interpreter Example](../assets/themes/zeppelin/img/docs-img/tachyon-example.png)
</center>

View file

@ -35,6 +35,7 @@
{
"title": "Load data into table",
"text": "import org.apache.commons.io.IOUtils\nimport java.net.URL\nimport java.nio.charset.Charset\n\n// Zeppelin creates and injects sc (SparkContext) and sqlContext (HiveContext or SqlContext)\n// So you don\u0027t need create them manually\n\n// load bank data\nval bankText \u003d sc.parallelize(\n IOUtils.toString(\n new URL(\"https://s3.amazonaws.com/apache-zeppelin/tutorial/bank/bank.csv\"),\n Charset.forName(\"utf8\")).split(\"\\n\"))\n\ncase class Bank(age: Integer, job: String, marital: String, education: String, balance: Integer)\n\nval bank \u003d bankText.map(s \u003d\u003e s.split(\";\")).filter(s \u003d\u003e s(0) !\u003d \"\\\"age\\\"\").map(\n s \u003d\u003e Bank(s(0).toInt, \n s(1).replaceAll(\"\\\"\", \"\"),\n s(2).replaceAll(\"\\\"\", \"\"),\n s(3).replaceAll(\"\\\"\", \"\"),\n s(5).replaceAll(\"\\\"\", \"\").toInt\n )\n).toDF()\nbank.registerTempTable(\"bank\")",
"dateUpdated": "Jan 14, 2016 7:58:56 PM",
"config": {
"colWidth": 12.0,
"graph": {
@ -46,7 +47,9 @@
"groups": [],
"scatter": {}
},
"title": true
"title": true,
"enabled": true,
"editorMode": "ace/mode/scala"
},
"settings": {
"params": {},
@ -333,7 +336,10 @@
],
"name": "Zeppelin Tutorial",
"id": "2A94M5J1Z",
"angularObjects": {},
"angularObjects": {
"2B6FF8NNU": [],
"2B67PH63Z": []
},
"config": {
"looknfeel": "default"
},

View file

@ -102,6 +102,7 @@
<module>lens</module>
<module>cassandra</module>
<module>elasticsearch</module>
<module>tachyon</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-distribution</module>

View file

@ -19,18 +19,22 @@ package org.apache.zeppelin.shell;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteWatchdog;
import org.apache.commons.exec.Executor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -41,6 +45,7 @@ import org.slf4j.LoggerFactory;
*/
public class ShellInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(ShellInterpreter.class);
private static final String EXECUTOR_KEY = "executor";
int commandTimeOut = 600000;
static {
@ -61,30 +66,66 @@ public class ShellInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
logger.debug("Run shell command '" + cmd + "'");
long start = System.currentTimeMillis();
CommandLine cmdLine = CommandLine.parse("bash");
cmdLine.addArgument("-c", false);
cmdLine.addArgument(cmd, false);
DefaultExecutor executor = new DefaultExecutor();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
executor.setStreamHandler(new PumpStreamHandler(outputStream));
ByteArrayOutputStream errorStream = new ByteArrayOutputStream();
executor.setStreamHandler(new PumpStreamHandler(outputStream, errorStream));
executor.setWatchdog(new ExecuteWatchdog(commandTimeOut));
Job runningJob = getRunningJob(contextInterpreter.getParagraphId());
Map<String, Object> info = runningJob.info();
info.put(EXECUTOR_KEY, executor);
try {
int exitValue = executor.execute(cmdLine);
int exitVal = executor.execute(cmdLine);
logger.info("Paragraph " + contextInterpreter.getParagraphId()
+ "return with exit value: " + exitVal);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, outputStream.toString());
} catch (ExecuteException e) {
int exitValue = e.getExitValue();
logger.error("Can not run " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
Code code = Code.ERROR;
String msg = errorStream.toString();
if (exitValue == 143) {
code = Code.INCOMPLETE;
msg = msg + "Paragraph received a SIGTERM.\n";
logger.info("The paragraph " + contextInterpreter.getParagraphId()
+ " stopped executing: " + msg);
}
msg += "Exitvalue: " + exitValue;
return new InterpreterResult(code, msg);
} catch (IOException e) {
logger.error("Can not run " + cmd, e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
}
@Override
public void cancel(InterpreterContext context) {}
private Job getRunningJob(String paragraphId) {
Job foundJob = null;
Collection<Job> jobsRunning = getScheduler().getJobsRunning();
for (Job job : jobsRunning) {
if (job.getId().equals(paragraphId)) {
foundJob = job;
}
}
return foundJob;
}
@Override
public void cancel(InterpreterContext context) {
Job runningJob = getRunningJob(context.getParagraphId());
if (runningJob != null) {
Map<String, Object> info = runningJob.info();
Object object = info.get(EXECUTOR_KEY);
if (object != null) {
Executor executor = (Executor) object;
ExecuteWatchdog watchdog = executor.getWatchdog();
watchdog.destroyProcess();
}
}
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
@ -97,8 +138,8 @@ public class ShellInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
ShellInterpreter.class.getName() + this.hashCode());
return SchedulerFactory.singleton().createOrGetParallelScheduler(
ShellInterpreter.class.getName() + this.hashCode(), 10);
}
@Override

164
tachyon/pom.xml Normal file
View file

@ -0,0 +1,164 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.6.0-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-tachyon</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin: Tachyon interpreter</name>
<url>http://www.apache.org</url>
<properties>
<tachyon.version>0.8.2</tachyon.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-shell</artifactId>
<version>${tachyon.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<!-- TEST -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-servers</artifactId>
<version>${tachyon.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-minicluster</artifactId>
<version>${tachyon.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-underfs-local</artifactId>
<version>${tachyon.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/tachyon</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/tachyon</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,251 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.tachyon;
import java.io.IOException;
import java.io.PrintStream;
import java.io.ByteArrayOutputStream;
import java.util.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.conf.TachyonConf;
import tachyon.shell.TfsShell;
/**
* Tachyon interpreter for Zeppelin.
*/
public class TachyonInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(TachyonInterpreter.class);
protected static final String TACHYON_MASTER_HOSTNAME = "tachyon.master.hostname";
protected static final String TACHYON_MASTER_PORT = "tachyon.master.port";
private TfsShell tfs;
private int totalCommands = 0;
private int completedCommands = 0;
private final String tachyonMasterHostname;
private final String tachyonMasterPort;
protected final List<String> keywords = Arrays.asList("cat", "copyFromLocal",
"copyToLocal", "count", "du", "fileinfo", "free", "getUsedBytes",
"getCapacityBytes", "load", "loadMetadata", "location", "ls", "lsr",
"mkdir", "mount", "mv", "pin", "report", "request", "rm", "rmr",
"setTTL", "unsetTTL", "tail", "touch", "unmount", "unpin");
public TachyonInterpreter(Properties property) {
super(property);
tachyonMasterHostname = property.getProperty(TACHYON_MASTER_HOSTNAME);
tachyonMasterPort = property.getProperty(TACHYON_MASTER_PORT);
}
static {
Interpreter.register("tachyon", "tachyon",
TachyonInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(TACHYON_MASTER_HOSTNAME, "localhost", "Tachyon master hostname")
.add(TACHYON_MASTER_PORT, "19998", "Tachyon master port")
.build());
}
@Override
public void open() {
logger.info("Starting Tachyon shell to connect to " + tachyonMasterHostname +
" on port " + tachyonMasterPort);
System.setProperty(TACHYON_MASTER_HOSTNAME, tachyonMasterHostname);
System.setProperty(TACHYON_MASTER_PORT, tachyonMasterPort);
tfs = new TfsShell(new TachyonConf());
}
@Override
public void close() {
logger.info("Closing Tachyon shell");
try {
tfs.close();
} catch (IOException e) {
logger.error("Cannot close connection", e);
}
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] lines = splitAndRemoveEmpty(st, "\n");
return interpret(lines, context);
}
private InterpreterResult interpret(String[] commands, InterpreterContext context) {
boolean isSuccess = true;
totalCommands = commands.length;
completedCommands = 0;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
PrintStream old = System.out;
System.setOut(ps);
for (String command : commands) {
int commandResuld = 1;
String[] args = splitAndRemoveEmpty(command, " ");
if (args.length > 0 && args[0].equals("help")) {
System.out.println(getCommandList());
} else {
commandResuld = tfs.run(args);
}
if (commandResuld != 0) {
isSuccess = false;
break;
} else {
completedCommands += 1;
}
System.out.println();
}
System.out.flush();
System.setOut(old);
if (isSuccess) {
return new InterpreterResult(Code.SUCCESS, baos.toString());
} else {
return new InterpreterResult(Code.ERROR, baos.toString());
}
}
private String[] splitAndRemoveEmpty(String st, String splitSeparator) {
String[] voices = st.split(splitSeparator);
ArrayList<String> result = new ArrayList<String>();
for (String voice : voices) {
if (!voice.trim().isEmpty()) {
result.add(voice);
}
}
return result.toArray(new String[result.size()]);
}
private String[] splitAndRemoveEmpty(String[] sts, String splitSeparator) {
ArrayList<String> result = new ArrayList<String>();
for (String st : sts) {
result.addAll(Arrays.asList(splitAndRemoveEmpty(st, splitSeparator)));
}
return result.toArray(new String[result.size()]);
}
@Override
public void cancel(InterpreterContext context) { }
@Override
public FormType getFormType() {
return FormType.NATIVE;
}
@Override
public int getProgress(InterpreterContext context) {
return completedCommands * 100 / totalCommands;
}
@Override
public List<String> completion(String buf, int cursor) {
String[] words = splitAndRemoveEmpty(splitAndRemoveEmpty(buf, "\n"), " ");
String lastWord = "";
if (words.length > 0) {
lastWord = words[ words.length - 1 ];
}
ArrayList<String> voices = new ArrayList<String>();
for (String command : keywords) {
if (command.startsWith(lastWord)) {
voices.add(command);
}
}
return voices;
}
private String getCommandList() {
StringBuilder sb = new StringBuilder();
sb.append("Commands list:");
sb.append("\n\t[help] - List all available commands.");
sb.append("\n\t[cat <path>] - Print the content of the file to the console.");
sb.append("\n\t[copyFromLocal <src> <remoteDst>] - Copy the specified file specified " +
"by \"source path\" to the path specified by \"remote path\". " +
"This command will fail if \"remote path\" already exists.");
sb.append("\n\t[copyToLocal <src> <localDst>] - Copy the specified file from the path " +
"specified by \"remote source\" to a local destination.");
sb.append("\n\t[count <path>] - Display the number of folders and files matching " +
"the specified prefix in \"path\".");
sb.append("\n\t[du <path>] - Display the size of a file or a directory specified " +
"by the input path.");
sb.append("\n\t[fileinfo <path>] - Print the information of the blocks of a specified file.");
sb.append("\n\t[free <file path|folder path>] - Free a file or all files under a " +
"directory from Tachyon. If the file/directory is also in under storage, " +
"it will still be available there.");
sb.append("\n\t[getUsedBytes] - Get number of bytes used in the TachyonFS.");
sb.append("\n\t[getCapacityBytes] - Get the capacity of the TachyonFS.");
sb.append("\n\t[load <path>] - Load the data of a file or a directory from under " +
"storage into Tachyon.");
sb.append("\n\t[loadMetadata <path>] - Load the metadata of a file or a directory " +
"from under storage into Tachyon.");
sb.append("\n\t[location <path>] - Display a list of hosts that have the file data.");
sb.append("\n\t[ls <path>] - List all the files and directories directly under the " +
"given path with information such as size.");
sb.append("\n\t[lsr <path>] - Recursively list all the files and directories under " +
"the given path with information such as size.");
sb.append("\n\t[mkdir <path>] - Create a directory under the given path, along with " +
"any necessary parent directories. This command will fail if the given " +
"path already exists.");
sb.append("\n\t[mount <tachyonPath> <ufsURI>] - Mount the underlying file system " +
"path \"uri\" into the Tachyon namespace as \"path\". The \"path\" is assumed " +
"not to exist and is created by the operation. No data or metadata is loaded " +
"from under storage into Tachyon. After a path is mounted, operations on objects " +
"under the mounted path are mirror to the mounted under storage.");
sb.append("\n\t[mv <src> <dst>] - Move a file or directory specified by \"source\" " +
"to a new location \"destination\". This command will fail if " +
"\"destination\" already exists.");
sb.append("\n\t[pin <path>] - Pin the given file to avoid evicting it from memory. " +
"If the given path is a directory, it recursively pins all the files contained " +
"and any new files created within this directory.");
sb.append("\n\t[report <path>] - Report to the master that a file is lost.");
sb.append("\n\t[request <tachyonaddress> <dependencyId>] - Request the file for " +
"a given dependency ID.");
sb.append("\n\t[rm <path>] - Remove a file. This command will fail if the given " +
"path is a directory rather than a file.");
sb.append("\n\t[rmr <path>] - Remove a file, or a directory with all the files and " +
"sub-directories that this directory contains.");
sb.append("\n\t[tail <path>] - Print the last 1KB of the specified file to the console.");
sb.append("\n\t[touch <path>] - Create a 0-byte file at the specified location.");
sb.append("\n\t[unmount <tachyonPath>] - Unmount the underlying file system path " +
"mounted in the Tachyon namespace as \"path\". Tachyon objects under \"path\" " +
"are removed from Tachyon, but they still exist in the previously " +
"mounted under storage.");
sb.append("\n\t[unpin <path>] - Unpin the given file to allow Tachyon to evict " +
"this file again. If the given path is a directory, it recursively unpins " +
"all files contained and any new files created within this directory.");
return sb.toString();
}
}

View file

@ -0,0 +1,484 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.tachyon;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.*;
import tachyon.Constants;
import tachyon.TachyonURI;
import tachyon.client.TachyonFSTestUtils;
import tachyon.client.TachyonStorageType;
import tachyon.client.UnderStorageType;
import tachyon.client.file.FileInStream;
import tachyon.client.file.TachyonFile;
import tachyon.client.file.TachyonFileSystem;
import tachyon.client.file.options.InStreamOptions;
import tachyon.conf.TachyonConf;
import tachyon.exception.ExceptionMessage;
import tachyon.exception.TachyonException;
import tachyon.master.LocalTachyonCluster;
import tachyon.shell.TfsShell;
import tachyon.thrift.FileInfo;
import tachyon.util.FormatUtils;
import tachyon.util.io.BufferUtils;
import tachyon.util.io.PathUtils;
public class TachyonInterpreterTest {
private TachyonInterpreter tachyonInterpreter;
private static final int SIZE_BYTES = Constants.MB * 10;
private LocalTachyonCluster mLocalTachyonCluster = null;
private TachyonFileSystem mTfs = null;
@After
public final void after() throws Exception {
if (tachyonInterpreter != null) {
tachyonInterpreter.close();
}
mLocalTachyonCluster.stop();
}
@Before
public final void before() throws Exception {
mLocalTachyonCluster = new LocalTachyonCluster(SIZE_BYTES, 1000, Constants.GB);
mLocalTachyonCluster.start();
mTfs = mLocalTachyonCluster.getClient();
final Properties props = new Properties();
props.put(TachyonInterpreter.TACHYON_MASTER_HOSTNAME, mLocalTachyonCluster.getMasterHostname());
props.put(TachyonInterpreter.TACHYON_MASTER_PORT, mLocalTachyonCluster.getMasterPort() + "");
tachyonInterpreter = new TachyonInterpreter(props);
tachyonInterpreter.open();
}
@Test
public void testCompletion() {
List<String> expectedResultOne = Arrays.asList("cat", "copyFromLocal",
"copyToLocal", "count");
List<String> expectedResultTwo = Arrays.asList("copyFromLocal",
"copyToLocal", "count");
List<String> expectedResultThree = Arrays.asList("copyFromLocal", "copyToLocal");
List<String> expectedResultNone = new ArrayList<String>();
List<String> resultOne = tachyonInterpreter.completion("c", 0);
List<String> resultTwo = tachyonInterpreter.completion("co", 0);
List<String> resultThree = tachyonInterpreter.completion("copy", 0);
List<String> resultNotMatch = tachyonInterpreter.completion("notMatch", 0);
List<String> resultAll = tachyonInterpreter.completion("", 0);
Assert.assertEquals(expectedResultOne, resultOne);
Assert.assertEquals(expectedResultTwo, resultTwo);
Assert.assertEquals(expectedResultThree, resultThree);
Assert.assertEquals(expectedResultNone, resultNotMatch);
Assert.assertEquals(tachyonInterpreter.keywords, resultAll);
}
@Test
public void catDirectoryTest() throws IOException {
String expected = "Successfully created directory /testDir\n\n" +
"/testDir is not a file.\n";
InterpreterResult output = tachyonInterpreter.interpret("mkdir /testDir" +
"\ncat /testDir", null);
Assert.assertEquals(Code.ERROR, output.code());
Assert.assertEquals(expected, output.message());
}
@Test
public void catNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("cat /testFile", null);
Assert.assertEquals(Code.ERROR, output.code());
}
@Test
public void catTest() throws IOException {
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 10);
InterpreterResult output = tachyonInterpreter.interpret("cat /testFile", null);
byte[] expected = BufferUtils.getIncreasingByteArray(10);
Assert.assertEquals(Code.SUCCESS, output.code());
Assert.assertArrayEquals(expected,
output.message().substring(0, output.message().length() - 1).getBytes());
}
@Test
public void copyFromLocalLargeTest() throws IOException, TachyonException {
File testFile = new File(mLocalTachyonCluster.getTachyonHome() + "/testFile");
testFile.createNewFile();
FileOutputStream fos = new FileOutputStream(testFile);
byte[] toWrite = BufferUtils.getIncreasingByteArray(SIZE_BYTES);
fos.write(toWrite);
fos.close();
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
testFile.getAbsolutePath() + " /testFile", null);
Assert.assertEquals(
"Copied " + testFile.getAbsolutePath() + " to /testFile\n\n",
output.message());
TachyonFile tFile = mTfs.open(new TachyonURI("/testFile"));
FileInfo fileInfo = mTfs.getInfo(tFile);
Assert.assertNotNull(fileInfo);
Assert.assertEquals(SIZE_BYTES, fileInfo.length);
InStreamOptions options =
new InStreamOptions.Builder(new TachyonConf()).setTachyonStorageType(
TachyonStorageType.NO_STORE).build();
FileInStream tfis = mTfs.getInStream(tFile, options);
byte[] read = new byte[SIZE_BYTES];
tfis.read(read);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(SIZE_BYTES, read));
}
@Test
public void loadFileTest() throws IOException, TachyonException {
TachyonFile file =
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.NO_STORE,
UnderStorageType.SYNC_PERSIST, 10);
FileInfo fileInfo = mTfs.getInfo(file);
Assert.assertFalse(fileInfo.getInMemoryPercentage() == 100);
tachyonInterpreter.interpret("load /testFile", null);
fileInfo = mTfs.getInfo(file);
Assert.assertTrue(fileInfo.getInMemoryPercentage() == 100);
}
@Test
public void loadDirTest() throws IOException, TachyonException {
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 10);
TachyonFile fileB = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileB",
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
FileInfo fileInfoA = mTfs.getInfo(fileA);
FileInfo fileInfoB = mTfs.getInfo(fileB);
Assert.assertFalse(fileInfoA.getInMemoryPercentage() == 100);
Assert.assertTrue(fileInfoB.getInMemoryPercentage() == 100);
tachyonInterpreter.interpret("load /testRoot", null);
fileInfoA = mTfs.getInfo(fileA);
fileInfoB = mTfs.getInfo(fileB);
Assert.assertTrue(fileInfoA.getInMemoryPercentage() == 100);
Assert.assertTrue(fileInfoB.getInMemoryPercentage() == 100);
}
@Test
public void copyFromLocalTest() throws IOException, TachyonException {
File testDir = new File(mLocalTachyonCluster.getTachyonHome() + "/testDir");
testDir.mkdir();
File testDirInner = new File(mLocalTachyonCluster.getTachyonHome() + "/testDir/testDirInner");
testDirInner.mkdir();
File testFile =
generateFileContent("/testDir/testFile", BufferUtils.getIncreasingByteArray(10));
generateFileContent("/testDir/testDirInner/testFile2",
BufferUtils.getIncreasingByteArray(10, 20));
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
testFile.getParent() + " /testDir", null);
Assert.assertEquals(
"Copied " + testFile.getParent() + " to /testDir\n\n",
output.message());
TachyonFile file1 = mTfs.open(new TachyonURI("/testDir/testFile"));
TachyonFile file2 = mTfs.open(new TachyonURI("/testDir/testDirInner/testFile2"));
FileInfo fileInfo1 = mTfs.getInfo(file1);
FileInfo fileInfo2 = mTfs.getInfo(file2);
Assert.assertNotNull(fileInfo1);
Assert.assertNotNull(fileInfo2);
Assert.assertEquals(10, fileInfo1.length);
Assert.assertEquals(20, fileInfo2.length);
byte[] read = readContent(file1, 10);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, read));
read = readContent(file2, 20);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, 20, read));
}
@Test
public void copyFromLocalTestWithFullURI() throws IOException, TachyonException {
File testFile = generateFileContent("/srcFileURI", BufferUtils.getIncreasingByteArray(10));
String tachyonURI = "tachyon://" + mLocalTachyonCluster.getMasterHostname() + ":"
+ mLocalTachyonCluster.getMasterPort() + "/destFileURI";
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
testFile.getPath() + " " + tachyonURI, null);
Assert.assertEquals(
"Copied " + testFile.getPath() + " to " + tachyonURI + "\n\n",
output.message());
TachyonFile file = mTfs.open(new TachyonURI("/destFileURI"));
FileInfo fileInfo = mTfs.getInfo(file);
Assert.assertEquals(10L, fileInfo.length);
byte[] read = readContent(file, 10);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, read));
}
@Test
public void copyFromLocalFileToDstPathTest() throws IOException, TachyonException {
String dataString = "copyFromLocalFileToDstPathTest";
byte[] data = dataString.getBytes();
File localDir = new File(mLocalTachyonCluster.getTachyonHome() + "/localDir");
localDir.mkdir();
File localFile = generateFileContent("/localDir/testFile", data);
tachyonInterpreter.interpret("mkdir /dstDir", null);
tachyonInterpreter.interpret("copyFromLocal " + localFile.getPath() + " /dstDir", null);
TachyonFile file = mTfs.open(new TachyonURI("/dstDir/testFile"));
FileInfo fileInfo = mTfs.getInfo(file);
Assert.assertNotNull(fileInfo);
byte[] read = readContent(file, data.length);
Assert.assertEquals(new String(read), dataString);
}
@Test
public void copyToLocalLargeTest() throws IOException {
copyToLocalWithBytes(SIZE_BYTES);
}
@Test
public void copyToLocalTest() throws IOException {
copyToLocalWithBytes(10);
}
private void copyToLocalWithBytes(int bytes) throws IOException {
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, bytes);
InterpreterResult output = tachyonInterpreter.interpret("copyToLocal /testFile " +
mLocalTachyonCluster.getTachyonHome() + "/testFile", null);
Assert.assertEquals(
"Copied /testFile to " + mLocalTachyonCluster.getTachyonHome() + "/testFile\n\n",
output.message());
fileReadTest("/testFile", 10);
}
@Test
public void countNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("count /NotExistFile", null);
Assert.assertEquals(Code.ERROR, output.code());
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
}
@Test
public void countTest() throws IOException {
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 10);
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 20);
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileB", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 30);
InterpreterResult output = tachyonInterpreter.interpret("count /testRoot", null);
String expected = "";
String format = "%-25s%-25s%-15s\n";
expected += String.format(format, "File Count", "Folder Count", "Total Bytes");
expected += String.format(format, 3, 2, 60);
expected += "\n";
Assert.assertEquals(expected, output.message());
}
@Test
public void fileinfoNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("fileinfo /NotExistFile", null);
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
Assert.assertEquals(Code.ERROR, output.code());
}
@Test
public void locationNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("location /NotExistFile", null);
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
Assert.assertEquals(Code.ERROR, output.code());
}
@Test
public void lsTest() throws IOException, TachyonException {
FileInfo[] files = new FileInfo[3];
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
files[0] = mTfs.getInfo(fileA);
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 20);
files[1] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir")));
TachyonFile fileC = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileC",
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 30);
files[2] = mTfs.getInfo(fileC);
InterpreterResult output = tachyonInterpreter.interpret("ls /testRoot", null);
String expected = "";
String format = "%-10s%-25s%-15s%-5s\n";
expected += String.format(format, FormatUtils.getSizeFromBytes(10),
TfsShell.convertMsToDate(files[0].getCreationTimeMs()), "In Memory", "/testRoot/testFileA");
expected += String.format(format, FormatUtils.getSizeFromBytes(0),
TfsShell.convertMsToDate(files[1].getCreationTimeMs()), "", "/testRoot/testDir");
expected += String.format(format, FormatUtils.getSizeFromBytes(30),
TfsShell.convertMsToDate(files[2].getCreationTimeMs()), "Not In Memory",
"/testRoot/testFileC");
expected += "\n";
Assert.assertEquals(Code.SUCCESS, output.code());
Assert.assertEquals(expected, output.message());
}
@Test
public void lsrTest() throws IOException, TachyonException {
FileInfo[] files = new FileInfo[4];
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
files[0] = mTfs.getInfo(fileA);
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 20);
files[1] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir")));
files[2] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir/testFileB")));
TachyonFile fileC = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileC",
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 30);
files[3] = mTfs.getInfo(fileC);
InterpreterResult output = tachyonInterpreter.interpret("lsr /testRoot", null);
String expected = "";
String format = "%-10s%-25s%-15s%-5s\n";
expected +=
String.format(format, FormatUtils.getSizeFromBytes(10),
TfsShell.convertMsToDate(files[0].getCreationTimeMs()), "In Memory",
"/testRoot/testFileA");
expected +=
String.format(format, FormatUtils.getSizeFromBytes(0),
TfsShell.convertMsToDate(files[1].getCreationTimeMs()), "", "/testRoot/testDir");
expected +=
String.format(format, FormatUtils.getSizeFromBytes(20),
TfsShell.convertMsToDate(files[2].getCreationTimeMs()), "In Memory",
"/testRoot/testDir/testFileB");
expected +=
String.format(format, FormatUtils.getSizeFromBytes(30),
TfsShell.convertMsToDate(files[3].getCreationTimeMs()), "Not In Memory",
"/testRoot/testFileC");
expected += "\n";
Assert.assertEquals(expected, output.message());
}
@Test
public void mkdirComplexPathTest() throws IOException, TachyonException {
InterpreterResult output = tachyonInterpreter.interpret(
"mkdir /Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File", null);
TachyonFile tFile = mTfs.open(new TachyonURI("/Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File"));
FileInfo fileInfo = mTfs.getInfo(tFile);
Assert.assertNotNull(fileInfo);
Assert.assertEquals(
"Successfully created directory /Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File\n\n",
output.message());
Assert.assertTrue(fileInfo.isIsFolder());
}
@Test
public void mkdirExistingTest() throws IOException {
String command = "mkdir /festFile1";
Assert.assertEquals(Code.SUCCESS, tachyonInterpreter.interpret(command, null).code());
Assert.assertEquals(Code.SUCCESS, tachyonInterpreter.interpret(command, null).code());
}
@Test
public void mkdirInvalidPathTest() throws IOException {
Assert.assertEquals(
Code.ERROR,
tachyonInterpreter.interpret("mkdir /test File Invalid Path", null).code());
}
@Test
public void mkdirShortPathTest() throws IOException, TachyonException {
InterpreterResult output = tachyonInterpreter.interpret("mkdir /root/testFile1", null);
TachyonFile tFile = mTfs.open(new TachyonURI("/root/testFile1"));
FileInfo fileInfo = mTfs.getInfo(tFile);
Assert.assertNotNull(fileInfo);
Assert.assertEquals(
"Successfully created directory /root/testFile1\n\n",
output.message());
Assert.assertTrue(fileInfo.isIsFolder());
}
@Test
public void mkdirTest() throws IOException, TachyonException {
String qualifiedPath =
"tachyon://" + mLocalTachyonCluster.getMasterHostname() + ":"
+ mLocalTachyonCluster.getMasterPort() + "/root/testFile1";
InterpreterResult output = tachyonInterpreter.interpret("mkdir " + qualifiedPath, null);
TachyonFile tFile = mTfs.open(new TachyonURI("/root/testFile1"));
FileInfo fileInfo = mTfs.getInfo(tFile);
Assert.assertNotNull(fileInfo);
Assert.assertEquals(
"Successfully created directory " + qualifiedPath + "\n\n",
output.message());
Assert.assertTrue(fileInfo.isIsFolder());
}
private File generateFileContent(String path, byte[] toWrite)
throws IOException, FileNotFoundException {
File testFile = new File(mLocalTachyonCluster.getTachyonHome() + path);
testFile.createNewFile();
FileOutputStream fos = new FileOutputStream(testFile);
fos.write(toWrite);
fos.close();
return testFile;
}
private byte[] readContent(TachyonFile tFile, int length) throws IOException, TachyonException {
InStreamOptions options =
new InStreamOptions.Builder(new TachyonConf()).setTachyonStorageType(
TachyonStorageType.NO_STORE).build();
FileInStream tfis = mTfs.getInStream(tFile, options);
byte[] read = new byte[length];
tfis.read(read);
return read;
}
private void fileReadTest(String fileName, int size) throws IOException {
File testFile = new File(PathUtils.concatPath(mLocalTachyonCluster.getTachyonHome(), fileName));
FileInputStream fis = new FileInputStream(testFile);
byte[] read = new byte[size];
fis.read(read);
fis.close();
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(size, read));
}
}

View file

@ -94,6 +94,10 @@ The following components are provided under Apache License.
(Apache 2.0) Shiro Core (org.apache.shiro:shiro-core:1.2.3 - https://shiro.apache.org)
(Apache 2.0) Shiro Web (org.apache.shiro:shiro-web:1.2.3 - https://shiro.apache.org)
(Apache 2.0) SnakeYAML (org.yaml:snakeyaml:1.15 - http://www.snakeyaml.org)
(Apache 2.0) Tachyon Shell (org.tachyonproject:tachyon-shell:0.8.2 - http://tachyon-project.org)
(Apache 2.0) Tachyon Servers (org.tachyonproject:tachyon-servers:0.8.2 - http://tachyon-project.org)
(Apache 2.0) Tachyon Minicluster (org.tachyonproject:tachyon-minicluster:0.8.2 - http://tachyon-project.org)
(Apache 2.0) Tachyon Underfs Local (org.tachyonproject:tachyon-underfs-local:0.8.2 - http://tachyon-project.org)

View file

@ -56,8 +56,8 @@ public class RemoteInterpreter extends Interpreter {
FormType formType;
boolean initialized;
private Map<String, String> env;
private int connectTimeout;
private int maxPoolSize;
public RemoteInterpreter(Properties property,
String className,
@ -65,6 +65,7 @@ public class RemoteInterpreter extends Interpreter {
String interpreterPath,
String localRepoPath,
int connectTimeout,
int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener) {
super(property);
this.className = className;
@ -74,6 +75,7 @@ public class RemoteInterpreter extends Interpreter {
this.localRepoPath = localRepoPath;
env = new HashMap<String, String>();
this.connectTimeout = connectTimeout;
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
@ -92,6 +94,7 @@ public class RemoteInterpreter extends Interpreter {
this.localRepoPath = localRepoPath;
this.env = env;
this.connectTimeout = connectTimeout;
this.maxPoolSize = 10;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
}
@ -127,7 +130,7 @@ public class RemoteInterpreter extends Interpreter {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
int rc = interpreterProcess.reference(getInterpreterGroup());
interpreterProcess.setMaxPoolSize(this.maxPoolSize);
synchronized (interpreterProcess) {
// when first process created
if (rc == 1) {
@ -334,7 +337,7 @@ public class RemoteInterpreter extends Interpreter {
@Override
public Scheduler getScheduler() {
int maxConcurrency = 10;
int maxConcurrency = maxPoolSize;
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
if (interpreterProcess == null) {
return null;

View file

@ -271,6 +271,12 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
}
}
public void setMaxPoolSize(int size) {
if (clientPool != null) {
//Size + 2 for progress poller , cancel operation
clientPool.setMaxTotal(size + 2);
}
}
/**
* Called when angular object is updated in client side to propagate
* change to the remote process

View file

@ -263,6 +263,7 @@ public class RemoteInterpreterServer
private Interpreter interpreter;
private String script;
private InterpreterContext context;
private Map<String, Object> infos;
public InterpretJob(
String jobId,
@ -285,7 +286,10 @@ public class RemoteInterpreterServer
@Override
public Map<String, Object> info() {
return null;
if (infos == null) {
infos = new HashMap<>();
}
return infos;
}
@Override

View file

@ -7,7 +7,7 @@
},
"devDependencies": {
"autoprefixer": "^6.1.0",
"bower": "",
"bower": "1.7.2",
"grunt": "^0.4.1",
"grunt-concurrent": "^0.5.0",
"grunt-contrib-clean": "^0.5.0",

View file

@ -445,6 +445,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.angular.AngularInterpreter,"
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
+ "org.apache.zeppelin.tachyon.TachyonInterpreter,"
+ "org.apache.zeppelin.phoenix.PhoenixInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.tajo.TajoInterpreter,"
@ -461,6 +462,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000),
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
// use specified notebook (id) as homescreen

View file

@ -724,9 +724,11 @@ public class InterpreterFactory {
Properties property, String interpreterId) {
int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterId;
int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(
property, className, conf.getInterpreterRemoteRunnerPath(),
interpreterPath, localRepoPath, connectTimeout, remoteInterpreterProcessListener));
interpreterPath, localRepoPath, connectTimeout,
maxPoolSize, remoteInterpreterProcessListener));
return intp;
}