2021-11-02 05:37:31 +00:00
/*
* Copyright ( c ) 2019 TAOS Data , Inc . < jhtao @ taosdata . com >
*
* This program is free software : you can use , redistribute , and / or modify
* it under the terms of the GNU Affero General Public License , version 3
* or later ( " AGPL " ) , as published by the Free Software Foundation .
*
* This program is distributed in the hope that it will be useful , but WITHOUT
* ANY WARRANTY ; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE .
*
* You should have received a copy of the GNU Affero General Public License
* along with this program . If not , see < http : //www.gnu.org/licenses/>.
*/
2022-04-24 06:19:12 +00:00
// clang-format off
2021-11-02 05:37:31 +00:00
# ifndef TDENGINE_EXECUTORIMPL_H
# define TDENGINE_EXECUTORIMPL_H
2022-02-08 02:21:00 +00:00
# ifdef __cplusplus
extern " C " {
# endif
2021-11-02 05:37:31 +00:00
# include "os.h"
2022-02-28 09:55:07 +00:00
# include "tcommon.h"
2022-02-08 02:21:00 +00:00
# include "tlosertree.h"
2022-04-14 14:32:49 +00:00
# include "tsort.h"
2021-11-02 05:37:31 +00:00
# include "ttszip.h"
# include "tvariant.h"
2022-01-11 02:51:23 +00:00
# include "dataSinkMgt.h"
2021-11-02 05:37:31 +00:00
# include "executil.h"
2022-02-10 05:55:53 +00:00
# include "executor.h"
2022-01-11 02:51:23 +00:00
# include "planner.h"
2022-02-17 11:30:43 +00:00
# include "scalar.h"
2021-11-02 05:37:31 +00:00
# include "taosdef.h"
# include "tarray.h"
2022-01-11 02:51:23 +00:00
# include "thash.h"
2021-11-02 05:37:31 +00:00
# include "tlockfree.h"
2022-04-02 11:31:52 +00:00
# include "tmsg.h"
2022-04-14 14:32:49 +00:00
# include "tpagedbuf.h"
2022-05-05 11:50:11 +00:00
# include "tstreamUpdate.h"
2022-01-08 15:19:46 +00:00
2022-07-08 09:48:34 +00:00
# include "vnode.h"
2022-05-02 15:52:32 +00:00
# include "executorInt.h"
2022-04-24 06:19:12 +00:00
2021-11-02 05:37:31 +00:00
typedef int32_t ( * __block_search_fn_t ) ( char * data , int32_t num , int64_t key , int32_t order ) ;
2022-04-14 14:32:49 +00:00
# define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
# define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
2021-11-02 05:37:31 +00:00
# define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
2022-02-08 02:21:00 +00:00
# define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0)
2021-11-02 05:37:31 +00:00
2022-07-08 09:48:34 +00:00
# define START_TS_COLUMN_INDEX 0
# define END_TS_COLUMN_INDEX 1
# define UID_COLUMN_INDEX 2
# define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX
2022-07-05 09:53:52 +00:00
# define DELETE_GROUPID_COLUMN_INDEX 2
2021-11-02 05:37:31 +00:00
enum {
2022-01-10 11:48:21 +00:00
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u ,
2021-11-02 05:37:31 +00:00
2022-01-10 11:48:21 +00:00
/* Task is over
2021-11-02 05:37:31 +00:00
* 1. this status is used in one row result query process , e . g . , count / sum / first / last / avg . . . etc .
* 2. when all data within queried time window , it is also denoted as query_completed
*/
2022-01-10 11:48:21 +00:00
TASK_COMPLETED = 0x2u ,
2021-11-02 05:37:31 +00:00
} ;
/**
* If the number of generated results is greater than this value ,
* query query will be halt and return results to client immediate .
*/
2022-04-14 14:32:49 +00:00
typedef struct SResultInfo { // TODO refactor
int64_t totalRows ; // total generated result size in rows
int64_t totalBytes ; // total results in bytes.
int32_t capacity ; // capacity of current result output buffer
int32_t threshold ; // result size threshold in rows.
2022-03-24 03:15:05 +00:00
} SResultInfo ;
2021-11-02 05:37:31 +00:00
typedef struct STableQueryInfo {
2022-07-08 09:48:34 +00:00
TSKEY lastKey ; // last check ts, todo remove it later
SResultRowPosition pos ; // current active time window
2021-11-02 05:37:31 +00:00
} STableQueryInfo ;
2022-04-02 07:08:48 +00:00
typedef struct SLimit {
int64_t limit ;
int64_t offset ;
} SLimit ;
2022-05-24 03:29:51 +00:00
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder ;
2022-05-03 15:52:17 +00:00
2022-01-08 14:59:24 +00:00
typedef struct STaskCostInfo {
2022-05-26 09:18:56 +00:00
int64_t created ;
int64_t start ;
2022-02-08 02:21:00 +00:00
uint64_t loadStatisTime ;
uint64_t loadFileBlockTime ;
uint64_t loadDataInCacheTime ;
uint64_t loadStatisSize ;
uint64_t loadFileBlockSize ;
uint64_t loadDataInCacheSize ;
uint64_t loadDataTime ;
2022-05-03 15:52:17 +00:00
SFileBlockLoadRecorder * pRecoder ;
2022-07-08 09:48:34 +00:00
uint64_t elapsedTime ;
2022-05-03 15:52:17 +00:00
2022-02-08 02:21:00 +00:00
uint64_t firstStageMergeTime ;
uint64_t winInfoSize ;
uint64_t tableInfoSize ;
uint64_t hashSize ;
uint64_t numOfTimeWindows ;
SArray * queryProfEvents ; // SArray<SQueryProfEvent>
SHashObj * operatorProfResults ; // map<operator_type, SQueryProfEvent>
2022-01-08 14:59:24 +00:00
} STaskCostInfo ;
2021-11-02 05:37:31 +00:00
2022-03-04 06:31:21 +00:00
typedef struct SOperatorCostInfo {
2022-07-08 09:48:34 +00:00
double openCost ;
double totalCost ;
2022-03-04 06:31:21 +00:00
} SOperatorCostInfo ;
2021-11-02 05:37:31 +00:00
struct SOperatorInfo ;
2022-05-28 15:26:22 +00:00
typedef int32_t ( * __optr_encode_fn_t ) ( struct SOperatorInfo * pOperator , char * * result , int32_t * length ) ;
2022-05-30 04:00:51 +00:00
typedef int32_t ( * __optr_decode_fn_t ) ( struct SOperatorInfo * pOperator , char * result ) ;
2022-03-29 12:07:38 +00:00
2022-03-30 06:54:00 +00:00
typedef int32_t ( * __optr_open_fn_t ) ( struct SOperatorInfo * pOptr ) ;
2022-05-03 07:27:13 +00:00
typedef SSDataBlock * ( * __optr_fn_t ) ( struct SOperatorInfo * pOptr ) ;
2022-03-12 10:02:56 +00:00
typedef void ( * __optr_close_fn_t ) ( void * param , int32_t num ) ;
2022-05-24 03:29:51 +00:00
typedef int32_t ( * __optr_explain_fn_t ) ( struct SOperatorInfo * pOptr , void * * pOptrExplain , uint32_t * len ) ;
2022-03-12 10:02:56 +00:00
2022-01-08 14:59:24 +00:00
typedef struct STaskIdInfo {
2022-02-08 02:21:00 +00:00
uint64_t queryId ; // this is also a request id
uint64_t subplanId ;
uint64_t templateId ;
char * str ;
2022-01-08 14:59:24 +00:00
} STaskIdInfo ;
2022-07-08 09:48:34 +00:00
typedef struct {
STqOffsetVal prepareStatus ; // for tmq
STqOffsetVal lastStatus ; // for tmq
void * metaBlk ; // for tmq fetching meta
SSDataBlock * pullOverBlk ; // for streaming
SWalFilterCond cond ;
2022-07-10 05:56:31 +00:00
int64_t lastScanUid ;
2022-07-08 09:48:34 +00:00
} SStreamTaskInfo ;
2022-01-10 11:48:21 +00:00
typedef struct SExecTaskInfo {
2022-07-08 09:48:34 +00:00
STaskIdInfo id ;
uint32_t status ;
STimeWindow window ;
STaskCostInfo cost ;
int64_t owner ; // if it is in execution
int32_t code ;
SStreamTaskInfo streamInfo ;
2022-05-18 12:39:00 +00:00
struct {
2022-07-08 09:48:34 +00:00
char * tablename ;
char * dbname ;
int32_t tversion ;
SSchemaWrapper * sw ;
2022-05-18 12:39:00 +00:00
} schemaVer ;
2022-07-08 09:48:34 +00:00
STableListInfo tableqinfoList ; // this is a table list
const char * sql ; // query sql string
jmp_buf env ; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel ; // operator execution model [batch model|stream model]
2022-02-08 02:21:00 +00:00
struct SOperatorInfo * pRoot ;
2022-01-10 11:48:21 +00:00
} SExecTaskInfo ;
2022-01-08 14:59:24 +00:00
2021-11-02 05:37:31 +00:00
enum {
2022-06-15 08:26:43 +00:00
OP_NOT_OPENED = 0x0 ,
OP_OPENED = 0x1 ,
2022-03-21 10:31:31 +00:00
OP_RES_TO_RETURN = 0x5 ,
2022-06-15 08:26:43 +00:00
OP_EXEC_DONE = 0x9 ,
2021-11-02 05:37:31 +00:00
} ;
2022-04-26 12:26:32 +00:00
typedef struct SOperatorFpSet {
2022-07-08 09:48:34 +00:00
__optr_open_fn_t _openFn ; // DO NOT invoke this function directly
__optr_fn_t getNextFn ;
__optr_fn_t getStreamResFn ; // execute the aggregate in the stream model, todo remove it
__optr_fn_t cleanupFn ; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn ;
__optr_encode_fn_t encodeResultRow ;
__optr_decode_fn_t decodeResultRow ;
__optr_explain_fn_t getExplainFn ;
2022-04-26 12:26:32 +00:00
} SOperatorFpSet ;
2022-06-18 04:00:41 +00:00
typedef struct SExprSupp {
SExprInfo * pExprInfo ;
2022-07-08 09:48:34 +00:00
int32_t numOfExprs ; // the number of scalar expression in group operator
2022-06-18 04:00:41 +00:00
SqlFunctionCtx * pCtx ;
int32_t * rowEntryInfoOffset ; // offset value for each row result cell info
} SExprSupp ;
2021-11-02 05:37:31 +00:00
typedef struct SOperatorInfo {
2022-07-08 09:48:34 +00:00
uint8_t operatorType ;
bool blocking ; // block operator or not
uint8_t status ; // denote if current operator is completed
char * name ; // name, for debug purpose
void * info ; // extension attribution
SExprSupp exprSupp ;
SExecTaskInfo * pTaskInfo ;
SOperatorCostInfo cost ;
SResultInfo resultInfo ;
struct SOperatorInfo * * pDownstream ; // downstram pointer list
int32_t numOfDownstream ; // number of downstream. The value is always ONE expect for join operator
SOperatorFpSet fpSet ;
2021-11-02 05:37:31 +00:00
} SOperatorInfo ;
2022-03-29 12:07:38 +00:00
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1 ,
2022-04-18 02:46:07 +00:00
EX_SOURCE_DATA_READY = 0x2 ,
2022-03-29 12:07:38 +00:00
EX_SOURCE_DATA_EXHAUSTED = 0x3 ,
} EX_SOURCE_STATUS ;
2022-02-11 06:33:57 +00:00
2022-06-18 04:00:41 +00:00
# define COL_MATCH_FROM_COL_ID 0x1
# define COL_MATCH_FROM_SLOT_ID 0x2
2022-02-11 06:33:57 +00:00
typedef struct SSourceDataInfo {
2022-07-08 09:48:34 +00:00
int32_t index ;
SRetrieveTableRsp * pRsp ;
uint64_t totalRows ;
int32_t code ;
EX_SOURCE_STATUS status ;
const char * taskId ;
2022-02-11 06:33:57 +00:00
} SSourceDataInfo ;
2022-03-11 11:29:43 +00:00
typedef struct SLoadRemoteDataInfo {
2022-04-14 14:32:49 +00:00
uint64_t totalSize ; // total load bytes from remote
uint64_t totalRows ; // total number of rows
uint64_t totalElapsed ; // total elapsed time
2022-03-11 11:29:43 +00:00
} SLoadRemoteDataInfo ;
2022-01-16 07:56:48 +00:00
typedef struct SExchangeInfo {
2022-04-14 14:32:49 +00:00
SArray * pSources ;
SArray * pSourceDataInfo ;
tsem_t ready ;
void * pTransporter ;
SSDataBlock * pResult ;
bool seqLoadData ; // sequential load data or not, false by default
int32_t current ;
2022-03-11 11:29:43 +00:00
SLoadRemoteDataInfo loadInfo ;
2022-06-15 08:26:43 +00:00
uint64_t self ;
2022-01-16 07:56:48 +00:00
} SExchangeInfo ;
2022-04-04 06:54:39 +00:00
typedef struct SColMatchInfo {
2022-05-07 03:04:38 +00:00
int32_t srcSlotId ; // source slot id
2022-04-04 06:54:39 +00:00
int32_t colId ;
int32_t targetSlotId ;
2022-07-05 15:54:51 +00:00
bool output ; // todo remove this?
2022-06-22 01:31:45 +00:00
bool reserved ;
2022-05-07 03:04:38 +00:00
int32_t matchType ; // determinate the source according to col id or slot id
2022-04-04 06:54:39 +00:00
} SColMatchInfo ;
2022-04-26 05:09:29 +00:00
typedef struct SScanInfo {
2022-06-18 04:00:41 +00:00
int32_t numOfAsc ;
int32_t numOfDesc ;
2022-04-26 05:09:29 +00:00
} SScanInfo ;
2022-06-05 07:24:57 +00:00
typedef struct SSampleExecInfo {
double sampleRatio ; // data block sample ratio, 1 by default
uint32_t seed ; // random seed value
} SSampleExecInfo ;
2022-06-30 06:41:50 +00:00
enum {
TABLE_SCAN__TABLE_ORDER = 1 ,
2022-06-30 09:25:19 +00:00
TABLE_SCAN__BLOCK_ORDER = 2 ,
2022-06-30 06:41:50 +00:00
} ;
2021-11-02 05:37:31 +00:00
typedef struct STableScanInfo {
2022-07-04 05:21:02 +00:00
STsdbReader * dataReader ;
2022-05-12 09:33:36 +00:00
SReadHandle readHandle ;
2022-05-03 15:52:17 +00:00
SFileBlockLoadRecorder readRecorder ;
2022-04-26 05:09:29 +00:00
SScanInfo scanInfo ;
2022-05-03 15:23:49 +00:00
int32_t scanTimes ;
SNode * pFilterNode ; // filter info, which is push down by optimizer
2022-06-29 08:19:19 +00:00
2022-04-12 09:55:17 +00:00
SSDataBlock * pResBlock ;
2022-03-26 07:02:29 +00:00
SArray * pColMatchInfo ;
2022-06-18 04:00:41 +00:00
SExprSupp pseudoSup ;
2022-04-26 05:09:29 +00:00
SQueryTableDataCond cond ;
2022-04-16 14:50:08 +00:00
int32_t scanFlag ; // table scan flag to denote if it is a repeat/reverse/main scan
2022-04-15 10:06:49 +00:00
int32_t dataBlockLoadFlag ;
2022-04-16 14:50:08 +00:00
SInterval interval ; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
2022-06-05 07:24:57 +00:00
SSampleExecInfo sample ; // sample execution info
2022-06-23 11:58:12 +00:00
int32_t currentGroupId ;
2022-06-30 06:41:50 +00:00
int32_t currentTable ;
2022-06-29 08:19:19 +00:00
2022-07-10 06:48:16 +00:00
#if 0
2022-06-29 08:19:19 +00:00
struct {
uint64_t uid ;
2022-06-30 06:41:50 +00:00
int64_t ts ;
} lastStatus ;
2022-07-10 06:48:16 +00:00
# endif
2022-06-30 06:41:50 +00:00
int8_t scanMode ;
2022-07-01 06:39:21 +00:00
int8_t noTable ;
2021-11-02 05:37:31 +00:00
} STableScanInfo ;
typedef struct STagScanInfo {
2022-05-05 15:47:44 +00:00
SColumnInfo * pCols ;
SSDataBlock * pRes ;
SArray * pColMatchInfo ;
int32_t curPos ;
SReadHandle readHandle ;
2022-05-26 08:05:27 +00:00
STableListInfo * pTableList ;
2021-11-02 05:37:31 +00:00
} STagScanInfo ;
2022-06-21 08:15:17 +00:00
typedef struct SLastrowScanInfo {
SSDataBlock * pRes ;
2022-06-28 07:22:32 +00:00
SArray * pTableList ;
2022-06-21 08:15:17 +00:00
SReadHandle readHandle ;
2022-06-28 15:24:20 +00:00
void * pLastrowReader ;
SArray * pColMatchInfo ;
int32_t * pSlotIds ;
2022-06-21 08:15:17 +00:00
} SLastrowScanInfo ;
2022-05-10 07:34:41 +00:00
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1 ,
STREAM_SCAN_FROM_RES ,
STREAM_SCAN_FROM_UPDATERES ,
2022-07-06 12:18:50 +00:00
STREAM_SCAN_FROM_DATAREADER , // todo(liuyao) delete it
2022-06-25 08:12:00 +00:00
STREAM_SCAN_FROM_DATAREADER_RETRIEVE ,
2022-07-06 13:25:34 +00:00
STREAM_SCAN_FROM_DATAREADER_RANGE ,
2022-05-10 07:34:41 +00:00
} EStreamScanMode ;
2022-05-17 09:38:21 +00:00
typedef struct SCatchSupporter {
2022-07-08 09:48:34 +00:00
SHashObj * pWindowHashTable ; // quick locate the window object for each window
SDiskbasedBuf * pDataBuf ; // buffer based on blocked-wised disk file
int32_t keySize ;
int64_t * pKeyBuf ;
2022-05-17 09:38:21 +00:00
} SCatchSupporter ;
2022-05-20 11:34:39 +00:00
typedef struct SStreamAggSupporter {
2022-06-15 08:40:45 +00:00
SHashObj * pResultRows ;
SArray * pCurWins ;
int32_t valueSize ;
2022-05-20 11:34:39 +00:00
int32_t keySize ;
char * pKeyBuf ; // window key buffer
SDiskbasedBuf * pResultBuf ; // query result buffer based on blocked-wised disk file
int32_t resultRowSize ; // the result buffer size for each result row, with the meta data size for each row
2022-06-04 12:06:07 +00:00
SArray * pScanWindow ;
2022-05-20 11:34:39 +00:00
} SStreamAggSupporter ;
typedef struct SessionWindowSupporter {
SStreamAggSupporter * pStreamAggSup ;
2022-07-08 09:48:34 +00:00
int64_t gap ;
uint8_t parentType ;
2022-05-20 11:34:39 +00:00
} SessionWindowSupporter ;
2022-05-27 02:03:45 +00:00
2022-07-06 06:20:07 +00:00
typedef struct SStreamScanInfo {
2022-07-08 09:48:34 +00:00
uint64_t tableUid ; // queried super table uid
SExprInfo * pPseudoExpr ;
int32_t numOfPseudoExpr ;
int32_t primaryTsIndex ; // primary time stamp slot id
SReadHandle readHandle ;
SInterval interval ; // if the upstream is an interval operator, the interval info is also kept here.
SArray * pColMatchInfo ; //
SNode * pCondition ;
SArray * pBlockLists ; // multiple SSDatablock.
SSDataBlock * pRes ; // result SSDataBlock
SSDataBlock * pUpdateRes ; // update SSDataBlock
int32_t updateResIndex ;
int32_t blockType ; // current block type
int32_t validBlockIndex ; // Is current data has returned?
uint64_t numOfExec ; // execution times
STqReader * tqReader ;
int32_t tsArrayIndex ;
SArray * tsArray ;
uint64_t groupId ;
SUpdateInfo * pUpdateInfo ;
EStreamScanMode scanMode ;
SOperatorInfo * pStreamScanOp ;
SOperatorInfo * pTableScanOp ;
SArray * childIds ;
2022-05-20 11:34:39 +00:00
SessionWindowSupporter sessionSup ;
2022-07-08 09:48:34 +00:00
bool assignBlockUid ; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex ; // for state operator
int32_t pullDataResIndex ;
SSDataBlock * pPullDataRes ; // pull data SSDataBlock
SSDataBlock * pDeleteDataRes ; // delete data SSDataBlock
int32_t deleteDataIndex ;
2022-07-12 07:36:48 +00:00
STimeWindow updateWin ;
2022-07-07 09:16:12 +00:00
// status for tmq
2022-07-08 09:48:34 +00:00
// SSchemaWrapper schema;
2022-07-07 09:16:12 +00:00
STqOffset offset ;
2022-07-06 06:20:07 +00:00
} SStreamScanInfo ;
2022-01-20 09:10:28 +00:00
2022-03-09 06:58:15 +00:00
typedef struct SSysTableScanInfo {
2022-04-14 14:32:49 +00:00
SRetrieveMetaTableRsp * pRsp ;
SRetrieveTableReq req ;
SEpSet epSet ;
tsem_t ready ;
2022-06-10 02:29:53 +00:00
SReadHandle readHandle ;
int32_t accountId ;
2022-06-25 04:10:34 +00:00
const char * pUser ;
2022-06-10 02:29:53 +00:00
bool showRewrite ;
SNode * pCondition ; // db_name filter condition, to discard data that are not in current database
SMTbCursor * pCur ; // cursor for iterate the local table meta store.
SArray * scanCols ; // SArray<int16_t> scan column id list
SName name ;
SSDataBlock * pRes ;
int64_t numOfBlocks ; // extract basic running information.
SLoadRemoteDataInfo loadInfo ;
2022-03-09 06:58:15 +00:00
} SSysTableScanInfo ;
2022-06-10 02:29:53 +00:00
typedef struct SBlockDistInfo {
SSDataBlock * pResBlock ;
void * pHandle ;
2022-06-20 04:54:46 +00:00
SReadHandle readHandle ;
uint64_t uid ; // table uid
2022-06-10 02:29:53 +00:00
} SBlockDistInfo ;
2022-06-17 15:23:37 +00:00
// todo remove this
2021-11-02 05:37:31 +00:00
typedef struct SOptrBasicInfo {
2022-04-14 14:32:49 +00:00
SResultRowInfo resultRowInfo ;
SSDataBlock * pRes ;
2021-11-02 05:37:31 +00:00
} SOptrBasicInfo ;
2022-02-22 05:12:03 +00:00
typedef struct SAggSupporter {
2022-04-14 14:32:49 +00:00
SHashObj * pResultRowHashTable ; // quick locate the window object for each result
char * keyBuf ; // window key buffer
SDiskbasedBuf * pResultBuf ; // query result buffer based on blocked-wised disk file
2022-06-18 01:35:30 +00:00
int32_t resultRowSize ; // the result buffer size for each result row, with the meta data size for each row
2022-02-22 05:12:03 +00:00
} SAggSupporter ;
2022-04-19 02:12:30 +00:00
typedef struct STimeWindowSupp {
int8_t calTrigger ;
int64_t waterMark ;
2022-05-27 08:03:43 +00:00
TSKEY maxTs ;
2022-04-14 14:32:49 +00:00
SColumnInfoData timeWindowData ; // query time window info for scalar function execution.
2022-04-19 02:12:30 +00:00
} STimeWindowAggSupp ;
2022-05-03 07:04:34 +00:00
typedef struct SIntervalAggOperatorInfo {
2022-05-28 15:26:22 +00:00
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
2022-04-19 02:12:30 +00:00
SOptrBasicInfo binfo ; // basic info
2022-05-28 15:26:22 +00:00
SAggSupporter aggSup ; // aggregate supporter
2022-06-27 02:07:48 +00:00
SExprSupp scalarSupp ; // supporter for perform scalar function
2022-04-19 02:12:30 +00:00
SGroupResInfo groupResInfo ; // multiple results build supporter
SInterval interval ; // interval info
int32_t primaryTsIndex ; // primary time stamp slot id from result of downstream operator.
STimeWindow win ; // query time range
bool timeWindowInterpo ; // interpolation needed or not
2022-05-29 04:35:11 +00:00
SArray * pInterpCols ; // interpolation columns
2022-04-19 02:12:30 +00:00
int32_t order ; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel ; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup ;
2022-05-05 11:50:11 +00:00
bool invertible ;
2022-05-29 04:35:11 +00:00
SArray * pPrevValues ; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
2022-07-01 12:05:53 +00:00
bool ignoreExpiredData ;
2022-07-05 09:53:52 +00:00
SArray * pRecycledPages ;
SArray * pDelWins ; // SWinRes
int32_t delIndex ;
SSDataBlock * pDelRes ;
2022-07-06 06:57:53 +00:00
SNode * pCondition ;
2022-05-03 07:04:34 +00:00
} SIntervalAggOperatorInfo ;
2022-02-22 05:12:03 +00:00
2022-07-10 09:31:15 +00:00
typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo * intervalAggOperatorInfo ;
bool hasGroupId ;
uint64_t groupId ;
SSDataBlock * prefetchedBlock ;
bool inputBlocksFinished ;
SNode * pCondition ;
} SMergeAlignedIntervalAggOperatorInfo ;
2022-05-17 09:38:21 +00:00
typedef struct SStreamFinalIntervalOperatorInfo {
2022-05-28 15:26:22 +00:00
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
2022-05-17 09:38:21 +00:00
SOptrBasicInfo binfo ; // basic info
2022-05-28 15:26:22 +00:00
SAggSupporter aggSup ; // aggregate supporter
2022-07-10 08:09:54 +00:00
SExprSupp scalarSupp ; // supporter for perform scalar function
2022-05-17 09:38:21 +00:00
SGroupResInfo groupResInfo ; // multiple results build supporter
SInterval interval ; // interval info
int32_t primaryTsIndex ; // primary time stamp slot id from result of downstream operator.
int32_t order ; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup ;
2022-05-26 06:06:06 +00:00
SArray * pChildren ;
2022-06-06 05:23:04 +00:00
SSDataBlock * pUpdateRes ;
2022-06-25 08:12:00 +00:00
bool returnUpdate ;
2022-06-06 05:23:04 +00:00
SPhysiNode * pPhyNode ; // create new child
2022-06-18 11:42:03 +00:00
bool isFinal ;
2022-06-25 08:12:00 +00:00
SHashObj * pPullDataMap ;
SArray * pPullWins ; // SPullWindowInfo
int32_t pullIndex ;
SSDataBlock * pPullDataRes ;
2022-07-01 12:05:53 +00:00
bool ignoreExpiredData ;
2022-07-05 09:53:52 +00:00
SArray * pRecycledPages ;
SArray * pDelWins ; // SWinRes
int32_t delIndex ;
SSDataBlock * pDelRes ;
2022-05-17 09:38:21 +00:00
} SStreamFinalIntervalOperatorInfo ;
2022-02-22 05:12:03 +00:00
typedef struct SAggOperatorInfo {
2022-05-28 15:26:22 +00:00
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
2022-03-14 08:09:26 +00:00
SOptrBasicInfo binfo ;
SAggSupporter aggSup ;
2022-05-28 15:26:22 +00:00
2022-03-14 08:09:26 +00:00
STableQueryInfo * current ;
2022-04-16 11:10:21 +00:00
uint64_t groupId ;
2022-03-14 08:09:26 +00:00
SGroupResInfo groupResInfo ;
2022-06-18 06:49:27 +00:00
SExprSupp scalarExprSup ;
2022-07-06 05:54:11 +00:00
SNode * pCondition ;
2021-11-02 05:37:31 +00:00
} SAggOperatorInfo ;
typedef struct SProjectOperatorInfo {
2022-05-28 15:26:22 +00:00
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
2022-04-19 02:12:30 +00:00
SOptrBasicInfo binfo ;
SAggSupporter aggSup ;
2022-06-04 08:30:42 +00:00
SNode * pFilterNode ; // filter info, which is push down by optimizer
2022-04-19 02:12:30 +00:00
SSDataBlock * existDataBlock ;
SArray * pPseudoColInfo ;
SLimit limit ;
SLimit slimit ;
uint64_t groupId ;
int64_t curSOffset ;
int64_t curGroupOutput ;
int64_t curOffset ;
int64_t curOutput ;
2021-11-02 05:37:31 +00:00
} SProjectOperatorInfo ;
2022-06-04 11:19:49 +00:00
typedef struct SIndefOperatorInfo {
SOptrBasicInfo binfo ;
SAggSupporter aggSup ;
SArray * pPseudoColInfo ;
2022-06-18 04:00:41 +00:00
SExprSupp scalarSup ;
2022-07-07 07:26:29 +00:00
SNode * pCondition ;
2022-07-11 03:13:49 +00:00
uint64_t groupId ;
SSDataBlock * pNextGroupRes ;
2022-06-04 11:19:49 +00:00
} SIndefOperatorInfo ;
2021-11-02 05:37:31 +00:00
typedef struct SFillOperatorInfo {
2022-02-08 02:21:00 +00:00
struct SFillInfo * pFillInfo ;
SSDataBlock * pRes ;
int64_t totalInputRows ;
void * * p ;
SSDataBlock * existNewGroupBlock ;
bool multigroupResult ;
2022-07-06 10:08:23 +00:00
STimeWindow win ;
2022-07-07 06:30:08 +00:00
SNode * pCondition ;
2022-07-10 12:59:56 +00:00
SArray * pColMatchColInfo ;
2022-07-10 12:12:14 +00:00
int32_t primaryTsCol ;
2021-11-02 05:37:31 +00:00
} SFillOperatorInfo ;
typedef struct SGroupbyOperatorInfo {
2022-05-28 15:26:22 +00:00
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
2022-04-14 14:32:49 +00:00
SOptrBasicInfo binfo ;
2022-05-28 15:26:22 +00:00
SAggSupporter aggSup ;
2022-04-21 05:57:32 +00:00
SArray * pGroupCols ; // group by columns, SArray<SColumn>
2022-04-14 14:32:49 +00:00
SArray * pGroupColVals ; // current group column values, SArray<SGroupKeys>
SNode * pCondition ;
bool isInit ; // denote if current val is initialized or not
char * keyBuf ; // group by keys for hash
int32_t groupKeyLen ; // total group by column width
SGroupResInfo groupResInfo ;
2022-06-18 04:00:41 +00:00
SExprSupp scalarSup ;
2021-11-02 05:37:31 +00:00
} SGroupbyOperatorInfo ;
2022-04-08 02:24:35 +00:00
typedef struct SDataGroupInfo {
2022-04-19 02:12:30 +00:00
uint64_t groupId ;
int64_t numOfRows ;
SArray * pPageList ;
2022-04-08 02:24:35 +00:00
} SDataGroupInfo ;
// The sort in partition may be needed later.
typedef struct SPartitionOperatorInfo {
2022-04-14 14:32:49 +00:00
SOptrBasicInfo binfo ;
SArray * pGroupCols ;
SArray * pGroupColVals ; // current group column values, SArray<SGroupKeys>
char * keyBuf ; // group by keys for hash
int32_t groupKeyLen ; // total group by column width
SHashObj * pGroupSet ; // quick locate the window object for each result
SDiskbasedBuf * pBuf ; // query result buffer based on blocked-wised disk file
int32_t rowCapacity ; // maximum number of rows for each buffer page
int32_t * columnOffset ; // start position for each column data
2022-06-20 12:17:56 +00:00
SArray * sortedGroupArray ; // SDataGroupInfo sorted by group id
int32_t groupIndex ; // group index
2022-04-19 02:12:30 +00:00
int32_t pageIndex ; // page index of current group
2022-06-15 02:44:36 +00:00
SSDataBlock * pUpdateRes ;
2022-06-18 04:00:41 +00:00
SExprSupp scalarSup ;
2022-04-08 02:24:35 +00:00
} SPartitionOperatorInfo ;
2021-11-02 05:37:31 +00:00
2022-04-09 07:01:28 +00:00
typedef struct SWindowRowsSup {
2022-04-14 14:32:49 +00:00
STimeWindow win ;
TSKEY prevTs ;
int32_t startRowIndex ;
int32_t numOfRows ;
2022-04-09 07:01:28 +00:00
} SWindowRowsSup ;
2022-03-12 15:40:04 +00:00
typedef struct SSessionAggOperatorInfo {
2022-05-28 15:26:22 +00:00
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
2022-04-19 02:12:30 +00:00
SOptrBasicInfo binfo ;
SAggSupporter aggSup ;
2022-05-28 15:26:22 +00:00
2022-04-19 02:12:30 +00:00
SGroupResInfo groupResInfo ;
SWindowRowsSup winSup ;
bool reptScan ; // next round scan
int64_t gap ; // session window gap
2022-05-03 07:04:34 +00:00
int32_t tsSlotId ; // primary timestamp slot id
2022-04-19 02:12:30 +00:00
STimeWindowAggSupp twAggSup ;
2022-07-08 09:53:27 +00:00
const SNode * pCondition ;
2022-03-12 15:40:04 +00:00
} SSessionAggOperatorInfo ;
2021-11-02 05:37:31 +00:00
2022-05-20 11:34:39 +00:00
typedef struct SResultWindowInfo {
SResultRowPosition pos ;
STimeWindow win ;
bool isOutput ;
2022-05-27 08:03:43 +00:00
bool isClosed ;
2022-05-20 11:34:39 +00:00
} SResultWindowInfo ;
2022-06-04 12:06:07 +00:00
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo ;
SStateKeys stateKey ;
} SStateWindowInfo ;
2022-05-20 11:34:39 +00:00
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo ;
SStreamAggSupporter streamAggSup ;
2022-07-10 08:09:54 +00:00
SExprSupp scalarSupp ; // supporter for perform scalar function
2022-05-20 11:34:39 +00:00
SGroupResInfo groupResInfo ;
int64_t gap ; // session window gap
int32_t primaryTsIndex ; // primary timestamp slot id
2022-06-18 11:42:03 +00:00
int32_t endTsIndex ; // window end timestamp slot id
2022-05-20 11:34:39 +00:00
int32_t order ; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup ;
SSDataBlock * pWinBlock ; // window result
SqlFunctionCtx * pDummyCtx ; // for combine
2022-06-18 11:42:03 +00:00
SSDataBlock * pDelRes ; // delete result
2022-07-06 12:18:50 +00:00
bool returnDelete ;
2022-06-18 11:42:03 +00:00
SSDataBlock * pUpdateRes ; // update window
2022-05-20 11:34:39 +00:00
SHashObj * pStDeleted ;
void * pDelIterator ;
2022-06-04 12:06:07 +00:00
SArray * pChildren ; // cache for children's result; final stream operator
2022-06-18 11:42:03 +00:00
SPhysiNode * pPhyNode ; // create new child
bool isFinal ;
2022-07-01 12:05:53 +00:00
bool ignoreExpiredData ;
2022-05-20 11:34:39 +00:00
} SStreamSessionAggOperatorInfo ;
2022-04-10 07:35:09 +00:00
typedef struct STimeSliceOperatorInfo {
2022-06-20 15:22:28 +00:00
SSDataBlock * pRes ;
2022-06-14 03:54:13 +00:00
STimeWindow win ;
2022-04-14 14:32:49 +00:00
SInterval interval ;
2022-06-14 03:54:13 +00:00
int64_t current ;
SArray * pPrevRow ; // SArray<SGroupValue>
2022-06-14 06:45:17 +00:00
int32_t fillType ; // fill type
2022-06-20 15:22:28 +00:00
SColumn tsCol ; // primary timestamp column
SExprSupp scalarSup ; // scalar calculation
2022-06-14 06:45:17 +00:00
struct SFillColInfo * pFillColInfo ; // fill column info
2022-04-10 07:35:09 +00:00
} STimeSliceOperatorInfo ;
2021-11-02 05:37:31 +00:00
typedef struct SStateWindowOperatorInfo {
2022-05-28 15:26:22 +00:00
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
2022-04-19 02:12:30 +00:00
SOptrBasicInfo binfo ;
SAggSupporter aggSup ;
2022-05-28 15:26:22 +00:00
2022-04-19 02:12:30 +00:00
SGroupResInfo groupResInfo ;
SWindowRowsSup winSup ;
2022-05-13 10:18:54 +00:00
SColumn stateCol ; // start row index
2022-04-19 02:12:30 +00:00
bool hasKey ;
SStateKeys stateKey ;
2022-05-03 07:33:24 +00:00
int32_t tsSlotId ; // primary timestamp column slot id
2022-04-19 02:12:30 +00:00
STimeWindowAggSupp twAggSup ;
2022-04-14 14:32:49 +00:00
// bool reptScan;
2022-07-08 09:53:27 +00:00
const SNode * pCondition ;
2021-11-02 05:37:31 +00:00
} SStateWindowOperatorInfo ;
2022-06-04 12:06:07 +00:00
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo ;
SStreamAggSupporter streamAggSup ;
2022-07-10 08:09:54 +00:00
SExprSupp scalarSupp ; // supporter for perform scalar function
2022-06-04 12:06:07 +00:00
SGroupResInfo groupResInfo ;
int32_t primaryTsIndex ; // primary timestamp slot id
int32_t order ; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup ;
2022-07-10 08:09:54 +00:00
SColumn stateCol ;
2022-06-04 12:06:07 +00:00
SqlFunctionCtx * pDummyCtx ; // for combine
SSDataBlock * pDelRes ;
SHashObj * pSeDeleted ;
void * pDelIterator ;
SArray * pScanWindow ;
SArray * pChildren ; // cache for children's result;
2022-07-01 12:05:53 +00:00
bool ignoreExpiredData ;
2022-06-04 12:06:07 +00:00
} SStreamStateAggOperatorInfo ;
2022-02-19 08:15:09 +00:00
typedef struct SSortedMergeOperatorInfo {
2022-05-28 15:26:22 +00:00
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
2022-04-14 07:33:37 +00:00
SOptrBasicInfo binfo ;
2022-05-28 15:26:22 +00:00
SAggSupporter aggSup ;
2022-04-14 07:33:37 +00:00
SArray * pSortInfo ;
int32_t numOfSources ;
SSortHandle * pSortHandle ;
int32_t bufPageSize ;
uint32_t sortBufSize ; // max buffer size for in-memory sort
int32_t resultRowFactor ;
bool hasGroupVal ;
SDiskbasedBuf * pTupleStore ; // keep the final results
int32_t numOfResPerPage ;
char * * groupVal ;
SArray * groupInfo ;
2022-02-19 08:15:09 +00:00
} SSortedMergeOperatorInfo ;
2022-02-08 10:01:21 +00:00
2022-03-28 11:08:07 +00:00
typedef struct SSortOperatorInfo {
2022-05-05 10:54:19 +00:00
SOptrBasicInfo binfo ;
2022-05-25 07:22:34 +00:00
uint32_t sortBufSize ; // max buffer size for in-memory sort
2022-04-14 14:32:49 +00:00
SArray * pSortInfo ;
SSortHandle * pSortHandle ;
2022-05-06 09:23:20 +00:00
SArray * pColMatchInfo ; // for index map from table scan output
2022-04-14 14:32:49 +00:00
int32_t bufPageSize ;
2022-02-08 10:01:21 +00:00
2022-05-25 07:22:34 +00:00
int64_t startTs ; // sort start time
uint64_t sortElapsed ; // sort elapsed time, time to flush to disk not included.
2022-07-06 06:42:56 +00:00
SNode * pCondition ;
2022-03-28 11:08:07 +00:00
} SSortOperatorInfo ;
2021-11-02 05:37:31 +00:00
2022-04-14 14:32:49 +00:00
typedef struct STagFilterOperatorInfo {
SOptrBasicInfo binfo ;
} STagFilterOperatorInfo ;
2022-04-16 02:07:28 +00:00
2022-04-14 14:12:10 +00:00
typedef struct SJoinOperatorInfo {
SSDataBlock * pRes ;
int32_t joinType ;
SSDataBlock * pLeft ;
int32_t leftPos ;
SColumnInfo leftCol ;
SSDataBlock * pRight ;
int32_t rightPos ;
SColumnInfo rightCol ;
2022-07-05 05:42:49 +00:00
SNode * pCondAfterMerge ;
2022-04-14 14:12:10 +00:00
} SJoinOperatorInfo ;
2022-05-03 06:43:53 +00:00
# define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
# define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
2022-06-15 08:26:43 +00:00
void doDestroyExchangeOperatorInfo ( void * param ) ;
2022-04-26 12:26:32 +00:00
SOperatorFpSet createOperatorFpSet ( __optr_open_fn_t openFn , __optr_fn_t nextFn , __optr_fn_t streamFn ,
__optr_fn_t cleanup , __optr_close_fn_t closeFn , __optr_encode_fn_t encode ,
2022-05-24 03:29:51 +00:00
__optr_decode_fn_t decode , __optr_explain_fn_t explain ) ;
2022-04-26 12:26:32 +00:00
2022-04-04 06:54:39 +00:00
int32_t operatorDummyOpenFn ( SOperatorInfo * pOperator ) ;
2022-04-14 14:32:49 +00:00
void operatorDummyCloseFn ( void * param , int32_t numOfCols ) ;
2022-04-04 06:54:39 +00:00
int32_t appendDownstream ( SOperatorInfo * p , SOperatorInfo * * pDownstream , int32_t num ) ;
2022-06-18 06:49:27 +00:00
void initBasicInfo ( SOptrBasicInfo * pInfo , SSDataBlock * pBlock ) ;
void cleanupBasicInfo ( SOptrBasicInfo * pInfo ) ;
2022-06-20 15:22:28 +00:00
int32_t initExprSupp ( SExprSupp * pSup , SExprInfo * pExprInfo , int32_t numOfExpr ) ;
2022-06-29 08:19:19 +00:00
void cleanupExprSupp ( SExprSupp * pSup ) ;
2022-06-18 06:49:27 +00:00
int32_t initAggInfo ( SExprSupp * pSup , SAggSupporter * pAggSup , SExprInfo * pExprInfo , int32_t numOfCols , size_t keyBufSize ,
const char * pkey ) ;
2022-04-18 10:47:59 +00:00
void initResultSizeInfo ( SOperatorInfo * pOperator , int32_t numOfRows ) ;
2022-05-13 10:18:54 +00:00
void doBuildResultDatablock ( SOperatorInfo * pOperator , SOptrBasicInfo * pbInfo , SGroupResInfo * pGroupResInfo , SDiskbasedBuf * pBuf ) ;
2022-05-03 07:04:34 +00:00
2022-05-12 01:57:43 +00:00
void doApplyFunctions ( SExecTaskInfo * taskInfo , SqlFunctionCtx * pCtx , STimeWindow * pWin , SColumnInfoData * pTimeWindowData , int32_t offset ,
2022-04-14 14:32:49 +00:00
int32_t forwardStep , TSKEY * tsCol , int32_t numOfTotal , int32_t numOfOutput , int32_t order ) ;
2022-06-17 11:01:45 +00:00
int32_t extractDataBlockFromFetchRsp ( SSDataBlock * pRes , SLoadRemoteDataInfo * pLoadInfo , int32_t numOfRows , char * pData ,
2022-04-14 14:32:49 +00:00
int32_t compLen , int32_t numOfOutput , int64_t startTs , uint64_t * total ,
SArray * pColList ) ;
2022-04-20 06:59:06 +00:00
void getAlignQueryTimeWindow ( SInterval * pInterval , int32_t precision , int64_t key , STimeWindow * win ) ;
2022-07-11 07:49:13 +00:00
STimeWindow getFirstQualifiedTimeWindow ( int64_t ts , STimeWindow * pWindow , SInterval * pInterval , int32_t order ) ;
2022-05-12 09:33:36 +00:00
int32_t getTableScanInfo ( SOperatorInfo * pOperator , int32_t * order , int32_t * scanFlag ) ;
2022-05-26 09:48:07 +00:00
int32_t getBufferPgSize ( int32_t rowSize , uint32_t * defaultPgsz , uint32_t * defaultBufsz ) ;
2022-04-16 14:50:08 +00:00
2022-04-14 14:32:49 +00:00
void doSetOperatorCompleted ( SOperatorInfo * pOperator ) ;
2022-06-11 06:51:54 +00:00
void doFilter ( const SNode * pFilterNode , SSDataBlock * pBlock ) ;
2022-06-17 11:01:45 +00:00
2022-05-03 06:43:53 +00:00
void cleanupAggSup ( SAggSupporter * pAggSup ) ;
2022-05-05 10:54:19 +00:00
void destroyBasicOperatorInfo ( void * param , int32_t numOfOutput ) ;
void appendOneRowToDataBlock ( SSDataBlock * pBlock , STupleHandle * pTupleHandle ) ;
2022-05-16 09:16:20 +00:00
void setTbNameColData ( void * pMeta , const SSDataBlock * pBlock , SColumnInfoData * pColInfoData , int32_t functionId ) ;
2022-05-05 10:54:19 +00:00
2022-06-30 06:41:50 +00:00
int32_t doPrepareScan ( SOperatorInfo * pOperator , uint64_t uid , int64_t ts ) ;
2022-06-29 08:19:19 +00:00
int32_t doGetScanStatus ( SOperatorInfo * pOperator , uint64_t * uid , int64_t * ts ) ;
2022-06-17 15:23:37 +00:00
2022-05-05 10:54:19 +00:00
SSDataBlock * loadNextDataBlock ( void * param ) ;
2022-05-03 06:43:53 +00:00
2022-06-17 15:23:37 +00:00
void setResultRowInitCtx ( SResultRow * pResult , SqlFunctionCtx * pCtx , int32_t numOfOutput , int32_t * rowEntryInfoOffset ) ;
2022-05-03 06:43:53 +00:00
SResultRow * doSetResultOutBufByKey ( SDiskbasedBuf * pResultBuf , SResultRowInfo * pResultRowInfo ,
char * pData , int16_t bytes , bool masterscan , uint64_t groupId ,
SExecTaskInfo * pTaskInfo , bool isIntervalQuery , SAggSupporter * pSup ) ;
2022-04-04 06:54:39 +00:00
2022-06-05 06:48:15 +00:00
SOperatorInfo * createExchangeOperatorInfo ( void * pTransporter , SExchangePhysiNode * pExNode , SExecTaskInfo * pTaskInfo ) ;
2022-06-28 07:22:32 +00:00
SOperatorInfo * createTableScanOperatorInfo ( STableScanPhysiNode * pTableScanNode , SReadHandle * pHandle , SExecTaskInfo * pTaskInfo ) ;
2022-06-08 08:13:52 +00:00
SOperatorInfo * createTagScanOperatorInfo ( SReadHandle * pReadHandle , STagScanPhysiNode * pPhyNode ,
STableListInfo * pTableListInfo , SExecTaskInfo * pTaskInfo ) ;
2022-06-25 04:10:34 +00:00
SOperatorInfo * createSysTableScanOperatorInfo ( void * readHandle , SSystemTableScanPhysiNode * pScanPhyNode , const char * pUser , SExecTaskInfo * pTaskInfo ) ;
2022-04-26 05:09:29 +00:00
2022-07-06 05:54:11 +00:00
SOperatorInfo * createAggregateOperatorInfo ( SOperatorInfo * downstream , SExprInfo * pExprInfo , int32_t numOfCols , SSDataBlock * pResultBlock , SNode * pCondition , SExprInfo * pScalarExprInfo ,
2022-05-26 08:05:27 +00:00
int32_t numOfScalarExpr , SExecTaskInfo * pTaskInfo ) ;
2022-04-09 02:18:52 +00:00
2022-06-04 11:19:49 +00:00
SOperatorInfo * createIndefinitOutputOperatorInfo ( SOperatorInfo * downstream , SPhysiNode * pNode , SExecTaskInfo * pTaskInfo ) ;
2022-06-17 11:01:45 +00:00
SOperatorInfo * createProjectOperatorInfo ( SOperatorInfo * downstream , SProjectPhysiNode * pProjPhyNode , SExecTaskInfo * pTaskInfo ) ;
SOperatorInfo * createSortOperatorInfo ( SOperatorInfo * downstream , SSortPhysiNode * pSortPhyNode , SExecTaskInfo * pTaskInfo ) ;
2022-06-27 09:51:50 +00:00
SOperatorInfo * createMultiwayMergeOperatorInfo ( SOperatorInfo * * dowStreams , size_t numStreams , SMergePhysiNode * pMergePhysiNode , SExecTaskInfo * pTaskInfo ) ;
2022-03-28 11:08:07 +00:00
SOperatorInfo * createSortedMergeOperatorInfo ( SOperatorInfo * * downstream , int32_t numOfDownstream , SExprInfo * pExprInfo , int32_t num , SArray * pSortInfo , SArray * pGroupInfo , SExecTaskInfo * pTaskInfo ) ;
2022-06-28 07:22:32 +00:00
SOperatorInfo * createLastrowScanOperator ( SLastRowScanPhysiNode * pTableScanNode , SReadHandle * readHandle ,
SArray * pTableList , SExecTaskInfo * pTaskInfo ) ;
2022-04-15 13:42:46 +00:00
2022-04-14 14:32:49 +00:00
SOperatorInfo * createIntervalOperatorInfo ( SOperatorInfo * downstream , SExprInfo * pExprInfo , int32_t numOfCols ,
2022-04-19 02:12:30 +00:00
SSDataBlock * pResBlock , SInterval * pInterval , int32_t primaryTsSlotId ,
2022-06-27 02:07:48 +00:00
STimeWindowAggSupp * pTwAggSupp , SIntervalPhysiNode * pPhyNode , SExecTaskInfo * pTaskInfo , bool isStream ) ;
2022-06-10 03:20:00 +00:00
2022-06-22 05:08:20 +00:00
SOperatorInfo * createMergeIntervalOperatorInfo ( SOperatorInfo * downstream , SExprInfo * pExprInfo , int32_t numOfCols ,
SSDataBlock * pResBlock , SInterval * pInterval , int32_t primaryTsSlotId ,
SExecTaskInfo * pTaskInfo ) ;
2022-06-22 02:53:22 +00:00
SOperatorInfo * createMergeAlignedIntervalOperatorInfo ( SOperatorInfo * downstream , SExprInfo * pExprInfo , int32_t numOfCols ,
2022-06-10 03:09:31 +00:00
SSDataBlock * pResBlock , SInterval * pInterval , int32_t primaryTsSlotId ,
2022-07-08 08:20:02 +00:00
SNode * pCondition , SExecTaskInfo * pTaskInfo ) ;
2022-06-10 03:20:00 +00:00
2022-06-06 05:23:04 +00:00
SOperatorInfo * createStreamFinalIntervalOperatorInfo ( SOperatorInfo * downstream ,
SPhysiNode * pPhyNode , SExecTaskInfo * pTaskInfo , int32_t numOfChild ) ;
2022-04-26 12:26:32 +00:00
SOperatorInfo * createStreamIntervalOperatorInfo ( SOperatorInfo * downstream , SExprInfo * pExprInfo , int32_t numOfCols ,
SSDataBlock * pResBlock , SInterval * pInterval , int32_t primaryTsSlotId ,
2022-05-26 08:05:27 +00:00
STimeWindowAggSupp * pTwAggSupp , SExecTaskInfo * pTaskInfo ) ;
2022-04-14 14:32:49 +00:00
SOperatorInfo * createSessionAggOperatorInfo ( SOperatorInfo * downstream , SExprInfo * pExprInfo , int32_t numOfCols ,
2022-05-03 07:04:34 +00:00
SSDataBlock * pResBlock , int64_t gap , int32_t tsSlotId , STimeWindowAggSupp * pTwAggSupp ,
2022-07-07 01:29:51 +00:00
SNode * pCondition , SExecTaskInfo * pTaskInfo ) ;
2022-04-14 14:32:49 +00:00
SOperatorInfo * createGroupOperatorInfo ( SOperatorInfo * downstream , SExprInfo * pExprInfo , int32_t numOfCols ,
SSDataBlock * pResultBlock , SArray * pGroupColList , SNode * pCondition ,
2022-05-26 08:05:27 +00:00
SExprInfo * pScalarExprInfo , int32_t numOfScalarExpr , SExecTaskInfo * pTaskInfo ) ;
2022-06-20 05:42:59 +00:00
SOperatorInfo * createDataBlockInfoScanOperator ( void * dataReader , SReadHandle * readHandle , uint64_t uid , SBlockDistScanPhysiNode * pBlockScanNode ,
SExecTaskInfo * pTaskInfo ) ;
2022-05-28 09:28:47 +00:00
2022-06-23 11:58:12 +00:00
SOperatorInfo * createStreamScanOperatorInfo ( SReadHandle * pHandle ,
STableScanPhysiNode * pTableScanNode , SExecTaskInfo * pTaskInfo , STimeWindowAggSupp * pTwSup , uint64_t queryId , uint64_t taskId ) ;
2022-05-28 09:28:47 +00:00
2022-06-17 07:48:32 +00:00
SOperatorInfo * createFillOperatorInfo ( SOperatorInfo * downstream , SFillPhysiNode * pPhyFillNode , bool multigroupResult ,
SExecTaskInfo * pTaskInfo ) ;
2022-04-02 07:08:48 +00:00
2022-04-14 14:32:49 +00:00
SOperatorInfo * createStatewindowOperatorInfo ( SOperatorInfo * downstream , SExprInfo * pExpr , int32_t numOfCols ,
2022-07-07 01:29:51 +00:00
SSDataBlock * pResBlock , STimeWindowAggSupp * pTwAggSupp , int32_t tsSlotId ,
SColumn * pStateKeyCol , SNode * pCondition , SExecTaskInfo * pTaskInfo ) ;
2022-02-24 09:18:56 +00:00
2022-06-15 15:00:31 +00:00
SOperatorInfo * createPartitionOperatorInfo ( SOperatorInfo * downstream , SPartitionPhysiNode * pPartNode , SExecTaskInfo * pTaskInfo ) ;
2022-04-10 07:35:09 +00:00
2022-06-20 15:22:28 +00:00
SOperatorInfo * createTimeSliceOperatorInfo ( SOperatorInfo * downstream , SPhysiNode * pNode , /*SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock * pResultBlock , const SNodeListNode * pValNode , */ SExecTaskInfo * pTaskInfo ) ;
2022-04-14 14:12:10 +00:00
2022-06-17 11:01:45 +00:00
SOperatorInfo * createMergeJoinOperatorInfo ( SOperatorInfo * * pDownstream , int32_t numOfDownstream , SJoinPhysiNode * pJoinNode ,
SExecTaskInfo * pTaskInfo ) ;
2022-07-08 09:48:34 +00:00
2022-06-17 06:13:45 +00:00
SOperatorInfo * createStreamSessionAggOperatorInfo ( SOperatorInfo * downstream ,
SPhysiNode * pPhyNode , SExecTaskInfo * pTaskInfo ) ;
SOperatorInfo * createStreamFinalSessionAggOperatorInfo ( SOperatorInfo * downstream ,
SPhysiNode * pPhyNode , SExecTaskInfo * pTaskInfo , int32_t numOfChild ) ;
2022-06-04 12:06:07 +00:00
2022-06-15 15:00:31 +00:00
SOperatorInfo * createStreamStateAggOperatorInfo ( SOperatorInfo * downstream , SPhysiNode * pPhyNode , SExecTaskInfo * pTaskInfo ) ;
2022-06-04 12:06:07 +00:00
2022-04-07 05:57:47 +00:00
#if 0
2022-03-12 15:40:04 +00:00
SOperatorInfo * createTableSeqScanOperatorInfo ( void * pTsdbReadHandle , STaskRuntimeEnv * pRuntimeEnv ) ;
2022-04-07 05:57:47 +00:00
# endif
2021-11-02 05:37:31 +00:00
2022-04-28 10:08:56 +00:00
int32_t projectApplyFunctions ( SExprInfo * pExpr , SSDataBlock * pResult , SSDataBlock * pSrcBlock , SqlFunctionCtx * pCtx ,
2022-04-14 14:32:49 +00:00
int32_t numOfOutput , SArray * pPseudoList ) ;
2022-04-12 09:55:17 +00:00
2022-05-12 09:33:36 +00:00
void setInputDataBlock ( SOperatorInfo * pOperator , SqlFunctionCtx * pCtx , SSDataBlock * pBlock , int32_t order , int32_t scanFlag , bool createDummyCol ) ;
2022-03-21 06:00:30 +00:00
2022-02-08 02:21:00 +00:00
bool isTaskKilled ( SExecTaskInfo * pTaskInfo ) ;
2021-11-02 05:37:31 +00:00
int32_t checkForQueryBuf ( size_t numOfTables ) ;
2022-06-17 11:01:45 +00:00
void setTaskKilled ( SExecTaskInfo * pTaskInfo ) ;
void queryCostStatis ( SExecTaskInfo * pTaskInfo ) ;
2021-11-02 05:37:31 +00:00
2022-04-14 14:32:49 +00:00
void doDestroyTask ( SExecTaskInfo * pTaskInfo ) ;
2021-11-02 05:37:31 +00:00
int32_t getMaximumIdleDurationSec ( ) ;
2022-05-28 15:26:22 +00:00
/*
* ops : root operator
* data : * data save the result of encode , need to be freed by caller
* length : * length save the length of * data
2022-06-26 10:44:49 +00:00
* nOptrWithVal : * nOptrWithVal save the number of optr with value
2022-05-28 15:26:22 +00:00
* return : result code , 0 means success
*/
2022-06-27 06:47:14 +00:00
int32_t encodeOperator ( SOperatorInfo * ops , char * * data , int32_t * length , int32_t * nOptrWithVal ) ;
2022-05-28 15:26:22 +00:00
/*
* ops : root operator , created by caller
* data : save the result of decode
* length : the length of data
* return : result code , 0 means success
*/
2022-06-17 12:07:55 +00:00
int32_t decodeOperator ( SOperatorInfo * ops , const char * data , int32_t length ) ;
2022-05-27 15:21:41 +00:00
2022-02-08 02:21:00 +00:00
void setTaskStatus ( SExecTaskInfo * pTaskInfo , int8_t status ) ;
2022-04-14 14:32:49 +00:00
int32_t createExecTaskInfoImpl ( SSubplan * pPlan , SExecTaskInfo * * pTaskInfo , SReadHandle * pHandle , uint64_t taskId ,
2022-06-16 03:43:11 +00:00
const char * sql , EOPTR_EXEC_MODEL model ) ;
2022-07-06 08:29:51 +00:00
int32_t createDataSinkParam ( SDataSinkNode * pNode , void * * pParam , qTaskInfo_t * pTaskInfo , SReadHandle * readHandle ) ;
2022-04-14 14:32:49 +00:00
int32_t getOperatorExplainExecInfo ( SOperatorInfo * operatorInfo , SExplainExecInfo * * pRes , int32_t * capacity ,
int32_t * resNum ) ;
2022-05-30 04:00:51 +00:00
int32_t aggDecodeResultRow ( SOperatorInfo * pOperator , char * result ) ;
2022-05-28 15:26:22 +00:00
int32_t aggEncodeResultRow ( SOperatorInfo * pOperator , char * * result , int32_t * length ) ;
2022-06-05 07:24:57 +00:00
STimeWindow getActiveTimeWindow ( SDiskbasedBuf * pBuf , SResultRowInfo * pResultRowInfo , int64_t ts , SInterval * pInterval ,
2022-07-10 01:40:35 +00:00
int32_t order ) ;
2022-06-05 07:24:57 +00:00
int32_t getNumOfRowsInTimeWindow ( SDataBlockInfo * pDataBlockInfo , TSKEY * pPrimaryColumn , int32_t startPos , TSKEY ekey ,
__block_search_fn_t searchFn , STableQueryInfo * item , int32_t order ) ;
2022-05-10 07:34:41 +00:00
int32_t binarySearchForKey ( char * pValue , int num , TSKEY key , int order ) ;
2022-06-17 15:23:37 +00:00
int32_t initStreamAggSupporter ( SStreamAggSupporter * pSup , const char * pKey , SqlFunctionCtx * pCtx , int32_t numOfOutput ,
int32_t size ) ;
2022-05-29 04:35:11 +00:00
SResultRow * getNewResultRow ( SDiskbasedBuf * pResultBuf , int64_t tableGroupId , int32_t interBufSize ) ;
2022-06-18 11:42:03 +00:00
SResultWindowInfo * getSessionTimeWindow ( SStreamAggSupporter * pAggSup , TSKEY startTs ,
TSKEY endTs , uint64_t groupId , int64_t gap , int32_t * pIndex ) ;
2022-07-06 12:18:50 +00:00
SResultWindowInfo * getCurSessionWindow ( SStreamAggSupporter * pAggSup , TSKEY startTs ,
TSKEY endTs , uint64_t groupId , int64_t gap , int32_t * pIndex ) ;
bool isInTimeWindow ( STimeWindow * pWin , TSKEY ts , int64_t gap ) ;
2022-06-18 11:42:03 +00:00
int32_t updateSessionWindowInfo ( SResultWindowInfo * pWinInfo , TSKEY * pStartTs ,
TSKEY * pEndTs , int32_t rows , int32_t start , int64_t gap , SHashObj * pStDeleted ) ;
2022-05-20 11:34:39 +00:00
bool functionNeedToExecute ( SqlFunctionCtx * pCtx ) ;
2022-05-27 09:57:27 +00:00
2022-06-10 01:55:14 +00:00
int32_t finalizeResultRowIntoResultDataBlock ( SDiskbasedBuf * pBuf , SResultRowPosition * resultRowPosition ,
2022-06-09 11:21:52 +00:00
SqlFunctionCtx * pCtx , SExprInfo * pExprInfo , int32_t numOfExprs , const int32_t * rowCellOffset ,
SSDataBlock * pBlock , SExecTaskInfo * pTaskInfo ) ;
2022-06-05 07:24:57 +00:00
2022-06-22 10:51:16 +00:00
int32_t createScanTableListInfo ( STableScanPhysiNode * pTableScanNode , SReadHandle * pHandle ,
2022-06-23 02:21:04 +00:00
STableListInfo * pTableListInfo , uint64_t queryId , uint64_t taskId ) ;
2022-06-22 15:01:55 +00:00
SOperatorInfo * createGroupSortOperatorInfo ( SOperatorInfo * downstream , SGroupSortPhysiNode * pSortPhyNode ,
SExecTaskInfo * pTaskInfo ) ;
2022-06-22 12:26:44 +00:00
SOperatorInfo * createTableMergeScanOperatorInfo ( STableScanPhysiNode * pTableScanNode , STableListInfo * pTableListInfo ,
SReadHandle * readHandle , SExecTaskInfo * pTaskInfo , uint64_t queryId , uint64_t taskId ) ;
2022-06-15 08:43:34 +00:00
2022-06-15 02:44:36 +00:00
void copyUpdateDataBlock ( SSDataBlock * pDest , SSDataBlock * pSource , int32_t tsColIndex ) ;
2022-06-05 07:24:57 +00:00
2022-06-23 11:58:12 +00:00
int32_t generateGroupIdMap ( STableListInfo * pTableListInfo , SReadHandle * pHandle , SNodeList * groupKey ) ;
2022-06-25 08:12:00 +00:00
SSDataBlock * createPullDataBlock ( ) ;
2022-06-22 10:51:16 +00:00
2022-02-08 02:21:00 +00:00
# ifdef __cplusplus
}
# endif
2021-11-02 05:37:31 +00:00
# endif // TDENGINE_EXECUTORIMPL_H