/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #ifndef _TD_MND_STREAM_H_ #define _TD_MND_STREAM_H_ #include "stream.h" #include "mndInt.h" #include "mndTrans.h" #ifdef __cplusplus extern "C" { #endif typedef enum { STM_ERR_TASK_NOT_EXISTS = 1, } EStmErrType; #define MND_STM_STATE_WATCH 1 #define MND_STM_STATE_NORMAL 2 #define MND_STM_STATE_RESTART 3 static const char* gMndStreamState[] = {"X", "W", "N", "R"}; #define MND_STREAM_RUNNER_DEPLOY_NUM 3 #define MND_STREAM_ISOLATION_PERIOD_NUM 3 #define MND_STREAM_REPORT_PERIOD (STREAM_HB_INTERVAL_MS * STREAM_MAX_GROUP_NUM) #define MND_STREAM_WATCH_DURATION (MND_STREAM_REPORT_PERIOD * MND_STREAM_ISOLATION_PERIOD_NUM) #define MND_STREAM_HEALTH_CHECK_PERIOD_SEC (MND_STREAM_REPORT_PERIOD / 1000) #define STREAM_RUNNER_MAX_DEPLOYS #define STREAM_RUNNER_MAX_REPLICA #define STREAM_ACT_DEPLOY 0 #define STREAM_ACT_UNDEPLOY 1 #define STREAM_ACT_START 2 static const char* gMndStreamAction[] = {"DEPLOY", "UNDEPLOY", "START"}; #define STREAM_FLAG_TRIGGER_READER (1 << 0) #define STREAM_FLAG_TOP_RUNNER (1 << 1) #define STREAM_IS_TRIGGER_READER(_flags) ((_flags) & STREAM_FLAG_TRIGGER_READER) #define STREAM_IS_TOP_RUNNER(_flags) ((_flags) & STREAM_FLAG_TOP_RUNNER) #define MND_STREAM_RESERVE_SIZE 64 #define MND_STREAM_VER_NUMBER 6 #define MND_STREAM_TRIGGER_NAME_SIZE 20 #define MND_STREAM_DEFAULT_NUM 100 #define MND_STREAM_DEFAULT_TASK_NUM 200 #define MND_SET_RUNNER_TASKIDX(_level, _idx) (((_level) << 16) & (_idx)) #define MND_GET_RUNNER_SUBPLANID(_id) ((_id) &0xFFFFFFFF) #define MND_STREAM_CREATE_NAME "stream-create" #define MND_STREAM_START_NAME "stream-start" #define MND_STREAM_DROP_NAME "stream-drop" #define MND_STREAM_STOP_NAME "stream-stop" #define GOT_SNODE(_snodeId) ((_snodeId) != 0) #define STREAM_IS_RUNNING(_status) (STREAM_STATUS_RUNNING == (_status)) // clang-format off #define mstFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("MSTM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define mstError(...) do { if (stDebugFlag & DEBUG_ERROR) { taosPrintLog("MSTM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define mstWarn(...) do { if (stDebugFlag & DEBUG_WARN) { taosPrintLog("MSTM WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define mstInfo(...) do { if (stDebugFlag & DEBUG_INFO) { taosPrintLog("MSTM INFO ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define mstDebug(...) do { if (stDebugFlag & DEBUG_DEBUG) { taosPrintLog("MSTM DEBUG ", DEBUG_DEBUG, stDebugFlag, __VA_ARGS__); }} while(0) #define mstDebugL(...) do { if (stDebugFlag & DEBUG_DEBUG) { taosPrintLongString("MSTM DEBUG ", DEBUG_DEBUG, stDebugFlag, __VA_ARGS__); }} while(0) #define mstTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("MSTM TRACE ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0) // clang-format on #define msttFatal(param, ...) \ mstFatal("%s NODE:%d %" PRIx64 " %s TASK:%" PRIx64 " SID:%" PRId64 " " param, gMndStreamState[mStreamMgmt.state],\ ((SStreamTask *)pTask)->nodeId, ((SStreamTask *)pTask)->streamId, gStreamTaskTypeStr[((SStreamTask *)pTask)->type], \ ((SStreamTask *)pTask)->taskId, ((SStreamTask *)pTask)->seriousId, __VA_ARGS__) #define msttError(param, ...) \ mstError("%s NODE:%d %" PRIx64 " %s TASK:%" PRIx64 " SID:%" PRId64 " " param, gMndStreamState[mStreamMgmt.state],\ ((SStreamTask *)pTask)->nodeId, ((SStreamTask *)pTask)->streamId, gStreamTaskTypeStr[((SStreamTask *)pTask)->type], \ ((SStreamTask *)pTask)->taskId, ((SStreamTask *)pTask)->seriousId, __VA_ARGS__) #define msttWarn(param, ...) \ mstWarn("%s NODE:%d %" PRIx64 " %s TASK:%" PRIx64 " SID:%" PRId64 " " param, gMndStreamState[mStreamMgmt.state],\ ((SStreamTask *)pTask)->nodeId, ((SStreamTask *)pTask)->streamId, gStreamTaskTypeStr[((SStreamTask *)pTask)->type], \ ((SStreamTask *)pTask)->taskId, ((SStreamTask *)pTask)->seriousId, __VA_ARGS__) #define msttInfo(param, ...) \ mstInfo("%s NODE:%d %" PRIx64 " %s TASK:%" PRIx64 " SID:%" PRId64 " " param, gMndStreamState[mStreamMgmt.state],\ ((SStreamTask *)pTask)->nodeId, ((SStreamTask *)pTask)->streamId, gStreamTaskTypeStr[((SStreamTask *)pTask)->type], \ ((SStreamTask *)pTask)->taskId, ((SStreamTask *)pTask)->seriousId, __VA_ARGS__) #define msttDebug(param, ...) \ mstDebug("%s NODE:%d %" PRIx64 " %s TASK:%" PRIx64 " SID:%" PRId64 " " param, gMndStreamState[mStreamMgmt.state],\ ((SStreamTask *)pTask)->nodeId, ((SStreamTask *)pTask)->streamId, gStreamTaskTypeStr[((SStreamTask *)pTask)->type], \ ((SStreamTask *)pTask)->taskId, ((SStreamTask *)pTask)->seriousId, __VA_ARGS__) #define msttDebugL(param, ...) \ mstDebugL("%s NODE:%d %" PRIx64 " %s TASK:%" PRIx64 " SID:%" PRId64 " " param, gMndStreamState[mStreamMgmt.state],\ ((SStreamTask *)pTask)->nodeId, ((SStreamTask *)pTask)->streamId, gStreamTaskTypeStr[((SStreamTask *)pTask)->type], \ ((SStreamTask *)pTask)->taskId, ((SStreamTask *)pTask)->seriousId, __VA_ARGS__) #define msttTrace(param, ...) \ mstTrace("%s NODE:%d %" PRIx64 " %s TASK:%" PRIx64 " SID:%" PRId64 " " param, gMndStreamState[mStreamMgmt.state],\ ((SStreamTask *)pTask)->nodeId, ((SStreamTask *)pTask)->streamId, gStreamTaskTypeStr[((SStreamTask *)pTask)->type], \ ((SStreamTask *)pTask)->taskId, ((SStreamTask *)pTask)->seriousId, __VA_ARGS__) #define mstsFatal(param, ...) mstFatal("%s %" PRIx64 " " param, gMndStreamState[mStreamMgmt.state], streamId, __VA_ARGS__) #define mstsError(param, ...) mstError("%s %" PRIx64 " " param, gMndStreamState[mStreamMgmt.state], streamId, __VA_ARGS__) #define mstsWarn(param, ...) mstWarn("%s %" PRIx64 " " param, gMndStreamState[mStreamMgmt.state], streamId, __VA_ARGS__) #define mstsInfo(param, ...) mstInfo("%s %" PRIx64 " " param, gMndStreamState[mStreamMgmt.state], streamId, __VA_ARGS__) #define mstsDebug(param, ...) mstDebug("%s %" PRIx64 " " param, gMndStreamState[mStreamMgmt.state], streamId, __VA_ARGS__) #define mstsDebugL(param, ...) mstDebugL("%s %" PRIx64 " " param, gMndStreamState[mStreamMgmt.state], streamId, __VA_ARGS__) #define mstsTrace(param, ...) mstTrace("%s %" PRIx64 " " param, gMndStreamState[mStreamMgmt.state], streamId, __VA_ARGS__) typedef struct SStmStreamAction { int64_t streamId; char streamName[TSDB_STREAM_FNAME_LEN]; } SStmStreamAction; typedef struct SStmTaskId { int64_t taskId; // KEEP IT FIRST int32_t deployId; // only for runner task int64_t seriousId; int32_t nodeId; int32_t taskIdx; } SStmTaskId; typedef struct SStmTaskStatus { SStmTaskId id; EStreamTaskType type; int64_t flags; EStreamStatus status; int64_t lastUpTs; } SStmTaskStatus; typedef struct SStmTaskAction { // KEEP IT TOGETHER int64_t streamId; SStmTaskId id; // KEEP IT TOGETHER // for snode runner redeploy int32_t deployNum; int32_t deployId[MND_STREAM_RUNNER_DEPLOY_NUM]; SStmTaskStatus* triggerStatus; EStreamTaskType type; bool multiRunner; int64_t flag; } SStmTaskAction; typedef union { SStmStreamAction stream; SStmTaskAction task; } SStmQAction; typedef struct SStmQNode { int32_t type; bool streamAct; SStmQAction action; void* next; } SStmQNode; typedef struct SStmActionQ { bool stopQueue; SRWLatch lock; SStmQNode* head; SStmQNode* tail; uint64_t qRemainNum; } SStmActionQ; typedef struct SVgroupChangeInfo { SHashObj *pDBMap; SArray *pUpdateNodeList; // SArray } SVgroupChangeInfo; typedef struct SStmTaskSrcAddr { bool isFromCache; int64_t taskId; int32_t vgId; int32_t groupId; SEpSet epset; } SStmTaskSrcAddr; typedef struct SStmStatus { // static part char* streamName; int32_t runnerNum; // task num for one deploy int32_t runnerDeploys; int32_t runnerReplica; bool allTaskBuilt; int64_t lastActionTs; SArray* trigReaders; // SArray SArray* calcReaders; // SArray SStmTaskStatus* triggerTask; SArray* runners[MND_STREAM_RUNNER_DEPLOY_NUM]; // SArray } SStmStatus; typedef struct SStmTaskStatusExt{ int64_t streamId; SStmTaskStatus* status; } SStmTaskStatusExt; typedef struct SStmSnodeStreamStatus { SStmTaskStatus* trigger; SArray* runners[MND_STREAM_RUNNER_DEPLOY_NUM]; // SArray } SStmSnodeStreamStatus; typedef struct SStmSnodeStatus { int32_t runnerThreadNum; // runner thread num in snode int64_t lastUpTs; SHashObj* streamTasks; // streamId => SStmSnodeStreamStatus } SStmSnodeStatus; typedef struct SStmVgStreamStatus { SArray* trigReaders; // SArray SArray* calcReaders; // SArray } SStmVgStreamStatus; typedef struct SStmVgroupStatus { SHashObj* streamTasks; // streamId => SStmVgStreamStatus SArray* taskList; // SArray } SStmVgroupStatus; typedef struct SStmTaskToDeployExt { bool deployed; bool lowestRunner; int32_t deployId; // only for runner task SStmTaskDeploy deploy; } SStmTaskToDeployExt; typedef struct SStmVgTasksToDeploy { SRWLatch lock; int64_t streamVer; int32_t deployed; SArray* taskList; // SArray } SStmVgTasksToDeploy; typedef struct SStmSnodeTasksDeploy { SRWLatch lock; int32_t triggerDeployed; int32_t runnerDeployed; SArray* triggerList; // SArray SArray* runnerList; // SArray } SStmSnodeTasksDeploy; typedef struct SStmStreamUndeploy{ SArray* taskList; // SArray int8_t doCheckpoint; int8_t doCleanup; } SStmStreamUndeploy; typedef struct SStmAction { int32_t actions; SStmStreamDeploy deploy; SStmStreamUndeploy undeploy; } SStmAction; typedef struct SStmGrpCtx { SMnode* pMnode; int64_t currTs; SStreamHbMsg* pReq; SMStreamHbRspMsg* pRsp; int32_t tidx; // status update int32_t taskNum; // reserved for trigger task deploy int64_t triggerTaskId; int32_t triggerNodeId; SHashObj* deployStm; SHashObj* actionStm; } SStmGrpCtx; typedef struct SStmThreadCtx { SStmGrpCtx grpCtx[STREAM_MAX_GROUP_NUM]; SHashObj* deployStm[STREAM_MAX_GROUP_NUM]; // streamId => SStmStreamDeploy SHashObj* actionStm[STREAM_MAX_GROUP_NUM]; // streamId => SStmAction } SStmThreadCtx; typedef struct SStmHealthCheckCtx { bool checkAll; int32_t slotIdx; int64_t currentTs; int32_t validStreamNum; } SStmHealthCheckCtx; typedef struct SStmRuntimeStat { int64_t activeTimes; int64_t inactiveTimes; } SStmRuntimeStat; typedef struct SStmWatchCtx { int8_t ending; int8_t taskRemains; int32_t processing; } SStmWatchCtx; typedef struct SStmCheckStatusCtx { bool checkAll; int32_t code; int32_t handledNum; int32_t checkedNum; } SStmCheckStatusCtx; typedef struct SStmRuntime { int8_t active; SRWLatch runtimeLock; int32_t activeStreamNum; int64_t profile; int64_t activeBeginTs; int8_t state; int32_t fatalError; SRWLatch actionQLock; SStmActionQ* actionQ; int32_t threadNum; SStmThreadCtx* tCtx; int64_t lastTaskId; // ST SHashObj* streamMap; // streamId => SStmStatus SHashObj* taskMap; // streamId + taskId => SStmTaskStatus* SHashObj* vgroupMap; // vgId => SStmVgroupStatus (only reader tasks) SHashObj* snodeMap; // snodeId => SStmSnodeStatus (only trigger and runner tasks) SHashObj* dnodeMap; // dnodeId => lastUpTs // TD int32_t toDeployVgTaskNum; SHashObj* toDeployVgMap; // vgId => SStmVgTasksToDeploy (only reader tasks) int32_t toDeploySnodeTaskNum; SHashObj* toDeploySnodeMap; // snodeId => SStmSnodeTasksDeploy (only trigger and runner tasks) // UP int32_t toUpdateScanNum; SHashObj* toUpdateScanMap; // streamId + subplanId => SStmTaskSrcAddr (only scan's target runner tasks) // HEALTH SStmHealthCheckCtx healthCtx; SStmWatchCtx watch; SStmRuntimeStat stat; } SStmRuntime; extern SStmRuntime mStreamMgmt; int32_t mndInitStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode); int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream); int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream); void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mstGetStreamsNumInDb(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, int32_t retryCode, int32_t acceptCode); int32_t doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, const char *pMsg, STrans **pTrans1); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj **pStream); int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb); int32_t mndProcessStreamHb(SRpcMsg *pReq); int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); int32_t mndStreamSetStopStreamTasksActions(SMnode* pMnode, STrans *pTrans, uint64_t dbUid); int32_t msmInitRuntimeInfo(SMnode *pMnode); int32_t mndStreamTransAppend(SStreamObj *pStream, STrans *pTrans, int32_t status); int32_t mndStreamCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, ETrnConflct conflict, const char *name, STrans **ppTrans); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); int32_t mstCheckSnodeExists(SMnode *pMnode); void mstSetTaskStatusFromMsg(SStmGrpCtx* pCtx, SStmTaskStatus* pTask, SStmTaskStatusMsg* pMsg); void msmClearStreamToDeployMaps(SStreamHbMsg* pHb); void msmCleanStreamGrpCtx(SStreamHbMsg* pHb); int32_t msmHandleStreamHbMsg(SMnode* pMnode, int64_t currTs, SStreamHbMsg* pHb, SMStreamHbRspMsg* pRsp); int32_t msmHandleGrantExpired(SMnode *pMnode); bool mndStreamActionDequeue(SStmActionQ* pQueue, SStmQNode **param); void msmHandleBecomeLeader(SMnode *pMnode); void msmHandleBecomeNotLeader(SMnode *pMnode); int32_t msmUndeployStream(SMnode* pMnode, int64_t streamId, char* streamName); int32_t mstIsStreamDropped(SMnode *pMnode, int64_t streamId, bool* dropped); void mstWaitRLock(SRWLatch* pLock); void msmHealthCheck(SMnode *pMnode, bool checkAll, bool needLock); void mndStreamPostAction(SStmActionQ* actionQ, int64_t streamId, char* streamName, int32_t action); void mndStreamPostTaskAction(SStmActionQ* actionQ, SStmTaskAction* pAction, int32_t action); int32_t msmAssignRandomSnodeId(SMnode* pMnode, int64_t streamId); int32_t msmCheckSnodeReassign(SMnode *pMnode, SSnodeObj* pSnode, SArray** ppRes); void mndStreamLogSStreamObj(char* tips, SStreamObj* p); void mndStreamLogSStmStatus(char* tips, int64_t streamId, SStmStatus* p); void mstDestroySStmVgStreamStatus(void* p); void mstDestroyVgroupStatus(SStmVgroupStatus* pVgStatus); void mstDestroySStmSnodeStreamStatus(void* p); #ifdef __cplusplus } #endif #endif /*_TD_MND_STREAM_H_*/