Cluster Raft module design

By using the Raft protocol, multiple Zeppelin-Server groups are built into a Zeppelin cluster, the cluster State Machine is maintained through the Raft protocol, and the services in the cluster are agreed upon. The Zeppelin-Server and Zeppelin-Interperter services and processes are stored in the Cluster MetaData. Metadata information;

[Feature]

* [x] add raft algorithm atomix jar
* [x] add cluster state machine
* [x] add state machine query command
* [x] add state machine delete command
* [x] add state machine put command
* [x] Isolate the netty JAR package introduced by atomix

* https://issues.apache.org/jira/browse/ZEPPELIN-3610

CI pass

* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? Yes
This commit is contained in:
liuxunorg 2018-08-21 22:57:44 +08:00
parent 92f244ef7e
commit 97f17acac5
20 changed files with 2610 additions and 8 deletions

View file

@ -81,7 +81,7 @@ if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" ]]; then
ZEPPELIN_INTP_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes"
fi
addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter-api/target"
addJarInDirForIntp "${ZEPPELIN_HOME}/lib/interpreter"
addJarInDirForIntp "${INTERPRETER_DIR}"

View file

@ -57,13 +57,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zeppelin-interpreter</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-integration-test</artifactId>

View file

@ -44,12 +44,137 @@
<maven.aeither.provider.version>3.0.3</maven.aeither.provider.version>
<wagon.version>1.0</wagon.version>
<jline.version>2.12.1</jline.version>
<atomix.version>3.0.0-rc4</atomix.version>
<atomix.netty.version>4.1.11.Final</atomix.netty.version>
<commons-math3.version>3.1.1</commons-math3.version>
<guava.version>20.0</guava.version>
<commons-lang3.version>3.4</commons-lang3.version>
<!--plugin versions-->
<plugin.shade.version>2.3</plugin.shade.version>
</properties>
<dependencies>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix</artifactId>
<version>${atomix.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-raft</artifactId>
<version>${atomix.version}</version>
</dependency>
<dependency>
<groupId>io.atomix</groupId>
<artifactId>atomix-primary-backup</artifactId>
<version>${atomix.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${atomix.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>${atomix.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${atomix.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${atomix.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${atomix.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<version>${atomix.netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>${atomix.netty.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>${commons-math3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>

View file

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster;
import io.atomix.cluster.messaging.BroadcastService;
import java.util.function.Consumer;
/**
* Broadcast Service Adapter
*/
public class BroadcastServiceAdapter implements BroadcastService {
@Override
public void broadcast(String subject, byte[] message) {
}
@Override
public void addListener(String subject, Consumer<byte[]> listener) {
}
@Override
public void removeListener(String subject, Consumer<byte[]> listener) {
}
}

View file

@ -0,0 +1,527 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster;
import com.google.common.collect.Maps;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.primitive.operation.OperationType;
import io.atomix.primitive.operation.PrimitiveOperation;
import io.atomix.primitive.operation.impl.DefaultOperationId;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.service.ServiceConfig;
import io.atomix.primitive.session.SessionClient;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.raft.RaftClient;
import io.atomix.protocols.raft.RaftError;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
import io.atomix.protocols.raft.protocol.JoinResponse;
import io.atomix.protocols.raft.protocol.LeaveRequest;
import io.atomix.protocols.raft.protocol.LeaveResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.PollResponse;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.protocol.RaftResponse;
import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.session.CommunicationStrategy;
import io.atomix.protocols.raft.storage.system.Configuration;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Namespace;
import io.atomix.utils.serializer.Serializer;
import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
import org.apache.zeppelin.cluster.meta.ClusterMetaOperation;
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.apache.zeppelin.cluster.protocol.LocalRaftProtocolFactory;
import org.apache.zeppelin.cluster.protocol.RaftClientMessagingProtocol;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.time.Instant;
import java.util.Date;
import java.util.Collections;
import java.util.Map;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static io.atomix.primitive.operation.PrimitiveOperation.operation;
import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.DELETE_OPERATION;
import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.PUT_OPERATION;
import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATION;
/**
* The base class for cluster management, including the following implementations
* 1. RaftClient as the raft client
* 2. Threading to provide retry after cluster metadata submission failure
* 3. Cluster monitoring
*/
public abstract class ClusterManager {
private static Logger logger = LoggerFactory.getLogger(ClusterManager.class);
public final ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
protected Collection<Node> clusterNodes = new ArrayList<>();
// raft
protected static String ZEPL_CLUSTER_ID = "ZEPL-CLUSTER";
protected static String ZEPL_CLIENT_ID = "ZEPL-CLIENT";
protected int raftServerPort = 0;
protected RaftClient raftClient = null;
protected SessionClient raftSessionClient = null;
protected Map<MemberId, Address> raftAddressMap = new ConcurrentHashMap<>();
protected LocalRaftProtocolFactory protocolFactory
= new LocalRaftProtocolFactory(protocolSerializer);
protected List<MessagingService> messagingServices = new ArrayList<>();
protected Collection<MemberId> clusterMemberIds = new ArrayList<MemberId>();
protected AtomicBoolean running = new AtomicBoolean(true);
// Write data through the queue to prevent failure due to network exceptions
private ConcurrentLinkedQueue<ClusterMetaEntity> clusterMetaQueue
= new ConcurrentLinkedQueue<>();
// zeppelin server host & port
protected String zeplServerHost = "";
// atomic dependent netty package path
public static String ATOMIX_NETTY_LIB_PATH = "/lib/atomix-netty";
public static String INTERPRETER_LIB_PATH = "/lib/interpreter";
public ClusterManager() {
try {
zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
String clusterAddr = zconf.getClusterAddress();
if (!StringUtils.isEmpty(clusterAddr)) {
String cluster[] = clusterAddr.split(",");
for (int i = 0; i < cluster.length; i++) {
String[] parts = cluster[i].split(":");
String clusterHost = parts[0];
int clusterPort = Integer.valueOf(parts[1]);
if (zeplServerHost.equalsIgnoreCase(clusterHost)) {
raftServerPort = clusterPort;
}
Node node = Node.builder().withId(cluster[i])
.withAddress(Address.from(clusterHost, clusterPort)).build();
clusterNodes.add(node);
raftAddressMap.put(MemberId.from(cluster[i]), Address.from(clusterHost, clusterPort));
clusterMemberIds.add(MemberId.from(cluster[i]));
}
}
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (SocketException e) {
e.printStackTrace();
return;
}
}
// Check if the raft environment is initialized
public abstract boolean raftInitialized();
// Is it a cluster leader
public abstract boolean isClusterLeader();
public AtomicBoolean getRunning() {
return running;
}
private SessionClient createProxy(RaftClient client) {
return client.sessionBuilder(ClusterPrimitiveType.PRIMITIVE_NAME,
ClusterPrimitiveType.INSTANCE, new ServiceConfig())
.withReadConsistency(ReadConsistency.SEQUENTIAL)
.withCommunicationStrategy(CommunicationStrategy.LEADER)
.build()
.connect()
.join();
}
public void start() {
if (!zconf.isClusterMode()) {
return;
}
// RaftClient Thread
new Thread(new Runnable() {
@Override
public void run() {
logger.info("RaftClientThread run() >>>");
int raftClientPort = 0;
try {
raftClientPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
} catch (IOException e) {
e.printStackTrace();
}
MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + ":" + raftClientPort);
Address address = Address.from(zeplServerHost, raftClientPort);
raftAddressMap.put(memberId, address);
MessagingService messagingManager
= NettyMessagingService.builder().withAddress(address).build().start().join();
RaftClientProtocol protocol = new RaftClientMessagingProtocol(
messagingManager, protocolSerializer, raftAddressMap::get);
raftClient = RaftClient.builder()
.withMemberId(memberId)
.withPartitionId(PartitionId.from("partition", 1))
.withProtocol(protocol)
.build();
raftClient.connect(clusterMemberIds).join();
raftSessionClient = createProxy(raftClient);
logger.info("RaftClientThread run() <<<");
}
}).start();
// Cluster Meta Consume Thread
new Thread(new Runnable() {
@Override
public void run() {
try {
while (getRunning().get()) {
ClusterMetaEntity metaEntity = clusterMetaQueue.peek();
if (null != metaEntity) {
// Determine whether the client is connected
int retry = 0;
while (!raftInitialized()) {
retry++;
if (0 == retry % 30) {
logger.error("Raft incomplete initialization! retry[{}]", retry);
}
Thread.sleep(100);
}
boolean success = false;
switch (metaEntity.getOperation()) {
case DELETE_OPERATION:
success = deleteClusterMeta(metaEntity);
break;
case PUT_OPERATION:
success = putClusterMeta(metaEntity);
break;
}
if (true == success) {
// The operation was successfully deleted
clusterMetaQueue.remove(metaEntity);
} else {
logger.error("Cluster Meta Consume faild!");
}
} else {
Thread.sleep(100);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// cluster shutdown
public void shutdown() {
if (!zconf.isClusterMode()) {
return;
}
running.set(false);
try {
if (null != raftSessionClient) {
raftSessionClient.close().get(3, TimeUnit.SECONDS);
}
if (null != raftClient) {
raftClient.close().get(3, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public String getClusterName() {
return zeplServerHost + ":" + raftServerPort;
}
// put metadata into cluster metadata
private boolean putClusterMeta(ClusterMetaEntity entity) {
if (!raftInitialized()) {
logger.error("Raft incomplete initialization!");
return false;
}
ClusterMetaType metaType = entity.getMetaType();
String metaKey = entity.getKey();
HashMap<String, Object> newMetaValue = entity.getValues();
if (logger.isDebugEnabled()) {
logger.debug("putClusterMeta {} {}", metaType, metaKey);
}
// add cluster name
newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost);
newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort);
raftSessionClient.execute(operation(ClusterStateMachine.PUT,
clientSerializer.encode(entity)))
.<Long>thenApply(clientSerializer::decode);
return true;
}
// put metadata into cluster metadata
public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, Object> values) {
ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, key, values);
boolean result = putClusterMeta(metaEntity);
if (false == result) {
logger.warn("putClusterMeta failure, Cache metadata to queue.");
clusterMetaQueue.add(metaEntity);
}
}
// delete metadata by cluster metadata
private boolean deleteClusterMeta(ClusterMetaEntity entity) {
ClusterMetaType metaType = entity.getMetaType();
String metaKey = entity.getKey();
// Need to pay attention to delete metadata operations
logger.info("deleteClusterMeta {} {}", metaType, metaKey);
if (!raftInitialized()) {
logger.error("Raft incomplete initialization!");
return false;
}
raftSessionClient.execute(operation(
ClusterStateMachine.REMOVE,
clientSerializer.encode(entity)))
.<Long>thenApply(clientSerializer::decode)
.thenAccept(result -> {
logger.info("deleteClusterMeta {}", result);
});
return true;
}
// delete metadata from cluster metadata
public void deleteClusterMeta(ClusterMetaType type, String key) {
ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, type, key, null);
//clusterMetaQueue.add(metaEntity);
boolean result = deleteClusterMeta(metaEntity);
if (false == result) {
logger.warn("deleteClusterMeta faild, Cache data to queue.");
clusterMetaQueue.add(metaEntity);
}
}
// get metadata by cluster metadata
public HashMap<String, HashMap<String, Object>> getClusterMeta(
ClusterMetaType metaType, String metaKey) {
HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
if (!raftInitialized()) {
logger.error("Raft incomplete initialization!");
return clusterMeta;
}
ClusterMetaEntity entity = new ClusterMetaEntity(GET_OPERATION, metaType, metaKey, null);
byte[] mateData = null;
try {
mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
if (null != mateData) {
clusterMeta = clientSerializer.decode(mateData);
}
logger.info("getClusterMeta >>> {}", clusterMeta.toString());
if (logger.isDebugEnabled()) {
logger.debug("getClusterMeta >>> {}", clusterMeta.toString());
}
return clusterMeta;
}
protected static final Serializer protocolSerializer = Serializer.using(Namespace.builder()
.register(OpenSessionRequest.class)
.register(OpenSessionResponse.class)
.register(CloseSessionRequest.class)
.register(CloseSessionResponse.class)
.register(KeepAliveRequest.class)
.register(KeepAliveResponse.class)
.register(QueryRequest.class)
.register(QueryResponse.class)
.register(CommandRequest.class)
.register(CommandResponse.class)
.register(MetadataRequest.class)
.register(MetadataResponse.class)
.register(JoinRequest.class)
.register(JoinResponse.class)
.register(LeaveRequest.class)
.register(LeaveResponse.class)
.register(ConfigureRequest.class)
.register(ConfigureResponse.class)
.register(ReconfigureRequest.class)
.register(ReconfigureResponse.class)
.register(InstallRequest.class)
.register(InstallResponse.class)
.register(PollRequest.class)
.register(PollResponse.class)
.register(VoteRequest.class)
.register(VoteResponse.class)
.register(AppendRequest.class)
.register(AppendResponse.class)
.register(PublishRequest.class)
.register(ResetRequest.class)
.register(RaftResponse.Status.class)
.register(RaftError.class)
.register(RaftError.Type.class)
.register(PrimitiveOperation.class)
.register(ReadConsistency.class)
.register(byte[].class)
.register(long[].class)
.register(CloseSessionEntry.class)
.register(CommandEntry.class)
.register(ConfigurationEntry.class)
.register(InitializeEntry.class)
.register(KeepAliveEntry.class)
.register(MetadataEntry.class)
.register(OpenSessionEntry.class)
.register(QueryEntry.class)
.register(PrimitiveOperation.class)
.register(DefaultOperationId.class)
.register(OperationType.class)
.register(ReadConsistency.class)
.register(ArrayList.class)
.register(HashMap.class)
.register(ClusterMetaEntity.class)
.register(Date.class)
.register(Collections.emptyList().getClass())
.register(HashSet.class)
.register(DefaultRaftMember.class)
.register(MemberId.class)
.register(SessionId.class)
.register(RaftMember.Type.class)
.register(Instant.class)
.register(Configuration.class)
.build());
protected static final Serializer storageSerializer = Serializer.using(Namespace.builder()
.register(CloseSessionEntry.class)
.register(CommandEntry.class)
.register(ConfigurationEntry.class)
.register(InitializeEntry.class)
.register(KeepAliveEntry.class)
.register(MetadataEntry.class)
.register(OpenSessionEntry.class)
.register(QueryEntry.class)
.register(PrimitiveOperation.class)
.register(DefaultOperationId.class)
.register(OperationType.class)
.register(ReadConsistency.class)
.register(ArrayList.class)
.register(ClusterMetaEntity.class)
.register(HashMap.class)
.register(HashSet.class)
.register(Date.class)
.register(DefaultRaftMember.class)
.register(MemberId.class)
.register(RaftMember.Type.class)
.register(Instant.class)
.register(Configuration.class)
.register(byte[].class)
.register(long[].class)
.build());
protected static final Serializer clientSerializer = Serializer.using(Namespace.builder()
.register(ReadConsistency.class)
.register(ClusterMetaEntity.class)
.register(ClusterMetaOperation.class)
.register(ClusterMetaType.class)
.register(HashMap.class)
.register(Date.class)
.register(Maps.immutableEntry(new String(), new Object()).getClass())
.build());
}

View file

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster;
import io.atomix.primitive.PrimitiveBuilder;
import io.atomix.primitive.PrimitiveManagementService;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.config.PrimitiveConfig;
import io.atomix.primitive.service.PrimitiveService;
import io.atomix.primitive.service.ServiceConfig;
/**
* Cluster primitive type
*/
public class ClusterPrimitiveType implements PrimitiveType {
public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType();
public static final String PRIMITIVE_NAME = "CLUSTER_PRIMITIVE";
@Override
public String name() {
return PRIMITIVE_NAME;
}
@Override
public PrimitiveConfig newConfig() {
throw new UnsupportedOperationException();
}
@Override
public PrimitiveBuilder newBuilder(String primitiveName,
PrimitiveConfig config,
PrimitiveManagementService managementService) {
throw new UnsupportedOperationException();
}
@Override
public PrimitiveService newService(ServiceConfig config) {
return new ClusterStateMachine();
}
}

View file

@ -0,0 +1,176 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster;
import com.google.common.collect.Maps;
import io.atomix.primitive.operation.OperationId;
import io.atomix.primitive.service.AbstractPrimitiveService;
import io.atomix.primitive.service.BackupOutput;
import io.atomix.primitive.service.BackupInput;
import io.atomix.primitive.service.Commit;
import io.atomix.primitive.service.ServiceExecutor;
import io.atomix.utils.serializer.Serializer;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
* Cluster State Machine
*/
public class ClusterStateMachine extends AbstractPrimitiveService {
private static Logger logger = LoggerFactory.getLogger(ClusterStateMachine.class);
private ClusterMeta clusterMeta = new ClusterMeta();
// Command to operation a variable in cluster state machine
public static final OperationId PUT = OperationId.command("put");
public static final OperationId GET = OperationId.query("get");
public static final OperationId REMOVE = OperationId.command("remove");
public static final OperationId INDEX = OperationId.command("index");
public ClusterStateMachine() {
super(ClusterPrimitiveType.INSTANCE);
}
@Override
public Serializer serializer() {
return ClusterManager.clientSerializer;
}
@Override
protected void configure(ServiceExecutor executor) {
executor.register(PUT, this::put);
executor.register(GET, this::get);
executor.register(REMOVE, this::remove);
executor.register(INDEX, this::index);
}
protected long put(Commit<ClusterMetaEntity> commit) {
clusterMeta.put(commit.value().getMetaType(),
commit.value().getKey(), commit.value().getValues());
return commit.index();
}
protected Map<String, Map<String, Object>> get(Commit<ClusterMetaEntity> commit) {
return clusterMeta.get(commit.value().getMetaType(), commit.value().getKey());
}
protected long remove(Commit<ClusterMetaEntity> commit) {
clusterMeta.remove(commit.value().getMetaType(), commit.value().getKey());
return commit.index();
}
protected long index(Commit<Void> commit) {
return commit.index();
}
@Override
public void backup(BackupOutput writer) {
if (logger.isDebugEnabled()) {
logger.debug("ClusterStateMachine.backup()");
}
// backup ServerMeta
// cluster meta map struct
// cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
Map<String, Map<String, Object>> mapServerMeta
= clusterMeta.get(ClusterMetaType.ServerMeta, "");
// write all ServerMeta size
writer.writeInt(mapServerMeta.size());
for (Map.Entry<String, Map<String, Object>> entry : mapServerMeta.entrySet()) {
// write cluster_name
writer.writeString(entry.getKey());
Map<String, Object> kvPairs = entry.getValue();
// write cluster mate kv pairs size
writer.writeInt(kvPairs.size());
for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) {
// write cluster mate kv pairs
writer.writeString(entryValue.getKey());
writer.writeObject(entryValue.getValue());
}
}
// backup IntpProcessMeta
// Interpreter meta map struct
// IntpGroupId -> {server_tserver_host,server_tserver_port,...}
Map<String, Map<String, Object>> mapIntpProcMeta
= clusterMeta.get(ClusterMetaType.IntpProcessMeta, "");
// write interpreter size
writer.writeInt(mapIntpProcMeta.size());
for (Map.Entry<String, Map<String, Object>> entry : mapIntpProcMeta.entrySet()) {
// write IntpGroupId
writer.writeString(entry.getKey());
Map<String, Object> kvPairs = entry.getValue();
// write interpreter mate kv pairs size
writer.writeInt(kvPairs.size());
for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) {
// write interpreter mate kv pairs
writer.writeString(entryValue.getKey());
writer.writeObject(entryValue.getValue());
}
}
}
@Override
public void restore(BackupInput reader) {
if (logger.isDebugEnabled()) {
logger.debug("ClusterStateMachine.restore()");
}
clusterMeta = new ClusterMeta();
// read all ServerMeta size
int nServerMeta = reader.readInt();
for (int i = 0; i < nServerMeta; i++) {
// read cluster_name
String clusterName = reader.readString();
// read cluster mate kv pairs size
int nKVpairs = reader.readInt();
for (int j = 0; j < nKVpairs; i++) {
// read cluster mate kv pairs
String key = reader.readString();
Object value = reader.readObject();
clusterMeta.put(ClusterMetaType.ServerMeta,
clusterName, Maps.immutableEntry(key, value));
}
}
// read all IntpProcessMeta size
int nIntpMeta = reader.readInt();
for (int i = 0; i < nIntpMeta; i++) {
// read interpreter name
String intpName = reader.readString();
// read interpreter mate kv pairs size
int nKVpairs = reader.readInt();
for (int j = 0; j < nKVpairs; i++) {
// read interpreter mate kv pairs
String key = reader.readString();
Object value = reader.readObject();
clusterMeta.put(ClusterMetaType.IntpProcessMeta,
intpName, Maps.immutableEntry(key, value));
}
}
}
}

View file

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.listener;
import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Zeppelin ClusterMembershipEventListener
*/
public class ZeppelinClusterMembershipEventListener implements ClusterMembershipEventListener {
private static Logger logger
= LoggerFactory.getLogger(ZeppelinClusterMembershipEventListener.class);
@Override
public void event(ClusterMembershipEvent event) {
switch (event.type()) {
case MEMBER_ADDED:
logger.info(event.subject().id() + " joined the cluster.");
break;
case MEMBER_REMOVED:
logger.info(event.subject().id() + " left the cluster.");
break;
case METADATA_CHANGED:
logger.info(event.subject().id() + " meta data changed.");
break;
case REACHABILITY_CHANGED:
logger.info(event.subject().id() + " reachability changed.");
break;
}
}
}

View file

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.meta;
import com.google.gson.Gson;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
* Metadata stores metadata information in a KV key-value pair
*/
public class ClusterMeta implements Serializable {
private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
// zeppelin-server meta
public static String SERVER_HOST = "SERVER_HOST";
public static String SERVER_PORT = "SERVER_PORT";
public static String SERVER_TSERVER_HOST = "SERVER_TSERVER_HOST";
public static String SERVER_TSERVER_PORT = "SERVER_TSERVER_PORT";
public static String SERVER_START_TIME = "SERVER_START_TIME";
// interperter-process meta
public static String INTP_TSERVER_HOST = "INTP_TSERVER_HOST";
public static String INTP_TSERVER_PORT = "INTP_TSERVER_PORT";
public static String INTP_START_TIME = "INTP_START_TIME";
// zeppelin-server resource usage
public static String CPU_CAPACITY = "CPU_CAPACITY";
public static String CPU_USED = "CPU_USED";
public static String MEMORY_CAPACITY = "MEMORY_CAPACITY";
public static String MEMORY_USED = "MEMORY_USED";
public static String HEARTBEAT = "HEARTBEAT";
// zeppelin-server or interperter-process status
public static String STATUS = "STATUS";
public static String ONLINE_STATUS = "ONLINE";
public static String OFFLINE_STATUS = "OFFLINE";
// cluster_name = host:port
// Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
private Map<String, Map<String, Object>> mapServerMeta = new HashMap<>();
// Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
private Map<String, Map<String, Object>> mapInterpreterMeta = new HashMap<>();
public static Gson gson = new Gson();
public void put(ClusterMetaType type, String key, Object value) {
Map<String, Object> mapValue = (Map<String, Object>) value;
switch (type) {
case ServerMeta:
// Because it may be partially updated metadata information
if (mapServerMeta.containsKey(key)) {
Map<String, Object> values = mapServerMeta.get(key);
values.putAll(mapValue);
} else {
mapServerMeta.put(key, mapValue);
}
break;
case IntpProcessMeta:
if (mapInterpreterMeta.containsKey(key)) {
Map<String, Object> values = mapInterpreterMeta.get(key);
values.putAll(mapValue);
} else {
mapInterpreterMeta.put(key, mapValue);
}
break;
}
}
public Map<String, Map<String, Object>> get(ClusterMetaType type, String key) {
Map<String, Object> values = null;
switch (type) {
case ServerMeta:
if (null == key || StringUtils.isEmpty(key)) {
return mapServerMeta;
}
if (mapServerMeta.containsKey(key)) {
values = mapServerMeta.get(key);
} else {
logger.warn("can not find key : {}", key);
}
break;
case IntpProcessMeta:
if (null == key || StringUtils.isEmpty(key)) {
return mapInterpreterMeta;
}
if (mapInterpreterMeta.containsKey(key)) {
values = mapInterpreterMeta.get(key);
} else {
logger.warn("can not find key : {}", key);
}
break;
}
Map<String, Map<String, Object>> result = new HashMap<>();
result.put(key, values);
return result;
}
public Map<String, Object> remove(ClusterMetaType type, String key) {
switch (type) {
case ServerMeta:
if (mapServerMeta.containsKey(key)) {
return mapServerMeta.remove(key);
} else {
logger.warn("can not find key : {}", key);
}
break;
case IntpProcessMeta:
if (mapInterpreterMeta.containsKey(key)) {
return mapInterpreterMeta.remove(key);
} else {
logger.warn("can not find key : {}", key);
}
break;
}
return null;
}
}

View file

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.meta;
import java.io.Serializable;
import java.util.HashMap;
/**
* Cluster operations, cluster types, encapsulation objects for keys and values
*/
public class ClusterMetaEntity implements Serializable {
private ClusterMetaOperation operation;
private ClusterMetaType type;
private String key;
private HashMap<String, Object> values = new HashMap<>();
public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
String key, HashMap<String, Object> values) {
this.operation = operation;
this.type = type;
this.key = key;
if (null != values) {
this.values.putAll(values);
}
}
public ClusterMetaOperation getOperation() {
return operation;
}
public ClusterMetaType getMetaType() {
return type;
}
public String getKey() {
return key;
}
public HashMap<String, Object> getValues() {
return values;
}
}

View file

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.meta;
/**
* Type of cluster metadata operation
*/
public enum ClusterMetaOperation {
GET_OPERATION,
PUT_OPERATION,
DELETE_OPERATION
}

View file

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.meta;
/**
* Type of cluster metadata
*/
public enum ClusterMetaType {
ServerMeta,
IntpProcessMeta
}

View file

@ -0,0 +1,162 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.protocol;
import com.google.common.collect.Maps;
import io.atomix.cluster.MemberId;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.serializer.Serializer;
import java.net.ConnectException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Cluster Raft client protocol.
*/
public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol {
private Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> heartbeatHandler;
private final Map<Long, Consumer<PublishRequest>> publishListeners = Maps.newConcurrentMap();
public LocalRaftClientProtocol(MemberId memberId,
Serializer serializer,
Map<MemberId, LocalRaftServerProtocol> servers,
Map<MemberId, LocalRaftClientProtocol> clients) {
super(serializer, servers, clients);
clients.put(memberId, this);
}
private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
LocalRaftServerProtocol server = server(memberId);
if (server != null) {
return Futures.completedFuture(server);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
OpenSessionRequest request) {
return getServer(memberId).thenCompose(protocol ->
protocol.openSession(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
CloseSessionRequest request) {
return getServer(memberId).thenCompose(protocol ->
protocol.closeSession(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
KeepAliveRequest request) {
return getServer(memberId).thenCompose(protocol ->
protocol.keepAlive(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
return getServer(memberId).thenCompose(protocol ->
protocol.query(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<CommandResponse> command(MemberId memberId,
CommandRequest request) {
return getServer(memberId).thenCompose(protocol ->
protocol.command(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
MetadataRequest request) {
return getServer(memberId).thenCompose(protocol ->
protocol.metadata(encode(request))).thenApply(this::decode);
}
CompletableFuture<byte[]> heartbeat(byte[] request) {
if (heartbeatHandler != null) {
return heartbeatHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerHeartbeatHandler(Function<HeartbeatRequest,
CompletableFuture<HeartbeatResponse>> handler) {
this.heartbeatHandler = handler;
}
@Override
public void unregisterHeartbeatHandler() {
this.heartbeatHandler = null;
}
@Override
public void reset(Set<MemberId> members, ResetRequest request) {
members.forEach(nodeId -> {
LocalRaftServerProtocol server = server(nodeId);
if (server != null) {
server.reset(request.session(), encode(request));
}
});
}
void publish(long sessionId, byte[] request) {
Consumer<PublishRequest> listener = publishListeners.get(sessionId);
if (listener != null) {
listener.accept(decode(request));
}
}
@Override
public void registerPublishListener(SessionId sessionId,
Consumer<PublishRequest> listener, Executor executor) {
publishListeners.put(sessionId.id(), request ->
executor.execute(() -> listener.accept(request)));
}
@Override
public void unregisterPublishListener(SessionId sessionId) {
publishListeners.remove(sessionId.id());
}
}

View file

@ -0,0 +1,58 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.protocol;
import io.atomix.cluster.MemberId;
import io.atomix.utils.serializer.Serializer;
import java.util.Map;
/**
* Base class for Raft protocol.
*/
public abstract class LocalRaftProtocol {
private final Serializer serializer;
private final Map<MemberId, LocalRaftServerProtocol> servers;
private final Map<MemberId, LocalRaftClientProtocol> clients;
public LocalRaftProtocol(Serializer serializer,
Map<MemberId, LocalRaftServerProtocol> servers,
Map<MemberId, LocalRaftClientProtocol> clients) {
this.serializer = serializer;
this.servers = servers;
this.clients = clients;
}
<T> T copy(T value) {
return serializer.decode(serializer.encode(value));
}
byte[] encode(Object value) {
return serializer.encode(value);
}
<T> T decode(byte[] bytes) {
return serializer.decode(bytes);
}
LocalRaftServerProtocol server(MemberId memberId) {
return servers.get(memberId);
}
LocalRaftClientProtocol client(MemberId memberId) {
return clients.get(memberId);
}
}

View file

@ -0,0 +1,57 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.protocol;
import com.google.common.collect.Maps;
import io.atomix.cluster.MemberId;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.utils.serializer.Serializer;
import java.util.Map;
/**
* Cluster Raft protocol factory.
*/
public class LocalRaftProtocolFactory {
private final Serializer serializer;
private final Map<MemberId, LocalRaftServerProtocol> servers = Maps.newConcurrentMap();
private final Map<MemberId, LocalRaftClientProtocol> clients = Maps.newConcurrentMap();
public LocalRaftProtocolFactory(Serializer serializer) {
this.serializer = serializer;
}
/**
* Returns a new test client protocol.
*
* @param memberId the client member identifier
* @return a new test client protocol
*/
public RaftClientProtocol newClientProtocol(MemberId memberId) {
return new LocalRaftClientProtocol(memberId, serializer, servers, clients);
}
/**
* Returns a new test server protocol.
*
* @param memberId the server member identifier
* @return a new test server protocol
*/
public RaftServerProtocol newServerProtocol(MemberId memberId) {
return new LocalRaftServerProtocol(memberId, serializer, servers, clients);
}
}

View file

@ -0,0 +1,527 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.protocol;
import com.google.common.collect.Maps;
import io.atomix.cluster.MemberId;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
import io.atomix.protocols.raft.protocol.JoinResponse;
import io.atomix.protocols.raft.protocol.LeaveRequest;
import io.atomix.protocols.raft.protocol.LeaveResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.PollResponse;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.protocol.TransferRequest;
import io.atomix.protocols.raft.protocol.TransferResponse;
import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.serializer.Serializer;
import java.net.ConnectException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Cluster server protocol.
*/
public class LocalRaftServerProtocol extends LocalRaftProtocol implements RaftServerProtocol {
private Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> openSessionHandler;
private Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>>
closeSessionHandler;
private Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> keepAliveHandler;
private Function<QueryRequest, CompletableFuture<QueryResponse>> queryHandler;
private Function<CommandRequest, CompletableFuture<CommandResponse>> commandHandler;
private Function<MetadataRequest, CompletableFuture<MetadataResponse>> metadataHandler;
private Function<JoinRequest, CompletableFuture<JoinResponse>> joinHandler;
private Function<LeaveRequest, CompletableFuture<LeaveResponse>> leaveHandler;
private Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> configureHandler;
private Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> reconfigureHandler;
private Function<InstallRequest, CompletableFuture<InstallResponse>> installHandler;
private Function<PollRequest, CompletableFuture<PollResponse>> pollHandler;
private Function<VoteRequest, CompletableFuture<VoteResponse>> voteHandler;
private Function<TransferRequest, CompletableFuture<TransferResponse>> transferHandler;
private Function<AppendRequest, CompletableFuture<AppendResponse>> appendHandler;
private final Map<Long, Consumer<ResetRequest>> resetListeners = Maps.newConcurrentMap();
public LocalRaftServerProtocol(MemberId memberId, Serializer serializer,
Map<MemberId, LocalRaftServerProtocol> servers,
Map<MemberId, LocalRaftClientProtocol> clients) {
super(serializer, servers, clients);
servers.put(memberId, this);
}
private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
LocalRaftServerProtocol server = server(memberId);
if (server != null) {
return Futures.completedFuture(server);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
private CompletableFuture<LocalRaftClientProtocol> getClient(MemberId memberId) {
LocalRaftClientProtocol client = client(memberId);
if (client != null) {
return Futures.completedFuture(client);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
OpenSessionRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.openSession(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
CloseSessionRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.closeSession(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
KeepAliveRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.keepAlive(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.query(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<CommandResponse> command(MemberId memberId,
CommandRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.command(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
MetadataRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.metadata(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.join(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.leave(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
ConfigureRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.configure(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
ReconfigureRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.reconfigure(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.install(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.install(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.poll(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.vote(encode(request))).thenApply(this::decode);
}
@Override
public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
return getServer(memberId).thenCompose(listener ->
listener.append(encode(request))).thenApply(this::decode);
}
@Override
public void publish(MemberId memberId, PublishRequest request) {
getClient(memberId).thenAccept(protocol ->
protocol.publish(request.session(), encode(request)));
}
@Override
public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
HeartbeatRequest request) {
return getClient(memberId).thenCompose(protocol ->
protocol.heartbeat(encode(request))).thenApply(this::decode);
}
CompletableFuture<byte[]> openSession(byte[] request) {
if (openSessionHandler != null) {
return openSessionHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerOpenSessionHandler(Function<OpenSessionRequest,
CompletableFuture<OpenSessionResponse>> handler) {
this.openSessionHandler = handler;
}
@Override
public void unregisterOpenSessionHandler() {
this.openSessionHandler = null;
}
CompletableFuture<byte[]> closeSession(byte[] request) {
if (closeSessionHandler != null) {
return closeSessionHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerCloseSessionHandler(Function<CloseSessionRequest,
CompletableFuture<CloseSessionResponse>> handler) {
this.closeSessionHandler = handler;
}
@Override
public void unregisterCloseSessionHandler() {
this.closeSessionHandler = null;
}
CompletableFuture<byte[]> keepAlive(byte[] request) {
if (keepAliveHandler != null) {
return keepAliveHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerKeepAliveHandler(Function<KeepAliveRequest,
CompletableFuture<KeepAliveResponse>> handler) {
this.keepAliveHandler = handler;
}
@Override
public void unregisterKeepAliveHandler() {
this.keepAliveHandler = null;
}
CompletableFuture<byte[]> query(byte[] request) {
if (queryHandler != null) {
return queryHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerQueryHandler(Function<QueryRequest,
CompletableFuture<QueryResponse>> handler) {
this.queryHandler = handler;
}
@Override
public void unregisterQueryHandler() {
this.queryHandler = null;
}
CompletableFuture<byte[]> command(byte[] request) {
if (commandHandler != null) {
return commandHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerCommandHandler(Function<CommandRequest,
CompletableFuture<CommandResponse>> handler) {
this.commandHandler = handler;
}
@Override
public void unregisterCommandHandler() {
this.commandHandler = null;
}
CompletableFuture<byte[]> metadata(byte[] request) {
if (metadataHandler != null) {
return metadataHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerMetadataHandler(Function<MetadataRequest,
CompletableFuture<MetadataResponse>> handler) {
this.metadataHandler = handler;
}
@Override
public void unregisterMetadataHandler() {
this.metadataHandler = null;
}
CompletableFuture<byte[]> join(byte[] request) {
if (joinHandler != null) {
return joinHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerJoinHandler(Function<JoinRequest,
CompletableFuture<JoinResponse>> handler) {
this.joinHandler = handler;
}
@Override
public void unregisterJoinHandler() {
this.joinHandler = null;
}
CompletableFuture<byte[]> leave(byte[] request) {
if (leaveHandler != null) {
return leaveHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerLeaveHandler(Function<LeaveRequest,
CompletableFuture<LeaveResponse>> handler) {
this.leaveHandler = handler;
}
@Override
public void unregisterLeaveHandler() {
this.leaveHandler = null;
}
CompletableFuture<byte[]> configure(byte[] request) {
if (configureHandler != null) {
return configureHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerConfigureHandler(Function<ConfigureRequest,
CompletableFuture<ConfigureResponse>> handler) {
this.configureHandler = handler;
}
@Override
public void unregisterConfigureHandler() {
this.configureHandler = null;
}
CompletableFuture<byte[]> reconfigure(byte[] request) {
if (reconfigureHandler != null) {
return reconfigureHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerReconfigureHandler(Function<ReconfigureRequest,
CompletableFuture<ReconfigureResponse>> handler) {
this.reconfigureHandler = handler;
}
@Override
public void unregisterReconfigureHandler() {
this.reconfigureHandler = null;
}
CompletableFuture<byte[]> install(byte[] request) {
if (installHandler != null) {
return installHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerInstallHandler(Function<InstallRequest,
CompletableFuture<InstallResponse>> handler) {
this.installHandler = handler;
}
@Override
public void unregisterInstallHandler() {
this.installHandler = null;
}
CompletableFuture<byte[]> poll(byte[] request) {
if (pollHandler != null) {
return pollHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerPollHandler(Function<PollRequest,
CompletableFuture<PollResponse>> handler) {
this.pollHandler = handler;
}
@Override
public void unregisterPollHandler() {
this.pollHandler = null;
}
CompletableFuture<byte[]> vote(byte[] request) {
if (voteHandler != null) {
return voteHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerVoteHandler(Function<VoteRequest,
CompletableFuture<VoteResponse>> handler) {
this.voteHandler = handler;
}
@Override
public void unregisterVoteHandler() {
this.voteHandler = null;
}
@Override
public void registerTransferHandler(Function<TransferRequest,
CompletableFuture<TransferResponse>> handler) {
this.transferHandler = handler;
}
@Override
public void unregisterTransferHandler() {
this.transferHandler = null;
}
CompletableFuture<byte[]> transfer(byte[] request) {
if (transferHandler != null) {
return transferHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
CompletableFuture<byte[]> append(byte[] request) {
if (appendHandler != null) {
return appendHandler.apply(decode(request)).thenApply(this::encode);
} else {
return Futures.exceptionalFuture(new ConnectException());
}
}
@Override
public void registerAppendHandler(Function<AppendRequest,
CompletableFuture<AppendResponse>> handler) {
this.appendHandler = handler;
}
@Override
public void unregisterAppendHandler() {
this.appendHandler = null;
}
void reset(long sessionId, byte[] request) {
Consumer<ResetRequest> listener = resetListeners.get(sessionId);
if (listener != null) {
listener.accept(decode(request));
}
}
@Override
public void registerResetListener(SessionId sessionId,
Consumer<ResetRequest> listener, Executor executor) {
resetListeners.put(sessionId.id(), request -> executor.execute(()
-> listener.accept(request)));
}
@Override
public void unregisterResetListener(SessionId sessionId) {
resetListeners.remove(sessionId.id());
}
}

View file

@ -0,0 +1,123 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.protocol;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Serializer;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Raft client messaging service protocol.
*/
public class RaftClientMessagingProtocol extends RaftMessagingProtocol
implements RaftClientProtocol {
public RaftClientMessagingProtocol(MessagingService messagingService,
Serializer serializer,
Function<MemberId, Address> addressProvider) {
super(messagingService, serializer, addressProvider);
}
@Override
public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
OpenSessionRequest request) {
return sendAndReceive(memberId, "open-session", request);
}
@Override
public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
CloseSessionRequest request) {
return sendAndReceive(memberId, "close-session", request);
}
@Override
public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
KeepAliveRequest request) {
return sendAndReceive(memberId, "keep-alive", request);
}
@Override
public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
return sendAndReceive(memberId, "query", request);
}
@Override
public CompletableFuture<CommandResponse> command(MemberId memberId,
CommandRequest request) {
return sendAndReceive(memberId, "command", request);
}
@Override
public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
MetadataRequest request) {
return sendAndReceive(memberId, "metadata", request);
}
@Override
public void registerHeartbeatHandler(Function<HeartbeatRequest,
CompletableFuture<HeartbeatResponse>> handler) {
registerHandler("heartbeat", handler);
}
@Override
public void unregisterHeartbeatHandler() {
unregisterHandler("heartbeat");
}
@Override
public void reset(Set<MemberId> members, ResetRequest request) {
for (MemberId memberId : members) {
sendAsync(memberId, String.format("reset-%d", request.session()), request);
}
}
@Override
public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener,
Executor executor) {
messagingService.registerHandler(String.format("publish-%d", sessionId.id()), (e, p) -> {
listener.accept(serializer.decode(p));
}, executor);
}
@Override
public void unregisterPublishListener(SessionId sessionId) {
messagingService.unregisterHandler(String.format("publish-%d", sessionId.id()));
}
}

View file

@ -0,0 +1,83 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.protocol;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Serializer;
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
/**
* Messaging service based Raft protocol.
*/
public abstract class RaftMessagingProtocol {
protected final MessagingService messagingService;
protected final Serializer serializer;
private final Function<MemberId, Address> addressProvider;
public RaftMessagingProtocol(MessagingService messagingService,
Serializer serializer,
Function<MemberId, Address> addressProvider) {
this.messagingService = messagingService;
this.serializer = serializer;
this.addressProvider = addressProvider;
}
protected Address address(MemberId memberId) {
return addressProvider.apply(memberId);
}
protected <T, U> CompletableFuture<U> sendAndReceive(MemberId memberId,
String type, T request) {
Address address = address(memberId);
if (address == null) {
return Futures.exceptionalFuture(new ConnectException());
}
return messagingService.sendAndReceive(address, type, serializer.encode(request))
.thenApply(serializer::decode);
}
protected CompletableFuture<Void> sendAsync(MemberId memberId, String type, Object request) {
Address address = address(memberId);
if (address != null) {
return messagingService.sendAsync(address(memberId), type, serializer.encode(request));
}
return CompletableFuture.completedFuture(null);
}
protected <T, U> void registerHandler(String type, Function<T, CompletableFuture<U>> handler) {
messagingService.registerHandler(type, (e, p) -> {
CompletableFuture<byte[]> future = new CompletableFuture<>();
handler.apply(serializer.decode(p)).whenComplete((result, error) -> {
if (error == null) {
future.complete(serializer.encode(result));
} else {
future.completeExceptionally(error);
}
});
return future;
});
}
protected void unregisterHandler(String type) {
messagingService.unregisterHandler(type);
}
}

View file

@ -0,0 +1,346 @@
/*
* Copyright 2017-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.cluster.protocol;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
import io.atomix.protocols.raft.protocol.QueryRequest;
import io.atomix.protocols.raft.protocol.QueryResponse;
import io.atomix.protocols.raft.protocol.CommandRequest;
import io.atomix.protocols.raft.protocol.CommandResponse;
import io.atomix.protocols.raft.protocol.MetadataRequest;
import io.atomix.protocols.raft.protocol.MetadataResponse;
import io.atomix.protocols.raft.protocol.JoinRequest;
import io.atomix.protocols.raft.protocol.JoinResponse;
import io.atomix.protocols.raft.protocol.LeaveRequest;
import io.atomix.protocols.raft.protocol.LeaveResponse;
import io.atomix.protocols.raft.protocol.ConfigureRequest;
import io.atomix.protocols.raft.protocol.ConfigureResponse;
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
import io.atomix.protocols.raft.protocol.InstallRequest;
import io.atomix.protocols.raft.protocol.InstallResponse;
import io.atomix.protocols.raft.protocol.PollRequest;
import io.atomix.protocols.raft.protocol.PollResponse;
import io.atomix.protocols.raft.protocol.VoteRequest;
import io.atomix.protocols.raft.protocol.VoteResponse;
import io.atomix.protocols.raft.protocol.TransferRequest;
import io.atomix.protocols.raft.protocol.TransferResponse;
import io.atomix.protocols.raft.protocol.AppendRequest;
import io.atomix.protocols.raft.protocol.AppendResponse;
import io.atomix.protocols.raft.protocol.ResetRequest;
import io.atomix.protocols.raft.protocol.PublishRequest;
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
import io.atomix.utils.net.Address;
import io.atomix.utils.serializer.Serializer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Raft server messaging protocol.
*/
public class RaftServerMessagingProtocol extends RaftMessagingProtocol
implements RaftServerProtocol {
public RaftServerMessagingProtocol(MessagingService messagingService,
Serializer serializer,
Function<MemberId, Address> addressProvider) {
super(messagingService, serializer, addressProvider);
}
@Override
public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
OpenSessionRequest request) {
return sendAndReceive(memberId, "open-session", request);
}
@Override
public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
CloseSessionRequest request) {
return sendAndReceive(memberId, "close-session", request);
}
@Override
public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
KeepAliveRequest request) {
return sendAndReceive(memberId, "keep-alive", request);
}
@Override
public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
return sendAndReceive(memberId, "query", request);
}
@Override
public CompletableFuture<CommandResponse> command(MemberId memberId,
CommandRequest request) {
return sendAndReceive(memberId, "command", request);
}
@Override
public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
MetadataRequest request) {
return sendAndReceive(memberId, "metadata", request);
}
@Override
public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
return sendAndReceive(memberId, "join", request);
}
@Override
public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
return sendAndReceive(memberId, "leave", request);
}
@Override
public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
ConfigureRequest request) {
return sendAndReceive(memberId, "configure", request);
}
@Override
public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
ReconfigureRequest request) {
return sendAndReceive(memberId, "reconfigure", request);
}
@Override
public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
return sendAndReceive(memberId, "install", request);
}
@Override
public CompletableFuture<TransferResponse> transfer(MemberId memberId,
TransferRequest request) {
return sendAndReceive(memberId, "transfer", request);
}
@Override
public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
return sendAndReceive(memberId, "poll", request);
}
@Override
public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
return sendAndReceive(memberId, "vote", request);
}
@Override
public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
return sendAndReceive(memberId, "append", request);
}
@Override
public void publish(MemberId memberId, PublishRequest request) {
sendAsync(memberId, String.format("publish-%d", request.session()), request);
}
@Override
public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
HeartbeatRequest request) {
return sendAndReceive(memberId, "heartbeat", request);
}
@Override
public void registerOpenSessionHandler(Function<OpenSessionRequest,
CompletableFuture<OpenSessionResponse>> handler) {
registerHandler("open-session", handler);
}
@Override
public void unregisterOpenSessionHandler() {
unregisterHandler("open-session");
}
@Override
public void registerCloseSessionHandler(Function<CloseSessionRequest,
CompletableFuture<CloseSessionResponse>> handler) {
registerHandler("close-session", handler);
}
@Override
public void unregisterCloseSessionHandler() {
unregisterHandler("close-session");
}
@Override
public void registerKeepAliveHandler(Function<KeepAliveRequest,
CompletableFuture<KeepAliveResponse>> handler) {
registerHandler("keep-alive", handler);
}
@Override
public void unregisterKeepAliveHandler() {
unregisterHandler("keep-alive");
}
@Override
public void registerQueryHandler(Function<QueryRequest,
CompletableFuture<QueryResponse>> handler) {
registerHandler("query", handler);
}
@Override
public void unregisterQueryHandler() {
unregisterHandler("query");
}
@Override
public void registerCommandHandler(Function<CommandRequest,
CompletableFuture<CommandResponse>> handler) {
registerHandler("command", handler);
}
@Override
public void unregisterCommandHandler() {
unregisterHandler("command");
}
@Override
public void registerMetadataHandler(Function<MetadataRequest,
CompletableFuture<MetadataResponse>> handler) {
registerHandler("metadata", handler);
}
@Override
public void unregisterMetadataHandler() {
unregisterHandler("metadata");
}
@Override
public void registerJoinHandler(Function<JoinRequest,
CompletableFuture<JoinResponse>> handler) {
registerHandler("join", handler);
}
@Override
public void unregisterJoinHandler() {
unregisterHandler("join");
}
@Override
public void registerLeaveHandler(Function<LeaveRequest,
CompletableFuture<LeaveResponse>> handler) {
registerHandler("leave", handler);
}
@Override
public void unregisterLeaveHandler() {
unregisterHandler("leave");
}
@Override
public void registerConfigureHandler(Function<ConfigureRequest,
CompletableFuture<ConfigureResponse>> handler) {
registerHandler("configure", handler);
}
@Override
public void unregisterConfigureHandler() {
unregisterHandler("configure");
}
@Override
public void registerReconfigureHandler(Function<ReconfigureRequest,
CompletableFuture<ReconfigureResponse>> handler) {
registerHandler("reconfigure", handler);
}
@Override
public void unregisterReconfigureHandler() {
unregisterHandler("reconfigure");
}
@Override
public void registerInstallHandler(Function<InstallRequest,
CompletableFuture<InstallResponse>> handler) {
registerHandler("install", handler);
}
@Override
public void unregisterInstallHandler() {
unregisterHandler("install");
}
@Override
public void registerTransferHandler(Function<TransferRequest,
CompletableFuture<TransferResponse>> handler) {
registerHandler("transfer", handler);
}
@Override
public void unregisterTransferHandler() {
unregisterHandler("transfer");
}
@Override
public void registerPollHandler(Function<PollRequest,
CompletableFuture<PollResponse>> handler) {
registerHandler("poll", handler);
}
@Override
public void unregisterPollHandler() {
unregisterHandler("poll");
}
@Override
public void registerVoteHandler(Function<VoteRequest,
CompletableFuture<VoteResponse>> handler) {
registerHandler("vote", handler);
}
@Override
public void unregisterVoteHandler() {
unregisterHandler("vote");
}
@Override
public void registerAppendHandler(Function<AppendRequest,
CompletableFuture<AppendResponse>> handler) {
registerHandler("append", handler);
}
@Override
public void unregisterAppendHandler() {
unregisterHandler("append");
}
@Override
public void registerResetListener(SessionId sessionId,
Consumer<ResetRequest> listener, Executor executor) {
messagingService.registerHandler(String.format("reset-%d", sessionId.id()), (e, p) -> {
listener.accept(serializer.decode(p));
}, executor);
}
@Override
public void unregisterResetListener(SessionId sessionId) {
messagingService.unregisterHandler(String.format("reset-%d", sessionId.id()));
}
}

View file

@ -640,6 +640,31 @@ public class ZeppelinConfiguration extends XMLConfiguration {
return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_TEMP_PATH);
}
public String getClusterAddress() {
return getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
}
public void setClusterAddress(String clusterAddr) {
properties.put(ConfVars.ZEPPELIN_CLUSTER_ADDR.getVarName(), clusterAddr);
}
public boolean isClusterMode() {
String clusterAddr = getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
if (StringUtils.isEmpty(clusterAddr)) {
return false;
}
return true;
}
public int getClusterHeartbeatInterval() {
return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL);
}
public int getClusterHeartbeatTimeout() {
return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT);
}
public Map<String, String> dumpConfigurations(Predicate<String> predicate) {
Map<String, String> properties = new HashMap<>();
@ -782,6 +807,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
ZEPPELIN_OWNER_ROLE("zeppelin.notebook.default.owner.username", ""),
ZEPPELIN_CLUSTER_ADDR("zeppelin.cluster.addr", ""),
ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000),
ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"),
ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""),