TDengine/source/dnode/mnode/impl/src/mndStreamUtil.c

404 lines
14 KiB
C
Raw Normal View History

2024-01-25 08:55:05 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2024-08-28 05:33:43 +00:00
#include "mndDb.h"
#include "mndStb.h"
2024-01-25 08:55:05 +00:00
#include "mndStream.h"
#include "mndTrans.h"
#include "mndVgroup.h"
2024-08-28 05:33:43 +00:00
#include "taoserror.h"
#include "tmisce.h"
2024-01-25 08:55:05 +00:00
2025-04-30 10:39:59 +00:00
/*
 * Reports whether the stream identified by streamId should be treated as dropped.
 *
 * Walks every SDB_STREAM object in the mnode's sdb. When a stream whose create
 * request carries the given id is found, *dropped mirrors that stream's
 * userDropped flag. When no such stream exists, *dropped is set to true.
 *
 * pMnode   - mnode whose sdb is scanned
 * streamId - stream id to look up
 * dropped  - out: result flag, written on every path
 *
 * Returns TSDB_CODE_SUCCESS on every path.
 */
int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped) {
  SSdb *pSdb = pMnode->pSdb;
  void *pIter = NULL;

  while (1) {
    SStreamObj *pStream = NULL;
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    // pCreate is treated as possibly NULL elsewhere in this file
    // (mndStreamLogSStreamObj), so such an entry is skipped rather than dereferenced.
    if (pStream->pCreate != NULL && pStream->pCreate->streamId == streamId) {
      *dropped = pStream->userDropped ? true : false;
      sdbRelease(pSdb, pStream);
      // The scan stops before the iterator is exhausted, so it has to be
      // cancelled explicitly, the same way mstCheckSnodeExists does.
      sdbCancelFetch(pSdb, pIter);
      return TSDB_CODE_SUCCESS;
    }

    sdbRelease(pSdb, pStream);
  }

  // No stream with this id is registered any more.
  *dropped = true;
  return TSDB_CODE_SUCCESS;
}
2024-01-25 08:55:05 +00:00
2025-04-30 10:39:59 +00:00
/*
 * Reports the number of streams associated with database dbName.
 *
 * The database is acquired first; if that fails the function returns
 * TSDB_CODE_MND_DB_NOT_SELECTED through TAOS_RETURN and *pNumOfStreams is not
 * written. Otherwise every SDB_STREAM object is visited and released.
 *
 * The per-stream match against the database is currently disabled
 * (STREAMTODO below), so the reported count is always 0.
 *
 * Returns 0 on success.
 */
int32_t mstGetStreamsNumInDb(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (NULL == pDb) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  int32_t streamCnt = 0;
  void   *pCursor = NULL;

  for (;;) {
    SStreamObj *pCur = NULL;
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pCur);
    if (NULL == pCursor) {
      break;
    }

    /* STREAMTODO
    if (pCur->sourceDbUid == pDb->uid) {
      streamCnt++;
    }
    */

    sdbRelease(pSdb, pCur);
  }

  *pNumOfStreams = streamCnt;
  mndReleaseDb(pMnode, pDb);

  return 0;
}
2025-04-30 10:39:59 +00:00
static void mstShowStreamStatus(char *dst, int8_t status, int32_t bufLen) {
2025-04-24 01:11:54 +00:00
if (status == STREAM_STATUS_INIT) {
tstrncpy(dst, "init", bufLen);
} else if (status == STREAM_STATUS_RUNNING) {
tstrncpy(dst, "running", bufLen);
} else if (status == STREAM_STATUS_STOPPED) {
tstrncpy(dst, "stopped", bufLen);
} else if (status == STREAM_STATUS_FAILED) {
tstrncpy(dst, "failed", bufLen);
}
}
2025-04-30 10:39:59 +00:00
/*
 * Fills one result row (index numOfRows) of pBlock with the attributes of pStream.
 *
 * The row builder is currently disabled (STREAMTODO): the function touches
 * neither pStream nor pBlock and returns 0. The previous implementation is
 * kept below, commented out, as the reference for the pending rework.
 */
int32_t mstGenerateResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  // cols and lino are only consumed by the disabled row builder below.
  int32_t cols = 0;
  int32_t lino = 0;
  int32_t code = 0;

  /* STREAMTODO
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  char buf[128] = {0};
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pStream->hTaskUid != 0) {
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  bool isPaused = false;
  //code = isAllTaskPaused(pStream, &isPaused);
  TSDB_CHECK_CODE(code, lino, _end);

  int8_t streamStatus = atomic_load_8(&pStream->status);
  if (isPaused && pStream->pTaskList != NULL) {
    streamStatus = STREAM_STATUS__PAUSE;
  }
  mndShowStreamStatus(status2, streamStatus);
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&sourceDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  if (pStream->targetSTbName[0] == 0) {
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  } else {
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)&targetSTB, false);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
  TSDB_CHECK_CODE(code, lino, _end);

  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
  char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  mndShowStreamTrigger(trigger2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sink_quota
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
  sinkQuota[0] = '0';
  char dstStr[20] = {0};
  STR_TO_VARSTR(dstStr, sinkQuota)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  // checkpoint backup type
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(backup, "none")
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // history scan idle
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
  tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));
  memset(dstStr, 0, tListLen(dstStr));
  STR_TO_VARSTR(dstStr, scanHistoryIdle)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  if (streamStatus == STREAM_STATUS__FAILED){
    STR_TO_VARSTR(msg, pStream->reserve)
  } else {
    STR_TO_VARSTR(msg, " ")
  }
  code = colDataSetVal(pColInfo, numOfRows, (const char *)msg, false);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  */

  return code;
}
2025-04-30 10:39:59 +00:00
/*
 * Checks whether at least one snode is registered in the mnode's sdb.
 *
 * Returns TSDB_CODE_SUCCESS if an SDB_SNODE object can be fetched,
 * TSDB_CODE_SNODE_NOT_DEPLOYED otherwise.
 */
int32_t mstCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pSnode = NULL;

  // A single fetch is enough: the first object found already answers the question.
  void *pCursor = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pSnode);
  if (NULL == pCursor) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  // Hand back the object and stop the scan before it is exhausted.
  sdbRelease(pSdb, pSnode);
  sdbCancelFetch(pSdb, pCursor);

  return TSDB_CODE_SUCCESS;
}
2025-05-10 09:16:39 +00:00
/*
 * Removes the oldest pending action from the queue.
 *
 * pQueue - action queue; pQueue->head is a placeholder node and the first
 *          pending action is head->next
 * param  - out: the dequeued node, written only when true is returned
 *
 * Returns false when qRemainNum is 0 (nothing pending), true otherwise.
 *
 * The node handed back through *param stays linked as the queue's new head
 * and is dereferenced by the next dequeue, so the caller must not free it;
 * it is released here once the following dequeue retires it.
 *
 * NOTE(review): head is updated without taking pQueue->lock, which presumes a
 * single consumer - confirm against the callers.
 */
bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param) {
  if (0 == atomic_load_64(&pQueue->qRemainNum)) {
    return false;
  }

  SStmQNode *orig = pQueue->head;
  SStmQNode *node = orig->next;

  pQueue->head = node;
  *param = node;

  // The previous placeholder is unreachable from the queue now; release it
  // instead of leaking one node per dequeue. qRemainNum > 0 implies the
  // producer has already moved tail past this node.
  taosMemoryFree(orig);

  atomic_sub_fetch_64(&pQueue->qRemainNum, 1);

  return true;
}
/*
 * Appends param to the end of the action queue.
 *
 * The node is linked behind the current tail while pQueue->lock is held for
 * writing; qRemainNum is incremented only after the node is linked and the
 * lock released.
 */
void mndStreamActionEnqueue(SStmActionQ* pQueue, SStmQNode* param) {
  taosWLockLatch(&pQueue->lock);
  SStmQNode *pLast = pQueue->tail;
  pLast->next = param;
  pQueue->tail = param;
  taosWUnLockLatch(&pQueue->lock);

  atomic_add_fetch_64(&pQueue->qRemainNum, 1);
}
/*
 * Queues a stream-level action.
 *
 * Allocates a queue node, marks it as a stream action of type `action` for
 * the given stream id and name, and appends it to actionQ. If the allocation
 * fails the error is logged and nothing is queued.
 */
void mndStreamPostAction(SStmActionQ* actionQ, int64_t streamId, char* streamName, int32_t action) {
  SStmQNode *pItem = taosMemoryMalloc(sizeof(SStmQNode));
  if (pItem != NULL) {
    pItem->next = NULL;
    pItem->streamAct = true;
    pItem->type = action;

    pItem->action.stream.streamId = streamId;
    TAOS_STRCPY(pItem->action.stream.streamName, streamName);

    mndStreamActionEnqueue(actionQ, pItem);
    return;
  }

  mstError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
}
2025-05-13 06:37:07 +00:00
/*
 * Queues a task-level action.
 *
 * Allocates a queue node, copies *pAction into it, marks it as a non-stream
 * action of type `action`, and appends it to actionQ. If the allocation fails
 * the error is logged and nothing is queued.
 */
void mndStreamPostTaskAction(SStmActionQ* actionQ, SStmTaskAction* pAction, int32_t action) {
  SStmQNode *pItem = taosMemoryMalloc(sizeof(SStmQNode));
  if (pItem != NULL) {
    pItem->next = NULL;
    pItem->streamAct = false;
    pItem->type = action;
    pItem->action.task = *pAction;

    mndStreamActionEnqueue(actionQ, pItem);
    return;
  }

  // Not referenced directly below; presumably read by the mstError macro,
  // which is used elsewhere in this file with a streamId in scope - confirm.
  int64_t streamId = pAction->streamId;
  mstError("%s failed at line %d, error:%s", __FUNCTION__, __LINE__, tstrerror(terrno));
}
2025-05-19 09:46:36 +00:00
/*
 * Dumps a stream object and its create request to the debug log.
 *
 * tips - prefix printed in front of the first log line
 * p    - stream object to dump; NULL is logged as such
 *
 * Logs the top-level stream fields first. If the stream carries a create
 * request (p->pCreate), logs its scalar fields in one line and then one line
 * per calcDB entry, per calc scan plan (plus one per vgroup id of that plan),
 * per notify URL and per output column.
 */
void mndStreamLogSStreamObj(char* tips, SStreamObj* p) {
  if (NULL == p) {
    stDebug("%s: stream is NULL", tips);
    return;
  }

  stDebug("%s: stream obj", tips);
  stDebug("name:%s mainSnodeId:%d userDropped:%d userStopped:%d createTime:%" PRId64 " updateTime:%" PRId64,
      p->name, p->mainSnodeId, p->userDropped, p->userStopped, p->createTime, p->updateTime);

  SCMCreateStreamReq* q = p->pCreate;
  if (NULL == q) {
    stDebug("stream pCreate is NULL");
    return;
  }

  // Not referenced directly below; presumably read by the mstDebug/mstDebugL macros - confirm.
  int64_t streamId = q->streamId;
  int32_t calcDBNum = taosArrayGetSize(q->calcDB);
  int32_t calcScanNum = taosArrayGetSize(q->calcScanPlanList);
  int32_t notifyUrlNum = taosArrayGetSize(q->pNotifyAddrUrls);
  int32_t outColNum = taosArrayGetSize(q->outCols);
  int32_t outTagNum = taosArrayGetSize(q->outTags);
  int32_t forceOutColNum = taosArrayGetSize(q->forceOutCols);

  // q->name is printed with %s, matching how the stream name is logged above;
  // passing it to an integer conversion is undefined behaviour.
  mstDebug("create_info: name:%s sql:%s streamDB:%s triggerDB:%s outDB:%s calcDBNum:%d triggerTblName:%s outTblName:%s "
      "igExists:%d triggerType:%d igDisorder:%d deleteReCalc:%d deleteOutTbl:%d fillHistory:%d fillHistroyFirst:%d "
      "calcNotifyOnly:%d lowLatencyCalc:%d notifyUrlNum:%d notifyEventTypes:%d notifyErrorHandle:%d notifyHistory:%d "
      "outColsNum:%d outTagsNum:%d maxDelay:%" PRId64 " fillHistoryStartTs:%" PRId64 " watermark:%" PRId64 " expiredTime:%" PRId64 " "
      "triggerTblType:%d triggerTblUid:%" PRIu64 " outTblType:%d outStbExists:%d outStbUid:%" PRIu64 " outStbSversion:%d "
      "eventTypes:0x%" PRIx64 " flags:0x%" PRIx64 " tsmaId:0x%" PRIx64 " placeHolderBitmap:0x%" PRIx64 " tsSlotId:%d "
      "triggerTblVgId:%d outTblVgId:%d triggerCols:[%s] partitionCols:[%s] triggerScanPlan:[%s] calcPlan:[%s] calcScanPlanNum:%d "
      "subTblNameExpr:[%s] tagValueExpr:[%s] forceOutCols:%d",
      q->name, q->sql, q->streamDB, q->triggerDB, q->outDB, calcDBNum, q->triggerTblName, q->outTblName,
      q->igExists, q->triggerType, q->igDisorder, q->deleteReCalc, q->deleteOutTbl, q->fillHistory, q->fillHistoryFirst,
      q->calcNotifyOnly, q->lowLatencyCalc, notifyUrlNum, q->notifyEventTypes, q->notifyErrorHandle, q->notifyHistory,
      outColNum, outTagNum, q->maxDelay, q->fillHistoryStartTime, q->watermark, q->expiredTime,
      q->triggerTblType, q->triggerTblUid, q->outTblType, q->outStbExists, q->outStbUid, q->outStbSversion,
      q->eventTypes, q->flags, q->tsmaId, q->placeHolderBitmap, q->tsSlotId,
      q->triggerTblVgId, q->outTblVgId, q->triggerCols, q->partitionCols, q->triggerScanPlan, q->calcPlan, calcScanNum,
      q->subTblNameExpr, q->tagValueExpr, forceOutColNum);

  for (int32_t i = 0; i < calcDBNum; ++i) {
    char* dbName = taosArrayGetP(q->calcDB, i);
    mstDebug("create_info: calcDB[%d] - %s", i, dbName);
  }

  for (int32_t i = 0; i < calcScanNum; ++i) {
    SStreamCalcScan* pScan = taosArrayGet(q->calcScanPlanList, i);
    int32_t vgNum = taosArrayGetSize(pScan->vgList);
    mstDebugL("create_info: calcScanPlan[%d] - readFromCache:%d vgNum:%d scanPlan:[%s]", i, pScan->readFromCache, vgNum, pScan->scanPlan);
    for (int32_t v = 0; v < vgNum; ++v) {
      mstDebug("create_info: calcScanPlan[%d] vg[%d] - vgId:%d", i, v, *(int32_t*)taosArrayGet(pScan->vgList, v));
    }
  }

  for (int32_t i = 0; i < notifyUrlNum; ++i) {
    char* url = taosArrayGetP(q->pNotifyAddrUrls, i);
    mstDebug("create_info: notifyUrl[%d] - %s", i, url);
  }

  for (int32_t i = 0; i < outColNum; ++i) {
    // NOTE(review): outCols is read with taosArrayGetP, i.e. as an array of
    // pointers. If it stores SFieldWithOptions elements by value this should
    // be taosArrayGet - confirm against the code that fills the request.
    SFieldWithOptions* o = taosArrayGetP(q->outCols, i);
    mstDebug("create_info: outCol[%d] - name:%s type:%d flags:%d bytes:%d compress:%u typeMod:%d",
        i, o->name, o->type, o->flags, o->bytes, o->compress, o->typeMod);
  }
}
2025-05-10 09:16:39 +00:00