Add avro display

This commit is contained in:
Rafal Wojdyla 2016-09-29 18:31:06 -04:00
parent 89a2811853
commit be252f81cc

View file

@ -17,8 +17,10 @@
package org.apache.zeppelin
import com.spotify.scio.values.SCollection
import com.spotify.scio._
import com.spotify.scio.values.SCollection
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import scala.reflect.ClassTag
@ -117,4 +119,31 @@ package object scio {
}
}
// TODO: scala 2.11
// implicit class ZeppelinAvroSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< GenericRecord) extends AnyVal {
implicit class ZeppelinAvroSCollection[T: ClassTag](val self: SCollection[T])
(implicit ev: T <:< GenericRecord) {
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
* and display elements from Avro like SCollection */
def closeAndDisplay(schema: Schema = null): Unit = {
val it = materialize(self).waitForResult().value
if (it.isEmpty) {
println(SCollectionEmptyMsg)
} else {
val first = it.next()
import collection.JavaConverters._
val fieldNames = first.getSchema.getFields.iterator.asScala.map(_.name()).toArray
val header = fieldNames.mkString("\t")
val firstStr = fieldNames.map(first.get(_)).mkString("\t")
val content = it.take(maxResults)
.map(r => fieldNames.map(r.get(_)).mkString("\t"))
.mkString("\n")
println(s"""%table $header\n$firstStr\n$content""")
notifIfTruncated(it)
}
}
}
}