mirror of
https://github.com/taosdata/TDengine
synced 2026-05-24 10:09:01 +00:00
* opt stream build twa result * opt force window close memory * feat(stream):optimize new interval and scan operator * adj log * opt code * opt code * fill history * fix issue for fill history * add ci * feat(stream): add new stream nonblock interval operator * adjust code * use new scan operator * use new scan operator * add log * fix issue * recover stream scan next function * fix issue * fix issue * fix issue * ignore disorder data * fix issue * fix issue for interval sliding * fix issue * fix ci issue * fix ci issue * fix ci issue * add semi && final nonblock interval operator * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * refactor(stream): track the msgId for each upstream tasks. * fix(stream): fix race condition. * fix(stream): update the task last msgId when putting into input queue succ. * fix issue * fix issue * put recalculate data to rocksdb * fix issue * fix issue * enh(query)[TD-33071]: add support for saving and restoring tsdbReader scan progress - Implement functionality to save scan progress during tsdbReader operations - Enable resuming scans from the last saved position * fix issue * fix issue * fix issue * fix issue * fix issue * add rec interval check * enh(stream):add recalculate tasks. * enh(stream): support the re-calculate the tasks. * fix issue && do refactor * do refactor * fix issue * fix issue * update backend opt * add new interface * add new session operator * support blob * add new session operator * fix issue * add rec state for rec task * fix invalid read * add new session window * enh(stream): update the stream tasks backend. * new session operator * add pull data * fix(stream): fix error in expand stream backend. * fix issue * fix issue * fix issue * merge code * fix issue * fix(stream): check for null ptr. * fix(stream): add more check. * fix issue * fix issue * fix issue * add debug code * fix issue * fix issue * fix issue * set rec end flag * fix(stream): opt re-calculate stream tasks. * fix issue * fix issue * add new operator * enh(stream): dispatch recalculate block to agg tasks. * fix issue * fix issue * fix(stream): adjust filter. * fix issue * refactor * refactor(stream): adjust the recalculate end block. * fix issue * fix(stream): set correct create trigger block. * fix issue * fix(stream): fix error in build recalculate end block. * fix(stream): check null ptr. * add stream client && fix issue * fix mem leak * fix(stream): free msg. * add stream client * fix(stream): fix error. * add stream client && fix issue * add stream client * refactor(stream): set the recalculate task complete. * add wend and group_id for session window dest table * feat(stream): refactor and set the recalcul agg task complete. * add cfg for adapter * fix issue * add state && event operator * feat(stream): support fill-history task. * add new fill operator * fix(stream): set correct backend when updating fill-history task to recalculate task. * add new fill operator * fix(stream): s2025-03-06 11:10:31.272 et ops always open in scan tsdb * fix(stream):set the correct taskType for sink task. * new fill operator * adj stream fill operator * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * fix issue * adj test * fix issue * fix(stream): fix issue * fix(steam): fix issue * fix(steam): fix issue * fix(steam): fix issue * fix(steam): fix issue * fix(stream): fix issue * fix(stream): fix issue * fix(stream): fix issue * fix: ut com error * fix(stream): fix mem leak and adjust operator type check rule * fix(stream): fix mem leak and adjust test case * refactor code * fix(stream): free items. * fix(stream): free fix memory leak. * fix(stream): fix syntax error. * fix: ignore unexpect block * fix: adjust op type --------- Co-authored-by: Haojun Liao <hjliao@taosdata.com> Co-authored-by: Jinqing Kuang <kuangjinqingcn@gmail.com> Co-authored-by: yihaoDeng <luomoxyz@126.com>
157 lines
7.5 KiB
C
157 lines
7.5 KiB
C
/*
|
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
*
|
|
* This program is free software: you can use, redistribute, and/or modify
|
|
* it under the terms of the GNU Affero General Public License, version 3
|
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
*
|
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
*
|
|
* You should have received a copy of the GNU Affero General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "storageapi.h"
|
|
#include "streamState.h"
|
|
#include "tstreamUpdate.h"
|
|
|
|
static void initStateStoreAPI(SStateStore* pStore);
|
|
static void initFunctionStateStore(SFunctionStateStore* pStore);
|
|
|
|
void initStreamStateAPI(SStorageAPI* pAPI) {
|
|
initStateStoreAPI(&pAPI->stateStore);
|
|
initFunctionStateStore(&pAPI->functionStore);
|
|
}
|
|
|
|
void initStateStoreAPI(SStateStore* pStore) {
|
|
pStore->streamFileStateInit = streamFileStateInit;
|
|
pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
|
|
|
|
pStore->streamStatePutParName = streamStatePutParName;
|
|
pStore->streamStateGetParName = streamStateGetParName;
|
|
pStore->streamStateDeleteParName = streamStateDeleteParName;
|
|
pStore->streamStateSetParNameInvalid = streamStateSetParNameInvalid;
|
|
|
|
pStore->streamStateAddIfNotExist = streamStateAddIfNotExist;
|
|
pStore->streamStateReleaseBuf = streamStateReleaseBuf;
|
|
pStore->streamStateClearBuff = streamStateClearBuff;
|
|
pStore->streamStateFreeVal = streamStateFreeVal;
|
|
|
|
pStore->streamStatePut = streamStatePut;
|
|
pStore->streamStateGet = streamStateGet;
|
|
pStore->streamStateCheck = streamStateCheck;
|
|
pStore->streamStateGetByPos = streamStateGetByPos;
|
|
pStore->streamStateDel = streamStateDel;
|
|
pStore->streamStateDelByGroupId = streamStateDelByGroupId;
|
|
pStore->streamStateClear = streamStateClear;
|
|
pStore->streamStateSaveInfo = streamStateSaveInfo;
|
|
pStore->streamStateGetInfo = streamStateGetInfo;
|
|
pStore->streamStateGetNumber = streamStateGetNumber;
|
|
pStore->streamStateDeleteInfo = streamStateDeleteInfo;
|
|
pStore->streamStateSetNumber = streamStateSetNumber;
|
|
pStore->streamStateGetPrev = streamStateGetPrev;
|
|
pStore->streamStateGetAllPrev = streamStateGetAllPrev;
|
|
|
|
pStore->streamStateFillPut = streamStateFillPut;
|
|
pStore->streamStateFillGet = streamStateFillGet;
|
|
pStore->streamStateFillAddIfNotExist = streamStateFillAddIfNotExist;
|
|
pStore->streamStateFillDel = streamStateFillDel;
|
|
pStore->streamStateFillGetNext = streamStateFillGetNext;
|
|
pStore->streamStateFillGetPrev = streamStateFillGetPrev;
|
|
|
|
pStore->streamStateCurNext = streamStateCurNext;
|
|
pStore->streamStateCurPrev = streamStateCurPrev;
|
|
|
|
pStore->streamStateGetAndCheckCur = streamStateGetAndCheckCur;
|
|
pStore->streamStateSeekKeyNext = streamStateSeekKeyNext;
|
|
pStore->streamStateFillSeekKeyNext = streamStateFillSeekKeyNext;
|
|
pStore->streamStateFillSeekKeyPrev = streamStateFillSeekKeyPrev;
|
|
pStore->streamStateFreeCur = streamStateFreeCur;
|
|
|
|
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
|
|
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
|
|
|
|
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
|
|
pStore->streamStateClearExpiredSessionState = streamStateClearExpiredSessionState;
|
|
pStore->streamStateSetRecFlag = streamStateSetRecFlag;
|
|
pStore->streamStateGetRecFlag = streamStateGetRecFlag;
|
|
|
|
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
|
|
pStore->streamStateSessionPut = streamStateSessionPut;
|
|
pStore->streamStateSessionGet = streamStateSessionGet;
|
|
pStore->streamStateSessionDel = streamStateSessionDel;
|
|
pStore->streamStateSessionReset = streamStateSessionReset;
|
|
pStore->streamStateSessionClear = streamStateSessionClear;
|
|
pStore->streamStateSessionGetKVByCur = streamStateSessionGetKVByCur;
|
|
pStore->streamStateStateAddIfNotExist = streamStateStateAddIfNotExist;
|
|
pStore->streamStateSessionGetKeyByRange = streamStateSessionGetKeyByRange;
|
|
pStore->streamStateCountGetKeyByRange = streamStateCountGetKeyByRange;
|
|
pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition;
|
|
pStore->streamStateSessionSaveToDisk = streamStateSessionSaveToDisk;
|
|
pStore->streamStateFlushReaminInfoToDisk = streamStateFlushReaminInfoToDisk;
|
|
pStore->streamStateSessionDeleteAll = streamStateSessionDeleteAll;
|
|
|
|
pStore->updateInfoInit = updateInfoInit;
|
|
pStore->updateInfoFillBlockData = updateInfoFillBlockData;
|
|
pStore->updateInfoIsUpdated = updateInfoIsUpdated;
|
|
pStore->updateInfoIsTableInserted = updateInfoIsTableInserted;
|
|
pStore->updateInfoDestroy = updateInfoDestroy;
|
|
pStore->windowSBfDelete = windowSBfDelete;
|
|
pStore->windowSBfAdd = windowSBfAdd;
|
|
pStore->isIncrementalTimeStamp = isIncrementalTimeStamp;
|
|
|
|
pStore->updateInfoInitP = updateInfoInitP;
|
|
pStore->updateInfoAddCloseWindowSBF = updateInfoAddCloseWindowSBF;
|
|
pStore->updateInfoDestoryColseWinSBF = updateInfoDestoryColseWinSBF;
|
|
pStore->updateInfoSerialize = updateInfoSerialize;
|
|
pStore->updateInfoDeserialize = updateInfoDeserialize;
|
|
|
|
pStore->streamStateSessionSeekKeyPrev = streamStateSessionSeekKeyPrev;
|
|
pStore->streamStateSessionSeekKeyNext = streamStateSessionSeekKeyNext;
|
|
pStore->streamStateCountSeekKeyPrev = streamStateCountSeekKeyPrev;
|
|
pStore->streamStateSessionSeekKeyCurrentPrev = streamStateSessionSeekKeyCurrentPrev;
|
|
pStore->streamStateSessionSeekKeyCurrentNext = streamStateSessionSeekKeyCurrentNext;
|
|
|
|
pStore->streamStateGroupPut = streamStateGroupPut;
|
|
pStore->streamStateGroupGetCur = streamStateGroupGetCur;
|
|
pStore->streamStateGroupCurNext = streamStateGroupCurNext;
|
|
pStore->streamStateGroupGetKVByCur = streamStateGroupGetKVByCur;
|
|
|
|
pStore->streamFileStateDestroy = streamFileStateDestroy;
|
|
pStore->streamFileStateClear = streamFileStateClear;
|
|
pStore->needClearDiskBuff = needClearDiskBuff;
|
|
|
|
pStore->streamStateGetAndSetTsData = streamStateGetAndSetTsData;
|
|
pStore->streamStateTsDataCommit = streamStateTsDataCommit;
|
|
pStore->streamStateInitTsDataState = streamStateInitTsDataState;
|
|
pStore->streamStateDestroyTsDataState = streamStateDestroyTsDataState;
|
|
pStore->streamStateRecoverTsData = streamStateRecoverTsData;
|
|
pStore->streamStateReloadTsDataState = streamStateReloadTsDataState;
|
|
pStore->streamStateMergeAndSaveScanRange = streamStateMergeAndSaveScanRange;
|
|
pStore->streamStateMergeAllScanRange = streamStateMergeAllScanRange;
|
|
pStore->streamStatePopScanRange = streamStatePopScanRange;
|
|
|
|
pStore->streamStateCheckSessionState = streamStateCheckSessionState;
|
|
pStore->streamStateGetLastStateCur = streamStateGetLastStateCur;
|
|
pStore->streamStateLastStateCurNext = streamStateLastStateCurNext;
|
|
pStore->streamStateNLastStateGetKVByCur = streamStateNLastStateGetKVByCur;
|
|
pStore->streamStateGetLastSessionStateCur = streamStateGetLastSessionStateCur;
|
|
pStore->streamStateLastSessionStateCurNext = streamStateLastSessionStateCurNext;
|
|
pStore->streamStateNLastSessionStateGetKVByCur = streamStateNLastSessionStateGetKVByCur;
|
|
|
|
pStore->streamStateOpen = streamStateOpen;
|
|
pStore->streamStateRecalatedOpen = streamStateRecalatedOpen;
|
|
pStore->streamStateClose = streamStateClose;
|
|
pStore->streamStateBegin = streamStateBegin;
|
|
pStore->streamStateCommit = streamStateCommit;
|
|
pStore->streamStateDestroy = streamStateDestroy;
|
|
pStore->streamStateReloadInfo = streamStateReloadInfo;
|
|
pStore->streamStateCopyBackend = streamStateCopyBackend;
|
|
}
|
|
|
|
void initFunctionStateStore(SFunctionStateStore* pStore) {
|
|
pStore->streamStateFuncPut = streamStateFuncPut;
|
|
pStore->streamStateFuncGet = streamStateFuncGet;
|
|
}
|