TDengine/source/libs/executor/src/executorInt.c

1329 lines
44 KiB
C
Raw Normal View History

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-04 06:54:39 +00:00
#include "filter.h"
#include "function.h"
2022-04-20 06:59:06 +00:00
#include "functionMgt.h"
#include "os.h"
2022-04-04 06:54:39 +00:00
#include "querynodes.h"
2022-04-20 06:59:06 +00:00
#include "tfill.h"
2022-04-23 10:29:45 +00:00
#include "tname.h"
2022-03-04 05:25:39 +00:00
#include "tdatablock.h"
2022-01-20 09:10:28 +00:00
#include "tmsg.h"
#include "ttime.h"
2022-01-08 08:28:44 +00:00
2023-04-28 03:42:34 +00:00
#include "executorInt.h"
2022-05-27 10:13:22 +00:00
#include "index.h"
2023-04-28 03:42:34 +00:00
#include "operator.h"
#include "query.h"
2023-04-28 03:42:34 +00:00
#include "querytask.h"
2023-08-18 08:54:33 +00:00
#include "storageapi.h"
#include "tcompare.h"
2022-01-08 14:59:24 +00:00
#include "thash.h"
#include "ttypes.h"
2023-04-04 06:50:58 +00:00
// Sets the scan flag of `runtime` (any struct with a `scanFlag` member) to REVERSE_SCAN.
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
// Step direction for a scan order: QUERY_ASC_FORWARD_STEP for TSDB_ORDER_ASC, QUERY_DESC_FORWARD_STEP otherwise.
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
/*
 * Disabled fault-injection allocators (compiled out by `#if 0`).
 * When enabled, the #defines at the end of this section redirect later uses of calloc/malloc/realloc
 * in this translation unit to wrappers that return NULL on purpose, to exercise out-of-memory paths:
 *   - u_malloc / u_calloc fail when taosRand() % 1000 == 0 (`v % 1000 <= 0` on an unsigned value);
 *   - u_realloc fails when taosRand() % 5 <= 1.
 */
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
2022-03-09 16:36:30 +00:00
uint32_t v = taosRand();
if (v % 1000 <= 0) {
return NULL;
} else {
2022-03-25 16:29:53 +00:00
return taosMemoryMalloc(__size);
}
}
static UNUSED_FUNC void* u_calloc(size_t num, size_t __size) {
2022-03-09 16:36:30 +00:00
uint32_t v = taosRand();
if (v % 1000 <= 0) {
return NULL;
} else {
2022-03-25 16:29:53 +00:00
return taosMemoryCalloc(num, __size);
}
}
static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
2022-03-09 16:36:30 +00:00
uint32_t v = taosRand();
if (v % 5 <= 1) {
return NULL;
} else {
2022-03-25 16:29:53 +00:00
return taosMemoryRealloc(p, __size);
}
}
// Redirect the standard allocator names to the failing wrappers above (only while this section is enabled).
#define calloc u_calloc
#define malloc u_malloc
#define realloc u_realloc
#endif
2024-07-22 04:51:25 +00:00
// Forward declarations of file-local helpers. setBlockSMAInfo and doSetInputDataBlock are called by
// functions defined earlier in this file than their own definitions.
// Attach block-level SMA (pBlock->pBlockAgg) to a function context's input.
static int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
2024-07-24 09:08:08 +00:00
static int32_t initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
2024-07-29 02:35:06 +00:00
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
2023-04-03 06:49:14 +00:00
2022-11-25 10:22:09 +00:00
// Bind raw column data of pBlock to every function context in pExprSup.
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
2024-07-22 04:51:25 +00:00
static void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
2023-05-16 06:12:48 +00:00
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup);
2022-04-26 12:26:32 +00:00
/*
 * Reserve a zero-filled SResultRow of `interBufSize` bytes in the disk-based result buffer.
 *
 * Rows are appended to the page identified by *currentPageId. A new page is taken from the buffer when
 * there is no active page yet (*currentPageId == -1), or when the active page cannot hold another
 * `interBufSize` bytes (the full page is released first).
 *
 * @param pResultBuf    buffer that owns the pages
 * @param currentPageId in: page currently being filled, or -1; out: page holding the returned row
 * @param interBufSize  number of bytes reserved for the row
 * @return the new row inside its page, or NULL on failure (terrno carries the reason)
 */
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
  SFilePage* pData = NULL;
  int32_t    pageId = -1;

  if (*currentPageId != -1) {
    pData = getBufPage(pResultBuf, *currentPageId);
    if (pData == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }
    pageId = *currentPageId;

    if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
      // no room left: release current page first, and prepare the next one below
      releaseBufPage(pResultBuf, pData);
      pData = NULL;
    }
  }

  if (pData == NULL) {
    // in the first scan, or when the current page is full, new space is needed for results
    pData = getNewBufPage(pResultBuf, &pageId);
    if (pData == NULL) {
      qError("failed to get buffer, code:%s", tstrerror(terrno));
      return NULL;
    }
    // a fresh page starts with only its SFilePage header in use
    pData->num = sizeof(SFilePage);
  }

  setBufPageDirty(pData, true);

  // the new row starts right after the bytes already used in this page
  SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
  memset((char*)pResultRow, 0, interBufSize);
  pResultRow->pageId = pageId;
  pResultRow->offset = (int32_t)pData->num;

  *currentPageId = pageId;
  pData->num += interBufSize;
  return pResultRow;
}
/**
* the struct of key in hash table
* +----------+---------------+
* | group id | key data |
* | 8 bytes | actual length |
* +----------+---------------+
*/
2022-05-03 06:43:53 +00:00
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
2023-12-11 03:46:15 +00:00
int32_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
2023-03-22 08:08:16 +00:00
bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup) {
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
2023-03-22 08:08:16 +00:00
if (!keepGroup) {
*(uint64_t*)pSup->keyBuf = calcGroupId(pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
}
2022-01-20 05:52:46 +00:00
2022-04-23 10:29:45 +00:00
SResultRowPosition* p1 =
(SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
2022-01-20 05:52:46 +00:00
SResultRow* pResult = NULL;
2022-01-20 05:52:46 +00:00
// in case of repeat scan/reverse scan, no new time window added.
if (isIntervalQuery) {
if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
pResult = getResultRowByPos(pResultBuf, p1, true);
2024-08-06 09:32:20 +00:00
if (pResult == NULL) {
pTaskInfo->code = terrno;
return NULL;
}
if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
pTaskInfo->code = terrno;
return NULL;
}
2022-01-20 05:52:46 +00:00
}
} else {
2022-04-23 10:29:45 +00:00
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
// pResultRowInfo object.
2022-01-20 05:52:46 +00:00
if (p1 != NULL) {
// todo
pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
2024-08-06 09:32:20 +00:00
pTaskInfo->code = terrno;
return NULL;
}
if (pResult->pageId != p1->pageId || pResult->offset != p1->offset) {
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
pTaskInfo->code = terrno;
return NULL;
}
2022-01-20 05:52:46 +00:00
}
}
2022-04-28 08:31:35 +00:00
// 1. close current opened time window
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
SResultRowPosition pos = pResultRowInfo->cur;
2022-06-04 11:54:55 +00:00
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
2024-08-06 09:32:20 +00:00
pTaskInfo->code = terrno;
return NULL;
}
releaseBufPage(pResultBuf, pPage);
}
// allocate a new buffer page
if (pResult == NULL) {
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (pResult == NULL) {
2024-08-06 09:32:20 +00:00
pTaskInfo->code = terrno;
return NULL;
}
// add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
2024-07-22 04:51:25 +00:00
int32_t code = tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
2024-07-29 02:35:06 +00:00
sizeof(SResultRowPosition));
2024-07-22 04:51:25 +00:00
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
2024-08-06 09:32:20 +00:00
pTaskInfo->code = code;
return NULL;
2024-07-22 04:51:25 +00:00
}
2022-01-20 05:52:46 +00:00
}
// 2. set the new time window to be the new active time window
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
2022-01-20 05:52:46 +00:00
// too many time window in query
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
tSimpleHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
2024-08-06 09:32:20 +00:00
pTaskInfo->code = TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW;
return NULL;
2022-01-20 05:52:46 +00:00
}
2022-03-31 08:10:32 +00:00
return pResult;
2022-01-20 05:52:46 +00:00
}
// query_range_start, query_range_end, window_duration, window_start, window_end
2024-07-22 04:51:25 +00:00
int32_t initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
pColData->info.bytes = sizeof(int64_t);
2024-07-22 04:51:25 +00:00
int32_t code = colInfoDataEnsureCapacity(pColData, 5, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
2023-02-20 02:04:08 +00:00
colDataSetInt64(pColData, 0, &pQueryWindow->skey);
colDataSetInt64(pColData, 1, &pQueryWindow->ekey);
int64_t interval = 0;
2023-02-20 02:04:08 +00:00
colDataSetInt64(pColData, 2, &interval); // this value may be variable in case of 'n' and 'y'.
colDataSetInt64(pColData, 3, &pQueryWindow->skey);
colDataSetInt64(pColData, 4, &pQueryWindow->ekey);
2024-07-22 04:51:25 +00:00
return TSDB_CODE_SUCCESS;
}
2024-07-22 04:51:25 +00:00
/*
 * Bind a block that carries block aggregates to every function context of pExprSup.
 * For each context this sets the order and row count, then attaches the block SMA through
 * setBlockSMAInfo(), then sets the source block and scan flag. Stops at the first failure and returns its code.
 */
static int32_t doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SqlFunctionCtx* ctxArr = pExprSup->pCtx;

  for (int32_t idx = 0; idx < pExprSup->numOfExprs; ++idx) {
    SqlFunctionCtx* pOne = &ctxArr[idx];

    pOne->order = order;
    pOne->input.numOfRows = pBlock->info.rows;

    code = setBlockSMAInfo(pOne, &pExprSup->pExprInfo[idx], pBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    pOne->pSrcBlock = pBlock;
    pOne->scanFlag = scanFlag;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2024-07-22 04:51:25 +00:00
/*
 * Bind pBlock as the input of every function context in pExprSup.
 * Blocks with block aggregates (pBlockAgg) are bound through their SMA info. Otherwise the raw column
 * data is bound, with constant columns created for value parameters when createDummyCol is set.
 */
int32_t setInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                          bool createDummyCol) {
  bool useBlockAgg = (pBlock->pBlockAgg != NULL);
  return useBlockAgg ? doSetInputDataBlockInfo(pExprSup, pBlock, order, scanFlag)
                     : doSetInputDataBlock(pExprSup, pBlock, order, scanFlag, createDummyCol);
}
2022-04-28 08:31:35 +00:00
/*
 * Materialize constant function parameter `pFuncParam` as a column of `numOfRows` identical values in
 * pInput->pData[paramIndex].
 *
 * If that slot is empty, a column descriptor with the parameter's type and length is allocated and
 * stored there; pInput then owns it. Values are written for BIGINT/UBIGINT, DOUBLE and
 * VARCHAR/GEOMETRY constants. For any other type only the capacity is ensured.
 *
 * @return TSDB_CODE_SUCCESS, or the allocation / column-write error.
 */
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
                                             int32_t numOfRows) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SColumnInfoData* pColInfo = NULL;
  char*            tmp = NULL;  // var-string copy of a string constant; freed on every exit path

  if (pInput->pData[paramIndex] == NULL) {
    pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    QUERY_CHECK_NULL(pColInfo, code, lino, _end, terrno);

    // Set the correct column info (data type and bytes)
    pColInfo->info.type = pFuncParam->param.nType;
    pColInfo->info.bytes = pFuncParam->param.nLen;
    pInput->pData[paramIndex] = pColInfo;
  } else {
    pColInfo = pInput->pData[paramIndex];
  }

  code = colInfoDataEnsureCapacity(pColInfo, numOfRows, false);
  QUERY_CHECK_CODE(code, lino, _end);

  int8_t type = pFuncParam->param.nType;
  if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT) {
    int64_t v = pFuncParam->param.i;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetInt64(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    for (int32_t i = 0; i < numOfRows; ++i) {
      colDataSetDouble(pColInfo, i, &v);
    }
  } else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) {
    tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen);
    for (int32_t i = 0; i < numOfRows; ++i) {
      code = colDataSetVal(pColInfo, i, tmp, false);
      // on failure the jump to _end still releases tmp (it used to leak here)
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  taosMemoryFree(tmp);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
/*
 * Bind the raw column data of pBlock to every function context in pExprSup.
 *
 * Column parameters point at the block column of their slot. For implicit-ts functions, the parameter
 * at the timestamp position becomes pPTS: the last parameter, or the second last when the function
 * has a primary key. When the function has a primary key, the last parameter becomes pPrimaryKey.
 * A value parameter of a single-parameter function is expanded into a constant column when
 * createDummyCol is set.
 *
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when a referenced slot has no column,
 *         or the error from building a constant column.
 */
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
                                   bool createDummyCol) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SqlFunctionCtx* ctxList = pExprSup->pCtx;

  for (int32_t e = 0; e < pExprSup->numOfExprs; ++e) {
    SqlFunctionCtx*       pFn = &ctxList[e];
    SInputColumnInfoData* pIn = &pFn->input;
    SExprInfo*            pExprItem = &pExprSup->pExprInfo[e];

    pFn->order = order;
    pIn->numOfRows = pBlock->info.rows;
    pFn->pSrcBlock = pBlock;
    pFn->scanFlag = scanFlag;

    pIn->uid = pBlock->info.id.uid;
    pIn->colDataSMAIsSet = false;

    bool withPk =
        (pExprItem->pExpr->nodeType == QUERY_NODE_FUNCTION) && pExprItem->pExpr->_function.pFunctNode->hasPk;
    pFn->hasPrimaryKey = withPk;

    int16_t tsIdx = withPk ? pExprItem->base.numOfParams - 2 : pExprItem->base.numOfParams - 1;
    int16_t pkIdx = pExprItem->base.numOfParams - 1;

    for (int32_t p = 0; p < pExprItem->base.numOfParams; ++p) {
      SFunctParam* pParam = &pExprItem->base.pParam[p];

      if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
        int32_t slot = pParam->pCol->slotId;

        pIn->pData[p] = taosArrayGet(pBlock->pDataBlock, slot);
        pIn->totalRows = pBlock->info.rows;
        pIn->numOfRows = pBlock->info.rows;
        pIn->startRowIndex = 0;
        pIn->blankFill = pBlock->info.blankFill;

        // in case of merge function, this is not always the ts column data.
        if (fmIsImplicitTsFunc(pFn->functionId) && (p == tsIdx)) {
          pIn->pPTS = pIn->pData[p];
        }
        if (withPk && (p == pkIdx)) {
          pIn->pPrimaryKey = pIn->pData[p];
        }

        QUERY_CHECK_CONDITION((pIn->pData[p] != NULL), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
        continue;
      }

      // todo avoid case: top(k, 12), 12 is the value parameter.
      // sum(11), 11 is also the value parameter.
      bool needDummy =
          (pParam->type == FUNC_PARAM_TYPE_VALUE) && createDummyCol && (pExprItem->base.numOfParams == 1);
      if (needDummy) {
        pIn->totalRows = pBlock->info.rows;
        pIn->numOfRows = pBlock->info.rows;
        pIn->startRowIndex = 0;
        pIn->blankFill = pBlock->info.blankFill;

        code = doCreateConstantValColumnInfo(pIn, pParam, p, pBlock->info.rows);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2022-05-20 11:34:39 +00:00
/*
 * Decide whether the function of pCtx should process the current input:
 *   - never when functionId is -1;
 *   - during PRE_SCAN, only if it is a repeat-scan function;
 *   - otherwise, only while its result entry is not yet completed.
 */
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
  struct SResultRowEntryInfo* pEntry = GET_RES_INFO(pCtx);

  if (pCtx->functionId == -1) {
    return false;
  }

  return (pCtx->scanFlag == PRE_SCAN) ? fmIsRepeatScanFunc(pCtx->functionId) : !isRowEntryCompleted(pEntry);
}
2022-12-26 02:58:17 +00:00
/*
 * Build a synthetic SMA for a constant parameter, so the function can run on block aggregates alone.
 *
 * Allocates pInput->pData[paramIndex] (typed `type`, sized by tDataTypes) and
 * pInput->pColumnDataAgg[paramIndex] when missing; pInput owns both. The aggregate is then filled with
 * numOfNull = 0 and min/max/sum derived from the constant over `numOfRows` rows:
 *   - BIGINT: stored as int64 values;
 *   - DOUBLE / BOOL: the value's bytes are stored in the integer min/max/sum fields;
 *   - TIMESTAMP: left untouched; any other type is logged as invalid.
 *
 * @return TSDB_CODE_SUCCESS, or terrno on allocation failure.
 */
static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type,
                                                int32_t paramIndex, int32_t numOfRows) {
  if (pInput->pData[paramIndex] == NULL) {
    pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData));
    if (pInput->pData[paramIndex] == NULL) {
      return terrno;
    }

    // Set the correct column info (data type and bytes)
    pInput->pData[paramIndex]->info.type = type;
    pInput->pData[paramIndex]->info.bytes = tDataTypes[type].bytes;
  }

  SColumnDataAgg* da = pInput->pColumnDataAgg[paramIndex];
  if (da == NULL) {
    da = taosMemoryCalloc(1, sizeof(SColumnDataAgg));
    if (da == NULL) {
      return terrno;
    }
    pInput->pColumnDataAgg[paramIndex] = da;
  }

  if (type == TSDB_DATA_TYPE_BIGINT) {
    int64_t v = pFuncParam->param.i;
    *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows};
  } else if (type == TSDB_DATA_TYPE_DOUBLE) {
    double v = pFuncParam->param.d;
    double sum = v * numOfRows;
    *da = (SColumnDataAgg){.numOfNull = 0};
    // min/max/sum are integer fields holding the double's bytes; memcpy writes exactly what the former
    // `*(double*)&da->min = v` did, without the strict-aliasing violation.
    memcpy(&da->min, &v, sizeof(v));
    memcpy(&da->max, &v, sizeof(v));
    memcpy(&da->sum, &sum, sizeof(sum));
  } else if (type == TSDB_DATA_TYPE_BOOL) {  // todo validate this data type
    bool v = pFuncParam->param.i;
    bool minVal = false;  // NOTE(review): min is always false rather than v (kept from the original) — confirm
    bool sum = v * numOfRows;
    *da = (SColumnDataAgg){.numOfNull = 0};
    // write the bool into the first byte of each (zeroed) field, like the former `*(bool*)&...` stores
    memcpy(&da->min, &minVal, sizeof(minVal));
    memcpy(&da->max, &v, sizeof(v));
    memcpy(&da->sum, &sum, sizeof(sum));
  } else if (type == TSDB_DATA_TYPE_TIMESTAMP) {
    // do nothing
  } else {
    qError("invalid constant type for sma info");
  }

  return TSDB_CODE_SUCCESS;
}
2024-07-22 04:51:25 +00:00
/*
 * Attach the block-level SMA of pBlock to the input of pCtx.
 *
 * Row counts are always set from the block. Without pBlockAgg, colDataSMAIsSet is cleared and nothing
 * else happens. Otherwise, for each parameter:
 *   - column parameters point at pBlockAgg[slotId] and, only for their type info, at the block column;
 *     an aggregate whose colId is -1 clears colDataSMAIsSet for the whole input;
 *   - value parameters get a synthetic constant SMA.
 *
 * @return TSDB_CODE_SUCCESS, or the error from building a constant SMA.
 */
int32_t setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pBlock) {
  int32_t               code = TSDB_CODE_SUCCESS;
  int32_t               lino = 0;
  SInputColumnInfoData* pIn = &pCtx->input;
  int32_t               rows = pBlock->info.rows;

  pIn->numOfRows = rows;
  pIn->totalRows = rows;

  if (pBlock->pBlockAgg == NULL) {
    pIn->colDataSMAIsSet = false;
    return code;
  }

  pIn->colDataSMAIsSet = true;
  for (int32_t k = 0; k < pExprInfo->base.numOfParams; ++k) {
    SFunctParam* pParam = &pExprInfo->base.pParam[k];

    if (pParam->type == FUNC_PARAM_TYPE_VALUE) {
      code = doCreateConstantValColumnSMAInfo(pIn, pParam, pParam->param.nType, k, pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);
    } else if (pParam->type == FUNC_PARAM_TYPE_COLUMN) {
      int32_t slot = pParam->pCol->slotId;

      pIn->pColumnDataAgg[k] = &pBlock->pBlockAgg[slot];
      if (pIn->pColumnDataAgg[k]->colId == -1) {
        pIn->colDataSMAIsSet = false;
      }

      // The data type of each column is required, but the data in this SColumnInfoData is not used.
      pIn->pData[k] = taosArrayGet(pBlock->pDataBlock, slot);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
/////////////////////////////////////////////////////////////////////////////////////////////
2023-05-19 01:07:35 +00:00
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key) {
2022-07-21 13:46:55 +00:00
STimeWindow win = {0};
2023-05-19 01:07:35 +00:00
win.skey = taosTimeTruncate(key, pInterval);
/*
2022-02-24 09:18:56 +00:00
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
*/
win.ekey = taosTimeGetIntervalEnd(win.skey, pInterval);
if (win.ekey < win.skey) {
win.ekey = INT64_MAX;
}
return win;
}
2024-07-22 04:51:25 +00:00
/*
 * Point every function context at its entry in pResult and initialize entries that are not initialized yet.
 *
 * Completed-and-initialized entries and pseudo functions are skipped. Entries with functionId -1 are just
 * marked initialized. After the first entry that is already initialized, no further entry is
 * initialized; only resultInfo pointers are refreshed.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_UDF_FUNC_EXEC_FAILURE when a user-defined function's init fails
 *         (its entry stays uninitialized); otherwise the failing init's code.
 */
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
                            int32_t* rowEntryInfoOffset) {
  bool skipInit = false;

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx* pOne = &pCtx[i];
    pOne->resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset);
    if (skipInit) {
      continue;
    }

    struct SResultRowEntryInfo* pEntry = pOne->resultInfo;
    if ((isRowEntryCompleted(pEntry) && isRowEntryInitialized(pEntry)) || pOne->isPseudoFunc) {
      continue;
    }

    if (pEntry->initialized) {
      skipInit = true;
      continue;
    }

    if (pOne->functionId == -1) {
      pEntry->initialized = true;
      continue;
    }

    int32_t code = pOne->fpSet.init(pOne, pEntry);
    if (code == TSDB_CODE_SUCCESS) {
      continue;
    }
    if (fmIsUserDefinedFunc(pOne->functionId)) {
      pEntry->initialized = false;
      return TSDB_CODE_UDF_FUNC_EXEC_FAILURE;
    }
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Reset the result entry bound to each of the first numOfOutput contexts: not initialized, no results,
 * not a null result, not complete. Contexts without a bound resultInfo are left alone.
 */
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
  for (int32_t idx = 0; idx < numOfOutput; ++idx) {
    SResultRowEntryInfo* pEntry = pCtx[idx].resultInfo;
    if (pEntry != NULL) {
      pEntry->initialized = false;
      pEntry->numOfRes = 0;
      pEntry->isNullRes = 0;
      pEntry->complete = false;
    }
  }
}
/*
 * Apply pFilterInfo to pBlock in place, removing the rows that do not qualify.
 *
 * When pColMatchInfo is given, the block's timestamp window is refreshed from the first
 * primary-timestamp column (of TIMESTAMP type) listed in it. Nothing is done for a NULL filter or an
 * empty block. The filter result column is always released, and failures are logged like the other
 * helpers in this file.
 *
 * @return TSDB_CODE_SUCCESS, or the first error from the filter / trimming / window update.
 */
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SColumnInfoData* p = NULL;  // filter result column produced by filterExecute()

  if (pFilterInfo == NULL || pBlock->info.rows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock};
  code = filterSetDataFromSlotId(pFilterInfo, &param1);
  QUERY_CHECK_CODE(code, lino, _err);

  int32_t status = 0;
  code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status);
  QUERY_CHECK_CODE(code, lino, _err);

  code = extractQualifiedTupleByFilterResult(pBlock, p, status);
  QUERY_CHECK_CODE(code, lino, _err);

  if (pColMatchInfo != NULL) {
    size_t size = taosArrayGetSize(pColMatchInfo->pList);
    for (size_t i = 0; i < size; ++i) {
      SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i);
      QUERY_CHECK_NULL(pInfo, code, lino, _err, terrno);
      if (pInfo->colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
        continue;
      }

      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, pInfo->dstSlotId);
      QUERY_CHECK_NULL(pColData, code, lino, _err, terrno);
      if (pColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
        code = blockDataUpdateTsWindow(pBlock, pInfo->dstSlotId);
        QUERY_CHECK_CODE(code, lino, _err);
        break;
      }
    }
  }

_err:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  blockDataCheck(pBlock, true);
  colDataDestroy(p);
  taosMemoryFree(p);
  return code;
}
2024-07-29 02:35:06 +00:00
/*
 * Trim pBlock according to a filter outcome.
 *   - FILTER_RESULT_ALL_QUALIFIED: keep every row.
 *   - FILTER_RESULT_NONE_QUALIFIED: drop every row (rows = 0); `p` is not used.
 *   - FILTER_RESULT_PARTIAL_QUALIFIED: keep the rows whose indicator byte in p->pData is true.
 *   - anything else: logged and ignored.
 *
 * `p` is only dereferenced in the partial case. A missing indicator column there is reported as
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR instead of crashing or silently dropping every row.
 *
 * @return TSDB_CODE_SUCCESS, the trimDataBlock() error, or the internal error above.
 */
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (status == FILTER_RESULT_ALL_QUALIFIED) {
    // here nothing needs to be done
  } else if (status == FILTER_RESULT_NONE_QUALIFIED) {
    code = trimDataBlock(pBlock, pBlock->info.rows, NULL);
    pBlock->info.rows = 0;
  } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) {
    if (p == NULL || p->pData == NULL) {
      qError("partially qualified filter result without indicator column");
      return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    }
    code = trimDataBlock(pBlock, pBlock->info.rows, (bool*)p->pData);
  } else {
    qError("unknown filter result type: %d", status);
  }

  return code;
}
2023-01-03 09:41:10 +00:00
/*
 * Raise pRow->numOfRows to the largest numOfRes among the row's initialized entries.
 * If the row still has no rows, one row is emitted anyway unless one of the initialized entries
 * belongs to a not-null function. For example, max over all-null input still outputs one row, while
 * first/last, which require not null output, output none.
 */
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) {
  bool hasNotNullFunc = false;

  for (int32_t j = 0; j < numOfExprs; ++j) {
    SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowEntryOffset);
    if (!isRowEntryInitialized(pEntry)) {
      continue;
    }

    if (pEntry->numOfRes > pRow->numOfRows) {
      pRow->numOfRows = pEntry->numOfRes;
    }
    hasNotNullFunc = hasNotNullFunc || pCtx[j].isNotNullFunc;
  }

  if (!hasNotNullFunc && pRow->numOfRows == 0) {
    pRow->numOfRows = 1;
  }
}
/*
 * Write the outputs of result row pRow into pBlock, starting at row pBlock->info.rows.
 *
 * For each expression:
 *   - with a finalize function: capacity is ensured and finalize() writes the output. _group_key and
 *     _group_const_value with results are first widened to pRow->numOfRows, so they match every
 *     output row of multi-row functions (e.g. Histogram);
 *   - _select_value: nothing is written;
 *   - otherwise: the intermediate cell is copied into the expression's slot for all pRow->numOfRows
 *     rows (e.g. _wstart repeated next to top(k, 20)).
 *
 * Does not advance pBlock->info.rows.
 *
 * @return TSDB_CODE_SUCCESS or the first error met.
 */
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
                                 SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  for (int32_t j = 0; j < numOfExprs; ++j) {
    SqlFunctionCtx* pOne = &pCtx[j];
    const char*     funcName = pOne->pExpr->pExpr->_function.functionName;

    pOne->resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);

    if (pOne->fpSet.finalize != NULL) {
      bool isGroupKey = (strcmp(funcName, "_group_key") == 0) || (strcmp(funcName, "_group_const_value") == 0);
      if (isGroupKey && pOne->resultInfo->numOfRes != 0) {
        pOne->resultInfo->numOfRes = pRow->numOfRows;
      }

      code = blockDataEnsureCapacity(pBlock, pBlock->info.rows + pOne->resultInfo->numOfRes);
      QUERY_CHECK_CODE(code, lino, _end);

      code = pOne->fpSet.finalize(pOne, pBlock);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
        QUERY_CHECK_CODE(code, lino, _end);
      }
      continue;
    }

    if (strcmp(funcName, "_select_value") == 0) {
      continue;  // do nothing
    }

    int32_t          slotId = pExprInfo[j].base.resSchema.slotId;
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
    QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno);

    char* in = GET_ROWCELL_INTERBUF(pOne->resultInfo);
    for (int32_t k = 0; k < pRow->numOfRows; ++k) {
      code = colDataSetVal(pColInfoData, pBlock->info.rows + k, in, pOne->resultInfo->isNullRes);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
// todo refactor. SResultRow has direct pointer in miainfo
2024-07-23 02:50:16 +00:00
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
2024-07-29 02:35:06 +00:00
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
2023-01-16 09:55:35 +00:00
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
SqlFunctionCtx* pCtx = pSup->pCtx;
SExprInfo* pExprInfo = pSup->pExprInfo;
const int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset);
if (pRow->numOfRows == 0) {
releaseBufPage(pBuf, page);
2024-07-29 02:35:06 +00:00
return;
}
int32_t size = pBlock->info.capacity;
while (pBlock->info.rows + pRow->numOfRows > size) {
size = size * 1.25;
}
int32_t code = blockDataEnsureCapacity(pBlock, size);
if (TAOS_FAILED(code)) {
releaseBufPage(pBuf, page);
qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
code = copyResultrowToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
if (TAOS_FAILED(code)) {
releaseBufPage(pBuf, page);
qError("%s copy result row to datablock failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
T_LONG_JMP(pTaskInfo->env, code);
}
2022-06-09 11:21:52 +00:00
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
2022-06-09 11:21:52 +00:00
}
2024-07-22 04:51:25 +00:00
/*
 * Copy finalized result rows into `pBlock` by walking the simple hash `pHashmap`.
 *
 * The walk resumes from the cursor saved in pGroupResInfo (dataPos / iter), and that cursor and
 * pGroupResInfo->index are advanced for each entry consumed, so repeated calls page through the
 * map. The group id of an entry is read from the first 8 bytes of its hash key. Copying stops
 * when the block holds at least `threshold` rows or, unless `ignoreGroup` is set, when the next
 * entry belongs to a different group than the one already in the block (that entry is left for
 * the next call). Errors are raised with T_LONG_JMP on pTaskInfo->env.
 */
void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                              SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t         numOfRows = tSimpleHashGetSize(pHashmap);
  size_t          keyLen = 0;

  // restart the walk from the position recorded by the previous call
  int32_t iter = pGroupResInfo->iter;
  void*   pEntry = pGroupResInfo->dataPos;

  for (pEntry = tSimpleHashIterate(pHashmap, pEntry, &iter); pEntry != NULL;
       pEntry = tSimpleHashIterate(pHashmap, pEntry, &iter)) {
    void*               pKey = tSimpleHashGetKey(pEntry, &keyLen);
    SResultRowPosition* pos = pEntry;
    uint64_t            groupId = *(uint64_t*)pKey;

    SFilePage* page = getBufPage(pBuf, pos->pageId);
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    if (pRow->numOfRows == 0) {
      // empty entry: consume it and move on
      pGroupResInfo->index += 1;
      pGroupResInfo->iter = iter;
      pGroupResInfo->dataPos = pEntry;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (!ignoreGroup && pBlock->info.id.groupId != 0 && pBlock->info.id.groupId != groupId) {
      // one datablock only carries rows of a single group; leave this entry for the next call
      releaseBufPage(pBuf, page);
      break;
    }
    if (!ignoreGroup && pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = groupId;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      // NOTE(review): `iter` is the hash iterator position, not a consumed-entry count; the extra
      // slot is reserved when `numOfRows - iter > 1` — confirm the intent.
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - iter) > 1 ? 1 : 0);

      code = blockDataEnsureCapacity(pBlock, newSize);
      QUERY_CHECK_CODE(code, lino, _end);
      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
      // todo set the pOperator->resultInfo size
    }

    pGroupResInfo->index += 1;
    pGroupResInfo->iter = iter;
    pGroupResInfo->dataPos = pEntry;

    code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
    releaseBufPage(pBuf, page);
    QUERY_CHECK_CODE(code, lino, _end);

    pBlock->info.rows += pRow->numOfRows;
    if (pBlock->info.rows >= threshold) {
      break;
    }
  }

  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);
  pBlock->info.dataLoad = 1;

  code = blockDataUpdateTsWindow(pBlock, 0);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
2024-07-22 04:51:25 +00:00
/*
 * Copy finalized result rows, in the order listed in pGroupResInfo->pRows, into `pBlock`.
 *
 * pGroupResInfo->index acts as the cursor: it always names the next unconsumed entry and is
 * advanced for every entry consumed, so repeated calls continue where the last one stopped.
 * Copying stops when the block holds at least `threshold` rows or, unless `ignoreGroup` is set,
 * when the next entry belongs to a different group than the one already in the block (that
 * entry is not consumed). Errors are raised with T_LONG_JMP on pTaskInfo->env.
 */
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
                        SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  SqlFunctionCtx* pCtx = pSup->pCtx;
  SExprInfo*      pExprInfo = pSup->pExprInfo;
  int32_t*        rowEntryOffset = pSup->rowEntryInfoOffset;
  int32_t         numOfExprs = pSup->numOfExprs;
  int32_t         numOfRows = getNumOfTotalRes(pGroupResInfo);

  while (pGroupResInfo->index < numOfRows) {
    int32_t     i = pGroupResInfo->index;
    SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);

    SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
    if (page == NULL) {
      qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
      T_LONG_JMP(pTaskInfo->env, terrno);
    }

    SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
    doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);

    if (pRow->numOfRows == 0) {
      // empty entry: consume it and move on
      pGroupResInfo->index += 1;
      releaseBufPage(pBuf, page);
      continue;
    }

    if (!ignoreGroup && pBlock->info.id.groupId != 0 && pBlock->info.id.groupId != pPos->groupId) {
      // one datablock only carries rows of a single group; leave this entry for the next call
      releaseBufPage(pBuf, page);
      break;
    }
    if (!ignoreGroup && pBlock->info.id.groupId == 0) {
      pBlock->info.id.groupId = pPos->groupId;
    }

    if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
      // reserve one extra slot while more entries remain after this one
      uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - i) > 1 ? 1 : 0);

      code = blockDataEnsureCapacity(pBlock, newSize);
      QUERY_CHECK_CODE(code, lino, _end);

      qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
             pBlock->info.capacity, GET_TASKID(pTaskInfo));
      // todo set the pOperator->resultInfo size
    }

    pGroupResInfo->index += 1;
    code = copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
    releaseBufPage(pBuf, page);
    QUERY_CHECK_CODE(code, lino, _end);

    pBlock->info.rows += pRow->numOfRows;
    if (pBlock->info.rows >= threshold) {
      break;
    }
  }

  qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
         pBlock->info.id.groupId);
  pBlock->info.dataLoad = 1;

  code = blockDataUpdateTsWindow(pBlock, 0);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
2022-05-23 11:50:08 +00:00
/*
 * Refill the operator's result block (pbInfo->pRes) from the remaining grouped results.
 *
 * The block is stamped with the task version and cleaned first. Without mergeResultBlock, one
 * call copies rows of a single group. With mergeResultBlock, rows of successive groups are packed
 * into the same block until the operator's threshold is reached, and the block's group id is
 * reset to 0 afterwards.
 */
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
                            SDiskbasedBuf* pBuf) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  SSDataBlock*   pBlock = pbInfo->pRes;

  // stamp the output block with the task version before reusing it
  pBlock->info.version = pTaskInfo->version;
  blockDataCleanup(pBlock);

  if (!hasRemainResults(pGroupResInfo)) {
    return;
  }

  // start without a group so the first copied row decides it
  pBlock->info.id.groupId = 0;

  if (!pbInfo->mergeResultBlock) {
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
                       false);
    return;
  }

  while (hasRemainResults(pGroupResInfo)) {
    doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
                       true);
    if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
      break;
    }
    // drop the group id so rows of the next group can be merged into this block
    pBlock->info.id.groupId = 0;
  }

  // a merged block spans several groups; do not report any of them to the client
  pBlock->info.id.groupId = 0;
}
/*
 * Release what each of the `numOfExprs` expressions in `pExpr` owns:
 *   - column parameters: the pCol reference,
 *   - value parameters: the variant payload,
 *   - the pParam array and the pExpr node.
 * The SExprInfo array itself is not freed here.
 */
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
  for (int32_t e = 0; e < numOfExprs; ++e) {
    SExprInfo* pCur = &pExpr[e];

    for (int32_t p = 0; p < pCur->base.numOfParams; ++p) {
      switch (pCur->base.pParam[p].type) {
        case FUNC_PARAM_TYPE_COLUMN:
          taosMemoryFreeClear(pCur->base.pParam[p].pCol);
          break;
        case FUNC_PARAM_TYPE_VALUE:
          taosVariantDestroy(&pCur->base.pParam[p].param);
          break;
        default:
          break;
      }
    }

    taosMemoryFree(pCur->base.pParam);
    taosMemoryFree(pCur->pExpr);
  }
}
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
*defaultPgsz = 4096;
2023-06-09 05:49:25 +00:00
uint32_t last = *defaultPgsz;
while (*defaultPgsz < rowSize * 4) {
*defaultPgsz <<= 1u;
2023-06-09 05:49:25 +00:00
if (*defaultPgsz < last) {
return TSDB_CODE_INVALID_PARA;
}
last = *defaultPgsz;
}
// The default buffer for each operator in query is 10MB.
// at least four pages need to be in buffer
// TODO: make this variable to be configurable.
*defaultBufsz = 4096 * 2560;
if ((*defaultBufsz) <= (*defaultPgsz)) {
(*defaultBufsz) = (*defaultPgsz) * 4;
2023-06-09 05:49:25 +00:00
if (*defaultBufsz < ((int64_t)(*defaultPgsz)) * 4) {
return TSDB_CODE_INVALID_PARA;
}
}
return 0;
}
2022-07-21 13:46:55 +00:00
/*
 * Set an operator's result block sizing.
 *
 * capacity is `numOfRows` (4096 when 0 is passed). threshold is 75% of that capacity. When 75%
 * truncates to 0, threshold falls back to the full capacity.
 */
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) {
  const int32_t rows = (numOfRows != 0) ? numOfRows : 4096;

  pResultInfo->capacity = rows;
  pResultInfo->threshold = rows * 0.75;

  // tiny capacities would otherwise yield a zero threshold
  if (0 == pResultInfo->threshold) {
    pResultInfo->threshold = rows;
  }
}
2022-06-18 06:49:27 +00:00
/* Attach `pBlock` as the operator's result block and reset its result-row bookkeeping. */
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) {
  initResultRowInfo(&pInfo->resultRowInfo);
  pInfo->pRes = pBlock;
}
2024-07-22 04:51:25 +00:00
/*
 * Release an array of `numOfOutput` function contexts together with the buffers each one owns,
 * then free the array itself. `pExpr` (may be NULL) is used only to find value-typed parameters,
 * whose per-parameter input buffers are released as well. A NULL `pCtx` is a no-op.
 */
static void destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_t numOfOutput) {
  if (pCtx == NULL) {
    return;
  }

  for (int32_t i = 0; i < numOfOutput; ++i) {
    SqlFunctionCtx* pCur = &pCtx[i];

    // value-typed parameters: release pData[j] / pColumnDataAgg[j] for that parameter slot
    if (pExpr != NULL) {
      const SExprInfo* pExprInfo = &pExpr[i];
      for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
        if (pExprInfo->base.pParam[j].type != FUNC_PARAM_TYPE_VALUE) {
          continue;
        }
        taosMemoryFree(pCur->input.pData[j]);
        taosMemoryFree(pCur->input.pColumnDataAgg[j]);
      }
    }

    for (int32_t j = 0; j < pCur->numOfParams; ++j) {
      taosVariantDestroy(&pCur->param[j].param);
    }

    taosMemoryFreeClear(pCur->subsidiaries.pCtx);
    taosMemoryFreeClear(pCur->subsidiaries.buf);
    taosMemoryFree(pCur->input.pData);
    taosMemoryFree(pCur->input.pColumnDataAgg);

    if (pCur->udfName != NULL) {
      taosMemoryFree(pCur->udfName);
    }
  }

  taosMemoryFreeClear(pCtx);
}
/*
 * Bind `pExprInfo` to `pSup` and, when expressions are present, create their function contexts
 * (which also fills pSup->rowEntryInfoOffset).
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the contexts could not be created.
 */
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore) {
  pSup->pExprInfo = pExprInfo;
  pSup->numOfExprs = numOfExpr;

  if (pSup->pExprInfo == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pSup->pCtx = createSqlFunctionCtx(pExprInfo, numOfExpr, &pSup->rowEntryInfoOffset, pStore);
  return (pSup->pCtx != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}
/*
 * Release everything owned by an SExprSupp:
 *   - the function contexts,
 *   - the expression array and its contents,
 *   - the filter info,
 *   - the row-entry offsets.
 *
 * Every freed pointer field is reset to NULL, so calling this twice on the same SExprSupp is
 * harmless instead of a double free.
 */
void cleanupExprSupp(SExprSupp* pSupp) {
  destroySqlFunctionCtx(pSupp->pCtx, pSupp->pExprInfo, pSupp->numOfExprs);
  // destroySqlFunctionCtx() frees the array but only receives a copy of the pointer; clear the field here
  pSupp->pCtx = NULL;

  if (pSupp->pExprInfo != NULL) {
    destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs);
    taosMemoryFreeClear(pSupp->pExprInfo);
  }

  if (pSupp->pFilterInfo != NULL) {
    filterFreeInfo(pSupp->pFilterInfo);
    pSupp->pFilterInfo = NULL;
  }

  taosMemoryFreeClear(pSupp->rowEntryInfoOffset);
}
2024-07-22 04:51:25 +00:00
/* Destroy the operator's result block and detach it from `pInfo`. */
void cleanupBasicInfo(SOptrBasicInfo* pInfo) {
  SSDataBlock* pRes = pInfo->pRes;
  pInfo->pRes = NULL;
  blockDataDestroy(pRes);
}
/*
 * Report whether a grouping/partition list consists of exactly one key, and that key is a call to
 * the `tbname` function (i.e. `partition by tbname` / `group by tbname`).
 */
bool groupbyTbname(SNodeList* pGroupList) {
  if (LIST_LENGTH(pGroupList) != 1) {
    return false;
  }

  SNode* pNode = nodesListGetNode(pGroupList, 0);
  if (pNode == NULL) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return false;
  }

  return (pNode->type == QUERY_NODE_FUNCTION) &&
         (0 == strcmp(((struct SFunctionNode*)pNode)->functionName, "tbname"));
}
2023-04-06 06:40:01 +00:00
/*
 * Build the SDeleterParam for a delete data sink from the task's first table list.
 *
 * On success *pParam receives a heap-allocated SDeleterParam whose pUidList holds the uid of
 * every table in that list. On failure *pParam is left untouched, everything built so far is
 * released, and the error code is returned.
 */
static int32_t buildDeleterSinkParam(SExecTaskInfo* pTask, void** pParam) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  int32_t         numOfTables = 0;
  SArray*         pInfoList = NULL;
  STableListInfo* pTableListInfo = NULL;

  SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam));
  if (NULL == pDeleterParam) {
    return terrno;
  }

  code = getTableListInfo(pTask, &pInfoList);
  QUERY_CHECK_CODE(code, lino, _error);
  // A successful call that yields no list must not be reported as success with *pParam unset.
  QUERY_CHECK_NULL(pInfoList, code, lino, _error, TSDB_CODE_INVALID_PARA);

  pTableListInfo = taosArrayGetP(pInfoList, 0);
  taosArrayDestroy(pInfoList);
  // an empty list has no element 0; check before handing it to tableListGetSuid()
  QUERY_CHECK_NULL(pTableListInfo, code, lino, _error, TSDB_CODE_INVALID_PARA);

  pDeleterParam->suid = tableListGetSuid(pTableListInfo);

  // TODO extract uid list
  code = tableListGetSize(pTableListInfo, &numOfTables);
  QUERY_CHECK_CODE(code, lino, _error);

  pDeleterParam->pUidList = taosArrayInit(numOfTables, sizeof(uint64_t));
  QUERY_CHECK_NULL(pDeleterParam->pUidList, code, lino, _error, terrno);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i);
    QUERY_CHECK_NULL(pTable, code, lino, _error, TSDB_CODE_OUT_OF_MEMORY);

    void* tmp = taosArrayPush(pDeleterParam->pUidList, &pTable->uid);
    QUERY_CHECK_NULL(tmp, code, lino, _error, terrno);
  }

  *pParam = pDeleterParam;
  return TSDB_CODE_SUCCESS;

_error:
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  if (pDeleterParam->pUidList != NULL) {
    taosArrayDestroy(pDeleterParam->pUidList);
  }
  taosMemoryFree(pDeleterParam);
  return code;
}

/*
 * Create the sink-specific parameter object for the data sink `pNode`.
 *
 * INSERT sinks get an SInserterParam carrying `readHandle`. DELETE sinks get an SDeleterParam
 * built from the task's table list. Other sink types need no parameter; *pParam is not touched
 * and TSDB_CODE_SUCCESS is returned.
 *
 * @return TSDB_CODE_SUCCESS, or an error code (nothing is stored in *pParam on failure).
 */
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle) {
  switch (pNode->type) {
    case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
      SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
      if (NULL == pInserterParam) {
        return terrno;
      }
      pInserterParam->readHandle = readHandle;

      *pParam = pInserterParam;
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_DELETE:
      return buildDeleterSinkParam(pTask, pParam);
    default:
      break;
  }
  return TSDB_CODE_SUCCESS;
}
2023-06-20 09:30:59 +00:00
/* Forward a release-state request to the first downstream operator, if it implements one. */
void streamOpReleaseState(SOperatorInfo* pOperator) {
  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
  if (pDownstream->fpSet.releaseStreamStateFn == NULL) {
    return;
  }
  pDownstream->fpSet.releaseStreamStateFn(pDownstream);
}
/* Forward a reload-state request to the first downstream operator, if it implements one. */
void streamOpReloadState(SOperatorInfo* pOperator) {
  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
  if (pDownstream->fpSet.reloadStreamStateFn == NULL) {
    return;
  }
  pDownstream->fpSet.reloadStreamStateFn(pDownstream);
}
2023-08-14 06:24:21 +00:00
2023-08-10 03:29:14 +00:00
/*
 * Generic teardown shared by all operator parameter kinds. It recursively frees every child
 * parameter with the same `type`, then the children array, the value payload and the parameter
 * object itself.
 */
void freeOperatorParamImpl(SOperatorParam* pParam, SOperatorParamType type) {
  int32_t numOfChildren = taosArrayGetSize(pParam->pChildren);
  for (int32_t i = 0; i < numOfChildren; ++i) {
    freeOperatorParam(taosArrayGetP(pParam->pChildren, i), type);
  }

  taosArrayDestroy(pParam->pChildren);
  taosMemoryFree(pParam->value);
  taosMemoryFree(pParam);
}
/* Release the uid list held by an SExchangeOperatorBasicParam (the struct itself is not freed). */
void freeExchangeGetBasicOperatorParam(void* pParam) {
  taosArrayDestroy(((SExchangeOperatorBasicParam*)pParam)->uidList);
}
/*
 * Free an exchange operator's GET parameter. When `multiParams` is set, pParam->value is treated
 * as an SExchangeOperatorBatchParam and its batch hash is cleaned up. Otherwise the embedded
 * basic param is released. The common teardown runs afterwards in both cases.
 */
void freeExchangeGetOperatorParam(SOperatorParam* pParam) {
  SExchangeOperatorParam* pExcParam = (SExchangeOperatorParam*)pParam->value;

  if (!pExcParam->multiParams) {
    freeExchangeGetBasicOperatorParam(&pExcParam->basic);
  } else {
    SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pParam->value;
    tSimpleHashCleanup(pBatch->pBatchs);
  }

  freeOperatorParamImpl(pParam, OP_GET_PARAM);
}
2024-07-22 04:51:25 +00:00
/* Exchange NOTIFY parameter: only the generic teardown is needed. */
void freeExchangeNotifyOperatorParam(SOperatorParam* pParam) {
  freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM);
}

/* Group-cache GET parameter: only the generic teardown is needed. */
void freeGroupCacheGetOperatorParam(SOperatorParam* pParam) {
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
}

/* Group-cache NOTIFY parameter: only the generic teardown is needed. */
void freeGroupCacheNotifyOperatorParam(SOperatorParam* pParam) {
  freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM);
}

/* Merge-join GET parameter: only the generic teardown is needed. */
void freeMergeJoinGetOperatorParam(SOperatorParam* pParam) {
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
}

/* Merge-join NOTIFY parameter: only the generic teardown is needed. */
void freeMergeJoinNotifyOperatorParam(SOperatorParam* pParam) {
  freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM);
}
2023-08-10 03:29:14 +00:00
/* Table-scan GET parameter: release its uid list, then run the generic teardown. */
void freeTableScanGetOperatorParam(SOperatorParam* pParam) {
  STableScanOperatorParam* pScanParam = (STableScanOperatorParam*)pParam->value;
  taosArrayDestroy(pScanParam->pUidList);
  freeOperatorParamImpl(pParam, OP_GET_PARAM);
}

/* Table-scan NOTIFY parameter: only the generic teardown is needed. */
void freeTableScanNotifyOperatorParam(SOperatorParam* pParam) {
  freeOperatorParamImpl(pParam, OP_NOTIFY_PARAM);
}
2023-08-10 03:29:14 +00:00
/*
 * Free an operator parameter tree, choosing the GET or NOTIFY release routine for the operator
 * type it targets. NULL is ignored. Unsupported operator types are logged and nothing is freed.
 */
void freeOperatorParam(SOperatorParam* pParam, SOperatorParamType type) {
  if (NULL == pParam) {
    return;
  }

  const bool isGet = (type == OP_GET_PARAM);

  switch (pParam->opType) {
    case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
      if (isGet) {
        freeExchangeGetOperatorParam(pParam);
      } else {
        freeExchangeNotifyOperatorParam(pParam);
      }
      break;
    case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:
      if (isGet) {
        freeGroupCacheGetOperatorParam(pParam);
      } else {
        freeGroupCacheNotifyOperatorParam(pParam);
      }
      break;
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN:
      if (isGet) {
        freeMergeJoinGetOperatorParam(pParam);
      } else {
        freeMergeJoinNotifyOperatorParam(pParam);
      }
      break;
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
      if (isGet) {
        freeTableScanGetOperatorParam(pParam);
      } else {
        freeTableScanNotifyOperatorParam(pParam);
      }
      break;
    default:
      qError("unsupported op %d param, type %d", pParam->opType, type);
      break;
  }
}
/*
 * Free the operator's own parameter of kind `type` (GET or NOTIFY) and every downstream parameter
 * of that kind, clearing each slot. When `allFree` is set, the downstream parameter array itself
 * is freed too. Any other `type` is ignored.
 */
void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree) {
  SOperatorParam**  ppParam = NULL;
  SOperatorParam*** pppDownstreamParam = NULL;

  if (type == OP_GET_PARAM) {
    ppParam = &pOperator->pOperatorGetParam;
    pppDownstreamParam = &pOperator->pDownstreamGetParams;
  } else if (type == OP_NOTIFY_PARAM) {
    ppParam = &pOperator->pOperatorNotifyParam;
    pppDownstreamParam = &pOperator->pDownstreamNotifyParams;
  } else {
    return;
  }

  if (*ppParam != NULL) {
    freeOperatorParam(*ppParam, type);
    *ppParam = NULL;
  }

  SOperatorParam** pDownstreamParams = *pppDownstreamParam;
  if (pDownstreamParams == NULL) {
    return;
  }

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    if (pDownstreamParams[i] != NULL) {
      freeOperatorParam(pDownstreamParams[i], type);
      pDownstreamParams[i] = NULL;
    }
  }

  if (allFree) {
    taosMemoryFreeClear(*pppDownstreamParam);
  }
}
2024-07-24 09:08:08 +00:00
/*
 * Pull the next block from downstream operator `idx` into *pResBlock.
 *
 * If a GET parameter is pending for that downstream, getNextExtFn is called with it. On success,
 * when `clearParam` is set, the parameter is freed and its slot cleared. Otherwise the plain
 * getNextFn is used. Failures are logged and the downstream's error code is returned unchanged.
 */
FORCE_INLINE int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
                                                    SSDataBlock** pResBlock) {
  QRY_PARAM_CHECK(pResBlock);

  SOperatorInfo*  pDownstream = pOperator->pDownstream[idx];
  SOperatorParam* pGetParam = (pOperator->pDownstreamGetParams != NULL) ? pOperator->pDownstreamGetParams[idx] : NULL;
  int32_t         code = 0;

  if (pGetParam == NULL) {
    code = pDownstream->fpSet.getNextFn(pDownstream, pResBlock);
    if (code) {
      qError("failed to get next data block from upstream at %s, %d code:%s", __func__, __LINE__, tstrerror(code));
    }
    return code;
  }

  qDebug("DynOp: op %s start to get block from downstream %s", pOperator->name, pDownstream->name);
  code = pDownstream->fpSet.getNextExtFn(pDownstream, pGetParam, pResBlock);

  if (clearParam && (code == 0)) {
    freeOperatorParam(pOperator->pDownstreamGetParams[idx], OP_GET_PARAM);
    pOperator->pDownstreamGetParams[idx] = NULL;
  }

  if (code) {
    qError("failed to get next data block from upstream at %s, line:%d code:%s", __func__, __LINE__, tstrerror(code));
  }
  return code;
}
2023-08-14 06:24:21 +00:00
/*
 * Compare value `v` with the stored state key `pKey`.
 *
 * Variable-length types compare by encoded length, then by payload bytes. Fixed-length types
 * compare the first pKey->bytes bytes.
 */
bool compareVal(const char* v, const SStateKeys* pKey) {
  if (!(IS_VAR_DATA_TYPE(pKey->type))) {
    return memcmp(pKey->pData, v, pKey->bytes) == 0;
  }

  return (varDataLen(v) == varDataLen(pKey->pData)) &&
         (memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0);
}