/*
 * Mirror of https://github.com/taosdata/TDengine
 * Synced 2026-05-24 10:09:01 +00:00
 *
 *   - docs: remove queryBufferSize configuration description
 *   - refactor: remove deprecated queryBufferSize and cacheLazyLoadThreshold configs
 *
 *   Both configs were dead code with no actual effect:
 *     - queryBufferSize/tsQueryBufferSize/tsQueryBufferSizeBytes: explicitly documented as
 *       'not effective yet'; checkForQueryBuf/releaseQueryBuf were defined but never called anywhere
 *     - tsCacheLazyLoadThreshold/cacheLazyLoadThreshold: registered and read from config but never
 *       referenced in any business logic
 *
 *   Remove variable declarations, definitions, config registrations, config loading, associated
 *   functions, cfg file examples, docs, and test entries.
 *
 * 844 lines, 33 KiB, C
 */
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef TDENGINE_EXECUTORINT_H
#define TDENGINE_EXECUTORINT_H

#ifdef __cplusplus
extern "C" {
#endif

#include "os.h"
#include "tcommon.h"
#include "theap.h"
#include "tlosertree.h"
#include "tsort.h"
#include "tvariant.h"

#include "dataSinkMgt.h"
#include "executil.h"
#include "executor.h"
#include "planner.h"
#include "scalar.h"
#include "taosdef.h"
#include "tarray.h"
#include "tfill.h"
#include "thash.h"
#include "tlockfree.h"
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tlrucache.h"
#include "tworker.h"

// Search callback over `num` entries of `data` for `key`, honouring `order`.
// NOTE(review): the exact return contract is defined by the implementations, which are not in this header.
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);

// Opaque reader types; only pointers to them are used in this header.
typedef struct STsdbReader STsdbReader;
typedef struct STqReader   STqReader;

// External-window mode selector. Values are spelled out explicitly; they match the
// implicit numbering of the original declaration (first enumerator 1, then +1 each).
typedef enum EExtWinMode {
  EEXT_MODE_SCALAR = 1,
  EEXT_MODE_AGG = 2,
  EEXT_MODE_INDEFR_FUNC = 3,
} EExtWinMode;

// Session-window validity helpers. A window (or window key) counts as valid only when its
// start key `skey` is strictly positive; "invalid" is encoded by storing INT64_MIN in `skey`.
// The first three macros take the object itself, SET_SESSION_WIN_KEY_INVALID takes a pointer.
#define IS_VALID_SESSION_WIN(winInfo)        ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo)     ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey)   ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)

// Execution-model predicates; `_task` is a pointer to a task object with an `execModel` member.
#define IS_NON_STREAM_MODE(_task) ((_task)->execModel != OPTR_EXEC_MODEL_STREAM)
#define IS_STREAM_MODE(_task)     ((_task)->execModel == OPTR_EXEC_MODEL_STREAM)
// True when the task's stream runtime info is not flagged as a multi-group calculation.
// Dereferences `_task->pStreamRuntimeInfo` unconditionally, so that pointer must be non-NULL.
#define IS_STREAM_SINGLE_GRP(_task) (!(_task)->pStreamRuntimeInfo->funcInfo.isMultiGroupCalc)

/**
 * Result-size bookkeeping.
 *
 * If the number of generated results is greater than the threshold (presumably the
 * `threshold` member below), the query is halted and the results are returned to the
 * client immediately.
 */
typedef struct SResultInfo {  // TODO refactor
  int64_t totalRows;   // total generated result size in rows
  int64_t totalBytes;  // total generated result size in bytes
  int32_t capacity;    // capacity of the current result output buffer
  int32_t threshold;   // result size threshold in rows
} SResultInfo;

typedef struct STableQueryInfo {
|
|
TSKEY lastKey; // last check ts, todo remove it later
|
|
SResultRowPosition pos; // current active time window
|
|
} STableQueryInfo;
|
|
|
|
// A LIMIT/OFFSET pair.
typedef struct SLimit {
  int64_t limit;  // default -1, meaning no limit
  int64_t offset;
} SLimit;

// Alias: the file-block load recorder is the table-scan analyze info structure.
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;

// Stream recovery steps. Values are written out explicitly and equal the implicit
// numbering of the original declaration.
enum {
  STREAM_RECOVER_STEP__NONE = 0,
  STREAM_RECOVER_STEP__PREPARE1 = 1,
  STREAM_RECOVER_STEP__PREPARE2 = 2,
  STREAM_RECOVER_STEP__SCAN1 = 3,
};

// Declared here, defined in another translation unit.
extern int32_t fetchObjRefPool;

// One key value with its type descriptor; the same layout is exposed under two names,
// SGroupKeys and SStateKeys.
typedef struct {
  char*   pData;
  bool    isNull;
  int16_t type;
  int32_t bytes;
} SGroupKeys, SStateKeys;

typedef struct {
|
|
char* tablename;
|
|
char* dbname;
|
|
int32_t tversion;
|
|
int32_t rversion;
|
|
SSchemaWrapper* sw;
|
|
SSchemaWrapper* qsw;
|
|
} SSchemaInfo;
|
|
|
|
typedef struct SExchangeOpStopInfo {
  int32_t operatorType;
  int64_t refId;
} SExchangeOpStopInfo;

typedef struct SGcOperatorParam {
  int64_t sessionId;
  int32_t downstreamIdx;
  int32_t vgId;
  int64_t tbUid;
  bool    needCache;
} SGcOperatorParam;

typedef struct SGcNotifyOperatorParam {
  int32_t downstreamIdx;
  int32_t vgId;
  int64_t tbUid;
} SGcNotifyOperatorParam;

struct SExprSupp {
|
|
SExprInfo* pExprInfo;
|
|
int32_t numOfExprs; // the number of scalar expression in group operator
|
|
SqlFunctionCtx* pCtx;
|
|
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
|
|
SFilterInfo* pFilterInfo;
|
|
bool hasWindowOrGroup; // denote that the function is used with time window or group
|
|
bool hasWindow; // denote that the function is used with time window
|
|
bool hasIndefRowsFunc;
|
|
};
|
|
|
|
// Status of an exchange data source. Values are written out explicitly and equal the
// implicit numbering of the original declaration (starting at 0x1).
typedef enum {
  EX_SOURCE_DATA_NOT_READY = 0x1,
  EX_SOURCE_DATA_STARTED = 0x2,
  EX_SOURCE_DATA_READY = 0x3,
  EX_SOURCE_DATA_EXHAUSTED = 0x4,
} EX_SOURCE_STATUS;

// Column matching source selectors (distinct single-bit values).
#define COL_MATCH_FROM_COL_ID  0x1
#define COL_MATCH_FROM_SLOT_ID 0x2

// Accumulated statistics for data loaded from remote sources.
typedef struct SLoadRemoteDataInfo {
  uint64_t totalSize;     // total bytes loaded from remote
  uint64_t totalRows;     // total number of rows
  uint64_t totalElapsed;  // total elapsed time
} SLoadRemoteDataInfo;

typedef struct SLimitInfo {
|
|
SLimit limit;
|
|
SLimit slimit;
|
|
uint64_t currentGroupId;
|
|
int64_t remainGroupOffset;
|
|
int64_t numOfOutputGroups;
|
|
int64_t remainOffset;
|
|
int64_t numOfOutputRows;
|
|
} SLimitInfo;
|
|
|
|
typedef struct SSortMergeJoinOperatorParam {
  bool initDownstream;
} SSortMergeJoinOperatorParam;

// Kind of source an exchange parameter refers to. Values are written out explicitly and
// equal the implicit numbering of the original declaration (starting at 1).
typedef enum EExchangeSourceType {
  EX_SRC_TYPE_STB_JOIN_SCAN = 1,
  EX_SRC_TYPE_VSTB_SCAN = 2,
  EX_SRC_TYPE_VSTB_WIN_SCAN = 3,
  EX_SRC_TYPE_VSTB_AGG_SCAN = 4,
  EX_SRC_TYPE_VSTB_TAG_SCAN = 5,
  EX_SRC_TYPE_VTB_WIN_SCAN = 6,
  EX_SRC_TYPE_VSTB_TS_SCAN = 7,
  EX_SRC_TYPE_VSTB_INTERVAL_SCAN = 8,
  EX_SRC_TYPE_VSTB_PART_INTERVAL_SCAN = 9,
} EExchangeSourceType;

// Discriminator for SExchangeOperatorBasicParam: dynamic-scan parameters or notify parameters.
typedef enum {
  DYN_TYPE_EXCHANGE_PARAM = 1,
  NOTIFY_TYPE_EXCHANGE_PARAM = 2,
} EExchangeGetParamType;

typedef struct SExchangeOperatorBasicParam {
|
|
EExchangeGetParamType paramType;
|
|
/* dynamic scan params */
|
|
int32_t vgId;
|
|
int32_t srcOpType;
|
|
bool tableSeq;
|
|
SArray* uidList;
|
|
EExchangeSourceType type;
|
|
bool isNewDeployed; // used with newDeployedSrc
|
|
bool isNewParam;
|
|
uint64_t groupid;
|
|
SOrgTbInfo* orgTbInfo;
|
|
SArray* batchOrgTbInfo; // SArray<SOrgTbInfo>
|
|
SArray* tagList;
|
|
STimeWindow window;
|
|
SDownstreamSourceNode newDeployedSrc; // used with isNewDeployed
|
|
/* notify scan params */
|
|
TSKEY notifyTs;
|
|
} SExchangeOperatorBasicParam;
|
|
|
|
typedef struct SExchangeOperatorBatchParam {
|
|
bool multiParams;
|
|
SSHashObj* pBatchs; // SExchangeOperatorBasicParam
|
|
} SExchangeOperatorBatchParam;
|
|
|
|
typedef struct SExchangeOperatorParam {
|
|
bool multiParams;
|
|
SExchangeOperatorBasicParam basic;
|
|
} SExchangeOperatorParam;
|
|
|
|
typedef struct SExchangeSrcIndex {
  int32_t srcIdx;
  int32_t inUseIdx;
} SExchangeSrcIndex;

typedef struct SExchangeInfo {
|
|
bool isExchange; // KEEP IT FIRST
|
|
int64_t seqId;
|
|
SArray* pSources;
|
|
SSHashObj* pHashSources;
|
|
SArray* pSourceDataInfo;
|
|
tsem_t ready;
|
|
void* pTransporter;
|
|
|
|
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
|
|
// passed by downstream operator
|
|
SArray* pResultBlockList;
|
|
SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
|
|
SSDataBlock* pDummyBlock; // dummy block, not keep data
|
|
bool seqLoadData; // sequential load data or not, false by default
|
|
bool dynamicOp;
|
|
bool dynTbname; // %%tbname for stream
|
|
int32_t current;
|
|
SLoadRemoteDataInfo loadInfo;
|
|
int64_t self;
|
|
SLimitInfo limitInfo;
|
|
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
|
|
char* pTaskId;
|
|
SArray* pFetchRpcHandles;
|
|
bool notifyToSend; // need to send notify STEP DONE message
|
|
TSKEY notifyTs; // notify timestamp
|
|
} SExchangeInfo;
|
|
|
|
typedef struct SScanInfo {
  int32_t numOfAsc;
  int32_t numOfDesc;
} SScanInfo;

typedef struct SSampleExecInfo {
  double   sampleRatio;  // data block sample ratio, 1 by default
  uint32_t seed;         // random seed value
} SSampleExecInfo;

// Table scan ordering modes.
enum {
  TABLE_SCAN__TABLE_ORDER = 1,
  TABLE_SCAN__BLOCK_ORDER = 2,
};

// Progress of the empty-table count for the current group.
typedef enum ETableCountState {
  TABLE_COUNT_STATE_NONE = 0,       // before the scan starts
  TABLE_COUNT_STATE_SCAN = 1,       // current group is being scanned
  TABLE_COUNT_STATE_PROCESSED = 2,  // current group has been processed
  TABLE_COUNT_STATE_END = 3,        // finished, or no need to process
} ETableCountState;

struct SAggSupporter {
|
|
SSHashObj* pResultRowHashTable; // quick locate the window object for each result
|
|
char* keyBuf; // window key buffer
|
|
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
|
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
|
int32_t currentPageId; // current write page id
|
|
};
|
|
|
|
typedef struct {
|
|
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
|
|
// current data block needs to be loaded.
|
|
SInterval interval;
|
|
SAggSupporter* pAggSup;
|
|
SExprSupp* pExprSup; // expr supporter of aggregate operator
|
|
} SAggOptrPushDownInfo;
|
|
|
|
typedef struct STableMetaCacheInfo {
|
|
SLRUCache* pTableMetaEntryCache; // 100 by default
|
|
uint64_t metaFetch;
|
|
uint64_t cacheHit;
|
|
} STableMetaCacheInfo;
|
|
|
|
typedef struct STableScanBase {
|
|
STsdbReader* dataReader;
|
|
SFileBlockLoadRecorder readRecorder;
|
|
SQueryTableDataCond cond;
|
|
SQueryTableDataCond orgCond; // use for virtual super table scan
|
|
SAggOptrPushDownInfo pdInfo;
|
|
SColMatchInfo matchInfo;
|
|
SReadHandle readHandle;
|
|
SExprSupp pseudoSup;
|
|
STableMetaCacheInfo metaCache;
|
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
|
int32_t dataBlockLoadFlag;
|
|
SLimitInfo limitInfo;
|
|
// there are more than one table list exists in one task, if only one vnode exists.
|
|
STableListInfo* pTableListInfo;
|
|
TsdReader readerAPI;
|
|
} STableScanBase;
|
|
|
|
typedef struct STableScanInfo {
|
|
STableScanBase base;
|
|
SScanInfo scanInfo;
|
|
int32_t scanTimes;
|
|
SSDataBlock* pResBlock;
|
|
SHashObj* pIgnoreTables;
|
|
SSampleExecInfo sample; // sample execution info
|
|
int32_t tableStartIndex; // current group scan start
|
|
int32_t tableEndIndex; // current group scan end
|
|
int32_t currentGroupId;
|
|
int32_t currentTable;
|
|
int8_t scanMode;
|
|
int8_t assignBlockUid;
|
|
uint8_t countState; // empty table count state
|
|
bool hasGroupByTag;
|
|
bool filesetDelimited;
|
|
bool needCountEmptyTable;
|
|
// for virtual super table scan
|
|
SSDataBlock* pOrgBlock;
|
|
bool ignoreTag;
|
|
bool virtualStableScan;
|
|
SHashObj* readerCache;
|
|
bool newReader;
|
|
SArray* pBlockColMap;
|
|
// for virtual super table batch scan
|
|
int32_t lastBatchIdx;
|
|
int32_t currentBatchIdx;
|
|
STimeWindow lastTimeWindow;
|
|
SArray* lastColArray;
|
|
SArray* lastBlockColArray;
|
|
SArray* pBatchColMap; // SArray<SOrgTbInfo>
|
|
STimeWindow cachedTimeWindow;
|
|
SArray* cachedTagList;
|
|
uint64_t cachedGroupId;
|
|
} STableScanInfo;
|
|
|
|
// Where a sub-table's input comes from. Values are written out explicitly and equal the
// implicit numbering of the original declaration (starting at 0).
typedef enum ESubTableInputType {
  SUB_TABLE_MEM_BLOCK = 0,
  SUB_TABLE_EXT_PAGES = 1,
} ESubTableInputType;

typedef struct STmsSubTableInput {
|
|
STsdbReader* pReader;
|
|
SQueryTableDataCond tblCond;
|
|
STableKeyInfo* pKeyInfo;
|
|
bool bInMemReader;
|
|
ESubTableInputType type;
|
|
SSDataBlock* pReaderBlock;
|
|
|
|
SArray* aBlockPages;
|
|
SSDataBlock* pPageBlock;
|
|
int32_t pageIdx;
|
|
|
|
int32_t rowIdx;
|
|
int64_t* aTs;
|
|
SSDataBlock* pInputBlock;
|
|
} STmsSubTableInput;
|
|
|
|
typedef struct SBlockOrderInfo SBlockOrderInfo;
|
|
typedef struct STmsSubTablesMergeInfo {
|
|
SBlockOrderInfo* pTsOrderInfo;
|
|
SBlockOrderInfo* pPkOrderInfo;
|
|
|
|
int32_t numSubTables;
|
|
STmsSubTableInput* aInputs;
|
|
SMultiwayMergeTreeInfo* pTree;
|
|
int32_t numSubTablesCompleted;
|
|
|
|
int32_t numTableBlocksInMem;
|
|
SDiskbasedBuf* pBlocksBuf;
|
|
|
|
int32_t numInMemReaders;
|
|
} STmsSubTablesMergeInfo;
|
|
|
|
typedef struct STableMergeScanInfo {
|
|
int32_t tableStartIndex;
|
|
int32_t tableEndIndex;
|
|
bool hasGroupId;
|
|
uint64_t groupId;
|
|
STableScanBase base;
|
|
int32_t bufPageSize;
|
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
|
SArray* pSortInfo;
|
|
SSortHandle* pSortHandle;
|
|
SSDataBlock* pSortInputBlock;
|
|
SSDataBlock* pReaderBlock;
|
|
int64_t startTs; // sort start time
|
|
SLimitInfo limitInfo;
|
|
int64_t numOfRows;
|
|
SScanInfo scanInfo;
|
|
int32_t scanTimes;
|
|
int32_t readIdx;
|
|
SSDataBlock* pResBlock;
|
|
SSampleExecInfo sample; // sample execution info
|
|
SSHashObj* mTableNumRows; // uid->num of table rows
|
|
SHashObj* mSkipTables;
|
|
int64_t mergeLimit;
|
|
SSortExecInfo sortExecInfo;
|
|
bool needCountEmptyTable;
|
|
bool bGroupProcessed; // the group return data means processed
|
|
bool filesetDelimited;
|
|
bool bNewFilesetEvent;
|
|
bool bNextDurationBlockEvent;
|
|
int32_t numNextDurationBlocks;
|
|
SSDataBlock* nextDurationBlocks[2];
|
|
bool rtnNextDurationBlocks;
|
|
int32_t nextDurationBlocksIdx;
|
|
bool bSortRowId;
|
|
|
|
STmsSubTablesMergeInfo* pSubTablesMergeInfo;
|
|
} STableMergeScanInfo;
|
|
|
|
typedef struct STagScanFilterContext {
|
|
SHashObj* colHash;
|
|
int32_t index;
|
|
SArray* cInfoList;
|
|
int32_t code;
|
|
} STagScanFilterContext;
|
|
|
|
typedef struct STagScanInfo {
|
|
SColumnInfo* pCols;
|
|
SSDataBlock* pRes;
|
|
SColMatchInfo matchInfo;
|
|
int32_t curPos;
|
|
SReadHandle readHandle;
|
|
STableListInfo* pTableListInfo;
|
|
uint64_t suid;
|
|
void* pCtbCursor;
|
|
SNode* pTagCond;
|
|
SNode* pTagIndexCond;
|
|
STagScanFilterContext filterCtx;
|
|
SArray* aUidTags; // SArray<STUidTagInfo>
|
|
SArray* aFilterIdxs; // SArray<int32_t>
|
|
SStorageAPI* pStorageAPI;
|
|
SLimitInfo limitInfo;
|
|
} STagScanInfo;
|
|
|
|
|
|
// Project retrieve status codes.
enum {
  PROJECT_RETRIEVE_CONTINUE = 0x1,
  PROJECT_RETRIEVE_DONE = 0x2,
};

typedef struct SPartitionBySupporter {
|
|
SArray* pGroupCols; // group by columns, SArray<SColumn>
|
|
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
|
|
char* keyBuf; // group by keys for hash
|
|
bool needCalc; // partition by column
|
|
} SPartitionBySupporter;
|
|
|
|
typedef struct SPartitionDataInfo {
|
|
uint64_t groupId;
|
|
char* tbname;
|
|
SArray* rowIds;
|
|
} SPartitionDataInfo;
|
|
|
|
typedef struct STimeWindowAggSupp {
|
|
TSKEY maxTs;
|
|
TSKEY minTs;
|
|
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
|
} STimeWindowAggSupp;
|
|
|
|
typedef struct STmqQueryScanInfo {
|
|
SExprInfo* pPseudoExpr;
|
|
int32_t numOfPseudoExpr;
|
|
SExprSupp tagCalSup;
|
|
int32_t primaryTsIndex; // primary time stamp slot id
|
|
SReadHandle readHandle;
|
|
SColMatchInfo matchInfo;
|
|
SHashObj* pCol2SlotId;
|
|
|
|
SSDataBlock* pRes; // result SSDataBlock
|
|
STqReader* tqReader;
|
|
|
|
STableListInfo* pTableListInfo;
|
|
|
|
struct SOperatorInfo* pTmqScanOp;
|
|
struct SOperatorInfo* pTableScanOp;
|
|
// status for tmq
|
|
SNodeList* pGroupTags;
|
|
SNode* pTagCond;
|
|
SNode* pTagIndexCond;
|
|
SStoreTqReader readerFn;
|
|
} STmqQueryScanInfo;
|
|
|
|
typedef struct {
|
|
struct SVnode* vnode; // todo remove this
|
|
SSDataBlock pRes; // result SSDataBlock
|
|
STsdbReader* dataReader;
|
|
struct SSnapContext* sContext;
|
|
SStorageAPI* pAPI;
|
|
STableListInfo* pTableListInfo;
|
|
} STmqRawScanInfo;
|
|
|
|
typedef struct STableCountScanSupp {
|
|
int16_t dbNameSlotId;
|
|
int16_t stbNameSlotId;
|
|
int16_t tbCountSlotId;
|
|
bool groupByDbName;
|
|
bool groupByStbName;
|
|
char dbNameFilter[TSDB_DB_NAME_LEN];
|
|
char stbNameFilter[TSDB_TABLE_NAME_LEN];
|
|
} STableCountScanSupp;
|
|
|
|
typedef struct SOptrBasicInfo {
|
|
SResultRowInfo resultRowInfo;
|
|
SSDataBlock* pRes;
|
|
bool mergeResultBlock;
|
|
int32_t inputTsOrder;
|
|
int32_t outputTsOrder;
|
|
} SOptrBasicInfo;
|
|
|
|
typedef struct SIntervalAggOperatorInfo {
|
|
SOptrBasicInfo binfo; // basic info
|
|
SAggSupporter aggSup; // aggregate supporter
|
|
SExprSupp scalarSupp; // supporter for perform scalar function
|
|
SGroupResInfo groupResInfo; // multiple results build supporter
|
|
SInterval interval; // interval info
|
|
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
|
|
STimeWindow win; // query time range
|
|
bool timeWindowInterpo; // interpolation needed or not
|
|
SArray* pInterpCols; // interpolation columns
|
|
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
|
STimeWindowAggSupp twAggSup;
|
|
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
|
|
bool cleanGroupResInfo;
|
|
struct SOperatorInfo* pOperator;
|
|
// for limit optimization
|
|
bool limited;
|
|
int64_t limit;
|
|
bool slimited;
|
|
int64_t slimit;
|
|
uint64_t curGroupId; // initialize to UINT64_MAX
|
|
uint64_t handledGroupNum;
|
|
BoundedQueue* pBQ;
|
|
} SIntervalAggOperatorInfo;
|
|
|
|
typedef struct SMergeAlignedIntervalAggOperatorInfo {
|
|
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
|
|
|
|
uint64_t groupId; // current groupId
|
|
int64_t curTs; // current ts
|
|
SSDataBlock* prefetchedBlock;
|
|
SResultRow* pResultRow;
|
|
} SMergeAlignedIntervalAggOperatorInfo;
|
|
|
|
typedef struct SOpCheckPointInfo {
|
|
uint16_t checkPointId;
|
|
SHashObj* children; // key:child id
|
|
} SOpCheckPointInfo;
|
|
|
|
typedef struct SDataGroupInfo {
|
|
uint64_t groupId;
|
|
int64_t numOfRows;
|
|
SArray* pPageList;
|
|
SArray* blockForNotLoaded; // SSDataBlock that data is not loaded
|
|
int32_t offsetForNotLoaded; // read offset for SSDataBlock that data is not loaded
|
|
} SDataGroupInfo;
|
|
|
|
typedef struct SWindowRowsSup {
|
|
STimeWindow win;
|
|
TSKEY prevTs; // previous timestamp, used for window aggregation
|
|
int32_t startRowIndex;
|
|
int32_t numOfRows;
|
|
uint64_t groupId;
|
|
uint32_t numNullRows; // number of continuous rows with null state col
|
|
TSKEY lastTs; // last row's timestamp, used for checking duplicated ts
|
|
} SWindowRowsSup;
|
|
|
|
// return true if there are continuous rows with null state col
|
|
// state window operator needs to handle these rows specially
|
|
static inline bool hasContinuousNullRows(SWindowRowsSup* pRowSup) {
|
|
return pRowSup->numNullRows > 0;
|
|
}
|
|
|
|
// reset on initialization or found of a row with non-null state col
|
|
static inline void resetNumNullRows(SWindowRowsSup* pRowSup) {
|
|
pRowSup->numNullRows = 0;
|
|
}
|
|
|
|
/**
 * Resets the window row bookkeeping: the window bounds, start row index, group id and both
 * row counters become 0 and prevTs becomes INT64_MIN.
 *
 * NOTE(review): `lastTs` is left untouched here — confirm that this is intended.
 *
 * @param pRowSup bookkeeping to reset; a NULL pointer is accepted and ignored.
 */
static inline void resetWindowRowsSup(SWindowRowsSup* pRowSup) {
  if (NULL == pRowSup) {
    return;
  }

  // Each member is stored separately so that no value is routed through a member of a
  // different integer type (the members mix int32_t, uint32_t and uint64_t).
  pRowSup->win.skey = 0;
  pRowSup->win.ekey = 0;
  pRowSup->prevTs = INT64_MIN;
  pRowSup->startRowIndex = 0;
  pRowSup->groupId = 0;
  pRowSup->numOfRows = 0;
  pRowSup->numNullRows = 0;
}

// Aggregate implementation callback: receives the operator and one data block and returns
// an int32_t code.
typedef int32_t (*AggImplFn)(struct SOperatorInfo* pOperator, SSDataBlock* pBlock);

typedef struct SSessionAggOperatorInfo {
|
|
SOptrBasicInfo binfo;
|
|
SAggSupporter aggSup;
|
|
SExprSupp scalarSupp; // supporter for perform scalar function
|
|
SGroupResInfo groupResInfo;
|
|
SWindowRowsSup winSup;
|
|
bool reptScan; // next round scan
|
|
int64_t gap; // session window gap
|
|
int32_t tsSlotId; // primary timestamp slot id
|
|
STimeWindowAggSupp twAggSup;
|
|
struct SOperatorInfo* pOperator;
|
|
bool cleanGroupResInfo;
|
|
} SSessionAggOperatorInfo;
|
|
|
|
typedef struct SStateWindowOperatorInfo {
|
|
SOptrBasicInfo binfo;
|
|
SAggSupporter aggSup;
|
|
SExprSupp scalarSup;
|
|
SGroupResInfo groupResInfo;
|
|
SWindowRowsSup winSup;
|
|
SColumn stateCol;
|
|
bool hasKey; // has key means the state window has started
|
|
SStateKeys stateKey;
|
|
int32_t tsSlotId; // primary timestamp column slot id
|
|
STimeWindowAggSupp twAggSup;
|
|
struct SOperatorInfo* pOperator;
|
|
bool cleanGroupResInfo;
|
|
STrueForInfo trueForInfo;
|
|
EStateWinExtendOption extendOption;
|
|
} SStateWindowOperatorInfo;
|
|
|
|
|
|
typedef struct SEventWindowOperatorInfo {
|
|
SOptrBasicInfo binfo;
|
|
SAggSupporter aggSup;
|
|
SExprSupp scalarSup;
|
|
SWindowRowsSup winSup;
|
|
int32_t tsSlotId; // primary timestamp column slot id
|
|
STimeWindowAggSupp twAggSup;
|
|
uint64_t groupId; // current group id, used to identify the data block from different groups
|
|
SFilterInfo* pStartCondInfo;
|
|
SFilterInfo* pEndCondInfo;
|
|
bool inWindow;
|
|
SResultRow* pRow;
|
|
SSDataBlock* pPreDataBlock;
|
|
struct SOperatorInfo* pOperator;
|
|
STrueForInfo trueForInfo;
|
|
} SEventWindowOperatorInfo;
|
|
|
|
// Test, set and clear the OP_OPENED bit(s) in an operator's `status`; `_optr` is a pointer.
#define OPTR_IS_OPENED(_optr)  (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
#define OPTR_CLR_OPENED(_optr) ((_optr)->status &= ~OP_OPENED)

SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
|
|
|
|
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName,
|
|
SExecTaskInfo* pTaskInfo);
|
|
void cleanupQueriedTableScanInfo(void* p);
|
|
|
|
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
|
|
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
|
|
|
|
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr, SFunctionStateStore* pStore);
|
|
void checkIndefRowsFuncs(SExprSupp* pSup);
|
|
void cleanupExprSupp(SExprSupp* pSup);
|
|
void cleanupExprSuppWithoutFilter(SExprSupp* pSupp);
|
|
|
|
void cleanupResultInfoInHashMap(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
|
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap);
|
|
void cleanupResultInfo(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SGroupResInfo* pGroupResInfo,
|
|
SAggSupporter *pAggSup, bool cleanHashmap);
|
|
void cleanupResultInfoWithoutHash(SExecTaskInfo* pTaskInfo, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
|
SGroupResInfo* pGroupResInfo);
|
|
|
|
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
|
const char* pkey, void* pState, SFunctionStateStore* pStore);
|
|
void cleanupAggSup(SAggSupporter* pAggSup);
|
|
|
|
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
|
|
|
|
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
|
SDiskbasedBuf* pBuf);
|
|
|
|
/**
|
|
* @brief copydata from hash table, instead of copying from SGroupResInfo's pRow
|
|
*/
|
|
void doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
|
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup);
|
|
void doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
|
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup, STrueForInfo *pTrueForInfo);
|
|
|
|
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
|
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
|
|
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
|
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
|
|
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
|
|
|
int32_t applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
|
|
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
|
|
|
|
int32_t setFunctionResultOutput(struct SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage,
|
|
int32_t numOfExprs);
|
|
int32_t setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pResList);
|
|
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart, bool isVstbScan);
|
|
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
|
|
struct SOperatorInfo* pOperator);
|
|
|
|
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
|
|
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, int64_t* defaultBufsz);
|
|
|
|
extern void doDestroyExchangeOperatorInfo(void* param);
|
|
|
|
int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo, SColumnInfoData** pRet);
|
|
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
|
|
int32_t rows, SExecTaskInfo* pTask, STableMetaCacheInfo* pCache);
|
|
|
|
int32_t appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
|
int32_t setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
|
|
int32_t* rowEntryInfoOffset);
|
|
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
|
|
|
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
|
|
int32_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
|
|
bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
|
|
|
|
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
|
|
int32_t numOfOutput, SArray* pPseudoList, const void* pExtraParams);
|
|
int32_t projectApplyFunctionsWithSelect(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock,
|
|
SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList,
|
|
const void* pExtraParams, bool doSelectFunc, bool hasIndefRowsFunc);
|
|
|
|
int32_t setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
|
|
bool createDummyCol);
|
|
|
|
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle);
|
|
|
|
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
|
int32_t order);
|
|
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
|
|
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
|
|
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
|
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
|
|
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
|
|
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
|
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
|
|
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
|
|
|
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
|
|
|
|
void finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
|
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
|
|
|
bool groupbyTbname(SNodeList* pGroupList);
|
|
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
|
|
int64_t* pData);
|
|
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
|
|
|
|
int32_t copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
|
|
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
|
|
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
|
|
|
|
void streamOpReleaseState(struct SOperatorInfo* pOperator);
|
|
void streamOpReloadState(struct SOperatorInfo* pOperator);
|
|
void clearGroupResInfo(SGroupResInfo* pGroupResInfo);
|
|
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
|
SSDataBlock* pResultBlock, SFunctionStateStore* pStore);
|
|
int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
|
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
|
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
|
|
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
|
|
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
|
|
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
|
|
int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2);
|
|
void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins);
|
|
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
|
|
int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
|
|
struct SOperatorInfo* pOperator, int64_t winDelta);
|
|
void setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo);
|
|
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated);
|
|
int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key);
|
|
void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite,
|
|
SGroupResInfo* pGroupResInfo);
|
|
void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
|
|
SSDataBlock* pBlock, SArray* pSessionKeys);
|
|
void resetWinRange(STimeWindow* winRange);
|
|
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
|
|
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
|
|
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
|
|
void destroyFlusedPos(void* pRes);
|
|
bool isIrowtsPseudoColumn(SExprInfo* pExprInfo);
|
|
bool isIsfilledPseudoColumn(SExprInfo* pExprInfo);
|
|
bool isInterpFunc(SExprInfo* pExprInfo);
|
|
bool isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo);
|
|
|
|
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
|
|
void* decodeSSessionKey(void* buf, SSessionKey* key);
|
|
int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen);
|
|
void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen);
|
|
int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup);
|
|
void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup);
|
|
|
|
void destroyOperatorParamValue(void* pValues);
|
|
int32_t mergeOperatorParams(SOperatorParam* pDst, SOperatorParam* pSrc);
|
|
int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, bool tableSeq);
|
|
int32_t buildTableScanOperatorParamEx(SOperatorParam** ppRes, SArray* pUidList, int32_t srcOpType, SOrgTbInfo *pMap, bool tableSeq, STimeWindow *window, bool isNewParam, ETableScanDynType type);
|
|
int32_t buildTableScanOperatorParamNotify(SOperatorParam** ppRes,
|
|
int32_t srcOpType, TSKEY notifyTs);
|
|
void freeExchangeGetBasicOperatorParam(void* pParam);
|
|
void freeResetOperatorParams(struct SOperatorInfo* pOperator, SOperatorParamType type, bool allFree);
|
|
int32_t getNextBlockFromDownstreamImpl(struct SOperatorInfo* pOperator, int32_t idx, bool clearParam,
|
|
SSDataBlock** pResBlock);
|
|
|
|
int32_t saveDeleteInfo(SArray* pWins, SSessionKey key);
|
|
int32_t copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted);
|
|
int32_t copyDeleteSessionKey(SSHashObj* source, SSHashObj* dest);
|
|
|
|
bool inSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, const SDataBlockInfo* pBlockInfo);
|
|
bool inCalSlidingWindow(const SInterval* pInterval, const STimeWindow* pWin, TSKEY calStart, TSKEY calEnd);
|
|
bool compareVal(const char* v, const SStateKeys* pKey);
|
|
bool inWinRange(STimeWindow* range, STimeWindow* cur);
|
|
|
|
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
|
|
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
|
|
int32_t extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
|
|
bool getIgoreNullRes(SExprSupp* pExprSup);
|
|
bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull);
|
|
STrueForInfo* getTrueForInfo(struct SOperatorInfo* pOperator);
|
|
|
|
void destroyTmqScanOperatorInfo(void* param);
|
|
void resetBasicOperatorState(SOptrBasicInfo* pBasicInfo);
|
|
|
|
int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size);
|
|
|
|
#ifdef __cplusplus
}
#endif

#endif  // TDENGINE_EXECUTORINT_H