TDengine/source/dnode/mnode/impl/src/mndMain.c

860 lines
28 KiB
C
Raw Normal View History

2021-11-11 09:31:52 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
2021-11-27 14:56:18 +00:00
#include "mndAcct.h"
2021-12-29 12:05:10 +00:00
#include "mndBnode.h"
2021-11-27 14:56:18 +00:00
#include "mndCluster.h"
2022-01-20 01:49:27 +00:00
#include "mndConsumer.h"
2021-11-27 14:56:18 +00:00
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
2022-04-20 13:36:55 +00:00
#include "mndGrant.h"
2022-03-02 09:48:39 +00:00
#include "mndInfoSchema.h"
2021-11-27 14:56:18 +00:00
#include "mndMnode.h"
2022-02-23 08:04:06 +00:00
#include "mndOffset.h"
2022-04-20 13:36:55 +00:00
#include "mndPerfSchema.h"
2022-06-28 06:23:45 +00:00
#include "mndPrivilege.h"
2021-11-27 14:56:18 +00:00
#include "mndProfile.h"
2021-12-29 12:05:10 +00:00
#include "mndQnode.h"
2022-04-20 13:36:55 +00:00
#include "mndQuery.h"
2021-11-27 14:56:18 +00:00
#include "mndShow.h"
2022-03-23 09:53:00 +00:00
#include "mndSma.h"
2021-12-29 12:05:10 +00:00
#include "mndSnode.h"
2021-12-10 07:21:34 +00:00
#include "mndStb.h"
2022-03-10 12:48:38 +00:00
#include "mndStream.h"
2022-01-20 01:49:27 +00:00
#include "mndSubscribe.h"
2021-11-27 14:56:18 +00:00
#include "mndSync.h"
#include "mndTelem.h"
2021-12-24 06:37:06 +00:00
#include "mndTopic.h"
2021-11-27 14:56:18 +00:00
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
2021-11-11 09:31:52 +00:00
2022-02-16 05:44:43 +00:00
/*
 * Builds the serialized body used by the mnode timer messages.
 *
 * pContLen: receives the encoded length on success; left untouched on failure.
 * Returns the buffer obtained from rpcMallocCont(), or NULL when the size
 * query yields a non-positive length or the allocation fails.
 */
static void *mndBuildTimerMsg(int32_t *pContLen) {
  SMTimerReq req = {0};

  // Called with a NULL buffer first; the result is used as the buffer size.
  int32_t len = tSerializeSMTimerMsg(NULL, 0, &req);
  if (len <= 0) {
    return NULL;
  }

  void *pBuf = rpcMallocCont(len);
  if (pBuf != NULL) {
    tSerializeSMTimerMsg(pBuf, len, &req);
    *pContLen = len;
  }
  return pBuf;
}
2022-05-12 15:30:01 +00:00
/* Posts a TDMT_MND_TRANS_TIMER message to the mnode WRITE_QUEUE; does nothing if the body cannot be built. */
static void mndPullupTrans(SMnode *pMnode) {
  int32_t len = 0;
  void   *pBody = mndBuildTimerMsg(&len);
  if (pBody == NULL) {
    return;
  }

  SRpcMsg msg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pBody, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &msg);
}
2022-06-20 07:23:48 +00:00
/*
 * Posts a TDMT_MND_TTL_TIMER message to the mnode WRITE_QUEUE.
 *
 * Like the other timer helpers in this file, nothing is enqueued when the
 * message body cannot be built; otherwise a message with a NULL body and zero
 * length would be put on the queue.
 */
static void mndTtlTimer(SMnode *pMnode) {
  int32_t contLen = 0;
  void   *pReq = mndBuildTimerMsg(&contLen);
  if (pReq != NULL) {
    SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
    tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  }
}
2022-05-12 15:30:01 +00:00
/* Posts a TDMT_MND_MQ_TIMER message to the mnode READ_QUEUE; does nothing if the body cannot be built. */
static void mndCalMqRebalance(SMnode *pMnode) {
  int32_t len = 0;
  void   *pBody = mndBuildTimerMsg(&len);
  if (pBody == NULL) {
    return;
  }

  SRpcMsg msg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pBody, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
2022-01-18 08:02:12 +00:00
2022-05-12 15:30:01 +00:00
/* Posts a TDMT_MND_TELEM_TIMER message to the mnode READ_QUEUE; does nothing if the body cannot be built. */
static void mndPullupTelem(SMnode *pMnode) {
  int32_t len = 0;
  void   *pBody = mndBuildTimerMsg(&len);
  if (pBody == NULL) {
    return;
  }

  SRpcMsg msg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pBody, .contLen = len};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
2022-07-06 12:46:41 +00:00
/*
 * Posts a TDMT_MND_GRANT_HB_TIMER message to the mnode READ_QUEUE; does
 * nothing if the body cannot be built. The message carries the fixed
 * ahandle value 0x9527.
 */
static void mndGrantHeartBeat(SMnode *pMnode) {
  int32_t len = 0;
  void   *pBody = mndBuildTimerMsg(&len);
  if (pBody == NULL) {
    return;
  }

  SRpcMsg msg = {
      .msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pBody, .contLen = len, .info.ahandle = (void *)0x9527};
  tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &msg);
}
2022-05-12 15:30:01 +00:00
static void *mndThreadFp(void *param) {
2022-02-18 09:17:22 +00:00
SMnode *pMnode = param;
2022-05-12 15:30:01 +00:00
int64_t lastTime = 0;
setThreadName("mnode-timer");
while (1) {
lastTime++;
taosMsleep(100);
2022-05-28 04:56:33 +00:00
if (mndGetStop(pMnode)) break;
2022-05-12 15:30:01 +00:00
2022-06-23 11:58:12 +00:00
if (lastTime % (tsTtlPushInterval * 10) == 1) {
2022-06-20 07:23:48 +00:00
mndTtlTimer(pMnode);
}
2022-05-12 15:30:01 +00:00
if (lastTime % (tsTransPullupInterval * 10) == 0) {
mndPullupTrans(pMnode);
}
if (lastTime % (tsMqRebalanceInterval * 10) == 0) {
mndCalMqRebalance(pMnode);
}
if (lastTime % (tsTelemInterval * 10) == 0) {
mndPullupTelem(pMnode);
}
2022-07-06 12:46:41 +00:00
if (lastTime % (tsGrantHBInterval * 10) == 0) {
mndGrantHeartBeat(pMnode);
}
2022-02-18 09:17:22 +00:00
}
2022-05-12 15:30:01 +00:00
return NULL;
2022-01-18 08:02:12 +00:00
}
2021-11-29 04:09:18 +00:00
/*
 * Starts the joinable timer thread (mndThreadFp) for this mnode and stores
 * its handle in pMnode->thread.
 *
 * Returns 0 on success, -1 when the thread cannot be created. The thread
 * attribute object is destroyed on both paths.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode) != 0) {
    // Log before releasing the attribute: the destroy call could overwrite errno.
    mError("failed to create timer thread since %s", strerror(errno));
    taosThreadAttrDestroy(&thAttr);
    return -1;
  }

  taosThreadAttrDestroy(&thAttr);
  tmsgReportStartup("mnode-timer", "initialized");
  return 0;
}
2021-11-29 04:09:18 +00:00
/* Joins the timer thread and clears its handle, if taosCheckPthreadValid() accepts the handle. */
static void mndCleanupTimer(SMnode *pMnode) {
  if (!taosCheckPthreadValid(pMnode->thread)) {
    return;
  }

  taosThreadJoin(pMnode->thread, NULL);
  taosThreadClear(&pMnode->thread);
}
2021-12-15 07:35:18 +00:00
/*
 * Stores a duplicate of path in pMnode->path and creates that directory.
 *
 * Returns 0 on success. On failure returns -1 with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (duplication failed) or to the system error derived
 * from errno (taosMkDir failed). pMnode->path is not released here on failure.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *pathCopy = strdup(path);
  pMnode->path = pathCopy;

  if (pathCopy == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t rc = 0;
  if (taosMkDir(pMnode->path) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    rc = -1;
  }
  return rc;
}
2021-11-11 09:31:52 +00:00
/*
 * Opens the WAL under "<pMnode->path><TD_DIRSEP>wal" and stores the handle in
 * pMnode->pWal. Returns 0 on success, -1 if walOpen() returns NULL.
 */
static int32_t mndInitWal(SMnode *pMnode) {
  char walDir[PATH_MAX + 20] = {0};
  snprintf(walDir, sizeof(walDir), "%s%swal", pMnode->path, TD_DIRSEP);

  SWalCfg walCfg = {0};
  walCfg.vgId = 1;
  walCfg.fsyncPeriod = 0;
  walCfg.rollPeriod = -1;
  walCfg.segSize = -1;
  walCfg.retentionPeriod = -1;
  walCfg.retentionSize = -1;
  walCfg.level = TAOS_WAL_FSYNC;

  pMnode->pWal = walOpen(walDir, &walCfg);
  if (pMnode->pWal != NULL) {
    return 0;
  }

  mError("failed to open wal since %s", terrstr());
  return -1;
}
/* Closes the WAL handle if one is open and resets pMnode->pWal to NULL. */
static void mndCloseWal(SMnode *pMnode) {
  if (pMnode->pWal == NULL) {
    return;
  }

  walClose(pMnode->pWal);
  pMnode->pWal = NULL;
}
2021-11-29 06:14:03 +00:00
/*
 * Creates the sdb instance for this mnode from its path, its WAL handle and
 * the mnode itself, storing it in pMnode->pSdb.
 * Returns 0 on success, -1 if sdbInit() returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt sdbOpt = {.path = pMnode->path, .pMnode = pMnode, .pWal = pMnode->pWal};

  pMnode->pSdb = sdbInit(&sdbOpt);
  return (pMnode->pSdb == NULL) ? -1 : 0;
}
/*
 * Loads the sdb file unless the mnode is being deployed, in which case there
 * is nothing to read and 0 is returned. Otherwise returns sdbReadFile()'s result.
 */
static int32_t mndOpenSdb(SMnode *pMnode) {
  if (pMnode->deploy) {
    return 0;
  }
  return sdbReadFile(pMnode->pSdb);
}
2021-11-29 06:14:03 +00:00
/* Releases the sdb instance if present and resets pMnode->pSdb to NULL. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (pMnode->pSdb == NULL) {
    return;
  }

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}
2021-11-29 04:09:18 +00:00
/*
 * Appends one init/cleanup step to pMnode->pSteps.
 *
 * name is stored by pointer (not copied); initFp and cleanupFp may be NULL.
 * Returns 0 on success, or -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * array push fails.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep newStep = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &newStep) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}
/*
 * Registers every mnode start-up step, in execution order, through
 * mndAllocStep(). Steps are only recorded here; they are run later by
 * mndExecSteps(). Returns 0 on success, -1 as soon as one registration fails.
 *
 * Note that "mnode-sdb" appears twice: once for creating the sdb and once
 * (without a cleanup function) for mndOpenSdb.
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  static const struct {
    char        *name;
    MndInitFp    initFp;
    MndCleanupFp cleanupFp;
  } stepTable[] = {
      {"mnode-wal", mndInitWal, mndCloseWal},
      {"mnode-sdb", mndInitSdb, mndCleanupSdb},
      {"mnode-trans", mndInitTrans, mndCleanupTrans},
      {"mnode-cluster", mndInitCluster, mndCleanupCluster},
      {"mnode-mnode", mndInitMnode, mndCleanupMnode},
      {"mnode-qnode", mndInitQnode, mndCleanupQnode},
      {"mnode-snode", mndInitSnode, mndCleanupSnode},
      {"mnode-bnode", mndInitBnode, mndCleanupBnode},
      {"mnode-dnode", mndInitDnode, mndCleanupDnode},
      {"mnode-user", mndInitUser, mndCleanupUser},
      {"mnode-grant", mndInitGrant, mndCleanupGrant},
      {"mnode-privilege", mndInitPrivilege, mndCleanupPrivilege},
      {"mnode-acct", mndInitAcct, mndCleanupAcct},
      {"mnode-stream", mndInitStream, mndCleanupStream},
      {"mnode-topic", mndInitTopic, mndCleanupTopic},
      {"mnode-consumer", mndInitConsumer, mndCleanupConsumer},
      {"mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe},
      {"mnode-offset", mndInitOffset, mndCleanupOffset},
      {"mnode-vgroup", mndInitVgroup, mndCleanupVgroup},
      {"mnode-stb", mndInitStb, mndCleanupStb},
      {"mnode-sma", mndInitSma, mndCleanupSma},
      {"mnode-infos", mndInitInfos, mndCleanupInfos},
      {"mnode-perfs", mndInitPerfs, mndCleanupPerfs},
      {"mnode-db", mndInitDb, mndCleanupDb},
      {"mnode-func", mndInitFunc, mndCleanupFunc},
      {"mnode-sdb", mndOpenSdb, NULL},
      {"mnode-profile", mndInitProfile, mndCleanupProfile},
      {"mnode-show", mndInitShow, mndCleanupShow},
      {"mnode-query", mndInitQuery, mndCleanupQuery},
      {"mnode-sync", mndInitSync, mndCleanupSync},
      {"mnode-telem", mndInitTelem, mndCleanupTelem},
  };

  const int32_t numOfSteps = (int32_t)(sizeof(stepTable) / sizeof(stepTable[0]));
  for (int32_t i = 0; i < numOfSteps; ++i) {
    if (mndAllocStep(pMnode, stepTable[i].name, stepTable[i].initFp, stepTable[i].cleanupFp) != 0) {
      return -1;
    }
  }

  return 0;
}
2021-11-29 04:09:18 +00:00
/*
 * Runs the cleanup functions of the registered steps in reverse order,
 * starting at index pos (inclusive) and going down to 0, then destroys the
 * step array and sets pMnode->pSteps to NULL.
 *
 * pos == -1 means "start from the last registered step". No-op when the step
 * array has already been released.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) {
    return;
  }

  int32_t idx = (pos == -1) ? (int32_t)(taosArrayGetSize(pMnode->pSteps) - 1) : pos;

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
2021-11-11 09:31:52 +00:00
2021-11-29 04:09:18 +00:00
/*
 * Executes the registered steps in order, skipping entries without an init
 * function.
 *
 * When a step fails, the steps from the failing index downwards are cleaned
 * up through mndCleanupSteps() (which also releases the step array), terrno
 * is restored to the value seen right after the failure, and -1 is returned.
 * On success the cluster id is cached in pMnode->clusterId and 0 is returned.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t idx = 0; idx < numOfSteps; ++idx) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    if (pStep->initFp == NULL) {
      continue;
    }

    if ((*pStep->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pStep->name);
      tmsgReportStartup(pStep->name, "initialized");
      continue;
    }

    // Save terrno first: the cleanup below may overwrite it.
    int32_t savedErr = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, idx);
    terrno = savedErr;
    return -1;
  }

  pMnode->clusterId = mndGetClusterId(pMnode);
  return 0;
}
2021-11-11 09:31:52 +00:00
2022-04-01 08:36:36 +00:00
/* Copies the message callbacks, own dnode id and sync settings (replica, standby) from pOption into pMnode. */
static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  // sync-related settings
  pMnode->syncMgmt.standby = pOption->standby;
  pMnode->syncMgmt.replica = pOption->replica;

  // identity and messaging
  pMnode->selfDnodeId = pOption->dnodeId;
  pMnode->msgCb = pOption->msgCb;
}
2021-11-29 06:14:03 +00:00
2021-11-29 04:09:18 +00:00
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
2021-11-29 11:35:42 +00:00
mDebug("start to open mnode in %s", path);
2022-03-25 16:29:53 +00:00
SMnode *pMnode = taosMemoryCalloc(1, sizeof(SMnode));
2021-11-29 11:35:42 +00:00
if (pMnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open mnode since %s", terrstr());
return NULL;
}
2021-12-20 06:41:12 +00:00
char timestr[24] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
2022-04-01 08:36:36 +00:00
mndSetOptions(pMnode, pOption);
2021-12-20 06:41:12 +00:00
pMnode->deploy = pOption->deploy;
2021-11-29 11:35:42 +00:00
pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
if (pMnode->pSteps == NULL) {
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMnode);
2021-11-29 11:35:42 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open mnode since %s", terrstr());
return NULL;
}
2021-11-29 04:09:18 +00:00
2021-12-15 07:35:18 +00:00
int32_t code = mndCreateDir(pMnode, path);
2021-12-01 10:59:38 +00:00
if (code != 0) {
2021-12-02 06:40:03 +00:00
code = terrno;
mError("failed to open mnode since %s", terrstr());
2021-11-29 06:14:03 +00:00
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndInitSteps(pMnode);
2021-11-29 04:09:18 +00:00
if (code != 0) {
2021-12-02 06:40:03 +00:00
code = terrno;
mError("failed to open mnode since %s", terrstr());
2021-11-29 04:09:18 +00:00
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndExecSteps(pMnode);
if (code != 0) {
2021-12-02 06:40:03 +00:00
code = terrno;
mError("failed to open mnode since %s", terrstr());
2021-11-29 04:09:18 +00:00
mndClose(pMnode);
terrno = code;
return NULL;
}
2021-11-11 09:31:52 +00:00
2021-11-29 11:35:42 +00:00
mDebug("mnode open successfully ");
2021-11-20 16:13:35 +00:00
return pMnode;
}
2021-11-11 09:31:52 +00:00
/*
 * Resets the leaderTransferFinish flag and asks the sync module to transfer
 * leadership. No-op for a NULL mnode.
 *
 * This function only initiates the transfer; the loop that used to poll
 * leaderTransferFinish until the transfer completed is currently disabled.
 */
void mndPreClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
  syncLeaderTransfer(pMnode->syncMgmt.sync);
}
2021-11-29 04:09:18 +00:00
/*
 * Runs the cleanup of all registered steps, then frees the stored path and
 * the mnode object itself. No-op for a NULL mnode.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) {
    return;
  }

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);
  taosMemoryFreeClear(pMnode->path);
  taosMemoryFreeClear(pMnode);
  mDebug("mnode is closed");
}
2021-11-11 09:31:52 +00:00
2022-05-22 08:42:44 +00:00
/*
 * Starts sync, deploys the sdb when the mnode is in deploy mode (marking it
 * as restored afterwards), resets grants and finally starts the timer thread.
 *
 * Returns -1 if the sdb deployment fails, otherwise the result of
 * mndInitTimer().
 */
int32_t mndStart(SMnode *pMnode) {
  mndSyncStart(pMnode);

  bool deploying = pMnode->deploy;
  if (deploying && sdbDeploy(pMnode->pSdb) != 0) {
    mError("failed to deploy sdb while start mnode");
    return -1;
  }
  if (deploying) {
    mndSetRestore(pMnode, true);
  }

  grantReset(pMnode, TSDB_GRANT_ALL, 0);

  return mndInitTimer(pMnode);
}
2022-05-23 05:05:35 +00:00
/*
 * Stops the mnode: raises the stop flag, stops sync and then joins the timer
 * thread. The timer loop exits once mndGetStop() reports true, so the flag
 * must be set before mndCleanupTimer() joins the thread.
 */
void mndStop(SMnode *pMnode) {
  mndSetStop(pMnode);
  mndSyncStop(pMnode);
  mndCleanupTimer(pMnode);
}
2022-05-21 14:35:02 +00:00
/*
 * Dispatches one sync (raft) RPC message to the matching sync-node callback.
 *
 * The message is decoded with the per-type *FromRpcMsg2() helper, handed to
 * the callback and destroyed again. For request-vote and append-entries
 * messages (and their replies) the "Snapshot" callback variants are used when
 * the node's strategy is SYNC_STRATEGY_STANDARD_SNAPSHOT; the snapshot
 * send/rsp messages are only accepted under that strategy.
 *
 * Returns the callback's result, or -1 when the sync environment is not
 * started, the sync node cannot be acquired, or the message type is not
 * handled. terrno is set to TSDB_CODE_SYN_INTERNAL_ERROR on any non-zero
 * result.
 */
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
  SMnode    *pMnode = pMsg->info.node;
  SSyncMgmt *pMgmt = &pMnode->syncMgmt;
  int32_t    code = 0;

  if (!syncEnvIsStart()) {
    mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
  if (pSyncNode == NULL) {
    mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
    return -1;
  }

  // Tracing: a short trace line on every 10th message, plus a detailed dump when gRaftDetailLog is set.
  {
    char          *syncNodeStr = sync2SimpleStr(pMgmt->sync);
    static int64_t mndTick = 0;
    if (++mndTick % 10 == 1) {
      mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
    }
    if (gRaftDetailLog) {
      char logBuf[512] = {0};
      snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
      syncRpcMsgLog2(logBuf, pMsg);
    }
    taosMemoryFree(syncNodeStr);
  }

  // ToDo: ugly! use function pointer
  const bool snapshot = (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT);

  if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
    SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
    code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
    syncTimeoutDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_PING) {
    SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
    code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
    syncPingDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
    SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
    code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
    syncPingReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
    SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
    code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
    syncClientRequestDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
    SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
    if (snapshot) {
      code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
    } else {
      code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
    }
    syncRequestVoteDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
    SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
    if (snapshot) {
      code = syncNodeOnRequestVoteReplySnapshotCb(pSyncNode, pSyncMsg);
    } else {
      code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
    }
    syncRequestVoteReplyDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
    SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
    if (snapshot) {
      code = syncNodeOnAppendEntriesSnapshotCb(pSyncNode, pSyncMsg);
    } else {
      code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
    }
    syncAppendEntriesDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
    SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
    if (snapshot) {
      code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
    } else {
      code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
    }
    syncAppendEntriesReplyDestroy(pSyncMsg);
  } else if (snapshot && pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
    SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
    syncSnapshotSendDestroy(pSyncMsg);
  } else if (snapshot && pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
    SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
    code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
    syncSnapshotRspDestroy(pSyncMsg);
  } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
    // This request is answered right here with the result of syncSetStandby().
    code = syncSetStandby(pMgmt->sync);
    SRpcMsg rsp = {.code = code, .info = pMsg->info};
    tmsgSendRsp(&rsp);
  } else {
    mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
    code = -1;
  }

  syncNodeRelease(pSyncNode);

  if (code != 0) {
    terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
  }
  return code;
}
2022-05-28 04:56:33 +00:00
/*
 * Decides whether this mnode may process pMsg.
 *
 * Returns 0 when the message may proceed:
 *   - it is not a request, or
 *   - it is one of the scheduler query/fetch/drop-task messages, or
 *   - mndAcquireRpcRef() succeeds (only in this case is a reference taken).
 *
 * Returns -1 otherwise. Internal timer messages are rejected silently. For
 * all other messages terrno is set as follows:
 *   - TSDB_CODE_RPC_REDIRECT   the mnode ep-set was serialized into info.rsp
 *   - TSDB_CODE_OUT_OF_MEMORY  the response buffer could not be allocated
 *   - TSDB_CODE_APP_NOT_READY  no mnode endpoint is known
 *
 * info.hasEpSet is raised only together with a valid info.rsp, so the message
 * never claims to carry an ep-set when the buffer allocation failed.
 */
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) return 0;
  if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
      pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
      pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
    return 0;
  }
  if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;

  // Timer messages are generated locally; they are dropped without a redirect response.
  if (pMsg->msgType == TDMT_MND_MQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
      pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER) {
    return -1;
  }

  SEpSet epSet = {0};
  mndGetMnodeEpSet(pMsg->info.node, &epSet);

  const STraceId *trace = &pMsg->info.traceId;
  mError("msg:%p, failed to check mnode state since %s, type:%s, numOfMnodes:%d inUse:%d", pMsg, terrstr(),
         TMSG_INFO(pMsg->msgType), epSet.numOfEps, epSet.inUse);

  if (epSet.numOfEps > 0) {
    for (int32_t i = 0; i < epSet.numOfEps; ++i) {
      mInfo("mnode index:%d, ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);
    }

    int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
    pMsg->info.rsp = rpcMallocCont(contLen);
    if (pMsg->info.rsp != NULL) {
      tSerializeSEpSet(pMsg->info.rsp, contLen, &epSet);
      pMsg->info.hasEpSet = 1;
      pMsg->info.rspLen = contLen;
      terrno = TSDB_CODE_RPC_REDIRECT;
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    terrno = TSDB_CODE_APP_NOT_READY;
  }

  return -1;
}
2022-05-28 04:56:33 +00:00
/*
 * Validates that a request carries a body.
 *
 * Non-request messages always pass. A request passes only when contLen is
 * non-zero and pCont is non-NULL; otherwise terrno is set to
 * TSDB_CODE_INVALID_MSG_LEN and -1 is returned.
 */
static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
  if (!IsReq(pMsg)) {
    return 0;
  }

  const bool missingBody = (pMsg->contLen == 0 || pMsg->pCont == NULL);
  if (!missingBody) {
    return 0;
  }

  // Not referenced directly below; presumably consumed by the mGError macro - keep the name `trace`.
  const STraceId *trace = &pMsg->info.traceId;
  mGError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen,
          pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  terrno = TSDB_CODE_INVALID_MSG_LEN;
  return -1;
}
2022-05-28 04:56:33 +00:00
/*
 * Entry point for RPC messages addressed to the mnode.
 *
 * Looks up the handler registered for the message type, validates the message
 * body and the mnode state, runs the handler and logs the outcome.
 *
 * Returns the handler's result, or -1 when no handler is registered
 * (terrno = TSDB_CODE_MSG_NOT_PROCESSED) or one of the checks fails.
 *
 * NOTE(review): mndReleaseRpcRef() runs after every handled message, whereas
 * mndCheckMnodeState() returns 0 without calling mndAcquireRpcRef() for
 * non-request and scheduler query messages - confirm the reference accounting
 * is balanced for those paths.
 */
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
  SMnode *pMnode = pMsg->info.node;
  // Not referenced directly below; presumably consumed by the mG* log macros - keep the name `trace`.
  const STraceId *trace = &pMsg->info.traceId;

  MndMsgFp handler = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
  if (handler == NULL) {
    mGError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_MSG_NOT_PROCESSED;
    return -1;
  }

  if (mndCheckMsgContent(pMsg) != 0) {
    return -1;
  }
  if (mndCheckMnodeState(pMsg) != 0) {
    return -1;
  }

  mGTrace("msg:%p, start to process in mnode, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
  int32_t result = (*handler)(pMsg);
  mndReleaseRpcRef(pMnode);

  if (result == TSDB_CODE_ACTION_IN_PROGRESS) {
    mGTrace("msg:%p, won't response immediately since in progress", pMsg);
  } else if (result == 0) {
    mGTrace("msg:%p, successfully processed", pMsg);
  } else {
    mGError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
            TMSG_INFO(pMsg->msgType));
  }

  return result;
}
/*
 * Registers fp as the handler for msgType. The slot is TMSG_INDEX(msgType);
 * indexes outside [0, TDMT_MAX) are ignored silently.
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t slot = TMSG_INDEX(msgType);
  if (slot < 0 || slot >= TDMT_MAX) {
    return;
  }
  pMnode->msgFp[slot] = fp;
}
2022-02-24 11:45:05 +00:00
/*
 * Generates a non-zero 64-bit uid for the object called name (len bytes).
 *
 * Layout of the value before the final llabs():
 *   - low 40 bits of taosGetTimestampUs(), shifted left by 24
 *   - low 16 bits of MurmurHash3_32(name, len), shifted left by 8
 *   - low 8 bits of taosRand()
 *
 * Note: uid 0 is reserved, so the generation is retried until the result is
 * non-zero.
 */
int64_t mndGenerateUid(const char *name, int32_t len) {
  int32_t nameHash = MurmurHash3_32(name, len);

  for (;;) {
    int64_t nowUs = taosGetTimestampUs();
    int64_t timePart = (nowUs & 0x000000FFFFFFFFFF) << 24;
    int64_t uid = timePart + ((nameHash & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
    if (uid != 0) {
      return llabs(uid);
    }
  }
}
2022-03-03 12:23:53 +00:00
int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
2022-06-23 13:09:50 +00:00
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
2022-05-28 04:56:33 +00:00
if (mndAcquireRpcRef(pMnode) != 0) return -1;
2022-03-04 02:15:48 +00:00
2022-07-04 06:55:26 +00:00
SSdb *pSdb = pMnode->pSdb;
2022-03-03 12:23:53 +00:00
int64_t ms = taosGetTimestampMs();
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
pClusterInfo->mnodes = taosArrayInit(sdbGetSize(pSdb, SDB_MNODE), sizeof(SMonMnodeDesc));
pVgroupInfo->vgroups = taosArrayInit(sdbGetSize(pSdb, SDB_VGROUP), sizeof(SMonVgroupDesc));
2022-06-23 13:09:50 +00:00
pStbInfo->stbs = taosArrayInit(sdbGetSize(pSdb, SDB_STB), sizeof(SMonStbDesc));
if (pClusterInfo->dnodes == NULL || pClusterInfo->mnodes == NULL || pVgroupInfo->vgroups == NULL ||
pStbInfo->stbs == NULL) {
2022-05-28 04:56:33 +00:00
mndReleaseRpcRef(pMnode);
2022-03-03 12:23:53 +00:00
return -1;
}
// cluster info
2022-06-17 07:55:59 +00:00
tstrncpy(pClusterInfo->version, version, sizeof(pClusterInfo->version));
2022-03-03 12:23:53 +00:00
pClusterInfo->monitor_interval = tsMonitorInterval;
pClusterInfo->connections_total = mndGetNumOfConnections(pMnode);
2022-06-23 10:23:51 +00:00
pClusterInfo->dbs_total = sdbGetSize(pSdb, SDB_DB);
pClusterInfo->stbs_total = sdbGetSize(pSdb, SDB_STB);
2022-03-03 12:23:53 +00:00
void *pIter = NULL;
while (1) {
SDnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
SMonDnodeDesc desc = {0};
desc.dnode_id = pObj->id;
tstrncpy(desc.dnode_ep, pObj->ep, sizeof(desc.dnode_ep));
2022-06-06 03:49:30 +00:00
if (mndIsDnodeOnline(pObj, ms)) {
2022-03-03 12:23:53 +00:00
tstrncpy(desc.status, "ready", sizeof(desc.status));
} else {
tstrncpy(desc.status, "offline", sizeof(desc.status));
}
taosArrayPush(pClusterInfo->dnodes, &desc);
sdbRelease(pSdb, pObj);
}
pIter = NULL;
while (1) {
SMnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
if (pIter == NULL) break;
SMonMnodeDesc desc = {0};
desc.mnode_id = pObj->id;
tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep));
2022-05-25 08:41:38 +00:00
if (pObj->id == pMnode->selfDnodeId) {
2022-03-03 12:23:53 +00:00
pClusterInfo->first_ep_dnode_id = pObj->id;
tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep));
2022-05-25 08:41:38 +00:00
pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f);
tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role));
} else {
tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role));
2022-03-03 12:23:53 +00:00
}
2022-05-25 08:41:38 +00:00
taosArrayPush(pClusterInfo->mnodes, &desc);
sdbRelease(pSdb, pObj);
2022-03-03 12:23:53 +00:00
}
// vgroup info
pIter = NULL;
while (1) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
pClusterInfo->vgroups_total++;
2022-06-23 10:23:51 +00:00
pClusterInfo->tbs_total += pVgroup->numOfTables;
2022-03-03 12:23:53 +00:00
SMonVgroupDesc desc = {0};
desc.vgroup_id = pVgroup->vgId;
SName name = {0};
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetDbName(&name, desc.database_name);
2022-03-03 12:23:53 +00:00
desc.tables_num = pVgroup->numOfTables;
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
tstrncpy(desc.status, "unsynced", sizeof(desc.status));
for (int32_t i = 0; i < pVgroup->replica; ++i) {
2022-07-04 06:55:26 +00:00
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
2022-03-03 12:23:53 +00:00
SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
pVnDesc->dnode_id = pVgid->dnodeId;
2022-04-19 13:39:42 +00:00
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
2022-03-03 12:23:53 +00:00
if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
tstrncpy(desc.status, "ready", sizeof(desc.status));
pClusterInfo->vgroups_alive++;
}
2022-05-18 13:25:21 +00:00
if (pVgid->role != TAOS_SYNC_STATE_ERROR) {
2022-03-03 12:23:53 +00:00
pClusterInfo->vnodes_alive++;
}
pClusterInfo->vnodes_total++;
}
taosArrayPush(pVgroupInfo->vgroups, &desc);
sdbRelease(pSdb, pVgroup);
}
2022-06-23 13:09:50 +00:00
// stb info
pIter = NULL;
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
if (pIter == NULL) break;
2022-06-24 09:00:00 +00:00
SMonStbDesc desc = {0};
2022-06-23 13:09:50 +00:00
SName name1 = {0};
tNameFromString(&name1, pStb->db, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetDbName(&name1, desc.database_name);
SName name2 = {0};
tNameFromString(&name2, pStb->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tstrncpy(desc.stb_name, tNameGetTableName(&name2), TSDB_TABLE_NAME_LEN);
taosArrayPush(pStbInfo->stbs, &desc);
sdbRelease(pSdb, pStb);
}
2022-03-03 12:23:53 +00:00
// grant info
pGrantInfo->expire_time = (pMnode->grant.expireTimeMS - ms) / 86400000.0f;
pGrantInfo->timeseries_total = pMnode->grant.timeseriesAllowed;
if (pMnode->grant.expireTimeMS == 0) {
pGrantInfo->expire_time = INT32_MAX;
pGrantInfo->timeseries_total = INT32_MAX;
}
2022-05-28 04:56:33 +00:00
mndReleaseRpcRef(pMnode);
2022-03-03 12:23:53 +00:00
return 0;
2022-03-08 09:22:21 +00:00
}
2022-04-19 13:39:42 +00:00
// Reports this mnode's current sync role to the caller.
//
// Stores the value returned by syncGetMyRole() for pMnode->syncMgmt.sync into
// pLoad->syncState and emits a trace line with its string form.
// Always returns 0.
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);

  const char *roleStr = syncStr(pLoad->syncState);
  mTrace("mnode current syncstate is %s", roleStr);

  return 0;
}
2022-05-28 04:56:33 +00:00
// Tries to take one rpc reference on the mnode.
//
// The check and the increment both happen while holding pMnode->lock in read
// mode. Returns 0 when the reference was taken (pMnode->rpcRef incremented by
// one), or -1 when:
//   - the mnode is stopped (terrno is set to TSDB_CODE_APP_NOT_READY), or
//   - mndIsMaster() reports false (terrno is not assigned by this function).
// A successful call is expected to be paired with mndReleaseRpcRef().
int32_t mndAcquireRpcRef(SMnode *pMnode) {
  int32_t code = -1;  // assume failure until the reference is actually taken

  taosThreadRwlockRdlock(&pMnode->lock);
  if (pMnode->stopped) {
    terrno = TSDB_CODE_APP_NOT_READY;
  } else if (mndIsMaster(pMnode)) {
    (void)atomic_add_fetch_32(&pMnode->rpcRef, 1);
    code = 0;
  }
  taosThreadRwlockUnlock(&pMnode->lock);

  return code;
}
// Drops one rpc reference: atomically decrements pMnode->rpcRef by one while
// holding pMnode->lock in read mode. The counter is not checked for underflow.
void mndReleaseRpcRef(SMnode *pMnode) {
  taosThreadRwlockRdlock(&pMnode->lock);
  (void)atomic_sub_fetch_32(&pMnode->rpcRef, 1);
  taosThreadRwlockUnlock(&pMnode->lock);
}
// Updates the mnode's "restored" flag.
//
// The flag is written under pMnode->lock held in write mode and the new value
// is traced. When the flag is being cleared (restored == false), the call
// additionally blocks, polling every 3 ms, until pMnode->rpcRef is no longer
// positive.
void mndSetRestore(SMnode *pMnode, bool restored) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->restored = restored;
  taosThreadRwlockUnlock(&pMnode->lock);
  mTrace("mnode set restored:%d", restored);

  if (restored) return;

  // NOTE(review): rpcRef is read here with a plain load and without the lock,
  // while it is modified with atomic_*_fetch_32 elsewhere in this file —
  // consider an atomic load.
  while (pMnode->rpcRef > 0) {
    taosMsleep(3);
  }
}
// Returns the current value of pMnode->restored. The flag is read directly,
// without taking pMnode->lock.
bool mndGetRestored(SMnode *pMnode) {
  const bool isRestored = pMnode->restored;
  return isRestored;
}
// Marks the mnode as stopped and traces the transition.
//
// pMnode->stopped is set to true under pMnode->lock held in write mode. This
// function only ever sets the flag; it never clears it.
void mndSetStop(SMnode *pMnode) {
  taosThreadRwlockWrlock(&pMnode->lock);
  pMnode->stopped = true;
  taosThreadRwlockUnlock(&pMnode->lock);

  mTrace("mnode set stopped");
}
// Returns the current value of pMnode->stopped. The flag is read directly,
// without taking pMnode->lock.
bool mndGetStop(SMnode *pMnode) {
  const bool isStopped = pMnode->stopped;
  return isStopped;
}