TDengine/source/dnode/mnode/impl/src/mnode.c

489 lines
15 KiB
C
Raw Normal View History

2021-11-11 09:31:52 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
2021-11-27 14:56:18 +00:00
#include "mndAcct.h"
#include "mndAuth.h"
2021-12-29 12:05:10 +00:00
#include "mndBnode.h"
2021-11-27 14:56:18 +00:00
#include "mndCluster.h"
2022-01-20 01:49:27 +00:00
#include "mndConsumer.h"
2021-11-27 14:56:18 +00:00
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndMnode.h"
#include "mndProfile.h"
2021-12-29 12:05:10 +00:00
#include "mndQnode.h"
2021-11-27 14:56:18 +00:00
#include "mndShow.h"
2021-12-29 12:05:10 +00:00
#include "mndSnode.h"
2021-12-10 07:21:34 +00:00
#include "mndStb.h"
2022-01-20 01:49:27 +00:00
#include "mndSubscribe.h"
2021-11-27 14:56:18 +00:00
#include "mndSync.h"
#include "mndTelem.h"
2021-12-24 06:37:06 +00:00
#include "mndTopic.h"
2021-11-27 14:56:18 +00:00
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
2021-11-11 09:31:52 +00:00
2022-01-04 11:44:44 +00:00
/*
 * Forward a request to a dnode through the callback installed on the mnode.
 *
 * Returns whatever the callback returns. When pMnode is NULL or no
 * sendReqToDnodeFp callback is installed, sets terrno to
 * TSDB_CODE_MND_NOT_READY and returns -1.
 */
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
  if (pMnode != NULL && pMnode->sendReqToDnodeFp != NULL) {
    return (*pMnode->sendReqToDnodeFp)(pMnode->pDnode, pEpSet, pMsg);
  }

  terrno = TSDB_CODE_MND_NOT_READY;
  return -1;
}
2021-11-11 09:31:52 +00:00
2022-01-04 11:44:44 +00:00
/*
 * Forward a request to the mnode through the callback installed on the mnode.
 *
 * Returns whatever the callback returns. When pMnode is NULL or no
 * sendReqToMnodeFp callback is installed, sets terrno to
 * TSDB_CODE_MND_NOT_READY and returns -1.
 */
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
  /* Guard the callback that is actually invoked below (sendReqToMnodeFp),
   * not the dnode one, otherwise a NULL function pointer can be called. */
  if (pMnode == NULL || pMnode->sendReqToMnodeFp == NULL) {
    terrno = TSDB_CODE_MND_NOT_READY;
    return -1;
  }

  return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg);
}
2021-11-11 09:31:52 +00:00
2022-01-04 11:44:44 +00:00
/*
 * Send a redirect response for pMsg through the callback installed on the
 * mnode. Silently does nothing when pMnode is NULL or no sendRedirectRspFp
 * callback is installed.
 */
void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) {
  if (pMnode == NULL) return;
  if (pMnode->sendRedirectRspFp == NULL) return;

  (*pMnode->sendRedirectRspFp)(pMnode->pDnode, pMsg);
}
2021-11-11 09:31:52 +00:00
2021-12-26 15:46:22 +00:00
static void mndTransReExecute(void *param, void *tmrId) {
SMnode *pMnode = param;
if (mndIsMaster(pMnode)) {
2022-01-10 12:44:11 +00:00
STransReq *pMsg = rpcMallocCont(sizeof(STransReq));
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS, .pCont = pMsg, .contLen = sizeof(STransReq)};
2022-01-04 11:44:44 +00:00
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
2021-12-26 15:46:22 +00:00
}
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
}
2022-01-20 01:49:27 +00:00
static void mndCalMqRebalance(void *param, void *tmrId) {
SMnode *pMnode = param;
2022-01-18 08:02:12 +00:00
if (mndIsMaster(pMnode)) {
2022-01-20 01:49:27 +00:00
SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg));
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
2022-01-18 08:02:12 +00:00
}
2022-01-20 01:49:27 +00:00
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer);
2022-01-18 08:02:12 +00:00
}
2021-11-29 04:09:18 +00:00
/*
 * Init step: create the mnode timer controller if it does not exist yet and
 * arm the transaction timer (first shot after 6000 ms) and the mq timer
 * (first shot after 3000 ms).
 *
 * Returns 0 on success; on failure sets terrno to TSDB_CODE_OUT_OF_MEMORY
 * and returns -1.
 */
static int32_t mndInitTimer(SMnode *pMnode) {
  if (pMnode->timer == NULL) {
    pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
    if (pMnode->timer == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  /* Short-circuit keeps the original order: the mq timer is only armed when
   * arming the transaction timer did not report a non-zero result. */
  if (taosTmrReset(mndTransReExecute, 6000, pMnode, pMnode->timer, &pMnode->transTimer) ||
      taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer)) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
2021-11-29 04:09:18 +00:00
/*
 * Cleanup step: stop the transaction and mq timers and release the timer
 * controller. Does nothing when no controller exists.
 */
static void mndCleanupTimer(SMnode *pMnode) {
  if (pMnode->timer == NULL) return;

  taosTmrStop(pMnode->transTimer);
  pMnode->transTimer = NULL;

  taosTmrStop(pMnode->mqTimer);
  pMnode->mqTimer = NULL;

  taosTmrCleanUp(pMnode->timer);
  pMnode->timer = NULL;
}
2021-12-15 07:35:18 +00:00
/*
 * Store a private copy of path in pMnode->path and create that directory.
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_OUT_OF_MEMORY if
 * the copy cannot be allocated, or with terrno derived from errno if
 * taosMkDir fails. The copy stays attached to pMnode on failure.
 */
static int32_t mndCreateDir(SMnode *pMnode, const char *path) {
  char *dir = strdup(path);
  pMnode->path = dir;

  if (dir == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  if (taosMkDir(dir) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  return 0;
}
2021-11-11 09:31:52 +00:00
2021-11-29 06:14:03 +00:00
/*
 * Init step: create the sdb instance for this mnode, rooted at pMnode->path.
 * Returns 0 on success, -1 when sdbInit returns NULL.
 */
static int32_t mndInitSdb(SMnode *pMnode) {
  SSdbOpt opt = {.path = pMnode->path, .pMnode = pMnode};

  pMnode->pSdb = sdbInit(&opt);
  return (pMnode->pSdb != NULL) ? 0 : -1;
}
2021-11-29 06:14:03 +00:00
/* Init step: deploy the sdb; returns the result of sdbDeploy unchanged. */
static int32_t mndDeploySdb(SMnode *pMnode) {
  int32_t code = sdbDeploy(pMnode->pSdb);
  return code;
}
2021-11-29 11:35:42 +00:00
/* Init step: load the sdb from file; returns the result of sdbReadFile unchanged. */
static int32_t mndReadSdb(SMnode *pMnode) {
  int32_t code = sdbReadFile(pMnode->pSdb);
  return code;
}
2021-11-29 06:14:03 +00:00
/* Cleanup step: release the sdb instance, if any, and clear the pointer. */
static void mndCleanupSdb(SMnode *pMnode) {
  if (!pMnode->pSdb) return;

  sdbCleanup(pMnode->pSdb);
  pMnode->pSdb = NULL;
}
2021-11-29 04:09:18 +00:00
/*
 * Append one lifecycle step (name, init callback, cleanup callback) to
 * pMnode->pSteps. Either callback may be NULL.
 *
 * Returns 0 on success; if the push fails, sets terrno to
 * TSDB_CODE_OUT_OF_MEMORY and returns -1.
 */
static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCleanupFp cleanupFp) {
  SMnodeStep step = {.name = name, .initFp = initFp, .cleanupFp = cleanupFp};

  if (taosArrayPush(pMnode->pSteps, &step) != NULL) {
    return 0;
  }

  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return -1;
}
2021-11-29 04:09:18 +00:00
/*
 * Register every mnode lifecycle step in the order the init callbacks must
 * run. mndExecSteps runs the init callbacks front to back; mndCleanupSteps
 * runs the cleanup callbacks back to front.
 *
 * The sdb is deployed when clusterId <= 0 and read from file otherwise.
 *
 * "mnode-timer" is registered twice on purpose: once with only an init
 * callback, and again as the last step with only a cleanup callback, so the
 * timers are the first thing torn down on close.
 *
 * Returns 0 on success, -1 as soon as any registration fails (terrno is set
 * by mndAllocStep).
 */
static int32_t mndInitSteps(SMnode *pMnode) {
  if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-mnode", mndInitMnode, mndCleanupMnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-qnode", mndInitQnode, mndCleanupQnode) != 0) return -1;
  /* Each step gets its own name so init/cleanup logs identify the right module. */
  if (mndAllocStep(pMnode, "mnode-snode", mndInitSnode, mndCleanupSnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-bnode", mndInitBnode, mndCleanupBnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-dnode", mndInitDnode, mndCleanupDnode) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;

  if (pMnode->clusterId <= 0) {
    if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
  } else {
    if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
  }

  if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
  if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;

  return 0;
}
2021-11-29 04:09:18 +00:00
/*
 * Run the cleanup callbacks of the registered steps in reverse order,
 * starting at index pos (or at the last step when pos is -1), then destroy
 * the step array and clear pMnode->pSteps.
 *
 * Does nothing when the step array has already been released.
 */
static void mndCleanupSteps(SMnode *pMnode, int32_t pos) {
  if (pMnode->pSteps == NULL) return;

  int32_t idx = pos;
  if (idx == -1) {
    idx = taosArrayGetSize(pMnode->pSteps) - 1;
  }

  while (idx >= 0) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, idx);
    mDebug("%s will cleanup", pStep->name);
    if (pStep->cleanupFp != NULL) {
      (*pStep->cleanupFp)(pMnode);
    }
    idx--;
  }

  taosArrayClear(pMnode->pSteps);
  taosArrayDestroy(pMnode->pSteps);
  pMnode->pSteps = NULL;
}
2021-11-11 09:31:52 +00:00
2021-11-29 04:09:18 +00:00
/*
 * Run the init callback of every registered step in registration order.
 * Steps without an init callback are skipped.
 *
 * On the first failing step, the steps from that index back to 0 are cleaned
 * up via mndCleanupSteps, terrno is restored to the value the failing step
 * left behind, and -1 is returned. Returns 0 when every step succeeded.
 */
static int32_t mndExecSteps(SMnode *pMnode) {
  int32_t numOfSteps = taosArrayGetSize(pMnode->pSteps);

  for (int32_t i = 0; i < numOfSteps; ++i) {
    SMnodeStep *pStep = taosArrayGet(pMnode->pSteps, i);
    if (pStep->initFp == NULL) continue;

    if ((*pStep->initFp)(pMnode) == 0) {
      mDebug("%s is initialized", pStep->name);
      continue;
    }

    /* Cleanup may overwrite terrno, so keep the original failure code. */
    int32_t code = terrno;
    mError("%s exec failed since %s, start to cleanup", pStep->name, terrstr());
    mndCleanupSteps(pMnode, i);
    terrno = code;
    return -1;
  }

  return 0;
}
2021-11-11 09:31:52 +00:00
2021-11-29 06:14:03 +00:00
/*
 * Copy the caller-supplied options into the mnode: identity and replica
 * layout, the dnode handle and its callbacks, and the configuration. The
 * configuration strings are duplicated so the mnode owns its copies (they
 * are released in mndClose).
 *
 * Returns 0 on success. Returns -1 with
 *   - terrno = TSDB_CODE_MND_INVALID_OPTIONS when a callback is missing,
 *     dnodeId or clusterId is negative, or statusInterval is below 1;
 *   - terrno = TSDB_CODE_OUT_OF_MEMORY when any configuration string could
 *     not be duplicated.
 * Strings duplicated before a failure stay attached to pMnode.
 */
static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
  pMnode->dnodeId = pOption->dnodeId;
  pMnode->clusterId = pOption->clusterId;
  pMnode->replica = pOption->replica;
  pMnode->selfIndex = pOption->selfIndex;
  memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
  pMnode->pDnode = pOption->pDnode;
  pMnode->putReqToMWriteQFp = pOption->putReqToMWriteQFp;
  pMnode->sendReqToDnodeFp = pOption->sendReqToDnodeFp;
  pMnode->sendReqToMnodeFp = pOption->sendReqToMnodeFp;
  pMnode->sendRedirectRspFp = pOption->sendRedirectRspFp;
  pMnode->cfg.sver = pOption->cfg.sver;
  pMnode->cfg.enableTelem = pOption->cfg.enableTelem;
  pMnode->cfg.statusInterval = pOption->cfg.statusInterval;
  pMnode->cfg.shellActivityTimer = pOption->cfg.shellActivityTimer;
  pMnode->cfg.timezone = strdup(pOption->cfg.timezone);
  pMnode->cfg.locale = strdup(pOption->cfg.locale);
  pMnode->cfg.charset = strdup(pOption->cfg.charset);
  pMnode->cfg.gitinfo = strdup(pOption->cfg.gitinfo);
  pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo);

  if (pMnode->sendReqToDnodeFp == NULL || pMnode->sendReqToMnodeFp == NULL || pMnode->sendRedirectRspFp == NULL ||
      pMnode->putReqToMWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 ||
      pMnode->cfg.statusInterval < 1) {
    terrno = TSDB_CODE_MND_INVALID_OPTIONS;
    return -1;
  }

  /* Every strdup result is checked, including gitinfo and buildinfo, so a
   * failed allocation is reported here instead of surfacing later as a NULL
   * string. */
  if (pMnode->cfg.timezone == NULL || pMnode->cfg.locale == NULL || pMnode->cfg.charset == NULL ||
      pMnode->cfg.gitinfo == NULL || pMnode->cfg.buildinfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
2021-11-29 06:14:03 +00:00
2021-11-29 04:09:18 +00:00
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
2021-11-29 11:35:42 +00:00
mDebug("start to open mnode in %s", path);
2021-11-29 04:09:18 +00:00
SMnode *pMnode = calloc(1, sizeof(SMnode));
2021-11-29 11:35:42 +00:00
if (pMnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open mnode since %s", terrstr());
return NULL;
}
2021-12-20 06:41:12 +00:00
char timestr[24] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
2021-11-29 11:35:42 +00:00
pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
if (pMnode->pSteps == NULL) {
free(pMnode);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to open mnode since %s", terrstr());
return NULL;
}
2021-11-29 04:09:18 +00:00
2021-12-15 07:35:18 +00:00
int32_t code = mndCreateDir(pMnode, path);
2021-12-01 10:59:38 +00:00
if (code != 0) {
2021-12-02 06:40:03 +00:00
code = terrno;
mError("failed to open mnode since %s", terrstr());
2021-11-29 06:14:03 +00:00
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndSetOptions(pMnode, pOption);
2021-11-29 04:09:18 +00:00
if (code != 0) {
2021-12-02 06:40:03 +00:00
code = terrno;
mError("failed to open mnode since %s", terrstr());
2021-11-29 04:09:18 +00:00
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndInitSteps(pMnode);
if (code != 0) {
2021-12-02 06:40:03 +00:00
code = terrno;
mError("failed to open mnode since %s", terrstr());
2021-11-29 04:09:18 +00:00
mndClose(pMnode);
terrno = code;
return NULL;
}
code = mndExecSteps(pMnode);
if (code != 0) {
2021-12-02 06:40:03 +00:00
code = terrno;
mError("failed to open mnode since %s", terrstr());
2021-11-29 04:09:18 +00:00
mndClose(pMnode);
terrno = code;
return NULL;
}
2021-11-11 09:31:52 +00:00
2021-11-29 11:35:42 +00:00
mDebug("mnode open successfully ");
2021-11-20 16:13:35 +00:00
return pMnode;
}
2021-11-11 09:31:52 +00:00
2021-11-29 04:09:18 +00:00
/*
 * Shut down an mnode: run the cleanup callbacks of all registered steps,
 * then free the path, the duplicated configuration strings and the object
 * itself. A NULL pMnode is ignored.
 */
void mndClose(SMnode *pMnode) {
  if (pMnode == NULL) return;

  mDebug("start to close mnode");
  mndCleanupSteps(pMnode, -1);

  tfree(pMnode->path);
  tfree(pMnode->cfg.charset);
  tfree(pMnode->cfg.locale);
  tfree(pMnode->cfg.timezone);
  tfree(pMnode->cfg.gitinfo);
  tfree(pMnode->cfg.buildinfo);
  tfree(pMnode);

  mDebug("mnode is closed");
}
2021-11-11 09:31:52 +00:00
2021-11-29 04:09:18 +00:00
/*
 * Alter an mnode. The current implementation only logs the start and end of
 * the operation; neither argument is read. Always returns 0.
 */
int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
  int32_t code = 0;

  mDebug("start to alter mnode");
  mDebug("mnode is altered");

  return code;
}
2021-11-11 09:31:52 +00:00
2021-11-29 04:09:18 +00:00
/*
 * Remove the mnode directory at path via taosRemoveDir, logging before and
 * after the removal.
 */
void mndDestroy(const char *path) {
  mDebug("start to destroy mnode at %s", path);

  taosRemoveDir(path);

  mDebug("mnode is destroyed");
}
2021-11-11 09:31:52 +00:00
2021-11-29 04:09:18 +00:00
/*
 * Fill pLoad with the mnode load statistics. The current implementation
 * reports zero for every counter it writes and does not read pMnode.
 * Always returns 0.
 */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
  /* Object counts. */
  pLoad->numOfDnode = 0;
  pLoad->numOfMnode = 0;
  pLoad->numOfVgroup = 0;
  pLoad->numOfDatabase = 0;
  pLoad->numOfSuperTable = 0;
  pLoad->numOfChildTable = 0;
  pLoad->numOfColumn = 0;

  /* Data volume. */
  pLoad->totalPoints = 0;
  pLoad->totalStorage = 0;
  pLoad->compStorage = 0;

  return 0;
}
2021-11-11 09:31:52 +00:00
2021-11-29 04:09:18 +00:00
/*
 * Wrap an incoming RPC message in a queue item for the mnode.
 *
 * For every message type except TDMT_MND_TRANS and TDMT_MND_MQ_TIMER, the
 * user name is copied from the connection info; the connection info is only
 * looked up when the message type has its lowest bit set, otherwise the
 * zero-initialised value is copied.
 *
 * Returns the new item, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY (allocation failed) or
 * TSDB_CODE_MND_NO_USER_FROM_CONN (connection info lookup failed).
 */
SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
  SMnodeMsg *pMsg = taosAllocateQitem(sizeof(SMnodeMsg));
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
    return NULL;
  }

  bool isTimerMsg = (pRpcMsg->msgType == TDMT_MND_TRANS || pRpcMsg->msgType == TDMT_MND_MQ_TIMER);
  if (!isTimerMsg) {
    SRpcConnInfo connInfo = {0};
    bool         needConnInfo = (pRpcMsg->msgType & 1U) != 0;

    if (needConnInfo && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
      taosFreeQitem(pMsg);
      terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
      mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
      return NULL;
    }

    memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
  }

  pMsg->pMnode = pMnode;
  pMsg->rpcMsg = *pRpcMsg;
  pMsg->createdTime = taosGetTimestampSec();

  mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMsg->user);
  return pMsg;
}
2021-11-29 04:09:18 +00:00
/*
 * Destroy a queue item created by mndInitMsg: free the RPC payload it
 * carries, then free the item itself.
 */
void mndCleanupMsg(SMnodeMsg *pMsg) {
  SRpcMsg *pRpc = &pMsg->rpcMsg;

  mTrace("msg:%p, is destroyed, app:%p RPC:%p", pMsg, pRpc->ahandle, pRpc->handle);

  rpcFreeCont(pRpc->pCont);
  pRpc->pCont = NULL;

  taosFreeQitem(pMsg);
}
2021-12-01 10:59:38 +00:00
/*
 * Send a response carrying only a result code (no payload) on the RPC
 * handle of pMsg.
 */
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
  SRpcMsg rpcRsp = {0};
  rpcRsp.handle = pMsg->rpcMsg.handle;
  rpcRsp.code = code;

  rpcSendResponse(&rpcRsp);
}
2021-11-23 10:20:57 +00:00
2021-12-28 09:38:15 +00:00
/*
 * Dispatch one queued message to the handler registered for its type and,
 * for messages whose type has the lowest bit set (treated here as requests),
 * send the reply.
 *
 * Outcomes:
 *   - request while this mnode is not master: a redirect response is sent;
 *   - request without payload: TSDB_CODE_MND_INVALID_MSG_LEN is returned;
 *   - no handler registered (or type index out of range):
 *     TSDB_CODE_MSG_NOT_PROCESSED is returned;
 *   - handler returns TSDB_CODE_MND_ACTION_IN_PROGRESS: nothing is sent here
 *     and the function returns immediately;
 *   - handler fails: terrno is returned as the response code;
 *   - handler succeeds: pMsg->pCont / pMsg->contLen are sent as the response.
 *
 * Messages that are not requests never get a reply from this function.
 */
void mndProcessMsg(SMnodeMsg *pMsg) {
  SMnode  *pMnode = pMsg->pMnode;
  int32_t  code = 0;
  tmsg_t   msgType = pMsg->rpcMsg.msgType;
  void    *ahandle = pMsg->rpcMsg.ahandle;
  bool     isReq = (msgType & 1U);
  MndMsgFp fp = NULL;

  mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);

  if (isReq && !mndIsMaster(pMnode)) {
    code = TSDB_CODE_APP_NOT_READY;
    terrno = code; /* the log below prints terrstr(), so it must reflect this failure */
    mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    goto PROCESS_RPC_END;
  }

  if (isReq && pMsg->rpcMsg.pCont == NULL) {
    code = TSDB_CODE_MND_INVALID_MSG_LEN;
    terrno = code; /* the log below prints terrstr(), so it must reflect this failure */
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    goto PROCESS_RPC_END;
  }

  /* Same range check as mndSetMsgHandle: never index msgFp[] with a type
   * that could not have been registered. */
  tmsg_t type = TMSG_INDEX(msgType);
  if (type >= 0 && type < TDMT_MAX) {
    fp = pMnode->msgFp[type];
  }

  if (fp == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
    goto PROCESS_RPC_END;
  }

  code = (*fp)(pMsg);
  if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
    /* The handler finishes asynchronously; no reply is sent from here. */
    mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
    return;
  } else if (code != 0) {
    code = terrno;
    mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
    goto PROCESS_RPC_END;
  } else {
    mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
  }

PROCESS_RPC_END:
  if (isReq) {
    if (code == TSDB_CODE_APP_NOT_READY) {
      mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
    } else if (code != 0) {
      SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
      rpcSendResponse(&rpcRsp);
    } else {
      SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
      rpcSendResponse(&rpcRsp);
    }
  }
}
/*
 * Register fp as the handler for msgType. The request is ignored when the
 * index derived from msgType falls outside [0, TDMT_MAX).
 */
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
  tmsg_t type = TMSG_INDEX(msgType);
  if (type < 0 || type >= TDMT_MAX) return;

  pMnode->msgFp[type] = fp;
}
2021-12-12 02:46:49 +00:00
/*
 * Build a 64-bit id from three parts:
 *   bits 63..24  low 40 bits of the current microsecond timestamp
 *   bits 23..8   low 16 bits of the 32-bit MurmurHash3 of name[0..len)
 *   bits  7..0   low 8 bits of taosRand()
 *
 * All arithmetic is done in uint64_t: shifting the 40-bit timestamp left by
 * 24 in a signed 64-bit type overflows (undefined behaviour) whenever bit 39
 * of the timestamp is set.
 */
uint64_t mndGenerateUid(char *name, int32_t len) {
  uint64_t us = (uint64_t)taosGetTimestampUs();
  uint64_t hashval = (uint64_t)MurmurHash3_32(name, len);
  uint64_t rnd = (uint64_t)taosRand();

  uint64_t timePart = (us & UINT64_C(0x000000FFFFFFFFFF)) << 24;
  uint64_t hashPart = (hashval & UINT64_C(0xFFFF)) << 8;
  uint64_t randPart = rnd & UINT64_C(0xFF);

  return timePart + hashPart + randPart;
}