Add Zeppelin custom ContextAndArgs

To support execution on the Dataflow service, which requires the REPL state
to be dumped into a jar, we add a custom ContextAndArgs.
This commit is contained in:
Rafal Wojdyla 2016-10-07 19:36:46 -04:00
parent 0920fddd9f
commit 327273ea13
3 changed files with 45 additions and 3 deletions

View file

@ -53,7 +53,6 @@
<groupId>com.spotify</groupId>
<artifactId>scio-repl_${scala.binary.version}</artifactId>
<version>${scio.version}</version>
<description>REPL dependency brings other Scio dependencies</description>
</dependency>
<dependency>

View file

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.scio
import com.google.cloud.dataflow.sdk.options.PipelineOptions
import com.spotify.scio.repl.ReplScioContext
import com.spotify.scio.{Args, ScioContext}
/**
 * Convenience object for creating [[com.spotify.scio.ScioContext]] and [[com.spotify.scio.Args]].
 *
 * The context built here is a [[com.spotify.scio.repl.ReplScioContext]] that is given the path
 * of the next REPL code jar, as reported by the class loader that loaded this object.
 */
object ContextAndArgs {

  /**
   * Parses the given arguments and builds a REPL-aware context named "sciozeppelin".
   *
   * @param argz command line style arguments; parsed with `ScioContext.parseArguments` into
   *             pipeline options and the remaining application arguments
   * @return the newly created context together with the parsed [[com.spotify.scio.Args]]
   */
  def apply(argz: Array[String]): (ScioContext, Args) = {
    // Invoking a member of a structural type is a reflective call; enable the language
    // feature explicitly so the compiler does not emit a feature warning for it.
    import scala.language.reflectiveCalls

    val (dfOpts, args) = ScioContext.parseArguments[PipelineOptions](argz)

    // The class loader is only assumed to expose `getNextReplCodeJarPath` (presumably it is
    // the Scio REPL class loader - verify against the interpreter setup). The cast to a
    // structural type is unchecked, so a loader without that method fails on the call itself.
    val nextReplJar = this
      .getClass
      .getClassLoader
      .asInstanceOf[{def getNextReplCodeJarPath: String}]
      .getNextReplCodeJarPath

    val sc = new ReplScioContext(dfOpts, List(nextReplJar))
    sc.setName("sciozeppelin")
    (sc, args)
  }
}

View file

@ -55,15 +55,16 @@ import scala.tools.nsc.util.ClassPath
class ScioInterpreter(property: Properties) extends Interpreter(property) {
private val logger = LoggerFactory.getLogger(classOf[ScioInterpreter])
private var REPL: ScioILoop = null
private var REPL: ScioILoop = _
val innerOut = new InterpreterOutputStream(logger)
override def open(): Unit = {
val argz: List[String] = Option(getProperty("zeppelin.scio.argz"))
val argz = Option(getProperty("zeppelin.scio.argz"))
.getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}")
.split(" ")
.map(_.trim)
.filter(_.nonEmpty)
.toList
// Process command line arguments into a settings object, and use that to start the REPL.
@ -127,6 +128,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
REPL.interpret(s"""val argz = Array("${argz.mkString("\", \"")}")""")
REPL.interpret("import org.apache.zeppelin.scio.DisplaySCollectionImplicits._")
REPL.interpret("import org.apache.zeppelin.scio.DisplayTapImplicits._")
REPL.interpret("import org.apache.zeppelin.scio.ContextAndArgs")
}
private def parseAndPartitionArgs(args: List[String]): (List[String], List[String]) = {