TDengine/source/libs/executor/inc/executorimpl.h

864 lines
34 KiB
C
Raw Normal View History

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
2022-02-28 09:55:07 +00:00
#include "tcommon.h"
#include "tlosertree.h"
2022-04-14 14:32:49 +00:00
#include "tsort.h"
#include "ttszip.h"
#include "tvariant.h"
2022-01-11 02:51:23 +00:00
#include "dataSinkMgt.h"
#include "executil.h"
2022-02-10 05:55:53 +00:00
#include "executor.h"
2022-01-11 02:51:23 +00:00
#include "planner.h"
2022-02-17 11:30:43 +00:00
#include "scalar.h"
#include "taosdef.h"
#include "tarray.h"
2022-09-27 10:11:44 +00:00
#include "tfill.h"
2022-01-11 02:51:23 +00:00
#include "thash.h"
#include "tlockfree.h"
2022-04-02 11:31:52 +00:00
#include "tmsg.h"
2022-04-14 14:32:49 +00:00
#include "tpagedbuf.h"
2022-07-13 08:37:33 +00:00
#include "tstream.h"
2022-09-02 07:34:16 +00:00
#include "tstreamUpdate.h"
2022-01-08 15:19:46 +00:00
#include "executorInt.h"
2022-09-02 07:34:16 +00:00
#include "vnode.h"
2022-04-24 06:19:12 +00:00
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
2022-10-18 09:44:00 +00:00
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
enum {
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u,
/* Task is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
TASK_COMPLETED = 0x2u,
};
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
2022-04-14 14:32:49 +00:00
typedef struct SResultInfo { // TODO refactor
int64_t totalRows; // total generated result size in rows
int64_t totalBytes; // total results in bytes.
int32_t capacity; // capacity of current result output buffer
int32_t threshold; // result size threshold in rows.
2022-03-24 03:15:05 +00:00
} SResultInfo;
typedef struct STableQueryInfo {
2022-07-08 09:48:34 +00:00
TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window
} STableQueryInfo;
2022-04-02 07:08:48 +00:00
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
2022-05-03 15:52:17 +00:00
2022-01-08 14:59:24 +00:00
typedef struct STaskCostInfo {
int64_t created;
int64_t start;
uint64_t elapsedTime;
double extractListTime;
double groupIdMapTime;
2022-05-03 15:52:17 +00:00
SFileBlockLoadRecorder* pRecoder;
2022-01-08 14:59:24 +00:00
} STaskCostInfo;
2022-03-04 06:31:21 +00:00
typedef struct SOperatorCostInfo {
2022-07-08 09:48:34 +00:00
double openCost;
double totalCost;
2022-03-04 06:31:21 +00:00
} SOperatorCostInfo;
struct SOperatorInfo;
2022-05-28 15:26:22 +00:00
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
2022-05-03 07:27:13 +00:00
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
typedef void (*__optr_close_fn_t)(void* param);
typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
typedef int32_t (*__optr_reqBuf_fn_t)(struct SOperatorInfo* pOptr);
2022-03-12 10:02:56 +00:00
2022-01-08 14:59:24 +00:00
typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id
uint64_t subplanId;
uint64_t templateId;
char* str;
2022-01-08 14:59:24 +00:00
} STaskIdInfo;
2022-07-15 09:48:48 +00:00
enum {
STREAM_RECOVER_STEP__NONE = 0,
STREAM_RECOVER_STEP__PREPARE1,
STREAM_RECOVER_STEP__PREPARE2,
2022-07-15 09:48:48 +00:00
STREAM_RECOVER_STEP__SCAN,
};
2022-07-08 09:48:34 +00:00
typedef struct {
2022-09-02 07:34:16 +00:00
// TODO remove prepareStatus
STqOffsetVal prepareStatus; // for tmq
STqOffsetVal lastStatus; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int8_t returned;
int64_t snapshotVer;
const SSubmitReq* pReq;
2022-09-02 07:34:16 +00:00
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
int8_t recoverStep;
2022-12-27 06:35:55 +00:00
int8_t recoverScanFinished;
2022-07-15 09:48:48 +00:00
SQueryTableDataCond tableCond;
2022-10-21 01:47:04 +00:00
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
2022-11-01 03:56:14 +00:00
// int8_t triggerSaved;
// int64_t deleteMarkSaved;
SStreamState* pState;
2022-07-08 09:48:34 +00:00
} SStreamTaskInfo;
typedef struct {
char* tablename;
char* dbname;
int32_t tversion;
SSchemaWrapper* sw;
SSchemaWrapper* qsw;
} SSchemaInfo;
2022-11-24 09:24:24 +00:00
typedef struct SExchangeOpStopInfo {
2022-11-16 07:18:31 +00:00
int32_t operatorType;
int64_t refId;
2022-11-11 09:13:55 +00:00
} SExchangeOpStopInfo;
2022-11-24 09:24:24 +00:00
typedef struct STaskStopInfo {
2022-11-11 09:13:55 +00:00
SRWLatch lock;
SArray* pStopInfo;
} STaskStopInfo;
2022-11-18 03:24:56 +00:00
struct SExecTaskInfo {
STaskIdInfo id;
uint32_t status;
STimeWindow window;
STaskCostInfo cost;
int64_t owner; // if it is in execution
int32_t code;
int32_t qbufQuota; // total available buffer (in KB) during execution query
int64_t version; // used for stream to record wal version, why not move to sschemainfo
2022-09-02 07:34:16 +00:00
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
2022-10-30 14:13:49 +00:00
STableListInfo* pTableInfoList; // this is a table list
2022-09-02 07:34:16 +00:00
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SSubplan* pSubplan;
struct SOperatorInfo* pRoot;
SLocalFetch localFetch;
2022-12-27 06:35:55 +00:00
SArray* pResultBlockList; // result block list
2022-11-11 09:13:55 +00:00
STaskStopInfo stopInfo;
2022-11-18 03:24:56 +00:00
};
2022-01-08 14:59:24 +00:00
enum {
2022-09-02 07:34:16 +00:00
OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1,
2022-03-21 10:31:31 +00:00
OP_RES_TO_RETURN = 0x5,
2022-09-02 07:34:16 +00:00
OP_EXEC_DONE = 0x9,
2022-09-28 12:24:02 +00:00
OP_EXEC_RECV = 0x11,
};
2022-04-26 12:26:32 +00:00
typedef struct SOperatorFpSet {
2022-07-08 09:48:34 +00:00
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn;
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
2022-07-08 09:48:34 +00:00
__optr_close_fn_t closeFn;
2022-12-27 06:35:55 +00:00
__optr_reqBuf_fn_t reqBufFn; // total used buffer for blocking operator
2022-07-08 09:48:34 +00:00
__optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow;
__optr_explain_fn_t getExplainFn;
2022-04-26 12:26:32 +00:00
} SOperatorFpSet;
2022-06-18 04:00:41 +00:00
typedef struct SExprSupp {
SExprInfo* pExprInfo;
2022-07-08 09:48:34 +00:00
int32_t numOfExprs; // the number of scalar expression in group operator
2022-06-18 04:00:41 +00:00
SqlFunctionCtx* pCtx;
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
2022-09-20 02:41:35 +00:00
SFilterInfo* pFilterInfo;
2022-06-18 04:00:41 +00:00
} SExprSupp;
typedef struct SOperatorInfo {
uint16_t operatorType;
int16_t resultDataBlockId;
2022-07-08 09:48:34 +00:00
bool blocking; // block operator or not
uint8_t status; // denote if current operator is completed
char* name; // name, for debug purpose
void* info; // extension attribution
SExprSupp exprSupp;
SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost;
SResultInfo resultInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
SOperatorFpSet fpSet;
} SOperatorInfo;
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
2022-09-02 07:34:16 +00:00
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
2022-06-18 04:00:41 +00:00
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
2022-03-11 11:29:43 +00:00
typedef struct SLoadRemoteDataInfo {
2022-04-14 14:32:49 +00:00
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
2022-03-11 11:29:43 +00:00
} SLoadRemoteDataInfo;
typedef struct SLimitInfo {
2022-09-02 07:34:16 +00:00
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
} SLimitInfo;
typedef struct SExchangeInfo {
2022-12-27 06:35:55 +00:00
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
2022-11-11 14:18:06 +00:00
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
2022-12-27 06:35:55 +00:00
SArray* pResultBlockList;
SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
2022-03-11 11:29:43 +00:00
SLoadRemoteDataInfo loadInfo;
uint64_t self;
SLimitInfo limitInfo;
2022-12-27 06:35:55 +00:00
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
} SExchangeInfo;
typedef struct SScanInfo {
2022-09-02 07:34:16 +00:00
int32_t numOfAsc;
int32_t numOfDesc;
} SScanInfo;
typedef struct SSampleExecInfo {
2022-09-02 07:34:16 +00:00
double sampleRatio; // data block sample ratio, 1 by default
uint32_t seed; // random seed value
} SSampleExecInfo;
2022-06-30 06:41:50 +00:00
enum {
TABLE_SCAN__TABLE_ORDER = 1,
TABLE_SCAN__BLOCK_ORDER = 2,
2022-06-30 06:41:50 +00:00
};
typedef struct SAggSupporter {
2022-08-30 01:47:42 +00:00
SSHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
2022-09-02 07:34:16 +00:00
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // current write page id
} SAggSupporter;
typedef struct {
2022-09-02 07:34:16 +00:00
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
// current data block needs to be loaded.
SInterval interval;
SAggSupporter* pAggSup;
SExprSupp* pExprSup; // expr supporter of aggregate operator
} SAggOptrPushDownInfo;
2022-11-07 03:53:23 +00:00
typedef struct STableMetaCacheInfo {
2022-12-27 06:35:55 +00:00
SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch;
uint64_t cacheHit;
2022-11-07 03:53:23 +00:00
} STableMetaCacheInfo;
2022-11-18 01:47:26 +00:00
typedef struct STableScanBase {
STsdbReader* dataReader;
2022-11-18 01:47:26 +00:00
SFileBlockLoadRecorder readRecorder;
SQueryTableDataCond cond;
SAggOptrPushDownInfo pdInfo;
SColMatchInfo matchInfo;
SReadHandle readHandle;
2022-11-18 01:47:26 +00:00
SExprSupp pseudoSup;
STableMetaCacheInfo metaCache;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
2022-11-18 05:40:03 +00:00
int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
2022-11-18 01:47:26 +00:00
} STableScanBase;
typedef struct STableScanInfo {
2022-12-27 06:35:55 +00:00
STableScanBase base;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
} STableScanInfo;
typedef struct STableMergeScanInfo {
2022-12-27 06:35:55 +00:00
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond
STableScanBase base;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanInfo {
2022-12-27 06:35:55 +00:00
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
} STagScanInfo;
2022-05-10 07:34:41 +00:00
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DELETE_DATA,
2022-06-25 08:12:00 +00:00
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
2022-07-06 13:25:34 +00:00
STREAM_SCAN_FROM_DATAREADER_RANGE,
2022-05-10 07:34:41 +00:00
} EStreamScanMode;
2022-07-25 06:15:49 +00:00
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
2022-05-20 11:34:39 +00:00
typedef struct SStreamAggSupporter {
2022-10-18 09:44:00 +00:00
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSDataBlock* pScanBlock;
SStreamState* pState;
int64_t gap; // stream session window gap
SqlFunctionCtx* pDummyCtx; // for combine
2022-10-18 09:44:00 +00:00
SSHashObj* pResultRows;
int32_t stateKeySize;
int16_t stateKeyType;
SDiskbasedBuf* pResultBuf;
2022-05-20 11:34:39 +00:00
} SStreamAggSupporter;
typedef struct SWindowSupporter {
2022-05-20 11:34:39 +00:00
SStreamAggSupporter* pStreamAggSup;
2022-07-08 09:48:34 +00:00
int64_t gap;
uint16_t parentType;
SAggSupporter* pIntervalAggSup;
} SWindowSupporter;
typedef struct SPartitionBySupporter {
2022-09-02 07:34:16 +00:00
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
bool needCalc; // partition by column
} SPartitionBySupporter;
typedef struct SPartitionDataInfo {
2022-09-02 07:34:16 +00:00
uint64_t groupId;
char* tbname;
SArray* tags;
2022-09-02 07:34:16 +00:00
SArray* rowIds;
} SPartitionDataInfo;
2022-09-13 08:50:27 +00:00
typedef struct STimeWindowAggSupp {
2022-09-02 07:34:16 +00:00
int8_t calTrigger;
2022-11-01 03:56:14 +00:00
int8_t calTriggerSaved;
2022-09-19 07:51:35 +00:00
int64_t deleteMark;
2022-11-01 03:56:14 +00:00
int64_t deleteMarkSaved;
int64_t waterMark;
2022-09-02 07:34:16 +00:00
TSKEY maxTs;
2022-09-13 08:50:27 +00:00
TSKEY minTs;
2022-09-02 07:34:16 +00:00
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
2022-07-16 01:28:15 +00:00
} STimeWindowAggSupp;
typedef struct SStreamScanInfo {
2022-10-25 16:36:25 +00:00
uint64_t tableUid; // queried super table uid
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SExprSupp tbnameCalSup;
SExprSupp tagCalSup;
int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo matchInfo;
2022-07-08 09:48:34 +00:00
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times
STqReader* tqReader;
uint64_t groupId;
SUpdateInfo* pUpdateInfo;
2022-09-02 07:34:16 +00:00
EStreamScanMode scanMode;
SOperatorInfo* pStreamScanOp;
SOperatorInfo* pTableScanOp;
SArray* childIds;
SWindowSupporter windowSup;
SPartitionBySupporter partitionSup;
SExprSupp* pPartScalarSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
STimeWindow updateWin;
STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
2022-07-07 09:16:12 +00:00
// status for tmq
2022-08-31 03:35:25 +00:00
SNodeList* pGroupTags;
SNode* pTagCond;
SNode* pTagIndexCond;
2022-12-27 06:35:55 +00:00
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
} SStreamScanInfo;
2022-01-20 09:10:28 +00:00
2022-08-31 03:35:25 +00:00
typedef struct {
// int8_t subType;
// bool withMeta;
// int64_t suid;
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
SSnapContext* sContext;
} SStreamRawScanInfo;
2022-11-25 15:00:14 +00:00
typedef struct STableCountScanSupp {
int16_t dbNameSlotId;
int16_t stbNameSlotId;
int16_t tbCountSlotId;
bool groupByDbName;
bool groupByStbName;
2022-12-01 11:05:33 +00:00
char dbNameFilter[TSDB_DB_NAME_LEN];
char stbNameFilter[TSDB_TABLE_NAME_LEN];
2022-11-25 15:00:14 +00:00
} STableCountScanSupp;
typedef struct STableCountScanOperatorInfo {
SReadHandle readHandle;
SSDataBlock* pRes;
STableCountScanSupp supp;
2022-12-27 06:35:55 +00:00
int32_t currGrpIdx;
SArray* stbUidList; // when group by db_name and/or stable_name
2022-11-25 15:00:14 +00:00
} STableCountScanOperatorInfo;
2022-11-18 03:53:33 +00:00
typedef struct SOptrBasicInfo {
2022-08-31 03:35:25 +00:00
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
bool mergeResultBlock;
} SOptrBasicInfo;
2022-05-03 07:04:34 +00:00
typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info
2022-05-28 15:26:22 +00:00
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
SArray* pInterpCols; // interpolation columns
int32_t resultTsOrder; // result timestamp order
int32_t inputOrder; // input data ts order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup;
2022-09-02 07:34:16 +00:00
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
2022-05-03 07:04:34 +00:00
} SIntervalAggOperatorInfo;
typedef struct SMergeAlignedIntervalAggOperatorInfo {
2022-09-02 07:34:16 +00:00
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
uint64_t groupId; // current groupId
int64_t curTs; // current ts
SSDataBlock* prefetchedBlock;
SResultRow* pResultRow;
} SMergeAlignedIntervalAggOperatorInfo;
2022-09-05 02:52:04 +00:00
typedef struct SStreamIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
2022-09-05 02:52:04 +00:00
STimeWindowAggSupp twAggSup;
bool invertible;
bool ignoreExpiredData;
SArray* pDelWins; // SWinRes
2022-09-05 02:52:04 +00:00
int32_t delIndex;
SSDataBlock* pDelRes;
SPhysiNode* pPhyNode; // create new child
2022-06-25 08:12:00 +00:00
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
2022-06-25 08:12:00 +00:00
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool isFinal;
SArray* pChildren;
SStreamState* pState;
SWinKey delKey;
2022-12-01 06:15:50 +00:00
uint64_t numOfDatapack;
} SStreamIntervalOperatorInfo;
2022-05-17 09:38:21 +00:00
2022-04-08 02:24:35 +00:00
typedef struct SDataGroupInfo {
2022-09-02 07:34:16 +00:00
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
2022-04-08 02:24:35 +00:00
} SDataGroupInfo;
typedef struct SWindowRowsSup {
2022-04-14 14:32:49 +00:00
STimeWindow win;
TSKEY prevTs;
int32_t startRowIndex;
int32_t numOfRows;
uint64_t groupId;
} SWindowRowsSup;
2022-05-20 11:34:39 +00:00
typedef struct SResultWindowInfo {
void* pOutputBuf;
SSessionKey sessionWin;
bool isOutput;
2022-05-20 11:34:39 +00:00
} SResultWindowInfo;
2022-06-04 12:06:07 +00:00
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
2022-10-18 09:44:00 +00:00
SStateKeys* pStateKey;
2022-06-04 12:06:07 +00:00
} SStateWindowInfo;
2022-05-20 11:34:39 +00:00
typedef struct SStreamSessionAggOperatorInfo {
2022-09-02 07:34:16 +00:00
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
2022-09-02 07:34:16 +00:00
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
2022-09-02 07:34:16 +00:00
bool returnUpdate;
2022-10-18 09:44:00 +00:00
SSHashObj* pStDeleted;
2022-09-02 07:34:16 +00:00
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
2022-09-02 07:34:16 +00:00
bool isFinal;
bool ignoreExpiredData;
2022-05-20 11:34:39 +00:00
} SStreamSessionAggOperatorInfo;
2022-10-18 09:44:00 +00:00
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
2022-10-18 09:44:00 +00:00
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
SColumn stateCol;
SSDataBlock* pDelRes;
SSHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
2022-10-18 09:44:00 +00:00
bool ignoreExpiredData;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
SOptrBasicInfo binfo;
SPartitionBySupporter partitionSup;
SExprSupp scalarSup;
SExprSupp tbnameCalSup;
2022-10-14 03:10:51 +00:00
SExprSupp tagCalSup;
SHashObj* pPartitions;
void* parIte;
SSDataBlock* pInputDataBlock;
int32_t tsColIndex;
SSDataBlock* pDelRes;
} SStreamPartitionOperatorInfo;
2022-11-20 02:15:26 +00:00
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
SExprSupp notFillExprSup;
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
2022-09-27 10:11:44 +00:00
typedef struct SStreamFillOperatorInfo {
SStreamFillSupporter* pFillSup;
SSDataBlock* pRes;
SSDataBlock* pSrcBlock;
int32_t srcRowIndex;
SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex;
SSDataBlock* pDelRes;
2022-10-25 16:36:25 +00:00
SColMatchInfo matchInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
SStreamFillInfo* pFillInfo;
2022-09-27 10:11:44 +00:00
} SStreamFillOperatorInfo;
2022-05-03 06:43:53 +00:00
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
2022-11-09 05:45:46 +00:00
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
int32_t optrDummyOpenFn(SOperatorInfo* pOperator);
2022-11-27 16:51:18 +00:00
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
void setOperatorCompleted(SOperatorInfo* pOperator);
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
void* pInfo, SExecTaskInfo* pTaskInfo);
void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t optrDefaultBufFn(SOperatorInfo* pOperator);
2022-06-18 06:49:27 +00:00
2022-12-27 06:35:55 +00:00
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
2022-11-27 16:51:18 +00:00
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
2022-11-27 16:51:18 +00:00
2022-12-27 06:35:55 +00:00
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
2022-11-27 16:27:49 +00:00
2022-11-27 16:51:18 +00:00
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
2022-12-29 03:35:46 +00:00
const char* pkey, void* pState);
2022-11-27 16:51:18 +00:00
void cleanupAggSup(SAggSupporter* pAggSup);
2022-12-27 06:35:55 +00:00
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
2022-11-27 16:27:49 +00:00
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator);
2022-05-03 07:04:34 +00:00
2022-11-27 16:27:49 +00:00
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
2022-09-05 02:19:31 +00:00
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
2022-09-02 07:34:16 +00:00
SOperatorInfo* pOperator);
2022-08-08 04:56:03 +00:00
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
2022-09-02 07:34:16 +00:00
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
2022-11-27 16:27:49 +00:00
extern void doDestroyExchangeOperatorInfo(void* param);
2022-11-09 05:45:46 +00:00
2022-11-04 14:13:40 +00:00
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
2022-11-16 07:18:31 +00:00
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
2022-09-02 07:34:16 +00:00
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
2022-11-04 10:46:48 +00:00
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
2022-06-17 15:23:37 +00:00
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
2022-05-03 06:43:53 +00:00
2022-09-02 07:34:16 +00:00
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
bool isIntervalQuery, SAggSupporter* pSup);
2022-11-27 16:27:49 +00:00
// operator creater functions
// clang-format off
2022-06-05 06:48:15 +00:00
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
2022-11-27 16:27:49 +00:00
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
2022-11-27 16:51:18 +00:00
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
2022-11-27 16:27:49 +00:00
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* handle, STableCountScanPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
2022-10-20 03:27:33 +00:00
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
2022-11-27 16:27:49 +00:00
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo);
2022-11-27 16:27:49 +00:00
SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream);
SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo);
2022-11-27 16:27:49 +00:00
SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo);
2022-11-27 16:27:49 +00:00
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo);
2022-07-08 09:48:34 +00:00
2022-11-27 16:27:49 +00:00
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
2022-07-08 09:48:34 +00:00
2022-11-27 16:27:49 +00:00
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
2022-07-23 01:59:37 +00:00
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo);
2022-07-08 09:48:34 +00:00
2022-11-27 16:27:49 +00:00
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild);
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
2022-06-04 12:06:07 +00:00
2022-11-27 16:27:49 +00:00
SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
2022-11-27 16:51:18 +00:00
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo);
2022-11-27 16:27:49 +00:00
// clang-format on
2022-06-04 12:06:07 +00:00
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
2022-09-02 07:34:16 +00:00
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
2022-03-21 06:00:30 +00:00
int32_t checkForQueryBuf(size_t numOfTables);
2022-12-27 06:35:55 +00:00
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
2022-11-27 16:51:18 +00:00
2022-04-14 14:32:49 +00:00
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
2022-08-06 10:00:26 +00:00
char* sql, EOPTR_EXEC_MODEL model);
2022-09-02 07:34:16 +00:00
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
2022-07-30 03:30:31 +00:00
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
2022-04-14 14:32:49 +00:00
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
2022-07-10 01:40:35 +00:00
int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
2022-09-02 07:34:16 +00:00
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
2022-05-10 07:34:41 +00:00
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
2022-10-18 09:44:00 +00:00
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName);
2022-12-27 06:35:55 +00:00
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
2022-09-02 07:34:16 +00:00
bool groupbyTbname(SNodeList* pGroupList);
int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
2022-10-18 09:44:00 +00:00
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
2022-10-18 09:44:00 +00:00
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
2022-09-27 10:11:44 +00:00
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
2022-11-16 07:18:31 +00:00
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
2022-12-30 01:30:52 +00:00
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos,
int32_t order, int64_t* pData);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_EXECUTORIMPL_H