diff --git a/beam/pom.xml b/beam/pom.xml index 48122a689e..5e206419a6 100644 --- a/beam/pom.xml +++ b/beam/pom.xml @@ -4,6 +4,7 @@ + zeppelin org.apache.zeppelin @@ -15,22 +16,26 @@ zeppelin-beam 0.7.0-SNAPSHOT - - + + + apache-beam + https://repository.apache.org/content/repositories/snapshots/org/apache/beam/ + + + 0.1.0-incubating --> - org.apache.commons commons-exec diff --git a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java index d8ea9da993..cd5c07bdb2 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/BeamInterpreter.java @@ -1,6 +1,5 @@ package org.apache.zeppelin.beam; - import java.io.File; import java.io.PrintWriter; import java.io.StringWriter; @@ -23,18 +22,10 @@ import com.google.gson.Gson; */ public class BeamInterpreter extends Interpreter { - private String host = "http://localhost:8001"; - private InterpreterContext context; - public BeamInterpreter(Properties property) { super(property); } - - public static void main(String[] args) { - - } - @Override public void open() { @@ -42,16 +33,22 @@ public class BeamInterpreter extends Interpreter { @Override public void close() { - + File dir = new File("."); + for (int i = 0; i < dir.list().length; i++) { + File f = dir.listFiles()[i]; + System.out.println(f.getAbsolutePath()); + if (f.getAbsolutePath().contains(".class")) + f.delete(); + } } @Override public InterpreterResult interpret(String st, InterpreterContext context) { - String uuid = "C" + UUID.randomUUID().toString().replace("-", ""); + String className = "C" + UUID.randomUUID().toString().replace("-", ""); try { - String msg = CompileSourceInMemory.execute(uuid, st); + String msg = CompileSourceInMemory.execute(className, st); return new InterpreterResult(InterpreterResult.Code.SUCCESS, msg); } catch (Exception e) { e.printStackTrace(); diff --git a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java index f96374b263..ccf77c2fab 100644 --- a/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java +++ b/beam/src/main/java/org/apache/zeppelin/beam/CompileSourceInMemory.java @@ -14,20 +14,17 @@ import com.thoughtworks.qdox.model.JavaClass; import com.thoughtworks.qdox.model.JavaSource; import java.io.ByteArrayOutputStream; -import java.io.File; import java.io.PrintStream; import java.io.PrintWriter; import java.io.StringReader; import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.net.URI; -import java.net.URL; -import java.net.URLClassLoader; import java.util.Arrays; import java.util.List; /** - * @author admin + * @author Mahmoud * */ public class CompileSourceInMemory { @@ -39,31 +36,25 @@ public class CompileSourceInMemory { JavaProjectBuilder builder = new JavaProjectBuilder(); JavaSource src = builder.addSource(new StringReader(code)); - // List imports = src.getImports(); - // String importsString = ""; - // - // for (int i = 0; i < imports.size(); i++) { - // importsString += "import " + imports.get(i) + ";\n"; - // } - List classes = src.getClasses(); - String classesSt = ""; - String classMain = "", classMainName = ""; + String classMainName = null; for (int i = 0; i < classes.size(); i++) { boolean hasMain = false; for (int j = 0; j < classes.get(i).getMethods().size(); j++) { if (classes.get(i).getMethods().get(j).getName().equals("main")) { + classMainName = classes.get(i).getName(); hasMain = true; break; } } - if (hasMain == true) { - classMain = classes.get(i).getCodeBlock() + "\n"; - classMainName = classes.get(i).getName(); - } else - classesSt += classes.get(i).getCodeBlock() + "\n"; + if (hasMain == true) + break; } + + if (classMainName == null) + throw new Exception("There isn't any class containing Main method."); + code = code.replace(classMainName, className); StringWriter writer = new StringWriter(); @@ -71,11 +62,11 @@ public class CompileSourceInMemory { out.println(code); out.close(); + - System.out.println(writer.toString()); JavaFileObject file = new JavaSourceFromString(className, writer.toString()); - + Iterable compilationUnits = Arrays.asList(file); ByteArrayOutputStream baosOut = new ByteArrayOutputStream(); @@ -100,11 +91,9 @@ public class CompileSourceInMemory { } if (success) { try { - URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File("").toURI() - .toURL() }); - Class.forName(className, true, classLoader) - .getDeclaredMethod("main", new Class[] { String[].class }) - .invoke(null, new Object[] { null }); + + Class.forName(className).getDeclaredMethod("main", new Class[] { String[].class }) + .invoke(null, new Object[] { null }); System.out.flush(); System.err.flush(); @@ -112,8 +101,8 @@ public class CompileSourceInMemory { System.setOut(oldOut); System.setErr(oldErr); - classLoader.clearAssertionStatus(); + return baosOut.toString(); } catch (ClassNotFoundException e) { e.printStackTrace(newErr); diff --git a/conf/interpreter-list b/conf/interpreter-list index 17a6f1e4a3..77c4c4001a 100644 --- a/conf/interpreter-list +++ b/conf/interpreter-list @@ -24,6 +24,7 @@ cassandra org.apache.zeppelin:zeppelin-cassandra_2.11:0.6.1 Cassandr elasticsearch org.apache.zeppelin:zeppelin-elasticsearch:0.6.1 Elasticsearch interpreter file org.apache.zeppelin:zeppelin-file:0.6.1 HDFS file interpreter flink org.apache.zeppelin:zeppelin-flink_2.11:0.6.1 Flink interpreter built with Scala 2.11 +beam org.apache.zeppelin:zeppelin-beam:0.6.1 Beam interpreter hbase org.apache.zeppelin:zeppelin-hbase:0.6.1 Hbase interpreter ignite org.apache.zeppelin:zeppelin-ignite_2.11:0.6.1 Ignite interpreter built with Scala 2.11 jdbc org.apache.zeppelin:zeppelin-jdbc:0.6.1 Jdbc interpreter diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 77e0b1f3bc..9386fd78eb 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -184,13 +184,13 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter Comma separated interpreter configurations. First interpreter become a default zeppelin.interpreter.group.order - spark,md,angular,sh,livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,scalding,jdbc,hbase,bigquery + spark,md,angular,sh,livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,scalding,jdbc,hbase,bigquery,beam diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 75efe3956f..eb8f27b78d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -529,7 +529,8 @@ public class ZeppelinConfiguration extends XMLConfiguration { + "org.apache.zeppelin.scalding.ScaldingInterpreter," + "org.apache.zeppelin.jdbc.JDBCInterpreter," + "org.apache.zeppelin.hbase.HbaseInterpreter," - + "org.apache.zeppelin.bigquery.BigQueryInterpreter"), + + "org.apache.zeppelin.bigquery.BigQueryInterpreter", + + "org.apache.zeppelin.beam.BeamInterpreter"), ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), @@ -537,7 +538,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh," + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch," - + "scalding,jdbc,hbase,bigquery"), + + "scalding,jdbc,hbase,bigquery,beam"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), // use specified notebook (id) as homescreen