mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Add TableRow display helper + style
This commit is contained in:
parent
be252f81cc
commit
61850d7f65
2 changed files with 52 additions and 16 deletions
|
|
@ -15,7 +15,7 @@
|
|||
"envName": "ZEPPELIN_SCIO_MAXRESULT",
|
||||
"propertyName": "zeppelin.scio.maxResult",
|
||||
"defaultValue": "1000",
|
||||
"description": "Max number of SCollection result to display."
|
||||
"description": "Max number of SCollection results to display."
|
||||
}
|
||||
},
|
||||
"editor": {
|
||||
|
|
|
|||
|
|
@ -17,7 +17,9 @@
|
|||
|
||||
package org.apache.zeppelin
|
||||
|
||||
import com.google.api.services.bigquery.model.TableSchema
|
||||
import com.spotify.scio._
|
||||
import com.spotify.scio.bigquery.TableRow
|
||||
import com.spotify.scio.values.SCollection
|
||||
import org.apache.avro.Schema
|
||||
import org.apache.avro.generic.GenericRecord
|
||||
|
|
@ -27,6 +29,9 @@ import scala.reflect.ClassTag
|
|||
package object scio {
|
||||
private val SCollectionEmptyMsg = "\n%html <font color=red>Result SCollection is empty!</font>\n"
|
||||
private val maxResults = Integer.getInteger("zeppelin.scio.maxResult", 1000)
|
||||
private val tab = "\t"
|
||||
private val newline = "\n"
|
||||
private val table = "%table"
|
||||
|
||||
private def materialize[T: ClassTag](self: SCollection[T]) = {
|
||||
val f = self.materialize
|
||||
|
|
@ -36,7 +41,7 @@ package object scio {
|
|||
|
||||
private def notifIfTruncated(it: Iterator[_]): Unit = {
|
||||
if(it.hasNext)
|
||||
println("\n<font color=red>Results are limited to " + maxResults + " rows.</font>\n")
|
||||
println(s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline")
|
||||
}
|
||||
|
||||
// TODO: scala 2.11
|
||||
|
|
@ -51,7 +56,7 @@ package object scio {
|
|||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
println(s"""%table value\n${it.take(maxResults).map(printer).mkString("\n")}""")
|
||||
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
|
|
@ -69,7 +74,7 @@ package object scio {
|
|||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
println(s"""%table value\n${it.take(maxResults).map(printer).mkString("\n")}""")
|
||||
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
|
|
@ -86,8 +91,8 @@ package object scio {
|
|||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
val content = it.take(maxResults).map{ case (k, v) => s"$k\t$v" }.mkString("\n")
|
||||
println(s"""%table key\tvalue\n$content""")
|
||||
val content = it.take(maxResults).map{ case (k, v) => s"$k$tab$v" }.mkString(newline)
|
||||
println(s"$table key${tab}value$newline$content")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
|
|
@ -110,10 +115,10 @@ package object scio {
|
|||
//TODO is this safe field name to value iterator?
|
||||
val fieldNames = first.getClass.getDeclaredFields.map(_.getName)
|
||||
|
||||
val header = fieldNames.mkString("\t")
|
||||
val firstStr = first.productIterator.mkString("\t")
|
||||
val content = it.take(maxResults).map(_.productIterator.mkString("\t")).mkString("\n")
|
||||
println(s"""%table $header\n$firstStr\n$content""")
|
||||
val header = fieldNames.mkString(tab)
|
||||
val firstStr = first.productIterator.mkString(tab)
|
||||
val content = it.take(maxResults).map(_.productIterator.mkString(tab)).mkString(newline)
|
||||
println(s"$table $header$newline$firstStr$newline$content")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
|
|
@ -133,14 +138,45 @@ package object scio {
|
|||
} else {
|
||||
val first = it.next()
|
||||
import collection.JavaConverters._
|
||||
val fieldNames = first.getSchema.getFields.iterator.asScala.map(_.name()).toArray
|
||||
|
||||
val header = fieldNames.mkString("\t")
|
||||
val firstStr = fieldNames.map(first.get(_)).mkString("\t")
|
||||
val fieldNames = if (schema != null) {
|
||||
schema.getFields.iterator().asScala.map(_.name()).toArray
|
||||
} else {
|
||||
first.getSchema.getFields.iterator.asScala.map(_.name()).toArray
|
||||
}
|
||||
|
||||
val header = fieldNames.mkString(tab)
|
||||
val firstStr = fieldNames.map(first.get).mkString(tab)
|
||||
val content = it.take(maxResults)
|
||||
.map(r => fieldNames.map(r.get(_)).mkString("\t"))
|
||||
.mkString("\n")
|
||||
println(s"""%table $header\n$firstStr\n$content""")
|
||||
.map(r => fieldNames.map(r.get).mkString(tab))
|
||||
.mkString(newline)
|
||||
println(s"$table $header$newline$firstStr$newline$content")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: scala 2.11
|
||||
// implicit class ZeppelinBQTableSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< TableRow) extends AnyVal {
|
||||
implicit class ZeppelinBQTableSCollection[T: ClassTag](val self: SCollection[T])
|
||||
(implicit ev: T <:< TableRow) {
|
||||
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
|
||||
* and display elements from TableRow like SCollection */
|
||||
def closeAndDisplay(schema: TableSchema): Unit = {
|
||||
val it = materialize(self).waitForResult().value
|
||||
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
import collection.JavaConverters._
|
||||
val fields = schema.getFields.asScala.map(_.getName).toArray
|
||||
val header = fields.mkString(tab)
|
||||
|
||||
val content = it.take(maxResults)
|
||||
.map(r => fields.map(r.get).mkString(tab))
|
||||
.mkString(newline)
|
||||
|
||||
println(s"$table $header$newline$content")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue