diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java index 43cc5e20b2..34e3b6f350 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/BroadcastServiceAdapter.java @@ -22,6 +22,9 @@ import java.util.function.Consumer; /** * Broadcast Service Adapter + * Service for broadcast messaging between nodes. + * The broadcast service is an unreliable broadcast messaging service backed by multicast. + * This service provides no guaranteed regarding reliability or order of messages. */ public class BroadcastServiceAdapter implements BroadcastService { @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java index b8dd746da4..683f068f3c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java @@ -122,7 +122,7 @@ import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATIO * 3. Cluster monitoring */ public abstract class ClusterManager { - private static Logger logger = LoggerFactory.getLogger(ClusterManager.class); + private static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class); public final ZeppelinConfiguration zconf = ZeppelinConfiguration.create(); @@ -140,7 +140,7 @@ public abstract class ClusterManager { protected LocalRaftProtocolFactory protocolFactory = new LocalRaftProtocolFactory(protocolSerializer); protected List messagingServices = new ArrayList<>(); - protected Collection clusterMemberIds = new ArrayList(); + protected List clusterMemberIds = new ArrayList(); protected AtomicBoolean running = new AtomicBoolean(true); @@ -151,10 +151,6 @@ public abstract class ClusterManager { // zeppelin server host & port protected String zeplServerHost = ""; - // atomic dependent netty package path - public static String ATOMIX_NETTY_LIB_PATH = "/lib/atomix-netty"; - public static String INTERPRETER_LIB_PATH = "/lib/interpreter"; - public ClusterManager() { try { zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress(); @@ -178,10 +174,9 @@ public abstract class ClusterManager { } } } catch (UnknownHostException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } catch (SocketException e) { - e.printStackTrace(); - return; + LOGGER.error(e.getMessage()); } } @@ -214,13 +209,13 @@ public abstract class ClusterManager { new Thread(new Runnable() { @Override public void run() { - logger.info("RaftClientThread run() >>>"); + LOGGER.info("RaftClientThread run() >>>"); int raftClientPort = 0; try { raftClientPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + ":" + raftClientPort); @@ -242,7 +237,7 @@ public abstract class ClusterManager { raftSessionClient = createProxy(raftClient); - logger.info("RaftClientThread run() <<<"); + LOGGER.info("RaftClientThread run() <<<"); } }).start(); @@ -259,7 +254,7 @@ public abstract class ClusterManager { while (!raftInitialized()) { retry++; if (0 == retry % 30) { - logger.error("Raft incomplete initialization! retry[{}]", retry); + LOGGER.error("Raft incomplete initialization! retry[{}]", retry); } Thread.sleep(100); } @@ -276,14 +271,14 @@ public abstract class ClusterManager { // The operation was successfully deleted clusterMetaQueue.remove(metaEntity); } else { - logger.error("Cluster Meta Consume faild!"); + LOGGER.error("Cluster Meta Consume faild!"); } } else { Thread.sleep(100); } } } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } } }).start(); @@ -305,11 +300,11 @@ public abstract class ClusterManager { raftClient.close().get(3, TimeUnit.SECONDS); } } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } catch (ExecutionException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } catch (TimeoutException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } } @@ -320,7 +315,7 @@ public abstract class ClusterManager { // put metadata into cluster metadata private boolean putClusterMeta(ClusterMetaEntity entity) { if (!raftInitialized()) { - logger.error("Raft incomplete initialization!"); + LOGGER.error("Raft incomplete initialization!"); return false; } @@ -328,8 +323,8 @@ public abstract class ClusterManager { String metaKey = entity.getKey(); HashMap newMetaValue = entity.getValues(); - if (logger.isDebugEnabled()) { - logger.debug("putClusterMeta {} {}", metaType, metaKey); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("putClusterMeta {} {}", metaType, metaKey); } // add cluster name @@ -348,7 +343,7 @@ public abstract class ClusterManager { boolean result = putClusterMeta(metaEntity); if (false == result) { - logger.warn("putClusterMeta failure, Cache metadata to queue."); + LOGGER.warn("putClusterMeta failure, Cache metadata to queue."); clusterMetaQueue.add(metaEntity); } } @@ -359,10 +354,10 @@ public abstract class ClusterManager { String metaKey = entity.getKey(); // Need to pay attention to delete metadata operations - logger.info("deleteClusterMeta {} {}", metaType, metaKey); + LOGGER.info("deleteClusterMeta {} {}", metaType, metaKey); if (!raftInitialized()) { - logger.error("Raft incomplete initialization!"); + LOGGER.error("Raft incomplete initialization!"); return false; } @@ -371,7 +366,7 @@ public abstract class ClusterManager { clientSerializer.encode(entity))) .thenApply(clientSerializer::decode) .thenAccept(result -> { - logger.info("deleteClusterMeta {}", result); + LOGGER.info("deleteClusterMeta {}", result); }); return true; @@ -380,11 +375,10 @@ public abstract class ClusterManager { // delete metadata from cluster metadata public void deleteClusterMeta(ClusterMetaType type, String key) { ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, type, key, null); - //clusterMetaQueue.add(metaEntity); boolean result = deleteClusterMeta(metaEntity); if (false == result) { - logger.warn("deleteClusterMeta faild, Cache data to queue."); + LOGGER.warn("deleteClusterMeta faild, Cache data to queue."); clusterMetaQueue.add(metaEntity); } } @@ -394,7 +388,7 @@ public abstract class ClusterManager { ClusterMetaType metaType, String metaKey) { HashMap> clusterMeta = new HashMap<>(); if (!raftInitialized()) { - logger.error("Raft incomplete initialization!"); + LOGGER.error("Raft incomplete initialization!"); return clusterMeta; } @@ -405,20 +399,19 @@ public abstract class ClusterManager { mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET, clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS); } catch (InterruptedException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } catch (ExecutionException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } catch (TimeoutException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } if (null != mateData) { clusterMeta = clientSerializer.decode(mateData); } - logger.info("getClusterMeta >>> {}", clusterMeta.toString()); - if (logger.isDebugEnabled()) { - logger.debug("getClusterMeta >>> {}", clusterMeta.toString()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString()); } return clusterMeta; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java index 081ea1023c..b4802a036d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterPrimitiveType.java @@ -25,6 +25,8 @@ import io.atomix.primitive.service.ServiceConfig; /** * Cluster primitive type + * Creating a custom distributed primitive is defining the primitive type. + * To create a new type, implement the PrimitiveType interface */ public class ClusterPrimitiveType implements PrimitiveType { public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java index 2937181424..460f6ac3f1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterStateMachine.java @@ -33,7 +33,10 @@ import org.slf4j.LoggerFactory; import java.util.Map; /** - * Cluster State Machine + * Cluster State Machine for Zeppelin + * The cluster state is implemented as a snapshot state machine. + * The state machine stores the service and process metadata information of the cluster. + * Metadata information can be manipulated by put, get, remove, index, and snapshot. */ public class ClusterStateMachine extends AbstractPrimitiveService { private static Logger logger = LoggerFactory.getLogger(ClusterStateMachine.class); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java index 98bd62b7c3..6283813fa9 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/listener/ZeppelinClusterMembershipEventListener.java @@ -22,7 +22,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Zeppelin ClusterMembershipEventListener + * Entity capable of receiving device cluster-related events. + * Listen for new zeppelin servers to join or leave the cluster, + * Monitor whether the metadata in the cluster server changes */ public class ZeppelinClusterMembershipEventListener implements ClusterMembershipEventListener { private static Logger logger diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java index 77abde09d3..eb7a76281e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/LocalRaftClientProtocol.java @@ -48,7 +48,8 @@ import java.util.function.Consumer; import java.util.function.Function; /** - * Cluster Raft client protocol. + * Protocol for intercommunication between Raft clients for each server in the cluster. + * Communication protocol for handling sessions, queries, commands, and services within the cluster. */ public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol { private Function> heartbeatHandler; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java index df0f2dc5a7..bae52bfc05 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/protocol/RaftServerMessagingProtocol.java @@ -62,7 +62,7 @@ import java.util.function.Consumer; import java.util.function.Function; /** - * Raft server messaging protocol. + * Raft server messaging protocol between Raft Servers for each server in the cluster. */ public class RaftServerMessagingProtocol extends RaftMessagingProtocol implements RaftServerProtocol {