Merge remote-tracking branch 'origin/master' into livyInterperter

This commit is contained in:
Prabhjyot Singh 2016-03-11 11:39:10 +05:30
commit 78eca1ebee
6 changed files with 165 additions and 36 deletions

View file

@ -104,7 +104,7 @@ if [[ -z "${ZEPPELIN_INTP_MEM}" ]]; then
export ZEPPELIN_INTP_MEM="${ZEPPELIN_MEM}"
fi
JAVA_INTP_OPTS+=" ${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
export JAVA_INTP_OPTS

View file

@ -40,6 +40,74 @@ Spark Interpreter group, which consisted of 4 interpreters.
</table>
## Configuration
Zeppelin provides the below properties for Spark interpreter.
You can also set other Spark properties which are not listed in the table. For a list of additional properties, refer to [Spark Available Properties](http://spark.apache.org/docs/latest/configuration.html#available-properties).
<table class="table-configuration">
<tr>
<th>Property</th>
<th>Default</th>
<th>Description</th>
</tr>
<tr>
<td>args</td>
<td></td>
<td>Spark command-line arguments</td>
</tr>
<tr>
<td>master</td>
<td>local[*]</td>
<td>Spark master uri. <br/> ex) spark://masterhost:7077</td>
</tr>
<tr>
<td>spark.app.name</td>
<td>Zeppelin</td>
<td>The name of spark application.</td>
</tr>
<tr>
<td>spark.cores.max</td>
<td></td>
<td>Total number of cores to use. <br/> An empty value uses all available cores.</td>
</tr>
<tr>
<td>spark.executor.memory</td>
<td>512m</td>
<td>Executor memory per worker instance. <br/> ex) 512m, 32g</td>
</tr>
<tr>
<td>zeppelin.dep.additionalRemoteRepository</td>
<td>spark-packages, <br/> http://dl.bintray.com/spark-packages/maven, <br/> false;</td>
<td>A list of `id,remote-repository-URL,is-snapshot;` <br/> for each remote repository.</td>
</tr>
<tr>
<td>zeppelin.dep.localrepo</td>
<td>local-repo</td>
<td>Local repository for dependency loader</td>
</tr>
<tr>
<td>zeppelin.pyspark.python</td>
<td>python</td>
<td>Python command to run pyspark with</td>
</tr>
<tr>
<td>zeppelin.spark.concurrentSQL</td>
<td>false</td>
<td>Execute multiple SQL statements concurrently if set to true.</td>
</tr>
<tr>
<td>zeppelin.spark.maxResult</td>
<td>1000</td>
<td>Maximum number of SparkSQL results to display.</td>
</tr>
<tr>
<td>zeppelin.spark.printREPLOutput</td>
<td>true</td>
<td>Print REPL output</td>
</tr>
<tr>
<td>zeppelin.spark.useHiveContext</td>
<td>true</td>
<td>Use HiveContext instead of SQLContext if it is true.</td>
</tr>
</table>
Without any configuration, the Spark interpreter works out of the box in local mode. But if you want to connect to your Spark cluster, you'll need to follow the two simple steps below.
### 1. Export SPARK_HOME
@ -269,7 +337,7 @@ To learn more about dynamic form, checkout [Dynamic Form](../manual/dynamicform.
In 'Separate Interpreter for each note' mode, SparkInterpreter creates a Scala compiler for each notebook. However, all notebooks still share a single SparkContext.
## Setting up Zeppelin with Kerberos
Logical setup with Zeppelin, Kerberos Distribution Center (KDC), and Spark on YARN:
Logical setup with Zeppelin, Kerberos Key Distribution Center (KDC), and Spark on YARN:
<img src="../assets/themes/zeppelin/img/docs-img/kdc_zeppelin.png">

View file

@ -82,31 +82,34 @@ public class SparkInterpreter extends Interpreter {
static {
Interpreter.register(
"spark",
"spark",
SparkInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("spark.app.name",
getSystemDefault("SPARK_APP_NAME", "spark.app.name", "Zeppelin"),
"The name of spark application.")
.add("master",
getSystemDefault("MASTER", "spark.master", "local[*]"),
"Spark master uri. ex) spark://masterhost:7077")
.add("spark.executor.memory",
getSystemDefault(null, "spark.executor.memory", "512m"),
"Executor memory per worker instance. ex) 512m, 32g")
.add("spark.cores.max",
getSystemDefault(null, "spark.cores.max", ""),
"Total number of cores to use. Empty value uses all available core.")
.add("zeppelin.spark.useHiveContext",
getSystemDefault("ZEPPELIN_SPARK_USEHIVECONTEXT",
"zeppelin.spark.useHiveContext", "true"),
"Use HiveContext instead of SQLContext if it is true.")
.add("zeppelin.spark.maxResult",
getSystemDefault("ZEPPELIN_SPARK_MAXRESULT", "zeppelin.spark.maxResult", "1000"),
"Max number of SparkSQL result to display.")
.add("args", "", "spark commandline args").build());
"spark",
"spark",
SparkInterpreter.class.getName(),
new InterpreterPropertyBuilder()
.add("spark.app.name",
getSystemDefault("SPARK_APP_NAME", "spark.app.name", "Zeppelin"),
"The name of spark application.")
.add("master",
getSystemDefault("MASTER", "spark.master", "local[*]"),
"Spark master uri. ex) spark://masterhost:7077")
.add("spark.executor.memory",
getSystemDefault(null, "spark.executor.memory", "512m"),
"Executor memory per worker instance. ex) 512m, 32g")
.add("spark.cores.max",
getSystemDefault(null, "spark.cores.max", ""),
"Total number of cores to use. Empty value uses all available core.")
.add("zeppelin.spark.useHiveContext",
getSystemDefault("ZEPPELIN_SPARK_USEHIVECONTEXT",
"zeppelin.spark.useHiveContext", "true"),
"Use HiveContext instead of SQLContext if it is true.")
.add("zeppelin.spark.maxResult",
getSystemDefault("ZEPPELIN_SPARK_MAXRESULT", "zeppelin.spark.maxResult", "1000"),
"Max number of SparkSQL result to display.")
.add("args", "", "spark commandline args")
.add("zeppelin.spark.printREPLOutput", "true",
"Print REPL output")
.build()
);
}
private ZeppelinContext z;
@ -383,6 +386,10 @@ public class SparkInterpreter extends Interpreter {
return defaultValue;
}
/**
 * Reads the "zeppelin.spark.printREPLOutput" interpreter property as a boolean.
 *
 * @return {@code true} only when the property value equals "true" (ignoring case);
 *         {@code false} for any other value, including a missing ({@code null}) one
 */
public boolean printREPLOutput() {
  String replOutputFlag = getProperty("zeppelin.spark.printREPLOutput");
  // Fully qualified on purpose, exactly as the original call was written.
  return java.lang.Boolean.parseBoolean(replOutputFlag);
}
@Override
public void open() {
URL[] urls = getClassloaderUrls();
@ -483,7 +490,11 @@ public class SparkInterpreter extends Interpreter {
synchronized (sharedInterpreterLock) {
/* create scala repl */
this.interpreter = new SparkILoop(null, new PrintWriter(out));
if (printREPLOutput()) {
this.interpreter = new SparkILoop(null, new PrintWriter(out));
} else {
this.interpreter = new SparkILoop(null, new PrintWriter(Console.out(), false));
}
interpreter.settings_$eq(settings);

View file

@ -60,6 +60,10 @@ public class SparkSqlInterpreter extends Interpreter {
SparkInterpreter.getSystemDefault("ZEPPELIN_SPARK_CONCURRENTSQL",
"zeppelin.spark.concurrentSQL", "false"),
"Execute multiple SQL concurrently if set true.")
.add("zeppelin.spark.sql.stacktrace",
SparkInterpreter.getSystemDefault("ZEPPELIN_SPARK_SQL_STACKTRACE",
"zeppelin.spark.sql.stacktrace", "false"),
"Show full exception stacktrace for SQL queries if set to true.")
.build());
}
@ -131,8 +135,16 @@ public class SparkSqlInterpreter extends Interpreter {
// Therefore need to use reflection to keep binary compatibility for all spark versions.
Method sqlMethod = sqlc.getClass().getMethod("sql", String.class);
rdd = sqlMethod.invoke(sqlc, st);
} catch (InvocationTargetException ite) {
if (Boolean.parseBoolean(getProperty("zeppelin.spark.sql.stacktrace"))) {
throw new InterpreterException(ite);
}
logger.error("Invocation target exception", ite);
String msg = ite.getTargetException().getMessage()
+ "\nset zeppelin.spark.sql.stacktrace = true to see full stacktrace";
return new InterpreterResult(Code.ERROR, msg);
} catch (NoSuchMethodException | SecurityException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
| IllegalArgumentException e) {
throw new InterpreterException(e);
}

View file

@ -110,13 +110,10 @@ public class SparkSqlInterpreterTest {
assertEquals(Type.TABLE, ret.type());
assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
try {
sql.interpret("select wrong syntax", context);
fail("Exception not catched");
} catch (Exception e) {
// okay
LOGGER.info("Exception in SparkSqlInterpreterTest while test ", e);
}
ret = sql.interpret("select wrong syntax", context);
assertEquals(InterpreterResult.Code.ERROR, ret.code());
assertTrue(ret.message().length() > 0);
assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from test", context).code());
}

View file

@ -386,4 +386,45 @@ public class ParagraphActionsIT extends AbstractZeppelinIT {
}
/**
 * Drives the paragraph settings menu to show and then hide the editor line numbers,
 * checking the gutter visibility and the menu entry label at every step.
 *
 * <p>Returns immediately when end-to-end tests are not enabled. Any exception raised while
 * driving the browser is passed to {@code handleException}.
 */
@Test
public void testShowAndHideLineNumbers() throws Exception {
  if (!endToEndTestEnabled()) {
    return;
  }
  try {
    createNewNote();
    waitForParagraph(1, "READY");

    // Locators are built once; the elements themselves are looked up again at every use.
    By lineNumberGutter =
        By.xpath(getParagraphXPath(1) + "//div[contains(@class, 'ace_gutter-layer')]");
    By showLineNumbersEntry =
        By.xpath(getParagraphXPath(1) + "//ul/li/a[@ng-click='showLineNumbers()']");
    By hideLineNumbersEntry =
        By.xpath(getParagraphXPath(1) + "//ul/li/a[@ng-click='hideLineNumbers()']");

    // Initial state: the gutter is expected to be hidden.
    boolean gutterShownInitially = driver.findElement(lineNumberGutter).isDisplayed();
    collector.checkThat("Before \"Show line number\" the Line Number is Enabled ",
        gutterShownInitially,
        CoreMatchers.equalTo(false));

    // Open the settings menu and switch line numbers on.
    By settingsIcon = By.xpath(getParagraphXPath(1) + "//span[@class='icon-settings']");
    driver.findElement(settingsIcon).click();
    String showEntryLabel = driver.findElement(showLineNumbersEntry).getText();
    collector.checkThat(
        "Before \"Show line number\" The option panel in paragraph has button labeled ",
        showEntryLabel,
        CoreMatchers.equalTo("Show line numbers"));
    driver.findElement(showLineNumbersEntry).click();

    boolean gutterShownAfterShow = driver.findElement(lineNumberGutter).isDisplayed();
    collector.checkThat("After \"Show line number\" the Line Number is Enabled ",
        gutterShownAfterShow,
        CoreMatchers.equalTo(true));

    // Open the settings menu again and switch line numbers back off.
    driver.findElement(settingsIcon).click();
    String hideEntryLabel = driver.findElement(hideLineNumbersEntry).getText();
    collector.checkThat(
        "After \"Show line number\" The option panel in paragraph has button labeled ",
        hideEntryLabel,
        CoreMatchers.equalTo("Hide line numbers"));
    driver.findElement(hideLineNumbersEntry).click();

    boolean gutterShownAfterHide = driver.findElement(lineNumberGutter).isDisplayed();
    collector.checkThat("After \"Hide line number\" the Line Number is Enabled",
        gutterShownAfterHide,
        CoreMatchers.equalTo(false));

    ZeppelinITUtils.sleep(1000, false);
    deleteTestNotebook(driver);
  } catch (Exception e) {
    handleException("Exception in ParagraphActionsIT while testShowAndHideLineNumbers ", e);
  }
}
}