/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "mndStream.h" #include "audit.h" #include "mndDb.h" #include "mndPrivilege.h" #include "mndShow.h" #include "mndStb.h" #include "mndTrans.h" #include "mndUser.h" #include "osMemory.h" #include "parser.h" #include "taoserror.h" #include "tmisce.h" #include "tname.h" #define MND_STREAM_MAX_NUM 100000 typedef struct { int8_t placeHolder; // // to fix windows compile error, define place holder } SMStreamNodeCheckMsg; static int32_t mndNodeCheckSentinel = 0; SStmRuntime mStreamMgmt = {0}; static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq); static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); static int32_t mndProcessStopStreamReq(SRpcMsg *pReq); static int32_t mndProcessStartStreamReq(SRpcMsg *pReq); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw); static int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream); static int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream); static int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); void mndCleanupStream(SMnode *pMnode) { mDebug("try to clean up stream"); msmHandleBecomeNotLeader(pMnode); mDebug("mnd stream runtime info cleanup"); } SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { int32_t code = 0; int32_t lino = 0; SSdbRow *pRow = NULL; SStreamObj *pStream = NULL; void *buf = NULL; int8_t sver = 0; int32_t tlen; int32_t dataPos = 0; code = sdbGetRawSoftVer(pRaw, &sver); TSDB_CHECK_CODE(code, lino, _over); if (sver != MND_STREAM_VER_NUMBER && sver != MND_STREAM_COMPATIBLE_VER_NUMBER) { mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER); goto _over; } pRow = sdbAllocRow(sizeof(SStreamObj)); TSDB_CHECK_NULL(pRow, code, lino, _over, terrno); pStream = sdbGetRowObj(pRow); TSDB_CHECK_NULL(pStream, code, lino, _over, terrno); SDB_GET_INT32(pRaw, dataPos, &tlen, _over); buf = taosMemoryMalloc(tlen + 1); TSDB_CHECK_NULL(buf, code, lino, _over, terrno); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, _over); SDecoder decoder; tDecoderInit(&decoder, buf, tlen + 1); code = tDecodeSStreamObj(&decoder, pStream, sver); tDecoderClear(&decoder); if (code < 0) { tFreeStreamObj(pStream); } _over: taosMemoryFreeClear(buf); if (code != TSDB_CODE_SUCCESS) { char *p = (pStream == NULL || NULL == pStream->pCreate) ? "null" : pStream->pCreate->name; mError("stream:%s, failed to decode from raw:%p since %s at:%d", p, pRaw, tstrerror(code), lino); taosMemoryFreeClear(pRow); terrno = code; return NULL; } else { mTrace("stream:%s, decode from raw:%p, row:%p", pStream->pCreate->name, pRaw, pStream); terrno = 0; return pRow; } } static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) { mTrace("stream:%s, perform insert action", pStream->pCreate->name); return 0; } static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) { mInfo("stream:%s, perform delete action", pStream->pCreate->name); tFreeStreamObj(pStream); return 0; } static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream) { mTrace("stream:%s, perform update action", pOldStream->pCreate->name); atomic_store_32(&pOldStream->mainSnodeId, pNewStream->mainSnodeId); atomic_store_8(&pOldStream->userStopped, atomic_load_8(&pNewStream->userStopped)); pOldStream->ownerId = pNewStream->ownerId; pOldStream->updateTime = pNewStream->updateTime; return 0; } int32_t mndAcquireStream(SMnode *pMnode, char *streamName, SStreamObj **pStream) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName); if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { code = TSDB_CODE_MND_STREAM_NOT_EXIST; } return code; } static bool mndStreamGetNameFromId(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SStreamObj* pStream = pObj; if (pStream->pCreate->streamId == *(int64_t*)p1) { strncpy((char*)p2, pStream->name, TSDB_STREAM_NAME_LEN); return false; } return true; } int32_t mndAcquireStreamById(SMnode *pMnode, int64_t streamId, SStreamObj **pStream) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; char streamName[TSDB_STREAM_NAME_LEN]; streamName[0] = 0; sdbTraverse(pSdb, SDB_STREAM, mndStreamGetNameFromId, &streamId, streamName, NULL); if (streamName[0]) { (*pStream) = sdbAcquire(pSdb, SDB_STREAM, streamName); if ((*pStream) == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { code = TSDB_CODE_MND_STREAM_NOT_EXIST; } } return code; } void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { SSdb *pSdb = pMnode->pSdb; sdbRelease(pSdb, pStream); } SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; } SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; } int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; } int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; } int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; } static void mndStreamBuildObj(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate, SUserObj *pOperUser, int32_t snodeId) { int32_t code = 0; pObj->pCreate = pCreate; strncpy(pObj->name, pCreate->name, TSDB_STREAM_FNAME_LEN); (void)snprintf(pObj->createUser, sizeof(pObj->createUser), "%s", pOperUser->name); pObj->ownerId = pOperUser->uid; pObj->mainSnodeId = snodeId; pObj->userDropped = 0; pObj->userStopped = 0; pObj->createTime = taosGetTimestampMs(); pObj->updateTime = pObj->createTime; mstLogSStreamObj("create stream", pObj); } static int32_t mndStreamCreateOutStb(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *user) { SStbObj *pStb = NULL; SDbObj *pDb = NULL; int32_t code = 0; int32_t lino = 0; SMCreateStbReq createReq = {0}; TAOS_STRNCAT(createReq.name, pStream->outDB, TSDB_DB_FNAME_LEN); TAOS_STRNCAT(createReq.name, ".", 2); TAOS_STRNCAT(createReq.name, pStream->outTblName, TSDB_TABLE_NAME_LEN); createReq.numOfColumns = taosArrayGetSize(pStream->outCols); createReq.numOfTags = pStream->outTags ? taosArrayGetSize(pStream->outTags) : 1; createReq.pColumns = taosArrayInit_s(sizeof(SFieldWithOptions), createReq.numOfColumns); TSDB_CHECK_NULL(createReq.pColumns, code, lino, _OVER, terrno); // build fields for (int32_t i = 0; i < createReq.numOfColumns; i++) { SFieldWithOptions *pField = taosArrayGet(createReq.pColumns, i); TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno); SFieldWithOptions *pSrc = taosArrayGet(pStream->outCols, i); tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN); pField->flags = pSrc->flags; pField->type = pSrc->type; pField->bytes = pSrc->bytes; pField->compress = createDefaultColCmprByType(pField->type); if (IS_DECIMAL_TYPE(pField->type)) { pField->typeMod = pSrc->typeMod; pField->flags |= COL_HAS_TYPE_MOD; } } if (NULL == pStream->outTags) { createReq.numOfTags = 1; createReq.pTags = taosArrayInit_s(sizeof(SField), 1); TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno); // build tags SField *pField = taosArrayGet(createReq.pTags, 0); TSDB_CHECK_NULL(pField, code, lino, _OVER, terrno); tstrncpy(pField->name, "group_id", sizeof(pField->name)); pField->type = TSDB_DATA_TYPE_UBIGINT; pField->flags = 0; pField->bytes = 8; } else { createReq.numOfTags = taosArrayGetSize(pStream->outTags); createReq.pTags = taosArrayInit_s(sizeof(SField), createReq.numOfTags); TSDB_CHECK_NULL(createReq.pTags, code, lino, _OVER, terrno); for (int32_t i = 0; i < createReq.numOfTags; i++) { SField *pField = taosArrayGet(createReq.pTags, i); if (pField == NULL) { continue; } TAOS_FIELD_E *pSrc = taosArrayGet(pStream->outTags, i); pField->bytes = pSrc->bytes; pField->flags = 0; pField->type = pSrc->type; tstrncpy(pField->name, pSrc->name, TSDB_COL_NAME_LEN); } } if ((code = mndCheckCreateStbReq(&createReq)) != 0) { goto _OVER; } pStb = mndAcquireStb(pMnode, createReq.name); if (pStb != NULL) { code = TSDB_CODE_MND_STB_ALREADY_EXIST; goto _OVER; } pDb = mndAcquireDbByStb(pMnode, createReq.name); if (pDb == NULL) { code = TSDB_CODE_MND_DB_NOT_SELECTED; goto _OVER; } int32_t numOfStbs = -1; if (mndGetNumOfStbs(pMnode, pDb->name, &numOfStbs) != 0) { goto _OVER; } if (pDb->cfg.numOfStables == 1 && numOfStbs != 0) { code = TSDB_CODE_MND_SINGLE_STB_MODE_DB; goto _OVER; } SStbObj stbObj = {0}; if (mndBuildStbFromReq(pMnode, &stbObj, &createReq, pDb) != 0) { goto _OVER; } stbObj.uid = pStream->outStbUid; if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) { mndFreeStb(&stbObj); goto _OVER; } mDebug("stream:%s create dst stable:%s, cols:%d", pStream->name, pStream->outTblName, createReq.numOfColumns); tFreeSMCreateStbReq(&createReq); mndFreeStb(&stbObj); mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); return code; _OVER: tFreeSMCreateStbReq(&createReq); mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); mDebug("stream:%s failed to create dst stable:%s, line:%d code:%s", pStream->name, pStream->outTblName, lino, tstrerror(code)); return code; } static int32_t mndStreamValidateCreate(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq* pCreate) { int32_t code = 0, lino = 0; int64_t streamId = pCreate->streamId; char *pUser = RPC_MSG_USER(pReq); if (pCreate->streamDB) { // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->streamDB); code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pCreate->streamDB); if (code) { mstsError("user %s failed to create stream %s in db %s since %s", pUser, pCreate->name, pCreate->streamDB, tstrerror(code)); } TSDB_CHECK_CODE(code, lino, _OVER); } if (pCreate->triggerDB) { // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, pCreate->triggerDB); code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pCreate->triggerDB); if (code) { mstsError("user %s failed to create stream %s using trigger db %s since %s", pUser, pCreate->name, pCreate->triggerDB, tstrerror(code)); } TSDB_CHECK_CODE(code, lino, _OVER); #if 0 // TODO check the owner of trigger table if (pCreate->triggerTblName) { // check trigger table privilege code = mndCheckObjPrivilegeRecF(pMnode, pUser, PRIV_TBL_SELECT, "", pCreate->triggerDB, pCreate->triggerTblName); if (code) { mstsError("user %s failed to create stream %s using trigger table %s.%s since %s", pUser, pCreate->name, pCreate->triggerDB, pCreate->triggerTblName, tstrerror(code)); } TSDB_CHECK_CODE(code, lino, _OVER); } #endif } if (pCreate->calcDB) { int32_t dbNum = taosArrayGetSize(pCreate->calcDB); for (int32_t i = 0; i < dbNum; ++i) { char *calcDB = taosArrayGetP(pCreate->calcDB, i); // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_READ_DB, calcDB); code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, calcDB); if (code) { mstsError("user %s failed to create stream %s using calcDB %s since %s", pUser, pCreate->name, calcDB, tstrerror(code)); } TSDB_CHECK_CODE(code, lino, _OVER); } } if (pCreate->outDB) { // code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, pCreate->outDB); code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pCreate->outDB); if (code) { mstsError("user %s failed to create stream %s using out db %s since %s", pUser, pCreate->name, pCreate->outDB, tstrerror(code)); } TSDB_CHECK_CODE(code, lino, _OVER); } int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM); if (streamNum > MND_STREAM_MAX_NUM) { code = TSDB_CODE_MND_TOO_MANY_STREAMS; mstsError("failed to create stream %s since %s, stream number:%d", pCreate->name, tstrerror(code), streamNum); return code; } _OVER: return code; } int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; int32_t code = 0; while (1) { SStreamObj *pStream = NULL; pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; if (0 == strcmp(pStream->pCreate->streamDB, pDb->name)) { mInfo("start to drop stream %s in db %s", pStream->pCreate->name, pDb->name); pStream->updateTime = taosGetTimestampMs(); atomic_store_8(&pStream->userDropped, 1); MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime); msmUndeployStream(pMnode, pStream->pCreate->streamId, pStream->pCreate->name); // drop stream code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED); if (code) { mError("drop db trans:%d failed to append drop stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pSdb, pStream); sdbCancelFetch(pSdb, pIter); TAOS_RETURN(code); } } sdbRelease(pSdb, pStream); } return 0; } static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; SUserObj *pOperUser = NULL; int32_t code = 0, lino = 0; bool showAll = false; TAOS_CHECK_EXIT(mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser)); showAll = (0 == mndCheckObjPrivilegeRec(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM, 0, pOperUser->acctId, "*", "*")); while (numOfRows < rows) { pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); if (pShow->pIter == NULL) break; if (!showAll) { if ((mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_SHOW, PRIV_OBJ_STREAM, pStream->ownerId, pStream->pCreate->streamDB, pStream->pCreate->name))) { sdbRelease(pSdb, pStream); continue; } } code = mstSetStreamAttrResBlock(pMnode, pStream, pBlock, numOfRows); if (code == 0) { numOfRows++; } sdbRelease(pSdb, pStream); } code = 0; pShow->numOfRows += numOfRows; _exit: mndReleaseUser(pMnode, pOperUser); if (code != 0) { mError("failed to retrieve stream list at line %d since %s", lino, tstrerror(code)); TAOS_RETURN(code); } return numOfRows; } static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetchByType(pSdb, pIter, SDB_STREAM); } static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; int32_t code = 0; while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); if (pShow->pIter == NULL) { break; } code = mstSetStreamTasksResBlock(pStream, pBlock, &numOfRows, rowsCapacity); sdbRelease(pSdb, pStream); } pShow->numOfRows += numOfRows; return numOfRows; } static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetchByType(pSdb, pIter, SDB_STREAM); } static int32_t mndRetrieveStreamRecalculates(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SStreamObj *pStream = NULL; int32_t code = 0; while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); if (pShow->pIter == NULL) { break; } code = mstSetStreamRecalculatesResBlock(pStream, pBlock, &numOfRows, rowsCapacity); sdbRelease(pSdb, pStream); } pShow->numOfRows += numOfRows; return numOfRows; } static void mndCancelGetNextStreamRecalculates(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetchByType(pSdb, pIter, SDB_STREAM); } static bool mndStreamUpdateTagsFlag(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SStreamObj *pStream = pObj; if (atomic_load_8(&pStream->userDropped)) { return true; } if (TSDB_SUPER_TABLE != pStream->pCreate->triggerTblType && TSDB_CHILD_TABLE != pStream->pCreate->triggerTblType && TSDB_VIRTUAL_CHILD_TABLE != pStream->pCreate->triggerTblType) { return true; } if (pStream->pCreate->triggerTblSuid != *(uint64_t*)p1) { return true; } if (NULL == pStream->pCreate->partitionCols) { return true; } SNodeList* pList = NULL; int32_t code = nodesStringToList(pStream->pCreate->partitionCols, &pList); if (code) { nodesDestroyList(pList); mstError("partitionCols [%s] nodesStringToList failed with error:%s", (char*)pStream->pCreate->partitionCols, tstrerror(code)); return true; } SSchema* pTags = (SSchema*)p2; int32_t* tagNum = (int32_t*)p3; SNode* pNode = NULL; FOREACH(pNode, pList) { SColumnNode* pCol = (SColumnNode*)pNode; for (int32_t i = 0; i < *tagNum; ++i) { if (pCol->colId == pTags[i].colId) { pTags[i].flags |= COL_REF_BY_STM; break; } } } nodesDestroyList(pList); return true; } void mndStreamUpdateTagsRefFlag(SMnode *pMnode, int64_t suid, SSchema* pTags, int32_t tagNum) { int32_t streamNum = sdbGetSize(pMnode->pSdb, SDB_STREAM); if (streamNum <= 0) { return; } sdbTraverse(pMnode->pSdb, SDB_STREAM, mndStreamUpdateTagsFlag, &suid, pTags, &tagNum); } static int32_t mndProcessStopStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SUserObj *pOperUser = NULL; int32_t code = 0; SMPauseStreamReq pauseReq = {0}; if (tDeserializeSMPauseStreamReq(pReq->pCont, pReq->contLen, &pauseReq) < 0) { TAOS_RETURN(TSDB_CODE_INVALID_MSG); } code = mndAcquireStream(pMnode, pauseReq.name, &pStream); if (pStream == NULL || code != 0) { if (pauseReq.igNotExists) { mInfo("stream:%s, not exist, not stop stream", pauseReq.name); taosMemoryFree(pauseReq.name); return 0; } else { mError("stream:%s not exist, failed to stop stream", pauseReq.name); taosMemoryFree(pauseReq.name); TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); } } taosMemoryFree(pauseReq.name); int64_t streamId = pStream->pCreate->streamId; mstsInfo("start to stop stream %s", pStream->name); // code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pStream->pCreate->streamDB); if((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) { mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pStream->pCreate->streamDB)) || (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_STOP, PRIV_OBJ_STREAM, pStream->ownerId, pStream->pCreate->streamDB, pStream->pCreate->name))) { mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); mndReleaseUser(pMnode, pOperUser); sdbRelease(pMnode->pSdb, pStream); return code; } mndReleaseUser(pMnode, pOperUser); // release user after privilege check if (atomic_load_8(&pStream->userDropped)) { code = TSDB_CODE_MND_STREAM_DROPPING; mstsError("user %s failed to stop stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } STrans *pTrans = NULL; code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_STOP_NAME, &pTrans); if (pTrans == NULL || code) { mstsError("failed to stop stream %s since %s", pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } pStream->updateTime = taosGetTimestampMs(); atomic_store_8(&pStream->userStopped, 1); MND_STREAM_SET_LAST_TS(STM_EVENT_STOP_STREAM, pStream->updateTime); msmUndeployStream(pMnode, streamId, pStream->name); // stop stream code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; } code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; } sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return TSDB_CODE_ACTION_IN_PROGRESS; } static int32_t mndProcessStartStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SUserObj *pOperUser = NULL; int32_t code = 0; if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { return code; } SMResumeStreamReq resumeReq = {0}; if (tDeserializeSMResumeStreamReq(pReq->pCont, pReq->contLen, &resumeReq) < 0) { TAOS_RETURN(TSDB_CODE_INVALID_MSG); } code = mndAcquireStream(pMnode, resumeReq.name, &pStream); if (pStream == NULL || code != 0) { if (resumeReq.igNotExists) { mInfo("stream:%s not exist, not start stream", resumeReq.name); taosMemoryFree(resumeReq.name); sdbRelease(pMnode->pSdb, pStream); return 0; } else { mError("stream:%s not exist, failed to start stream", resumeReq.name); taosMemoryFree(resumeReq.name); TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); } } taosMemoryFree(resumeReq.name); int64_t streamId = pStream->pCreate->streamId; mstsInfo("start to start stream %s from stopped", pStream->name); // code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pStream->pCreate->streamDB); if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) { mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pStream->pCreate->streamDB)) || (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_START, PRIV_OBJ_STREAM, pStream->ownerId, pStream->pCreate->streamDB, pStream->pCreate->name))) { mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); mndReleaseUser(pMnode, pOperUser); sdbRelease(pMnode->pSdb, pStream); return code; } mndReleaseUser(pMnode, pOperUser); // release user after privilege check if (atomic_load_8(&pStream->userDropped)) { code = TSDB_CODE_MND_STREAM_DROPPING; mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } if (0 == atomic_load_8(&pStream->userStopped)) { code = TSDB_CODE_MND_STREAM_NOT_STOPPED; mstsError("user %s failed to start stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } atomic_store_8(&pStream->userStopped, 0); pStream->updateTime = taosGetTimestampMs(); MND_STREAM_SET_LAST_TS(STM_EVENT_START_STREAM, pStream->updateTime); STrans *pTrans = NULL; code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_START_NAME, &pTrans); if (pTrans == NULL || code) { mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY); if (code != TSDB_CODE_SUCCESS) { mstsError("failed to start stream %s since %s", pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; } code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mstsError("trans:%d, failed to prepare start stream %s trans since %s", pTrans->id, pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; } mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->name, NULL, true, STREAM_ACT_DEPLOY); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return TSDB_CODE_ACTION_IN_PROGRESS; } static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SUserObj *pOperUser = NULL; int32_t code = 0; int32_t notExistNum = 0; SMDropStreamReq dropReq = {0}; int64_t tss = taosGetTimestampMs(); if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { mError("invalid drop stream msg recv, discarded"); code = TSDB_CODE_INVALID_MSG; TAOS_RETURN(code); } mDebug("recv drop stream msg, count:%d", dropReq.count); // Acquire user object for privilege check code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser); if (code != 0) { tFreeMDropStreamReq(&dropReq); TAOS_RETURN(code); } // check if all streams exist if (!dropReq.igNotExists) { for (int32_t i = 0; i < dropReq.count; i++) { if (!sdbCheckExists(pMnode->pSdb, SDB_STREAM, dropReq.name[i])) { mError("stream:%s not exist failed to drop it", dropReq.name[i]); mndReleaseUser(pMnode, pOperUser); tFreeMDropStreamReq(&dropReq); TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); } } } // Create a single transaction for all stream drops STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME); if (pTrans == NULL) { mError("failed to create drop stream transaction since %s", tstrerror(terrno)); code = terrno; mndReleaseUser(pMnode, pOperUser); tFreeMDropStreamReq(&dropReq); TAOS_RETURN(code); } pTrans->ableToBeKilled = true; // Process all streams and add them to the transaction for (int32_t i = 0; i < dropReq.count; i++) { char *streamName = dropReq.name[i]; mDebug("drop stream[%d/%d]: %s", i + 1, dropReq.count, streamName); code = mndAcquireStream(pMnode, streamName, &pStream); if (pStream == NULL || code != 0) { mWarn("stream:%s not exist, ignore not exist is set, drop stream exec done with success", streamName); sdbRelease(pMnode->pSdb, pStream); pStream = NULL; notExistNum++; continue; } int64_t streamId = pStream->pCreate->streamId; if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pStream->pCreate->streamDB)) || (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_DROP, PRIV_OBJ_STREAM, pStream->ownerId, pStream->pCreate->streamDB, pStream->pCreate->name))) { mstsError("user %s failed to drop stream %s since %s", pReq->info.conn.user, streamName, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); pStream = NULL; mndTransDrop(pTrans); pTrans = NULL; goto _OVER; } if (pStream->pCreate->tsmaId != 0) { mstsDebug("try to drop tsma related stream, tsmaId:%" PRIx64, pStream->pCreate->tsmaId); void *pIter = NULL; SSmaObj *pSma = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); while (pIter) { if (pSma && pSma->uid == pStream->pCreate->tsmaId) { sdbRelease(pMnode->pSdb, pSma); sdbRelease(pMnode->pSdb, pStream); pStream = NULL; sdbCancelFetch(pMnode->pSdb, pIter); code = TSDB_CODE_TSMA_MUST_BE_DROPPED; mstsError("refused to drop tsma-related stream %s since tsma still exists", streamName); mndTransDrop(pTrans); pTrans = NULL; goto _OVER; } if (pSma) { sdbRelease(pMnode->pSdb, pSma); } pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); } } mstsInfo("start to drop stream %s", pStream->pCreate->name); pStream->updateTime = taosGetTimestampMs(); atomic_store_8(&pStream->userDropped, 1); MND_STREAM_SET_LAST_TS(STM_EVENT_DROP_STREAM, pStream->updateTime); msmUndeployStream(pMnode, streamId, pStream->pCreate->name); // Append drop stream operation to the transaction code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_DROPPED); if (code) { mstsError("trans:%d, failed to append drop stream %s trans since %s", pTrans->id, streamName, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); pStream = NULL; // mndStreamTransAppend already called mndTransDrop on failure, set pTrans to NULL to avoid double free pTrans = NULL; goto _OVER; } sdbRelease(pMnode->pSdb, pStream); pStream = NULL; mstsDebug("drop stream %s added to transaction", streamName); } // Prepare and execute the transaction for all streams if (notExistNum < dropReq.count) { code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, tstrerror(code)); mndTransDrop(pTrans); goto _OVER; } mInfo("trans:%d, drop stream transaction prepared for %d streams", pTrans->id, dropReq.count - notExistNum); } else { // All streams don't exist, no need to prepare transaction mndTransDrop(pTrans); pTrans = NULL; } if (tsAuditLevel >= AUDIT_LEVEL_DATABASE && notExistNum < dropReq.count) { int64_t tse = taosGetTimestampMs(); double duration = (double)(tse - tss); duration = duration / 1000; // Use first stream's database for audit (assuming all streams are from same db in batch) if (dropReq.count > 0) { SStreamObj *pFirstStream = NULL; if (mndAcquireStream(pMnode, dropReq.name[0], &pFirstStream) == 0 && pFirstStream != NULL) { auditRecord(pReq, pMnode->clusterId, "dropStream", "", pFirstStream->pCreate->streamDB, NULL, 0, duration, 0); sdbRelease(pMnode->pSdb, pFirstStream); } } } // If any stream was successfully added to transaction, return ACTION_IN_PROGRESS // Otherwise, all streams don't exist (and igNotExists is set), return SUCCESS code = (notExistNum < dropReq.count) ? TSDB_CODE_ACTION_IN_PROGRESS : TSDB_CODE_SUCCESS; _OVER: if (pStream) { sdbRelease(pMnode->pSdb, pStream); } if (pTrans) { mndTransDrop(pTrans); } mndReleaseUser(pMnode, pOperUser); tFreeMDropStreamReq(&dropReq); TAOS_RETURN(code); } static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SStreamObj streamObj = {0}; SUserObj *pOperUser = NULL; int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; STrans *pTrans = NULL; uint64_t streamId = 0; SCMCreateStreamReq* pCreate = NULL; int64_t tss = taosGetTimestampMs(); if ((code = grantCheck(TSDB_GRANT_STREAMS)) < 0) { goto _OVER; } #ifdef WINDOWS code = TSDB_CODE_MND_INVALID_PLATFORM; goto _OVER; #endif pCreate = taosMemoryCalloc(1, sizeof(SCMCreateStreamReq)); TSDB_CHECK_NULL(pCreate, code, lino, _OVER, terrno); code = tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, pCreate); TSDB_CHECK_CODE(code, lino, _OVER); streamId = pCreate->streamId; mstsInfo("start to create stream %s, sql:%s", pCreate->name, pCreate->sql); int32_t snodeId = msmAssignRandomSnodeId(pMnode, streamId); if (!GOT_SNODE(snodeId)) { code = terrno; TSDB_CHECK_CODE(code, lino, _OVER); } code = mndAcquireStream(pMnode, pCreate->name, &pStream); if (pStream != NULL && code == 0) { if (pCreate->igExists) { mstsInfo("stream %s already exist, ignore exist is set", pCreate->name); } else { code = TSDB_CODE_MND_STREAM_ALREADY_EXIST; } mndReleaseStream(pMnode, pStream); goto _OVER; } else if (code != TSDB_CODE_MND_STREAM_NOT_EXIST) { goto _OVER; } code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser); if (pOperUser == NULL) { TSDB_CHECK_CODE(TSDB_CODE_MND_NO_USER_FROM_CONN, lino, _OVER); } code = mndStreamValidateCreate(pMnode, pReq, pCreate); TSDB_CHECK_CODE(code, lino, _OVER); mndStreamBuildObj(pMnode, &streamObj, pCreate, pOperUser, snodeId); pCreate = NULL; pStream = &streamObj; code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_DB, MND_STREAM_CREATE_NAME, &pTrans); if (pTrans == NULL || code) { goto _OVER; } // create stb for stream if (TSDB_SUPER_TABLE == pStream->pCreate->outTblType && !pStream->pCreate->outStbExists) { pStream->pCreate->outStbUid = mndGenerateUid(pStream->pCreate->outTblName, strlen(pStream->pCreate->outTblName)); code = mndStreamCreateOutStb(pMnode, pTrans, pStream->pCreate, RPC_MSG_USER(pReq)); TSDB_CHECK_CODE(code, lino, _OVER); } // add stream to trans code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mstsError("failed to persist stream %s since %s", pStream->pCreate->name, tstrerror(code)); goto _OVER; } // execute creation code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mstsError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code)); goto _OVER; } code = TSDB_CODE_ACTION_IN_PROGRESS; if (tsAuditLevel >= AUDIT_LEVEL_DATABASE) { int64_t tse = taosGetTimestampMs(); double duration = (double)(tse - tss); duration = duration / 1000; auditRecord(pReq, pMnode->clusterId, "createStream", pStream->pCreate->streamDB, pStream->pCreate->name, pStream->pCreate->sql, strlen(pStream->pCreate->sql), duration, 0); } MND_STREAM_SET_LAST_TS(STM_EVENT_CREATE_STREAM, taosGetTimestampMs()); mstPostStreamAction(mStreamMgmt.actionQ, streamId, pStream->pCreate->name, NULL, true, STREAM_ACT_DEPLOY); _OVER: if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (pStream && pStream->pCreate) { mstsError("failed to create stream %s at line:%d since %s", pStream->pCreate->name, lino, tstrerror(code)); } else { mstsError("failed to create stream at line:%d since %s", lino, tstrerror(code)); } } else { mstsDebug("create stream %s half completed", pStream->pCreate ? pStream->pCreate->name : "unknown"); } tFreeSCMCreateStreamReq(pCreate); taosMemoryFreeClear(pCreate); mndTransDrop(pTrans); tFreeStreamObj(&streamObj); mndReleaseUser(pMnode, pOperUser); return code; } static int32_t mndProcessRecalcStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; SUserObj *pOperUser = NULL; int32_t code = 0; int64_t tss = taosGetTimestampMs(); if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { return code; } SMRecalcStreamReq recalcReq = {0}; if (tDeserializeSMRecalcStreamReq(pReq->pCont, pReq->contLen, &recalcReq) < 0) { tFreeMRecalcStreamReq(&recalcReq); TAOS_RETURN(TSDB_CODE_INVALID_MSG); } code = mndAcquireStream(pMnode, recalcReq.name, &pStream); if (pStream == NULL || code != 0) { mError("stream:%s not exist, failed to recalc stream", recalcReq.name); tFreeMRecalcStreamReq(&recalcReq); TAOS_RETURN(TSDB_CODE_MND_STREAM_NOT_EXIST); } int64_t streamId = pStream->pCreate->streamId; mstsInfo("start to recalc stream %s", recalcReq.name); // code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), MND_OPER_WRITE_DB, pStream->pCreate->streamDB); // if (code != TSDB_CODE_SUCCESS) { // mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code)); // sdbRelease(pMnode->pSdb, pStream); // tFreeMRecalcStreamReq(&recalcReq); // return code; // } if ((code = mndAcquireUser(pMnode, RPC_MSG_USER(pReq), &pOperUser))) { mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); tFreeMRecalcStreamReq(&recalcReq); TAOS_RETURN(code); } if ((code = mndCheckDbPrivilegeByName(pMnode, RPC_MSG_USER(pReq), RPC_MSG_TOKEN(pReq), MND_OPER_USE_DB, pStream->pCreate->streamDB)) || (code = mndCheckObjPrivilegeRecF(pMnode, pOperUser, PRIV_CM_RECALC, PRIV_OBJ_STREAM, pStream->ownerId, pStream->pCreate->streamDB, pStream->pCreate->name))) { mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), pStream->name, tstrerror(code)); mndReleaseUser(pMnode, pOperUser); sdbRelease(pMnode->pSdb, pStream); tFreeMRecalcStreamReq(&recalcReq); return code; } mndReleaseUser(pMnode, pOperUser); // release user after privilege check if (atomic_load_8(&pStream->userDropped)) { code = TSDB_CODE_MND_STREAM_DROPPING; mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); tFreeMRecalcStreamReq(&recalcReq); return code; } if (atomic_load_8(&pStream->userStopped)) { code = TSDB_CODE_MND_STREAM_STOPPED; mstsError("user %s failed to recalc stream %s since %s", RPC_MSG_USER(pReq), recalcReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); tFreeMRecalcStreamReq(&recalcReq); return code; } if (WINDOW_TYPE_PERIOD == pStream->pCreate->triggerType) { code = TSDB_CODE_OPS_NOT_SUPPORT; mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); tFreeMRecalcStreamReq(&recalcReq); return code; } /* pStream->updateTime = taosGetTimestampMs(); STrans *pTrans = NULL; code = mndStreamCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_RECALC_NAME, &pTrans); if (pTrans == NULL || code) { mstsError("failed to recalc stream %s since %s", recalcReq.name, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); return code; } // stop stream code = mndStreamTransAppend(pStream, pTrans, SDB_STATUS_READY); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; } code = mndTransPrepare(pMnode, pTrans); if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare stop stream trans since %s", pTrans->id, tstrerror(code)); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return code; } */ code = msmRecalcStream(pMnode, pStream->pCreate->streamId, &recalcReq.timeRange); if (code != TSDB_CODE_SUCCESS) { sdbRelease(pMnode->pSdb, pStream); tFreeMRecalcStreamReq(&recalcReq); return code; } if (tsAuditLevel >= AUDIT_LEVEL_DATABASE){ char buf[128]; snprintf(buf, sizeof(buf), "start:%" PRId64 ", end:%" PRId64, recalcReq.timeRange.skey, recalcReq.timeRange.ekey); int64_t tse = taosGetTimestampMs(); double duration = (double)(tse - tss); duration = duration / 1000; auditRecord(pReq, pMnode->clusterId, "recalcStream", pStream->name, recalcReq.name, buf, strlen(buf), duration, 0); } sdbRelease(pMnode->pSdb, pStream); tFreeMRecalcStreamReq(&recalcReq); // mndTransDrop(pTrans); return TSDB_CODE_SUCCESS; } int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STREAM, .keyType = SDB_KEY_BINARY, .encodeFp = (SdbEncodeFp)mndStreamActionEncode, .decodeFp = (SdbDecodeFp)mndStreamActionDecode, .insertFp = (SdbInsertFp)mndStreamActionInsert, .updateFp = (SdbUpdateFp)mndStreamActionUpdate, .deleteFp = (SdbDeleteFp)mndStreamActionDelete, }; if (!tsDisableStream) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_START_STREAM, mndProcessStartStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_STOP_STREAM, mndProcessStopStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_MND_RECALC_STREAM, mndProcessRecalcStreamReq); } mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndRetrieveStreamRecalculates); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_RECALCULATES, mndCancelGetNextStreamRecalculates); int32_t code = sdbSetTable(pMnode->pSdb, table); if (code) { return code; } //code = sdbSetTable(pMnode->pSdb, tableSeq); return code; }