update tachyon interpreter to alluxio interpreter

This commit is contained in:
maocorte 2016-02-25 21:15:59 +01:00
parent ea9f139308
commit 42b1884275
13 changed files with 838 additions and 818 deletions

View file

@ -27,14 +27,14 @@
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-tachyon</artifactId>
<artifactId>zeppelin-alluxio</artifactId>
<packaging>jar</packaging>
<version>0.6.0-incubating-SNAPSHOT</version>
<name>Zeppelin: Tachyon interpreter</name>
<name>Zeppelin: Alluxio interpreter</name>
<url>http://www.apache.org</url>
<properties>
<tachyon.version>0.8.2</tachyon.version>
<alluxio.version>1.0.0</alluxio.version>
</properties>
<dependencies>
<dependency>
@ -50,9 +50,9 @@
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-shell</artifactId>
<version>${tachyon.version}</version>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-shell</artifactId>
<version>${alluxio.version}</version>
</dependency>
<dependency>
@ -73,23 +73,23 @@
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-servers</artifactId>
<version>${tachyon.version}</version>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-core-server</artifactId>
<version>${alluxio.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-minicluster</artifactId>
<version>${tachyon.version}</version>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-minicluster</artifactId>
<version>${alluxio.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-underfs-local</artifactId>
<version>${tachyon.version}</version>
<groupId>org.alluxio</groupId>
<artifactId>alluxio-underfs-local</artifactId>
<version>${alluxio.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
@ -127,7 +127,7 @@
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/tachyon</outputDirectory>
<outputDirectory>${project.build.directory}/../../interpreter/alluxio</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
@ -141,7 +141,7 @@
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/tachyon</outputDirectory>
<outputDirectory>${project.build.directory}/../../interpreter/alluxio</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>

View file

@ -0,0 +1,254 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.alluxio;
import java.io.IOException;
import java.io.PrintStream;
import java.io.ByteArrayOutputStream;
import java.util.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import alluxio.Configuration;
import alluxio.shell.AlluxioShell;
/**
* Alluxio interpreter for Zeppelin.
*/
public class AlluxioInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(AlluxioInterpreter.class);
protected static final String ALLUXIO_MASTER_HOSTNAME = "alluxio.master.hostname";
protected static final String ALLUXIO_MASTER_PORT = "alluxio.master.port";
private AlluxioShell fs;
private int totalCommands = 0;
private int completedCommands = 0;
private final String alluxioMasterHostname;
private final String alluxioMasterPort;
protected final List<String> keywords = Arrays.asList("cat", "chgrp",
"chmod", "chown", "copyFromLocal", "copyToLocal", "count",
"createLineage", "deleteLineage", "du", "fileInfo", "free",
"getCapacityBytes", "getUsedBytes", "listLineages", "load",
"loadMetadata", "location", "ls", "mkdir", "mount", "mv",
"persist", "pin", "report", "rm", "setTtl", "tail", "touch",
"unmount", "unpin", "unsetTtl");
public AlluxioInterpreter(Properties property) {
super(property);
alluxioMasterHostname = property.getProperty(ALLUXIO_MASTER_HOSTNAME);
alluxioMasterPort = property.getProperty(ALLUXIO_MASTER_PORT);
}
static {
Interpreter.register("alluxio", "alluxio",
AlluxioInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(ALLUXIO_MASTER_HOSTNAME, "localhost", "Alluxio master hostname")
.add(ALLUXIO_MASTER_PORT, "19998", "Alluxio master port")
.build());
}
@Override
public void open() {
logger.info("Starting Alluxio shell to connect to " + alluxioMasterHostname +
" on port " + alluxioMasterPort);
System.setProperty(ALLUXIO_MASTER_HOSTNAME, alluxioMasterHostname);
System.setProperty(ALLUXIO_MASTER_PORT, alluxioMasterPort);
fs = new AlluxioShell(new Configuration());
}
@Override
public void close() {
logger.info("Closing Alluxio shell");
try {
fs.close();
} catch (IOException e) {
logger.error("Cannot close connection", e);
}
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] lines = splitAndRemoveEmpty(st, "\n");
return interpret(lines, context);
}
private InterpreterResult interpret(String[] commands, InterpreterContext context) {
boolean isSuccess = true;
totalCommands = commands.length;
completedCommands = 0;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
PrintStream old = System.out;
System.setOut(ps);
for (String command : commands) {
int commandResuld = 1;
String[] args = splitAndRemoveEmpty(command, " ");
if (args.length > 0 && args[0].equals("help")) {
System.out.println(getCommandList());
} else {
commandResuld = fs.run(args);
}
if (commandResuld != 0) {
isSuccess = false;
break;
} else {
completedCommands += 1;
}
System.out.println();
}
System.out.flush();
System.setOut(old);
if (isSuccess) {
return new InterpreterResult(Code.SUCCESS, baos.toString());
} else {
return new InterpreterResult(Code.ERROR, baos.toString());
}
}
private String[] splitAndRemoveEmpty(String st, String splitSeparator) {
String[] voices = st.split(splitSeparator);
ArrayList<String> result = new ArrayList<String>();
for (String voice : voices) {
if (!voice.trim().isEmpty()) {
result.add(voice);
}
}
return result.toArray(new String[result.size()]);
}
private String[] splitAndRemoveEmpty(String[] sts, String splitSeparator) {
ArrayList<String> result = new ArrayList<String>();
for (String st : sts) {
result.addAll(Arrays.asList(splitAndRemoveEmpty(st, splitSeparator)));
}
return result.toArray(new String[result.size()]);
}
@Override
public void cancel(InterpreterContext context) { }
@Override
public FormType getFormType() {
return FormType.NATIVE;
}
@Override
public int getProgress(InterpreterContext context) {
return completedCommands * 100 / totalCommands;
}
@Override
public List<String> completion(String buf, int cursor) {
String[] words = splitAndRemoveEmpty(splitAndRemoveEmpty(buf, "\n"), " ");
String lastWord = "";
if (words.length > 0) {
lastWord = words[ words.length - 1 ];
}
ArrayList<String> voices = new ArrayList<String>();
for (String command : keywords) {
if (command.startsWith(lastWord)) {
voices.add(command);
}
}
return voices;
}
private String getCommandList() {
StringBuilder sb = new StringBuilder();
sb.append("Commands list:");
sb.append("\n\t[help] - List all available commands.");
sb.append("\n\t[cat <path>] - Prints the file's contents to the console.");
sb.append("\n\t[chgrp [-R] <group> <path>] - Changes the group of a file or directory " +
"specified by args. Specify -R to change the group recursively.");
sb.append("\n\t[chmod -R <mode> <path>] - Changes the permission of a file or directory " +
"specified by args. Specify -R to change the permission recursively.");
sb.append("\n\t[chown -R <owner> <path>] - Changes the owner of a file or directory " +
"specified by args. Specify -R to change the owner recursively.");
sb.append("\n\t[copyFromLocal <src> <remoteDst>] - Copies a file or a directory from " +
"local filesystem to Alluxio filesystem.");
sb.append("\n\t[copyToLocal <src> <localDst>] - Copies a file or a directory from the " +
"Alluxio filesystem to the local filesystem.");
sb.append("\n\t[count <path>] - Displays the number of files and directories matching " +
"the specified prefix.");
sb.append("\n\t[createLineage <inputFile1,...> <outputFile1,...> " +
"[<cmd_arg1> <cmd_arg2> ...]] - Creates a lineage.");
sb.append("\n\t[deleteLineage <lineageId> <cascade(true|false)>] - Deletes a lineage. If " +
"cascade is specified as true, dependent lineages will also be deleted.");
sb.append("\n\t[du <path>] - Displays the size of the specified file or directory.");
sb.append("\n\t[fileInfo <path>] - Displays all block info for the specified file.");
sb.append("\n\t[free <file path|folder path>] - Removes the file or directory(recursively) " +
"from Alluxio memory space.");
sb.append("\n\t[getCapacityBytes] - Gets the capacity of the Alluxio file system.");
sb.append("\n\t[getUsedBytes] - Gets number of bytes used in the Alluxio file system.");
sb.append("\n\t[listLineages] - Lists all lineages.");
sb.append("\n\t[load <path>] - Loads a file or directory in Alluxio space, makes it " +
"resident in memory.");
sb.append("\n\t[loadMetadata <path>] - Loads metadata for the given Alluxio path from the " +
"under file system.");
sb.append("\n\t[location <path>] - Displays the list of hosts storing the specified file.");
sb.append("\n\t[ls [-R] <path>] - Displays information for all files and directories " +
"directly under the specified path. Specify -R to display files and " +
"directories recursively.");
sb.append("\n\t[mkdir <path1> [path2] ... [pathn]] - Creates the specified directories, " +
"including any parent directories that are required.");
sb.append("\n\t[mount <alluxioPath> <ufsURI>] - Mounts a UFS path onto an Alluxio path.");
sb.append("\n\t[mv <src> <dst>] - Renames a file or directory.");
sb.append("\n\t[persist <alluxioPath>] - Persists a file or directory currently stored " +
"only in Alluxio to the UnderFileSystem.");
sb.append("\n\t[pin <path>] - Pins the given file or directory in memory (works " +
"recursively for directories). Pinned files are never evicted from memory, unless " +
"TTL is set.");
sb.append("\n\t[report <path>] - Reports to the master that a file is lost.");
sb.append("\n\t[rm [-R] <path>] - Removes the specified file. Specify -R to remove file or " +
"directory recursively.");
sb.append("\n\t[setTtl <path> <time to live(in milliseconds)>] - Sets a new TTL value for " +
"the file at path.");
sb.append("\n\t[tail <path>] - Prints the file's last 1KB of contents to the console.");
sb.append("\n\t[touch <path>] - Creates a 0 byte file. The file will be written to the " +
"under file system.");
sb.append("\n\t[unmount <alluxioPath>] - Unmounts an Alluxio path.");
sb.append("\n\t[unpin <path>] - Unpins the given file or folder from memory " +
"(works recursively for a directory).");
sb.append("\n\\t[unsetTtl <path>] - Unsets the TTL value for the given path.");
sb.append("\n\t[unpin <path>] - Unpin the given file to allow Alluxio to evict " +
"this file again. If the given path is a directory, it recursively unpins " +
"all files contained and any new files created within this directory.");
return sb.toString();
}
}

View file

@ -0,0 +1,484 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.alluxio;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.*;
import alluxio.Configuration;
import alluxio.Constants;
import alluxio.AlluxioURI;
import alluxio.client.FileSystemTestUtils;
import alluxio.client.AlluxioStorageType;
import alluxio.client.UnderStorageType;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.options.InStreamOptions;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.AlluxioException;
import alluxio.master.LocalAlluxioCluster;
import alluxio.shell.AlluxioShell;
import alluxio.thrift.FileInfo;
import alluxio.util.FormatUtils;
import alluxio.util.io.BufferUtils;
import alluxio.util.io.PathUtils;
public class AlluxioInterpreterTest {
private AlluxioInterpreter tachyonInterpreter;
private static final int SIZE_BYTES = Constants.MB * 10;
private LocalAlluxioCluster mLocalAlluxioCluster = null;
private FileSystem mTfs = null;
@After
public final void after() throws Exception {
if (tachyonInterpreter != null) {
tachyonInterpreter.close();
}
mLocalAlluxioCluster.stop();
}
@Before
public final void before() throws Exception {
mLocalAlluxioCluster = new LocalAlluxioCluster(SIZE_BYTES, 1000);
mLocalAlluxioCluster.start();
mTfs = mLocalAlluxioCluster.getClient();
final Properties props = new Properties();
props.put(AlluxioInterpreter.ALLUXIO_MASTER_HOSTNAME, mLocalAlluxioCluster.getMasterHostname());
props.put(AlluxioInterpreter.ALLUXIO_MASTER_PORT, mLocalAlluxioCluster.getMasterPort() + "");
tachyonInterpreter = new AlluxioInterpreter(props);
tachyonInterpreter.open();
}
@Test
public void testCompletion() {
List<String> expectedResultOne = Arrays.asList("cat", "chgrp",
"chmod", "chown", "copyFromLocal", "copyToLocal", "count",
"createLineage");
List<String> expectedResultTwo = Arrays.asList("copyFromLocal",
"copyToLocal", "count");
List<String> expectedResultThree = Arrays.asList("copyFromLocal", "copyToLocal");
List<String> expectedResultNone = new ArrayList<String>();
List<String> resultOne = tachyonInterpreter.completion("c", 0);
List<String> resultTwo = tachyonInterpreter.completion("co", 0);
List<String> resultThree = tachyonInterpreter.completion("copy", 0);
List<String> resultNotMatch = tachyonInterpreter.completion("notMatch", 0);
List<String> resultAll = tachyonInterpreter.completion("", 0);
Assert.assertEquals(expectedResultOne, resultOne);
Assert.assertEquals(expectedResultTwo, resultTwo);
Assert.assertEquals(expectedResultThree, resultThree);
Assert.assertEquals(expectedResultNone, resultNotMatch);
Assert.assertEquals(tachyonInterpreter.keywords, resultAll);
}
@Test
public void catDirectoryTest() throws IOException {
String expected = "Successfully created directory /testDir\n\n" +
"/testDir is not a file.\n";
InterpreterResult output = tachyonInterpreter.interpret("mkdir /testDir" +
"\ncat /testDir", null);
Assert.assertEquals(Code.ERROR, output.code());
Assert.assertEquals(expected, output.message());
}
@Test
public void catNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("cat /testFile", null);
Assert.assertEquals(Code.ERROR, output.code());
}
// @Test
// public void catTest() throws IOException {
// FileSystemTestUtils.createByteFile(mTfs, "/testFile", AlluxioStorageType.STORE,
// UnderStorageType.NO_PERSIST, 10);
// InterpreterResult output = tachyonInterpreter.interpret("cat /testFile", null);
//
// byte[] expected = BufferUtils.getIncreasingByteArray(10);
//
// Assert.assertEquals(Code.SUCCESS, output.code());
// Assert.assertArrayEquals(expected,
// output.message().substring(0, output.message().length() - 1).getBytes());
// }
// @Test
// public void copyFromLocalLargeTest() throws IOException, AlluxioException {
// File testFile = new File(mLocalAlluxioCluster.getAlluxioHome() + "/testFile");
// testFile.createNewFile();
// FileOutputStream fos = new FileOutputStream(testFile);
// byte[] toWrite = BufferUtils.getIncreasingByteArray(SIZE_BYTES);
// fos.write(toWrite);
// fos.close();
//
// InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
// testFile.getAbsolutePath() + " /testFile", null);
// Assert.assertEquals(
// "Copied " + testFile.getAbsolutePath() + " to /testFile\n\n",
// output.message());
//
// FileInStream tFile = mTfs.openFile(new AlluxioURI("/testFile"));
// FileInfo fileInfo = mTfs.getInfo(tFile);
// Assert.assertNotNull(fileInfo);
// Assert.assertEquals(SIZE_BYTES, tFile.length);
//
// InStreamOptions options =
// new InStreamOptions.Builder(new Configuration()).setAlluxioStorageType(
// AlluxioStorageType.NO_STORE).build();
// FileInStream tfis = mTfs.getInStream(tFile, options);
// byte[] read = new byte[SIZE_BYTES];
// tfis.read(read);
// Assert.assertTrue(BufferUtils.equalIncreasingByteArray(SIZE_BYTES, read));
// }
// @Test
// public void loadFileTest() throws IOException, AlluxioException {
// TachyonFile file =
// FileSystemTestUtils.createByteFile(mTfs, "/testFile", AlluxioStorageType.NO_STORE,
// UnderStorageType.SYNC_PERSIST, 10);
// FileInfo fileInfo = mTfs.getInfo(file);
// Assert.assertFalse(fileInfo.getInMemoryPercentage() == 100);
//
// tachyonInterpreter.interpret("load /testFile", null);
//
// fileInfo = mTfs.getInfo(file);
// Assert.assertTrue(fileInfo.getInMemoryPercentage() == 100);
// }
// @Test
// public void loadDirTest() throws IOException, AlluxioException {
// TachyonFile fileA = FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
// AlluxioStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 10);
// TachyonFile fileB = FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testFileB",
// AlluxioStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
// FileInfo fileInfoA = mTfs.getInfo(fileA);
// FileInfo fileInfoB = mTfs.getInfo(fileB);
// Assert.assertFalse(fileInfoA.getInMemoryPercentage() == 100);
// Assert.assertTrue(fileInfoB.getInMemoryPercentage() == 100);
//
// tachyonInterpreter.interpret("load /testRoot", null);
//
// fileInfoA = mTfs.getInfo(fileA);
// fileInfoB = mTfs.getInfo(fileB);
// Assert.assertTrue(fileInfoA.getInMemoryPercentage() == 100);
// Assert.assertTrue(fileInfoB.getInMemoryPercentage() == 100);
// }
// @Test
// public void copyFromLocalTest() throws IOException, AlluxioException {
// File testDir = new File(mLocalAlluxioCluster.getAlluxioHome() + "/testDir");
// testDir.mkdir();
// File testDirInner = new File(mLocalAlluxioCluster.getAlluxioHome() + "/testDir/testDirInner");
// testDirInner.mkdir();
// File testFile =
// generateFileContent("/testDir/testFile", BufferUtils.getIncreasingByteArray(10));
//
// generateFileContent("/testDir/testDirInner/testFile2",
// BufferUtils.getIncreasingByteArray(10, 20));
//
// InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
// testFile.getParent() + " /testDir", null);
// Assert.assertEquals(
// "Copied " + testFile.getParent() + " to /testDir\n\n",
// output.message());
//
// TachyonFile file1 = mTfs.open(new AlluxioURI("/testDir/testFile"));
// TachyonFile file2 = mTfs.open(new AlluxioURI("/testDir/testDirInner/testFile2"));
// FileInfo fileInfo1 = mTfs.getInfo(file1);
// FileInfo fileInfo2 = mTfs.getInfo(file2);
// Assert.assertNotNull(fileInfo1);
// Assert.assertNotNull(fileInfo2);
// Assert.assertEquals(10, fileInfo1.length);
// Assert.assertEquals(20, fileInfo2.length);
//
// byte[] read = readContent(file1, 10);
// Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, read));
// read = readContent(file2, 20);
// Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, 20, read));
// }
// @Test
// public void copyFromLocalTestWithFullURI() throws IOException, AlluxioException {
// File testFile = generateFileContent("/srcFileURI", BufferUtils.getIncreasingByteArray(10));
// String tachyonURI = "tachyon://" + mLocalAlluxioCluster.getMasterHostname() + ":"
// + mLocalAlluxioCluster.getMasterPort() + "/destFileURI";
//
// InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
// testFile.getPath() + " " + tachyonURI, null);
// Assert.assertEquals(
// "Copied " + testFile.getPath() + " to " + tachyonURI + "\n\n",
// output.message());
//
// TachyonFile file = mTfs.open(new AlluxioURI("/destFileURI"));
// FileInfo fileInfo = mTfs.getInfo(file);
// Assert.assertEquals(10L, fileInfo.length);
// byte[] read = readContent(file, 10);
// Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, read));
// }
// @Test
// public void copyFromLocalFileToDstPathTest() throws IOException, AlluxioException {
// String dataString = "copyFromLocalFileToDstPathTest";
// byte[] data = dataString.getBytes();
// File localDir = new File(mLocalAlluxioCluster.getAlluxioHome() + "/localDir");
// localDir.mkdir();
// File localFile = generateFileContent("/localDir/testFile", data);
//
// tachyonInterpreter.interpret("mkdir /dstDir", null);
// tachyonInterpreter.interpret("copyFromLocal " + localFile.getPath() + " /dstDir", null);
//
// TachyonFile file = mTfs.open(new AlluxioURI("/dstDir/testFile"));
// FileInfo fileInfo = mTfs.getInfo(file);
// Assert.assertNotNull(fileInfo);
// byte[] read = readContent(file, data.length);
// Assert.assertEquals(new String(read), dataString);
// }
// @Test
// public void copyToLocalLargeTest() throws IOException {
// copyToLocalWithBytes(SIZE_BYTES);
// }
// @Test
// public void copyToLocalTest() throws IOException {
// copyToLocalWithBytes(10);
// }
// private void copyToLocalWithBytes(int bytes) throws IOException {
// FileSystemTestUtils.createByteFile(mTfs, "/testFile", AlluxioStorageType.STORE,
// UnderStorageType.NO_PERSIST, bytes);
//
// InterpreterResult output = tachyonInterpreter.interpret("copyToLocal /testFile " +
// mLocalAlluxioCluster.getAlluxioHome() + "/testFile", null);
//
// Assert.assertEquals(
// "Copied /testFile to " + mLocalAlluxioCluster.getAlluxioHome() + "/testFile\n\n",
// output.message());
// fileReadTest("/testFile", 10);
// }
@Test
public void countNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("count /NotExistFile", null);
Assert.assertEquals(Code.ERROR, output.code());
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
}
// @Test
// public void countTest() throws IOException {
// FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testFileA", AlluxioStorageType.STORE,
// UnderStorageType.NO_PERSIST, 10);
// FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", AlluxioStorageType.STORE,
// UnderStorageType.NO_PERSIST, 20);
// FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testFileB", AlluxioStorageType.STORE,
// UnderStorageType.NO_PERSIST, 30);
//
// InterpreterResult output = tachyonInterpreter.interpret("count /testRoot", null);
//
// String expected = "";
// String format = "%-25s%-25s%-15s\n";
// expected += String.format(format, "File Count", "Folder Count", "Total Bytes");
// expected += String.format(format, 3, 2, 60);
// expected += "\n";
// Assert.assertEquals(expected, output.message());
// }
@Test
public void fileinfoNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("fileinfo /NotExistFile", null);
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
Assert.assertEquals(Code.ERROR, output.code());
}
@Test
public void locationNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("location /NotExistFile", null);
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
Assert.assertEquals(Code.ERROR, output.code());
}
// @Test
// public void lsTest() throws IOException, AlluxioException {
// FileInfo[] files = new FileInfo[3];
//
// TachyonFile fileA = FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
// AlluxioStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
// files[0] = mTfs.getInfo(fileA);
// FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", AlluxioStorageType.STORE,
// UnderStorageType.NO_PERSIST, 20);
// files[1] = mTfs.getInfo(mTfs.open(new AlluxioURI("/testRoot/testDir")));
// TachyonFile fileC = FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testFileC",
// AlluxioStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 30);
// files[2] = mTfs.getInfo(fileC);
//
// InterpreterResult output = tachyonInterpreter.interpret("ls /testRoot", null);
//
// String expected = "";
// String format = "%-10s%-25s%-15s%-5s\n";
// expected += String.format(format, FormatUtils.getSizeFromBytes(10),
// AlluxioShell.convertMsToDate(files[0].getCreationTimeMs()), "In Memory", "/testRoot/testFileA");
// expected += String.format(format, FormatUtils.getSizeFromBytes(0),
// AlluxioShell.convertMsToDate(files[1].getCreationTimeMs()), "", "/testRoot/testDir");
// expected += String.format(format, FormatUtils.getSizeFromBytes(30),
// AlluxioShell.convertMsToDate(files[2].getCreationTimeMs()), "Not In Memory",
// "/testRoot/testFileC");
// expected += "\n";
//
// Assert.assertEquals(Code.SUCCESS, output.code());
// Assert.assertEquals(expected, output.message());
// }
// @Test
// public void lsrTest() throws IOException, AlluxioException {
// FileInfo[] files = new FileInfo[4];
// TachyonFile fileA = FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
// AlluxioStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
// files[0] = mTfs.getInfo(fileA);
// FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", AlluxioStorageType.STORE,
// UnderStorageType.NO_PERSIST, 20);
// files[1] = mTfs.getInfo(mTfs.open(new AlluxioURI("/testRoot/testDir")));
// files[2] = mTfs.getInfo(mTfs.open(new AlluxioURI("/testRoot/testDir/testFileB")));
// TachyonFile fileC = FileSystemTestUtils.createByteFile(mTfs, "/testRoot/testFileC",
// AlluxioStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 30);
// files[3] = mTfs.getInfo(fileC);
//
// InterpreterResult output = tachyonInterpreter.interpret("lsr /testRoot", null);
//
// String expected = "";
// String format = "%-10s%-25s%-15s%-5s\n";
// expected +=
// String.format(format, FormatUtils.getSizeFromBytes(10),
// AlluxioShell.convertMsToDate(files[0].getCreationTimeMs()), "In Memory",
// "/testRoot/testFileA");
// expected +=
// String.format(format, FormatUtils.getSizeFromBytes(0),
// AlluxioShell.convertMsToDate(files[1].getCreationTimeMs()), "", "/testRoot/testDir");
// expected +=
// String.format(format, FormatUtils.getSizeFromBytes(20),
// AlluxioShell.convertMsToDate(files[2].getCreationTimeMs()), "In Memory",
// "/testRoot/testDir/testFileB");
// expected +=
// String.format(format, FormatUtils.getSizeFromBytes(30),
// AlluxioShell.convertMsToDate(files[3].getCreationTimeMs()), "Not In Memory",
// "/testRoot/testFileC");
// expected += "\n";
// Assert.assertEquals(expected, output.message());
// }
// @Test
// public void mkdirComplexPathTest() throws IOException, AlluxioException {
// InterpreterResult output = tachyonInterpreter.interpret(
// "mkdir /Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File", null);
//
// TachyonFile tFile = mTfs.open(new AlluxioURI("/Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File"));
// FileInfo fileInfo = mTfs.getInfo(tFile);
// Assert.assertNotNull(fileInfo);
// Assert.assertEquals(
// "Successfully created directory /Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File\n\n",
// output.message());
// Assert.assertTrue(fileInfo.isIsFolder());
// }
@Test
public void mkdirExistingTest() throws IOException {
String command = "mkdir /festFile1";
Assert.assertEquals(Code.SUCCESS, tachyonInterpreter.interpret(command, null).code());
Assert.assertEquals(Code.SUCCESS, tachyonInterpreter.interpret(command, null).code());
}
@Test
public void mkdirInvalidPathTest() throws IOException {
Assert.assertEquals(
Code.ERROR,
tachyonInterpreter.interpret("mkdir /test File Invalid Path", null).code());
}
// @Test
// public void mkdirShortPathTest() throws IOException, AlluxioException {
// InterpreterResult output = tachyonInterpreter.interpret("mkdir /root/testFile1", null);
// TachyonFile tFile = mTfs.open(new AlluxioURI("/root/testFile1"));
// FileInfo fileInfo = mTfs.getInfo(tFile);
// Assert.assertNotNull(fileInfo);
// Assert.assertEquals(
// "Successfully created directory /root/testFile1\n\n",
// output.message());
// Assert.assertTrue(fileInfo.isIsFolder());
// }
// @Test
// public void mkdirTest() throws IOException, AlluxioException {
// String qualifiedPath =
// "tachyon://" + mLocalAlluxioCluster.getMasterHostname() + ":"
// + mLocalAlluxioCluster.getMasterPort() + "/root/testFile1";
// InterpreterResult output = tachyonInterpreter.interpret("mkdir " + qualifiedPath, null);
// TachyonFile tFile = mTfs.open(new AlluxioURI("/root/testFile1"));
// FileInfo fileInfo = mTfs.getInfo(tFile);
// Assert.assertNotNull(fileInfo);
// Assert.assertEquals(
// "Successfully created directory " + qualifiedPath + "\n\n",
// output.message());
// Assert.assertTrue(fileInfo.isIsFolder());
// }
private File generateFileContent(String path, byte[] toWrite)
throws IOException, FileNotFoundException {
File testFile = new File(mLocalAlluxioCluster.getAlluxioHome() + path);
testFile.createNewFile();
FileOutputStream fos = new FileOutputStream(testFile);
fos.write(toWrite);
fos.close();
return testFile;
}
// private byte[] readContent(TachyonFile tFile, int length) throws IOException, AlluxioException {
// InStreamOptions options =
// new InStreamOptions.Builder(new Configuration()).setAlluxioStorageType(
// AlluxioStorageType.NO_STORE).build();
// FileInStream tfis = mTfs.getInStream(tFile, options);
// byte[] read = new byte[length];
// tfis.read(read);
// return read;
// }
private void fileReadTest(String fileName, int size) throws IOException {
File testFile = new File(PathUtils.concatPath(mLocalAlluxioCluster.getAlluxioHome(), fileName));
FileInputStream fis = new FileInputStream(testFile);
byte[] read = new byte[size];
fis.read(read);
fis.close();
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(size, read));
}
}

View file

@ -138,7 +138,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.tachyon.TachyonInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

View file

@ -53,7 +53,7 @@
<li><a href="{{BASE_PATH}}/interpreter/scalding.html">Scalding</a></li>
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Shell</a></li>
<li><a href="{{BASE_PATH}}/interpreter/spark.html">Spark</a></li>
<li><a href="{{BASE_PATH}}/interpreter/tachyon.html">Tachyon</a></li>
<li><a href="{{BASE_PATH}}/interpreter/alluxio.html">Alluxio</a></li>
<li><a href="{{BASE_PATH}}/pleasecontribute.html">Tajo</a></li>
<li role="separator" class="divider"></li>
<li><a href="{{BASE_PATH}}/manual/dynamicinterpreterload.html">Dynamic Interpreter Loading</a></li>

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 43 KiB

View file

@ -1,13 +1,13 @@
---
layout: page
title: "Tachyon Interpreter"
description: "Tachyon Interpreter"
title: "Alluxio Interpreter"
description: "Alluxio Interpreter"
group: manual
---
{% include JB/setup %}
## Tachyon Interpreter for Apache Zeppelin
[Tachyon](http://tachyon-project.org/) is a memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster frameworks.
## Alluxio Interpreter for Apache Zeppelin
[Alluxio](http://alluxio.org/) is a memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster frameworks.
## Configuration
<table class="table-configuration">
@ -17,32 +17,32 @@ group: manual
<th>Description</th>
</tr>
<tr>
<td>tachyon.master.hostname</td>
<td>alluxio.master.hostname</td>
<td>localhost</td>
<td>Tachyon master hostname</td>
<td>Alluxio master hostname</td>
</tr>
<tr>
<td>tachyon.master.port</td>
<td>alluxio.master.port</td>
<td>19998</td>
<td>Tachyon master port</td>
<td>Alluxio master port</td>
</tr>
</table>
## Enabling Tachyon Interpreter
In a notebook, to enable the **Tachyon** interpreter, click on the **Gear** icon and select **Tachyon**.
## Enabling Alluxio Interpreter
In a notebook, to enable the **Alluxio** interpreter, click on the **Gear** icon and select **Alluxio**.
## Using the Tachyon Interpreter
In a paragraph, use `%tachyon` to select the **Tachyon** interpreter and then input all commands.
## Using the Alluxio Interpreter
In a paragraph, use `%alluxio` to select the **Alluxio** interpreter and then input all commands.
```bash
%tachyon
%alluxio
help
```
> **Tip :** Use ( Ctrl + . ) for autocompletion.
## Interpreter Commands
The **Tachyon** interpreter accepts the following commands.
The **Alluxio** interpreter accepts the following commands.
<center>
<table class="table-configuration">
@ -51,17 +51,36 @@ The **Tachyon** interpreter accepts the following commands.
<th>Syntax</th>
<th>Description</th>
</tr>
<tr>
<td>cat</td>
<td>cat "path"</td>
<td>Print the content of the file to the console.</td>
</tr>
<tr>
<td>chgrp</td>
<td>chgrp "group" "path"</td>
<td>Change the group of the directory or file.</td>
</tr>
<tr>
<td>chmod</td>
<td>chmod "permission" "path"</td>
<td>Change the permission of the directory or file.</td>
</tr>
<tr>
<td>chown</td>
<td>chown "owner" "path"</td>
<td>Change the owner of the directory or file.</td>
</tr>
<tr>
<td>copyFromLocal</td>
<td>copyFromLocal "source path" "remote path"</td>
<td>Copy the specified file specified by "source path" to the path specified by "remote path".
<td>Copy the specified file specified by "source path" to the path specified by "remote path".
This command will fail if "remote path" already exists.</td>
</tr>
<tr>
<td>copyToLocal</td>
<td>copyToLocal "remote path" "local path"</td>
<td>Copy the specified file from the path specified by "remote source" to a local
destination.</td>
<td>Copy the specified file from the path specified by "remote path" to a local destination.</td>
</tr>
<tr>
<td>count</td>
@ -74,35 +93,35 @@ The **Tachyon** interpreter accepts the following commands.
<td>Display the size of a file or a directory specified by the input path.</td>
</tr>
<tr>
<td>fileinfo</td>
<td>fileinfo "path"</td>
<td>fileInfo</td>
<td>fileInfo "path"</td>
<td>Print the information of the blocks of a specified file.</td>
</tr>
<tr>
<td>free</td>
<td>free "path"</td>
<td>Free a file or all files under a directory from Tachyon. If the file/directory is also
<td>Free a file or all files under a directory from Alluxio. If the file/directory is also
in under storage, it will still be available there.</td>
</tr>
<tr>
<td>getCapacityBytes</td>
<td>getCapacityBytes</td>
<td>Get the capacity of the TachyonFS.</td>
<td>Get the capacity of the AlluxioFS.</td>
</tr>
<tr>
<td>getUsedBytes</td>
<td>getUsedBytes</td>
<td>Get number of bytes used in the TachyonFS.</td>
<td>Get number of bytes used in the AlluxioFS.</td>
</tr>
<tr>
<td>load</td>
<td>load "path"</td>
<td>Load the data of a file or a directory from under storage into Tachyon.</td>
<td>Load the data of a file or a directory from under storage into Alluxio.</td>
</tr>
<tr>
<td>loadMetadata</td>
<td>loadMetadata "path"</td>
<td>Load the metadata of a file or a directory from under storage into Tachyon.</td>
<td>Load the metadata of a file or a directory from under storage into Alluxio.</td>
</tr>
<tr>
<td>location</td>
@ -115,24 +134,19 @@ The **Tachyon** interpreter accepts the following commands.
<td>List all the files and directories directly under the given path with information such as
size.</td>
</tr>
<tr>
<td>lsr</td>
<td>lsr "path"</td>
<td>Recursively list all the files and directories under the given path with information such
as size.</td>
</tr>
<tr>
<td>mkdir</td>
<td>mkdir "path"</td>
<td>Create a directory under the given path, along with any necessary parent directories. This
command will fail if the given path already exists.</td>
<td>mkdir "path1" ... "pathn"</td>
<td>Create directory(ies) under the given paths, along with any necessary parent directories.
Multiple paths separated by spaces or tabs. This command will fail if any of the given paths
already exist.</td>
</tr>
<tr>
<td>mount</td>
<td>mount "path" "uri"</td>
<td>Mount the underlying file system path "uri" into the Tachyon namespace as "path". The "path"
<td>Mount the underlying file system path "uri" into the Alluxio namespace as "path". The "path"
is assumed not to exist and is created by the operation. No data or metadata is loaded from under
storage into Tachyon. After a path is mounted, operations on objects under the mounted path are
storage into Alluxio. After a path is mounted, operations on objects under the mounted path are
mirror to the mounted under storage.</td>
</tr>
<tr>
@ -141,6 +155,11 @@ The **Tachyon** interpreter accepts the following commands.
<td>Move a file or directory specified by "source" to a new location "destination". This command
will fail if "destination" already exists.</td>
</tr>
<tr>
<td>persist</td>
<td>persist "path"</td>
<td>Persist a file or directory currently stored only in Alluxio to the underlying file system.</td>
</tr>
<tr>
<td>pin</td>
<td>pin "path"</td>
@ -152,22 +171,15 @@ The **Tachyon** interpreter accepts the following commands.
<td>report "path"</td>
<td>Report to the master that a file is lost.</td>
</tr>
<tr>
<td>request</td>
<td>request "path" "dependency ID"</td>
<td>Request the file for a given dependency ID.</td>
</tr>
<tr>
<td>rm</td>
<td>rm "path"</td>
<td>Remove a file. This command will fail if the given path is a directory rather than a
file.</td>
<td>Remove a file. This command will fail if the given path is a directory rather than a file.</td>
</tr>
<tr>
<td>rmr</td>
<td>rmr "path"</td>
<td>Remove a file, or a directory with all the files and sub-directories that this directory
contains.</td>
<td>setTtl</td>
<td>setTtl "time"</td>
<td>Set the TTL (time to live) in milliseconds to a file.</td>
</tr>
<tr>
<td>tail</td>
@ -182,35 +194,40 @@ The **Tachyon** interpreter accepts the following commands.
<tr>
<td>unmount</td>
<td>unmount "path"</td>
<td>Unmount the underlying file system path mounted in the Tachyon namespace as "path". Tachyon
objects under "path" are removed from Tachyon, but they still exist in the previously mounted
<td>Unmount the underlying file system path mounted in the Alluxio namespace as "path". Alluxio
objects under "path" are removed from Alluxio, but they still exist in the previously mounted
under storage.</td>
</tr>
<tr>
<td>unpin</td>
<td>unpin "path"</td>
<td>Unpin the given file to allow Tachyon to evict this file again. If the given path is a
<td>Unpin the given file to allow Alluxio to evict this file again. If the given path is a
directory, it recursively unpins all files contained and any new files created within this
directory.</td>
</tr>
<tr>
<td>unsetTtl</td>
<td>unsetTtl</td>
<td>Remove the TTL (time to live) setting from a file.</td>
</tr>
</table>
</center>
## How to test it's working
Be sure to have configured correctly the Tachyon interpreter, then open a new paragraph and type one of the above commands.
Be sure to have configured correctly the Alluxio interpreter, then open a new paragraph and type one of the above commands.
Below a simple example to show how to interact with Tachyon interpreter.
Below a simple example to show how to interact with Alluxio interpreter.
Following steps are performed:
* using sh interpreter a new text file is created on local machine
* using Tachyon interpreter:
* is listed the content of the tfs (Tachyon File System) root
* the file previously created is copied to tfs
* is listed again the content of the tfs root to check the existence of the new copied file
* using Alluxio interpreter:
* is listed the content of the afs (Alluxio File System) root
* the file previously created is copied to afs
* is listed again the content of the afs root to check the existence of the new copied file
* is showed the content of the copied file (using the tail command)
* the file previously copied to tfs is copied to local machine
* using sh interpreter it's checked the existence of the new file copied from Tachyon and its content is showed
* the file previously copied to afs is copied to local machine
* using sh interpreter it's checked the existence of the new file copied from Alluxio and its content is showed
<center>
![Tachyon Interpreter Example](../assets/themes/zeppelin/img/docs-img/tachyon-example.png)
![Alluxio Interpreter Example](../assets/themes/zeppelin/img/docs-img/alluxio-example.png)
</center>

View file

@ -104,7 +104,7 @@
<module>lens</module>
<module>cassandra</module>
<module>elasticsearch</module>
<module>tachyon</module>
<module>alluxio</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-distribution</module>

View file

@ -1,251 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.tachyon;
import java.io.IOException;
import java.io.PrintStream;
import java.io.ByteArrayOutputStream;
import java.util.*;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tachyon.conf.TachyonConf;
import tachyon.shell.TfsShell;
/**
* Tachyon interpreter for Zeppelin.
*/
public class TachyonInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(TachyonInterpreter.class);
protected static final String TACHYON_MASTER_HOSTNAME = "tachyon.master.hostname";
protected static final String TACHYON_MASTER_PORT = "tachyon.master.port";
private TfsShell tfs;
private int totalCommands = 0;
private int completedCommands = 0;
private final String tachyonMasterHostname;
private final String tachyonMasterPort;
protected final List<String> keywords = Arrays.asList("cat", "copyFromLocal",
"copyToLocal", "count", "du", "fileinfo", "free", "getUsedBytes",
"getCapacityBytes", "load", "loadMetadata", "location", "ls", "lsr",
"mkdir", "mount", "mv", "pin", "report", "request", "rm", "rmr",
"setTTL", "unsetTTL", "tail", "touch", "unmount", "unpin");
public TachyonInterpreter(Properties property) {
super(property);
tachyonMasterHostname = property.getProperty(TACHYON_MASTER_HOSTNAME);
tachyonMasterPort = property.getProperty(TACHYON_MASTER_PORT);
}
static {
Interpreter.register("tachyon", "tachyon",
TachyonInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add(TACHYON_MASTER_HOSTNAME, "localhost", "Tachyon master hostname")
.add(TACHYON_MASTER_PORT, "19998", "Tachyon master port")
.build());
}
@Override
public void open() {
logger.info("Starting Tachyon shell to connect to " + tachyonMasterHostname +
" on port " + tachyonMasterPort);
System.setProperty(TACHYON_MASTER_HOSTNAME, tachyonMasterHostname);
System.setProperty(TACHYON_MASTER_PORT, tachyonMasterPort);
tfs = new TfsShell(new TachyonConf());
}
@Override
public void close() {
logger.info("Closing Tachyon shell");
try {
tfs.close();
} catch (IOException e) {
logger.error("Cannot close connection", e);
}
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String[] lines = splitAndRemoveEmpty(st, "\n");
return interpret(lines, context);
}
private InterpreterResult interpret(String[] commands, InterpreterContext context) {
boolean isSuccess = true;
totalCommands = commands.length;
completedCommands = 0;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
PrintStream old = System.out;
System.setOut(ps);
for (String command : commands) {
int commandResuld = 1;
String[] args = splitAndRemoveEmpty(command, " ");
if (args.length > 0 && args[0].equals("help")) {
System.out.println(getCommandList());
} else {
commandResuld = tfs.run(args);
}
if (commandResuld != 0) {
isSuccess = false;
break;
} else {
completedCommands += 1;
}
System.out.println();
}
System.out.flush();
System.setOut(old);
if (isSuccess) {
return new InterpreterResult(Code.SUCCESS, baos.toString());
} else {
return new InterpreterResult(Code.ERROR, baos.toString());
}
}
private String[] splitAndRemoveEmpty(String st, String splitSeparator) {
String[] voices = st.split(splitSeparator);
ArrayList<String> result = new ArrayList<String>();
for (String voice : voices) {
if (!voice.trim().isEmpty()) {
result.add(voice);
}
}
return result.toArray(new String[result.size()]);
}
private String[] splitAndRemoveEmpty(String[] sts, String splitSeparator) {
ArrayList<String> result = new ArrayList<String>();
for (String st : sts) {
result.addAll(Arrays.asList(splitAndRemoveEmpty(st, splitSeparator)));
}
return result.toArray(new String[result.size()]);
}
@Override
public void cancel(InterpreterContext context) { }
@Override
public FormType getFormType() {
return FormType.NATIVE;
}
@Override
public int getProgress(InterpreterContext context) {
return completedCommands * 100 / totalCommands;
}
@Override
public List<String> completion(String buf, int cursor) {
String[] words = splitAndRemoveEmpty(splitAndRemoveEmpty(buf, "\n"), " ");
String lastWord = "";
if (words.length > 0) {
lastWord = words[ words.length - 1 ];
}
ArrayList<String> voices = new ArrayList<String>();
for (String command : keywords) {
if (command.startsWith(lastWord)) {
voices.add(command);
}
}
return voices;
}
private String getCommandList() {
StringBuilder sb = new StringBuilder();
sb.append("Commands list:");
sb.append("\n\t[help] - List all available commands.");
sb.append("\n\t[cat <path>] - Print the content of the file to the console.");
sb.append("\n\t[copyFromLocal <src> <remoteDst>] - Copy the specified file specified " +
"by \"source path\" to the path specified by \"remote path\". " +
"This command will fail if \"remote path\" already exists.");
sb.append("\n\t[copyToLocal <src> <localDst>] - Copy the specified file from the path " +
"specified by \"remote source\" to a local destination.");
sb.append("\n\t[count <path>] - Display the number of folders and files matching " +
"the specified prefix in \"path\".");
sb.append("\n\t[du <path>] - Display the size of a file or a directory specified " +
"by the input path.");
sb.append("\n\t[fileinfo <path>] - Print the information of the blocks of a specified file.");
sb.append("\n\t[free <file path|folder path>] - Free a file or all files under a " +
"directory from Tachyon. If the file/directory is also in under storage, " +
"it will still be available there.");
sb.append("\n\t[getUsedBytes] - Get number of bytes used in the TachyonFS.");
sb.append("\n\t[getCapacityBytes] - Get the capacity of the TachyonFS.");
sb.append("\n\t[load <path>] - Load the data of a file or a directory from under " +
"storage into Tachyon.");
sb.append("\n\t[loadMetadata <path>] - Load the metadata of a file or a directory " +
"from under storage into Tachyon.");
sb.append("\n\t[location <path>] - Display a list of hosts that have the file data.");
sb.append("\n\t[ls <path>] - List all the files and directories directly under the " +
"given path with information such as size.");
sb.append("\n\t[lsr <path>] - Recursively list all the files and directories under " +
"the given path with information such as size.");
sb.append("\n\t[mkdir <path>] - Create a directory under the given path, along with " +
"any necessary parent directories. This command will fail if the given " +
"path already exists.");
sb.append("\n\t[mount <tachyonPath> <ufsURI>] - Mount the underlying file system " +
"path \"uri\" into the Tachyon namespace as \"path\". The \"path\" is assumed " +
"not to exist and is created by the operation. No data or metadata is loaded " +
"from under storage into Tachyon. After a path is mounted, operations on objects " +
"under the mounted path are mirror to the mounted under storage.");
sb.append("\n\t[mv <src> <dst>] - Move a file or directory specified by \"source\" " +
"to a new location \"destination\". This command will fail if " +
"\"destination\" already exists.");
sb.append("\n\t[pin <path>] - Pin the given file to avoid evicting it from memory. " +
"If the given path is a directory, it recursively pins all the files contained " +
"and any new files created within this directory.");
sb.append("\n\t[report <path>] - Report to the master that a file is lost.");
sb.append("\n\t[request <tachyonaddress> <dependencyId>] - Request the file for " +
"a given dependency ID.");
sb.append("\n\t[rm <path>] - Remove a file. This command will fail if the given " +
"path is a directory rather than a file.");
sb.append("\n\t[rmr <path>] - Remove a file, or a directory with all the files and " +
"sub-directories that this directory contains.");
sb.append("\n\t[tail <path>] - Print the last 1KB of the specified file to the console.");
sb.append("\n\t[touch <path>] - Create a 0-byte file at the specified location.");
sb.append("\n\t[unmount <tachyonPath>] - Unmount the underlying file system path " +
"mounted in the Tachyon namespace as \"path\". Tachyon objects under \"path\" " +
"are removed from Tachyon, but they still exist in the previously " +
"mounted under storage.");
sb.append("\n\t[unpin <path>] - Unpin the given file to allow Tachyon to evict " +
"this file again. If the given path is a directory, it recursively unpins " +
"all files contained and any new files created within this directory.");
return sb.toString();
}
}

View file

@ -1,484 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.tachyon;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.junit.*;
import tachyon.Constants;
import tachyon.TachyonURI;
import tachyon.client.TachyonFSTestUtils;
import tachyon.client.TachyonStorageType;
import tachyon.client.UnderStorageType;
import tachyon.client.file.FileInStream;
import tachyon.client.file.TachyonFile;
import tachyon.client.file.TachyonFileSystem;
import tachyon.client.file.options.InStreamOptions;
import tachyon.conf.TachyonConf;
import tachyon.exception.ExceptionMessage;
import tachyon.exception.TachyonException;
import tachyon.master.LocalTachyonCluster;
import tachyon.shell.TfsShell;
import tachyon.thrift.FileInfo;
import tachyon.util.FormatUtils;
import tachyon.util.io.BufferUtils;
import tachyon.util.io.PathUtils;
public class TachyonInterpreterTest {
private TachyonInterpreter tachyonInterpreter;
private static final int SIZE_BYTES = Constants.MB * 10;
private LocalTachyonCluster mLocalTachyonCluster = null;
private TachyonFileSystem mTfs = null;
@After
public final void after() throws Exception {
if (tachyonInterpreter != null) {
tachyonInterpreter.close();
}
mLocalTachyonCluster.stop();
}
@Before
public final void before() throws Exception {
mLocalTachyonCluster = new LocalTachyonCluster(SIZE_BYTES, 1000, Constants.GB);
mLocalTachyonCluster.start();
mTfs = mLocalTachyonCluster.getClient();
final Properties props = new Properties();
props.put(TachyonInterpreter.TACHYON_MASTER_HOSTNAME, mLocalTachyonCluster.getMasterHostname());
props.put(TachyonInterpreter.TACHYON_MASTER_PORT, mLocalTachyonCluster.getMasterPort() + "");
tachyonInterpreter = new TachyonInterpreter(props);
tachyonInterpreter.open();
}
@Test
public void testCompletion() {
List<String> expectedResultOne = Arrays.asList("cat", "copyFromLocal",
"copyToLocal", "count");
List<String> expectedResultTwo = Arrays.asList("copyFromLocal",
"copyToLocal", "count");
List<String> expectedResultThree = Arrays.asList("copyFromLocal", "copyToLocal");
List<String> expectedResultNone = new ArrayList<String>();
List<String> resultOne = tachyonInterpreter.completion("c", 0);
List<String> resultTwo = tachyonInterpreter.completion("co", 0);
List<String> resultThree = tachyonInterpreter.completion("copy", 0);
List<String> resultNotMatch = tachyonInterpreter.completion("notMatch", 0);
List<String> resultAll = tachyonInterpreter.completion("", 0);
Assert.assertEquals(expectedResultOne, resultOne);
Assert.assertEquals(expectedResultTwo, resultTwo);
Assert.assertEquals(expectedResultThree, resultThree);
Assert.assertEquals(expectedResultNone, resultNotMatch);
Assert.assertEquals(tachyonInterpreter.keywords, resultAll);
}
@Test
public void catDirectoryTest() throws IOException {
String expected = "Successfully created directory /testDir\n\n" +
"/testDir is not a file.\n";
InterpreterResult output = tachyonInterpreter.interpret("mkdir /testDir" +
"\ncat /testDir", null);
Assert.assertEquals(Code.ERROR, output.code());
Assert.assertEquals(expected, output.message());
}
@Test
public void catNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("cat /testFile", null);
Assert.assertEquals(Code.ERROR, output.code());
}
@Test
public void catTest() throws IOException {
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 10);
InterpreterResult output = tachyonInterpreter.interpret("cat /testFile", null);
byte[] expected = BufferUtils.getIncreasingByteArray(10);
Assert.assertEquals(Code.SUCCESS, output.code());
Assert.assertArrayEquals(expected,
output.message().substring(0, output.message().length() - 1).getBytes());
}
@Test
public void copyFromLocalLargeTest() throws IOException, TachyonException {
File testFile = new File(mLocalTachyonCluster.getTachyonHome() + "/testFile");
testFile.createNewFile();
FileOutputStream fos = new FileOutputStream(testFile);
byte[] toWrite = BufferUtils.getIncreasingByteArray(SIZE_BYTES);
fos.write(toWrite);
fos.close();
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
testFile.getAbsolutePath() + " /testFile", null);
Assert.assertEquals(
"Copied " + testFile.getAbsolutePath() + " to /testFile\n\n",
output.message());
TachyonFile tFile = mTfs.open(new TachyonURI("/testFile"));
FileInfo fileInfo = mTfs.getInfo(tFile);
Assert.assertNotNull(fileInfo);
Assert.assertEquals(SIZE_BYTES, fileInfo.length);
InStreamOptions options =
new InStreamOptions.Builder(new TachyonConf()).setTachyonStorageType(
TachyonStorageType.NO_STORE).build();
FileInStream tfis = mTfs.getInStream(tFile, options);
byte[] read = new byte[SIZE_BYTES];
tfis.read(read);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(SIZE_BYTES, read));
}
@Test
public void loadFileTest() throws IOException, TachyonException {
TachyonFile file =
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.NO_STORE,
UnderStorageType.SYNC_PERSIST, 10);
FileInfo fileInfo = mTfs.getInfo(file);
Assert.assertFalse(fileInfo.getInMemoryPercentage() == 100);
tachyonInterpreter.interpret("load /testFile", null);
fileInfo = mTfs.getInfo(file);
Assert.assertTrue(fileInfo.getInMemoryPercentage() == 100);
}
@Test
public void loadDirTest() throws IOException, TachyonException {
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 10);
TachyonFile fileB = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileB",
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
FileInfo fileInfoA = mTfs.getInfo(fileA);
FileInfo fileInfoB = mTfs.getInfo(fileB);
Assert.assertFalse(fileInfoA.getInMemoryPercentage() == 100);
Assert.assertTrue(fileInfoB.getInMemoryPercentage() == 100);
tachyonInterpreter.interpret("load /testRoot", null);
fileInfoA = mTfs.getInfo(fileA);
fileInfoB = mTfs.getInfo(fileB);
Assert.assertTrue(fileInfoA.getInMemoryPercentage() == 100);
Assert.assertTrue(fileInfoB.getInMemoryPercentage() == 100);
}
@Test
public void copyFromLocalTest() throws IOException, TachyonException {
File testDir = new File(mLocalTachyonCluster.getTachyonHome() + "/testDir");
testDir.mkdir();
File testDirInner = new File(mLocalTachyonCluster.getTachyonHome() + "/testDir/testDirInner");
testDirInner.mkdir();
File testFile =
generateFileContent("/testDir/testFile", BufferUtils.getIncreasingByteArray(10));
generateFileContent("/testDir/testDirInner/testFile2",
BufferUtils.getIncreasingByteArray(10, 20));
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
testFile.getParent() + " /testDir", null);
Assert.assertEquals(
"Copied " + testFile.getParent() + " to /testDir\n\n",
output.message());
TachyonFile file1 = mTfs.open(new TachyonURI("/testDir/testFile"));
TachyonFile file2 = mTfs.open(new TachyonURI("/testDir/testDirInner/testFile2"));
FileInfo fileInfo1 = mTfs.getInfo(file1);
FileInfo fileInfo2 = mTfs.getInfo(file2);
Assert.assertNotNull(fileInfo1);
Assert.assertNotNull(fileInfo2);
Assert.assertEquals(10, fileInfo1.length);
Assert.assertEquals(20, fileInfo2.length);
byte[] read = readContent(file1, 10);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, read));
read = readContent(file2, 20);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, 20, read));
}
@Test
public void copyFromLocalTestWithFullURI() throws IOException, TachyonException {
File testFile = generateFileContent("/srcFileURI", BufferUtils.getIncreasingByteArray(10));
String tachyonURI = "tachyon://" + mLocalTachyonCluster.getMasterHostname() + ":"
+ mLocalTachyonCluster.getMasterPort() + "/destFileURI";
InterpreterResult output = tachyonInterpreter.interpret("copyFromLocal " +
testFile.getPath() + " " + tachyonURI, null);
Assert.assertEquals(
"Copied " + testFile.getPath() + " to " + tachyonURI + "\n\n",
output.message());
TachyonFile file = mTfs.open(new TachyonURI("/destFileURI"));
FileInfo fileInfo = mTfs.getInfo(file);
Assert.assertEquals(10L, fileInfo.length);
byte[] read = readContent(file, 10);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(10, read));
}
@Test
public void copyFromLocalFileToDstPathTest() throws IOException, TachyonException {
String dataString = "copyFromLocalFileToDstPathTest";
byte[] data = dataString.getBytes();
File localDir = new File(mLocalTachyonCluster.getTachyonHome() + "/localDir");
localDir.mkdir();
File localFile = generateFileContent("/localDir/testFile", data);
tachyonInterpreter.interpret("mkdir /dstDir", null);
tachyonInterpreter.interpret("copyFromLocal " + localFile.getPath() + " /dstDir", null);
TachyonFile file = mTfs.open(new TachyonURI("/dstDir/testFile"));
FileInfo fileInfo = mTfs.getInfo(file);
Assert.assertNotNull(fileInfo);
byte[] read = readContent(file, data.length);
Assert.assertEquals(new String(read), dataString);
}
@Test
public void copyToLocalLargeTest() throws IOException {
copyToLocalWithBytes(SIZE_BYTES);
}
@Test
public void copyToLocalTest() throws IOException {
copyToLocalWithBytes(10);
}
private void copyToLocalWithBytes(int bytes) throws IOException {
TachyonFSTestUtils.createByteFile(mTfs, "/testFile", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, bytes);
InterpreterResult output = tachyonInterpreter.interpret("copyToLocal /testFile " +
mLocalTachyonCluster.getTachyonHome() + "/testFile", null);
Assert.assertEquals(
"Copied /testFile to " + mLocalTachyonCluster.getTachyonHome() + "/testFile\n\n",
output.message());
fileReadTest("/testFile", 10);
}
@Test
public void countNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("count /NotExistFile", null);
Assert.assertEquals(Code.ERROR, output.code());
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
}
@Test
public void countTest() throws IOException {
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 10);
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 20);
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileB", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 30);
InterpreterResult output = tachyonInterpreter.interpret("count /testRoot", null);
String expected = "";
String format = "%-25s%-25s%-15s\n";
expected += String.format(format, "File Count", "Folder Count", "Total Bytes");
expected += String.format(format, 3, 2, 60);
expected += "\n";
Assert.assertEquals(expected, output.message());
}
@Test
public void fileinfoNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("fileinfo /NotExistFile", null);
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
Assert.assertEquals(Code.ERROR, output.code());
}
@Test
public void locationNotExistTest() throws IOException {
InterpreterResult output = tachyonInterpreter.interpret("location /NotExistFile", null);
Assert.assertEquals(ExceptionMessage.PATH_DOES_NOT_EXIST.getMessage("/NotExistFile") + "\n",
output.message());
Assert.assertEquals(Code.ERROR, output.code());
}
@Test
public void lsTest() throws IOException, TachyonException {
FileInfo[] files = new FileInfo[3];
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
files[0] = mTfs.getInfo(fileA);
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 20);
files[1] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir")));
TachyonFile fileC = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileC",
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 30);
files[2] = mTfs.getInfo(fileC);
InterpreterResult output = tachyonInterpreter.interpret("ls /testRoot", null);
String expected = "";
String format = "%-10s%-25s%-15s%-5s\n";
expected += String.format(format, FormatUtils.getSizeFromBytes(10),
TfsShell.convertMsToDate(files[0].getCreationTimeMs()), "In Memory", "/testRoot/testFileA");
expected += String.format(format, FormatUtils.getSizeFromBytes(0),
TfsShell.convertMsToDate(files[1].getCreationTimeMs()), "", "/testRoot/testDir");
expected += String.format(format, FormatUtils.getSizeFromBytes(30),
TfsShell.convertMsToDate(files[2].getCreationTimeMs()), "Not In Memory",
"/testRoot/testFileC");
expected += "\n";
Assert.assertEquals(Code.SUCCESS, output.code());
Assert.assertEquals(expected, output.message());
}
@Test
public void lsrTest() throws IOException, TachyonException {
FileInfo[] files = new FileInfo[4];
TachyonFile fileA = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileA",
TachyonStorageType.STORE, UnderStorageType.NO_PERSIST, 10);
files[0] = mTfs.getInfo(fileA);
TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testDir/testFileB", TachyonStorageType.STORE,
UnderStorageType.NO_PERSIST, 20);
files[1] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir")));
files[2] = mTfs.getInfo(mTfs.open(new TachyonURI("/testRoot/testDir/testFileB")));
TachyonFile fileC = TachyonFSTestUtils.createByteFile(mTfs, "/testRoot/testFileC",
TachyonStorageType.NO_STORE, UnderStorageType.SYNC_PERSIST, 30);
files[3] = mTfs.getInfo(fileC);
InterpreterResult output = tachyonInterpreter.interpret("lsr /testRoot", null);
String expected = "";
String format = "%-10s%-25s%-15s%-5s\n";
expected +=
String.format(format, FormatUtils.getSizeFromBytes(10),
TfsShell.convertMsToDate(files[0].getCreationTimeMs()), "In Memory",
"/testRoot/testFileA");
expected +=
String.format(format, FormatUtils.getSizeFromBytes(0),
TfsShell.convertMsToDate(files[1].getCreationTimeMs()), "", "/testRoot/testDir");
expected +=
String.format(format, FormatUtils.getSizeFromBytes(20),
TfsShell.convertMsToDate(files[2].getCreationTimeMs()), "In Memory",
"/testRoot/testDir/testFileB");
expected +=
String.format(format, FormatUtils.getSizeFromBytes(30),
TfsShell.convertMsToDate(files[3].getCreationTimeMs()), "Not In Memory",
"/testRoot/testFileC");
expected += "\n";
Assert.assertEquals(expected, output.message());
}
@Test
public void mkdirComplexPathTest() throws IOException, TachyonException {
InterpreterResult output = tachyonInterpreter.interpret(
"mkdir /Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File", null);
TachyonFile tFile = mTfs.open(new TachyonURI("/Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File"));
FileInfo fileInfo = mTfs.getInfo(tFile);
Assert.assertNotNull(fileInfo);
Assert.assertEquals(
"Successfully created directory /Complex!@#$%^&*()-_=+[]{};\"'<>,.?/File\n\n",
output.message());
Assert.assertTrue(fileInfo.isIsFolder());
}
@Test
public void mkdirExistingTest() throws IOException {
String command = "mkdir /festFile1";
Assert.assertEquals(Code.SUCCESS, tachyonInterpreter.interpret(command, null).code());
Assert.assertEquals(Code.SUCCESS, tachyonInterpreter.interpret(command, null).code());
}
@Test
public void mkdirInvalidPathTest() throws IOException {
Assert.assertEquals(
Code.ERROR,
tachyonInterpreter.interpret("mkdir /test File Invalid Path", null).code());
}
@Test
public void mkdirShortPathTest() throws IOException, TachyonException {
InterpreterResult output = tachyonInterpreter.interpret("mkdir /root/testFile1", null);
TachyonFile tFile = mTfs.open(new TachyonURI("/root/testFile1"));
FileInfo fileInfo = mTfs.getInfo(tFile);
Assert.assertNotNull(fileInfo);
Assert.assertEquals(
"Successfully created directory /root/testFile1\n\n",
output.message());
Assert.assertTrue(fileInfo.isIsFolder());
}
@Test
public void mkdirTest() throws IOException, TachyonException {
String qualifiedPath =
"tachyon://" + mLocalTachyonCluster.getMasterHostname() + ":"
+ mLocalTachyonCluster.getMasterPort() + "/root/testFile1";
InterpreterResult output = tachyonInterpreter.interpret("mkdir " + qualifiedPath, null);
TachyonFile tFile = mTfs.open(new TachyonURI("/root/testFile1"));
FileInfo fileInfo = mTfs.getInfo(tFile);
Assert.assertNotNull(fileInfo);
Assert.assertEquals(
"Successfully created directory " + qualifiedPath + "\n\n",
output.message());
Assert.assertTrue(fileInfo.isIsFolder());
}
private File generateFileContent(String path, byte[] toWrite)
throws IOException, FileNotFoundException {
File testFile = new File(mLocalTachyonCluster.getTachyonHome() + path);
testFile.createNewFile();
FileOutputStream fos = new FileOutputStream(testFile);
fos.write(toWrite);
fos.close();
return testFile;
}
private byte[] readContent(TachyonFile tFile, int length) throws IOException, TachyonException {
InStreamOptions options =
new InStreamOptions.Builder(new TachyonConf()).setTachyonStorageType(
TachyonStorageType.NO_STORE).build();
FileInStream tfis = mTfs.getInStream(tFile, options);
byte[] read = new byte[length];
tfis.read(read);
return read;
}
private void fileReadTest(String fileName, int size) throws IOException {
File testFile = new File(PathUtils.concatPath(mLocalTachyonCluster.getTachyonHome(), fileName));
FileInputStream fis = new FileInputStream(testFile);
byte[] read = new byte[size];
fis.read(read);
fis.close();
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(size, read));
}
}

View file

@ -96,10 +96,10 @@ The following components are provided under Apache License.
(Apache 2.0) Shiro Web (org.apache.shiro:shiro-web:1.2.3 - https://shiro.apache.org)
(Apache 2.0) SnakeYAML (org.yaml:snakeyaml:1.15 - http://www.snakeyaml.org)
(Apache 2.0) Protocol Buffers (com.google.protobuf:protobuf-java:2.4.1 - https://github.com/google/protobuf/releases)
(Apache 2.0) Tachyon Shell (org.tachyonproject:tachyon-shell:0.8.2 - http://tachyon-project.org)
(Apache 2.0) Tachyon Servers (org.tachyonproject:tachyon-servers:0.8.2 - http://tachyon-project.org)
(Apache 2.0) Tachyon Minicluster (org.tachyonproject:tachyon-minicluster:0.8.2 - http://tachyon-project.org)
(Apache 2.0) Tachyon Underfs Local (org.tachyonproject:tachyon-underfs-local:0.8.2 - http://tachyon-project.org)
(Apache 2.0) Alluxio Shell (org.alluxio:alluxio-shell:1.0.0 - http://alluxio.org)
(Apache 2.0) Alluxio Servers (org.alluxio:alluxio-core-server:1.0.0 - http://alluxio.org)
(Apache 2.0) Alluxio Minicluster (org.alluxio:alluxio-minicluster:1.0.0 - http://alluxio.org)
(Apache 2.0) Alluxio Underfs Local (org.alluxio:alluxio-underfs-local:1.0.0 - http://alluxio.org)
(Apache 2.0) Microsoft Azure Storage Library for Java (com.microsoft.azure:azure-storage:4.0.0 - https://github.com/Azure/azure-storage-java)

View file

@ -445,7 +445,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.angular.AngularInterpreter,"
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
+ "org.apache.zeppelin.tachyon.TachyonInterpreter,"
+ "org.apache.zeppelin.alluxio.AlluxioInterpreter,"
+ "org.apache.zeppelin.phoenix.PhoenixInterpreter,"
+ "org.apache.zeppelin.postgresql.PostgreSqlInterpreter,"
+ "org.apache.zeppelin.tajo.TajoInterpreter,"