beam interpreter

This commit is contained in:
mahmoudelgamal 2016-06-26 00:03:06 +02:00
parent 85957ff037
commit ab7ee2d837
3 changed files with 665 additions and 0 deletions

418
beam/pom.xml Normal file
View file

@ -0,0 +1,418 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.6.0-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-beam</artifactId>
<version>0.6.0-incubating-SNAPSHOT</version>
<repositories>
<repository>
<id>repo.bodar.com</id>
<url>http://repo.bodar.com</url>
</repository>
</repositories>
<dependencies>
<!-- <dependency> <groupId>com.google.cloud.dataflow</groupId> <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>1.6.0</version> </dependency> <dependency> <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId> <version>0.2.0-incubating-SNAPSHOT</version>
<type>pom</type> </dependency> <dependency> <groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> <version>0.2.0-incubating-SNAPSHOT</version>
</dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-parent</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version> <type>pom</type> </dependency>
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>0.2.0-incubating</version> </dependency> <dependency> <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId> <version>0.2.0-incubating</version>
</dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version> </dependency> -->
<!--
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>1.6.0</version>
<exclusions>
<exclusion>
<artifactId>google-api-client-jackson2</artifactId>
<groupId>com.google.api-client</groupId>
</exclusion>
<exclusion>
<artifactId>google-http-client-jackson2</artifactId>
<groupId>com.google.http-client</groupId>
</exclusion>
</exclusions>
</dependency>
-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.1</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
</exclusion>
<exclusion>
<artifactId>akka-actor_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
<exclusion>
<artifactId>akka-remote_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
<exclusion>
<artifactId>akka-slf4j_2.10</artifactId>
<groupId>org.spark-project.akka</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.3.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.3.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.3.0</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.qdox</groupId>
<artifactId>qdox</artifactId>
<version>2.0-M3</version>
</dependency>
<dependency>
<groupId>org.mdkt.compiler</groupId>
<artifactId>InMemoryJavaCompiler</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>async-http-client</artifactId>
<version>1.9.31</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>2.12</version>
</dependency>
<dependency>
<groupId>com.googlecode.totallylazy</groupId>
<artifactId>totallylazy</artifactId>
<version>1.83</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>pom</type>
</dependency>
<!--
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-examples-java</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>slf4j-jdk14</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
-->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>google-http-client-jackson2</artifactId>
<groupId>com.google.http-client</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>netty-all</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10-examples</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!--
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<exclusions>
<exclusion>
<artifactId>google-http-client-jackson2</artifactId>
<groupId>com.google.http-client</groupId>
</exclusion>
</exclusions>
</dependency>
--><dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>jar</type>
</dependency>
<!-- <dependency>
<groupId>com.google.api.client</groupId>
<artifactId>google-api-client-json</artifactId>
<version>1.2.3-alpha</version>
</dependency>
<dependency>
<groupId>com.google.api.client</groupId>
<artifactId>google-api-client-util</artifactId>
<version>1.2.3-alpha</version>
</dependency>
<dependency>
<groupId>com.google.api.client</groupId>
<artifactId>google-api-client-auth</artifactId>
<version>1.2.3-alpha</version>
</dependency>
-->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<!-- <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId>
</dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId>
</dependency> -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.beam.runners.flink.examples.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.7</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/beam
</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
</configuration>
</execution>
<execution>
<id>copy-artifact</id>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/beam
</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,88 @@
package org.apache.zeppelin.beam;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.beam.examples.MinimalWordCount;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import com.google.gson.Gson;
/**
*
*/
public class BeamInterpreter extends Interpreter {
private String host = "http://localhost:8001";
private InterpreterContext context;
public BeamInterpreter(Properties property) {
super(property);
}
static {
Interpreter.register("beam", "beam", BeamInterpreter.class.getName(),
new InterpreterPropertyBuilder().build());
}
public static void main(String[] args) {
}
@Override
public void open() {
}
@Override
public void close() {
}
@Override
public InterpreterResult interpret(String st, InterpreterContext context) {
String uuid = "C" + UUID.randomUUID().toString().replace("-", "");
try {
String msg = CompileSourceInMemory.execute(uuid, st);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, msg);
} catch (Exception e) {
e.printStackTrace();
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}
}
@Override
public void cancel(InterpreterContext context) {
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
return 0;
}
@Override
public List<String> completion(String buf, int cursor) {
return null;
}
}

View file

@ -0,0 +1,159 @@
package org.apache.zeppelin.beam;
import javax.tools.Diagnostic;
import javax.tools.DiagnosticCollector;
import javax.tools.JavaCompiler;
import javax.tools.JavaCompiler.CompilationTask;
import javax.tools.JavaFileObject;
import javax.tools.SimpleJavaFileObject;
import javax.tools.ToolProvider;
import org.apache.log4j.Category;
import org.apache.log4j.Priority;
import com.thoughtworks.qdox.JavaProjectBuilder;
import com.thoughtworks.qdox.model.JavaClass;
import com.thoughtworks.qdox.model.JavaSource;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Arrays;
import java.util.List;
/**
* @author admin
*
*/
public class CompileSourceInMemory {
public static String execute(String className, String code) throws Exception {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
JavaProjectBuilder builder = new JavaProjectBuilder();
JavaSource src = builder.addSource(new StringReader(code));
// List<String> imports = src.getImports();
// String importsString = "";
//
// for (int i = 0; i < imports.size(); i++) {
// importsString += "import " + imports.get(i) + ";\n";
// }
List<JavaClass> classes = src.getClasses();
String classesSt = "";
String classMain = "", classMainName = "";
for (int i = 0; i < classes.size(); i++) {
boolean hasMain = false;
for (int j = 0; j < classes.get(i).getMethods().size(); j++) {
if (classes.get(i).getMethods().get(j).getName().equals("main")) {
hasMain = true;
break;
}
}
if (hasMain == true) {
classMain = classes.get(i).getCodeBlock() + "\n";
classMainName = classes.get(i).getName();
} else
classesSt += classes.get(i).getCodeBlock() + "\n";
}
code = code.replace(classMainName, className);
StringWriter writer = new StringWriter();
PrintWriter out = new PrintWriter(writer);
out.println(code);
out.close();
System.out.println(writer.toString());
JavaFileObject file = new JavaSourceFromString(className, writer.toString());
Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);
ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
PrintStream newOut = new PrintStream(baosOut);
PrintStream newErr = new PrintStream(baosErr);
// IMPORTANT: Save the old System.out!
PrintStream oldOut = System.out;
PrintStream oldErr = System.err;
// Tell Java to use your special stream
System.setOut(newOut);
System.setErr(newErr);
CompilationTask task = compiler.getTask(null, null, diagnostics, null, null, compilationUnits);
boolean success = task.call();
if (!success) {
for (Diagnostic diagnostic : diagnostics.getDiagnostics()) {
System.out.println(diagnostic.getMessage(null));
}
}
if (success) {
try {
URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI()
.toURL() });
Class.forName(className, true, classLoader)
.getDeclaredMethod("main", new Class[] { String[].class })
.invoke(null, new Object[] { null });
System.out.flush();
System.err.flush();
System.setOut(oldOut);
System.setErr(oldErr);
classLoader.clearAssertionStatus();
return baosOut.toString();
} catch (ClassNotFoundException e) {
e.printStackTrace(newErr);
System.err.println("Class not found: " + e);
throw new Exception(baosErr.toString());
} catch (NoSuchMethodException e) {
e.printStackTrace(newErr);
System.err.println("No such method: " + e);
throw new Exception(baosErr.toString());
} catch (IllegalAccessException e) {
e.printStackTrace(newErr);
System.err.println("Illegal access: " + e);
throw new Exception(baosErr.toString());
} catch (InvocationTargetException e) {
e.printStackTrace(newErr);
System.err.println("Invocation target: " + e);
throw new Exception(baosErr.toString());
}
} else {
throw new Exception(baosOut.toString());
}
}
}
class JavaSourceFromString extends SimpleJavaFileObject {
final String code;
JavaSourceFromString(String name, String code) {
super(URI.create("string:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE);
this.code = code;
}
@Override
public CharSequence getCharContent(boolean ignoreEncodingErrors) {
return code;
}
}