mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Add avro display
This commit is contained in:
parent
89a2811853
commit
be252f81cc
1 changed files with 30 additions and 1 deletions
|
|
@ -17,8 +17,10 @@
|
|||
|
||||
package org.apache.zeppelin
|
||||
|
||||
import com.spotify.scio.values.SCollection
|
||||
import com.spotify.scio._
|
||||
import com.spotify.scio.values.SCollection
|
||||
import org.apache.avro.Schema
|
||||
import org.apache.avro.generic.GenericRecord
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
|
|
@ -117,4 +119,31 @@ package object scio {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: scala 2.11
|
||||
// implicit class ZeppelinAvroSCollection[T: ClassTag](val self: SCollection[T])(implicit ev: T <:< GenericRecord) extends AnyVal {
|
||||
implicit class ZeppelinAvroSCollection[T: ClassTag](val self: SCollection[T])
|
||||
(implicit ev: T <:< GenericRecord) {
|
||||
/** Convenience method to close the current [[com.spotify.scio.ScioContext]]
|
||||
* and display elements from Avro like SCollection */
|
||||
def closeAndDisplay(schema: Schema = null): Unit = {
|
||||
val it = materialize(self).waitForResult().value
|
||||
|
||||
if (it.isEmpty) {
|
||||
println(SCollectionEmptyMsg)
|
||||
} else {
|
||||
val first = it.next()
|
||||
import collection.JavaConverters._
|
||||
val fieldNames = first.getSchema.getFields.iterator.asScala.map(_.name()).toArray
|
||||
|
||||
val header = fieldNames.mkString("\t")
|
||||
val firstStr = fieldNames.map(first.get(_)).mkString("\t")
|
||||
val content = it.take(maxResults)
|
||||
.map(r => fieldNames.map(r.get(_)).mkString("\t"))
|
||||
.mkString("\n")
|
||||
println(s"""%table $header\n$firstStr\n$content""")
|
||||
notifIfTruncated(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue