mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Inject argz to the Scio interpreter
This commit is contained in:
parent
570cfaabf1
commit
6ff4e95c54
2 changed files with 29 additions and 4 deletions
|
|
@ -5,11 +5,11 @@
|
|||
"className": "org.apache.zeppelin.scio.ScioInterpreter",
|
||||
"defaultInterpreter": true,
|
||||
"properties": {
|
||||
"args": {
|
||||
"argz": {
|
||||
"envName": null,
|
||||
"propertyName": null,
|
||||
"defaultValue": "--runner=InProcessPipelineRunner",
|
||||
"description": "Scio commandline args"
|
||||
"description": "Scio interpreter wide arguments"
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
|
|
|
|||
|
|
@ -17,10 +17,12 @@
|
|||
|
||||
package org.apache.zeppelin.scio
|
||||
|
||||
import java.beans.Introspector
|
||||
import java.io.PrintStream
|
||||
import java.util
|
||||
import java.util.Properties
|
||||
|
||||
import com.google.cloud.dataflow.sdk.options.{PipelineOptions, PipelineOptionsFactory}
|
||||
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner
|
||||
import com.spotify.scio.repl.{ScioILoop, ScioReplClassLoader}
|
||||
import org.apache.zeppelin.interpreter.Interpreter.FormType
|
||||
|
|
@ -41,7 +43,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
val innerOut = new InterpreterOutputStream(logger)
|
||||
|
||||
override def open(): Unit = {
|
||||
val args: List[String] = Option(getProperty("args"))
|
||||
val args: List[String] = Option(getProperty("argz"))
|
||||
.getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}")
|
||||
.split(" ")
|
||||
.map(_.trim)
|
||||
|
|
@ -90,7 +92,9 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
null,
|
||||
Thread.currentThread.getContextClassLoader)
|
||||
|
||||
REPL = new ScioILoop(scioClassLoader, args, None, new JPrintWriter(innerOut))
|
||||
val (dfArgs, _) = parseAndPartitionArgs(args)
|
||||
|
||||
REPL = new ScioILoop(scioClassLoader, dfArgs, None, new JPrintWriter(innerOut))
|
||||
scioClassLoader.setRepl(REPL)
|
||||
|
||||
// Set classloader chain - expose top level abstract class loader down
|
||||
|
|
@ -103,6 +107,27 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
|
||||
REPL.settings_=(settings)
|
||||
REPL.createInterpreter()
|
||||
REPL.interpret(s"""val argz = Array("${args.mkString("\", \"")}")""")
|
||||
}
|
||||
|
||||
private def parseAndPartitionArgs(args: List[String]): (List[String], List[String]) = {
|
||||
import scala.collection.JavaConverters._
|
||||
// Extract --pattern of all registered derived types of PipelineOptions
|
||||
val classes = PipelineOptionsFactory.getRegisteredOptions.asScala + classOf[PipelineOptions]
|
||||
val optPatterns = classes.flatMap { cls =>
|
||||
cls.getMethods.flatMap { m =>
|
||||
val n = m.getName
|
||||
if ((!n.startsWith("get") && !n.startsWith("is")) ||
|
||||
m.getParameterTypes.nonEmpty || m.getReturnType == classOf[Unit]) {
|
||||
None
|
||||
} else {
|
||||
Some(Introspector.decapitalize(n.substring(if (n.startsWith("is")) 2 else 3)))
|
||||
}
|
||||
}.map(s => s"--$s($$|=)".r)
|
||||
}
|
||||
|
||||
// Split cmdlineArgs into 2 parts, optArgs for PipelineOptions and appArgs for Args
|
||||
args.partition(arg => optPatterns.exists(_.findFirstIn(arg).isDefined))
|
||||
}
|
||||
|
||||
override def close(): Unit = {
|
||||
|
|
|
|||
Loading…
Reference in a new issue