mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Add Zeppelin custom ContextAndArgs
To support Dataflow service execution, which requires REPL state dump in jar, we add custom ContextAndArgs.
This commit is contained in:
parent
0920fddd9f
commit
327273ea13
3 changed files with 45 additions and 3 deletions
|
|
@ -53,7 +53,6 @@
|
|||
<groupId>com.spotify</groupId>
|
||||
<artifactId>scio-repl_${scala.binary.version}</artifactId>
|
||||
<version>${scio.version}</version>
|
||||
<description>REPL dependency brings other Scio dependencies</description>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin.scio
|
||||
|
||||
import com.google.cloud.dataflow.sdk.options.PipelineOptions
|
||||
import com.spotify.scio.repl.ReplScioContext
|
||||
import com.spotify.scio.{Args, ScioContext}
|
||||
|
||||
/**
|
||||
* Convenience object for creating [[com.spotify.scio.ScioContext]] and [[com.spotify.scio.Args]].
|
||||
*/
|
||||
object ContextAndArgs {
|
||||
def apply(argz: Array[String]): (ScioContext, Args) = {
|
||||
val (dfOpts, args) = ScioContext.parseArguments[PipelineOptions](argz)
|
||||
|
||||
val nextReplJar = this
|
||||
.getClass
|
||||
.getClassLoader
|
||||
.asInstanceOf[{def getNextReplCodeJarPath: String}].getNextReplCodeJarPath
|
||||
|
||||
val sc = new ReplScioContext(dfOpts, List(nextReplJar))
|
||||
sc.setName("sciozeppelin")
|
||||
|
||||
(sc, args)
|
||||
}
|
||||
}
|
||||
|
|
@ -55,15 +55,16 @@ import scala.tools.nsc.util.ClassPath
|
|||
|
||||
class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
||||
private val logger = LoggerFactory.getLogger(classOf[ScioInterpreter])
|
||||
private var REPL: ScioILoop = null
|
||||
private var REPL: ScioILoop = _
|
||||
|
||||
val innerOut = new InterpreterOutputStream(logger)
|
||||
|
||||
override def open(): Unit = {
|
||||
val argz: List[String] = Option(getProperty("zeppelin.scio.argz"))
|
||||
val argz = Option(getProperty("zeppelin.scio.argz"))
|
||||
.getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}")
|
||||
.split(" ")
|
||||
.map(_.trim)
|
||||
.filter(_.nonEmpty)
|
||||
.toList
|
||||
|
||||
// Process command line arguments into a settings object, and use that to start the REPL.
|
||||
|
|
@ -127,6 +128,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
REPL.interpret(s"""val argz = Array("${argz.mkString("\", \"")}")""")
|
||||
REPL.interpret("import org.apache.zeppelin.scio.DisplaySCollectionImplicits._")
|
||||
REPL.interpret("import org.apache.zeppelin.scio.DisplayTapImplicits._")
|
||||
REPL.interpret("import org.apache.zeppelin.scio.ContextAndArgs")
|
||||
}
|
||||
|
||||
private def parseAndPartitionArgs(args: List[String]): (List[String], List[String]) = {
|
||||
|
|
|
|||
Loading…
Reference in a new issue