TDengine/source/libs/executor/src/groupcacheoperator.c

1186 lines
38 KiB
C
Raw Normal View History

2023-06-30 11:20:36 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "groupcache.h"
/*
 * Build the group-column descriptor from a list of column nodes.
 *
 * For every column in pList the slot id, var-data flag and byte width are
 * recorded in pCols->pColsInfo, and the widths are accumulated into
 * pCols->bufSize. When grpColsMayBeNull is set, bitMapSize extra bytes are
 * reserved too. The key buffer pCols->pBuf (bufSize bytes) is only allocated
 * when there is more than one group column.
 *
 * NOTE(review): bufSize is accumulated with "+=", so pCols is expected to be
 * zero-initialized by the caller — confirm at call sites.
 * NOTE(review): sizeof(int8_t) is 1, so bitMapSize ends up equal to colNum
 * (one byte per column, not one bit). That over-reserves but is safe; confirm
 * the consumer's layout before shrinking it.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY. On failure neither
 * pColsInfo nor pBuf is left allocated (both are NULL).
 */
static int32_t initGroupColsInfo(SGroupColsInfo* pCols, bool grpColsMayBeNull, SNodeList* pList) {
  pCols->colNum = LIST_LENGTH(pList);
  pCols->withNull = grpColsMayBeNull;
  pCols->pColsInfo = taosMemoryMalloc(pCols->colNum * sizeof(SGroupColInfo));
  if (NULL == pCols->pColsInfo) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t i = 0;
  SNode*  pNode = NULL;
  FOREACH(pNode, pList) {
    SColumnNode* pColNode = (SColumnNode*)pNode;
    pCols->pColsInfo[i].slot = pColNode->slotId;
    pCols->pColsInfo[i].vardata = IS_VAR_DATA_TYPE(pColNode->node.resType.type);
    pCols->pColsInfo[i].bytes = pColNode->node.resType.bytes;
    pCols->bufSize += pColNode->node.resType.bytes;
    ++i;
  }

  if (pCols->withNull) {
    pCols->bitMapSize = pCols->colNum / sizeof(int8_t) + ((pCols->colNum % sizeof(int8_t)) ? 1 : 0);
    pCols->bufSize += pCols->bitMapSize;
  }

  if (pCols->colNum > 1) {
    pCols->pBuf = taosMemoryMalloc(pCols->bufSize);
    if (NULL == pCols->pBuf) {
      // Do not leave a half-initialized descriptor behind on failure.
      taosMemoryFreeClear(pCols->pColsInfo);
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}
2023-07-07 08:49:35 +00:00
/*
 * Log, at debug level, how many blocks were fetched from each downstream.
 *
 * Each downstream adds at most 21 characters (a space plus an int64), so
 * 32 bytes per downstream plus a 100-byte header is always large enough.
 * The temporary buffer is released before returning.
 */
static void logGroupCacheExecInfo(SGroupCacheOperatorInfo* pGrpCacheOperator) {
  int64_t bufLen = (int64_t)pGrpCacheOperator->execInfo.downstreamNum * 32 + 100;
  char*   buf = taosMemoryMalloc(bufLen);
  if (NULL == buf) {
    return;
  }

  int64_t offset = snprintf(buf, bufLen, "groupCache exec info, downstreamBlkNum:");
  for (int32_t i = 0; i < pGrpCacheOperator->execInfo.downstreamNum; ++i) {
    offset += snprintf(buf + offset, bufLen - offset, " %" PRId64, pGrpCacheOperator->execInfo.pDownstreamBlkNum[i]);
  }

  qDebug("%s", buf);
  taosMemoryFree(buf);
}
2023-06-30 11:20:36 +00:00
static void destroyGroupCacheOperator(void* param) {
SGroupCacheOperatorInfo* pGrpCacheOperator = (SGroupCacheOperatorInfo*)param;
2023-07-07 08:49:35 +00:00
logGroupCacheExecInfo(pGrpCacheOperator);
2023-06-30 11:20:36 +00:00
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pColsInfo);
taosMemoryFree(pGrpCacheOperator->groupColsInfo.pBuf);
taosHashCleanup(pGrpCacheOperator->pGrpHash);
2023-06-30 11:20:36 +00:00
taosMemoryFreeClear(param);
}
2023-07-21 11:25:38 +00:00
/*
 * Open (creating it if needed) an auto-deleted read/write cache file and
 * initialize the mutex that guards it.
 * Returns the errno-derived error if the open fails; pFileFd is then untouched.
 */
static FORCE_INLINE int32_t initOpenCacheFile(SGroupCacheFileFd* pFileFd, char* filename) {
  TdFilePtr pFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_AUTO_DEL);
  if (pFile == NULL) {
    return TAOS_SYSTEM_ERROR(errno);
  }

  pFileFd->fd = pFile;
  taosThreadMutexInit(&pFileFd->mutex, NULL);

  return TSDB_CODE_SUCCESS;
}
2023-07-21 11:25:38 +00:00
/*
 * Lock and return the cache file with id fileId of a vgroup (non-batch mode).
 *
 * Files are opened lazily. On first use of an id:
 * - "_<fileId>" is appended to the vgroup base file name at offset
 *   baseNameLen, so the base part is reused for later ids;
 * - a zeroed entry is stored in pVgCtx->pCacheFile;
 * - the file and its mutex are initialized in place.
 *
 * If opening fails, the entry is removed again so a later call retries
 * instead of locking an uninitialized mutex.
 *
 * On success (*ppFd)->mutex is held and must be released with
 * releaseVgroupFileFd(). pGCache and pCtx are currently unused.
 */
static int32_t acquireVgroupFileFd(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, uint32_t fileId, SGroupCacheFileFd** ppFd) {
  if (NULL == pVgCtx->pCacheFile) {
    pVgCtx->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pVgCtx->pCacheFile) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  SGroupCacheFileFd* pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId));
  if (NULL == pTmp) {
    // Write the suffix at the end of the base name (an address, not the char there), bounded by the array size.
    snprintf(pVgCtx->baseFilename + pVgCtx->baseNameLen, sizeof(pVgCtx->baseFilename) - pVgCtx->baseNameLen, "_%u",
             fileId);

    SGroupCacheFileFd newVgFd = {0};
    if (0 != tSimpleHashPut(pVgCtx->pCacheFile, &fileId, sizeof(fileId), &newVgFd, sizeof(newVgFd))) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    pTmp = tSimpleHashGet(pVgCtx->pCacheFile, &fileId, sizeof(fileId));
    if (NULL == pTmp) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // The mutex is initialized in its final location inside the hash entry.
    int32_t code = initOpenCacheFile(pTmp, pVgCtx->baseFilename);
    if (TSDB_CODE_SUCCESS != code) {
      tSimpleHashRemove(pVgCtx->pCacheFile, &fileId, sizeof(fileId));
      return code;
    }
  }

  taosThreadMutexLock(&pTmp->mutex);
  *ppFd = pTmp;
  return TSDB_CODE_SUCCESS;
}
2023-07-21 11:25:38 +00:00
/*
 * Counterpart of a successful file-fd acquire: drops the lock on the cache
 * file's mutex so other writers/readers can use the file.
 */
static FORCE_INLINE void releaseVgroupFileFd(SGroupCacheFileFd* pFd) { taosThreadMutexUnlock(&pFd->mutex); }
static int32_t saveBatchBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS;
SGcBlkBufBasic blkBasic;
while (NULL != pHead) {
2023-07-21 11:25:38 +00:00
SGroupCacheFileFd *pFd;
code = acquireDownstreamFileFd(pHead->pCtx, &pFd);
if (code) {
return code;
2023-07-21 09:30:09 +00:00
}
2023-07-21 11:25:38 +00:00
int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
2023-07-21 09:30:09 +00:00
if (ret == -1) {
2023-07-21 11:25:38 +00:00
releaseDownstreamFileFd();
2023-07-21 09:30:09 +00:00
return TAOS_SYSTEM_ERROR(errno);
}
2023-07-21 11:25:38 +00:00
ret = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize);
if (ret != pHead->basic.bufSize) {
releaseDownstreamFileFd();
2023-07-21 09:30:09 +00:00
return TAOS_SYSTEM_ERROR(errno);
}
2023-07-21 11:25:38 +00:00
releaseDownstreamFileFd();
2023-07-21 09:30:09 +00:00
2023-07-21 11:25:38 +00:00
taosWLockLatch(&pHead->pGroup->batchList.lock);
taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic);
taosWUnLockLatch(&pHead->pGroup->batchList.lock);
int64_t blkId = pHead->basic.blkId;
2023-07-21 09:30:09 +00:00
pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
}
return code;
}
/*
 * Spill a detached chain of dirty blocks (non-batch mode) to their vgroup
 * cache files, then drop each block from the dirty-block hash.
 *
 * Each block is written at its pre-assigned offset in the file identified by
 * pHead->fileId, while holding that file's mutex. The mutex is released on
 * every path, including a failed seek.
 *
 * The byte count from taosWriteFile is kept separate from the status code,
 * so a successful run returns TSDB_CODE_SUCCESS rather than the last size
 * written.
 *
 * The group's block index already received each block in addBlkToGroupCache,
 * so nothing is appended to it here. Appending again would duplicate entries.
 */
static int32_t saveSeqBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) {
  for (SGcBlkBufInfo* pCur = pHead; NULL != pCur;) {
    SGroupCacheFileFd* pFd = NULL;
    int32_t code = acquireVgroupFileFd(pGCache, pCur->pCtx, pCur->pGroup->pVgCtx, pCur->fileId, &pFd);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }

    if (-1 == taosLSeekFile(pFd->fd, pCur->basic.offset, SEEK_SET)) {
      code = TAOS_SYSTEM_ERROR(errno);
      releaseVgroupFileFd(pFd);
      return code;
    }

    int64_t written = taosWriteFile(pFd->fd, pCur->pBuf, pCur->basic.bufSize);
    if (written != pCur->basic.bufSize) {
      code = TAOS_SYSTEM_ERROR(errno);
      releaseVgroupFileFd(pFd);
      return code;
    }
    releaseVgroupFileFd(pFd);

    // Read next before removal: pCur lives inside the hash entry being removed.
    int64_t blkId = pCur->basic.blkId;
    pCur = pCur->next;
    taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
  }
  return TSDB_CODE_SUCCESS;
}
/*
 * Spill a detached chain of dirty blocks with the writer matching the fetch
 * mode. Afterwards the writer slot (blkCache.writeDownstreamId) is cleared
 * back to -1, but only if it still holds this downstream's id.
 * Returns the writer's status.
 */
static int32_t saveBlocksToDisk(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcBlkBufInfo* pHead) {
  int32_t code = pGCache->batchFetch ? saveBatchBlocksToDisk(pGCache, pHead) : saveSeqBlocksToDisk(pGCache, pHead);

  atomic_val_compare_exchange_32(&pGCache->blkCache.writeDownstreamId, pCtx->id, -1);

  return code;
}
/*
 * Register a freshly serialized block in the dirty-block cache and, if the
 * cache grew beyond maxCacheSize, spill the oldest blocks to disk.
 *
 * The block descriptor is copied into pCache->pDirtyBlk, and that copy is
 * appended to the dirty FIFO list under dirtyLock. When the accumulated size
 * exceeds a positive maxCacheSize, the writer slot is claimed with a CAS.
 * Only one downstream spills at a time; the slot is released by
 * saveBlocksToDisk.
 *
 * On a successful claim, a prefix of the list is detached (under the lock):
 * just enough blocks to bring the cached size back to maxCacheSize or below.
 * That prefix is then written outside the lock.
 *
 * pVgCtx is currently unused.
 */
static int32_t addBlkToDirtyBufList(SGroupCacheOperatorInfo* pGCache, SGcDownstreamCtx* pCtx, SGcVgroupCtx* pVgCtx, SGcBlkCacheInfo* pCache, SGcBlkBufInfo* pBufInfo) {
  if (0 != taosHashPut(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId), pBufInfo, sizeof(*pBufInfo))) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  // From here on work with the copy owned by the hash, not the caller's object.
  pBufInfo = taosHashGet(pCache->pDirtyBlk, &pBufInfo->blkId, sizeof(pBufInfo->blkId));
  if (NULL == pBufInfo) {
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  int32_t        code = TSDB_CODE_SUCCESS;
  SGcBlkBufInfo* pWriteHead = NULL;

  taosWLockLatch(&pCache->dirtyLock);
  pCache->blkCacheSize += pBufInfo->bufSize;
  qDebug("group cache block cache num:%d size:%" PRId64, taosHashGetSize(pCache->pDirtyBlk), pCache->blkCacheSize);

  if (NULL == pCache->pDirtyHead) {
    pCache->pDirtyHead = pBufInfo;
  } else {
    pCache->pDirtyTail->next = pBufInfo;
  }
  pCache->pDirtyTail = pBufInfo;

  if (pGCache->maxCacheSize > 0 && pCache->blkCacheSize > pGCache->maxCacheSize) {
    if (-1 == atomic_val_compare_exchange_32(&pCache->writeDownstreamId, -1, pCtx->id)) {
      pWriteHead = pCache->pDirtyHead;
      SGcBlkBufInfo* pTmp = pCache->pDirtyHead;
      while (NULL != pTmp) {
        pCache->blkCacheSize -= pTmp->bufSize;
        if (pCache->blkCacheSize <= pGCache->maxCacheSize) {
          // Detach [pWriteHead, pTmp] from the list; the remainder stays cached.
          pCache->pDirtyHead = pTmp->next;
          pTmp->next = NULL;
          break;
        }
        pTmp = pTmp->next;
      }
      if (NULL == pTmp) {
        // Walked the whole list: everything is being spilled, so nothing is left cached.
        pCache->pDirtyHead = NULL;
      }
      if (NULL == pCache->pDirtyHead) {
        // Keep the tail consistent with an empty list instead of pointing at a spilled block.
        pCache->pDirtyTail = NULL;
      }
    }
  }
  taosWUnLockLatch(&pCache->dirtyLock);

  if (NULL != pWriteHead) {
    code = saveBlocksToDisk(pGCache, pCtx, pWriteHead);
  }

  return code;
}
/*
 * Serialize pBlock into a newly allocated buffer and describe it in
 * *pBufInfo:
 * - a fresh block id;
 * - the group's file id;
 * - a file offset reserved by atomically bumping the vgroup file size;
 * - the owning downstream ctx and group.
 * The descriptor is then handed to the dirty-block cache.
 *
 * The serialized size is blockDataGetSize() plus one int32 header plus one
 * int32 per column.
 *
 * NOTE(review): pGroup->pVgCtx is only filled from the vgroup hash, which is
 * populated in non-batch mode; in batch-fetch mode it looks NULL here —
 * confirm before relying on this path with batchFetch.
 */
static int32_t addBlkToBufCache(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, SGcBlkBufInfo* pBufInfo) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  size_t                   colNum = taosArrayGetSize(pBlock->pDataBlock);
  size_t                   headerBytes = sizeof(int32_t) + colNum * sizeof(int32_t);
  int64_t                  bufSize = blockDataGetSize(pBlock) + headerBytes;

  pBufInfo->pBuf = taosMemoryMalloc(bufSize);
  if (pBufInfo->pBuf == NULL) {
    qError("group cache add block to cache failed, size:%" PRId64, bufSize);
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  blockDataToBuf(pBufInfo->pBuf, pBlock);

  pBufInfo->next = NULL;
  pBufInfo->blkId = atomic_add_fetch_64(&pGCache->currentBlkId, 1);
  pBufInfo->fileId = pGroup->fileId;
  pBufInfo->bufSize = bufSize;
  pBufInfo->offset = atomic_fetch_add_64(&pGroup->pVgCtx->fileSize, bufSize);
  pBufInfo->pCtx = pCtx;
  pBufInfo->pGroup = pGroup;

  return addBlkToDirtyBufList(pGCache, pCtx, pGroup->pVgCtx, &pGCache->blkCache, pBufInfo);
}
2023-07-19 03:34:44 +00:00
/*
 * Detach every column buffer of pDataBlock without freeing it, and reset the
 * block's capacity and row count to zero. The column array itself is kept.
 *
 * For every column pData is cleared. Var-length columns also lose their
 * offset array and length bookkeeping; fixed-length columns lose their null
 * bitmap pointer.
 *
 * NOTE(review): nothing is freed here, so whoever owns the detached buffers
 * must release them — confirm for blocks that were filled by blockDataFromBuf.
 */
void blockDataDeepCleanup(SSDataBlock* pDataBlock) {
  size_t colCount = taosArrayGetSize(pDataBlock->pDataBlock);

  for (size_t idx = 0; idx < colCount; ++idx) {
    SColumnInfoData* pCol = taosArrayGet(pDataBlock->pDataBlock, idx);
    pCol->pData = NULL;

    if (!IS_VAR_DATA_TYPE(pCol->info.type)) {
      pCol->nullbitmap = NULL;
      continue;
    }

    pCol->varmeta.offset = NULL;
    pCol->varmeta.length = 0;
    pCol->varmeta.allocLen = 0;
  }

  pDataBlock->info.capacity = 0;
  pDataBlock->info.rows = 0;
}
/*
 * Create an empty block with the same schema as pSrc.
 *
 * The column-info array and the block info are copied, then every column
 * data pointer is detached (blockDataDeepCleanup), so the new block shares no
 * buffers with pSrc.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure. In that case *ppDst
 * is set to NULL rather than left dangling, because callers store it directly
 * (e.g. as a downstream's pBaseBlock).
 */
static int32_t buildGroupCacheBaseBlock(SSDataBlock** ppDst, SSDataBlock* pSrc) {
  SSDataBlock* pDst = taosMemoryMalloc(sizeof(*pSrc));
  if (NULL == pDst) {
    *ppDst = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pDst->pBlockAgg = NULL;
  pDst->pDataBlock = taosArrayDup(pSrc->pDataBlock, NULL);
  if (NULL == pDst->pDataBlock) {
    taosMemoryFree(pDst);
    *ppDst = NULL;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  memcpy(&pDst->info, &pSrc->info, sizeof(pSrc->info));
  blockDataDeepCleanup(pDst);

  *ppDst = pDst;
  return TSDB_CODE_SUCCESS;
}
/*
 * Obtain an empty result block for downstream pCtx. A block from the free
 * list is popped under blkLock if one is available. Otherwise (after dropping
 * the lock) a new one is built from the downstream's base block.
 */
static int32_t acquireBaseBlockFromList(SGcDownstreamCtx* pCtx, SSDataBlock** ppRes) {
  bool reused = false;

  taosWLockLatch(&pCtx->blkLock);
  if (taosArrayGetSize(pCtx->pFreeBlock) > 0) {
    SSDataBlock** ppFree = taosArrayPop(pCtx->pFreeBlock);
    *ppRes = *ppFree;
    reused = true;
  }
  taosWUnLockLatch(&pCtx->blkLock);

  if (reused) {
    return TSDB_CODE_SUCCESS;
  }
  return buildGroupCacheBaseBlock(ppRes, pCtx->pBaseBlock);
}
2023-07-14 11:36:56 +00:00
/*
 * Return a result block to downstream pCtx's free list: its column buffers
 * are detached first, then the block pointer is pushed under blkLock.
 */
static void releaseBaseBlockToList(SGcDownstreamCtx* pCtx, SSDataBlock* pBlock) {
  SSDataBlock* pFreeBlk = pBlock;
  blockDataDeepCleanup(pFreeBlk);

  taosWLockLatch(&pCtx->blkLock);
  taosArrayPush(pCtx->pFreeBlock, &pFreeBlk);
  taosWUnLockLatch(&pCtx->blkLock);
}
2023-07-21 09:30:09 +00:00
/*
 * Produce a result block for downstream downstreamIdx by deserializing pBuf
 * into a block taken from that downstream's free list.
 *
 * If deserialization fails, the block is given back to the free list and
 * *ppRes is set to NULL, so the block is not leaked.
 */
static int32_t buildGroupCacheResultBlock(SGroupCacheOperatorInfo* pGCache, int32_t downstreamIdx, void* pBuf, SSDataBlock** ppRes) {
  SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];
  int32_t           code = acquireBaseBlockFromList(pCtx, ppRes);
  if (code) {
    return code;
  }
  //TODO OPTIMIZE PERF
  code = blockDataFromBuf(*ppRes, pBuf);
  if (code) {
    releaseBaseBlockToList(pCtx, *ppRes);
    *ppRes = NULL;
  }
  return code;
}
2023-07-21 11:25:38 +00:00
/*
 * Lock and return the per-downstream cache file used in batch-fetch mode,
 * opening it and initializing its mutex on first use.
 *
 * On success the file mutex is held on BOTH the first-use and the reuse
 * path. Every successful call must therefore be paired with exactly one
 * unlock of (*ppFd)->mutex. Previously the first call returned with the
 * mutex unlocked while callers still unlocked it.
 *
 * The mutex is initialized before the fd is published in pCtx, so a caller
 * that sees a non-NULL fd never locks an uninitialized mutex.
 *
 * NOTE(review): the NULL check on cacheFileFd.fd is not guarded by a lock; if
 * two threads can reach first use concurrently the file could be opened
 * twice — confirm callers serialize the first use.
 */
static int32_t acquireDownstreamFileFd(SGcDownstreamCtx* pCtx, SGroupCacheFileFd** ppFd) {
  if (NULL == pCtx->cacheFileFd.fd) {
    TdFilePtr pFile = taosOpenFile(pCtx->baseFilename, TD_FILE_CREATE | TD_FILE_READ | TD_FILE_WRITE | TD_FILE_AUTO_DEL);
    if (NULL == pFile) {
      return TAOS_SYSTEM_ERROR(errno);
    }
    taosThreadMutexInit(&pCtx->cacheFileFd.mutex, NULL);
    pCtx->cacheFileFdNum = 1;
    pCtx->cacheFileFd.fd = pFile;
  }

  taosThreadMutexLock(&pCtx->cacheFileFd.mutex);
  *ppFd = &pCtx->cacheFileFd;
  return TSDB_CODE_SUCCESS;
}
/*
 * Read one spilled block (batch-fetch mode) from the per-downstream cache
 * file into a newly allocated buffer *ppBuf of pBasic->bufSize bytes.
 *
 * The file mutex obtained from acquireDownstreamFileFd is held for the
 * seek+read and released on every path. On a short read the buffer is freed
 * and *ppBuf is NULL. On success the caller owns *ppBuf.
 */
static int32_t readBatchBlocksFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppBuf) {
  SGcDownstreamCtx*  pCtx = &pGCache->pDownstreams[pGrp->downstreamIdx];
  SGroupCacheFileFd* pFd = NULL;
  int64_t            readLen = 0;

  int32_t code = acquireDownstreamFileFd(pCtx, &pFd);
  if (code) {
    return code;
  }

  if (-1 == taosLSeekFile(pFd->fd, pBasic->offset, SEEK_SET)) {
    code = TAOS_SYSTEM_ERROR(errno);
    goto _return;
  }

  *ppBuf = taosMemoryMalloc(pBasic->bufSize);
  if (NULL == *ppBuf) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }

  readLen = taosReadFile(pFd->fd, *ppBuf, pBasic->bufSize);
  if (readLen != pBasic->bufSize) {
    code = TAOS_SYSTEM_ERROR(errno);
    taosMemoryFreeClear(*ppBuf);
  }

_return:
  releaseVgroupFileFd(pFd);  // unlocks pFd->mutex taken by the acquire above
  return code;
}
static int32_t readSeqBlocksFromDisk(SGroupCacheOperatorInfo* pGCache, SGcBlkBufInfo* pHead) {
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheFileFd* pFd = NULL;
while (NULL != pHead) {
code = acquireVgroupFileFd(pGCache, pHead->pCtx, pHead->pGroup->pVgCtx, pHead->fileId, &pFd);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
int32_t ret = taosLSeekFile(pFd->fd, pHead->basic.offset, SEEK_SET);
if (ret == -1) {
code= TAOS_SYSTEM_ERROR(errno);
return code;
}
code = (int32_t)taosWriteFile(pFd->fd, pHead->pBuf, pHead->basic.bufSize);
releaseVgroupFileFd(pFd);
if (code != pHead->basic.bufSize) {
code= TAOS_SYSTEM_ERROR(errno);
return code;
}
taosWLockLatch(&pHead->pGroup->batchList.lock);
taosArrayPush(pHead->pGroup->batchList.pList, &pHead->basic);
taosWUnLockLatch(&pHead->pGroup->batchList.lock);
int64_t blkId = pHead->basic.blkId;
pHead = pHead->next;
taosHashRemove(pGCache->blkCache.pDirtyBlk, &blkId, sizeof(blkId));
}
return code;
}
/*
 * Load one spilled block into a newly allocated buffer *ppRes, choosing the
 * batch-mode or vgroup-file reader according to pGCache->batchFetch.
 */
static int32_t readBlockFromDisk(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, SGcBlkBufBasic* pBasic, void** ppRes) {
  if (!pGCache->batchFetch) {
    return readSeqBlocksFromDisk(pGCache, pGrp, pBasic, ppRes);
  }
  return readBatchBlocksFromDisk(pGCache, pGrp, pBasic, ppRes);
}
/*
 * Materialize the block described by pBasic as a result block for session
 * sessionId. The block comes from the in-memory dirty cache if it is still
 * there; otherwise it is read back from disk.
 *
 * Under the dirtyLock read latch:
 * - the dirty entry is acquired (ref-counted);
 * - the block is deserialized;
 * - then the entry is released, or the temporary disk buffer is freed.
 *   blockDataFromBuf copies the data, so neither is needed afterwards.
 *
 * The latch is released on every path, including a failed disk read.
 * On success the result is recorded in blkCache.pReadBlk under sessionId.
 */
static int32_t retrieveBlkFromBufCache(SGroupCacheOperatorInfo* pGCache, SGroupCacheData* pGrp, int64_t sessionId, SGcBlkBufBasic* pBasic, SSDataBlock** ppRes) {
  SGcBlkCacheInfo* pCache = &pGCache->blkCache;
  void*            pBuf = NULL;
  int32_t          code = TSDB_CODE_SUCCESS;

  taosRLockLatch(&pCache->dirtyLock);
  SGcBlkBufInfo* pBufInfo = taosHashAcquire(pCache->pDirtyBlk, &pBasic->blkId, sizeof(pBasic->blkId));
  if (NULL == pBufInfo) {
    code = readBlockFromDisk(pGCache, pGrp, pBasic, &pBuf);
    if (code) {
      taosRUnLockLatch(&pCache->dirtyLock);
      return code;
    }
  } else {
    pBuf = pBufInfo->pBuf;
  }

  code = buildGroupCacheResultBlock(pGCache, pGrp->downstreamIdx, pBuf, ppRes);

  if (NULL != pBufInfo) {
    taosHashRelease(pCache->pDirtyBlk, pBufInfo);
  } else {
    taosMemoryFree(pBuf);  // buffer was allocated by readBlockFromDisk
  }
  taosRUnLockLatch(&pCache->dirtyLock);

  if (code) {
    return code;
  }

  taosHashPut(pCache->pReadBlk, &sessionId, sizeof(sessionId), ppRes, POINTER_BYTES);
  return TSDB_CODE_SUCCESS;
}
/*
 * Initialize a vgroup context: attach the table list pTbList, then build the
 * base name of this vgroup's cache files,
 * "<tsTempDir>/gc_<pid>_<taskId>_<downstreamId>_<vgId>". Its length is
 * cached in baseNameLen, the position where per-file "_<fileId>" suffixes
 * are later written.
 *
 * NOTE(review): pOperator is used in the snprintf below but is neither a
 * parameter nor a local of this function, so unless a file-scope pOperator
 * exists this does not compile. The task id probably has to be passed in by
 * the caller — confirm.
 */
static FORCE_INLINE void initGcVgroupCtx(SGcVgroupCtx* pVgCtx, int32_t downstreamId, int32_t vgId, SArray* pTbList) {
pVgCtx->pTbList = pTbList;
2023-07-20 02:21:19 +00:00
2023-07-21 09:30:09 +00:00
// Size is passed as sizeof - 1 and the last byte is forced to NUL below, so the name is always terminated.
snprintf(pVgCtx->baseFilename, sizeof(pVgCtx->baseFilename) - 1, "%s/gc_%d_%s_%d_%d", tsTempDir, getpid(), GET_TASKID(pOperator->pTaskInfo), downstreamId, vgId);
pVgCtx->baseFilename[sizeof(pVgCtx->baseFilename) - 1] = 0;
pVgCtx->baseNameLen = strlen(pVgCtx->baseFilename);
}
/*
 * Record a newly requested table/group in its vgroup context (non-batch mode).
 *
 * The vgroup ctx is taken from the group itself or, failing that, looked up
 * in pHash by vgId. A group created before its vgroup entry existed can
 * still join an existing ctx, instead of replacing it (which would drop the
 * previous table list).
 *
 * If no ctx exists yet, one is created with a table list holding *pNew and
 * stored in pHash. In every successful case pNew->pGroup->pVgCtx ends up
 * pointing at the ctx stored in the hash.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if any allocation, push or put fails; a
 * freshly created list is destroyed in that case.
 */
static int32_t addNewGroupToVgHash(SSHashObj* pHash, SGcNewGroupInfo* pNew) {
  SGcVgroupCtx* pVgCtx = pNew->pGroup->pVgCtx;
  if (NULL == pVgCtx) {
    pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
  }

  if (NULL != pVgCtx) {
    pNew->pGroup->pVgCtx = pVgCtx;
    if (NULL == taosArrayPush(pVgCtx->pTbList, pNew)) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  SArray* pList = taosArrayInit(10, sizeof(*pNew));
  if (NULL == pList) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  if (NULL == taosArrayPush(pList, pNew)) {
    taosArrayDestroy(pList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  SGcVgroupCtx vgCtx = {0};
  initGcVgroupCtx(&vgCtx, pNew->pGroup->downstreamIdx, pNew->vgId, pList);
  if (0 != tSimpleHashPut(pHash, &pNew->vgId, sizeof(pNew->vgId), &vgCtx, sizeof(vgCtx))) {
    taosArrayDestroy(pList);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pNew->pGroup->pVgCtx = tSimpleHashGet(pHash, &pNew->vgId, sizeof(pNew->vgId));
  return (NULL != pNew->pGroup->pVgCtx) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}
/*
 * Drain the downstream's pending new-group list, under grpLock, into a single
 * operator param for the next downstream call.
 *
 * In non-batch mode each entry is also registered in its vgroup context. The
 * first entry's param becomes the result, and later non-NULL params are
 * merged into it. The pending list is cleared only when it was non-empty and
 * every entry was processed without error.
 *
 * *ppParam receives the (possibly partially merged) param in all cases.
 */
static FORCE_INLINE int32_t appendNewGroupToDownstream(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SOperatorParam** ppParam) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[downstreamIdx];
  SOperatorParam*          pMerged = NULL;
  int32_t                  code = TSDB_CODE_SUCCESS;

  taosWLockLatch(&pCtx->grpLock);

  int32_t pendingNum = taosArrayGetSize(pCtx->pNewGrpList);
  for (int32_t idx = 0; idx < pendingNum && TSDB_CODE_SUCCESS == code; ++idx) {
    SGcNewGroupInfo* pPending = taosArrayGet(pCtx->pNewGrpList, idx);

    if (!pGCache->batchFetch) {
      code = addNewGroupToVgHash(pCtx->pVgTbHash, pPending);
      if (TSDB_CODE_SUCCESS != code) {
        break;
      }
    }

    if (NULL == pMerged) {
      pMerged = pPending->pParam;
    } else if (NULL != pPending->pParam) {
      code = mergeOperatorParams(pMerged, pPending->pParam);
    }
  }

  if (pendingNum > 0 && TSDB_CODE_SUCCESS == code) {
    taosArrayClear(pCtx->pNewGrpList);
  }

  taosWUnLockLatch(&pCtx->grpLock);

  *ppParam = pMerged;
  return code;
}
/*
 * Pull the next block from downstream downstreamIdx into *ppRes (NULL when
 * the downstream is exhausted).
 *
 * Pending new-group params are first merged via appendNewGroupToDownstream.
 * If a param results, getNextExtFn is used; otherwise getNextFn.
 *
 * For every block returned:
 * - the per-downstream counter is bumped;
 * - on the first block, an empty base block with the same schema is built
 *   and seeded into the downstream's free-block list.
 *
 * The per-block trace is logged at debug level; it is routine, not an error.
 */
static FORCE_INLINE int32_t getBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, int32_t downstreamIdx, SSDataBlock** ppRes) {
  SOperatorParam*          pDownstreamParam = NULL;
  SSDataBlock*             pBlock = NULL;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  struct SOperatorInfo*    pDownstream = pOperator->pDownstream[downstreamIdx];

  int32_t code = appendNewGroupToDownstream(pOperator, downstreamIdx, &pDownstreamParam);
  if (code) {
    return code;
  }

  if (pDownstreamParam) {
    pBlock = pDownstream->fpSet.getNextExtFn(pDownstream, pDownstreamParam);
  } else {
    pBlock = pDownstream->fpSet.getNextFn(pDownstream);
  }

  if (pBlock) {
    qDebug("%s group cache retrieved block with groupId: %" PRIu64, GET_TASKID(pOperator->pTaskInfo), pBlock->info.id.groupId);
    pGCache->execInfo.pDownstreamBlkNum[downstreamIdx]++;

    SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[downstreamIdx];
    if (NULL == pCtx->pBaseBlock) {
      code = buildGroupCacheBaseBlock(&pCtx->pBaseBlock, pBlock);
      if (code) {
        return code;
      }
      if (NULL == taosArrayPush(pCtx->pFreeBlock, &pCtx->pBaseBlock)) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
  }

  *ppRes = pBlock;
  return code;
}
/*
 * Post the wait semaphore of every session queued in pWaitQueue. A NULL or
 * empty queue is a no-op. The queue itself is left unchanged.
 */
static void notifyWaitingSessions(SArray* pWaitQueue) {
  int32_t waiterNum = (NULL == pWaitQueue) ? 0 : (int32_t)taosArrayGetSize(pWaitQueue);

  for (int32_t idx = 0; idx < waiterNum; ++idx) {
    SGcSessionCtx* pWaiter = taosArrayGetP(pWaitQueue, idx);
    tsem_post(&pWaiter->waitSem);
  }
}
/*
 * Mark a group as fully fetched.
 *
 * The group's current block pointer is cleared and fetchDone is set
 * atomically. Then, under the group mutex, every queued waiter is woken and
 * the wait queue is emptied.
 */
static FORCE_INLINE void handleGroupFetchDone(SGroupCacheData* pGroup) {
  pGroup->pBlock = NULL;
  atomic_store_8((int8_t*)&pGroup->fetchDone, true);

  taosThreadMutexLock(&pGroup->mutex);
  SArray* pWaiters = pGroup->waitQueue;
  notifyWaitingSessions(pWaiters);
  taosArrayClear(pWaiters);
  taosThreadMutexUnlock(&pGroup->mutex);
}
2023-07-21 09:30:09 +00:00
/*
 * Move a vgroup on to a new cache file: bump fileId and reset fileSize to 0.
 *
 * If the vgroup has a current cacheFileFd, it is first recorded in
 * pCacheFile (created on demand) under the current fileId. A failed insert
 * is now reported as TSDB_CODE_OUT_OF_MEMORY instead of being ignored.
 *
 * NOTE(review): this stores sizeof(pVgCtx->cacheFileFd) bytes of cacheFileFd
 * (compared against NULL, so apparently a pointer), while acquireVgroupFileFd
 * reads pCacheFile values as SGroupCacheFileFd — confirm these agree.
 */
static int32_t vgroupSwitchNewFile(SGcVgroupCtx* pVgCtx) {
  if (NULL != pVgCtx->cacheFileFd) {
    if (NULL == pVgCtx->pCacheFile) {
      pVgCtx->pCacheFile = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
      if (NULL == pVgCtx->pCacheFile) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }
    if (0 != tSimpleHashPut(pVgCtx->pCacheFile, &pVgCtx->fileId, sizeof(pVgCtx->fileId), &pVgCtx->cacheFileFd,
                            sizeof(pVgCtx->cacheFileFd))) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pVgCtx->fileId++;
  pVgCtx->fileSize = 0;
  return TSDB_CODE_SUCCESS;
}
/*
 * Called in non-batch mode when a block for table uid arrives. If uid differs
 * from the last table seen (per downstream and per vgroup):
 * - every table listed before uid in the vgroup's table list is marked done;
 * - the vgroup rolls over to a new file once it has reached
 *   GROUP_CACHE_DEFAULT_MAX_FILE_SIZE;
 * - the group records the current file id and start offset.
 *
 * The scan is bounded by the list size rather than relying on an
 * out-of-range taosArrayGet returning NULL.
 */
static int32_t handleVgroupTableFetchDone(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, uint64_t uid) {
  SGcVgroupCtx* pVgCtx = pGroup->pVgCtx;
  if (pCtx->lastBlkUid == uid || pVgCtx->lastUid == uid) {
    return TSDB_CODE_SUCCESS;
  }

  pCtx->lastBlkUid = uid;
  pVgCtx->lastUid = uid;

  int32_t tbNum = (int32_t)taosArrayGetSize(pVgCtx->pTbList);
  for (int32_t i = 0; i < tbNum; ++i) {
    SGcNewGroupInfo* pNew = taosArrayGet(pVgCtx->pTbList, i);
    if (NULL == pNew || pNew->uid == uid) {
      break;
    }
    handleGroupFetchDone(pNew->pGroup);
  }

  if (pVgCtx->fileSize >= GROUP_CACHE_DEFAULT_MAX_FILE_SIZE) {
    int32_t code = vgroupSwitchNewFile(pVgCtx);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  pGroup->fileId = pVgCtx->fileId;
  pGroup->startOffset = pVgCtx->fileSize;

  return TSDB_CODE_SUCCESS;
}
2023-07-19 02:37:30 +00:00
2023-07-19 11:14:30 +00:00
/*
 * Initialize a new group entry: mutex, owning downstream and vgroup ids, and
 * the per-mode block bookkeeping. The group is then linked to its vgroup
 * context if one already exists in pCtx->pVgTbHash.
 *
 * In batch mode the block list stores SGcBlkBufBasic values:
 * addBlkToGroupCache pushes &basic, and getBlkFromSessionCacheImpl reads
 * elements back as SGcBlkBufBasic*. The element size must therefore be
 * sizeof(SGcBlkBufBasic); POINTER_BYTES would truncate every element.
 *
 * NOTE(review): a taosArrayInit failure leaves batchList.pList NULL and is not
 * reported (the function returns void) — confirm callers tolerate this.
 */
static FORCE_INLINE void initNewGroupData(SGcDownstreamCtx* pCtx, SGroupCacheData* pGroup, int32_t downstreamIdx, int32_t vgId, bool batchFetch) {
  taosThreadMutexInit(&pGroup->mutex, NULL);
  pGroup->downstreamIdx = downstreamIdx;
  pGroup->vgId = vgId;

  if (batchFetch) {
    pGroup->fileId = 0;
    pGroup->batchList.pList = taosArrayInit(10, sizeof(SGcBlkBufBasic));
  } else {
    pGroup->fileId = -1;
    pGroup->seqList.startBlkId = -1;
    pGroup->seqList.endBlkId = -1;
  }

  pGroup->startOffset = -1;
  pGroup->pVgCtx = tSimpleHashGet(pCtx->pVgTbHash, &pGroup->vgId, sizeof(pGroup->vgId));
}
/*
 * Find or create the group entry for table uid, returning it (acquired from
 * the hash) in *ppGrp.
 *
 * A prepared group is inserted into the global or per-downstream group hash:
 * - If another caller already inserted uid (TSDB_CODE_DUP_KEY), that entry
 *   is returned and the unused prepared resources are released.
 * - If our insert succeeds, the new entry is returned. When pParam carries
 *   children, the group is also queued on the downstream's pending
 *   new-group list.
 * - If the entry disappears between put and acquire, the whole step is
 *   retried.
 *
 * Previously a successful insert without children fell through to a second
 * put of the same key.
 *
 * Returns terrno for hash errors other than DUP_KEY, or
 * TSDB_CODE_OUT_OF_MEMORY if queueing the new group fails.
 */
static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGroupCacheData** ppGrp, int32_t vgId, int64_t uid) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
  SHashObj*                pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
  SGroupCacheData          grpData = {0};

  initNewGroupData(pCtx, &grpData, pParam->downstreamIdx, vgId, pGCache->batchFetch);

  while (true) {
    if (0 != taosHashPut(pGrpHash, &uid, sizeof(uid), &grpData, sizeof(grpData))) {
      int32_t err = terrno;
      if (TSDB_CODE_DUP_KEY != err) {
        if (pGCache->batchFetch) {
          taosArrayDestroy(grpData.batchList.pList);
        }
        taosThreadMutexDestroy(&grpData.mutex);
        return err;
      }
      *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid));
      if (*ppGrp) {
        // Someone else owns the entry; our prepared grpData was never stored.
        if (pGCache->batchFetch) {
          taosArrayDestroy(grpData.batchList.pList);
        }
        taosThreadMutexDestroy(&grpData.mutex);
        return TSDB_CODE_SUCCESS;
      }
      continue;  // entry vanished between put and acquire: retry
    }

    *ppGrp = taosHashAcquire(pGrpHash, &uid, sizeof(uid));
    if (NULL == *ppGrp) {
      continue;  // our entry was removed concurrently: retry the insert
    }

    if (pParam->pChildren) {
      SGcNewGroupInfo newGroup;
      newGroup.pGroup = *ppGrp;
      newGroup.vgId = vgId;
      newGroup.uid = uid;
      newGroup.pParam = taosArrayGetP(pParam->pChildren, 0);

      taosWLockLatch(&pCtx->grpLock);
      if (NULL == taosArrayPush(pCtx->pNewGrpList, &newGroup)) {
        taosWUnLockLatch(&pCtx->grpLock);
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      taosWUnLockLatch(&pCtx->grpLock);
    }
    return TSDB_CODE_SUCCESS;
  }
}
2023-07-21 11:25:38 +00:00
/*
 * Append a block's descriptor (SGcBlkBufBasic) to the group's block index
 * under the batchList write latch, and return its index in *pIdx.
 *
 * Both modes store the whole descriptor. Readers (getBlkFromSessionCacheImpl)
 * always read the elements as SGcBlkBufBasic; previously non-batch mode
 * pushed only &basic.offset.
 *
 * *pIdx is int64_t, matching the caller's index variable. A failed push
 * returns TSDB_CODE_OUT_OF_MEMORY and leaves *pIdx untouched. batchFetch is
 * kept for interface compatibility but no longer changes behavior.
 *
 * NOTE(review): initNewGroupData only creates batchList.pList in batch mode;
 * confirm it is initialized elsewhere for non-batch groups.
 */
static int32_t addBlkToGroupCache(bool batchFetch, SGroupCacheData* pGroup, SGcBlkBufInfo* pNewBlk, int64_t* pIdx) {
  (void)batchFetch;
  int32_t code = TSDB_CODE_SUCCESS;

  taosWLockLatch(&pGroup->batchList.lock);
  if (NULL == taosArrayPush(pGroup->batchList.pList, &pNewBlk->basic)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
  } else {
    *pIdx = (int64_t)taosArrayGetSize(pGroup->batchList.pList) - 1;
    qDebug("block added to group cache, total block num:%" PRId64, *pIdx + 1);
  }
  taosWUnLockLatch(&pGroup->batchList.lock);

  return code;
}
2023-07-19 02:37:30 +00:00
/*
 * Cache a block just pulled from downstream and wake sessions waiting on its
 * group.
 *
 * Steps:
 * 1. Locate the block's group by groupId. In batch mode it is created on
 *    demand; in non-batch mode a missing group is TSDB_CODE_INVALID_PARA.
 * 2. In non-batch mode, run per-table bookkeeping.
 * 3. Serialize the block into the buffer cache and append it to the group's
 *    block index.
 * 4. Notify waiting sessions. Waiters enqueue themselves under the group
 *    mutex, so this takes the same mutex and empties the queue afterwards,
 *    matching handleGroupFetchDone. This avoids lost or repeated wake-ups.
 * 5. If the block belongs to the calling session's own group, record its
 *    index as lastBlkId and set *continueFetch to false.
 */
static int32_t handleGroupCacheRetrievedBlk(struct SOperatorInfo* pOperator, SSDataBlock* pBlock, SGcSessionCtx* pSession, bool* continueFetch) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
  SHashObj*                pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;

  SGroupCacheData* pGroup = taosHashGet(pGrpHash, &pBlock->info.id.groupId, sizeof(pBlock->info.id.groupId));
  if (NULL == pGroup) {
    if (!pGCache->batchFetch) {
      qError("table uid:%" PRIu64 " not found in group hash", pBlock->info.id.groupId);
      return TSDB_CODE_INVALID_PARA;
    }
    SOperatorParam fakeParam = {0};
    fakeParam.downstreamIdx = pSession->downstreamIdx;
    code = addNewGroupData(pOperator, &fakeParam, &pGroup, -1, pBlock->info.id.groupId);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  if (!pGCache->batchFetch) {
    code = handleVgroupTableFetchDone(pCtx, pGroup, pBlock->info.id.groupId);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  SGcBlkBufInfo newBlkBuf = {0};
  code = addBlkToBufCache(pOperator, pBlock, pCtx, pGroup, &newBlkBuf);
  if (code) {
    return code;
  }

  int64_t blkIdx = 0;
  code = addBlkToGroupCache(pGCache->batchFetch, pGroup, &newBlkBuf, &blkIdx);
  if (code) {
    return code;
  }

  taosThreadMutexLock(&pGroup->mutex);
  notifyWaitingSessions(pGroup->waitQueue);
  taosArrayClear(pGroup->waitQueue);
  taosThreadMutexUnlock(&pGroup->mutex);

  if (pGroup == pSession->pGroupData) {
    pSession->lastBlkId = blkIdx;
    *continueFetch = false;
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Downstream of this session is exhausted: mark every group it feeds as done.
 *
 * In non-batch mode, each vgroup's table list is walked and then emptied.
 * In batch mode, every group in the relevant group hash is visited.
 * Finally the downstream's waiting-session registry is cleared.
 * Always returns TSDB_CODE_SUCCESS.
 */
static int32_t handleDownstreamFetchDone(struct SOperatorInfo* pOperator, SGcSessionCtx* pSession) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];

  if (!pGCache->batchFetch) {
    int32_t iter = 0;
    for (SGcVgroupCtx* pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, NULL, &iter); NULL != pVgCtx;
         pVgCtx = tSimpleHashIterate(pCtx->pVgTbHash, pVgCtx, &iter)) {
      int32_t tbNum = taosArrayGetSize(pVgCtx->pTbList);
      for (int32_t idx = 0; idx < tbNum; ++idx) {
        SGcNewGroupInfo* pTbInfo = taosArrayGet(pVgCtx->pTbList, idx);
        handleGroupFetchDone(pTbInfo->pGroup);
      }
      taosArrayClear(pVgCtx->pTbList);
    }
  } else {
    SHashObj* pGrpHash = pGCache->globalGrp ? pGCache->pGrpHash : pCtx->pGrpHash;
    for (SGroupCacheData* pGroup = taosHashIterate(pGrpHash, NULL); NULL != pGroup;
         pGroup = taosHashIterate(pGrpHash, pGroup)) {
      handleGroupFetchDone(pGroup);
    }
  }

  taosHashClear(pCtx->pWaitSessions);
  return TSDB_CODE_SUCCESS;
}
2023-07-14 11:36:56 +00:00
/*
 * Act as the fetching session for this downstream. Blocks are pulled and
 * cached until one belongs to this session's group, the downstream is
 * exhausted, or an error occurs.
 *
 * The status of each step now updates the single `code` variable. A shadowing
 * inner declaration previously discarded errors from
 * handleGroupCacheRetrievedBlk and handleDownstreamFetchDone.
 *
 * When this session got its block, the fetch role is handed to one waiting
 * session, if any: that session is flagged newFetch, unregistered and woken.
 * Its id and ctx are copied out before the hash iterator's reference is
 * dropped. Otherwise the fetch role is released (fetchSessionId back to -1).
 */
static int32_t getCacheBlkFromDownstreamOperator(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
  bool    continueFetch = true;
  int32_t code = TSDB_CODE_SUCCESS;

  while (continueFetch && TSDB_CODE_SUCCESS == code) {
    code = getBlkFromDownstreamOperator(pOperator, pSession->downstreamIdx, ppRes);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
    if (NULL == *ppRes) {
      code = handleDownstreamFetchDone(pOperator, pSession);
      break;
    }
    code = handleGroupCacheRetrievedBlk(pOperator, *ppRes, pSession, &continueFetch);
  }

  if (!continueFetch) {
    SGcSessionCtx** ppWaitCtx = taosHashIterate(pCtx->pWaitSessions, NULL);
    if (ppWaitCtx) {
      // Copy key and value while the iterator still holds a reference on the entry.
      int64_t        waitSessionId = *(int64_t*)taosHashGetKey(ppWaitCtx, NULL);
      SGcSessionCtx* pWaitCtx = *ppWaitCtx;
      taosHashCancelIterate(pCtx->pWaitSessions, ppWaitCtx);

      if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, waitSessionId)) {
        qError("wrong fetch sessionId: %" PRId64 " expected: %" PRId64, pCtx->fetchSessionId, sessionId);
        return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }

      pWaitCtx->newFetch = true;
      taosHashRemove(pCtx->pWaitSessions, &waitSessionId, sizeof(waitSessionId));
      tsem_post(&pWaitCtx->waitSem);
      return code;
    }
  }

  if (sessionId != atomic_val_compare_exchange_64(&pCtx->fetchSessionId, sessionId, -1)) {
    qError("wrong fetch sessionId: %" PRId64 " expected: %" PRId64, pCtx->fetchSessionId, sessionId);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  return code;
}
2023-07-19 11:14:30 +00:00
/*
 * Try to serve the session's next block from its group cache without waiting.
 *
 * Outcomes:
 * - The next unread block (index lastBlkId + 1, or 0 before the first read)
 *   exists: it is materialized into *ppRes, lastBlkId advances to it, and
 *   *got is true.
 * - The group is fetch-done: *ppRes is NULL and *got is true.
 * - Otherwise *got is false and *ppRes is not written.
 *
 * The block descriptor is copied while the batchList read latch is held. A
 * concurrent append may reallocate the array after the latch is dropped, so
 * a pointer into it would dangle.
 */
static int32_t getBlkFromSessionCacheImpl(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes, bool* got) {
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGroupBatchBlkList*      pBatchList = &pSession->pGroupData->batchList;
  int64_t                  nextIdx = (pSession->lastBlkId < 0) ? 0 : pSession->lastBlkId + 1;

  *got = true;

  taosRLockLatch(&pBatchList->lock);
  int64_t blkNum = taosArrayGetSize(pBatchList->pList);
  if (nextIdx < blkNum) {
    SGcBlkBufBasic basic = *(SGcBlkBufBasic*)taosArrayGet(pBatchList->pList, nextIdx);
    taosRUnLockLatch(&pBatchList->lock);

    int32_t code = retrieveBlkFromBufCache(pGCache, pSession->pGroupData, sessionId, &basic, ppRes);
    pSession->lastBlkId = nextIdx;
    return code;
  }
  taosRUnLockLatch(&pBatchList->lock);

  if (atomic_load_8((int8_t*)&pSession->pGroupData->fetchDone)) {
    *ppRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *got = false;
  return TSDB_CODE_SUCCESS;
}
2023-07-14 11:36:56 +00:00
/*
 * Park session sessionId until it is woken, then produce its next block
 * into *ppRes.
 *
 * Expected to be entered with pSession->pGroupData->mutex held. Every path
 * through this function unlocks that mutex (before the early OOM return and
 * before blocking), and it never locks it.
 *
 * The session is appended to the group's waitQueue (under the mutex) and
 * registered in pCtx->pWaitSessions (after the mutex is released). It then
 * blocks on its waitSem. When woken:
 * - if the fetch role was handed over to it (newFetch), it pulls from
 *   downstream itself via getCacheBlkFromDownstreamOperator;
 * - otherwise it unregisters from pWaitSessions and reads from the group cache.
 *
 * NOTE(review): in the second case the `got` result is ignored. If
 * getBlkFromSessionCacheImpl reports got == false, *ppRes is not written but
 * TSDB_CODE_SUCCESS is returned — confirm a wake-up always implies data.
 * NOTE(review): pGCache and code are unused, and the taosArrayPush /
 * taosHashPut results are not checked.
 */
static int32_t groupCacheSessionWait(struct SOperatorInfo* pOperator, SGcDownstreamCtx* pCtx, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
SGroupCacheOperatorInfo* pGCache = pOperator->info;
SGroupCacheData* pGroup = pSession->pGroupData;
int32_t code = TSDB_CODE_SUCCESS;
2023-07-11 11:34:03 +00:00
// Lazily create the group's wait queue; on failure release the caller's mutex before returning.
if (NULL == pGroup->waitQueue) {
pGroup->waitQueue = taosArrayInit(1, POINTER_BYTES);
if (NULL == pGroup->waitQueue) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
// Enqueue while still holding the group mutex so a notifier cannot miss this session.
taosArrayPush(pGroup->waitQueue, &pSession);
// The per-session semaphore is created on first wait and reused afterwards.
if (!pSession->semInit) {
tsem_init(&pSession->waitSem, 0, 0);
pSession->semInit = true;
}
taosThreadMutexUnlock(&pSession->pGroupData->mutex);
2023-07-14 11:36:56 +00:00
// Make this session eligible to receive the downstream fetch role.
taosHashPut(pCtx->pWaitSessions, &sessionId, sizeof(sessionId), &pSession, POINTER_BYTES);
2023-07-11 11:34:03 +00:00
tsem_wait(&pSession->waitSem);
2023-07-14 11:36:56 +00:00
// Woken by a fetch-role handoff: the handing session already removed us from pWaitSessions.
if (pSession->newFetch) {
pSession->newFetch = false;
return getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
}
// Woken by a group notification: stop being a fetch-handoff candidate and read from the cache.
taosHashRemove(pCtx->pWaitSessions, &sessionId, sizeof(sessionId));
2023-07-19 11:14:30 +00:00
bool got = false;
return getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
2023-07-11 11:34:03 +00:00
}
2023-07-19 11:14:30 +00:00
/*
 * Return the next block of session sessionId's group in *ppRes
 * (NULL once the group is finished).
 *
 * Each loop pass:
 * 1. Try the group cache (getBlkFromSessionCacheImpl). Return on error, on a
 *    block, or on group completion.
 * 2. If this session already owns the downstream fetch (fetchSessionId ==
 *    sessionId) or can claim it by CAS from -1, drop the group mutex (if held)
 *    and pull from downstream directly.
 * 3. Otherwise:
 *    - first pass: take the group mutex and repeat steps 1-2 under it;
 *    - second pass (mutex held): block in groupCacheSessionWait, which
 *      releases the mutex itself.
 */
static int32_t getBlkFromSessionCache(struct SOperatorInfo* pOperator, int64_t sessionId, SGcSessionCtx* pSession, SSDataBlock** ppRes) {
2023-07-11 11:34:03 +00:00
int32_t code = TSDB_CODE_SUCCESS;
SGroupCacheOperatorInfo* pGCache = pOperator->info;
// True while this function holds pSession->pGroupData->mutex.
bool locked = false;
2023-07-14 11:36:56 +00:00
SGcDownstreamCtx* pCtx = &pGCache->pDownstreams[pSession->downstreamIdx];
2023-07-11 11:34:03 +00:00
while (true) {
2023-07-19 11:14:30 +00:00
bool got = false;
code = getBlkFromSessionCacheImpl(pOperator, sessionId, pSession, ppRes, &got);
if (TSDB_CODE_SUCCESS != code || got) {
2023-07-17 01:25:28 +00:00
goto _return;
2023-07-11 11:34:03 +00:00
}
2023-07-14 11:36:56 +00:00
// Become (or remain) the session that pulls from downstream, if nobody else is.
if ((atomic_load_64(&pCtx->fetchSessionId) == sessionId)
|| (-1 == atomic_val_compare_exchange_64(&pCtx->fetchSessionId, -1, sessionId))) {
2023-07-11 11:34:03 +00:00
if (locked) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex);
locked = false;
}
2023-07-14 11:36:56 +00:00
code = getCacheBlkFromDownstreamOperator(pOperator, pCtx, sessionId, pSession, ppRes);
2023-07-11 11:34:03 +00:00
goto _return;
}
// Second pass: the cache was re-checked under the mutex, so it is safe to wait.
if (locked) {
2023-07-14 11:36:56 +00:00
code = groupCacheSessionWait(pOperator, pCtx, sessionId, pSession, ppRes);
2023-07-11 11:34:03 +00:00
// groupCacheSessionWait has already released the group mutex.
locked = false;
if (TSDB_CODE_SUCCESS != code) {
goto _return;
}
break;
}
// First pass: take the group mutex and re-check the cache and the fetch role under it.
taosThreadMutexLock(&pSession->pGroupData->mutex);
locked = true;
};
_return:
if (locked) {
taosThreadMutexUnlock(&pSession->pGroupData->mutex);
}
return code;
2023-07-03 05:42:06 +00:00
}
/*
 * Create the two block-cache hash tables of the operator: `pDirtyBlk` and
 * `pReadBlk`, both keyed with the BIGINT hash function and HASH_ENTRY_LOCK.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if either table cannot be created. On that
 * path, any table already created is left in pInfo for the operator's destroy
 * routine. NOTE(review): this assumes destroyGroupCacheOperator releases it;
 * confirm.
 */
static int32_t initGroupCacheBlockCache(SGroupCacheOperatorInfo* pInfo) {
  SGcBlkCacheInfo* pCache = &pInfo->blkCache;
  pCache->pDirtyBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pCache->pDirtyBlk) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  // TODO(review): the original contained an unfinished, non-compiling call
  // `taosHashSetFreeFp(pCache->pDirtyBlk, ...)` with no arguments. Register the
  // correct free callback here once the dirty-block entry type is settled.

  pCache->pReadBlk = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == pCache->pReadBlk) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  return TSDB_CODE_SUCCESS;
}
2023-06-30 11:20:36 +00:00
2023-07-14 11:36:56 +00:00
/*
 * Fill a freshly zeroed session context: bind it to its operator parameter,
 * downstream index and group data. Set lastBlkId to -1 because no block has
 * been consumed yet.
 */
static FORCE_INLINE void initGroupCacheSessionCtx(SGcSessionCtx* pSession, SGcOperatorParam* pGcParam, SGroupCacheData* pGroup) {
  pSession->lastBlkId = -1;
  pSession->pGroupData = pGroup;
  pSession->downstreamIdx = pGcParam->downstreamIdx;
  pSession->pParam = pGcParam;
}
/*
 * Create the session context for pParam's session id and return a pointer to
 * the copy stored in the downstream's pSessions hash through *ppSession.
 *
 * The group data is looked up by table uid in either the operator-wide group
 * hash (globalGrp) or the per-downstream one. If it is missing, it is created
 * with addNewGroupData. In batchFetch mode, -1 is passed as the vgroup id.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from addNewGroupData / taosHashPut.
 */
static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SOperatorParam* pParam, SGcSessionCtx** ppSession) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  SGcOperatorParam*        pGcParam = pParam->value;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];

  SHashObj* pGrpHash = NULL;
  if (pGCache->globalGrp) {
    pGrpHash = pGCache->pGrpHash;
  } else {
    pGrpHash = pCtx->pGrpHash;
  }

  SGroupCacheData* pGroup = taosHashAcquire(pGrpHash, &pGcParam->tbUid, sizeof(pGcParam->tbUid));
  if (pGroup == NULL) {
    code = addNewGroupData(pOperator, pParam, &pGroup, pGCache->batchFetch ? -1 : pGcParam->vgId, pGcParam->tbUid);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  SGcSessionCtx newSession = {0};
  initGroupCacheSessionCtx(&newSession, pGcParam, pGroup);

  code = taosHashPut(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId), &newSession, sizeof(newSession));
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // Hand back the hash-owned copy, not the stack-local one.
  *ppSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
  return TSDB_CODE_SUCCESS;
}
/*
 * Fetch the next result block for the session identified by pParam.
 *
 * A new session context is created on the first call for a session id. On
 * later calls, the block handed out by the previous call (tracked in
 * blkCache.pReadBlk) is returned to the downstream's block list first.
 *
 * When no block is produced (*ppRes == NULL), the session is dropped from the
 * downstream's pSessions hash.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; the block goes to *ppRes.
 */
static int32_t getBlkFromGroupCache(struct SOperatorInfo* pOperator, SSDataBlock** ppRes, SOperatorParam* pParam) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  SGroupCacheOperatorInfo* pGCache = pOperator->info;
  SGcOperatorParam*        pGcParam = pParam->value;
  SGcDownstreamCtx*        pCtx = &pGCache->pDownstreams[pParam->downstreamIdx];
  SGcSessionCtx*           pSession = taosHashGet(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));

  if (NULL == pSession) {
    code = initGroupCacheSession(pOperator, pParam, &pSession);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  } else {
    // Recycle the block returned to this session by the previous call.
    SSDataBlock** ppBlock = taosHashGet(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
    if (ppBlock) {
      releaseBaseBlockToList(pCtx, *ppBlock);
      taosHashRemove(pGCache->blkCache.pReadBlk, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
    }
  }

  code = getBlkFromSessionCache(pOperator, pGcParam->sessionId, pSession, ppRes);
  // The check is on *ppRes: ppRes itself is the caller's out-pointer and is
  // never NULL, so the old `NULL == ppRes` test could never drop a finished
  // session.
  if (NULL == *ppRes) {
    taosHashRemove(pCtx->pSessions, &pGcParam->sessionId, sizeof(pGcParam->sessionId));
  }

  return code;
}
2023-07-07 08:49:35 +00:00
/*
 * Set up per-downstream execution statistics: record the downstream count and
 * allocate one zeroed int64_t block counter per downstream.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the counter array cannot be allocated.
 */
static int32_t initGroupCacheExecInfo(SOperatorInfo* pOperator) {
  SGroupCacheOperatorInfo* pGcInfo = pOperator->info;

  pGcInfo->execInfo.downstreamNum = pOperator->numOfDownstream;
  pGcInfo->execInfo.pDownstreamBlkNum = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(int64_t));

  return (pGcInfo->execInfo.pDownstreamBlkNum != NULL) ? TSDB_CODE_SUCCESS : TSDB_CODE_OUT_OF_MEMORY;
}
2023-06-30 11:20:36 +00:00
2023-07-11 11:34:03 +00:00
/*
 * Allocate and initialise one SGcDownstreamCtx per downstream operator.
 *
 * For each downstream this creates:
 *   - pVgTbHash (keyed by INT vgroup id); in batchFetch mode it is pre-seeded
 *     with a vgroup context for vgroup 0;
 *   - pNewGrpList, pSessions, pFreeBlock and pWaitSessions;
 *   - pGrpHash, only when groups are not shared operator-wide (!globalGrp);
 *   - baseFilename: "<tsTempDir>/gc_<pid>_<taskId>_<downstreamId>".
 *
 * fetchSessionId starts at -1, meaning no session currently owns the
 * downstream fetch.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on any allocation/insert failure. Partially
 * built contexts are left for the operator's destroy routine to release.
 */
static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
  SGroupCacheOperatorInfo* pInfo = pOperator->info;
  pInfo->pDownstreams = taosMemoryCalloc(pOperator->numOfDownstream, sizeof(*pInfo->pDownstreams));
  if (NULL == pInfo->pDownstreams) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
    SGcDownstreamCtx* pCtx = &pInfo->pDownstreams[i];
    pCtx->id = i;
    pCtx->fetchSessionId = -1;
    pCtx->lastBlkUid = 0;

    pCtx->pVgTbHash = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT));
    if (NULL == pCtx->pVgTbHash) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    if (pInfo->batchFetch) {
      int32_t      defaultVg = 0;
      SGcVgroupCtx vgCtx = {0};
      initGcVgroupCtx(&vgCtx, pCtx->id, defaultVg, NULL);
      // Previously the insert result was ignored, silently leaving batchFetch
      // without its default vgroup context.
      if (TSDB_CODE_SUCCESS != tSimpleHashPut(pCtx->pVgTbHash, &defaultVg, sizeof(defaultVg), &vgCtx, sizeof(vgCtx))) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    pCtx->pNewGrpList = taosArrayInit(10, sizeof(SGcNewGroupInfo));
    if (NULL == pCtx->pNewGrpList) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    if (!pInfo->globalGrp) {
      pCtx->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
      if (pCtx->pGrpHash == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    pCtx->pSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (pCtx->pSessions == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pCtx->pFreeBlock = taosArrayInit(10, POINTER_BYTES);
    if (NULL == pCtx->pFreeBlock) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pCtx->pWaitSessions = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
    if (pCtx->pWaitSessions == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    // snprintf always NUL-terminates within the given size, so the full
    // buffer size can be passed and no manual terminator is needed.
    snprintf(pCtx->baseFilename, sizeof(pCtx->baseFilename), "%s/gc_%d_%s_%d", tsTempDir, (int)getpid(),
             GET_TASKID(pOperator->pTaskInfo), pCtx->id);
  }
  return TSDB_CODE_SUCCESS;
}
/*
 * Operator "get next with param" entry point: return the next block for the
 * session described by pParam, or NULL when the session has no more data.
 * On error, the code is stored in the task info and control unwinds through
 * T_LONG_JMP to the task's jump buffer, so this function does not return.
 */
SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
  SSDataBlock* pResBlock = NULL;
  int32_t      code = getBlkFromGroupCache(pOperator, &pResBlock, pParam);

  if (code == TSDB_CODE_SUCCESS) {
    return pResBlock;
  }

  pOperator->pTaskInfo->code = code;
  T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
  return pResBlock;
}
2023-06-30 11:20:36 +00:00
/*
 * Build a transparent group-cache operator on top of `pDownstream`.
 *
 * Only grouping by table uid is supported; any other plan fails with
 * TSDB_CODE_INVALID_PARA. The cache size limit is fixed at 100 MiB. The
 * optional group columns, block cache, operator-wide group hash (globalGrp),
 * downstream contexts and execution statistics are initialised in that order.
 *
 * Returns the new operator, or NULL after storing the failure code in
 * pTaskInfo->code and releasing whatever was built so far.
 */
SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
                                            SGroupCachePhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  SGroupCacheOperatorInfo* pGCache = taosMemoryCalloc(1, sizeof(SGroupCacheOperatorInfo));
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));

  if (NULL == pGCache || NULL == pOperator) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  pOperator->transparent = true;
  setOperatorInfo(pOperator, "GroupCacheOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE, false, OP_NOT_OPENED, pGCache, pTaskInfo);

  pGCache->maxCacheSize = 100 * 1024 * 1024;  // 100 MiB == 104857600 bytes
  pGCache->grpByUid = pPhyciNode->grpByUid;
  pGCache->globalGrp = pPhyciNode->globalGrp;
  pGCache->batchFetch = pPhyciNode->batchFetch;

  if (!pGCache->grpByUid) {
    qError("only group cache by uid is supported now");
    code = TSDB_CODE_INVALID_PARA;
    goto _error;
  }

  if (NULL != pPhyciNode->pGroupCols) {
    code = initGroupColsInfo(&pGCache->groupColsInfo, pPhyciNode->grpColsMayBeNull, pPhyciNode->pGroupCols);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = initGroupCacheBlockCache(pGCache);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  if (pGCache->globalGrp) {
    pGCache->pGrpHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
    if (NULL == pGCache->pGrpHash) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto _error;
    }
  }

  code = appendDownstream(pOperator, pDownstream, numOfDownstream);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  code = initGroupCacheDownstreamCtx(pOperator);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  code = initGroupCacheExecInfo(pOperator);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, groupCacheGetNext, NULL);
  return pOperator;

_error:
  if (NULL != pGCache) {
    destroyGroupCacheOperator(pGCache);
  }
  taosMemoryFree(pOperator);
  pTaskInfo->code = code;
  return NULL;
}