mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
jetty client relay to asyncclient when proxy on
This commit is contained in:
parent
16ccbb4616
commit
a5711bc1dd
1 changed files with 80 additions and 36 deletions
|
|
@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
|
@ -27,6 +29,12 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.http.client.methods.HttpDelete;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPost;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.client.methods.HttpRequestBase;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
|
|
@ -52,11 +60,10 @@ public class ZeppelinhubRestApiHandler {
|
|||
private static final String USER_SESSION_HEADER = "X-User-Session";
|
||||
private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
|
||||
private static boolean PROXY_ON = false;
|
||||
private static String PROXY_HOST;
|
||||
private static int PROXY_PORT;
|
||||
|
||||
//TODO(xxx): possibly switch to jetty-client > 9.3.12 when adopt jvm 1.8
|
||||
private static HttpProxyClient proxyClient;
|
||||
private final HttpClient client;
|
||||
private final String zepelinhubUrl;
|
||||
private String zepelinhubUrl;
|
||||
|
||||
public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl) {
|
||||
return new ZeppelinhubRestApiHandler(zeppelinhubUrl);
|
||||
|
|
@ -65,8 +72,7 @@ public class ZeppelinhubRestApiHandler {
|
|||
private ZeppelinhubRestApiHandler(String zeppelinhubUrl) {
|
||||
this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
|
||||
|
||||
//TODO(khalid):to make proxy conf consistent with Zeppelin confs
|
||||
//readProxyConf();
|
||||
readProxyConf();
|
||||
client = getAsyncClient();
|
||||
|
||||
try {
|
||||
|
|
@ -74,48 +80,41 @@ public class ZeppelinhubRestApiHandler {
|
|||
} catch (Exception e) {
|
||||
LOG.error("Cannot initialize ZeppelinHub REST async client", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private void readProxyConf() {
|
||||
//try reading http_proxy
|
||||
String proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
|
||||
System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
|
||||
//try reading https_proxy
|
||||
String proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
|
||||
System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
|
||||
if (StringUtils.isBlank(proxyHostString)) {
|
||||
//try https_proxy if no http_proxy
|
||||
proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
|
||||
System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
|
||||
//try http_proxy if no https_proxy
|
||||
proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
|
||||
System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
|
||||
}
|
||||
|
||||
if (StringUtils.isBlank(proxyHostString)) {
|
||||
PROXY_ON = false;
|
||||
} else {
|
||||
// host format - http://domain:port/
|
||||
String[] parts = proxyHostString.replaceAll("/", "").split(":");
|
||||
if (parts.length != 3) {
|
||||
LOG.warn("Proxy host format is incorrect {}, e.g. http://domain:port/", proxyHostString);
|
||||
PROXY_ON = false;
|
||||
return;
|
||||
if (!StringUtils.isBlank(proxyHostString)) {
|
||||
URI uri = null;
|
||||
try {
|
||||
uri = new URI(proxyHostString);
|
||||
} catch (URISyntaxException e) {
|
||||
LOG.error("Proxy uri doesn't follow correct syntax", e);
|
||||
}
|
||||
if (uri != null) {
|
||||
PROXY_ON = true;
|
||||
proxyClient = HttpProxyClient.newInstance(uri);
|
||||
}
|
||||
PROXY_HOST = parts[1];
|
||||
PROXY_PORT = Integer.parseInt(parts[2]);
|
||||
LOG.info("Proxy protocol: {}, domain: {}, port: {}", parts[0], parts[1], parts[2]);
|
||||
PROXY_ON = true;
|
||||
}
|
||||
}
|
||||
|
||||
private HttpClient getAsyncClient() {
|
||||
SslContextFactory sslContextFactory = new SslContextFactory();
|
||||
HttpClient httpClient = new HttpClient(sslContextFactory);
|
||||
|
||||
// Configure HttpClient
|
||||
httpClient.setFollowRedirects(false);
|
||||
httpClient.setMaxConnectionsPerDestination(100);
|
||||
// Config considerations
|
||||
//TODO(khalid): consider using proxy
|
||||
//TODO(khalid): consider whether require to follow redirects
|
||||
//TODO(khalid): consider multi-threaded connection manager case
|
||||
|
||||
// Config considerations
|
||||
//TODO(khalid): consider multi-threaded connection manager case
|
||||
return httpClient;
|
||||
}
|
||||
|
||||
|
|
@ -159,7 +158,11 @@ public class ZeppelinhubRestApiHandler {
|
|||
return StringUtils.EMPTY;
|
||||
}
|
||||
String url = zepelinhubUrl + argument;
|
||||
return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
|
||||
if (PROXY_ON) {
|
||||
return sendToZeppelinHubViaProxy(new HttpGet(url), StringUtils.EMPTY, token, true);
|
||||
} else {
|
||||
return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
|
||||
}
|
||||
}
|
||||
|
||||
public String putWithResponseBody(String token, String url, String json) throws IOException {
|
||||
|
|
@ -167,7 +170,11 @@ public class ZeppelinhubRestApiHandler {
|
|||
LOG.error("Empty note, cannot send it to zeppelinHub");
|
||||
throw new IOException("Cannot send emtpy note to zeppelinHub");
|
||||
}
|
||||
return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
|
||||
if (PROXY_ON) {
|
||||
return sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl + url), json, token, true);
|
||||
} else {
|
||||
return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
|
||||
}
|
||||
}
|
||||
|
||||
public void put(String token, String jsonNote) throws IOException {
|
||||
|
|
@ -175,7 +182,11 @@ public class ZeppelinhubRestApiHandler {
|
|||
LOG.error("Cannot save empty note/string to ZeppelinHub");
|
||||
return;
|
||||
}
|
||||
sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
|
||||
if (PROXY_ON) {
|
||||
sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl), jsonNote, token, false);
|
||||
} else {
|
||||
sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
|
||||
}
|
||||
}
|
||||
|
||||
public void del(String token, String argument) throws IOException {
|
||||
|
|
@ -183,7 +194,37 @@ public class ZeppelinhubRestApiHandler {
|
|||
LOG.error("Cannot delete empty note from ZeppelinHub");
|
||||
return;
|
||||
}
|
||||
sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false);
|
||||
if (PROXY_ON) {
|
||||
sendToZeppelinHubViaProxy(new HttpDelete(zepelinhubUrl + argument), StringUtils.EMPTY, token,
|
||||
false);
|
||||
} else {
|
||||
sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token,
|
||||
false);
|
||||
}
|
||||
}
|
||||
|
||||
private String sendToZeppelinHubViaProxy(HttpRequestBase request,
|
||||
String json,
|
||||
String token,
|
||||
boolean withResponse) throws IOException {
|
||||
request.setHeader(ZEPPELIN_TOKEN_HEADER, token);
|
||||
if (request.getMethod().equals(HttpPost.METHOD_NAME)) {
|
||||
HttpPost post = (HttpPost) request;
|
||||
StringEntity content = new StringEntity(json, "application/json;charset=UTF-8");
|
||||
post.setEntity(content);
|
||||
}
|
||||
if (request.getMethod().equals(HttpPut.METHOD_NAME)) {
|
||||
HttpPut put = (HttpPut) request;
|
||||
StringEntity content = new StringEntity(json, "application/json;charset=UTF-8");
|
||||
put.setEntity(content);
|
||||
}
|
||||
String body = StringUtils.EMPTY;
|
||||
if (proxyClient != null) {
|
||||
body = proxyClient.sendToZeppelinHub(request, withResponse);
|
||||
} else {
|
||||
LOG.error("Proxy client request was submitted while not correctly initialized");
|
||||
}
|
||||
return body;
|
||||
}
|
||||
|
||||
private String sendToZeppelinHub(HttpMethod method,
|
||||
|
|
@ -243,6 +284,9 @@ public class ZeppelinhubRestApiHandler {
|
|||
public void close() {
|
||||
try {
|
||||
client.stop();
|
||||
if (proxyClient != null) {
|
||||
proxyClient.stop();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.info("Couldn't stop ZeppelinHub client properly", e);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue