TDengine/source/dnode/mnode/impl/src/mndSync.c

471 lines
15 KiB
C
Raw Normal View History

2021-12-17 06:20:32 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
2021-12-25 06:40:43 +00:00
#include "mndSync.h"
#include "mndCluster.h"
2022-01-03 13:36:31 +00:00
#include "mndTrans.h"
2021-12-17 06:20:32 +00:00
/*
 * Put a sync control message onto SYNC_CTRL_QUEUE.
 *
 * The contLen and vgId fields of the message head are passed through htonl()
 * before the message is queued. Whenever the message cannot be queued (no
 * usable callback, or tmsgPutToQueue() reports an error) the payload is
 * released with rpcFreeCont() and pMsg->pCont is reset to NULL.
 *
 * Returns -1 for a missing message/payload/callback, otherwise the result of
 * tmsgPutToQueue().
 */
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // Stays -1 when there is no queue callback to hand the message to.
  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_CTRL_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
/*
 * Put a sync message onto SYNC_QUEUE.
 *
 * The contLen and vgId fields of the message head are passed through htonl()
 * before the message is queued. Whenever the message cannot be queued (no
 * usable callback, or tmsgPutToQueue() reports an error) the payload is
 * released with rpcFreeCont() and pMsg->pCont is reset to NULL.
 *
 * Returns -1 for a missing message/payload/callback, otherwise the result of
 * tmsgPutToQueue().
 */
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  if (pMsg == NULL || pMsg->pCont == NULL) return -1;

  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  // Stays -1 when there is no queue callback to hand the message to.
  int32_t ret = -1;
  if (msgcb != NULL && msgcb->putToQueueFp != NULL) {
    ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  }

  if (ret != 0) {
    rpcFreeCont(pMsg->pCont);
    pMsg->pCont = NULL;
  }
  return ret;
}
2022-05-21 13:26:27 +00:00
/*
 * Send a sync message to the given endpoint set through tmsgSendReq().
 *
 * On a non-zero result the payload is released with rpcFreeCont() and
 * pMsg->pCont is cleared. Returns the result of tmsgSendReq().
 */
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  const int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) return 0;

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
2022-05-21 13:26:27 +00:00
/*
 * Apply a committed raw sdb record and notify the proposer, if any.
 *
 * pFsm  - state machine whose data field is the SMnode.
 * pMsg  - message whose pCont is the SSdbRaw record; it is NOT freed here.
 * pMeta - commit meta (code, index, term, lastConfigIndex, state).
 *
 * Steps:
 *   1. When pMeta->code is 0, write the record with sdbWriteWithoutFree() and
 *      record the apply position with sdbSetApplyInfo(). A write failure is
 *      logged and the function returns without touching the proposer state.
 *   2. Under pMgmt->lock, store pMeta->code into pMgmt->errCode, then:
 *        - transId <= 0: log an invalid commit message;
 *        - transId equals the pending pMgmt->transId: clear the pending
 *          proposal and post syncSem;
 *        - otherwise: look the transaction up and execute it.
 *
 * Fields of pMgmt guarded by pMgmt->lock are copied under the lock before they
 * are logged, so log statements never read them unsynchronized.
 *
 * Always returns 0.
 */
int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  SMnode    *pMnode = pFsm->data;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SSdbRaw   *pRaw = pMsg->pCont;

  int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);

  // Snapshot the pending-proposal fields under the lock purely for logging.
  taosThreadMutexLock(&pMgmt->lock);
  int32_t savedId = pMgmt->transId;
  int32_t savedSec = pMgmt->transSec;
  int64_t savedSeq = pMgmt->transSeq;
  taosThreadMutexUnlock(&pMgmt->lock);

  mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
        " role:%s raw:%p sec:%d seq:%" PRId64,
        transId, savedId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
        pRaw, savedSec, savedSeq);

  if (pMeta->code == 0) {
    int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    if (code != 0) {
      mError("trans:%d, failed to write to sdb since %s", transId, terrstr());
      return 0;
    }

    sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
  }

  taosThreadMutexLock(&pMgmt->lock);
  pMgmt->errCode = pMeta->code;

  if (transId <= 0) {
    // Copy before unlocking; the log below must not read pMgmt unlocked.
    int32_t cachedId = pMgmt->transId;
    int64_t cachedSeq = pMgmt->transSeq;
    taosThreadMutexUnlock(&pMgmt->lock);
    mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, cachedId, cachedSeq);
  } else if (transId == pMgmt->transId) {
    if (pMgmt->errCode != 0) {
      mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
    } else {
      mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq);
    }
    // The pending proposal is resolved: clear it and wake the waiter.
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    tsem_post(&pMgmt->syncSem);
    taosThreadMutexUnlock(&pMgmt->lock);
  } else {
    taosThreadMutexUnlock(&pMgmt->lock);
    STrans *pTrans = mndAcquireTrans(pMnode, transId);
    if (pTrans != NULL) {
      mInfo("trans:%d, execute in mnode which not leader or sync timeout", transId);
      mndTransExecute(pMnode, pTrans);
      mndReleaseTrans(pMnode, pTrans);
    } else {
      mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
    }
  }

  return 0;
}
/*
 * Commit callback of the mnode state machine.
 *
 * Messages whose type passes syncUtilUserCommit() are forwarded to
 * mndProcessWriteMsg(); all others are skipped. In both cases the payload is
 * released with rpcFreeCont() and pMsg->pCont is cleared before returning.
 *
 * Returns the result of mndProcessWriteMsg(), or 0 for skipped messages.
 */
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
  int32_t ret = 0;
  if (syncUtilUserCommit(pMsg->msgType)) {
    ret = mndProcessWriteMsg(pFsm, pMsg, pMeta);
  }

  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
2022-11-01 07:15:58 +00:00
/*
 * Start reading a snapshot from sdb and report its position.
 *
 * pFsm         - state machine whose data field is the SMnode.
 * pSnapshot    - its lastApplyIndex/lastApplyTerm/lastConfigIndex fields are
 *                passed to sdbStartRead() as output locations.
 * pReaderParam - not used by this implementation.
 * ppReader     - receives the sdb iterator created by sdbStartRead().
 *
 * Returns the result of sdbStartRead().
 */
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  mInfo("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}
/*
 * Fetch the sdb commit position into pSnapshot.
 *
 * The lastApplyIndex, lastApplyTerm and lastConfigIndex fields of pSnapshot
 * are handed to sdbGetCommitInfo() as output locations.
 */
static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pNode = pFsm->data;
  sdbGetCommitInfo(pNode->pSdb,
                   &pSnapshot->lastApplyIndex,
                   &pSnapshot->lastApplyTerm,
                   &pSnapshot->lastConfigIndex);
}
2022-11-01 07:15:58 +00:00
void mndRestoreFinish(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
2022-06-09 06:12:52 +00:00
2022-05-25 02:30:02 +00:00
if (!pMnode->deploy) {
2022-12-08 09:53:39 +00:00
if (!pMnode->restored) {
2022-12-08 09:14:06 +00:00
mInfo("vgId:1, sync restore finished, and will handle outstanding transactions");
mndTransPullup(pMnode);
mndSetRestored(pMnode, true);
} else {
mInfo("vgId:1, sync restore finished, repeat call");
}
2022-05-29 05:14:36 +00:00
} else {
2022-09-23 07:42:36 +00:00
mInfo("vgId:1, sync restore finished");
2022-05-25 02:30:02 +00:00
}
}
2022-11-01 07:15:58 +00:00
int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
2022-08-25 14:33:27 +00:00
mInfo("start to read snapshot from sdb");
2022-05-29 05:14:36 +00:00
SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
2022-05-29 05:14:36 +00:00
}
static void mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
2022-08-25 14:33:27 +00:00
mInfo("stop to read snapshot from sdb");
2022-05-29 05:14:36 +00:00
SMnode *pMnode = pFsm->data;
sdbStopRead(pMnode->pSdb, pReader);
2022-05-29 05:14:36 +00:00
}
2022-11-01 07:15:58 +00:00
int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
2022-05-29 05:14:36 +00:00
SMnode *pMnode = pFsm->data;
return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
}
2022-11-01 07:15:58 +00:00
int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
2022-05-29 05:14:36 +00:00
mInfo("start to apply snapshot to sdb");
SMnode *pMnode = pFsm->data;
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
}
2022-11-01 07:15:58 +00:00
/*
 * Finish applying a snapshot to sdb.
 *
 * pWriter   - writer obtained from mndSnapshotStartWrite().
 * isApply   - forwarded to sdbStopWrite().
 * pSnapshot - supplies the apply index, apply term and config index that are
 *             forwarded to sdbStopWrite().
 *
 * Returns the result of sdbStopWrite().
 */
int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
  mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
        pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);

  SMnode *pNode = pFsm->data;
  int32_t ret = sdbStopWrite(pNode->pSdb, pWriter, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm,
                             pSnapshot->lastConfigIndex);
  return ret;
}
2022-11-01 07:15:58 +00:00
int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
2022-05-29 05:14:36 +00:00
SMnode *pMnode = pFsm->data;
return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
}
2022-11-01 07:15:58 +00:00
static void mndBecomeFollower(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
2022-10-21 07:33:06 +00:00
mInfo("vgId:1, become follower");
taosThreadMutexLock(&pMgmt->lock);
if (pMgmt->transId != 0) {
mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId);
pMgmt->transId = 0;
pMgmt->transSec = 0;
2022-12-14 01:14:54 +00:00
pMgmt->transSeq = 0;
pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER;
tsem_post(&pMgmt->syncSem);
}
taosThreadMutexUnlock(&pMgmt->lock);
}
2022-11-01 07:15:58 +00:00
/*
 * Become-leader callback of the mnode state machine.
 *
 * Only writes a log line; pFsm is not used.
 */
static void mndBecomeLeader(const SSyncFSM *pFsm) {
  (void)pFsm;
  mInfo("vgId:1, become leader");
}
/*
 * Report whether the apply queue is empty.
 *
 * Returns true when the mnode or its qsizeFp callback is missing; otherwise
 * true exactly when tmsgGetQueueSize() reports 0 items for APPLY_QUEUE.
 */
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
  SMnode *pNode = pFsm->data;
  if (pNode == NULL || pNode->msgCb.qsizeFp == NULL) {
    return true;
  }

  int32_t numItems = tmsgGetQueueSize(&pNode->msgCb, 1, APPLY_QUEUE);
  return numItems == 0;
}
/*
 * Report the number of items in the apply queue.
 *
 * Returns -1 when the mnode or its qsizeFp callback is missing; otherwise the
 * value reported by tmsgGetQueueSize() for APPLY_QUEUE.
 */
static int32_t mndApplyQueueItems(const SSyncFSM *pFsm) {
  SMnode *pNode = pFsm->data;
  if (pNode == NULL || pNode->msgCb.qsizeFp == NULL) {
    return -1;
  }

  return tmsgGetQueueSize(&pNode->msgCb, 1, APPLY_QUEUE);
}
2022-05-22 04:34:22 +00:00
/*
 * Allocate the sync state machine descriptor for the mnode and wire up its
 * callbacks.
 *
 * pMnode is stored in the data field so every callback can recover it.
 * Callbacks the mnode does not implement are left NULL.
 *
 * Returns the new descriptor, or NULL when the allocation fails (previously
 * a failed allocation was dereferenced).
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    return NULL;
  }

  pFsm->data = pMnode;

  // Commit / lifecycle callbacks.
  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpLeaderTransferCb = NULL;
  pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
  pFsm->FpApplyQueueItems = mndApplyQueueItems;
  pFsm->FpReConfigCb = NULL;
  pFsm->FpBecomeLeaderCb = mndBecomeLeader;
  pFsm->FpBecomeFollowerCb = mndBecomeFollower;

  // Snapshot callbacks.
  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;
  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;

  return pFsm;
}
/*
 * Initialize the mnode sync manager and open the sync instance.
 *
 * - Initializes pMgmt->lock and clears the pending-proposal fields.
 * - Builds an SSyncInfo (vgId 1, standard snapshot strategy, batch size 1,
 *   the mnode WAL and message callbacks, timing parameters) with its path set
 *   to "<pMnode->path><TD_DIRSEP>sync".
 * - Copies the replica list from pMgmt into the sync configuration; each
 *   entry is passed through tmsgUpdateDnodeInfo().
 * - Initializes pMgmt->syncSem and calls syncOpen().
 *
 * Returns 0 on success, -1 when the state machine could not be created or
 * syncOpen() did not return a positive id.
 */
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  taosThreadMutexInit(&pMgmt->lock, NULL);
  taosThreadMutexLock(&pMgmt->lock);
  pMgmt->transId = 0;
  pMgmt->transSec = 0;
  pMgmt->transSeq = 0;
  taosThreadMutexUnlock(&pMgmt->lock);

  SSyncInfo syncInfo = {
      .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT,
      .batchSize = 1,
      .vgId = 1,
      .pWal = pMnode->pWal,
      .msgcb = &pMnode->msgCb,
      .syncSendMSg = mndSyncSendMsg,
      .syncEqMsg = mndSyncEqMsg,
      .syncEqCtrlMsg = mndSyncEqCtrlMsg,
      .pingMs = 5000,
      .electMs = 3000,
      .heartbeatMs = 500,
  };

  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);

  mInfo("vgId:1, start to open sync, replica:%d selfIndex:%d", pMgmt->numOfReplicas, pMgmt->selfIndex);
  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->replicaNum = pMgmt->numOfReplicas;
  pCfg->myIndex = pMgmt->selfIndex;
  for (int32_t i = 0; i < pMgmt->numOfReplicas; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    pNode->nodeId = pMgmt->replicas[i].id;
    pNode->nodePort = pMgmt->replicas[i].port;
    tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
    mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId,
          pNode->clusterId);
  }

  tsem_init(&pMgmt->syncSem, 0, 0);

  // Checked only after the semaphore is initialized, so this failure leaves
  // pMgmt in the same state as a failed syncOpen() below.
  if (syncInfo.pFsm == NULL) {
    mError("vgId:1, failed to open sync since the sync fsm could not be created");
    return -1;
  }

  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
  return 0;
}
/*
 * Tear down the mnode sync manager.
 *
 * Stops the sync instance, destroys the semaphore and the mutex, then zeroes
 * the whole SSyncMgmt structure.
 */
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  syncStop(pSync->sync);
  mInfo("mnode-sync is stopped, id:%" PRId64, pSync->sync);

  tsem_destroy(&pSync->syncSem);
  taosThreadMutexDestroy(&pSync->lock);
  memset(pSync, 0, sizeof(*pSync));
}
2022-05-21 13:26:27 +00:00
/*
 * Expire a pending proposal that has waited longer than MNODE_TIMEOUT_SEC.
 *
 * Runs under pMgmt->lock. With no pending proposal nothing happens. When the
 * pending proposal is older than the limit, it is cleared, terrno and
 * pMgmt->errCode are set to TSDB_CODE_SYN_TIMEOUT and syncSem is posted;
 * otherwise only a debug line is written.
 */
void mndSyncCheckTimeout(SMnode *pMnode) {
  mTrace("check sync timeout");
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  taosThreadMutexLock(&pMgmt->lock);

  // Nothing is waiting for confirmation.
  if (pMgmt->transId == 0) {
    taosThreadMutexUnlock(&pMgmt->lock);
    return;
  }

  int32_t nowSec = taosGetTimestampSec();
  int32_t elapsed = nowSec - pMgmt->transSec;

  if (elapsed <= MNODE_TIMEOUT_SEC) {
    mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
           pMgmt->transSec, nowSec, elapsed, pMgmt->transSeq);
  } else {
    mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
           pMgmt->transSec, nowSec, elapsed, pMgmt->transSeq);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    terrno = TSDB_CODE_SYN_TIMEOUT;
    pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
    tsem_post(&pMgmt->syncSem);
  }

  taosThreadMutexUnlock(&pMgmt->lock);
}
2022-05-27 07:21:23 +00:00
/*
 * Propose a raw sdb record through sync and wait for the outcome.
 *
 * pMnode  - the mnode.
 * pRaw    - record to propose; it is copied into an rpc buffer and is not
 *           freed here.
 * transId - id of the transaction the record belongs to.
 *
 * Only one proposal may be pending at a time: if pMgmt->transId is already
 * set, TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED is returned.
 *
 * Outcome of syncPropose():
 *   == 0 : the caller blocks on syncSem until another path posts it; the
 *          final result is then taken from pMgmt->errCode.
 *   >  0 : confirmed at once; the record is written directly to sdb and the
 *          apply info is advanced only if that write succeeded.
 *   <  0 : the proposal failed; the pending state is cleared and terrno is
 *          set to TSDB_CODE_APP_ERROR if nothing else set it.
 *
 * Returns 0 on success, otherwise a non-zero code (terrno carries the error
 * when the value comes from pMgmt->errCode).
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  if (req.contLen <= 0) return -1;

  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) return -1;
  memcpy(req.pCont, pRaw, req.contLen);

  taosThreadMutexLock(&pMgmt->lock);
  pMgmt->errCode = 0;

  if (pMgmt->transId != 0) {
    mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
    taosThreadMutexUnlock(&pMgmt->lock);
    rpcFreeCont(req.pCont);
    terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED;
    return terrno;
  }

  mInfo("trans:%d, will be proposed", transId);
  pMgmt->transId = transId;
  pMgmt->transSec = taosGetTimestampSec();

  int64_t seq = 0;
  int32_t code = syncPropose(pMgmt->sync, &req, false, &seq);
  if (code == 0) {
    mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq);
    pMgmt->transSeq = seq;
    taosThreadMutexUnlock(&pMgmt->lock);
    tsem_wait(&pMgmt->syncSem);
  } else if (code > 0) {
    mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    taosThreadMutexUnlock(&pMgmt->lock);
    // Do not advance the apply position past a record that failed to write.
    code = sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    if (code == 0) {
      sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
    }
  } else {
    mError("trans:%d, failed to proposed since %s", transId, terrstr());
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    taosThreadMutexUnlock(&pMgmt->lock);
    if (terrno == 0) {
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  rpcFreeCont(req.pCont);
  req.pCont = NULL;
  if (code != 0) {
    // pMgmt->transId has been cleared by now; log the caller's id instead.
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  // errCode is written by other paths under pMgmt->lock; read it the same way.
  taosThreadMutexLock(&pMgmt->lock);
  int32_t errCode = pMgmt->errCode;
  taosThreadMutexUnlock(&pMgmt->lock);

  terrno = errCode;
  return terrno;
}
2022-05-23 05:05:35 +00:00
/*
 * Start the mnode sync instance and log whether syncStart() succeeded
 * (a negative result is treated as failure).
 */
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  int32_t rc = syncStart(pSync->sync);
  if (rc >= 0) {
    mInfo("vgId:1, sync started, id:%" PRId64, pSync->sync);
  } else {
    mError("vgId:1, failed to start sync, id:%" PRId64, pSync->sync);
  }
}
2022-06-16 09:24:42 +00:00
/*
 * Abandon any pending proposal because the mnode is stopping.
 *
 * Under pMgmt->lock: if a proposal is pending, its bookkeeping fields
 * (transId, transSec, transSeq) are cleared, errCode is set to
 * TSDB_CODE_APP_IS_STOPPING and syncSem is posted. transSeq is reset here as
 * well, matching the other places in this file that clear a pending proposal.
 */
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  taosThreadMutexLock(&pMgmt->lock);
  if (pMgmt->transId != 0) {
    mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId);
    pMgmt->transId = 0;
    pMgmt->transSec = 0;
    pMgmt->transSeq = 0;
    pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING;
    tsem_post(&pMgmt->syncSem);
  }
  taosThreadMutexUnlock(&pMgmt->lock);
}
2022-05-23 05:05:35 +00:00
/*
 * Tell whether this mnode is a fully restored sync leader.
 *
 * terrno is cleared first, then set to describe why the answer is false:
 *   - left as set by syncGetState() when that call reports an error;
 *   - TSDB_CODE_SYN_NOT_LEADER when the sync state is not leader;
 *   - TSDB_CODE_SYN_RESTORING when either sync or the mnode is not restored.
 *
 * Returns true only when none of the above applies.
 */
bool mndIsLeader(SMnode *pMnode) {
  terrno = 0;
  SSyncState syncState = syncGetState(pMnode->syncMgmt.sync);

  // syncGetState() signalled a problem through terrno.
  if (terrno != 0) {
    mDebug("vgId:1, mnode is stopping");
    return false;
  }

  if (syncState.state != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    mDebug("vgId:1, mnode not leader, state:%s", syncStr(syncState.state));
    return false;
  }

  if (!(syncState.restored && pMnode->restored)) {
    terrno = TSDB_CODE_SYN_RESTORING;
    mDebug("vgId:1, mnode not restored:%d:%d", syncState.restored, pMnode->restored);
    return false;
  }

  return true;
}