Add Scio interpreter

This commit is contained in:
Rafal Wojdyla 2016-09-28 00:36:20 -04:00
parent 7b596ea5c2
commit 38abaf9e04
8 changed files with 557 additions and 4 deletions

View file

@ -35,4 +35,5 @@ md org.apache.zeppelin:zeppelin-markdown:0.6.1 Markdown
pig org.apache.zeppelin:zeppelin-pig:0.6.1 Pig interpreter
postgresql org.apache.zeppelin:zeppelin-postgresql:0.6.1 Postgresql interpreter
python org.apache.zeppelin:zeppelin-python:0.6.1 Python interpreter
scio org.apache.zeppelin:zeppelin-scio:0.6.1 Scio interpreter
shell org.apache.zeppelin:zeppelin-shell:0.6.1 Shell command

View file

@ -190,7 +190,7 @@
<property>
<name>zeppelin.interpreters</name>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter, org.apache.zeppelin.pig.PigQueryInterpreter</value>
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.rinterpreter.RRepl,org.apache.zeppelin.rinterpreter.KnitR,org.apache.zeppelin.spark.SparkRInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.file.HDFSFileInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,,org.apache.zeppelin.python.PythonInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter,org.apache.zeppelin.postgresql.PostgreSqlInterpreter,org.apache.zeppelin.jdbc.JDBCInterpreter,org.apache.zeppelin.kylin.KylinInterpreter,org.apache.zeppelin.elasticsearch.ElasticsearchInterpreter,org.apache.zeppelin.scalding.ScaldingInterpreter,org.apache.zeppelin.alluxio.AlluxioInterpreter,org.apache.zeppelin.hbase.HbaseInterpreter,org.apache.zeppelin.livy.LivySparkInterpreter,org.apache.zeppelin.livy.LivyPySparkInterpreter,org.apache.zeppelin.livy.LivySparkRInterpreter,org.apache.zeppelin.livy.LivySparkSQLInterpreter,org.apache.zeppelin.bigquery.BigQueryInterpreter,org.apache.zeppelin.beam.BeamInterpreter,org.apache.zeppelin.pig.PigInterpreter,org.apache.zeppelin.pig.PigQueryInterpreter,org.apache.zeppelin.scio.ScioInterpreter</value>
<description>Comma-separated interpreter configurations. The first interpreter becomes the default.</description>
</property>

10
pom.xml
View file

@ -75,6 +75,7 @@
<module>elasticsearch</module>
<module>bigquery</module>
<module>alluxio</module>
<module>scio</module>
<module>zeppelin-web</module>
<module>zeppelin-server</module>
<module>zeppelin-distribution</module>
@ -589,7 +590,14 @@
<module>beam</module>
</modules>
</profile>
<profile>
<id>scio</id>
<modules>
<module>scio</module>
</modules>
</profile>
<profile>
<id>examples</id>
<modules>

18
scio/README.md Normal file
View file

@ -0,0 +1,18 @@
Scio interpreter for Apache Zeppelin
====================================
## Raison d'être:
Provide Scio Interpreter for Zeppelin.
## Build
```
mvn -pl zeppelin-interpreter,zeppelin-display,scio -DskipTests package
```
## Test
```
mvn -pl scio,zeppelin-display,zeppelin-interpreter -Dtest='org.apache.zeppelin.scio.*' -DfailIfNoTests=false test
```

341
scio/pom.xml Normal file
View file

@ -0,0 +1,341 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zeppelin</artifactId>
<groupId>org.apache.zeppelin</groupId>
<version>0.7.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<groupId>org.apache.zeppelin</groupId>
<artifactId>zeppelin-scio_2.10</artifactId>
<packaging>jar</packaging>
<version>0.7.0-SNAPSHOT</version>
<name>Zeppelin: Scio</name>
<description>Zeppelin Scio support</description>
<properties>
<scio.version>0.2.3</scio.version>
</properties>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-display_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.spotify</groupId>
<artifactId>scio-repl_${scala.binary.version}</artifactId>
<version>${scio.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
</dependency>
<!-- Aether :: maven dependency resolution -->
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-plugin-api</artifactId>
<version>3.0</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
<exclusion>
<groupId>org.sonatype.sisu</groupId>
<artifactId>sisu-inject-plexus</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.maven</groupId>
<artifactId>maven-model</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-api</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-util</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-impl</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.apache.maven</groupId>
<artifactId>maven-aether-provider</artifactId>
<version>3.0.3</version>
<exclusions>
<exclusion>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-spi</artifactId>
</exclusion>
<exclusion>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-connector-file</artifactId>
<version>1.12</version>
</dependency>
<dependency>
<groupId>org.sonatype.aether</groupId>
<artifactId>aether-connector-wagon</artifactId>
<version>1.12</version>
<exclusions>
<exclusion>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-provider-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-provider-api</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>org.codehaus.plexus</groupId>
<artifactId>plexus-utils</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-http-lightweight</artifactId>
<version>1.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-http-shared</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.maven.wagon</groupId>
<artifactId>wagon-http</artifactId>
<version>1.0</version>
<exclusions>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
<version>1.9</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-enforcer-plugin</artifactId>
<version>1.3.1</version>
<executions>
<execution>
<id>enforce</id>
<phase>none</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.17</version>
<configuration>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
<argLine>-Xmx1024m -XX:MaxPermSize=256m</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.8</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../interpreter/scio</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<includeScope>runtime</includeScope>
<artifactItems>
<artifactItem>
<groupId>${project.groupId}</groupId>
<artifactId>${project.artifactId}</artifactId>
<version>${project.version}</version>
<type>${project.packaging}</type>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<!-- Plugin to compile Scala code -->
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,19 @@
[
{
"group": "scio",
"name": "scio",
"className": "org.apache.zeppelin.scio.ScioInterpreter",
"defaultInterpreter": true,
"properties": {
"args": {
"envName": null,
"propertyName": null,
"defaultValue": "--runner=InProcessPipelineRunner",
"description": "Scio commandline args"
}
},
"editor": {
"language": "scala"
}
}
]

View file

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.scio
import java.io.PrintStream
import java.util
import java.util.Properties
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner
import com.spotify.scio.repl.{ScioILoop, ScioReplClassLoader}
import org.apache.zeppelin.interpreter.Interpreter.FormType
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream
import org.apache.zeppelin.interpreter.{Interpreter, InterpreterContext, InterpreterResult}
import org.slf4j.LoggerFactory
import scala.reflect.io.File
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.JPrintWriter
import scala.tools.nsc.util.ClassPath
/**
 * Zeppelin interpreter that evaluates paragraphs in an embedded Scio REPL ([[ScioILoop]]).
 *
 * The REPL is created in `open()` from the `args` interpreter property and released in
 * `close()`. Output produced while a paragraph runs is routed through `innerOut` to the
 * paragraph's interpreter output.
 *
 * @param property interpreter properties supplied by Zeppelin; only `args` is read here
 */
class ScioInterpreter(property: Properties) extends Interpreter(property) {
  private val logger = LoggerFactory.getLogger(classOf[ScioInterpreter])

  // Assigned in open() and cleared in close(); null whenever the interpreter is not open.
  private var REPL: ScioILoop = null

  val innerOut = new InterpreterOutputStream(logger)

  /**
   * Builds the compiler settings and class loader chain, then starts the Scio REPL.
   */
  override def open(): Unit = {
    // Split on any run of whitespace and drop empty tokens, so that repeated, leading or
    // trailing blanks in the `args` property do not become empty command line arguments.
    val args: List[String] = Option(getProperty("args"))
      .getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}")
      .split("\\s+")
      .filter(_.nonEmpty)
      .toList

    val settings = new Settings()

    // For scala 2.10 - usejavacp
    if (scala.util.Properties.versionString.contains("2.10.")) {
      settings.classpath.append(System.getProperty("java.class.path"))
      settings.usejavacp.value = true
    }

    // Collects the URLs of every URLClassLoader found while walking up the parent chain.
    def classLoaderURLs(cl: ClassLoader): Array[java.net.URL] = cl match {
      case null => Array()
      case u: java.net.URLClassLoader => u.getURLs ++ classLoaderURLs(cl.getParent)
      case _ => classLoaderURLs(cl.getParent)
    }

    // Computed once and reused for both the compiler classpath and the REPL class loader.
    val contextLoader = Thread.currentThread().getContextClassLoader
    val contextURLs = classLoaderURLs(contextLoader)
    contextURLs.foreach(u => settings.classpath.append(u.getPath))

    // We have to make sure that scala macros are expandable. paradise plugin has to be added to
    // -Xplugin paths. In case of assembly - paradise is included in assembly jar - thus we add
    // itself to -Xplugin. If shell is started from sbt or classpath, paradise jar has to be in
    // classpath, we find it and add it to -Xplugin.

    // Repl assembly includes paradise's scalac-plugin.xml - required for BigQuery macro
    // There should be no harm if we keep this for sbt launch.
    // getCodeSource (and its location) may be null, so go through Option instead of
    // dereferencing directly. In some cases the location is `target/classes`, not a jar.
    Option(this.getClass.getProtectionDomain.getCodeSource)
      .flatMap(codeSource => Option(codeSource.getLocation))
      .map(_.getPath)
      .filter(_.endsWith(".jar"))
      .foreach(settings.plugin.appendToValue)

    ClassPath.split(settings.classpath.value)
      .find(File(_).name.startsWith("paradise_"))
      .foreach(settings.plugin.appendToValue)

    // Force the repl to be synchronous, so all cmds are executed in the same thread
    settings.Yreplsync.value = true

    val scioClassLoader = new ScioReplClassLoader(
      ClassPath.toURLs(settings.classpath.value).toArray ++ contextURLs,
      null,
      contextLoader)

    REPL = new ScioILoop(scioClassLoader, args, None, new JPrintWriter(innerOut))
    scioClassLoader.setRepl(REPL)

    // Set classloader chain - expose top level abstract class loader down
    // the chain to allow for readObject and latestUserDefinedLoader
    // See https://gist.github.com/harrah/404272
    settings.embeddedDefaults(scioClassLoader)

    // No need for bigquery dumps
    sys.props("bigquery.plugin.disable.dump") = "true"

    REPL.settings_=(settings)
    REPL.createInterpreter()
  }

  /**
   * Shuts down the REPL if one was created. Safe to call when `open()` never ran or failed
   * before the REPL was assigned, and safe to call more than once.
   */
  override def close(): Unit = {
    logger.info("Closing Scio interpreter!")
    Option(REPL).foreach(_.closeInterpreter())
    REPL = null
  }

  /**
   * Evaluates `code` in the REPL and maps the REPL result onto a Zeppelin result code.
   *
   * @param code    source of the paragraph to evaluate
   * @param context paragraph context; provides the paragraph id and the output sink
   * @return SUCCESS, ERROR or INCOMPLETE, mirroring the REPL result; ERROR with a message
   *         when the interpreter is not open or the REPL throws an exception
   */
  override def interpret(code: String, context: InterpreterContext): InterpreterResult = {
    val paragraphId = context.getParagraphId

    if (REPL == null) {
      // Report a clear error instead of relying on a NullPointerException being caught below.
      logger.error(s"Scio interpreter is not open, cannot execute paragraph $paragraphId")
      new InterpreterResult(InterpreterResult.Code.ERROR, "Scio interpreter is not open")
    } else {
      // NOTE(review): System.out is replaced process-wide and is not restored afterwards;
      // this keeps the original behavior - confirm it is intended.
      val consoleOut = new PrintStream(innerOut)
      System.setOut(consoleOut)
      innerOut.setInterpreterOutput(context.out)

      try {
        import tools.nsc.interpreter.Results._
        REPL.interpret(code) match {
          case Success =>
            logger.debug(s"Successfully executed `$code` in $paragraphId")
            new InterpreterResult(InterpreterResult.Code.SUCCESS)
          case Error =>
            logger.error(s"Error executing `$code` in $paragraphId")
            new InterpreterResult(InterpreterResult.Code.ERROR)
          case Incomplete =>
            logger.warn(s"Code `$code` not complete in $paragraphId")
            new InterpreterResult(InterpreterResult.Code.INCOMPLETE, "Incomplete expression")
        }
      } catch {
        case e: Exception =>
          logger.error("Interpreter exception", e)
          // getMessage may be null (e.g. NullPointerException); fall back to toString.
          val message = Option(e.getMessage).getOrElse(e.toString)
          new InterpreterResult(InterpreterResult.Code.ERROR, message)
      } finally {
        innerOut.flush()
        innerOut.setInterpreterOutput(null)
        consoleOut.flush()
      }
    }
  }

  /** Cancellation is not implemented; this is a no-op. */
  override def cancel(context: InterpreterContext): Unit = {
    // not implemented
  }

  /** Forms are handled natively. */
  override def getFormType: FormType = {
    FormType.NATIVE
  }

  /**
   * Progress reporting is not implemented, so 0 is reported rather than an arbitrary value.
   */
  override def getProgress(context: InterpreterContext): Int = {
    // not implemented
    0
  }

  /** Completion is not implemented here; delegates to the base class. */
  override def completion(buf: String, cursor: Int): util.List[InterpreterCompletion] = {
    //TODO: implement, delegate?
    super.completion(buf, cursor)
  }
}

View file

@ -544,7 +544,8 @@ public class ZeppelinConfiguration extends XMLConfiguration {
+ "org.apache.zeppelin.jdbc.JDBCInterpreter,"
+ "org.apache.zeppelin.hbase.HbaseInterpreter,"
+ "org.apache.zeppelin.bigquery.BigQueryInterpreter,"
+ "org.apache.zeppelin.beam.BeamInterpreter"),
+ "org.apache.zeppelin.beam.BeamInterpreter,"
+ "org.apache.zeppelin.scio.ScioInterpreter"),
ZEPPELIN_INTERPRETER_JSON("zeppelin.interpreter.setting", "interpreter-setting.json"),
ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"),
ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"),
@ -552,7 +553,7 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10),
ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh,"
+ "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch,"
+ "scalding,jdbc,hbase,bigquery,beam,pig"),
+ "scalding,jdbc,hbase,bigquery,beam,pig,scio"),
ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"),
ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"),
// use specified notebook (id) as homescreen