initial rest api

This commit is contained in:
Khalid Huseynov 2016-04-23 13:42:28 +09:00
parent 7d00af4daf
commit 45ed9e1803
6 changed files with 538 additions and 0 deletions

View file

@ -127,6 +127,12 @@
<version>4.3.6</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>9.2.15.v20160210</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>

View file

@ -0,0 +1,129 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
/**
* ZeppelinHub repo class.
*/
public class ZeppelinHubRepo implements NotebookRepo {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
private static final String DEFAULT_SERVER = "https://www.zeppelinhub.com";
static final String ZEPPELIN_CONF_PROP_NAME_SERVER = "zeppelinhub.api.address";
static final String ZEPPELIN_CONF_PROP_NAME_TOKEN = "zeppelinhub.api.token";
private static final Gson GSON = new Gson();
private static final Note EMPTY_NOTE = new Note();
private String token;
private ZeppelinhubRestApiHandler zeppelinhubHandler;
public ZeppelinHubRepo(ZeppelinConfiguration conf) throws IOException {
String zeppelinHubUrl = getZeppelinHubUrl(conf);
LOG.info("Initializing ZeppelinHub integration module version ?");
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
zeppelinhubHandler = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
}
public void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
zeppelinhubHandler = zeppelinhub;
}
private String getZeppelinHubUrl(ZeppelinConfiguration conf) throws IOException {
if (conf == null) {
LOG.error("Invalid configuration, cannot be null");
throw new IOException("Configuration is null");
}
URI apiRoot;
String zeppelinhubUrl;
try {
String url = conf.getString("ZEPPELINHUB_API_ADDRESS",
ZEPPELIN_CONF_PROP_NAME_SERVER,
DEFAULT_SERVER);
apiRoot = new URI(url);
} catch (URISyntaxException e) {
LOG.error("Invalid zeppelinhub url", e);
throw new IOException(e);
}
String scheme = apiRoot.getScheme();
if (scheme == null) {
LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}",
apiRoot, DEFAULT_SERVER);
zeppelinhubUrl = DEFAULT_SERVER;
} else {
zeppelinhubUrl = scheme + "://" + apiRoot.getHost();
if (apiRoot.getPort() > 0) {
zeppelinhubUrl += ":" + apiRoot.getPort();
}
}
return zeppelinhubUrl;
}
@Override
public List<NoteInfo> list() throws IOException {
String response = zeppelinhubHandler.asyncGet("");
List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
if (notes == null) {
return Collections.emptyList();
}
LOG.info("ZeppelinHub REST API listing notes ");
return notes;
}
@Override
public Note get(String noteId) throws IOException {
if (StringUtils.isBlank(noteId)) {
return EMPTY_NOTE;
}
//String response = zeppelinhubHandler.get(noteId);
String response = zeppelinhubHandler.asyncGet(noteId);
Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
}
LOG.info("ZeppelinHub REST API get note {} ", noteId);
return note;
}
@Override
public void save(Note note) throws IOException {
if (note == null) {
throw new IOException("Zeppelinhub failed to save empty note");
}
String notebook = GSON.toJson(note);
zeppelinhubHandler.asyncPut(notebook);
LOG.info("ZeppelinHub REST API saving note {} ", note.id());
}
@Override
public void remove(String noteId) throws IOException {
zeppelinhubHandler.asyncDel(noteId);
LOG.info("ZeppelinHub REST API removing note {} ", noteId);
}
@Override
public void close() {
}
@Override
public void checkpoint(String noteId, String checkPointName) throws IOException {
}
}

View file

@ -0,0 +1,194 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* REST API handler.
*/
public class ZeppelinhubRestApiHandler {
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token";
private static final String DEFAULT_API_PATH = "/api/v1/zeppelin";
private static boolean PROXY_ON = false;
private static String PROXY_HOST;
private static int PROXY_PORT;
private final HttpClient client;
private final String zepelinhubUrl;
private final String token;
public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl,
String token) {
return new ZeppelinhubRestApiHandler(zeppelinhubUrl, token);
}
private ZeppelinhubRestApiHandler(String zeppelinhubUrl, String token) {
this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/";
this.token = token;
//TODO(khalid):to make proxy conf consistent with Zeppelin confs
//readProxyConf();
client = getAsyncClient();
try {
client.start();
} catch (Exception e) {
LOG.error("Cannot initialize ZeppelinHub REST async client", e);
}
}
private void readProxyConf() {
//try reading http_proxy
String proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ?
System.getenv("HTTP_PROXY") : System.getenv("http_proxy");
if (StringUtils.isBlank(proxyHostString)) {
//try https_proxy if no http_proxy
proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ?
System.getenv("HTTPS_PROXY") : System.getenv("https_proxy");
}
if (StringUtils.isBlank(proxyHostString)) {
PROXY_ON = false;
} else {
// host format - http://domain:port/
String[] parts = proxyHostString.replaceAll("/", "").split(":");
if (parts.length != 3) {
LOG.warn("Proxy host format is incorrect {}, e.g. http://domain:port/", proxyHostString);
PROXY_ON = false;
return;
}
PROXY_HOST = parts[1];
PROXY_PORT = Integer.parseInt(parts[2]);
LOG.info("Proxy protocol: {}, domain: {}, port: {}", parts[0], parts[1], parts[2]);
PROXY_ON = true;
}
}
private HttpClient getAsyncClient() {
SslContextFactory sslContextFactory = new SslContextFactory();
HttpClient httpClient = new HttpClient(sslContextFactory);
// Configure HttpClient
httpClient.setFollowRedirects(false);
httpClient.setMaxConnectionsPerDestination(100);
// Config considerations
//TODO(khalid): consider using proxy
//TODO(khalid): consider whether require to follow redirects
//TODO(khalid): consider multi-threaded connection manager case
return httpClient;
}
public String asyncGet(String argument) throws IOException {
String note = StringUtils.EMPTY;
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest(zepelinhubUrl + argument)
.header(ZEPPELIN_TOKEN_HEADER, token)
.send(listener);
// Wait for the response headers to arrive
Response response;
try {
response = listener.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException | ExecutionException e) {
LOG.error("Cannot perform Get request to ZeppelinHub", e);
throw new IOException("Cannot load note from ZeppelinHub", e);
}
int code = response.getStatus();
if (code == 200) {
try (InputStream responseContent = listener.getInputStream()) {
note = IOUtils.toString(responseContent, "UTF-8");
}
} else {
LOG.error("ZeppelinHub Get {} returned with status {} ", zepelinhubUrl + argument, code);
throw new IOException("Cannot load note from ZeppelinHub");
}
return note;
}
public void asyncPut(String jsonNote) throws IOException {
if (StringUtils.isBlank(jsonNote)) {
LOG.error("Cannot save empty note/string to ZeppelinHub");
return;
}
client.newRequest(zepelinhubUrl).method(HttpMethod.PUT)
.header(ZEPPELIN_TOKEN_HEADER, token)
.content(new StringContentProvider(jsonNote, "UTF-8"), "application/json;charset=UTF-8")
.send(new BufferingResponseListener() {
@Override
public void onComplete(Result res) {
if (!res.isFailed() && res.getResponse().getStatus() == 200) {
LOG.info("Successfully saved note to ZeppelinHub with {}",
res.getResponse().getStatus());
} else {
LOG.warn("Failed to save note to ZeppelinHub with HttpStatus {}",
res.getResponse().getStatus());
}
}
@Override
public void onFailure(Response response, Throwable failure) {
LOG.error("Failed to save note to ZeppelinHub: {}", response.getReason(), failure);
}
});
}
public void asyncDel(String argument) {
if (StringUtils.isBlank(argument)) {
LOG.error("Cannot delete empty note from ZeppelinHub");
return;
}
client.newRequest(zepelinhubUrl + argument)
.method(HttpMethod.DELETE)
.header(ZEPPELIN_TOKEN_HEADER, token)
.send(new BufferingResponseListener() {
@Override
public void onComplete(Result res) {
if (!res.isFailed() && res.getResponse().getStatus() == 200) {
LOG.info("Successfully removed note from ZeppelinHub with {}",
res.getResponse().getStatus());
} else {
LOG.warn("Failed to remove note from ZeppelinHub with HttpStatus {}",
res.getResponse().getStatus());
}
}
@Override
public void onFailure(Response response, Throwable failure) {
LOG.error("Failed to remove note from ZeppelinHub: {}", response.getReason(), failure);
}
});
}
public void close() {
try {
client.stop();
} catch (Exception e) {
LOG.info("Couldn't stop ZeppelinHub client properly", e);
}
}
}

View file

@ -0,0 +1,152 @@
package org.apache.zeppelin.notebook.repo.zeppelinhub;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.commons.httpclient.HttpException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.junit.Before;
import org.junit.Test;
import com.google.common.io.Files;
public class ZeppelinHubRepoTest {
final String TOKEN = "AAA-BBB-CCC-00";
final String testAddr = "http://zeppelinhub.ltd";
private ZeppelinHubRepo repo;
private File pathOfNotebooks = new File(System.getProperty("user.dir") + "/src/test/resources/list_of_notes");
private File pathOfNotebook = new File(System.getProperty("user.dir") + "/src/test/resources/note");
@Before
public void setUp() throws Exception {
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr);
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_TOKEN, "AAA-BBB-CCC-00");
ZeppelinConfiguration conf = new ZeppelinConfiguration();
repo = new ZeppelinHubRepo(conf);
repo.setZeppelinhubRestApiHandler(getMockedZeppelinHandler());
}
private ZeppelinhubRestApiHandler getMockedZeppelinHandler() throws HttpException, IOException {
ZeppelinhubRestApiHandler mockedZeppelinhubHandler = mock(ZeppelinhubRestApiHandler.class);
byte[] response = Files.toByteArray(pathOfNotebooks);
when(mockedZeppelinhubHandler.asyncGet("")).thenReturn(new String(response));
response = Files.toByteArray(pathOfNotebook);
when(mockedZeppelinhubHandler.asyncGet("AAAAA")).thenReturn(new String(response));
//when(mockedZeppelinhubHandler.asyncDel("AAAAA")).thenReturn(true);
//when(mockedZeppelinhubHandler.asyncDel("BBBBB")).thenReturn(false);
return mockedZeppelinhubHandler;
}
/*
@Test
public void testGetZeppelinUrl() throws IOException {
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr);
ZeppelinConfiguration config = new ZeppelinConfiguration();
ZeppelinHubRepo repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinUrl(config)).isEqualTo("http://zeppelinhub.ltd");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "yolow");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinUrl(config)).isEqualTo("https://www.zeppelinhub.com");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "http://zeppelinhub.ltd:4242");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinUrl(config)).isEqualTo("http://zeppelinhub.ltd:4242");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "http://zeppelinhub.ltd:0");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinUrl(config)).isEqualTo("http://zeppelinhub.ltd");
}
@Test
public void testGetZeppelinHubWsEndpoint() throws IOException, URISyntaxException {
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, testAddr);
ZeppelinConfiguration config = new ZeppelinConfiguration();
ZeppelinHubRepo repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinHubWsUri()).isEqualTo("ws://zeppelinhub.ltd:80/async");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "https://zeppelinhub.ltd");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinHubWsUri()).isEqualTo("wss://zeppelinhub.ltd:443/async");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "yolow");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinHubWsUri()).isEqualTo("wss://www.zeppelinhub.com:443/async");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "http://zeppelinhub.ltd:4242");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinHubWsUri()).isEqualTo("ws://zeppelinhub.ltd:4242/async");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "https://www.zeppelinhub.com");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinHubWsUri()).isEqualTo("wss://www.zeppelinhub.com:443/async");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "http://www.zeppelinhub.com");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinHubWsUri()).isEqualTo("ws://www.zeppelinhub.com:80/async");
System.setProperty(ZeppelinHubRepo.ZEPPELIN_CONF_PROP_NAME_SERVER, "https://www.zeppelinhub.com:4242");
config = new ZeppelinConfiguration();
repository = new ZeppelinHubRepo(config);
assertThat(repository.getZeppelinHubWsUri()).isEqualTo("wss://www.zeppelinhub.com:4242/async");
}
*/
@Test
public void testGetAllNotes() throws IOException {
List<NoteInfo> notebooks = repo.list();
assertThat(notebooks).isNotEmpty();
assertThat(notebooks.size()).isEqualTo(3);
}
@Test
public void testGetNote() throws IOException {
Note notebook = repo.get("AAAAA");
assertThat(notebook).isNotNull();
assertThat(notebook.id()).isEqualTo("2A94M5J1Z");
}
@Test
public void testRemoveNote() throws IOException {
// not suppose to throw
repo.remove("AAAAA");
}
@Test
public void testRemoveNoteError() throws IOException {
// not suppose to throw
repo.remove("BBBBB");
}
}

View file

@ -0,0 +1,41 @@
[
{
"id": "2ABSFSR35",
"name": "ES RDD",
"config": {
"looknfeel": "default",
"codeHighlightStyle": "GitHub",
"codeHighlightStyleOrig": "GitHub"
},
"info": {},
"paragraphs": [],
"angularObjects": {},
"lastUpdate": 1445663884000
},
{
"id": "2A94M5J1Z",
"name": "Zeppelin Tutorial",
"config": {
"looknfeel": "default"},
"info": {},
"paragraphs": [],
"angularObjects": {
"2AMFHG1GQ": [],
"2APT3NC5T": [],
"2ANPJTJRQ": [],
"2AMJZ9R4C": [],
"2ANJFBYJS": [],
"2APKU6T1J": []},
"lastUpdate": 1446688883000
},
{
"id": "2AVSEYW1R",
"name": "NFLabs - Tracking beta",
"config": {
"looknfeel": "default"},
"info": {},
"paragraphs": [],
"angularObjects": {},
"lastUpdate": 1445663888000
}
]

View file

@ -0,0 +1,16 @@
{
"id": "2A94M5J1Z",
"name": "Zeppelin Tutorial",
"config": {
"looknfeel": "default"},
"info": {},
"paragraphs": [],
"angularObjects": {
"2AMFHG1GQ": [],
"2APT3NC5T": [],
"2ANPJTJRQ": [],
"2AMJZ9R4C": [],
"2ANJFBYJS": [],
"2APKU6T1J": []},
"lastUpdate": 1446688883000
}