address @bzz comments

This commit is contained in:
Khalid Huseynov 2016-05-25 15:28:01 +09:00
parent b1fe8a378e
commit 1c78d8c804
4 changed files with 22 additions and 26 deletions

View file

@ -239,26 +239,26 @@
<scope>test</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
</dependency>
<dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
</dependency>
<dependency>
<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-server</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>
</dependency>
</dependencies>

View file

@ -28,7 +28,6 @@ import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.security.Authentication;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -50,20 +49,20 @@ public class ZeppelinHubRepo implements NotebookRepo {
private final Client websocketClient;
private String token;
private ZeppelinhubRestApiHandler zeppelinhubHandler;
private ZeppelinhubRestApiHandler restApiClient;
public ZeppelinHubRepo(ZeppelinConfiguration conf) {
String zeppelinHubUrl = getZeppelinHubUrl(conf);
LOG.info("Initializing ZeppelinHub integration module");
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
zeppelinhubHandler = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);
websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
getZeppelinhubWebsocketUri(conf), token, conf);
websocketClient.start();
}
private String getZeppelinHubWsUri(URI api) throws IOException {
private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
URI apiRoot = api;
String scheme = apiRoot.getScheme();
int port = apiRoot.getPort();
@ -74,12 +73,7 @@ public class ZeppelinHubRepo implements NotebookRepo {
if (scheme == null) {
LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}",
apiRoot, DEFAULT_SERVER);
try {
apiRoot = new URI(DEFAULT_SERVER);
} catch (URISyntaxException e) {
LOG.error("Invalid default zeppelinhub url {}", DEFAULT_SERVER, e);
throw new IOException(e);
}
apiRoot = new URI(DEFAULT_SERVER);
scheme = apiRoot.getScheme();
port = apiRoot.getPort();
if (port <= 0) {
@ -95,8 +89,8 @@ public class ZeppelinHubRepo implements NotebookRepo {
try {
zeppelinHubUri = getZeppelinHubWsUri(new URI(conf.getString("ZEPPELINHUB_API_ADDRESS",
ZEPPELIN_CONF_PROP_NAME_SERVER, DEFAULT_SERVER)));
} catch (URISyntaxException | IOException e) {
LOG.error("Cannot get zeppelinhub URI", e);
} catch (URISyntaxException e) {
LOG.error("Cannot get ZeppelinHub URI", e);
}
return zeppelinHubUri;
}
@ -110,8 +104,9 @@ public class ZeppelinHubRepo implements NotebookRepo {
return ws + "://localhost:" + port + "/ws";
}
public void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
zeppelinhubHandler = zeppelinhub;
// Used in tests
void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) {
restApiClient = zeppelinhub;
}
String getZeppelinHubUrl(ZeppelinConfiguration conf) {
@ -147,7 +142,7 @@ public class ZeppelinHubRepo implements NotebookRepo {
@Override
public List<NoteInfo> list() throws IOException {
String response = zeppelinhubHandler.asyncGet("");
String response = restApiClient.asyncGet("");
List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
if (notes == null) {
return Collections.emptyList();
@ -162,7 +157,7 @@ public class ZeppelinHubRepo implements NotebookRepo {
return EMPTY_NOTE;
}
//String response = zeppelinhubHandler.get(noteId);
String response = zeppelinhubHandler.asyncGet(noteId);
String response = restApiClient.asyncGet(noteId);
Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
@ -177,13 +172,13 @@ public class ZeppelinHubRepo implements NotebookRepo {
throw new IOException("Zeppelinhub failed to save empty note");
}
String notebook = GSON.toJson(note);
zeppelinhubHandler.asyncPut(notebook);
restApiClient.asyncPut(notebook);
LOG.info("ZeppelinHub REST API saving note {} ", note.id());
}
@Override
public void remove(String noteId) throws IOException {
zeppelinhubHandler.asyncDel(noteId);
restApiClient.asyncDel(noteId);
LOG.info("ZeppelinHub REST API removing note {} ", noteId);
}

View file

@ -22,7 +22,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TODO(xxx): Add description
* Client to connect Zeppelin and ZeppelinHub via websocket API.
* Implemented using singleton pattern.
*
*/
public class Client {

View file

@ -129,7 +129,7 @@ public class ZeppelinhubClient {
}
private boolean isConnectedToZeppelinhub() {
return (zeppelinhubSession == null || !zeppelinhubSession.isSessionOpen()) ? false : true;
return (zeppelinhubSession != null && zeppelinhubSession.isSessionOpen());
}
private ZeppelinhubSession connect() {