ZEPPELIN-32 implement z.show()

This commit is contained in:
Lee moon soo 2015-04-07 21:09:17 +09:00
parent dac416d26c
commit ee29866881
3 changed files with 138 additions and 82 deletions

View file

@ -107,6 +107,7 @@ public class SparkInterpreter extends Interpreter {
+ "we should set this value")
.add("zeppelin.spark.useHiveContext", "true",
"Use HiveContext instead of SQLContext if it is true.")
.add("zeppelin.spark.maxResult", "1000", "Max number of SparkSQL result to display.")
.add("args", "", "spark commandline args").build());
}
@ -400,7 +401,8 @@ public class SparkInterpreter extends Interpreter {
dep = getDependencyResolver();
z = new ZeppelinContext(sc, sqlc, null, dep, printStream);
z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
try {
if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
@ -522,7 +524,7 @@ public class SparkInterpreter extends Interpreter {
}
String getJobGroup(InterpreterContext context){
return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
return "zeppelin-" + context.getParagraphId();
}
/**

View file

@ -29,18 +29,15 @@ import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Stage;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SQLContext.QueryExecution;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterUtils;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.WrappedInterpreter;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
@ -76,7 +73,7 @@ public class SparkSqlInterpreter extends Interpreter {
}
private String getJobGroup(InterpreterContext context){
return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
return "zeppelin-" + context.getParagraphId();
}
private int maxResult;
@ -126,81 +123,10 @@ public class SparkSqlInterpreter extends Interpreter {
sc.setLocalProperty("spark.scheduler.pool", null);
}
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
Object rdd = sqlc.sql(st);
String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult);
// SchemaRDD - spark 1.1, 1.2, DataFrame - spark 1.3
Object rdd;
Object[] rows = null;
try {
rdd = sqlc.sql(st);
Method take = rdd.getClass().getMethod("take", int.class);
rows = (Object[]) take.invoke(rdd, maxResult + 1);
} catch (Exception e) {
logger.error("Error", e);
sc.clearJobGroup();
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
}
String msg = null;
// get field names
Method queryExecution;
QueryExecution qe;
try {
queryExecution = rdd.getClass().getMethod("queryExecution");
qe = (QueryExecution) queryExecution.invoke(rdd);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}
List<Attribute> columns =
scala.collection.JavaConverters.asJavaListConverter(
qe.analyzed().output()).asJava();
for (Attribute col : columns) {
if (msg == null) {
msg = col.name();
} else {
msg += "\t" + col.name();
}
}
msg += "\n";
// ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
// FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
// NullType, NumericType, ShortType, StringType, StructType
try {
for (int r = 0; r < maxResult && r < rows.length; r++) {
Object row = rows[r];
Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
Method apply = row.getClass().getMethod("apply", int.class);
for (int i = 0; i < columns.size(); i++) {
if (!(Boolean) isNullAt.invoke(row, i)) {
msg += apply.invoke(row, i).toString();
} else {
msg += "null";
}
if (i != columns.size() - 1) {
msg += "\t";
}
}
msg += "\n";
}
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}
if (rows.length > maxResult) {
msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>";
}
InterpreterResult rett = new InterpreterResult(Code.SUCCESS, "%table " + msg);
sc.clearJobGroup();
InterpreterResult rett = new InterpreterResult(Code.SUCCESS, msg);
return rett;
}

View file

@ -22,6 +22,8 @@ import static scala.collection.JavaConversions.asJavaIterable;
import static scala.collection.JavaConversions.collectionAsScalaIterable;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@ -30,6 +32,8 @@ import java.util.List;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SQLContext.QueryExecution;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
@ -54,15 +58,18 @@ public class ZeppelinContext extends HashMap<String, Object> {
private DependencyResolver dep;
private PrintStream out;
private InterpreterContext interpreterContext;
private int maxResult;
public ZeppelinContext(SparkContext sc, SQLContext sql,
InterpreterContext interpreterContext,
DependencyResolver dep, PrintStream printStream) {
DependencyResolver dep, PrintStream printStream,
int maxResult) {
this.sc = sc;
this.sqlContext = sql;
this.interpreterContext = interpreterContext;
this.dep = dep;
this.out = printStream;
this.maxResult = maxResult;
}
public SparkContext sc;
@ -233,6 +240,127 @@ public class ZeppelinContext extends HashMap<String, Object> {
this.interpreterContext = interpreterContext;
}
public void setMaxResult(int maxResult) {
this.maxResult = maxResult;
}
/**
* show DataFrame or SchemaRDD
* @param o DataFrame or SchemaRDD object
*/
public void show(Object o) {
show(o, maxResult);
}
/**
* show DataFrame or SchemaRDD
* @param o DataFrame or SchemaRDD object
* @param maxResult maximum number of rows to display
*/
public void show(Object o, int maxResult) {
Class cls = null;
try {
cls = this.getClass().forName("org.apache.spark.sql.DataFrame");
} catch (ClassNotFoundException e) {
}
if (cls == null) {
try {
cls = this.getClass().forName("org.apache.spark.sql.SchemaRDD");
} catch (ClassNotFoundException e) {
}
}
if (cls == null) {
throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
}
if (cls.isInstance(o)) {
out.print(showRDD(sc, interpreterContext, o, maxResult));
} else {
out.print(o.toString());
}
}
public static String showRDD(SparkContext sc,
InterpreterContext interpreterContext,
Object rdd, int maxResult) {
Object[] rows = null;
Method take;
String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
sc.setJobGroup(jobGroup, "Zeppelin", false);
try {
take = rdd.getClass().getMethod("take", int.class);
rows = (Object[]) take.invoke(rdd, maxResult + 1);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
sc.clearJobGroup();
throw new InterpreterException(e);
}
String msg = null;
// get field names
Method queryExecution;
QueryExecution qe;
try {
queryExecution = rdd.getClass().getMethod("queryExecution");
qe = (QueryExecution) queryExecution.invoke(rdd);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}
List<Attribute> columns =
scala.collection.JavaConverters.asJavaListConverter(
qe.analyzed().output()).asJava();
for (Attribute col : columns) {
if (msg == null) {
msg = col.name();
} else {
msg += "\t" + col.name();
}
}
msg += "\n";
// ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
// FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
// NullType, NumericType, ShortType, StringType, StructType
try {
for (int r = 0; r < maxResult && r < rows.length; r++) {
Object row = rows[r];
Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
Method apply = row.getClass().getMethod("apply", int.class);
for (int i = 0; i < columns.size(); i++) {
if (!(Boolean) isNullAt.invoke(row, i)) {
msg += apply.invoke(row, i).toString();
} else {
msg += "null";
}
if (i != columns.size() - 1) {
msg += "\t";
}
}
msg += "\n";
}
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
throw new InterpreterException(e);
}
if (rows.length > maxResult) {
msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>";
}
sc.clearJobGroup();
return "%table " + msg;
}
/**
* Run paragraph by id
* @param id