TDengine/source/libs/executor/src/mergejoinoperator.c

2096 lines
63 KiB
C
Raw Permalink Normal View History

2022-05-14 16:04:51 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2025-07-17 06:17:47 +00:00
// clang-format off
2023-04-28 03:42:34 +00:00
#include "executorInt.h"
#include "filter.h"
2022-05-14 16:04:51 +00:00
#include "function.h"
2023-04-28 03:42:34 +00:00
#include "operator.h"
2022-05-14 16:04:51 +00:00
#include "os.h"
#include "querynodes.h"
2023-04-28 03:42:34 +00:00
#include "querytask.h"
2022-05-14 16:04:51 +00:00
#include "tcompare.h"
#include "tdatablock.h"
2022-05-14 16:04:51 +00:00
#include "thash.h"
#include "tmsg.h"
2022-05-14 16:04:51 +00:00
#include "ttypes.h"
2024-02-29 09:52:53 +00:00
#include "functionMgt.h"
2023-12-01 10:33:50 +00:00
#include "mergejoin.h"
2025-07-17 06:17:47 +00:00
// clang-format on
2022-05-14 16:04:51 +00:00
2024-02-01 06:58:08 +00:00
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
2024-02-29 09:52:53 +00:00
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
2024-07-22 03:06:24 +00:00
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2024-02-01 06:58:08 +00:00
pGrp->beginIdx = pTable->blkRowIdx;
pGrp->readIdx = pTable->blkRowIdx;
pTable->blkRowIdx++;
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
if (timestamp != *(int64_t*)pEndVal) {
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
if (timestamp == *(int64_t*)pNextVal) {
continue;
}
pGrp->endIdx = pTable->blkRowIdx - 1;
return TSDB_CODE_SUCCESS;
}
}
pGrp->endIdx = pTable->blk->info.rows - 1;
pTable->blkRowIdx = pTable->blk->info.rows;
if (wholeBlk) {
*wholeBlk = true;
}
return TSDB_CODE_SUCCESS;
}
2024-07-22 03:06:24 +00:00
int32_t mJoinTrimKeepFirstRow(SSDataBlock* pBlock) {
2024-03-25 07:19:16 +00:00
int32_t bmLen = BitmapLen(pBlock->info.rows);
2025-07-17 06:17:47 +00:00
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
2024-03-25 07:19:16 +00:00
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
2024-07-22 03:06:24 +00:00
if (NULL == pDst) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2025-07-17 06:17:47 +00:00
2024-03-25 07:19:16 +00:00
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
continue;
}
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
pDst->varmeta.length = 0;
if (!colDataIsNull_var(pDst, 0)) {
2025-07-17 06:17:47 +00:00
char* p1 = colDataGetVarData(pDst, 0);
// int32_t len = calcStrBytesByType(pDst->info.type, p1);
// if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
// len = getJsonValueLen(p1);
// } else if (IS_STR_DATA_BLOB(pDst->info.type)) {
// len = blobDataTLen(p1);
// } else {
// len = varDataTLen(p1);
// }
pDst->varmeta.length = calcStrBytesByType(pDst->info.type, p1);
2024-03-25 07:19:16 +00:00
}
} else {
bool isNull = colDataIsNull_f(pDst, 0);
2024-03-25 07:19:16 +00:00
2024-07-22 03:06:24 +00:00
TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);
2024-03-25 07:19:16 +00:00
if (isNull) {
colDataSetNull_f(pDst->nullbitmap, 0);
}
}
}
pBlock->info.rows = 1;
2024-07-22 03:06:24 +00:00
return TSDB_CODE_SUCCESS;
2024-03-25 07:19:16 +00:00
}
2024-07-22 03:06:24 +00:00
int32_t mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
2024-01-10 10:56:49 +00:00
// int32_t totalRows = pBlock->info.rows;
2024-07-22 03:06:24 +00:00
int32_t code = TSDB_CODE_SUCCESS;
2024-01-10 10:56:49 +00:00
int32_t bmLen = BitmapLen(totalRows);
char* pBitmap = NULL;
int32_t maxRows = 0;
2025-07-17 06:17:47 +00:00
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
2024-01-10 10:56:49 +00:00
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, i);
2024-07-22 03:06:24 +00:00
if (NULL == pDst) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2024-01-10 10:56:49 +00:00
// it is a reserved column for scalar function, and no data in this column yet.
if (pDst->pData == NULL || (IS_VAR_DATA_TYPE(pDst->info.type) && pDst->varmeta.length == 0)) {
continue;
}
int32_t numOfRows = 0;
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
int32_t j = 0;
pDst->varmeta.length = 0;
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (colDataIsNull_var(pDst, j)) {
colDataSetNull_var(pDst, numOfRows);
} else {
// fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first
// copy it to p2
char* p1 = colDataGetVarData(pDst, j);
2025-07-17 06:17:47 +00:00
int32_t len = calcStrBytesByType(pDst->info.type, p1);
// if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
// len = getJsonValueLen(p1);
// } else if (IS_STR_DATA_BLOB(pDst->info.type)) {
// len = blobDataTLen(p1);
// } else {
// len = varDataTLen(p1);
// }
2024-01-10 10:56:49 +00:00
char* p2 = taosMemoryMalloc(len);
2024-09-14 09:28:24 +00:00
if (NULL == p2) {
MJ_ERR_RET(terrno);
}
2024-07-22 03:06:24 +00:00
TAOS_MEMCPY(p2, p1, len);
code = colDataSetVal(pDst, numOfRows, p2, false);
if (code) {
taosMemoryFreeClear(p2);
MJ_ERR_RET(terrno);
}
2024-01-10 10:56:49 +00:00
taosMemoryFree(p2);
}
numOfRows += 1;
j += 1;
break;
}
if (maxRows < numOfRows) {
maxRows = numOfRows;
}
} else {
if (pBitmap == NULL) {
pBitmap = taosMemoryCalloc(1, bmLen);
2024-07-22 03:06:24 +00:00
if (NULL == pBitmap) {
MJ_ERR_RET(terrno);
}
2024-01-10 10:56:49 +00:00
}
2024-07-22 03:06:24 +00:00
TAOS_MEMCPY(pBitmap, pDst->nullbitmap, bmLen);
TAOS_MEMSET(pDst->nullbitmap, 0, bmLen);
2024-01-10 10:56:49 +00:00
int32_t j = 0;
switch (pDst->info.type) {
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (BMIsNull(pBitmap, j)) {
2024-01-10 10:56:49 +00:00
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
((int64_t*)pDst->pData)[numOfRows] = ((int64_t*)pDst->pData)[j];
}
numOfRows += 1;
j += 1;
break;
}
break;
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (BMIsNull(pBitmap, j)) {
2024-01-10 10:56:49 +00:00
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
((int32_t*)pDst->pData)[numOfRows] = ((int32_t*)pDst->pData)[j];
}
numOfRows += 1;
j += 1;
break;
}
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (BMIsNull(pBitmap, j)) {
2024-01-10 10:56:49 +00:00
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
((int16_t*)pDst->pData)[numOfRows] = ((int16_t*)pDst->pData)[j];
}
numOfRows += 1;
j += 1;
break;
}
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
while (j < totalRows) {
if (pBoolList[j] == 0) {
j += 1;
continue;
}
if (BMIsNull(pBitmap, j)) {
2024-01-10 10:56:49 +00:00
colDataSetNull_f(pDst->nullbitmap, numOfRows);
} else {
((int8_t*)pDst->pData)[numOfRows] = ((int8_t*)pDst->pData)[j];
}
numOfRows += 1;
j += 1;
break;
}
break;
}
}
if (maxRows < numOfRows) {
maxRows = numOfRows;
}
}
pBlock->info.rows = maxRows;
if (pBitmap != NULL) {
taosMemoryFree(pBitmap);
}
2024-07-22 03:06:24 +00:00
return TSDB_CODE_SUCCESS;
2024-01-10 10:56:49 +00:00
}
2025-07-17 06:17:47 +00:00
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build,
int32_t startRowIdx) {
2024-01-03 05:52:36 +00:00
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
SColumnInfoData* p = NULL;
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
2025-07-17 06:17:47 +00:00
if (!build->pHashGrpRows->allRowsMatch &&
(status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED)) {
2024-01-03 08:25:45 +00:00
if (status == FILTER_RESULT_ALL_QUALIFIED && taosArrayGetSize(build->pHashCurGrp) == pBlock->info.rows) {
2024-01-03 05:52:36 +00:00
build->pHashGrpRows->allRowsMatch = true;
} else {
bool* pRes = (bool*)p->pData;
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
2025-07-17 06:17:47 +00:00
if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + i)) ||
MJOIN_ROW_BITMAP_SET(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i)) {
2024-01-03 05:52:36 +00:00
continue;
}
MJOIN_SET_ROW_BITMAP(build->pRowBitmap, build->pHashGrpRows->rowBitmapOffset, startRowIdx + i);
build->pHashGrpRows->rowMatchNum++;
}
if (build->pHashGrpRows->rowMatchNum == taosArrayGetSize(build->pHashGrpRows->pRows)) {
build->pHashGrpRows->allRowsMatch = true;
}
}
}
2024-07-29 02:29:40 +00:00
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
2024-01-03 05:52:36 +00:00
_err:
colDataDestroy(p);
taosMemoryFree(p);
2025-07-17 06:17:47 +00:00
2024-01-03 05:52:36 +00:00
return code;
}
2025-07-17 06:17:47 +00:00
int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build,
int32_t startGrpIdx, int32_t startRowIdx) {
2024-01-03 05:52:36 +00:00
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
2025-07-17 06:17:47 +00:00
int32_t code = TSDB_CODE_SUCCESS;
2024-01-03 05:52:36 +00:00
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
SColumnInfoData* p = NULL;
2024-07-22 03:06:24 +00:00
code = filterSetDataFromSlotId(pFilterInfo, &param1);
2024-01-03 05:52:36 +00:00
if (code != TSDB_CODE_SUCCESS) {
2024-07-22 03:06:24 +00:00
goto _return;
2024-01-03 05:52:36 +00:00
}
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
2024-07-22 03:06:24 +00:00
goto _return;
2024-01-03 05:52:36 +00:00
}
int32_t rowNum = 0;
2025-07-17 06:17:47 +00:00
bool* pRes = (bool*)p->pData;
2024-01-03 05:52:36 +00:00
int32_t grpNum = taosArrayGetSize(build->eqGrps);
if (status == FILTER_RESULT_ALL_QUALIFIED || status == FILTER_RESULT_PARTIAL_QUALIFIED) {
for (int32_t i = startGrpIdx; i < grpNum && rowNum < pBlock->info.rows; startRowIdx = 0, ++i) {
SMJoinGrpRows* buildGrp = taosArrayGet(build->eqGrps, i);
2024-07-22 03:06:24 +00:00
if (NULL == buildGrp) {
MJ_ERR_JRET(terrno);
}
2024-01-03 05:52:36 +00:00
if (buildGrp->allRowsMatch) {
rowNum += buildGrp->endIdx - startRowIdx + 1;
continue;
}
2025-07-17 06:17:47 +00:00
if (status == FILTER_RESULT_ALL_QUALIFIED && startRowIdx == buildGrp->beginIdx &&
((pBlock->info.rows - rowNum) >= (buildGrp->endIdx - startRowIdx + 1))) {
2024-01-03 05:52:36 +00:00
buildGrp->allRowsMatch = true;
rowNum += buildGrp->endIdx - startRowIdx + 1;
continue;
}
2024-01-05 06:40:05 +00:00
for (int32_t m = startRowIdx; m <= buildGrp->endIdx && rowNum < pBlock->info.rows; ++m, ++rowNum) {
2025-07-17 06:17:47 +00:00
if ((status == FILTER_RESULT_PARTIAL_QUALIFIED && false == *(pRes + rowNum)) ||
MJOIN_ROW_BITMAP_SET(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx)) {
2024-01-03 05:52:36 +00:00
continue;
}
MJOIN_SET_ROW_BITMAP(build->pRowBitmap, buildGrp->rowBitmapOffset, m - buildGrp->beginIdx);
buildGrp->rowMatchNum++;
}
2024-01-03 08:25:45 +00:00
if (buildGrp->rowMatchNum == (buildGrp->endIdx - buildGrp->beginIdx + 1)) {
2024-01-03 05:52:36 +00:00
buildGrp->allRowsMatch = true;
}
}
2025-07-17 06:17:47 +00:00
}
2024-07-29 02:29:40 +00:00
code = extractQualifiedTupleByFilterResult(pBlock, p, status);
2024-01-03 05:52:36 +00:00
2024-07-22 03:06:24 +00:00
_return:
2024-01-03 05:52:36 +00:00
colDataDestroy(p);
taosMemoryFree(p);
2025-07-17 06:17:47 +00:00
2024-01-03 05:52:36 +00:00
return code;
}
2024-01-10 10:56:49 +00:00
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
SColumnInfoData* p = NULL;
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
2024-07-22 03:06:24 +00:00
goto _return;
2024-01-10 10:56:49 +00:00
}
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
2024-07-22 03:06:24 +00:00
goto _return;
2024-01-10 10:56:49 +00:00
}
2025-07-17 06:17:47 +00:00
2024-01-10 10:56:49 +00:00
if (status == FILTER_RESULT_ALL_QUALIFIED) {
pBlock->info.rows = 1;
2024-07-22 03:06:24 +00:00
MJ_ERR_JRET(mJoinTrimKeepFirstRow(pBlock));
2024-01-10 10:56:49 +00:00
} else if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0;
} else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
2024-07-22 03:06:24 +00:00
MJ_ERR_JRET(mJoinTrimKeepOneRow(pBlock, pBlock->info.rows, (bool*)p->pData));
2024-01-10 10:56:49 +00:00
}
code = TSDB_CODE_SUCCESS;
2024-07-22 03:06:24 +00:00
_return:
2024-01-10 10:56:49 +00:00
colDataDestroy(p);
taosMemoryFree(p);
2025-07-17 06:17:47 +00:00
2024-01-10 10:56:49 +00:00
return code;
}
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo) {
if (pFilterInfo == NULL || pBlock->info.rows == 0) {
return TSDB_CODE_SUCCESS;
}
SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
SColumnInfoData* p = NULL;
int32_t code = filterSetDataFromSlotId(pFilterInfo, &param1);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
int32_t status = 0;
code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
2025-07-17 06:17:47 +00:00
2024-01-10 10:56:49 +00:00
if (status == FILTER_RESULT_NONE_QUALIFIED) {
pBlock->info.rows = 0;
}
code = TSDB_CODE_SUCCESS;
_err:
2024-07-22 03:06:24 +00:00
2024-01-10 10:56:49 +00:00
colDataDestroy(p);
taosMemoryFree(p);
2025-07-17 06:17:47 +00:00
2024-01-10 10:56:49 +00:00
return code;
}
2024-01-03 05:52:36 +00:00
2023-12-26 11:28:19 +00:00
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin) {
2024-04-03 01:54:49 +00:00
SSDataBlock* pLess = *ppMid;
SSDataBlock* pMore = *ppFin;
2025-07-17 06:17:47 +00:00
/*
if ((*ppMid)->info.rows < (*ppFin)->info.rows) {
pLess = (*ppMid);
pMore = (*ppFin);
} else {
pLess = (*ppFin);
pMore = (*ppMid);
}
*/
2023-12-26 11:28:19 +00:00
int32_t totalRows = pMore->info.rows + pLess->info.rows;
if (totalRows <= pMore->info.capacity) {
MJ_ERR_RET(blockDataMerge(pMore, pLess));
blockDataCleanup(pLess);
pCtx->midRemains = false;
} else {
int32_t copyRows = pMore->info.capacity - pMore->info.rows;
2024-09-10 08:56:36 +00:00
if (copyRows > 0) {
MJ_ERR_RET(blockDataMergeNRows(pMore, pLess, pLess->info.rows - copyRows, copyRows));
blockDataShrinkNRows(pLess, copyRows);
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
pCtx->midRemains = true;
}
2025-07-17 06:17:47 +00:00
/*
if (pMore != (*ppFin)) {
TSWAP(*ppMid, *ppFin);
}
*/
2023-12-26 11:28:19 +00:00
return TSDB_CODE_SUCCESS;
}
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx) {
TSWAP(pCtx->midBlk, pCtx->finBlk);
pCtx->midRemains = false;
return TSDB_CODE_SUCCESS;
}
2025-07-17 06:17:47 +00:00
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp,
bool probeGrp) {
2023-12-26 11:28:19 +00:00
SMJoinTableCtx* probe = probeGrp ? pJoin->probe : pJoin->build;
SMJoinTableCtx* build = probeGrp ? pJoin->build : pJoin->probe;
2025-07-17 06:17:47 +00:00
int32_t currRows = append ? pRes->info.rows : 0;
int32_t firstRows = GRP_REMAIN_ROWS(pGrp);
2023-12-26 11:28:19 +00:00
for (int32_t c = 0; c < probe->finNum; ++c) {
2025-07-17 06:17:47 +00:00
SMJoinColMap* pFirstCol = probe->finCols + c;
2023-12-26 11:28:19 +00:00
SColumnInfoData* pInCol = taosArrayGet(pGrp->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
2024-07-22 03:06:24 +00:00
if (NULL == pInCol || NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows, pInCol, pGrp->readIdx, firstRows));
2023-12-26 11:28:19 +00:00
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
for (int32_t c = 0; c < build->finNum; ++c) {
2025-07-17 06:17:47 +00:00
SMJoinColMap* pSecondCol = build->finCols + c;
2023-12-26 11:28:19 +00:00
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
2024-07-22 03:06:24 +00:00
if (NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2023-12-26 11:28:19 +00:00
colDataSetNItemsNull(pOutCol, currRows, firstRows);
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
pRes->info.rows = append ? (pRes->info.rows + firstRows) : firstRows;
return TSDB_CODE_SUCCESS;
}
2024-02-22 10:59:10 +00:00
int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp, bool singleProbeRow) {
2023-12-26 11:28:19 +00:00
pCtx->lastEqGrp = false;
pCtx->lastProbeGrp = probeGrp;
int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
if (rowsLeft <= 0) {
pCtx->grpRemains = pGrp->readIdx <= pGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
2024-02-22 10:59:10 +00:00
if (probeGrp && singleProbeRow) {
rowsLeft = 1;
}
2023-12-26 11:28:19 +00:00
if (GRP_REMAIN_ROWS(pGrp) <= rowsLeft) {
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
pGrp->readIdx = pGrp->endIdx + 1;
pCtx->grpRemains = false;
} else {
int32_t endIdx = pGrp->endIdx;
pGrp->endIdx = pGrp->readIdx + rowsLeft - 1;
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
pGrp->readIdx = pGrp->endIdx + 1;
pGrp->endIdx = endIdx;
pCtx->grpRemains = true;
}
return TSDB_CODE_SUCCESS;
}
2025-07-17 06:17:47 +00:00
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst,
SMJoinGrpRows* pSecond) {
2023-12-26 11:28:19 +00:00
SMJoinTableCtx* probe = pJoin->probe;
SMJoinTableCtx* build = pJoin->build;
2025-07-17 06:17:47 +00:00
int32_t currRows = append ? pRes->info.rows : 0;
int32_t firstRows = GRP_REMAIN_ROWS(pFirst);
int32_t secondRows = GRP_REMAIN_ROWS(pSecond);
2023-12-26 11:28:19 +00:00
for (int32_t c = 0; c < probe->finNum; ++c) {
2025-07-17 06:17:47 +00:00
SMJoinColMap* pFirstCol = probe->finCols + c;
2023-12-26 11:28:19 +00:00
SColumnInfoData* pInCol = taosArrayGet(pFirst->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pFirstCol->dstSlot);
2024-07-22 03:06:24 +00:00
if (NULL == pInCol || NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2023-12-26 11:28:19 +00:00
for (int32_t r = 0; r < firstRows; ++r) {
if (colDataIsNull_s(pInCol, pFirst->readIdx + r)) {
colDataSetNItemsNull(pOutCol, currRows + r * secondRows, secondRows);
} else {
2024-08-21 06:41:14 +00:00
if (pRes->info.capacity < (pRes->info.rows + firstRows * secondRows)) {
2025-07-17 06:17:47 +00:00
qError("capacity:%d not enough, rows:%" PRId64 ", firstRows:%d, secondRows:%d", pRes->info.capacity,
pRes->info.rows, firstRows, secondRows);
2024-08-21 06:41:14 +00:00
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2025-07-17 06:17:47 +00:00
uint32_t startOffset = (IS_VAR_DATA_TYPE(pOutCol->info.type))
? pOutCol->varmeta.length
: ((currRows + r * secondRows) * pOutCol->info.bytes);
2024-08-21 06:41:14 +00:00
if ((startOffset + 1 * pOutCol->info.bytes) > pRes->info.capacity * pOutCol->info.bytes) {
2025-07-17 06:17:47 +00:00
qError("col buff not enough, startOffset:%d, bytes:%d, capacity:%d", startOffset, pOutCol->info.bytes,
pRes->info.capacity);
2024-08-21 06:41:14 +00:00
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2025-07-17 06:17:47 +00:00
MJ_ERR_RET(colDataSetNItems(pOutCol, currRows + r * secondRows, colDataGetData(pInCol, pFirst->readIdx + r),
feat(stream): optimize stream logic (#33027) * fix: remove debug log * fix: remove assert * fix: delete unused code * enh: [TD-37251] Support expr in state window. * feat(stream): support expr in state window trigger * enh: [TD-37251] Fix SCL_IS_CONST_CALC condition. * fix(stream): set ver in wal * fix: print code * fix: increase runner replica num * fix: trigger mem error * fix: sliding _tnext_ts value * fix(stream): disable tagFilterCache in stream reader trigger * fix: crash * Revert "fix(stream): fix history calc finish check" This reverts commit f93d17f1d27f011639d22cb0880637dbeb7e5532. * Revert "fix(stream): fix calc request allocation in trigger" This reverts commit c5410f6da09835303d32967f4b6d02a7e47cd589. * fix(stream): fix calc request allocation in trigger * enh: [TD-37251] External window support more placeholder. * fix(stream): modify size of return from 1000000->4096 * enh: [TD-37251] Modify error msg when stream query do not have from clause. * fix(stream): add log for group not found * fix(stream): do not return gid=0 in walMetaData interface * enh: [TD-37251] Fix missing ts column in vtable query. * fix: test case build failed * fix: invalid read issue * fix(stream): add vtable logic * fix(stream): encode error in wal * fix(stream): add vtable logic * fix(stream): add log * fix: diff funcition crash * Revert "Merge branch 'enh/TD-37251-3.0-dropoutput' into enh/TD-37251-3.0" This reverts commit e93cbd6fd42261527df046c70e0ece7568af187b, reversing changes made to dc3230591d4428c817703f52574aa01d5f10e5fe. * Revert "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0" This reverts commit dc3230591d4428c817703f52574aa01d5f10e5fe, reversing changes made to 085e086782896db22acd292816e53497cbecb726. * fix(stream): fix block data len is too large if data type is vchar * fix: drop output table * feat: runner delete output table * process pDropBlock in trigger task. * fix(stream): opti log level * fix(stream): build block for drop table * fix(stream): set gid for normal table * fix(stream): set gid for normal table * feat: Support delete output. * fix(stream): rows error * fix(stream): memory leak * enh: [TD-37251] Fix external window wrong ts column. * fix(stream): fix calc time check in batch mode * fix: merge aligned external window issue * Revert "fix(stream): fix calc time check in batch mode" This reverts commit d895b7f5776cf77c2ce290cdeedca86c206da6ce. * fix(stream): add test case * fix(stream): add insert drop table logic * fix: external window end issue * fix(stream): add test case * fix(stream): fix trigger pull data * fix(stream): fix history calc request * enh: drop table on snode * fix(stream): adjust hash index if data is filtered in wal * fix(stream): rollback * enh: add merge aligned extwin window row idx * fix: drop output table * fix: compile issue * enh: [TD-37251] Add flag to identify interval window is overlapped * fix: overlap * fix(stream): set gid=-1 for initialized * fix(stream): modify log level * fix: trigger slow issue * fix(stream): add basic test for obj pool * fix(stream): fix metadata clear in trigger * fix(stream): fix idle runner allocation in trigger * fix: handle agg output on externalWin * fix: test case * fix(stream): adjust log * fix: reset pCtx pOutput * fix: memory leak * fix: search first win for tsCol * fix(stream): add test case for schema change * fix: mem leak * fix: mem leak * Reapply "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0" This reverts commit b508e66958eb95117b1d086a964b87b5ac59a19e. * fix(stream): fix virtual table data pull * fix(stream): fix set table request * fix(stream): process empty uidlist * fix(stream): fix set table request * fix(stream): fix data new request in trigger * fix(stream): tablelist error for vtable * fix(stream): block ver is null * fix(stream): remove version limition for wal * fix(stream): block rows error * fix(stream): fix pending calc param in batch mode * fix(stream): auto create table * fix(stream): fix stream vtable data merge * fix: _tcurrentts * fix(stream): destroy hash * fix(stream): fix trigger status * fix(stream): colId error in vtable * fix(stream): update nrows of vtable data block * fix(stream): fix trigger status * fix(stream): enable low latency calc for period trigger * fix: test case file path * fix: test case file path * fix: string to node in reader * enh: add test log * fix(stream): increase wait time of non-low-latency mode * fix(stream): fix column capacity in scalar calculation * fix(stream): fix column capacity in trigger expr calculation * fix: get origTableInfos * fix(stream): fix calc data pull in trigger * fix(stream): fix calc data cache write in trigger * enh: [TD-37251] Add flag to identify interval wind * fix: external window memory usage issue * fix(stream): fix epxr result column in trigger * fix(stream): add metaCache for calc plan * fix(stream): fix stream obj list clear * fix(stream): rollback * enh: [TD-37251] Add flag to identify interval wind * fix: mem free * fix(stream): add metaCache for calc plan * fix(stream): fix calc data cache write in trigger * fix(stream): fix calc data cache write in trigger * enh: optimize external result block memory * fix(stream): modify logic of judge table for create table * fix(stream): fix event window check in trigger * fix(stream): fix count window check in trigger * fix(stream): colSize=0 while encoding block because pDataCol->hasNull is false in secode time & reload table list if create table * enh: optimize stream memory * fix(stream): trigger tag error * fix: add log * fix(stream): fix calc data write in trigger * fix: drop output table * fix(stream): fix max delay in trigger * fix: drop output table * fix(stream): fix max delay in trigger * fix(build): handle return value of function * fix(stream): read gid error if it is child table in stream * fix(stream): fix calc param of history calculation * fix(stream): fix recalc of delete data * fix(stream): fix calc data of period trigger * fix(stream): fix cache read check * fix(stream): filter error in calc plan * fix: reset externla window expr * fix(stream): fix calc parm of max delay * fix: time range and case issues * fix(stream): fix crash in trigger * fix: case issues * fix: fix window node mem leak. * fix(stream): fix meta data clear in trigger * fix(stream): table schema is old in TsdbDataRequest for vtable * fix: fix slingding window place holder check condition. * Revert "fix: fix slingding window place holder check condition." This reverts commit ad864a1dc113961b409e32e2ac7d1feb534b74d6. * fix(stream): null pointer error * fix: case issue * fix(stream): calc data error for vtable * fix(stream): fix data sorter in trigger * fix(stream): add log for delete data * fix(stream): fix start version of realtime calculation * fix(stream): fix cache data merger of vtable * fix: case issues * fix(ci): upgrade stream cases in test_cols_function * fix(stream): gid not found if change tag value * fix: fix slingding window place holder check condition. * fix(stream): fix virt table info request in trigger * fix(stream): set gid = uid if stream table type != SUPER table * fix: clean cache data by group * fix: add block info and case issues * fix: fix heap-buffer-overflow. * fix(stream): fix state window with extend param * fix: fix access null pointer. * fix: case issues * fix: case issue * fix(stream): fix ignore_nodata_trigger option for period trigger * fix(stream): fix pseudo col fetch for calc data * fix: Extend checking time to avoid timeout. * fix: case issues * fix(stream): fix group col fetch for virtual tables * fix(stream): tag is NULL for non vtable * fix(stream): fix sliding check of virtual table * test(stream): check stream status after create all streams * fix: add log * fix(stream): fix wal meta truncate when ignore disorder * fix(stream): gid not found for child table * fix: fix place holder condition pushdown error. * fix(stream): suid not equal when delete data for child table * fix: id issue * fix(stream): disable recalc for count trigger * fix: cast result rowSize error in project * fix(stream): add log for tsdb meta * fix(stream): fix gid in tsdb meta request * fix(stream): fix wend of unclosed windows * fix(stream): fix ignore_nodata_trigger option for period trigger * fix: case issues * fix(stream): fix calc data pull for empty interval window * fix: fix ext window condition. * fix(ci): smaBasic performance check affectd by debug level log * fix(stream): add suid when set table list for vtable * fix: forbid using prefilter when using %%trows an trigger table is virtual table. * fix: forbid prefilter %%trows cases. * fix(stream): sort cid in tsdbVirtalDataReq * fix: forbid prefilter %%trows cases. * fix(stream): add log for virtual table tsdb data * fix(stream): get delete msg for vtable * fix: winRowIndex * Revert "fix: winRowIndex" This reverts commit e08b41cf960bb9ca031b9ea20b4495cbbd4e3ed0. * fix(stream): fix data merge in trigger * test(stream): fix case ans * fix(stream): fix empty calc data pull for period trigger * fix(stream): col index error for tsdbVirtalDataReq * fix(stream): pTableList is NULL for vtable * test(stream): fix case ans * fix(stream): fix notification in trigger * fix(stream): memory leak * fix(stream): fix virtual data pull in trigger * test(stream): fix case ans * fix(stream): col index error for tsdbVirtalDataReq * fix(stream): pTableList is NULL for vtable * test(stream): fix case ans * fix(stream): fix notification in trigger * fix(stream): memory leak * fix(stream): fix virtual data pull in trigger * test(stream): fix case ans * fix: case issue * fix: crash issue * fix: forbid prefilter %%trows cases. * fix(stream): session case * fix(stream): fix data pull for virtual tables * fix(stream): add log * fix(stream): fix calc req send in batch mode * fix: fix stream UT * fix(stream): tablelist is null for non vtable * fix: mem leak * fix(stream): fix compile error in trigger * fix: return code issue --------- Co-authored-by: dapan1121 <wpan@taosdata.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-09-25 07:48:14 +00:00
secondRows, 1, true));
2023-12-26 11:28:19 +00:00
}
}
}
for (int32_t c = 0; c < build->finNum; ++c) {
2025-07-17 06:17:47 +00:00
SMJoinColMap* pSecondCol = build->finCols + c;
2023-12-26 11:28:19 +00:00
SColumnInfoData* pInCol = taosArrayGet(pSecond->blk->pDataBlock, pSecondCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pRes->pDataBlock, pSecondCol->dstSlot);
2024-07-22 03:06:24 +00:00
if (NULL == pInCol || NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2023-12-26 11:28:19 +00:00
for (int32_t r = 0; r < firstRows; ++r) {
2024-07-22 03:06:24 +00:00
MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r * secondRows, pInCol, pSecond->readIdx, secondRows));
2023-12-26 11:28:19 +00:00
}
}
pRes->info.rows = append ? (pRes->info.rows + firstRows * secondRows) : firstRows * secondRows;
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
return TSDB_CODE_SUCCESS;
}
2025-07-17 06:17:47 +00:00
int32_t mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe,
SMJoinTableCtx* build, bool* cont) {
2024-07-22 03:06:24 +00:00
if (NULL != cont) {
*cont = false;
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
int32_t rowsLeft = append ? (pBlk->info.capacity - pBlk->info.rows) : pBlk->info.capacity;
if (rowsLeft <= 0) {
2024-07-22 03:06:24 +00:00
return TSDB_CODE_SUCCESS;
2023-12-26 11:28:19 +00:00
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
int32_t buildGrpRows = taosArrayGetSize(build->pHashCurGrp);
int32_t grpRows = buildGrpRows - build->grpRowIdx;
if (grpRows <= 0 || build->grpRowIdx < 0) {
build->grpRowIdx = -1;
2024-07-22 03:06:24 +00:00
if (NULL != cont) {
*cont = true;
}
return TSDB_CODE_SUCCESS;
2023-12-26 11:28:19 +00:00
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
int32_t actRows = TMIN(grpRows, rowsLeft);
int32_t currRows = append ? pBlk->info.rows : 0;
for (int32_t c = 0; c < probe->finNum; ++c) {
2025-07-17 06:17:47 +00:00
SMJoinColMap* pFirstCol = probe->finCols + c;
2023-12-26 11:28:19 +00:00
SColumnInfoData* pInCol = taosArrayGet(probeGrp->blk->pDataBlock, pFirstCol->srcSlot);
SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pFirstCol->dstSlot);
2024-07-22 03:06:24 +00:00
if (NULL == pInCol || NULL == pOutCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2023-12-26 11:28:19 +00:00
if (colDataIsNull_s(pInCol, probeGrp->readIdx)) {
colDataSetNItemsNull(pOutCol, currRows, actRows);
} else {
feat(stream): optimize stream logic (#33027) * fix: remove debug log * fix: remove assert * fix: delete unused code * enh: [TD-37251] Support expr in state window. * feat(stream): support expr in state window trigger * enh: [TD-37251] Fix SCL_IS_CONST_CALC condition. * fix(stream): set ver in wal * fix: print code * fix: increase runner replica num * fix: trigger mem error * fix: sliding _tnext_ts value * fix(stream): disable tagFilterCache in stream reader trigger * fix: crash * Revert "fix(stream): fix history calc finish check" This reverts commit f93d17f1d27f011639d22cb0880637dbeb7e5532. * Revert "fix(stream): fix calc request allocation in trigger" This reverts commit c5410f6da09835303d32967f4b6d02a7e47cd589. * fix(stream): fix calc request allocation in trigger * enh: [TD-37251] External window support more placeholder. * fix(stream): modify size of return from 1000000->4096 * enh: [TD-37251] Modify error msg when stream query do not have from clause. * fix(stream): add log for group not found * fix(stream): do not return gid=0 in walMetaData interface * enh: [TD-37251] Fix missing ts column in vtable query. * fix: test case build failed * fix: invalid read issue * fix(stream): add vtable logic * fix(stream): encode error in wal * fix(stream): add vtable logic * fix(stream): add log * fix: diff funcition crash * Revert "Merge branch 'enh/TD-37251-3.0-dropoutput' into enh/TD-37251-3.0" This reverts commit e93cbd6fd42261527df046c70e0ece7568af187b, reversing changes made to dc3230591d4428c817703f52574aa01d5f10e5fe. * Revert "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0" This reverts commit dc3230591d4428c817703f52574aa01d5f10e5fe, reversing changes made to 085e086782896db22acd292816e53497cbecb726. * fix(stream): fix block data len is too large if data type is vchar * fix: drop output table * feat: runner delete output table * process pDropBlock in trigger task. * fix(stream): opti log level * fix(stream): build block for drop table * fix(stream): set gid for normal table * fix(stream): set gid for normal table * feat: Support delete output. * fix(stream): rows error * fix(stream): memory leak * enh: [TD-37251] Fix external window wrong ts column. * fix(stream): fix calc time check in batch mode * fix: merge aligned external window issue * Revert "fix(stream): fix calc time check in batch mode" This reverts commit d895b7f5776cf77c2ce290cdeedca86c206da6ce. * fix(stream): add test case * fix(stream): add insert drop table logic * fix: external window end issue * fix(stream): add test case * fix(stream): fix trigger pull data * fix(stream): fix history calc request * enh: drop table on snode * fix(stream): adjust hash index if data is filtered in wal * fix(stream): rollback * enh: add merge aligned extwin window row idx * fix: drop output table * fix: compile issue * enh: [TD-37251] Add flag to identify interval window is overlapped * fix: overlap * fix(stream): set gid=-1 for initialized * fix(stream): modify log level * fix: trigger slow issue * fix(stream): add basic test for obj pool * fix(stream): fix metadata clear in trigger * fix(stream): fix idle runner allocation in trigger * fix: handle agg output on externalWin * fix: test case * fix(stream): adjust log * fix: reset pCtx pOutput * fix: memory leak * fix: search first win for tsCol * fix(stream): add test case for schema change * fix: mem leak * fix: mem leak * Reapply "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0" This reverts commit b508e66958eb95117b1d086a964b87b5ac59a19e. * fix(stream): fix virtual table data pull * fix(stream): fix set table request * fix(stream): process empty uidlist * fix(stream): fix set table request * fix(stream): fix data new request in trigger * fix(stream): tablelist error for vtable * fix(stream): block ver is null * fix(stream): remove version limition for wal * fix(stream): block rows error * fix(stream): fix pending calc param in batch mode * fix(stream): auto create table * fix(stream): fix stream vtable data merge * fix: _tcurrentts * fix(stream): destroy hash * fix(stream): fix trigger status * fix(stream): colId error in vtable * fix(stream): update nrows of vtable data block * fix(stream): fix trigger status * fix(stream): enable low latency calc for period trigger * fix: test case file path * fix: test case file path * fix: string to node in reader * enh: add test log * fix(stream): increase wait time of non-low-latency mode * fix(stream): fix column capacity in scalar calculation * fix(stream): fix column capacity in trigger expr calculation * fix: get origTableInfos * fix(stream): fix calc data pull in trigger * fix(stream): fix calc data cache write in trigger * enh: [TD-37251] Add flag to identify interval wind * fix: external window memory usage issue * fix(stream): fix epxr result column in trigger * fix(stream): add metaCache for calc plan * fix(stream): fix stream obj list clear * fix(stream): rollback * enh: [TD-37251] Add flag to identify interval wind * fix: mem free * fix(stream): add metaCache for calc plan * fix(stream): fix calc data cache write in trigger * fix(stream): fix calc data cache write in trigger * enh: optimize external result block memory * fix(stream): modify logic of judge table for create table * fix(stream): fix event window check in trigger * fix(stream): fix count window check in trigger * fix(stream): colSize=0 while encoding block because pDataCol->hasNull is false in secode time & reload table list if create table * enh: optimize stream memory * fix(stream): trigger tag error * fix: add log * fix(stream): fix calc data write in trigger * fix: drop output table * fix(stream): fix max delay in trigger * fix: drop output table * fix(stream): fix max delay in trigger * fix(build): handle return value of function * fix(stream): read gid error if it is child table in stream * fix(stream): fix calc param of history calculation * fix(stream): fix recalc of delete data * fix(stream): fix calc data of period trigger * fix(stream): fix cache read check * fix(stream): filter error in calc plan * fix: reset externla window expr * fix(stream): fix calc parm of max delay * fix: time range and case issues * fix(stream): fix crash in trigger * fix: case issues * fix: fix window node mem leak. * fix(stream): fix meta data clear in trigger * fix(stream): table schema is old in TsdbDataRequest for vtable * fix: fix slingding window place holder check condition. * Revert "fix: fix slingding window place holder check condition." This reverts commit ad864a1dc113961b409e32e2ac7d1feb534b74d6. * fix(stream): null pointer error * fix: case issue * fix(stream): calc data error for vtable * fix(stream): fix data sorter in trigger * fix(stream): add log for delete data * fix(stream): fix start version of realtime calculation * fix(stream): fix cache data merger of vtable * fix: case issues * fix(ci): upgrade stream cases in test_cols_function * fix(stream): gid not found if change tag value * fix: fix slingding window place holder check condition. * fix(stream): fix virt table info request in trigger * fix(stream): set gid = uid if stream table type != SUPER table * fix: clean cache data by group * fix: add block info and case issues * fix: fix heap-buffer-overflow. * fix(stream): fix state window with extend param * fix: fix access null pointer. * fix: case issues * fix: case issue * fix(stream): fix ignore_nodata_trigger option for period trigger * fix(stream): fix pseudo col fetch for calc data * fix: Extend checking time to avoid timeout. * fix: case issues * fix(stream): fix group col fetch for virtual tables * fix(stream): tag is NULL for non vtable * fix(stream): fix sliding check of virtual table * test(stream): check stream status after create all streams * fix: add log * fix(stream): fix wal meta truncate when ignore disorder * fix(stream): gid not found for child table * fix: fix place holder condition pushdown error. * fix(stream): suid not equal when delete data for child table * fix: id issue * fix(stream): disable recalc for count trigger * fix: cast result rowSize error in project * fix(stream): add log for tsdb meta * fix(stream): fix gid in tsdb meta request * fix(stream): fix wend of unclosed windows * fix(stream): fix ignore_nodata_trigger option for period trigger * fix: case issues * fix(stream): fix calc data pull for empty interval window * fix: fix ext window condition. * fix(ci): smaBasic performance check affectd by debug level log * fix(stream): add suid when set table list for vtable * fix: forbid using prefilter when using %%trows an trigger table is virtual table. * fix: forbid prefilter %%trows cases. * fix(stream): sort cid in tsdbVirtalDataReq * fix: forbid prefilter %%trows cases. * fix(stream): add log for virtual table tsdb data * fix(stream): get delete msg for vtable * fix: winRowIndex * Revert "fix: winRowIndex" This reverts commit e08b41cf960bb9ca031b9ea20b4495cbbd4e3ed0. * fix(stream): fix data merge in trigger * test(stream): fix case ans * fix(stream): fix empty calc data pull for period trigger * fix(stream): col index error for tsdbVirtalDataReq * fix(stream): pTableList is NULL for vtable * test(stream): fix case ans * fix(stream): fix notification in trigger * fix(stream): memory leak * fix(stream): fix virtual data pull in trigger * test(stream): fix case ans * fix(stream): col index error for tsdbVirtalDataReq * fix(stream): pTableList is NULL for vtable * test(stream): fix case ans * fix(stream): fix notification in trigger * fix(stream): memory leak * fix(stream): fix virtual data pull in trigger * test(stream): fix case ans * fix: case issue * fix: crash issue * fix: forbid prefilter %%trows cases. * fix(stream): session case * fix(stream): fix data pull for virtual tables * fix(stream): add log * fix(stream): fix calc req send in batch mode * fix: fix stream UT * fix(stream): tablelist is null for non vtable * fix: mem leak * fix(stream): fix compile error in trigger * fix: return code issue --------- Co-authored-by: dapan1121 <wpan@taosdata.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-09-25 07:48:14 +00:00
MJ_ERR_RET(colDataSetNItems(pOutCol, currRows, colDataGetData(pInCol, probeGrp->readIdx), actRows, 1, true));
2023-12-26 11:28:19 +00:00
}
}
for (int32_t c = 0; c < build->finNum; ++c) {
2025-07-17 06:17:47 +00:00
SMJoinColMap* pSecondCol = build->finCols + c;
2023-12-26 11:28:19 +00:00
SColumnInfoData* pOutCol = taosArrayGet(pBlk->pDataBlock, pSecondCol->dstSlot);
for (int32_t r = 0; r < actRows; ++r) {
SMJoinRowPos* pRow = taosArrayGet(build->pHashCurGrp, build->grpRowIdx + r);
2024-07-22 03:06:24 +00:00
if (NULL == pRow) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2023-12-26 11:28:19 +00:00
SColumnInfoData* pInCol = taosArrayGet(pRow->pBlk->pDataBlock, pSecondCol->srcSlot);
2024-07-22 03:06:24 +00:00
if (NULL == pInCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
MJ_ERR_RET(colDataAssignNRows(pOutCol, currRows + r, pInCol, pRow->pos, 1));
2023-12-26 11:28:19 +00:00
}
}
pBlk->info.rows += actRows;
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
if (actRows == grpRows) {
build->grpRowIdx = -1;
} else {
build->grpRowIdx += actRows;
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
if (actRows == rowsLeft) {
2024-07-22 03:06:24 +00:00
return TSDB_CODE_SUCCESS;
2023-12-26 11:28:19 +00:00
}
2024-07-22 03:06:24 +00:00
if (NULL != cont) {
*cont = true;
}
2025-07-17 06:17:47 +00:00
2024-07-22 03:06:24 +00:00
return TSDB_CODE_SUCCESS;
2023-12-26 11:28:19 +00:00
}
2025-07-17 06:17:47 +00:00
int32_t mJoinAllocGrpRowBitmap(SMJoinTableCtx* pTb) {
2024-01-03 08:25:45 +00:00
int32_t grpNum = taosArrayGetSize(pTb->eqGrps);
2024-01-03 05:52:36 +00:00
for (int32_t i = 0; i < grpNum; ++i) {
SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGet(pTb->eqGrps, i);
2024-07-22 03:06:24 +00:00
if (NULL == pGrp) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2024-01-03 05:52:36 +00:00
MJ_ERR_RET(mJoinGetRowBitmapOffset(pTb, pGrp->endIdx - pGrp->beginIdx + 1, &pGrp->rowBitmapOffset));
pGrp->rowMatchNum = 0;
}
return TSDB_CODE_SUCCESS;
}
2023-12-26 11:28:19 +00:00
int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp) {
SMJoinOperatorInfo* pJoin = pCtx->pJoin;
pCtx->lastEqGrp = true;
2024-07-22 03:06:24 +00:00
MJ_ERR_RET(mJoinBuildEqGroups(pJoin->probe, timestamp, NULL, true));
2023-12-26 11:28:19 +00:00
if (!lastBuildGrp) {
2024-07-22 03:06:24 +00:00
MJ_ERR_RET(mJoinRetrieveEqGrpRows(pJoin, pJoin->build, timestamp));
2023-12-26 11:28:19 +00:00
} else {
pJoin->build->grpIdx = 0;
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
if (pCtx->hashCan && REACH_HJOIN_THRESHOLD(pJoin->probe, pJoin->build)) {
if (!lastBuildGrp || !pCtx->hashJoin) {
2024-01-03 05:52:36 +00:00
if (pJoin->build->rowBitmapSize > 0) {
MJ_ERR_RET(mJoinCreateFullBuildTbHash(pJoin, pJoin->build));
} else {
MJ_ERR_RET(mJoinCreateBuildTbHash(pJoin, pJoin->build));
}
2023-12-26 11:28:19 +00:00
}
if (pJoin->probe->newBlk) {
MJ_ERR_RET(mJoinSetKeyColsData(pJoin->probe->blk, pJoin->probe));
pJoin->probe->newBlk = false;
}
2025-07-17 06:17:47 +00:00
pCtx->hashJoin = true;
2023-12-26 11:28:19 +00:00
return (*pCtx->hashCartFp)(pCtx);
}
2025-07-17 06:17:47 +00:00
pCtx->hashJoin = false;
2024-01-03 05:52:36 +00:00
2024-01-05 06:40:05 +00:00
if (!lastBuildGrp && pJoin->build->rowBitmapSize > 0) {
2024-07-22 03:06:24 +00:00
MJ_ERR_RET(mJoinAllocGrpRowBitmap(pJoin->build));
2024-01-03 05:52:36 +00:00
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
return (*pCtx->mergeCartFp)(pCtx);
}
2025-07-17 06:17:47 +00:00
int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs,
int64_t* buildTs) {
2024-01-05 06:40:05 +00:00
pCtx->probeNEqGrp.blk = pTb->blk;
pCtx->probeNEqGrp.beginIdx = pTb->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pCtx->probeNEqGrp.beginIdx;
2025-07-17 06:17:47 +00:00
2024-01-05 06:40:05 +00:00
while (++pTb->blkRowIdx < pTb->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pCol, *probeTs, pTb);
2024-03-08 08:23:41 +00:00
if (PROBE_TS_NMATCH(pCtx->ascTs, *probeTs, *buildTs)) {
2024-01-05 06:40:05 +00:00
pCtx->probeNEqGrp.endIdx = pTb->blkRowIdx;
continue;
}
2025-07-17 06:17:47 +00:00
2024-01-05 06:40:05 +00:00
break;
2023-12-26 11:28:19 +00:00
}
2025-07-17 06:17:47 +00:00
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
2024-01-05 06:40:05 +00:00
}
2025-07-17 06:17:47 +00:00
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs,
int64_t* buildTs) {
2024-01-05 06:40:05 +00:00
pCtx->buildNEqGrp.blk = pTb->blk;
pCtx->buildNEqGrp.beginIdx = pTb->blkRowIdx;
pCtx->buildNEqGrp.readIdx = pCtx->buildNEqGrp.beginIdx;
pCtx->buildNEqGrp.endIdx = pCtx->buildNEqGrp.beginIdx;
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
while (++pTb->blkRowIdx < pTb->blk->info.rows) {
2024-01-05 06:40:05 +00:00
MJOIN_GET_TB_CUR_TS(pCol, *buildTs, pTb);
2024-03-08 08:23:41 +00:00
if (PROBE_TS_NREACH(pCtx->ascTs, *probeTs, *buildTs)) {
2024-01-05 06:40:05 +00:00
pCtx->buildNEqGrp.endIdx = pTb->blkRowIdx;
2023-12-26 11:28:19 +00:00
continue;
}
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
break;
}
2025-07-17 06:17:47 +00:00
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false);
2023-12-26 11:28:19 +00:00
}
2023-12-01 10:33:50 +00:00
SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream) {
2023-09-01 05:24:47 +00:00
SOperatorInfo** p = taosMemoryMalloc(2 * POINTER_BYTES);
if (p) {
p[0] = pDownstream[0];
p[1] = pDownstream[0];
}
return p;
}
2025-07-17 06:17:47 +00:00
int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDownstream, int32_t* numOfDownstream,
bool* newDownstreams) {
2023-12-01 10:33:50 +00:00
if (1 == *numOfDownstream) {
*newDownstreams = true;
2024-03-06 03:18:57 +00:00
*pDownstream = mJoinBuildDownstreams(pInfo, *pDownstream);
if (NULL == *pDownstream) {
2024-07-22 03:06:24 +00:00
return terrno;
2023-09-01 05:24:47 +00:00
}
2023-12-01 10:33:50 +00:00
*numOfDownstream = 2;
2023-09-01 05:24:47 +00:00
}
2023-12-01 10:33:50 +00:00
return TSDB_CODE_SUCCESS;
2022-05-14 16:04:51 +00:00
}
2022-07-27 06:27:59 +00:00
2023-12-14 06:13:01 +00:00
static int32_t mJoinInitPrimKeyInfo(SMJoinTableCtx* pTable, int32_t slotId) {
2023-12-01 10:33:50 +00:00
pTable->primCol = taosMemoryMalloc(sizeof(SMJoinColInfo));
if (NULL == pTable->primCol) {
2024-07-22 03:06:24 +00:00
return terrno;
2022-07-27 08:11:32 +00:00
}
2023-12-01 10:33:50 +00:00
pTable->primCol->srcSlot = slotId;
2023-12-01 10:33:50 +00:00
return TSDB_CODE_SUCCESS;
2022-07-27 08:11:32 +00:00
}
2022-05-14 16:04:51 +00:00
2023-12-14 09:06:19 +00:00
static int32_t mJoinInitColsInfo(int32_t* colNum, int64_t* rowSize, SMJoinColInfo** pCols, SNodeList* pList) {
*colNum = LIST_LENGTH(pList);
2025-07-17 06:17:47 +00:00
2023-12-14 09:06:19 +00:00
*pCols = taosMemoryMalloc((*colNum) * sizeof(SMJoinColInfo));
if (NULL == *pCols) {
2024-07-22 03:06:24 +00:00
return terrno;
2023-12-01 10:33:50 +00:00
}
2023-12-14 09:06:19 +00:00
*rowSize = 0;
2025-07-17 06:17:47 +00:00
2023-12-01 10:33:50 +00:00
int32_t i = 0;
2025-07-17 06:17:47 +00:00
SNode* pNode = NULL;
2023-12-01 10:33:50 +00:00
FOREACH(pNode, pList) {
SColumnNode* pColNode = (SColumnNode*)pNode;
2023-12-14 09:06:19 +00:00
(*pCols)[i].srcSlot = pColNode->slotId;
2024-03-25 07:19:16 +00:00
(*pCols)[i].jsonData = TSDB_DATA_TYPE_JSON == pColNode->node.resType.type;
2023-12-14 09:06:19 +00:00
(*pCols)[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
(*pCols)[i].bytes = pColNode->node.resType.bytes;
*rowSize += pColNode->node.resType.bytes;
2023-12-01 10:33:50 +00:00
++i;
2025-07-17 06:17:47 +00:00
}
2023-12-01 10:33:50 +00:00
2023-12-14 09:06:19 +00:00
return TSDB_CODE_SUCCESS;
}
2024-01-05 09:16:51 +00:00
static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bool allocKeyBuf) {
2023-12-14 09:06:19 +00:00
int64_t rowSize = 0;
MJ_ERR_RET(mJoinInitColsInfo(&pTable->keyNum, &rowSize, &pTable->keyCols, pList));
2024-01-05 09:16:51 +00:00
if (pTable->keyNum > 1 || allocKeyBuf) {
if (rowSize > 1) {
pTable->keyNullSize = 1;
} else {
pTable->keyNullSize = 2;
}
pTable->keyBuf = taosMemoryMalloc(TMAX(rowSize, pTable->keyNullSize));
2023-12-01 10:33:50 +00:00
if (NULL == pTable->keyBuf) {
2024-07-22 03:06:24 +00:00
return terrno;
2023-05-18 03:23:50 +00:00
}
}
2023-05-16 08:25:55 +00:00
2023-12-01 10:33:50 +00:00
return TSDB_CODE_SUCCESS;
2023-05-16 08:25:55 +00:00
}
2024-01-12 10:29:27 +00:00
static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
pTable->finCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap));
if (NULL == pTable->finCols) {
2024-07-22 03:06:24 +00:00
return terrno;
2023-12-14 09:06:19 +00:00
}
2025-07-17 06:17:47 +00:00
2023-12-14 09:06:19 +00:00
int32_t i = 0;
2025-07-17 06:17:47 +00:00
SNode* pNode = NULL;
2023-12-14 09:06:19 +00:00
FOREACH(pNode, pList) {
STargetNode* pTarget = (STargetNode*)pNode;
SColumnNode* pColumn = (SColumnNode*)pTarget->pExpr;
2024-01-12 10:29:27 +00:00
if (pColumn->dataBlockId == pTable->blkId) {
pTable->finCols[i].srcSlot = pColumn->slotId;
pTable->finCols[i].dstSlot = pTarget->slotId;
pTable->finCols[i].bytes = pColumn->node.resType.bytes;
pTable->finCols[i].vardata = IS_VAR_DATA_TYPE(pColumn->node.resType.type);
2023-12-14 09:06:19 +00:00
++i;
}
2025-07-17 06:17:47 +00:00
}
2023-12-14 09:06:19 +00:00
2024-01-12 10:29:27 +00:00
pTable->finNum = i;
2023-12-14 09:06:19 +00:00
return TSDB_CODE_SUCCESS;
}
2025-02-26 08:30:36 +00:00
static int32_t mJoinInitFuncPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
2024-02-29 09:52:53 +00:00
SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (4 != pFunc->pParameterList->length && 5 != pFunc->pParameterList->length) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
2024-07-22 03:06:24 +00:00
if (NULL == pUnit) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SValueNode* pCurrTz = NULL;
2025-07-17 06:17:47 +00:00
if (5 == pFunc->pParameterList->length) {
2024-07-22 03:06:24 +00:00
pCurrTz = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2);
if (NULL == pCurrTz) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
}
2025-07-17 06:17:47 +00:00
SValueNode* pTimeZone = (5 == pFunc->pParameterList->length)
? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4)
: (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3);
2024-07-22 03:06:24 +00:00
if (NULL == pTimeZone) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
2024-02-29 09:52:53 +00:00
pCtx->truncateUnit = pUnit->typeData;
2025-07-17 06:17:47 +00:00
if ((NULL == pCurrTz || 1 == pCurrTz->typeData) &&
pCtx->truncateUnit >= (86400 * TSDB_TICK_PER_SECOND(pFunc->node.resType.precision))) {
pCtx->timezoneUnit =
offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
2024-02-29 09:52:53 +00:00
}
qDebug("%s literal:%s, pCurrTz:%p", __func__, varDataVal(pTimeZone->datum.p), pCurrTz);
2025-02-26 08:30:36 +00:00
pCtx->type = E_PRIM_TIMETRUNCATE;
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitValPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
SValueNode* pVal = (SValueNode*)pTarget->pExpr;
if (TSDB_DATA_TYPE_TIMESTAMP != pVal->node.resType.type) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
pCtx->constTs = pVal->datum.i;
pCtx->type = E_PRIM_VALUE;
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) {
if (NULL == pNode) {
pCtx->targetSlotId = pTable->primCol->srcSlot;
return TSDB_CODE_SUCCESS;
}
2025-07-17 06:17:47 +00:00
2025-02-26 08:30:36 +00:00
if (QUERY_NODE_TARGET != nodeType(pNode)) {
qError("primary expr node is not target, type:%d", nodeType(pNode));
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
2025-07-17 06:17:47 +00:00
}
2025-02-26 08:30:36 +00:00
STargetNode* pTarget = (STargetNode*)pNode;
if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr) && QUERY_NODE_VALUE != nodeType(pTarget->pExpr)) {
qError("Invalid primary expr node type:%d", nodeType(pTarget->pExpr));
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (QUERY_NODE_FUNCTION == nodeType(pTarget->pExpr)) {
MJ_ERR_RET(mJoinInitFuncPrimExprCtx(pCtx, pTarget));
} else if (QUERY_NODE_VALUE == nodeType(pTarget->pExpr)) {
MJ_ERR_RET(mJoinInitValPrimExprCtx(pCtx, pTarget));
}
2024-02-29 09:52:53 +00:00
pCtx->targetSlotId = pTarget->slotId;
return TSDB_CODE_SUCCESS;
}
2025-07-17 06:17:47 +00:00
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode,
SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat, bool sameDs) {
2023-12-14 06:13:01 +00:00
SMJoinTableCtx* pTable = &pJoin->tbs[idx];
2023-12-01 10:33:50 +00:00
pTable->downStream = pDownstream[idx];
2024-03-06 03:18:57 +00:00
pTable->blkId = getOperatorResultBlockId(pDownstream[idx], sameDs ? idx : 0);
2023-12-14 09:06:19 +00:00
MJ_ERR_RET(mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));
2025-07-17 06:17:47 +00:00
MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight,
JOIN_TYPE_FULL == pJoin->joinType));
2024-01-12 10:29:27 +00:00
MJ_ERR_RET(mJoinInitFinColsInfo(pTable, pJoinNode->pTargets));
2023-12-14 09:06:19 +00:00
2024-07-22 03:06:24 +00:00
TAOS_MEMCPY(&pTable->inputStat, pStat, sizeof(*pStat));
2023-05-16 08:25:55 +00:00
2023-12-13 10:54:16 +00:00
pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
2024-07-22 03:06:24 +00:00
if (NULL == pTable->eqGrps) {
return terrno;
}
2025-07-17 06:17:47 +00:00
2023-12-13 10:54:16 +00:00
if (E_JOIN_TB_BUILD == pTable->type) {
pTable->createdBlks = taosArrayInit(8, POINTER_BYTES);
2024-07-22 03:06:24 +00:00
if (NULL == pTable->createdBlks) {
return terrno;
}
2023-12-13 10:54:16 +00:00
pTable->pGrpArrays = taosArrayInit(32, POINTER_BYTES);
2024-07-22 03:06:24 +00:00
if (NULL == pTable->pGrpArrays) {
return terrno;
}
2023-12-22 11:25:55 +00:00
pTable->pGrpHash = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
2024-07-22 03:06:24 +00:00
if (NULL == pTable->pGrpHash) {
return terrno;
2023-12-13 10:54:16 +00:00
}
2023-12-26 11:28:19 +00:00
2024-01-03 05:52:36 +00:00
if (pJoin->pFPreFilter && IS_FULL_OUTER_JOIN(pJoin->joinType, pJoin->subType)) {
2023-12-26 11:28:19 +00:00
pTable->rowBitmapSize = MJOIN_ROW_BITMAP_SIZE;
pTable->pRowBitmap = taosMemoryMalloc(pTable->rowBitmapSize);
if (NULL == pTable->pRowBitmap) {
2024-07-22 03:06:24 +00:00
return terrno;
2023-12-26 11:28:19 +00:00
}
}
2024-01-10 10:56:49 +00:00
pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter);
2025-07-17 06:17:47 +00:00
pTable->multiEqGrpRows =
!((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter);
pTable->multiRowsGrp =
!((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter);
2024-01-19 10:03:16 +00:00
if (JOIN_STYPE_ASOF == pJoinNode->subType) {
2025-07-17 06:17:47 +00:00
pTable->eqRowLimit = (pJoinNode->pJLimit && ((SLimitNode*)pJoinNode->pJLimit)->limit)
? ((SLimitNode*)pJoinNode->pJLimit)->limit->datum.i
: 1;
2024-01-18 08:50:47 +00:00
}
2024-01-10 10:56:49 +00:00
} else {
pTable->multiEqGrpRows = true;
2023-12-13 10:54:16 +00:00
}
2024-02-29 09:52:53 +00:00
MJ_ERR_RET(mJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));
2025-07-17 06:17:47 +00:00
return TSDB_CODE_SUCCESS;
}
2023-12-01 10:33:50 +00:00
static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoinPhysiNode* pJoinNode) {
int32_t buildIdx = 0;
int32_t probeIdx = 1;
2023-12-01 10:33:50 +00:00
pInfo->joinType = pJoinNode->joinType;
pInfo->subType = pJoinNode->subType;
2025-07-17 06:17:47 +00:00
2023-12-01 10:33:50 +00:00
switch (pInfo->joinType) {
case JOIN_TYPE_INNER:
case JOIN_TYPE_FULL:
2024-01-05 06:40:05 +00:00
buildIdx = 1;
probeIdx = 0;
2023-12-01 10:33:50 +00:00
break;
case JOIN_TYPE_LEFT:
buildIdx = 1;
probeIdx = 0;
break;
case JOIN_TYPE_RIGHT:
buildIdx = 0;
probeIdx = 1;
break;
default:
break;
2025-07-17 06:17:47 +00:00
}
2023-12-08 11:25:05 +00:00
pInfo->build = &pInfo->tbs[buildIdx];
pInfo->probe = &pInfo->tbs[probeIdx];
2025-07-17 06:17:47 +00:00
2023-12-08 11:25:05 +00:00
pInfo->build->downStreamIdx = buildIdx;
pInfo->probe->downStreamIdx = probeIdx;
2023-12-12 11:31:12 +00:00
2024-02-29 09:52:53 +00:00
if (0 == buildIdx) {
pInfo->build->primExpr = pJoinNode->leftPrimExpr;
pInfo->probe->primExpr = pJoinNode->rightPrimExpr;
} else {
pInfo->build->primExpr = pJoinNode->rightPrimExpr;
pInfo->probe->primExpr = pJoinNode->leftPrimExpr;
}
2025-07-17 06:17:47 +00:00
2023-12-12 11:31:12 +00:00
pInfo->build->type = E_JOIN_TB_BUILD;
pInfo->probe->type = E_JOIN_TB_PROBE;
2023-12-01 10:33:50 +00:00
}
2022-05-14 16:04:51 +00:00
2024-02-29 09:52:53 +00:00
int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
if (NULL == pTable->primExpr) {
return TSDB_CODE_SUCCESS;
}
SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
2024-07-22 03:06:24 +00:00
if (NULL == pPrimOut) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
2025-02-26 08:30:36 +00:00
SMJoinPrimExprCtx* pCtx = &pTable->primCtx;
switch (pCtx->type) {
case E_PRIM_TIMETRUNCATE: {
SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
if (NULL == pPrimIn) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (0 != pCtx->timezoneUnit) {
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
2025-07-17 06:17:47 +00:00
((int64_t*)pPrimOut->pData)[i] =
((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit;
2025-02-26 08:30:36 +00:00
}
} else {
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit;
}
}
break;
2024-02-29 09:52:53 +00:00
}
2025-02-26 08:30:36 +00:00
case E_PRIM_VALUE: {
feat(stream): optimize stream logic (#33027) * fix: remove debug log * fix: remove assert * fix: delete unused code * enh: [TD-37251] Support expr in state window. * feat(stream): support expr in state window trigger * enh: [TD-37251] Fix SCL_IS_CONST_CALC condition. * fix(stream): set ver in wal * fix: print code * fix: increase runner replica num * fix: trigger mem error * fix: sliding _tnext_ts value * fix(stream): disable tagFilterCache in stream reader trigger * fix: crash * Revert "fix(stream): fix history calc finish check" This reverts commit f93d17f1d27f011639d22cb0880637dbeb7e5532. * Revert "fix(stream): fix calc request allocation in trigger" This reverts commit c5410f6da09835303d32967f4b6d02a7e47cd589. * fix(stream): fix calc request allocation in trigger * enh: [TD-37251] External window support more placeholder. * fix(stream): modify size of return from 1000000->4096 * enh: [TD-37251] Modify error msg when stream query do not have from clause. * fix(stream): add log for group not found * fix(stream): do not return gid=0 in walMetaData interface * enh: [TD-37251] Fix missing ts column in vtable query. * fix: test case build failed * fix: invalid read issue * fix(stream): add vtable logic * fix(stream): encode error in wal * fix(stream): add vtable logic * fix(stream): add log * fix: diff funcition crash * Revert "Merge branch 'enh/TD-37251-3.0-dropoutput' into enh/TD-37251-3.0" This reverts commit e93cbd6fd42261527df046c70e0ece7568af187b, reversing changes made to dc3230591d4428c817703f52574aa01d5f10e5fe. * Revert "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0" This reverts commit dc3230591d4428c817703f52574aa01d5f10e5fe, reversing changes made to 085e086782896db22acd292816e53497cbecb726. * fix(stream): fix block data len is too large if data type is vchar * fix: drop output table * feat: runner delete output table * process pDropBlock in trigger task. * fix(stream): opti log level * fix(stream): build block for drop table * fix(stream): set gid for normal table * fix(stream): set gid for normal table * feat: Support delete output. * fix(stream): rows error * fix(stream): memory leak * enh: [TD-37251] Fix external window wrong ts column. * fix(stream): fix calc time check in batch mode * fix: merge aligned external window issue * Revert "fix(stream): fix calc time check in batch mode" This reverts commit d895b7f5776cf77c2ce290cdeedca86c206da6ce. * fix(stream): add test case * fix(stream): add insert drop table logic * fix: external window end issue * fix(stream): add test case * fix(stream): fix trigger pull data * fix(stream): fix history calc request * enh: drop table on snode * fix(stream): adjust hash index if data is filtered in wal * fix(stream): rollback * enh: add merge aligned extwin window row idx * fix: drop output table * fix: compile issue * enh: [TD-37251] Add flag to identify interval window is overlapped * fix: overlap * fix(stream): set gid=-1 for initialized * fix(stream): modify log level * fix: trigger slow issue * fix(stream): add basic test for obj pool * fix(stream): fix metadata clear in trigger * fix(stream): fix idle runner allocation in trigger * fix: handle agg output on externalWin * fix: test case * fix(stream): adjust log * fix: reset pCtx pOutput * fix: memory leak * fix: search first win for tsCol * fix(stream): add test case for schema change * fix: mem leak * fix: mem leak * Reapply "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0" This reverts commit b508e66958eb95117b1d086a964b87b5ac59a19e. * fix(stream): fix virtual table data pull * fix(stream): fix set table request * fix(stream): process empty uidlist * fix(stream): fix set table request * fix(stream): fix data new request in trigger * fix(stream): tablelist error for vtable * fix(stream): block ver is null * fix(stream): remove version limition for wal * fix(stream): block rows error * fix(stream): fix pending calc param in batch mode * fix(stream): auto create table * fix(stream): fix stream vtable data merge * fix: _tcurrentts * fix(stream): destroy hash * fix(stream): fix trigger status * fix(stream): colId error in vtable * fix(stream): update nrows of vtable data block * fix(stream): fix trigger status * fix(stream): enable low latency calc for period trigger * fix: test case file path * fix: test case file path * fix: string to node in reader * enh: add test log * fix(stream): increase wait time of non-low-latency mode * fix(stream): fix column capacity in scalar calculation * fix(stream): fix column capacity in trigger expr calculation * fix: get origTableInfos * fix(stream): fix calc data pull in trigger * fix(stream): fix calc data cache write in trigger * enh: [TD-37251] Add flag to identify interval wind * fix: external window memory usage issue * fix(stream): fix epxr result column in trigger * fix(stream): add metaCache for calc plan * fix(stream): fix stream obj list clear * fix(stream): rollback * enh: [TD-37251] Add flag to identify interval wind * fix: mem free * fix(stream): add metaCache for calc plan * fix(stream): fix calc data cache write in trigger * fix(stream): fix calc data cache write in trigger * enh: optimize external result block memory * fix(stream): modify logic of judge table for create table * fix(stream): fix event window check in trigger * fix(stream): fix count window check in trigger * fix(stream): colSize=0 while encoding block because pDataCol->hasNull is false in secode time & reload table list if create table * enh: optimize stream memory * fix(stream): trigger tag error * fix: add log * fix(stream): fix calc data write in trigger * fix: drop output table * fix(stream): fix max delay in trigger * fix: drop output table * fix(stream): fix max delay in trigger * fix(build): handle return value of function * fix(stream): read gid error if it is child table in stream * fix(stream): fix calc param of history calculation * fix(stream): fix recalc of delete data * fix(stream): fix calc data of period trigger * fix(stream): fix cache read check * fix(stream): filter error in calc plan * fix: reset externla window expr * fix(stream): fix calc parm of max delay * fix: time range and case issues * fix(stream): fix crash in trigger * fix: case issues * fix: fix window node mem leak. * fix(stream): fix meta data clear in trigger * fix(stream): table schema is old in TsdbDataRequest for vtable * fix: fix slingding window place holder check condition. * Revert "fix: fix slingding window place holder check condition." This reverts commit ad864a1dc113961b409e32e2ac7d1feb534b74d6. * fix(stream): null pointer error * fix: case issue * fix(stream): calc data error for vtable * fix(stream): fix data sorter in trigger * fix(stream): add log for delete data * fix(stream): fix start version of realtime calculation * fix(stream): fix cache data merger of vtable * fix: case issues * fix(ci): upgrade stream cases in test_cols_function * fix(stream): gid not found if change tag value * fix: fix slingding window place holder check condition. * fix(stream): fix virt table info request in trigger * fix(stream): set gid = uid if stream table type != SUPER table * fix: clean cache data by group * fix: add block info and case issues * fix: fix heap-buffer-overflow. * fix(stream): fix state window with extend param * fix: fix access null pointer. * fix: case issues * fix: case issue * fix(stream): fix ignore_nodata_trigger option for period trigger * fix(stream): fix pseudo col fetch for calc data * fix: Extend checking time to avoid timeout. * fix: case issues * fix(stream): fix group col fetch for virtual tables * fix(stream): tag is NULL for non vtable * fix(stream): fix sliding check of virtual table * test(stream): check stream status after create all streams * fix: add log * fix(stream): fix wal meta truncate when ignore disorder * fix(stream): gid not found for child table * fix: fix place holder condition pushdown error. * fix(stream): suid not equal when delete data for child table * fix: id issue * fix(stream): disable recalc for count trigger * fix: cast result rowSize error in project * fix(stream): add log for tsdb meta * fix(stream): fix gid in tsdb meta request * fix(stream): fix wend of unclosed windows * fix(stream): fix ignore_nodata_trigger option for period trigger * fix: case issues * fix(stream): fix calc data pull for empty interval window * fix: fix ext window condition. * fix(ci): smaBasic performance check affectd by debug level log * fix(stream): add suid when set table list for vtable * fix: forbid using prefilter when using %%trows an trigger table is virtual table. * fix: forbid prefilter %%trows cases. * fix(stream): sort cid in tsdbVirtalDataReq * fix: forbid prefilter %%trows cases. * fix(stream): add log for virtual table tsdb data * fix(stream): get delete msg for vtable * fix: winRowIndex * Revert "fix: winRowIndex" This reverts commit e08b41cf960bb9ca031b9ea20b4495cbbd4e3ed0. * fix(stream): fix data merge in trigger * test(stream): fix case ans * fix(stream): fix empty calc data pull for period trigger * fix(stream): col index error for tsdbVirtalDataReq * fix(stream): pTableList is NULL for vtable * test(stream): fix case ans * fix(stream): fix notification in trigger * fix(stream): memory leak * fix(stream): fix virtual data pull in trigger * test(stream): fix case ans * fix(stream): col index error for tsdbVirtalDataReq * fix(stream): pTableList is NULL for vtable * test(stream): fix case ans * fix(stream): fix notification in trigger * fix(stream): memory leak * fix(stream): fix virtual data pull in trigger * test(stream): fix case ans * fix: case issue * fix: crash issue * fix: forbid prefilter %%trows cases. * fix(stream): session case * fix(stream): fix data pull for virtual tables * fix(stream): add log * fix(stream): fix calc req send in batch mode * fix: fix stream UT * fix(stream): tablelist is null for non vtable * fix: mem leak * fix(stream): fix compile error in trigger * fix: return code issue --------- Co-authored-by: dapan1121 <wpan@taosdata.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-09-25 07:48:14 +00:00
MJ_ERR_RET(colDataSetNItems(pPrimOut, 0, (char*)&pCtx->constTs, pBlock->info.rows, 1, false));
2025-02-26 08:30:36 +00:00
break;
2024-02-29 09:52:53 +00:00
}
2025-02-26 08:30:36 +00:00
default:
break;
2024-02-29 09:52:53 +00:00
}
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinGetExternalWinIdx(SMJoinTableCtx* pTable, int32_t rowIdx) {
if (NULL == pTable->pBlkWinIdx || taosArrayGetSize(pTable->pBlkWinIdx) <= 0) {
return -1;
}
int32_t size = taosArrayGetSize(pTable->pBlkWinIdx);
for (int32_t i = size - 1; i >= 0; --i) {
int64_t* pIdx = taosArrayGet(pTable->pBlkWinIdx, i);
if (NULL == pIdx) {
continue;
}
int32_t* pPair = (int32_t*)pIdx;
if (rowIdx >= pPair[1]) {
return pPair[0];
}
}
return -1;
}
static void mJoinSetExternalWinIdxFromPeer(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
SExecTaskInfo* pTaskInfo = pJoin->pOperator->pTaskInfo;
if (NULL == pTaskInfo->pStreamRuntimeInfo || !pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow) {
return;
}
SMJoinTableCtx* pPeer = (pTable == pJoin->build) ? pJoin->probe : pJoin->build;
if (NULL == pPeer || NULL == pPeer->blk || pPeer->blkRowIdx >= pPeer->blk->info.rows) {
return;
}
int32_t winIdx = mJoinGetExternalWinIdx(pPeer, pPeer->blkRowIdx);
if (winIdx >= 0) {
pTaskInfo->pStreamRuntimeInfo->funcInfo.curIdx = winIdx;
}
}
static int32_t mJoinSaveExternalWinIdx(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
SExecTaskInfo* pTaskInfo = pJoin->pOperator->pTaskInfo;
if (NULL == pTaskInfo->pStreamRuntimeInfo || !pTaskInfo->pStreamRuntimeInfo->funcInfo.withExternalWindow) {
if (pTable->pBlkWinIdx) {
taosArrayClear(pTable->pBlkWinIdx);
}
return TSDB_CODE_SUCCESS;
}
SArray* pInputWinIdx = pTaskInfo->pStreamRuntimeInfo->funcInfo.pStreamBlkWinIdx;
int32_t size = (NULL == pInputWinIdx) ? 0 : taosArrayGetSize(pInputWinIdx);
if (NULL == pTable->pBlkWinIdx) {
pTable->pBlkWinIdx = taosArrayInit(TMAX(size, 1), sizeof(int64_t));
if (NULL == pTable->pBlkWinIdx) {
return terrno;
}
} else {
taosArrayClear(pTable->pBlkWinIdx);
}
if (size > 0 && NULL == taosArrayAddBatch(pTable->pBlkWinIdx, TARRAY_DATA(pInputWinIdx), size)) {
return terrno;
}
return TSDB_CODE_SUCCESS;
}
2024-02-27 10:18:25 +00:00
SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
2024-02-29 09:52:53 +00:00
SSDataBlock* pTmp = NULL;
2025-07-17 06:17:47 +00:00
int32_t code = TSDB_CODE_SUCCESS;
int32_t dsIdx = pTable->downStreamIdx;
2024-02-27 10:18:25 +00:00
if (E_JOIN_TB_PROBE == pTable->type) {
if (pTable->remainInBlk) {
2024-02-29 09:52:53 +00:00
pTmp = pTable->remainInBlk;
2024-02-27 10:18:25 +00:00
pTable->remainInBlk = NULL;
(*pJoin->grpResetFp)(pJoin);
pTable->lastInGid = pTmp->info.id.groupId;
2024-02-29 09:52:53 +00:00
goto _return;
2024-02-27 10:18:25 +00:00
}
if (pTable->dsFetchDone) {
return NULL;
}
mJoinSetExternalWinIdxFromPeer(pJoin, pTable);
2024-02-29 09:52:53 +00:00
pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
2024-02-27 10:18:25 +00:00
if (NULL == pTmp) {
pTable->dsFetchDone = true;
return NULL;
}
code = mJoinSaveExternalWinIdx(pJoin, pTable);
if (code) {
pJoin->errCode = code;
T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
}
2025-07-17 06:17:47 +00:00
2024-02-27 10:18:25 +00:00
if (0 == pTable->lastInGid) {
pTable->lastInGid = pTmp->info.id.groupId;
2024-02-29 09:52:53 +00:00
goto _return;
2024-02-27 10:18:25 +00:00
}
if (pTable->lastInGid == pTmp->info.id.groupId) {
2024-02-29 09:52:53 +00:00
goto _return;
2024-02-27 10:18:25 +00:00
}
pTable->remainInBlk = pTmp;
return NULL;
}
SMJoinTableCtx* pProbe = pJoin->probe;
while (true) {
if (pTable->remainInBlk) {
if (pTable->remainInBlk->info.id.groupId == pProbe->lastInGid) {
2024-02-29 09:52:53 +00:00
pTmp = pTable->remainInBlk;
2024-02-27 10:18:25 +00:00
pTable->remainInBlk = NULL;
pTable->lastInGid = pTmp->info.id.groupId;
2024-02-29 09:52:53 +00:00
goto _return;
2024-02-27 10:18:25 +00:00
}
if (pTable->remainInBlk->info.id.groupId > pProbe->lastInGid) {
return NULL;
}
pTable->remainInBlk = NULL;
}
if (pTable->dsFetchDone) {
return NULL;
}
mJoinSetExternalWinIdxFromPeer(pJoin, pTable);
2024-02-27 10:18:25 +00:00
SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
if (NULL == pTmp) {
pTable->dsFetchDone = true;
return NULL;
}
code = mJoinSaveExternalWinIdx(pJoin, pTable);
if (code) {
pJoin->errCode = code;
T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
}
2024-02-27 10:18:25 +00:00
pTable->remainInBlk = pTmp;
}
2024-02-29 09:52:53 +00:00
_return:
2024-07-22 03:06:24 +00:00
code = mJoinLaunchPrimExpr(pTmp, pTable);
if (code) {
pJoin->errCode = code;
T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
}
2025-07-17 06:17:47 +00:00
2024-02-29 09:52:53 +00:00
return pTmp;
2024-02-27 10:18:25 +00:00
}
static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
if (pTable->dsFetchDone) {
return NULL;
}
2025-07-17 06:17:47 +00:00
mJoinSetExternalWinIdxFromPeer(pJoin, pTable);
2024-02-27 10:18:25 +00:00
SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTable->downStreamIdx);
if (NULL == pTmp) {
pTable->dsFetchDone = true;
2024-02-29 09:52:53 +00:00
} else {
2024-07-22 03:06:24 +00:00
int32_t code = mJoinLaunchPrimExpr(pTmp, pTable);
if (code) {
pJoin->errCode = code;
T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
}
code = mJoinSaveExternalWinIdx(pJoin, pTable);
if (code) {
pJoin->errCode = code;
T_LONG_JMP(pJoin->pOperator->pTaskInfo->env, pJoin->errCode);
}
2024-02-27 10:18:25 +00:00
}
return pTmp;
}
2023-12-13 10:54:16 +00:00
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
2024-02-27 10:18:25 +00:00
pJoin->ctx.mergeCtx.groupJoin = pJoinNode->grpJoin;
2025-07-17 06:17:47 +00:00
pJoin->ctx.mergeCtx.limit = (pJoinNode->node.pLimit && ((SLimitNode*)pJoinNode->node.pLimit)->limit)
? ((SLimitNode*)pJoinNode->node.pLimit)->limit->datum.i
: INT64_MAX;
2024-02-27 10:18:25 +00:00
pJoin->retrieveFp = pJoinNode->grpJoin ? mJoinGrpRetrieveImpl : mJoinRetrieveImpl;
2024-03-28 05:45:55 +00:00
pJoin->outBlkId = pJoinNode->node.pOutputDataBlockDesc->dataBlockId;
2025-07-17 06:17:47 +00:00
if ((JOIN_STYPE_ASOF == pJoin->subType &&
(ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) ||
(JOIN_STYPE_WIN == pJoin->subType)) {
2024-03-06 03:18:57 +00:00
pJoin->ctx.mergeCtxInUse = false;
2024-01-18 08:50:47 +00:00
return mJoinInitWindowCtx(pJoin, pJoinNode);
2023-12-07 11:22:46 +00:00
}
2024-03-06 03:18:57 +00:00
2025-07-17 06:17:47 +00:00
pJoin->ctx.mergeCtxInUse = true;
2023-12-13 10:54:16 +00:00
return mJoinInitMergeCtx(pJoin, pJoinNode);
2023-12-05 11:27:57 +00:00
}
2022-05-14 16:04:51 +00:00
2024-01-19 10:03:16 +00:00
static void mJoinDestroyCtx(SMJoinOperatorInfo* pJoin) {
if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) {
return mJoinDestroyWindowCtx(pJoin);
}
2025-07-17 06:17:47 +00:00
2024-01-19 10:03:16 +00:00
return mJoinDestroyMergeCtx(pJoin);
}
2025-07-17 06:17:47 +00:00
bool mJoinIsDone(SOperatorInfo* pOperator) { return (OP_EXEC_DONE == pOperator->status); }
2024-02-27 10:18:25 +00:00
2023-12-22 11:25:55 +00:00
void mJoinSetDone(SOperatorInfo* pOperator) {
2023-12-06 11:22:14 +00:00
setOperatorCompleted(pOperator);
if (pOperator->pDownstreamGetParams) {
freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM);
freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM);
pOperator->pDownstreamGetParams[0] = NULL;
pOperator->pDownstreamGetParams[1] = NULL;
2023-12-05 11:27:57 +00:00
}
}
2024-02-27 10:18:25 +00:00
bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb) {
2023-12-13 10:54:16 +00:00
if (NULL == (*ppBlk) || *pIdx >= (*ppBlk)->info.rows) {
2024-02-27 10:18:25 +00:00
(*ppBlk) = (*pJoin->retrieveFp)(pJoin, pTb);
2023-12-14 06:13:01 +00:00
pTb->dsInitDone = true;
2023-12-05 11:27:57 +00:00
2025-07-17 06:17:47 +00:00
qDebug("%s merge join %s table got %" PRId64 " rows block", GET_TASKID(pJoin->pOperator->pTaskInfo),
MJOIN_TBTYPE(pTb->type), (*ppBlk) ? (*ppBlk)->info.rows : 0);
2023-12-08 11:25:05 +00:00
*pIdx = 0;
2024-02-27 10:18:25 +00:00
if (NULL != (*ppBlk)) {
2023-12-22 11:25:55 +00:00
pTb->newBlk = true;
2023-12-08 11:25:05 +00:00
}
2025-07-17 06:17:47 +00:00
2023-12-08 11:25:05 +00:00
return ((*ppBlk) == NULL) ? false : true;
2023-12-06 11:22:14 +00:00
}
2023-12-08 11:25:05 +00:00
return true;
}
2023-12-06 11:22:14 +00:00
2023-12-14 06:13:01 +00:00
static void mJoinDestroyCreatedBlks(SArray* pCreatedBlks) {
int32_t blkNum = taosArrayGetSize(pCreatedBlks);
for (int32_t i = 0; i < blkNum; ++i) {
2024-07-22 03:06:24 +00:00
(void)blockDataDestroy(*(SSDataBlock**)TARRAY_GET_ELEM(pCreatedBlks, i));
2023-12-08 11:25:05 +00:00
}
2023-12-14 06:13:01 +00:00
taosArrayClear(pCreatedBlks);
2023-12-08 11:25:05 +00:00
}
2023-12-08 07:27:22 +00:00
2025-07-17 06:17:47 +00:00
int32_t mJoinGetRowBitmapOffset(SMJoinTableCtx* pTable, int32_t rowNum, int32_t* rowBitmapOffset) {
2023-12-26 11:28:19 +00:00
int32_t bitmapLen = BitmapLen(rowNum);
int64_t reqSize = pTable->rowBitmapOffset + bitmapLen;
if (reqSize > pTable->rowBitmapSize) {
int64_t newSize = reqSize * 1.1;
pTable->pRowBitmap = taosMemoryRealloc(pTable->pRowBitmap, newSize);
if (NULL == pTable->pRowBitmap) {
2024-07-22 03:06:24 +00:00
return terrno;
2023-12-26 11:28:19 +00:00
}
pTable->rowBitmapSize = newSize;
}
2024-07-22 03:06:24 +00:00
TAOS_MEMSET(pTable->pRowBitmap + pTable->rowBitmapOffset, 0xFFFFFFFF, bitmapLen);
2025-07-17 06:17:47 +00:00
2023-12-26 11:28:19 +00:00
*rowBitmapOffset = pTable->rowBitmapOffset;
pTable->rowBitmapOffset += bitmapLen;
return TSDB_CODE_SUCCESS;
}
2024-01-03 05:52:36 +00:00
void mJoinResetForBuildTable(SMJoinTableCtx* pTable) {
pTable->grpTotalRows = 0;
pTable->grpIdx = 0;
2024-01-18 08:50:47 +00:00
pTable->eqRowNum = 0;
2024-01-03 05:52:36 +00:00
mJoinDestroyCreatedBlks(pTable->createdBlks);
taosArrayClear(pTable->eqGrps);
if (pTable->rowBitmapSize > 0) {
pTable->rowBitmapOffset = 1;
2024-07-22 03:06:24 +00:00
TAOS_MEMSET(&pTable->nMatchCtx, 0, sizeof(pTable->nMatchCtx));
2024-01-03 05:52:36 +00:00
}
}
2023-12-26 11:28:19 +00:00
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
2024-02-29 09:52:53 +00:00
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
2024-07-22 03:06:24 +00:00
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2025-07-17 06:17:47 +00:00
2023-12-08 11:25:05 +00:00
SMJoinGrpRows* pGrp = NULL;
2025-07-17 06:17:47 +00:00
int32_t code = TSDB_CODE_SUCCESS;
2023-12-07 11:22:46 +00:00
2024-01-10 10:56:49 +00:00
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
2023-12-26 11:28:19 +00:00
return TSDB_CODE_SUCCESS;
2023-12-22 11:25:55 +00:00
}
2023-12-08 11:25:05 +00:00
if (restart) {
2024-01-03 05:52:36 +00:00
mJoinResetForBuildTable(pTable);
2023-12-08 11:25:05 +00:00
}
2023-12-06 11:22:14 +00:00
2024-01-18 08:50:47 +00:00
bool keepGrp = true;
2023-12-22 11:25:55 +00:00
pGrp = taosArrayReserve(pTable->eqGrps, 1);
2024-07-22 03:06:24 +00:00
if (NULL == pGrp) {
MJ_ERR_RET(terrno);
}
2025-07-17 06:17:47 +00:00
2023-12-12 11:31:12 +00:00
pGrp->beginIdx = pTable->blkRowIdx++;
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pGrp->beginIdx;
pGrp->readMatch = false;
2023-12-08 11:25:05 +00:00
pGrp->blk = pTable->blk;
2024-01-10 10:56:49 +00:00
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
if (timestamp == *(int64_t*)pEndVal) {
if (pTable->multiEqGrpRows) {
pGrp->endIdx = pTable->blk->info.rows - 1;
} else {
pGrp->endIdx = pGrp->beginIdx;
2023-12-08 11:25:05 +00:00
}
2025-07-17 06:17:47 +00:00
2024-01-10 10:56:49 +00:00
pTable->blkRowIdx = pTable->blk->info.rows;
} else {
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
if (timestamp == *(int64_t*)pNextVal) {
pGrp->endIdx++;
continue;
}
2023-12-12 11:31:12 +00:00
2024-01-10 10:56:49 +00:00
if (!pTable->multiEqGrpRows) {
pGrp->endIdx = pGrp->beginIdx;
2024-01-18 08:50:47 +00:00
} else if (0 == pTable->eqRowLimit) {
// DO NOTHING
} else if (pTable->eqRowLimit == pTable->eqRowNum) {
keepGrp = false;
} else {
int64_t rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
pTable->eqRowNum += rowNum;
2024-01-10 10:56:49 +00:00
}
2025-07-17 06:17:47 +00:00
2024-01-10 10:56:49 +00:00
goto _return;
}
2023-12-08 11:25:05 +00:00
}
2023-12-06 11:22:14 +00:00
2024-01-10 10:56:49 +00:00
if (wholeBlk && (pTable->multiEqGrpRows || restart)) {
2023-12-13 10:54:16 +00:00
*wholeBlk = true;
2025-07-17 06:17:47 +00:00
2024-01-18 08:50:47 +00:00
if (pTable->noKeepEqGrpRows || !keepGrp) {
2024-01-10 10:56:49 +00:00
goto _return;
}
2025-07-17 06:17:47 +00:00
2024-01-18 08:50:47 +00:00
if (0 == pGrp->beginIdx && pTable->multiEqGrpRows && 0 == pTable->eqRowLimit) {
2024-07-27 10:55:34 +00:00
pGrp->blk = NULL;
code = createOneDataBlock(pTable->blk, true, &pGrp->blk);
if (code) {
MJ_ERR_RET(code);
2024-07-19 10:27:49 +00:00
}
2024-07-27 10:55:34 +00:00
2024-07-19 10:27:49 +00:00
if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
2024-07-22 03:06:24 +00:00
MJ_ERR_RET(terrno);
2024-07-19 10:27:49 +00:00
}
2023-12-08 11:25:05 +00:00
} else {
2024-01-10 10:56:49 +00:00
if (!pTable->multiEqGrpRows) {
pGrp->endIdx = pGrp->beginIdx;
}
2024-01-18 08:50:47 +00:00
int64_t rowNum = 0;
if (!pTable->multiEqGrpRows) {
rowNum = 1;
pGrp->endIdx = pGrp->beginIdx;
} else if (0 == pTable->eqRowLimit) {
rowNum = pGrp->endIdx - pGrp->beginIdx + 1;
} else if (pTable->eqRowLimit == pTable->eqRowNum) {
keepGrp = false;
} else {
rowNum = TMIN(pGrp->endIdx - pGrp->beginIdx + 1, pTable->eqRowLimit - pTable->eqRowNum);
pGrp->endIdx = pGrp->beginIdx + rowNum - 1;
}
if (keepGrp && rowNum > 0) {
pTable->eqRowNum += rowNum;
2024-07-28 06:29:56 +00:00
code = blockDataExtractBlock(pTable->blk, pGrp->beginIdx, rowNum, &pGrp->blk);
if (code) {
MJ_ERR_RET(code);
2024-07-22 03:06:24 +00:00
}
2024-01-18 08:50:47 +00:00
pGrp->endIdx -= pGrp->beginIdx;
pGrp->beginIdx = 0;
pGrp->readIdx = 0;
2024-07-22 03:06:24 +00:00
if (NULL == taosArrayPush(pTable->createdBlks, &pGrp->blk)) {
MJ_ERR_RET(terrno);
}
2024-01-18 08:50:47 +00:00
}
2023-12-08 11:25:05 +00:00
}
}
2023-12-12 11:31:12 +00:00
2023-12-26 11:28:19 +00:00
_return:
2024-01-18 08:50:47 +00:00
if (pTable->noKeepEqGrpRows || !keepGrp || (!pTable->multiEqGrpRows && !restart)) {
2024-09-12 06:17:14 +00:00
if (NULL == taosArrayPop(pTable->eqGrps)) {
code = terrno;
}
2024-01-10 10:56:49 +00:00
} else {
2025-07-17 06:17:47 +00:00
pTable->grpTotalRows += pGrp->endIdx - pGrp->beginIdx + 1;
2024-01-10 10:56:49 +00:00
}
2025-07-17 06:17:47 +00:00
2024-07-22 03:06:24 +00:00
return code;
2023-12-08 11:25:05 +00:00
}
2023-12-08 07:27:22 +00:00
2024-02-27 10:18:25 +00:00
int32_t mJoinRetrieveEqGrpRows(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable, int64_t timestamp) {
2023-12-13 10:54:16 +00:00
bool wholeBlk = false;
2025-07-17 06:17:47 +00:00
2024-07-22 03:06:24 +00:00
MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true));
2025-07-17 06:17:47 +00:00
2024-02-01 06:58:08 +00:00
while (wholeBlk && !pTable->dsFetchDone) {
2024-02-27 10:18:25 +00:00
pTable->blk = (*pJoin->retrieveFp)(pJoin, pTable);
2025-07-17 06:17:47 +00:00
qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pJoin->pOperator->pTaskInfo),
MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
2023-12-08 07:27:22 +00:00
2023-12-12 11:31:12 +00:00
pTable->blkRowIdx = 0;
2023-12-08 07:27:22 +00:00
2023-12-08 11:25:05 +00:00
if (NULL == pTable->blk) {
2023-12-08 07:27:22 +00:00
break;
2023-12-06 11:22:14 +00:00
}
2023-12-08 11:25:05 +00:00
2023-12-13 10:54:16 +00:00
wholeBlk = false;
2024-07-22 03:06:24 +00:00
MJ_ERR_RET(mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, false));
2023-12-06 11:22:14 +00:00
}
2023-12-08 11:25:05 +00:00
2023-12-13 10:54:16 +00:00
return TSDB_CODE_SUCCESS;
2023-12-08 07:27:22 +00:00
}
2023-12-06 11:22:14 +00:00
2023-12-22 11:25:55 +00:00
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
2023-12-12 11:31:12 +00:00
for (int32_t i = 0; i < pTable->keyNum; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pTable->keyCols[i].srcSlot);
2024-07-22 03:06:24 +00:00
if (NULL == pCol) {
MJ_ERR_RET(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
}
2025-07-17 06:17:47 +00:00
if (pTable->keyCols[i].vardata != IS_VAR_DATA_TYPE(pCol->info.type)) {
qError("column type mismatch, idx:%d, slotId:%d, type:%d, vardata:%d", i, pTable->keyCols[i].srcSlot,
pCol->info.type, pTable->keyCols[i].vardata);
2023-12-12 11:31:12 +00:00
return TSDB_CODE_INVALID_PARA;
}
2025-07-17 06:17:47 +00:00
if (pTable->keyCols[i].bytes != pCol->info.bytes) {
qError("column bytes mismatch, idx:%d, slotId:%d, bytes:%d, %d", i, pTable->keyCols[i].srcSlot, pCol->info.bytes,
pTable->keyCols[i].bytes);
2023-12-12 11:31:12 +00:00
return TSDB_CODE_INVALID_PARA;
}
pTable->keyCols[i].data = pCol->pData;
if (pTable->keyCols[i].vardata) {
pTable->keyCols[i].offset = pCol->varmeta.offset;
}
pTable->keyCols[i].colData = pCol;
}
2023-12-08 11:25:05 +00:00
2023-12-12 11:31:12 +00:00
return TSDB_CODE_SUCCESS;
}
2025-07-17 06:17:47 +00:00
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t* pBufLen) {
char* pData = NULL;
2023-12-12 11:31:12 +00:00
size_t bufLen = 0;
2025-07-17 06:17:47 +00:00
2023-12-12 11:31:12 +00:00
if (1 == pTable->keyNum) {
if (colDataIsNull_s(pTable->keyCols[0].colData, rowIdx)) {
return true;
}
2024-03-25 07:19:16 +00:00
if (pTable->keyCols[0].jsonData) {
pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
bufLen = getJsonValueLen(pData);
} else if (pTable->keyCols[0].vardata) {
2023-12-12 11:31:12 +00:00
pData = pTable->keyCols[0].data + pTable->keyCols[0].offset[rowIdx];
bufLen = varDataTLen(pData);
} else {
pData = pTable->keyCols[0].data + pTable->keyCols[0].bytes * rowIdx;
bufLen = pTable->keyCols[0].bytes;
2023-12-08 11:25:05 +00:00
}
2023-12-12 11:31:12 +00:00
pTable->keyData = pData;
} else {
for (int32_t i = 0; i < pTable->keyNum; ++i) {
if (colDataIsNull_s(pTable->keyCols[i].colData, rowIdx)) {
return true;
}
2024-03-25 07:19:16 +00:00
if (pTable->keyCols[0].jsonData) {
pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
2024-07-22 03:06:24 +00:00
TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, getJsonValueLen(pData));
2024-03-25 07:19:16 +00:00
bufLen += getJsonValueLen(pData);
} else if (pTable->keyCols[i].vardata) {
2023-12-12 11:31:12 +00:00
pData = pTable->keyCols[i].data + pTable->keyCols[i].offset[rowIdx];
2024-07-22 03:06:24 +00:00
TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, varDataTLen(pData));
2023-12-12 11:31:12 +00:00
bufLen += varDataTLen(pData);
} else {
pData = pTable->keyCols[i].data + pTable->keyCols[i].bytes * rowIdx;
2024-07-22 03:06:24 +00:00
TAOS_MEMCPY(pTable->keyBuf + bufLen, pData, pTable->keyCols[i].bytes);
2023-12-12 11:31:12 +00:00
bufLen += pTable->keyCols[i].bytes;
}
}
pTable->keyData = pTable->keyBuf;
2023-12-08 11:25:05 +00:00
}
2023-12-06 11:22:14 +00:00
2023-12-12 11:31:12 +00:00
if (pBufLen) {
*pBufLen = bufLen;
}
2023-12-08 11:25:05 +00:00
2023-12-12 11:31:12 +00:00
return false;
}
2023-12-14 06:13:01 +00:00
static int32_t mJoinGetAvailableGrpArray(SMJoinTableCtx* pTable, SArray** ppRes) {
2023-12-12 11:31:12 +00:00
do {
if (pTable->grpArrayIdx < taosArrayGetSize(pTable->pGrpArrays)) {
*ppRes = taosArrayGetP(pTable->pGrpArrays, pTable->grpArrayIdx++);
2024-07-22 03:06:24 +00:00
if (NULL == *ppRes) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
2023-12-13 10:54:16 +00:00
taosArrayClear(*ppRes);
2023-12-12 11:31:12 +00:00
return TSDB_CODE_SUCCESS;
}
SArray* pNew = taosArrayInit(4, sizeof(SMJoinRowPos));
if (NULL == pNew) {
2024-07-22 03:06:24 +00:00
return terrno;
}
if (NULL == taosArrayPush(pTable->pGrpArrays, &pNew)) {
return terrno;
2023-12-12 11:31:12 +00:00
}
} while (true);
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinAddRowToHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
2023-12-14 06:13:01 +00:00
SMJoinTableCtx* pBuild = pJoin->build;
2025-07-17 06:17:47 +00:00
SMJoinRowPos pos = {pBlock, rowIdx};
SArray** pGrpRows = tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);
2023-12-12 11:31:12 +00:00
if (!pGrpRows) {
SArray* pNewGrp = NULL;
2023-12-13 10:54:16 +00:00
MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp));
2024-07-22 03:06:24 +00:00
if (NULL == taosArrayPush(pNewGrp, &pos)) {
return terrno;
}
MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, POINTER_BYTES));
2024-01-10 10:56:49 +00:00
} else if (pBuild->multiRowsGrp) {
2024-07-22 03:06:24 +00:00
if (NULL == taosArrayPush(*pGrpRows, &pos)) {
return terrno;
}
2023-12-12 11:31:12 +00:00
}
return TSDB_CODE_SUCCESS;
}
2024-01-03 05:52:36 +00:00
static int32_t mJoinAddRowToFullHash(SMJoinOperatorInfo* pJoin, size_t keyLen, SSDataBlock* pBlock, int32_t rowIdx) {
2025-07-17 06:17:47 +00:00
SMJoinTableCtx* pBuild = pJoin->build;
SMJoinRowPos pos = {pBlock, rowIdx};
2024-01-03 05:52:36 +00:00
SMJoinHashGrpRows* pGrpRows = (SMJoinHashGrpRows*)tSimpleHashGet(pBuild->pGrpHash, pBuild->keyData, keyLen);
if (!pGrpRows) {
SMJoinHashGrpRows pNewGrp = {0};
MJ_ERR_RET(mJoinGetAvailableGrpArray(pBuild, &pNewGrp.pRows));
2024-07-22 03:06:24 +00:00
if (NULL == taosArrayPush(pNewGrp.pRows, &pos)) {
return terrno;
}
MJ_ERR_RET(tSimpleHashPut(pBuild->pGrpHash, pBuild->keyData, keyLen, &pNewGrp, sizeof(pNewGrp)));
2024-01-03 05:52:36 +00:00
} else {
2024-07-22 03:06:24 +00:00
if (NULL == taosArrayPush(pGrpRows->pRows, &pos)) {
return terrno;
}
2024-01-03 05:52:36 +00:00
}
return TSDB_CODE_SUCCESS;
}
int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
size_t bufLen = 0;
tSimpleHashClear(pJoin->build->pGrpHash);
pJoin->build->grpArrayIdx = 0;
pJoin->build->grpRowIdx = -1;
2025-07-17 06:17:47 +00:00
2024-01-03 05:52:36 +00:00
int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
for (int32_t g = 0; g < grpNum; ++g) {
SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
2024-07-22 03:06:24 +00:00
if (NULL == pGrp) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
2024-01-03 05:52:36 +00:00
MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));
int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
for (int32_t r = 0; r < grpRows; ++r) {
if (mJoinCopyKeyColsDataToBuf(pTable, pGrp->beginIdx + r, &bufLen)) {
2025-07-17 06:17:47 +00:00
*(int16_t*)pTable->keyBuf = 0;
2024-01-05 09:16:51 +00:00
pTable->keyData = pTable->keyBuf;
bufLen = pTable->keyNullSize;
2024-01-03 05:52:36 +00:00
}
MJ_ERR_RET(mJoinAddRowToFullHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r));
}
}
return TSDB_CODE_SUCCESS;
}
int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
2023-12-13 10:54:16 +00:00
size_t bufLen = 0;
tSimpleHashClear(pJoin->build->pGrpHash);
pJoin->build->grpArrayIdx = 0;
2023-12-22 11:25:55 +00:00
pJoin->build->grpRowIdx = -1;
2025-07-17 06:17:47 +00:00
2023-12-12 11:31:12 +00:00
int32_t grpNum = taosArrayGetSize(pTable->eqGrps);
for (int32_t g = 0; g < grpNum; ++g) {
SMJoinGrpRows* pGrp = taosArrayGet(pTable->eqGrps, g);
2024-07-22 03:06:24 +00:00
if (NULL == pGrp) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
2025-07-17 06:17:47 +00:00
2023-12-13 10:54:16 +00:00
MJ_ERR_RET(mJoinSetKeyColsData(pGrp->blk, pTable));
2023-12-12 11:31:12 +00:00
int32_t grpRows = GRP_REMAIN_ROWS(pGrp);
for (int32_t r = 0; r < grpRows; ++r) {
2023-12-22 11:25:55 +00:00
if (mJoinCopyKeyColsDataToBuf(pTable, pGrp->beginIdx + r, &bufLen)) {
2023-12-12 11:31:12 +00:00
continue;
}
2023-12-13 10:54:16 +00:00
MJ_ERR_RET(mJoinAddRowToHash(pJoin, bufLen, pGrp->blk, pGrp->beginIdx + r));
2023-12-12 11:31:12 +00:00
}
}
2023-12-13 10:54:16 +00:00
return TSDB_CODE_SUCCESS;
2023-12-12 11:31:12 +00:00
}
2024-02-27 10:18:25 +00:00
void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx) {
pCtx->blk = NULL;
pCtx->blkRowIdx = 0;
pCtx->newBlk = false;
if (pCtx->pBlkWinIdx) {
taosArrayClear(pCtx->pBlkWinIdx);
}
2024-02-27 10:18:25 +00:00
mJoinDestroyCreatedBlks(pCtx->createdBlks);
tSimpleHashClear(pCtx->pGrpHash);
}
2023-12-14 06:13:01 +00:00
void mJoinResetTableCtx(SMJoinTableCtx* pCtx) {
pCtx->dsInitDone = false;
pCtx->dsFetchDone = false;
2024-02-27 10:18:25 +00:00
pCtx->lastInGid = 0;
pCtx->remainInBlk = NULL;
2023-12-14 06:13:01 +00:00
2024-02-27 10:18:25 +00:00
mJoinResetGroupTableCtx(pCtx);
2023-12-14 06:13:01 +00:00
}
void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) {
pCtx->grpRemains = false;
pCtx->midRemains = false;
pCtx->lastEqGrp = false;
pCtx->lastEqTs = INT64_MIN;
pCtx->hashJoin = false;
}
2024-03-19 09:05:44 +00:00
void mWinJoinResetWindowCache(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache) {
2024-03-06 03:18:57 +00:00
pCache->outRowIdx = 0;
pCache->rowNum = 0;
pCache->grpIdx = 0;
if (pCache->grpsQueue) {
TSWAP(pCache->grps, pCache->grpsQueue);
}
2024-03-19 09:05:44 +00:00
int32_t grpNum = taosArrayGetSize(pCache->grps);
for (int32_t i = 0; i < grpNum; ++i) {
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
2024-07-22 03:06:24 +00:00
if (NULL == pGrp) {
continue;
}
2024-03-19 09:05:44 +00:00
if (pGrp->blk != pCtx->cache.outBlk && pGrp->clonedBlk) {
2024-07-22 03:06:24 +00:00
(void)blockDataDestroy(pGrp->blk);
2024-03-19 09:05:44 +00:00
}
}
2025-07-17 06:17:47 +00:00
2024-03-06 03:18:57 +00:00
taosArrayClear(pCache->grps);
2025-07-17 06:17:47 +00:00
2024-03-06 03:18:57 +00:00
if (pCache->outBlk) {
blockDataCleanup(pCache->outBlk);
}
}
void mJoinResetWindowCtx(SMJoinWindowCtx* pCtx) {
pCtx->grpRemains = false;
pCtx->lastEqGrp = false;
pCtx->lastProbeGrp = false;
pCtx->eqPostDone = false;
pCtx->lastTs = INT64_MIN;
2025-07-17 06:17:47 +00:00
2024-03-19 09:05:44 +00:00
mWinJoinResetWindowCache(pCtx, &pCtx->cache);
2024-03-06 03:18:57 +00:00
}
2023-12-14 06:13:01 +00:00
void mJoinResetCtx(SMJoinOperatorInfo* pJoin) {
2024-03-06 03:18:57 +00:00
if (pJoin->ctx.mergeCtxInUse) {
mJoinResetMergeCtx(&pJoin->ctx.mergeCtx);
} else {
mJoinResetWindowCtx(&pJoin->ctx.windowCtx);
}
2023-12-14 06:13:01 +00:00
}
void mJoinResetOperator(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
2023-12-08 11:25:05 +00:00
2023-12-14 06:13:01 +00:00
mJoinResetTableCtx(pJoin->build);
mJoinResetTableCtx(pJoin->probe);
mJoinResetCtx(pJoin);
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
pJoin->errCode = 0;
pJoin->execInfo = (SMJoinExecInfo){0};
2023-12-14 06:13:01 +00:00
pOperator->status = OP_OPENED;
}
2023-12-08 11:25:05 +00:00
2024-08-27 09:04:44 +00:00
int32_t mJoinMainProcess(struct SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
2023-12-05 11:27:57 +00:00
SMJoinOperatorInfo* pJoin = pOperator->info;
2025-07-17 06:17:47 +00:00
int32_t code = TSDB_CODE_SUCCESS;
2023-09-01 05:24:47 +00:00
if (pOperator->status == OP_EXEC_DONE) {
2025-07-17 06:17:47 +00:00
if (NULL == pOperator->pDownstreamGetParams || NULL == pOperator->pDownstreamGetParams[0] ||
NULL == pOperator->pDownstreamGetParams[1]) {
2023-12-14 06:13:01 +00:00
qDebug("%s merge join done", GET_TASKID(pOperator->pTaskInfo));
2024-08-27 09:04:44 +00:00
return code;
2023-09-01 05:24:47 +00:00
} else {
2023-12-14 06:13:01 +00:00
mJoinResetOperator(pOperator);
2023-12-01 10:33:50 +00:00
qDebug("%s start new round merge join", GET_TASKID(pOperator->pTaskInfo));
2023-09-01 05:24:47 +00:00
}
}
2023-12-05 11:27:57 +00:00
SSDataBlock* pBlock = NULL;
while (true) {
2024-01-05 06:40:05 +00:00
pBlock = (*pJoin->joinFp)(pOperator);
2023-12-05 11:27:57 +00:00
if (NULL == pBlock) {
2023-12-13 10:54:16 +00:00
if (pJoin->errCode) {
T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
}
break;
}
2024-03-28 05:45:55 +00:00
pBlock->info.id.blockId = pJoin->outBlkId;
2023-12-05 11:27:57 +00:00
if (pJoin->pFinFilter != NULL) {
feat(stream): optimize stream logic (#33027) * fix: remove debug log * fix: remove assert * fix: delete unused code * enh: [TD-37251] Support expr in state window. * feat(stream): support expr in state window trigger * enh: [TD-37251] Fix SCL_IS_CONST_CALC condition. * fix(stream): set ver in wal * fix: print code * fix: increase runner replica num * fix: trigger mem error * fix: sliding _tnext_ts value * fix(stream): disable tagFilterCache in stream reader trigger * fix: crash * Revert "fix(stream): fix history calc finish check" This reverts commit f93d17f1d27f011639d22cb0880637dbeb7e5532. * Revert "fix(stream): fix calc request allocation in trigger" This reverts commit c5410f6da09835303d32967f4b6d02a7e47cd589. * fix(stream): fix calc request allocation in trigger * enh: [TD-37251] External window support more placeholder. * fix(stream): modify size of return from 1000000->4096 * enh: [TD-37251] Modify error msg when stream query do not have from clause. * fix(stream): add log for group not found * fix(stream): do not return gid=0 in walMetaData interface * enh: [TD-37251] Fix missing ts column in vtable query. * fix: test case build failed * fix: invalid read issue * fix(stream): add vtable logic * fix(stream): encode error in wal * fix(stream): add vtable logic * fix(stream): add log * fix: diff funcition crash * Revert "Merge branch 'enh/TD-37251-3.0-dropoutput' into enh/TD-37251-3.0" This reverts commit e93cbd6fd42261527df046c70e0ece7568af187b, reversing changes made to dc3230591d4428c817703f52574aa01d5f10e5fe. * Revert "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0" This reverts commit dc3230591d4428c817703f52574aa01d5f10e5fe, reversing changes made to 085e086782896db22acd292816e53497cbecb726. * fix(stream): fix block data len is too large if data type is vchar * fix: drop output table * feat: runner delete output table * process pDropBlock in trigger task. * fix(stream): opti log level * fix(stream): build block for drop table * fix(stream): set gid for normal table * fix(stream): set gid for normal table * feat: Support delete output. * fix(stream): rows error * fix(stream): memory leak * enh: [TD-37251] Fix external window wrong ts column. * fix(stream): fix calc time check in batch mode * fix: merge aligned external window issue * Revert "fix(stream): fix calc time check in batch mode" This reverts commit d895b7f5776cf77c2ce290cdeedca86c206da6ce. * fix(stream): add test case * fix(stream): add insert drop table logic * fix: external window end issue * fix(stream): add test case * fix(stream): fix trigger pull data * fix(stream): fix history calc request * enh: drop table on snode * fix(stream): adjust hash index if data is filtered in wal * fix(stream): rollback * enh: add merge aligned extwin window row idx * fix: drop output table * fix: compile issue * enh: [TD-37251] Add flag to identify interval window is overlapped * fix: overlap * fix(stream): set gid=-1 for initialized * fix(stream): modify log level * fix: trigger slow issue * fix(stream): add basic test for obj pool * fix(stream): fix metadata clear in trigger * fix(stream): fix idle runner allocation in trigger * fix: handle agg output on externalWin * fix: test case * fix(stream): adjust log * fix: reset pCtx pOutput * fix: memory leak * fix: search first win for tsCol * fix(stream): add test case for schema change * fix: mem leak * fix: mem leak * Reapply "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0" This reverts commit b508e66958eb95117b1d086a964b87b5ac59a19e. * fix(stream): fix virtual table data pull * fix(stream): fix set table request * fix(stream): process empty uidlist * fix(stream): fix set table request * fix(stream): fix data new request in trigger * fix(stream): tablelist error for vtable * fix(stream): block ver is null * fix(stream): remove version limition for wal * fix(stream): block rows error * fix(stream): fix pending calc param in batch mode * fix(stream): auto create table * fix(stream): fix stream vtable data merge * fix: _tcurrentts * fix(stream): destroy hash * fix(stream): fix trigger status * fix(stream): colId error in vtable * fix(stream): update nrows of vtable data block * fix(stream): fix trigger status * fix(stream): enable low latency calc for period trigger * fix: test case file path * fix: test case file path * fix: string to node in reader * enh: add test log * fix(stream): increase wait time of non-low-latency mode * fix(stream): fix column capacity in scalar calculation * fix(stream): fix column capacity in trigger expr calculation * fix: get origTableInfos * fix(stream): fix calc data pull in trigger * fix(stream): fix calc data cache write in trigger * enh: [TD-37251] Add flag to identify interval wind * fix: external window memory usage issue * fix(stream): fix epxr result column in trigger * fix(stream): add metaCache for calc plan * fix(stream): fix stream obj list clear * fix(stream): rollback * enh: [TD-37251] Add flag to identify interval wind * fix: mem free * fix(stream): add metaCache for calc plan * fix(stream): fix calc data cache write in trigger * fix(stream): fix calc data cache write in trigger * enh: optimize external result block memory * fix(stream): modify logic of judge table for create table * fix(stream): fix event window check in trigger * fix(stream): fix count window check in trigger * fix(stream): colSize=0 while encoding block because pDataCol->hasNull is false in secode time & reload table list if create table * enh: optimize stream memory * fix(stream): trigger tag error * fix: add log * fix(stream): fix calc data write in trigger * fix: drop output table * fix(stream): fix max delay in trigger * fix: drop output table * fix(stream): fix max delay in trigger * fix(build): handle return value of function * fix(stream): read gid error if it is child table in stream * fix(stream): fix calc param of history calculation * fix(stream): fix recalc of delete data * fix(stream): fix calc data of period trigger * fix(stream): fix cache read check * fix(stream): filter error in calc plan * fix: reset externla window expr * fix(stream): fix calc parm of max delay * fix: time range and case issues * fix(stream): fix crash in trigger * fix: case issues * fix: fix window node mem leak. * fix(stream): fix meta data clear in trigger * fix(stream): table schema is old in TsdbDataRequest for vtable * fix: fix slingding window place holder check condition. * Revert "fix: fix slingding window place holder check condition." This reverts commit ad864a1dc113961b409e32e2ac7d1feb534b74d6. * fix(stream): null pointer error * fix: case issue * fix(stream): calc data error for vtable * fix(stream): fix data sorter in trigger * fix(stream): add log for delete data * fix(stream): fix start version of realtime calculation * fix(stream): fix cache data merger of vtable * fix: case issues * fix(ci): upgrade stream cases in test_cols_function * fix(stream): gid not found if change tag value * fix: fix slingding window place holder check condition. * fix(stream): fix virt table info request in trigger * fix(stream): set gid = uid if stream table type != SUPER table * fix: clean cache data by group * fix: add block info and case issues * fix: fix heap-buffer-overflow. * fix(stream): fix state window with extend param * fix: fix access null pointer. * fix: case issues * fix: case issue * fix(stream): fix ignore_nodata_trigger option for period trigger * fix(stream): fix pseudo col fetch for calc data * fix: Extend checking time to avoid timeout. * fix: case issues * fix(stream): fix group col fetch for virtual tables * fix(stream): tag is NULL for non vtable * fix(stream): fix sliding check of virtual table * test(stream): check stream status after create all streams * fix: add log * fix(stream): fix wal meta truncate when ignore disorder * fix(stream): gid not found for child table * fix: fix place holder condition pushdown error. * fix(stream): suid not equal when delete data for child table * fix: id issue * fix(stream): disable recalc for count trigger * fix: cast result rowSize error in project * fix(stream): add log for tsdb meta * fix(stream): fix gid in tsdb meta request * fix(stream): fix wend of unclosed windows * fix(stream): fix ignore_nodata_trigger option for period trigger * fix: case issues * fix(stream): fix calc data pull for empty interval window * fix: fix ext window condition. * fix(ci): smaBasic performance check affectd by debug level log * fix(stream): add suid when set table list for vtable * fix: forbid using prefilter when using %%trows an trigger table is virtual table. * fix: forbid prefilter %%trows cases. * fix(stream): sort cid in tsdbVirtalDataReq * fix: forbid prefilter %%trows cases. * fix(stream): add log for virtual table tsdb data * fix(stream): get delete msg for vtable * fix: winRowIndex * Revert "fix: winRowIndex" This reverts commit e08b41cf960bb9ca031b9ea20b4495cbbd4e3ed0. * fix(stream): fix data merge in trigger * test(stream): fix case ans * fix(stream): fix empty calc data pull for period trigger * fix(stream): col index error for tsdbVirtalDataReq * fix(stream): pTableList is NULL for vtable * test(stream): fix case ans * fix(stream): fix notification in trigger * fix(stream): memory leak * fix(stream): fix virtual data pull in trigger * test(stream): fix case ans * fix(stream): col index error for tsdbVirtalDataReq * fix(stream): pTableList is NULL for vtable * test(stream): fix case ans * fix(stream): fix notification in trigger * fix(stream): memory leak * fix(stream): fix virtual data pull in trigger * test(stream): fix case ans * fix: case issue * fix: crash issue * fix: forbid prefilter %%trows cases. * fix(stream): session case * fix(stream): fix data pull for virtual tables * fix(stream): add log * fix(stream): fix calc req send in batch mode * fix: fix stream UT * fix(stream): tablelist is null for non vtable * fix: mem leak * fix(stream): fix compile error in trigger * fix: return code issue --------- Co-authored-by: dapan1121 <wpan@taosdata.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-09-25 07:48:14 +00:00
code = doFilter(pBlock, pJoin->pFinFilter, NULL, NULL);
2024-07-22 03:06:24 +00:00
if (code) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
2024-07-22 03:06:24 +00:00
pJoin->errCode = code;
T_LONG_JMP(pOperator->pTaskInfo->env, pJoin->errCode);
}
}
2025-07-17 06:17:47 +00:00
if (pBlock->info.rows > 0 || pOperator->status == OP_EXEC_DONE) {
2024-02-27 10:55:21 +00:00
pBlock->info.dataLoad = 1;
2023-09-01 05:24:47 +00:00
break;
}
}
2024-04-03 01:54:49 +00:00
pJoin->execInfo.resRows += pBlock ? pBlock->info.rows : 0;
2024-08-27 09:04:44 +00:00
if (pBlock && pBlock->info.rows > 0) {
*pResBlock = pBlock;
}
2024-09-10 08:56:36 +00:00
2024-08-27 09:04:44 +00:00
return code;
2022-05-14 16:04:51 +00:00
}
2023-09-01 05:24:47 +00:00
2023-12-25 01:19:55 +00:00
void destroyGrpArray(void* ppArray) {
SArray* pArray = *(SArray**)ppArray;
taosArrayDestroy(pArray);
}
void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
2024-08-19 09:13:27 +00:00
if (NULL == pTable) {
return;
}
2023-12-25 01:19:55 +00:00
mJoinDestroyCreatedBlks(pTable->createdBlks);
taosArrayDestroy(pTable->createdBlks);
tSimpleHashCleanup(pTable->pGrpHash);
taosMemoryFree(pTable->primCol);
taosMemoryFree(pTable->finCols);
taosMemoryFree(pTable->keyCols);
taosMemoryFree(pTable->keyBuf);
2023-12-26 11:28:19 +00:00
taosMemoryFree(pTable->pRowBitmap);
2023-12-25 01:19:55 +00:00
taosArrayDestroy(pTable->eqGrps);
taosArrayDestroyEx(pTable->pGrpArrays, destroyGrpArray);
taosArrayDestroy(pTable->pBlkWinIdx);
2023-12-25 01:19:55 +00:00
}
2023-09-01 05:24:47 +00:00
2023-12-14 06:13:01 +00:00
void destroyMergeJoinOperator(void* param) {
2023-12-25 01:19:55 +00:00
SMJoinOperatorInfo* pJoin = (SMJoinOperatorInfo*)param;
2024-01-19 10:03:16 +00:00
mJoinDestroyCtx(pJoin);
2023-12-22 11:25:55 +00:00
2023-12-25 01:19:55 +00:00
if (pJoin->pFPreFilter != NULL) {
filterFreeInfo(pJoin->pFPreFilter);
pJoin->pFPreFilter = NULL;
}
if (pJoin->pPreFilter != NULL) {
filterFreeInfo(pJoin->pPreFilter);
pJoin->pPreFilter = NULL;
}
if (pJoin->pFinFilter != NULL) {
filterFreeInfo(pJoin->pFinFilter);
pJoin->pFinFilter = NULL;
}
2023-12-22 11:25:55 +00:00
2023-12-25 01:19:55 +00:00
destroyMergeJoinTableCtx(pJoin->probe);
destroyMergeJoinTableCtx(pJoin->build);
2023-12-14 06:13:01 +00:00
2023-12-25 01:19:55 +00:00
taosMemoryFreeClear(pJoin);
2023-12-14 06:13:01 +00:00
}
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
int32_t mJoinHandleConds(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
2023-12-26 11:28:19 +00:00
switch (pJoin->joinType) {
case JOIN_TYPE_INNER: {
SNode* pCond = NULL;
if (pJoinNode->pFullOnCond != NULL) {
if (pJoinNode->node.pConditions != NULL) {
MJ_ERR_RET(mergeJoinConds(&pJoinNode->pFullOnCond, &pJoinNode->node.pConditions));
}
pCond = pJoinNode->pFullOnCond;
} else if (pJoinNode->node.pConditions != NULL) {
pCond = pJoinNode->node.pConditions;
}
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
MJ_ERR_RET(filterInitFromNode(pCond, &pJoin->pFinFilter, 0, pTaskInfo->pStreamRuntimeInfo));
2023-12-26 11:28:19 +00:00
break;
}
case JOIN_TYPE_LEFT:
case JOIN_TYPE_RIGHT:
case JOIN_TYPE_FULL:
if (pJoinNode->pFullOnCond != NULL) {
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
MJ_ERR_RET(filterInitFromNode(pJoinNode->pFullOnCond, &pJoin->pFPreFilter, 0,
pTaskInfo->pStreamRuntimeInfo));
2023-12-26 11:28:19 +00:00
}
if (pJoinNode->pColOnCond != NULL) {
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
MJ_ERR_RET(
filterInitFromNode(pJoinNode->pColOnCond, &pJoin->pPreFilter, 0, pTaskInfo->pStreamRuntimeInfo));
2023-12-26 11:28:19 +00:00
}
if (pJoinNode->node.pConditions != NULL) {
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
MJ_ERR_RET(filterInitFromNode(pJoinNode->node.pConditions, &pJoin->pFinFilter, 0,
pTaskInfo->pStreamRuntimeInfo));
2023-12-26 11:28:19 +00:00
}
break;
default:
break;
}
2024-01-03 08:25:45 +00:00
return TSDB_CODE_SUCCESS;
2023-12-26 11:28:19 +00:00
}
2024-01-05 06:40:05 +00:00
int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
switch (pJoin->joinType) {
case JOIN_TYPE_INNER:
pJoin->joinFp = mInnerJoinDo;
break;
case JOIN_TYPE_LEFT:
2024-01-10 10:56:49 +00:00
case JOIN_TYPE_RIGHT: {
switch (pJoin->subType) {
2025-07-17 06:17:47 +00:00
case JOIN_STYPE_OUTER:
2024-01-10 10:56:49 +00:00
pJoin->joinFp = mLeftJoinDo;
2024-02-29 09:52:53 +00:00
pJoin->grpResetFp = mLeftJoinGroupReset;
2024-01-10 10:56:49 +00:00
break;
2025-07-17 06:17:47 +00:00
case JOIN_STYPE_SEMI:
2024-01-10 10:56:49 +00:00
pJoin->joinFp = mSemiJoinDo;
break;
case JOIN_STYPE_ANTI:
pJoin->joinFp = mAntiJoinDo;
break;
2024-01-12 10:29:27 +00:00
case JOIN_STYPE_WIN:
2024-02-01 06:58:08 +00:00
pJoin->joinFp = mWinJoinDo;
2024-02-27 10:18:25 +00:00
pJoin->grpResetFp = mWinJoinGroupReset;
2024-01-12 10:29:27 +00:00
break;
2024-01-10 10:56:49 +00:00
default:
break;
}
2024-01-05 06:40:05 +00:00
break;
2025-07-17 06:17:47 +00:00
}
2024-01-05 06:40:05 +00:00
case JOIN_TYPE_FULL:
pJoin->joinFp = mFullJoinDo;
break;
default:
break;
}
return TSDB_CODE_SUCCESS;
}
2023-12-14 06:13:01 +00:00
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
static int32_t resetMergeJoinOperState(SOperatorInfo* pOper) {
mJoinResetOperator(pOper);
return 0;
}
2024-07-24 09:08:08 +00:00
int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
2025-07-17 06:17:47 +00:00
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo,
SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
2024-07-24 09:08:08 +00:00
2025-07-17 06:17:47 +00:00
int32_t oldNum = numOfDownstream;
bool newDownstreams = false;
int32_t code = TSDB_CODE_SUCCESS;
SOperatorInfo* pOperator = NULL;
2024-07-22 03:06:24 +00:00
SMJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMJoinOperatorInfo));
if (pInfo == NULL) {
code = terrno;
goto _return;
}
2024-08-19 01:15:36 +00:00
pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
2024-07-22 03:06:24 +00:00
if (pOperator == NULL) {
code = terrno;
2023-12-14 06:13:01 +00:00
goto _return;
2023-12-01 10:33:50 +00:00
}
2026-03-20 02:08:49 +00:00
initOperatorCostInfo(pOperator);
2023-12-01 10:33:50 +00:00
2023-12-05 11:27:57 +00:00
pInfo->pOperator = pOperator;
2024-03-06 03:18:57 +00:00
MJ_ERR_JRET(mJoinInitDownstreamInfo(pInfo, &pDownstream, &numOfDownstream, &newDownstreams));
2023-12-01 10:33:50 +00:00
2025-07-17 06:17:47 +00:00
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
2023-12-01 10:33:50 +00:00
2023-12-13 10:54:16 +00:00
mJoinSetBuildAndProbeTable(pInfo, pJoinNode);
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode, pTaskInfo));
2023-12-26 11:28:19 +00:00
2024-07-22 03:06:24 +00:00
MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams));
MJ_ERR_JRET(mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams));
2023-12-01 10:33:50 +00:00
2023-12-14 09:06:19 +00:00
MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
2024-01-18 09:09:59 +00:00
MJ_ERR_JRET(mJoinSetImplFp(pInfo));
2023-12-14 09:06:19 +00:00
2025-07-17 06:17:47 +00:00
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mJoinMainProcess, NULL, destroyMergeJoinOperator,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
2023-12-01 10:33:50 +00:00
feat: new stream (#31678) * fix: windows compile issue * test: add vtable cases (#31829) * fix: windows compile issues * test:add test cases * fix: windows compile issue * case: em-4 stream case submit * test: stream4_sub1 found bug2 * test: submit test_scene_meters_bug2.py * add stream parameters example * feat: [TS-6100] Do not translate const value as column. * Feat/ts 6100 3.0 zlv (#31747) * modify asan exampel * modify asan exampel * add example * add example * modify case example --------- Co-authored-by: zelv01 <1101510017@qq.com> * feat(stream): fix memory leak * modify sliding example * test: update test case. * feat(stream): fix conflicts * fix: add offset case 10a 10s 10m 10h 10d * feat(stream): fix conflicts * chore(stream): rename case name #TS-6100 * add case * modify example * fix: windows compile issues * fix: data null check * feat: [TS-6100] Forbid where when using %%trows (#31827) * feat: [TS-6100] Forbid where when using %%trows * test: update cases * feat: [TS-6100] Fix leaks. --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * test: reproduce bugs * test: update test case. * test: update test case. * feat: [TS-6100] Fix leaks. * test: add cases * Feat/ts 6100 3.0.pw10 (#31841) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * test: reproduce bugs * fix: add sliding interval combine case * test: add cases * test: add recalc test. * test: reproduce bugs * case : add vt ts is null check * modify case * bug: submit test_idmp_meters_bug3.py * test: add test for recalc. * test: add cases * fix: error code check * test: add cases * fix(stream): scan wal with schema in that version * add case * test: add cases * test: update test case. * fix: windows compile issues * add case * test: add cases (#31845) * modify case * fix: reset interpPrev * test: add test_idmp_meters bug4 and bug3 * add case * fix(stream): opti wal interface * fix: remove test_idmp_meters_bug5.py * test: add cases * fix(stream): fix ts data fetch for virtual tables * cancel asan case * test: update test case. * test: update test case. * add case * test: add cases * test: add cases * test: add case test_idmp_meters_bug5.py * test: update test case. * fix(stream): tmq error * test: add cases * feat: [TS-6100] Restore deleted code in mndSma.c since they are still in use. * fix(stream): optimize val scan logic * test: add test_recalc_expired_time.py to ci. * test: update test case. * test: update test case. * feat: [TS-6100] Fix fill range check * fix(stream): optimize val scan logic * add case * test: modify for partition by %%1 * test: add fun case stream4_sub7 * fix(stream): optimize val scan logic * add case * feat: [TS-6100] Rename OPTIONS to STREAM_OPTIONS. * test: add test for recalc. * test: use stream_options. * fix: some cases error. * test: remove recalc from ci. * fix: ci case issues (#31880) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): fix compilation error * fix(stream): optimize val scan logic * test:add test cases * test: modify case * fix: external agg error * test(stream): tobacco scene testing #TD-36514 * test: add stream cases (#31885) * fix: windows compile issue * fix: calc timerange * fix: windows compile issue * modify case * fix(stream): compile error * test: remove one debug test case file * test: modify * test: add test cases * test: reproduce bugs * test: reproduce bugs * feat: [TS-6100] Placeholder function should only appera in SELECT and… (#31868) * feat: [TS-6100] Placeholder function should only appera in SELECT and WHERE and FROM. * test: update case --------- Co-authored-by: Simon Guan <guanshengliang@qq.com> * add example * add example * modify case example * modify case * test:alter sql * test: add stream5 case * fix(stream): get schema error with version * test: add delete recalc test py. * test: remove bug cases * test: stream5 case test passed * test: add state cases (#31893) * fix(stream): compile error * test: modify case * test: add cases * test: add test. * test: update test case. * chore(test): fix case err * test: update test case. * fix: align data get * fix(stream): fix row index of datablock written into data cache * fix: put align data * test: update test case. * test: add test cases for virtual table * chore(test): fix case err #TD-36514 * add case * test: add test for water mark. * test: add meters bug6 for stream5 * test: add cases (#31903) * test: add test for recalc. * feat: [TS-6100] %%trows can only be used when event type is window close. * test: add precision of database for ms/us/ns * modify case * add case * add case * test: add test to ci. * modify case * fix: ci case issues (#31904) * enh: add operator reset func * fix: merge join reset issue * fix: memory issues * fix: add debug assert * fix: memory issues * fix: memory leak * fix: memory issues * fix taos log miss * fix: case issue * fix: case issue * fix: case issues * fix: drop dnode issue * fix: memory issues * fix: memory issues * fix: memory leak issues * fix: recalculate time range issue * fix: add debug log * fix: memory issues * fix: enable case asan * Update streamlist_for_ci.task * fix: case asan issue * fix: stream name issue * fix: external window compile issues * fix: deploy memory issue * fix: ahandle issue * fix: ahandle issue * fix: ahandle issue * fix: virtual table reader list issue * fix: log info * fix: msg error * fix: virtual table addr list issue * fix: memory issues * fix: memory leak issue * fix: memory issues * fix: memory free issues * fix: memory issues * fix: snode deploy issue * fix: mnode reader issue * fix: memory issues * fix: add debug test * enh: add ignore nodata trigger * fix: memory leaks * fix: configuration issue * fix: memory issue * fix: external window issue * fix: external window issues * fix: external window placeholder issue * fix: placeholder function init issues * fix: memory leak issue * fix: add debug log * fix: compile issues * fix: double free issue * fix: runner addr update issue * fix: msg rsp issue * fix: external window reset issue * fix: configuration issue * fix: deploy msg issue * fix: compile issue * fix: external window idx issue * fix: ci issues * fix: ci case issues * fix: drop dnode issue --------- Co-authored-by: huohong <sallyhuo@taosdata.com> * fix(stream): ci error * test: update test case. * feat: [TS-6100] Disable some failed UT. * feat: [TS-6100] Fix virtual table * test: add bug 5. * test: add test delete recalc to ci. * test: add bug 6. * test(stream): tobacco scene #TD-36514 * fix: reqCids,reqCols memory leak in SSTriggerRealtimeContext Co-authored-by: Tony Zhang <tonyzhang@taosdata.com> * test: add case stream6 * fix(stream): implement some pending features in trigger task * modify case * modify case * fix: case issues * modify case * test: add recalc for warter mark. * fix(stream): fix count window trigger of virtual tables * fix(stream): memory leak * test: fix run err. * test: add stream6 bug7 * fix: adjust format * test(stream): tobacco scene testing #TD-36514 * test: change bug7 with update window1 and 2 * test: add test bug 7. * case: restore write 3 window * fix: windows compile issue * fix: notify * test: add cases * modify case * test: update test case. * test(stream): toobacco scene testing #TD-36514 --------- Co-authored-by: Simon Guan <slguan@taosdata.com> Co-authored-by: plum-lihui <huili@taosdata.com> Co-authored-by: Alex Duan <417921451@qq.com> Co-authored-by: zelv01 <1101510017@qq.com> Co-authored-by: Jing Sima <simondominic9997@outlook.com> Co-authored-by: xiangyang guo <66111494+happyguoxy@users.noreply.github.com> Co-authored-by: wangmm0220 <wangmm0220@gmail.com> Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: zyyang90 <zyyang@taosdata.com> Co-authored-by: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Co-authored-by: facetosea <285808407@qq.com> Co-authored-by: Simon Guan <guanshengliang@qq.com> Co-authored-by: huohong <sallyhuo@taosdata.com> Co-authored-by: Li Hui <52318143+plum-lihui@users.noreply.github.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: xiao-77 <berylbao@taosdata.com> Co-authored-by: Zhixiao Bao <62235797+xiao-77@users.noreply.github.com> Co-authored-by: happyguoxy <happy_guoxy@163.com> Co-authored-by: Tony Zhang <34825804+Tony2h@users.noreply.github.com> Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-07-16 06:42:16 +00:00
setOperatorResetStateFn(pOperator, resetMergeJoinOperState);
2023-12-14 06:13:01 +00:00
MJ_ERR_JRET(appendDownstream(pOperator, pDownstream, numOfDownstream));
2023-12-01 10:33:50 +00:00
if (newDownstreams) {
taosMemoryFree(pDownstream);
pOperator->numOfRealDownstream = 1;
} else {
pOperator->numOfRealDownstream = 2;
}
2024-07-24 09:08:08 +00:00
*pOptrInfo = pOperator;
return code;
2023-12-01 10:33:50 +00:00
2023-12-14 06:13:01 +00:00
_return:
2024-07-22 03:06:24 +00:00
2023-12-01 10:33:50 +00:00
if (pInfo != NULL) {
destroyMergeJoinOperator(pInfo);
}
2024-09-02 07:12:47 +00:00
destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum);
2023-12-01 10:33:50 +00:00
if (newDownstreams) {
taosMemoryFree(pDownstream);
}
pTaskInfo->code = code;
2025-07-17 06:17:47 +00:00
2024-07-24 09:08:08 +00:00
return code;
2023-12-01 10:33:50 +00:00
}