Inject argz into the Scio interpreter

This commit is contained in:
Rafal Wojdyla 2016-09-29 02:27:11 -04:00
parent 570cfaabf1
commit 6ff4e95c54
2 changed files with 29 additions and 4 deletions

View file

@ -5,11 +5,11 @@
"className": "org.apache.zeppelin.scio.ScioInterpreter",
"defaultInterpreter": true,
"properties": {
"args": {
"argz": {
"envName": null,
"propertyName": null,
"defaultValue": "--runner=InProcessPipelineRunner",
"description": "Scio commandline args"
"description": "Scio interpreter wide arguments"
}
},
"editor": {

View file

@ -17,10 +17,12 @@
package org.apache.zeppelin.scio
import java.beans.Introspector
import java.io.PrintStream
import java.util
import java.util.Properties
import com.google.cloud.dataflow.sdk.options.{PipelineOptions, PipelineOptionsFactory}
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner
import com.spotify.scio.repl.{ScioILoop, ScioReplClassLoader}
import org.apache.zeppelin.interpreter.Interpreter.FormType
@ -41,7 +43,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
val innerOut = new InterpreterOutputStream(logger)
override def open(): Unit = {
val args: List[String] = Option(getProperty("args"))
val args: List[String] = Option(getProperty("argz"))
.getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}")
.split(" ")
.map(_.trim)
@ -90,7 +92,9 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
null,
Thread.currentThread.getContextClassLoader)
REPL = new ScioILoop(scioClassLoader, args, None, new JPrintWriter(innerOut))
val (dfArgs, _) = parseAndPartitionArgs(args)
REPL = new ScioILoop(scioClassLoader, dfArgs, None, new JPrintWriter(innerOut))
scioClassLoader.setRepl(REPL)
// Set classloader chain - expose top level abstract class loader down
@ -103,6 +107,27 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
REPL.settings_=(settings)
REPL.createInterpreter()
REPL.interpret(s"""val argz = Array("${args.mkString("\", \"")}")""")
}
/**
 * Splits `args` into arguments that set a `PipelineOptions` property and all remaining arguments.
 *
 * Property names are derived from every zero-argument, non-void `getXxx`/`isXxx` method found on
 * the interfaces returned by `PipelineOptionsFactory.getRegisteredOptions`, plus
 * `PipelineOptions` itself. An argument counts as a pipeline option only when it has the exact
 * form `--<name>` or `--<name>=<value>`. The name is compared from the start of the argument, so
 * an argument that merely contains such text further in (e.g. `--input=a--runner=b`) is not
 * misclassified as a pipeline option.
 *
 * @param args individual command line arguments
 * @return a pair of (pipeline option arguments, remaining arguments), each in original order
 */
private def parseAndPartitionArgs(args: List[String]): (List[String], List[String]) = {
  import scala.collection.JavaConverters._

  // All registered PipelineOptions interfaces, plus the base interface itself
  val optionClasses =
    PipelineOptionsFactory.getRegisteredOptions.asScala + classOf[PipelineOptions]

  // Bean property names exposed through getters; collecting into a set drops the duplicates
  // produced by methods that several interfaces share
  val optionNames = optionClasses.flatMap { cls =>
    cls.getMethods.toSeq.collect {
      case m if (m.getName.startsWith("get") || m.getName.startsWith("is")) &&
          m.getParameterTypes.isEmpty && m.getReturnType != classOf[Unit] =>
        val prefixLength = if (m.getName.startsWith("is")) 2 else 3
        Introspector.decapitalize(m.getName.substring(prefixLength))
    }
  }

  // The option name is whatever sits between the leading "--" and the first "=" (if any)
  args.partition { arg =>
    arg.startsWith("--") && optionNames.contains(arg.drop(2).takeWhile(_ != '='))
  }
}
override def close(): Unit = {