mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
code cleanup & walkthrough
also removing Retryable and renaming ZeppelinhubConnection
This commit is contained in:
parent
636b9fa58d
commit
4c47e43116
7 changed files with 5 additions and 78 deletions
|
|
@ -38,7 +38,6 @@ import com.google.gson.reflect.TypeToken;
|
|||
/**
|
||||
* ZeppelinHub repo class.
|
||||
*/
|
||||
|
||||
public class ZeppelinHubRepo implements NotebookRepo {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ZeppelinhubRestApiHandler.class);
|
||||
private static final String DEFAULT_SERVER = "https://www.zeppelinhub.com";
|
||||
|
|
|
|||
|
|
@ -50,9 +50,9 @@ public class Client {
|
|||
public void start() {
|
||||
if (zeppelinhubClient != null) {
|
||||
zeppelinhubClient.start();
|
||||
if (zeppelinClient != null) {
|
||||
zeppelinClient.start();
|
||||
}
|
||||
}
|
||||
if (zeppelinClient != null) {
|
||||
zeppelinClient.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -65,7 +65,7 @@ public class Client {
|
|||
}
|
||||
}
|
||||
|
||||
public void relayToHub(String message) {
|
||||
public void relayToZeppelinHub(String message) {
|
||||
zeppelinhubClient.send(message);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -197,7 +197,7 @@ public class ZeppelinClient {
|
|||
LOG.warn("Client isn't initialized yet");
|
||||
return;
|
||||
}
|
||||
client.relayToHub(hubMsg.serialize());
|
||||
client.relayToZeppelinHub(hubMsg.serialize());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -189,9 +189,6 @@ public class ZeppelinhubClient {
|
|||
case RUN_NOTEBOOK:
|
||||
runAllParagraph(hubMsg.meta.get("noteId"), msg);
|
||||
break;
|
||||
case PONG:
|
||||
// do nothing
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Received {} from ZeppelinHub, not handled", op);
|
||||
break;
|
||||
|
|
|
|||
|
|
@ -63,7 +63,6 @@ public class ZeppelinhubWebsocket implements WebSocketListener {
|
|||
|
||||
@Override
|
||||
public void onWebSocketText(String message) {
|
||||
LOG.info("Got msg {}", message);
|
||||
// handle message from ZeppelinHub.
|
||||
ZeppelinhubClient client = ZeppelinhubClient.getInstance();
|
||||
if (client != null) {
|
||||
|
|
|
|||
|
|
@ -1,68 +0,0 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.scheduler;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* todo: add description.
|
||||
*
|
||||
* @author anthonyc
|
||||
*
|
||||
*/
|
||||
abstract class Retryable {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Retryable.class);
|
||||
protected abstract void attempt() throws Exception;
|
||||
|
||||
/*
|
||||
* @delay: initial delay (ms)
|
||||
*
|
||||
* @interval: interval between re-trials (ms)
|
||||
*
|
||||
* @timeout: timeout after which re-trials are aborted (ms)
|
||||
*/
|
||||
public void execute(long delay, long interval, long timeout) throws Exception {
|
||||
long start = System.currentTimeMillis();
|
||||
if (delay > 0L) {
|
||||
sleep(delay);
|
||||
}
|
||||
while (true) {
|
||||
try {
|
||||
attempt();
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
if (System.currentTimeMillis() - start < timeout) {
|
||||
LOG.info("Retrying in " + interval + " ms");
|
||||
sleep(interval);
|
||||
// continue
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void sleep(long millis) {
|
||||
try {
|
||||
Thread.sleep(millis);
|
||||
} catch (InterruptedException interruptedException) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in a new issue