TDengine/source/libs/sync/src/syncRequestVote.c

216 lines
7.3 KiB
C
Raw Normal View History

2022-02-22 03:28:15 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncRequestVote.h"
2022-03-14 12:43:35 +00:00
#include "syncInt.h"
2022-06-08 14:43:58 +00:00
#include "syncRaftCfg.h"
2022-03-14 12:43:35 +00:00
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
2022-02-22 03:28:15 +00:00
2022-03-07 08:17:41 +00:00
// TLA+ Spec
// HandleRequestVoteRequest(i, j, m) ==
//    LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
//                 \/ /\ m.mlastLogTerm = LastTerm(log[i])
//                    /\ m.mlastLogIndex >= Len(log[i])
//        grant == /\ m.mterm = currentTerm[i]
//                 /\ logOk
//                 /\ votedFor[i] \in {Nil, j}
//    IN /\ m.mterm <= currentTerm[i]
//       /\ \/ grant  /\ votedFor' = [votedFor EXCEPT ![i] = j]
//          \/ ~grant /\ UNCHANGED votedFor
//       /\ Reply([mtype        |-> RequestVoteResponse,
//                 mterm        |-> currentTerm[i],
//                 mvoteGranted |-> grant,
//                 \* mlog is used just for the `elections' history variable for
//                 \* the proof. It would not exist in a real implementation.
//                 mlog         |-> log[i],
//                 msource      |-> i,
//                 mdest        |-> j],
//                 m)
//       /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
//
2022-03-14 12:43:35 +00:00
/**
 * Handle a received RequestVote message (log-store based path).
 *
 * Steps, in order:
 *   1. Ignore the request if the sender is not in the raft group and this
 *      node is not a standby.
 *   2. Compare the candidate's last log term/index against the local log store.
 *   3. If the candidate's term is newer, call syncNodeUpdateTerm.
 *   4. Grant the vote when the terms match, the log check passed, and this node
 *      has not voted yet or has already voted for the same candidate.
 *   5. Build a SyncRequestVoteReply, log it, send it back and destroy it.
 *
 * @param ths   local sync node that received the message
 * @param pMsg  the RequestVote message
 * @return -1 if the request was ignored in step 1, otherwise 0
 */
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  // if already drop replica, do not process
  if (!(syncNodeInRaftGroup(ths, &(pMsg->srcId)) || ths->pRaftCfg->isStandBy)) {
    syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
    return -1;
  }

  // candidate's log must be at least as up to date as ours:
  // a higher last term wins; on equal last term the last index decides
  bool logOK = false;
  if (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) {
    logOK = true;
  } else if (pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) {
    logOK = (pMsg->lastLogIndex >= ths->pLogStore->getLastIndex(ths->pLogStore));
  }

  // maybe update term
  // (a disabled variant used syncNodeUpdateTermWithoutStepDown when logOK was false)
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  bool grant = false;
  if ((pMsg->term == ths->pRaftStore->currentTerm) && logOK) {
    grant = (!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId)));
  }

  if (grant) {
    // maybe has already voted for pMsg->srcId
    // vote again, no harm
    raftStoreVote(ths->pRaftStore, &(pMsg->srcId));

    // forbid elect for this round
    syncNodeResetElectTimer(ths);
  }

  // build the reply
  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->voteGranted = grant;

  // trace log
  char grantText[32];
  snprintf(grantText, sizeof(grantText), "grant:%d", pReply->voteGranted);
  syncLogRecvRequestVote(ths, pMsg, grantText);
  syncLogSendRequestVoteReply(ths, pReply, "");

  // send msg
  SRpcMsg rpcMsg;
  syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
  syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
  syncRequestVoteReplyDestroy(pReply);

  return 0;
}
2022-06-06 08:02:25 +00:00
/**
 * Check whether the candidate's log, as described by a RequestVote message,
 * is at least as up to date as the local log.
 *
 * The check fails outright when the local last term is SYNC_TERM_INVALID.
 * Otherwise it passes when the candidate's last log term is greater than the
 * local last term, or when the terms are equal and the candidate's last log
 * index is not smaller than the local last index.
 *
 * The verdict and the compared values are written to the node event log
 * exactly once per call.
 *
 * @param pSyncNode  local sync node
 * @param pMsg       the received RequestVote message
 * @return true if the candidate's log passes the check, false otherwise
 */
static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pMsg) {
  SyncTerm  myLastTerm = syncNodeGetLastTerm(pSyncNode);
  SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);

  bool logOK = false;
  if (myLastTerm != SYNC_TERM_INVALID) {
    logOK = (pMsg->lastLogTerm > myLastTerm) ||
            (pMsg->lastLogTerm == myLastTerm && pMsg->lastLogIndex >= myLastIndex);
  }

  // single logging site; the emitted text matches the former per-branch messages
  char logBuf[128];
  snprintf(logBuf, sizeof(logBuf),
           "logok:%d, {my-lterm:%" PRIu64 ", my-lindex:%" PRId64 ", recv-lterm:%" PRIu64 ", recv-lindex:%" PRId64
           ", recv-term:%" PRIu64 "}",
           logOK ? 1 : 0, myLastTerm, myLastIndex, pMsg->lastLogTerm, pMsg->lastLogIndex, pMsg->term);
  syncNodeEventLog(pSyncNode, logBuf);

  return logOK;
}
/**
 * Handle a received RequestVote message (snapshot-aware path).
 *
 * Same flow as the log-store based handler, except that the log up-to-date
 * check is delegated to syncNodeOnRequestVoteLogOK:
 *   1. Ignore the request if the sender is not in the raft group and this
 *      node is not a standby.
 *   2. Evaluate the log check.
 *   3. If the candidate's term is newer, call syncNodeUpdateTerm.
 *   4. Grant the vote when the terms match, the log check passed, and this node
 *      has not voted yet or has already voted for the same candidate.
 *   5. Build a SyncRequestVoteReply, log it, send it back and destroy it.
 *
 * @param ths   local sync node that received the message
 * @param pMsg  the RequestVote message
 * @return -1 if the request was ignored in step 1, otherwise 0
 */
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
  // if already drop replica, do not process
  if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
    syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
    return -1;
  }

  // evaluated before any term update, as in the original ordering
  bool logOK = syncNodeOnRequestVoteLogOK(ths, pMsg);

  // maybe update term
  if (pMsg->term > ths->pRaftStore->currentTerm) {
    syncNodeUpdateTerm(ths, pMsg->term);
  }
  ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);

  bool grant = (pMsg->term == ths->pRaftStore->currentTerm) && logOK &&
               ((!raftStoreHasVoted(ths->pRaftStore)) || (syncUtilSameId(&(ths->pRaftStore->voteFor), &(pMsg->srcId))));
  if (grant) {
    // maybe has already voted for pMsg->srcId
    // vote again, no harm
    raftStoreVote(ths->pRaftStore, &(pMsg->srcId));

    // forbid elect for this round
    syncNodeResetElectTimer(ths);
  }

  // send msg
  SyncRequestVoteReply* pReply = syncRequestVoteReplyBuild(ths->vgId);
  pReply->srcId = ths->myRaftId;
  pReply->destId = pMsg->srcId;
  pReply->term = ths->pRaftStore->currentTerm;
  pReply->voteGranted = grant;

  // trace log
  char logBuf[32];
  snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
  syncLogRecvRequestVote(ths, pMsg, logBuf);
  syncLogSendRequestVoteReply(ths, pReply, "");

  SRpcMsg rpcMsg;
  syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
  syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
  syncRequestVoteReplyDestroy(pReply);

  return 0;
}