Initial implementation of interpreter for Apache Flink

This commit is contained in:
Lee moon soo 2015-05-11 16:56:00 +02:00
parent 109b0807fc
commit f2a66df49e
9 changed files with 1178 additions and 2 deletions

View file

@ -66,7 +66,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter</value>
<description>Comma separated interpreter configurations. First interpreter become a default</description>
</property>

390
flink/pom.xml Normal file
View file

@ -0,0 +1,390 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.5.0-incubating-SNAPSHOT</version>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-flink</artifactId>
<packaging>jar</packaging>
<version>0.5.0-incubating-SNAPSHOT</version>
<name>Zeppelin: Flink</name>
<description>Zeppelin flink support</description>
<url>http://zeppelin.incubator.apache.org</url>
<properties>
<flink.version>0.9.0-milestone-1</flink.version>
<flink.akka.version>2.3.7</flink.akka.version>
<flink.scala.binary.version>2.10</flink.scala.binary.version>
<flink.scala.version>2.10.4</flink.scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.10</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.10</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${flink.scala.binary.version}</artifactId>
<version>${flink.akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${flink.scala.binary.version}</artifactId>
<version>${flink.akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_${flink.scala.binary.version}</artifactId>
<version>${flink.akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${flink.scala.binary.version}</artifactId>
<version>${flink.akka.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${flink.scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${flink.scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${flink.scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/.idea/</exclude>
<exclude>**/*.iml</exclude>
<exclude>.gitignore</exclude>
<exclude>**/.settings/*</exclude>
<exclude>**/.classpath</exclude>
<exclude>**/.project</exclude>
<exclude>**/target/**</exclude>
<exclude>**/README.md</exclude>
<exclude>dependency-reduced-pom.xml</exclude>
</excludes>
</configuration>
</plugin>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.4</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmArgs>
<jvmArg>-Xms128m</jvmArg>
<jvmArg>-Xmx512m</jvmArg>
</jvmArgs>
<compilerPlugins combine.children="append">
<compilerPlugin>
<groupId>org.scalamacros</groupId>
<artifactId>paradise_${scala.version}</artifactId>
<version>${scala.macros.version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
</plugin>
<!-- Eclipse Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<projectnatures>
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
</projectnatures>
<buildcommands>
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<classpathContainers>
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
</classpathContainers>
<!-- excludes>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
</excludes -->
<sourceIncludes>
<sourceInclude>**/*.scala</sourceInclude>
<sourceInclude>**/*.java</sourceInclude>
</sourceIncludes>
</configuration>
</plugin>
<!-- Adding scala source directories to build path -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>0.5.0</version>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<configLocation>${project.basedir}/../_tools/scalastyle.xml</configLocation>
<outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/flink</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.flink;
import java.io.File;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class FlinkEnvironment extends ExecutionEnvironment {
Logger logger = LoggerFactory.getLogger(FlinkEnvironment.class);
private String host;
private int port;
private FlinkIMain imain;
public FlinkEnvironment(String host, int port, FlinkIMain imain) {
this.host = host;
this.port = port;
this.imain = imain;
logger.info("jobManager host={}, port={}", host, port);
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
JavaPlan plan = createProgramPlan(jobName);
File jarFile = imain.jar();
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
jarFile.getAbsolutePath());
JobExecutionResult result = executor.executePlan(plan);
if (jarFile.isFile()) {
jarFile.delete();
}
return result;
}
@Override
public String getExecutionPlan() throws Exception {
JavaPlan plan = createProgramPlan("unnamed", false);
plan.setDefaultParallelism(getParallelism());
registerCachedFilesWithPlan(plan);
File jarFile = imain.jar();
PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port,
jarFile.getAbsolutePath());
String jsonPlan = executor.getOptimizerPlanAsJSON(plan);
if (jarFile != null && jarFile.isFile()) {
jarFile.delete();
}
return jsonPlan;
}
/*
private File createJar() throws IOException {
// create execution environment
File jarFile = new File(System.getProperty("java.io.tmpdir")
+ "/ZeppelinFlinkJar_" + System.currentTimeMillis() + ".jar");
File[] classFiles = classDir.listFiles();
if (classFiles == null) {
return null;
}
byte buffer[] = new byte[10240];
// Open archive file
FileOutputStream stream = new FileOutputStream(jarFile);
JarOutputStream out = new JarOutputStream(stream, new Manifest());
for (int i = 0; i < classFiles.length; i++) {
File classFile = classFiles[i];
if (classFiles == null || !classFile.exists()
|| classFile.isDirectory())
continue;
// Add class
JarEntry jarAdd = new JarEntry(classFile.getName());
jarAdd.setTime(classFile.lastModified());
out.putNextEntry(jarAdd);
logger.info("add class {} into jar", classFile);
// Write file to archive
FileInputStream in = new FileInputStream(classFile);
while (true) {
int nRead = in.read(buffer, 0, buffer.length);
if (nRead <= 0)
break;
out.write(buffer, 0, nRead);
}
in.close();
}
out.close();
stream.close();
return jarFile;
}
*/
}

View file

@ -0,0 +1,74 @@
package org.apache.zeppelin.flink;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Iterator;
import scala.reflect.io.AbstractFile;
import scala.reflect.io.VirtualDirectory;
import scala.tools.nsc.Settings;
import scala.tools.nsc.interpreter.IMain;
/**
*/
public class FlinkIMain extends IMain {
Logger logger = LoggerFactory.getLogger(FlinkIMain.class);
public FlinkIMain(Settings setting, PrintWriter out) {
super(setting, out);
}
public File jar() throws IOException {
VirtualDirectory classDir = virtualDirectory();
// create execution environment
File jarBuildDir = new File(System.getProperty("java.io.tmpdir")
+ "/ZeppelinFlinkJarBiuldDir_" + System.currentTimeMillis());
jarBuildDir.mkdirs();
File jarFile = new File(System.getProperty("java.io.tmpdir")
+ "/ZeppelinFlinkJarFile_" + System.currentTimeMillis() + ".jar");
Iterator<AbstractFile> vdIt = classDir.iterator();
while (vdIt.hasNext()) {
AbstractFile fi = vdIt.next();
if (fi.isDirectory()) {
Iterator<AbstractFile> fiIt = fi.iterator();
while (fiIt.hasNext()) {
AbstractFile f = fiIt.next();
// directory for compiled line
File lineDir = new File(jarBuildDir.getAbsolutePath(), fi.name());
lineDir.mkdirs();
// compiled classes for commands from shell
File writeFile = new File(lineDir.getAbsolutePath(), f.name());
FileOutputStream outputStream = new FileOutputStream(writeFile);
InputStream inputStream = f.input();
// copy file contents
org.apache.commons.io.IOUtils.copy(inputStream, outputStream);
inputStream.close();
outputStream.close();
}
}
}
// jarr up
JarHelper jh = new JarHelper();
jh.jarDir(jarBuildDir, jarFile);
FileUtils.deleteDirectory(jarBuildDir);
return jarFile;
}
}

View file

@ -0,0 +1,317 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.flink;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.Client;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Console;
import scala.None;
import scala.Some;
import scala.tools.nsc.Settings;
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
import scala.tools.nsc.settings.MutableSettings.PathSetting;
/**
* Interpreter for Apache Flink (http://flink.apache.org)
*/
public class FlinkInterpreter extends Interpreter {
Logger logger = LoggerFactory.getLogger(FlinkInterpreter.class);
private Settings settings;
private ByteArrayOutputStream out;
private FlinkIMain imain;
private File classDir;
private Map<String, Object> binder;
private ExecutionEnvironment env;
private Configuration flinkConf;
private LocalFlinkMiniCluster localFlinkCluster;
private Client client;
public FlinkInterpreter(Properties property) {
super(property);
}
static {
Interpreter.register(
"flink",
"flink",
FlinkInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("local", "true", "Run flink locally")
.add("jobmanager.rpc.address", "localhost", "Flink cluster")
.add("jobmanager.rpc.port", "6123", "Flink cluster")
.build()
);
}
@Override
public void open() {
URL[] urls = getClassloaderUrls();
this.settings = new Settings();
// set classpath
PathSetting pathSettings = settings.classpath();
String classpath = "";
List<File> paths = currentClassPath();
for (File f : paths) {
if (classpath.length() > 0) {
classpath += File.pathSeparator;
}
classpath += f.getAbsolutePath();
}
if (urls != null) {
for (URL u : urls) {
if (classpath.length() > 0) {
classpath += File.pathSeparator;
}
classpath += u.getFile();
}
}
pathSettings.v_$eq(classpath);
settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings);
settings.explicitParentLoader_$eq(new Some<ClassLoader>(Thread.currentThread()
.getContextClassLoader()));
BooleanSetting b = (BooleanSetting) settings.usejavacp();
b.v_$eq(true);
settings.scala$tools$nsc$settings$StandardScalaSettings$_setter_$usejavacp_$eq(b);
out = new ByteArrayOutputStream();
imain = new FlinkIMain(settings, new PrintWriter(out));
initializeFlinkEnv();
}
private boolean localMode() {
return Boolean.parseBoolean(getProperty("local"));
}
private String getRpcAddress() {
if (localMode()) {
return "localhost";
} else {
return getProperty("jobmanager.rpc.address");
}
}
private int getRpcPort() {
if (localMode()) {
return localFlinkCluster.getJobManagerRPCPort();
} else {
return Integer.parseInt(getProperty("jobmanager.rpc.port"));
}
}
private void initializeFlinkEnv() {
// prepare bindings
imain.interpret("@transient var _binder = new java.util.HashMap[String, Object]()");
binder = (Map<String, Object>) getValue("_binder");
flinkConf = new org.apache.flink.configuration.Configuration();
Properties intpProperty = getProperty();
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));
flinkConf.setString(key, val);
}
if (localMode()) {
startFlinkMiniCluster();
}
env = new FlinkEnvironment(getRpcAddress(), getRpcPort(), imain);
binder.put("env", new org.apache.flink.api.scala.ExecutionEnvironment(env));
// do import and create val
imain.interpret("@transient val env = "
+ "_binder.get(\"env\")"
+ ".asInstanceOf[org.apache.flink.api.scala.ExecutionEnvironment]");
imain.interpret("import org.apache.flink.api.scala._");
}
private List<File> currentClassPath() {
List<File> paths = classPath(Thread.currentThread().getContextClassLoader());
String[] cps = System.getProperty("java.class.path").split(File.pathSeparator);
if (cps != null) {
for (String cp : cps) {
paths.add(new File(cp));
}
}
return paths;
}
private List<File> classPath(ClassLoader cl) {
List<File> paths = new LinkedList<File>();
if (cl == null) {
return paths;
}
if (cl instanceof URLClassLoader) {
URLClassLoader ucl = (URLClassLoader) cl;
URL[] urls = ucl.getURLs();
if (urls != null) {
for (URL url : urls) {
paths.add(new File(url.getFile()));
}
}
}
return paths;
}
public Object getValue(String name) {
Object ret = imain.valueOfTerm(name);
if (ret instanceof None) {
return null;
} else if (ret instanceof Some) {
return ((Some) ret).get();
} else {
return ret;
}
}
@Override
public void close() {
imain.close();
if (localMode()) {
stopFlinkMiniCluster();
}
}
@Override
public InterpreterResult interpret(String line, InterpreterContext context) {
if (line == null || line.trim().length() == 0) {
return new InterpreterResult(Code.SUCCESS);
}
InterpreterResult result = interpret(line.split("\n"), context);
return result;
}
public InterpreterResult interpret(String[] lines, InterpreterContext context) {
String[] linesToRun = new String[lines.length + 1];
for (int i = 0; i < lines.length; i++) {
linesToRun[i] = lines[i];
}
linesToRun[lines.length] = "print(\"\")";
Console.setOut(out);
System.setOut(new PrintStream(out));
out.reset();
Code r = null;
String incomplete = "";
for (String s : linesToRun) {
scala.tools.nsc.interpreter.Results.Result res = null;
try {
res = imain.interpret(incomplete + s);
} catch (Exception e) {
logger.info("Interpreter exception", e);
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
}
r = getResultCode(res);
if (r == Code.ERROR) {
return new InterpreterResult(r, out.toString());
} else if (r == Code.INCOMPLETE) {
incomplete += s + "\n";
} else {
incomplete = "";
}
}
if (r == Code.INCOMPLETE) {
return new InterpreterResult(r, "Incomplete expression");
} else {
return new InterpreterResult(r, out.toString());
}
}
private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) {
if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
return Code.SUCCESS;
} else if (r instanceof scala.tools.nsc.interpreter.Results.Incomplete$) {
return Code.INCOMPLETE;
} else {
return Code.ERROR;
}
}
@Override
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {
return FormType.NATIVE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public List<String> completion(String buf, int cursor) {
return new LinkedList<String>();
}
private void startFlinkMiniCluster() {
localFlinkCluster = new LocalFlinkMiniCluster(flinkConf, false);
localFlinkCluster.waitForTaskManagersToBeRegistered();
}
private void stopFlinkMiniCluster() {
if (localFlinkCluster != null) {
localFlinkCluster.shutdown();
localFlinkCluster = null;
}
}
static final String toString(Object o) {
return (o instanceof String) ? (String) o : "";
}
}

View file

@ -0,0 +1,199 @@
package org.apache.zeppelin.flink;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
/**
* Provides utility services for jarring and unjarring files and directories.
* Note that a given instance of JarHelper is not threadsafe with respect to
* multiple jar operations.
*
* Copied from
* http://grepcode.com/file_/repo1.maven.org/maven2/org.apache.xmlbeans
* /xmlbeans/2.4.0/org/apache/xmlbeans/impl/common/JarHelper.java/?v=source
*
* @author Patrick Calahan <pcal@bea.com>
*/
public class JarHelper {
// ========================================================================
// Constants
private static final int BUFFER_SIZE = 2156;
// ========================================================================
// Variables
private byte[] mBuffer = new byte[BUFFER_SIZE];
private int mByteCount = 0;
private boolean mVerbose = false;
private String mDestJarName = "";
// ========================================================================
// Constructor
/**
* Instantiates a new JarHelper.
*/
public JarHelper() {
}
// ========================================================================
// Public methods
/**
* Jars a given directory or single file into a JarOutputStream.
*/
public void jarDir(File dirOrFile2Jar, File destJar) throws IOException {
if (dirOrFile2Jar == null || destJar == null) {
throw new IllegalArgumentException();
}
mDestJarName = destJar.getCanonicalPath();
FileOutputStream fout = new FileOutputStream(destJar);
JarOutputStream jout = new JarOutputStream(fout);
// jout.setLevel(0);
try {
jarDir(dirOrFile2Jar, jout, null);
} catch (IOException ioe) {
throw ioe;
} finally {
jout.close();
fout.close();
}
}
/**
* Unjars a given jar file into a given directory.
*/
public void unjarDir(File jarFile, File destDir) throws IOException {
BufferedOutputStream dest = null;
FileInputStream fis = new FileInputStream(jarFile);
unjar(fis, destDir);
}
/**
* Given an InputStream on a jar file, unjars the contents into the given
* directory.
*/
public void unjar(InputStream in, File destDir) throws IOException {
BufferedOutputStream dest = null;
JarInputStream jis = new JarInputStream(in);
JarEntry entry;
while ((entry = jis.getNextJarEntry()) != null) {
if (entry.isDirectory()) {
File dir = new File(destDir, entry.getName());
dir.mkdir();
if (entry.getTime() != -1) {
dir.setLastModified(entry.getTime());
}
continue;
}
int count;
byte[] data = new byte[BUFFER_SIZE];
File destFile = new File(destDir, entry.getName());
if (mVerbose) {
System.out
.println("unjarring " + destFile + " from " + entry.getName());
}
FileOutputStream fos = new FileOutputStream(destFile);
dest = new BufferedOutputStream(fos, BUFFER_SIZE);
while ((count = jis.read(data, 0, BUFFER_SIZE)) != -1) {
dest.write(data, 0, count);
}
dest.flush();
dest.close();
if (entry.getTime() != -1) {
destFile.setLastModified(entry.getTime());
}
}
jis.close();
}
public void setVerbose(boolean b) {
mVerbose = b;
}
// ========================================================================
// Private methods
private static final char SEP = '/';
/**
* Recursively jars up the given path under the given directory.
*/
private void jarDir(File dirOrFile2jar, JarOutputStream jos, String path)
throws IOException {
if (mVerbose) {
System.out.println("checking " + dirOrFile2jar);
}
if (dirOrFile2jar.isDirectory()) {
String[] dirList = dirOrFile2jar.list();
String subPath = (path == null) ? ""
: (path + dirOrFile2jar.getName() + SEP);
if (path != null) {
JarEntry je = new JarEntry(subPath);
je.setTime(dirOrFile2jar.lastModified());
jos.putNextEntry(je);
jos.flush();
jos.closeEntry();
}
for (int i = 0; i < dirList.length; i++) {
File f = new File(dirOrFile2jar, dirList[i]);
jarDir(f, jos, subPath);
}
} else {
if (dirOrFile2jar.getCanonicalPath().equals(mDestJarName)) {
if (mVerbose) {
System.out.println("skipping " + dirOrFile2jar.getPath());
}
return;
}
if (mVerbose) {
System.out.println("adding " + dirOrFile2jar.getPath());
}
FileInputStream fis = new FileInputStream(dirOrFile2jar);
try {
JarEntry entry = new JarEntry(path + dirOrFile2jar.getName());
entry.setTime(dirOrFile2jar.lastModified());
jos.putNextEntry(entry);
while ((mByteCount = fis.read(mBuffer)) != -1) {
jos.write(mBuffer, 0, mByteCount);
if (mVerbose) {
System.out.println("wrote " + mByteCount + " bytes");
}
}
jos.flush();
jos.closeEntry();
} catch (IOException ioe) {
throw ioe;
} finally {
fis.close();
}
}
}
// for debugging
public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.err.println("Usage: JarHelper jarname.jar directory");
return;
}
JarHelper jarHelper = new JarHelper();
jarHelper.mVerbose = true;
File destJar = new File(args[0]);
File dirOrFile2Jar = new File(args[1]);
jarHelper.jarDir(dirOrFile2Jar, destJar);
}
}

View file

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.flink;
import static org.junit.Assert.assertEquals;
import java.util.Properties;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class FlinkInterpreterTest {
private FlinkInterpreter flink;
private InterpreterContext context;
@Before
public void setUp() {
Properties p = new Properties();
flink = new FlinkInterpreter(p);
flink.open();
context = new InterpreterContext(null, null, null, null, null, null, null);
}
@After
public void tearDown() {
flink.close();
flink.destroy();
}
@Test
public void testSimpleStatement() {
InterpreterResult result = flink.interpret("val a=1", context);
result = flink.interpret("print(a)", context);
assertEquals("1", result.message());
}
@Test
public void testWordCount() {
flink.interpret("val text = env.fromElements(\"To be or not to be\")", context);
flink.interpret("val counts = text.flatMap { _.toLowerCase.split(\" \") }.map { (_, 1) }.groupBy(0).sum(1)", context);
flink.interpret("counts.print()", context);
InterpreterResult result = flink.interpret("env.execute(\"WordCount Example\")", context);
assertEquals("", result.message());
}
}

View file

@ -92,6 +92,7 @@
<module>shell</module>
<module>hive</module>
<module>tajo</module>
<module>flink</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-distribution</module>

View file

@ -389,7 +389,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.angular.AngularInterpreter,"
+ "org.apache.zeppelin.shell.ShellInterpreter,"
+ "org.apache.zeppelin.hive.HiveInterpreter,"
+ "org.apache.zeppelin.tajo.TajoInterpreter"),
+ "org.apache.zeppelin.tajo.TajoInterpreter,"
+ "org.apache.zeppelin.flink.FlinkInterpreter"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),