diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 9f773d50ad..e10c85e953 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -66,7 +66,7 @@
zeppelin.interpreters
- org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter
+ org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter
Comma separated interpreter configurations. First interpreter become a default
diff --git a/flink/pom.xml b/flink/pom.xml
new file mode 100644
index 0000000000..8dcd236c3d
--- /dev/null
+++ b/flink/pom.xml
@@ -0,0 +1,390 @@
+
+
+
+
+ 4.0.0
+
+
+ zeppelin
+ org.apache.zeppelin
+ 0.5.0-incubating-SNAPSHOT
+
+
+ org.apache.zeppelin
+ zeppelin-flink
+ jar
+ 0.5.0-incubating-SNAPSHOT
+ Zeppelin: Flink
+ Zeppelin flink support
+ http://zeppelin.incubator.apache.org
+
+
+ 0.9.0-milestone-1
+ 2.3.7
+ 2.10
+ 2.10.4
+
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+ ${project.groupId}
+ zeppelin-interpreter
+ ${project.version}
+ provided
+
+
+
+ com.google.code.gson
+ gson
+
+
+
+ commons-collections
+ commons-collections
+
+
+
+ org.apache.flink
+ flink-core
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-clients
+ ${flink.version}
+
+
+
+ org.apache.flink
+ flink-runtime
+ ${flink.version}
+
+
+ com.typesafe.akka
+ akka-actor_2.10
+
+
+ com.typesafe.akka
+ akka-remote_2.10
+
+
+ com.typesafe.akka
+ akka-slf4j_2.10
+
+
+
+
+
+ org.apache.flink
+ flink-scala
+ ${flink.version}
+
+
+
+ com.typesafe.akka
+ akka-actor_${flink.scala.binary.version}
+ ${flink.akka.version}
+
+
+
+ com.typesafe.akka
+ akka-remote_${flink.scala.binary.version}
+ ${flink.akka.version}
+
+
+
+ com.typesafe.akka
+ akka-slf4j_${flink.scala.binary.version}
+ ${flink.akka.version}
+
+
+
+ com.typesafe.akka
+ akka-testkit_${flink.scala.binary.version}
+ ${flink.akka.version}
+
+
+
+
+ org.scala-lang
+ scala-library
+ ${flink.scala.version}
+
+
+
+ org.scala-lang
+ scala-compiler
+ ${flink.scala.version}
+
+
+
+ org.scala-lang
+ scala-reflect
+ ${flink.scala.version}
+
+
+
+ junit
+ junit
+ test
+
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ **/.idea/
+ **/*.iml
+ .gitignore
+ **/.settings/*
+ **/.classpath
+ **/.project
+ **/target/**
+ **/README.md
+ dependency-reduced-pom.xml
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ 3.1.4
+
+
+
+ scala-compile-first
+ process-resources
+
+ compile
+
+
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+
+ -Xms128m
+ -Xmx512m
+
+
+
+ org.scalamacros
+ paradise_${scala.version}
+ ${scala.macros.version}
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-eclipse-plugin
+ 2.8
+
+ true
+
+ org.scala-ide.sdt.core.scalanature
+ org.eclipse.jdt.core.javanature
+
+
+ org.scala-ide.sdt.core.scalabuilder
+
+
+ org.scala-ide.sdt.launching.SCALA_CONTAINER
+ org.eclipse.jdt.launching.JRE_CONTAINER
+
+
+
+ **/*.scala
+ **/*.java
+
+
+
+
+
+
+ org.codehaus.mojo
+ build-helper-maven-plugin
+ 1.7
+
+
+
+ add-source
+ generate-sources
+
+ add-source
+
+
+
+ src/main/scala
+
+
+
+
+
+ add-test-source
+ generate-test-sources
+
+ add-test-source
+
+
+
+ src/test/scala
+
+
+
+
+
+
+
+ org.scalastyle
+ scalastyle-maven-plugin
+ 0.5.0
+
+
+
+ check
+
+
+
+
+ false
+ true
+ true
+ false
+ ${basedir}/src/main/scala
+ ${basedir}/src/test/scala
+ ${project.basedir}/../_tools/scalastyle.xml
+ ${project.basedir}/target/scalastyle-output.xml
+ UTF-8
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 2.7
+
+ true
+
+
+
+
+ maven-enforcer-plugin
+ 1.3.1
+
+
+ enforce
+ none
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.17
+
+ 1
+ false
+ -Xmx1024m -XX:MaxPermSize=256m
+
+
+
+
+ org.apache.maven.plugins
+ maven-dependency-plugin
+ 2.4
+
+
+ copy-dependencies
+ package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/../../interpreter/flink
+ false
+ false
+ true
+ runtime
+
+
+
+
+
+
+ maven-dependency-plugin
+ 2.8
+
+
+ package
+
+ copy
+
+
+ ${project.build.directory}/../../interpreter/flink
+ false
+ false
+ true
+ runtime
+
+
+ ${project.groupId}
+ ${project.artifactId}
+ ${project.version}
+ ${project.packaging}
+
+
+
+
+
+
+
+
+
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
new file mode 100644
index 0000000000..ed3e891504
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkEnvironment.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.File;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.translation.JavaPlan;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class FlinkEnvironment extends ExecutionEnvironment {
+ Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class);
+
+ private String host;
+ private int port;
+
+ private FlinkIMain imain;
+
+ public FlinkEnvironment(String host, int port, FlinkIMain imain) {
+ this.host = host;
+ this.port = port;
+ this.imain = imain;
+
+ logger.info("jobManager host={}, port={}", host, port);
+ }
+
+ @Override
+ public JobExecutionResult execute(String jobName) throws Exception {
+ JavaPlan plan = createProgramPlan(jobName);
+
+ File jarFile = imain.jar();
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
+ jarFile.getAbsolutePath());
+
+ JobExecutionResult result = executor.executePlan(plan);
+
+ if (jarFile.isFile()) {
+ jarFile.delete();
+ }
+
+ return result;
+ }
+
+ @Override
+ public String getExecutionPlan() throws Exception {
+ JavaPlan plan = createProgramPlan("unnamed", false);
+ plan.setDefaultParallelism(getParallelism());
+ registerCachedFilesWithPlan(plan);
+
+ File jarFile = imain.jar();
+ PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
+ jarFile.getAbsolutePath());
+ String jsonPlan = executor.getOptimizerPlanAsJSON(plan);
+
+ if (jarFile != null && jarFile.isFile()) {
+ jarFile.delete();
+ }
+
+ return jsonPlan;
+ }
+
+/*
+ private File createJar() throws IOException {
+ // create execution environment
+ File jarFile = new File(System.getProperty("java.io.tmpdir")
+ + "/ZeppelinFlinkJar_" + System.currentTimeMillis() + ".jar");
+
+
+ File[] classFiles = classDir.listFiles();
+ if (classFiles == null) {
+ return null;
+ }
+
+ byte buffer[] = new byte[10240];
+ // Open archive file
+ FileOutputStream stream = new FileOutputStream(jarFile);
+ JarOutputStream out = new JarOutputStream(stream, new Manifest());
+
+ for (int i = 0; i < classFiles.length; i++) {
+ File classFile = classFiles[i];
+ if (classFiles == null || !classFile.exists()
+ || classFile.isDirectory())
+ continue;
+
+
+ // Add class
+ JarEntry jarAdd = new JarEntry(classFile.getName());
+ jarAdd.setTime(classFile.lastModified());
+ out.putNextEntry(jarAdd);
+ logger.info("add class {} into jar", classFile);
+
+ // Write file to archive
+ FileInputStream in = new FileInputStream(classFile);
+ while (true) {
+ int nRead = in.read(buffer, 0, buffer.length);
+ if (nRead <= 0)
+ break;
+ out.write(buffer, 0, nRead);
+ }
+ in.close();
+ }
+
+ out.close();
+ stream.close();
+ return jarFile;
+ }
+ */
+
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
new file mode 100644
index 0000000000..57654a4ba0
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkIMain.java
@@ -0,0 +1,74 @@
+package org.apache.zeppelin.flink;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.reflect.io.AbstractFile;
+import scala.reflect.io.VirtualDirectory;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.interpreter.IMain;
+
+/**
+ */
+public class FlinkIMain extends IMain {
+ Logger logger = LoggerFactory.getLogger(FlinkIMain.class);
+
+ public FlinkIMain(Settings setting, PrintWriter out) {
+ super(setting, out);
+ }
+
+ public File jar() throws IOException {
+ VirtualDirectory classDir = virtualDirectory();
+ // create execution environment
+ File jarBuildDir = new File(System.getProperty("java.io.tmpdir")
+ + "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis());
+ jarBuildDir.mkdirs();
+
+ File jarFile = new File(System.getProperty("java.io.tmpdir")
+ + "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar");
+
+
+ Iterator vdIt = classDir.iterator();
+ while (vdIt.hasNext()) {
+ AbstractFile fi = vdIt.next();
+ if (fi.isDirectory()) {
+ Iterator fiIt = fi.iterator();
+ while (fiIt.hasNext()) {
+ AbstractFile f = fiIt.next();
+
+ // directory for compiled line
+ File lineDir = new File(jarBuildDir.getAbsolutePath(), fi.name());
+ lineDir.mkdirs();
+
+ // compiled classes for commands from shell
+ File writeFile = new File(lineDir.getAbsolutePath(), f.name());
+ FileOutputStream outputStream = new FileOutputStream(writeFile);
+ InputStream inputStream = f.input();
+
+ // copy file contents
+ org.apache.commons.io.IOUtils.copy(inputStream, outputStream);
+
+ inputStream.close();
+ outputStream.close();
+ }
+ }
+ }
+
+ // jarr up
+ JarHelper jh = new JarHelper();
+ jh.jarDir(jarBuildDir, jarFile);
+
+ FileUtils.deleteDirectory(jarBuildDir);
+ return jarFile;
+ }
+
+
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
new file mode 100644
index 0000000000..76c4390027
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Console;
+import scala.None;
+import scala.Some;
+import scala.tools.nsc.Settings;
+import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
+import scala.tools.nsc.settings.MutableSettings.PathSetting;
+
+/**
+ * Interpreter for Apache Flink (http://flink.apache.org)
+ */
+public class FlinkInterpreter extends Interpreter {
+ Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class);
+ private Settings settings;
+ private ByteArrayOutputStream out;
+ private FlinkIMain imain;
+ private File classDir;
+ private Map binder;
+ private ExecutionEnvironment env;
+ private Configuration flinkConf;
+ private LocalFlinkMiniCluster localFlinkCluster;
+ private Client client;
+
+ public FlinkInterpreter(Properties property) {
+ super(property);
+ }
+
+ static {
+ Interpreter.register(
+ "flink",
+ "flink",
+ FlinkInterpreter.class.getName(),
+ new InterpreterPropertyBuilder()
+ .add("local", "true", "Run flink locally")
+ .add("jobmanager.rpc.address", "localhost", "Flink cluster")
+ .add("jobmanager.rpc.port", "6123", "Flink cluster")
+ .build()
+ );
+ }
+
+ @Override
+ public void open() {
+ URL[] urls = getClassloaderUrls();
+ this.settings = new Settings();
+
+ // set classpath
+ PathSetting pathSettings = settings.classpath();
+ String classpath = "";
+ List paths = currentClassPath();
+ for (File f : paths) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += f.getAbsolutePath();
+ }
+
+ if (urls != null) {
+ for (URL u : urls) {
+ if (classpath.length() > 0) {
+ classpath += File.pathSeparator;
+ }
+ classpath += u.getFile();
+ }
+ }
+
+ pathSettings.v_$eq(classpath);
+ settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
+ settings.explicitParentLoader_$eq(new Some(Thread.currentThread()
+ .getContextClassLoader()));
+ BooleanSetting b = (BooleanSetting) settings.usejavacp();
+ b.v_$eq(true);
+ settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
+
+ out = new ByteArrayOutputStream();
+ imain = new FlinkIMain(settings, new PrintWriter(out));
+
+ initializeFlinkEnv();
+ }
+
+ private boolean localMode() {
+ return Boolean.parseBoolean(getProperty("local"));
+ }
+
+ private String getRpcAddress() {
+ if (localMode()) {
+ return "localhost";
+ } else {
+ return getProperty("jobmanager.rpc.address");
+ }
+ }
+
+ private int getRpcPort() {
+ if (localMode()) {
+ return localFlinkCluster.getJobManagerRPCPort();
+ } else {
+ return Integer.parseInt(getProperty("jobmanager.rpc.port"));
+ }
+ }
+
+ private void initializeFlinkEnv() {
+ // prepare bindings
+ imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
+ binder = (Map) getValue("_binder");
+
+ flinkConf = new org.apache.flink.configuration.Configuration();
+ Properties intpProperty = getProperty();
+ for (Object k : intpProperty.keySet()) {
+ String key = (String) k;
+ String val = toString(intpProperty.get(key));
+ flinkConf.setString(key, val);
+ }
+
+ if (localMode()) {
+ startFlinkMiniCluster();
+ }
+
+ env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain);
+ binder.put("env", new org.apache.flink.api.scala.ExecutionEnvironment(env));
+
+ // do import and create val
+ imain.interpret("@transient val env = "
+ + "_binder.get(\"env\")"
+ + ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]");
+
+ imain.interpret("import org.apache.flink.api.scala._");
+ }
+
+
+ private List currentClassPath() {
+ List paths = classPath(Thread.currentThread().getContextClassLoader());
+ String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
+ if (cps != null) {
+ for (String cp : cps) {
+ paths.add(new File(cp));
+ }
+ }
+ return paths;
+ }
+
+ private List classPath(ClassLoader cl) {
+ List paths = new LinkedList();
+ if (cl == null) {
+ return paths;
+ }
+
+ if (cl instanceof URLClassLoader) {
+ URLClassLoader ucl = (URLClassLoader) cl;
+ URL[] urls = ucl.getURLs();
+ if (urls != null) {
+ for (URL url : urls) {
+ paths.add(new File(url.getFile()));
+ }
+ }
+ }
+ return paths;
+ }
+
+ public Object getValue(String name) {
+ Object ret = imain.valueOfTerm(name);
+ if (ret instanceof None) {
+ return null;
+ } else if (ret instanceof Some) {
+ return ((Some) ret).get();
+ } else {
+ return ret;
+ }
+ }
+
+ @Override
+ public void close() {
+ imain.close();
+
+ if (localMode()) {
+ stopFlinkMiniCluster();
+ }
+ }
+
+ @Override
+ public InterpreterResult interpret(String line, InterpreterContext context) {
+ if (line == null || line.trim().length() == 0) {
+ return new InterpreterResult(Code.SUCCESS);
+ }
+
+ InterpreterResult result = interpret(line.split("\n"), context);
+ return result;
+ }
+
+ public InterpreterResult interpret(String[] lines, InterpreterContext context) {
+ String[] linesToRun = new String[lines.length + 1];
+ for (int i = 0; i < lines.length; i++) {
+ linesToRun[i] = lines[i];
+ }
+ linesToRun[lines.length] = "print(\"\")";
+
+ Console.setOut(out);
+ System.setOut(new PrintStream(out));
+ out.reset();
+ Code r = null;
+
+ String incomplete = "";
+ for (String s : linesToRun) {
+ scala.tools.nsc.interpreter.Results.Result res = null;
+ try {
+ res = imain.interpret(incomplete + s);
+ } catch (Exception e) {
+ logger.info("Interpreter exception", e);
+ return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
+ }
+
+ r = getResultCode(res);
+
+ if (r == Code.ERROR) {
+ return new InterpreterResult(r, out.toString());
+ } else if (r == Code.INCOMPLETE) {
+ incomplete += s + "\n";
+ } else {
+ incomplete = "";
+ }
+ }
+
+ if (r == Code.INCOMPLETE) {
+ return new InterpreterResult(r, "Incomplete expression");
+ } else {
+ return new InterpreterResult(r, out.toString());
+ }
+ }
+
+ private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
+ if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
+ return Code.SUCCESS;
+ } else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
+ return Code.INCOMPLETE;
+ } else {
+ return Code.ERROR;
+ }
+ }
+
+
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.NATIVE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List completion(String buf, int cursor) {
+ return new LinkedList();
+ }
+
+ private void startFlinkMiniCluster() {
+ localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false);
+ localFlinkCluster.waitForTaskManagersToBeRegistered();
+ }
+
+ private void stopFlinkMiniCluster() {
+ if (localFlinkCluster != null) {
+ localFlinkCluster.shutdown();
+ localFlinkCluster = null;
+ }
+ }
+
+ static final String toString(Object o) {
+ return (o instanceof String) ? (String) o : "";
+ }
+
+}
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
new file mode 100644
index 0000000000..e924d6d53b
--- /dev/null
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JarHelper.java
@@ -0,0 +1,199 @@
+package org.apache.zeppelin.flink;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.jar.JarEntry;
+import java.util.jar.JarInputStream;
+import java.util.jar.JarOutputStream;
+
+/**
+ * Provides utility services for jarring and unjarring files and directories.
+ * Note that a given instance of JarHelper is not threadsafe with respect to
+ * multiple jar operations.
+ *
+ * Copied from
+ * http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans
+ * /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
+ *
+ * @author Patrick Calahan
+ */
+public class JarHelper {
+ // ========================================================================
+ // Constants
+
+ private static final int BUFFER_SIZE = 2156;
+
+ // ========================================================================
+ // Variables
+
+ private byte[] mBuffer = new byte[BUFFER_SIZE];
+ private int mByteCount = 0;
+ private boolean mVerbose = false;
+ private String mDestJarName = "";
+
+ // ========================================================================
+ // Constructor
+
+ /**
+ * Instantiates a new JarHelper.
+ */
+ public JarHelper() {
+ }
+
+ // ========================================================================
+ // Public methods
+
+ /**
+ * Jars a given directory or single file into a JarOutputStream.
+ */
+ public void jarDir(File dirOrFile2Jar, File destJar) throws IOException {
+
+ if (dirOrFile2Jar == null || destJar == null) {
+ throw new IllegalArgumentException();
+ }
+
+ mDestJarName = destJar.getCanonicalPath();
+ FileOutputStream fout = new FileOutputStream(destJar);
+ JarOutputStream jout = new JarOutputStream(fout);
+ // jout.setLevel(0);
+ try {
+ jarDir(dirOrFile2Jar, jout, null);
+ } catch (IOException ioe) {
+ throw ioe;
+ } finally {
+ jout.close();
+ fout.close();
+ }
+ }
+
+ /**
+ * Unjars a given jar file into a given directory.
+ */
+ public void unjarDir(File jarFile, File destDir) throws IOException {
+ BufferedOutputStream dest = null;
+ FileInputStream fis = new FileInputStream(jarFile);
+ unjar(fis, destDir);
+ }
+
+ /**
+ * Given an InputStream on a jar file, unjars the contents into the given
+ * directory.
+ */
+ public void unjar(InputStream in, File destDir) throws IOException {
+ BufferedOutputStream dest = null;
+ JarInputStream jis = new JarInputStream(in);
+ JarEntry entry;
+ while ((entry = jis.getNextJarEntry()) != null) {
+ if (entry.isDirectory()) {
+ File dir = new File(destDir, entry.getName());
+ dir.mkdir();
+ if (entry.getTime() != -1) {
+ dir.setLastModified(entry.getTime());
+ }
+ continue;
+ }
+ int count;
+ byte[] data = new byte[BUFFER_SIZE];
+ File destFile = new File(destDir, entry.getName());
+ if (mVerbose) {
+ System.out
+ .println("unjarring " + destFile + " from " + entry.getName());
+ }
+ FileOutputStream fos = new FileOutputStream(destFile);
+ dest = new BufferedOutputStream(fos, BUFFER_SIZE);
+ while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
+ dest.write(data, 0, count);
+ }
+ dest.flush();
+ dest.close();
+ if (entry.getTime() != -1) {
+ destFile.setLastModified(entry.getTime());
+ }
+ }
+ jis.close();
+ }
+
+ public void setVerbose(boolean b) {
+ mVerbose = b;
+ }
+
+ // ========================================================================
+ // Private methods
+
+ private static final char SEP = '/';
+
+ /**
+ * Recursively jars up the given path under the given directory.
+ */
+ private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
+ throws IOException {
+ if (mVerbose) {
+ System.out.println("checking " + dirOrFile2jar);
+ }
+ if (dirOrFile2jar.isDirectory()) {
+ String[] dirList = dirOrFile2jar.list();
+ String subPath = (path == null) ? ""
+ : (path + dirOrFile2jar.getName() + SEP);
+ if (path != null) {
+ JarEntry je = new JarEntry(subPath);
+ je.setTime(dirOrFile2jar.lastModified());
+ jos.putNextEntry(je);
+ jos.flush();
+ jos.closeEntry();
+ }
+ for (int i = 0; i < dirList.length; i++) {
+ File f = new File(dirOrFile2jar, dirList[i]);
+ jarDir(f, jos, subPath);
+ }
+ } else {
+ if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
+ if (mVerbose) {
+ System.out.println("skipping " + dirOrFile2jar.getPath());
+ }
+ return;
+ }
+
+ if (mVerbose) {
+ System.out.println("adding " + dirOrFile2jar.getPath());
+ }
+ FileInputStream fis = new FileInputStream(dirOrFile2jar);
+ try {
+ JarEntry entry = new JarEntry(path + dirOrFile2jar.getName());
+ entry.setTime(dirOrFile2jar.lastModified());
+ jos.putNextEntry(entry);
+ while ((mByteCount = fis.read(mBuffer)) != -1) {
+ jos.write(mBuffer, 0, mByteCount);
+ if (mVerbose) {
+ System.out.println("wrote " + mByteCount + " bytes");
+ }
+ }
+ jos.flush();
+ jos.closeEntry();
+ } catch (IOException ioe) {
+ throw ioe;
+ } finally {
+ fis.close();
+ }
+ }
+ }
+
+ // for debugging
+ public static void main(String[] args) throws IOException {
+ if (args.length < 2) {
+ System.err.println("Usage: JarHelper jarname.jar directory");
+ return;
+ }
+
+ JarHelper jarHelper = new JarHelper();
+ jarHelper.mVerbose = true;
+
+ File destJar = new File(args[0]);
+ File dirOrFile2Jar = new File(args[1]);
+
+ jarHelper.jarDir(dirOrFile2Jar, destJar);
+ }
+}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
new file mode 100644
index 0000000000..091c4f3118
--- /dev/null
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.flink;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FlinkInterpreterTest {
+
+ private FlinkInterpreter flink;
+ private InterpreterContext context;
+
+ @Before
+ public void setUp() {
+ Properties p = new Properties();
+ flink = new FlinkInterpreter(p);
+ flink.open();
+ context = new InterpreterContext(null, null, null, null, null, null, null);
+ }
+
+ @After
+ public void tearDown() {
+ flink.close();
+ flink.destroy();
+ }
+
+ @Test
+ public void testSimpleStatement() {
+ InterpreterResult result = flink.interpret("val a=1", context);
+ result = flink.interpret("print(a)", context);
+ assertEquals("1", result.message());
+ }
+
+ @Test
+ public void testWordCount() {
+ flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
+ flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
+ flink.interpret("counts.print()", context);
+ InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context);
+ assertEquals("", result.message());
+ }
+}
diff --git a/pom.xml b/pom.xml
index bbde0846c6..dd3c2a3684 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,7 @@
shell
hive
tajo
+ flink
zeppelin-web
zeppelin-server
zeppelin-distribution
diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
index bbf46fc88d..78a463cc19 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java
@@ -389,7 +389,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.angular.AngularInterpreter,"
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
- + "org.apache.zeppelin.tajo.TajoInterpreter"),
+ + "org.apache.zeppelin.tajo.TajoInterpreter,"
+ + "org.apache.zeppelin.flink.FlinkInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),