address comment

This commit is contained in:
Jeff Zhang 2019-12-28 21:54:19 +08:00
parent bc3c1feff1
commit 4ff15e4fbf
2 changed files with 28 additions and 24 deletions

View file

@ -88,29 +88,31 @@ public class SparkSqlInterpreter extends AbstractInterpreter {
StringBuilder builder = new StringBuilder();
List<String> sqls = sqlSplitter.splitSql(st);
for (String sql : sqls) {
sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
int maxResult = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
"" + sparkInterpreter.getZeppelinContext().getMaxResult()));
try {
Method method = sqlc.getClass().getMethod("sql", String.class);
int maxResult = Integer.parseInt(context.getLocalProperties().getOrDefault("limit",
"" + sparkInterpreter.getZeppelinContext().getMaxResult()));
String result = sparkInterpreter.getZeppelinContext().showData(
method.invoke(sqlc, sql), maxResult);
sc.clearJobGroup();
sc.setLocalProperty("spark.scheduler.pool", context.getLocalProperties().get("pool"));
sc.setJobGroup(Utils.buildJobGroupId(context), Utils.buildJobDesc(context), false);
String curSql = null;
try {
for (String sql : sqls) {
curSql = sql;
String result = sparkInterpreter.getZeppelinContext().showData(sqlc.sql(sql), maxResult);
builder.append(result);
} catch (Exception e) {
if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) {
builder.append("%text " + ExceptionUtils.getStackTrace(e));
} else {
logger.error("Invocation target exception", e);
String msg = e.getCause().getMessage()
+ "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
builder.append("\n%text " + msg);
}
return new InterpreterResult(Code.ERROR, builder.toString());
}
} catch (Exception e) {
builder.append("\n%text Error happens in sql: " + curSql + "\n");
if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace", "false"))) {
builder.append(ExceptionUtils.getStackTrace(e));
} else {
logger.error("Invocation target exception", e);
String msg = e.getCause().getMessage()
+ "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
builder.append(msg);
}
return new InterpreterResult(Code.ERROR, builder.toString());
} finally {
sc.clearJobGroup();
}
return new InterpreterResult(Code.SUCCESS, builder.toString());

View file

@ -195,18 +195,20 @@ public class SparkSqlInterpreterTest {
// One correct sql + One invalid sql
ret = sqlInterpreter.interpret("select * from gr;invalid_sql", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertEquals(ret.message().toString(), 3, ret.message().size());
assertEquals(ret.message().toString(), 4, ret.message().size());
assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(1).getType());
assertEquals(ret.message().toString(), Type.TEXT, ret.message().get(2).getType());
assertTrue(ret.message().toString(), ret.message().get(2).getData().contains("ParseException"));
assertEquals(ret.message().toString(), Type.TEXT, ret.message().get(3).getType());
assertTrue(ret.message().toString(), ret.message().get(3).getData().contains("ParseException"));
// One correct sql + One invalid sql + One valid sql (skipped)
ret = sqlInterpreter.interpret("select * from gr;invalid_sql; select count(1) from gr", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertEquals(ret.message().toString(), 3, ret.message().size());
assertEquals(ret.message().toString(), 4, ret.message().size());
assertEquals(ret.message().toString(), Type.TABLE, ret.message().get(1).getType());
assertEquals(ret.message().toString(), Type.TEXT, ret.message().get(2).getType());
assertTrue(ret.message().toString(), ret.message().get(2).getData().contains("ParseException"));
assertEquals(ret.message().toString(), Type.TEXT, ret.message().get(3).getType());
assertTrue(ret.message().toString(), ret.message().get(3).getData().contains("ParseException"));
}
@Test