TDengine/source/client/src/clientMonitor.c
2025-10-14 20:09:47 +08:00

1031 lines
33 KiB
C

#include "clientMonitor.h"
#include "cJSON.h"
#include "clientInt.h"
#include "clientLog.h"
#include "tmisce.h"
#include "tqueue.h"
#include "ttime.h"
#include "ttimer.h"
SRWLatch monitorLock;
void* monitorTimer;
SHashObj* monitorCounterHash;
int32_t monitorFlag = 0;
int32_t quitCnt = 0;
tsem2_t monitorSem;
STaosQueue* monitorQueue;
SHashObj* monitorSlowLogHash;
char tmpSlowLogPath[PATH_MAX] = {0};
TdThread monitorThread;
extern bool tsEnableAuditDelete;
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
int ret = tsnprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
if (ret < 0) {
tscError("failed to get tmp path ret:%d", ret);
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
return 0;
}
static void processFileInTheEnd(TdFilePtr pFile, char* path) {
if (pFile == NULL) {
return;
}
if (taosFtruncateFile(pFile, 0) != 0) {
tscError("failed to truncate file:%s, terrno:%d", path, terrno);
return;
}
if (taosUnLockFile(pFile) != 0) {
tscError("failed to unlock file:%s, terrno:%d", path, terrno);
return;
}
if (taosCloseFile(&(pFile)) != 0) {
tscError("failed to close file:%s, terrno:%d", path, terrno);
return;
}
if (taosRemoveFile(path) != 0) {
tscError("failed to remove file:%s, terrno:%d", path, terrno);
return;
}
}
static void destroySlowLogClient(void* data) {
if (data == NULL) {
return;
}
SlowLogClient* slowLogClient = *(SlowLogClient**)data;
processFileInTheEnd(slowLogClient->pFile, slowLogClient->path);
taosMemoryFree(slowLogClient);
}
static void destroyMonitorClient(void* data) {
if (data == NULL) {
return;
}
MonitorClient* pMonitor = *(MonitorClient**)data;
if (pMonitor == NULL) {
return;
}
if (!taosTmrStopA(&pMonitor->timer)) {
tscError("failed to stop timer, pMonitor:%p", pMonitor);
}
taosHashCleanup(pMonitor->counters);
int ret = taos_collector_registry_destroy(pMonitor->registry);
if (ret) {
tscError("failed to destroy registry, pMonitor:%p ret:%d", pMonitor, ret);
}
taosMemoryFree(pMonitor);
}
static void monitorFreeSlowLogData(void* paras) {
MonitorSlowLogData* pData = (MonitorSlowLogData*)paras;
if (pData == NULL) {
return;
}
taosMemoryFreeClear(pData->data);
if (pData->type == SLOW_LOG_READ_BEGINNIG) {
taosMemoryFree(pData->fileName);
}
}
static void monitorFreeSlowLogDataEx(void* paras) {
monitorFreeSlowLogData(paras);
taosMemoryFree(paras);
}
static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) {
void* p = taosHashGet(appInfo.pInstMapByClusterId, &clusterId, LONG_BYTES);
if (p == NULL) {
tscError("failed to get app inst, clusterId:0x%" PRIx64, clusterId);
return NULL;
}
return *(SAppInstInfo**)p;
}
static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) {
if (TSDB_CODE_SUCCESS != code) {
tscError("found error in monitorReport send callback, code:%d, please check the network.", code);
}
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
}
if (param != NULL) {
MonitorSlowLogData* p = (MonitorSlowLogData*)param;
if (code != 0) {
tscError("failed to send slow log:%s, clusterId:0x%" PRIx64, p->data, p->clusterId);
}
MonitorSlowLogData tmp = {.clusterId = p->clusterId,
.type = p->type,
.fileName = p->fileName,
.pFile = p->pFile,
.offset = p->offset,
.data = NULL};
if (monitorPutData2MonitorQueue(tmp) == 0) {
p->fileName = NULL;
} else {
if (taosCloseFile(&(p->pFile)) != 0) {
tscError("failed to close file:%p", p->pFile);
}
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t sendReport(void* pTransporter, SEpSet* epSet, char* pCont, MONITOR_TYPE type, void* param) {
SStatisReq sStatisReq;
sStatisReq.pCont = pCont;
sStatisReq.contLen = strlen(pCont);
sStatisReq.type = type;
int tlen = tSerializeSStatisReq(NULL, 0, &sStatisReq);
if (tlen < 0) {
goto FAILED;
}
void* buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
tscError("sendReport failed, out of memory, len:%d", tlen);
goto FAILED;
}
tlen = tSerializeSStatisReq(buf, tlen, &sStatisReq);
if (tlen < 0) {
taosMemoryFree(buf);
goto FAILED;
}
SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pInfo == NULL) {
tscError("sendReport failed, out of memory send info");
taosMemoryFree(buf);
goto FAILED;
}
pInfo->fp = monitorReportAsyncCB;
pInfo->msgInfo.pData = buf;
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_STATIS;
pInfo->param = param;
pInfo->paramFreeFp = monitorFreeSlowLogDataEx;
pInfo->requestId = tGenIdPI64();
pInfo->requestObjRefId = 0;
// int64_t transporterId = 0;
return asyncSendMsgToServer(pTransporter, epSet, NULL, pInfo);
FAILED:
if (taosCloseFile(&(((MonitorSlowLogData*)param)->pFile)) != 0) {
tscError("failed to close file:%p", ((MonitorSlowLogData*)param)->pFile);
}
monitorFreeSlowLogDataEx(param);
return TAOS_GET_TERRNO(TSDB_CODE_TSC_INTERNAL_ERROR);
}
static void generateClusterReport(taos_collector_registry_t* registry, void* pTransporter, SEpSet* epSet) {
char ts[50] = {0};
(void)snprintf(ts, sizeof(ts), "%" PRId64, taosGetTimestamp(TSDB_TIME_PRECISION_MILLI));
char* pCont = (char*)taos_collector_registry_bridge_new(registry, ts, "%" PRId64, NULL);
if (NULL == pCont) {
tscError("generateClusterReport failed, get null content. since %s", tstrerror(terrno));
return;
}
if (strlen(pCont) != 0 && sendReport(pTransporter, epSet, pCont, MONITOR_TYPE_COUNTER, NULL) == 0) {
int ret = taos_collector_registry_clear_batch(registry);
if (ret) {
tscError("failed to clear registry, ret:%d", ret);
}
}
taosMemoryFreeClear(pCont);
}
static void reportSendProcess(void* param, void* tmrId) {
taosRLockLatch(&monitorLock);
if (atomic_load_32(&monitorFlag) == 1) {
taosRUnLockLatch(&monitorLock);
return;
}
MonitorClient* pMonitor = (MonitorClient*)param;
SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
if (pInst == NULL) {
taosRUnLockLatch(&monitorLock);
return;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
bool reset =
taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
taosRUnLockLatch(&monitorLock);
}
static void sendAllCounter() {
MonitorClient** ppMonitor = NULL;
while ((ppMonitor = taosHashIterate(monitorSlowLogHash, ppMonitor))) {
MonitorClient* pMonitor = *ppMonitor;
if (pMonitor == NULL) {
continue;
}
SAppInstInfo* pInst = getAppInstByClusterId(pMonitor->clusterId);
if (pInst == NULL) {
taosHashCancelIterate(monitorSlowLogHash, ppMonitor);
break;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
}
}
void monitorCreateClient(int64_t clusterId) {
MonitorClient* pMonitor = NULL;
taosWLockLatch(&monitorLock);
if (taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES) == NULL) {
tscInfo("clusterId:0x%" PRIx64 ", create monitor", clusterId);
pMonitor = taosMemoryCalloc(1, sizeof(MonitorClient));
if (pMonitor == NULL) {
tscError("failed to create monitor client");
goto fail;
}
pMonitor->clusterId = clusterId;
char clusterKey[32] = {0};
if (snprintf(clusterKey, sizeof(clusterKey), "%" PRId64, clusterId) < 0) {
tscError("failed to create cluster key");
goto fail;
}
pMonitor->registry = taos_collector_registry_new(clusterKey);
if (pMonitor->registry == NULL) {
tscError("failed to create registry");
goto fail;
}
pMonitor->colector = taos_collector_new(clusterKey);
if (pMonitor->colector == NULL) {
tscError("failed to create collector");
goto fail;
}
int r = taos_collector_registry_register_collector(pMonitor->registry, pMonitor->colector);
if (r) {
tscError("failed to register collector, ret:%d", r);
goto fail;
}
pMonitor->counters =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (pMonitor->counters == NULL) {
tscError("failed to create monitor counters");
goto fail;
}
if (taosHashPut(monitorCounterHash, &clusterId, LONG_BYTES, &pMonitor, POINTER_BYTES) != 0) {
tscError("failed to put monitor client to hash");
goto fail;
}
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if (pInst == NULL) {
tscError("failed to get app instance by cluster id");
pMonitor = NULL;
goto fail;
}
pMonitor->timer =
taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
if (pMonitor->timer == NULL) {
tscError("failed to start timer");
goto fail;
}
tscInfo("clusterId:0x%" PRIx64 ", create monitor finished, monitor:%p", clusterId, pMonitor);
}
taosWUnLockLatch(&monitorLock);
return;
fail:
destroyMonitorClient(&pMonitor);
taosWUnLockLatch(&monitorLock);
}
void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count,
const char** label_keys) {
taosWLockLatch(&monitorLock);
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
if (ppMonitor == NULL || *ppMonitor == NULL) {
tscError("failed to get monitor client");
goto end;
}
taos_counter_t* newCounter = taos_counter_new(name, help, label_key_count, label_keys);
if (newCounter == NULL) return;
MonitorClient* pMonitor = *ppMonitor;
if (taos_collector_add_metric(pMonitor->colector, newCounter) != 0) {
tscError("failed to add metric to collector");
int r = taos_counter_destroy(newCounter);
if (r) {
tscError("failed to destroy counter, code:%d", r);
}
goto end;
}
if (taosHashPut(pMonitor->counters, name, strlen(name), &newCounter, POINTER_BYTES) != 0) {
tscError("failed to put counter to monitor");
int r = taos_counter_destroy(newCounter);
if (r) {
tscError("failed to destroy counter, code:%d", r);
}
goto end;
}
tscInfo("clusterId:0x%" PRIx64 ", monitor:%p, create counter:%s %p", pMonitor->clusterId, pMonitor, name,
newCounter);
end:
taosWUnLockLatch(&monitorLock);
}
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) {
taosWLockLatch(&monitorLock);
if (atomic_load_32(&monitorFlag) == 1) {
taosWUnLockLatch(&monitorLock);
return;
}
MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES);
if (ppMonitor == NULL || *ppMonitor == NULL) {
tscError("clusterId:0x%" PRIx64 ", monitor not found", clusterId);
goto end;
}
MonitorClient* pMonitor = *ppMonitor;
taos_counter_t** ppCounter = (taos_counter_t**)taosHashGet(pMonitor->counters, counterName, strlen(counterName));
if (ppCounter == NULL || *ppCounter == NULL) {
tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s not found", clusterId, pMonitor, counterName);
goto end;
}
if (taos_counter_inc(*ppCounter, label_values) != 0) {
tscError("clusterId:0x%" PRIx64 ", monitor:%p counter:%s inc failed", clusterId, pMonitor, counterName);
goto end;
}
tscTrace("clusterId:0x%" PRIx64 ", monitor:%p, counter:%s inc", pMonitor->clusterId, pMonitor, counterName);
end:
taosWUnLockLatch(&monitorLock);
}
const char* monitorResultStr(SQL_RESULT_CODE code) {
static const char* result_state[] = {"Success", "Failed", "Cancel"};
return result_state[code];
}
static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char* tmpPath) {
TdFilePtr pFile = NULL;
void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES);
if (tmp == NULL) {
char path[PATH_MAX] = {0};
char clusterId[32] = {0};
if (snprintf(clusterId, sizeof(clusterId), "%" PRIx64, slowLogData->clusterId) < 0) {
tscError("failed to generate clusterId:0x%" PRIx64, slowLogData->clusterId);
return;
}
taosGetTmpfilePath(tmpPath, clusterId, path);
tscInfo("monitor create slow log file:%s", path);
pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
tscError("failed to open file:%s since %d", path, terrno);
return;
}
SlowLogClient* pClient = taosMemoryCalloc(1, sizeof(SlowLogClient));
if (pClient == NULL) {
tscError("failed to allocate memory for slow log client");
int32_t ret = taosCloseFile(&pFile);
if (ret != 0) {
tscError("failed to close file:%p ret:%d", pFile, ret);
}
return;
}
pClient->lastCheckTime = taosGetMonoTimestampMs();
tstrncpy(pClient->path, path, PATH_MAX);
pClient->offset = 0;
pClient->pFile = pFile;
if (taosHashPut(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES, &pClient, POINTER_BYTES) != 0) {
tscError("failed to put clusterId:0x%" PRIx64 " to hash table", slowLogData->clusterId);
int32_t ret = taosCloseFile(&pFile);
if (ret != 0) {
tscError("failed to close file:%p ret:%d", pFile, ret);
}
taosMemoryFree(pClient);
return;
}
if (taosLockFile(pFile) < 0) {
tscError("failed to lock file:%p since %s", pFile, terrstr());
return;
}
} else {
pFile = (*(SlowLogClient**)tmp)->pFile;
}
if (taosLSeekFile(pFile, 0, SEEK_END) < 0) {
tscError("failed to seek file:%p code:%d", pFile, terrno);
return;
}
if (taosWriteFile(pFile, slowLogData->data, strlen(slowLogData->data) + 1) < 0) {
tscError("failed to write len to file:%p since %s", pFile, terrstr());
}
tscDebug("monitor write slow log to file:%p, clusterId:0x%" PRIx64, pFile, slowLogData->clusterId);
}
static char* readFile(TdFilePtr pFile, int64_t* offset, int64_t size) {
tscDebug("monitor readFile slow begin pFile:%p, offset:%" PRId64 ", size:%" PRId64, pFile, *offset, size);
if (taosLSeekFile(pFile, *offset, SEEK_SET) < 0) {
tscError("failed to seek file:%p code:%d", pFile, terrno);
return NULL;
}
if ((size <= *offset)) {
tscError("invalid size:%" PRId64 ", offset:%" PRId64, size, *offset);
terrno = TSDB_CODE_TSC_INTERNAL_ERROR;
return NULL;
}
char* pCont = NULL;
int64_t totalSize = 0;
if (size - *offset >= SLOW_LOG_SEND_SIZE_MAX) {
totalSize = 4 + SLOW_LOG_SEND_SIZE_MAX;
} else {
totalSize = 4 + (size - *offset);
}
pCont = taosMemoryCalloc(1, totalSize); // 4 reserved for []
if (pCont == NULL) {
tscError("failed to allocate memory for slow log, size:%" PRId64, totalSize);
return NULL;
}
char* buf = pCont;
(void)strncat(buf++, "[", totalSize - 1);
int64_t readSize = taosReadFile(pFile, buf, totalSize - 4); // 4 reserved for []
if (readSize <= 0) {
if (readSize < 0) {
tscError("failed to read len from file:%p since %s", pFile, terrstr());
}
taosMemoryFree(pCont);
return NULL;
}
totalSize = 0;
while (1) {
size_t len = strlen(buf);
totalSize += (len + 1);
if (totalSize > readSize || len == 0) {
*(buf - 1) = ']';
*buf = '\0';
break;
}
buf[len] = ','; // replace '\0' with ','
buf += (len + 1);
*offset += (len + 1);
}
tscDebug("monitor readFile slow log end, data:%s, offset:%" PRId64, pCont, *offset);
return pCont;
}
static int64_t getFileSize(char* path) {
int64_t fileSize = 0;
if (taosStatFile(path, &fileSize, NULL, NULL) < 0) {
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
return fileSize;
}
static int32_t sendSlowLog(int64_t clusterId, char* data, TdFilePtr pFile, int64_t offset, SLOW_LOG_QUEUE_TYPE type,
char* fileName, void* pTransporter, SEpSet* epSet) {
if (data == NULL) {
if (taosCloseFile(&pFile) != 0) {
tscError("failed to close file:%p", pFile);
}
taosMemoryFree(fileName);
return TSDB_CODE_INVALID_PARA;
}
MonitorSlowLogData* pParam = taosMemoryMalloc(sizeof(MonitorSlowLogData));
if (pParam == NULL) {
if (taosCloseFile(&pFile) != 0) {
tscError("failed to close file:%p", pFile);
}
taosMemoryFree(data);
taosMemoryFree(fileName);
return terrno;
}
pParam->data = data;
pParam->offset = offset;
pParam->clusterId = clusterId;
pParam->type = type;
pParam->pFile = pFile;
pParam->fileName = fileName;
return sendReport(pTransporter, epSet, data, MONITOR_TYPE_SLOW_LOG, pParam);
}
static int32_t monitorReadSend(int64_t clusterId, TdFilePtr pFile, int64_t* offset, int64_t size,
SLOW_LOG_QUEUE_TYPE type, char* fileName) {
SAppInstInfo* pInst = getAppInstByClusterId(clusterId);
if (pInst == NULL) {
tscError("failed to get app instance by clusterId:0x%" PRIx64, clusterId);
if (taosCloseFile(&pFile) != 0) {
tscError("failed to close file:%p", pFile);
}
taosMemoryFree(fileName);
return terrno;
}
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
char* data = readFile(pFile, offset, size);
if (data == NULL) return terrno;
return sendSlowLog(clusterId, data, (type == SLOW_LOG_READ_BEGINNIG ? pFile : NULL), *offset, type, fileName,
pInst->pTransporter, &ep);
}
static void monitorSendSlowLogAtBeginning(int64_t clusterId, char** fileName, TdFilePtr pFile, int64_t offset) {
if (fileName == NULL) {
return;
}
int64_t size = getFileSize(*fileName);
if (size <= offset) {
processFileInTheEnd(pFile, *fileName);
tscDebug("monitor delete file:%s", *fileName);
} else {
int32_t code = monitorReadSend(clusterId, pFile, &offset, size, SLOW_LOG_READ_BEGINNIG, *fileName);
if (code == 0) {
tscDebug("monitor send slow log succ, clusterId:0x%" PRIx64, clusterId);
} else {
tscError("monitor send slow log failed, clusterId:0x%" PRIx64 ", ret:%d", clusterId,
code);
}
*fileName = NULL;
}
}
static void monitorSendSlowLogAtRunning(int64_t clusterId) {
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
if (tmp == NULL) {
tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
return;
}
SlowLogClient* pClient = (*(SlowLogClient**)tmp);
if (pClient == NULL) {
tscError("failed to get slow log client by clusterId:0x%" PRIx64, clusterId);
return;
}
int64_t size = getFileSize(pClient->path);
if (size <= pClient->offset) {
if (taosFtruncateFile(pClient->pFile, 0) < 0) {
tscError("failed to truncate file:%p code:%d", pClient->pFile, terrno);
}
tscDebug("monitor truncate file to 0 file:%p", pClient->pFile);
pClient->offset = 0;
} else {
int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", clusterId, code);
}
}
static bool monitorSendSlowLogAtQuit(int64_t clusterId) {
void* tmp = taosHashGet(monitorSlowLogHash, &clusterId, LONG_BYTES);
if (tmp == NULL) {
return true;
}
SlowLogClient* pClient = (*(SlowLogClient**)tmp);
if (pClient == NULL) {
return true;
}
int64_t size = getFileSize(pClient->path);
if (size <= pClient->offset) {
processFileInTheEnd(pClient->pFile, pClient->path);
pClient->pFile = NULL;
tscInfo("monitor remove file:%s", pClient->path);
if ((--quitCnt) == 0) {
return true;
}
} else {
int32_t code = monitorReadSend(clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", clusterId, code);
}
return false;
}
static void monitorSendAllSlowLogAtQuit() {
void* pIter = NULL;
while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
SlowLogClient* pClient = (*(SlowLogClient**)pIter);
if (pClient == NULL) {
continue;
}
int64_t size = getFileSize(pClient->path);
if (size <= pClient->offset) {
processFileInTheEnd(pClient->pFile, pClient->path);
pClient->pFile = NULL;
} else if (pClient->offset == 0) {
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_QUIT, NULL);
tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", *clusterId, code);
if (code == 0) {
quitCnt++;
}
}
}
}
static void processFileRemoved(SlowLogClient* pClient) {
if (taosUnLockFile(pClient->pFile) != 0) {
tscError("failed to unlock file:%s since %d", pClient->path, terrno);
return;
}
int32_t ret = taosCloseFile(&(pClient->pFile));
if (ret != 0) {
tscError("failed to close file:%p ret:%d", pClient->pFile, ret);
return;
}
TdFilePtr pFile =
taosOpenFile(pClient->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_READ | TD_FILE_TRUNC);
if (pFile == NULL) {
tscError("failed to open file:%s since %d", pClient->path, terrno);
} else {
pClient->pFile = pFile;
}
}
static void monitorSendAllSlowLog() {
int64_t t = taosGetMonoTimestampMs();
void* pIter = NULL;
while ((pIter = taosHashIterate(monitorSlowLogHash, pIter))) {
int64_t* clusterId = (int64_t*)taosHashGetKey(pIter, NULL);
SAppInstInfo* pInst = getAppInstByClusterId(*clusterId);
SlowLogClient* pClient = (*(SlowLogClient**)pIter);
if (pClient == NULL || pInst == NULL) {
taosHashCancelIterate(monitorSlowLogHash, pIter);
return;
}
if (t - pClient->lastCheckTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000) {
pClient->lastCheckTime = t;
} else {
continue;
}
if (pClient->offset == 0) {
int64_t size = getFileSize(pClient->path);
if (size <= 0) {
if (size < 0) {
tscError("monitor failed to get file size:%s, err:%d", pClient->path, ERRNO);
if (ERRNO == ENOENT) {
processFileRemoved(pClient);
}
}
continue;
}
int32_t code = monitorReadSend(*clusterId, pClient->pFile, &pClient->offset, size, SLOW_LOG_READ_RUNNING, NULL);
tscDebug("monitor send slow log clusterId:0x%" PRIx64 ", ret:%d", *clusterId, code);
}
}
}
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
tscInfo("monitor is disabled, skip send slow log");
return;
}
char namePrefix[PATH_MAX] = {0};
if (snprintf(namePrefix, sizeof(namePrefix), "%s%" PRIx64, TD_TMP_FILE_PREFIX, clusterId) < 0) {
tscError("failed to generate slow log file name prefix");
return;
}
char tmpPath[PATH_MAX] = {0};
if (getSlowLogTmpDir(tmpPath, sizeof(tmpPath)) < 0) {
return;
}
TdDirPtr pDir = taosOpenDir(tmpPath);
if (pDir == NULL) {
return;
}
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
if (taosDirEntryIsDir(de)) {
continue;
}
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || strstr(name, namePrefix) == NULL) {
tscInfo("skip file:%s, for cluster id:%" PRIx64, name, clusterId);
continue;
}
char filename[PATH_MAX] = {0};
(void)snprintf(filename, sizeof(filename), "%s%s", tmpPath, name);
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) {
tscError("failed to open file:%s since %s", filename, terrstr());
continue;
}
if (taosLockFile(pFile) < 0) {
tscInfo("failed to lock file:%s since %s, maybe used by other process", filename, terrstr());
int32_t ret = taosCloseFile(&pFile);
if (ret != 0) {
tscError("failed to close file:%p ret:%d", pFile, ret);
}
continue;
}
char* tmp = taosStrdup(filename);
if (tmp == NULL) {
tscError("failed to dup string:%s since %s", filename, terrstr());
if (taosUnLockFile(pFile) != 0) {
tscError("failed to unlock file:%s, terrno:%d", filename, terrno);
}
if (taosCloseFile(&(pFile)) != 0) {
tscError("failed to close file:%s, terrno:%d", filename, terrno);
}
continue;
}
monitorSendSlowLogAtBeginning(clusterId, &tmp, pFile, 0);
taosMemoryFree(tmp);
}
int32_t ret = taosCloseDir(&pDir);
if (ret != 0) {
tscError("failed to close dir, ret:%d", ret);
}
}
static void* monitorThreadFunc(void* param) {
setThreadName("client-monitor-slowlog");
tscInfo("monitor update thread started");
int64_t quitTime = 0;
while (1) {
if (atomic_load_32(&monitorFlag) == 1) {
if (quitCnt == 0) {
monitorSendAllSlowLogAtQuit();
if (quitCnt == 0) {
tscInfo("monitorThreadFunc quit since no slow log to send");
break;
}
quitTime = taosGetMonoTimestampMs();
}
if (taosGetMonoTimestampMs() - quitTime > 500) { // quit at most 500ms
tscInfo("monitorThreadFunc quit since timeout");
break;
}
}
MonitorSlowLogData* slowLogData = NULL;
taosReadQitem(monitorQueue, (void**)&slowLogData);
if (slowLogData != NULL) {
if (slowLogData->type == SLOW_LOG_READ_BEGINNIG && quitCnt == 0) {
if (slowLogData->pFile != NULL) {
monitorSendSlowLogAtBeginning(slowLogData->clusterId, &(slowLogData->fileName), slowLogData->pFile,
slowLogData->offset);
} else {
monitorSendAllSlowLogFromTempDir(slowLogData->clusterId);
}
} else if (slowLogData->type == SLOW_LOG_WRITE) {
monitorWriteSlowLog2File(slowLogData, tmpSlowLogPath);
} else if (slowLogData->type == SLOW_LOG_READ_RUNNING) {
monitorSendSlowLogAtRunning(slowLogData->clusterId);
} else if (slowLogData->type == SLOW_LOG_READ_QUIT) {
if (monitorSendSlowLogAtQuit(slowLogData->clusterId)) {
tscInfo("monitorThreadFunc quit since all slow log sended");
monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData);
break;
}
}
monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData);
}
if (quitCnt == 0) {
monitorSendAllSlowLog();
}
(void)tsem2_timewait(&monitorSem, 100);
}
return NULL;
}
static int32_t tscMonitortInit() {
TdThreadAttr thAttr;
if (taosThreadAttrInit(&thAttr) != 0) {
tscError("failed to init thread attr since %s", strerror(ERRNO));
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE) != 0) {
tscError("failed to set thread attr since %s", strerror(ERRNO));
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) {
tscError("failed to create monitor thread since %s", strerror(ERRNO));
return TSDB_CODE_TSC_INTERNAL_ERROR;
}
(void)taosThreadAttrDestroy(&thAttr);
return 0;
}
static void tscMonitorStop() {
if (taosCheckPthreadValid(monitorThread)) {
(void)taosThreadJoin(monitorThread, NULL);
taosThreadClear(&monitorThread);
}
}
int32_t monitorInit() {
int32_t code = 0;
tscInfo("monitor init");
monitorCounterHash =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorCounterHash == NULL) {
tscError("failed to create monitorCounterHash");
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
}
taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient);
monitorSlowLogHash =
(SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
if (monitorSlowLogHash == NULL) {
tscError("failed to create monitorSlowLogHash");
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
}
taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient);
monitorTimer = taosTmrInit(0, 0, 0, "MONITOR");
if (monitorTimer == NULL) {
tscError("failed to create monitor timer");
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
}
code = getSlowLogTmpDir(tmpSlowLogPath, sizeof(tmpSlowLogPath));
if (code != 0) {
return code;
}
code = taosMulModeMkDir(tmpSlowLogPath, 0777, true);
if (code != 0) {
tscError("failed to create dir:%s since %s", tmpSlowLogPath, terrstr());
return code;
}
if (tsem2_init(&monitorSem, 0, 0) != 0) {
tscError("sem init error since %s", terrstr());
return TAOS_SYSTEM_ERROR(ERRNO);
}
code = taosOpenQueue(&monitorQueue);
if (code) {
tscError("open queue error since %s", terrstr());
return TAOS_GET_TERRNO(code);
}
taosInitRWLatch(&monitorLock);
return tscMonitortInit();
}
void monitorClose() {
tscInfo("monitor close");
taosWLockLatch(&monitorLock);
atomic_store_32(&monitorFlag, 1);
tscMonitorStop();
sendAllCounter();
taosHashCleanup(monitorCounterHash);
taosHashCleanup(monitorSlowLogHash);
taosTmrCleanUp(monitorTimer);
taosCloseQueue(monitorQueue);
if (tsem2_destroy(&monitorSem) != 0) {
tscError("failed to destroy semaphore");
}
taosWUnLockLatch(&monitorLock);
}
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
int32_t code = 0;
MonitorSlowLogData* slowLogData = NULL;
if (atomic_load_32(&monitorFlag) == 1) {
tscError("monitor slow log thread is exiting");
return -1;
}
code = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0, (void**)&slowLogData);
if (code) {
tscError("monitor failed to allocate slow log data");
return code;
}
*slowLogData = data;
tscDebug("monitor write slow log to queue, clusterId:0x%" PRIx64 " type:%s, data:%s", slowLogData->clusterId,
queueTypeStr[slowLogData->type], slowLogData->data);
if (taosWriteQitem(monitorQueue, slowLogData) == 0) {
if (tsem2_post(&monitorSem) != 0) {
tscError("failed to post semaphore");
}
} else {
if (taosCloseFile(&(slowLogData->pFile)) != 0) {
tscError("failed to close file:%p", slowLogData->pFile);
}
monitorFreeSlowLogData(slowLogData);
taosFreeQitem(slowLogData);
}
return 0;
}
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscDebug("[del report]delete reportCB code:%d", code);
return 0;
}
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
tscError("[del report] failed to allocate memory for sendInfo");
return terrno;
}
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
sendInfo->requestId = requestId;
sendInfo->requestObjRefId = 0;
sendInfo->param = NULL;
sendInfo->fp = reportCB;
sendInfo->msgType = TDMT_MND_AUDIT;
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) {
tscError("[del report] failed to send msg to server, code:%d", code);
taosMemoryFree(sendInfo);
return code;
}
return TSDB_CODE_SUCCESS;
}
static void reportDeleteSql(SRequestObj* pRequest) {
SDeleteStmt* pStmt = (SDeleteStmt*)pRequest->pQuery->pRoot;
STscObj* pTscObj = pRequest->pTscObj;
if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
tscError("[del report] invalid tsc obj");
return;
}
if(pTscObj->pAppInfo->serverCfg.enableAuditDelete == 0) {
tscDebug("[del report] audit delete is disabled");
return;
}
if (pRequest->code != TSDB_CODE_SUCCESS) {
tscDebug("[del report] delete request result code:%d", pRequest->code);
return;
}
if (nodeType(pStmt->pFromTable) != QUERY_NODE_REAL_TABLE) {
tscError("[del report] invalid from table node type:%d", nodeType(pStmt->pFromTable));
return;
}
SRealTableNode* pTable = (SRealTableNode*)pStmt->pFromTable;
SAuditReq req;
req.pSql = pRequest->sqlstr;
req.sqlLen = pRequest->sqlLen;
TAOS_UNUSED(tsnprintf(req.table, TSDB_TABLE_NAME_LEN, "%s", pTable->table.tableName));
TAOS_UNUSED(tsnprintf(req.db, TSDB_DB_FNAME_LEN, "%s", pTable->table.dbName));
TAOS_UNUSED(tsnprintf(req.operation, AUDIT_OPERATION_LEN, "delete"));
int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
void* pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("[del report] failed to allocate memory for req");
return;
}
if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
tscError("[del report] failed to serialize req");
taosMemoryFree(pReq);
return;
}
int32_t code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
if (code != 0) {
tscError("[del report] failed to send audit info, code:%d", code);
taosMemoryFree(pReq);
return;
}
tscDebug("[del report] delete data, sql:%s", req.pSql);
}
void clientOperateReport(SRequestObj* pRequest) {
if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) {
tscError("[del report] invalid request");
return;
}
if (QUERY_NODE_DELETE_STMT == nodeType(pRequest->pQuery->pRoot)) {
reportDeleteSql(pRequest);
}
}