TDengine/source/dnode/mnode/impl/src/mndProfile.c

984 lines
33 KiB
C
Raw Normal View History

2021-09-22 08:15:20 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2021-10-16 07:16:05 +00:00
#define _DEFAULT_SOURCE
2021-12-03 06:36:41 +00:00
#include "mndProfile.h"
#include "mndDb.h"
2022-05-16 06:55:31 +00:00
#include "mndDnode.h"
2021-12-03 06:36:41 +00:00
#include "mndMnode.h"
#include "mndPrivilege.h"
2022-05-31 06:03:47 +00:00
#include "mndQnode.h"
2021-12-03 06:36:41 +00:00
#include "mndShow.h"
2022-02-16 07:44:20 +00:00
#include "mndStb.h"
2021-12-03 06:36:41 +00:00
#include "mndUser.h"
2022-02-16 07:44:20 +00:00
#include "tglobal.h"
#include "tversion.h"
2021-10-17 03:42:05 +00:00
2021-12-03 06:36:41 +00:00
typedef struct {
2022-04-22 11:47:00 +00:00
uint32_t id;
int8_t connType;
char user[TSDB_USER_LEN];
char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc
int64_t appStartTimeMs; // app start time
int32_t pid; // pid of app that invokes taosc
uint32_t ip;
uint16_t port;
int8_t killed;
int64_t loginTimeMs;
int64_t lastAccessTimeMs;
uint64_t killId;
int32_t numOfQueries;
SRWLatch queryLock;
2022-05-16 06:55:31 +00:00
SArray *pQueries; // SArray<SQueryDesc>
2021-12-03 06:36:41 +00:00
} SConnObj;
2022-06-15 12:59:33 +00:00
/* In-memory record of one client application; entries live in the app cache keyed by `appId`. */
typedef struct {
  int64_t            appId;             // application id, used as the cache key
  uint32_t           ip;                // client ip the app was first seen from
  int32_t            pid;
  char               name[TSDB_APP_NAME_LEN];
  int64_t            startTime;
  SAppClusterSummary summary;           // overwritten from every app heartbeat
  int64_t            lastAccessTimeMs;  // refreshed each time the app is acquired
} SAppObj;
2022-04-14 06:42:51 +00:00
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
int32_t pid, const char *app, int64_t startTime);
2021-12-03 06:36:41 +00:00
static void mndFreeConn(SConnObj *pConn);
2022-04-12 11:10:52 +00:00
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId);
2021-12-03 06:36:41 +00:00
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
2022-05-16 06:55:31 +00:00
static void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter);
2021-12-03 06:36:41 +00:00
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
2022-05-16 06:55:31 +00:00
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq);
static int32_t mndProcessConnectReq(SRpcMsg *pReq);
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq);
static int32_t mndProcessKillConnReq(SRpcMsg *pReq);
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
2021-12-03 06:36:41 +00:00
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter);
2022-06-15 12:59:33 +00:00
static void mndFreeApp(SAppObj *pApp);
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter);
2022-06-24 07:18:40 +00:00
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq);
2021-12-03 06:36:41 +00:00
/**
 * Initialise the profile module: create the connection and app caches, then register
 * the RPC message handlers and the show-table retrieve / free-iterator callbacks.
 *
 * @param pMnode mnode whose profileMgmt is initialised
 * @return 0 on success; -1 with terrno = TSDB_CODE_OUT_OF_MEMORY when a cache cannot be created
 */
int32_t mndInitProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  // check time handed to both caches, in ms
  const int32_t checkMs = tsShellActivityTimer * 2 * 1000;

  pProfile->connCache = taosCacheInit(TSDB_DATA_TYPE_UINT, checkMs, true, (__cache_free_fn_t)mndFreeConn, "conn");
  if (NULL == pProfile->connCache) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  pProfile->appCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, checkMs, true, (__cache_free_fn_t)mndFreeApp, "app");
  if (NULL == pProfile->appCache) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to alloc profile cache since %s", terrstr());
    return -1;
  }

  // request handlers
  mndSetMsgHandle(pMnode, TDMT_MND_HEARTBEAT, mndProcessHeartBeatReq);
  mndSetMsgHandle(pMnode, TDMT_MND_CONNECT, mndProcessConnectReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_QUERY, mndProcessKillQueryReq);
  mndSetMsgHandle(pMnode, TDMT_MND_KILL_CONN, mndProcessKillConnReq);
  mndSetMsgHandle(pMnode, TDMT_MND_SERVER_VERSION, mndProcessSvrVerReq);

  // show-table callbacks: connections, queries, apps
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndRetrieveApps);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_APPS, mndCancelGetNextApp);

  return 0;
}
/**
 * Tear down the profile module: destroy whichever of the connection and app caches
 * exist and reset the pointers so a repeated call is harmless.
 */
void mndCleanupProfile(SMnode *pMnode) {
  SProfileMgmt *pProfile = &pMnode->profileMgmt;

  if (NULL != pProfile->connCache) {
    taosCacheCleanup(pProfile->connCache);
    pProfile->connCache = NULL;
  }

  if (NULL != pProfile->appCache) {
    taosCacheCleanup(pProfile->appCache);
    pProfile->appCache = NULL;
  }
}
2022-04-14 06:42:51 +00:00
/**
 * Create a connection object and put it into the connection cache.
 *
 * The connection id is generated by mndGenerateUid() from the concatenation of
 * user, ip, port, pid and app name. The entry is cached for
 * tsShellActivityTimer * 3 seconds.
 *
 * @param pMnode    mnode owning the connection cache
 * @param user      user name, copied into the object
 * @param connType  connection type stored in the object
 * @param ip        client ip
 * @param port      client port
 * @param pid       pid of the client application
 * @param app       client application name, copied into the object
 * @param startTime application start time in ms; 0 means "use the current time"
 * @return the cached object as returned by taosCachePut() (callers in this file hand it
 *         back through mndReleaseConn()), or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
static SConnObj *mndCreateConn(SMnode *pMnode, const char *user, int8_t connType, uint32_t ip, uint16_t port,
                               int32_t pid, const char *app, int64_t startTime) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  char    connStr[255] = {0};
  int32_t len = snprintf(connStr, sizeof(connStr), "%s%d%d%d%s", user, ip, port, pid, app);
  // snprintf reports the untruncated length; clamp it so mndGenerateUid is never
  // asked to read more bytes than connStr actually holds.
  if (len < 0) {
    len = 0;
  } else if (len >= (int32_t)sizeof(connStr)) {
    len = (int32_t)sizeof(connStr) - 1;
  }

  uint32_t connId = mndGenerateUid(connStr, len);
  if (startTime == 0) startTime = taosGetTimestampMs();

  SConnObj connObj = {
      .id = connId,
      .connType = connType,
      .appStartTimeMs = startTime,
      .pid = pid,
      .ip = ip,
      .port = port,
      .killed = 0,
      .loginTimeMs = taosGetTimestampMs(),
      .lastAccessTimeMs = 0,
      .killId = 0,
      .numOfQueries = 0,
      .pQueries = NULL,
  };

  connObj.lastAccessTimeMs = connObj.loginTimeMs;
  tstrncpy(connObj.user, user, TSDB_USER_LEN);
  tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN);

  int32_t   keepTime = tsShellActivityTimer * 3;
  SConnObj *pConn =
      taosCachePut(pMgmt->connCache, &connId, sizeof(uint32_t), &connObj, sizeof(connObj), keepTime * 1000);
  if (pConn == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // argument order must follow the format: id, reason, user
    mError("conn:%u, failed to put into cache since %s, user:%s", connId, terrstr(), user);
    return NULL;
  } else {
    mTrace("conn:%u, is created, data:%p user:%s", pConn->id, pConn, user);
    return pConn;
  }
}
/**
 * Free callback registered for the connection cache: releases the query
 * descriptor array held by the connection while holding the query write latch.
 */
static void mndFreeConn(SConnObj *pConn) {
  SRWLatch *pLatch = &pConn->queryLock;

  taosWLockLatch(pLatch);
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
  taosWUnLockLatch(pLatch);

  mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}
2022-04-12 11:10:52 +00:00
/**
 * Look up a connection in the cache by id and refresh its last-access time.
 *
 * @return the acquired connection (hand back via mndReleaseConn()), or NULL when
 *         the id is not in the cache
 */
static SConnObj *mndAcquireConn(SMnode *pMnode, uint32_t connId) {
  SCache   *pCache = pMnode->profileMgmt.connCache;
  SConnObj *pFound = taosCacheAcquireByKey(pCache, &connId, sizeof(connId));

  if (NULL == pFound) {
    mDebug("conn:%u, already destroyed", connId);
    return NULL;
  }

  pFound->lastAccessTimeMs = taosGetTimestampMs();
  mTrace("conn:%u, acquired from cache, data:%p", pFound->id, pFound);
  return pFound;
}
/**
 * Hand an acquired connection back to the cache. A NULL connection is ignored.
 */
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) {
  if (NULL == pConn) {
    return;
  }

  // trace first: the pointer is handed over to the cache right after
  mTrace("conn:%u, released from cache, data:%p", pConn->id, pConn);

  taosCacheRelease(pMnode->profileMgmt.connCache, (void **)&pConn, false);
}
2022-03-03 10:49:39 +00:00
/**
 * Advance the cache iterator and return the connection it now points at.
 * When the iterator is exhausted it is destroyed here and NULL is returned,
 * so the caller must not use pIter afterwards.
 */
void *mndGetNextConn(SMnode *pMnode, SCacheIter *pIter) {
  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t    dataLen = 0;
  SConnObj *pConn = taosCacheIterGetData(pIter, &dataLen);
  return pConn;
}
/* Show-framework callback: destroy a connection-cache iterator that was not run to the end. */
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
  if (NULL == pIter) return;
  taosCacheDestroyIter(pIter);
}
2022-05-16 06:55:31 +00:00
/**
 * Handle TDMT_MND_CONNECT: validate the client version, privilege, password and
 * (optional) database, create a connection object and build the connect response.
 *
 * On success the serialized SConnectRsp is attached to pReq->info.rsp / rspLen.
 *
 * @return 0 on success. On failure most paths return -1 with terrno set by the
 *         failing step; a password mismatch returns TSDB_CODE_MND_AUTH_FAILURE directly.
 */
static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
  SMnode         *pMnode = pReq->info.node;
  SUserObj       *pUser = NULL;
  SDbObj         *pDb = NULL;
  SConnObj       *pConn = NULL;
  int32_t         code = -1;
  SConnectReq     connReq = {0};
  char            ip[24] = {0};
  const STraceId *trace = &pReq->info.traceId;

  if ((code = tDeserializeSConnectReq(pReq->pCont, pReq->contLen, &connReq)) != 0) {
    terrno = (-1 == code ? TSDB_CODE_INVALID_MSG : code);
    goto _OVER;
  }

  if ((code = taosCheckVersionCompatibleFromStr(connReq.sVer, version, 3)) != 0) {
    terrno = code;
    goto _OVER;
  }

  code = -1;
  taosIp2String(pReq->info.conn.clientIp, ip);
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONNECT) != 0) {
    mGError("user:%s, failed to login from %s since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
  }

  pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
  if (pUser == NULL) {
    mGError("user:%s, failed to login from %s while acquire user since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
  }

  if (strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1) != 0) {
    // NOTE(review): the submitted password field is written to the log here — confirm this is intended.
    mGError("user:%s, failed to login from %s since invalid pass, input:%s", pReq->info.conn.user, ip, connReq.passwd);
    code = TSDB_CODE_MND_AUTH_FAILURE;
    goto _OVER;
  }

  if (connReq.db[0]) {
    char db[TSDB_DB_FNAME_LEN] = {0};
    snprintf(db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
    pDb = mndAcquireDb(pMnode, db);
    if (pDb == NULL) {
      // the two schema databases are accepted even though mndAcquireDb() finds nothing for them
      if (0 != strcmp(connReq.db, TSDB_INFORMATION_SCHEMA_DB) &&
          (0 != strcmp(connReq.db, TSDB_PERFORMANCE_SCHEMA_DB))) {
        terrno = TSDB_CODE_MND_INVALID_DB;
        mGError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
                terrstr());
        goto _OVER;
      }
    }

    if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_READ_OR_WRITE_DB, pDb) != 0) {
      goto _OVER;
    }
  }

  pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
                        pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
  if (pConn == NULL) {
    mGError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
    goto _OVER;
  }

  SConnectRsp connectRsp = {0};
  connectRsp.acctId = pUser->acctId;
  connectRsp.superUser = pUser->superUser;
  connectRsp.sysInfo = pUser->sysInfo;
  connectRsp.clusterId = pMnode->clusterId;
  connectRsp.connId = pConn->id;
  connectRsp.connType = connReq.connType;
  connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
  connectRsp.svrTimestamp = taosGetTimestampSec();
  connectRsp.passVer = pUser->passVersion;
  connectRsp.authVer = pUser->authVersion;

  // bounded copy, matching how mndProcessSvrVerReq fills its version field
  tstrncpy(connectRsp.sVer, version, sizeof(connectRsp.sVer));
  snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
           gitinfo);
  mndGetMnodeEpSet(pMnode, &connectRsp.epSet);

  // first pass computes the encoded size, second pass fills the rpc buffer
  int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
  if (contLen < 0) goto _OVER;
  void *pRsp = rpcMallocCont(contLen);
  if (pRsp == NULL) goto _OVER;
  tSerializeSConnectRsp(pRsp, contLen, &connectRsp);

  pReq->info.rspLen = contLen;
  pReq->info.rsp = pRsp;

  mGDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);

  code = 0;

_OVER:

  mndReleaseUser(pMnode, pUser);
  mndReleaseDb(pMnode, pDb);
  mndReleaseConn(pMnode, pConn);

  return code;
}
2022-04-12 11:10:52 +00:00
/**
 * Replace the connection's query list with the one carried by a heartbeat.
 *
 * The array is moved, not copied: after the call pBasic->queryDesc is NULL and
 * the connection holds the array. The previous list is destroyed. All of this
 * happens under the connection's query write latch.
 *
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
  taosWLockLatch(&pConn->queryLock);

  // drop the previous snapshot
  taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);

  // take over the incoming array
  pConn->pQueries = pBasic->queryDesc;
  if (pBasic->queryDesc) {
    pConn->numOfQueries = taosArrayGetSize(pBasic->queryDesc);
  } else {
    pConn->numOfQueries = 0;
  }
  pBasic->queryDesc = NULL;

  mDebug("queries updated in conn %u, num:%d", pConn->id, pConn->numOfQueries);

  taosWUnLockLatch(&pConn->queryLock);

  return TSDB_CODE_SUCCESS;
}
2022-06-21 07:05:31 +00:00
/**
 * Build an app object from an app heartbeat and put it into the app cache,
 * keyed by pReq->appId and kept for tsShellActivityTimer * 3 seconds.
 *
 * @param pMnode   mnode owning the app cache
 * @param clientIp ip the heartbeat came from
 * @param pReq     app section of the heartbeat request
 * @return the cached object as returned by taosCachePut() (callers in this file hand it
 *         back through mndReleaseApp()), or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY
 */
static SAppObj *mndCreateApp(SMnode *pMnode, uint32_t clientIp, SAppHbReq *pReq) {
  SProfileMgmt *pMgmt = &pMnode->profileMgmt;

  // zero-init so padding and the unused tail of `name` are never copied into the cache uninitialised
  SAppObj app = {0};
  app.appId = pReq->appId;
  app.ip = clientIp;
  app.pid = pReq->pid;
  // the name comes from a client message: bound the copy to the destination
  tstrncpy(app.name, pReq->name, sizeof(app.name));
  app.startTime = pReq->startTime;
  memcpy(&app.summary, &pReq->summary, sizeof(pReq->summary));
  app.lastAccessTimeMs = taosGetTimestampMs();

  const int32_t keepTime = tsShellActivityTimer * 3;
  SAppObj *pApp = taosCachePut(pMgmt->appCache, &pReq->appId, sizeof(pReq->appId), &app, sizeof(app), keepTime * 1000);
  if (pApp == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("failed to put app %" PRIx64 " into cache since %s", pReq->appId, terrstr());
    return NULL;
  }

  mTrace("app %" PRIx64 " is put into cache", pReq->appId);
  return pApp;
}
2022-06-21 07:05:31 +00:00
/* Free callback registered for the app cache; the object holds nothing to release, so it only traces. */
static void mndFreeApp(SAppObj *pApp) {
  mTrace("app %" PRIx64 " is destroyed", pApp->appId);
}
2022-06-15 12:59:33 +00:00
/**
 * Look up an app in the cache by id and refresh its last-access time.
 *
 * @return the acquired app (hand back via mndReleaseApp()), or NULL when the id is not cached
 */
static SAppObj *mndAcquireApp(SMnode *pMnode, int64_t appId) {
  SCache  *pCache = pMnode->profileMgmt.appCache;
  SAppObj *pFound = taosCacheAcquireByKey(pCache, &appId, sizeof(appId));

  if (NULL == pFound) {
    mDebug("app %" PRIx64 " not in cache", appId);
    return NULL;
  }

  pFound->lastAccessTimeMs = (uint64_t)taosGetTimestampMs();
  mTrace("app %" PRIx64 " acquired from cache", appId);
  return pFound;
}
/* Hand an acquired app back to the cache. A NULL app is ignored. */
static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
  if (NULL == pApp) {
    return;
  }

  mTrace("release app %" PRIx64 " to cache", pApp->appId);

  taosCacheRelease(pMnode->profileMgmt.appCache, (void **)&pApp, false);
}
2022-10-17 11:48:36 +00:00
/**
 * Advance the cache iterator and return the app it now points at.
 * When the iterator is exhausted it is destroyed here and NULL is returned,
 * so the caller must not use pIter afterwards.
 */
SAppObj *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
  if (!taosCacheIterNext(pIter)) {
    taosCacheDestroyIter(pIter);
    return NULL;
  }

  size_t   dataLen = 0;
  SAppObj *pApp = taosCacheIterGetData(pIter, &dataLen);
  return pApp;
}
/* Show-framework callback: destroy an app-cache iterator that was not run to the end. */
static void mndCancelGetNextApp(SMnode *pMnode, void *pIter) {
  if (NULL == pIter) return;
  taosCacheDestroyIter(pIter);
}
2022-02-16 07:44:20 +00:00
/*
 * Build the heartbeat response for a TMQ connection.
 * Currently a stub: it produces no response and always returns NULL.
 */
static SClientHbRsp *mndMqHbBuildRsp(SMnode *pMnode, SClientHbReq *pReq) {
  return NULL;
}
2022-06-15 12:59:33 +00:00
/**
 * Record the app section of a heartbeat in the app cache.
 *
 * An app already in the cache gets its summary overwritten; an unknown app is
 * created from the request. Either way the cache reference is released before returning.
 *
 * @return TSDB_CODE_SUCCESS, or -1 when a new app cannot be created
 */
static int32_t mndUpdateAppInfo(SMnode *pMnode, SClientHbReq *pHbReq, SRpcConnInfo *connInfo) {
  SAppHbReq *pAppReq = &pHbReq->app;
  SAppObj   *pApp = mndAcquireApp(pMnode, pAppReq->appId);

  if (pApp != NULL) {
    // known app: refresh the summary only
    memcpy(&pApp->summary, &pAppReq->summary, sizeof(pAppReq->summary));
    mndReleaseApp(pMnode, pApp);
    return TSDB_CODE_SUCCESS;
  }

  // first heartbeat from this app: create the cache entry
  pApp = mndCreateApp(pMnode, connInfo->clientIp, pAppReq);
  if (pApp == NULL) {
    mError("failed to create new app %" PRIx64 " since %s", pAppReq->appId, terrstr());
    return -1;
  }

  mDebug("a new app %" PRIx64 " is created", pAppReq->appId);
  mndReleaseApp(pMnode, pApp);
  return TSDB_CODE_SUCCESS;
}
2022-06-24 07:18:40 +00:00
/**
 * Count the dnodes that mndIsDnodeOnline() reports as online at the current time.
 *
 * @param pMnode mnode whose sdb is scanned
 * @param num    counter that is incremented once per online dnode; it is NOT
 *               reset here, so the caller must initialise it
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t mndGetOnlineDnodeNum(SMnode *pMnode, int32_t *num) {
  SSdb         *pSdb = pMnode->pSdb;
  const int64_t nowMs = taosGetTimestampMs();
  SDnodeObj    *pDnode = NULL;
  void         *pCursor = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_DNODE, pCursor, (void **)&pDnode);
    if (NULL == pCursor) {
      break;
    }

    if (mndIsDnodeOnline(pDnode, nowMs)) {
      (*num)++;
    }

    sdbRelease(pSdb, pDnode);
  }

  return TSDB_CODE_SUCCESS;
}
2022-04-22 11:47:00 +00:00
static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHbReq *pHbReq,
SClientHbBatchRsp *pBatchRsp) {
2022-04-12 11:10:52 +00:00
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
2022-04-22 11:47:00 +00:00
SClientHbRsp hbRsp = {.connKey = pHbReq->connKey, .status = 0, .info = NULL, .query = NULL};
2022-06-21 07:05:31 +00:00
SRpcConnInfo connInfo = pMsg->info.conn;
2022-06-15 12:59:33 +00:00
mndUpdateAppInfo(pMnode, pHbReq, &connInfo);
2022-04-12 11:10:52 +00:00
if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query;
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
2022-04-22 11:47:00 +00:00
if (pConn == NULL) {
pConn = mndCreateConn(pMnode, connInfo.user, CONN_TYPE__QUERY, connInfo.clientIp, connInfo.clientPort,
2022-06-15 12:59:33 +00:00
pHbReq->app.pid, pHbReq->app.name, 0);
2022-04-12 11:10:52 +00:00
if (pConn == NULL) {
mError("user:%s, conn:%u is freed and failed to create new since %s", connInfo.user, pBasic->connId, terrstr());
return -1;
} else {
2022-06-15 12:59:33 +00:00
mDebug("user:%s, conn:%u is freed, will create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
2022-04-12 11:10:52 +00:00
}
}
2022-04-12 11:10:52 +00:00
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
if (rspBasic == NULL) {
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, conn:%u failed to process hb while since %s", pConn->user, pBasic->connId, terrstr());
return -1;
}
mndSaveQueryList(pConn, pBasic);
if (pConn->killed != 0) {
rspBasic->killConnection = 1;
}
if (pConn->killId != 0) {
rspBasic->killRid = pConn->killId;
pConn->killId = 0;
}
rspBasic->connId = pConn->id;
2022-05-24 13:22:03 +00:00
rspBasic->totalDnodes = mndGetDnodeSize(pMnode);
2022-06-24 07:18:40 +00:00
mndGetOnlineDnodeNum(pMnode, &rspBasic->onlineDnodes);
2022-04-12 11:10:52 +00:00
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
2022-05-31 06:03:47 +00:00
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
2022-04-12 11:10:52 +00:00
mndReleaseConn(pMnode, pConn);
hbRsp.query = rspBasic;
} else {
mDebug("no query info in hb msg");
2022-04-12 11:10:52 +00:00
}
int32_t kvNum = taosHashGetSize(pHbReq->info);
if (NULL == pHbReq->info || kvNum <= 0) {
2022-04-22 11:47:00 +00:00
taosArrayPush(pBatchRsp->rsps, &hbRsp);
2022-04-12 11:10:52 +00:00
return TSDB_CODE_SUCCESS;
}
hbRsp.info = taosArrayInit(kvNum, sizeof(SKv));
if (NULL == hbRsp.info) {
mError("taosArrayInit %d rsp kv failed", kvNum);
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-05-21 13:59:04 +00:00
tFreeClientHbRsp(&hbRsp);
2022-04-12 11:10:52 +00:00
return -1;
}
void *pIter = taosHashIterate(pHbReq->info, NULL);
while (pIter != NULL) {
SKv *kv = pIter;
switch (kv->key) {
2022-05-06 06:13:56 +00:00
case HEARTBEAT_KEY_USER_AUTHINFO: {
2022-05-16 06:55:31 +00:00
void *rspMsg = NULL;
2022-05-06 06:13:56 +00:00
int32_t rspLen = 0;
mndValidateUserAuthInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserAuthVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_USER_AUTHINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
2022-04-12 11:10:52 +00:00
case HEARTBEAT_KEY_DBINFO: {
2022-05-16 06:55:31 +00:00
void *rspMsg = NULL;
2022-04-12 11:10:52 +00:00
int32_t rspLen = 0;
2023-04-13 02:54:57 +00:00
mndValidateDbInfo(pMnode, kv->value, kv->valueLen / sizeof(SDbCacheInfo), &rspMsg, &rspLen);
2022-04-12 11:10:52 +00:00
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
case HEARTBEAT_KEY_STBINFO: {
2022-05-16 06:55:31 +00:00
void *rspMsg = NULL;
2022-04-12 11:10:52 +00:00
int32_t rspLen = 0;
2022-06-13 02:38:45 +00:00
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
2022-04-12 11:10:52 +00:00
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
2023-04-09 10:44:46 +00:00
case HEARTBEAT_KEY_USER_PASSINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateUserPassInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserPassVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_USER_PASSINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
2022-04-12 11:10:52 +00:00
default:
mError("invalid kv key:%d", kv->key);
2022-12-03 02:03:18 +00:00
hbRsp.status = TSDB_CODE_APP_ERROR;
2022-04-12 11:10:52 +00:00
break;
}
pIter = taosHashIterate(pHbReq->info, pIter);
}
taosArrayPush(pBatchRsp->rsps, &hbRsp);
return TSDB_CODE_SUCCESS;
}
2022-05-16 06:55:31 +00:00
/**
 * Handle TDMT_MND_HEARTBEAT: decode the batched client heartbeats, process each
 * one by connection type, and attach the serialized batch response to
 * pReq->info.rsp / rspLen.
 *
 * A failure of an individual heartbeat is not fatal; that heartbeat simply
 * contributes no entry to the batch response.
 *
 * @return 0 on success; -1 when the request cannot be decoded
 *         (terrno = TSDB_CODE_INVALID_MSG), when memory cannot be allocated
 *         (terrno = TSDB_CODE_OUT_OF_MEMORY), or when sizing the response fails
 */
static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
  SMnode           *pMnode = pReq->info.node;
  int32_t           code = -1;
  SClientHbBatchReq batchReq = {0};
  SClientHbBatchRsp batchRsp = {0};

  if (tDeserializeSClientHbBatchReq(pReq->pCont, pReq->contLen, &batchReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    goto _OVER;
  }

  batchRsp.svrTimestamp = taosGetTimestampSec();
  batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
  if (batchRsp.rsps == NULL) {
    // without the array every later push would operate on NULL
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }

  int32_t sz = taosArrayGetSize(batchReq.reqs);
  for (int i = 0; i < sz; i++) {
    SClientHbReq *pHbReq = taosArrayGet(batchReq.reqs, i);
    if (pHbReq->connKey.connType == CONN_TYPE__QUERY) {
      mndProcessQueryHeartBeat(pMnode, pReq, pHbReq, &batchRsp);
    } else if (pHbReq->connKey.connType == CONN_TYPE__TMQ) {
      SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
      if (pRsp != NULL) {
        taosArrayPush(batchRsp.rsps, pRsp);
        taosMemoryFree(pRsp);
      }
    }
  }

  // the requests are no longer needed once every response has been built
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
  batchReq.reqs = NULL;

  // first pass computes the encoded size, second pass fills the rpc buffer
  int32_t tlen = tSerializeSClientHbBatchRsp(NULL, 0, &batchRsp);
  if (tlen < 0) {
    goto _OVER;
  }

  void *buf = rpcMallocCont(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _OVER;
  }
  tSerializeSClientHbBatchRsp(buf, tlen, &batchRsp);

  pReq->info.rspLen = tlen;
  pReq->info.rsp = buf;
  code = 0;

_OVER:
  taosArrayDestroyEx(batchReq.reqs, tFreeClientHbReq);
  if (batchRsp.rsps != NULL) {
    tFreeClientHbBatchRsp(&batchRsp);
  }

  return code;
}
2022-05-16 06:55:31 +00:00
/**
 * Handle TDMT_MND_KILL_QUERY.
 *
 * The request carries a query id string of the form "<connId>:<queryId>", both
 * parts in hexadecimal. The query id is stored in the connection's killId field,
 * from where the heartbeat handler forwards it to the client.
 *
 * @return 0 on success; -1 with terrno set when the message is invalid, the
 *         privilege check fails, the id has no ':' separator, or the connection
 *         is not in the cache
 */
static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  SKillQueryReq killReq = {0};

  if (tDeserializeSKillQueryReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  mInfo("kill query msg is received, queryId:%s", killReq.queryStrId);
  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_QUERY) != 0) {
    return -1;
  }

  char *pSep = strchr(killReq.queryStrId, ':');
  if (pSep == NULL) {
    mError("invalid query id %s", killReq.queryStrId);
    terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
    return -1;
  }

  // split in place at the separator, then parse both halves as hex
  *pSep = 0;
  // NOTE(review): the conn id is parsed as a signed 32-bit value although conn ids are uint32_t elsewhere — confirm
  int32_t  connId = taosStr2Int32(killReq.queryStrId, NULL, 16);
  uint64_t queryId = taosStr2UInt64(pSep + 1, NULL, 16);

  SConnObj *pConn = taosCacheAcquireByKey(pProfile->connCache, &connId, sizeof(int32_t));
  if (pConn == NULL) {
    mError("connId:%x, failed to kill queryId:%" PRIx64 ", conn not exist", connId, queryId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%x, queryId:%" PRIx64 " is killed by user:%s", connId, queryId, pReq->info.conn.user);
  pConn->killId = queryId;
  taosCacheRelease(pProfile->connCache, (void **)&pConn, false);
  return 0;
}
2022-05-16 06:55:31 +00:00
/**
 * Handle TDMT_MND_KILL_CONN: mark the requested connection as killed. The flag
 * is picked up by the heartbeat handler, which tells the client to close.
 *
 * @return TSDB_CODE_SUCCESS on success; -1 with terrno set when the message is
 *         invalid, the privilege check fails, or the connection is not in the cache
 */
static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
  SMnode       *pMnode = pReq->info.node;
  SProfileMgmt *pProfile = &pMnode->profileMgmt;
  SKillConnReq  killReq = {0};

  if (tDeserializeSKillConnReq(pReq->pCont, pReq->contLen, &killReq) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_KILL_CONN) != 0) {
    return -1;
  }

  SConnObj *pConn = taosCacheAcquireByKey(pProfile->connCache, &killReq.connId, sizeof(uint32_t));
  if (pConn == NULL) {
    mError("connId:%u, failed to kill connection, conn not exist", killReq.connId);
    terrno = TSDB_CODE_MND_INVALID_CONN_ID;
    return -1;
  }

  mInfo("connId:%u, is killed by user:%s", killReq.connId, pReq->info.conn.user);
  pConn->killed = 1;
  taosCacheRelease(pProfile->connCache, (void **)&pConn, false);
  return TSDB_CODE_SUCCESS;
}
2022-06-24 07:18:40 +00:00
/**
 * Handle TDMT_MND_SERVER_VERSION: reply with the server version string.
 *
 * @return 0 on success with the serialized response attached to pReq->info;
 *         -1 when the response cannot be sized or its buffer cannot be allocated
 */
static int32_t mndProcessSvrVerReq(SRpcMsg *pReq) {
  SServerVerRsp verRsp = {0};
  tstrncpy(verRsp.ver, version, sizeof(verRsp.ver));

  // first pass computes the encoded size, second pass fills the rpc buffer
  int32_t rspLen = tSerializeSServerVerRsp(NULL, 0, &verRsp);
  if (rspLen < 0) {
    return -1;
  }

  void *pBuf = rpcMallocCont(rspLen);
  if (pBuf == NULL) {
    return -1;
  }

  tSerializeSServerVerRsp(pBuf, rspLen, &verRsp);
  pReq->info.rspLen = rspLen;
  pReq->info.rsp = pBuf;
  return 0;
}
/**
 * Show-framework callback for the connections table: fill up to `rows` rows of
 * pBlock, one per cached connection, resuming from pShow->pIter.
 *
 * Connections whose last access is older than tsShellActivityTimer * 3 seconds
 * are skipped. Columns are written in this order: id, user, app, pid,
 * "ip:port" endpoint, login time, last access time.
 *
 * @return number of rows written; pShow->numOfRows is increased by the same amount
 */
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode       *pMnode = pReq->info.node;
  int32_t       keepTime = tsShellActivityTimer * 3;
  const int64_t keepMs = (int64_t)keepTime * 1000;
  int32_t       filled = 0;

  // first call for this show: start a fresh iteration over the conn cache
  if (pShow->pIter == NULL) {
    pShow->pIter = taosCacheCreateIter(pMnode->profileMgmt.connCache);
  }

  while (filled < rows) {
    SConnObj *pConn = mndGetNextConn(pMnode, pShow->pIter);
    if (pConn == NULL) {
      // mndGetNextConn already destroyed the exhausted iterator
      pShow->pIter = NULL;
      break;
    }

    if ((taosGetTimestampMs() - pConn->lastAccessTimeMs) > keepMs) {
      continue;
    }

    int32_t          col = 0;
    SColumnInfoData *pCol = NULL;

    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataSetVal(pCol, filled, (const char *)&pConn->id, false);

    char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_TO_VARSTR(user, pConn->user);
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataSetVal(pCol, filled, (const char *)user, false);

    char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
    STR_TO_VARSTR(app, pConn->app);
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataSetVal(pCol, filled, (const char *)app, false);

    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataSetVal(pCol, filled, (const char *)&pConn->pid, false);

    // endpoint is a var-string: text goes after the header, then the length is patched in
    char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
    varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataSetVal(pCol, filled, (const char *)endpoint, false);

    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataSetVal(pCol, filled, (const char *)&pConn->loginTimeMs, false);

    pCol = taosArrayGet(pBlock->pDataBlock, col++);
    colDataSetVal(pCol, filled, (const char *)&pConn->lastAccessTimeMs, false);

    filled++;
  }

  pShow->numOfRows += filled;
  return filled;
}
/**
 * @param pConn the connection whose queries are packed
 * @param[out] pBlock the data block the rows are packed into
 * @param offset number of leading queries in pConn to skip
 * @param rowsToPack maximum number of rows to pack
 * @return number of rows packed
 */
static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBlock* pBlock, uint32_t offset, uint32_t rowsToPack) {
int32_t cols = 0;
taosRLockLatch(&pConn->queryLock);
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
if (NULL == pConn->pQueries || numOfQueries <= offset) {
taosRUnLockLatch(&pConn->queryLock);
return 0;
2022-03-03 10:49:39 +00:00
}
int32_t i = offset;
for (; i < numOfQueries && (i - offset) < rowsToPack; ++i) {
int32_t curRowIndex = pBlock->info.rows;
SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
cols = 0;
2021-12-03 06:36:41 +00:00
char queryId[26 + VARSTR_HEADER_SIZE] = {0};
sprintf(&queryId[VARSTR_HEADER_SIZE], "%x:%" PRIx64, pConn->id, pQuery->reqRid);
varDataLen(queryId) = strlen(&queryId[VARSTR_HEADER_SIZE]);
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)queryId, false);
2021-12-03 06:36:41 +00:00
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->queryId, false);
2021-12-03 06:36:41 +00:00
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->id, false);
char app[TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE];
STR_TO_VARSTR(app, pConn->app);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)app, false);
2021-12-03 06:36:41 +00:00
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)&pConn->pid, false);
2021-12-03 06:36:41 +00:00
char user[TSDB_USER_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(user, pConn->user);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)user, false);
char endpoint[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
sprintf(&endpoint[VARSTR_HEADER_SIZE], "%s:%d", taosIpStr(pConn->ip), pConn->port);
varDataLen(endpoint) = strlen(&endpoint[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)endpoint, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->useconds, false);
2021-12-03 06:36:41 +00:00
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->stableQuery, false);
2021-12-03 06:36:41 +00:00
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false);
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t strSize = sizeof(subStatus);
int32_t offset = VARSTR_HEADER_SIZE;
for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
if (i) {
offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
}
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
}
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, subStatus, false);
2021-12-03 06:36:41 +00:00
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(sql, pQuery->sql);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, (const char *)sql, false);
2021-12-03 06:36:41 +00:00
pBlock->info.rows++;
}
2021-12-03 06:36:41 +00:00
taosRUnLockLatch(&pConn->queryLock);
return i - offset;
}
/**
 * Fill pBlock with up to `rows` query rows, walking the connections in the
 * profile manager's connCache.
 *
 * The cache iterator is kept in pShow->pIter between calls, and
 * pShow->curIterPackedRows holds how many queries of the iterator's current
 * connection were packed by the latest packQueriesIntoBlock() call sequence, so
 * a connection with more queries than fit in one block is resumed on the next call.
 *
 * @return number of rows packed by this call (also added to pShow->numOfRows)
 */
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode *pMnode = pReq->info.node;
  SSdb   *pSdb = pMnode->pSdb;
  int32_t packedTotal = 0;

  if (NULL == pShow->pIter) {
    pShow->pIter = taosCacheCreateIter(pMnode->profileMgmt.connCache);
  }

  // Some rows of the iterator's current connection were already fetched last time:
  // continue with the remaining queries of that connection first.
  if (pShow->curIterPackedRows > 0) {
    size_t    dataLen = 0;
    SConnObj *pCurrent = taosCacheIterGetData(pShow->pIter, &dataLen);
    if (pCurrent && (taosArrayGetSize(pCurrent->pQueries) > pShow->curIterPackedRows)) {
      packedTotal = packQueriesIntoBlock(pShow, pCurrent, pBlock, pShow->curIterPackedRows, rows);
      pShow->curIterPackedRows += packedTotal;
    }
  }

  // Move on to the following connections until the block is full or none is left.
  while (packedTotal < rows) {
    SConnObj *pNext = mndGetNextConn(pMnode, pShow->pIter);
    if (NULL == pNext) {
      pShow->pIter = NULL;
      break;
    }

    int32_t fromThisConn = packQueriesIntoBlock(pShow, pNext, pBlock, 0, rows - packedTotal);
    pShow->curIterPackedRows = fromThisConn;
    packedTotal += fromThisConn;
  }

  pShow->numOfRows += packedTotal;
  return packedTotal;
}
2022-06-15 12:59:33 +00:00
/**
 * Fill pBlock with up to `rows` rows, one per application object found in the
 * profile manager's appCache.
 *
 * The cache iterator is kept in pShow->pIter between calls; it is created on the
 * first call and set back to NULL once mndGetNextApp() returns no more objects.
 *
 * @param pReq   request message; pReq->info.node is the mnode
 * @param pShow  show object carrying the iterator and the running row count
 * @param pBlock data block the rows are written into, starting at row 0
 * @param rows   maximum number of rows to produce
 * @return number of rows written by this call (also added to pShow->numOfRows)
 */
static int32_t mndRetrieveApps(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode  *pMnode = pReq->info.node;
  int32_t  numOfRows = 0;
  SAppObj *pApp = NULL;

  if (pShow->pIter == NULL) {
    SProfileMgmt *pMgmt = &pMnode->profileMgmt;
    pShow->pIter = taosCacheCreateIter(pMgmt->appCache);
  }

  while (numOfRows < rows) {
    pApp = mndGetNextApp(pMnode, pShow->pIter);
    if (pApp == NULL) {
      pShow->pIter = NULL;
      break;
    }

    int32_t cols = 0;

    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->appId, false);

    // bounded formatting: never write past the fixed-size var-string buffer
    char ip[TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    snprintf(&ip[VARSTR_HEADER_SIZE], sizeof(ip) - VARSTR_HEADER_SIZE, "%s", taosIpStr(pApp->ip));
    varDataLen(ip) = strlen(&ip[VARSTR_HEADER_SIZE]);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)ip, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->pid, false);

    char name[TSDB_APP_NAME_LEN + 6 + VARSTR_HEADER_SIZE] = {0};
    snprintf(&name[VARSTR_HEADER_SIZE], sizeof(name) - VARSTR_HEADER_SIZE, "%s", pApp->name);
    varDataLen(name) = strlen(&name[VARSTR_HEADER_SIZE]);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)name, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->startTime, false);

    // counters taken from the application's cluster summary
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertsReq, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.numOfInsertRows, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.insertElapsedTime, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.insertBytes, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.fetchBytes, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.queryElapsedTime, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.numOfSlowQueries, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.totalRequests, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->summary.currentRequests, false);

    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    colDataSetVal(pColInfo, numOfRows, (const char *)&pApp->lastAccessTimeMs, false);

    numOfRows++;
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
2021-12-03 06:36:41 +00:00
/**
 * Destroy a cache iterator that is no longer needed.
 *
 * @param pMnode not used by this function
 * @param pIter  cache iterator to destroy; NULL is ignored
 */
static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) {
  if (NULL == pIter) {
    return;
  }

  taosCacheDestroyIter(pIter);
}
2022-03-03 12:23:53 +00:00
2022-03-04 05:41:47 +00:00
/**
 * @param pMnode the mnode whose profile manager is queried
 * @return number of objects currently held in the profile manager's connCache
 */
int32_t mndGetNumOfConnections(SMnode *pMnode) {
  return taosCacheGetNumOfObj(pMnode->profileMgmt.connCache);
}