/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #ifndef TDENGINE_EXECUTIL_H #define TDENGINE_EXECUTIL_H #include "executor.h" #include "function.h" #include "nodes.h" #include "plannodes.h" #include "query.h" #include "storageapi.h" #include "tcommon.h" #include "tpagedbuf.h" #include "tsimplehash.h" #include "tjson.h" #define T_LONG_JMP(_obj, _c) \ do { \ qError("error happens at %s, line:%d, code:%s", __func__, __LINE__, tstrerror((_c))); \ longjmp((_obj), (_c)); \ } while (0) #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ do { \ *(uint64_t*)(_k) = (_uid); \ (void)memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ } while (0) #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) struct SGroupResInfo { int32_t index; // rows consumed in func:doCopyToSDataBlockXX int32_t iter; // relate to index-1, last consumed data's slot id in hash table void* dataPos; // relate to index-1, last consumed data's position, in the nodelist of cur slot int32_t delIndex; // rows consumed in func:doBuildDeleteDataBlock SArray* pRows; // SArray char* pBuf; bool freeItem; }; struct SResultRow { int32_t version; int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer int32_t offset : 29; // row index in buffer page bool startInterp; // the time window start timestamp has done the interpolation already. bool endInterp; // the time window end timestamp has done the interpolation already. bool closed; // this result status: closed or opened uint32_t numOfRows; // number of rows of current time window STimeWindow win; int32_t winIdx; struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo }; typedef struct SResultRowPosition { int32_t pageId; int32_t offset; } SResultRowPosition; typedef struct SResKeyPos { SResultRowPosition pos; uint64_t groupId; char key[]; } SResKeyPos; struct SResultRowInfo { int32_t size; // number of result set SResultRowPosition cur; SList* openWindow; }; typedef struct SColMatchItem { int32_t colId; int32_t srcSlotId; int32_t dstSlotId; bool needOutput; SDataType dataType; int32_t funcType; bool isPk; } SColMatchItem; typedef struct SColMatchInfo { SArray* pList; // SArray int32_t matchType; // determinate the source according to col id or slot id } SColMatchInfo; typedef struct STableListIdInfo { uint64_t suid; uint64_t uid; int32_t tableType; } STableListIdInfo; // If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly // The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups typedef struct STableListInfo { bool oneTableForEachGroup; int32_t numOfOuputGroups; // the data block will be generated one by one int32_t* groupOffset; // keep the offset value for each group in the tableList SArray* pTableList; SHashObj* map; // speedup acquire the tableQueryInfo by table uid SHashObj* remainGroups; // remaining group has not yet processed the empty group STableListIdInfo idInfo; // this maybe the super table or ordinary table } STableListInfo; typedef struct SDBVgInfoReq { tsem_t ready; SUseDbRsp* pRsp; } SDBVgInfoReq; typedef struct SDBVgInfoMgr { SHashObj* dbVgInfoMap; SRWLatch lock; } SDBVgInfoMgr; struct SqlFunctionCtx; typedef struct SInsertTableInfo { int64_t uid; int64_t vgid; int32_t version; STSchema* pSchema; char* tbname; } SInsertTableInfo; int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo, SHashObj* groupIdMap); STableListInfo* tableListCreate(); void tableListDestroy(STableListInfo* pTableListInfo); void tableListClear(STableListInfo* pTableListInfo); int32_t tableListGetOutputGroups(const STableListInfo* pTableList); bool oneTableForEachGroup(const STableListInfo* pTableList); uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); int32_t sortTableGroup(STableListInfo* pTableListInfo); int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); int32_t tableListGetSize(const STableListInfo* pTableList, int32_t* pRes); uint64_t tableListGetSuid(const STableListInfo* pTableList); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex); void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type); int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SNode* pTagCond, void* pVnode, SIdxFltStatus status, SStorageAPI* pAPI, bool addUid, bool* listAdded, void* pStreamInfo); int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SScanPhysiNode* pScanNode, SNodeList* group, bool groupSort, uint8_t* digest, SStorageAPI* pAPI, SHashObj* groupIdMap); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); void closeResultRow(SResultRow* pResultRow); void resetResultRow(SResultRow* pResultRow, size_t entrySize); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset); static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) { SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId); if (!bufPage) { uFatal("failed to get the buffer page:%d since %s", pos->pageId, terrstr()); return NULL; } if (forUpdate) { setBufPageDirty(bufPage, true); } SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); return pRow; } int32_t getResultRowFromBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); int32_t putResultRowToBuf(struct SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize); int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t prepareDataBlockBuf(SSDataBlock* pDataBlock, SColMatchInfo* pMatchInfo); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); int32_t getGroupIdFromTagsVal(void* pVnode, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId, SStorageAPI* pAPI); size_t getTableTagsBufLen(const SNodeList* pGroups); SArray* createSortInfo(SNodeList* pNodeList); SArray* makeColumnArrayFromList(SNodeList* pNodeList); int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type, SColMatchInfo* pMatchInfo); int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId); int32_t createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode); void destroySqlFunctionCtx(SqlFunctionCtx* pCtx, SExprInfo* pExpr, int32_t numOfOutput); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset, SFunctionStateStore* pStore); int32_t relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols, bool outputEveryColumn); int32_t initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode); int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode, const SReadHandle* readHandle); int32_t initQueryTableDataCondWithColArray(SQueryTableDataCond* pCond, SQueryTableDataCond* pOrgCond, const SReadHandle* readHandle, SArray* colArray); int32_t convertFillType(int32_t mode); int32_t resultrowComparAsc(const void* p1, const void* p2); int32_t isQualifiedTable(int64_t uid, SNode* pTagCond, void* vnode, bool* pQualified, SStorageAPI* pAPI); char* getStreamOpName(uint16_t opType); void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr); TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols); void updateTimeWindowInfo(SColumnInfoData* pColData, const STimeWindow* pWin, int64_t delta); SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode, SStorageAPI* pStorageAPI); /** * @brief build a tuple into keyBuf * @param [out] keyBuf the output buf * @param [in] pSortGroupCols the cols to build * @param [in] pBlock block the tuple in */ int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex); int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pDataBlock, int32_t rowIndex); uint64_t calcGroupId(char* pData, int32_t len); SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys); int32_t extractKeysLen(const SArray* keys, int32_t* pLen); int32_t getDbVgInfoForExec(void* clientRpc, const char* dbFName, const char* tbName, SVgroupInfo* pVgInfo); void rmDbVgInfoFromCache(const char* dbFName); int32_t doDropStreamTable(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq); int32_t doDropStreamTableByTbName(SMsgCb* pMsgCb, void* pOutput, SSTriggerDropRequest* pReq, char* tbName); int32_t parseErrorMsgFromAnalyticServer(SJson* pJson, const char* pId); #endif // TDENGINE_EXECUTIL_H