Fix style and number of records for take

This commit is contained in:
Rafal Wojdyla 2016-10-04 12:44:50 -04:00
parent 9dcc8cef33
commit c0f8ccff4d

View file

@ -29,15 +29,19 @@ import scala.reflect.ClassTag
*/
private[scio] object DisplayHelpers {
private val SCollectionEmptyMsg = "\n%html <font color=red>Result SCollection is empty!</font>\n"
private[scio] val sCollectionEmptyMsg =
"\n%html <font color=red>Result SCollection is empty!</font>\n"
private val maxResults = Integer.getInteger("zeppelin.scio.maxResult", 1000)
private val tab = "\t"
private val newline = "\n"
private val table = "%table"
private[scio] val tab = "\t"
private[scio] val newline = "\n"
private[scio] val table = "%table"
private[scio] val rowLimitReachedMsg =
s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline"
private[scio] val bQSchemaIncomplete =
s"$newline<font color=red>Provided BigQuery Schema has not fields!</font>$newline"
private def notifyIfTruncated(it: Iterator[_]): Unit = {
if(it.hasNext)
println(s"$newline<font color=red>Results are limited to " + maxResults + s" rows.</font>$newline")
if(it.hasNext) println(rowLimitReachedMsg)
}
/**
@ -45,7 +49,7 @@ private[scio] object DisplayHelpers {
*/
private[scio] def displayAnyVal[T: ClassTag](it: Iterator[T], printer: (T) => String): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
println(sCollectionEmptyMsg)
} else {
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
notifyIfTruncated(it)
@ -57,7 +61,7 @@ private[scio] object DisplayHelpers {
*/
private[scio] def displayString[T: ClassTag](it: Iterator[T], printer: (T) => String): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
println(sCollectionEmptyMsg)
} else {
println(s"$table value$newline${it.take(maxResults).map(printer).mkString(newline)}")
notifyIfTruncated(it)
@ -69,7 +73,7 @@ private[scio] object DisplayHelpers {
*/
private[scio] def displayKV[K: ClassTag, V: ClassTag](it: Iterator[(K,V)]): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
println(sCollectionEmptyMsg)
} else {
val content = it.take(maxResults).map{ case (k, v) => s"$k$tab$v" }.mkString(newline)
println(s"$table key${tab}value$newline$content")
@ -83,7 +87,7 @@ private[scio] object DisplayHelpers {
private[scio] def displayProduct[T: ClassTag](it: Iterator[T])
(implicit ev: T <:< Product): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
println(sCollectionEmptyMsg)
} else {
val first = it.next()
//TODO is this safe field name to value iterator?
@ -91,7 +95,7 @@ private[scio] object DisplayHelpers {
val header = fieldNames.mkString(tab)
val firstStr = first.productIterator.mkString(tab)
val content = it.take(maxResults).map(_.productIterator.mkString(tab)).mkString(newline)
val content = it.take(maxResults - 1).map(_.productIterator.mkString(tab)).mkString(newline)
println(s"$table $header$newline$firstStr$newline$content")
notifyIfTruncated(it)
}
@ -104,7 +108,7 @@ private[scio] object DisplayHelpers {
private[scio] def displayAvro[T: ClassTag](it: Iterator[T], schema: Schema = null)
(implicit ev: T <:< GenericRecord): Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
println(sCollectionEmptyMsg)
} else {
val first = it.next()
import collection.JavaConverters._
@ -117,7 +121,7 @@ private[scio] object DisplayHelpers {
val header = fieldNames.mkString(tab)
val firstStr = fieldNames.map(first.get).mkString(tab)
val content = it.take(maxResults)
val content = it.take(maxResults - 1)
.map(r => fieldNames.map(r.get).mkString(tab))
.mkString(newline)
println(s"$table $header$newline$firstStr$newline$content")
@ -131,18 +135,25 @@ private[scio] object DisplayHelpers {
private[scio] def displayBQTableRow[T: ClassTag](it: Iterator[T], schema: TableSchema)
(implicit ev: T <:< TableRow) : Unit = {
if (it.isEmpty) {
println(SCollectionEmptyMsg)
println(sCollectionEmptyMsg)
} else {
import collection.JavaConverters._
val fields = schema.getFields.asScala.map(_.getName).toArray
val header = fields.mkString(tab)
val fieldsOp = Option(schema.getFields)
fieldsOp match {
case None => println(bQSchemaIncomplete)
case Some(f) => {
val fields = f.asScala.map(_.getName).toArray
val content = it.take(maxResults)
.map(r => fields.map(r.get).mkString(tab))
.mkString(newline)
val header = fields.mkString(tab)
println(s"$table $header$newline$content")
notifyIfTruncated(it)
val content = it.take(maxResults)
.map(r => fields.map(r.get).mkString(tab))
.mkString(newline)
println(s"$table $header$newline$content")
notifyIfTruncated(it)
}
}
}
}