TDengine/source/dnode/mnode/impl/src/mndStreamUtil.c

299 lines
10 KiB
C
Raw Normal View History

2024-01-25 08:55:05 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2024-08-28 05:33:43 +00:00
#include "mndDb.h"
#include "mndStb.h"
2024-01-25 08:55:05 +00:00
#include "mndStream.h"
#include "mndTrans.h"
#include "mndVgroup.h"
2024-08-28 05:33:43 +00:00
#include "taoserror.h"
#include "tmisce.h"
2024-01-25 08:55:05 +00:00
2024-07-22 05:31:57 +00:00
/*
 * Find the stream object whose uid equals streamId.
 *
 * Every SDB_STREAM entry of the mnode's sdb is visited in turn. Entries that do not match are
 * released immediately. On a match the iteration is cancelled and the object is returned through
 * *pStream without being released in this function.
 *
 * pMnode:   mnode whose sdb is scanned
 * streamId: value compared against SStreamObj::uid
 * pStream:  output; the matching object, or NULL if no entry matches
 *
 * Returns TSDB_CODE_SUCCESS on a match, otherwise TSDB_CODE_STREAM_TASK_NOT_EXIST.
 */
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pCandidate = NULL;
  void       *pCursor = NULL;

  *pStream = NULL;

  for (;;) {
    pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pCandidate);
    if (pCursor == NULL) {
      break;
    }

    if (pCandidate->uid != streamId) {
      sdbRelease(pSdb, pCandidate);
      continue;
    }

    // NOTE(review): the matched object is not released here, so the caller presumably has to
    // release it when done -- confirm against the call sites.
    sdbCancelFetch(pSdb, pCursor);
    *pStream = pCandidate;
    return TSDB_CODE_SUCCESS;
  }

  return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
int32_t num = 0;
feat(stream): add new trigger continuous_window_close (#30125) * opt stream build twa result * opt force window close memory * feat(stream):optimize new interval and scan operator * adj log * opt code * opt code * fill history * fix issue for fill history * add ci * feat(stream): add new stream nonblock interval operator * adjust code * use new scan operator * use new scan operator * add log * fix issue * recover stream scan next function * fix issue * fix issue * fix issue * ignore disorder data * fix issue * fix issue for interval sliding * fix issue * fix ci issue * fix ci issue * fix ci issue * add semi && final nonblock interval operator * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * refactor(stream): track the msgId for each upstream tasks. * fix(stream): fix race condition. * fix(stream): update the task last msgId when putting into input queue succ. * fix issue * fix issue * put recalculate data to rocksdb * fix issue * fix issue * enh(query)[TD-33071]: add support for saving and restoring tsdbReader scan progress - Implement functionality to save scan progress during tsdbReader operations - Enable resuming scans from the last saved position * fix issue * fix issue * fix issue * fix issue * fix issue * add rec interval check * enh(stream):add recalculate tasks. * enh(stream): support the re-calculate the tasks. * fix issue && do refactor * do refactor * fix issue * fix issue * update backend opt * add new interface * add new session operator * support blob * add new session operator * fix issue * add rec state for rec task * fix invalid read * add new session window * enh(stream): update the stream tasks backend. * new session operator * add pull data * fix(stream): fix error in expand stream backend. * fix issue * fix issue * fix issue * merge code * fix issue * fix(stream): check for null ptr. * fix(stream): add more check. 
* fix issue * fix issue * fix issue * add debug code * fix issue * fix issue * fix issue * set rec end flag * fix(stream): opt re-calculate stream tasks. * fix issue * fix issue * add new operator * enh(stream): dispatch recalculate block to agg tasks. * fix issue * fix issue * fix(stream): adjust filter. * fix issue * refactor * refactor(stream): adjust the recalculate end block. * fix issue * fix(stream): set correct create trigger block. * fix issue * fix(stream): fix error in build recalculate end block. * fix(stream): check null ptr. * add stream client && fix issue * fix mem leak * fix(stream): free msg. * add stream client * fix(stream): fix error. * add stream client && fix issue * add stream client * refactor(stream): set the recalculate task complete. * add wend and group_id for session window dest table * feat(stream): refactor and set the recalcul agg task complete. * add cfg for adapter * fix issue * add state && event operator * feat(stream): support fill-history task. * add new fill operator * fix(stream): set correct backend when updating fill-history task to recalculate task. * add new fill operator * fix(stream): s2025-03-06 11:10:31.272 et ops always open in scan tsdb * fix(stream):set the correct taskType for sink task. * new fill operator * adj stream fill operator * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * adj test * fix issue * fix(stream): fix issue * fix(steam): fix issue * fix(steam): fix issue * fix(steam): fix issue * fix(steam): fix issue * fix(stream): fix issue * fix(stream): fix issue * fix(stream): fix issue * fix: ut com error * fix(stream): fix mem leak and adjust operator type check rule * fix(stream): fix mem leak and adjust test case * refactor code * fix(stream): free items. * fix(stream): free fix memory leak. * fix(stream): fix syntax error. 
* fix: ignore unexpect block * fix: adjust op type --------- Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: yihaoDeng <luomoxyz@126.com>
2025-03-14 12:14:01 +00:00
for (int32_t i = 0; i < taosArrayGetSize(pStream->pTaskList); ++i) {
SArray *pLevel = taosArrayGetP(pStream->pTaskList, i);
2024-01-25 08:55:05 +00:00
num += taosArrayGetSize(pLevel);
}
return num;
}
2024-09-19 10:13:46 +00:00
/*
 * Count the streams whose source database is dbName.
 *
 * pMnode:        mnode whose sdb is scanned
 * dbName:        name passed to mndAcquireDb() to resolve the database
 * pNumOfStreams: output; number of SDB_STREAM entries whose sourceDbUid equals the database uid.
 *                Left untouched when the database cannot be acquired.
 *
 * Returns 0 on success, or TSDB_CODE_MND_DB_NOT_SELECTED (via TAOS_RETURN) when mndAcquireDb()
 * yields NULL.
 */
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
  SSdb   *pSdb = pMnode->pSdb;
  SDbObj *pDb = mndAcquireDb(pMnode, dbName);
  if (pDb == NULL) {
    TAOS_RETURN(TSDB_CODE_MND_DB_NOT_SELECTED);
  }

  int32_t     matched = 0;
  SStreamObj *pStream = NULL;
  void       *pCursor = NULL;

  while ((pCursor = sdbFetch(pSdb, SDB_STREAM, pCursor, (void **)&pStream)) != NULL) {
    matched += (pStream->sourceDbUid == pDb->uid) ? 1 : 0;
    sdbRelease(pSdb, pStream);
  }

  *pNumOfStreams = matched;
  mndReleaseDb(pMnode, pDb);
  return 0;
}
/*
 * Write the display name of a stream status into dst.
 *
 * dst must provide at least MND_STREAM_TRIGGER_NAME_SIZE bytes. For a status value that is not
 * one of the handled constants, dst is left unmodified.
 */
static void mndShowStreamStatus(char *dst, int8_t status) {
  const char *label = NULL;

  if (status == STREAM_STATUS__NORMAL) {
    label = "ready";
  } else if (status == STREAM_STATUS__STOP) {
    label = "stop";
  } else if (status == STREAM_STATUS__FAILED) {
    label = "failed";
  } else if (status == STREAM_STATUS__RECOVER) {
    label = "recover";
  } else if (status == STREAM_STATUS__PAUSE) {
    label = "paused";
  } else if (status == STREAM_STATUS__INIT) {
    label = "init";
  }

  if (label != NULL) {
    tstrncpy(dst, label, MND_STREAM_TRIGGER_NAME_SIZE);
  }
}
/*
 * Write the display name of the stream's trigger mode (pStream->conf.trigger) into dst.
 *
 * dst must provide at least MND_STREAM_TRIGGER_NAME_SIZE bytes. For a trigger value that is not
 * one of the handled constants, dst is left unmodified.
 */
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
  const int8_t mode = pStream->conf.trigger;
  const char  *label = NULL;

  if (mode == STREAM_TRIGGER_AT_ONCE) {
    label = "at once";
  } else if (mode == STREAM_TRIGGER_WINDOW_CLOSE) {
    label = "window close";
  } else if (mode == STREAM_TRIGGER_MAX_DELAY) {
    label = "max delay";
  } else if (mode == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
    label = "force window close";
  }

  if (label != NULL) {
    tstrncpy(dst, label, MND_STREAM_TRIGGER_NAME_SIZE);
  }
}
/*
 * Format id as a "0x..." hexadecimal string stored as a length-prefixed value in pBuf.
 *
 * The whole buffer (bufLen bytes) is zeroed first. The text is written starting at byte offset 2;
 * the length recorded through varDataSetLen() is the number of hex digits reported by tintToHex()
 * plus the two characters of the "0x" prefix.
 */
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
  memset(pBuf, 0, bufLen);

  // NOTE(review): offset 2 presumably equals VARSTR_HEADER_SIZE, i.e. the bytes reserved for the
  // length header written by varDataSetLen() -- confirm.
  char *pText = &pBuf[2];
  pText[0] = '0';
  pText[1] = 'x';

  int32_t numOfDigits = tintToHex(id, &pText[2]);
  varDataSetLen(pBuf, numOfDigits + 2);
}
/*
 * Fill row numOfRows of a result block with the attributes of one stream.
 *
 * Columns of pBlock->pDataBlock are consumed strictly in order; each step fetches the next column
 * and stores one value in it. Any failure (missing column or colDataSetVal() error) aborts the
 * remaining columns, logs the failing line and code, and returns that code.
 *
 * pStream:   stream whose attributes are reported
 * pBlock:    destination block; pDataBlock must contain one column per value written below
 * numOfRows: row index the values are written to
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) {
  int32_t code = 0;
  int32_t cols = 0;
  int32_t lino = 0;

  // stream name
  char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
  SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // create time
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream id
  char buf[128] = {0};
  int64ToHexStr(pStream->uid, buf, tListLen(buf));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, buf, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // related fill-history stream id; NULL when the stream has none (hTaskUid == 0)
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  if (pStream->hTaskUid != 0) {
    int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
    code = colDataSetVal(pColInfo, numOfRows, buf, false);
  } else {
    code = colDataSetVal(pColInfo, numOfRows, buf, true);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // sql text of the stream
  char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // stream status
  char status[20 + VARSTR_HEADER_SIZE] = {0};
  char status2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};

  // NOTE(review): the isAllTaskPaused() lookup is disabled in this revision, so isPaused stays
  // false and the reported status is always the stream's own status field.
  bool   isPaused = false;
  int8_t streamStatus = atomic_load_8(&pStream->status);
  if (isPaused && pStream->pTaskList != NULL) {
    streamStatus = STREAM_STATUS__PAUSE;
  }

  mndShowStreamStatus(status2, streamStatus);
  STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // source db
  char sourceDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(sourceDB, mndGetDbStr(pStream->sourceDb), sizeof(sourceDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)sourceDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target db
  char targetDB[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
  STR_WITH_MAXSIZE_TO_VARSTR(targetDB, mndGetDbStr(pStream->targetDb), sizeof(targetDB));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)targetDB, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // target super table; NULL when the stream has no target super table name
  if (pStream->targetSTbName[0] == 0) {
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
    code = colDataSetVal(pColInfo, numOfRows, NULL, true);
  } else {
    char targetSTB[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(targetSTB, mndGetStbStr(pStream->targetSTbName), sizeof(targetSTB));
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)targetSTB, false);
  }
  TSDB_CHECK_CODE(code, lino, _end);

  // watermark
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // trigger mode
  char trigger[20 + VARSTR_HEADER_SIZE] = {0};
  char trigger2[MND_STREAM_TRIGGER_NAME_SIZE] = {0};
  mndShowStreamTrigger(trigger2, pStream);
  STR_WITH_MAXSIZE_TO_VARSTR(trigger, trigger2, sizeof(trigger));
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)trigger, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // sink_quota: always reported as the constant string "0"
  char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0};
  sinkQuota[0] = '0';
  char dstStr[20] = {0};
  STR_TO_VARSTR(dstStr, sinkQuota);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // NOTE(review): this column is fetched (advancing the column index) but no value is written to
  // it -- presumably a schema slot whose content is no longer produced; confirm against the
  // result schema before removing.
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);

  // checkpoint backup type: always reported as "none"
  char backup[20 + VARSTR_HEADER_SIZE] = {0};
  STR_TO_VARSTR(backup, "none");
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)backup, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // history scan idle: always reported as "100a"
  char scanHistoryIdle[20 + VARSTR_HEADER_SIZE] = {0};
  tstrncpy(scanHistoryIdle, "100a", sizeof(scanHistoryIdle));
  memset(dstStr, 0, tListLen(dstStr));
  STR_TO_VARSTR(dstStr, scanHistoryIdle);
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  code = colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false);
  TSDB_CHECK_CODE(code, lino, _end);

  // message: the stream's reserve field when the stream has failed, a single blank otherwise
  pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
  TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno);
  char msg[TSDB_RESERVE_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
  if (streamStatus == STREAM_STATUS__FAILED) {
    // Bounded copy: a strlen-based copy would run past both buffers if reserve were ever filled
    // without a terminating NUL.
    STR_WITH_MAXSIZE_TO_VARSTR(msg, pStream->reserve, sizeof(msg));
  } else {
    STR_TO_VARSTR(msg, " ");
  }
  code = colDataSetVal(pColInfo, numOfRows, (const char *)msg, false);

_end:
  if (code) {
    mError("error happens when build stream attr result block, lino:%d, code:%s", lino, tstrerror(code));
  }
  return code;
}
2025-04-10 11:41:37 +00:00
/*
 * Report whether at least one snode is registered in the mnode's sdb.
 *
 * Only the first SDB_SNODE entry is fetched; if it exists it is released, the iteration is
 * cancelled, and the check succeeds.
 *
 * Returns TSDB_CODE_SUCCESS when an snode entry exists, TSDB_CODE_SNODE_NOT_DEPLOYED otherwise.
 */
int32_t mndStreamCheckSnodeExists(SMnode *pMnode) {
  SSdb      *pSdb = pMnode->pSdb;
  SSnodeObj *pFirstSnode = NULL;

  void *pCursor = sdbFetch(pSdb, SDB_SNODE, NULL, (void **)&pFirstSnode);
  if (pCursor == NULL) {
    return TSDB_CODE_SNODE_NOT_DEPLOYED;
  }

  sdbRelease(pSdb, pFirstSnode);
  sdbCancelFetch(pSdb, pCursor);
  return TSDB_CODE_SUCCESS;
}