jetty client relay to asyncclient when proxy on

This commit is contained in:
Khalid Huseynov 2017-03-28 22:00:21 +09:00
parent 16ccbb4616
commit a5711bc1dd

View file

@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@ -27,6 +29,12 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.entity.StringEntity;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
@ -52,11 +60,10 @@ public class ZeppelinhubRestApiHandler {
private static final String USER_SESSION_HEADER = "X-User-Session";
private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
private static boolean PROXY_ON = false;
private static String PROXY_HOST;
private static int PROXY_PORT;
//TODO(xxx): possibly switch to jetty-client > 9.3.12 when adopt jvm 1.8
private static HttpProxyClient proxyClient;
private final HttpClient client;
private final String zepelinhubUrl;
private String zepelinhubUrl;
public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl) {
return new ZeppelinhubRestApiHandler(zeppelinhubUrl);
@ -65,8 +72,7 @@ public class ZeppelinhubRestApiHandler {
private ZeppelinhubRestApiHandler(String zeppelinhubUrl) {
this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
//TODO(khalid):to make proxy conf consistent with Zeppelin confs
//readProxyConf();
readProxyConf();
client = getAsyncClient();
try {
@ -74,48 +80,41 @@ public class ZeppelinhubRestApiHandler {
} catch (Exception e) {
LOG.error("Cannot initialize ZeppelinHub REST async client", e);
}
}
private void readProxyConf() {
//try reading http_proxy
String proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
//try reading https_proxy
String proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
if (StringUtils.isBlank(proxyHostString)) {
//try https_proxy if no http_proxy
proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
//try http_proxy if no https_proxy
proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
}
if (StringUtils.isBlank(proxyHostString)) {
PROXY_ON = false;
} else {
// host format - http://domain:port/
String[] parts = proxyHostString.replaceAll("/", "").split(":");
if (parts.length != 3) {
LOG.warn("Proxy host format is incorrect {}, e.g. http://domain:port/", proxyHostString);
PROXY_ON = false;
return;
if (!StringUtils.isBlank(proxyHostString)) {
URI uri = null;
try {
uri = new URI(proxyHostString);
} catch (URISyntaxException e) {
LOG.error("Proxy uri doesn't follow correct syntax", e);
}
if (uri != null) {
PROXY_ON = true;
proxyClient = HttpProxyClient.newInstance(uri);
}
PROXY_HOST = parts[1];
PROXY_PORT = Integer.parseInt(parts[2]);
LOG.info("Proxy protocol: {}, domain: {}, port: {}", parts[0], parts[1], parts[2]);
PROXY_ON = true;
}
}
private HttpClient getAsyncClient() {
SslContextFactory sslContextFactory = new SslContextFactory();
HttpClient httpClient = new HttpClient(sslContextFactory);
// Configure HttpClient
httpClient.setFollowRedirects(false);
httpClient.setMaxConnectionsPerDestination(100);
// Config considerations
//TODO(khalid): consider using proxy
//TODO(khalid): consider whether require to follow redirects
//TODO(khalid): consider multi-threaded connection manager case
// Config considerations
//TODO(khalid): consider multi-threaded connection manager case
return httpClient;
}
@ -159,7 +158,11 @@ public class ZeppelinhubRestApiHandler {
return StringUtils.EMPTY;
}
String url = zepelinhubUrl + argument;
return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
if (PROXY_ON) {
return sendToZeppelinHubViaProxy(new HttpGet(url), StringUtils.EMPTY, token, true);
} else {
return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
}
}
public String putWithResponseBody(String token, String url, String json) throws IOException {
@ -167,7 +170,11 @@ public class ZeppelinhubRestApiHandler {
LOG.error("Empty note, cannot send it to zeppelinHub");
throw new IOException("Cannot send emtpy note to zeppelinHub");
}
return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
if (PROXY_ON) {
return sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl + url), json, token, true);
} else {
return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
}
}
public void put(String token, String jsonNote) throws IOException {
@ -175,7 +182,11 @@ public class ZeppelinhubRestApiHandler {
LOG.error("Cannot save empty note/string to ZeppelinHub");
return;
}
sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
if (PROXY_ON) {
sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl), jsonNote, token, false);
} else {
sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
}
}
public void del(String token, String argument) throws IOException {
@ -183,7 +194,37 @@ public class ZeppelinhubRestApiHandler {
LOG.error("Cannot delete empty note from ZeppelinHub");
return;
}
sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false);
if (PROXY_ON) {
sendToZeppelinHubViaProxy(new HttpDelete(zepelinhubUrl + argument), StringUtils.EMPTY, token,
false);
} else {
sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token,
false);
}
}
private String sendToZeppelinHubViaProxy(HttpRequestBase request,
String json,
String token,
boolean withResponse) throws IOException {
request.setHeader(ZEPPELIN_TOKEN_HEADER, token);
if (request.getMethod().equals(HttpPost.METHOD_NAME)) {
HttpPost post = (HttpPost) request;
StringEntity content = new StringEntity(json, "application/json;charset=UTF-8");
post.setEntity(content);
}
if (request.getMethod().equals(HttpPut.METHOD_NAME)) {
HttpPut put = (HttpPut) request;
StringEntity content = new StringEntity(json, "application/json;charset=UTF-8");
put.setEntity(content);
}
String body = StringUtils.EMPTY;
if (proxyClient != null) {
body = proxyClient.sendToZeppelinHub(request, withResponse);
} else {
LOG.error("Proxy client request was submitted while not correctly initialized");
}
return body;
}
private String sendToZeppelinHub(HttpMethod method,
@ -243,6 +284,9 @@ public class ZeppelinhubRestApiHandler {
public void close() {
try {
client.stop();
if (proxyClient != null) {
proxyClient.stop();
}
} catch (Exception e) {
LOG.info("Couldn't stop ZeppelinHub client properly", e);
}