Add display helpers for Tap[T] and Future[Tap[T]]

This commit is contained in:
Rafal Wojdyla 2016-10-01 14:23:46 -04:00
parent 4014c817d5
commit 9dcc8cef33
4 changed files with 320 additions and 90 deletions

View file

@ -0,0 +1,149 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.scio
import com.google.api.services.bigquery.model.TableSchema
import com.spotify.scio.bigquery._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import scala.reflect.ClassTag
/**
* Set of helpers for Zeppelin Display system.
*/
private[scio] object DisplayHelpers {
private val SCollectionEmptyMsg = "\n%html <font color=red>Result SCollection is empty!</font>\n"
private val maxResults = Integer.getInteger("zeppelin.scio.maxResult", 1000)
private val tab = "\t"
private val newline = "\n"
private val table = "%table"
private def notifyIfTruncated(it: Iterator[_]): Unit = {
if(it.hasNext)
println(s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline")
}
/**
* Displays [[AnyVal]] values from given [[Iterator]].
*/
private[scio] def displayAnyVal[T: ClassTag](it: Iterator[T], printer: (T) => String): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
notifyIfTruncated(it)
}
}
/**
* Displays [[String]] values from given [[Iterator]].
*/
private[scio] def displayString[T: ClassTag](it: Iterator[T], printer: (T) => String): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
notifyIfTruncated(it)
}
}
/**
* Displays [[com.google.cloud.dataflow.sdk.values.KV]] values from given [[Iterator]].
*/
private[scio] def displayKV[K: ClassTag, V: ClassTag](it: Iterator[(K,V)]): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
val content = it.take(maxResults).map{ case (k, v) => s"$k$tab$v" }.mkString(newline)
println(s"$table key${tab}value$newline$content")
notifyIfTruncated(it)
}
}
/**
* Displays [[Product]] values from given [[Iterator]].
*/
private[scio] def displayProduct[T: ClassTag](it: Iterator[T])
(implicit ev: T <:< Product): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
val first = it.next()
//TODO is this safe field name to value iterator?
val fieldNames = first.getClass.getDeclaredFields.map(_.getName)
val header = fieldNames.mkString(tab)
val firstStr = first.productIterator.mkString(tab)
val content = it.take(maxResults).map(_.productIterator.mkString(tab)).mkString(newline)
println(s"$table $header$newline$firstStr$newline$content")
notifyIfTruncated(it)
}
}
/**
* Displays Avro values from given [[Iterator]] using optional [[Schema]].
* @param schema optional "view" schema, otherwise schema is inferred from the first object
*/
private[scio] def displayAvro[T: ClassTag](it: Iterator[T], schema: Schema = null)
(implicit ev: T <:< GenericRecord): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
val first = it.next()
import collection.JavaConverters._
val fieldNames = if (schema != null) {
schema.getFields.iterator().asScala.map(_.name()).toArray
} else {
first.getSchema.getFields.iterator.asScala.map(_.name()).toArray
}
val header = fieldNames.mkString(tab)
val firstStr = fieldNames.map(first.get).mkString(tab)
val content = it.take(maxResults)
.map(r => fieldNames.map(r.get).mkString(tab))
.mkString(newline)
println(s"$table $header$newline$firstStr$newline$content")
notifyIfTruncated(it)
}
}
/**
* Displays [[TableRow]] values from given [[Iterator]] using specified [[TableSchema]].
*/
private[scio] def displayBQTableRow[T: ClassTag](it: Iterator[T], schema: TableSchema)
(implicit ev: T <:< TableRow) : Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
import collection.JavaConverters._
val fields = schema.getFields.asScala.map(_.getName).toArray
val header = fields.mkString(tab)
val content = it.take(maxResults)
.map(r => fields.map(r.get).mkString(tab))
.mkString(newline)
println(s"$table $header$newline$content")
notifyIfTruncated(it)
}
}
}

View file

@ -15,23 +15,21 @@
* limitations under the License.
*/
package org.apache.zeppelin
package org.apache.zeppelin.scio
import com.google.api.services.bigquery.model.TableSchema
import com.spotify.scio._
import com.spotify.scio.bigquery.TableRow
import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import scala.reflect.ClassTag
package object scio {
private val SCollectionEmptyMsg = "\n%html <font color=red>Result SCollection is empty!</font>\n"
private val maxResults = Integer.getInteger("zeppelin.scio.maxResult", 1000)
private val tab = "\t"
private val newline = "\n"
private val table = "%table"
/**
* Implicit Zeppelin display helpers for SCollection.
*/
object DisplaySCollectionImplicits {
private def materialize[T: ClassTag](self: SCollection[T]) = {
val f = self.materialize
@ -39,26 +37,15 @@ package object scio {
f
}
private def notifIfTruncated(it: Iterator[_]): Unit = {
if(it.hasNext)
println(s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline")
}
// TODO: scala 2.11
// implicit class ZeppelinSCollection[T: ClassTag](private val self: SCollection[T]) extends AnyVal {
// implicit class ZeppelinSCollection[T: ClassTag](private val self: SCollection[T])(implicit ev: T <:< AnyVal) extends AnyVal {
implicit class ZeppelinSCollection[T: ClassTag](val self: SCollection[T])
(implicit ev: T <:< AnyVal) {
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
* and display elements from SCollection. */
def closeAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = {
val it = materialize(self).waitForResult().value
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
notifIfTruncated(it)
}
DisplayHelpers.displayAnyVal(it, printer)
}
}
@ -70,13 +57,7 @@ package object scio {
* and display elements from SCollection. */
def closeAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = {
val it = materialize(self).waitForResult().value
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
notifIfTruncated(it)
}
DisplayHelpers.displayString(it, printer)
}
}
@ -87,16 +68,8 @@ package object scio {
* and display elements from KV SCollection. */
def closeAndDisplay(): Unit = {
val it = materialize(self).waitForResult().value
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
val content = it.take(maxResults).map{ case (k, v) => s"$k$tab$v" }.mkString(newline)
println(s"$table key${tab}value$newline$content")
notifIfTruncated(it)
}
DisplayHelpers.displayKV(it)
}
}
// TODO: scala 2.11
@ -107,20 +80,7 @@ package object scio {
* and display elements from Product like SCollection */
def closeAndDisplay(): Unit = {
val it = materialize(self).waitForResult().value
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
val first = it.next()
//TODO is this safe field name to value iterator?
val fieldNames = first.getClass.getDeclaredFields.map(_.getName)
val header = fieldNames.mkString(tab)
val firstStr = first.productIterator.mkString(tab)
val content = it.take(maxResults).map(_.productIterator.mkString(tab)).mkString(newline)
println(s"$table $header$newline$firstStr$newline$content")
notifIfTruncated(it)
}
DisplayHelpers.displayProduct(it)
}
}
@ -132,53 +92,19 @@ package object scio {
* and display elements from Avro like SCollection */
def closeAndDisplay(schema: Schema = null): Unit = {
val it = materialize(self).waitForResult().value
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
val first = it.next()
import collection.JavaConverters._
val fieldNames = if (schema != null) {
schema.getFields.iterator().asScala.map(_.name()).toArray
} else {
first.getSchema.getFields.iterator.asScala.map(_.name()).toArray
}
val header = fieldNames.mkString(tab)
val firstStr = fieldNames.map(first.get).mkString(tab)
val content = it.take(maxResults)
.map(r => fieldNames.map(r.get).mkString(tab))
.mkString(newline)
println(s"$table $header$newline$firstStr$newline$content")
notifIfTruncated(it)
}
DisplayHelpers.displayAvro(it, schema)
}
}
// TODO: scala 2.11
// implicit class ZeppelinBQTableSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< TableRow) extends AnyVal {
implicit class ZeppelinBQTableSCollection[T: ClassTag](val self: SCollection[T])
(implicit ev: T <:< TableRow) {
(implicit ev: T <:< TableRow) {
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
* and display elements from TableRow like SCollection */
def closeAndDisplay(schema: TableSchema): Unit = {
val it = materialize(self).waitForResult().value
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
import collection.JavaConverters._
val fields = schema.getFields.asScala.map(_.getName).toArray
val header = fields.mkString(tab)
val content = it.take(maxResults)
.map(r => fields.map(r.get).mkString(tab))
.mkString(newline)
println(s"$table $header$newline$content")
notifIfTruncated(it)
}
DisplayHelpers.displayBQTableRow(it, schema)
}
}

View file

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.scio
import com.google.api.services.bigquery.model.TableSchema
import com.spotify.scio.bigquery.TableRow
import com.spotify.scio.io.Tap
import com.spotify.scio._
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import scala.concurrent.Future
import scala.reflect.ClassTag
/**
* Implicit Zeppelin display helpers for [[Tap]] and [[Future]] of a [[Tap]].
*/
object DisplayTapImplicits {
// TODO: scala 2.11
// implicit class ZeppelinTap[T: ClassTag](private val self: Tap[T])(implicit ev: T <:< AnyVal) extends AnyVal {
implicit class ZeppelinTap[T: ClassTag](val self: Tap[T])
(implicit ev: T <:< AnyVal) {
/** Convenience method to display [[com.spotify.scio.io.Tap]] of AnyVal. */
def display(printer: (T) => String = (e: T) => e.toString): Unit = {
DisplayHelpers.displayAnyVal(self.value, printer)
}
}
// TODO: scala 2.11
// implicit class ZeppelinFutureTap[T: ClassTag](private val self: Future[Tap[T]])(implicit ev: T <:< AnyVal) extends AnyVal {
implicit class ZeppelinFutureTap[T: ClassTag](val self: Future[Tap[T]])
(implicit ev: T <:< AnyVal) {
/** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of AnyVal. */
def waitAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = {
ZeppelinTap(self.waitForResult()).display(printer)
}
}
// TODO: scala 2.11
// implicit class ZeppelinStringTap[T: ClassTag](private val self: Tap[T])(implicit ev: T <:< String) extends AnyVal {
implicit class ZeppelinStringTap[T: ClassTag](val self: Tap[T])
(implicit ev: T <:< String) {
/** Convenience method to display [[com.spotify.scio.io.Tap]] of Strings. */
def display(printer: (T) => String = (e: T) => e.toString): Unit = {
DisplayHelpers.displayString(self.value, printer)
}
}
// TODO: scala 2.11
// implicit class ZeppelinFutureStringTap[T: ClassTag](private val self: Tap[T])(implicit ev: T <:< String) extends AnyVal {
implicit class ZeppelinFutureStringTap[T: ClassTag](val self: Future[Tap[T]])
(implicit ev: T <:< String) {
/** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of Strings. */
def waitAndDisplay(printer: (T) => String = (e: T) => e.toString): Unit = {
ZeppelinStringTap(self.waitForResult()).display(printer)
}
}
// TODO: scala 2.11
// implicit class ZeppelinKVTap[K: ClassTag, V: ClassTag](val self: Tap[(K, V)]) extends AnyVal {
implicit class ZeppelinKVTap[K: ClassTag, V: ClassTag](val self: Tap[(K, V)]) {
/** Convenience method to display [[com.spotify.scio.io.Tap]] of KV. */
def display(): Unit = {
DisplayHelpers.displayKV(self.value)
}
}
// TODO: scala 2.11
// implicit class ZeppelinFutureKVTap[K: ClassTag, V: ClassTag](val self: Future[Tap[(K, V)]]) extends AnyVal {
implicit class ZeppelinFutureKVTap[K: ClassTag, V: ClassTag](val self: Future[Tap[(K, V)]]) {
/** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of KV. */
def waitAndDisplay(): Unit = {
ZeppelinKVTap(self.waitForResult()).display()
}
}
// TODO: scala 2.11
// implicit class ZeppelinProductTap[T: ClassTag](val self: Tap[T])(implicit ev: T <:< Product) extends AnyVal {
implicit class ZeppelinProductTap[T: ClassTag](val self: Tap[T])
(implicit ev: T <:< Product) {
/** Convenience method to display [[com.spotify.scio.io.Tap]] of Product. */
def display(): Unit = {
DisplayHelpers.displayProduct(self.value)
}
}
// TODO: scala 2.11
// implicit class ZeppelinFutureProductTap[T: ClassTag](val self: Future[Tap[T]])(implicit ev: T <:< Product) extends AnyVal {
implicit class ZeppelinFutureProductTap[T: ClassTag](val self: Future[Tap[T]])
(implicit ev: T <:< Product) {
/** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of Product. */
def waitAndDisplay(): Unit = {
ZeppelinProductTap(self.waitForResult()).display()
}
}
// TODO: scala 2.11
// implicit class ZeppelinAvroTap[T: ClassTag](val self: Tap[T])(implicit ev: T <:< GenericRecord) extends AnyVal {
implicit class ZeppelinAvroTap[T: ClassTag](val self: Tap[T])
(implicit ev: T <:< GenericRecord) {
/** Convenience method to display [[com.spotify.scio.io.Tap]] of Avro. */
def display(schema: Schema = null): Unit = {
DisplayHelpers.displayAvro(self.value, schema)
}
}
// TODO: scala 2.11
// implicit class ZeppelinFutureAvroTap[T: ClassTag](val self: Future[Tap[T]])(implicit ev: T <:< GenericRecord) extends AnyVal {
implicit class ZeppelinFutureAvroTap[T: ClassTag](val self: Future[Tap[T]])
(implicit ev: T <:< GenericRecord) {
/** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of Avro. */
def waitAndDisplay(schema: Schema = null): Unit = {
ZeppelinAvroTap(self.waitForResult()).display(schema)
}
}
// TODO: scala 2.11
// implicit class ZeppelinBQTableTap[T: ClassTag](val self: Tap[T])(implicit ev: T <:< TableRow) extends AnyVal {
implicit class ZeppelinBQTableTap[T: ClassTag](val self: Tap[T])
(implicit ev: T <:< TableRow) {
/** Convenience method to display [[com.spotify.scio.io.Tap]] of BigQuery TableRow. */
def display(schema: TableSchema): Unit = {
DisplayHelpers.displayBQTableRow(self.value, schema)
}
}
// TODO: scala 2.11
// implicit class ZeppelinFutureBQTableTap[T: ClassTag](val self: Future[Tap[T]])(implicit ev: T <:< TableRow) extends AnyVal {
implicit class ZeppelinFutureBQTableTap[T: ClassTag](val self: Future[Tap[T]])
(implicit ev: T <:< TableRow) {
/** Convenience method to display [[Future]] of a [[com.spotify.scio.io.Tap]] of BigQuery
* TableRow. */
def waitAndDisplay(schema: TableSchema): Unit = {
ZeppelinBQTableTap(self.waitForResult()).display(schema)
}
}
}

View file

@ -18,7 +18,7 @@
package org.apache.zeppelin.scio
import java.beans.Introspector
import java.io.PrintStream}
import java.io.PrintStream
import java.util.Properties
import com.google.cloud.dataflow.sdk.options.{PipelineOptions, PipelineOptionsFactory}
@ -125,7 +125,8 @@ class ScioInterpreter(property: Properties) extends Interpreter(property) {
REPL.settings_=(settings)
REPL.createInterpreter()
REPL.interpret(s"""val argz = Array("${argz.mkString("\", \"")}")""")
REPL.interpret("import org.apache.zeppelin.scio._")
REPL.interpret("import org.apache.zeppelin.scio.DisplaySCollectionImplicits._")
REPL.interpret("import org.apache.zeppelin.scio.DisplayTapImplicits._")
}
private def parseAndPartitionArgs(args: List[String]): (List[String], List[String]) = {