TDengine/source/libs/executor/src/streamNotify.c

/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "streamexecutorInt.h"
#include "executorInt.h"
#include "operator.h"
#include "cmdnodes.h"
#include "tdatablock.h"

#define UPDATE_OPERATOR_INFO             BIT_FLAG_MASK(0)
#define FILL_HISTORY_OPERATOR            BIT_FLAG_MASK(1)
#define RECALCULATE_OPERATOR             BIT_FLAG_MASK(2)
#define SEMI_OPERATOR                    BIT_FLAG_MASK(3)
#define FINAL_OPERATOR                   BIT_FLAG_MASK(4)
#define SINGLE_OPERATOR                  BIT_FLAG_MASK(5)

#define NOTIFY_EVENT_NAME_CACHE_LIMIT_MB 16

typedef struct SStreamNotifyEvent {
  uint64_t    gid;
  int64_t     eventType;
  STimeWindow win;
  cJSON*      pJson;
} SStreamNotifyEvent;

#define NOTIFY_EVENT_KEY_SIZE                                                                                  \
  ((sizeof(((struct SStreamNotifyEvent*)0)->gid) + sizeof(((struct SStreamNotifyEvent*)0)->eventType)) + \
   sizeof(((struct SStreamNotifyEvent*)0)->win.skey))

static void destroyStreamWindowEvent(void* ptr) {
  SStreamNotifyEvent* pEvent = (SStreamNotifyEvent*)ptr;
  if (pEvent) {
    if (pEvent->pJson) {
      cJSON_Delete(pEvent->pJson);
    }
    *pEvent = (SStreamNotifyEvent){0};
  }
}

static void destroyStreamNotifyEventSupp(SStreamNotifyEventSupp* sup) {
  if (sup == NULL) return;
  taosHashCleanup(sup->pWindowEventHashMap);
  taosHashCleanup(sup->pTableNameHashMap);
  blockDataDestroy(sup->pEventBlock);
  taosArrayDestroy(sup->pSessionKeys);
  *sup = (SStreamNotifyEventSupp){0};
}

static int32_t initStreamNotifyEventSupp(SStreamNotifyEventSupp* sup, const char* windowType, int32_t resCapacity) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SSDataBlock*    pBlock = NULL;
  SColumnInfoData infoData = {0};

  if (sup == NULL || sup->pWindowEventHashMap != NULL) {
    goto _end;
  }

  code = createDataBlock(&pBlock);
  QUERY_CHECK_CODE(code, lino, _end);

  pBlock->info.type = STREAM_NOTIFY_EVENT;
  pBlock->info.watermark = INT64_MIN;

  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = tDataTypes[infoData.info.type].bytes;
  code = blockDataAppendColInfo(pBlock, &infoData);
  QUERY_CHECK_CODE(code, lino, _end);

  sup->pWindowEventHashMap = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
  QUERY_CHECK_NULL(sup->pWindowEventHashMap, code, lino, _end, terrno);
  taosHashSetFreeFp(sup->pWindowEventHashMap, destroyStreamWindowEvent);
  sup->pTableNameHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
  QUERY_CHECK_NULL(sup->pTableNameHashMap, code, lino, _end, terrno);
  sup->pEventBlock = pBlock;
  pBlock = NULL;
  code = blockDataEnsureCapacity(sup->pEventBlock, resCapacity);
  QUERY_CHECK_CODE(code, lino, _end);
  sup->windowType = windowType;
  sup->pSessionKeys = taosArrayInit(resCapacity, sizeof(SSessionKey));
  QUERY_CHECK_NULL(sup->pSessionKeys, code, lino, _end, terrno);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    if (sup) {
      destroyStreamNotifyEventSupp(sup);
    }
  }
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  return code;
}

int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo, const struct SOperatorInfo* pOperator) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  pBasicInfo->primaryPkIndex = -1;
  pBasicInfo->operatorFlag = 0;

  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pBasicInfo->pDelRes);
  QUERY_CHECK_CODE(code, lino, _end);

  pBasicInfo->pUpdated = taosArrayInit(1024, sizeof(SResultWindowInfo));
  QUERY_CHECK_NULL(pBasicInfo->pUpdated, code, lino, _end, terrno);

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pBasicInfo->pSeDeleted = tSimpleHashInit(32, hashFn);

  const char* windowType = NULL;
  // if (IS_NORMAL_INTERVAL_OP(pOperator)) {
  //   windowType = "Time";
  // } else if (IS_NORMAL_SESSION_OP(pOperator)) {
  //   windowType = "Session";
  // } else if (IS_NORMAL_STATE_OP(pOperator)) {
  //   windowType = "State";
  // } else if (IS_NORMAL_EVENT_OP(pOperator)) {
  //   windowType = "Event";
  // } else if (IS_NORMAL_COUNT_OP(pOperator)) {
  //   windowType = "Count";
  // } else {
  //   return TSDB_CODE_SUCCESS;
  // }
code = initStreamNotifyEventSupp(&pBasicInfo->notifyEventSup, windowType, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
blockDataDestroy(pBasicInfo->pCheckpointRes);
pBasicInfo->pCheckpointRes = NULL;
tSimpleHashCleanup(pBasicInfo->pSeDeleted);
pBasicInfo->pSeDeleted = NULL;
blockDataDestroy(pBasicInfo->pDelRes);
pBasicInfo->pDelRes = NULL;
pBasicInfo->pUpdated = NULL;
pBasicInfo->pTsDataState = NULL;
destroyStreamNotifyEventSupp(&pBasicInfo->notifyEventSup);
}
static int32_t encodeStreamNotifyEventSupp(void** buf, const SStreamNotifyEventSupp* sup) {
int32_t tlen = 0;
void* pIter = NULL;
char* str = NULL;
if (sup == NULL) {
return tlen;
}
tlen += taosEncodeFixedI32(buf, taosHashGetSize(sup->pWindowEventHashMap));
pIter = taosHashIterate(sup->pWindowEventHashMap, NULL);
while (pIter) {
const SStreamNotifyEvent* pEvent = (const SStreamNotifyEvent*)pIter;
str = cJSON_PrintUnformatted(pEvent->pJson);
tlen += taosEncodeFixedU64(buf, pEvent->gid);
tlen += taosEncodeFixedI64(buf, pEvent->eventType);
tlen += taosEncodeFixedI64(buf, pEvent->win.skey);
tlen += taosEncodeFixedI64(buf, pEvent->win.ekey);
tlen += taosEncodeString(buf, str);
cJSON_free(str);
pIter = taosHashIterate(sup->pWindowEventHashMap, pIter);
}
return tlen;
}
static int32_t decodeStreamNotifyEventSupp(void** buf, SStreamNotifyEventSupp* sup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* p = *buf;
int32_t size = 0;
uint64_t len = 0;
SStreamNotifyEvent item = {0};
p = taosDecodeFixedI32(p, &size);
for (int32_t i = 0; i < size; i++) {
p = taosDecodeFixedU64(p, &item.gid);
p = taosDecodeFixedI64(p, &item.eventType);
p = taosDecodeFixedI64(p, &item.win.skey);
p = taosDecodeFixedI64(p, &item.win.ekey);
p = taosDecodeVariantU64(p, &len);
item.pJson = cJSON_Parse(p);
if (item.pJson == NULL) {
qWarn("failed to parse the json content since %s", cJSON_GetErrorPtr());
}
QUERY_CHECK_NULL(item.pJson, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
p = POINTER_SHIFT(p, len);
code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
QUERY_CHECK_CODE(code, lino, _end);
item.pJson = NULL;
}
*buf = p;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
destroyStreamWindowEvent(&item);
return code;
}
int32_t encodeStreamBasicInfo(void** buf, const SSteamOpBasicInfo* pBasicInfo) {
return encodeStreamNotifyEventSupp(buf, &pBasicInfo->notifyEventSup);
}
int32_t decodeStreamBasicInfo(void** buf, SSteamOpBasicInfo* pBasicInfo) {
return decodeStreamNotifyEventSupp(buf, &pBasicInfo->notifyEventSup);
}
static void streamNotifyGetEventWindowId(const SSessionKey* pSessionKey, char* buf) {
uint64_t hash = 0;
uint64_t ar[2];
ar[0] = pSessionKey->groupId;
ar[1] = pSessionKey->win.skey;
hash = MurmurHash3_64((char*)ar, sizeof(ar));
buf = u64toaFastLut(hash, buf);
}
#define JSON_CHECK_ADD_ITEM(obj, str, item) \
QUERY_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
static int32_t jsonAddColumnField(const char* colName, int16_t type, bool isNull, const char* pData, cJSON* obj) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
char* temp = NULL;
QUERY_CHECK_NULL(colName, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_CONDITION(isNull || (pData != NULL), code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_INVALID_PARA);
if (isNull) {
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNull());
goto _end;
}
switch (type) {
case TSDB_DATA_TYPE_BOOL: {
bool val = *(const bool*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateBool(val));
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t val = *(const int8_t*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t val = *(const int16_t*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t val = *(const int32_t*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t val = *(const int64_t*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float val = *(const float*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double val = *(const double*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_NCHAR: {
// cJSON requires null-terminated strings, but this data is not null-terminated,
// so we need to manually copy the string and add null termination.
const char* src = varDataVal(pData);
int32_t len = varDataLen(pData);
temp = cJSON_malloc(len + 1);
QUERY_CHECK_NULL(temp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
memcpy(temp, src, len);
temp[len] = '\0';
cJSON* item = cJSON_CreateStringReference(temp);
JSON_CHECK_ADD_ITEM(obj, colName, item);
// clear the reference flag so the cJSON object owns the copied string and frees it later
item->type &= ~cJSON_IsReference;
temp = NULL;
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t val = *(const uint8_t*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t val = *(const uint16_t*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t val = *(const uint32_t*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t val = *(const uint64_t*)pData;
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
default: {
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateStringReference("<Unable to display this data type>"));
break;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (temp) {
cJSON_free(temp);
}
return code;
}
static cJSON* createBasicAggNotifyEvent(const char* windowType, EStreamNotifyEventType eventType,
const SSessionKey* pSessionKey) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
const char* eventTypeStr = NULL;
cJSON* event = NULL;
char windowId[32];
char groupId[32];
QUERY_CHECK_NULL(windowType, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pSessionKey, code, lino, _end, TSDB_CODE_INVALID_PARA);
if (eventType == SNOTIFY_EVENT_WINDOW_OPEN) {
eventTypeStr = "WINDOW_OPEN";
} else if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
eventTypeStr = "WINDOW_CLOSE";
} else if (eventType == SNOTIFY_EVENT_WINDOW_INVALIDATION) {
eventTypeStr = "WINDOW_INVALIDATION";
} else {
QUERY_CHECK_CONDITION(false, code, lino, _end, TSDB_CODE_INVALID_PARA);
}
qDebug("add stream notify event from %s Window, type: %s, start: %" PRId64 ", end: %" PRId64, windowType,
eventTypeStr, pSessionKey->win.skey, pSessionKey->win.ekey);
event = cJSON_CreateObject();
QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
// add basic info
streamNotifyGetEventWindowId(pSessionKey, windowId);
JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference(eventTypeStr));
JSON_CHECK_ADD_ITEM(event, "eventTime", cJSON_CreateNumber(taosGetTimestampMs()));
JSON_CHECK_ADD_ITEM(event, "windowId", cJSON_CreateString(windowId));
JSON_CHECK_ADD_ITEM(event, "windowType", cJSON_CreateStringReference(windowType));
char* p = u64toaFastLut(pSessionKey->groupId, groupId);
JSON_CHECK_ADD_ITEM(event, "groupId", cJSON_CreateString(groupId));
JSON_CHECK_ADD_ITEM(event, "windowStart", cJSON_CreateNumber(pSessionKey->win.skey));
if (eventType != SNOTIFY_EVENT_WINDOW_OPEN) {
if (strcmp(windowType, "Time") == 0) {
JSON_CHECK_ADD_ITEM(event, "windowEnd", cJSON_CreateNumber(pSessionKey->win.ekey + 1));
} else {
JSON_CHECK_ADD_ITEM(event, "windowEnd", cJSON_CreateNumber(pSessionKey->win.ekey));
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
terrno = code;
cJSON_Delete(event);
event = NULL;
}
return event;
}
int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cJSON* event = NULL;
cJSON* fields = NULL;
cJSON* cond = NULL;
const SNode* pNode = NULL;
int32_t origSize = 0;
int64_t startTime = 0;
int64_t endTime = 0;
SStreamNotifyEvent item = {0};
QUERY_CHECK_NULL(pInputBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pInputBlock->pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pCondCols, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup->windowType, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
startTime = taosGetMonoTimestampMs();
event = createBasicAggNotifyEvent(sup->windowType, eventType, pSessionKey);
QUERY_CHECK_NULL(event, code, lino, _end, terrno);
// create fields object to store matched column values
fields = cJSON_CreateObject();
QUERY_CHECK_NULL(fields, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
FOREACH(pNode, pCondCols) {
const SColumnNode* pColDef = (const SColumnNode*)pNode;
const SColumnInfoData* pColData = taosArrayGet(pInputBlock->pDataBlock, pColDef->slotId);
code = jsonAddColumnField(pColDef->colName, pColData->info.type, colDataIsNull_s(pColData, ri),
colDataGetData(pColData, ri), fields);
QUERY_CHECK_CODE(code, lino, _end);
}
// add trigger condition
cond = cJSON_CreateObject();
QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(0));
JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields);
fields = NULL;
JSON_CHECK_ADD_ITEM(event, "triggerCondition", cond);
cond = NULL;
item.gid = pSessionKey->groupId;
item.win = pSessionKey->win;
item.eventType = eventType;
item.pJson = event;
event = NULL;
origSize = taosHashGetSize(sup->pWindowEventHashMap);
code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
QUERY_CHECK_CODE(code, lino, _end);
item.pJson = NULL;
endTime = taosGetMonoTimestampMs();
pNotifyEventStat->notifyEventAddTimes++;
pNotifyEventStat->notifyEventAddElems += taosHashGetSize(sup->pWindowEventHashMap) - origSize;
pNotifyEventStat->notifyEventAddCostSec += (endTime - startTime) / 1000.0;
pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
destroyStreamWindowEvent(&item);
if (cond != NULL) {
cJSON_Delete(cond);
}
if (fields != NULL) {
cJSON_Delete(fields);
}
if (event != NULL) {
cJSON_Delete(event);
}
return code;
}
int32_t addStateAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
const SStateKeys* pCurState, const SStateKeys* pAnotherState, bool onlyUpdate,
SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cJSON* event = NULL;
int32_t origSize = 0;
int64_t startTime = 0;
int64_t endTime = 0;
SStreamNotifyEvent item = {0};
QUERY_CHECK_NULL(pCurState, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup->windowType, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
item.gid = pSessionKey->groupId;
item.win = pSessionKey->win;
item.eventType = eventType;
// Check if the notify event exists for update
if (onlyUpdate && taosHashGet(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE) == NULL) {
goto _end;
}
startTime = taosGetMonoTimestampMs();
event = createBasicAggNotifyEvent(sup->windowType, eventType, pSessionKey);
QUERY_CHECK_NULL(event, code, lino, _end, terrno);
// add state value
if (eventType == SNOTIFY_EVENT_WINDOW_OPEN) {
if (pAnotherState) {
code = jsonAddColumnField("prevState", pAnotherState->type, pAnotherState->isNull, pAnotherState->pData, event);
QUERY_CHECK_CODE(code, lino, _end);
} else {
code = jsonAddColumnField("prevState", pCurState->type, true, NULL, event);
QUERY_CHECK_CODE(code, lino, _end);
}
}
code = jsonAddColumnField("curState", pCurState->type, pCurState->isNull, pCurState->pData, event);
QUERY_CHECK_CODE(code, lino, _end);
if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
if (pAnotherState) {
code = jsonAddColumnField("nextState", pAnotherState->type, pAnotherState->isNull, pAnotherState->pData, event);
QUERY_CHECK_CODE(code, lino, _end);
} else {
code = jsonAddColumnField("nextState", pCurState->type, true, NULL, event);
QUERY_CHECK_CODE(code, lino, _end);
}
}
item.pJson = event;
event = NULL;
origSize = taosHashGetSize(sup->pWindowEventHashMap);
code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
QUERY_CHECK_CODE(code, lino, _end);
item.pJson = NULL;
endTime = taosGetMonoTimestampMs();
pNotifyEventStat->notifyEventAddTimes++;
pNotifyEventStat->notifyEventAddElems += taosHashGetSize(sup->pWindowEventHashMap) - origSize;
pNotifyEventStat->notifyEventAddCostSec += (endTime - startTime) / 1000.0;
pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
destroyStreamWindowEvent(&item);
if (event != NULL) {
cJSON_Delete(event);
}
return code;
}
static int32_t addNormalAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cJSON* event = NULL;
int32_t origSize = 0;
int64_t startTime = 0;
int64_t endTime = 0;
SStreamNotifyEvent item = {0};
QUERY_CHECK_NULL(pSessionKey, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup->windowType, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
startTime = taosGetMonoTimestampMs();
event = createBasicAggNotifyEvent(sup->windowType, eventType, pSessionKey);
QUERY_CHECK_NULL(event, code, lino, _end, terrno);
item.gid = pSessionKey->groupId;
item.win = pSessionKey->win;
item.eventType = eventType;
item.pJson = event;
event = NULL;
origSize = taosHashGetSize(sup->pWindowEventHashMap);
code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
QUERY_CHECK_CODE(code, lino, _end);
item.pJson = NULL;
endTime = taosGetMonoTimestampMs();
pNotifyEventStat->notifyEventAddTimes++;
pNotifyEventStat->notifyEventAddElems += taosHashGetSize(sup->pWindowEventHashMap) - origSize;
pNotifyEventStat->notifyEventAddCostSec += (endTime - startTime) / 1000.0;
pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
destroyStreamWindowEvent(&item);
if (event != NULL) {
cJSON_Delete(event);
}
return code;
}
int32_t addIntervalAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
return addNormalAggNotifyEvent(eventType, pSessionKey, sup, pNotifyEventStat);
}
int32_t addSessionAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
return addNormalAggNotifyEvent(eventType, pSessionKey, sup, pNotifyEventStat);
}
int32_t addCountAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
SStreamNotifyEventSupp* sup, STaskNotifyEventStat* pNotifyEventStat) {
return addNormalAggNotifyEvent(eventType, pSessionKey, sup, pNotifyEventStat);
}
int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SArray* pSessionKeys,
const SSchemaWrapper* pSchemaWrapper, SStreamNotifyEventSupp* sup,
STaskNotifyEventStat* pNotifyEventStat) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cJSON* result = NULL;
int32_t origSize = 0;
int64_t startTime = 0;
int64_t endTime = 0;
SStreamNotifyEvent item = {0};
QUERY_CHECK_NULL(pResultBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pSessionKeys, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pSchemaWrapper, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
qDebug("add %" PRId64 " stream notify results from window agg", pResultBlock->info.rows);
startTime = taosGetMonoTimestampMs();
origSize = taosHashGetSize(sup->pWindowEventHashMap);
for (int32_t i = 0; i < pResultBlock->info.rows; ++i) {
const SSessionKey* pSessionKey = taosArrayGet(pSessionKeys, i);
item.gid = pSessionKey->groupId;
item.win = pSessionKey->win;
item.eventType = SNOTIFY_EVENT_WINDOW_CLOSE;
SStreamNotifyEvent* pItem = taosHashGet(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE);
if (pItem == NULL) {
item.pJson = createBasicAggNotifyEvent(sup->windowType, SNOTIFY_EVENT_WINDOW_CLOSE, pSessionKey);
QUERY_CHECK_NULL(item.pJson, code, lino, _end, terrno);
if (strcmp(sup->windowType, "Event") == 0) {
JSON_CHECK_ADD_ITEM(item.pJson, "triggerCondition", cJSON_CreateNull());
} else if (strcmp(sup->windowType, "State") == 0) {
JSON_CHECK_ADD_ITEM(item.pJson, "curState", cJSON_CreateNull());
JSON_CHECK_ADD_ITEM(item.pJson, "nextState", cJSON_CreateNull());
}
code = taosHashPut(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE, &item, sizeof(SStreamNotifyEvent));
QUERY_CHECK_CODE(code, lino, _end);
item.pJson = NULL;
pItem = taosHashGet(sup->pWindowEventHashMap, &item, NOTIFY_EVENT_KEY_SIZE);
QUERY_CHECK_NULL(pItem, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
}
// convert the result row into json
result = cJSON_CreateObject();
QUERY_CHECK_NULL(result, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
for (int32_t j = 0; j < pSchemaWrapper->nCols; ++j) {
const SSchema* pCol = pSchemaWrapper->pSchema + j;
if (pCol->colId - 1 < taosArrayGetSize(pResultBlock->pDataBlock)) {
const SColumnInfoData* pColData = taosArrayGet(pResultBlock->pDataBlock, pCol->colId - 1);
code = jsonAddColumnField(pCol->name, pColData->info.type, colDataIsNull_s(pColData, i),
colDataGetData(pColData, i), result);
QUERY_CHECK_CODE(code, lino, _end);
}
}
JSON_CHECK_ADD_ITEM(pItem->pJson, "result", result);
result = NULL;
}
endTime = taosGetMonoTimestampMs();
pNotifyEventStat->notifyEventAddTimes++;
pNotifyEventStat->notifyEventAddElems += taosHashGetSize(sup->pWindowEventHashMap) - origSize;
pNotifyEventStat->notifyEventAddCostSec += (endTime - startTime) / 1000.0;
pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
destroyStreamWindowEvent(&item);
if (result != NULL) {
cJSON_Delete(result);
}
return code;
}
int32_t addAggDeleteNotifyEvent(const SSDataBlock* pDeleteBlock, SStreamNotifyEventSupp* sup,
STaskNotifyEventStat* pNotifyEventStat) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSessionKey sessionKey = {0};
SColumnInfoData* pWstartCol = NULL;
SColumnInfoData* pWendCol = NULL;
SColumnInfoData* pGroupIdCol = NULL;
QUERY_CHECK_NULL(pDeleteBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pNotifyEventStat, code, lino, _end, TSDB_CODE_INVALID_PARA);
qDebug("add %" PRId64 " stream notify delete events from window agg", pDeleteBlock->info.rows);
pWstartCol = taosArrayGet(pDeleteBlock->pDataBlock, START_TS_COLUMN_INDEX);
pWendCol = taosArrayGet(pDeleteBlock->pDataBlock, END_TS_COLUMN_INDEX);
pGroupIdCol = taosArrayGet(pDeleteBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = 0; i < pDeleteBlock->info.rows; ++i) {
sessionKey.win.skey = *(int64_t*)colDataGetNumData(pWstartCol, i);
sessionKey.win.ekey = *(int64_t*)colDataGetNumData(pWendCol, i);
sessionKey.groupId = *(uint64_t*)colDataGetNumData(pGroupIdCol, i);
code = addNormalAggNotifyEvent(SNOTIFY_EVENT_WINDOW_INVALIDATION, &sessionKey, sup, pNotifyEventStat);
QUERY_CHECK_CODE(code, lino, _end);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t streamNotifyGetDestTableName(const SExecTaskInfo* pTaskInfo, uint64_t gid, char** pTableName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
const SStorageAPI* pAPI = NULL;
void* tbname = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
char parTbName[TSDB_TABLE_NAME_LEN];
const SStreamTaskInfo* pStreamInfo = NULL;
QUERY_CHECK_NULL(pTaskInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pTableName, code, lino, _end, TSDB_CODE_INVALID_PARA);
*pTableName = NULL;
pAPI = &pTaskInfo->storageAPI;
code = pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, gid, &tbname, false, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
parTbName[0] = '\0';
} else {
tstrncpy(parTbName, tbname, sizeof(parTbName));
}
pAPI->stateStore.streamStateFreeVal(tbname);
pStreamInfo = &pTaskInfo->streamInfo;
code = buildSinkDestTableName(parTbName, pStreamInfo->stbFullName, gid, pStreamInfo->newSubTableRule, pTableName);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup,
STaskNotifyEventStat* pNotifyEventStat) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t nWindowEvents = 0;
SColumnInfoData* pEventStrCol = NULL;
int64_t startTime = 0;
int64_t endTime = 0;
void* pIter = NULL;
if (pTaskInfo == NULL || sup == NULL || sup->pEventBlock == NULL || pNotifyEventStat == NULL) {
goto _end;
}
QUERY_CHECK_NULL(sup->pEventBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
startTime = taosGetMonoTimestampMs();
blockDataCleanup(sup->pEventBlock);
nWindowEvents = taosHashGetSize(sup->pWindowEventHashMap);
qDebug("start to build stream notify event block, nWindowEvents: %d", nWindowEvents);
pEventStrCol = taosArrayGet(sup->pEventBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
QUERY_CHECK_NULL(pEventStrCol, code, lino, _end, terrno);
// Append all events content into data block.
pIter = taosHashIterate(sup->pWindowEventHashMap, NULL);
while (pIter) {
const SStreamNotifyEvent* pEvent = (const SStreamNotifyEvent*)pIter;
pIter = taosHashIterate(sup->pWindowEventHashMap, pIter);
if (pEvent->eventType == SNOTIFY_EVENT_WINDOW_CLOSE && !cJSON_HasObjectItem(pEvent->pJson, "result")) {
// current WINDOW_CLOSE event cannot be pushed yet due to watermark
continue;
}
// get name of the dest child table
char* tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
if (tableName == NULL) {
code = streamNotifyGetDestTableName(pTaskInfo, pEvent->gid, &tableName);
QUERY_CHECK_CODE(code, lino, _end);
code = taosHashPut(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid), tableName, strlen(tableName) + 1);
taosMemoryFreeClear(tableName);
QUERY_CHECK_CODE(code, lino, _end);
tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
}
JSON_CHECK_ADD_ITEM(pEvent->pJson, "tableName", cJSON_CreateStringReference(tableName));
// convert the json object into string and append it into the block
char* str = cJSON_PrintUnformatted(pEvent->pJson);
QUERY_CHECK_NULL(str, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
int32_t len = strlen(str);
code = varColSetVarData(pEventStrCol, sup->pEventBlock->info.rows, str, len, false);
cJSON_free(str);
QUERY_CHECK_CODE(code, lino, _end);
sup->pEventBlock->info.rows++;
code = taosHashRemove(sup->pWindowEventHashMap, pEvent, NOTIFY_EVENT_KEY_SIZE);
if (code == TSDB_CODE_NOT_FOUND) {
code = TSDB_CODE_SUCCESS;
}
QUERY_CHECK_CODE(code, lino, _end);
if (sup->pEventBlock->info.rows >= sup->pEventBlock->info.capacity) {
break;
}
}
if (taosHashGetMemSize(sup->pTableNameHashMap) >= NOTIFY_EVENT_NAME_CACHE_LIMIT_MB * 1024 * 1024) {
taosHashClear(sup->pTableNameHashMap);
}
endTime = taosGetMonoTimestampMs();
if (sup->pEventBlock->info.rows > 0) {
pNotifyEventStat->notifyEventPushTimes++;
pNotifyEventStat->notifyEventPushElems += sup->pEventBlock->info.rows;
pNotifyEventStat->notifyEventPushCostSec += (endTime - startTime) / 1000.0;
}
pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pIter) {
taosHashCancelIterate(sup->pWindowEventHashMap, pIter);
}
return code;
}
int32_t removeOutdatedNotifyEvents(STimeWindowAggSupp* pTwSup, SStreamNotifyEventSupp* sup,
STaskNotifyEventStat* pNotifyEventStat) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* pIter = NULL;
if (pTwSup == NULL || sup == NULL || pNotifyEventStat == NULL) {
goto _end;
}
pIter = taosHashIterate(sup->pWindowEventHashMap, NULL);
while (pIter) {
const SStreamNotifyEvent* pEvent = (const SStreamNotifyEvent*)pIter;
pIter = taosHashIterate(sup->pWindowEventHashMap, pIter);
}
pNotifyEventStat->notifyEventHoldElems = taosHashGetSize(sup->pWindowEventHashMap);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// Helpers that set, clear, and test individual bits of pBasicInfo->operatorFlag.
void setFinalOperatorFlag(SSteamOpBasicInfo* pBasicInfo) {
BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, FINAL_OPERATOR);
}
bool isFinalOperator(SSteamOpBasicInfo* pBasicInfo) {
return BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, FINAL_OPERATOR);
}
void setRecalculateOperatorFlag(SSteamOpBasicInfo* pBasicInfo) {
BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, RECALCULATE_OPERATOR);
}
void unsetRecalculateOperatorFlag(SSteamOpBasicInfo* pBasicInfo) {
BIT_FLAG_UNSET_MASK(pBasicInfo->operatorFlag, RECALCULATE_OPERATOR);
}
bool isRecalculateOperator(SSteamOpBasicInfo* pBasicInfo) {
return BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, RECALCULATE_OPERATOR);
}
void setSingleOperatorFlag(SSteamOpBasicInfo* pBasicInfo) {
BIT_FLAG_SET_MASK(pBasicInfo->operatorFlag, SINGLE_OPERATOR);
}
bool isSingleOperator(SSteamOpBasicInfo* pBasicInfo) {
return BIT_FLAG_TEST_MASK(pBasicInfo->operatorFlag, SINGLE_OPERATOR);
}