mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
ZEPPELIN-32 implement z.show()
This commit is contained in:
parent
dac416d26c
commit
ee29866881
3 changed files with 138 additions and 82 deletions
|
|
@ -107,6 +107,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
+ "we should set this value")
|
||||
.add("zeppelin.spark.useHiveContext", "true",
|
||||
"Use HiveContext instead of SQLContext if it is true.")
|
||||
.add("zeppelin.spark.maxResult", "1000", "Max number of SparkSQL result to display.")
|
||||
.add("args", "", "spark commandline args").build());
|
||||
|
||||
}
|
||||
|
|
@ -400,7 +401,8 @@ public class SparkInterpreter extends Interpreter {
|
|||
|
||||
dep = getDependencyResolver();
|
||||
|
||||
z = new ZeppelinContext(sc, sqlc, null, dep, printStream);
|
||||
z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
|
||||
Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
|
||||
|
||||
try {
|
||||
if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
|
||||
|
|
@ -522,7 +524,7 @@ public class SparkInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
String getJobGroup(InterpreterContext context){
|
||||
return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
|
||||
return "zeppelin-" + context.getParagraphId();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -29,18 +29,15 @@ import org.apache.spark.scheduler.ActiveJob;
|
|||
import org.apache.spark.scheduler.DAGScheduler;
|
||||
import org.apache.spark.scheduler.Stage;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.spark.sql.SQLContext.QueryExecution;
|
||||
import org.apache.spark.sql.catalyst.expressions.Attribute;
|
||||
import org.apache.spark.ui.jobs.JobProgressListener;
|
||||
import org.apache.zeppelin.interpreter.Interpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterContext;
|
||||
import org.apache.zeppelin.interpreter.InterpreterException;
|
||||
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult;
|
||||
import org.apache.zeppelin.interpreter.InterpreterUtils;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
|
||||
import org.apache.zeppelin.interpreter.WrappedInterpreter;
|
||||
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
|
||||
import org.apache.zeppelin.scheduler.Scheduler;
|
||||
import org.apache.zeppelin.scheduler.SchedulerFactory;
|
||||
import org.slf4j.Logger;
|
||||
|
|
@ -76,7 +73,7 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
}
|
||||
|
||||
private String getJobGroup(InterpreterContext context){
|
||||
return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
|
||||
return "zeppelin-" + context.getParagraphId();
|
||||
}
|
||||
|
||||
private int maxResult;
|
||||
|
|
@ -126,81 +123,10 @@ public class SparkSqlInterpreter extends Interpreter {
|
|||
sc.setLocalProperty("spark.scheduler.pool", null);
|
||||
}
|
||||
|
||||
sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
|
||||
Object rdd = sqlc.sql(st);
|
||||
String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult);
|
||||
|
||||
// SchemaRDD - spark 1.1, 1.2, DataFrame - spark 1.3
|
||||
Object rdd;
|
||||
Object[] rows = null;
|
||||
try {
|
||||
rdd = sqlc.sql(st);
|
||||
|
||||
Method take = rdd.getClass().getMethod("take", int.class);
|
||||
rows = (Object[]) take.invoke(rdd, maxResult + 1);
|
||||
} catch (Exception e) {
|
||||
logger.error("Error", e);
|
||||
sc.clearJobGroup();
|
||||
return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e));
|
||||
}
|
||||
|
||||
String msg = null;
|
||||
|
||||
// get field names
|
||||
Method queryExecution;
|
||||
QueryExecution qe;
|
||||
try {
|
||||
queryExecution = rdd.getClass().getMethod("queryExecution");
|
||||
qe = (QueryExecution) queryExecution.invoke(rdd);
|
||||
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
|
||||
| IllegalArgumentException | InvocationTargetException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
List<Attribute> columns =
|
||||
scala.collection.JavaConverters.asJavaListConverter(
|
||||
qe.analyzed().output()).asJava();
|
||||
|
||||
for (Attribute col : columns) {
|
||||
if (msg == null) {
|
||||
msg = col.name();
|
||||
} else {
|
||||
msg += "\t" + col.name();
|
||||
}
|
||||
}
|
||||
|
||||
msg += "\n";
|
||||
|
||||
// ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
|
||||
// FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
|
||||
// NullType, NumericType, ShortType, StringType, StructType
|
||||
|
||||
try {
|
||||
for (int r = 0; r < maxResult && r < rows.length; r++) {
|
||||
Object row = rows[r];
|
||||
Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
|
||||
Method apply = row.getClass().getMethod("apply", int.class);
|
||||
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (!(Boolean) isNullAt.invoke(row, i)) {
|
||||
msg += apply.invoke(row, i).toString();
|
||||
} else {
|
||||
msg += "null";
|
||||
}
|
||||
if (i != columns.size() - 1) {
|
||||
msg += "\t";
|
||||
}
|
||||
}
|
||||
msg += "\n";
|
||||
}
|
||||
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
|
||||
| IllegalArgumentException | InvocationTargetException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
if (rows.length > maxResult) {
|
||||
msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>";
|
||||
}
|
||||
InterpreterResult rett = new InterpreterResult(Code.SUCCESS, "%table " + msg);
|
||||
sc.clearJobGroup();
|
||||
InterpreterResult rett = new InterpreterResult(Code.SUCCESS, msg);
|
||||
return rett;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import static scala.collection.JavaConversions.asJavaIterable;
|
|||
import static scala.collection.JavaConversions.collectionAsScalaIterable;
|
||||
|
||||
import java.io.PrintStream;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
|
@ -30,6 +32,8 @@ import java.util.List;
|
|||
|
||||
import org.apache.spark.SparkContext;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
import org.apache.spark.sql.SQLContext.QueryExecution;
|
||||
import org.apache.spark.sql.catalyst.expressions.Attribute;
|
||||
import org.apache.spark.sql.hive.HiveContext;
|
||||
import org.apache.zeppelin.display.AngularObject;
|
||||
import org.apache.zeppelin.display.AngularObjectRegistry;
|
||||
|
|
@ -54,15 +58,18 @@ public class ZeppelinContext extends HashMap<String, Object> {
|
|||
private DependencyResolver dep;
|
||||
private PrintStream out;
|
||||
private InterpreterContext interpreterContext;
|
||||
private int maxResult;
|
||||
|
||||
public ZeppelinContext(SparkContext sc, SQLContext sql,
|
||||
InterpreterContext interpreterContext,
|
||||
DependencyResolver dep, PrintStream printStream) {
|
||||
DependencyResolver dep, PrintStream printStream,
|
||||
int maxResult) {
|
||||
this.sc = sc;
|
||||
this.sqlContext = sql;
|
||||
this.interpreterContext = interpreterContext;
|
||||
this.dep = dep;
|
||||
this.out = printStream;
|
||||
this.maxResult = maxResult;
|
||||
}
|
||||
|
||||
public SparkContext sc;
|
||||
|
|
@ -233,6 +240,127 @@ public class ZeppelinContext extends HashMap<String, Object> {
|
|||
this.interpreterContext = interpreterContext;
|
||||
}
|
||||
|
||||
public void setMaxResult(int maxResult) {
|
||||
this.maxResult = maxResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* show DataFrame or SchemaRDD
|
||||
* @param o DataFrame or SchemaRDD object
|
||||
*/
|
||||
public void show(Object o) {
|
||||
show(o, maxResult);
|
||||
}
|
||||
|
||||
/**
|
||||
* show DataFrame or SchemaRDD
|
||||
* @param o DataFrame or SchemaRDD object
|
||||
* @param maxResult maximum number of rows to display
|
||||
*/
|
||||
public void show(Object o, int maxResult) {
|
||||
Class cls = null;
|
||||
try {
|
||||
cls = this.getClass().forName("org.apache.spark.sql.DataFrame");
|
||||
} catch (ClassNotFoundException e) {
|
||||
}
|
||||
|
||||
if (cls == null) {
|
||||
try {
|
||||
cls = this.getClass().forName("org.apache.spark.sql.SchemaRDD");
|
||||
} catch (ClassNotFoundException e) {
|
||||
}
|
||||
}
|
||||
|
||||
if (cls == null) {
|
||||
throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
|
||||
}
|
||||
|
||||
if (cls.isInstance(o)) {
|
||||
out.print(showRDD(sc, interpreterContext, o, maxResult));
|
||||
} else {
|
||||
out.print(o.toString());
|
||||
}
|
||||
}
|
||||
|
||||
public static String showRDD(SparkContext sc,
|
||||
InterpreterContext interpreterContext,
|
||||
Object rdd, int maxResult) {
|
||||
Object[] rows = null;
|
||||
Method take;
|
||||
String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
|
||||
sc.setJobGroup(jobGroup, "Zeppelin", false);
|
||||
|
||||
try {
|
||||
take = rdd.getClass().getMethod("take", int.class);
|
||||
rows = (Object[]) take.invoke(rdd, maxResult + 1);
|
||||
|
||||
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
|
||||
| IllegalArgumentException | InvocationTargetException e) {
|
||||
sc.clearJobGroup();
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
String msg = null;
|
||||
|
||||
// get field names
|
||||
Method queryExecution;
|
||||
QueryExecution qe;
|
||||
try {
|
||||
queryExecution = rdd.getClass().getMethod("queryExecution");
|
||||
qe = (QueryExecution) queryExecution.invoke(rdd);
|
||||
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
|
||||
| IllegalArgumentException | InvocationTargetException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
List<Attribute> columns =
|
||||
scala.collection.JavaConverters.asJavaListConverter(
|
||||
qe.analyzed().output()).asJava();
|
||||
|
||||
for (Attribute col : columns) {
|
||||
if (msg == null) {
|
||||
msg = col.name();
|
||||
} else {
|
||||
msg += "\t" + col.name();
|
||||
}
|
||||
}
|
||||
|
||||
msg += "\n";
|
||||
|
||||
// ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType,
|
||||
// FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType,
|
||||
// NullType, NumericType, ShortType, StringType, StructType
|
||||
|
||||
try {
|
||||
for (int r = 0; r < maxResult && r < rows.length; r++) {
|
||||
Object row = rows[r];
|
||||
Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
|
||||
Method apply = row.getClass().getMethod("apply", int.class);
|
||||
|
||||
for (int i = 0; i < columns.size(); i++) {
|
||||
if (!(Boolean) isNullAt.invoke(row, i)) {
|
||||
msg += apply.invoke(row, i).toString();
|
||||
} else {
|
||||
msg += "null";
|
||||
}
|
||||
if (i != columns.size() - 1) {
|
||||
msg += "\t";
|
||||
}
|
||||
}
|
||||
msg += "\n";
|
||||
}
|
||||
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
|
||||
| IllegalArgumentException | InvocationTargetException e) {
|
||||
throw new InterpreterException(e);
|
||||
}
|
||||
|
||||
if (rows.length > maxResult) {
|
||||
msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>";
|
||||
}
|
||||
sc.clearJobGroup();
|
||||
return "%table " + msg;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run paragraph by id
|
||||
* @param id
|
||||
|
|
|
|||
Loading…
Reference in a new issue