Add TableRow display helper + style

This commit is contained in:
Rafal Wojdyla 2016-09-29 20:54:47 -04:00
parent be252f81cc
commit 61850d7f65
2 changed files with 52 additions and 16 deletions

View file

@ -15,7 +15,7 @@
"envName": "ZEPPELIN_SCIO_MAXRESULT",
"propertyName": "zeppelin.scio.maxResult",
"defaultValue": "1000",
"description": "Max number of SCollection result to display."
"description": "Max number of SCollection results to display."
}
},
"editor": {

View file

@ -17,7 +17,9 @@
package org.apache.zeppelin
import com.google.api.services.bigquery.model.TableSchema
import com.spotify.scio._
import com.spotify.scio.bigquery.TableRow
import com.spotify.scio.values.SCollection
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
@ -27,6 +29,9 @@ import scala.reflect.ClassTag
package object scio {
private val SCollectionEmptyMsg = "\n%html <font color=red>Result SCollection is empty!</font>\n"
private val maxResults = Integer.getInteger("zeppelin.scio.maxResult", 1000)
private val tab = "\t"
private val newline = "\n"
private val table = "%table"
private def materialize[T: ClassTag](self: SCollection[T]) = {
val f = self.materialize
@ -36,7 +41,7 @@ package object scio {
private def notifIfTruncated(it: Iterator[_]): Unit = {
if(it.hasNext)
println("\n<font color=red>Results are limited to " + maxResults + " rows.</font>\n")
println(s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline")
}
// TODO: scala 2.11
@ -51,7 +56,7 @@ package object scio {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
println(s"""%table value\n${it.take(maxResults).map(printer).mkString("\n")}""")
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
notifIfTruncated(it)
}
}
@ -69,7 +74,7 @@ package object scio {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
println(s"""%table value\n${it.take(maxResults).map(printer).mkString("\n")}""")
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
notifIfTruncated(it)
}
}
@ -86,8 +91,8 @@ package object scio {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
val content = it.take(maxResults).map{ case (k, v) => s"$k\t$v" }.mkString("\n")
println(s"""%table key\tvalue\n$content""")
val content = it.take(maxResults).map{ case (k, v) => s"$k$tab$v" }.mkString(newline)
println(s"$table key${tab}value$newline$content")
notifIfTruncated(it)
}
}
@ -110,10 +115,10 @@ package object scio {
//TODO is this safe field name to value iterator?
val fieldNames = first.getClass.getDeclaredFields.map(_.getName)
val header = fieldNames.mkString("\t")
val firstStr = first.productIterator.mkString("\t")
val content = it.take(maxResults).map(_.productIterator.mkString("\t")).mkString("\n")
println(s"""%table $header\n$firstStr\n$content""")
val header = fieldNames.mkString(tab)
val firstStr = first.productIterator.mkString(tab)
val content = it.take(maxResults).map(_.productIterator.mkString(tab)).mkString(newline)
println(s"$table $header$newline$firstStr$newline$content")
notifIfTruncated(it)
}
}
@ -133,14 +138,45 @@ package object scio {
} else {
val first = it.next()
import collection.JavaConverters._
val fieldNames = first.getSchema.getFields.iterator.asScala.map(_.name()).toArray
val header = fieldNames.mkString("\t")
val firstStr = fieldNames.map(first.get(_)).mkString("\t")
val fieldNames = if (schema != null) {
schema.getFields.iterator().asScala.map(_.name()).toArray
} else {
first.getSchema.getFields.iterator.asScala.map(_.name()).toArray
}
val header = fieldNames.mkString(tab)
val firstStr = fieldNames.map(first.get).mkString(tab)
val content = it.take(maxResults)
.map(r => fieldNames.map(r.get(_)).mkString("\t"))
.mkString("\n")
println(s"""%table $header\n$firstStr\n$content""")
.map(r => fieldNames.map(r.get).mkString(tab))
.mkString(newline)
println(s"$table $header$newline$firstStr$newline$content")
notifIfTruncated(it)
}
}
}
// TODO: scala 2.11
// implicit class ZeppelinBQTableSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< TableRow) extends AnyVal {
implicit class ZeppelinBQTableSCollection[T: ClassTag](val self: SCollection[T])
(implicit ev: T <:< TableRow) {
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
* and display elements from TableRow like SCollection */
def closeAndDisplay(schema: TableSchema): Unit = {
val it = materialize(self).waitForResult().value
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
import collection.JavaConverters._
val fields = schema.getFields.asScala.map(_.getName).toArray
val header = fields.mkString(tab)
val content = it.take(maxResults)
.map(r => fields.map(r.get).mkString(tab))
.mkString(newline)
println(s"$table $header$newline$content")
notifIfTruncated(it)
}
}