mirror of
https://github.com/apache/zeppelin
synced 2026-05-24 09:38:26 +00:00
Cluster Raft module design
By using the Raft protocol, multiple Zeppelin-Server groups are built into a Zeppelin cluster, the cluster State Machine is maintained through the Raft protocol, and the services in the cluster are agreed upon. The Zeppelin-Server and Zeppelin-Interperter services and processes are stored in the Cluster MetaData. Metadata information; [Feature] * [x] add raft algorithm atomix jar * [x] add cluster state machine * [x] add state machine query command * [x] add state machine delete command * [x] add state machine put command * [x] Isolate the netty JAR package introduced by atomix * https://issues.apache.org/jira/browse/ZEPPELIN-3610 CI pass * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? Yes
This commit is contained in:
parent
92f244ef7e
commit
97f17acac5
20 changed files with 2610 additions and 8 deletions
|
|
@ -81,7 +81,7 @@ if [[ -d "${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes" ]]; then
|
|||
ZEPPELIN_INTP_CLASSPATH+=":${ZEPPELIN_HOME}/zeppelin-zengine/target/test-classes"
|
||||
fi
|
||||
|
||||
addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter/target/lib"
|
||||
addJarInDirForIntp "${ZEPPELIN_HOME}/zeppelin-interpreter-api/target"
|
||||
addJarInDirForIntp "${ZEPPELIN_HOME}/lib/interpreter"
|
||||
addJarInDirForIntp "${INTERPRETER_DIR}"
|
||||
|
||||
|
|
|
|||
|
|
@ -57,13 +57,6 @@
|
|||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>${project.groupId}</groupId>
|
||||
<artifactId>zeppelin-interpreter</artifactId>
|
||||
<version>${project.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.livy</groupId>
|
||||
<artifactId>livy-integration-test</artifactId>
|
||||
|
|
|
|||
|
|
@ -44,12 +44,137 @@
|
|||
<maven.aeither.provider.version>3.0.3</maven.aeither.provider.version>
|
||||
<wagon.version>1.0</wagon.version>
|
||||
<jline.version>2.12.1</jline.version>
|
||||
<atomix.version>3.0.0-rc4</atomix.version>
|
||||
<atomix.netty.version>4.1.11.Final</atomix.netty.version>
|
||||
<commons-math3.version>3.1.1</commons-math3.version>
|
||||
<guava.version>20.0</guava.version>
|
||||
<commons-lang3.version>3.4</commons-lang3.version>
|
||||
|
||||
<!--plugin versions-->
|
||||
<plugin.shade.version>2.3</plugin.shade.version>
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.atomix</groupId>
|
||||
<artifactId>atomix</artifactId>
|
||||
<version>${atomix.version}</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-transport</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-codec</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-handler</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-common</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-buffer</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-transport-native-epoll</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-transport-native-unix-common</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-math3</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
</exclusion>
|
||||
<exclusion>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.atomix</groupId>
|
||||
<artifactId>atomix-raft</artifactId>
|
||||
<version>${atomix.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.atomix</groupId>
|
||||
<artifactId>atomix-primary-backup</artifactId>
|
||||
<version>${atomix.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-transport</artifactId>
|
||||
<version>${atomix.netty.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-codec</artifactId>
|
||||
<version>${atomix.netty.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-handler</artifactId>
|
||||
<version>${atomix.netty.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-common</artifactId>
|
||||
<version>${atomix.netty.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-buffer</artifactId>
|
||||
<version>${atomix.netty.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-transport-native-epoll</artifactId>
|
||||
<classifier>linux-x86_64</classifier>
|
||||
<version>${atomix.netty.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-transport-native-unix-common</artifactId>
|
||||
<version>${atomix.netty.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-math3</artifactId>
|
||||
<version>${commons-math3.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.commons</groupId>
|
||||
<artifactId>commons-lang3</artifactId>
|
||||
<version>${commons-lang3.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.google.guava</groupId>
|
||||
<artifactId>guava</artifactId>
|
||||
<version>${guava.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.apache.thrift</groupId>
|
||||
|
|
|
|||
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster;
|
||||
|
||||
import io.atomix.cluster.messaging.BroadcastService;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* Broadcast Service Adapter
|
||||
*/
|
||||
public class BroadcastServiceAdapter implements BroadcastService {
|
||||
@Override
|
||||
public void broadcast(String subject, byte[] message) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addListener(String subject, Consumer<byte[]> listener) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeListener(String subject, Consumer<byte[]> listener) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,527 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import io.atomix.cluster.MemberId;
|
||||
import io.atomix.cluster.Node;
|
||||
import io.atomix.cluster.messaging.MessagingService;
|
||||
import io.atomix.cluster.messaging.impl.NettyMessagingService;
|
||||
import io.atomix.primitive.operation.OperationType;
|
||||
import io.atomix.primitive.operation.PrimitiveOperation;
|
||||
import io.atomix.primitive.operation.impl.DefaultOperationId;
|
||||
import io.atomix.primitive.partition.PartitionId;
|
||||
import io.atomix.primitive.service.ServiceConfig;
|
||||
import io.atomix.primitive.session.SessionClient;
|
||||
import io.atomix.primitive.session.SessionId;
|
||||
import io.atomix.protocols.raft.RaftClient;
|
||||
import io.atomix.protocols.raft.RaftError;
|
||||
import io.atomix.protocols.raft.ReadConsistency;
|
||||
import io.atomix.protocols.raft.cluster.RaftMember;
|
||||
import io.atomix.protocols.raft.cluster.impl.DefaultRaftMember;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
|
||||
import io.atomix.protocols.raft.protocol.QueryRequest;
|
||||
import io.atomix.protocols.raft.protocol.QueryResponse;
|
||||
import io.atomix.protocols.raft.protocol.CommandRequest;
|
||||
import io.atomix.protocols.raft.protocol.CommandResponse;
|
||||
import io.atomix.protocols.raft.protocol.MetadataRequest;
|
||||
import io.atomix.protocols.raft.protocol.MetadataResponse;
|
||||
import io.atomix.protocols.raft.protocol.JoinRequest;
|
||||
import io.atomix.protocols.raft.protocol.JoinResponse;
|
||||
import io.atomix.protocols.raft.protocol.LeaveRequest;
|
||||
import io.atomix.protocols.raft.protocol.LeaveResponse;
|
||||
import io.atomix.protocols.raft.protocol.ConfigureRequest;
|
||||
import io.atomix.protocols.raft.protocol.ConfigureResponse;
|
||||
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
|
||||
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
|
||||
import io.atomix.protocols.raft.protocol.InstallRequest;
|
||||
import io.atomix.protocols.raft.protocol.InstallResponse;
|
||||
import io.atomix.protocols.raft.protocol.PollRequest;
|
||||
import io.atomix.protocols.raft.protocol.PollResponse;
|
||||
import io.atomix.protocols.raft.protocol.VoteRequest;
|
||||
import io.atomix.protocols.raft.protocol.VoteResponse;
|
||||
import io.atomix.protocols.raft.protocol.AppendRequest;
|
||||
import io.atomix.protocols.raft.protocol.AppendResponse;
|
||||
import io.atomix.protocols.raft.protocol.PublishRequest;
|
||||
import io.atomix.protocols.raft.protocol.ResetRequest;
|
||||
import io.atomix.protocols.raft.protocol.RaftResponse;
|
||||
import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
|
||||
import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
|
||||
import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
|
||||
import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
|
||||
import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
|
||||
import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
|
||||
import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
|
||||
import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
|
||||
import io.atomix.protocols.raft.session.CommunicationStrategy;
|
||||
import io.atomix.protocols.raft.storage.system.Configuration;
|
||||
import io.atomix.utils.net.Address;
|
||||
import io.atomix.utils.serializer.Namespace;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.zeppelin.cluster.meta.ClusterMeta;
|
||||
import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
|
||||
import org.apache.zeppelin.cluster.meta.ClusterMetaOperation;
|
||||
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
|
||||
import org.apache.zeppelin.cluster.protocol.LocalRaftProtocolFactory;
|
||||
import org.apache.zeppelin.cluster.protocol.RaftClientMessagingProtocol;
|
||||
import org.apache.zeppelin.conf.ZeppelinConfiguration;
|
||||
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.time.Instant;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static io.atomix.primitive.operation.PrimitiveOperation.operation;
|
||||
import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.DELETE_OPERATION;
|
||||
import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.PUT_OPERATION;
|
||||
import static org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATION;
|
||||
|
||||
/**
|
||||
* The base class for cluster management, including the following implementations
|
||||
* 1. RaftClient as the raft client
|
||||
* 2. Threading to provide retry after cluster metadata submission failure
|
||||
* 3. Cluster monitoring
|
||||
*/
|
||||
public abstract class ClusterManager {
|
||||
private static Logger logger = LoggerFactory.getLogger(ClusterManager.class);
|
||||
|
||||
public final ZeppelinConfiguration zconf = ZeppelinConfiguration.create();
|
||||
|
||||
protected Collection<Node> clusterNodes = new ArrayList<>();
|
||||
|
||||
// raft
|
||||
protected static String ZEPL_CLUSTER_ID = "ZEPL-CLUSTER";
|
||||
protected static String ZEPL_CLIENT_ID = "ZEPL-CLIENT";
|
||||
|
||||
protected int raftServerPort = 0;
|
||||
|
||||
protected RaftClient raftClient = null;
|
||||
protected SessionClient raftSessionClient = null;
|
||||
protected Map<MemberId, Address> raftAddressMap = new ConcurrentHashMap<>();
|
||||
protected LocalRaftProtocolFactory protocolFactory
|
||||
= new LocalRaftProtocolFactory(protocolSerializer);
|
||||
protected List<MessagingService> messagingServices = new ArrayList<>();
|
||||
protected Collection<MemberId> clusterMemberIds = new ArrayList<MemberId>();
|
||||
|
||||
protected AtomicBoolean running = new AtomicBoolean(true);
|
||||
|
||||
// Write data through the queue to prevent failure due to network exceptions
|
||||
private ConcurrentLinkedQueue<ClusterMetaEntity> clusterMetaQueue
|
||||
= new ConcurrentLinkedQueue<>();
|
||||
|
||||
// zeppelin server host & port
|
||||
protected String zeplServerHost = "";
|
||||
|
||||
// atomic dependent netty package path
|
||||
public static String ATOMIX_NETTY_LIB_PATH = "/lib/atomix-netty";
|
||||
public static String INTERPRETER_LIB_PATH = "/lib/interpreter";
|
||||
|
||||
public ClusterManager() {
|
||||
try {
|
||||
zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
|
||||
String clusterAddr = zconf.getClusterAddress();
|
||||
if (!StringUtils.isEmpty(clusterAddr)) {
|
||||
String cluster[] = clusterAddr.split(",");
|
||||
|
||||
for (int i = 0; i < cluster.length; i++) {
|
||||
String[] parts = cluster[i].split(":");
|
||||
String clusterHost = parts[0];
|
||||
int clusterPort = Integer.valueOf(parts[1]);
|
||||
if (zeplServerHost.equalsIgnoreCase(clusterHost)) {
|
||||
raftServerPort = clusterPort;
|
||||
}
|
||||
|
||||
Node node = Node.builder().withId(cluster[i])
|
||||
.withAddress(Address.from(clusterHost, clusterPort)).build();
|
||||
clusterNodes.add(node);
|
||||
raftAddressMap.put(MemberId.from(cluster[i]), Address.from(clusterHost, clusterPort));
|
||||
clusterMemberIds.add(MemberId.from(cluster[i]));
|
||||
}
|
||||
}
|
||||
} catch (UnknownHostException e) {
|
||||
e.printStackTrace();
|
||||
} catch (SocketException e) {
|
||||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Check if the raft environment is initialized
|
||||
public abstract boolean raftInitialized();
|
||||
// Is it a cluster leader
|
||||
public abstract boolean isClusterLeader();
|
||||
|
||||
public AtomicBoolean getRunning() {
|
||||
return running;
|
||||
}
|
||||
|
||||
private SessionClient createProxy(RaftClient client) {
|
||||
return client.sessionBuilder(ClusterPrimitiveType.PRIMITIVE_NAME,
|
||||
ClusterPrimitiveType.INSTANCE, new ServiceConfig())
|
||||
.withReadConsistency(ReadConsistency.SEQUENTIAL)
|
||||
.withCommunicationStrategy(CommunicationStrategy.LEADER)
|
||||
.build()
|
||||
.connect()
|
||||
.join();
|
||||
}
|
||||
|
||||
public void start() {
|
||||
if (!zconf.isClusterMode()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// RaftClient Thread
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
logger.info("RaftClientThread run() >>>");
|
||||
|
||||
int raftClientPort = 0;
|
||||
try {
|
||||
raftClientPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + ":" + raftClientPort);
|
||||
Address address = Address.from(zeplServerHost, raftClientPort);
|
||||
raftAddressMap.put(memberId, address);
|
||||
|
||||
MessagingService messagingManager
|
||||
= NettyMessagingService.builder().withAddress(address).build().start().join();
|
||||
RaftClientProtocol protocol = new RaftClientMessagingProtocol(
|
||||
messagingManager, protocolSerializer, raftAddressMap::get);
|
||||
|
||||
raftClient = RaftClient.builder()
|
||||
.withMemberId(memberId)
|
||||
.withPartitionId(PartitionId.from("partition", 1))
|
||||
.withProtocol(protocol)
|
||||
.build();
|
||||
|
||||
raftClient.connect(clusterMemberIds).join();
|
||||
|
||||
raftSessionClient = createProxy(raftClient);
|
||||
|
||||
logger.info("RaftClientThread run() <<<");
|
||||
}
|
||||
}).start();
|
||||
|
||||
// Cluster Meta Consume Thread
|
||||
new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
while (getRunning().get()) {
|
||||
ClusterMetaEntity metaEntity = clusterMetaQueue.peek();
|
||||
if (null != metaEntity) {
|
||||
// Determine whether the client is connected
|
||||
int retry = 0;
|
||||
while (!raftInitialized()) {
|
||||
retry++;
|
||||
if (0 == retry % 30) {
|
||||
logger.error("Raft incomplete initialization! retry[{}]", retry);
|
||||
}
|
||||
Thread.sleep(100);
|
||||
}
|
||||
boolean success = false;
|
||||
switch (metaEntity.getOperation()) {
|
||||
case DELETE_OPERATION:
|
||||
success = deleteClusterMeta(metaEntity);
|
||||
break;
|
||||
case PUT_OPERATION:
|
||||
success = putClusterMeta(metaEntity);
|
||||
break;
|
||||
}
|
||||
if (true == success) {
|
||||
// The operation was successfully deleted
|
||||
clusterMetaQueue.remove(metaEntity);
|
||||
} else {
|
||||
logger.error("Cluster Meta Consume faild!");
|
||||
}
|
||||
} else {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
|
||||
// cluster shutdown
|
||||
public void shutdown() {
|
||||
if (!zconf.isClusterMode()) {
|
||||
return;
|
||||
}
|
||||
|
||||
running.set(false);
|
||||
|
||||
try {
|
||||
if (null != raftSessionClient) {
|
||||
raftSessionClient.close().get(3, TimeUnit.SECONDS);
|
||||
}
|
||||
if (null != raftClient) {
|
||||
raftClient.close().get(3, TimeUnit.SECONDS);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
} catch (TimeoutException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
public String getClusterName() {
|
||||
return zeplServerHost + ":" + raftServerPort;
|
||||
}
|
||||
|
||||
// put metadata into cluster metadata
|
||||
private boolean putClusterMeta(ClusterMetaEntity entity) {
|
||||
if (!raftInitialized()) {
|
||||
logger.error("Raft incomplete initialization!");
|
||||
return false;
|
||||
}
|
||||
|
||||
ClusterMetaType metaType = entity.getMetaType();
|
||||
String metaKey = entity.getKey();
|
||||
HashMap<String, Object> newMetaValue = entity.getValues();
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("putClusterMeta {} {}", metaType, metaKey);
|
||||
}
|
||||
|
||||
// add cluster name
|
||||
newMetaValue.put(ClusterMeta.SERVER_HOST, zeplServerHost);
|
||||
newMetaValue.put(ClusterMeta.SERVER_PORT, raftServerPort);
|
||||
|
||||
raftSessionClient.execute(operation(ClusterStateMachine.PUT,
|
||||
clientSerializer.encode(entity)))
|
||||
.<Long>thenApply(clientSerializer::decode);
|
||||
return true;
|
||||
}
|
||||
|
||||
// put metadata into cluster metadata
|
||||
public void putClusterMeta(ClusterMetaType type, String key, HashMap<String, Object> values) {
|
||||
ClusterMetaEntity metaEntity = new ClusterMetaEntity(PUT_OPERATION, type, key, values);
|
||||
|
||||
boolean result = putClusterMeta(metaEntity);
|
||||
if (false == result) {
|
||||
logger.warn("putClusterMeta failure, Cache metadata to queue.");
|
||||
clusterMetaQueue.add(metaEntity);
|
||||
}
|
||||
}
|
||||
|
||||
// delete metadata by cluster metadata
|
||||
private boolean deleteClusterMeta(ClusterMetaEntity entity) {
|
||||
ClusterMetaType metaType = entity.getMetaType();
|
||||
String metaKey = entity.getKey();
|
||||
|
||||
// Need to pay attention to delete metadata operations
|
||||
logger.info("deleteClusterMeta {} {}", metaType, metaKey);
|
||||
|
||||
if (!raftInitialized()) {
|
||||
logger.error("Raft incomplete initialization!");
|
||||
return false;
|
||||
}
|
||||
|
||||
raftSessionClient.execute(operation(
|
||||
ClusterStateMachine.REMOVE,
|
||||
clientSerializer.encode(entity)))
|
||||
.<Long>thenApply(clientSerializer::decode)
|
||||
.thenAccept(result -> {
|
||||
logger.info("deleteClusterMeta {}", result);
|
||||
});
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// delete metadata from cluster metadata
|
||||
public void deleteClusterMeta(ClusterMetaType type, String key) {
|
||||
ClusterMetaEntity metaEntity = new ClusterMetaEntity(DELETE_OPERATION, type, key, null);
|
||||
//clusterMetaQueue.add(metaEntity);
|
||||
|
||||
boolean result = deleteClusterMeta(metaEntity);
|
||||
if (false == result) {
|
||||
logger.warn("deleteClusterMeta faild, Cache data to queue.");
|
||||
clusterMetaQueue.add(metaEntity);
|
||||
}
|
||||
}
|
||||
|
||||
// get metadata by cluster metadata
|
||||
public HashMap<String, HashMap<String, Object>> getClusterMeta(
|
||||
ClusterMetaType metaType, String metaKey) {
|
||||
HashMap<String, HashMap<String, Object>> clusterMeta = new HashMap<>();
|
||||
if (!raftInitialized()) {
|
||||
logger.error("Raft incomplete initialization!");
|
||||
return clusterMeta;
|
||||
}
|
||||
|
||||
ClusterMetaEntity entity = new ClusterMetaEntity(GET_OPERATION, metaType, metaKey, null);
|
||||
|
||||
byte[] mateData = null;
|
||||
try {
|
||||
mateData = raftSessionClient.execute(operation(ClusterStateMachine.GET,
|
||||
clientSerializer.encode(entity))).get(3, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
} catch (ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
} catch (TimeoutException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
if (null != mateData) {
|
||||
clusterMeta = clientSerializer.decode(mateData);
|
||||
}
|
||||
|
||||
logger.info("getClusterMeta >>> {}", clusterMeta.toString());
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("getClusterMeta >>> {}", clusterMeta.toString());
|
||||
}
|
||||
|
||||
return clusterMeta;
|
||||
}
|
||||
|
||||
protected static final Serializer protocolSerializer = Serializer.using(Namespace.builder()
|
||||
.register(OpenSessionRequest.class)
|
||||
.register(OpenSessionResponse.class)
|
||||
.register(CloseSessionRequest.class)
|
||||
.register(CloseSessionResponse.class)
|
||||
.register(KeepAliveRequest.class)
|
||||
.register(KeepAliveResponse.class)
|
||||
.register(QueryRequest.class)
|
||||
.register(QueryResponse.class)
|
||||
.register(CommandRequest.class)
|
||||
.register(CommandResponse.class)
|
||||
.register(MetadataRequest.class)
|
||||
.register(MetadataResponse.class)
|
||||
.register(JoinRequest.class)
|
||||
.register(JoinResponse.class)
|
||||
.register(LeaveRequest.class)
|
||||
.register(LeaveResponse.class)
|
||||
.register(ConfigureRequest.class)
|
||||
.register(ConfigureResponse.class)
|
||||
.register(ReconfigureRequest.class)
|
||||
.register(ReconfigureResponse.class)
|
||||
.register(InstallRequest.class)
|
||||
.register(InstallResponse.class)
|
||||
.register(PollRequest.class)
|
||||
.register(PollResponse.class)
|
||||
.register(VoteRequest.class)
|
||||
.register(VoteResponse.class)
|
||||
.register(AppendRequest.class)
|
||||
.register(AppendResponse.class)
|
||||
.register(PublishRequest.class)
|
||||
.register(ResetRequest.class)
|
||||
.register(RaftResponse.Status.class)
|
||||
.register(RaftError.class)
|
||||
.register(RaftError.Type.class)
|
||||
.register(PrimitiveOperation.class)
|
||||
.register(ReadConsistency.class)
|
||||
.register(byte[].class)
|
||||
.register(long[].class)
|
||||
.register(CloseSessionEntry.class)
|
||||
.register(CommandEntry.class)
|
||||
.register(ConfigurationEntry.class)
|
||||
.register(InitializeEntry.class)
|
||||
.register(KeepAliveEntry.class)
|
||||
.register(MetadataEntry.class)
|
||||
.register(OpenSessionEntry.class)
|
||||
.register(QueryEntry.class)
|
||||
.register(PrimitiveOperation.class)
|
||||
.register(DefaultOperationId.class)
|
||||
.register(OperationType.class)
|
||||
.register(ReadConsistency.class)
|
||||
.register(ArrayList.class)
|
||||
.register(HashMap.class)
|
||||
.register(ClusterMetaEntity.class)
|
||||
.register(Date.class)
|
||||
.register(Collections.emptyList().getClass())
|
||||
.register(HashSet.class)
|
||||
.register(DefaultRaftMember.class)
|
||||
.register(MemberId.class)
|
||||
.register(SessionId.class)
|
||||
.register(RaftMember.Type.class)
|
||||
.register(Instant.class)
|
||||
.register(Configuration.class)
|
||||
.build());
|
||||
|
||||
protected static final Serializer storageSerializer = Serializer.using(Namespace.builder()
|
||||
.register(CloseSessionEntry.class)
|
||||
.register(CommandEntry.class)
|
||||
.register(ConfigurationEntry.class)
|
||||
.register(InitializeEntry.class)
|
||||
.register(KeepAliveEntry.class)
|
||||
.register(MetadataEntry.class)
|
||||
.register(OpenSessionEntry.class)
|
||||
.register(QueryEntry.class)
|
||||
.register(PrimitiveOperation.class)
|
||||
.register(DefaultOperationId.class)
|
||||
.register(OperationType.class)
|
||||
.register(ReadConsistency.class)
|
||||
.register(ArrayList.class)
|
||||
.register(ClusterMetaEntity.class)
|
||||
.register(HashMap.class)
|
||||
.register(HashSet.class)
|
||||
.register(Date.class)
|
||||
.register(DefaultRaftMember.class)
|
||||
.register(MemberId.class)
|
||||
.register(RaftMember.Type.class)
|
||||
.register(Instant.class)
|
||||
.register(Configuration.class)
|
||||
.register(byte[].class)
|
||||
.register(long[].class)
|
||||
.build());
|
||||
|
||||
protected static final Serializer clientSerializer = Serializer.using(Namespace.builder()
|
||||
.register(ReadConsistency.class)
|
||||
.register(ClusterMetaEntity.class)
|
||||
.register(ClusterMetaOperation.class)
|
||||
.register(ClusterMetaType.class)
|
||||
.register(HashMap.class)
|
||||
.register(Date.class)
|
||||
.register(Maps.immutableEntry(new String(), new Object()).getClass())
|
||||
.build());
|
||||
}
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster;
|
||||
|
||||
import io.atomix.primitive.PrimitiveBuilder;
|
||||
import io.atomix.primitive.PrimitiveManagementService;
|
||||
import io.atomix.primitive.PrimitiveType;
|
||||
import io.atomix.primitive.config.PrimitiveConfig;
|
||||
import io.atomix.primitive.service.PrimitiveService;
|
||||
import io.atomix.primitive.service.ServiceConfig;
|
||||
|
||||
/**
|
||||
* Cluster primitive type
|
||||
*/
|
||||
public class ClusterPrimitiveType implements PrimitiveType {
|
||||
public static final ClusterPrimitiveType INSTANCE = new ClusterPrimitiveType();
|
||||
|
||||
public static final String PRIMITIVE_NAME = "CLUSTER_PRIMITIVE";
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return PRIMITIVE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrimitiveConfig newConfig() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrimitiveBuilder newBuilder(String primitiveName,
|
||||
PrimitiveConfig config,
|
||||
PrimitiveManagementService managementService) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PrimitiveService newService(ServiceConfig config) {
|
||||
return new ClusterStateMachine();
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,176 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import io.atomix.primitive.operation.OperationId;
|
||||
import io.atomix.primitive.service.AbstractPrimitiveService;
|
||||
import io.atomix.primitive.service.BackupOutput;
|
||||
import io.atomix.primitive.service.BackupInput;
|
||||
import io.atomix.primitive.service.Commit;
|
||||
import io.atomix.primitive.service.ServiceExecutor;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
import org.apache.zeppelin.cluster.meta.ClusterMeta;
|
||||
import org.apache.zeppelin.cluster.meta.ClusterMetaEntity;
|
||||
import org.apache.zeppelin.cluster.meta.ClusterMetaType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Cluster State Machine
|
||||
*/
|
||||
public class ClusterStateMachine extends AbstractPrimitiveService {
|
||||
private static Logger logger = LoggerFactory.getLogger(ClusterStateMachine.class);
|
||||
private ClusterMeta clusterMeta = new ClusterMeta();
|
||||
|
||||
// Command to operation a variable in cluster state machine
|
||||
public static final OperationId PUT = OperationId.command("put");
|
||||
public static final OperationId GET = OperationId.query("get");
|
||||
public static final OperationId REMOVE = OperationId.command("remove");
|
||||
public static final OperationId INDEX = OperationId.command("index");
|
||||
|
||||
public ClusterStateMachine() {
|
||||
super(ClusterPrimitiveType.INSTANCE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Serializer serializer() {
|
||||
return ClusterManager.clientSerializer;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configure(ServiceExecutor executor) {
|
||||
executor.register(PUT, this::put);
|
||||
executor.register(GET, this::get);
|
||||
executor.register(REMOVE, this::remove);
|
||||
executor.register(INDEX, this::index);
|
||||
}
|
||||
|
||||
protected long put(Commit<ClusterMetaEntity> commit) {
|
||||
clusterMeta.put(commit.value().getMetaType(),
|
||||
commit.value().getKey(), commit.value().getValues());
|
||||
return commit.index();
|
||||
}
|
||||
|
||||
protected Map<String, Map<String, Object>> get(Commit<ClusterMetaEntity> commit) {
|
||||
return clusterMeta.get(commit.value().getMetaType(), commit.value().getKey());
|
||||
}
|
||||
|
||||
protected long remove(Commit<ClusterMetaEntity> commit) {
|
||||
clusterMeta.remove(commit.value().getMetaType(), commit.value().getKey());
|
||||
return commit.index();
|
||||
}
|
||||
|
||||
protected long index(Commit<Void> commit) {
|
||||
return commit.index();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void backup(BackupOutput writer) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("ClusterStateMachine.backup()");
|
||||
}
|
||||
|
||||
// backup ServerMeta
|
||||
// cluster meta map struct
|
||||
// cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
|
||||
Map<String, Map<String, Object>> mapServerMeta
|
||||
= clusterMeta.get(ClusterMetaType.ServerMeta, "");
|
||||
// write all ServerMeta size
|
||||
writer.writeInt(mapServerMeta.size());
|
||||
for (Map.Entry<String, Map<String, Object>> entry : mapServerMeta.entrySet()) {
|
||||
// write cluster_name
|
||||
writer.writeString(entry.getKey());
|
||||
|
||||
Map<String, Object> kvPairs = entry.getValue();
|
||||
// write cluster mate kv pairs size
|
||||
writer.writeInt(kvPairs.size());
|
||||
for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) {
|
||||
// write cluster mate kv pairs
|
||||
writer.writeString(entryValue.getKey());
|
||||
writer.writeObject(entryValue.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
// backup IntpProcessMeta
|
||||
// Interpreter meta map struct
|
||||
// IntpGroupId -> {server_tserver_host,server_tserver_port,...}
|
||||
Map<String, Map<String, Object>> mapIntpProcMeta
|
||||
= clusterMeta.get(ClusterMetaType.IntpProcessMeta, "");
|
||||
// write interpreter size
|
||||
writer.writeInt(mapIntpProcMeta.size());
|
||||
for (Map.Entry<String, Map<String, Object>> entry : mapIntpProcMeta.entrySet()) {
|
||||
// write IntpGroupId
|
||||
writer.writeString(entry.getKey());
|
||||
|
||||
Map<String, Object> kvPairs = entry.getValue();
|
||||
// write interpreter mate kv pairs size
|
||||
writer.writeInt(kvPairs.size());
|
||||
for (Map.Entry<String, Object> entryValue : kvPairs.entrySet()) {
|
||||
// write interpreter mate kv pairs
|
||||
writer.writeString(entryValue.getKey());
|
||||
writer.writeObject(entryValue.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restore(BackupInput reader) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("ClusterStateMachine.restore()");
|
||||
}
|
||||
|
||||
clusterMeta = new ClusterMeta();
|
||||
// read all ServerMeta size
|
||||
int nServerMeta = reader.readInt();
|
||||
for (int i = 0; i < nServerMeta; i++) {
|
||||
// read cluster_name
|
||||
String clusterName = reader.readString();
|
||||
|
||||
// read cluster mate kv pairs size
|
||||
int nKVpairs = reader.readInt();
|
||||
for (int j = 0; j < nKVpairs; i++) {
|
||||
// read cluster mate kv pairs
|
||||
String key = reader.readString();
|
||||
Object value = reader.readObject();
|
||||
|
||||
clusterMeta.put(ClusterMetaType.ServerMeta,
|
||||
clusterName, Maps.immutableEntry(key, value));
|
||||
}
|
||||
}
|
||||
|
||||
// read all IntpProcessMeta size
|
||||
int nIntpMeta = reader.readInt();
|
||||
for (int i = 0; i < nIntpMeta; i++) {
|
||||
// read interpreter name
|
||||
String intpName = reader.readString();
|
||||
|
||||
// read interpreter mate kv pairs size
|
||||
int nKVpairs = reader.readInt();
|
||||
for (int j = 0; j < nKVpairs; i++) {
|
||||
// read interpreter mate kv pairs
|
||||
String key = reader.readString();
|
||||
Object value = reader.readObject();
|
||||
|
||||
clusterMeta.put(ClusterMetaType.IntpProcessMeta,
|
||||
intpName, Maps.immutableEntry(key, value));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.listener;
|
||||
|
||||
import io.atomix.cluster.ClusterMembershipEvent;
|
||||
import io.atomix.cluster.ClusterMembershipEventListener;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Zeppelin ClusterMembershipEventListener
|
||||
*/
|
||||
public class ZeppelinClusterMembershipEventListener implements ClusterMembershipEventListener {
|
||||
private static Logger logger
|
||||
= LoggerFactory.getLogger(ZeppelinClusterMembershipEventListener.class);
|
||||
|
||||
@Override
|
||||
public void event(ClusterMembershipEvent event) {
|
||||
switch (event.type()) {
|
||||
case MEMBER_ADDED:
|
||||
logger.info(event.subject().id() + " joined the cluster.");
|
||||
break;
|
||||
case MEMBER_REMOVED:
|
||||
logger.info(event.subject().id() + " left the cluster.");
|
||||
break;
|
||||
case METADATA_CHANGED:
|
||||
logger.info(event.subject().id() + " meta data changed.");
|
||||
break;
|
||||
case REACHABILITY_CHANGED:
|
||||
logger.info(event.subject().id() + " reachability changed.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,144 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.meta;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Metadata stores metadata information in a KV key-value pair
|
||||
*/
|
||||
public class ClusterMeta implements Serializable {
|
||||
private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
|
||||
|
||||
// zeppelin-server meta
|
||||
public static String SERVER_HOST = "SERVER_HOST";
|
||||
public static String SERVER_PORT = "SERVER_PORT";
|
||||
public static String SERVER_TSERVER_HOST = "SERVER_TSERVER_HOST";
|
||||
public static String SERVER_TSERVER_PORT = "SERVER_TSERVER_PORT";
|
||||
public static String SERVER_START_TIME = "SERVER_START_TIME";
|
||||
|
||||
// interperter-process meta
|
||||
public static String INTP_TSERVER_HOST = "INTP_TSERVER_HOST";
|
||||
public static String INTP_TSERVER_PORT = "INTP_TSERVER_PORT";
|
||||
public static String INTP_START_TIME = "INTP_START_TIME";
|
||||
|
||||
// zeppelin-server resource usage
|
||||
public static String CPU_CAPACITY = "CPU_CAPACITY";
|
||||
public static String CPU_USED = "CPU_USED";
|
||||
public static String MEMORY_CAPACITY = "MEMORY_CAPACITY";
|
||||
public static String MEMORY_USED = "MEMORY_USED";
|
||||
|
||||
public static String HEARTBEAT = "HEARTBEAT";
|
||||
|
||||
// zeppelin-server or interperter-process status
|
||||
public static String STATUS = "STATUS";
|
||||
public static String ONLINE_STATUS = "ONLINE";
|
||||
public static String OFFLINE_STATUS = "OFFLINE";
|
||||
|
||||
// cluster_name = host:port
|
||||
// Map:cluster_name -> {server_tserver_host,server_tserver_port,cpu_capacity,...}
|
||||
private Map<String, Map<String, Object>> mapServerMeta = new HashMap<>();
|
||||
|
||||
// Map:InterpreterGroupId -> {cluster_name,intp_tserver_host,...}
|
||||
private Map<String, Map<String, Object>> mapInterpreterMeta = new HashMap<>();
|
||||
|
||||
public static Gson gson = new Gson();
|
||||
|
||||
public void put(ClusterMetaType type, String key, Object value) {
|
||||
Map<String, Object> mapValue = (Map<String, Object>) value;
|
||||
|
||||
switch (type) {
|
||||
case ServerMeta:
|
||||
// Because it may be partially updated metadata information
|
||||
if (mapServerMeta.containsKey(key)) {
|
||||
Map<String, Object> values = mapServerMeta.get(key);
|
||||
values.putAll(mapValue);
|
||||
} else {
|
||||
mapServerMeta.put(key, mapValue);
|
||||
}
|
||||
break;
|
||||
case IntpProcessMeta:
|
||||
if (mapInterpreterMeta.containsKey(key)) {
|
||||
Map<String, Object> values = mapInterpreterMeta.get(key);
|
||||
values.putAll(mapValue);
|
||||
} else {
|
||||
mapInterpreterMeta.put(key, mapValue);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public Map<String, Map<String, Object>> get(ClusterMetaType type, String key) {
|
||||
Map<String, Object> values = null;
|
||||
|
||||
switch (type) {
|
||||
case ServerMeta:
|
||||
if (null == key || StringUtils.isEmpty(key)) {
|
||||
return mapServerMeta;
|
||||
}
|
||||
if (mapServerMeta.containsKey(key)) {
|
||||
values = mapServerMeta.get(key);
|
||||
} else {
|
||||
logger.warn("can not find key : {}", key);
|
||||
}
|
||||
break;
|
||||
case IntpProcessMeta:
|
||||
if (null == key || StringUtils.isEmpty(key)) {
|
||||
return mapInterpreterMeta;
|
||||
}
|
||||
if (mapInterpreterMeta.containsKey(key)) {
|
||||
values = mapInterpreterMeta.get(key);
|
||||
} else {
|
||||
logger.warn("can not find key : {}", key);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
Map<String, Map<String, Object>> result = new HashMap<>();
|
||||
result.put(key, values);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
public Map<String, Object> remove(ClusterMetaType type, String key) {
|
||||
switch (type) {
|
||||
case ServerMeta:
|
||||
if (mapServerMeta.containsKey(key)) {
|
||||
return mapServerMeta.remove(key);
|
||||
} else {
|
||||
logger.warn("can not find key : {}", key);
|
||||
}
|
||||
break;
|
||||
case IntpProcessMeta:
|
||||
if (mapInterpreterMeta.containsKey(key)) {
|
||||
return mapInterpreterMeta.remove(key);
|
||||
} else {
|
||||
logger.warn("can not find key : {}", key);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.meta;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
|
||||
/**
|
||||
* Cluster operations, cluster types, encapsulation objects for keys and values
|
||||
*/
|
||||
public class ClusterMetaEntity implements Serializable {
|
||||
private ClusterMetaOperation operation;
|
||||
private ClusterMetaType type;
|
||||
private String key;
|
||||
private HashMap<String, Object> values = new HashMap<>();
|
||||
|
||||
public ClusterMetaEntity(ClusterMetaOperation operation, ClusterMetaType type,
|
||||
String key, HashMap<String, Object> values) {
|
||||
this.operation = operation;
|
||||
this.type = type;
|
||||
this.key = key;
|
||||
|
||||
if (null != values) {
|
||||
this.values.putAll(values);
|
||||
}
|
||||
}
|
||||
|
||||
public ClusterMetaOperation getOperation() {
|
||||
return operation;
|
||||
}
|
||||
|
||||
public ClusterMetaType getMetaType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
|
||||
public HashMap<String, Object> getValues() {
|
||||
return values;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.meta;
|
||||
|
||||
/**
|
||||
* Type of cluster metadata operation
|
||||
*/
|
||||
public enum ClusterMetaOperation {
|
||||
GET_OPERATION,
|
||||
PUT_OPERATION,
|
||||
DELETE_OPERATION
|
||||
}
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.meta;
|
||||
|
||||
/**
|
||||
* Type of cluster metadata
|
||||
*/
|
||||
public enum ClusterMetaType {
|
||||
ServerMeta,
|
||||
IntpProcessMeta
|
||||
}
|
||||
|
|
@ -0,0 +1,162 @@
|
|||
/*
|
||||
* Copyright 2017-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.protocol;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import io.atomix.cluster.MemberId;
|
||||
import io.atomix.primitive.session.SessionId;
|
||||
|
||||
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
|
||||
import io.atomix.protocols.raft.protocol.PublishRequest;
|
||||
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
|
||||
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
|
||||
import io.atomix.protocols.raft.protocol.QueryResponse;
|
||||
import io.atomix.protocols.raft.protocol.QueryRequest;
|
||||
import io.atomix.protocols.raft.protocol.CommandResponse;
|
||||
import io.atomix.protocols.raft.protocol.CommandRequest;
|
||||
import io.atomix.protocols.raft.protocol.MetadataResponse;
|
||||
import io.atomix.protocols.raft.protocol.MetadataRequest;
|
||||
import io.atomix.protocols.raft.protocol.ResetRequest;
|
||||
import io.atomix.utils.concurrent.Futures;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
|
||||
import java.net.ConnectException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Cluster Raft client protocol.
|
||||
*/
|
||||
public class LocalRaftClientProtocol extends LocalRaftProtocol implements RaftClientProtocol {
|
||||
private Function<HeartbeatRequest, CompletableFuture<HeartbeatResponse>> heartbeatHandler;
|
||||
private final Map<Long, Consumer<PublishRequest>> publishListeners = Maps.newConcurrentMap();
|
||||
|
||||
public LocalRaftClientProtocol(MemberId memberId,
|
||||
Serializer serializer,
|
||||
Map<MemberId, LocalRaftServerProtocol> servers,
|
||||
Map<MemberId, LocalRaftClientProtocol> clients) {
|
||||
super(serializer, servers, clients);
|
||||
clients.put(memberId, this);
|
||||
}
|
||||
|
||||
private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
|
||||
LocalRaftServerProtocol server = server(memberId);
|
||||
if (server != null) {
|
||||
return Futures.completedFuture(server);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
|
||||
OpenSessionRequest request) {
|
||||
return getServer(memberId).thenCompose(protocol ->
|
||||
protocol.openSession(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
|
||||
CloseSessionRequest request) {
|
||||
return getServer(memberId).thenCompose(protocol ->
|
||||
protocol.closeSession(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
|
||||
KeepAliveRequest request) {
|
||||
return getServer(memberId).thenCompose(protocol ->
|
||||
protocol.keepAlive(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
|
||||
return getServer(memberId).thenCompose(protocol ->
|
||||
protocol.query(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CommandResponse> command(MemberId memberId,
|
||||
CommandRequest request) {
|
||||
return getServer(memberId).thenCompose(protocol ->
|
||||
protocol.command(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
|
||||
MetadataRequest request) {
|
||||
return getServer(memberId).thenCompose(protocol ->
|
||||
protocol.metadata(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> heartbeat(byte[] request) {
|
||||
if (heartbeatHandler != null) {
|
||||
return heartbeatHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerHeartbeatHandler(Function<HeartbeatRequest,
|
||||
CompletableFuture<HeartbeatResponse>> handler) {
|
||||
this.heartbeatHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterHeartbeatHandler() {
|
||||
this.heartbeatHandler = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset(Set<MemberId> members, ResetRequest request) {
|
||||
members.forEach(nodeId -> {
|
||||
LocalRaftServerProtocol server = server(nodeId);
|
||||
if (server != null) {
|
||||
server.reset(request.session(), encode(request));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void publish(long sessionId, byte[] request) {
|
||||
Consumer<PublishRequest> listener = publishListeners.get(sessionId);
|
||||
if (listener != null) {
|
||||
listener.accept(decode(request));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerPublishListener(SessionId sessionId,
|
||||
Consumer<PublishRequest> listener, Executor executor) {
|
||||
publishListeners.put(sessionId.id(), request ->
|
||||
executor.execute(() -> listener.accept(request)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterPublishListener(SessionId sessionId) {
|
||||
publishListeners.remove(sessionId.id());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,58 @@
|
|||
/*
|
||||
* Copyright 2017-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.protocol;
|
||||
|
||||
import io.atomix.cluster.MemberId;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Base class for Raft protocol.
|
||||
*/
|
||||
public abstract class LocalRaftProtocol {
|
||||
private final Serializer serializer;
|
||||
private final Map<MemberId, LocalRaftServerProtocol> servers;
|
||||
private final Map<MemberId, LocalRaftClientProtocol> clients;
|
||||
|
||||
public LocalRaftProtocol(Serializer serializer,
|
||||
Map<MemberId, LocalRaftServerProtocol> servers,
|
||||
Map<MemberId, LocalRaftClientProtocol> clients) {
|
||||
this.serializer = serializer;
|
||||
this.servers = servers;
|
||||
this.clients = clients;
|
||||
}
|
||||
|
||||
<T> T copy(T value) {
|
||||
return serializer.decode(serializer.encode(value));
|
||||
}
|
||||
|
||||
byte[] encode(Object value) {
|
||||
return serializer.encode(value);
|
||||
}
|
||||
|
||||
<T> T decode(byte[] bytes) {
|
||||
return serializer.decode(bytes);
|
||||
}
|
||||
|
||||
LocalRaftServerProtocol server(MemberId memberId) {
|
||||
return servers.get(memberId);
|
||||
}
|
||||
|
||||
LocalRaftClientProtocol client(MemberId memberId) {
|
||||
return clients.get(memberId);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Copyright 2017-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.protocol;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import io.atomix.cluster.MemberId;
|
||||
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
|
||||
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Cluster Raft protocol factory.
|
||||
*/
|
||||
public class LocalRaftProtocolFactory {
|
||||
private final Serializer serializer;
|
||||
private final Map<MemberId, LocalRaftServerProtocol> servers = Maps.newConcurrentMap();
|
||||
private final Map<MemberId, LocalRaftClientProtocol> clients = Maps.newConcurrentMap();
|
||||
|
||||
public LocalRaftProtocolFactory(Serializer serializer) {
|
||||
this.serializer = serializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new test client protocol.
|
||||
*
|
||||
* @param memberId the client member identifier
|
||||
* @return a new test client protocol
|
||||
*/
|
||||
public RaftClientProtocol newClientProtocol(MemberId memberId) {
|
||||
return new LocalRaftClientProtocol(memberId, serializer, servers, clients);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new test server protocol.
|
||||
*
|
||||
* @param memberId the server member identifier
|
||||
* @return a new test server protocol
|
||||
*/
|
||||
public RaftServerProtocol newServerProtocol(MemberId memberId) {
|
||||
return new LocalRaftServerProtocol(memberId, serializer, servers, clients);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,527 @@
|
|||
/*
|
||||
* Copyright 2017-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.protocol;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import io.atomix.cluster.MemberId;
|
||||
import io.atomix.primitive.session.SessionId;
|
||||
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
|
||||
import io.atomix.protocols.raft.protocol.QueryRequest;
|
||||
import io.atomix.protocols.raft.protocol.QueryResponse;
|
||||
import io.atomix.protocols.raft.protocol.CommandRequest;
|
||||
import io.atomix.protocols.raft.protocol.CommandResponse;
|
||||
import io.atomix.protocols.raft.protocol.MetadataRequest;
|
||||
import io.atomix.protocols.raft.protocol.MetadataResponse;
|
||||
import io.atomix.protocols.raft.protocol.JoinRequest;
|
||||
import io.atomix.protocols.raft.protocol.JoinResponse;
|
||||
import io.atomix.protocols.raft.protocol.LeaveRequest;
|
||||
import io.atomix.protocols.raft.protocol.LeaveResponse;
|
||||
import io.atomix.protocols.raft.protocol.ConfigureRequest;
|
||||
import io.atomix.protocols.raft.protocol.ConfigureResponse;
|
||||
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
|
||||
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
|
||||
import io.atomix.protocols.raft.protocol.InstallRequest;
|
||||
import io.atomix.protocols.raft.protocol.InstallResponse;
|
||||
import io.atomix.protocols.raft.protocol.PollRequest;
|
||||
import io.atomix.protocols.raft.protocol.PollResponse;
|
||||
import io.atomix.protocols.raft.protocol.VoteRequest;
|
||||
import io.atomix.protocols.raft.protocol.VoteResponse;
|
||||
import io.atomix.protocols.raft.protocol.TransferRequest;
|
||||
import io.atomix.protocols.raft.protocol.TransferResponse;
|
||||
import io.atomix.protocols.raft.protocol.AppendRequest;
|
||||
import io.atomix.protocols.raft.protocol.AppendResponse;
|
||||
import io.atomix.protocols.raft.protocol.ResetRequest;
|
||||
import io.atomix.protocols.raft.protocol.PublishRequest;
|
||||
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
|
||||
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
|
||||
|
||||
import io.atomix.utils.concurrent.Futures;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
|
||||
import java.net.ConnectException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Cluster server protocol.
|
||||
*/
|
||||
public class LocalRaftServerProtocol extends LocalRaftProtocol implements RaftServerProtocol {
|
||||
private Function<OpenSessionRequest, CompletableFuture<OpenSessionResponse>> openSessionHandler;
|
||||
private Function<CloseSessionRequest, CompletableFuture<CloseSessionResponse>>
|
||||
closeSessionHandler;
|
||||
private Function<KeepAliveRequest, CompletableFuture<KeepAliveResponse>> keepAliveHandler;
|
||||
private Function<QueryRequest, CompletableFuture<QueryResponse>> queryHandler;
|
||||
private Function<CommandRequest, CompletableFuture<CommandResponse>> commandHandler;
|
||||
private Function<MetadataRequest, CompletableFuture<MetadataResponse>> metadataHandler;
|
||||
private Function<JoinRequest, CompletableFuture<JoinResponse>> joinHandler;
|
||||
private Function<LeaveRequest, CompletableFuture<LeaveResponse>> leaveHandler;
|
||||
private Function<ConfigureRequest, CompletableFuture<ConfigureResponse>> configureHandler;
|
||||
private Function<ReconfigureRequest, CompletableFuture<ReconfigureResponse>> reconfigureHandler;
|
||||
private Function<InstallRequest, CompletableFuture<InstallResponse>> installHandler;
|
||||
private Function<PollRequest, CompletableFuture<PollResponse>> pollHandler;
|
||||
private Function<VoteRequest, CompletableFuture<VoteResponse>> voteHandler;
|
||||
private Function<TransferRequest, CompletableFuture<TransferResponse>> transferHandler;
|
||||
private Function<AppendRequest, CompletableFuture<AppendResponse>> appendHandler;
|
||||
private final Map<Long, Consumer<ResetRequest>> resetListeners = Maps.newConcurrentMap();
|
||||
|
||||
public LocalRaftServerProtocol(MemberId memberId, Serializer serializer,
|
||||
Map<MemberId, LocalRaftServerProtocol> servers,
|
||||
Map<MemberId, LocalRaftClientProtocol> clients) {
|
||||
super(serializer, servers, clients);
|
||||
servers.put(memberId, this);
|
||||
}
|
||||
|
||||
private CompletableFuture<LocalRaftServerProtocol> getServer(MemberId memberId) {
|
||||
LocalRaftServerProtocol server = server(memberId);
|
||||
if (server != null) {
|
||||
return Futures.completedFuture(server);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
private CompletableFuture<LocalRaftClientProtocol> getClient(MemberId memberId) {
|
||||
LocalRaftClientProtocol client = client(memberId);
|
||||
if (client != null) {
|
||||
return Futures.completedFuture(client);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
|
||||
OpenSessionRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.openSession(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
|
||||
CloseSessionRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.closeSession(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
|
||||
KeepAliveRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.keepAlive(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.query(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CommandResponse> command(MemberId memberId,
|
||||
CommandRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.command(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
|
||||
MetadataRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.metadata(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.join(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.leave(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
|
||||
ConfigureRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.configure(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
|
||||
ReconfigureRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.reconfigure(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.install(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<TransferResponse> transfer(MemberId memberId, TransferRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.install(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.poll(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.vote(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
|
||||
return getServer(memberId).thenCompose(listener ->
|
||||
listener.append(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publish(MemberId memberId, PublishRequest request) {
|
||||
getClient(memberId).thenAccept(protocol ->
|
||||
protocol.publish(request.session(), encode(request)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
|
||||
HeartbeatRequest request) {
|
||||
return getClient(memberId).thenCompose(protocol ->
|
||||
protocol.heartbeat(encode(request))).thenApply(this::decode);
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> openSession(byte[] request) {
|
||||
if (openSessionHandler != null) {
|
||||
return openSessionHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerOpenSessionHandler(Function<OpenSessionRequest,
|
||||
CompletableFuture<OpenSessionResponse>> handler) {
|
||||
this.openSessionHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterOpenSessionHandler() {
|
||||
this.openSessionHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> closeSession(byte[] request) {
|
||||
if (closeSessionHandler != null) {
|
||||
return closeSessionHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerCloseSessionHandler(Function<CloseSessionRequest,
|
||||
CompletableFuture<CloseSessionResponse>> handler) {
|
||||
this.closeSessionHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterCloseSessionHandler() {
|
||||
this.closeSessionHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> keepAlive(byte[] request) {
|
||||
if (keepAliveHandler != null) {
|
||||
return keepAliveHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerKeepAliveHandler(Function<KeepAliveRequest,
|
||||
CompletableFuture<KeepAliveResponse>> handler) {
|
||||
this.keepAliveHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterKeepAliveHandler() {
|
||||
this.keepAliveHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> query(byte[] request) {
|
||||
if (queryHandler != null) {
|
||||
return queryHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerQueryHandler(Function<QueryRequest,
|
||||
CompletableFuture<QueryResponse>> handler) {
|
||||
this.queryHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterQueryHandler() {
|
||||
this.queryHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> command(byte[] request) {
|
||||
if (commandHandler != null) {
|
||||
return commandHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerCommandHandler(Function<CommandRequest,
|
||||
CompletableFuture<CommandResponse>> handler) {
|
||||
this.commandHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterCommandHandler() {
|
||||
this.commandHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> metadata(byte[] request) {
|
||||
if (metadataHandler != null) {
|
||||
return metadataHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerMetadataHandler(Function<MetadataRequest,
|
||||
CompletableFuture<MetadataResponse>> handler) {
|
||||
this.metadataHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterMetadataHandler() {
|
||||
this.metadataHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> join(byte[] request) {
|
||||
if (joinHandler != null) {
|
||||
return joinHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerJoinHandler(Function<JoinRequest,
|
||||
CompletableFuture<JoinResponse>> handler) {
|
||||
this.joinHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterJoinHandler() {
|
||||
this.joinHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> leave(byte[] request) {
|
||||
if (leaveHandler != null) {
|
||||
return leaveHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerLeaveHandler(Function<LeaveRequest,
|
||||
CompletableFuture<LeaveResponse>> handler) {
|
||||
this.leaveHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterLeaveHandler() {
|
||||
this.leaveHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> configure(byte[] request) {
|
||||
if (configureHandler != null) {
|
||||
return configureHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerConfigureHandler(Function<ConfigureRequest,
|
||||
CompletableFuture<ConfigureResponse>> handler) {
|
||||
this.configureHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterConfigureHandler() {
|
||||
this.configureHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> reconfigure(byte[] request) {
|
||||
if (reconfigureHandler != null) {
|
||||
return reconfigureHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerReconfigureHandler(Function<ReconfigureRequest,
|
||||
CompletableFuture<ReconfigureResponse>> handler) {
|
||||
this.reconfigureHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterReconfigureHandler() {
|
||||
this.reconfigureHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> install(byte[] request) {
|
||||
if (installHandler != null) {
|
||||
return installHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerInstallHandler(Function<InstallRequest,
|
||||
CompletableFuture<InstallResponse>> handler) {
|
||||
this.installHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterInstallHandler() {
|
||||
this.installHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> poll(byte[] request) {
|
||||
if (pollHandler != null) {
|
||||
return pollHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerPollHandler(Function<PollRequest,
|
||||
CompletableFuture<PollResponse>> handler) {
|
||||
this.pollHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterPollHandler() {
|
||||
this.pollHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> vote(byte[] request) {
|
||||
if (voteHandler != null) {
|
||||
return voteHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerVoteHandler(Function<VoteRequest,
|
||||
CompletableFuture<VoteResponse>> handler) {
|
||||
this.voteHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterVoteHandler() {
|
||||
this.voteHandler = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerTransferHandler(Function<TransferRequest,
|
||||
CompletableFuture<TransferResponse>> handler) {
|
||||
this.transferHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterTransferHandler() {
|
||||
this.transferHandler = null;
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> transfer(byte[] request) {
|
||||
if (transferHandler != null) {
|
||||
return transferHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
CompletableFuture<byte[]> append(byte[] request) {
|
||||
if (appendHandler != null) {
|
||||
return appendHandler.apply(decode(request)).thenApply(this::encode);
|
||||
} else {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerAppendHandler(Function<AppendRequest,
|
||||
CompletableFuture<AppendResponse>> handler) {
|
||||
this.appendHandler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterAppendHandler() {
|
||||
this.appendHandler = null;
|
||||
}
|
||||
|
||||
void reset(long sessionId, byte[] request) {
|
||||
Consumer<ResetRequest> listener = resetListeners.get(sessionId);
|
||||
if (listener != null) {
|
||||
listener.accept(decode(request));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerResetListener(SessionId sessionId,
|
||||
Consumer<ResetRequest> listener, Executor executor) {
|
||||
resetListeners.put(sessionId.id(), request -> executor.execute(()
|
||||
-> listener.accept(request)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterResetListener(SessionId sessionId) {
|
||||
resetListeners.remove(sessionId.id());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Copyright 2017-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.protocol;
|
||||
|
||||
import io.atomix.cluster.MemberId;
|
||||
import io.atomix.cluster.messaging.MessagingService;
|
||||
import io.atomix.primitive.session.SessionId;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.RaftClientProtocol;
|
||||
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
|
||||
import io.atomix.protocols.raft.protocol.PublishRequest;
|
||||
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
|
||||
import io.atomix.protocols.raft.protocol.QueryResponse;
|
||||
import io.atomix.protocols.raft.protocol.QueryRequest;
|
||||
import io.atomix.protocols.raft.protocol.CommandResponse;
|
||||
import io.atomix.protocols.raft.protocol.CommandRequest;
|
||||
import io.atomix.protocols.raft.protocol.MetadataResponse;
|
||||
import io.atomix.protocols.raft.protocol.MetadataRequest;
|
||||
import io.atomix.protocols.raft.protocol.ResetRequest;
|
||||
import io.atomix.utils.net.Address;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Raft client messaging service protocol.
|
||||
*/
|
||||
public class RaftClientMessagingProtocol extends RaftMessagingProtocol
|
||||
implements RaftClientProtocol {
|
||||
public RaftClientMessagingProtocol(MessagingService messagingService,
|
||||
Serializer serializer,
|
||||
Function<MemberId, Address> addressProvider) {
|
||||
super(messagingService, serializer, addressProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
|
||||
OpenSessionRequest request) {
|
||||
return sendAndReceive(memberId, "open-session", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
|
||||
CloseSessionRequest request) {
|
||||
return sendAndReceive(memberId, "close-session", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
|
||||
KeepAliveRequest request) {
|
||||
return sendAndReceive(memberId, "keep-alive", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
|
||||
return sendAndReceive(memberId, "query", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CommandResponse> command(MemberId memberId,
|
||||
CommandRequest request) {
|
||||
return sendAndReceive(memberId, "command", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
|
||||
MetadataRequest request) {
|
||||
return sendAndReceive(memberId, "metadata", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerHeartbeatHandler(Function<HeartbeatRequest,
|
||||
CompletableFuture<HeartbeatResponse>> handler) {
|
||||
registerHandler("heartbeat", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterHeartbeatHandler() {
|
||||
unregisterHandler("heartbeat");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset(Set<MemberId> members, ResetRequest request) {
|
||||
for (MemberId memberId : members) {
|
||||
sendAsync(memberId, String.format("reset-%d", request.session()), request);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerPublishListener(SessionId sessionId, Consumer<PublishRequest> listener,
|
||||
Executor executor) {
|
||||
messagingService.registerHandler(String.format("publish-%d", sessionId.id()), (e, p) -> {
|
||||
listener.accept(serializer.decode(p));
|
||||
}, executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterPublishListener(SessionId sessionId) {
|
||||
messagingService.unregisterHandler(String.format("publish-%d", sessionId.id()));
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
* Copyright 2017-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.protocol;
|
||||
|
||||
import io.atomix.cluster.MemberId;
|
||||
import io.atomix.cluster.messaging.MessagingService;
|
||||
import io.atomix.utils.concurrent.Futures;
|
||||
import io.atomix.utils.net.Address;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
|
||||
import java.net.ConnectException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Messaging service based Raft protocol.
|
||||
*/
|
||||
public abstract class RaftMessagingProtocol {
|
||||
protected final MessagingService messagingService;
|
||||
protected final Serializer serializer;
|
||||
private final Function<MemberId, Address> addressProvider;
|
||||
|
||||
public RaftMessagingProtocol(MessagingService messagingService,
|
||||
Serializer serializer,
|
||||
Function<MemberId, Address> addressProvider) {
|
||||
this.messagingService = messagingService;
|
||||
this.serializer = serializer;
|
||||
this.addressProvider = addressProvider;
|
||||
}
|
||||
|
||||
protected Address address(MemberId memberId) {
|
||||
return addressProvider.apply(memberId);
|
||||
}
|
||||
|
||||
protected <T, U> CompletableFuture<U> sendAndReceive(MemberId memberId,
|
||||
String type, T request) {
|
||||
Address address = address(memberId);
|
||||
if (address == null) {
|
||||
return Futures.exceptionalFuture(new ConnectException());
|
||||
}
|
||||
return messagingService.sendAndReceive(address, type, serializer.encode(request))
|
||||
.thenApply(serializer::decode);
|
||||
}
|
||||
|
||||
protected CompletableFuture<Void> sendAsync(MemberId memberId, String type, Object request) {
|
||||
Address address = address(memberId);
|
||||
if (address != null) {
|
||||
return messagingService.sendAsync(address(memberId), type, serializer.encode(request));
|
||||
}
|
||||
return CompletableFuture.completedFuture(null);
|
||||
}
|
||||
|
||||
protected <T, U> void registerHandler(String type, Function<T, CompletableFuture<U>> handler) {
|
||||
messagingService.registerHandler(type, (e, p) -> {
|
||||
CompletableFuture<byte[]> future = new CompletableFuture<>();
|
||||
handler.apply(serializer.decode(p)).whenComplete((result, error) -> {
|
||||
if (error == null) {
|
||||
future.complete(serializer.encode(result));
|
||||
} else {
|
||||
future.completeExceptionally(error);
|
||||
}
|
||||
});
|
||||
return future;
|
||||
});
|
||||
}
|
||||
|
||||
protected void unregisterHandler(String type) {
|
||||
messagingService.unregisterHandler(type);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,346 @@
|
|||
/*
|
||||
* Copyright 2017-present Open Networking Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.zeppelin.cluster.protocol;
|
||||
|
||||
import io.atomix.cluster.MemberId;
|
||||
import io.atomix.cluster.messaging.MessagingService;
|
||||
import io.atomix.primitive.session.SessionId;
|
||||
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.OpenSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionRequest;
|
||||
import io.atomix.protocols.raft.protocol.CloseSessionResponse;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveRequest;
|
||||
import io.atomix.protocols.raft.protocol.KeepAliveResponse;
|
||||
import io.atomix.protocols.raft.protocol.QueryRequest;
|
||||
import io.atomix.protocols.raft.protocol.QueryResponse;
|
||||
import io.atomix.protocols.raft.protocol.CommandRequest;
|
||||
import io.atomix.protocols.raft.protocol.CommandResponse;
|
||||
import io.atomix.protocols.raft.protocol.MetadataRequest;
|
||||
import io.atomix.protocols.raft.protocol.MetadataResponse;
|
||||
import io.atomix.protocols.raft.protocol.JoinRequest;
|
||||
import io.atomix.protocols.raft.protocol.JoinResponse;
|
||||
import io.atomix.protocols.raft.protocol.LeaveRequest;
|
||||
import io.atomix.protocols.raft.protocol.LeaveResponse;
|
||||
import io.atomix.protocols.raft.protocol.ConfigureRequest;
|
||||
import io.atomix.protocols.raft.protocol.ConfigureResponse;
|
||||
import io.atomix.protocols.raft.protocol.ReconfigureRequest;
|
||||
import io.atomix.protocols.raft.protocol.ReconfigureResponse;
|
||||
import io.atomix.protocols.raft.protocol.InstallRequest;
|
||||
import io.atomix.protocols.raft.protocol.InstallResponse;
|
||||
import io.atomix.protocols.raft.protocol.PollRequest;
|
||||
import io.atomix.protocols.raft.protocol.PollResponse;
|
||||
import io.atomix.protocols.raft.protocol.VoteRequest;
|
||||
import io.atomix.protocols.raft.protocol.VoteResponse;
|
||||
import io.atomix.protocols.raft.protocol.TransferRequest;
|
||||
import io.atomix.protocols.raft.protocol.TransferResponse;
|
||||
import io.atomix.protocols.raft.protocol.AppendRequest;
|
||||
import io.atomix.protocols.raft.protocol.AppendResponse;
|
||||
import io.atomix.protocols.raft.protocol.ResetRequest;
|
||||
import io.atomix.protocols.raft.protocol.PublishRequest;
|
||||
import io.atomix.protocols.raft.protocol.HeartbeatResponse;
|
||||
import io.atomix.protocols.raft.protocol.HeartbeatRequest;
|
||||
import io.atomix.utils.net.Address;
|
||||
import io.atomix.utils.serializer.Serializer;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* Raft server messaging protocol.
|
||||
*/
|
||||
public class RaftServerMessagingProtocol extends RaftMessagingProtocol
|
||||
implements RaftServerProtocol {
|
||||
public RaftServerMessagingProtocol(MessagingService messagingService,
|
||||
Serializer serializer,
|
||||
Function<MemberId, Address> addressProvider) {
|
||||
super(messagingService, serializer, addressProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<OpenSessionResponse> openSession(MemberId memberId,
|
||||
OpenSessionRequest request) {
|
||||
return sendAndReceive(memberId, "open-session", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CloseSessionResponse> closeSession(MemberId memberId,
|
||||
CloseSessionRequest request) {
|
||||
return sendAndReceive(memberId, "close-session", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<KeepAliveResponse> keepAlive(MemberId memberId,
|
||||
KeepAliveRequest request) {
|
||||
return sendAndReceive(memberId, "keep-alive", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<QueryResponse> query(MemberId memberId, QueryRequest request) {
|
||||
return sendAndReceive(memberId, "query", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<CommandResponse> command(MemberId memberId,
|
||||
CommandRequest request) {
|
||||
return sendAndReceive(memberId, "command", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<MetadataResponse> metadata(MemberId memberId,
|
||||
MetadataRequest request) {
|
||||
return sendAndReceive(memberId, "metadata", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<JoinResponse> join(MemberId memberId, JoinRequest request) {
|
||||
return sendAndReceive(memberId, "join", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<LeaveResponse> leave(MemberId memberId, LeaveRequest request) {
|
||||
return sendAndReceive(memberId, "leave", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ConfigureResponse> configure(MemberId memberId,
|
||||
ConfigureRequest request) {
|
||||
return sendAndReceive(memberId, "configure", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<ReconfigureResponse> reconfigure(MemberId memberId,
|
||||
ReconfigureRequest request) {
|
||||
return sendAndReceive(memberId, "reconfigure", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<InstallResponse> install(MemberId memberId, InstallRequest request) {
|
||||
return sendAndReceive(memberId, "install", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<TransferResponse> transfer(MemberId memberId,
|
||||
TransferRequest request) {
|
||||
return sendAndReceive(memberId, "transfer", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<PollResponse> poll(MemberId memberId, PollRequest request) {
|
||||
return sendAndReceive(memberId, "poll", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<VoteResponse> vote(MemberId memberId, VoteRequest request) {
|
||||
return sendAndReceive(memberId, "vote", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest request) {
|
||||
return sendAndReceive(memberId, "append", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publish(MemberId memberId, PublishRequest request) {
|
||||
sendAsync(memberId, String.format("publish-%d", request.session()), request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<HeartbeatResponse> heartbeat(MemberId memberId,
|
||||
HeartbeatRequest request) {
|
||||
return sendAndReceive(memberId, "heartbeat", request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerOpenSessionHandler(Function<OpenSessionRequest,
|
||||
CompletableFuture<OpenSessionResponse>> handler) {
|
||||
registerHandler("open-session", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterOpenSessionHandler() {
|
||||
unregisterHandler("open-session");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerCloseSessionHandler(Function<CloseSessionRequest,
|
||||
CompletableFuture<CloseSessionResponse>> handler) {
|
||||
registerHandler("close-session", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterCloseSessionHandler() {
|
||||
unregisterHandler("close-session");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerKeepAliveHandler(Function<KeepAliveRequest,
|
||||
CompletableFuture<KeepAliveResponse>> handler) {
|
||||
registerHandler("keep-alive", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterKeepAliveHandler() {
|
||||
unregisterHandler("keep-alive");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerQueryHandler(Function<QueryRequest,
|
||||
CompletableFuture<QueryResponse>> handler) {
|
||||
registerHandler("query", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterQueryHandler() {
|
||||
unregisterHandler("query");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerCommandHandler(Function<CommandRequest,
|
||||
CompletableFuture<CommandResponse>> handler) {
|
||||
registerHandler("command", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterCommandHandler() {
|
||||
unregisterHandler("command");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerMetadataHandler(Function<MetadataRequest,
|
||||
CompletableFuture<MetadataResponse>> handler) {
|
||||
registerHandler("metadata", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterMetadataHandler() {
|
||||
unregisterHandler("metadata");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerJoinHandler(Function<JoinRequest,
|
||||
CompletableFuture<JoinResponse>> handler) {
|
||||
registerHandler("join", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterJoinHandler() {
|
||||
unregisterHandler("join");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerLeaveHandler(Function<LeaveRequest,
|
||||
CompletableFuture<LeaveResponse>> handler) {
|
||||
registerHandler("leave", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterLeaveHandler() {
|
||||
unregisterHandler("leave");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerConfigureHandler(Function<ConfigureRequest,
|
||||
CompletableFuture<ConfigureResponse>> handler) {
|
||||
registerHandler("configure", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterConfigureHandler() {
|
||||
unregisterHandler("configure");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerReconfigureHandler(Function<ReconfigureRequest,
|
||||
CompletableFuture<ReconfigureResponse>> handler) {
|
||||
registerHandler("reconfigure", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterReconfigureHandler() {
|
||||
unregisterHandler("reconfigure");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerInstallHandler(Function<InstallRequest,
|
||||
CompletableFuture<InstallResponse>> handler) {
|
||||
registerHandler("install", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterInstallHandler() {
|
||||
unregisterHandler("install");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerTransferHandler(Function<TransferRequest,
|
||||
CompletableFuture<TransferResponse>> handler) {
|
||||
registerHandler("transfer", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterTransferHandler() {
|
||||
unregisterHandler("transfer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerPollHandler(Function<PollRequest,
|
||||
CompletableFuture<PollResponse>> handler) {
|
||||
registerHandler("poll", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterPollHandler() {
|
||||
unregisterHandler("poll");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerVoteHandler(Function<VoteRequest,
|
||||
CompletableFuture<VoteResponse>> handler) {
|
||||
registerHandler("vote", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterVoteHandler() {
|
||||
unregisterHandler("vote");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerAppendHandler(Function<AppendRequest,
|
||||
CompletableFuture<AppendResponse>> handler) {
|
||||
registerHandler("append", handler);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterAppendHandler() {
|
||||
unregisterHandler("append");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void registerResetListener(SessionId sessionId,
|
||||
Consumer<ResetRequest> listener, Executor executor) {
|
||||
messagingService.registerHandler(String.format("reset-%d", sessionId.id()), (e, p) -> {
|
||||
listener.accept(serializer.decode(p));
|
||||
}, executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterResetListener(SessionId sessionId) {
|
||||
messagingService.unregisterHandler(String.format("reset-%d", sessionId.id()));
|
||||
}
|
||||
}
|
||||
|
|
@ -640,6 +640,31 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
return getRelativeDir(ConfVars.ZEPPELIN_SEARCH_TEMP_PATH);
|
||||
}
|
||||
|
||||
public String getClusterAddress() {
|
||||
return getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
|
||||
}
|
||||
|
||||
public void setClusterAddress(String clusterAddr) {
|
||||
properties.put(ConfVars.ZEPPELIN_CLUSTER_ADDR.getVarName(), clusterAddr);
|
||||
}
|
||||
|
||||
public boolean isClusterMode() {
|
||||
String clusterAddr = getString(ConfVars.ZEPPELIN_CLUSTER_ADDR);
|
||||
if (StringUtils.isEmpty(clusterAddr)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
public int getClusterHeartbeatInterval() {
|
||||
return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL);
|
||||
}
|
||||
|
||||
public int getClusterHeartbeatTimeout() {
|
||||
return getInt(ConfVars.ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT);
|
||||
}
|
||||
|
||||
public Map<String, String> dumpConfigurations(Predicate<String> predicate) {
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
|
||||
|
|
@ -782,6 +807,10 @@ public class ZeppelinConfiguration extends XMLConfiguration {
|
|||
|
||||
ZEPPELIN_OWNER_ROLE("zeppelin.notebook.default.owner.username", ""),
|
||||
|
||||
ZEPPELIN_CLUSTER_ADDR("zeppelin.cluster.addr", ""),
|
||||
ZEPPELIN_CLUSTER_HEARTBEAT_INTERVAL("zeppelin.cluster.heartbeat.interval", 3000),
|
||||
ZEPPELIN_CLUSTER_HEARTBEAT_TIMEOUT("zeppelin.cluster.heartbeat.timeout", 9000),
|
||||
|
||||
ZEPPELIN_NOTEBOOK_GIT_REMOTE_URL("zeppelin.notebook.git.remote.url", ""),
|
||||
ZEPPELIN_NOTEBOOK_GIT_REMOTE_USERNAME("zeppelin.notebook.git.remote.username", "token"),
|
||||
ZEPPELIN_NOTEBOOK_GIT_REMOTE_ACCESS_TOKEN("zeppelin.notebook.git.remote.access-token", ""),
|
||||
|
|
|
|||
Loading…
Reference in a new issue