TDengine/source/libs/stream/src/streamData.c

237 lines
8.1 KiB
C
Raw Normal View History

2022-06-06 08:34:12 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2023-07-05 07:55:55 +00:00
#include "streamInt.h"
2022-06-06 08:34:12 +00:00
2023-05-20 10:05:39 +00:00
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
if (pData == NULL) {
return NULL;
}
pData->type = blockType;
pData->srcVgId = srcVg;
2022-06-07 03:36:40 +00:00
int32_t blockNum = pReq->blockNum;
2023-02-20 02:12:27 +00:00
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
2022-06-07 03:36:40 +00:00
if (pArray == NULL) {
2023-06-29 07:54:09 +00:00
taosFreeQitem(pData);
2023-05-20 10:05:39 +00:00
return NULL;
2022-06-07 03:36:40 +00:00
}
2023-05-20 10:05:39 +00:00
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
2022-06-07 03:36:40 +00:00
for (int32_t i = 0; i < blockNum; i++) {
2023-02-21 09:59:08 +00:00
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
2022-06-07 03:36:40 +00:00
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
blockDecode(pDataBlock, pRetrieve->data);
2023-05-20 10:05:39 +00:00
2022-06-07 03:36:40 +00:00
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.version = be64toh(pRetrieve->version);
pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
2022-06-10 02:24:56 +00:00
pDataBlock->info.type = pRetrieve->streamBlockType;
2022-06-22 09:56:46 +00:00
pDataBlock->info.childId = pReq->upstreamChildId;
}
2023-05-19 07:30:50 +00:00
2022-06-22 09:56:46 +00:00
pData->blocks = pArray;
2023-05-20 10:05:39 +00:00
return pData;
}
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) {
SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize);
if (pStreamBlocks == NULL) {
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
return NULL;
}
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes;
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
2023-06-07 07:03:04 +00:00
pStreamBlocks->childId = pTask->info.selfChildId;
2023-05-20 10:05:39 +00:00
pStreamBlocks->sourceVer = pSubmit->ver;
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
2023-06-07 07:03:04 +00:00
pStreamBlocks->childId = pTask->info.selfChildId;
2023-05-20 10:05:39 +00:00
pStreamBlocks->sourceVer = pMerged->ver;
}
return pStreamBlocks;
}
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {
if (pBlock == NULL) {
return;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
2022-06-22 09:56:46 +00:00
}
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) {
SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock));
if (pArray == NULL) {
return -1;
2022-06-07 03:36:40 +00:00
}
2023-04-03 10:07:28 +00:00
2023-01-17 01:43:33 +00:00
taosArrayPush(pArray, &(SSDataBlock){0});
2022-06-22 09:56:46 +00:00
SRetrieveTableRsp* pRetrieve = pReq->pRetrieve;
SSDataBlock* pDataBlock = taosArrayGet(pArray, 0);
blockDecode(pDataBlock, pRetrieve->data);
2023-04-03 10:07:28 +00:00
// TODO: refactor
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
pDataBlock->info.version = be64toh(pRetrieve->version);
pDataBlock->info.type = pRetrieve->streamBlockType;
2022-07-18 03:41:56 +00:00
pData->reqId = pReq->reqId;
2022-06-07 03:36:40 +00:00
pData->blocks = pArray;
2022-07-18 03:41:56 +00:00
2022-06-07 03:36:40 +00:00
return 0;
}
2023-05-13 15:22:36 +00:00
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) {
SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen);
2023-04-03 09:47:48 +00:00
if (pDataSubmit == NULL) {
return NULL;
}
2022-06-06 08:34:12 +00:00
pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t));
2023-04-03 09:47:48 +00:00
if (pDataSubmit->dataRef == NULL) {
taosFreeQitem(pDataSubmit);
return NULL;
}
2023-05-13 15:22:36 +00:00
pDataSubmit->submit = *pData;
2023-04-03 10:07:28 +00:00
*pDataSubmit->dataRef = 1; // initialize the reference count to be 1
2023-04-08 17:39:09 +00:00
pDataSubmit->type = type;
2023-04-03 10:07:28 +00:00
2022-06-06 08:34:12 +00:00
return pDataSubmit;
}
2023-05-13 15:22:36 +00:00
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
2023-04-03 10:07:28 +00:00
int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1);
ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT);
if (ref == 0) {
taosMemoryFree(pDataSubmit->submit.msgStr);
taosMemoryFree(pDataSubmit->dataRef);
}
}
2023-05-13 15:22:36 +00:00
SStreamMergedSubmit* streamMergedSubmitNew() {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0);
2023-04-03 10:07:28 +00:00
if (pMerged == NULL) {
return NULL;
}
2022-12-08 05:34:42 +00:00
pMerged->submits = taosArrayInit(0, sizeof(SPackedData));
pMerged->dataRefs = taosArrayInit(0, sizeof(void*));
2023-04-03 10:07:28 +00:00
if (pMerged->dataRefs == NULL || pMerged->submits == NULL) {
taosArrayDestroy(pMerged->submits);
taosArrayDestroy(pMerged->dataRefs);
taosFreeQitem(pMerged);
return NULL;
}
2022-07-19 08:54:19 +00:00
pMerged->type = STREAM_INPUT__MERGED_SUBMIT;
return pMerged;
}
2023-05-13 15:22:36 +00:00
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef);
2022-12-07 07:19:34 +00:00
taosArrayPush(pMerged->submits, &pSubmit->submit);
pMerged->ver = pSubmit->ver;
return 0;
}
2023-04-08 17:39:09 +00:00
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) {
terrno = 0;
2023-04-08 17:39:09 +00:00
if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) {
2022-07-13 08:37:33 +00:00
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
2023-04-08 17:39:09 +00:00
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
2022-07-13 08:37:33 +00:00
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
taosArrayDestroy(pBlockSrc->blocks);
2023-04-08 17:39:09 +00:00
taosFreeQitem(pElem);
2022-07-19 08:54:19 +00:00
return dst;
2023-04-08 17:39:09 +00:00
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
2023-05-13 15:22:36 +00:00
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
streamMergeSubmit(pMerged, pBlockSrc);
2023-04-08 17:39:09 +00:00
taosFreeQitem(pElem);
2022-07-19 08:54:19 +00:00
return dst;
2023-04-08 17:39:09 +00:00
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
2023-05-13 15:22:36 +00:00
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
if (pMerged == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
2023-05-10 14:22:01 +00:00
2023-05-13 15:22:36 +00:00
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
taosFreeQitem(dst);
2023-04-08 17:39:09 +00:00
taosFreeQitem(pElem);
2022-07-19 08:54:19 +00:00
return (SStreamQueueItem*)pMerged;
2022-07-13 08:37:33 +00:00
} else {
qDebug("block type:%d not merged with existed blocks list, type:%d", pElem->type, dst->type);
2022-07-19 08:54:19 +00:00
return NULL;
2022-07-13 08:37:33 +00:00
}
}
void streamFreeQitem(SStreamQueueItem* data) {
int8_t type = data->type;
2022-07-15 09:48:48 +00:00
if (type == STREAM_INPUT__GET_RES) {
2022-07-13 08:37:33 +00:00
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
taosFreeQitem(data);
2023-08-13 12:02:15 +00:00
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) {
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
2022-07-13 08:37:33 +00:00
taosFreeQitem(data);
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
2023-05-13 15:22:36 +00:00
streamDataSubmitDestroy((SStreamDataSubmit*)data);
2022-07-13 08:37:33 +00:00
taosFreeQitem(data);
} else if (type == STREAM_INPUT__MERGED_SUBMIT) {
2023-05-13 15:22:36 +00:00
SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data;
2023-05-20 03:37:17 +00:00
int32_t sz = taosArrayGetSize(pMerge->submits);
for (int32_t i = 0; i < sz; i++) {
2022-08-03 07:19:52 +00:00
int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i);
int32_t ref = atomic_sub_fetch_32(pRef, 1);
ASSERT(ref >= 0);
2023-05-20 03:37:17 +00:00
2022-08-03 07:19:52 +00:00
if (ref == 0) {
2022-12-08 05:34:42 +00:00
SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i);
2022-12-07 07:19:34 +00:00
taosMemoryFree(pSubmit->msgStr);
2022-08-03 07:19:52 +00:00
taosMemoryFree(pRef);
}
}
2022-12-07 07:19:34 +00:00
taosArrayDestroy(pMerge->submits);
taosArrayDestroy(pMerge->dataRefs);
taosFreeQitem(pMerge);
2022-09-21 03:54:44 +00:00
} else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data;
2023-05-16 05:59:13 +00:00
blockDataDestroy(pRefBlock->pBlock);
2022-09-21 03:54:44 +00:00
taosFreeQitem(pRefBlock);
2022-07-13 08:37:33 +00:00
}
}