From c999f7f425f9c393f103bfa4b398349bba061f6e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 11 Nov 2025 18:50:53 +0800 Subject: [PATCH 1/6] enh(stream): fix case error --- source/dnode/vnode/src/vnd/vnodeStream.c | 3 ++- .../08-Recalc/test_recalc_manual_with_options.py | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeStream.c b/source/dnode/vnode/src/vnd/vnodeStream.c index 01230572df0..8d319a79ee5 100644 --- a/source/dnode/vnode/src/vnd/vnodeStream.c +++ b/source/dnode/vnode/src/vnd/vnodeStream.c @@ -2420,7 +2420,8 @@ static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMs ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64, TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey, pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows); - if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) { + if ((req->tsdbTriggerDataReq.gid != 0 && pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) || + (req->tsdbTriggerDataReq.gid == 0 && pTaskInner->pResBlockDst->info.rows >= 0)) { //todo optimize send multi blocks in one group break; } } diff --git a/test/cases/18-StreamProcessing/08-Recalc/test_recalc_manual_with_options.py b/test/cases/18-StreamProcessing/08-Recalc/test_recalc_manual_with_options.py index 929b32aec67..c63d3f21b22 100644 --- a/test/cases/18-StreamProcessing/08-Recalc/test_recalc_manual_with_options.py +++ b/test/cases/18-StreamProcessing/08-Recalc/test_recalc_manual_with_options.py @@ -286,6 +286,7 @@ class TestStreamRecalcWithOptions: # In real scenario, the current stream processing time would determine what's expired tdSql.execute("insert into tdb.exp1 values ('2025-01-01 02:04:00', 15, 150, 1.75, 'expired');") + time.sleep(5) # Manual recalculation for expired data - should still work since manual recalc bypasses expiry tdLog.info("Test EXPIRED_TIME manual recalculation - expired data") tdSql.execute("recalculate stream rdb.s_expired_interval from '2025-01-01 02:04:00' to '2025-01-01 02:14:00';") From f30511343c119ded9ebb8df44e74fd99c6392e42 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 11 Nov 2025 19:09:29 +0800 Subject: [PATCH 2/6] enh(stream): fix case error --- .../07-SubQuery/test_subquery_vtable_change.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_vtable_change.py b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_vtable_change.py index f6647a7b711..f6cbd866c65 100644 --- a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_vtable_change.py +++ b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_vtable_change.py @@ -28,14 +28,14 @@ class TestStreamSubQueryVtableChange: """ tdStream.createSnode() - tdSql.execute(f"alter all dnodes 'debugflag 131';") - tdSql.execute(f"alter all dnodes 'stdebugflag 131';") + tdSql.execute(f"alter all dnodes 'debugflag 143';") + tdSql.execute(f"alter all dnodes 'stdebugflag 143';") streams = [] streams.append(self.Basic0()) # add col ref from new vg for virtual normal table - streams.append(self.Basic1()) # add col ref from new vg for virtual child table - streams.append(self.Basic2()) # add col ref from new vg for virtual super table - streams.append(self.Basic3()) # add new virtual child table, and ref from new vg + # streams.append(self.Basic1()) # add col ref from new vg for virtual child table + # streams.append(self.Basic2()) # add col ref from new vg for virtual super table + # streams.append(self.Basic3()) # add new virtual child table, and ref from new vg tdStream.checkAll(streams) From e7d862e4cf4f2ecd55e0477b2d2d4f45a0539af7 Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Tue, 11 Nov 2025 20:05:37 +0800 Subject: [PATCH 3/6] Revert "enh(stream): fix case error" This reverts commit f30511343c119ded9ebb8df44e74fd99c6392e42. --- .../07-SubQuery/test_subquery_vtable_change.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_vtable_change.py b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_vtable_change.py index f6cbd866c65..f6647a7b711 100644 --- a/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_vtable_change.py +++ b/test/cases/18-StreamProcessing/07-SubQuery/test_subquery_vtable_change.py @@ -28,14 +28,14 @@ class TestStreamSubQueryVtableChange: """ tdStream.createSnode() - tdSql.execute(f"alter all dnodes 'debugflag 143';") - tdSql.execute(f"alter all dnodes 'stdebugflag 143';") + tdSql.execute(f"alter all dnodes 'debugflag 131';") + tdSql.execute(f"alter all dnodes 'stdebugflag 131';") streams = [] streams.append(self.Basic0()) # add col ref from new vg for virtual normal table - # streams.append(self.Basic1()) # add col ref from new vg for virtual child table - # streams.append(self.Basic2()) # add col ref from new vg for virtual super table - # streams.append(self.Basic3()) # add new virtual child table, and ref from new vg + streams.append(self.Basic1()) # add col ref from new vg for virtual child table + streams.append(self.Basic2()) # add col ref from new vg for virtual super table + streams.append(self.Basic3()) # add new virtual child table, and ref from new vg tdStream.checkAll(streams) From 5f8d3af827fe39fce711f8523db72094bcdd27ac Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Tue, 11 Nov 2025 20:06:15 +0800 Subject: [PATCH 4/6] fix(stream): fix retry of calc request --- .../libs/new-stream/src/streamTriggerTask.c | 30 +++++++------------ 1 file changed, 11 insertions(+), 19 deletions(-) diff --git a/source/libs/new-stream/src/streamTriggerTask.c b/source/libs/new-stream/src/streamTriggerTask.c index 87cb55d930c..c71229ead54 100644 --- a/source/libs/new-stream/src/streamTriggerTask.c +++ b/source/libs/new-stream/src/streamTriggerTask.c @@ -3670,31 +3670,23 @@ static int32_t stRealtimeContextRetryCalcRequest(SSTriggerRealtimeContext *pCont pReq->createTable = true; - SSTriggerRealtimeGroup *pGroup = stRealtimeContextGetCurrentGroup(pContext); - QUERY_CHECK_NULL(pGroup, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); - if (pReq->createTable && pTask->hasPartitionBy || (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_IDX) || (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_TBNAME)) { needTagValue = true; } if (needTagValue && taosArrayGetSize(pReq->groupColVals) == 0) { - void *px = tSimpleHashGet(pContext->pGroupColVals, &pGroup->gid, sizeof(int64_t)); - if (px == NULL) { - code = stRealtimeContextSendPullReq(pContext, STRIGGER_PULL_GROUP_COL_VALUE); - QUERY_CHECK_CODE(code, lino, _end); - goto _end; - } else { - SArray *pGroupColVals = *(SArray **)px; - for (int32_t i = 0; i < TARRAY_SIZE(pGroupColVals); i++) { - SStreamGroupValue *pValue = TARRAY_GET_ELEM(pGroupColVals, i); - SStreamGroupValue *pDst = taosArrayPush(pReq->groupColVals, pValue); - QUERY_CHECK_NULL(pDst, code, lino, _end, terrno); - if (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL) { - pDst->data.pData = taosMemoryMalloc(pDst->data.nData); - QUERY_CHECK_NULL(pDst->data.pData, code, lino, _end, terrno); - TAOS_MEMCPY(pDst->data.pData, pValue->data.pData, pDst->data.nData); - } + void *px = tSimpleHashGet(pContext->pGroupColVals, &pReq->gid, sizeof(int64_t)); + QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); + SArray *pGroupColVals = *(SArray **)px; + for (int32_t i = 0; i < TARRAY_SIZE(pGroupColVals); i++) { + SStreamGroupValue *pValue = TARRAY_GET_ELEM(pGroupColVals, i); + SStreamGroupValue *pDst = taosArrayPush(pReq->groupColVals, pValue); + QUERY_CHECK_NULL(pDst, code, lino, _end, terrno); + if (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL) { + pDst->data.pData = taosMemoryMalloc(pDst->data.nData); + QUERY_CHECK_NULL(pDst->data.pData, code, lino, _end, terrno); + TAOS_MEMCPY(pDst->data.pData, pValue->data.pData, pDst->data.nData); } } } From 3c42516e057252daa9192c289ed2d72378120d05 Mon Sep 17 00:00:00 2001 From: Tony Zhang Date: Tue, 11 Nov 2025 20:46:09 +0800 Subject: [PATCH 5/6] fix: stable tagFilterCache bug when alter varchar/nchar tag --- source/dnode/vnode/inc/vnode.h | 5 +- source/dnode/vnode/src/meta/metaCache.c | 111 +++++++++++++++++------ source/dnode/vnode/src/meta/metaEntry2.c | 16 ++-- source/dnode/vnode/src/meta/metaTable.c | 15 +-- source/libs/executor/src/executil.c | 33 ++++--- 5 files changed, 117 insertions(+), 63 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ff66607104c..ab257dae49d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -153,9 +153,10 @@ typedef enum { STABLE_TAG_FILTER_CACHE_ADD_TABLE = 2, } ETagFilterCacheAction; int32_t metaStableTagFilterCacheUpdateUid( - SMeta* pMeta, const SMetaEntry* pDroppedTable, ETagFilterCacheAction action); + SMeta* pMeta, const SMetaEntry* pDroppedTable, + const SMetaEntry* pSuperTable, ETagFilterCacheAction action); int32_t metaStableTagFilterCacheDropTag( - SMeta* pMeta, tb_uid_t suid, int16_t tagColId); + SMeta* pMeta, tb_uid_t suid, col_id_t tagColId); tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name); int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList); int32_t metaPutTbGroupToCache(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload, diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 1ceee88a61f..dfd6cccb47d 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -952,8 +952,10 @@ _end: TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code)); } else { metaDebug( - "vgId:%d, suid:%" PRIu64 " stable tag filter cache dropped from cache", - TD_VID(pMeta->pVnode), suid); + "vgId:%d, suid:%" PRIu64 " stable tag filter cache dropped from cache" + "left stable num:%d, tag conditions num:%" PRIu32, + TD_VID(pMeta->pVnode), suid, (int32_t)taosHashGetSize(pTableEntry), + pMeta->pCache->sStableTagFilterResCache.numTagDataEntries); } code = taosThreadRwlockUnlock(pRwlock); if (TSDB_CODE_SUCCESS != code) { @@ -963,28 +965,68 @@ _end: return code; } -static int32_t buildTagDataEntryKey( - const SArray* pColIds, STag* pTag, T_MD5_CTX* pContext) { +static int32_t getTagColSize( + const SSchema* pTagSchemas, int32_t nTagCols, col_id_t cid) { + for (int32_t i = 0; i < nTagCols; i++) { + if (pTagSchemas[i].colId == cid) { + return pTagSchemas[i].bytes; + } + } + return 0; +} + +// when encode nchar tag into tag data entry key, need to convert it to var type +static FORCE_INLINE int32_t ncharToVar(char *pData, int32_t nData, char **ppOut) { int32_t code = TSDB_CODE_SUCCESS; + + char *t = taosMemoryCalloc(1, nData + VARSTR_HEADER_SIZE); + if (NULL == t) { + return terrno; + } + int32_t len = taosUcs4ToMbs( + (TdUcs4 *)pData, nData, varDataVal(t), NULL); + if (len < 0) { + taosMemoryFree(t); + return TSDB_CODE_SCALAR_CONVERT_ERROR; + } + varDataSetLen(t, len); + + *ppOut = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + memcpy(*ppOut, t, len + VARSTR_HEADER_SIZE); + +_return: + taosMemoryFree(t); + return code; +} + +static int32_t buildTagDataEntryKey(const SArray* pColIds, const STag* pTag, + const SSchemaWrapper* pTagScheam, T_MD5_CTX* pContext) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t keyLen = 0; + char* pKey = NULL; // get length first for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) { STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)}; if (tTagGet(pTag, &pTagValue)) { keyLen += sizeof(col_id_t); if (IS_VAR_DATA_TYPE(pTagValue.type)) { - keyLen += pTagValue.nData; + int32_t varLen = getTagColSize( + pTagScheam->pSchema, pTagScheam->nCols, pTagValue.cid); + code = varLen > 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_NOT_FOUND; + QUERY_CHECK_CODE(code, lino, _end); + keyLen += varLen; } else { keyLen += tDataTypes[pTagValue.type].bytes; } } else { // tag value not found code = TSDB_CODE_NOT_FOUND; - return code; + QUERY_CHECK_CODE(code, lino, _end); } } - char* pKey = taosMemoryCalloc(1, keyLen); + pKey = taosMemoryCalloc(1, keyLen); if (NULL == pKey) { code = terrno; return code; @@ -1000,19 +1042,26 @@ static int32_t buildTagDataEntryKey( pStart += sizeof(col_id_t); // copy value if (IS_VAR_DATA_TYPE(pTagValue.type) && pTagValue.pData != NULL) { - int32_t varLen = pTagValue.nData; - memcpy(pStart, pTagValue.pData, varLen); - pStart += varLen; + if (TSDB_DATA_TYPE_NCHAR == pTagValue.type) { + // need to convert nchar to var + char *pVar = NULL; + code = ncharToVar((char *)pTagValue.pData, pTagValue.nData, &pVar); + QUERY_CHECK_CODE(code, lino, _end); + memcpy(pStart, varDataVal(pVar), varDataLen(pVar)); + pStart += varDataLen(pVar); + taosMemoryFree(pVar); + } else { + memcpy(pStart, pTagValue.pData, pTagValue.nData); + pStart += pTagValue.nData; + } } else { - int32_t typeLen = tDataTypes[pTagValue.type].bytes; - memcpy(pStart, &pTagValue.i64, typeLen); - pStart += typeLen; + memcpy(pStart, &pTagValue.i64, tDataTypes[pTagValue.type].bytes); + pStart += tDataTypes[pTagValue.type].bytes; } } else { // tag value not found - taosMemoryFree(pKey); code = TSDB_CODE_NOT_FOUND; - return code; + QUERY_CHECK_CODE(code, lino, _end); } } @@ -1021,6 +1070,7 @@ static int32_t buildTagDataEntryKey( tMD5Update(pContext, (uint8_t*)pKey, (uint32_t)keyLen); tMD5Final(pContext); +_end: taosMemFreeClear(pKey); return code; } @@ -1028,8 +1078,9 @@ static int32_t buildTagDataEntryKey( // remove the dropped table uid from all cache entries // pDroppedTable is the dropped child table meta entry int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta, - const SMetaEntry* pDroppedTable, ETagFilterCacheAction action) { - if (pMeta == NULL || pDroppedTable == NULL) { + const SMetaEntry* pChildTable, const SMetaEntry* pSuperTable, + ETagFilterCacheAction action) { + if (pMeta == NULL || pChildTable == NULL || pSuperTable == NULL) { return TSDB_CODE_INVALID_PARA; } int32_t lino = 0; @@ -1040,7 +1091,7 @@ int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta, code = taosThreadRwlockWrlock(pRwlock); TSDB_CHECK_CODE(code, lino, _end); - tb_uid_t suid = pDroppedTable->ctbEntry.suid;; + tb_uid_t suid = pChildTable->ctbEntry.suid;; STagConds** pTagConds = (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t)); if (pTagConds != NULL) { @@ -1053,11 +1104,12 @@ int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta, int32_t keyLen = 0; char* pKey = NULL; T_MD5_CTX context = {0}; - code = buildTagDataEntryKey(pColIds, (STag*)pDroppedTable->ctbEntry.pTags, &context); + code = buildTagDataEntryKey(pColIds, (STag*)pChildTable->ctbEntry.pTags, + &pSuperTable->stbEntry.schemaTag, &context); if (code != TSDB_CODE_SUCCESS) { - metaError("vgId:%d, suid:%" PRIu64 " failed to build tag condition key for dropped table uid:%" PRIu64 - " since %s", - TD_VID(pMeta->pVnode), suid, pDroppedTable->uid, tstrerror(code)); + metaError("vgId:%d, suid:%" PRIu64 " failed to build tag condition" + " key for dropped table uid:%" PRIu64 " since %s", + TD_VID(pMeta->pVnode), suid, pChildTable->uid, tstrerror(code)); goto _end; } @@ -1069,17 +1121,18 @@ int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta, if (action == STABLE_TAG_FILTER_CACHE_DROP_TABLE) { for (int32_t i = 0; i < taosArrayGetSize(*pArray); i++) { uint64_t uid = *(uint64_t*)taosArrayGet(*pArray, i); - if (uid == pDroppedTable->uid) { + if (uid == pChildTable->uid) { taosArrayRemove(*pArray, i); - metaDebug("vgId:%d, suid:%" PRIu64 " removed dropped table uid:%" PRIu64 + metaDebug("vgId:%d, suid:%" PRIu64 + " removed dropped table uid:%" PRIu64 " from stable tag filter cache", - TD_VID(pMeta->pVnode), suid, pDroppedTable->uid); + TD_VID(pMeta->pVnode), suid, pChildTable->uid); break; } } } else { // STABLE_TAG_FILTER_CACHE_ADD_TABLE - void* _tmp = taosArrayPush(*pArray, &pDroppedTable->uid); + void* _tmp = taosArrayPush(*pArray, &pChildTable->uid); } } } @@ -1094,7 +1147,7 @@ _end: "vgId:%d, suid:%" PRIu64 " update table uid:%" PRIu64 " in stable tag filter cache, action:%d", TD_VID(pMeta->pVnode), - pDroppedTable->ctbEntry.suid, pDroppedTable->uid, action); + pChildTable->ctbEntry.suid, pChildTable->uid, action); } code = taosThreadRwlockUnlock(pRwlock); if (TSDB_CODE_SUCCESS != code) { @@ -1104,8 +1157,8 @@ _end: return code; } -int32_t metaStableTagFilterCacheDropTag(SMeta* pMeta, - tb_uid_t suid, col_id_t cid) { +int32_t metaStableTagFilterCacheDropTag( + SMeta* pMeta, tb_uid_t suid, col_id_t cid) { if (pMeta == NULL) { return TSDB_CODE_INVALID_PARA; } diff --git a/source/dnode/vnode/src/meta/metaEntry2.c b/source/dnode/vnode/src/meta/metaEntry2.c index c48b03c492e..d4b8e98c6f9 100644 --- a/source/dnode/vnode/src/meta/metaEntry2.c +++ b/source/dnode/vnode/src/meta/metaEntry2.c @@ -1357,7 +1357,7 @@ static int32_t metaHandleChildTableCreateImpl(SMeta *pMeta, const SMetaEntry *pE } ret = metaStableTagFilterCacheUpdateUid( - pMeta, pEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE); + pMeta, pEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE); if (ret < 0) { metaErr(TD_VID(pMeta->pVnode), ret); } @@ -1496,7 +1496,7 @@ static int32_t metaHandleVirtualChildTableCreateImpl(SMeta *pMeta, const SMetaEn } ret = metaStableTagFilterCacheUpdateUid( - pMeta, pEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE); + pMeta, pEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE); if (ret < 0) { metaErr(TD_VID(pMeta->pVnode), ret); } @@ -1656,7 +1656,7 @@ static int32_t metaHandleChildTableDropImpl(SMeta *pMeta, const SMetaHandleParam } ret = metaStableTagFilterCacheUpdateUid( - pMeta, pChild, STABLE_TAG_FILTER_CACHE_DROP_TABLE); + pMeta, pChild, pSuper, STABLE_TAG_FILTER_CACHE_DROP_TABLE); if (ret < 0) { metaErr(TD_VID(pMeta->pVnode), ret); } @@ -1849,7 +1849,7 @@ static int32_t metaHandleVirtualChildTableDropImpl(SMeta *pMeta, const SMetaHand } ret = metaStableTagFilterCacheUpdateUid( - pMeta, pChild, STABLE_TAG_FILTER_CACHE_DROP_TABLE); + pMeta, pChild, pSuper, STABLE_TAG_FILTER_CACHE_DROP_TABLE); if (ret < 0) { metaErr(TD_VID(pMeta->pVnode), ret); } @@ -2076,12 +2076,12 @@ static int32_t metaHandleVirtualChildTableUpdateImpl(SMeta *pMeta, const SMetaHa // update stable tag filter cache: drop old then add new code = metaStableTagFilterCacheUpdateUid( - pMeta, pOldEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE); + pMeta, pOldEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE); if (TSDB_CODE_SUCCESS != code) { metaErr(TD_VID(pMeta->pVnode), code); } code = metaStableTagFilterCacheUpdateUid( - pMeta, pEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE); + pMeta, pEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE); if (TSDB_CODE_SUCCESS != code) { metaErr(TD_VID(pMeta->pVnode), code); } @@ -2126,12 +2126,12 @@ static int32_t metaHandleChildTableUpdateImpl(SMeta *pMeta, const SMetaHandlePar // update stable tag filter cache: drop old then add new code = metaStableTagFilterCacheUpdateUid( - pMeta, pOldEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE); + pMeta, pOldEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE); if (TSDB_CODE_SUCCESS != code) { metaErr(TD_VID(pMeta->pVnode), code); } code = metaStableTagFilterCacheUpdateUid( - pMeta, pEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE); + pMeta, pEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE); if (TSDB_CODE_SUCCESS != code) { metaErr(TD_VID(pMeta->pVnode), code); } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index b06bef0e790..ab5d381a0b4 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -639,6 +639,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p } if (pSysTbl) *pSysTbl = metaTbInFilterCache(pMeta, stbEntry.name, 1) ? 1 : 0; + + ret = metaStableTagFilterCacheUpdateUid( + pMeta, &e, &stbEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE); + if (ret < 0) { + metaError("vgId:%d, failed to update stable tag filter cache:%s " + "uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, + e.ctbEntry.suid, tstrerror(ret)); + } SSchema *pTagColumn = NULL; SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag; @@ -728,13 +736,6 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p metaError("vgId:%d, failed to clear uid cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, e.ctbEntry.suid, tstrerror(ret)); } - ret = metaStableTagFilterCacheUpdateUid( - pMeta, &e, STABLE_TAG_FILTER_CACHE_DROP_TABLE); - if (ret < 0) { - metaError("vgId:%d, failed to update stable tag filter cache:%s " - "uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, - e.ctbEntry.suid, tstrerror(ret)); - } ret = metaTbGroupCacheClear(pMeta, e.ctbEntry.suid); if (ret < 0) { metaError("vgId:%d, failed to clear group cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 271365c1b04..0d4c2ea99fd 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -717,12 +717,12 @@ static bool canOptimizeTagCondFilter(const SNode* pTagCond) { typedef struct { col_id_t colId; SNode* pValueNode; + int32_t bytes; // length defined in schema } STagDataEntry; static int compareTagDataEntry(const void* a, const void* b) { STagDataEntry* p1 = (STagDataEntry*)a; STagDataEntry* p2 = (STagDataEntry*)b; - return compareInt16Val(&p1->colId, &p2->colId); } @@ -736,8 +736,8 @@ static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t } char* pStart = *keyBuf; for (int32_t i = 0; i < taosArrayGetSize(pIdWithValue); ++i) { - STagDataEntry* entry = (STagDataEntry*)taosArrayGet(pIdWithValue, i); - SValueNode* pValueNode = (SValueNode*)entry->pValueNode; + STagDataEntry* entry = (STagDataEntry*)taosArrayGet(pIdWithValue, i); + SValueNode* pValueNode = (SValueNode*)entry->pValueNode; (void)memcpy(pStart, &entry->colId, sizeof(col_id_t)); pStart += sizeof(col_id_t); @@ -808,9 +808,8 @@ static void extractTagDataEntry( STagDataEntry entry = {0}; entry.colId = pColNode->colId; entry.pValueNode = (SNode*)pValueNode; + entry.bytes = pColNode->node.resType.bytes; void* _tmp = taosArrayPush(pIdWithValue, &entry); - STagDataEntry* pLastEntry = taosArrayGetLast(pIdWithValue); - ((SValueNode*)pLastEntry->pValueNode)->node.resType = pColNode->node.resType; } static int32_t extractTagFilterTagDataEntries( @@ -851,8 +850,7 @@ static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pConte QUERY_CHECK_CODE(code, lino, _end); for (int32_t i = 0; i < taosArrayGetSize(pIdWithVal); ++i) { STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i); - len += sizeof(col_id_t) + - ((SValueNode*)pEntry->pValueNode)->node.resType.bytes; + len += sizeof(col_id_t) + pEntry->bytes; } code = buildTagDataEntryKey(pIdWithVal, &payload, len); QUERY_CHECK_CODE(code, lino, _end); @@ -2030,7 +2028,7 @@ static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) { } static int32_t buildTagCondKey( - SNode* pTagCond, char** pTagCondKey, + const SNode* pTagCond, char** pTagCondKey, int32_t* tagCondKeyLen, SArray** pTagColIds) { if (NULL == pTagCond || (nodeType(pTagCond) != QUERY_NODE_OPERATOR && @@ -2084,7 +2082,6 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S int32_t lino = 0; size_t numOfTables = 0; bool listAdded = false; - char* pTagCondStr = NULL; pListInfo->idInfo.suid = pScanNode->suid; pListInfo->idInfo.tableType = pScanNode->tableType; @@ -2124,8 +2121,6 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S code = ctx.code; goto _error; } - code = nodesNodeToString(tmp, false, &pTagCondStr, NULL); - code = genStableTagFilterDigest(tmp, &contextStable); nodesDestroyNode(tmp); } else { @@ -2148,9 +2143,12 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S pTagColIds = NULL; digest[0] = 1; - memcpy(digest + 1, contextStable.digest, tListLen(contextStable.digest)); - qDebug("suid:%" PRIu64 ", %s retrieve table uid list from stable cache, key:%s, kenLen:%d, tagCondStr:%s, numOfTables:%d", - pScanNode->suid, idstr, pTagCondKey, tagCondKeyLen, pTagCondStr, (int32_t)taosArrayGetSize(pUidList)); + memcpy( + digest + 1, contextStable.digest, tListLen(contextStable.digest)); + qDebug("suid:%" PRIu64 ", %s retrieve table uid list from stable cache," + " key:%s, kenLen:%d, numOfTables:%d", + pScanNode->suid, idstr, pTagCondKey, tagCondKeyLen, + (int32_t)taosArrayGetSize(pUidList)); goto _end; } else { qDebug("suid:%" PRIu64 @@ -2242,8 +2240,10 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S memcpy(digest + 1, context.digest, tListLen(context.digest)); } if (tsStableTagFilterCache && isStream && canCacheTagCondFilter) { - qInfo("suid:%" PRIu64 ", %s add uid list to stableTagFilterCache, key:%s, keyLen:%d, condition:%s, uidListSize:%d", - pScanNode->suid, idstr, pTagCondKey, tagCondKeyLen, pTagCondStr, (int32_t)taosArrayGetSize(pUidList)); + qInfo("suid:%" PRIu64 ", %s add uid list to stableTagFilterCache, key:%s," + " keyLen:%d, uidListSize:%d", + pScanNode->suid, idstr, pTagCondKey, tagCondKeyLen, + (int32_t)taosArrayGetSize(pUidList)); code = pStorageAPI->metaFn.putStableCachedTableList( pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen, contextStable.digest, tListLen(contextStable.digest), @@ -2276,7 +2276,6 @@ _error: taosArrayDestroy(pUidList); taosArrayDestroy(pTagColIds); taosMemFreeClear(pTagCondKey); - taosMemFreeClear(pTagCondStr); if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } From cadc8f517cb27adabc4de73a6485b43e3ef5119f Mon Sep 17 00:00:00 2001 From: Jinqing Kuang Date: Wed, 12 Nov 2025 09:38:23 +0800 Subject: [PATCH 6/6] fix(stream): use database precision for history scan step --- .../libs/new-stream/inc/streamTriggerTask.h | 4 ++- .../libs/new-stream/src/streamTriggerTask.c | 27 ++++++++++--------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/source/libs/new-stream/inc/streamTriggerTask.h b/source/libs/new-stream/inc/streamTriggerTask.h index 6e1efd839d1..4d6dfea6a9c 100644 --- a/source/libs/new-stream/inc/streamTriggerTask.h +++ b/source/libs/new-stream/inc/streamTriggerTask.h @@ -331,6 +331,7 @@ typedef struct SStreamTriggerTask { bool ignoreNoDataTrigger; bool hasTriggerFilter; int8_t precision; + int64_t historyStep; int64_t placeHolderBitmap; SNode *triggerFilter; // trigger options: old version, to be removed @@ -405,7 +406,8 @@ int32_t stTriggerTaskAcquireRequest(SStreamTriggerTask *pTask, int64_t sessionId int32_t stTriggerTaskReleaseRequest(SStreamTriggerTask *pTask, SSTriggerCalcRequest **ppRequest); int32_t stTriggerTaskAddRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRealtimeGroup *pGroup, - STimeWindow *pCalcRange, SSHashObj *pWalProgress, bool isHistory); + STimeWindow *pCalcRange, SSHashObj *pWalProgress, bool isHistory, + bool shrinkRange); int32_t stTriggerTaskFetchRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRecalcRequest **ppReq); // interfaces called by stream mgmt thread diff --git a/source/libs/new-stream/src/streamTriggerTask.c b/source/libs/new-stream/src/streamTriggerTask.c index c71229ead54..df3e7cc5812 100644 --- a/source/libs/new-stream/src/streamTriggerTask.c +++ b/source/libs/new-stream/src/streamTriggerTask.c @@ -37,7 +37,6 @@ #define STREAM_TRIGGER_REALTIME_SESSIONID 1 #define STREAM_TRIGGER_HISTORY_SESSIONID 2 -#define STREAM_TRIGGER_HISTORY_STEP_MS (10 * MILLISECOND_PER_DAY) // 10d #define STREAM_TRIGGER_RECALC_MERGE_MS (30 * MILLISECOND_PER_MINUTE) // 30min #define IS_TRIGGER_GROUP_TO_CHECK(pGroup) \ @@ -1081,7 +1080,8 @@ int32_t stTriggerTaskReleaseDropTableRequest(SStreamTriggerTask *pTask, SSTrigge } int32_t stTriggerTaskAddRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRealtimeGroup *pGroup, - STimeWindow *pCalcRange, SSHashObj *pWalProgress, bool isHistory) { + STimeWindow *pCalcRange, SSHashObj *pWalProgress, bool isHistory, + bool shrinkRange) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; bool needUnlock = false; @@ -1114,6 +1114,9 @@ int32_t stTriggerTaskAddRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRealti void *px = tSimpleHashGet(pTask->pHistoryCutoffTime, &pReq->gid, sizeof(int64_t)); pReq->scanRange.skey = ((px == NULL) ? INT64_MIN : *(int64_t *)px) + 1; } + if (shrinkRange && pCalcRange->skey != INT64_MIN) { + pReq->scanRange.skey = TMAX(pReq->scanRange.skey, pCalcRange->skey - pTask->historyStep); + } pReq->scanRange.ekey = pGroup->newThreshold; pReq->calcRange = *pCalcRange; pReq->isHistory = false; @@ -1899,6 +1902,7 @@ int32_t stTriggerTaskDeploy(SStreamTriggerTask *pTask, SStreamTriggerDeployMsg * _end, TSDB_CODE_INVALID_PARA); } pTask->precision = pMsg->precision; + pTask->historyStep = convertTimePrecision(10 * MILLISECOND_PER_DAY, TSDB_TIME_PRECISION_MILLI, pTask->precision); pTask->placeHolderBitmap = pMsg->placeHolderBitmap; pTask->streamName = taosStrdup(pMsg->streamName); code = nodesStringToNode(pMsg->triggerPrevFilter, &pTask->triggerFilter); @@ -2385,7 +2389,7 @@ int32_t stTriggerTaskExecute(SStreamTriggerTask *pTask, const SStreamMsg *pMsg) while (px != NULL) { SSTriggerRealtimeGroup *pGroup = *(SSTriggerRealtimeGroup **)px; STimeWindow range = {.skey = pReq->start, .ekey = pReq->end - 1}; - code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false); + code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, false); QUERY_CHECK_CODE(code, lino, _end); px = tSimpleHashIterate(pContext->pGroups, px, &iter); } @@ -3896,7 +3900,7 @@ static int32_t stRealtimeContextCheck(SSTriggerRealtimeContext *pContext) { pTask->historyCalcStarted = true; if (pTask->fillHistory) { - code = stTriggerTaskAddRecalcRequest(pTask, NULL, NULL, pContext->pReaderWalProgress, true); + code = stTriggerTaskAddRecalcRequest(pTask, NULL, NULL, pContext->pReaderWalProgress, true, false); QUERY_CHECK_CODE(code, lino, _end); } } @@ -4311,7 +4315,7 @@ static int32_t stRealtimeContextProcWalMeta(SSTriggerRealtimeContext *pContext, } } ST_TASK_DLOG("add recalc request for delete data, start: %" PRId64 ", end: %" PRId64, range.skey, range.ekey); - code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false); + code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, true); QUERY_CHECK_CODE(code, lino, _end); if (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS) { code = stRealtimeGroupRemovePendingCalc(pGroup, &range); @@ -4345,7 +4349,7 @@ static int32_t stRealtimeContextProcWalMeta(SSTriggerRealtimeContext *pContext, } } ST_TASK_DLOG("add recalc request for delete data, start: %" PRId64 ", end: %" PRId64, range.skey, range.ekey); - code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false); + code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, true); QUERY_CHECK_CODE(code, lino, _end); if (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS) { code = stRealtimeGroupRemovePendingCalc(pGroup, &range); @@ -6168,8 +6172,7 @@ static int32_t stHistoryContextCheck(SSTriggerHistoryContext *pContext) { pContext->status = STRIGGER_CONTEXT_FETCH_META; if (pContext->needTsdbMeta) { - // TODO(kjq): use precision of trigger table - int64_t step = STREAM_TRIGGER_HISTORY_STEP_MS; + int64_t step = pTask->historyStep; pContext->stepRange.skey = pContext->scanRange.skey; pContext->stepRange.ekey = pContext->scanRange.skey / step * step + step - 1; for (pContext->curReaderIdx = 0; pContext->curReaderIdx < TARRAY_SIZE(pTask->readerList); @@ -6315,7 +6318,7 @@ static int32_t stHistoryContextCheck(SSTriggerHistoryContext *pContext) { if (TD_DLIST_NELES(&pContext->groupsForceClose) == 0) { if (pContext->needTsdbMeta) { // TODO(kjq): use precision of trigger table - int64_t step = STREAM_TRIGGER_HISTORY_STEP_MS; + int64_t step = pTask->historyStep; QUERY_CHECK_CONDITION(pContext->stepRange.skey <= pContext->stepRange.ekey, code, lino, _end, TSDB_CODE_INTERNAL_ERROR); finished = (pContext->stepRange.ekey + 1 > pContext->scanRange.ekey); @@ -6455,7 +6458,7 @@ static int32_t stHistoryContextCheck(SSTriggerHistoryContext *pContext) { pContext->status = STRIGGER_CONTEXT_FETCH_META; if (pContext->needTsdbMeta) { // TODO(kjq): use precision of trigger table - int64_t step = STREAM_TRIGGER_HISTORY_STEP_MS; + int64_t step = pTask->historyStep; pContext->stepRange.skey = pContext->stepRange.ekey + 1; pContext->stepRange.ekey += step; for (pContext->curReaderIdx = 0; pContext->curReaderIdx < TARRAY_SIZE(pTask->readerList); @@ -7239,7 +7242,7 @@ static int32_t stRealtimeGroupAddMeta(SSTriggerRealtimeGroup *pGroup, int32_t vg if (range.skey <= range.ekey) { ST_TASK_DLOG("add recalc request for disorder data, threshold: %" PRId64 ", start: %" PRId64 ", end: %" PRId64, pGroup->oldThreshold, pMeta->skey, pMeta->ekey); - code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false); + code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, true); QUERY_CHECK_CODE(code, lino, _end); if (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS) { code = stRealtimeGroupRemovePendingCalc(pGroup, &range); @@ -8139,7 +8142,7 @@ static int32_t stRealtimeGroupGenCalcParams(SSTriggerRealtimeGroup *pGroup, int3 if (skip) { STimeWindow range = {.skey = param.wstart, .ekey = param.wend}; ST_TASK_DLOG("add recalc request for next window, groupId: %" PRId64, pGroup->gid); - code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false); + code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, true); QUERY_CHECK_CODE(code, lino, _end); } else { code = taosObjListAppend(&pGroup->pPendingCalcParams, ¶m);