Adding changes like logging and conventions and license

This commit is contained in:
mahmoudelgamal 2016-08-17 00:10:25 +02:00
parent 2aa6d6593c
commit 9c1e25d68b
9 changed files with 213 additions and 114 deletions

View file

@ -1,3 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with ~
this work for additional information regarding copyright ownership. ~ The
ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
"License"); you may not use this file except in compliance with ~ the License.
You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
~ ~ Unless required by applicable law or agreed to in writing, software ~
distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
License for the specific language governing permissions and ~ limitations
under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@ -14,17 +27,22 @@
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-beam</artifactId>
<packaging>jar</packaging>
<version>0.7.0-SNAPSHOT</version>
<name>Zeppelin: Beam interpreter</name>
<properties>
<beam.hadoop.version>2.3.0</beam.hadoop.version>
<beam.spark.version>1.4.1</beam.spark.version>
<beam.beam.version>0.1.0-incubating</beam.beam.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.1</version>
<version>${beam.spark.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@ -48,16 +66,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.1</version>
<version>${beam.spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.3.0</version>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@ -69,7 +88,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@ -81,12 +100,13 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.3.0</version>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@ -94,20 +114,23 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<version>2.3.0</version>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>2.3.0</version>
<version>${beam.hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>2.3.0</version>
<version>${beam.hadoop.version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
@ -115,44 +138,30 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>com.thoughtworks.qdox</groupId>
<artifactId>qdox</artifactId>
<version>2.0-M3</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.6.2</version>
</dependency>
<dependency>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
<version>2.12</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-parent</artifactId>
<version>0.1.0-incubating</version>
<version>${beam.beam.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
<version>0.1.0-incubating</version>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
<artifactId>google-http-client-jackson2</artifactId>
@ -160,16 +169,17 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>0.1.0-incubating</version>
<version>${beam.beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10</artifactId>
<version>0.1.0-incubating</version>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
@ -183,10 +193,11 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.10-examples</artifactId>
<version>0.1.0-incubating</version>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
@ -199,7 +210,7 @@
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>0.1.0-incubating</version>
<version>${beam.beam.version}</version>
<exclusions>
<exclusion>
@ -208,23 +219,21 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>0.1.0-incubating</version>
<version>${beam.beam.version}</version>
<type>jar</type>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
@ -242,8 +251,6 @@
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>

View file

@ -1,27 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.beam;
import java.io.File;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Beam interpreter
*
*/
public class BeamInterpreter extends Interpreter {
Logger LOGGER = LoggerFactory.getLogger(BeamInterpreter.class);
public BeamInterpreter(Properties property) {
super(property);
}
@ -34,6 +49,7 @@ public class BeamInterpreter extends Interpreter {
@Override
public void close() {
File dir = new File(".");
// delete all .class files created while compilation process
for (int i = 0; i < dir.list().length; i++) {
File f = dir.listFiles()[i];
if (f.getAbsolutePath().contains(".class"))
@ -44,13 +60,14 @@ public class BeamInterpreter extends Interpreter {
@Override
public InterpreterResult interpret(String code, InterpreterContext context) {
String className = "C" + UUID.randomUUID().toString().replace("-", "");
// choosing new name to class containing Main method
String generatedClassName = "C" + UUID.randomUUID().toString().replace("-", "");
try {
String msg = StaticRepl.execute(className, code);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, msg);
String res = StaticRepl.execute(generatedClassName, code);
return new InterpreterResult(InterpreterResult.Code.SUCCESS, res);
} catch (Exception e) {
// e.printStackTrace();
LOGGER.error("Exception in Interpreter while interpret", e);
return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
}

View file

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.beam;
import javax.tools.Diagnostic;
@ -8,6 +25,9 @@ import javax.tools.JavaFileObject;
import javax.tools.SimpleJavaFileObject;
import javax.tools.ToolProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.thoughtworks.qdox.JavaProjectBuilder;
import com.thoughtworks.qdox.model.JavaClass;
import com.thoughtworks.qdox.model.JavaSource;
@ -26,55 +46,60 @@ import java.util.Arrays;
import java.util.List;
/**
* @author Mahmoud
* StaticRepl for compling the java code in memory
*
*/
public class StaticRepl {
public static String execute(String className, String code) throws Exception {
static Logger LOGGER = LoggerFactory.getLogger(StaticRepl.class);
public static String execute(String generatedClassName, String code) throws Exception {
JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
// Java parasing
JavaProjectBuilder builder = new JavaProjectBuilder();
JavaSource src = builder.addSource(new StringReader(code));
// get all classes in code (paragraph)
List<JavaClass> classes = src.getClasses();
String classMainName = null;
String mainClassName = null;
// Searching for class containing Main method
for (int i = 0; i < classes.size(); i++) {
boolean hasMain = false;
for (int j = 0; j < classes.get(i).getMethods().size(); j++) {
if (classes.get(i).getMethods().get(j).getName().equals("main")) {
classMainName = classes.get(i).getName();
mainClassName = classes.get(i).getName();
hasMain = true;
break;
}
}
if (hasMain == true)
break;
}
if (classMainName == null)
// if there isn't Main method, will retuen error
if (mainClassName == null)
throw new Exception("There isn't any class containing Main method.");
code = code.replace(classMainName, className);
StringWriter writer = new StringWriter();
PrintWriter out = new PrintWriter(writer);
out.println(code);
out.close();
JavaFileObject file = new JavaSourceFromString(className, writer.toString());
// replace name of class containing Main method with generated name
code = code.replace(mainClassName, generatedClassName);
JavaFileObject file = new JavaSourceFromString(generatedClassName, code.toString());
Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);
ByteArrayOutputStream baosOut = new ByteArrayOutputStream();
ByteArrayOutputStream baosErr = new ByteArrayOutputStream();
// Creating new stream to get the output data
PrintStream newOut = new PrintStream(baosOut);
PrintStream newErr = new PrintStream(baosErr);
// IMPORTANT: Save the old System.out!
// Save the old System.out!
PrintStream oldOut = System.out;
PrintStream oldErr = System.err;
// Tell Java to use your special stream
@ -83,63 +108,74 @@ public class StaticRepl {
CompilationTask task = compiler.getTask(null, null, diagnostics, null, null, compilationUnits);
// executing the compilation process
boolean success = task.call();
// if success is false will get error
if (!success) {
for (Diagnostic diagnostic : diagnostics.getDiagnostics()) {
if (diagnostic.getLineNumber() == -1)
continue;
System.out.println("line " + diagnostic.getLineNumber() + " : "
System.err.println("line " + diagnostic.getLineNumber() + " : "
+ diagnostic.getMessage(null));
}
}
if (success) {
try {
URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI()
.toURL() });
Class.forName(className, true, classLoader)
.getDeclaredMethod("main", new Class[] { String[].class })
.invoke(null, new Object[] { null });
System.out.flush();
System.err.flush();
System.setOut(oldOut);
System.setErr(oldErr);
return baosOut.toString();
} catch (ClassNotFoundException e) {
e.printStackTrace(newErr);
System.err.println("Class not found: " + e);
throw new Exception(baosErr.toString());
} catch (NoSuchMethodException e) {
e.printStackTrace(newErr);
System.err.println("No such method: " + e);
throw new Exception(baosErr.toString());
} catch (IllegalAccessException e) {
e.printStackTrace(newErr);
System.err.println("Illegal access: " + e);
throw new Exception(baosErr.toString());
} catch (InvocationTargetException e) {
e.printStackTrace(newErr);
System.err.println("Invocation target: " + e);
throw new Exception(baosErr.toString());
} finally {
System.out.flush();
System.err.flush();
System.setOut(oldOut);
System.setErr(oldErr);
}
} else {
System.out.flush();
System.err.flush();
System.setOut(oldOut);
System.setErr(oldErr);
throw new Exception(baosOut.toString());
LOGGER.error("Exception in Interpreter while compilation", baosErr.toString());
throw new Exception(baosErr.toString());
} else {
try {
// creating new class loader
URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI()
.toURL() });
// execute the Main method
Class.forName(generatedClassName, true, classLoader)
.getDeclaredMethod("main", new Class[] { String[].class })
.invoke(null, new Object[] { null });
System.out.flush();
System.err.flush();
// set the stream to old stream
System.setOut(oldOut);
System.setErr(oldErr);
return baosOut.toString();
} catch (ClassNotFoundException e) {
LOGGER.error("Exception in Interpreter while Class not found", e);
System.err.println("Class not found: " + e);
throw new Exception(baosErr.toString());
} catch (NoSuchMethodException e) {
LOGGER.error("Exception in Interpreter while No such method", e);
System.err.println("No such method: " + e);
throw new Exception(baosErr.toString());
} catch (IllegalAccessException e) {
LOGGER.error("Exception in Interpreter while Illegal access", e);
System.err.println("Illegal access: " + e);
throw new Exception(baosErr.toString());
} catch (InvocationTargetException e) {
LOGGER.error("Exception in Interpreter while Invocation target", e);
System.err.println("Invocation target: " + e);
throw new Exception(baosErr.toString());
} finally {
System.out.flush();
System.err.flush();
System.setOut(oldOut);
System.setErr(oldErr);
}
}
}
}

View file

@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.beam;
import static org.junit.Assert.assertEquals;
@ -13,7 +30,7 @@ import org.junit.Test;
/**
*
* @author admin
* BeamInterpreterTest
*
*/
public class BeamInterpreterTest {

View file

@ -4,6 +4,20 @@ title: "Beam Interpreter"
description: ""
group: interpreter
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
{% include JB/setup %}
# Beam interpreter for Apache Zeppelin
@ -60,3 +74,9 @@ public class MinimalWordCount {
}
```
## Contributors
1- Mahmoud Fathy Elgamal (@mfelgamal)
2- Fouad Ali Al-Sayadi (@FouadMA)

View file

@ -42,10 +42,12 @@ The following components are provided under Apache License.
(Apache 2.0) Apache Kylin (http://kylin.apache.org/)
(Apache 2.0) Apache Lens (http://lens.apache.org/)
(Apache 2.0) Apache Flink (http://flink.apache.org/)
(Apache 2.0) Apache Beam (http://beam.apache.org/)
(Apache 2.0) Apache Thrift (http://thrift.apache.org/)
(Apache 2.0) Apache Lucene (https://lucene.apache.org/)
(Apache 2.0) Apache Zookeeper (org.apache.zookeeper:zookeeper:jar:3.4.5 - http://zookeeper.apache.org/)
(Apache 2.0) Chill (com.twitter:chill-java:jar:0.8.0 - https://github.com/twitter/chill/)
(Apache 2.0) QDox (com.thoughtworks.qdox:qdox:jar:2.0-M3 - https://github.com/paul-hammant/qdox/)
(Apache 2.0) Codehaus Plexus (org.codehaus.plexus:plexus:jar:1.5.6 - https://codehaus-plexus.github.io/)
(Apache 2.0) findbugs jsr305 (com.google.code.findbugs:jsr305:jar:1.3.9 - http://findbugs.sourceforge.net/)
(Apache 2.0) Google Guava (com.google.guava:guava:15.0 - https://code.google.com/p/guava-libraries/)

View file

@ -137,7 +137,7 @@ limitations under the License.
<script src="bower_components/angular-xeditable/dist/js/xeditable.js"></script>
<script src="bower_components/highlightjs/highlight.pack.js"></script>
<script src="bower_components/lodash/lodash.js"></script>
<script src="bower_components/angular-filter/dist/angular-filter.min.js"></script>
<script src="bower_components/angular-filter/dist/angular-filter.js"></script>
<script src="bower_components/ngtoast/dist/ngToast.js"></script>
<script src="bower_components/ng-focus-if/focusIf.js"></script>
<script src="bower_components/bootstrap3-dialog/dist/js/bootstrap-dialog.min.js"></script>

View file

@ -55,7 +55,7 @@ module.exports = function(config) {
'bower_components/angular-xeditable/dist/js/xeditable.js',
'bower_components/highlightjs/highlight.pack.js',
'bower_components/lodash/lodash.js',
'bower_components/angular-filter/dist/angular-filter.min.js',
'bower_components/angular-filter/dist/angular-filter.js',
'bower_components/ngtoast/dist/ngToast.js',
'bower_components/ng-focus-if/focusIf.js',
'bower_components/bootstrap3-dialog/dist/js/bootstrap-dialog.min.js',

View file

@ -529,7 +529,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.scalding.ScaldingInterpreter,"
+ "org.apache.zeppelin.jdbc.JDBCInterpreter,"
+ "org.apache.zeppelin.hbase.HbaseInterpreter,"
+ "org.apache.zeppelin.bigquery.BigQueryInterpreter",
+ "org.apache.zeppelin.bigquery.BigQueryInterpreter,"
+ "org.apache.zeppelin.beam.BeamInterpreter"),
ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),