/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #ifndef _TD_EXECUTOR_H_ #define _TD_EXECUTOR_H_ #ifdef __cplusplus extern "C" { #endif #include "query.h" #include "storageapi.h" #include "tcommon.h" #include "tmsgcb.h" #include "storageapi.h" #include "functionMgt.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; typedef int32_t (*localFetchFp)(void*, uint64_t, uint64_t, uint64_t, uint64_t, int64_t, int32_t, void**, SArray*); typedef struct { void* handle; bool localExec; localFetchFp fp; SArray* explainRes; } SLocalFetch; typedef struct { void* tqReader; // todo remove it void* vnode; void* mnd; SMsgCb* pMsgCb; int64_t version; uint64_t checkpointId; bool initTableReader; bool initTqReader; bool skipRollup; int32_t numOfVgroups; void* sContext; // SSnapContext* void* pStateBackend; void* pOtherBackend; int8_t fillHistory; STimeWindow winRange; struct SStorageAPI api; void* pWorkerCb; bool localExec; } SReadHandle; typedef struct { SStreamRuntimeFuncInfo funcInfo; int32_t execId; bool resetFlag; const SArray* pForceOutputCols; } SStreamRuntimeInfo; // in queue mode, data streams are seperated by msg typedef enum { OPTR_EXEC_MODEL_BATCH = 0x1, OPTR_EXEC_MODEL_STREAM = 0x2, OPTR_EXEC_MODEL_QUEUE = 0x3, } EOPTR_EXEC_MODEL; /** * Create the exec task for stream mode * @param pMsg * @param SReadHandle * @param vgId * @return */ int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId); /** * Create the exec task for queue mode * @param pMsg * @param SReadHandle * @return */ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id); int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList, void* pTaskInfo); /** * set the task Id, usually used by message queue process * @param tinfo * @param taskId * @param queryId */ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); /** * Set block for sma * @param tinfo * @param pBlocks * @param numOfInputBlock * @param type * @return */ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type); /** * Update the table id list, add or remove. * * @param tinfo * @param id * @param isAdd * @return */ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd); bool qIsDynamicExecTask(qTaskInfo_t tinfo); void qDestroyOperatorParam(SOperatorParam* pParam); void qUpdateOperatorParam(qTaskInfo_t tinfo, void* pParam); /** * Create the exec task object according to task json * @param readHandle * @param vgId * @param pTaskInfoMsg * @param pTaskInfo * @param qId * @return */ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, int8_t compressResult, char* sql, EOPTR_EXEC_MODEL model); /** * * @param tinfo * @param sversion * @param tversion * @return */ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, int32_t dbNameBuffLen, char* tableName, int32_t tbaleNameBuffLen, int32_t* sversion, int32_t* tversion, int32_t idx, bool* tbGet); /** * The main task execution function, including query on both table and multiple tables, * which are decided according to the tag or table name query conditions * * @param tinfo * @param handle * @return */ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bool* hasMore, SLocalFetch* pLocal, bool processOneBlock); int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); int32_t qExecutorInit(void); void qResetTaskCode(qTaskInfo_t tinfo); void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); /** * kill the ongoing query asynchronously * @param tinfo qhandle * @return */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration); bool qTaskIsExecuting(qTaskInfo_t qinfo); /** * destroy query info structure * @param qHandle */ void qDestroyTask(qTaskInfo_t tinfo); void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList); TSKEY getNextTimeWindowStart(const SInterval* pInterval, TSKEY start, int32_t order); void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order); void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery); STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key); SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); void qStreamSetOpen(qTaskInfo_t tinfo); void qStreamSetSourceExcluded(qTaskInfo_t tinfo, int8_t sourceExcluded); int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset); SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); void* qExtractReaderFromStreamScanner(void* scanner); void qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); bool qStreamScanhistoryFinished(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval, STimeWindow* pLastWindow, TSKEY* pRecInteral); int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); int32_t streamCollectExprsForReplace(qTaskInfo_t tInfo, SArray* pExprs); int32_t streamClearStatesForOperators(qTaskInfo_t tInfo); int32_t streamExecuteTask(qTaskInfo_t tInfo, SSDataBlock** ppBlock, uint64_t* ts); void streamDestroyExecTask(qTaskInfo_t tInfo); int32_t qStreamCreateTableListForReader(void* pVnode, uint64_t suid, uint64_t uid, int8_t tableType, SNodeList* pGroupTags, bool groupSort, SNode* pTagCond, SNode* pTagIndexCond, SStorageAPI* storageAPI, void** pTableListInfo); int32_t qStreamGetTableList(void* pTableListInfo, int32_t currentGroupId, STableKeyInfo** pKeyInfo, int32_t* size); uint64_t qStreamGetGroupId(void* pTableListInfo, int64_t uid); void qStreamDestroyTableList(void* pTableListInfo); int32_t qStreamGetTableListGroupNum(const void* pTableList); int32_t qStreamGetGroupIndex(void* pTableListInfo, int64_t gid); int32_t qStreamFilter(SSDataBlock* pBlock, void* pFilterInfo); bool qStreamUidInTableList(void* pTableListInfo, uint64_t uid); void initStorageAPI(SStorageAPI* pAPI); int32_t streamCalcOutputTbName(SNode *pExpr, char *tbname, const SStreamRuntimeFuncInfo *pPartColVals); void streamSetTaskRuntimeInfo(qTaskInfo_t tinfo, SStreamRuntimeInfo* pRuntimeInfo); typedef void (*getMnodeEpset_f)(void *pDnode, SEpSet *pEpset); typedef int32_t (*getDnodeId_f)(void *pData); typedef void (*taskUndeplyCallback)(void*); typedef struct SGlobalExecInfo { void* dnode; int32_t dnodeId; int32_t snodeId; getMnodeEpset_f getMnode; getDnodeId_f getDnodeId; } SGlobalExecInfo; extern SGlobalExecInfo gExecInfo; void gExecInfoInit(void* pDnode, getDnodeId_f getDnodeId, getMnodeEpset_f getMnode); int32_t getCurrentMnodeEpset(SEpSet* pEpSet); #ifdef __cplusplus } #endif #endif /*_TD_EXECUTOR_H_*/