mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Add display helpers
This commit is contained in:
parent
6ff4e95c54
commit
9a21aa0a64
2 changed files with 124 additions and 3 deletions
|
|
@ -43,7 +43,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
val innerOut = new InterpreterOutputStream(logger)
|
||||
|
||||
override def open(): Unit = {
|
||||
val args: List[String] = Option(getProperty("argz"))
|
||||
val argz: List[String] = Option(getProperty("argz"))
|
||||
.getOrElse(s"--runner=${classOf[InProcessPipelineRunner].getSimpleName}")
|
||||
.split(" ")
|
||||
.map(_.trim)
|
||||
|
|
@ -92,7 +92,7 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
null,
|
||||
Thread.currentThread.getContextClassLoader)
|
||||
|
||||
val (dfArgs, _) = parseAndPartitionArgs(args)
|
||||
val (dfArgs, _) = parseAndPartitionArgs(argz)
|
||||
|
||||
REPL = new ScioILoop(scioClassLoader, dfArgs, None, new JPrintWriter(innerOut))
|
||||
scioClassLoader.setRepl(REPL)
|
||||
|
|
@ -107,7 +107,8 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
|
|||
|
||||
REPL.settings_=(settings)
|
||||
REPL.createInterpreter()
|
||||
REPL.interpret(s"""val argz = Array("${args.mkString("\", \"")}")""")
|
||||
REPL.interpret(s"""val argz = Array("${argz.mkString("\", \"")}")""")
|
||||
REPL.interpret("import org.apache.zeppelin.scio._")
|
||||
}
|
||||
|
||||
private def parseAndPartitionArgs(args: List[String]): (List[String], List[String]) = {
|
||||
|
|
|
|||
120
scio/src/main/scala/org/apache/zeppelin/scio/package.scala
Normal file
120
scio/src/main/scala/org/apache/zeppelin/scio/package.scala
Normal file
|
|
@ -0,0 +1,120 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.zeppelin
|
||||
|
||||
import com.spotify.scio.values.SCollection
|
||||
import com.spotify.scio._
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
package object scio {
|
||||
private val SCollectionEmptyMsg = "\n%html <font color=red>Result SCollection is empty!</font>\n"
|
||||
private val maxResults = 1000
|
||||
|
||||
private def materialize[T: ClassTag](self: SCollection[T]) = {
|
||||
val f = self.materialize
|
||||
self.context.close()
|
||||
f
|
||||
}
|
||||
|
||||
private def notifIfTruncated(it: Iterator[_]): Unit = {
|
||||
if(it.hasNext)
|
||||
println("\n<font color=red>Results are limited to " + maxResults + ".</font>\n")
|
||||
}
|
||||
|
||||
// TODO: scala 2.11
|
||||
// implicit class ZeppelinSCollection[T: ClassTag](private val self: SCollection[T]) extends AnyVal {
|
||||
implicit class ZeppelinSCollection[T: ClassTag](val self: SCollection[T])
|
||||
(implicit ev: T <:< AnyVal) {
|
||||
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
|
||||
* and display elements from SCollection. */
|
||||
def closeAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = {
|
||||
val it = materialize(self).waitForResult().value
|
||||
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
println(s"""%table value\n${it.take(maxResults).map(printer).mkString("\n")}""")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: scala 2.11
|
||||
// implicit class ZeppelinSCollection[T: ClassTag](private val self: SCollection[T]) extends AnyVal {
|
||||
implicit class ZeppelinStringSCollection[T: ClassTag](val self: SCollection[T])
|
||||
(implicit ev: T <:< String) {
|
||||
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
|
||||
* and display elements from SCollection. */
|
||||
def closeAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = {
|
||||
val it = materialize(self).waitForResult().value
|
||||
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
println(s"""%table value\n${it.take(maxResults).map(printer).mkString("\n")}""")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: scala 2.11
|
||||
// implicit class ZeppelinKVSCollection[K: ClassTag, V: ClassTag](val self: SCollection[(K, V)]) extends AnyVal {
|
||||
implicit class ZeppelinKVSCollection[K: ClassTag, V: ClassTag](val self: SCollection[(K, V)]) {
|
||||
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
|
||||
* and display elements from KV SCollection. */
|
||||
def closeAndDisplay(): Unit = {
|
||||
val it = materialize(self).waitForResult().value
|
||||
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
val content = it.take(maxResults).map{ case (k, v) => s"$k\t$v" }.mkString("\n")
|
||||
println(s"""%table key\tvalue\n$content""")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TODO: scala 2.11
|
||||
// implicit class ZeppelinProductSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< Product) extends AnyVal {
|
||||
implicit class ZeppelinProductSCollection[T: ClassTag](val self: SCollection[T])
|
||||
(implicit ev: T <:< Product) {
|
||||
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
|
||||
* and display elements from Product like SCollection */
|
||||
def closeAndDisplay(): Unit = {
|
||||
val it = materialize(self).waitForResult().value
|
||||
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
val first = it.next()
|
||||
//TODO is this safe field name to value iterator?
|
||||
val fieldNames = first.getClass.getDeclaredFields.map(_.getName)
|
||||
|
||||
val header = fieldNames.mkString("\t")
|
||||
val firstStr = first.productIterator.mkString("\t")
|
||||
val content = it.take(maxResults).map(_.productIterator.mkString("\t")).mkString("\n")
|
||||
println(s"""%table $header\n$firstStr\n$content""")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue