TDengine/source/libs/executor/src/executor.c

2006 lines
69 KiB
C
Raw Normal View History

2021-09-24 10:05:56 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
2025-05-08 01:48:53 +00:00
#include <stdint.h>
2025-04-29 07:31:17 +00:00
#include "cmdnodes.h"
2023-04-28 03:42:34 +00:00
#include "executorInt.h"
#include "operator.h"
#include "planner.h"
2025-05-08 01:48:53 +00:00
#include "query.h"
2023-04-28 03:42:34 +00:00
#include "querytask.h"
2025-04-29 07:31:17 +00:00
#include "streamexecutorInt.h"
2022-03-29 07:24:25 +00:00
#include "tdatablock.h"
2022-07-27 02:52:25 +00:00
#include "tref.h"
2024-07-22 04:51:25 +00:00
#include "trpc.h"
2022-07-22 06:38:28 +00:00
#include "tudf.h"
2024-07-22 04:51:25 +00:00
#include "wal.h"
2025-05-08 01:48:53 +00:00
#include "dataSinkInt.h"
#include "storageapi.h"
2022-07-22 06:38:28 +00:00
// Once-guard passed to taosThreadOnce() together with initRefPool().
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
// Id of the reference pool opened by initRefPool(); holds -1 until that function has run.
int32_t exchangeObjRefPool = -1;
2025-05-08 01:48:53 +00:00
// Global executor context; zero-initialized here and filled in by gExecInfoInit().
SGlobalExecInfo gExecInfo = {0};
/*
 * Store the dnode handle and its two callbacks in the global gExecInfo.
 *
 * pDnode     - opaque dnode handle, later passed back to the callbacks
 * getDnodeId - callback stored in gExecInfo.getDnodeId
 * getMnode   - callback stored in gExecInfo.getMnode (used by getCurrentMnodeEpset)
 */
void gExecInfoInit(void* pDnode, getDnodeId_f getDnodeId, getMnodeEpset_f getMnode) {
  gExecInfo.getDnodeId = getDnodeId;
  gExecInfo.getMnode = getMnode;
  gExecInfo.dnode = pDnode;
}
/*
 * Fill pEpSet through the getMnode callback registered in gExecInfo.
 *
 * Returns TSDB_CODE_APP_ERROR when the dnode handle or the callback has not
 * been registered yet, TSDB_CODE_SUCCESS otherwise.
 */
int32_t getCurrentMnodeEpset(SEpSet *pEpSet) {
  const bool initialized = (gExecInfo.dnode != NULL) && (gExecInfo.getMnode != NULL);
  if (!initialized) {
    qError("gExecInfo is not initialized");
    return TSDB_CODE_APP_ERROR;
  }

  gExecInfo.getMnode(gExecInfo.dnode, pEpSet);
  return TSDB_CODE_SUCCESS;
}
2022-07-22 06:38:28 +00:00
/*
 * atexit handler: swap the stored pool id for 0 and close the pool that was
 * registered in exchangeObjRefPool.
 */
static void cleanupRefPool() {
  int32_t expected = exchangeObjRefPool;
  int32_t poolId = atomic_val_compare_exchange_32(&exchangeObjRefPool, expected, 0);
  taosCloseRef(poolId);
}
2023-04-04 06:50:58 +00:00
static void initRefPool() {
exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo);
2024-07-22 04:51:25 +00:00
(void)atexit(cleanupRefPool);
2023-01-18 09:27:27 +00:00
}
2022-10-27 03:06:16 +00:00
/*
 * Walk down a single-child operator chain until the stream-scan operator is
 * found and append the given input to its pBlockLists.
 *
 * Every operator visited on the way (including the stream scan itself) is
 * reset to OP_NOT_OPENED.
 *
 * How `input` is interpreted depends on `type`:
 *   STREAM_INPUT__MERGED_SUBMIT  - array of numOfBlocks SPackedData; each element is
 *                                  pushed, blockType becomes STREAM_INPUT__DATA_SUBMIT
 *   STREAM_INPUT__DATA_SUBMIT    - the `input` pointer value itself is pushed
 *   STREAM_INPUT__DATA_BLOCK     - array of numOfBlocks SSDataBlock; the address of
 *                                  each one is wrapped in an SPackedData and pushed
 *   STREAM_INPUT__CHECKPOINT     - `input` is wrapped in one SPackedData and pushed
 *   STREAM_INPUT__REF_DATA_BLOCK - array of numOfBlocks SPackedData; each element is
 *                                  pushed, blockType becomes STREAM_INPUT__DATA_BLOCK
 * Any other type queues nothing and still reports success.
 *
 * Returns TSDB_CODE_APP_ERROR when the chain ends without a stream scan or
 * branches (more than one downstream), terrno when a push fails, otherwise
 * TSDB_CODE_SUCCESS.
 */
static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->numOfDownstream == 0) {
      // the format takes the id string only; a stray PRIx64 used to be glued to it
      qError("failed to find stream scan operator to set the input data block, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    if (pOperator->numOfDownstream > 1) {  // not handle this in join query
      qError("join not supported for stream block scan, %s", id);
      return TSDB_CODE_APP_ERROR;
    }

    pOperator->status = OP_NOT_OPENED;
    return doSetSMABlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
  }

  pOperator->status = OP_NOT_OPENED;
  SStreamScanInfo* pInfo = pOperator->info;

  if (type == STREAM_INPUT__MERGED_SUBMIT) {
    for (size_t i = 0; i < numOfBlocks; i++) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_SUBMIT) {
    void* tmp = taosArrayPush(pInfo->pBlockLists, &input);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
  } else if (type == STREAM_INPUT__DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
      SPackedData  tmp = {.pDataBlock = pDataBlock};
      void*        tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
      QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  } else if (type == STREAM_INPUT__CHECKPOINT) {
    SPackedData tmp = {.pDataBlock = input};
    void*       tmpItem = taosArrayPush(pInfo->pBlockLists, &tmp);
    QUERY_CHECK_NULL(tmpItem, code, lino, _end, terrno);
    pInfo->blockType = STREAM_INPUT__CHECKPOINT;
  } else if (type == STREAM_INPUT__REF_DATA_BLOCK) {
    for (size_t i = 0; i < numOfBlocks; ++i) {
      SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData));
      void*        tmp = taosArrayPush(pInfo->pBlockLists, pReq);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
    pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2022-11-01 03:56:14 +00:00
/*
 * Reset every operator above the stream-scan operator to OP_NOT_OPENED by
 * walking down a single-child operator chain. The stream-scan operator itself
 * is left untouched.
 *
 * Returns TSDB_CODE_APP_ERROR when the chain ends without a stream scan or
 * when an operator has more than one downstream; 0 once the scan is reached.
 */
static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) {
  if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    return 0;
  }

  if (pOperator->numOfDownstream == 0) {
    // the format takes the id string only; a stray PRIx64 used to be glued to it
    qError("failed to find stream scan operator to set the input data block, %s", id);
    return TSDB_CODE_APP_ERROR;
  }

  if (pOperator->numOfDownstream > 1) {  // not handle this in join query
    qError("join not supported for stream block scan, %s", id);
    return TSDB_CODE_APP_ERROR;
  }

  pOperator->status = OP_NOT_OPENED;
  return doSetStreamOpOpen(pOperator->pDownstream[0], id);
}
2024-09-13 15:04:41 +00:00
/*
 * Locate the stream-scan operator by following the first downstream of each
 * operator and, when it owns a table-scan operator with an open data reader,
 * hand the task's id string to that reader.
 *
 * Returns the reader's error code when setting the id fails, 0 in every other
 * case (including "no stream scan / no reader found").
 */
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    if (pOperator->pDownstream == NULL) {
      return 0;
    }
    return doSetTaskId(pOperator->pDownstream[0], pAPI);
  }

  SStreamScanInfo* pStreamScanInfo = pOperator->info;
  if (pStreamScanInfo->pTableScanOp == NULL) {
    return 0;
  }

  STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
  if (pScanInfo->base.dataReader == NULL) {
    return 0;
  }

  int32_t code = pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
  if (code) {
    qError("failed to set reader id for executor, code:%s", tstrerror(code));
    return code;
  }

  return 0;
}
2024-09-13 15:04:41 +00:00
/*
 * Assign a new query id to the task, rebuild its id string from taskId and
 * queryId, and forward the refreshed id string to the tsdb reader.
 *
 * Returns whatever doSetTaskId() reports.
 */
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
  SExecTaskInfo* pTask = tinfo;

  pTask->id.queryId = queryId;
  buildTaskId(taskId, queryId, pTask->id.str);

  // set the idstr for tsdbReader
  int32_t code = doSetTaskId(pTask->pRoot, &pTask->storageAPI);
  return code;
}
2022-10-27 03:06:16 +00:00
/*
 * Queue SMA input blocks on the stream-scan operator of the given task.
 *
 * Returns TSDB_CODE_APP_ERROR for a NULL task, TSDB_CODE_SUCCESS when there is
 * nothing to queue (NULL pBlocks or zero blocks), otherwise the result of
 * doSetSMABlock().
 */
int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
  if (tinfo == NULL) {
    return TSDB_CODE_APP_ERROR;
  }

  if (numOfBlocks == 0 || pBlocks == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  int32_t code = doSetSMABlock(pTaskInfo->pRoot, (void*)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
  if (code == TSDB_CODE_SUCCESS) {
    qDebug("%s set the sma block successfully", GET_TASKID(pTaskInfo));
  } else {
    qError("%s failed to set the sma block data", GET_TASKID(pTaskInfo));
  }

  return code;
}
2023-04-04 06:50:58 +00:00
/*
 * Create an executor task running in OPTR_EXEC_MODEL_QUEUE mode.
 *
 * msg == NULL : build a raw-scan task (numOfCols is not written).
 * msg != NULL : parse msg into a subplan, build the task from it and, when
 *               numOfCols is non-NULL, store the number of output slots of the
 *               plan's output data block descriptor in *numOfCols.
 *
 * Returns the task handle, or NULL on failure. Whenever a failing step reported
 * a non-zero error code, that code is stored in terrno before returning NULL
 * (the raw-scan branch previously returned NULL without touching terrno).
 */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
                                     uint64_t id) {
  if (msg == NULL) {  // create raw scan
    SExecTaskInfo* pTaskInfo = NULL;

    int32_t code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pTaskInfo);
    if (NULL == pTaskInfo || code != 0) {
      if (code != 0) {
        terrno = code;
      }
      return NULL;
    }

    code = createRawScanOperatorInfo(pReaderHandle, pTaskInfo, &pTaskInfo->pRoot);
    if (NULL == pTaskInfo->pRoot || code != 0) {
      taosMemoryFree(pTaskInfo);
      if (code != 0) {
        terrno = code;
      }
      return NULL;
    }

    pTaskInfo->storageAPI = pReaderHandle->api;
    qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
    return pTaskInfo;
  }

  SSubplan* pPlan = NULL;
  int32_t   code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  qTaskInfo_t pTaskInfo = NULL;
  code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(pTaskInfo);
    terrno = code;
    return NULL;
  }

  // extract the number of output columns; skipped when the caller does not ask for it
  if (numOfCols != NULL) {
    SDataBlockDescNode* pDescNode = pPlan->pNode->pOutputDataBlockDesc;
    *numOfCols = 0;

    SNode* pNode;
    FOREACH(pNode, pDescNode->pSlots) {
      SSlotDescNode* pSlotDesc = (SSlotDescNode*)pNode;
      if (pSlotDesc->output) {
        ++(*numOfCols);
      }
    }
  }

  return pTaskInfo;
}
2025-05-08 01:48:53 +00:00
/*
 * Build the executor task for a stream subplan and, when streamInserterParam
 * is given, attach a stream data inserter to `handle`.
 *
 * Steps: create the task from the subplan, initialise a data-sink manager,
 * then (optionally) allocate an SInserterParam and pass it together with the
 * manager to createStreamDataInserter().
 *
 * Returns TSDB_CODE_INVALID_PARA when pSubplan, pTaskInfo or handle is NULL,
 * otherwise the code of the first failing step, or TSDB_CODE_SUCCESS.
 * On failure *pTaskInfo may still reference a partially built task; this
 * function does not destroy it.
 *
 * NOTE(review): when createExecTaskInfo() reports success but leaves the task
 * NULL, this function returns success as well; confirm whether that can happen.
 */
static int32_t qCreateStreamExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
                                     qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
                                     EOPTR_EXEC_MODEL model, SStreamInserterParam* streamInserterParam) {
  if (pSubplan == NULL || pTaskInfo == NULL || handle == NULL) {
    qError("invalid parameter, pSubplan:%p, pTaskInfo:%p, handle:%p", pSubplan, pTaskInfo, handle);
    return TSDB_CODE_INVALID_PARA;
  }

  SInserterParam* pInserterParam = NULL;
  SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;

  (void)taosThreadOnce(&initPoolOnce, initRefPool);
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);

  int32_t code = createExecTaskInfo(pSubplan, pTask, readHandle, taskId, vgId, sql, model);
  if (code != TSDB_CODE_SUCCESS || NULL == *pTask) {
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
    goto _error;
  }

  SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
  void*           pSinkManager = NULL;
  code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
  if (code != TSDB_CODE_SUCCESS) {
    qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
    goto _error;
  }

  if (streamInserterParam) {
    pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam));
    if (NULL == pInserterParam) {
      // capture terrno before logging so the reported and returned codes agree
      code = terrno;
      qError("failed to taosMemoryCalloc, code:%s, %s", tstrerror(code), (*pTask)->id.str);
      // the manager was never handed to an inserter; release it the same way
      // qCreateExecTask() does on its pre-sinker failure paths
      taosMemoryFree(pSinkManager);
      goto _error;
    }

    pInserterParam->readHandle = readHandle;
    pInserterParam->streamInserterParam = streamInserterParam;

    code = createStreamDataInserter(pSinkManager, handle, pInserterParam);
    if (code) {
      qError("failed to createStreamDataInserter, code:%s, %s", tstrerror(code), (*pTask)->id.str);
    }
  } else {
    // no inserter requested: nothing will adopt the manager, so release it here
    taosMemoryFree(pSinkManager);
  }

  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
         tstrerror(code));

_error:
  if (code != TSDB_CODE_SUCCESS) {
    if (pInserterParam != NULL) {
      taosMemoryFree(pInserterParam);
    }
  }
  return code;
}
/*
 * Reset the status of the tsdb reader that backs the task's stream-scan
 * operator, re-applying the scan's stored query condition.
 *
 * Returns the lookup code when the stream-scan operator cannot be extracted
 * (or is absent), otherwise the result of tsdReaderResetStatus().
 */
int32_t qResetStreamExecTask(qTaskInfo_t* pTaskInfo){
  SExecTaskInfo* pTask = (SExecTaskInfo*)(pTaskInfo);
  SOperatorInfo* pScanOp = pTask->pRoot;
  const char*    idStr = GET_TASKID(pTask);

  int32_t code = extractOperatorInTree(pScanOp, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, idStr, &pScanOp);
  if (code != 0 || pScanOp == NULL) {
    return code;
  }

  SStreamScanInfo* pStreamInfo = pScanOp->info;
  STableScanInfo*  pTableScan = pStreamInfo->pTableScanOp->info;
  STableScanBase*  pBase = &pTableScan->base;

  code = pTask->storageAPI.tsdReader.tsdReaderResetStatus(pBase->dataReader, &pBase->cond);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }

  return code;
}
2025-05-08 06:58:41 +00:00
/*
 * Parse a serialized subplan and create a stream executor task from it, then
 * reset the task's time-window filter.
 *
 * pTaskInfo       - out: receives the task handle; set to NULL on every failure
 *                   that occurs after argument validation
 * msg             - serialized subplan
 * readers         - read handle forwarded to task creation
 * pInserterParams - stream inserter parameters; its pSinkHandle member receives
 *                   the data sink handle
 *
 * Returns TSDB_CODE_INVALID_PARA when pTaskInfo, msg or pInserterParams is
 * NULL, otherwise the code of the first failing step, or TSDB_CODE_SUCCESS.
 */
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers,
                                  SStreamInserterParam* pInserterParams, int32_t vgId, int32_t taskId) {
  if (pTaskInfo == NULL || msg == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *pTaskInfo = NULL;

  // &pInserterParams->pSinkHandle below must not be formed from a NULL pointer
  if (pInserterParams == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SSubplan* pPlan = NULL;
  int32_t   code = qStringToSubplan(msg, &pPlan);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = qCreateStreamExecTask(readers, vgId, taskId, pPlan, pTaskInfo, &pInserterParams->pSinkHandle, 0, NULL,
                               OPTR_EXEC_MODEL_STREAM, pInserterParams);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // do not leave the caller with a handle to a destroyed task
    return code;
  }

  code = qStreamInfoResetTimewindowFilter(*pTaskInfo);
  if (code != TSDB_CODE_SUCCESS) {
    qDestroyTask(*pTaskInfo);
    *pTaskInfo = NULL;  // do not leave the caller with a handle to a destroyed task
  }

  return code;
}
2024-07-22 04:51:25 +00:00
/*
 * From tableIdList keep only the table uids that belong to the table scanned
 * by the given stream scan, and return them in a newly created array.
 *
 * A uid is dropped when
 *   - its meta entry cannot be read,
 *   - it is a super table,
 *   - the scan source is a super table and the entry is not a child of it,
 *   - the scan source is not a super table and the uid differs from the source uid,
 *   - a tag condition exists and the table does not satisfy it (or the check fails).
 * These per-table problems only skip the table; they never fail the call.
 *
 * ppArrayRes receives the result array whenever it could be created, also when
 * an error is returned, so the caller is responsible for destroying it.
 *
 * Returns terrno when the result array cannot be created, an input element
 * cannot be fetched, or a qualified uid cannot be appended; otherwise
 * TSDB_CODE_SUCCESS.
 */
static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,
                                       SStorageAPI* pAPI, SArray** ppArrayRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
  QUERY_CHECK_NULL(qa, code, lino, _error, terrno);

  int32_t numOfUids = taosArrayGetSize(tableIdList);
  if (numOfUids == 0) {
    (*ppArrayRes) = qa;
    goto _error;
  }

  STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;

  uint64_t suid = 0;
  uint64_t uid = 0;
  int32_t  type = 0;
  tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);

  // let's discard the tables those are not created according to the queried super table.
  SMetaReader mr = {0};
  pAPI->metaReaderFn.initReader(&mr, pScanInfo->readHandle.vnode, META_READER_LOCK, &pAPI->metaFn);

  for (int32_t i = 0; i < numOfUids; ++i) {
    uint64_t* id = (uint64_t*)taosArrayGet(tableIdList, i);
    QUERY_CHECK_NULL(id, code, lino, _end, terrno);

    // Per-table status is kept in its own variable: a failed lookup only skips the
    // table, whereas `code` is reserved for errors that must reach the caller.
    // (A same-named inner variable used to shadow `code`, which hid push failures.)
    int32_t rc = pAPI->metaReaderFn.getTableEntryByUid(&mr, *id);
    if (rc != TSDB_CODE_SUCCESS) {
      qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
      continue;
    }

    tDecoderClear(&mr.coder);

    if (mr.me.type == TSDB_SUPER_TABLE) {
      continue;
    }

    if (type == TSDB_SUPER_TABLE) {
      // this new created child table does not belong to the scanned super table.
      if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
        continue;
      }
    } else {  // ordinary table
      // In case that the scanned target table is an ordinary table. When replay the WAL during restore the vnode, we
      // should check all newly created ordinary table to make sure that this table isn't the destination table.
      if (mr.me.uid != uid) {
        continue;
      }
    }

    if (pScanInfo->pTagCond != NULL) {
      bool          qualified = false;
      STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid};

      rc = isQualifiedTable(&info, pScanInfo->pTagCond, pScanInfo->readHandle.vnode, &qualified, pAPI);
      if (rc != TSDB_CODE_SUCCESS) {
        qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
        continue;
      }

      if (!qualified) {
        continue;
      }
    }

    // handle multiple partition
    void* tmp = taosArrayPush(qa, id);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

_end:
  pAPI->metaReaderFn.clearReader(&mr);
  (*ppArrayRes) = qa;

_error:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
/*
 * Add tables to, or remove tables from, the stream scanner of a task.
 *
 * In OPTR_EXEC_MODEL_QUEUE mode the table-meta cache entry of every listed uid
 * is erased first. When adding, the list is filtered through
 * filterUnqualifiedTables(), the surviving uids are registered with the tq
 * reader and then, under the task's write latch, inserted into the scan's
 * table list together with their group id. When removing, the uids are handed
 * to the tq reader under the write latch.
 *
 * Returns the first error encountered, or the lookup code when the stream-scan
 * operator cannot be found; 0 on success.
 */
int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  const char*    id = GET_TASKID(pTaskInfo);
  int32_t        code = 0;

  if (isAdd) {
    qDebug("try to add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id);
  }

  // traverse to the stream scanner node to add this table id
  SOperatorInfo* pScanOp = NULL;
  code = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pScanOp);
  if (code != 0 || pScanOp == NULL) {
    return code;
  }

  SStreamScanInfo* pScanInfo = pScanOp->info;

  // clear meta cache for subscription if tag is changed
  if (pScanOp->pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
    for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) {
      int64_t*        pUid = (int64_t*)taosArrayGet(tableIdList, i);
      STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
      taosLRUCacheErase(pTableScanInfo->base.metaCache.pTableMetaEntryCache, pUid, LONG_BYTES);
    }
  }

  if (!isAdd) {
    // remove the table id in current list
    qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id);
    taosWLockLatch(&pTaskInfo->lock);
    pTaskInfo->storageAPI.tqReaderFn.tqReaderRemoveTables(pScanInfo->tqReader, tableIdList);
    taosWUnLockLatch(&pTaskInfo->lock);
    return code;
  }

  // add new table id
  SArray* pQualified = NULL;
  code = filterUnqualifiedTables(pScanInfo, tableIdList, id, &pTaskInfo->storageAPI, &pQualified);
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(pQualified);
    return code;
  }

  int32_t numOfQualifiedTables = taosArrayGetSize(pQualified);
  qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id);
  pTaskInfo->storageAPI.tqReaderFn.tqReaderAddTables(pScanInfo->tqReader, pQualified);

  bool   assignUid = false;
  size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0;
  char*  keyBuf = NULL;
  if (bufLen > 0) {
    assignUid = groupbyTbname(pScanInfo->pGroupTags);
    keyBuf = taosMemoryMalloc(bufLen);
    if (keyBuf == NULL) {
      taosArrayDestroy(pQualified);
      return terrno;
    }
  }

  STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo;

  taosWLockLatch(&pTaskInfo->lock);

  for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
    uint64_t* pUid = taosArrayGet(pQualified, i);
    if (!pUid) {
      taosMemoryFree(keyBuf);
      taosArrayDestroy(pQualified);
      taosWUnLockLatch(&pTaskInfo->lock);
      return terrno;
    }

    STableKeyInfo keyInfo = {.uid = *pUid, .groupId = 0};

    // a group id is only needed when the scan groups by tags
    if (bufLen > 0 && assignUid) {
      keyInfo.groupId = keyInfo.uid;
    } else if (bufLen > 0) {
      code = getGroupIdFromTagsVal(pScanInfo->readHandle.vnode, keyInfo.uid, pScanInfo->pGroupTags, keyBuf,
                                   &keyInfo.groupId, &pTaskInfo->storageAPI);
      if (code != TSDB_CODE_SUCCESS) {
        taosMemoryFree(keyBuf);
        taosArrayDestroy(pQualified);
        taosWUnLockLatch(&pTaskInfo->lock);
        return code;
      }
    }

    code = tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(keyBuf);
      taosArrayDestroy(pQualified);
      taosWUnLockLatch(&pTaskInfo->lock);
      return code;
    }
  }

  taosWUnLockLatch(&pTaskInfo->lock);

  if (keyBuf != NULL) {
    taosMemoryFree(keyBuf);
  }
  taosArrayDestroy(pQualified);

  return code;
}
/*
 * Report the schema information recorded at position idx of the task's
 * schemaInfos array.
 *
 * On a hit, *sversion and *tversion receive the schema and table versions,
 * dbName/tableName receive the recorded names (or an empty string when the
 * entry has none), and *tbGet is set to true. *tbGet is false in every other
 * outcome.
 *
 * Returns TSDB_CODE_INVALID_PARA when tinfo, dbName or tableName is NULL,
 * terrno when the entry cannot be fetched, and TSDB_CODE_SUCCESS otherwise,
 * including when idx is beyond the recorded entries.
 */
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, int32_t dbNameBuffLen, char* tableName,
                                    int32_t tbaleNameBuffLen, int32_t* sversion, int32_t* tversion, int32_t idx,
                                    bool* tbGet) {
  *tbGet = false;

  if (tinfo == NULL || dbName == NULL || tableName == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) {
    return TSDB_CODE_SUCCESS;
  }

  SSchemaInfo* pEntry = taosArrayGet(pTaskInfo->schemaInfos, idx);
  if (!pEntry) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    return terrno;
  }

  *sversion = pEntry->sw->version;
  *tversion = pEntry->tversion;

  if (!pEntry->dbname) {
    dbName[0] = 0;
  } else {
    tstrncpy(dbName, pEntry->dbname, dbNameBuffLen);
  }

  if (!pEntry->tablename) {
    tableName[0] = 0;
  } else {
    tstrncpy(tableName, pEntry->tablename, tbaleNameBuffLen);
  }

  *tbGet = true;
  return TSDB_CODE_SUCCESS;
}
2022-07-22 06:38:28 +00:00
2024-07-22 04:51:25 +00:00
/* Return the task's dynamicTask flag. */
bool qIsDynamicExecTask(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  return pTaskInfo->dynamicTask;
}
/*
 * Placeholder for releasing an operator parameter. It currently performs no
 * work for either a NULL or a non-NULL argument.
 */
void destroyOperatorParam(SOperatorParam* pParam) {
  if (pParam != NULL) {
    // TODO
  }
}
/* Free an operator parameter as an OP_GET_PARAM parameter; NULL is ignored. */
void qDestroyOperatorParam(SOperatorParam* pParam) {
  if (pParam != NULL) {
    freeOperatorParam(pParam, OP_GET_PARAM);
  }
}
/*
 * Replace the task's operator parameter with pParam and clear the paramSet
 * flag. The previous parameter is passed to destroyOperatorParam() first.
 */
void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  destroyOperatorParam(pTaskInfo->pOpParam);
  pTaskInfo->pOpParam = pParam;
  pTaskInfo->paramSet = false;
}
2022-07-22 06:38:28 +00:00
2024-07-02 11:25:33 +00:00
int32_t qExecutorInit(void) {
2025-02-20 11:20:40 +00:00
(void)taosThreadOnce(&initPoolOnce, initRefPool);
2024-07-02 11:25:33 +00:00
return TSDB_CODE_SUCCESS;
}
2022-07-22 06:38:28 +00:00
/*
 * Create an executor task from a subplan and, when `handle` is non-NULL,
 * create the data sinker that receives the task's output.
 *
 * When readHandle->localExec is set, the sinker is created from a clone of the
 * subplan's data sink node instead of the node itself.
 *
 * Returns the code of the first failing step, or the result of
 * createExecTaskInfo()/dsCreateDataSinker() otherwise. On failure *pTaskInfo
 * may still reference a task; this function does not destroy it.
 */
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
                        qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql,
                        EOPTR_EXEC_MODEL model) {
  SExecTaskInfo** ppTask = (SExecTaskInfo**)pTaskInfo;

  (void)taosThreadOnce(&initPoolOnce, initRefPool);
  qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId);

  int32_t code = createExecTaskInfo(pSubplan, ppTask, readHandle, taskId, vgId, sql, model);
  if (code != TSDB_CODE_SUCCESS || NULL == *ppTask) {
    qError("failed to createExecTaskInfo, code:%s", tstrerror(code));
    return code;
  }

  if (handle) {
    SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50, .compress = compressResult};
    void*           pSinkManager = NULL;

    code = dsDataSinkMgtInit(&cfg, &(*ppTask)->storageAPI, &pSinkManager);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*ppTask)->id.str);
      return code;
    }

    void* pSinkParam = NULL;
    code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*ppTask), readHandle);
    if (code != TSDB_CODE_SUCCESS) {
      qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*ppTask)->id.str);
      taosMemoryFree(pSinkManager);
      return code;
    }

    SDataSinkNode* pSink = NULL;
    if (readHandle->localExec) {
      code = nodesCloneNode((SNode*)pSubplan->pDataSink, (SNode**)&pSink);
      if (code != TSDB_CODE_SUCCESS) {
        qError("failed to nodesCloneNode, srcType:%d, code:%s, %s", nodeType(pSubplan->pDataSink), tstrerror(code),
               (*ppTask)->id.str);
        taosMemoryFree(pSinkManager);
        return code;
      }
    }

    // pSinkParam has been freed during create sinker.
    code = dsCreateDataSinker(pSinkManager, readHandle->localExec ? &pSink : &pSubplan->pDataSink, handle, pSinkParam,
                              (*ppTask)->id.str, pSubplan->processOneBlock);
    if (code) {
      qError("s-task:%s failed to create data sinker, code:%s", (*ppTask)->id.str, tstrerror(code));
    }
  }

  qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
         tstrerror(code));

  return code;
}
2022-08-09 05:45:44 +00:00
/* Destroy the data block whose pointer is stored at `param` (an SSDataBlock**). */
static void freeBlock(void* param) {
  SSDataBlock** ppBlock = (SSDataBlock**)param;
  blockDataDestroy(*ppBlock);
}
2025-05-07 11:20:45 +00:00
/*
 * Run the task's root operator and collect result blocks into pResList.
 *
 * Blocks are copied into the task-owned pResultBlockList cache (existing cache
 * slots are reused, new ones are created on demand) and pointers to those
 * copies are appended to pResList. Collection stops when the root operator
 * returns no block, when the accumulated row count reaches the row threshold,
 * or after the first block if processOneBlock is set.
 *
 * tinfo           - task handle
 * pResList        - cleared on entry, then filled with SSDataBlock* entries
 * useconds        - receives the task's accumulated elapsed time once the root
 *                   operator is exhausted
 * hasMore         - set to whether the last fetch returned a block; not written
 *                   when an error aborts the run
 * pLocal          - optional; copied into the task's localFetch when non-NULL
 * processOneBlock - stop after one block
 *
 * The task is claimed by the calling thread through an atomic compare-exchange
 * on pTaskInfo->owner and released before returning. Errors raised through
 * longjmp on pTaskInfo->env are caught here and returned as the task code.
 *
 * Returns pTaskInfo->code, which is updated with any error detected here.
 */
int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal,
                     bool processOneBlock) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  // Declared up front because the summary log after _end reads it; error paths
  // jump there before the elapsed time is measured and must see a defined value.
  uint64_t       el = 0;

  if (pLocal) {
    memcpy(&pTaskInfo->localFetch, pLocal, sizeof(*pLocal));
  }

  taosArrayClear(pResList);

  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  if (isTaskKilled(pTaskInfo)) {
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return pTaskInfo->code;
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();

    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);

    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int32_t      current = 0;
  SSDataBlock* pRes = NULL;
  int64_t      st = taosGetTimestampUs();

  if (pTaskInfo->pOpParam && !pTaskInfo->paramSet) {
    pTaskInfo->paramSet = true;
    code = pTaskInfo->pRoot->fpSet.getNextExtFn(pTaskInfo->pRoot, pTaskInfo->pOpParam, &pRes);
  } else {
    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
  }

  QUERY_CHECK_CODE(code, lino, _end);
  code = blockDataCheck(pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pRes == NULL) {
    st = taosGetTimestampUs();
  }

  // use the plan's own threshold only when it is dynamic and below 4096 rows
  int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
  if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {
    rowsThreshold = 4096;
  }

  int32_t blockIndex = 0;
  while (pRes != NULL) {
    SSDataBlock* p = NULL;
    if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
      // no cached slot left: create a fresh copy and append it to the cache
      SSDataBlock* p1 = NULL;
      code = createOneDataBlock(pRes, true, &p1);
      QUERY_CHECK_CODE(code, lino, _end);

      void* tmp = taosArrayPush(pTaskInfo->pResultBlockList, &p1);
      if (tmp == NULL) {
        // the cache did not store the fresh copy, so release it before bailing out
        code = terrno;
        lino = __LINE__;
        blockDataDestroy(p1);
        goto _end;
      }
      p = p1;
    } else {
      // reuse the cached block at this position
      void* tmp = taosArrayGet(pTaskInfo->pResultBlockList, blockIndex);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

      p = *(SSDataBlock**)tmp;
      code = copyDataBlock(p, pRes);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    blockIndex += 1;

    current += p->info.rows;
    QUERY_CHECK_CONDITION((p->info.rows > 0 || p->info.type == STREAM_CHECKPOINT), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
    void* tmp = taosArrayPush(pResList, &p);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    if (current >= rowsThreshold || processOneBlock) {
      break;
    }

    code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &pRes);
    QUERY_CHECK_CODE(code, lino, _end);
    code = blockDataCheck(pRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  if (pTaskInfo->pSubplan->dynamicRowThreshold) {
    pTaskInfo->pSubplan->rowsThreshold -= current;
  }

  *hasMore = (pRes != NULL);
  el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

_end:
  (void)cleanUpUdfs();

  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }

  return pTaskInfo->code;
}
2022-11-27 06:57:44 +00:00
/**
 * Destroy every data block cached in the task's result-block list and then
 * empty the list itself.
 *
 * @param tinfo opaque task handle; it is cast to SExecTaskInfo* and
 *              dereferenced without a NULL check.
 */
void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SArray*        pBlocks = pTaskInfo->pResultBlockList;

  size_t  total = taosArrayGetSize(pBlocks);
  int32_t idx = 0;
  while (idx < total) {
    // each element of the list is a pointer to a block
    SSDataBlock** ppBlock = taosArrayGet(pBlocks, idx);
    if (ppBlock != NULL) {
      blockDataDestroy(*ppBlock);
    }
    ++idx;
  }

  taosArrayClear(pBlocks);
}
2022-07-22 06:38:28 +00:00
/**
 * Execute the task's root operator once and hand back the produced block.
 *
 * The calling thread registers itself as the task owner for the duration of
 * the call; the owner field is reset to 0 on every exit path that is taken
 * after ownership has been acquired.
 *
 * @param tinfo    opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 * @param pRes     out: result block of this round; set to NULL on entry
 * @param useconds out: accumulated elapsed time of the task, written only
 *                 when this round produced no block (*pRes == NULL)
 * @return pTaskInfo->code. This is the current task code if the task was
 *         already killed, TSDB_CODE_QRY_IN_EXEC if another owner is
 *         registered, the longjmp value if an operator jumped back to the
 *         setjmp point, or the code reported by the root operator / the
 *         block check.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();
  int64_t        curOwner = 0;

  *pRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  // read the owner once so that the value reported in the log is the value that was actually tested
  curOwner = atomic_load_64(&pTaskInfo->owner);
  if (curOwner != 0) {
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  pTaskInfo->owner = threadId;
  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();

  int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
  if (code) {
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
  } else {
    // validate the block only when the operator succeeded, so that a failure of the operator is never replaced by a
    // secondary error from the check
    code = blockDataCheck(*pRes);
    if (code) {
      pTaskInfo->code = code;
      qError("%s failed at line %d, code:%s %s", __func__, __LINE__, tstrerror(code), GET_TASKID(pTaskInfo));
    }
  }

  uint64_t el = (taosGetTimestampUs() - st);
  pTaskInfo->cost.elapsedTime += el;

  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
2022-11-15 03:59:29 +00:00
/**
 * Append one stop-info entry to the task's stop-info array while holding the
 * stop-info write latch.
 *
 * @param pTaskInfo task whose stop-info array is extended
 * @param pInfo     entry handed to taosArrayPush
 * @return TSDB_CODE_SUCCESS, or terrno when the push returned NULL
 */
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  void* pPushed = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  if (pPushed != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
  return terrno;
}
int32_t stopInfoComp(void const* lp, void const* rp) {
SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
if (key->refId < pInfo->refId) {
return -1;
} else if (key->refId > pInfo->refId) {
return 1;
}
return 0;
}
2022-11-15 03:59:29 +00:00
/**
 * Remove the stop-info entry matching pInfo (compared with stopInfoComp) from
 * the task's stop-info array, if such an entry is found. The lookup and the
 * removal both happen under the stop-info write latch.
 *
 * @param pTaskInfo task whose stop-info array is searched
 * @param pInfo     key entry to look for
 */
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t pos = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
  if (!(pos < 0)) {
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, pos);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
/**
 * Walk the task's stop-info array under the write latch and, for every entry
 * whose exchange object can still be acquired from exchangeObjRefPool, post
 * its `ready` semaphore and release the reference again.
 *
 * Failures of individual steps are logged and do not stop the walk.
 *
 * @param pTaskInfo task whose registered stop-info entries are processed
 */
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t total = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t idx = 0; idx < total; ++idx) {
    SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, idx);
    if (pStop == NULL) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
      continue;
    }

    SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
    if (pExchangeInfo == NULL) {
      // the reference could not be acquired, nothing to post for this entry
      continue;
    }

    int32_t code = tsem_post(&pExchangeInfo->ready);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }

    code = taosReleaseRef(exchangeObjRefPool, pStop->refId);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    }
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
2022-11-29 08:17:05 +00:00
/**
 * Mark the task as killed with the given response code and run
 * qStopTaskOperators on it; the function does not wait for the task to stop.
 *
 * @param qinfo   opaque task handle
 * @param rspCode code passed to setTaskKilled
 * @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 *         TSDB_CODE_SUCCESS
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  if (NULL == pTaskInfo) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));

  setTaskKilled(pTaskInfo, rspCode);
  qStopTaskOperators(pTaskInfo);

  return TSDB_CODE_SUCCESS;
}
/**
 * Kill a task and optionally wait until it is no longer being executed.
 *
 * The task is always flagged as killed with TSDB_CODE_TSC_QUERY_KILLED. When
 * waitDuration is positive, the function then polls every 200 ms until the
 * task has no owner or until waitDuration milliseconds have elapsed. rspCode
 * is stored in pTaskInfo->code only when the task is observed not executing
 * during that wait.
 *
 * @param tinfo        opaque task handle
 * @param rspCode      code stored in the task once it has stopped executing
 * @param waitDuration maximum time to wait in milliseconds; a value <= 0
 *                     means "do not wait"
 * @return TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, otherwise
 *         TSDB_CODE_SUCCESS (also when the wait timed out)
 */
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
  int64_t        st = taosGetTimestampMs();
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  if (waitDuration > 0) {
    qDebug("%s sync killed execTask, and waiting for at most %.2fs", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
  } else {
    qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
  }

  setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);

  if (waitDuration <= 0) {
    // asynchronous kill: the caller does not want to wait for the executor
    return TSDB_CODE_SUCCESS;
  }

  while (1) {
    taosWLockLatch(&pTaskInfo->lock);
    if (!qTaskIsExecuting(pTaskInfo)) {  // not running now
      pTaskInfo->code = rspCode;
      taosWUnLockLatch(&pTaskInfo->lock);

      // report how long the executor needed to stop; log outside of the latch
      int64_t et = taosGetTimestampMs() - st;
      qInfo("%s waiting %.2fs for executor stopping", GET_TASKID(pTaskInfo), et / 1000.0);
      return TSDB_CODE_SUCCESS;
    }
    taosWUnLockLatch(&pTaskInfo->lock);

    // still running: sleep for 200 ms and check again
    taosMsleep(200);

    int64_t d = taosGetTimestampMs() - st;
    if (d >= waitDuration) {
      qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
      return TSDB_CODE_SUCCESS;
    }
  }
}
/**
 * Tell whether the task currently has an owner registered.
 *
 * @param qinfo opaque task handle; may be NULL
 * @return false for a NULL handle, otherwise whether the atomically loaded
 *         owner field is non-zero
 */
bool qTaskIsExecuting(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
  return (pTaskInfo != NULL) && (atomic_load_64(&pTaskInfo->owner) != 0);
}
2022-11-30 13:04:58 +00:00
/**
 * Write the task's cost summary to the debug log.
 *
 * The short form (idle and elapsed time only) is used when the task has no
 * block-load recorder attached; otherwise the recorder's block and row
 * counters are included as well.
 *
 * @param pTaskInfo task whose `cost` member is reported
 */
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
  STaskCostInfo*          pCost = &pTaskInfo->cost;
  SFileBlockLoadRecorder* pRec = pCost->pRecoder;

  // difference between the recorded start and creation timestamps of the task
  int64_t idle = pCost->start - pCost->created;

  if (pRec == NULL) {
    qDebug("%s :cost summary: idle in queue:%.2f ms, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), idle / 1000.0,
           pCost->elapsedTime / 1000.0);
    return;
  }

  qDebug(
      "%s :cost summary: idle:%.2f ms, elapsed time:%.2f ms, extract tableList:%.2f ms, "
      "createGroupIdMap:%.2f ms, total blocks:%d, "
      "load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
      GET_TASKID(pTaskInfo), idle / 1000.0, pCost->elapsedTime / 1000.0, pCost->extractListTime,
      pCost->groupIdMapTime, pRec->totalBlocks, pRec->loadBlockStatis, pRec->loadBlocks, pRec->totalRows,
      pRec->totalCheckedRows);
}
2022-07-22 06:38:28 +00:00
/**
 * Log the completion of a task together with its cost summary, then destroy
 * it via doDestroyTask.
 *
 * @param qTaskHandle opaque task handle; a NULL handle is ignored
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (NULL == pTaskInfo) {
    return;
  }

  if (pTaskInfo->pRoot == NULL) {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  } else {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  }

  // print the query cost summary before the task is torn down
  printTaskExecCostInLog(pTaskInfo);
  doDestroyTask(pTaskInfo);
}
2022-07-30 03:30:31 +00:00
/**
 * Collect explain execution info starting from the task's root operator.
 *
 * @param tinfo         opaque task handle (cast to SExecTaskInfo*, not
 *                      NULL-checked)
 * @param pExecInfoList list handed through to getOperatorExplainExecInfo
 * @return the code returned by getOperatorExplainExecInfo
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOperatorExplainExecInfo(pRoot, pExecInfoList);
}
/**
 * Find the stream-scan operator by following the first downstream of every
 * operator, starting at the task's root, and return its private info.
 *
 * @param tinfo   opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 * @param scanner out: the `info` pointer of the first operator whose type is
 *                QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, or NULL when the chain
 *                ends without such an operator
 */
void qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  while (pOperator != NULL) {
    if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      *scanner = pOperator->info;
      return;
    }

    // stop at a leaf instead of reading a downstream slot that does not exist
    if (pOperator->numOfDownstream < 1 || pOperator->pDownstream == NULL) {
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  qError("%s failed at line %d since no stream scan operator found", __func__, __LINE__);
  *scanner = NULL;
}
2024-07-22 04:51:25 +00:00
/**
 * Step 1 of the scan-history setup: copy the version range and time window
 * into the task's stream info and set the recover step to
 * STREAM_RECOVER_STEP__PREPARE1.
 *
 * @param tinfo     opaque task handle (cast to SExecTaskInfo*)
 * @param pVerRange version range copied into streamInfo.fillHistoryVer
 * @param pWindow   time window copied into streamInfo.fillHistoryWindow
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 *         the task's exec model is not OPTR_EXEC_MODEL_STREAM
 */
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) {
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  SStreamTaskInfo* pStream = &pTaskInfo->streamInfo;
  pStream->recoverStep = STREAM_RECOVER_STEP__PREPARE1;
  pStream->fillHistoryWindow = *pWindow;
  pStream->fillHistoryVer = *pVerRange;

  qDebug("%s step 1. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
         ", window:%" PRId64 " - %" PRId64,
         GET_TASKID(pTaskInfo), pStream->fillHistoryVer.minVer, pStream->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2024-07-22 04:51:25 +00:00
/**
 * Step 2 of the scan-history setup: copy the version range and time window
 * into the task's stream info and set the recover step to
 * STREAM_RECOVER_STEP__PREPARE2.
 *
 * @param tinfo     opaque task handle (cast to SExecTaskInfo*)
 * @param pVerRange version range copied into streamInfo.fillHistoryVer
 * @param pWindow   time window copied into streamInfo.fillHistoryWindow
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 *         the task's exec model is not OPTR_EXEC_MODEL_STREAM
 */
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow) {
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  SStreamTaskInfo* pStream = &pTaskInfo->streamInfo;
  pStream->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
  pStream->fillHistoryWindow = *pWindow;
  pStream->fillHistoryVer = *pVerRange;

  qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 "-%" PRId64 ", window:%" PRId64
         "-%" PRId64,
         GET_TASKID(pTaskInfo), pStream->fillHistoryVer.minVer, pStream->fillHistoryVer.maxVer, pWindow->skey,
         pWindow->ekey);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
/**
 * Reset the task's stream recover step to STREAM_RECOVER_STEP__NONE.
 *
 * @param tinfo opaque task handle (cast to SExecTaskInfo*)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 *         the task's exec model is not OPTR_EXEC_MODEL_STREAM
 */
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo) {
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;

  QUERY_CHECK_CONDITION((pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;

_end:
  if (TSDB_CODE_SUCCESS != code) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
2025-05-07 11:20:45 +00:00
/**
 * Locate the stream-scan operator by following the first downstream of every
 * operator, starting at pOperator, and copy its interval-related settings
 * into the output parameters.
 *
 * @param pOperator   operator to start the search from
 * @param pWaterMark  out: twAggSup.waterMark of the stream scan
 * @param pInterval   out: interval of the stream scan
 * @param pLastWindow out: lastScanRange of the stream scan
 * @param pRecInteral out: recalculateInterval of the stream scan
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 *         the chain ends without a stream-scan operator (the outputs are
 *         left untouched in that case)
 */
static int32_t getOpratorIntervalInfo(SOperatorInfo* pOperator, int64_t* pWaterMark, SInterval* pInterval,
                                      STimeWindow* pLastWindow, TSKEY* pRecInteral) {
  while (pOperator != NULL && pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
    // stop at a leaf instead of reading a downstream slot that does not exist
    if (pOperator->numOfDownstream < 1 || pOperator->pDownstream == NULL) {
      pOperator = NULL;
      break;
    }
    pOperator = pOperator->pDownstream[0];
  }

  if (pOperator == NULL) {
    qError("%s failed at line %d since no stream scan operator found", __func__, __LINE__);
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
  }

  SStreamScanInfo* pScanOp = (SStreamScanInfo*)pOperator->info;
  *pWaterMark = pScanOp->twAggSup.waterMark;
  *pInterval = pScanOp->interval;
  *pLastWindow = pScanOp->lastScanRange;
  *pRecInteral = pScanOp->recalculateInterval;
  return TSDB_CODE_SUCCESS;
}
2025-05-07 11:20:45 +00:00
/**
 * Fetch the interval-related settings of the task's stream-scan operator by
 * delegating to getOpratorIntervalInfo, starting at the task's root operator.
 *
 * @param tinfo       opaque task handle (cast to SExecTaskInfo*, not
 *                    NULL-checked)
 * @param pWaterMark  out: forwarded to getOpratorIntervalInfo
 * @param pInterval   out: forwarded to getOpratorIntervalInfo
 * @param pLastWindow out: forwarded to getOpratorIntervalInfo
 * @param pRecInteral out: forwarded to getOpratorIntervalInfo
 * @return the code returned by getOpratorIntervalInfo
 */
int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval,
                                   STimeWindow* pLastWindow, TSKEY* pRecInteral) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  return getOpratorIntervalInfo(pRoot, pWaterMark, pInterval, pLastWindow, pRecInteral);
}
/**
 * Walk the operator chain from the task's root for as long as every operator
 * has exactly one non-NULL downstream. No operator option is modified by the
 * code in this function; the walk only validates the shape of the chain.
 *
 * @param tinfo opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 * @return -1 when the operator at which the walk stops has more than one
 *         downstream, otherwise 0
 */
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;

  // iterate operator tree
  while (pOperator->numOfDownstream == 1 && pOperator->pDownstream[0] != NULL) {
    pOperator = pOperator->pDownstream[0];
  }

  if (pOperator->numOfDownstream > 1) {
    qError("unexpected stream, multiple downstream");
    return -1;
  }

  return 0;
}
2023-11-04 17:23:23 +00:00
/**
 * Return the recoverScanFinished flag stored in the task's stream info.
 *
 * @param tinfo opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 */
bool qStreamScanhistoryFinished(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.recoverScanFinished;
}
2022-07-22 06:38:28 +00:00
2023-07-31 07:24:35 +00:00
/**
 * Widen the task's fill-history time window to [INT64_MIN, INT64_MAX] after
 * logging the previous bounds.
 *
 * @param tinfo opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 * @return always 0
 */
int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  STimeWindow*   pRange = &pTaskInfo->streamInfo.fillHistoryWindow;

  // log the old bounds before they are overwritten
  qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
         GET_TASKID(pTaskInfo), pRange->skey, pRange->ekey, INT64_MIN, INT64_MAX);

  pRange->ekey = INT64_MAX;
  pRange->skey = INT64_MIN;
  return 0;
}
2022-07-22 06:38:28 +00:00
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
}
2022-08-09 11:06:24 +00:00
/**
 * Return the schema pointer stored in the task's stream info.
 *
 * @param tinfo opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 */
const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.schema;
}
/**
 * Return the table name stored in the task's stream info.
 *
 * @param tinfo opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 */
const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) {
  return ((SExecTaskInfo*)tinfo)->streamInfo.tbName;
}
2024-05-23 09:35:54 +00:00
/**
 * Return the address of the batch meta response embedded in the task's
 * stream info.
 *
 * @param tinfo opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 */
SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  return &((SExecTaskInfo*)tinfo)->streamInfo.btMetaRsp;
}
2024-07-29 02:29:40 +00:00
/**
 * Copy the task's current stream offset into pOffset via tOffsetCopy.
 *
 * @param tinfo   opaque task handle (cast to SExecTaskInfo*, not NULL-checked)
 * @param pOffset destination of the copy
 * @return always 0
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  tOffsetCopy(pOffset, &pTask->streamInfo.currentOffset);
  return 0;
}
2022-10-17 06:05:40 +00:00
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
2022-10-17 06:05:40 +00:00
pCond->numOfCols = pMtInfo->schema->nCols;
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
2022-12-02 07:52:32 +00:00
pCond->pSlotList = taosMemoryMalloc(sizeof(int32_t) * pCond->numOfCols);
if (pCond->colList == NULL || pCond->pSlotList == NULL) {
taosMemoryFreeClear(pCond->colList);
taosMemoryFreeClear(pCond->pSlotList);
return terrno;
}
feat: support customized taos/taosd (#29736) * feat: support TDAcoreOS * chore: cmake options for TD_ACORE * chore: disable lemon for TD_ACORE * chore: add lzma2 and msvcregex * chore: cmake for lzma2 * chore: adapt for TD_ACORE * chore: adapt strcasecmp for TD_ACORE * chore: adapt for geos/threadName * chore: build adapt for TD_ACORE * chore: build adapt for TD_ACORE * chore: build adapt for TD_ACORE * chore: build adapt for TD_ACORE * chore: build adapt for TD_ACORE termio * chore: refact transComm.h for TD_ACORE * chore: refact transportInt.h for TD_ACORE * chore: refact trans.c for TD_ACORE * chore: refact trpc.h for TD_ACORE * chore: refact transCli.c/transComm.c/transSvr.c for TD_ACORE * chore: refact uv.h for TD_ACORE * chore: refact geosWrapper.h for TD_ACORE * chore: refact token/builtins/udf for TD_ACORE * chore: refact rocks for TD_ACORE * chore: refact tsdbCache.c for TD_ACORE, use LRU cache for last/last_row, not use rocksdb * chore: refact FAIL to _ERR to solve conflicts for TD_ACORE * chore: restore lemon.c/lempar.c * chore: support build lemon for TD_ACORE * chore: refact trpc and siginfo_t for TD_ACORE * chore: refact timezone for TD_ACORE * chore: refact lz4 for TD_ACORE * chore: refact TD_ACORE to make compile pass * chore: code optimization for TD_ASTRA * feat: support run taos with taosd integrated * feat: support invoke taos shell * feat: support invoke taos shell * feat: support invoke taos shell * chore: code optimization * chore: fix undefined reference problem os TD_ASTRA * chore: resolve compile problem for TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * 
chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix undefined reference problem os TD_ASTRA * chore: fix getpid * chore: fix typo * chore: set stack size and ajust min pack size for TD_ASTRA * chore: fix pthread create parameters * chore: chmod adapt for TD_ASTRA * chore: fix trans compile problem * chore: adapt chmod for TD_ASTRA * chore: byte alignment for TD_ASTRA * chore: more code for adaption of TD_ASTRA * chore: more code for adaption of TD_ASTRA * chore: more code for adaption of TD_ASTRA * chore: byte alignment for TD_ASTRA * chore: conditional compile option * chore: adapt for TD_ASTRA * chore: adjust taosPId and msvcregex for TD_ASTRA * chore: log dir separator for wal build name * chore: fix type of pointer parameter * chore: fix compile problem of tsdbGetS3Size * enh: get last ver from wal log for TD_ASTRA * enh: refact wal meta ver * enh: refact wal meta ver * fix: typo of taosUcs4Compare * enh: process return value of CI * chore: more code for TD_ASTRA adaption * chore: return value of taosCloseFile in walMeta.c * chore: fix compile problem * chore: fix compile problem of TD_ASTRA * fix: update macro for tq and stream task * chore: code optimization for TD_ASTRA * chore: restore create log and init cfg interface * chore: restore strncasecmp and strcasecmp * fix: adjust the field position of SDataBlockInfo * fix: pragma pack min size * fix: pragma pack min size * chore: more code for TD_ASTRA adaption * fix: type of parameters * chore: adapt strncasecmp and strcasecmp for TD_ASTRA * chore: restore interface of init log * 
enh: pack push optimization * fix: taos init cfg * add astra support * fix: fetch the value of suid * chore: switch of build with udf * add temp code * chore: more code for TD_ASTRA adaption * chore: add macro ERRNO to replace errno * chore: bytes align for TD_ASTRA * fix: remove obsolete codes * enh: support USE_UDF macro * fix compile error * fix: resolve redefinition problem * fix: compile problem of log.cpp * fix: compile problem of osTimezone * fix: resolve compile problem of udf * fix: pragma definition on windows * fix: ucs4 and stpncpy for TD_ASTRA * fix: memory align problem for TD_ASTRA * enh: solve memory leak for TD_ASTRA_RPC * fix: compile problem of taosSetInt64Aligned * fix: restore mndSubscribe.c * fix: scalar for udf * chore: code adaption for TD_ASTRA * chore: code optimization for TD_ASTRA * fix: typo of add definition * fix: typo of macro in tudf.h * chore: remove void to make CI pass * enh: move macro from cmake.platform to cmake.options * enh: byte align for hash node and error code * chore: restore the size for lru cache * enh: restore some code about pack push * chore: restore the pack push in tmsg.h * fix: add macro of pack pop for windows --------- Co-authored-by: yihaoDeng <luomoxyz@126.com>
2025-03-14 05:32:13 +00:00
TAOS_SET_OBJ_ALIGNED(&pCond->twindows, TSWINDOW_INITIALIZER);
2022-10-17 06:05:40 +00:00
pCond->suid = pMtInfo->suid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = sContext->snapVersion;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
2022-12-02 07:52:32 +00:00
SColumnInfo* pColInfo = &pCond->colList[i];
pColInfo->type = pMtInfo->schema->pSchema[i].type;
pColInfo->bytes = pMtInfo->schema->pSchema[i].bytes;
pColInfo->colId = pMtInfo->schema->pSchema[i].colId;
pColInfo->pk = pMtInfo->schema->pSchema[i].flags & COL_IS_KEY;
2022-12-02 07:52:32 +00:00
pCond->pSlotList[i] = i;
}
return TSDB_CODE_SUCCESS;
}
/*
 * Flag the root operator of the given task as not opened.
 *
 * tinfo: opaque task handle; it is cast to SExecTaskInfo and dereferenced
 *        without a NULL check.
 */
void qStreamSetOpen(qTaskInfo_t tinfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tinfo)->pRoot;
  pRoot->status = OP_NOT_OPENED;
}
/*
 * Store the sourceExcluded value in the stream info of the given task.
 *
 * tinfo:          opaque task handle; it is cast to SExecTaskInfo and
 *                 dereferenced without a NULL check.
 * sourceExcluded: value copied verbatim into streamInfo.sourceExcluded.
 */
void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded) {
  ((SExecTaskInfo*)tinfo)->streamInfo.sourceExcluded = sourceExcluded;
}
/*
 * Position the scan operators of a subscription task at the given offset.
 *
 * tinfo:   opaque task handle (cast to SExecTaskInfo, not NULL-checked).
 * pOffset: requested offset; its type selects the WAL (TMQ_OFFSET__LOG),
 *          snapshot-data or snapshot-meta handling below.
 * subType: TOPIC_SUB_TYPE__COLUMN uses the stream scan operator; every other
 *          value is handled by the raw scan branch.
 *
 * Returns 0 when the offset already equals streamInfo.currentOffset or once the
 * readers were repositioned (currentOffset is then updated to *pOffset);
 * otherwise returns the error code of the step that failed and leaves
 * currentOffset untouched.
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
  int32_t        code = TSDB_CODE_SUCCESS;
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SStorageAPI*   pAPI = &pTaskInfo->storageAPI;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  const char*    id = GET_TASKID(pTaskInfo);

  // the WAL offset is verified before the equality check below, so pOffset may be touched by
  // walReaderVerifyOffset even when the scan position ends up unchanged
  if (subType == TOPIC_SUB_TYPE__COLUMN && pOffset->type == TMQ_OFFSET__LOG) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    SStoreTqReader*  pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
    SWalReader*      pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
    walReaderVerifyOffset(pWalReader, pOffset);
  }

  // if pOffset equals the current offset, the consumer simply continues from where it stopped
  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
    return 0;
  }

  if (subType == TOPIC_SUB_TYPE__COLUMN) {
    code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id, &pOperator);
    if (pOperator == NULL || code != 0) {
      return code;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    STableScanInfo*  pScanInfo = pInfo->pTableScanOp->info;
    STableScanBase*  pScanBaseInfo = &pScanInfo->base;
    STableListInfo*  pTableListInfo = pScanBaseInfo->pTableListInfo;

    if (pOffset->type == TMQ_OFFSET__LOG) {
      // switch to WAL consumption: drop the tsdb reader and seek the tq reader
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pScanBaseInfo->dataReader);
      pScanBaseInfo->dataReader = NULL;

      SStoreTqReader* pReaderAPI = &pTaskInfo->storageAPI.tqReaderFn;
      SWalReader*     pWalReader = pReaderAPI->tqReaderGetWalReader(pInfo->tqReader);
      walReaderVerifyOffset(pWalReader, pOffset);
      code = pReaderAPI->tqReaderSeek(pInfo->tqReader, pOffset->version, id);
      if (code < 0) {
        qError("tqReaderSeek failed ver:%" PRId64 ", %s", pOffset->version, id);
        return code;
      }
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      // iterate all tables from tableInfoList, and retrieve rows from each table one-by-one
      // those data are from the snapshot in tsdb, besides the data in the wal file.
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;
      int32_t index = 0;

      // this value may be changed if new tables are created
      taosRLockLatch(&pTaskInfo->lock);
      int32_t numOfTables = 0;
      code = tableListGetSize(pTableListInfo, &numOfTables);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        taosRUnLockLatch(&pTaskInfo->lock);
        return code;
      }

      if (uid == 0) {
        // no table given: start from the first table of the list
        if (numOfTables != 0) {
          STableKeyInfo* tmp = tableListGetInfo(pTableListInfo, 0);
          if (!tmp) {
            // capture terrno once, before logging/unlocking get a chance to change it
            code = terrno;
            qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
            taosRUnLockLatch(&pTaskInfo->lock);
            return code;
          }

          uid = tmp->uid;
          ts = INT64_MIN;
          pScanInfo->currentTable = 0;
        } else {
          taosRUnLockLatch(&pTaskInfo->lock);
          qError("no table in table list, %s", id);
          return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
        }
      }
      pTaskInfo->storageAPI.tqReaderFn.tqSetTablePrimaryKey(pInfo->tqReader, uid);

      qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% " PRId64 " rows returned", uid, ts,
             pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;

      // start from current accessed position
      // we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
      // position, let's find it from the beginning.
      index = tableListFind(pTableListInfo, uid, 0);
      taosRUnLockLatch(&pTaskInfo->lock);

      if (index >= 0) {
        pScanInfo->currentTable = index;
      } else {
        qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
               numOfTables, pScanInfo->currentTable, id);
        return TSDB_CODE_TMQ_NO_TABLE_QUALIFIED;
      }

      STableKeyInfo keyInfo = {.uid = uid};
      int64_t       oldSkey = pScanBaseInfo->cond.twindows.skey;

      // let's start from the next ts that returned to consumer.
      if (pTaskInfo->storageAPI.tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader)) {
        pScanBaseInfo->cond.twindows.skey = ts;
      } else {
        pScanBaseInfo->cond.twindows.skey = ts + 1;
      }
      pScanInfo->scanTimes = 0;

      // skey is only overridden temporarily; every exit below (error or success) puts oldSkey back
      if (pScanBaseInfo->dataReader == NULL) {
        code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond,
                                                             &keyInfo, 1, pScanInfo->pResBlock,
                                                             (void**)&pScanBaseInfo->dataReader, id, NULL);
        if (code != TSDB_CODE_SUCCESS) {
          qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
          pScanBaseInfo->cond.twindows.skey = oldSkey;
          return code;
        }

        qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      } else {
        code = pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          pScanBaseInfo->cond.twindows.skey = oldSkey;
          return code;
        }

        code = pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
        if (code != TSDB_CODE_SUCCESS) {
          qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
          pScanBaseInfo->cond.twindows.skey = oldSkey;
          return code;
        }

        qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
               uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
      }

      // restore the key value
      pScanBaseInfo->cond.twindows.skey = oldSkey;
    } else {
      qError("invalid pOffset->type:%d, %s", pOffset->type, id);
      return TSDB_CODE_PAR_INTERNAL_ERROR;
    }
  } else {  // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
    if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      SOperatorInfo*      p = NULL;

      code = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id, &p);
      if (code != 0) {
        return code;
      }

      STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo;

      if (pAPI->snapshotFn.setForSnapShot(sContext, pOffset->uid) != 0) {
        qError("setDataForSnapShot error. uid:%" PRId64 " , %s", pOffset->uid, id);
        return TSDB_CODE_PAR_INTERNAL_ERROR;
      }

      SMetaTableInfo mtInfo = {0};
      code = pTaskInfo->storageAPI.snapshotFn.getMetaTableInfoFromSnapshot(sContext, &mtInfo);
      if (code != 0) {
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      // discard the previous reader, scan condition and table list before building new ones
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tableListClear(pTableListInfo);

      if (mtInfo.uid == 0) {
        destroyMetaTableInfo(&mtInfo);
        goto end;  // no data
      }

      pAPI->snapshotFn.taosXSetTablePrimaryKey(sContext, mtInfo.uid);
      code = initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      if (pAPI->snapshotFn.taosXGetTablePrimaryKey(sContext)) {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
      } else {
        pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts + 1;
      }

      code = tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
      if (code != TSDB_CODE_SUCCESS) {
        destroyMetaTableInfo(&mtInfo);
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        return code;
      }

      STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
      if (!pList) {
        // `code` still holds the success value of tableListAddTableInfo here, so take the failure
        // reason from terrno, as the column-subscription branch does for the same call
        // NOTE(review): assumes tableListGetInfo sets terrno when it returns NULL -- confirm
        code = terrno;
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      int32_t size = 0;
      code = tableListGetSize(pTableListInfo, &size);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      code = pTaskInfo->storageAPI.tsdReader.tsdReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size,
                                                           NULL, (void**)&pInfo->dataReader, NULL, NULL);
      if (code != TSDB_CODE_SUCCESS) {
        qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
        destroyMetaTableInfo(&mtInfo);
        return code;
      }

      cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
      tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN);
      //      pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid;

      // the schema pointer of mtInfo is handed over to streamInfo, replacing the previous one
      tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema);
      pTaskInfo->streamInfo.schema = mtInfo.schema;

      qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64 " %s", mtInfo.uid, pOffset->ts, id);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      SSnapContext*       sContext = pInfo->sContext;
      code = pTaskInfo->storageAPI.snapshotFn.setForSnapShot(sContext, pOffset->uid);
      if (code != 0) {
        qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
        return code;
      }
      qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts,
             id);
    } else if (pOffset->type == TMQ_OFFSET__LOG) {
      SStreamRawScanInfo* pInfo = pOperator->info;
      pTaskInfo->storageAPI.tsdReader.tsdReaderClose(pInfo->dataReader);
      pInfo->dataReader = NULL;
      qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
    }
  }

end:
  // remember the offset that the readers are now positioned at
  tOffsetCopy(&pTaskInfo->streamInfo.currentOffset, pOffset);
  return 0;
}
2022-11-09 05:45:46 +00:00
// Delivers an RPC response to the callback registered on the originating request.
//
// The send-info handle is taken from pMsg->info.ahandle. The response payload is copied into a
// newly allocated SDataBuf which is passed to the callback together with the response code; the
// copy is not freed here, so presumably the callback takes ownership of it (verify against the
// registered callbacks). The RPC payload (pMsg->pCont) and the send-info handle are released
// before returning. `parent` and `pEpSet` are not used by this function.
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pSendInfo == NULL) {
    qError("pMsg->info.ahandle is NULL");
    // the payload is released on the normal path below; release it here too so it does not leak
    rpcFreeCont(pMsg->pCont);
    return;
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      // report the allocation failure to the callback instead of the original response code,
      // and keep len consistent with the missing payload
      pMsg->code = terrno;
      buf.len = 0;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
2023-01-04 07:14:00 +00:00
2023-04-08 17:39:09 +00:00
// Builds an array with the uid (uint64_t) of every table in the first table list found in the
// task's operator tree.
//
// Returns NULL when getTableListInfo() fails or yields no list. Any later failure is logged and
// raised through T_LONG_JMP(pTaskInfo->env, code); both temporary arrays are released before the
// jump. On success the caller receives the uid array.
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = tinfo;
  SArray*        plist = NULL;
  SArray*        pUidList = NULL;

  code = getTableListInfo(pTaskInfo, &plist);
  if (code || plist == NULL) {
    return NULL;
  }

  // only extract table in the first elements
  STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);

  pUidList = taosArrayInit(10, sizeof(uint64_t));
  QUERY_CHECK_NULL(pUidList, code, lino, _end, terrno);

  int32_t numOfTables = 0;
  code = tableListGetSize(pTableListInfo, &numOfTables);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t i = 0; i < numOfTables; ++i) {
    STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i);
    QUERY_CHECK_NULL(pKeyInfo, code, lino, _end, terrno);

    void* tmp = taosArrayPush(pUidList, &pKeyInfo->uid);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

_end:
  // the list of table-list pointers is only needed while collecting uids; release it on every path
  taosArrayDestroy(plist);

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    // nothing is returned after the jump, so the partially filled uid list must be freed here
    taosArrayDestroy(pUidList);
    T_LONG_JMP(pTaskInfo->env, code);
  }

  return pUidList;
}
2023-04-28 03:42:34 +00:00
2024-08-06 05:22:14 +00:00
// Appends the table-list pointer of the first scan operator reachable from pOperator to pList.
//
// A stream-scan operator contributes the table list of its inner table-scan operator, a
// table-scan operator contributes its own; any other operator forwards the search to its first
// downstream operator (if present). Returns 0 on success or the failure code, which is also
// logged at every recursion level it passes through.
static int32_t extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  int32_t        lino = 0;
  int32_t        code = TSDB_CODE_SUCCESS;

  switch (pOperator->operatorType) {
    case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: {
      SStreamScanInfo* pStreamScan = pOperator->info;
      STableScanInfo*  pInnerScan = pStreamScan->pTableScanOp->info;

      void* pushed = taosArrayPush(pList, &pInnerScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
      STableScanInfo* pTableScan = pOperator->info;

      void* pushed = taosArrayPush(pList, &pTableScan->base.pTableListInfo);
      QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
      break;
    }
    default: {
      // not a scan operator: keep looking below, following the first child only
      bool hasChild = (pOperator->pDownstream != NULL) && (pOperator->pDownstream[0] != NULL);
      if (hasChild) {
        code = extractTableList(pList, pOperator->pDownstream[0]);
      }
      break;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s %s failed at line %d since %s", pTask->id.str, __func__, lino, tstrerror(code));
  }
  return code;
}
2024-07-23 09:31:41 +00:00
// Collects the table-list pointers of the task's operator tree into a new array.
//
// *pList is reset to NULL first and only receives the array when extraction succeeds; on failure
// the array is destroyed and the failure code is returned. Returns TSDB_CODE_INVALID_PARA when
// pList itself is NULL, or terrno when the array cannot be created.
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
  if (NULL == pList) {
    return TSDB_CODE_INVALID_PARA;
  }
  *pList = NULL;

  SArray* pTables = taosArrayInit(0, POINTER_BYTES);
  if (NULL == pTables) {
    return terrno;
  }

  int32_t rc = extractTableList(pTables, pTaskInfo->pRoot);
  if (rc != 0) {
    taosArrayDestroy(pTables);
    return rc;
  }

  *pList = pTables;
  return rc;
}
// Invokes the root operator's releaseStreamStateFn callback when one is installed.
// Always returns 0.
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;
  if (pRoot->fpSet.releaseStreamStateFn == NULL) {
    return 0;
  }

  pRoot->fpSet.releaseStreamStateFn(pRoot);
  return 0;
}
// Invokes the root operator's reloadStreamStateFn callback when one is installed.
// Always returns 0.
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo) {
  SOperatorInfo* pRoot = ((SExecTaskInfo*)tInfo)->pRoot;
  if (pRoot->fpSet.reloadStreamStateFn == NULL) {
    return 0;
  }

  pRoot->fpSet.reloadStreamStateFn(pRoot);
  return 0;
}
2024-09-08 06:54:08 +00:00
// Clears the task's stored result code (sets it to 0) and logs the value it replaced.
void qResetTaskCode(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  const int32_t  prevCode = pTask->code;

  pTask->code = 0;
  qDebug("0x%" PRIx64 " reset task code to be success, prev:%s", pTask->id.taskId, tstrerror(prevCode));
}
2025-04-16 08:36:09 +00:00
// Placeholder: currently ignores both arguments and reports success (0).
int32_t collectExprsToReplaceForStream(SOperatorInfo* pOper, SArray* pExprs) {
  return 0;
}
// Task-level entry point: forwards the task's root operator and pExprs to
// collectExprsToReplaceForStream() and returns its result.
int32_t streamCollectExprsForReplace(qTaskInfo_t tInfo, SArray* pExprs) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  return collectExprsToReplaceForStream(pTask->pRoot, pExprs);
}
// Resets an operator and, recursively, its downstream operators.
//
// The operator's resetStateFn callback is called when installed, then its status is set to
// OP_NOT_OPENED regardless of the callback result. Downstream operators are visited in order
// until one of the calls fails; the first non-zero code is returned.
int32_t clearStatesForOperator(SOperatorInfo* pOper) {
  int32_t rc = (pOper->fpSet.resetStateFn != NULL) ? pOper->fpSet.resetStateFn(pOper) : 0;

  pOper->status = OP_NOT_OPENED;

  int32_t child = 0;
  while (rc == 0 && child < pOper->numOfDownstream) {
    rc = clearStatesForOperator(pOper->pDownstream[child]);
    ++child;
  }
  return rc;
}
// Task-level entry point: resets the whole operator tree starting at the task's root operator
// and returns the result of clearStatesForOperator().
int32_t streamClearStatesForOperators(qTaskInfo_t tInfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tInfo;
  return clearStatesForOperator(pTask->pRoot);
}
2025-04-29 07:31:17 +00:00
// Hook for emitting stream notification events for a result block.
//
// Notification is not implemented yet: the function returns 0 for every input, exactly as
// before. The draft that used to follow the unconditional return was unreachable and has been
// removed; it also wrote through a NULL SStreamNotifyEventSupp pointer, so it could not have
// been enabled as written.
//
// TODO: implement the intended steps -- add the notify events for pBlock, build the notify
// block, send the events. The event support structure (window-event hash map, window type)
// must be allocated and initialised before use.
static int32_t streamDoNotification(qTaskInfo_t tInfo, const SSDataBlock* pBlock) {
  (void)tInfo;
  (void)pBlock;
  return 0;
}
2025-05-07 11:20:45 +00:00
// Runs one execution round of a stream task and returns the produced block through pRes.
//
// Steps:
//   1. Under the task's read latch: abort if the task was killed, refuse to run if another
//      thread is recorded as owner (TSDB_CODE_QRY_IN_EXEC), otherwise record this thread as owner.
//   2. Install the setjmp() recovery point; an error raised via longjmp stores the code in the
//      task, cleans up UDFs, clears the owner and returns that code.
//   3. Pull the next block from the root operator, then run streamForceOutput(),
//      blockDataCheck() and streamDoNotification(); the first failing step stops the sequence
//      and is logged once with the line of the failing step.
//   4. Accumulate the elapsed time, clear the owner and return pTaskInfo->code.
//
// pRes      receives the result block; it is reset to NULL on entry.
// useconds  receives the accumulated elapsed time only when no block is returned.
int32_t streamExecuteTask(qTaskInfo_t tInfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;

  // todo extract method
  taosRLockLatch(&pTaskInfo->lock);
  bool isKilled = isTaskKilled(pTaskInfo);
  if (isKilled) {
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  if (pTaskInfo->owner != 0) {
    // log the thread that actually holds the task (the previous code always printed 0 here)
    int64_t curOwner = pTaskInfo->owner;
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    taosRUnLockLatch(&pTaskInfo->lock);
    return pTaskInfo->code;
  }

  pTaskInfo->owner = threadId;
  taosRUnLockLatch(&pTaskInfo->lock);

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampUs();
  }

  // error occurs, record the error code and return to client
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    (void)cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();
  int32_t lino = 0;

  int32_t code = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  code = streamForceOutput(tInfo, pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  code = blockDataCheck(*pRes);
  QUERY_CHECK_CODE(code, lino, _end);

  code = streamDoNotification(tInfo, *pRes);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code) {
    // one log entry per failure; lino identifies the step that failed
    pTaskInfo->code = code;
    qError("%s failed at line %d, code:%s %s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }

  uint64_t el = (taosGetTimestampUs() - st);
  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  (void)cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
2025-04-29 07:31:17 +00:00
// Stores the given stream runtime info pointer in the task; the pointer is kept as-is, not copied.
void streamSetTaskRuntimeInfo(qTaskInfo_t tinfo, SStreamRuntimeInfo* pStreamRuntimeInfo) {
  ((SExecTaskInfo*)tinfo)->pStreamRuntimeInfo = pStreamRuntimeInfo;
}
2025-04-24 10:02:12 +00:00
int32_t qStreamCreateTableListForReader(void* pVnode, uint64_t suid, uint64_t uid, int8_t tableType,
2025-05-07 11:20:45 +00:00
SNodeList* pGroupTags, bool groupSort, SNode* pTagCond, SNode* pTagIndexCond,
SStorageAPI* storageAPI, void** pTableListInfo) {
STableListInfo* pList = tableListCreate();
if (pList == NULL) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return terrno;
}
SScanPhysiNode pScanNode = {.suid = suid, .uid = uid, .tableType = tableType};
SReadHandle pHandle = {.vnode = pVnode};
SExecTaskInfo pTaskInfo = {.id.str = "", .storageAPI = *storageAPI};
int32_t code = createScanTableListInfo(&pScanNode, pGroupTags, groupSort, &pHandle, pList, pTagCond, pTagIndexCond, &pTaskInfo);
2025-05-07 11:20:45 +00:00
if (code != 0) {
tableListDestroy(pList);
qError("failed to createScanTableListInfo, code:%s", tstrerror(code));
return code;
}
*pTableListInfo = pList;
return 0;
2025-04-24 10:02:12 +00:00
}
// Returns the tables of one group, or of the whole list when currentGroupId is -1.
//
// For -1, *size receives the size of the list's table array and *pKeyInfo the result of
// taosArrayGet() for index 0; 0 is returned. For any other group id the request is delegated to
// tableListGetGroupList() and its result is returned.
int32_t qStreamGetTableList(void* pTableListInfo, int32_t currentGroupId, STableKeyInfo** pKeyInfo, int32_t* size) {
  if (currentGroupId != -1) {
    return tableListGetGroupList(pTableListInfo, currentGroupId, pKeyInfo, size);
  }

  STableListInfo* pInfo = (STableListInfo*)pTableListInfo;
  *size = taosArrayGetSize(pInfo->pTableList);
  *pKeyInfo = taosArrayGet(pInfo->pTableList, 0);
  return 0;
}
int32_t qStreamGetGroupIndex(void* pTableListInfo, int64_t gid) {
2025-05-07 11:20:45 +00:00
for (int32_t i = 0; i < ((STableListInfo*)pTableListInfo)->numOfOuputGroups; ++i) {
int32_t offset = ((STableListInfo*)pTableListInfo)->groupOffset[i];
2025-04-24 10:02:12 +00:00
2025-05-07 11:20:45 +00:00
STableKeyInfo* pKeyInfo = taosArrayGet(((STableListInfo*)pTableListInfo)->pTableList, offset);
if (pKeyInfo != NULL && pKeyInfo->groupId == gid) {
return i;
}
2025-04-24 10:02:12 +00:00
}
2025-05-07 11:20:45 +00:00
return -1;
2025-04-24 10:02:12 +00:00
}
// Destroys a table list by handing it to tableListDestroy().
void qStreamDestroyTableList(void* pTableListInfo) {
  tableListDestroy(pTableListInfo);
}
// Returns whatever tableListGetTableGroupId() reports for table `uid` in the given table list.
uint64_t qStreamGetGroupId(void* pTableListInfo, int64_t uid) {
  return tableListGetTableGroupId(pTableListInfo, uid);
}
2025-04-24 10:02:12 +00:00
int32_t qStreamGetTableListGroupNum(const void* pTableList) { return ((STableListInfo*)pTableList)->numOfOuputGroups; }
// Applies pFilterInfo to pBlock through doFilter(), passing NULL as the third argument, and
// returns doFilter()'s result.
int32_t qStreamFilter(SSDataBlock* pBlock, void* pFilterInfo) {
  return doFilter(pBlock, pFilterInfo, NULL);
}
2025-04-24 10:02:12 +00:00
// Reports whether table `uid` belongs to the table list: true unless
// tableListGetTableGroupId() returns a value equal to -1 for it.
bool qStreamUidInTableList(void* pTableListInfo, uint64_t uid) {
  bool found = (tableListGetTableGroupId(pTableListInfo, uid) != -1);
  return found;
}
2025-04-29 07:31:17 +00:00
// Currently a no-op: the task handle is not touched.
void streamDestroyExecTask(qTaskInfo_t tInfo) {
  (void)tInfo;
}
// Evaluates a single scalar expression and writes the result into pDst.
//
// The expression is cloned and wrapped in a target node (block 0, slot 0), converted with
// createExprInfo(), and the resulting function/operator node is evaluated by scalarCalculate()
// against a block list holding one zero-initialised block. Expression kinds other than function
// and operator yield TSDB_CODE_OPS_NOT_SUPPORT.
//
// pExpr         expression to evaluate; not modified (a clone is used).
// pDst          destination parameter handed to scalarCalculate().
// pExtraParams  stream runtime function info handed to scalarCalculate().
// Returns 0 on success or the first error code encountered. All temporaries created here are
// released on every path.
static int32_t streamCalcOneScalarExpr(SNode* pExpr, SScalarParam* pDst, const SStreamRuntimeFuncInfo* pExtraParams) {
  int32_t      code = 0;
  SNode*       pNode = NULL;
  SNodeList*   pList = NULL;
  SExprInfo*   pExprInfo = NULL;
  int32_t      numOfExprs = 1;
  STargetNode* pTargetNode = NULL;

  code = nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTargetNode);
  if (code == 0) code = nodesCloneNode(pExpr, &pNode);

  if (code == 0) {
    pTargetNode->dataBlockId = 0;
    pTargetNode->pExpr = pNode;  // the target node now owns the clone
    pTargetNode->slotId = 0;
    pNode = NULL;
    code = nodesMakeList(&pList);
  }
  if (code == 0) {
    code = nodesListAppend(pList, (SNode*)pTargetNode);
    if (code == 0) {
      pTargetNode = NULL;  // the list now owns the target node
    }
  }
  if (code == 0) {
    code = createExprInfo(pList, NULL, &pExprInfo, &numOfExprs);
  }

  if (code == 0) {
    SNode* pSclNode = NULL;
    switch (pExprInfo->pExpr->nodeType) {
      case QUERY_NODE_FUNCTION:
        pSclNode = (SNode*)pExprInfo->pExpr->_function.pFunctNode;
        break;
      case QUERY_NODE_OPERATOR:
        pSclNode = pExprInfo->pExpr->_optrRoot.pRootNode;
        break;
      default:
        code = TSDB_CODE_OPS_NOT_SUPPORT;
        break;
    }

    if (code == 0) {
      // the block list only stores a pointer to this stack block, so it must be destroyed
      // before the block goes out of scope
      SSDataBlock  block = {0};
      SSDataBlock* pBlock = &block;
      SArray*      pBlockList = taosArrayInit(2, POINTER_BYTES);
      if (pBlockList == NULL || taosArrayPush(pBlockList, &pBlock) == NULL) {
        code = terrno;
      } else {
        code = scalarCalculate(pSclNode, pBlockList, pDst, pExtraParams);
      }
      taosArrayDestroy(pBlockList);
    }
  }

  // still non-NULL only when a step failed before the list took ownership of the target node
  nodesDestroyNode((SNode*)pTargetNode);
  nodesDestroyList(pList);

  if (pExprInfo != NULL) {
    destroyExprInfo(pExprInfo, numOfExprs);
    // NOTE(review): assumes destroyExprInfo() releases only the per-expression members and not
    // the array itself -- confirm against its definition
    taosMemoryFree(pExprInfo);
  }
  return code;
}
// Fills row 0 of the result block from the task's force-output columns when the round produced
// no rows.
//
// Nothing is done (0 is returned) when the task has no runtime info or no force-output columns,
// or when *pRes already holds rows. Otherwise a block is created if *pRes is NULL, columns are
// appended from the force-output definitions when the block has none, capacity for one row is
// ensured, and each column's expression is evaluated: value nodes are stored directly, anything
// else goes through streamCalcOneScalarExpr().
//
// NOTE(review): (*pRes)->info.rows is not updated here; confirm whether a consumer is expected
// to set it.
//
// Returns 0 on success, TSDB_CODE_INVALID_PARA when pRes is NULL or the block has fewer columns
// than the force-output list, or the first error reported by a helper.
int32_t streamForceOutput(qTaskInfo_t tInfo, SSDataBlock** pRes) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
  if (pTaskInfo->pStreamRuntimeInfo == NULL) return 0;

  const SArray* pForceOutputCols = pTaskInfo->pStreamRuntimeInfo->pForceOutputCols;
  int32_t       code = 0;
  SScalarParam  dst = {0};

  if (!pForceOutputCols) return 0;
  if (pRes == NULL) return TSDB_CODE_INVALID_PARA;
  if (*pRes && (*pRes)->info.rows > 0) return 0;

  // the operator produced no block at all: create one to hold the forced row
  if (*pRes == NULL) {
    code = createDataBlock(pRes);
    if (code != 0) return code;
  }

  if (!(*pRes)->pDataBlock || (*pRes)->pDataBlock->size == 0) {
    for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
      SStreamOutCol*  pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
      SColumnInfoData colInfo = createColumnInfoData(pCol->type.type, pCol->type.bytes, i);
      colInfo.info.precision = pCol->type.precision;
      colInfo.info.scale = pCol->type.scale;
      code = blockDataAppendColInfo(*pRes, &colInfo);
      if (code != 0) return code;
    }
  }

  code = blockDataEnsureCapacity(*pRes, 1);
  if (code != 0) return code;

  // loop all exprs for force output, execute all exprs
  for (int32_t i = 0; i < pForceOutputCols->size; ++i) {
    SStreamOutCol*   pCol = (SStreamOutCol*)taosArrayGet(pForceOutputCols, i);
    SNode*           pNode = pCol->expr;
    SColumnInfoData* pInfo = taosArrayGet((*pRes)->pDataBlock, i);
    if (pInfo == NULL) {
      // an existing block with fewer columns than the force-output list cannot be filled
      code = TSDB_CODE_INVALID_PARA;
      break;
    }

    if (nodeType(pNode) == QUERY_NODE_VALUE) {
      void* p = nodesGetValueFromNode((SValueNode*)pNode);
      code = colDataSetVal(pInfo, 0, p, ((SValueNode*)pNode)->isNull);
    } else {
      dst.columnData = pInfo;
      code = streamCalcOneScalarExpr(pNode, &dst, &pTaskInfo->pStreamRuntimeInfo->funcInfo);
    }
    if (code != 0) break;
  }
  return code;
}
2025-05-07 11:20:45 +00:00
// Computes the output table name from a sub-table expression and copies it into tbname.
//
// Supported expressions:
//   - value node with a string result type: the literal is used;
//   - function node with a string result type: evaluated through streamCalcOneScalarExpr(),
//     row 0 of the result column is used.
// Any other node type yields TSDB_CODE_OPS_NOT_SUPPORT; a non-string result type or an empty
// name yields TSDB_CODE_INVALID_PARA. In every error case tbname is left untouched.
//
// tbname  destination buffer. The name is truncated to TSDB_TABLE_NAME_LEN - 1 bytes and
//         NUL-terminated -- assumes the buffer holds at least TSDB_TABLE_NAME_LEN bytes;
//         TODO confirm against callers.
int32_t streamCalcOutputTbName(SNode* pExpr, char* tbname, const SStreamRuntimeFuncInfo* pStreamRuntimeInfo) {
  int32_t         code = 0;
  const char*     pVal = NULL;
  int32_t         len = 0;
  SScalarParam    dst = {0};
  SColumnInfoData colInfo = {0};  // function-scope so pVal stays valid until the copy below

  // execute the expr
  switch (pExpr->type) {
    case QUERY_NODE_VALUE: {
      // check the value's data type; the node type (QUERY_NODE_VALUE) is never a string type
      int32_t type = ((SExprNode*)pExpr)->resType.type;
      if (!IS_STR_DATA_TYPE(type)) {
        qError("invalid sub tb expr with non-str type");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      // NOTE(review): assumes string literals are stored in var-data layout (length header
      // followed by the bytes), like the column data read in the function branch -- confirm
      void* pVar = nodesGetValueFromNode((SValueNode*)pExpr);
      if (pVar != NULL) {
        pVal = varDataVal(pVar);
        len = varDataLen(pVar);
      }
    } break;
    case QUERY_NODE_FUNCTION: {
      SFunctionNode* pFunc = (SFunctionNode*)pExpr;
      if (!IS_STR_DATA_TYPE(pFunc->node.resType.type)) {
        qError("invalid sub tb expr with non-str type func");
        code = TSDB_CODE_INVALID_PARA;
        break;
      }
      colInfo = createColumnInfoData(((SExprNode*)pExpr)->resType.type, ((SExprNode*)pExpr)->resType.bytes, 0);
      dst.columnData = &colInfo;
      code = streamCalcOneScalarExpr(pExpr, &dst, pStreamRuntimeInfo);
      if (code == 0) {
        pVal = varDataVal(colDataGetVarData(dst.columnData, 0));
        len = varDataLen(colDataGetVarData(dst.columnData, 0));
      }
    } break;
    default:
      qError("wrong subtable expr with type: %d", pExpr->type);
      code = TSDB_CODE_OPS_NOT_SUPPORT;
      break;
  }

  if (code == 0) {
    if (!pVal || len <= 0) {
      qError("tbname generated with no characters which is not allowed");
      code = TSDB_CODE_INVALID_PARA;
    } else {
      if (len > TSDB_TABLE_NAME_LEN - 1) {
        len = TSDB_TABLE_NAME_LEN - 1;
      }
      memcpy(tbname, pVal, len);
      tbname[len] = '\0';
    }
  }

  // release whatever the scalar evaluation stored in the temporary result column
  // (harmless on the zero-initialised column of the other branches)
  colDataDestroy(&colInfo);
  return code;
}