Rename argz param, fix style

This commit is contained in:
Rafal Wojdyla 2016-09-29 21:33:44 -04:00
parent 0a3b49a50a
commit 93233a8f8b
3 changed files with 28 additions and 16 deletions

View file

@ -34,7 +34,7 @@ Scio is a Scala DSL for [Google Cloud Dataflow](https://github.com/GoogleCloudPl
<th>Description</th>
</tr>
<tr>
<td>argz</td>
<td>zeppelin.scio.argz</td>
<td>--runner=InProcessPipelineRunner</td>
<td>Scio Pipeline runner</td>
</tr>

View file

@ -5,9 +5,9 @@
"className": "org.apache.zeppelin.scio.ScioInterpreter",
"defaultInterpreter": true,
"properties": {
"argz": {
"envName": null,
"propertyName": null,
"zeppelin.scio.argz": {
"envName": "ZEPPELIN_SCIO_ARGZ",
"propertyName": "zeppelin.scio.argz",
"defaultValue": "--runner=InProcessPipelineRunner",
"description": "Scio interpreter wide arguments"
},

View file

@ -36,6 +36,25 @@ import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.JPrintWriter
import scala.tools.nsc.util.ClassPath
/**
* Scio interpreter for Zeppelin.
*
* <ul>
* <li>{@code zeppelin.scio.argz} - Scio interpreter wide arguments</li>
* <li>{@code zeppelin.scio.maxResult} - Max number of SCollection results to display.</li>
* </ul>
*
* <p>
* How to use: <br/>
* {@code
* %scio
* val (sc, args) = ContextAndArgs(argz)
* sc.parallelize(Seq("foo", "foo", "bar")).countByValue.closeAndDisplay()
* }
* </p>
*
*/
class ScioInterpreter(property: Properties) extends Interpreter(property) {
private val logger = LoggerFactory.getLogger(classOf[ScioInterpreter])
private var REPL: ScioILoop = null
@ -43,7 +62,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
val innerOut = new InterpreterOutputStream(logger)
override def open(): Unit = {
val argz: List[String] = Option(getProperty("argz"))
val argz: List[String] = Option(getProperty("zeppelin.scio.argz"))
.getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}")
.split(" ")
.map(_.trim)
@ -75,9 +94,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
// There should be no harm if we keep this for sbt launch.
val thisJar = this.getClass.getProtectionDomain.getCodeSource.getLocation.getPath
// In some cases this may be `target/classes`
if(thisJar.endsWith(".jar")) {
settings.plugin.appendToValue(thisJar)
}
if(thisJar.endsWith(".jar")) settings.plugin.appendToValue(thisJar)
ClassPath.split(settings.classpath.value)
.find(File(_).name.startsWith("paradise_"))
@ -119,11 +136,8 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
cls.getMethods.flatMap { m =>
val n = m.getName
if ((!n.startsWith("get") && !n.startsWith("is")) ||
m.getParameterTypes.nonEmpty || m.getReturnType == classOf[Unit]) {
None
} else {
Some(Introspector.decapitalize(n.substring(if (n.startsWith("is")) 2 else 3)))
}
m.getParameterTypes.nonEmpty || m.getReturnType == classOf[Unit]) None
else Some(Introspector.decapitalize(n.substring(if (n.startsWith("is")) 2 else 3)))
}.map(s => s"--$s($$|=)".r)
}
@ -174,9 +188,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
// not implemented
}
override def getFormType: FormType = {
FormType.NATIVE
}
override def getFormType: FormType = FormType.NATIVE
override def getProgress(context: InterpreterContext): Int = {
// not implemented