TDengine/source/libs/sync/src/syncMessage.c

318 lines
9.2 KiB
C
Raw Normal View History

2022-02-22 03:28:15 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-11-11 06:35:16 +00:00
#define _DEFAULT_SOURCE
2022-02-22 03:28:15 +00:00
#include "syncMessage.h"
#include "syncRaftEntry.h"
2022-02-22 03:28:15 +00:00
2022-11-12 12:29:49 +00:00
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS,
2022-11-12 02:08:28 +00:00
SSyncNode* pNode) {
int32_t bytes = sizeof(SyncTimeout);
2022-11-12 12:29:49 +00:00
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_TIMEOUT;
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
2022-11-12 02:08:28 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-03-16 02:54:06 +00:00
}
2022-03-05 04:28:34 +00:00
2022-11-12 12:29:49 +00:00
SyncTimeout* pTimeout = pMsg->pCont;
2022-11-12 02:08:28 +00:00
pTimeout->bytes = bytes;
pTimeout->msgType = TDMT_SYNC_TIMEOUT;
pTimeout->vgId = pNode->vgId;
pTimeout->timeoutType = timeoutType;
pTimeout->logicClock = logicClock;
pTimeout->timerMS = timerMS;
2022-11-25 10:19:25 +00:00
pTimeout->timeStamp = taosGetTimestampMs();
2022-11-12 02:08:28 +00:00
pTimeout->data = pNode;
return 0;
2022-03-05 04:28:34 +00:00
}
2022-11-12 12:29:49 +00:00
int32_t syncBuildClientRequest(SRpcMsg* pMsg, const SRpcMsg* pOriginal, uint64_t seqNum, bool isWeak, int32_t vgId) {
int32_t bytes = sizeof(SyncClientRequest) + pOriginal->contLen;
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-03-09 08:34:34 +00:00
}
2022-11-12 12:29:49 +00:00
SyncClientRequest* pClientRequest = pMsg->pCont;
pClientRequest->bytes = bytes;
pClientRequest->vgId = vgId;
pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
2022-11-12 12:29:49 +00:00
pClientRequest->originalRpcType = pOriginal->msgType;
pClientRequest->seqNum = seqNum;
pClientRequest->isWeak = isWeak;
2022-11-12 12:29:49 +00:00
pClientRequest->dataLen = pOriginal->contLen;
memcpy(pClientRequest->data, (char*)pOriginal->pCont, pOriginal->contLen);
2022-03-09 08:34:34 +00:00
return 0;
2022-03-09 08:34:34 +00:00
}
2022-11-12 12:29:49 +00:00
int32_t syncBuildClientRequestFromNoopEntry(SRpcMsg* pMsg, const SSyncRaftEntry* pEntry, int32_t vgId) {
int32_t bytes = sizeof(SyncClientRequest) + pEntry->bytes;
2022-11-12 12:29:49 +00:00
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST;
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-03-11 09:08:27 +00:00
}
2022-11-12 12:29:49 +00:00
SyncClientRequest* pClientRequest = pMsg->pCont;
pClientRequest->bytes = bytes;
pClientRequest->vgId = vgId;
pClientRequest->msgType = TDMT_SYNC_CLIENT_REQUEST;
pClientRequest->originalRpcType = TDMT_SYNC_NOOP;
pClientRequest->dataLen = pEntry->bytes;
memcpy(pClientRequest->data, (char*)pEntry, pEntry->bytes);
2022-03-09 08:34:34 +00:00
return 0;
2022-03-11 09:08:27 +00:00
}
2022-11-12 04:58:08 +00:00
int32_t syncBuildRequestVote(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncRequestVote);
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE;
2022-11-12 04:58:08 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
2022-11-12 10:21:58 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-11-12 04:58:08 +00:00
return -1;
2022-03-16 02:54:06 +00:00
}
2022-03-03 06:52:30 +00:00
2022-11-12 04:58:08 +00:00
SyncRequestVote* pRequestVote = pMsg->pCont;
pRequestVote->bytes = bytes;
pRequestVote->msgType = TDMT_SYNC_REQUEST_VOTE;
pRequestVote->vgId = vgId;
return 0;
2022-03-11 02:50:50 +00:00
}
2022-11-12 05:17:56 +00:00
int32_t syncBuildRequestVoteReply(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncRequestVoteReply);
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
2022-11-12 05:17:56 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
2022-11-12 10:21:58 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-11-12 05:17:56 +00:00
return -1;
2022-03-16 02:54:06 +00:00
}
2022-03-03 06:52:30 +00:00
2022-11-12 05:17:56 +00:00
SyncRequestVoteReply* pRequestVoteReply = pMsg->pCont;
pRequestVoteReply->bytes = bytes;
pRequestVoteReply->msgType = TDMT_SYNC_REQUEST_VOTE_REPLY;
pRequestVoteReply->vgId = vgId;
return 0;
2022-03-11 03:47:56 +00:00
}
2022-11-12 08:40:09 +00:00
int32_t syncBuildAppendEntries(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
int32_t bytes = sizeof(SyncAppendEntries) + dataLen;
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES;
2022-11-12 08:40:09 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-03-16 02:54:06 +00:00
}
2022-03-03 08:15:18 +00:00
2022-11-12 08:40:09 +00:00
SyncAppendEntries* pAppendEntries = pMsg->pCont;
pAppendEntries->bytes = bytes;
pAppendEntries->vgId = vgId;
pAppendEntries->msgType = TDMT_SYNC_APPEND_ENTRIES;
pAppendEntries->dataLen = dataLen;
return 0;
2022-03-11 07:05:10 +00:00
}
2022-11-12 10:21:58 +00:00
int32_t syncBuildAppendEntriesReply(SRpcMsg* pMsg, int32_t vgId) {
2022-11-12 12:28:45 +00:00
int32_t bytes = sizeof(SyncAppendEntriesReply);
2022-11-12 10:21:58 +00:00
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
2022-11-12 10:21:58 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-03-03 08:15:18 +00:00
}
2022-11-12 10:21:58 +00:00
SyncAppendEntriesReply* pAppendEntriesReply = pMsg->pCont;
pAppendEntriesReply->bytes = bytes;
pAppendEntriesReply->msgType = TDMT_SYNC_APPEND_ENTRIES_REPLY;
pAppendEntriesReply->vgId = vgId;
return 0;
2022-03-12 09:13:49 +00:00
}
2022-04-20 03:51:00 +00:00
2022-11-12 12:37:15 +00:00
int32_t syncBuildHeartbeat(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncHeartbeat);
pMsg->pCont = rpcMallocCont(bytes);
2022-09-07 10:02:10 +00:00
pMsg->msgType = TDMT_SYNC_HEARTBEAT;
2022-11-12 12:37:15 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-09-07 07:29:04 +00:00
}
2022-11-12 12:37:15 +00:00
SyncHeartbeat* pHeartbeat = pMsg->pCont;
pHeartbeat->bytes = bytes;
pHeartbeat->msgType = TDMT_SYNC_HEARTBEAT;
pHeartbeat->vgId = vgId;
return 0;
2022-09-07 07:29:04 +00:00
}
2022-11-12 13:31:01 +00:00
int32_t syncBuildHeartbeatReply(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncHeartbeatReply);
pMsg->pCont = rpcMallocCont(bytes);
2022-09-07 10:02:10 +00:00
pMsg->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
2022-11-12 13:31:01 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-09-07 10:02:10 +00:00
}
2022-11-12 13:31:01 +00:00
SyncHeartbeatReply* pHeartbeatReply = pMsg->pCont;
pHeartbeatReply->bytes = bytes;
pHeartbeatReply->msgType = TDMT_SYNC_HEARTBEAT_REPLY;
pHeartbeatReply->vgId = vgId;
return 0;
2022-09-07 10:02:10 +00:00
}
2022-11-13 09:21:30 +00:00
#if 0
2022-11-13 08:03:45 +00:00
int32_t syncBuildPreSnapshot(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncPreSnapshot);
pMsg->pCont = rpcMallocCont(bytes);
2022-11-01 02:25:31 +00:00
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT;
2022-11-13 08:03:45 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-11-01 02:25:31 +00:00
}
2022-11-13 08:03:45 +00:00
SyncPreSnapshot* pPreSnapshot = pMsg->pCont;
pPreSnapshot->bytes = bytes;
pPreSnapshot->msgType = TDMT_SYNC_PRE_SNAPSHOT;
pPreSnapshot->vgId = vgId;
return 0;
2022-11-01 02:25:31 +00:00
}
2022-11-13 08:03:45 +00:00
int32_t syncBuildPreSnapshotReply(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncPreSnapshotReply);
pMsg->pCont = rpcMallocCont(bytes);
2022-11-01 02:47:19 +00:00
pMsg->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
2022-11-13 08:03:45 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-11-01 02:47:19 +00:00
}
2022-11-13 08:03:45 +00:00
SyncPreSnapshotReply* pPreSnapshotReply = pMsg->pCont;
pPreSnapshotReply->bytes = bytes;
pPreSnapshotReply->msgType = TDMT_SYNC_PRE_SNAPSHOT_REPLY;
pPreSnapshotReply->vgId = vgId;
return 0;
2022-11-01 02:47:19 +00:00
}
2022-11-13 09:21:30 +00:00
#endif
2022-11-01 02:47:19 +00:00
2022-11-13 08:39:21 +00:00
int32_t syncBuildSnapshotSend(SRpcMsg* pMsg, int32_t dataLen, int32_t vgId) {
int32_t bytes = sizeof(SyncSnapshotSend) + dataLen;
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_SNAPSHOT_SEND;
2022-11-13 08:39:21 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
2022-11-13 08:39:21 +00:00
SyncSnapshotSend* pSnapshotSend = pMsg->pCont;
pSnapshotSend->bytes = bytes;
pSnapshotSend->vgId = vgId;
pSnapshotSend->msgType = TDMT_SYNC_SNAPSHOT_SEND;
pSnapshotSend->dataLen = dataLen;
return 0;
}
2022-11-13 08:41:40 +00:00
int32_t syncBuildSnapshotSendRsp(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncSnapshotRsp);
pMsg->pCont = rpcMallocCont(bytes);
pMsg->msgType = TDMT_SYNC_SNAPSHOT_RSP;
2022-11-13 08:41:40 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
2022-11-13 08:41:40 +00:00
SyncSnapshotRsp* pPreSnapshotRsp = pMsg->pCont;
pPreSnapshotRsp->bytes = bytes;
pPreSnapshotRsp->msgType = TDMT_SYNC_SNAPSHOT_RSP;
pPreSnapshotRsp->vgId = vgId;
return 0;
2022-06-11 04:44:58 +00:00
}
2022-11-13 09:00:47 +00:00
int32_t syncBuildLeaderTransfer(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncLeaderTransfer);
pMsg->pCont = rpcMallocCont(bytes);
2022-06-11 04:44:58 +00:00
pMsg->msgType = TDMT_SYNC_LEADER_TRANSFER;
2022-11-13 09:00:47 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-06-11 04:44:58 +00:00
}
2022-11-13 09:00:47 +00:00
SyncLeaderTransfer* pLeaderTransfer = pMsg->pCont;
pLeaderTransfer->bytes = bytes;
pLeaderTransfer->msgType = TDMT_SYNC_LEADER_TRANSFER;
pLeaderTransfer->vgId = vgId;
return 0;
2022-06-11 04:44:58 +00:00
}
2022-11-13 09:14:03 +00:00
int32_t syncBuildLocalCmd(SRpcMsg* pMsg, int32_t vgId) {
int32_t bytes = sizeof(SyncLocalCmd);
pMsg->pCont = rpcMallocCont(bytes);
2022-10-27 09:16:46 +00:00
pMsg->msgType = TDMT_SYNC_LOCAL_CMD;
2022-11-13 09:14:03 +00:00
pMsg->contLen = bytes;
if (pMsg->pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
2022-10-27 09:16:46 +00:00
}
2022-11-13 09:14:03 +00:00
SyncLocalCmd* pLocalCmd = pMsg->pCont;
pLocalCmd->bytes = bytes;
pLocalCmd->msgType = TDMT_SYNC_LOCAL_CMD;
pLocalCmd->vgId = vgId;
return 0;
2022-10-27 09:16:46 +00:00
}
2022-11-13 09:14:03 +00:00
const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
switch (timerType) {
case SYNC_TIMEOUT_PING:
return "ping";
case SYNC_TIMEOUT_ELECTION:
return "elect";
case SYNC_TIMEOUT_HEARTBEAT:
return "heartbeat";
default:
return "unknown";
2022-10-27 09:16:46 +00:00
}
}
2022-11-13 09:14:03 +00:00
const char* syncLocalCmdGetStr(ESyncLocalCmd cmd) {
switch (cmd) {
case SYNC_LOCAL_CMD_STEP_DOWN:
return "step-down";
case SYNC_LOCAL_CMD_FOLLOWER_CMT:
return "follower-commit";
default:
return "unknown-local-cmd";
2022-10-27 09:16:46 +00:00
}
2022-11-12 02:08:28 +00:00
}