TDengine/source/libs/executor/src/hashjoin.c
WANG MINGMING 558928cbe1
feat(stream): optimize stream logic (#33027)
* fix: remove debug log

* fix: remove assert

* fix: delete unused code

* enh: [TD-37251] Support expr in state window.

* feat(stream): support expr in state window trigger

* enh: [TD-37251] Fix SCL_IS_CONST_CALC condition.

* fix(stream): set ver in wal

* fix: print code

* fix: increase runner replica num

* fix: trigger mem error

* fix: sliding _tnext_ts value

* fix(stream): disable tagFilterCache in stream reader trigger

* fix: crash

* Revert "fix(stream): fix history calc finish check"

This reverts commit f93d17f1d2.

* Revert "fix(stream): fix calc request allocation in trigger"

This reverts commit c5410f6da0.

* fix(stream): fix calc request allocation in trigger

* enh: [TD-37251] External window support more placeholder.

* fix(stream): modify size of return from 1000000->4096

* enh: [TD-37251] Modify error msg when stream query do not have from clause.

* fix(stream): add log for group not found

* fix(stream): do not return gid=0 in walMetaData interface

* enh: [TD-37251] Fix missing ts column in vtable query.

* fix: test case build failed

* fix: invalid read issue

* fix(stream): add vtable logic

* fix(stream): encode error in wal

* fix(stream): add vtable logic

* fix(stream): add log

* fix: diff function crash

* Revert "Merge branch 'enh/TD-37251-3.0-dropoutput' into enh/TD-37251-3.0"

This reverts commit e93cbd6fd4, reversing
changes made to dc3230591d.

* Revert "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0"

This reverts commit dc3230591d, reversing
changes made to 085e086782.

* fix(stream): fix block data len is too large if data type is vchar

* fix: drop output table

* feat: runner delete output table

* process pDropBlock in trigger task.

* fix(stream): opti log level

* fix(stream): build block for drop table

* fix(stream): set gid for normal table

* fix(stream): set gid for normal table

* feat: Support delete output.

* fix(stream): rows error

* fix(stream): memory leak

* enh: [TD-37251] Fix external window wrong ts column.

* fix(stream): fix calc time check in batch mode

* fix: merge aligned external window issue

* Revert "fix(stream): fix calc time check in batch mode"

This reverts commit d895b7f577.

* fix(stream): add test case

* fix(stream): add insert drop table logic

* fix: external window end issue

* fix(stream): add test case

* fix(stream): fix trigger pull data

* fix(stream): fix history calc request

* enh: drop table on snode

* fix(stream): adjust hash index if data is filtered in wal

* fix(stream): rollback

* enh: add merge aligned extwin window row idx

* fix: drop output table

* fix: compile issue

* enh: [TD-37251] Add flag to identify interval window is overlapped

* fix: overlap

* fix(stream): set gid=-1 for initialized

* fix(stream): modify log level

* fix: trigger slow issue

* fix(stream): add basic test for obj pool

* fix(stream): fix metadata clear in trigger

* fix(stream): fix idle runner allocation in trigger

* fix: handle agg output on externalWin

* fix: test case

* fix(stream): adjust log

* fix: reset pCtx pOutput

* fix: memory leak

* fix: search first win for tsCol

* fix(stream): add test case for schema change

* fix: mem leak

* fix: mem leak

* Reapply "Merge branch 'enh/TD-37251-3.0-vtable' into enh/TD-37251-3.0"

This reverts commit b508e66958.

* fix(stream): fix virtual table data pull

* fix(stream): fix set table request

* fix(stream): process empty uidlist

* fix(stream): fix set table request

* fix(stream): fix data new request in trigger

* fix(stream): tablelist error for vtable

* fix(stream): block ver is null

* fix(stream): remove version limitation for wal

* fix(stream): block rows error

* fix(stream): fix pending calc param in batch mode

* fix(stream): auto create table

* fix(stream): fix stream vtable data merge

* fix: _tcurrentts

* fix(stream): destroy hash

* fix(stream): fix trigger status

* fix(stream): colId error in vtable

* fix(stream): update nrows of vtable data block

* fix(stream): fix trigger status

* fix(stream): enable low latency calc for period trigger

* fix: test case file path

* fix: test case file path

* fix: string to node in reader

* enh: add test log

* fix(stream): increase wait time of non-low-latency mode

* fix(stream): fix column capacity in scalar calculation

* fix(stream): fix column capacity in trigger expr calculation

* fix: get origTableInfos

* fix(stream): fix calc data pull in trigger

* fix(stream): fix calc data cache write in trigger

* enh: [TD-37251] Add flag to identify interval wind

* fix: external window memory usage issue

* fix(stream): fix expr result column in trigger

* fix(stream): add metaCache for calc plan

* fix(stream): fix stream obj list clear

* fix(stream): rollback

* enh: [TD-37251] Add flag to identify interval wind

* fix: mem free

* fix(stream): add metaCache for calc plan

* fix(stream): fix calc data cache write in trigger

* fix(stream): fix calc data cache write in trigger

* enh: optimize external result block memory

* fix(stream): modify logic of judge table for create table

* fix(stream): fix event window check in trigger

* fix(stream): fix count window check in trigger

* fix(stream): colSize=0 while encoding block because pDataCol->hasNull is false in secode time & reload table list if create table

* enh: optimize stream memory

* fix(stream): trigger tag error

* fix: add log

* fix(stream): fix calc data write in trigger

* fix: drop output table

* fix(stream): fix max delay in trigger

* fix: drop output table

* fix(stream): fix max delay in trigger

* fix(build): handle return value of function

* fix(stream): read gid error if it is child table in stream

* fix(stream): fix calc param of history calculation

* fix(stream): fix recalc of delete data

* fix(stream): fix calc data of period trigger

* fix(stream): fix cache read check

* fix(stream): filter error in calc plan

* fix: reset external window expr

* fix(stream): fix calc parm of max delay

* fix: time range and case issues

* fix(stream): fix crash in trigger

* fix: case issues

* fix: fix window node mem leak.

* fix(stream): fix meta data clear in trigger

* fix(stream): table schema is old in TsdbDataRequest for vtable

* fix: fix slingding window place holder check condition.

* Revert "fix: fix slingding window place holder check condition."

This reverts commit ad864a1dc1.

* fix(stream): null pointer error

* fix: case issue

* fix(stream): calc data error for vtable

* fix(stream): fix data sorter in trigger

* fix(stream): add log for delete data

* fix(stream): fix start version of realtime calculation

* fix(stream): fix cache data merger of vtable

* fix: case issues

* fix(ci): upgrade stream cases in test_cols_function

* fix(stream): gid not found if change tag value

* fix: fix slingding window place holder check condition.

* fix(stream): fix virt table info request in trigger

* fix(stream): set gid = uid if stream table type != SUPER table

* fix: clean cache data by group

* fix: add block info and case issues

* fix: fix heap-buffer-overflow.

* fix(stream): fix state window with extend param

* fix: fix access null pointer.

* fix: case issues

* fix: case issue

* fix(stream): fix ignore_nodata_trigger option for period trigger

* fix(stream): fix pseudo col fetch for calc data

* fix: Extend checking time to avoid timeout.

* fix: case issues

* fix(stream): fix group col fetch for virtual tables

* fix(stream): tag is NULL for non vtable

* fix(stream): fix sliding check of virtual table

* test(stream): check stream status after create all streams

* fix: add log

* fix(stream): fix wal meta truncate when ignore disorder

* fix(stream): gid not found for child table

* fix: fix place holder condition pushdown error.

* fix(stream): suid not equal when delete data for child table

* fix: id issue

* fix(stream): disable recalc for count trigger

* fix: cast result rowSize error in project

* fix(stream): add log for tsdb meta

* fix(stream): fix gid in tsdb meta request

* fix(stream): fix wend of unclosed windows

* fix(stream): fix ignore_nodata_trigger option for period trigger

* fix: case issues

* fix(stream): fix calc data pull for empty interval window

* fix: fix ext window condition.

* fix(ci): smaBasic performance check affected by debug level log

* fix(stream): add suid when set table list for vtable

* fix: forbid using prefilter when using %%trows and the trigger table is a virtual table.

* fix: forbid prefilter %%trows cases.

* fix(stream): sort cid in tsdbVirtalDataReq

* fix: forbid prefilter %%trows cases.

* fix(stream): add log for virtual table tsdb data

* fix(stream): get delete msg for vtable

* fix: winRowIndex

* Revert "fix: winRowIndex"

This reverts commit e08b41cf96.

* fix(stream): fix data merge in trigger

* test(stream): fix case ans

* fix(stream): fix empty calc data pull for period trigger

* fix(stream): col index error for tsdbVirtalDataReq

* fix(stream): pTableList is NULL for vtable

* test(stream): fix case ans

* fix(stream): fix notification in trigger

* fix(stream): memory leak

* fix(stream): fix virtual data pull in trigger

* test(stream): fix case ans

* fix(stream): col index error for tsdbVirtalDataReq

* fix(stream): pTableList is NULL for vtable

* test(stream): fix case ans

* fix(stream): fix notification in trigger

* fix(stream): memory leak

* fix(stream): fix virtual data pull in trigger

* test(stream): fix case ans

* fix: case issue

* fix: crash issue

* fix: forbid prefilter %%trows cases.

* fix(stream): session case

* fix(stream): fix data pull for virtual tables

* fix(stream): add log

* fix(stream): fix calc req send in batch mode

* fix: fix stream UT

* fix(stream): tablelist is null for non vtable

* fix: mem leak

* fix(stream): fix compile error in trigger

* fix: return code issue

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: facetosea <285808407@qq.com>
Co-authored-by: Jing Sima <simondominic9997@outlook.com>
Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com>
Co-authored-by: Tony Zhang <tonyzhang@taosdata.com>
2025-09-25 15:48:14 +08:00

352 lines
10 KiB
C
Executable file

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "hashjoin.h"
/*
 * Inner-join probe step.
 *
 * First drains any build rows still pending from a previous call
 * (ctx.pBuildRow != NULL), then walks probe rows
 * [ctx.probeStartIdx, ctx.probeEndIdx], looks each key up in pJoin->pKeyHash
 * and appends the matching build rows to pJoin->finBlk.
 *
 * Returns as soon as finBlk->info.rows reaches finBlk->info.capacity; in that
 * case ctx.probeStartIdx is advanced only when the current probe row's build
 * rows were fully fetched, so the next call resumes at the right place.
 * When the whole probe range has been consumed, ctx.rowRemains is cleared.
 *
 * @param pOperator operator whose ->info is the SHJoinOperatorInfo
 * @return always 0; this function itself never produces an error code
 */
int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) {
  SHJoinOperatorInfo* pInfo = pOperator->info;
  SHJoinCtx*          pState = &pInfo->ctx;
  SHJoinTableCtx*     pProbeTbl = pInfo->pProbe;
  SSDataBlock*        pOut = pInfo->finBlk;
  size_t              keyLen = 0;
  bool                drained = false;
  int32_t             rc = 0;

  // Resume: build rows left over from the previous call go out first.
  if (NULL != pState->pBuildRow) {
    hJoinAppendResToBlock(pOperator, pOut, &drained);

    if (pOut->info.rows < pOut->info.capacity) {
      // Output still has room, move on to the next probe row.
      ++pState->probeStartIdx;
    } else {
      if (drained) {
        ++pState->probeStartIdx;
      }
      return rc;
    }
  }

  while (pState->probeStartIdx <= pState->probeEndIdx) {
    // A non-zero return means this probe row is skipped without output
    // (presumably its key cannot be built, e.g. a NULL key column -- confirm).
    if (!hJoinCopyKeyColsDataToBuf(pProbeTbl, pState->probeStartIdx, &keyLen)) {
      SGroupData* pMatch = tSimpleHashGet(pInfo->pKeyHash, pProbeTbl->keyData, keyLen);

      if (NULL != pMatch) {
        pState->pBuildRow = pMatch->rows;
        hJoinAppendResToBlock(pOperator, pOut, &drained);

        if (pOut->info.rows >= pOut->info.capacity) {
          // Output block is full: only step past this probe row if all of
          // its build rows were emitted.
          if (drained) {
            ++pState->probeStartIdx;
          }
          return rc;
        }
      }
    }

    ++pState->probeStartIdx;
  }

  pState->rowRemains = false;
  return rc;
}
#ifdef HASH_JOIN_FULL
/*
 * Left join with pre-filter: continue emitting the build rows that are still
 * pending (ctx.pBuildRow) for the current probe row.
 *
 * Each round appends build rows into pJoin->midBlk, runs pJoin->pPreFilter
 * over it and, if rows survive, merges them into pJoin->finBlk through
 * hJoinCopyMergeMidBlk. Once all build rows are fetched and none survived the
 * filter (ctx.readMatch is still false), the probe row is emitted once through
 * hJoinCopyNMatchRowsToBlock.
 *
 * @param pOperator operator handed to hJoinAppendResToBlock
 * @param pJoin     hash join operator state
 * @param loopCont  out: false when the caller should return (mid block has
 *                  remains, or finBlk reached the threshold), true when the
 *                  current probe row is finished and processing may continue
 * @return TSDB_CODE_SUCCESS, or the error propagated by HJ_ERR_RET
 */
int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
  SHJoinCtx* pState = &pJoin->ctx;
  bool       drained = false;

  do {
    bool pause = false;

    hJoinAppendResToBlock(pOperator, pJoin->midBlk, &drained);

    if (pJoin->midBlk->info.rows > 0) {
      HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL, NULL));
    }

    // Re-check: the filter may have removed every row.
    if (pJoin->midBlk->info.rows > 0) {
      pState->readMatch = true;
      // The block pointers are passed by address, so they are always re-read
      // through pJoin afterwards instead of being cached in locals.
      HJ_ERR_RET(hJoinCopyMergeMidBlk(pState, &pJoin->midBlk, &pJoin->finBlk));
      pause = pState->midRemains;
    }

    if (!pause) {
      if (drained && !pState->readMatch) {
        HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pState->probeStartIdx, 1));
      }
      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
        pause = true;
      }
    }

    if (pause) {
      // Step past the probe row only when it has no build rows left.
      if (drained) {
        ++pState->probeStartIdx;
      }
      *loopCont = false;
      return TSDB_CODE_SUCCESS;
    }
  } while (!drained);

  ++pState->probeStartIdx;
  *loopCont = true;
  return TSDB_CODE_SUCCESS;
}
/*
 * Left join with pre-filter: process probe rows
 * [ctx.probeStartIdx, ctx.probeEndIdx].
 *
 * For every probe row the key is looked up in pJoin->pKeyHash:
 *   - no group found: the probe row is emitted once through
 *     hJoinCopyNMatchRowsToBlock;
 *   - group found: its build rows are appended to pJoin->midBlk in rounds,
 *     filtered with pJoin->pPreFilter and merged into pJoin->finBlk. If all
 *     build rows were fetched and none survived the filter, the probe row is
 *     emitted once through hJoinCopyNMatchRowsToBlock.
 * When the range is exhausted, ctx.probePhase is switched to E_JOIN_PHASE_POST.
 *
 * @param pOperator operator handed to hJoinAppendResToBlock
 * @param pJoin     hash join operator state
 * @param loopCont  out: false when the caller should return (finBlk reached
 *                  the threshold or the mid block has remains), true when the
 *                  probe range is done. It is assigned on every successful
 *                  return path.
 * @return TSDB_CODE_SUCCESS, or the error propagated by HJ_ERR_RET
 */
int32_t hLeftJoinHandleSeqProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
  SHJoinTableCtx* pProbe = pJoin->pProbe;
  SHJoinCtx*      pCtx = &pJoin->ctx;
  size_t          bufLen = 0;
  bool            allFetched = false;

  if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
    // Set the out-parameter explicitly, like every other threshold exit does,
    // instead of relying on the caller having pre-initialised it to false.
    *loopCont = false;
    return TSDB_CODE_SUCCESS;
  }

  for (; pCtx->probeStartIdx <= pCtx->probeEndIdx; ++pCtx->probeStartIdx) {
    // A non-zero return means this probe row is skipped without output
    // (presumably its key cannot be built, e.g. a NULL key column -- confirm).
    if (hJoinCopyKeyColsDataToBuf(pProbe, pCtx->probeStartIdx, &bufLen)) {
      continue;
    }

    SGroupData* pGroup = tSimpleHashGet(pJoin->pKeyHash, pProbe->keyData, bufLen);
    if (NULL == pGroup) {
      HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
        // This probe row is done; resume from the next one.
        ++pCtx->probeStartIdx;
        *loopCont = false;
        return TSDB_CODE_SUCCESS;
      }
      continue;
    }

    pCtx->readMatch = false;
    pCtx->pBuildRow = pGroup->rows;
    allFetched = false;

    while (!allFetched) {
      hJoinAppendResToBlock(pOperator, pJoin->midBlk, &allFetched);

      if (pJoin->midBlk->info.rows > 0) {
        HJ_ERR_RET(doFilter(pJoin->midBlk, pJoin->pPreFilter, NULL, NULL));
        // Re-check: the filter may have removed every row.
        if (pJoin->midBlk->info.rows > 0) {
          pCtx->readMatch = true;
          HJ_ERR_RET(hJoinCopyMergeMidBlk(pCtx, &pJoin->midBlk, &pJoin->finBlk));
          if (pCtx->midRemains) {
            if (allFetched) {
              ++pCtx->probeStartIdx;
            }
            *loopCont = false;
            return TSDB_CODE_SUCCESS;
          }
        }
      }

      // All build rows seen and none passed the filter: emit the probe row
      // through the non-match copy.
      if (allFetched && !pCtx->readMatch) {
        HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pCtx->probeStartIdx, 1));
      }

      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
        // Step past the probe row only when it has no build rows left.
        if (allFetched) {
          ++pCtx->probeStartIdx;
        }
        *loopCont = false;
        return TSDB_CODE_SUCCESS;
      }
    }
  }

  pCtx->probePhase = E_JOIN_PHASE_POST;
  *loopCont = true;
  return TSDB_CODE_SUCCESS;
}
/*
 * Left join without pre-filter: continue emitting the build rows that are
 * still pending (ctx.pBuildRow) for the current probe row into pJoin->finBlk.
 *
 * @param pOperator operator handed to hJoinAppendResToBlock
 * @param pJoin     hash join operator state
 * @param loopCont  out: false when finBlk reached the threshold and the caller
 *                  should return, true otherwise
 * @return always TSDB_CODE_SUCCESS
 */
int32_t hLeftJoinHandleRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
  SHJoinCtx* pState = &pJoin->ctx;
  bool       drained = false;

  hJoinAppendResToBlock(pOperator, pJoin->finBlk, &drained);

  if (!hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
    // Below the threshold: always advance to the next probe row.
    ++pState->probeStartIdx;
    *loopCont = true;
    return TSDB_CODE_SUCCESS;
  }

  // Threshold reached: step past the probe row only if nothing is pending.
  if (drained) {
    ++pState->probeStartIdx;
  }
  *loopCont = false;
  return TSDB_CODE_SUCCESS;
}
/*
 * Left join without pre-filter: process probe rows
 * [ctx.probeStartIdx, ctx.probeEndIdx].
 *
 * Each probe key is looked up in pJoin->pKeyHash. With no group, the probe
 * row is emitted once through hJoinCopyNMatchRowsToBlock; otherwise the
 * group's build rows are appended to pJoin->finBlk. When the range is
 * exhausted, ctx.probePhase is switched to E_JOIN_PHASE_POST.
 *
 * @param pOperator operator handed to hJoinAppendResToBlock
 * @param pJoin     hash join operator state
 * @param loopCont  out: false when finBlk reached the threshold and the caller
 *                  should return, true when the probe range is done
 * @return TSDB_CODE_SUCCESS, or the error propagated by HJ_ERR_RET
 */
int32_t hLeftJoinHandleProbeRows(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
  SHJoinCtx*      pState = &pJoin->ctx;
  SHJoinTableCtx* pProbeTbl = pJoin->pProbe;
  size_t          keyLen = 0;
  bool            drained = false;

  while (pState->probeStartIdx <= pState->probeEndIdx) {
    // A non-zero return means this probe row is skipped without output
    // (presumably its key cannot be built, e.g. a NULL key column -- confirm).
    if (hJoinCopyKeyColsDataToBuf(pProbeTbl, pState->probeStartIdx, &keyLen)) {
      ++pState->probeStartIdx;
      continue;
    }

    SGroupData* pMatch = tSimpleHashGet(pJoin->pKeyHash, pProbeTbl->keyData, keyLen);

    if (NULL != pMatch) {
      pState->pBuildRow = pMatch->rows;
      hJoinAppendResToBlock(pOperator, pJoin->finBlk, &drained);

      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
        // Step past the probe row only when it has no build rows left.
        if (drained) {
          ++pState->probeStartIdx;
        }
        *loopCont = false;
        return TSDB_CODE_SUCCESS;
      }
    } else {
      HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pState->probeStartIdx, 1));

      if (hJoinBlkReachThreshold(pJoin, pJoin->finBlk->info.rows)) {
        // The unmatched row is complete, so resume from the next one.
        ++pState->probeStartIdx;
        *loopCont = false;
        return TSDB_CODE_SUCCESS;
      }
    }

    ++pState->probeStartIdx;
  }

  pState->probePhase = E_JOIN_PHASE_POST;
  *loopCont = true;
  return TSDB_CODE_SUCCESS;
}
int32_t hLeftJoinDo(struct SOperatorInfo* pOperator) {
SHJoinOperatorInfo* pJoin = pOperator->info;
SHJoinCtx* pCtx = &pJoin->ctx;
while (pCtx->rowRemains) {
switch (pCtx->probePhase) {
case E_JOIN_PHASE_PRE: {
int32_t rows = pCtx->probeStartIdx - pCtx->probePreIdx;
int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows;
if (rows <= rowsLeft) {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rows));
pCtx->probePhase = E_JOIN_PHASE_CUR;
} else {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, 0, rowsLeft));
pJoin->ctx.probePreIdx += rowsLeft;
return TSDB_CODE_SUCCESS;
}
break;
}
case E_JOIN_PHASE_CUR: {
bool loopCont = false;
if (NULL == pJoin->ctx.pBuildRow) {
HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqProbeRows(pOperator, pJoin, &loopCont) : hLeftJoinHandleProbeRows(pOperator, pJoin, &loopCont));
} else {
HJ_ERR_RET(pJoin->pPreFilter ? hLeftJoinHandleSeqRowRemains(pOperator, pJoin, &loopCont) : hLeftJoinHandleRowRemains(pOperator, pJoin, &loopCont));
}
if (!loopCont) {
return TSDB_CODE_SUCCESS;
}
break;
}
case E_JOIN_PHASE_POST: {
if (pCtx->probeEndIdx < (pCtx->pProbeData->info.rows - 1) && pCtx->probePostIdx <= (pCtx->pProbeData->info.rows - 1)) {
int32_t rowsLeft = pJoin->finBlk->info.capacity - pJoin->finBlk->info.rows;
int32_t rows = pCtx->pProbeData->info.rows - pCtx->probePostIdx;
if (rows <= rowsLeft) {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rows));
pCtx->rowRemains = false;
} else {
HJ_ERR_RET(hJoinCopyNMatchRowsToBlock(pJoin, pJoin->finBlk, pJoin->ctx.probePostIdx, rowsLeft));
pCtx->probePostIdx += rowsLeft;
return TSDB_CODE_SUCCESS;
}
} else {
pJoin->ctx.rowRemains = false;
}
break;
}
default:
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
}
return TSDB_CODE_SUCCESS;
}
#endif