mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Merge branch 'master' of http://github.com/raj-bains/incubator-zeppelin
This commit is contained in:
commit
79f0d905b4
8 changed files with 968 additions and 1 deletions
|
|
@ -138,7 +138,7 @@
|
|||
|
||||
<property>
|
||||
<name>zeppelin.interpreters</name>
|
||||
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.tachyon.TachyonInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter</value>
|
||||
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.phoenix.PhoenixInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.tachyon.TachyonInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter</value>
|
||||
<description>Comma separated interpreter configurations. First interpreter become a default</description>
|
||||
</property>
|
||||
|
||||
|
|
|
|||
68
docs/docs/interpreter/hdfs.md
Normal file
68
docs/docs/interpreter/hdfs.md
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
---
|
||||
layout: page
|
||||
title: "HDFS File Interpreter"
|
||||
description: ""
|
||||
group: manual
|
||||
---
|
||||
{% include JB/setup %}
|
||||
|
||||
|
||||
## HDFS File Interpreter for Apache Zeppelin
|
||||
|
||||
<br/>
|
||||
<table class="table-configuration">
|
||||
<tr>
|
||||
<th>Name</th>
|
||||
<th>Class</th>
|
||||
<th>Description</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>%hdfs</td>
|
||||
<td>HDFSFileInterpreter</td>
|
||||
<td>Provides File System commands for HDFS</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
<br/>
|
||||
This interpreter connects to HDFS using the HTTP WebHDFS interface.
|
||||
It supports the basic shell file commands applied to HDFS, it currently only supports browsing
|
||||
* You can use <i>ls [PATH]</i> and <i>ls -l [PATH]</i> to list a directory. If the path is missing, then the current directory is listed.
|
||||
* You can use <i>cd [PATH]</i> to change your current directory by giving a relative or an absolute path.
|
||||
* You can invoke <i>pwd</i> to see your current directory.
|
||||
|
||||
### Create Interpreter
|
||||
|
||||
You can create the HDFS browser by pointing it to the WebHDFS interface of your Hadoop cluster.
|
||||
|
||||
### Configuration
|
||||
You can modify the configuration of HDFS from the `Interpreter` section. The HDFS interpreter express the following properties:
|
||||
|
||||
<table class="table-configuration">
|
||||
<tr>
|
||||
<th>Property Name</th>
|
||||
<th>Description</th>
|
||||
<th>Default Value</th>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>hdfs.url</td>
|
||||
<td>The URL for WebHDFS</td>
|
||||
<td>http://localhost:50070/webhdfs/v1/</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>hdfs.user</td>
|
||||
<td>The WebHDFS user</td>
|
||||
<td>hdfs</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>hdfs.maxlength</td>
|
||||
<td>Maximum number of lines of results fetched</td>
|
||||
<td>1000</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
||||
|
||||
#### WebHDFS REST API
|
||||
You can confirm that you're able to access the WebHDFS API by running a curl command against the WebHDFS end point provided to the interpreter.
|
||||
|
||||
Here is an example:
|
||||
$> curl "http://localhost:50070/webhdfs/v1/?op=LISTSTATUS"
|
||||
139
file/pom.xml
Normal file
139
file/pom.xml
Normal file
|
|
@ -0,0 +1,139 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
~ Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
~ contributor license agreements. See the NOTICE file distributed with
|
||||
~ this work for additional information regarding copyright ownership.
|
||||
~ The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
~ (the "License"); you may not use this file except in compliance with
|
||||
~ the License. You may obtain a copy of the License at
|
||||
~
|
||||
~ http://www.apache.org/licenses/LICENSE-2.0
|
||||
~
|
||||
~ Unless required by applicable law or agreed to in writing, software
|
||||
~ distributed under the License is distributed on an "AS IS" BASIS,
|
||||
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
~ See the License for the specific language governing permissions and
|
||||
~ limitations under the License.
|
||||
-->
|
||||
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>
|
||||
<artifactId>zeppelin</artifactId>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
</parent>
|
||||
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-file</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<version>0.6.0-incubating-SNAPSHOT</version>
|
||||
<name>Zeppelin: File Manager</name>
|
||||
<url>http://www.apache.org</url>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.zeppelin</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>javax.ws.rs</groupId>
|
||||
<artifactId>javax.ws.rs-api</artifactId>
|
||||
<version>2.0</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-log4j12</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-deploy-plugin</artifactId>
|
||||
<version>2.7</version>
|
||||
<configuration>
|
||||
<skip>true</skip>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
<version>2.18.1</version>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-enforcer-plugin</artifactId>
|
||||
<version>1.3.1</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>enforce</id>
|
||||
<phase>none</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<version>2.8</version>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>copy-dependencies</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy-dependencies</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/file</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<includeScope>runtime</includeScope>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>copy-artifact</id>
|
||||
<phase>package</phase>
|
||||
<goals>
|
||||
<goal>copy</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputDirectory>${project.build.directory}/../../interpreter/file</outputDirectory>
|
||||
<overWriteReleases>false</overWriteReleases>
|
||||
<overWriteSnapshots>false</overWriteSnapshots>
|
||||
<overWriteIfNewer>true</overWriteIfNewer>
|
||||
<!--<includeScope>runtime</includeScope>-->
|
||||
<artifactItems>
|
||||
<artifactItem>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>${project.artifactId}</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<type>${project.packaging}</type>
|
||||
</artifactItem>
|
||||
</artifactItems>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
</project>
|
||||
169
file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java
Normal file
169
file/src/main/java/org/apache/zeppelin/file/FileInterpreter.java
Normal file
|
|
@ -0,0 +1,169 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.file;
|
||||
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* File interpreter for Zeppelin.
|
||||
*
|
||||
*/
|
||||
public abstract class FileInterpreter extends Interpreter {
|
||||
Logger logger = LoggerFactory.getLogger(FileInterpreter.class);
|
||||
String currentDir = null;
|
||||
CommandArgs args = null;
|
||||
|
||||
public FileInterpreter(Properties property) {
|
||||
super(property);
|
||||
currentDir = new String("/");
|
||||
}
|
||||
|
||||
/**
|
||||
* Handling the arguments of the command
|
||||
*/
|
||||
public class CommandArgs {
|
||||
public String input = null;
|
||||
public String command = null;
|
||||
public ArrayList<String> args = null;
|
||||
public HashSet<Character> flags = null;
|
||||
|
||||
public CommandArgs(String cmd) {
|
||||
input = cmd;
|
||||
args = new ArrayList();
|
||||
flags = new HashSet();
|
||||
}
|
||||
|
||||
private void parseArg(String arg) {
|
||||
if (arg.charAt(0) == '-') { // handle flags
|
||||
for (int i = 0; i < arg.length(); i++) {
|
||||
Character c = arg.charAt(i);
|
||||
flags.add(c);
|
||||
}
|
||||
} else { // handle other args
|
||||
args.add(arg);
|
||||
}
|
||||
}
|
||||
|
||||
public void parseArgs() {
|
||||
if (input == null)
|
||||
return;
|
||||
StringTokenizer st = new StringTokenizer(input);
|
||||
if (st.hasMoreTokens()) {
|
||||
command = st.nextToken();
|
||||
while (st.hasMoreTokens())
|
||||
parseArg(st.nextToken());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Functions that each file system implementation must override
|
||||
|
||||
public abstract String listAll(String path);
|
||||
|
||||
public abstract boolean isDirectory(String path);
|
||||
|
||||
// Combine paths, takes care of arguments such as ..
|
||||
|
||||
private String getNewPath(String argument){
|
||||
Path arg = Paths.get(argument);
|
||||
Path ret = arg.isAbsolute() ? arg : Paths.get(currentDir, argument);
|
||||
return ret.normalize().toString();
|
||||
}
|
||||
|
||||
// Handle the command handling uniformly across all file systems
|
||||
|
||||
@Override
|
||||
public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
|
||||
logger.info("Run File command '" + cmd + "'");
|
||||
|
||||
args = new CommandArgs(cmd);
|
||||
args.parseArgs();
|
||||
|
||||
if (args.command == null) {
|
||||
logger.info("Error: No command");
|
||||
return new InterpreterResult(Code.ERROR, Type.TEXT, "No command");
|
||||
}
|
||||
|
||||
// Simple parsing of the command
|
||||
|
||||
if (args.command.equals("cd")) {
|
||||
|
||||
String newPath = !args.args.isEmpty() ? getNewPath(args.args.get(0)) : currentDir;
|
||||
if (!isDirectory(newPath))
|
||||
return new InterpreterResult(Code.ERROR, Type.TEXT, "Invalid Directory");
|
||||
|
||||
currentDir = newPath;
|
||||
return new InterpreterResult(Code.SUCCESS, Type.TEXT, "OK");
|
||||
|
||||
} else if (args.command.equals("ls")) {
|
||||
|
||||
String newPath = !args.args.isEmpty() ? getNewPath(args.args.get(0)) : currentDir;
|
||||
if (!isDirectory(newPath))
|
||||
return new InterpreterResult(Code.ERROR, Type.TEXT, "Invalid List Directory");
|
||||
|
||||
String results = listAll(newPath);
|
||||
return new InterpreterResult(Code.SUCCESS, Type.TEXT, results);
|
||||
|
||||
} else if (args.command.equals("pwd")) {
|
||||
|
||||
return new InterpreterResult(Code.SUCCESS, Type.TEXT, currentDir);
|
||||
|
||||
} else {
|
||||
|
||||
return new InterpreterResult(Code.ERROR, Type.TEXT, "Unknown command");
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(InterpreterContext context) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public FormType getFormType() {
|
||||
return FormType.SIMPLE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getProgress(InterpreterContext context) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler getScheduler() {
|
||||
return SchedulerFactory.singleton().createOrGetFIFOScheduler(
|
||||
FileInterpreter.class.getName() + this.hashCode());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> completion(String buf, int cursor) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
156
file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java
Normal file
156
file/src/main/java/org/apache/zeppelin/file/HDFSCommand.java
Normal file
|
|
@ -0,0 +1,156 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.file;
|
||||
|
||||
import java.net.URL;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.InputStreamReader;
|
||||
import javax.ws.rs.core.UriBuilder;
|
||||
import org.slf4j.Logger;
|
||||
|
||||
/**
|
||||
* Definition and HTTP invocation methods for all WebHDFS commands
|
||||
*
|
||||
*/
|
||||
public class HDFSCommand {
|
||||
|
||||
/**
|
||||
* Type of HTTP request
|
||||
*/
|
||||
public enum HttpType {
|
||||
GET,
|
||||
PUT
|
||||
}
|
||||
|
||||
/**
|
||||
* Definition of WebHDFS operator
|
||||
*/
|
||||
public class Op {
|
||||
public String op;
|
||||
public HttpType cmd;
|
||||
public int minArgs;
|
||||
|
||||
public Op(String op, HttpType cmd, int minArgs) {
|
||||
this.op = op;
|
||||
this.cmd = cmd;
|
||||
this.minArgs = minArgs;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Definition of argument to an operator
|
||||
*/
|
||||
public class Arg {
|
||||
public String key;
|
||||
public String value;
|
||||
|
||||
public Arg(String key, String value) {
|
||||
this.key = key;
|
||||
this.value = value;
|
||||
}
|
||||
}
|
||||
|
||||
// How to connect to WebHDFS
|
||||
String url = null;
|
||||
String user = null;
|
||||
int maxLength = 0;
|
||||
Logger logger;
|
||||
|
||||
// Define all the commands available
|
||||
public Op getFileStatus = new Op("GETFILESTATUS", HttpType.GET, 0);
|
||||
public Op listStatus = new Op("LISTSTATUS", HttpType.GET, 0);
|
||||
|
||||
public HDFSCommand(String url, String user, Logger logger, int maxLength) {
|
||||
super();
|
||||
this.url = url;
|
||||
this.user = user;
|
||||
this.maxLength = maxLength;
|
||||
this.logger = logger;
|
||||
}
|
||||
|
||||
public String checkArgs(Op op, String path, Arg[] args) throws Exception {
|
||||
if (op == null ||
|
||||
path == null ||
|
||||
(op.minArgs > 0 &&
|
||||
(args == null ||
|
||||
args.length != op.minArgs)))
|
||||
{
|
||||
String a = "";
|
||||
a = (op != null) ? a + op.op + "\n" : a;
|
||||
a = (path != null) ? a + path + "\n" : a;
|
||||
a = (args != null) ? a + args + "\n" : a;
|
||||
return a;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
// The operator that runs all commands
|
||||
public String runCommand(Op op, String path, Arg[] args) throws Exception {
|
||||
|
||||
// Check arguments
|
||||
String error = checkArgs(op, path, args);
|
||||
if (error != null) {
|
||||
logger.error("Bad arguments to command: " + error);
|
||||
return "ERROR: BAD ARGS";
|
||||
}
|
||||
|
||||
// Build URI
|
||||
UriBuilder builder = UriBuilder
|
||||
.fromPath(url)
|
||||
.path(path)
|
||||
.queryParam("op", op.op);
|
||||
|
||||
if (args != null) {
|
||||
for (Arg a : args) {
|
||||
builder = builder.queryParam(a.key, a.value);
|
||||
}
|
||||
}
|
||||
java.net.URI uri = builder.build();
|
||||
|
||||
// Connect and get response string
|
||||
URL hdfsUrl = uri.toURL();
|
||||
HttpURLConnection con = (HttpURLConnection) hdfsUrl.openConnection();
|
||||
|
||||
if (op.cmd == HttpType.GET) {
|
||||
con.setRequestMethod("GET");
|
||||
int responseCode = con.getResponseCode();
|
||||
logger.info("Sending 'GET' request to URL : " + hdfsUrl);
|
||||
logger.info("Response Code : " + responseCode);
|
||||
|
||||
BufferedReader in = new BufferedReader(
|
||||
new InputStreamReader(con.getInputStream()));
|
||||
String inputLine;
|
||||
StringBuffer response = new StringBuffer();
|
||||
|
||||
int i = 0;
|
||||
while ((inputLine = in.readLine()) != null) {
|
||||
if (inputLine.length() < maxLength)
|
||||
response.append(inputLine);
|
||||
i++;
|
||||
if (i >= maxLength)
|
||||
break;
|
||||
}
|
||||
in.close();
|
||||
return response.toString();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,225 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.file;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Properties;
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
|
||||
/**
|
||||
* HDFS implementation of File interpreter for Zeppelin.
|
||||
*
|
||||
*/
|
||||
public class HDFSFileInterpreter extends FileInterpreter {
|
||||
static final String HDFS_URL = "hdfs.url";
|
||||
static final String HDFS_USER = "hdfs.user";
|
||||
static final String HDFS_MAXLENGTH = "hdfs.maxlength";
|
||||
|
||||
static {
|
||||
Interpreter.register(
|
||||
"hdfs",
|
||||
"hdfs",
|
||||
HDFSFileInterpreter.class.getName(),
|
||||
new InterpreterPropertyBuilder()
|
||||
.add(HDFS_URL, "http://localhost:50070/webhdfs/v1/", "The URL for WebHDFS")
|
||||
.add(HDFS_USER, "hdfs", "The WebHDFS user")
|
||||
.add(HDFS_MAXLENGTH, "1000", "Maximum number of lines of results fetched").build());
|
||||
}
|
||||
|
||||
Exception exceptionOnConnect = null;
|
||||
HDFSCommand cmd = null;
|
||||
Gson gson = null;
|
||||
|
||||
public void prepare() {
|
||||
String userName = getProperty(HDFS_USER);
|
||||
String hdfsUrl = getProperty(HDFS_URL);
|
||||
int i = Integer.parseInt(getProperty(HDFS_MAXLENGTH));
|
||||
cmd = new HDFSCommand(hdfsUrl, userName, logger, i);
|
||||
gson = new Gson();
|
||||
}
|
||||
|
||||
public HDFSFileInterpreter(Properties property){
|
||||
super(property);
|
||||
prepare();
|
||||
}
|
||||
|
||||
/**
|
||||
* Status of one file
|
||||
*
|
||||
* matches returned JSON
|
||||
*/
|
||||
public class OneFileStatus {
|
||||
public long accessTime;
|
||||
public int blockSize;
|
||||
public int childrenNum;
|
||||
public int fileId;
|
||||
public String group;
|
||||
public long length;
|
||||
public long modificationTime;
|
||||
public String owner;
|
||||
public String pathSuffix;
|
||||
public String permission;
|
||||
public int replication;
|
||||
public int storagePolicy;
|
||||
public String type;
|
||||
public String toString() {
|
||||
String str = "";
|
||||
str += "\nAccessTime = " + accessTime;
|
||||
str += "\nBlockSize = " + blockSize;
|
||||
str += "\nChildrenNum = " + childrenNum;
|
||||
str += "\nFileId = " + fileId;
|
||||
str += "\nGroup = " + group;
|
||||
str += "\nLength = " + length;
|
||||
str += "\nModificationTime = " + modificationTime;
|
||||
str += "\nOwner = " + owner;
|
||||
str += "\nPathSuffix = " + pathSuffix;
|
||||
str += "\nPermission = " + permission;
|
||||
str += "\nReplication = " + replication;
|
||||
str += "\nStoragePolicy = " + storagePolicy;
|
||||
str += "\nType = " + type;
|
||||
return str;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Status of one file
|
||||
*
|
||||
* matches returned JSON
|
||||
*/
|
||||
public class SingleFileStatus {
|
||||
public OneFileStatus FileStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Status of all files in a directory
|
||||
*
|
||||
* matches returned JSON
|
||||
*/
|
||||
public class MultiFileStatus {
|
||||
public OneFileStatus[] FileStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Status of all files in a directory
|
||||
*
|
||||
* matches returned JSON
|
||||
*/
|
||||
public class AllFileStatus {
|
||||
public MultiFileStatus FileStatuses;
|
||||
}
|
||||
|
||||
// tests whether we're able to connect to HDFS
|
||||
|
||||
private void testConnection() {
|
||||
try {
|
||||
if (isDirectory("/"))
|
||||
logger.info("Successfully created WebHDFS connection");
|
||||
} catch (Exception e) {
|
||||
logger.error("testConnection: Cannot open WebHDFS connection. Bad URL: " + "/", e);
|
||||
exceptionOnConnect = e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open() {
|
||||
testConnection();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
}
|
||||
|
||||
private String listDir(String path) throws Exception {
|
||||
return cmd.runCommand(cmd.listStatus, path, null);
|
||||
}
|
||||
|
||||
private String listPermission(OneFileStatus fs){
|
||||
String s = "";
|
||||
s += fs.type.equalsIgnoreCase("Directory") ? 'd' : '-';
|
||||
int p = Integer.parseInt(fs.permission, 16);
|
||||
s += ((p & 0x400) == 0) ? '-' : 'r';
|
||||
s += ((p & 0x200) == 0) ? '-' : 'w';
|
||||
s += ((p & 0x100) == 0) ? '-' : 'x';
|
||||
s += ((p & 0x40) == 0) ? '-' : 'r';
|
||||
s += ((p & 0x20) == 0) ? '-' : 'w';
|
||||
s += ((p & 0x10) == 0) ? '-' : 'x';
|
||||
s += ((p & 0x4) == 0) ? '-' : 'r';
|
||||
s += ((p & 0x2) == 0) ? '-' : 'w';
|
||||
s += ((p & 0x1) == 0) ? '-' : 'x';
|
||||
return s;
|
||||
}
|
||||
private String listDate(OneFileStatus fs) {
|
||||
return new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date(fs.modificationTime));
|
||||
}
|
||||
private String ListOne(String path, OneFileStatus fs) {
|
||||
if (args.flags.contains(new Character('l'))) {
|
||||
String s = "";
|
||||
s += listPermission(fs) + "\t ";
|
||||
s += ((fs.replication == 0) ? "-" : fs.replication) + "\t ";
|
||||
s += fs.owner + "\t ";
|
||||
s += fs.group + "\t ";
|
||||
s += fs.length + "\t ";
|
||||
s += listDate(fs) + " GMT\t ";
|
||||
s += (path.length() == 1) ? path + fs.pathSuffix : path + '/' + fs.pathSuffix;
|
||||
return s;
|
||||
}
|
||||
return fs.pathSuffix;
|
||||
}
|
||||
|
||||
public String listAll(String path) {
|
||||
String all = "";
|
||||
if (exceptionOnConnect != null)
|
||||
return all;
|
||||
try {
|
||||
String sfs = listDir(path);
|
||||
if (sfs != null) {
|
||||
AllFileStatus allFiles = gson.fromJson(sfs, AllFileStatus.class);
|
||||
|
||||
if (allFiles != null &&
|
||||
allFiles.FileStatuses != null &&
|
||||
allFiles.FileStatuses.FileStatus != null)
|
||||
{
|
||||
for (OneFileStatus fs : allFiles.FileStatuses.FileStatus)
|
||||
all = all + ListOne(path, fs) + '\n';
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("listall: listDir " + path, e);
|
||||
}
|
||||
return all;
|
||||
}
|
||||
|
||||
public boolean isDirectory(String path) {
|
||||
boolean ret = false;
|
||||
if (exceptionOnConnect != null)
|
||||
return ret;
|
||||
try {
|
||||
String str = cmd.runCommand(cmd.getFileStatus, path, null);
|
||||
SingleFileStatus sfs = gson.fromJson(str, SingleFileStatus.class);
|
||||
if (sfs != null)
|
||||
return sfs.FileStatus.type.equals("DIRECTORY");
|
||||
} catch (Exception e) {
|
||||
logger.error("IsDirectory: " + path, e);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,209 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.file;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import junit.framework.TestCase;
|
||||
import static org.junit.Assert.*;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import java.util.HashMap;
|
||||
import java.util.Properties;
|
||||
import java.lang.Override;
|
||||
import java.lang.String;
|
||||
|
||||
|
||||
/**
|
||||
* Tests Interpreter by running pre-determined commands against mock file system
|
||||
*
|
||||
*/
|
||||
public class HDFSFileInterpreterTest extends TestCase {
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
HDFSFileInterpreter t = new MockHDFSFileInterpreter(new Properties());
|
||||
t.open();
|
||||
|
||||
// We have info for /, /user, /tmp, /mr-history/done
|
||||
|
||||
// Ensure
|
||||
// 1. ls -l works
|
||||
// 2. paths (. and ..) are correctly handled
|
||||
// 3. flags and arguments to commands are correctly handled
|
||||
|
||||
InterpreterResult result1 = t.interpret("ls -l /", null);
|
||||
assertEquals(result1.type(), InterpreterResult.Type.TEXT);
|
||||
|
||||
InterpreterResult result2 = t.interpret("ls -l /./user/..", null);
|
||||
assertEquals(result2.type(), InterpreterResult.Type.TEXT);
|
||||
|
||||
assertEquals(result1.message(), result2.message());
|
||||
|
||||
// Ensure you can do cd and after that the ls uses current directory correctly
|
||||
|
||||
InterpreterResult result3 = t.interpret("cd user", null);
|
||||
assertEquals(result3.type(), InterpreterResult.Type.TEXT);
|
||||
assertEquals(result3.message(), "OK");
|
||||
|
||||
InterpreterResult result4 = t.interpret("ls", null);
|
||||
assertEquals(result4.type(), InterpreterResult.Type.TEXT);
|
||||
|
||||
InterpreterResult result5 = t.interpret("ls /user", null);
|
||||
assertEquals(result5.type(), InterpreterResult.Type.TEXT);
|
||||
|
||||
assertEquals(result4.message(), result5.message());
|
||||
|
||||
// Ensure pwd works correctly
|
||||
|
||||
InterpreterResult result6 = t.interpret("pwd", null);
|
||||
assertEquals(result6.type(), InterpreterResult.Type.TEXT);
|
||||
assertEquals(result6.message(), "/user");
|
||||
|
||||
// Move a couple of levels and check we're in the right place
|
||||
|
||||
InterpreterResult result7 = t.interpret("cd ../mr-history/done", null);
|
||||
assertEquals(result7.type(), InterpreterResult.Type.TEXT);
|
||||
assertEquals(result7.message(), "OK");
|
||||
|
||||
InterpreterResult result8 = t.interpret("ls -l ", null);
|
||||
assertEquals(result8.type(), InterpreterResult.Type.TEXT);
|
||||
|
||||
InterpreterResult result9 = t.interpret("ls -l /mr-history/done", null);
|
||||
assertEquals(result9.type(), InterpreterResult.Type.TEXT);
|
||||
|
||||
assertEquals(result8.message(), result9.message());
|
||||
|
||||
InterpreterResult result10 = t.interpret("cd ../..", null);
|
||||
assertEquals(result10.type(), InterpreterResult.Type.TEXT);
|
||||
assertEquals(result7.message(), "OK");
|
||||
|
||||
InterpreterResult result11 = t.interpret("ls -l ", null);
|
||||
assertEquals(result11.type(), InterpreterResult.Type.TEXT);
|
||||
|
||||
// we should be back to first result after all this navigation
|
||||
assertEquals(result1.message(), result11.message());
|
||||
|
||||
t.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store command results from curl against a real file system
|
||||
*/
|
||||
class MockFileSystem {
|
||||
HashMap<String, String> mfs = new HashMap<String, String>();
|
||||
void addListStatusData() {
|
||||
mfs.put("/?op=LISTSTATUS",
|
||||
"{\"FileStatuses\":{\"FileStatus\":[\n" +
|
||||
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16389,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1438548219672,\"owner\":\"yarn\",\"pathSuffix\":\"app-logs\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
|
||||
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16395,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548030045,\"owner\":\"hdfs\",\"pathSuffix\":\"hdp\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
|
||||
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16390,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985336,\"owner\":\"mapred\",\"pathSuffix\":\"mapred\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
|
||||
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":2,\"fileId\":16392,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547985346,\"owner\":\"hdfs\",\"pathSuffix\":\"mr-history\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
|
||||
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16400,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725,\"owner\":\"hdfs\",\"pathSuffix\":\"system\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
|
||||
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548150089,\"owner\":\"hdfs\",\"pathSuffix\":\"tmp\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"},\n" +
|
||||
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438547921792,\"owner\":\"hdfs\",\"pathSuffix\":\"user\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" +
|
||||
"]}}"
|
||||
);
|
||||
mfs.put("/user?op=LISTSTATUS",
|
||||
"{\"FileStatuses\":{\"FileStatus\":[\n" +
|
||||
" {\"accessTime\":0,\"blockSize\":0,\"childrenNum\":4,\"fileId\":16388,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253161263,\"owner\":\"ambari-qa\",\"pathSuffix\":\"ambari-qa\",\"permission\":\"770\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" +
|
||||
" ]}}"
|
||||
);
|
||||
mfs.put("/tmp?op=LISTSTATUS",
|
||||
"{\"FileStatuses\":{\"FileStatus\":[\n" +
|
||||
" {\"accessTime\":1441253097489,\"blockSize\":134217728,\"childrenNum\":0,\"fileId\":16400,\"group\":\"hdfs\",\"length\":1645,\"modificationTime\":1441253097517,\"owner\":\"hdfs\",\"pathSuffix\":\"ida8c06540_date040315\",\"permission\":\"755\",\"replication\":3,\"storagePolicy\":0,\"type\":\"FILE\"}\n" +
|
||||
" ]}}"
|
||||
);
|
||||
mfs.put("/mr-history/done?op=LISTSTATUS",
|
||||
"{\"FileStatuses\":{\"FileStatus\":[\n" +
|
||||
"{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16433,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197481,\"owner\":\"mapred\",\"pathSuffix\":\"2015\",\"permission\":\"770\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}\n" +
|
||||
"]}}"
|
||||
);
|
||||
}
|
||||
void addGetFileStatusData() {
|
||||
mfs.put("/?op=GETFILESTATUS",
|
||||
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":7,\"fileId\":16385,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1438548089725,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
|
||||
mfs.put("/user?op=GETFILESTATUS",
|
||||
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16387,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253043188,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"755\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
|
||||
mfs.put("/tmp?op=GETFILESTATUS",
|
||||
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16386,\"group\":\"hdfs\",\"length\":0,\"modificationTime\":1441253097489,\"owner\":\"hdfs\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
|
||||
mfs.put("/mr-history/done?op=GETFILESTATUS",
|
||||
"{\"FileStatus\":{\"accessTime\":0,\"blockSize\":0,\"childrenNum\":1,\"fileId\":16393,\"group\":\"hadoop\",\"length\":0,\"modificationTime\":1441253197480,\"owner\":\"mapred\",\"pathSuffix\":\"\",\"permission\":\"777\",\"replication\":0,\"storagePolicy\":0,\"type\":\"DIRECTORY\"}}");
|
||||
}
|
||||
public void addMockData(HDFSCommand.Op op) {
|
||||
if (op.op.equals("LISTSTATUS")) {
|
||||
addListStatusData();
|
||||
} else if (op.op.equals("GETFILESTATUS")) {
|
||||
addGetFileStatusData();
|
||||
}
|
||||
// do nothing
|
||||
}
|
||||
public String get(String key) {
|
||||
return mfs.get(key);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run commands against mock file system that simulates webhdfs responses
|
||||
*/
|
||||
class MockHDFSCommand extends HDFSCommand {
|
||||
MockFileSystem fs = null;
|
||||
|
||||
public MockHDFSCommand(String url, String user, Logger logger) {
|
||||
super(url, user, logger, 1000);
|
||||
fs = new MockFileSystem();
|
||||
fs.addMockData(getFileStatus);
|
||||
fs.addMockData(listStatus);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String runCommand(Op op, String path, Arg[] args) throws Exception {
|
||||
|
||||
String error = checkArgs(op, path, args);
|
||||
assertNull(error);
|
||||
|
||||
String c = path + "?op=" + op.op;
|
||||
|
||||
if (args != null) {
|
||||
for (Arg a : args) {
|
||||
c += "&" + a.key + "=" + a.value;
|
||||
}
|
||||
}
|
||||
return fs.get(c);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mock Interpreter - uses Mock HDFS command
|
||||
*/
|
||||
class MockHDFSFileInterpreter extends HDFSFileInterpreter {
|
||||
|
||||
@Override
|
||||
public void prepare() {
|
||||
// Run commands against mock File System instead of WebHDFS
|
||||
cmd = new MockHDFSCommand("", "", logger);
|
||||
gson = new Gson();
|
||||
}
|
||||
|
||||
public MockHDFSFileInterpreter(Properties property) {
|
||||
super(property);
|
||||
}
|
||||
|
||||
}
|
||||
1
pom.xml
1
pom.xml
|
|
@ -98,6 +98,7 @@
|
|||
<module>postgresql</module>
|
||||
<module>jdbc</module>
|
||||
<module>tajo</module>
|
||||
<module>file</module>
|
||||
<module>flink</module>
|
||||
<module>ignite</module>
|
||||
<module>kylin</module>
|
||||
|
|
|
|||
Loading…
Reference in a new issue