ZEPPELIN-2893. Pass non spark.* of spark interpreter properties to spark app driver

This commit is contained in:
Jeff Zhang 2017-09-03 10:39:37 +08:00
parent d6203c51ed
commit 13425273e1
3 changed files with 36 additions and 7 deletions

View file

@ -196,6 +196,13 @@ Staring from 0.6.1 SparkSession is available as variable `spark` when you are us
<a name="dependencyloading"> </a>
### How to pass property to SparkConf
There're 2 kinds of properties that would be passed to SparkConf
* Standard spark property (prefix with `spark.`). e.g. `spark.executor.memory` will be passed to `SparkConf`
* Non-standard spark property (prefix with `zeppelin.spark.`). e.g. `zeppelin.spark.property_1`, `property_1` will be passed to `SparkConf`
## Dependency Management
There are two ways to load external libraries in Spark interpreter. First is using interpreter setting menu and second is loading Spark properties.

View file

@ -381,9 +381,16 @@ public class SparkInterpreter extends Interpreter {
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));
if (key.startsWith("spark.") && !val.trim().isEmpty()) {
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
conf.set(key, val);
if (!val.trim().isEmpty()) {
if (key.startsWith("spark.")) {
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
conf.set(key, val);
}
if (key.startsWith("zeppelin.spark.")) {
String sparkPropertyKey = key.substring("zeppelin.spark.".length());
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
conf.set(sparkPropertyKey, val);
}
}
}
@ -509,9 +516,17 @@ public class SparkInterpreter extends Interpreter {
for (Object k : intpProperty.keySet()) {
String key = (String) k;
String val = toString(intpProperty.get(key));
if (key.startsWith("spark.") && !val.trim().isEmpty()) {
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
conf.set(key, val);
if (!val.trim().isEmpty()) {
if (key.startsWith("spark.")) {
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", key, val));
conf.set(key, val);
}
if (key.startsWith("zeppelin.spark.")) {
String sparkPropertyKey = key.substring("zeppelin.spark.".length());
logger.debug(String.format("SparkConf: key = [%s], value = [%s]", sparkPropertyKey, val));
conf.set(sparkPropertyKey, val);
}
}
}
setupConfForPySpark(conf);

View file

@ -78,7 +78,7 @@ public class SparkInterpreterTest {
p.setProperty("zeppelin.spark.maxResult", "1000");
p.setProperty("zeppelin.spark.importImplicit", "true");
p.setProperty("zeppelin.dep.localrepo", tmpDir.newFolder().getAbsolutePath());
p.setProperty("zeppelin.spark.property_1", "value_1");
return p;
}
@ -151,6 +151,13 @@ public class SparkInterpreterTest {
*/
}
@Test
public void testNonStandardSparkProperties() throws IOException {
// throw NoSuchElementException if no such property is found
InterpreterResult result = repl.interpret("sc.getConf.get(\"property_1\")", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
@Test
public void testNextLineInvocation() {
assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("\"123\"\n.toInt", context).code());