TDengine/source/dnode/mnode/impl/src/mndSync.c

280 lines
8.9 KiB
C
Raw Normal View History

2021-12-17 06:20:32 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
2021-12-25 06:40:43 +00:00
#include "mndSync.h"
2022-01-03 13:36:31 +00:00
#include "mndTrans.h"
2021-12-17 06:20:32 +00:00
// Enqueue callback handed to the sync module (see FpEqMsg in mndInitSync).
// Converts the contLen and vgId fields of the message head with htonl(), then
// puts the message on SYNC_QUEUE. If the put fails, the message content is
// freed and pMsg->pCont is cleared. Returns the code from tmsgPutToQueue().
static int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  SMsgHead *pMsgHead = pMsg->pCont;
  pMsgHead->vgId = htonl(pMsgHead->vgId);
  pMsgHead->contLen = htonl(pMsgHead->contLen);

  int32_t ret = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  if (ret == 0) {
    return 0;
  }

  // the queue did not accept the message, so release its content here
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
2022-05-21 13:26:27 +00:00
// Send callback handed to the sync module (see FpSendMsg in mndInitSync).
// Forwards the message to pEpSet through tmsgSendReq(). If the send fails,
// the message content is freed and pMsg->pCont is cleared. Returns the code
// from tmsgSendReq().
static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t ret = tmsgSendReq(pEpSet, pMsg);
  if (ret == 0) {
    return 0;
  }

  // the request was not sent, so release its content here
  rpcFreeCont(pMsg->pCont);
  pMsg->pCont = NULL;
  return ret;
}
2022-05-21 13:26:27 +00:00
2022-05-22 04:34:22 +00:00
// FSM commit callback (registered as FpCommitCb in mndSyncMakeFsm).
// Records cbMeta.code as the current error code, and when it is zero writes
// the raw record into sdb and updates the sdb apply info. If the record's
// trans id matches the one saved in the sync manager, the saved id is cleared
// and syncSem is posted; otherwise the matching transaction, if it can be
// acquired, is executed and released.
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SMnode    *pMnode = pFsm->data;
  SSdbRaw   *pRaw = pMsg->pCont;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
  pMgmt->errCode = cbMeta.code;
  mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
         " role:%s raw:%p",
         transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state),
         pRaw);

  // only a successfully committed record is applied to sdb
  if (pMgmt->errCode == 0) {
    sdbWriteWithoutFree(pMnode->pSdb, pRaw);
    sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
  }

  if (pMgmt->transId != transId) {
    // not the saved trans id: drive the transaction itself if it exists
    STrans *pTrans = mndAcquireTrans(pMnode, transId);
    if (pTrans != NULL) {
      mndTransExecute(pMnode, pTrans);
      mndReleaseTrans(pMnode, pTrans);
    }
#if 0
    sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA);
#endif
    return;
  }

  // this is the saved trans id: report any failure, clear the id, post syncSem
  if (pMgmt->errCode != 0) {
    mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
  }
  pMgmt->transId = 0;
  tsem_post(&pMgmt->syncSem);
}
// FSM callback (registered as FpGetSnapshot in mndSyncMakeFsm).
// Starts an sdb read, storing the iterator in *ppReader and filling the
// snapshot's lastApplyIndex, lastApplyTerm and lastConfigIndex in the same
// call. pReaderParam is not used. Returns the code from sdbStartRead().
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
  mDebug("start to read snapshot from sdb in atomic way");
  SMnode *pMnode = pFsm->data;
  return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
                      &pSnapshot->lastConfigIndex);
}
// FSM callback (registered as FpGetSnapshotInfo in mndSyncMakeFsm).
// Fills the snapshot's lastApplyIndex, lastApplyTerm and lastConfigIndex from
// the sdb commit info. Always returns 0.
int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pNode = pFsm->data;

  sdbGetCommitInfo(pNode->pSdb,
                   &pSnapshot->lastApplyIndex,
                   &pSnapshot->lastApplyTerm,
                   &pSnapshot->lastConfigIndex);
  return 0;
}
void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
2022-06-09 06:12:52 +00:00
2022-05-25 02:30:02 +00:00
if (!pMnode->deploy) {
mInfo("mnode sync restore finished, and will handle outstanding transactions");
2022-05-25 02:30:02 +00:00
mndTransPullup(pMnode);
2022-05-28 04:56:33 +00:00
mndSetRestore(pMnode, true);
2022-05-29 05:14:36 +00:00
} else {
2022-06-22 06:55:29 +00:00
mInfo("mnode sync restore finished");
2022-05-25 02:30:02 +00:00
}
}
// FSM callback (registered as FpReConfigCb in mndSyncMakeFsm).
// Records cbMeta.code as the current error code. If the saved trans id is -1,
// any failure is logged, the saved id is cleared and syncSem is posted.
// pMsg is not used.
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
  SMnode    *pNode = pFsm->data;
  SSyncMgmt *pSyncMgmt = &pNode->syncMgmt;

  pSyncMgmt->errCode = cbMeta.code;
  mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pSyncMgmt->transId,
        cbMeta.code, cbMeta.index, cbMeta.term);

  // nothing more to do unless the saved trans id is -1
  if (pSyncMgmt->transId != -1) {
    return;
  }

  if (pSyncMgmt->errCode != 0) {
    mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pSyncMgmt->errCode));
  }
  pSyncMgmt->transId = 0;
  tsem_post(&pSyncMgmt->syncSem);
}
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
2022-06-22 06:55:29 +00:00
mDebug("start to read snapshot from sdb");
2022-05-29 05:14:36 +00:00
SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
2022-05-29 05:14:36 +00:00
}
int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
2022-06-22 06:55:29 +00:00
mDebug("stop to read snapshot from sdb");
2022-05-29 05:14:36 +00:00
SMnode *pMnode = pFsm->data;
return sdbStopRead(pMnode->pSdb, pReader);
}
// FSM callback (registered as FpSnapshotDoRead in mndSyncMakeFsm).
// Delegates to sdbDoRead() with the given reader and the ppBuf/len outputs,
// and returns its code.
int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
  SMnode *pNode = pFsm->data;

  int32_t ret = sdbDoRead(pNode->pSdb, pReader, ppBuf, len);
  return ret;
}
int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
2022-05-29 05:14:36 +00:00
mInfo("start to apply snapshot to sdb");
SMnode *pMnode = pFsm->data;
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
}
// FSM callback (registered as FpSnapshotStopWrite in mndSyncMakeFsm).
// Passes pWriter and the isApply flag to sdbStopWrite() and returns its code.
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
  SMnode *pNode = pFsm->data;
  mInfo("stop to apply snapshot to sdb, apply:%d", isApply);

  int32_t ret = sdbStopWrite(pNode->pSdb, pWriter, isApply);
  return ret;
}
// FSM callback (registered as FpSnapshotDoWrite in mndSyncMakeFsm).
// Passes the len-byte buffer pBuf to sdbDoWrite() through pWriter and returns
// its code.
int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
  SMnode *pNode = pFsm->data;

  int32_t ret = sdbDoWrite(pNode->pSdb, pWriter, pBuf, len);
  return ret;
}
2022-05-22 04:34:22 +00:00
// Allocates a zeroed SSyncFSM, stores pMnode in its data field and wires up
// the mnode callbacks (commit, restore-finish, reconfig and the snapshot
// get/read/write hooks). The pre-commit and rollback callbacks are left NULL.
//
// Returns the new FSM, or NULL if the allocation fails; callers must check the
// result before using it.
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // do not dereference a failed allocation
    return NULL;
  }

  pFsm->data = pMnode;

  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpReConfigCb = mndReConfig;

  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo;

  pFsm->FpSnapshotStartRead = mndSnapshotStartRead;
  pFsm->FpSnapshotStopRead = mndSnapshotStopRead;
  pFsm->FpSnapshotDoRead = mndSnapshotDoRead;
  pFsm->FpSnapshotStartWrite = mndSnapshotStartWrite;
  pFsm->FpSnapshotStopWrite = mndSnapshotStopWrite;
  pFsm->FpSnapshotDoWrite = mndSnapshotDoWrite;

  return pFsm;
}
// Opens the mnode sync instance.
// Builds an SSyncInfo (vgId 1, the send/enqueue callbacks, the "sync"
// directory under pMnode->path, the mnode WAL, a new FSM, the standby flag and
// the standard snapshot strategy). When the mnode is standby or has a replica
// id greater than zero, a single-replica config is filled from
// pMgmt->replica. Then syncSem is initialised, the sync instance is opened and
// its election/heartbeat timers are set.
//
// Returns 0 on success, -1 if the FSM is NULL or syncOpen() fails.
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pWal = pMnode->pWal;
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  syncInfo.isStandBy = pMgmt->standby;
  syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;

  mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
  if (pMgmt->standby || pMgmt->replica.id > 0) {
    SSyncCfg *pCfg = &syncInfo.syncCfg;
    pCfg->replicaNum = 1;
    pCfg->myIndex = 0;
    SNodeInfo *pNode = &pCfg->nodeInfo[0];
    tstrncpy(pNode->nodeFqdn, pMgmt->replica.fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = pMgmt->replica.port;
    mInfo("mnode ep:%s:%u", pNode->nodeFqdn, pNode->nodePort);
  }

  tsem_init(&pMgmt->syncSem, 0, 0);

  // Never hand a NULL FSM to syncOpen(). The check sits after tsem_init() so
  // this failure leaves the semaphore in the same state as the syncOpen()
  // failure path below.
  if (syncInfo.pFsm == NULL) {
    mError("failed to open sync since fsm is not created");
    return -1;
  }

  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  // decrease election timer
  setElectTimerMS(pMgmt->sync, 600);
  setHeartbeatTimerMS(pMgmt->sync, 300);

  mDebug("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
  return 0;
}
// Stops the mnode sync instance, destroys syncSem and zeroes the whole sync
// manager structure.
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  syncStop(pSyncMgmt->sync);
  mDebug("mnode-sync is stopped, id:%" PRId64, pSyncMgmt->sync);

  tsem_destroy(&pSyncMgmt->syncSem);

  // wipe the manager last, after the fields above have been used
  memset(pSyncMgmt, 0, sizeof(SSyncMgmt));
}
2022-05-21 13:26:27 +00:00
2022-05-27 07:21:23 +00:00
// Proposes a raw sdb record to the sync instance.
// The record is copied into a TDMT_MND_APPLY_MSG request, transId is saved in
// the sync manager and the request is handed to syncPropose(). When the
// proposal is accepted (code 0) the caller blocks on syncSem until it is
// posted, and the recorded errCode is returned.
//
// When the proposal is rejected, terrno is remapped (not-leader becomes
// APP_NOT_READY, a sync internal error is kept, anything else becomes
// APP_ERROR) and the saved transId is cleared: nobody is waiting on syncSem in
// that case, so a stale id must not let a later post leave the semaphore
// signalled for the next proposal.
//
// Returns -1 if the request buffer cannot be allocated, the non-zero code from
// syncPropose() on rejection, otherwise pMgmt->errCode.
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  SRpcMsg    req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) return -1;
  memcpy(req.pCont, pRaw, req.contLen);

  pMgmt->errCode = 0;
  pMgmt->transId = transId;
  mTrace("trans:%d, will be proposed", pMgmt->transId);

  const bool isWeak = false;
  int32_t    code = syncPropose(pMgmt->sync, &req, isWeak);
  if (code == 0) {
    tsem_wait(&pMgmt->syncSem);
  } else {
    // no waiter exists for a rejected proposal, so drop the saved id
    pMgmt->transId = 0;

    if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
      terrno = TSDB_CODE_APP_NOT_READY;
    } else if (code == -1 && terrno == TSDB_CODE_SYN_INTERNAL_ERROR) {
      // keep the sync internal error as reported
    } else {
      terrno = TSDB_CODE_APP_ERROR;
    }
  }

  rpcFreeCont(req.pCont);
  if (code != 0) {
    mError("trans:%d, failed to propose, code:0x%x", transId, code);
    return code;
  }

  return pMgmt->errCode;
}
2022-05-23 05:05:35 +00:00
// Installs the mnode message callbacks on the sync instance and starts it.
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  // the callbacks are set before the instance is started
  syncSetMsgCb(pSyncMgmt->sync, &pMnode->msgCb);
  syncStart(pSyncMgmt->sync);

  mDebug("mnode sync started, id:%" PRId64 " standby:%d", pSyncMgmt->sync, pSyncMgmt->standby);
}
2022-06-16 09:24:42 +00:00
// If a trans id is currently saved in the sync manager, clears it and posts
// syncSem; otherwise does nothing.
void mndSyncStop(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  if (pSyncMgmt->transId == 0) {
    return;
  }

  pSyncMgmt->transId = 0;
  tsem_post(&pSyncMgmt->syncSem);
}
2022-05-23 05:05:35 +00:00
2021-12-23 06:47:46 +00:00
// Returns true only when the sync instance reports ready and the mnode has
// been restored. When sync is not ready, terrno is left as set by
// syncIsReady(); when the mnode is not yet restored, terrno is set to
// TSDB_CODE_APP_NOT_READY.
bool mndIsMaster(SMnode *pMnode) {
  SSyncMgmt *pSyncMgmt = &pMnode->syncMgmt;

  if (syncIsReady(pSyncMgmt->sync)) {
    if (pMnode->restored) {
      return true;
    }
    terrno = TSDB_CODE_APP_NOT_READY;
  }

  // either sync is not ready (terrno from syncIsReady) or restore is pending
  return false;
}