Refactor ZeppelinHub rest API handler

- Now takes a token on every http request
This commit is contained in:
Anthony Corbacho 2016-11-18 19:16:53 +01:00
parent 3fbfcfa4e5
commit 674fb93391

View file

@ -18,12 +18,16 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
@ -35,6 +39,9 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
* REST API handler.
*
@ -42,6 +49,7 @@ import org.slf4j.LoggerFactory;
public class ZeppelinhubRestApiHandler {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
private static final String USER_SESSION_HEADER = "X-User-Session";
private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
private static boolean PROXY_ON = false;
private static String PROXY_HOST;
@ -49,16 +57,13 @@ public class ZeppelinhubRestApiHandler {
private final HttpClient client;
private final String zepelinhubUrl;
private final String token;
public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl,
String token) {
public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl, String token) {
return new ZeppelinhubRestApiHandler(zeppelinhubUrl, token);
}
private ZeppelinhubRestApiHandler(String zeppelinhubUrl, String token) {
this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
this.token = token;
//TODO(khalid):to make proxy conf consistent with Zeppelin confs
//readProxyConf();
@ -114,35 +119,75 @@ public class ZeppelinhubRestApiHandler {
return httpClient;
}
public String asyncGet(String argument) throws IOException {
return sendToZeppelinHub(HttpMethod.GET, zepelinhubUrl + argument, StringUtils.EMPTY, true);
/**
* Fetch zeppelin instances for a given user.
* @param ticket
* @return
* @throws IOException
*/
public List<Instance> asyncGetInstances(String ticket) throws IOException {
InputStreamResponseListener listener = new InputStreamResponseListener();
Response response;
String url = zepelinhubUrl + "instances";
String data;
Request request = client.newRequest(url).header(USER_SESSION_HEADER, ticket);
request.send(listener);
try {
response = listener.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
LOG.error("Cannot perform request to ZeppelinHub", e);
throw new IOException("Cannot perform GET request to ZeppelinHub", e);
}
int code = response.getStatus();
if (code == 200) {
try (InputStream responseContent = listener.getInputStream()) {
data = IOUtils.toString(responseContent, "UTF-8");
}
} else {
LOG.error("ZeppelinHub GET {} returned with status {} ", url, code);
throw new IOException("Cannot perform GET request to ZeppelinHub");
}
Type listType = new TypeToken<ArrayList<Instance>>() {}.getType();
return new Gson().fromJson(data, listType);
}
public String asyncGet(String token, String argument) throws IOException {
String url = zepelinhubUrl + argument;
return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true);
}
public String asyncPutWithResponseBody(String url, String json) throws IOException {
public String asyncPutWithResponseBody(String token, String url, String json) throws IOException {
if (StringUtils.isBlank(url) || StringUtils.isBlank(json)) {
LOG.error("Empty note, cannot send it to zeppelinHub");
throw new IOException("Cannot send emtpy note to zeppelinHub");
}
return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, true);
return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true);
}
public void asyncPut(String jsonNote) throws IOException {
public void asyncPut(String token, String jsonNote) throws IOException {
if (StringUtils.isBlank(jsonNote)) {
LOG.error("Cannot save empty note/string to ZeppelinHub");
return;
}
sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, false);
sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false);
}
public void asyncDel(String argument) throws IOException {
public void asyncDel(String token, String argument) throws IOException {
if (StringUtils.isBlank(argument)) {
LOG.error("Cannot delete empty note from ZeppelinHub");
return;
}
sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, false);
sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false);
}
private String sendToZeppelinHub(HttpMethod method, String url, String json, boolean withResponse)
private String sendToZeppelinHub(HttpMethod method,
String url,
String json,
String token,
boolean withResponse)
throws IOException {
Request request = client.newRequest(url).method(method).header(ZEPPELIN_TOKEN_HEADER, token);
if ((method.equals(HttpMethod.PUT) || method.equals(HttpMethod.POST))