Merge branch 'enh/addStreamLatency-marks' of github.com:taosdata/tdengine into enh/addStreamLatency-marks

This commit is contained in:
Haojun Liao 2025-11-12 11:26:16 +08:00
commit e862b24ca3
9 changed files with 149 additions and 96 deletions

View file

@ -153,9 +153,10 @@ typedef enum {
STABLE_TAG_FILTER_CACHE_ADD_TABLE = 2,
} ETagFilterCacheAction;
int32_t metaStableTagFilterCacheUpdateUid(
SMeta* pMeta, const SMetaEntry* pDroppedTable, ETagFilterCacheAction action);
SMeta* pMeta, const SMetaEntry* pDroppedTable,
const SMetaEntry* pSuperTable, ETagFilterCacheAction action);
int32_t metaStableTagFilterCacheDropTag(
SMeta* pMeta, tb_uid_t suid, int16_t tagColId);
SMeta* pMeta, tb_uid_t suid, col_id_t tagColId);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList);
int32_t metaPutTbGroupToCache(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,

View file

@ -952,8 +952,10 @@ _end:
TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
metaDebug(
"vgId:%d, suid:%" PRIu64 " stable tag filter cache dropped from cache",
TD_VID(pMeta->pVnode), suid);
"vgId:%d, suid:%" PRIu64 " stable tag filter cache dropped from cache"
"left stable num:%d, tag conditions num:%" PRIu32,
TD_VID(pMeta->pVnode), suid, (int32_t)taosHashGetSize(pTableEntry),
pMeta->pCache->sStableTagFilterResCache.numTagDataEntries);
}
code = taosThreadRwlockUnlock(pRwlock);
if (TSDB_CODE_SUCCESS != code) {
@ -963,28 +965,68 @@ _end:
return code;
}
static int32_t buildTagDataEntryKey(
const SArray* pColIds, STag* pTag, T_MD5_CTX* pContext) {
static int32_t getTagColSize(
const SSchema* pTagSchemas, int32_t nTagCols, col_id_t cid) {
for (int32_t i = 0; i < nTagCols; i++) {
if (pTagSchemas[i].colId == cid) {
return pTagSchemas[i].bytes;
}
}
return 0;
}
// when encode nchar tag into tag data entry key, need to convert it to var type
static FORCE_INLINE int32_t ncharToVar(char *pData, int32_t nData, char **ppOut) {
int32_t code = TSDB_CODE_SUCCESS;
char *t = taosMemoryCalloc(1, nData + VARSTR_HEADER_SIZE);
if (NULL == t) {
return terrno;
}
int32_t len = taosUcs4ToMbs(
(TdUcs4 *)pData, nData, varDataVal(t), NULL);
if (len < 0) {
taosMemoryFree(t);
return TSDB_CODE_SCALAR_CONVERT_ERROR;
}
varDataSetLen(t, len);
*ppOut = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
memcpy(*ppOut, t, len + VARSTR_HEADER_SIZE);
_return:
taosMemoryFree(t);
return code;
}
static int32_t buildTagDataEntryKey(const SArray* pColIds, const STag* pTag,
const SSchemaWrapper* pTagScheam, T_MD5_CTX* pContext) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t keyLen = 0;
char* pKey = NULL;
// get length first
for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
if (tTagGet(pTag, &pTagValue)) {
keyLen += sizeof(col_id_t);
if (IS_VAR_DATA_TYPE(pTagValue.type)) {
keyLen += pTagValue.nData;
int32_t varLen = getTagColSize(
pTagScheam->pSchema, pTagScheam->nCols, pTagValue.cid);
code = varLen > 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_NOT_FOUND;
QUERY_CHECK_CODE(code, lino, _end);
keyLen += varLen;
} else {
keyLen += tDataTypes[pTagValue.type].bytes;
}
} else {
// tag value not found
code = TSDB_CODE_NOT_FOUND;
return code;
QUERY_CHECK_CODE(code, lino, _end);
}
}
char* pKey = taosMemoryCalloc(1, keyLen);
pKey = taosMemoryCalloc(1, keyLen);
if (NULL == pKey) {
code = terrno;
return code;
@ -1000,19 +1042,26 @@ static int32_t buildTagDataEntryKey(
pStart += sizeof(col_id_t);
// copy value
if (IS_VAR_DATA_TYPE(pTagValue.type) && pTagValue.pData != NULL) {
int32_t varLen = pTagValue.nData;
memcpy(pStart, pTagValue.pData, varLen);
pStart += varLen;
if (TSDB_DATA_TYPE_NCHAR == pTagValue.type) {
// need to convert nchar to var
char *pVar = NULL;
code = ncharToVar((char *)pTagValue.pData, pTagValue.nData, &pVar);
QUERY_CHECK_CODE(code, lino, _end);
memcpy(pStart, varDataVal(pVar), varDataLen(pVar));
pStart += varDataLen(pVar);
taosMemoryFree(pVar);
} else {
memcpy(pStart, pTagValue.pData, pTagValue.nData);
pStart += pTagValue.nData;
}
} else {
int32_t typeLen = tDataTypes[pTagValue.type].bytes;
memcpy(pStart, &pTagValue.i64, typeLen);
pStart += typeLen;
memcpy(pStart, &pTagValue.i64, tDataTypes[pTagValue.type].bytes);
pStart += tDataTypes[pTagValue.type].bytes;
}
} else {
// tag value not found
taosMemoryFree(pKey);
code = TSDB_CODE_NOT_FOUND;
return code;
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -1021,6 +1070,7 @@ static int32_t buildTagDataEntryKey(
tMD5Update(pContext, (uint8_t*)pKey, (uint32_t)keyLen);
tMD5Final(pContext);
_end:
taosMemFreeClear(pKey);
return code;
}
@ -1028,8 +1078,9 @@ static int32_t buildTagDataEntryKey(
// remove the dropped table uid from all cache entries
// pDroppedTable is the dropped child table meta entry
int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta,
const SMetaEntry* pDroppedTable, ETagFilterCacheAction action) {
if (pMeta == NULL || pDroppedTable == NULL) {
const SMetaEntry* pChildTable, const SMetaEntry* pSuperTable,
ETagFilterCacheAction action) {
if (pMeta == NULL || pChildTable == NULL || pSuperTable == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t lino = 0;
@ -1040,7 +1091,7 @@ int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta,
code = taosThreadRwlockWrlock(pRwlock);
TSDB_CHECK_CODE(code, lino, _end);
tb_uid_t suid = pDroppedTable->ctbEntry.suid;;
tb_uid_t suid = pChildTable->ctbEntry.suid;;
STagConds** pTagConds =
(STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
if (pTagConds != NULL) {
@ -1053,11 +1104,12 @@ int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta,
int32_t keyLen = 0;
char* pKey = NULL;
T_MD5_CTX context = {0};
code = buildTagDataEntryKey(pColIds, (STag*)pDroppedTable->ctbEntry.pTags, &context);
code = buildTagDataEntryKey(pColIds, (STag*)pChildTable->ctbEntry.pTags,
&pSuperTable->stbEntry.schemaTag, &context);
if (code != TSDB_CODE_SUCCESS) {
metaError("vgId:%d, suid:%" PRIu64 " failed to build tag condition key for dropped table uid:%" PRIu64
" since %s",
TD_VID(pMeta->pVnode), suid, pDroppedTable->uid, tstrerror(code));
metaError("vgId:%d, suid:%" PRIu64 " failed to build tag condition"
" key for dropped table uid:%" PRIu64 " since %s",
TD_VID(pMeta->pVnode), suid, pChildTable->uid, tstrerror(code));
goto _end;
}
@ -1069,17 +1121,18 @@ int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta,
if (action == STABLE_TAG_FILTER_CACHE_DROP_TABLE) {
for (int32_t i = 0; i < taosArrayGetSize(*pArray); i++) {
uint64_t uid = *(uint64_t*)taosArrayGet(*pArray, i);
if (uid == pDroppedTable->uid) {
if (uid == pChildTable->uid) {
taosArrayRemove(*pArray, i);
metaDebug("vgId:%d, suid:%" PRIu64 " removed dropped table uid:%" PRIu64
metaDebug("vgId:%d, suid:%" PRIu64
" removed dropped table uid:%" PRIu64
" from stable tag filter cache",
TD_VID(pMeta->pVnode), suid, pDroppedTable->uid);
TD_VID(pMeta->pVnode), suid, pChildTable->uid);
break;
}
}
} else {
// STABLE_TAG_FILTER_CACHE_ADD_TABLE
void* _tmp = taosArrayPush(*pArray, &pDroppedTable->uid);
void* _tmp = taosArrayPush(*pArray, &pChildTable->uid);
}
}
}
@ -1094,7 +1147,7 @@ _end:
"vgId:%d, suid:%" PRIu64 " update table uid:%" PRIu64
" in stable tag filter cache, action:%d",
TD_VID(pMeta->pVnode),
pDroppedTable->ctbEntry.suid, pDroppedTable->uid, action);
pChildTable->ctbEntry.suid, pChildTable->uid, action);
}
code = taosThreadRwlockUnlock(pRwlock);
if (TSDB_CODE_SUCCESS != code) {
@ -1104,8 +1157,8 @@ _end:
return code;
}
int32_t metaStableTagFilterCacheDropTag(SMeta* pMeta,
tb_uid_t suid, col_id_t cid) {
int32_t metaStableTagFilterCacheDropTag(
SMeta* pMeta, tb_uid_t suid, col_id_t cid) {
if (pMeta == NULL) {
return TSDB_CODE_INVALID_PARA;
}

View file

@ -1357,7 +1357,7 @@ static int32_t metaHandleChildTableCreateImpl(SMeta *pMeta, const SMetaEntry *pE
}
ret = metaStableTagFilterCacheUpdateUid(
pMeta, pEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE);
pMeta, pEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
}
@ -1496,7 +1496,7 @@ static int32_t metaHandleVirtualChildTableCreateImpl(SMeta *pMeta, const SMetaEn
}
ret = metaStableTagFilterCacheUpdateUid(
pMeta, pEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE);
pMeta, pEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
}
@ -1656,7 +1656,7 @@ static int32_t metaHandleChildTableDropImpl(SMeta *pMeta, const SMetaHandleParam
}
ret = metaStableTagFilterCacheUpdateUid(
pMeta, pChild, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
pMeta, pChild, pSuper, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
}
@ -1849,7 +1849,7 @@ static int32_t metaHandleVirtualChildTableDropImpl(SMeta *pMeta, const SMetaHand
}
ret = metaStableTagFilterCacheUpdateUid(
pMeta, pChild, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
pMeta, pChild, pSuper, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
if (ret < 0) {
metaErr(TD_VID(pMeta->pVnode), ret);
}
@ -2076,12 +2076,12 @@ static int32_t metaHandleVirtualChildTableUpdateImpl(SMeta *pMeta, const SMetaHa
// update stable tag filter cache: drop old then add new
code = metaStableTagFilterCacheUpdateUid(
pMeta, pOldEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
pMeta, pOldEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
if (TSDB_CODE_SUCCESS != code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
code = metaStableTagFilterCacheUpdateUid(
pMeta, pEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE);
pMeta, pEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE);
if (TSDB_CODE_SUCCESS != code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
@ -2126,12 +2126,12 @@ static int32_t metaHandleChildTableUpdateImpl(SMeta *pMeta, const SMetaHandlePar
// update stable tag filter cache: drop old then add new
code = metaStableTagFilterCacheUpdateUid(
pMeta, pOldEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
pMeta, pOldEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
if (TSDB_CODE_SUCCESS != code) {
metaErr(TD_VID(pMeta->pVnode), code);
}
code = metaStableTagFilterCacheUpdateUid(
pMeta, pEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE);
pMeta, pEntry, pSuperEntry, STABLE_TAG_FILTER_CACHE_ADD_TABLE);
if (TSDB_CODE_SUCCESS != code) {
metaErr(TD_VID(pMeta->pVnode), code);
}

View file

@ -639,6 +639,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
}
if (pSysTbl) *pSysTbl = metaTbInFilterCache(pMeta, stbEntry.name, 1) ? 1 : 0;
ret = metaStableTagFilterCacheUpdateUid(
pMeta, &e, &stbEntry, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
if (ret < 0) {
metaError("vgId:%d, failed to update stable tag filter cache:%s "
"uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,
e.ctbEntry.suid, tstrerror(ret));
}
SSchema *pTagColumn = NULL;
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
@ -728,13 +736,6 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
metaError("vgId:%d, failed to clear uid cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,
e.ctbEntry.suid, tstrerror(ret));
}
ret = metaStableTagFilterCacheUpdateUid(
pMeta, &e, STABLE_TAG_FILTER_CACHE_DROP_TABLE);
if (ret < 0) {
metaError("vgId:%d, failed to update stable tag filter cache:%s "
"uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,
e.ctbEntry.suid, tstrerror(ret));
}
ret = metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
if (ret < 0) {
metaError("vgId:%d, failed to clear group cache:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), e.name,

View file

@ -2420,7 +2420,8 @@ static int32_t vnodeProcessStreamTsdbTriggerDataReq(SVnode* pVnode, SRpcMsg* pMs
ST_TASK_DLOG("vgId:%d %s get skey:%" PRId64 ", eksy:%" PRId64 ", uid:%" PRId64 ", gId:%" PRIu64 ", rows:%" PRId64,
TD_VID(pVnode), __func__, pTaskInner->pResBlock->info.window.skey, pTaskInner->pResBlock->info.window.ekey,
pTaskInner->pResBlock->info.id.uid, pTaskInner->pResBlock->info.id.groupId, pTaskInner->pResBlock->info.rows);
if (pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) {
if ((req->tsdbTriggerDataReq.gid != 0 && pTaskInner->pResBlockDst->info.rows >= STREAM_RETURN_ROWS_NUM) ||
(req->tsdbTriggerDataReq.gid == 0 && pTaskInner->pResBlockDst->info.rows >= 0)) { //todo optimize send multi blocks in one group
break;
}
}

View file

@ -717,12 +717,12 @@ static bool canOptimizeTagCondFilter(const SNode* pTagCond) {
typedef struct {
col_id_t colId;
SNode* pValueNode;
int32_t bytes; // length defined in schema
} STagDataEntry;
static int compareTagDataEntry(const void* a, const void* b) {
STagDataEntry* p1 = (STagDataEntry*)a;
STagDataEntry* p2 = (STagDataEntry*)b;
return compareInt16Val(&p1->colId, &p2->colId);
}
@ -736,8 +736,8 @@ static int32_t buildTagDataEntryKey(SArray* pIdWithValue, char** keyBuf, int32_t
}
char* pStart = *keyBuf;
for (int32_t i = 0; i < taosArrayGetSize(pIdWithValue); ++i) {
STagDataEntry* entry = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
SValueNode* pValueNode = (SValueNode*)entry->pValueNode;
STagDataEntry* entry = (STagDataEntry*)taosArrayGet(pIdWithValue, i);
SValueNode* pValueNode = (SValueNode*)entry->pValueNode;
(void)memcpy(pStart, &entry->colId, sizeof(col_id_t));
pStart += sizeof(col_id_t);
@ -808,9 +808,8 @@ static void extractTagDataEntry(
STagDataEntry entry = {0};
entry.colId = pColNode->colId;
entry.pValueNode = (SNode*)pValueNode;
entry.bytes = pColNode->node.resType.bytes;
void* _tmp = taosArrayPush(pIdWithValue, &entry);
STagDataEntry* pLastEntry = taosArrayGetLast(pIdWithValue);
((SValueNode*)pLastEntry->pValueNode)->node.resType = pColNode->node.resType;
}
static int32_t extractTagFilterTagDataEntries(
@ -851,8 +850,7 @@ static int32_t genStableTagFilterDigest(const SNode* pTagCond, T_MD5_CTX* pConte
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < taosArrayGetSize(pIdWithVal); ++i) {
STagDataEntry* pEntry = taosArrayGet(pIdWithVal, i);
len += sizeof(col_id_t) +
((SValueNode*)pEntry->pValueNode)->node.resType.bytes;
len += sizeof(col_id_t) + pEntry->bytes;
}
code = buildTagDataEntryKey(pIdWithVal, &payload, len);
QUERY_CHECK_CODE(code, lino, _end);
@ -2030,7 +2028,7 @@ static void extractTagColId(SOperatorNode* pOpNode, SArray* pColIdArray) {
}
static int32_t buildTagCondKey(
SNode* pTagCond, char** pTagCondKey,
const SNode* pTagCond, char** pTagCondKey,
int32_t* tagCondKeyLen, SArray** pTagColIds) {
if (NULL == pTagCond ||
(nodeType(pTagCond) != QUERY_NODE_OPERATOR &&
@ -2084,7 +2082,6 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
int32_t lino = 0;
size_t numOfTables = 0;
bool listAdded = false;
char* pTagCondStr = NULL;
pListInfo->idInfo.suid = pScanNode->suid;
pListInfo->idInfo.tableType = pScanNode->tableType;
@ -2124,8 +2121,6 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
code = ctx.code;
goto _error;
}
code = nodesNodeToString(tmp, false, &pTagCondStr, NULL);
code = genStableTagFilterDigest(tmp, &contextStable);
nodesDestroyNode(tmp);
} else {
@ -2148,9 +2143,12 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
pTagColIds = NULL;
digest[0] = 1;
memcpy(digest + 1, contextStable.digest, tListLen(contextStable.digest));
qDebug("suid:%" PRIu64 ", %s retrieve table uid list from stable cache, key:%s, kenLen:%d, tagCondStr:%s, numOfTables:%d",
pScanNode->suid, idstr, pTagCondKey, tagCondKeyLen, pTagCondStr, (int32_t)taosArrayGetSize(pUidList));
memcpy(
digest + 1, contextStable.digest, tListLen(contextStable.digest));
qDebug("suid:%" PRIu64 ", %s retrieve table uid list from stable cache,"
" key:%s, kenLen:%d, numOfTables:%d",
pScanNode->suid, idstr, pTagCondKey, tagCondKeyLen,
(int32_t)taosArrayGetSize(pUidList));
goto _end;
} else {
qDebug("suid:%" PRIu64
@ -2242,8 +2240,10 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
memcpy(digest + 1, context.digest, tListLen(context.digest));
}
if (tsStableTagFilterCache && isStream && canCacheTagCondFilter) {
qInfo("suid:%" PRIu64 ", %s add uid list to stableTagFilterCache, key:%s, keyLen:%d, condition:%s, uidListSize:%d",
pScanNode->suid, idstr, pTagCondKey, tagCondKeyLen, pTagCondStr, (int32_t)taosArrayGetSize(pUidList));
qInfo("suid:%" PRIu64 ", %s add uid list to stableTagFilterCache, key:%s,"
" keyLen:%d, uidListSize:%d",
pScanNode->suid, idstr, pTagCondKey, tagCondKeyLen,
(int32_t)taosArrayGetSize(pUidList));
code = pStorageAPI->metaFn.putStableCachedTableList(
pVnode, pScanNode->suid, pTagCondKey, tagCondKeyLen,
contextStable.digest, tListLen(contextStable.digest),
@ -2276,7 +2276,6 @@ _error:
taosArrayDestroy(pUidList);
taosArrayDestroy(pTagColIds);
taosMemFreeClear(pTagCondKey);
taosMemFreeClear(pTagCondStr);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}

View file

@ -331,6 +331,7 @@ typedef struct SStreamTriggerTask {
bool ignoreNoDataTrigger;
bool hasTriggerFilter;
int8_t precision;
int64_t historyStep;
int64_t placeHolderBitmap;
SNode *triggerFilter;
// trigger options: old version, to be removed
@ -405,7 +406,8 @@ int32_t stTriggerTaskAcquireRequest(SStreamTriggerTask *pTask, int64_t sessionId
int32_t stTriggerTaskReleaseRequest(SStreamTriggerTask *pTask, SSTriggerCalcRequest **ppRequest);
int32_t stTriggerTaskAddRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRealtimeGroup *pGroup,
STimeWindow *pCalcRange, SSHashObj *pWalProgress, bool isHistory);
STimeWindow *pCalcRange, SSHashObj *pWalProgress, bool isHistory,
bool shrinkRange);
int32_t stTriggerTaskFetchRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRecalcRequest **ppReq);
// interfaces called by stream mgmt thread

View file

@ -37,7 +37,6 @@
#define STREAM_TRIGGER_REALTIME_SESSIONID 1
#define STREAM_TRIGGER_HISTORY_SESSIONID 2
#define STREAM_TRIGGER_HISTORY_STEP_MS (10 * MILLISECOND_PER_DAY) // 10d
#define STREAM_TRIGGER_RECALC_MERGE_MS (30 * MILLISECOND_PER_MINUTE) // 30min
#define IS_TRIGGER_GROUP_TO_CHECK(pGroup) \
@ -1081,7 +1080,8 @@ int32_t stTriggerTaskReleaseDropTableRequest(SStreamTriggerTask *pTask, SSTrigge
}
int32_t stTriggerTaskAddRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRealtimeGroup *pGroup,
STimeWindow *pCalcRange, SSHashObj *pWalProgress, bool isHistory) {
STimeWindow *pCalcRange, SSHashObj *pWalProgress, bool isHistory,
bool shrinkRange) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
bool needUnlock = false;
@ -1114,6 +1114,9 @@ int32_t stTriggerTaskAddRecalcRequest(SStreamTriggerTask *pTask, SSTriggerRealti
void *px = tSimpleHashGet(pTask->pHistoryCutoffTime, &pReq->gid, sizeof(int64_t));
pReq->scanRange.skey = ((px == NULL) ? INT64_MIN : *(int64_t *)px) + 1;
}
if (shrinkRange && pCalcRange->skey != INT64_MIN) {
pReq->scanRange.skey = TMAX(pReq->scanRange.skey, pCalcRange->skey - pTask->historyStep);
}
pReq->scanRange.ekey = pGroup->newThreshold;
pReq->calcRange = *pCalcRange;
pReq->isHistory = false;
@ -1899,6 +1902,7 @@ int32_t stTriggerTaskDeploy(SStreamTriggerTask *pTask, SStreamTriggerDeployMsg *
_end, TSDB_CODE_INVALID_PARA);
}
pTask->precision = pMsg->precision;
pTask->historyStep = convertTimePrecision(10 * MILLISECOND_PER_DAY, TSDB_TIME_PRECISION_MILLI, pTask->precision);
pTask->placeHolderBitmap = pMsg->placeHolderBitmap;
pTask->streamName = taosStrdup(pMsg->streamName);
code = nodesStringToNode(pMsg->triggerPrevFilter, &pTask->triggerFilter);
@ -2385,7 +2389,7 @@ int32_t stTriggerTaskExecute(SStreamTriggerTask *pTask, const SStreamMsg *pMsg)
while (px != NULL) {
SSTriggerRealtimeGroup *pGroup = *(SSTriggerRealtimeGroup **)px;
STimeWindow range = {.skey = pReq->start, .ekey = pReq->end - 1};
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, false);
QUERY_CHECK_CODE(code, lino, _end);
px = tSimpleHashIterate(pContext->pGroups, px, &iter);
}
@ -3670,31 +3674,23 @@ static int32_t stRealtimeContextRetryCalcRequest(SSTriggerRealtimeContext *pCont
pReq->createTable = true;
SSTriggerRealtimeGroup *pGroup = stRealtimeContextGetCurrentGroup(pContext);
QUERY_CHECK_NULL(pGroup, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
if (pReq->createTable && pTask->hasPartitionBy || (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_IDX) ||
(pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_TBNAME)) {
needTagValue = true;
}
if (needTagValue && taosArrayGetSize(pReq->groupColVals) == 0) {
void *px = tSimpleHashGet(pContext->pGroupColVals, &pGroup->gid, sizeof(int64_t));
if (px == NULL) {
code = stRealtimeContextSendPullReq(pContext, STRIGGER_PULL_GROUP_COL_VALUE);
QUERY_CHECK_CODE(code, lino, _end);
goto _end;
} else {
SArray *pGroupColVals = *(SArray **)px;
for (int32_t i = 0; i < TARRAY_SIZE(pGroupColVals); i++) {
SStreamGroupValue *pValue = TARRAY_GET_ELEM(pGroupColVals, i);
SStreamGroupValue *pDst = taosArrayPush(pReq->groupColVals, pValue);
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
if (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL) {
pDst->data.pData = taosMemoryMalloc(pDst->data.nData);
QUERY_CHECK_NULL(pDst->data.pData, code, lino, _end, terrno);
TAOS_MEMCPY(pDst->data.pData, pValue->data.pData, pDst->data.nData);
}
void *px = tSimpleHashGet(pContext->pGroupColVals, &pReq->gid, sizeof(int64_t));
QUERY_CHECK_NULL(px, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
SArray *pGroupColVals = *(SArray **)px;
for (int32_t i = 0; i < TARRAY_SIZE(pGroupColVals); i++) {
SStreamGroupValue *pValue = TARRAY_GET_ELEM(pGroupColVals, i);
SStreamGroupValue *pDst = taosArrayPush(pReq->groupColVals, pValue);
QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
if (IS_VAR_DATA_TYPE(pValue->data.type) || pValue->data.type == TSDB_DATA_TYPE_DECIMAL) {
pDst->data.pData = taosMemoryMalloc(pDst->data.nData);
QUERY_CHECK_NULL(pDst->data.pData, code, lino, _end, terrno);
TAOS_MEMCPY(pDst->data.pData, pValue->data.pData, pDst->data.nData);
}
}
}
@ -3904,7 +3900,7 @@ static int32_t stRealtimeContextCheck(SSTriggerRealtimeContext *pContext) {
pTask->historyCalcStarted = true;
if (pTask->fillHistory) {
code = stTriggerTaskAddRecalcRequest(pTask, NULL, NULL, pContext->pReaderWalProgress, true);
code = stTriggerTaskAddRecalcRequest(pTask, NULL, NULL, pContext->pReaderWalProgress, true, false);
QUERY_CHECK_CODE(code, lino, _end);
}
}
@ -4319,7 +4315,7 @@ static int32_t stRealtimeContextProcWalMeta(SSTriggerRealtimeContext *pContext,
}
}
ST_TASK_DLOG("add recalc request for delete data, start: %" PRId64 ", end: %" PRId64, range.skey, range.ekey);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, true);
QUERY_CHECK_CODE(code, lino, _end);
if (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS) {
code = stRealtimeGroupRemovePendingCalc(pGroup, &range);
@ -4353,7 +4349,7 @@ static int32_t stRealtimeContextProcWalMeta(SSTriggerRealtimeContext *pContext,
}
}
ST_TASK_DLOG("add recalc request for delete data, start: %" PRId64 ", end: %" PRId64, range.skey, range.ekey);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, true);
QUERY_CHECK_CODE(code, lino, _end);
if (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS) {
code = stRealtimeGroupRemovePendingCalc(pGroup, &range);
@ -6176,8 +6172,7 @@ static int32_t stHistoryContextCheck(SSTriggerHistoryContext *pContext) {
pContext->status = STRIGGER_CONTEXT_FETCH_META;
if (pContext->needTsdbMeta) {
// TODO(kjq): use precision of trigger table
int64_t step = STREAM_TRIGGER_HISTORY_STEP_MS;
int64_t step = pTask->historyStep;
pContext->stepRange.skey = pContext->scanRange.skey;
pContext->stepRange.ekey = pContext->scanRange.skey / step * step + step - 1;
for (pContext->curReaderIdx = 0; pContext->curReaderIdx < TARRAY_SIZE(pTask->readerList);
@ -6323,7 +6318,7 @@ static int32_t stHistoryContextCheck(SSTriggerHistoryContext *pContext) {
if (TD_DLIST_NELES(&pContext->groupsForceClose) == 0) {
if (pContext->needTsdbMeta) {
// TODO(kjq): use precision of trigger table
int64_t step = STREAM_TRIGGER_HISTORY_STEP_MS;
int64_t step = pTask->historyStep;
QUERY_CHECK_CONDITION(pContext->stepRange.skey <= pContext->stepRange.ekey, code, lino, _end,
TSDB_CODE_INTERNAL_ERROR);
finished = (pContext->stepRange.ekey + 1 > pContext->scanRange.ekey);
@ -6463,7 +6458,7 @@ static int32_t stHistoryContextCheck(SSTriggerHistoryContext *pContext) {
pContext->status = STRIGGER_CONTEXT_FETCH_META;
if (pContext->needTsdbMeta) {
// TODO(kjq): use precision of trigger table
int64_t step = STREAM_TRIGGER_HISTORY_STEP_MS;
int64_t step = pTask->historyStep;
pContext->stepRange.skey = pContext->stepRange.ekey + 1;
pContext->stepRange.ekey += step;
for (pContext->curReaderIdx = 0; pContext->curReaderIdx < TARRAY_SIZE(pTask->readerList);
@ -7247,7 +7242,7 @@ static int32_t stRealtimeGroupAddMeta(SSTriggerRealtimeGroup *pGroup, int32_t vg
if (range.skey <= range.ekey) {
ST_TASK_DLOG("add recalc request for disorder data, threshold: %" PRId64 ", start: %" PRId64 ", end: %" PRId64,
pGroup->oldThreshold, pMeta->skey, pMeta->ekey);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, true);
QUERY_CHECK_CODE(code, lino, _end);
if (pTask->placeHolderBitmap & PLACE_HOLDER_PARTITION_ROWS) {
code = stRealtimeGroupRemovePendingCalc(pGroup, &range);
@ -8147,7 +8142,7 @@ static int32_t stRealtimeGroupGenCalcParams(SSTriggerRealtimeGroup *pGroup, int3
if (skip) {
STimeWindow range = {.skey = param.wstart, .ekey = param.wend};
ST_TASK_DLOG("add recalc request for next window, groupId: %" PRId64, pGroup->gid);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false);
code = stTriggerTaskAddRecalcRequest(pTask, pGroup, &range, pContext->pReaderWalProgress, false, true);
QUERY_CHECK_CODE(code, lino, _end);
} else {
code = taosObjListAppend(&pGroup->pPendingCalcParams, &param);

View file

@ -286,6 +286,7 @@ class TestStreamRecalcWithOptions:
# In real scenario, the current stream processing time would determine what's expired
tdSql.execute("insert into tdb.exp1 values ('2025-01-01 02:04:00', 15, 150, 1.75, 'expired');")
time.sleep(5)
# Manual recalculation for expired data - should still work since manual recalc bypasses expiry
tdLog.info("Test EXPIRED_TIME manual recalculation - expired data")
tdSql.execute("recalculate stream rdb.s_expired_interval from '2025-01-01 02:04:00' to '2025-01-01 02:14:00';")