add more doc to explain this class's responsibility.

This commit is contained in:
liuxunorg 2018-09-26 14:47:29 +08:00
parent d268ee72ec
commit af728bd2cd
7 changed files with 42 additions and 38 deletions

View file

@ -22,6 +22,9 @@ import java.util.function.Consumer;
/**
* Broadcast Service Adapter
* Service for broadcast messaging between nodes.
* The broadcast service is an unreliable broadcast messaging service backed by multicast.
* This service provides no guaranteed regarding reliability or order of messages.
*/
public class BroadcastServiceAdapter implements BroadcastService {
@Override

View file

@ -122,7 +122,7 @@ import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATIO
* 3. Cluster monitoring
*/
public abstract class ClusterManager {
private static Logger logger = LoggerFactory.getLogger(ClusterManager.class);
private static Logger LOGGER = LoggerFactory.getLogger(ClusterManager.class);
public final ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
@ -140,7 +140,7 @@ public abstract class ClusterManager {
protected LocalRaftProtocolFactory protocolFactory
= new LocalRaftProtocolFactory(protocolSerializer);
protected List<MessagingService> messagingServices = new ArrayList<>();
protected Collection<MemberId> clusterMemberIds = new ArrayList<MemberId>();
protected List<MemberId> clusterMemberIds = new ArrayList<MemberId>();
protected AtomicBoolean running = new AtomicBoolean(true);
@ -151,10 +151,6 @@ public abstract class ClusterManager {
// zeppelin server host & port
protected String zeplServerHost = "";
// atomic dependent netty package path
public static String ATOMIX_NETTY_LIB_PATH = "/lib/atomix-netty";
public static String INTERPRETER_LIB_PATH = "/lib/interpreter";
public ClusterManager() {
try {
zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
@ -178,10 +174,9 @@ public abstract class ClusterManager {
}
}
} catch (UnknownHostException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
} catch (SocketException e) {
e.printStackTrace();
return;
LOGGER.error(e.getMessage());
}
}
@ -214,13 +209,13 @@ public abstract class ClusterManager {
new Thread(new Runnable() {
@Override
public void run() {
logger.info("RaftClientThread run() >>>");
LOGGER.info("RaftClientThread run() >>>");
int raftClientPort = 0;
try {
raftClientPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
}
MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + ":" + raftClientPort);
@ -242,7 +237,7 @@ public abstract class ClusterManager {
raftSessionClient = createProxy(raftClient);
logger.info("RaftClientThread run() <<<");
LOGGER.info("RaftClientThread run() <<<");
}
}).start();
@ -259,7 +254,7 @@ public abstract class ClusterManager {
while (!raftInitialized()) {
retry++;
if (0 == retry % 30) {
logger.error("Raft incomplete initialization! retry[{}]", retry);
LOGGER.error("Raft incomplete initialization! retry[{}]", retry);
}
Thread.sleep(100);
}
@ -276,14 +271,14 @@ public abstract class ClusterManager {
// The operation was successfully deleted
clusterMetaQueue.remove(metaEntity);
} else {
logger.error("Cluster Meta Consume faild!");
LOGGER.error("Cluster Meta Consume faild!");
}
} else {
Thread.sleep(100);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
}
}
}).start();
@ -305,11 +300,11 @@ public abstract class ClusterManager {
raftClient.close().get(3, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
} catch (ExecutionException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
} catch (TimeoutException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
}
}
@ -320,7 +315,7 @@ public abstract class ClusterManager {
// put metadata into cluster metadata
private boolean putClusterMeta(ClusterMetaEntity entity) {
if (!raftInitialized()) {
logger.error("Raft incomplete initialization!");
LOGGER.error("Raft incomplete initialization!");
return false;
}
@ -328,8 +323,8 @@ public abstract class ClusterManager {
String metaKey = entity.getKey();
HashMap<String, Object> newMetaValue = entity.getValues();
if (logger.isDebugEnabled()) {
logger.debug("putClusterMeta {} {}", metaType, metaKey);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("putClusterMeta {} {}", metaType, metaKey);
}
// add cluster name
@ -348,7 +343,7 @@ public abstract class ClusterManager {
boolean result = putClusterMeta(metaEntity);
if (false == result) {
logger.warn("putClusterMeta failure, Cache metadata to queue.");
LOGGER.warn("putClusterMeta failure, Cache metadata to queue.");
clusterMetaQueue.add(metaEntity);
}
}
@ -359,10 +354,10 @@ public abstract class ClusterManager {
String metaKey = entity.getKey();
// Need to pay attention to delete metadata operations
logger.info("deleteClusterMeta {} {}", metaType, metaKey);
LOGGER.info("deleteClusterMeta {} {}", metaType, metaKey);
if (!raftInitialized()) {
logger.error("Raft incomplete initialization!");
LOGGER.error("Raft incomplete initialization!");
return false;
}
@ -371,7 +366,7 @@ public abstract class ClusterManager {
clientSerializer.encode(entity)))
.<Long>thenApply(clientSerializer::decode)
.thenAccept(result -> {
logger.info("deleteClusterMeta {}", result);
LOGGER.info("deleteClusterMeta {}", result);
});
return true;
@ -380,11 +375,10 @@ public abstract class ClusterManager {
// delete metadata from cluster metadata
public void deleteClusterMeta(ClusterMetaType type, String key) {
ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, type, key, null);
//clusterMetaQueue.add(metaEntity);
boolean result = deleteClusterMeta(metaEntity);
if (false == result) {
logger.warn("deleteClusterMeta faild, Cache data to queue.");
LOGGER.warn("deleteClusterMeta faild, Cache data to queue.");
clusterMetaQueue.add(metaEntity);
}
}
@ -394,7 +388,7 @@ public abstract class ClusterManager {
ClusterMetaType metaType, String metaKey) {
HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
if (!raftInitialized()) {
logger.error("Raft incomplete initialization!");
LOGGER.error("Raft incomplete initialization!");
return clusterMeta;
}
@ -405,20 +399,19 @@ public abstract class ClusterManager {
mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
} catch (ExecutionException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
} catch (TimeoutException e) {
e.printStackTrace();
LOGGER.error(e.getMessage());
}
if (null != mateData) {
clusterMeta = clientSerializer.decode(mateData);
}
logger.info("getClusterMeta >>> {}", clusterMeta.toString());
if (logger.isDebugEnabled()) {
logger.debug("getClusterMeta >>> {}", clusterMeta.toString());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("getClusterMeta >>> {}", clusterMeta.toString());
}
return clusterMeta;

View file

@ -25,6 +25,8 @@ import io.atomix.primitive.service.ServiceConfig;
/**
* Cluster primitive type
* Creating a custom distributed primitive is defining the primitive type.
* To create a new type, implement the PrimitiveType interface
*/
public class ClusterPrimitiveType implements PrimitiveType {
public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType();

View file

@ -33,7 +33,10 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* Cluster State Machine
* Cluster State Machine for Zeppelin
* The cluster state is implemented as a snapshot state machine.
* The state machine stores the service and process metadata information of the cluster.
* Metadata information can be manipulated by put, get, remove, index, and snapshot.
*/
public class ClusterStateMachine extends AbstractPrimitiveService {
private static Logger logger = LoggerFactory.getLogger(ClusterStateMachine.class);

View file

@ -22,7 +22,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Zeppelin ClusterMembershipEventListener
* Entity capable of receiving device cluster-related events.
* Listen for new zeppelin servers to join or leave the cluster,
* Monitor whether the metadata in the cluster server changes
*/
public class ZeppelinClusterMembershipEventListener implements ClusterMembershipEventListener {
private static Logger logger

View file

@ -48,7 +48,8 @@ import java.util.function.Consumer;
import java.util.function.Function;
/**
* Cluster Raft client protocol.
* Protocol for intercommunication between Raft clients for each server in the cluster.
* Communication protocol for handling sessions, queries, commands, and services within the cluster.
*/
public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol {
private Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> heartbeatHandler;

View file

@ -62,7 +62,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
/**
* Raft server messaging protocol.
* Raft server messaging protocol between Raft Servers for each server in the cluster.
*/
public class RaftServerMessagingProtocol extends RaftMessagingProtocol
implements RaftServerProtocol {