TDengine/source/dnode/mnode/impl/src/mndSync.c

260 lines
7.8 KiB
C
Raw Normal View History

2021-12-17 06:20:32 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
2021-12-25 06:40:43 +00:00
#include "mndSync.h"
2022-01-03 13:36:31 +00:00
#include "mndTrans.h"
2021-12-17 06:20:32 +00:00
2022-05-27 14:49:18 +00:00
/*
 * Enqueue callback for the sync module.
 *
 * Rewrites the contLen and vgId fields of the message head with htonl() and
 * then places the message on SYNC_QUEUE.
 *
 * Returns the result of tmsgPutToQueue().
 */
int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
  SMsgHead *pMsgHead = pMsg->pCont;

  // NOTE(review): presumably the queue consumer expects these fields in
  // network byte order -- confirm against the reader side.
  pMsgHead->contLen = htonl(pMsgHead->contLen);
  pMsgHead->vgId = htonl(pMsgHead->vgId);

  int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
  return code;
}
2022-05-21 13:26:27 +00:00
2022-05-22 04:34:22 +00:00
/* Send callback for the sync module: hands pMsg to tmsgSendReq() for pEpSet. */
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
  int32_t code = tmsgSendReq(pEpSet, pMsg);
  return code;
}
2022-05-21 13:26:27 +00:00
2022-05-22 04:34:22 +00:00
/*
 * Commit callback of the mnode sync state machine.
 *
 * pFsm   - state machine whose data field holds the SMnode.
 * pMsg   - committed message; its pCont is treated as an SSdbRaw.
 * cbMeta - commit metadata (code, index, term, state).
 *
 * Records cbMeta.code as the current sync error code. When the code is 0 the
 * raw record is written to the sdb and the sdb apply index/term are advanced.
 * When the record's transaction id equals the id saved in the sync manager,
 * the sync semaphore is posted (after logging the failure, if any).
 */
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
  SMnode    *pNode = pFsm->data;
  SSyncMgmt *pSync = &pNode->syncMgmt;
  SSdbRaw   *pRawLog = pMsg->pCont;
  int32_t    rawTransId = sdbGetIdFromRaw(pNode->pSdb, pRawLog);

  pSync->errCode = cbMeta.code;
  mTrace("trans:%d, is proposed, savedTransId:%d code:0x%x, ver:%" PRId64 " term:%" PRId64 " role:%s raw:%p",
         rawTransId, pSync->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRawLog);

  // Only a successfully committed entry is applied to the sdb.
  if (pSync->errCode == 0) {
    sdbWriteWithoutFree(pNode->pSdb, pRawLog);
    sdbSetApplyIndex(pNode->pSdb, cbMeta.index);
    sdbSetApplyTerm(pNode->pSdb, cbMeta.term);
  }

  // Entries that do not match the saved transaction id do not touch the semaphore.
  if (pSync->transId != rawTransId) {
    return;
  }

  if (pSync->errCode != 0) {
    mError("trans:%d, failed to propose since %s", rawTransId, tstrerror(pSync->errCode));
  }
  tsem_post(&pSync->syncSem);
}
/*
 * Snapshot-info callback of the mnode sync state machine.
 *
 * Fills pSnapshot->lastApplyIndex and pSnapshot->lastApplyTerm from the sdb
 * owned by the mnode stored in pFsm->data. Always returns 0.
 */
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
  SMnode *pNode = pFsm->data;

  pSnapshot->lastApplyIndex = sdbGetApplyIndex(pNode->pSdb);
  pSnapshot->lastApplyTerm = sdbGetApplyTerm(pNode->pSdb);

  return 0;
}
void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
2022-05-25 02:30:02 +00:00
if (!pMnode->deploy) {
2022-05-26 08:47:37 +00:00
mInfo("mnode sync restore finished");
2022-05-25 02:30:02 +00:00
mndTransPullup(pMnode);
pMnode->syncMgmt.restored = true;
}
}
2022-05-27 14:49:18 +00:00
/*
 * Snapshot-read callback of the mnode sync state machine.
 *
 * Forwards ppIter, ppBuf and len to sdbReadSnapshot() and returns its result.
 * pSnapshot is not used. A failure is logged with terrstr(); on success a
 * completion message is logged once *ppIter is NULL.
 */
int32_t mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, void **ppIter, char **ppBuf, int32_t *len) {
  SMnode *pNode = pFsm->data;

  mInfo("start to read snapshot from sdb");
  int32_t code = sdbReadSnapshot(pNode->pSdb, (SSdbIter **)ppIter, ppBuf, len);

  if (code != 0) {
    mError("failed to read snapshot from sdb since %s", terrstr());
    return code;
  }

  if (*ppIter == NULL) {
    mInfo("successfully to read snapshot from sdb");
  }
  return code;
}
2022-05-27 14:49:18 +00:00
/*
 * Snapshot-apply callback of the mnode sync state machine.
 *
 * Clears the restored flag, passes pBuf/len to sdbApplySnapshot() and sets
 * the flag again only when that call returns 0. pSnapshot is not used and
 * pBuf is not freed here. Returns the result of sdbApplySnapshot().
 */
int32_t mndSnapshotApply(struct SSyncFSM *pFsm, const SSnapshot *pSnapshot, char *pBuf, int32_t len) {
  SMnode *pNode = pFsm->data;

  pNode->syncMgmt.restored = false;
  mInfo("start to apply snapshot to sdb, len:%d", len);

  int32_t code = sdbApplySnapshot(pNode->pSdb, pBuf, len);
  if (code == 0) {
    mInfo("successfully to apply snapshot to sdb, len:%d", len);
    pNode->syncMgmt.restored = true;
  } else {
    mError("failed to apply snapshot to sdb, len:%d", len);
  }

  return code;
}
2022-05-25 09:44:43 +00:00
/*
 * Reconfiguration callback of the mnode sync state machine.
 *
 * pFsm   - state machine whose data field holds the SMnode.
 * newCfg - new sync configuration (not used here).
 * cbMeta - callback metadata (code, index, term).
 *
 * Records cbMeta.code as the current sync error code and logs the event.
 * When the saved transaction id is -1, the sync semaphore is posted (after
 * logging the failure, if any).
 */
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
  SMnode    *pMnode = pFsm->data;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  pMgmt->errCode = cbMeta.code;

  // The third value printed is cbMeta.index, so it is labelled "index"
  // (the previous "curTerm" label did not match the argument).
  mInfo("trans:-1, sync reconfig is proposed, savedTransId:%d code:0x%x, index:%" PRId64 " term:%" PRId64,
        pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);

  if (pMgmt->transId == -1) {
    if (pMgmt->errCode != 0) {
      mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
    }
    tsem_post(&pMgmt->syncSem);
  }
}
2022-05-22 04:34:22 +00:00
/*
 * Allocates a zeroed SSyncFSM bound to pMnode and installs the mnode sync
 * callbacks defined in this file.
 *
 * Returns the new state machine, or NULL when the allocation fails.
 */
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
  SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
  if (pFsm == NULL) {
    // Allocation failed: report it to the caller instead of dereferencing NULL.
    return NULL;
  }

  pFsm->data = pMnode;

  pFsm->FpCommitCb = mndSyncCommitMsg;
  pFsm->FpPreCommitCb = NULL;
  pFsm->FpRollBackCb = NULL;

  pFsm->FpGetSnapshot = mndSyncGetSnapshot;
  pFsm->FpRestoreFinishCb = mndRestoreFinish;
  pFsm->FpSnapshotRead = mndSnapshotRead;
  pFsm->FpSnapshotApply = mndSnapshotApply;
  pFsm->FpReConfigCb = mndReConfig;

  return pFsm;
}

/*
 * Opens the mnode WAL under "<pMnode->path>/wal", builds the sync
 * configuration from the mnode's replica list, initialises the sync
 * semaphore and opens the sync instance.
 *
 * Returns 0 on success and -1 when the WAL cannot be opened, the state
 * machine cannot be allocated, or syncOpen() does not return a positive id.
 */
int32_t mndInitSync(SMnode *pMnode) {
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;

  char path[PATH_MAX + 20] = {0};
  snprintf(path, sizeof(path), "%s%swal", pMnode->path, TD_DIRSEP);
  SWalCfg cfg = {
      .vgId = 1,
      .fsyncPeriod = 0,
      .rollPeriod = -1,
      .segSize = -1,
      .retentionPeriod = -1,
      .retentionSize = -1,
      .level = TAOS_WAL_FSYNC,
  };

  pMgmt->pWal = walOpen(path, &cfg);
  if (pMgmt->pWal == NULL) {
    mError("failed to open wal since %s", terrstr());
    return -1;
  }

  SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
  syncInfo.pWal = pMgmt->pWal;
  syncInfo.pFsm = mndSyncMakeFsm(pMnode);
  if (syncInfo.pFsm == NULL) {
    // NOTE(review): like the syncOpen failure path below, this returns with
    // the WAL still open; presumably the caller runs mndCleanupSync -- confirm.
    mError("failed to create sync fsm");
    return -1;
  }
  syncInfo.isStandBy = pMgmt->standby;

  SSyncCfg *pCfg = &syncInfo.syncCfg;
  pCfg->replicaNum = pMnode->replica;
  pCfg->myIndex = pMnode->selfIndex;
  mInfo("start to open mnode sync, replica:%d myindex:%d standby:%d", pCfg->replicaNum, pCfg->myIndex, pMgmt->standby);

  // Copy the fqdn/port of every replica into the sync configuration.
  for (int32_t i = 0; i < pMnode->replica; ++i) {
    SNodeInfo *pNode = &pCfg->nodeInfo[i];
    tstrncpy(pNode->nodeFqdn, pMnode->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
    pNode->nodePort = pMnode->replicas[i].port;
    mInfo("index:%d, fqdn:%s port:%d", i, pNode->nodeFqdn, pNode->nodePort);
  }

  tsem_init(&pMgmt->syncSem, 0, 0);
  pMgmt->sync = syncOpen(&syncInfo);
  if (pMgmt->sync <= 0) {
    mError("failed to open sync since %s", terrstr());
    return -1;
  }

  mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync);
  return 0;
}
/*
 * Tears down the mnode sync manager: stops the sync instance, destroys the
 * sync semaphore, closes the WAL when one is open, and finally zeroes the
 * whole SSyncMgmt structure.
 */
void mndCleanupSync(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  syncStop(pSync->sync);
  mDebug("sync:%" PRId64 " is stopped", pSync->sync);

  tsem_destroy(&pSync->syncSem);

  if (pSync->pWal != NULL) {
    walClose(pSync->pWal);
  }

  memset(pSync, 0, sizeof(*pSync));
}
2022-05-21 13:26:27 +00:00
2022-05-27 07:21:23 +00:00
/*
 * Proposes an sdb raw record to the sync module and, when the proposal is
 * accepted, waits on the sync semaphore until it is posted.
 *
 * pMnode  - mnode owning the sync manager.
 * pRaw    - raw record; sdbGetRawTotalSize(pRaw) bytes are copied into the message.
 * transId - transaction id saved in the sync manager before proposing.
 *
 * Returns -1 when the message buffer cannot be allocated, the non-zero
 * syncPropose() code when the proposal is rejected (terrno is set in that
 * case), and otherwise the error code stored in the sync manager.
 */
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  // NOTE(review): TDMT_MND_APPLY_MSG is stored in the `code` field here;
  // verify whether a message-type field was intended.
  SRpcMsg req = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
  req.pCont = rpcMallocCont(req.contLen);
  if (req.pCont == NULL) {
    return -1;
  }
  memcpy(req.pCont, pRaw, req.contLen);

  // Reset the shared result and remember which transaction is in flight
  // before the proposal is handed over.
  pSync->errCode = 0;
  pSync->transId = transId;
  mTrace("trans:%d, will be proposed", pSync->transId);

  const bool isWeak = false;
  int32_t    code = syncPropose(pSync->sync, &req, isWeak);

  if (code == 0) {
    // Accepted: block until the semaphore is posted, then report the stored code.
    tsem_wait(&pSync->syncSem);
    rpcFreeCont(req.pCont);
    return pSync->errCode;
  }

  // Rejected: translate the sync result into terrno.
  if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (code == TAOS_SYNC_PROPOSE_OTHER_ERROR) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  } else {
    terrno = TSDB_CODE_APP_ERROR;
  }

  rpcFreeCont(req.pCont);
  mError("trans:%d, failed to propose, code:0x%x", pSync->transId, code);
  return code;
}
2022-05-23 05:05:35 +00:00
/*
 * Starts the mnode sync instance: registers the mnode's message callbacks
 * with it, then starts it in standby mode when the standby flag is set and
 * in normal mode otherwise.
 */
void mndSyncStart(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  syncSetMsgCb(pSync->sync, &pMnode->msgCb);

  if (!pSync->standby) {
    syncStart(pSync->sync);
  } else {
    syncStartStandBy(pSync->sync);
  }

  mDebug("sync:%" PRId64 " is started, standby:%d", pSync->sync, pSync->standby);
}
2022-05-23 10:15:31 +00:00
/* Intentionally empty: this function performs no work; pMnode is unused. */
void mndSyncStop(SMnode *pMnode) {}
2022-05-23 05:05:35 +00:00
2021-12-23 06:47:46 +00:00
/*
 * Reports whether this mnode can currently act as master.
 *
 * Returns true only when the sync role is TAOS_SYNC_STATE_LEADER and the
 * sync manager is marked as restored. Otherwise returns false with terrno
 * set to TSDB_CODE_SYN_NOT_LEADER (not the leader) or
 * TSDB_CODE_APP_NOT_READY (leader, but not yet restored).
 */
bool mndIsMaster(SMnode *pMnode) {
  SSyncMgmt *pSync = &pMnode->syncMgmt;

  ESyncState role = syncGetMyRole(pSync->sync);
  if (role != TAOS_SYNC_STATE_LEADER) {
    terrno = TSDB_CODE_SYN_NOT_LEADER;
    return false;
  }

  if (pSync->restored) {
    return true;
  }

  terrno = TSDB_CODE_APP_NOT_READY;
  return false;
}