TDengine/source/libs/executor/src/dataDispatcher.c

224 lines
8.4 KiB
C
Raw Normal View History

2022-01-06 23:34:51 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
2022-01-06 23:34:51 +00:00
#include "planner.h"
#include "tcompression.h"
2022-04-09 08:44:31 +00:00
#include "tdatablock.h"
2022-01-06 23:34:51 +00:00
#include "tglobal.h"
#include "tqueue.h"
/* One queued result block. pData is a heap buffer (allocated by allocBuf())
 * that starts with an SDataCacheEntry header written by toDataCacheEntry(). */
typedef struct SDataDispatchBuf {
  int32_t useSize;    // bytes recorded as used by toDataCacheEntry()
  int32_t allocSize;  // bytes allocated for pData by allocBuf()
  char*   pData;      // header + encoded payload
} SDataDispatchBuf;
// Header written at the start of each SDataDispatchBuf.pData by
// toDataCacheEntry(); the encoded block payload follows directly in `data`.
typedef struct SDataCacheEntry {
int32_t dataLen; // payload length in bytes, as written by blockCompressEncode()
int32_t numOfRows; // number of rows in the encoded block
int32_t numOfCols; // column count handed to the consumer by getDataBlock()
int8_t compressed; // compression flag passed to blockCompressEncode()
char data[]; // encoded payload (flexible array member)
} SDataCacheEntry;
2022-01-06 23:34:51 +00:00
typedef struct SDataDispatchHandle {
2022-04-09 08:44:31 +00:00
SDataSinkHandle sink;
SDataSinkManager* pManager;
2022-03-04 07:17:04 +00:00
SDataBlockDescNode* pSchema;
2022-04-09 08:44:31 +00:00
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t status;
bool queryEnd;
uint64_t useconds;
TdThreadMutex mutex;
2022-01-06 23:34:51 +00:00
} SDataDispatchHandle;
2022-04-09 08:44:31 +00:00
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
2022-01-06 23:34:51 +00:00
if (tsCompressColData < 0 || 0 == pData->info.rows) {
return false;
}
2022-02-28 09:02:43 +00:00
for (int32_t col = 0; col < numOfCols; ++col) {
2022-01-06 23:34:51 +00:00
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
2022-04-09 08:44:31 +00:00
int32_t colSize = pColRes->info.bytes * pData->info.rows;
2022-01-06 23:34:51 +00:00
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
}
return false;
}
2022-03-17 05:42:49 +00:00
// data format:
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | total length | group id | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
// | | (4 bytes) |(8 bytes) | sizeof(int32_t) * numOfCols | actual size | | actual size | |
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
2022-03-17 05:42:49 +00:00
// The length of each bitmap is determined by the number of rows in this data block, and the length of each column's
// data is recorded in the first segment, right after the struct header.
/* Serialize pInput's block into pBuf->pData as an SDataCacheEntry header
 * followed by the encoded payload (layout described above). pBuf->pData must
 * already have been sized by allocBuf(). */
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  // The schema slot count is exactly the number of columns encoded below, so it
  // is also what the header must report; reporting info.numOfCols instead could
  // make a consumer decode a different number of columns than were written.
  int32_t          numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
  SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;

  pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols);
  pEntry->numOfRows = pInput->pData->info.rows;
  pEntry->numOfCols = numOfCols;
  pEntry->dataLen = 0;

  blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);

  // Bytes used = the SDataCacheEntry header that sits at pData + the payload.
  pBuf->useSize = (int32_t)sizeof(SDataCacheEntry) + pEntry->dataLen;
}
/* Allocate the payload buffer for one queued block.
 *
 * Returns false when the queue already holds more than
 * cfg.maxDataBlockNumPerQuery items, or when the allocation fails. On success
 * pBuf->pData holds sizeof(SRetrieveTableRsp) + blockGetEncodeSize() bytes,
 * which toDataCacheEntry() later fills with a header and the encoded block. */
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
  uint32_t capacity = pDispatcher->pManager->cfg.maxDataBlockNumPerQuery;
  // Sample the queue depth once so the capacity check and the log line agree.
  int32_t queued = taosQueueItemSize(pDispatcher->pDataBlocks);
  // Explicit cast: same unsigned comparison the mixed-sign expression performed.
  if ((uint32_t)queued > capacity) {
    qError("SinkNode queue is full, no capacity, max:%u, current:%d, no capacity", capacity, queued);
    return false;
  }

  pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pInput->pData);
  pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
  if (NULL == pBuf->pData) {
    qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno));
    return false;
  }
  return true;
}
/* Recompute the buffer status from the current queue depth, store it in
 * pDispatcher->status under the mutex, and return it:
 *   DS_BUF_EMPTY - nothing queued
 *   DS_BUF_LOW   - fewer than cfg.maxDataBlockNumPerQuery blocks queued
 *   DS_BUF_FULL  - otherwise */
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
  int32_t result = DS_BUF_FULL;

  taosThreadMutexLock(&pDispatcher->mutex);
  int32_t queued = taosQueueItemSize(pDispatcher->pDataBlocks);
  if (0 == queued) {
    result = DS_BUF_EMPTY;
  } else if (queued < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery) {
    result = DS_BUF_LOW;
  }
  pDispatcher->status = result;
  taosThreadMutexUnlock(&pDispatcher->mutex);

  return result;
}
/* Return the status last stored by updateStatus(), read under the mutex. */
static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
  TdThreadMutex* pLock = &pDispatcher->mutex;

  taosThreadMutexLock(pLock);
  int32_t current = pDispatcher->status;
  taosThreadMutexUnlock(pLock);

  return current;
}
2022-01-11 07:56:24 +00:00
/* fPut callback: encode pInput's block into a new queue item and enqueue it.
 * On success *pContinue is true only while updateStatus() reports DS_BUF_LOW.
 * Returns TSDB_CODE_QRY_OUT_OF_MEMORY if the item or its payload cannot be
 * allocated, or if allocBuf() rejects it because the queue is over capacity. */
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  SDataDispatchBuf*    pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM);
  if (NULL == pBuf) {
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  if (!allocBuf(pDispatcher, pInput, pBuf)) {
    // The queue item was allocated above and never enqueued; release it so a
    // full queue or a failed payload allocation does not leak it.
    taosFreeQitem(pBuf);
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  toDataCacheEntry(pDispatcher, pInput, pBuf);
  taosWriteQitem(pDispatcher->pDataBlocks, pBuf);
  *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher));
  return TSDB_CODE_SUCCESS;
}
2022-01-18 06:12:25 +00:00
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&pDispatcher->mutex);
2022-01-11 07:56:24 +00:00
pDispatcher->queryEnd = true;
pDispatcher->useconds = useconds;
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&pDispatcher->mutex);
}
2022-01-06 23:34:51 +00:00
2022-01-11 07:56:24 +00:00
/* fGetLen callback: move the next queued block into pDispatcher->nextOutput
 * and report its payload length through *pLen (0 when nothing is queued).
 * *pQueryEnd reports whether endPut() had been called. */
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;

  // Snapshot queryEnd (written by endPut() under the mutex) BEFORE looking at
  // the queue: if it is already true, every block was enqueued beforehand, so
  // "empty queue + queryEnd" cannot hide a block that was still being put.
  taosThreadMutexLock(&pDispatcher->mutex);
  bool queryEnd = pDispatcher->queryEnd;
  taosThreadMutexUnlock(&pDispatcher->mutex);

  SDataDispatchBuf* pBuf = NULL;
  if (!taosQueueEmpty(pDispatcher->pDataBlocks)) {
    taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf);
  }

  if (NULL == pBuf) {
    *pLen = 0;
  } else {
    memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
    taosFreeQitem(pBuf);
    *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
  }

  *pQueryEnd = queryEnd;
}
2022-01-11 07:56:24 +00:00
/* fGetData callback: copy the block staged by getDataLength() into pOutput and
 * release it. pOutput->pData must have room for the length getDataLength()
 * reported. When nothing is staged, the query must already have ended; only
 * the end-of-query metadata is filled in and bufStatus is DS_BUF_EMPTY. */
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;

  if (NULL == pDispatcher->nextOutput.pData) {
    // queryEnd/useconds are written by endPut() under the mutex; read them the
    // same way the data path below does.
    taosThreadMutexLock(&pDispatcher->mutex);
    bool     queryEnd = pDispatcher->queryEnd;
    uint64_t useconds = pDispatcher->useconds;
    taosThreadMutexUnlock(&pDispatcher->mutex);

    assert(queryEnd);
    pOutput->useconds = useconds;
    pOutput->precision = pDispatcher->pSchema->precision;
    pOutput->bufStatus = DS_BUF_EMPTY;
    pOutput->queryEnd = queryEnd;
    return TSDB_CODE_SUCCESS;
  }

  SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData);
  memcpy(pOutput->pData, pEntry->data, pEntry->dataLen);
  pOutput->numOfRows = pEntry->numOfRows;
  pOutput->numOfCols = pEntry->numOfCols;
  pOutput->compressed = pEntry->compressed;
  taosMemoryFreeClear(pDispatcher->nextOutput.pData);  // todo persistent

  pOutput->bufStatus = updateStatus(pDispatcher);
  taosThreadMutexLock(&pDispatcher->mutex);
  pOutput->queryEnd = pDispatcher->queryEnd;
  pOutput->useconds = pDispatcher->useconds;
  pOutput->precision = pDispatcher->pSchema->precision;
  taosThreadMutexUnlock(&pDispatcher->mutex);
  return TSDB_CODE_SUCCESS;
}
2022-01-06 23:34:51 +00:00
/* fDestroy callback: free the staged block, then drain and free every queued
 * block, close the queue, and destroy the mutex. The handle's own memory is
 * not released here. */
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
  SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
  STaosQueue*          pQueue = pDispatcher->pDataBlocks;

  taosMemoryFreeClear(pDispatcher->nextOutput.pData);

  for (;;) {
    if (taosQueueEmpty(pQueue)) {
      break;
    }
    SDataDispatchBuf* pItem = NULL;
    taosReadQitem(pQueue, (void**)&pItem);
    taosMemoryFreeClear(pItem->pData);
    taosFreeQitem(pItem);
  }

  taosCloseQueue(pQueue);
  taosThreadMutexDestroy(&pDispatcher->mutex);
  return TSDB_CODE_SUCCESS;
}
2022-02-28 09:02:43 +00:00
/* Create a dispatcher sink for pDataSink and return it through *pHandle.
 * On failure terrno is set, TSDB_CODE_QRY_OUT_OF_MEMORY is returned, nothing
 * allocated here is leaked, and *pHandle is left untouched. */
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle) {
  SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
  if (NULL == dispatcher) {
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }

  // Open the queue first: if it fails only the handle needs releasing, and the
  // mutex is initialized only once nothing else can fail.
  dispatcher->pDataBlocks = taosOpenQueue();
  if (NULL == dispatcher->pDataBlocks) {
    taosMemoryFreeClear(dispatcher);
    terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
    return TSDB_CODE_QRY_OUT_OF_MEMORY;
  }
  taosThreadMutexInit(&dispatcher->mutex, NULL);

  dispatcher->sink.fPut = putDataBlock;
  dispatcher->sink.fEndPut = endPut;
  dispatcher->sink.fGetLen = getDataLength;
  dispatcher->sink.fGetData = getDataBlock;
  dispatcher->sink.fDestroy = destroyDataSinker;
  dispatcher->pManager = pManager;
  dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
  dispatcher->status = DS_BUF_EMPTY;
  dispatcher->queryEnd = false;

  *pHandle = dispatcher;
  return TSDB_CODE_SUCCESS;
}