mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
add more doc to explain this class's responsibility.
This commit is contained in:
parent
d268ee72ec
commit
af728bd2cd
7 changed files with 42 additions and 38 deletions
|
|
@ -22,6 +22,9 @@ import java.util.function.Consumer;
|
|||
|
||||
/**
|
||||
* Broadcast Service Adapter
|
||||
* Service for broadcast messaging between nodes.
|
||||
* The broadcast service is an unreliable broadcast messaging service backed by multicast.
|
||||
* This service provides no guaranteed regarding reliability or order of messages.
|
||||
*/
|
||||
public class BroadcastServiceAdapter implements BroadcastService {
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -122,7 +122,7 @@ import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATIO
|
|||
* 3. Cluster monitoring
|
||||
*/
|
||||
public abstract class ClusterManager {
|
||||
private static Logger logger = LoggerFactory.getLogger(ClusterManager.class);
|
||||
private static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
|
||||
|
||||
public final ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
|
||||
|
||||
|
|
@ -140,7 +140,7 @@ public abstract class ClusterManager {
|
|||
protected LocalRaftProtocolFactory protocolFactory
|
||||
= new LocalRaftProtocolFactory(protocolSerializer);
|
||||
protected List<MessagingService> messagingServices = new ArrayList<>();
|
||||
protected Collection<MemberId> clusterMemberIds = new ArrayList<MemberId>();
|
||||
protected List<MemberId> clusterMemberIds = new ArrayList<MemberId>();
|
||||
|
||||
protected AtomicBoolean running = new AtomicBoolean(true);
|
||||
|
||||
|
|
@ -151,10 +151,6 @@ public abstract class ClusterManager {
|
|||
// zeppelin server host & port
|
||||
protected String zeplServerHost = "";
|
||||
|
||||
// atomic dependent netty package path
|
||||
public static String ATOMIX_NETTY_LIB_PATH = "/lib/atomix-netty";
|
||||
public static String INTERPRETER_LIB_PATH = "/lib/interpreter";
|
||||
|
||||
public ClusterManager() {
|
||||
try {
|
||||
zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
|
||||
|
|
@ -178,10 +174,9 @@ public abstract class ClusterManager {
|
|||
}
|
||||
}
|
||||
} catch (UnknownHostException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
} catch (SocketException e) {
|
||||
e.printStackTrace();
|
||||
return;
|
||||
LOGGER.error(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -214,13 +209,13 @@ public abstract class ClusterManager {
|
|||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
logger.info("RaftClientThread run() >>>");
|
||||
LOGGER.info("RaftClientThread run() >>>");
|
||||
|
||||
int raftClientPort = 0;
|
||||
try {
|
||||
raftClientPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
}
|
||||
|
||||
MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + ":" + raftClientPort);
|
||||
|
|
@ -242,7 +237,7 @@ public abstract class ClusterManager {
|
|||
|
||||
raftSessionClient = createProxy(raftClient);
|
||||
|
||||
logger.info("RaftClientThread run() <<<");
|
||||
LOGGER.info("RaftClientThread run() <<<");
|
||||
}
|
||||
}).start();
|
||||
|
||||
|
|
@ -259,7 +254,7 @@ public abstract class ClusterManager {
|
|||
while (!raftInitialized()) {
|
||||
retry++;
|
||||
if (0 == retry % 30) {
|
||||
logger.error("Raft incomplete initialization! retry[{}]", retry);
|
||||
LOGGER.error("Raft incomplete initialization! retry[{}]", retry);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
|
@ -276,14 +271,14 @@ public abstract class ClusterManager {
|
|||
// The operation was successfully deleted
|
||||
clusterMetaQueue.remove(metaEntity);
|
||||
} else {
|
||||
logger.error("Cluster Meta Consume faild!");
|
||||
LOGGER.error("Cluster Meta Consume faild!");
|
||||
}
|
||||
} else {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
|
|
@ -305,11 +300,11 @@ public abstract class ClusterManager {
|
|||
raftClient.close().get(3, TimeUnit.SECONDS);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
} catch (TimeoutException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -320,7 +315,7 @@ public abstract class ClusterManager {
|
|||
// put metadata into cluster metadata
|
||||
private boolean putClusterMeta(ClusterMetaEntity entity) {
|
||||
if (!raftInitialized()) {
|
||||
logger.error("Raft incomplete initialization!");
|
||||
LOGGER.error("Raft incomplete initialization!");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -328,8 +323,8 @@ public abstract class ClusterManager {
|
|||
String metaKey = entity.getKey();
|
||||
HashMap<String, Object> newMetaValue = entity.getValues();
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("putClusterMeta {} {}", metaType, metaKey);
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("putClusterMeta {} {}", metaType, metaKey);
|
||||
}
|
||||
|
||||
// add cluster name
|
||||
|
|
@ -348,7 +343,7 @@ public abstract class ClusterManager {
|
|||
|
||||
boolean result = putClusterMeta(metaEntity);
|
||||
if (false == result) {
|
||||
logger.warn("putClusterMeta failure, Cache metadata to queue.");
|
||||
LOGGER.warn("putClusterMeta failure, Cache metadata to queue.");
|
||||
clusterMetaQueue.add(metaEntity);
|
||||
}
|
||||
}
|
||||
|
|
@ -359,10 +354,10 @@ public abstract class ClusterManager {
|
|||
String metaKey = entity.getKey();
|
||||
|
||||
// Need to pay attention to delete metadata operations
|
||||
logger.info("deleteClusterMeta {} {}", metaType, metaKey);
|
||||
LOGGER.info("deleteClusterMeta {} {}", metaType, metaKey);
|
||||
|
||||
if (!raftInitialized()) {
|
||||
logger.error("Raft incomplete initialization!");
|
||||
LOGGER.error("Raft incomplete initialization!");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -371,7 +366,7 @@ public abstract class ClusterManager {
|
|||
clientSerializer.encode(entity)))
|
||||
.<Long>thenApply(clientSerializer::decode)
|
||||
.thenAccept(result -> {
|
||||
logger.info("deleteClusterMeta {}", result);
|
||||
LOGGER.info("deleteClusterMeta {}", result);
|
||||
});
|
||||
|
||||
return true;
|
||||
|
|
@ -380,11 +375,10 @@ public abstract class ClusterManager {
|
|||
// delete metadata from cluster metadata
|
||||
public void deleteClusterMeta(ClusterMetaType type, String key) {
|
||||
ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, type, key, null);
|
||||
//clusterMetaQueue.add(metaEntity);
|
||||
|
||||
boolean result = deleteClusterMeta(metaEntity);
|
||||
if (false == result) {
|
||||
logger.warn("deleteClusterMeta faild, Cache data to queue.");
|
||||
LOGGER.warn("deleteClusterMeta faild, Cache data to queue.");
|
||||
clusterMetaQueue.add(metaEntity);
|
||||
}
|
||||
}
|
||||
|
|
@ -394,7 +388,7 @@ public abstract class ClusterManager {
|
|||
ClusterMetaType metaType, String metaKey) {
|
||||
HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
|
||||
if (!raftInitialized()) {
|
||||
logger.error("Raft incomplete initialization!");
|
||||
LOGGER.error("Raft incomplete initialization!");
|
||||
return clusterMeta;
|
||||
}
|
||||
|
||||
|
|
@ -405,20 +399,19 @@ public abstract class ClusterManager {
|
|||
mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
|
||||
clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
} catch (TimeoutException e) {
|
||||
e.printStackTrace();
|
||||
LOGGER.error(e.getMessage());
|
||||
}
|
||||
|
||||
if (null != mateData) {
|
||||
clusterMeta = clientSerializer.decode(mateData);
|
||||
}
|
||||
|
||||
logger.info("getClusterMeta >>> {}", clusterMeta.toString());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("getClusterMeta >>> {}", clusterMeta.toString());
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString());
|
||||
}
|
||||
|
||||
return clusterMeta;
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ import io.atomix.primitive.service.ServiceConfig;
|
|||
|
||||
/**
|
||||
* Cluster primitive type
|
||||
* Creating a custom distributed primitive is defining the primitive type.
|
||||
* To create a new type, implement the PrimitiveType interface
|
||||
*/
|
||||
public class ClusterPrimitiveType implements PrimitiveType {
|
||||
public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType();
|
||||
|
|
|
|||
|
|
@ -33,7 +33,10 @@ import org.slf4j.LoggerFactory;
|
|||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Cluster State Machine
|
||||
* Cluster State Machine for Zeppelin
|
||||
* The cluster state is implemented as a snapshot state machine.
|
||||
* The state machine stores the service and process metadata information of the cluster.
|
||||
* Metadata information can be manipulated by put, get, remove, index, and snapshot.
|
||||
*/
|
||||
public class ClusterStateMachine extends AbstractPrimitiveService {
|
||||
private static Logger logger = LoggerFactory.getLogger(ClusterStateMachine.class);
|
||||
|
|
|
|||
|
|
@ -22,7 +22,9 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Zeppelin ClusterMembershipEventListener
|
||||
* Entity capable of receiving device cluster-related events.
|
||||
* Listen for new zeppelin servers to join or leave the cluster,
|
||||
* Monitor whether the metadata in the cluster server changes
|
||||
*/
|
||||
public class ZeppelinClusterMembershipEventListener implements ClusterMembershipEventListener {
|
||||
private static Logger logger
|
||||
|
|
|
|||
|
|
@ -48,7 +48,8 @@ import java.util.function.Consumer;
|
|||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Cluster Raft client protocol.
|
||||
* Protocol for intercommunication between Raft clients for each server in the cluster.
|
||||
* Communication protocol for handling sessions, queries, commands, and services within the cluster.
|
||||
*/
|
||||
public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol {
|
||||
private Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> heartbeatHandler;
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ import java.util.function.Consumer;
|
|||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Raft server messaging protocol.
|
||||
* Raft server messaging protocol between Raft Servers for each server in the cluster.
|
||||
*/
|
||||
public class RaftServerMessagingProtocol extends RaftMessagingProtocol
|
||||
implements RaftServerProtocol {
|
||||
|
|
|
|||
Loading…
Reference in a new issue