mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Fix style and number of records for take
This commit is contained in:
parent
9dcc8cef33
commit
c0f8ccff4d
1 changed files with 32 additions and 21 deletions
|
|
@ -29,15 +29,19 @@ import scala.reflect.ClassTag
|
|||
*/
|
||||
private[scio] object DisplayHelpers {
|
||||
|
||||
private val SCollectionEmptyMsg = "\n%html <font color=red>Result SCollection is empty!</font>\n"
|
||||
private[scio] val sCollectionEmptyMsg =
|
||||
"\n%html <font color=red>Result SCollection is empty!</font>\n"
|
||||
private val maxResults = Integer.getInteger("zeppelin.scio.maxResult", 1000)
|
||||
private val tab = "\t"
|
||||
private val newline = "\n"
|
||||
private val table = "%table"
|
||||
private[scio] val tab = "\t"
|
||||
private[scio] val newline = "\n"
|
||||
private[scio] val table = "%table"
|
||||
private[scio] val rowLimitReachedMsg =
|
||||
s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline"
|
||||
private[scio] val bQSchemaIncomplete =
|
||||
s"$newline<font color=red>Provided BigQuery Schema has not fields!</font>$newline"
|
||||
|
||||
private def notifyIfTruncated(it: Iterator[_]): Unit = {
|
||||
if(it.hasNext)
|
||||
println(s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline")
|
||||
if(it.hasNext) println(rowLimitReachedMsg)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -45,7 +49,7 @@ private[scio] object DisplayHelpers {
|
|||
*/
|
||||
private[scio] def displayAnyVal[T: ClassTag](it: Iterator[T], printer: (T) => String): Unit = {
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
println(sCollectionEmptyMsg)
|
||||
} else {
|
||||
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
|
||||
notifyIfTruncated(it)
|
||||
|
|
@ -57,7 +61,7 @@ private[scio] object DisplayHelpers {
|
|||
*/
|
||||
private[scio] def displayString[T: ClassTag](it: Iterator[T], printer: (T) => String): Unit = {
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
println(sCollectionEmptyMsg)
|
||||
} else {
|
||||
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
|
||||
notifyIfTruncated(it)
|
||||
|
|
@ -69,7 +73,7 @@ private[scio] object DisplayHelpers {
|
|||
*/
|
||||
private[scio] def displayKV[K: ClassTag, V: ClassTag](it: Iterator[(K,V)]): Unit = {
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
println(sCollectionEmptyMsg)
|
||||
} else {
|
||||
val content = it.take(maxResults).map{ case (k, v) => s"$k$tab$v" }.mkString(newline)
|
||||
println(s"$table key${tab}value$newline$content")
|
||||
|
|
@ -83,7 +87,7 @@ private[scio] object DisplayHelpers {
|
|||
private[scio] def displayProduct[T: ClassTag](it: Iterator[T])
|
||||
(implicit ev: T <:< Product): Unit = {
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
println(sCollectionEmptyMsg)
|
||||
} else {
|
||||
val first = it.next()
|
||||
//TODO is this safe field name to value iterator?
|
||||
|
|
@ -91,7 +95,7 @@ private[scio] object DisplayHelpers {
|
|||
|
||||
val header = fieldNames.mkString(tab)
|
||||
val firstStr = first.productIterator.mkString(tab)
|
||||
val content = it.take(maxResults).map(_.productIterator.mkString(tab)).mkString(newline)
|
||||
val content = it.take(maxResults - 1).map(_.productIterator.mkString(tab)).mkString(newline)
|
||||
println(s"$table $header$newline$firstStr$newline$content")
|
||||
notifyIfTruncated(it)
|
||||
}
|
||||
|
|
@ -104,7 +108,7 @@ private[scio] object DisplayHelpers {
|
|||
private[scio] def displayAvro[T: ClassTag](it: Iterator[T], schema: Schema = null)
|
||||
(implicit ev: T <:< GenericRecord): Unit = {
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
println(sCollectionEmptyMsg)
|
||||
} else {
|
||||
val first = it.next()
|
||||
import collection.JavaConverters._
|
||||
|
|
@ -117,7 +121,7 @@ private[scio] object DisplayHelpers {
|
|||
|
||||
val header = fieldNames.mkString(tab)
|
||||
val firstStr = fieldNames.map(first.get).mkString(tab)
|
||||
val content = it.take(maxResults)
|
||||
val content = it.take(maxResults - 1)
|
||||
.map(r => fieldNames.map(r.get).mkString(tab))
|
||||
.mkString(newline)
|
||||
println(s"$table $header$newline$firstStr$newline$content")
|
||||
|
|
@ -131,18 +135,25 @@ private[scio] object DisplayHelpers {
|
|||
private[scio] def displayBQTableRow[T: ClassTag](it: Iterator[T], schema: TableSchema)
|
||||
(implicit ev: T <:< TableRow) : Unit = {
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
println(sCollectionEmptyMsg)
|
||||
} else {
|
||||
import collection.JavaConverters._
|
||||
val fields = schema.getFields.asScala.map(_.getName).toArray
|
||||
val header = fields.mkString(tab)
|
||||
val fieldsOp = Option(schema.getFields)
|
||||
fieldsOp match {
|
||||
case None => println(bQSchemaIncomplete)
|
||||
case Some(f) => {
|
||||
val fields = f.asScala.map(_.getName).toArray
|
||||
|
||||
val content = it.take(maxResults)
|
||||
.map(r => fields.map(r.get).mkString(tab))
|
||||
.mkString(newline)
|
||||
val header = fields.mkString(tab)
|
||||
|
||||
println(s"$table $header$newline$content")
|
||||
notifyIfTruncated(it)
|
||||
val content = it.take(maxResults)
|
||||
.map(r => fields.map(r.get).mkString(tab))
|
||||
.mkString(newline)
|
||||
|
||||
println(s"$table $header$newline$content")
|
||||
notifyIfTruncated(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue