/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "filter.h" #include "function.h" #include "nodes.h" #include "os.h" #include "querynodes.h" #include "tfill.h" #include "tname.h" #include "executorInt.h" #include "index.h" #include "operator.h" #include "query.h" #include "querytask.h" #include "tcompare.h" #include "tdatablock.h" #include "tglobal.h" #include "thash.h" #include "ttypes.h" typedef struct { bool hasAgg; int32_t numOfRows; int32_t startOffset; } SFunctionCtxStatus; typedef struct SAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; STableQueryInfo* current; uint64_t groupId; SGroupResInfo groupResInfo; SExprSupp scalarExprSup; bool groupKeyOptimized; bool hasValidBlock; SSDataBlock* pNewGroupBlock; bool hasCountFunc; SOperatorInfo* pOperator; bool cleanGroupResInfo; } SAggOperatorInfo; static void destroyAggOperatorInfo(void* param); static int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx); static int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes); static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size); static int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); static int32_t resetAggregateOperatorState(SOperatorInfo* pOper); int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_PARAM_CHECK(pOptrInfo); int32_t lino = 0; int32_t code = 0; int32_t num = 0; SExprInfo* pExprInfo = NULL; int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = NULL; SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = terrno; goto _error; } pOperator->exprSupp.hasWindowOrGroup = false; SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); code = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &pExprInfo, &num); TSDB_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); if (pAggNode->pExprs != NULL) { code = createExprInfo(pAggNode->pExprs, NULL, &pScalarExprInfo, &numOfScalarExpr); TSDB_CHECK_CODE(code, lino, _error); } code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0, GET_STM_RTINFO(pTaskInfo)); TSDB_CHECK_CODE(code, lino, _error); pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized; pInfo->groupId = UINT64_MAX; pInfo->binfo.inputTsOrder = pAggNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; pInfo->hasCountFunc = pAggNode->hasCountLikeFunc; pInfo->pOperator = pOperator; pInfo->cleanGroupResInfo = false; setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, !pAggNode->node.forceCreateNonBlockingOptr, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getAggregateResultNext, NULL, destroyAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorResetStateFn(pOperator, resetAggregateOperatorState); pOperator->pPhyNode = pAggNode; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp; pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup; } code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } *pOptrInfo = pOperator; return TSDB_CODE_SUCCESS; _error: qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } void destroyAggOperatorInfo(void* param) { if (param == NULL) { return; } SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); if (pInfo->pOperator) { cleanupResultInfo(pInfo->pOperator->pTaskInfo, &pInfo->pOperator->exprSupp, &pInfo->groupResInfo, &pInfo->aggSup, pInfo->cleanGroupResInfo); pInfo->pOperator = NULL; } cleanupAggSup(&pInfo->aggSup); cleanupExprSuppWithoutFilter(&pInfo->scalarExprSup); cleanupGroupResInfo(&pInfo->groupResInfo); taosMemoryFreeClear(param); } /** * @brief get blocks from downstream and fill results into groupedRes after aggragation * @retval false if no more groups * @retval true if there could have new groups coming * @note if pOperator.blocking is true, scan all blocks from downstream, all groups are handled * if false, fill results of ONE GROUP * */ static bool nextGroupedResult(SOperatorInfo* pOperator) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; if(!pAggInfo) { qError("function:%s, pAggInfo is NULL", __func__); return false; } if (pOperator->blocking && pAggInfo->hasValidBlock) { return false; } SExprSupp* pSup = &pOperator->exprSupp; int64_t st = taosGetTimestampUs(); int32_t order = pAggInfo->binfo.inputTsOrder; SSDataBlock* pBlock = pAggInfo->pNewGroupBlock; pAggInfo->cleanGroupResInfo = false; if (pBlock) { pAggInfo->pNewGroupBlock = NULL; tSimpleHashClear(pAggInfo->aggSup.pResultRowHashTable); code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); QUERY_CHECK_CODE(code, lino, _end); code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); QUERY_CHECK_CODE(code, lino, _end); code = doAggregateImpl(pOperator, pSup->pCtx); QUERY_CHECK_CODE(code, lino, _end); } while (1) { bool blockAllocated = false; pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { if (!pAggInfo->hasValidBlock) { code = createDataBlockForEmptyInput(pOperator, &pBlock); QUERY_CHECK_CODE(code, lino, _end); if (pBlock == NULL) { break; } blockAllocated = true; } else { break; } } pAggInfo->hasValidBlock = true; pAggInfo->binfo.pRes->info.scanFlag = pBlock->info.scanFlag; // there is an scalar expression that needs to be calculated before apply the group aggregation. if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) { SExprSupp* pSup1 = &pAggInfo->scalarExprSup; code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } } // if non-blocking mode and new group arrived, save the block and break if (!pOperator->blocking && pAggInfo->groupId != UINT64_MAX && pBlock->info.id.groupId != pAggInfo->groupId) { pAggInfo->pNewGroupBlock = pBlock; break; } // the pDataBlock are always the same one, no need to call this again code = setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } code = setInputDataBlock(pSup, pBlock, order, pBlock->info.scanFlag, true); if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } code = doAggregateImpl(pOperator, pSup->pCtx); if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); } destroyDataBlockForEmptyInput(blockAllocated, &pBlock); } // the downstream operator may return with error code, so let's check the code before generating results. if (pTaskInfo->code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); QUERY_CHECK_CODE(code, lino, _end); pAggInfo->cleanGroupResInfo = true; _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } return pBlock != NULL; } int32_t getAggregateResultNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; if (pOperator->status == OP_EXEC_DONE) { (*ppRes) = NULL; return code; } SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; bool hasNewGroups = false; do { hasNewGroups = nextGroupedResult(pOperator); code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); while (1) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); QUERY_CHECK_CODE(code, lino, _end); if (!hasRemainResults(&pAggInfo->groupResInfo)) { if (!hasNewGroups) setOperatorCompleted(pOperator); break; } if (pInfo->pRes->info.rows > 0) { break; } } } while (pInfo->pRes->info.rows == 0 && hasNewGroups); size_t rows = blockDataGetNumOfRows(pInfo->pRes); pOperator->resultInfo.totalRows += rows; _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); } (*ppRes) = (rows == 0) ? NULL : pInfo->pRes; return code; } static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; int32_t code = getAggregateResultNext(pOperator, &pRes); return pRes; } int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { int32_t code = TSDB_CODE_SUCCESS; if (!pOperator || (pOperator->exprSupp.numOfExprs > 0 && pCtx == NULL)) { qError("%s failed at line %d since pCtx is NULL.", __func__, __LINE__); return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { // todo add a dummy function to avoid process check if (pCtx[k].fpSet.process == NULL) { continue; } if ((&pCtx[k])->input.pData[0] == NULL) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; qError("%s aggregate function error happens, input data is NULL.", GET_TASKID(pOperator->pTaskInfo)); } else { code = pCtx[k].fpSet.process(&pCtx[k]); } if (code != TSDB_CODE_SUCCESS) { if (pCtx[k].fpSet.cleanup != NULL) { pCtx[k].fpSet.cleanup(&pCtx[k]); } qError("%s aggregate function error happens, code:%s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); return code; } } } return TSDB_CODE_SUCCESS; } static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SSDataBlock* pBlock = NULL; if (!tsCountAlwaysReturnValue) { return TSDB_CODE_SUCCESS; } SAggOperatorInfo* pAggInfo = pOperator->info; if (pAggInfo->groupKeyOptimized) { return TSDB_CODE_SUCCESS; } SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT || (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) { return TSDB_CODE_SUCCESS; } SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; if (!pAggInfo->hasCountFunc) { return TSDB_CODE_SUCCESS; } code = createDataBlock(&pBlock); if (code) { return code; } pBlock->info.rows = 1; pBlock->info.capacity = 0; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { SColumnInfoData colInfo = {0}; colInfo.hasNull = true; colInfo.info.type = TSDB_DATA_TYPE_NULL; colInfo.info.bytes = 1; SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i]; for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { int32_t slotId = pFuncParam->pCol->slotId; int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); if (slotId >= numOfCols) { code = taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1); QUERY_CHECK_CODE(code, lino, _end); for (int32_t k = numOfCols; k < slotId + 1; ++k) { void* tmp = taosArrayPush(pBlock->pDataBlock, &colInfo); QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); } } } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { // do nothing } } } code = blockDataEnsureCapacity(pBlock, pBlock->info.rows); QUERY_CHECK_CODE(code, lino, _end); for (int32_t i = 0; i < blockDataGetNumOfCols(pBlock); ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno); colDataSetNULL(pColInfoData, 0); } *ppBlock = pBlock; _end: if (code != TSDB_CODE_SUCCESS) { blockDataDestroy(pBlock); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); } return code; } void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { if (!blockAllocated) { return; } blockDataDestroy(*ppBlock); *ppBlock = NULL; } int32_t setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { int32_t code = TSDB_CODE_SUCCESS; SAggOperatorInfo* pAggInfo = pOperator->info; if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { return code; } code = doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); // record the current active group id pAggInfo->groupId = groupId; return code; } int32_t doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SAggOperatorInfo* pAggInfo = pOperator->info; SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true); if (pResultRow == NULL || pTaskInfo->code != 0) { code = pTaskInfo->code; lino = __LINE__; goto _end; } /* * not assign result buffer yet, add new result buffer * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { code = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); QUERY_CHECK_CODE(code, lino, _end); } code = setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); QUERY_CHECK_CODE(code, lino, _end); _end: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } return code; } // a new buffer page for each table. Needs to opt this design int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) { if (pWindowRes->pageId != -1) { return 0; } SFilePage* pData = NULL; // in the first scan, new space needed for results int32_t pageId = -1; SArray* list = getDataBufPagesIdList(pResultBuf); if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, &pageId); if (pData == NULL) { qError("failed to get buffer, code:%s", tstrerror(terrno)); return terrno; } pData->num = sizeof(SFilePage); } else { SPageInfo* pi = getLastPageInfo(list); pData = getBufPage(pResultBuf, getPageId(pi)); if (pData == NULL) { qError("failed to get buffer, code:%s", tstrerror(terrno)); return terrno; } pageId = getPageId(pi); if (pData->num + size > getBufPageSize(pResultBuf)) { // release current page first, and prepare the next one releaseBufPageInfo(pResultBuf, pi); pData = getNewBufPage(pResultBuf, &pageId); if (pData == NULL) { qError("failed to get buffer, code:%s", tstrerror(terrno)); return terrno; } pData->num = sizeof(SFilePage); } } if (pData == NULL) { return -1; } // set the number of rows in current disk page if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer pWindowRes->pageId = pageId; pWindowRes->offset = (int32_t)pData->num; pData->num += size; } return 0; } int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey) { int32_t code = 0; // _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pAggSup->currentPageId = -1; pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash); if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { return terrno; } uint32_t defaultPgsz = 0; int64_t defaultBufsz = 0; code = getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); if (code) { qError("failed to get buff page size, rowSize:%d", pAggSup->resultRowSize); return code; } if (!osTempSpaceAvailable()) { code = TSDB_CODE_NO_DISKSPACE; qError("Init stream agg supporter failed since %s, key:%s, tempDir:%s", tstrerror(code), pKey, tsTempDir); return code; } code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); if (code != TSDB_CODE_SUCCESS) { qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey); return code; } return code; } void cleanupResultInfoInStream(SExecTaskInfo* pTaskInfo, void* pState, SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { int32_t code = TSDB_CODE_SUCCESS; SStorageAPI* pAPI = &pTaskInfo->storageAPI; int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; SqlFunctionCtx* pCtx = pSup->pCtx; int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); bool needCleanup = false; for (int32_t j = 0; j < numOfExprs; ++j) { needCleanup |= pCtx[j].needCleanup; } if (!needCleanup) { return; } for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { SResultWindowInfo* pWinInfo = taosArrayGet(pGroupResInfo->pRows, i); SRowBuffPos* pPos = pWinInfo->pStatePos; SResultRow* pRow = NULL; code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); if (TSDB_CODE_SUCCESS != code) { qError("failed to get state by pos, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo)); continue; } for (int32_t j = 0; j < numOfExprs; ++j) { pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.cleanup) { pCtx[j].fpSet.cleanup(&pCtx[j]); } } } } void cleanupResultInfoInGroupResInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo) { int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; SqlFunctionCtx* pCtx = pSup->pCtx; int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); bool needCleanup = false; for (int32_t j = 0; j < numOfExprs; ++j) { needCleanup |= pCtx[j].needCleanup; } if (!needCleanup) { return; } for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { SResultRow* pRow = NULL; SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); if (page == NULL) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); continue; } pRow = (SResultRow*)((char*)page + pPos->pos.offset); for (int32_t j = 0; j < numOfExprs; ++j) { pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.cleanup) { pCtx[j].fpSet.cleanup(&pCtx[j]); } } releaseBufPage(pBuf, page); } } void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap) { int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; SqlFunctionCtx* pCtx = pSup->pCtx; bool needCleanup = false; for (int32_t j = 0; j < numOfExprs; ++j) { needCleanup |= pCtx[j].needCleanup; } if (!needCleanup) { return; } // begin from last iter void* pData = pGroupResInfo->dataPos; int32_t iter = pGroupResInfo->iter; while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) { SResultRowPosition* pos = pData; SFilePage* page = getBufPage(pBuf, pos->pageId); if (page == NULL) { qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); continue; } SResultRow* pRow = (SResultRow*)((char*)page + pos->offset); for (int32_t j = 0; j < numOfExprs; ++j) { pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.cleanup) { pCtx[j].fpSet.cleanup(&pCtx[j]); } } releaseBufPage(pBuf, page); } } void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo, SAggSupporter *pAggSup, bool cleanGroupResInfo) { if (cleanGroupResInfo) { cleanupResultInfoInGroupResInfo(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo); } else { cleanupResultInfoInHashMap(pTaskInfo, pSup, pAggSup->pResultBuf, pGroupResInfo, pAggSup->pResultRowHashTable); } } void cleanupAggSup(SAggSupporter* pAggSup) { taosMemoryFreeClear(pAggSup->keyBuf); tSimpleHashCleanup(pAggSup->pResultRowHashTable); destroyDiskbasedBuf(pAggSup->pResultBuf); memset(pAggSup, 0, sizeof(SAggSupporter)); } int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey, void* pState, SFunctionStateStore* pStore) { int32_t code = initExprSupp(pSup, pExprInfo, numOfCols, pStore); if (code != TSDB_CODE_SUCCESS) { return code; } code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey); if (code != TSDB_CODE_SUCCESS) { return code; } for (int32_t i = 0; i < numOfCols; ++i) { pSup->pCtx[i].hasWindowOrGroup = pSup->hasWindowOrGroup; if (pState) { pSup->pCtx[i].saveHandle.pBuf = NULL; pSup->pCtx[i].saveHandle.pState = pState; pSup->pCtx[i].exprIdx = i; } else { pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf; } } return TSDB_CODE_SUCCESS; } int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { int32_t code = TSDB_CODE_SUCCESS, lino = 0; for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily SFunctionCtxStatus status = {0}; functionCtxSave(&pCtx[k], &status); pCtx[k].input.startRowIndex = offset; pCtx[k].input.numOfRows = forwardStep; // not a whole block involved in query processing, statistics data can not be used // NOTE: the original value of isSet have been changed here if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) { pCtx[k].input.colDataSMAIsSet = false; } if (fmIsPlaceHolderFunc(pCtx[k].functionId)) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); char* p = GET_ROWCELL_INTERBUF(pEntryInfo); TAOS_CHECK_EXIT(fmSetStreamPseudoFuncParamVal(pCtx[k].functionId, pCtx[k].pExpr->base.pParamList, &taskInfo->pStreamRuntimeInfo->funcInfo)); SValueNode *valueNode = (SValueNode *)nodesListGetNode(pCtx[k].pExpr->base.pParamList, 0); if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) { pEntryInfo->isNullRes = 1; } else if (IS_VAR_DATA_TYPE(pCtx[k].pExpr->base.resSchema.type)){ void* v = nodesGetValueFromNode(valueNode); memcpy(p, v, varDataTLen(v)); } else { memcpy(p, nodesGetValueFromNode(valueNode), pCtx[k].pExpr->base.resSchema.bytes); } pEntryInfo->numOfRes = 1; } else if (pCtx[k].isPseudoFunc) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); char* p = GET_ROWCELL_INTERBUF(pEntryInfo); SColumnInfoData idata = {0}; idata.info.type = TSDB_DATA_TYPE_BIGINT; idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; idata.pData = p; SScalarParam out = {.columnData = &idata}; SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; TAOS_CHECK_EXIT(pCtx[k].sfp.process(&tw, 1, &out)); pEntryInfo->numOfRes = 1; } else { if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { if ((&pCtx[k])->input.pData[0] == NULL) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; qError("%s apply functions error, input data is NULL.", GET_TASKID(taskInfo)); } else { code = pCtx[k].fpSet.process(&pCtx[k]); } if (code != TSDB_CODE_SUCCESS) { if (pCtx[k].fpSet.cleanup != NULL) { pCtx[k].fpSet.cleanup(&pCtx[k]); } TAOS_CHECK_EXIT(code); } } // restore it functionCtxRestore(&pCtx[k], &status); } } _exit: if (code) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); taskInfo->code = code; } return code; } void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { pStatus->hasAgg = pCtx->input.colDataSMAIsSet; pStatus->numOfRows = pCtx->input.numOfRows; pStatus->startOffset = pCtx->input.startRowIndex; } void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { pCtx->input.colDataSMAIsSet = pStatus->hasAgg; pCtx->input.numOfRows = pStatus->numOfRows; pCtx->input.startRowIndex = pStatus->startOffset; } static int32_t resetAggregateOperatorState(SOperatorInfo* pOper) { SAggOperatorInfo* pAgg = pOper->info; SAggPhysiNode* pAggNode = (SAggPhysiNode*)pOper->pPhyNode; pOper->status = OP_NOT_OPENED; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; SExecTaskInfo* pTaskInfo = pOper->pTaskInfo; cleanupResultInfo(pTaskInfo, &pOper->exprSupp, &pAgg->groupResInfo, &pAgg->aggSup, pAgg->cleanGroupResInfo); cleanupGroupResInfo(&pAgg->groupResInfo); resetBasicOperatorState(&pAgg->binfo); int32_t code = resetAggSup(&pOper->exprSupp, &pAgg->aggSup, pTaskInfo, pAggNode->pAggFuncs, pAggNode->pGroupKeys, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code == 0) { code = resetExprSupp(&pAgg->scalarExprSup, pTaskInfo, pAggNode->pExprs, NULL, &pTaskInfo->storageAPI.functionStore); } pAgg->groupId = UINT64_MAX; pAgg->cleanGroupResInfo = false; pAgg->hasValidBlock = false; return 0; }