TDengine/source/libs/executor/src/groupoperator.c

757 lines
26 KiB
C
Raw Normal View History

2022-04-04 06:54:39 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "function.h"
#include "tname.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
2022-05-27 14:05:53 +00:00
#include "executorInt.h"
2022-04-04 06:54:39 +00:00
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
2022-04-08 05:09:44 +00:00
static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity);
static int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
2022-04-08 05:09:44 +00:00
2022-04-08 02:24:35 +00:00
static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) {
2022-04-04 06:54:39 +00:00
SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosMemoryFreeClear(pInfo->keyBuf);
taosArrayDestroy(pInfo->pGroupCols);
taosArrayDestroy(pInfo->pGroupColVals);
}
2022-06-13 11:59:30 +00:00
/*
 * Allocate the state used to build group keys for the columns in pGroupColList.
 *
 * pGroupColVals: receives an array with one SGroupKeys per group-by column; each entry owns a zeroed
 *                pData buffer of pCol->bytes bytes.
 * keyLen:        incremented by the sum of the column widths plus one null-flag byte per column.
 * keyBuf:        receives a zeroed buffer of *keyLen bytes.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure every key slot and the array allocated
 * here are released and *pGroupColVals is set to NULL, so a caller that only frees its info struct on the
 * error path does not leak them.
 */
static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) {
  *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pGroupColVals) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pGroupColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = taosArrayGet(pGroupColList, i);
    (*keyLen) += pCol->bytes;  // actual data only; the per-column null flags are added after the loop

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      goto _error;
    }

    if (taosArrayPush((*pGroupColVals), &key) == NULL) {
      taosMemoryFree(key.pData);  // not owned by the array yet
      goto _error;
    }
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    goto _error;
  }

  return TSDB_CODE_SUCCESS;

_error:
  // Undo the partial initialization: free every key slot pushed so far, then the array itself.
  for (size_t i = 0; i < taosArrayGetSize(*pGroupColVals); ++i) {
    SGroupKeys* pKey = taosArrayGet(*pGroupColVals, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(*pGroupColVals);
  *pGroupColVals = NULL;
  return TSDB_CODE_OUT_OF_MEMORY;
}
2022-04-08 02:24:35 +00:00
/*
 * Tell whether row `rowIndex` of pBlock has the same group-by key as the key currently recorded in
 * pGroupColVals. Two NULLs are treated as equal; a NULL and a non-NULL value are unequal.
 */
static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) {
  SColumnDataAgg* pAgg = NULL;

  for (int32_t k = 0; k < numOfGroupCols; ++k) {
    SColumn*         pSrcCol = taosArrayGet(pGroupCols, k);
    SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, pSrcCol->slotId);
    if (pBlock->pBlockAgg != NULL) {
      pAgg = pBlock->pBlockAgg[pSrcCol->slotId];  // TODO is agg data matched?
    }

    bool        rowIsNull = colDataIsNull(pData, pBlock->info.rows, rowIndex, pAgg);
    SGroupKeys* pPrev = taosArrayGet(pGroupColVals, k);

    if (rowIsNull != pPrev->isNull) {
      return false;  // exactly one side is NULL
    }
    if (rowIsNull) {
      continue;  // both sides are NULL
    }

    char* pCur = colDataGetData(pData, rowIndex);
    bool  same;
    if (pPrev->type == TSDB_DATA_TYPE_JSON) {
      int32_t jsonLen = getJsonValueLen(pCur);
      same = (memcmp(pPrev->pData, pCur, jsonLen) == 0);
    } else if (IS_VAR_DATA_TYPE(pPrev->type)) {
      int32_t curLen = varDataLen(pCur);
      same = (curLen == varDataLen(pPrev->pData)) && (memcmp(varDataVal(pPrev->pData), varDataVal(pCur), curLen) == 0);
    } else {
      same = (memcmp(pPrev->pData, pCur, pPrev->bytes) == 0);
    }

    if (!same) {
      return false;
    }
  }

  return true;
}
2022-06-13 11:59:30 +00:00
/*
 * Copy the group-by column values of row `rowIndex` in pBlock into the key slots of pGroupColVals, making
 * that row's key the current key for later groupKeyCompare()/buildGroupKeys() calls. NULL values only set
 * the slot's isNull flag; the slot's previous pData content is left as is.
 */
static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  size_t numOfGroupCols = taosArrayGetSize(pGroupCols);
  for (size_t i = 0; i < numOfGroupCols; ++i) {
    SColumn*         pCol = taosArrayGet(pGroupCols, i);
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
      continue;
    }

    pkey->isNull = false;
    char* val = colDataGetData(pColInfoData, rowIndex);
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      int32_t dataLen = getJsonValueLen(val);
      memcpy(pkey->pData, val, dataLen);
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
      // Check the length BEFORE copying: the slot was allocated with exactly pkey->bytes bytes, so an
      // assertion placed after the memcpy could only report an overflow that had already happened.
      ASSERT(varDataTLen(val) <= pkey->bytes);
      memcpy(pkey->pData, val, varDataTLen(val));
    } else {
      memcpy(pkey->pData, val, pkey->bytes);
    }
  }
}
2022-06-13 11:59:30 +00:00
static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) {
2022-04-04 06:54:39 +00:00
ASSERT(pKey != NULL);
size_t numOfGroupCols = taosArrayGetSize(pGroupColVals);
char* isNull = (char*)pKey;
char* pStart = (char*)pKey + sizeof(int8_t) * numOfGroupCols;
for (int32_t i = 0; i < numOfGroupCols; ++i) {
SGroupKeys* pkey = taosArrayGet(pGroupColVals, i);
if (pkey->isNull) {
isNull[i] = 1;
continue;
}
isNull[i] = 0;
2022-06-04 11:28:30 +00:00
if (pkey->type == TSDB_DATA_TYPE_JSON) {
int32_t dataLen = getJsonValueLen(pkey->pData);
memcpy(pStart, (pkey->pData), dataLen);
pStart += dataLen;
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
2022-04-04 06:54:39 +00:00
varDataCopy(pStart, pkey->pData);
pStart += varDataTLen(pkey->pData);
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
} else {
memcpy(pStart, pkey->pData, pkey->bytes);
pStart += pkey->bytes;
}
}
2022-04-07 05:57:47 +00:00
return (int32_t) (pStart - (char*)pKey);
2022-04-04 06:54:39 +00:00
}
/*
 * Assign the group keys (or constant values) to the output columns that are not real aggregates.
 * For every context with functionId == -1 (e.g. `key` in `select count(*), key from t group by key`),
 * copy row `rowIndex` of its input column into the result cell, or flag the cell as a NULL result.
 * In both cases the entry is marked as holding one result.
 */
static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) {
  for (int32_t k = 0; k < numOfOutput; ++k) {
    SqlFunctionCtx* pFnCtx = &pCtx[k];
    if (pFnCtx->functionId != -1) {
      continue;  // a real aggregate function: nothing to assign
    }

    SResultRowEntryInfo* pEntry = GET_RES_INFO(pFnCtx);
    SColumnInfoData*     pSrc = pFnCtx->input.pData[0];

    // todo OPT all/all not NULL
    if (colDataIsNull(pSrc, totalRows, rowIndex, NULL)) {
      pEntry->isNullRes = 1;
    } else {
      char* pDest = GET_ROWCELL_INTERBUF(pEntry);
      char* pValue = colDataGetData(pSrc, rowIndex);
      if (pSrc->info.type == TSDB_DATA_TYPE_JSON) {
        int32_t jsonLen = getJsonValueLen(pValue);
        memcpy(pDest, pValue, jsonLen);
      } else if (IS_VAR_DATA_TYPE(pSrc->info.type)) {
        varDataCopy(pDest, pValue);
      } else {
        memcpy(pDest, pValue, pSrc->info.bytes);
      }
    }

    pEntry->numOfRes = 1;
  }
}
/*
 * Bind the aggregation contexts to the result row of the group whose key is currently recorded in
 * pInfo->pGroupColVals, feed rows [rowIndex, rowIndex + numOfRows) of pBlock into them, and assign the
 * plain group-by output columns from row `rowIndex`.
 */
static void groupbyApplyFuncsOnRange(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t rowIndex,
                                     int32_t numOfRows) {
  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SqlFunctionCtx*       pCtx = pInfo->binfo.pCtx;
  STimeWindow           w = TSWINDOW_INITIALIZER;

  int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
  int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR,
                                        len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup);
  if (ret != TSDB_CODE_SUCCESS) {  // null data, too many state code
    longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
  }

  doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, numOfRows, NULL, pBlock->info.rows, pOperator->numOfExprs,
                   TSDB_ORDER_ASC);

  // assign the group keys or user input constant values if required
  doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex);
}

/*
 * Aggregate one input block. Adjacent rows with identical group-by keys form a run; each run is applied
 * to its group's result row in one doApplyFunctions() call. The trailing run of the block is always
 * flushed before returning, so a block never leaves pending rows behind.
 */
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  int32_t               numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols);

  int32_t num = 0;  // length of the current run, which ends at row j - 1
  for (int32_t j = 0; j < pBlock->info.rows; ++j) {
    // Compare with the previous row of this column, and do not set the output buffer again if they are identical.
    if (!pInfo->isInit) {
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      pInfo->isInit = true;
      num++;
      continue;
    }

    if (groupKeyCompare(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols)) {
      num++;
      continue;
    }

    // The first row of a new block does not belong to the previous existing group. The previous block's
    // trailing run was already flushed, so just start a new run here.
    if (j == 0) {
      num++;
      recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
      continue;
    }

    // The key changed: flush the run ending at row j - 1, then start a new run at row j.
    groupbyApplyFuncsOnRange(pOperator, pBlock, j - num, num);
    recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
    num = 1;
  }

  // Flush the trailing run of this block.
  if (num > 0) {
    groupbyApplyFuncsOnRange(pOperator, pBlock, pBlock->info.rows - num, num);
  }
}
2022-05-03 07:27:13 +00:00
/*
 * Fill pInfo->binfo.pRes with the next batch of grouped results and apply the HAVING condition
 * (pInfo->pCondition) to it. Batches that are empty after filtering are skipped. The operator is marked
 * completed once no grouped results remain. Returns pRes, or NULL if it ends up empty.
 */
static SSDataBlock* buildGroupbyResultBlock(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*          pRes = pInfo->binfo.pRes;

  while (1) {
    doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
    doFilter(pInfo->pCondition, pRes);

    bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo);
    if (!hasRemain) {
      doSetOperatorCompleted(pOperator);
      break;
    }

    if (pRes->info.rows > 0) {
      break;
    }
  }

  size_t rows = pRes->info.rows;
  pOperator->resultInfo.totalRows += rows;
  return (rows == 0) ? NULL : pRes;
}

/*
 * Group-by operator's next() entry. The first call consumes the whole downstream input and aggregates it
 * per group. That call and every later call return one filtered batch of grouped results. Returns NULL
 * when finished.
 */
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*        pTaskInfo = pOperator->pTaskInfo;
  SGroupbyOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*          pRes = pInfo->binfo.pRes;

  // Later calls keep draining the aggregated groups. They go through the same HAVING filtering as the
  // first batch; previously only the first batch was filtered.
  if (pOperator->status == OP_RES_TO_RETURN) {
    return buildGroupbyResultBlock(pOperator);
  }

  int32_t order = TSDB_ORDER_ASC;
  int32_t scanFlag = MAIN_SCAN;
  int64_t st = taosGetTimestampUs();

  SOperatorInfo* downstream = pOperator->pDownstream[0];
  while (1) {
    SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
    if (pBlock == NULL) {
      break;
    }

    int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }

    // the pDataBlock are always the same one, no need to call this again
    setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);

    // there is an scalar expression that needs to be calculated right before apply the group aggregation.
    if (pInfo->scalarSup.pScalarExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doHashGroupbyAgg(pOperator, pBlock);
  }

  pOperator->status = OP_RES_TO_RETURN;

  // Disabled: round-trips the result rows through encodeResultRow/decodeResultRow.
#if 0
  if(pOperator->fpSet.encodeResultRow){
    char *result = NULL;
    int32_t length = 0;
    pOperator->fpSet.encodeResultRow(pOperator, &result, &length);
    SAggSupporter* pSup = &pInfo->aggSup;
    taosHashClear(pSup->pResultRowHashTable);
    pInfo->binfo.resultRowInfo.size = 0;
    pOperator->fpSet.decodeResultRow(pOperator, result);
    if(result){
      taosMemoryFree(result);
    }
  }
#endif

  blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
  initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  return buildGroupbyResultBlock(pOperator);
}
2022-06-14 03:54:13 +00:00
/*
 * Create the hash group-by aggregation operator.
 * On success returns the operator with `downstream` attached. On failure returns NULL and stores the
 * failing error code in pTaskInfo->code. pGroupColList is stored in the operator info; on failure it is
 * left untouched.
 */
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
                                       SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
                                       SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
  int32_t               code = TSDB_CODE_OUT_OF_MEMORY;
  SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo));
  SOperatorInfo*        pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  pInfo->pGroupCols = pGroupColList;
  pInfo->pCondition = pCondition;

  pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo;
  pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr;
  pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset);

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  initResultSizeInfo(pOperator, 4096);
  initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, pInfo->groupKeyLen, pTaskInfo->id.str);
  initResultRowInfo(&pInfo->binfo.resultRowInfo);

  pOperator->name = "GroupbyAggOperator";
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  // pOperator->operatorType = OP_Groupby;
  pOperator->pExpr = pExprInfo;
  pOperator->numOfExprs = numOfCols;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);

  // An operator without its downstream cannot produce data, so treat a failed attach as a creation failure.
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    // Release the group-key state allocated by initGroupOptrInfo(), if any (pInfo is calloc'ed, so unset
    // fields are NULL).
    if (pInfo->pGroupColVals != NULL) {
      size_t numOfKeys = taosArrayGetSize(pInfo->pGroupColVals);
      for (size_t i = 0; i < numOfKeys; ++i) {
        SGroupKeys* pKey = taosArrayGet(pInfo->pGroupColVals, i);
        taosMemoryFreeClear(pKey->pData);
      }
      taosArrayDestroy(pInfo->pGroupColVals);
    }
    taosMemoryFreeClear(pInfo->keyBuf);
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}
2022-04-08 02:24:35 +00:00
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
2022-04-08 09:37:57 +00:00
// SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2022-04-08 02:24:35 +00:00
SPartitionOperatorInfo* pInfo = pOperator->info;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
2022-04-08 02:24:35 +00:00
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
2022-04-08 09:37:57 +00:00
SDataGroupInfo* pGInfo = NULL;
void *pPage = getCurrentDataGroupInfo(pInfo, &pGInfo, len);
2022-04-08 02:24:35 +00:00
2022-04-08 09:37:57 +00:00
pGInfo->numOfRows += 1;
if (pGInfo->groupId == 0) {
pGInfo->groupId = calcGroupId(pInfo->keyBuf, len);
2022-04-08 02:24:35 +00:00
}
// number of rows
2022-04-08 05:09:44 +00:00
int32_t* rows = (int32_t*) pPage;
2022-04-08 02:24:35 +00:00
// group id
size_t numOfCols = pOperator->numOfExprs;
2022-04-08 05:09:44 +00:00
for(int32_t i = 0; i < numOfCols; ++i) {
2022-04-08 07:31:03 +00:00
SExprInfo* pExpr = &pOperator->pExpr[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
2022-04-08 02:24:35 +00:00
2022-04-08 05:09:44 +00:00
int32_t bytes = pColInfoData->info.bytes;
int32_t startOffset = pInfo->columnOffset[i];
2022-04-08 02:24:35 +00:00
int32_t* columnLen = NULL;
int32_t contentLen = 0;
2022-04-08 02:24:35 +00:00
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
2022-04-22 01:54:27 +00:00
int32_t* offset = (int32_t*)((char*)pPage + startOffset);
columnLen = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
char* data = (char*)((char*) columnLen + sizeof(int32_t));
2022-04-08 05:09:44 +00:00
if (colDataIsNull_s(pColInfoData, j)) {
offset[(*rows)] = -1;
contentLen = 0;
2022-06-04 11:28:30 +00:00
} else if(pColInfoData->info.type == TSDB_DATA_TYPE_JSON){
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
int32_t dataLen = getJsonValueLen(src);
memcpy(data + (*columnLen), src, dataLen);
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
ASSERT(v > 0);
contentLen = dataLen;
2022-04-08 05:09:44 +00:00
} else {
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
memcpy(data + (*columnLen), src, varDataTLen(src));
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0);
2022-04-08 05:09:44 +00:00
contentLen = varDataTLen(src);
}
2022-04-08 02:24:35 +00:00
} else {
2022-04-22 01:54:27 +00:00
char* bitmap = (char*)pPage + startOffset;
columnLen = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
2022-04-08 05:09:44 +00:00
char* data = (char*) columnLen + sizeof(int32_t);
2022-04-08 02:24:35 +00:00
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
if (isNull) {
2022-04-08 05:09:44 +00:00
colDataSetNull_f(bitmap, (*rows));
2022-04-08 02:24:35 +00:00
} else {
2022-04-08 05:09:44 +00:00
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
2022-04-08 02:24:35 +00:00
}
2022-04-08 05:09:44 +00:00
contentLen = bytes;
2022-04-08 02:24:35 +00:00
}
2022-04-08 05:09:44 +00:00
(*columnLen) += contentLen;
ASSERT(*columnLen >= 0);
2022-04-08 02:24:35 +00:00
}
2022-04-08 05:09:44 +00:00
(*rows) += 1;
2022-04-08 02:24:35 +00:00
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage);
}
2022-04-08 09:37:57 +00:00
}
/*
 * Find (or create) the partition for the key pInfo->keyBuf[0, len) and return one of its pages that still
 * has room for a row. A new page is appended to the partition's page list when the last page already
 * holds pInfo->rowCapacity rows. The returned page is acquired from pInfo->pBuf; the caller releases it.
 * *pGroupInfo receives the partition's entry stored in pInfo->pGroupSet.
 */
void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len) {
  SDataGroupInfo* p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

  void* pPage = NULL;
  if (p == NULL) {  // it is a new group
    SDataGroupInfo gi = {0};
    gi.pPageList = taosArrayInit(100, sizeof(int32_t));
    taosHashPut(pInfo->pGroupSet, pInfo->keyBuf, len, &gi, sizeof(SDataGroupInfo));

    p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);

    int32_t pageId = 0;
    pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
    taosArrayPush(p->pPageList, &pageId);

    // Clear the whole page, not just the row count. doHashPartition() accumulates into each column's
    // length field and sets bits in its null bitmap, and getNewBufPage() is not shown to hand out zeroed
    // memory. The rollover branch below already clears additional pages the same way.
    memset(pPage, 0, getBufPageSize(pInfo->pBuf));
  } else {
    int32_t* curId = taosArrayGetLast(p->pPageList);
    pPage = getBufPage(pInfo->pBuf, *curId);

    int32_t* rows = (int32_t*)pPage;
    if (*rows >= pInfo->rowCapacity) {
      // release buffer
      releaseBufPage(pInfo->pBuf, pPage);

      // add a new page for current group
      int32_t pageId = 0;
      pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
      taosArrayPush(p->pPageList, &pageId);
      memset(pPage, 0, getBufPageSize(pInfo->pBuf));
    }
  }

  *pGroupInfo = p;
  return pPage;
}
/*
 * Derive a 64-bit group id from a serialized group key. The id is the first 8 bytes of the MD5 digest
 * of pData[0, len).
 */
uint64_t calcGroupId(char* pData, int32_t len) {
  T_MD5_CTX md5;
  tMD5Init(&md5);
  tMD5Update(&md5, (uint8_t*)pData, len);
  tMD5Final(&md5);

  uint64_t groupId = 0;
  memcpy(&groupId, md5.digest, sizeof(uint64_t));  // the rest of the digest is discarded
  return groupId;
}
2022-04-08 05:09:44 +00:00
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
size_t numOfCols = pBlock->info.numOfCols;
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));
offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
2022-04-08 05:09:44 +00:00
for(int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int32_t bytes = pColInfoData->info.bytes;
int32_t payloadLen = bytes * rowCapacity;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
// offset segment + content length + payload
offset[i + 1] = rowCapacity * sizeof(int32_t) + sizeof(int32_t) + payloadLen + offset[i];
} else {
// bitmap + content length + payload
offset[i + 1] = BitmapLen(rowCapacity) + sizeof(int32_t) + payloadLen + offset[i];
}
}
return offset;
}
2022-06-14 03:15:10 +00:00
/*
 * Drop all buffered partition data. Frees every group's page-id list, empties the group hash table, and
 * clears the page buffer via clearDiskbasedBuf().
 */
static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
  for (void* pIter = taosHashIterate(pInfo->pGroupSet, NULL); pIter != NULL;
       pIter = taosHashIterate(pInfo->pGroupSet, pIter)) {
    SDataGroupInfo* pGroup = (SDataGroupInfo*)pIter;
    taosArrayDestroy(pGroup->pPageList);
  }

  taosHashClear(pInfo->pGroupSet);
  clearDiskbasedBuf(pInfo->pBuf);
}
2022-04-08 02:24:35 +00:00
/*
 * Load the next buffered page into pInfo->binfo.pRes and tag it with its group id. When the current
 * group's pages are exhausted, the next group of pInfo->pGroupSet is used. After the last group the
 * operator is marked completed, the buffered state is cleared and NULL is returned.
 */
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;
  SDataGroupInfo*         pGroup = pInfo->pGroupIter;

  bool needNextGroup = (pInfo->pGroupIter == NULL) || (pInfo->pageIndex >= taosArrayGetSize(pGroup->pPageList));
  if (needNextGroup) {
    pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
    if (pInfo->pGroupIter == NULL) {
      doSetOperatorCompleted(pOperator);
      clearPartitionOperator(pInfo);
      return NULL;
    }

    pGroup = pInfo->pGroupIter;
    pInfo->pageIndex = 0;
  }

  int32_t* pPageId = taosArrayGet(pGroup->pPageList, pInfo->pageIndex);
  void*    pPage = getBufPage(pInfo->pBuf, *pPageId);
  blockDataFromBuf1(pRes, pPage, pInfo->rowCapacity);

  pInfo->pageIndex += 1;
  releaseBufPage(pInfo->pBuf, pPage);

  blockDataUpdateTsWindow(pRes, 0);
  pRes->info.groupId = pGroup->groupId;

  pOperator->resultInfo.totalRows += pRes->info.rows;
  return pRes;
}
2022-05-03 07:27:13 +00:00
/*
 * Partition operator's next() entry. The first call drains the downstream operator, evaluates the scalar
 * pre-expressions (if any) and distributes every row into its group's pages. That call and every later
 * call return one page of one group. Returns NULL when finished.
 */
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
  if (pOperator->status == OP_EXEC_DONE) {
    return NULL;
  }

  SExecTaskInfo*          pTaskInfo = pOperator->pTaskInfo;
  SPartitionOperatorInfo* pInfo = pOperator->info;
  SSDataBlock*            pRes = pInfo->binfo.pRes;

  if (pOperator->status == OP_RES_TO_RETURN) {
    blockDataCleanup(pRes);
    return buildPartitionResult(pOperator);
  }

  int64_t        st = taosGetTimestampUs();
  SOperatorInfo* downstream = pOperator->pDownstream[0];

  for (SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); pBlock != NULL;
       pBlock = downstream->fpSet.getNextFn(downstream)) {
    // scalar expressions, if present, are evaluated on the block right before partitioning it
    if (pInfo->scalarSupp.pScalarExprInfo != NULL) {
      pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL);
      if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
        longjmp(pTaskInfo->env, pTaskInfo->code);
      }
    }

    doHashPartition(pOperator, pBlock);
  }

  pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
  pOperator->status = OP_RES_TO_RETURN;

  blockDataEnsureCapacity(pRes, 4096);
  return buildPartitionResult(pOperator);
}
static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
2022-04-08 05:09:44 +00:00
SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param;
2022-04-08 02:24:35 +00:00
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
taosArrayDestroy(pInfo->pGroupCols);
2022-05-27 14:05:53 +00:00
for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i);
taosMemoryFree(key.pData);
}
2022-04-08 02:24:35 +00:00
taosArrayDestroy(pInfo->pGroupColVals);
2022-04-08 05:09:44 +00:00
taosMemoryFree(pInfo->keyBuf);
2022-05-27 14:05:53 +00:00
taosHashCleanup(pInfo->pGroupSet);
2022-04-08 05:09:44 +00:00
taosMemoryFree(pInfo->columnOffset);
2022-04-07 06:51:52 +00:00
}
/*
 * Create the partition operator from its physical plan node.
 * On success returns the operator with `downstream` attached. On failure returns NULL, stores the failing
 * error code in pTaskInfo->code, and releases the partition state acquired here.
 */
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
  int32_t                 code = TSDB_CODE_OUT_OF_MEMORY;
  SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
  if (pInfo == NULL || pOperator == NULL) {
    goto _error;
  }

  SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);

  int32_t    numOfCols = 0;
  SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);

  pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);

  if (pPartNode->pExprs != NULL) {
    pInfo->scalarSupp.numOfScalarExpr = 0;
    pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr);
    pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx(
        pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset);
  }

  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
  pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
  if (pInfo->pGroupSet == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  uint32_t defaultPgsz = 0;
  uint32_t defaultBufsz = 0;
  getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);

  code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf));
  pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity);
  if (pInfo->columnOffset == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  pOperator->name = "PartitionOperator";
  pOperator->blocking = true;
  pOperator->status = OP_NOT_OPENED;
  pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
  pInfo->binfo.pRes = pResBlock;
  pOperator->numOfExprs = numOfCols;
  pOperator->pExpr = pExprInfo;
  pOperator->info = pInfo;
  pOperator->pTaskInfo = pTaskInfo;

  pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
                                         NULL, NULL, NULL);

  // An operator without its downstream cannot produce data, so treat a failed attach as a creation failure.
  code = appendDownstream(pOperator, &downstream, 1);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  return pOperator;

_error:
  pTaskInfo->code = code;
  if (pInfo != NULL) {
    // pInfo is calloc'ed, so every field that was never set is NULL here.
    if (pInfo->pGroupColVals != NULL) {
      size_t numOfKeys = taosArrayGetSize(pInfo->pGroupColVals);
      for (size_t i = 0; i < numOfKeys; ++i) {
        SGroupKeys* pKey = taosArrayGet(pInfo->pGroupColVals, i);
        taosMemoryFreeClear(pKey->pData);
      }
      taosArrayDestroy(pInfo->pGroupColVals);
    }
    taosMemoryFreeClear(pInfo->keyBuf);
    taosMemoryFreeClear(pInfo->columnOffset);
    if (pInfo->pBuf != NULL) {
      destroyDiskbasedBuf(pInfo->pBuf);
    }
    if (pInfo->pGroupSet != NULL) {
      taosHashCleanup(pInfo->pGroupSet);  // still empty: no rows were partitioned yet
    }
    if (pInfo->pGroupCols != NULL) {
      taosArrayDestroy(pInfo->pGroupCols);
    }
  }
  taosMemoryFreeClear(pInfo);
  taosMemoryFreeClear(pOperator);
  return NULL;
}
/*
 * Locate (creating if needed) the result row keyed by pData[0, bytes) and bind the aggregation contexts
 * in binfo->pCtx to it. `type` is currently unused.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR when no result row could be obtained. The callers
 * in this file already check the return code.
 *
 * NOTE(review): `bytes` is int16_t while the callers pass an int32_t key length; a serialized key longer
 * than INT16_MAX would be truncated here. Widening it requires changing the forward declaration as well.
 */
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
                                int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo,
                                SAggSupporter* pAggSup) {
  SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
  SqlFunctionCtx* pCtx = binfo->pCtx;

  SResultRow* pResultRow =
      doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);

  // assert() alone compiles out under NDEBUG, after which setResultRowInitCtx() would dereference NULL;
  // report the failure to the caller instead.
  if (pResultRow == NULL) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
  return TSDB_CODE_SUCCESS;
}