ZEPPELIN-2953 Allow custom http header for livy interpreter

This commit is contained in:
Jeff Zhang 2017-09-26 10:12:48 +08:00
parent a424f5c655
commit 720d8d09e6
3 changed files with 47 additions and 1 deletions

View file

@ -144,7 +144,12 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory`
<td>zeppelin.livy.ssl.trustStorePassword</td>
<td></td>
<td>password for trustStore file. Used when livy ssl is enabled</td>
</tr>
</tr>
<tr>
<td>zeppelin.livy.http.headers</td>
<td>key_1: value_1; key_2: value_2</td>
<td>custom http headers when calling livy rest api. Each http header is separated by `;`, and each header is one key value pair where key value is separated by `:`</td>
</tr>
</table>
**We remove livy.spark.master in zeppelin-0.7. Because we sugguest user to use livy 0.3 in zeppelin-0.7. And livy 0.3 don't allow to specify livy.spark.master, it enfornce yarn-cluster mode.**

View file

@ -62,6 +62,8 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
@ -80,6 +82,7 @@ public abstract class BaseLivyInterpreter extends Interpreter {
protected boolean displayAppInfo;
protected LivyVersion livyVersion;
private RestTemplate restTemplate;
private Map<String, String> customHeaders = new HashMap<>();
Set<Object> paragraphsToCancel = Collections.newSetFromMap(
new ConcurrentHashMap<Object, Boolean>());
@ -96,6 +99,33 @@ public abstract class BaseLivyInterpreter extends Interpreter {
this.pullStatusInterval = Integer.parseInt(
property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + ""));
this.restTemplate = createRestTemplate();
if (!StringUtils.isBlank(property.getProperty("zeppelin.livy.http.headers"))) {
String[] headers = property.getProperty("zeppelin.livy.http.headers").split(";");
for (String header : headers) {
String[] splits = header.split(":", -1);
if (splits.length != 2) {
throw new RuntimeException("Invalid format of http headers: " + header +
", valid http header format is HEADER_NAME:HEADER_VALUE");
}
customHeaders.put(splits[0].trim(), envSubstitute(splits[1].trim()));
}
}
}
private String envSubstitute(String value) {
String newValue = new String(value);
Pattern pattern = Pattern.compile("\\$\\{(.*)\\}");
Matcher matcher = pattern.matcher(value);
while (matcher.find()) {
String env = matcher.group(1);
newValue = newValue.replace("${" + env + "}", System.getenv(env));
}
return newValue;
}
// only for testing
Map<String, String> getCustomHeaders() {
return customHeaders;
}
public abstract String getSessionKind();
@ -523,6 +553,9 @@ public abstract class BaseLivyInterpreter extends Interpreter {
HttpHeaders headers = new HttpHeaders();
headers.add("Content-Type", MediaType.APPLICATION_JSON_UTF8_VALUE);
headers.add("X-Requested-By", "zeppelin");
for (Map.Entry<String, String> entry : customHeaders.entrySet()) {
headers.add(entry.getKey(), entry.getValue());
}
ResponseEntity<String> response = null;
try {
if (method.equals("POST")) {

View file

@ -39,9 +39,17 @@ public class LivySQLInterpreterTest {
properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "3");
properties.setProperty("zeppelin.livy.http.headers", "HEADER_1: VALUE_1_${HOME}");
sqlInterpreter = new LivySparkSQLInterpreter(properties);
}
@Test
public void testHttpHeaders() {
assertEquals(1, sqlInterpreter.getCustomHeaders().size());
assertTrue(sqlInterpreter.getCustomHeaders().get("HEADER_1").startsWith("VALUE_1_"));
assertNotEquals("VALUE_1_${HOME}", sqlInterpreter.getCustomHeaders().get("HEADER_1"));
}
@Test
public void testParseSQLOutput() {
// Empty sql output