/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "mndStream.h"
#include "mndTrans.h"

// Upper bound on the number of remembered (streamId, transId) reset requests kept in
// execInfo.pKilledChkptTrans; the oldest entry is evicted once the bound is reached.
#define MND_MAX_KILLED_CHKPT_TRANS 10

// One failed checkpoint reported through a heartbeat: which stream, which checkpoint,
// and the transaction that was driving it.
typedef struct SFailedCheckpointInfo {
  int64_t streamUid;
  int64_t checkpointId;
  int32_t transId;
} SFailedCheckpointInfo;

static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);

static void    updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
static void    addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
static bool    validateHbMsg(const SArray *pNodeList, int32_t vgId);
static void    cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
static void    doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);

/**
 * Record a new stage for a task and flag the node it runs on as "stage updated".
 *
 * Scans execInfo.pNodeList for the entry whose nodeId equals pTaskEntry->nodeId. Only when
 * such an entry exists is pTaskEntry->stage overwritten with `stage`; otherwise nothing changes.
 *
 * @param pTaskEntry task entry kept by the mnode; its stage may be updated
 * @param stage      stage value reported by the task
 */
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
  int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);

  for (int32_t j = 0; j < numOfNodes; ++j) {
    SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
    if (pNodeEntry == NULL || pNodeEntry->nodeId != pTaskEntry->nodeId) {
      continue;
    }

    mInfo("vgId:%d stage updated from %" PRId64 " to %" PRId64 ", nodeUpdate trigger by s-task:0x%" PRIx64,
          pTaskEntry->nodeId, pTaskEntry->stage, stage, pTaskEntry->id.taskId);

    pNodeEntry->stageUpdated = true;
    pTaskEntry->stage = stage;
    break;
  }
}

/**
 * Append a failed-checkpoint record to pList unless a record with the same transId is
 * already present. A failed push is logged and otherwise ignored.
 *
 * @param pList list of SFailedCheckpointInfo
 * @param pInfo record to add (copied into the list)
 */
static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
  int32_t num = taosArrayGetSize(pList);

  for (int32_t i = 0; i < num; ++i) {
    const SFailedCheckpointInfo *pExist = taosArrayGet(pList, i);
    if (pExist != NULL && pExist->transId == pInfo->transId) {
      return;  // this trans is already recorded
    }
  }

  if (taosArrayPush(pList, pInfo) == NULL) {
    mError("failed to push failed checkpoint info checkpointId:%" PRId64 " in list", pInfo->checkpointId);
  }
}

/**
 * Create, register, persist and prepare the "reset task status" transaction for a stream.
 *
 * Ownership: this function releases the caller's sdb reference on pStream on EVERY path
 * (success and failure). Callers must not release pStream again afterwards.
 *
 * @param pMnode  mnode handle
 * @param pStream stream object; its reference is consumed by this call
 * @return 0 or TSDB_CODE_ACTION_IN_PROGRESS when the trans was prepared, an error code otherwise
 */
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
  STrans *pTrans = NULL;
  int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
                               " reset from failed checkpoint", &pTrans);
  if (pTrans == NULL || code) {
    sdbRelease(pMnode->pSdb, pStream);

    // Prefer the explicit error code; fall back to terrno, and never report success when
    // no transaction could be created.
    int32_t ret = (code != 0) ? code : terrno;
    return (ret != 0) ? ret : TSDB_CODE_FAILED;
  }

  code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
  if (code) {
    goto _end;
  }

  code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
  if (code) {
    goto _end;
  }

  code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY);
  if (code != TSDB_CODE_SUCCESS) {
    goto _end;
  }

  code = mndTransPrepare(pMnode, pTrans);
  if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
  }

_end:
  // The stream reference and the local trans handle are dropped on all paths.
  sdbRelease(pMnode->pSdb, pStream);
  mndTransDrop(pTrans);
  return code;
}

/**
 * Queue a TDMT_MND_STREAM_TASK_RESET message for (streamId, transId) on the mnode write queue.
 *
 * The pair is remembered in execInfo.pKilledChkptTrans so that the same reset is not queued
 * twice; at most MND_MAX_KILLED_CHKPT_TRANS pairs are remembered.
 *
 * @param pMnode   mnode handle
 * @param streamId stream whose tasks should be reset
 * @param transId  id of the failed checkpoint transaction
 * @return TSDB_CODE_SUCCESS if queued or already requested, otherwise an error code
 */
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId) {
  int32_t size = sizeof(SStreamTaskResetMsg);
  int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);

  for (int32_t i = 0; i < num; ++i) {
    SStreamTaskResetMsg *pExist = taosArrayGet(execInfo.pKilledChkptTrans, i);
    if (pExist == NULL) {
      continue;
    }

    if (pExist->transId == transId && pExist->streamId == streamId) {
      mDebug("already reset stream:0x%" PRIx64 ", not send reset-msg again for transId:%d", streamId, transId);
      return TSDB_CODE_SUCCESS;
    }
  }

  if (num >= MND_MAX_KILLED_CHKPT_TRANS) {
    taosArrayRemove(execInfo.pKilledChkptTrans, 0);  // remove this first, append new reset trans in the tail
  }

  // NOTE(review): the pair is recorded before the message is queued, so a failure below
  // leaves it recorded and a later heartbeat will not retry this reset. Confirm this is intended.
  SStreamTaskResetMsg record = {.streamId = streamId, .transId = transId};
  if (taosArrayPush(execInfo.pKilledChkptTrans, &record) == NULL) {
    mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId,
           (int32_t)taosArrayGetSize(execInfo.pKilledChkptTrans));
    return terrno;
  }

  SStreamTaskResetMsg *pReq = rpcMallocCont(size);
  if (pReq == NULL) {
    return terrno;
  }

  pReq->streamId = streamId;
  pReq->transId = transId;

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size};
  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    mError("failed to put reset-task msg into write queue, code:%s", tstrerror(code));
  } else {
    mDebug("send reset task status msg for transId:%d succ", transId);
  }

  return code;
}

/**
 * Queue a TDMT_MND_STREAM_UPDATE_CHKPT_EVT message on the mnode write queue.
 * The payload is an uninitialized buffer sized as SMStreamDoCheckpointMsg (the message
 * layout is reused; no field is filled in here).
 *
 * @param pMnode mnode handle
 * @return result of tmsgPutToQueue, or terrno if the buffer cannot be allocated
 */
static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode) {
  int32_t size = sizeof(SMStreamDoCheckpointMsg);  // here reuse the doCheckpointmsg

  void *pMsg = rpcMallocCont(size);
  if (pMsg == NULL) {
    return terrno;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size};
  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    mError("failed to put update-checkpoint-info msg into write queue, code:%s", tstrerror(code));
  } else {
    mDebug("send update checkpoint-info msg succ");
  }

  return code;
}

/**
 * Serialize the orphan-task list and queue it as TDMT_MND_STREAM_DROP_ORPHANTASKS on the
 * mnode write queue.
 *
 * @param pMnode mnode handle
 * @param pList  list of SOrphanTask to drop (not owned; only read for serialization)
 * @return result of tmsgPutToQueue, or an error code if serialization/allocation fails
 */
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList) {
  SMStreamDropOrphanMsg msg = {.pList = pList};
  int32_t               num = taosArrayGetSize(pList);

  // First pass with a NULL buffer only computes the encoded length.
  int32_t contLen = tSerializeDropOrphanTaskMsg(NULL, 0, &msg);
  if (contLen <= 0) {
    return terrno;
  }

  void *pReq = rpcMallocCont(contLen);
  if (pReq == NULL) {
    return terrno;
  }

  // Do not queue a partially encoded message if the real serialization fails.
  if (tSerializeDropOrphanTaskMsg(pReq, contLen, &msg) < 0) {
    mError("failed to serialize drop-orphan task msg, numOfTasks:%d", num);
    rpcFreeCont(pReq);
    return (terrno != 0) ? terrno : TSDB_CODE_FAILED;
  }

  SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_DROP_ORPHANTASKS, .pCont = pReq, .contLen = contLen};
  int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
  if (code) {
    mError("failed to put drop-orphan task msg into write queue, code:%s", tstrerror(code));
  } else {
    mDebug("send drop %d orphan tasks msg succ", num);
  }

  return code;
}

/**
 * Handle a reset-status request: kill the failed checkpoint transaction named in the
 * message, then start a reset-status transaction for the stream unless a conflicting
 * transaction exists.
 *
 * @param pReq rpc message whose pCont is a SStreamTaskResetMsg
 * @return TSDB_CODE_STREAM_TASK_NOT_EXIST if the stream cannot be acquired, 0 when a
 *         conflicting trans blocks the reset, otherwise the result of
 *         mndCreateStreamResetStatusTrans
 */
int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
  SMnode              *pMnode = pReq->info.node;
  SStreamTaskResetMsg *pMsg = pReq->pCont;
  SStreamObj          *pStream = NULL;

  mndKillTransImpl(pMnode, pMsg->transId, "");

  int32_t code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream);
  if (pStream == NULL || code != 0) {
    // pStream may be NULL here, so the id must come from the request, not from pStream.
    mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped",
           pMsg->streamId);
    if (pStream != NULL) {
      mndReleaseStream(pMnode, pStream);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
  if (conflict) {
    mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
           pStream->sourceDb, pStream->targetSTbName);
    mndReleaseStream(pMnode, pStream);
    return code;
  }

  mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
         pStream->uid, pMsg->transId);

  // mndCreateStreamResetStatusTrans() releases pStream on every path, so it must not be
  // released again here.
  // NOTE(review): assumes mndReleaseStream() drops the same sdb reference that
  // sdbRelease() does -- confirm against its definition.
  return mndCreateStreamResetStatusTrans(pMnode, pStream);
}

/**
 * For every vgId in pNodeList, set stageUpdated on the matching entry of execInfo.pNodeList.
 *
 * Processing stops at the first vgId that has no matching node entry.
 *
 * @param pNodeList list of int32_t vgroup ids
 * @return TSDB_CODE_SUCCESS if every id was found, TSDB_CODE_FAILED otherwise
 */
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
  int32_t num = taosArrayGetSize(pNodeList);
  mInfo("set node expired for %d nodes", num);

  for (int k = 0; k < num; ++k) {
    int32_t *pVgId = taosArrayGet(pNodeList, k);
    if (pVgId == NULL) {
      continue;
    }

    mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);

    bool    setFlag = false;
    int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);

    for (int i = 0; i < numOfNodes; ++i) {
      SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
      if ((pNodeEntry) && (pNodeEntry->nodeId == *pVgId)) {
        mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
        pNodeEntry->stageUpdated = true;
        setFlag = true;
        break;
      }
    }

    if (!setFlag) {
      mError("failed to set nodeUpdate flag, nodeId:%d not exists in nodelist", *pVgId);
      return TSDB_CODE_FAILED;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/**
 * Queue a TDMT_MND_PAUSE_STREAM request for every stream that is not already paused.
 * Called from the heartbeat handler when the stream grant check fails.
 *
 * A failure on one stream does not stop the iteration.
 *
 * @param pMnode mnode handle
 * @param info   rpc handle info copied into every queued pause request
 * @return code of the last stream that was processed (0 if none needed pausing)
 */
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info) {
  SSdb       *pSdb = pMnode->pSdb;
  SStreamObj *pStream = NULL;
  void       *pIter = NULL;
  int32_t     code = 0;

  while (1) {
    pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
    if (pIter == NULL) break;

    if (pStream->status == STREAM_STATUS__PAUSE) {
      sdbRelease(pSdb, pStream);
      continue;
    }

    SMPauseStreamReq reqPause = {0};
    // Bounded copy; reqPause is zero-initialized, the explicit terminator just makes
    // the guarantee local.
    strncpy(reqPause.name, pStream->name, sizeof(reqPause.name) - 1);
    reqPause.name[sizeof(reqPause.name) - 1] = '\0';
    reqPause.igNotExists = 1;

    // First pass with a NULL buffer only computes the encoded length.
    int32_t contLen = tSerializeSMPauseStreamReq(NULL, 0, &reqPause);
    if (contLen <= 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_FAILED;
      sdbRelease(pSdb, pStream);
      continue;
    }

    void *pHead = rpcMallocCont(contLen);
    if (pHead == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      sdbRelease(pSdb, pStream);
      continue;
    }

    // The serializer returns the encoded length (see the sizing call above), so only a
    // negative value means failure. Treating any non-zero value as failure would drop
    // every pause request and leak pHead.
    int32_t encoded = tSerializeSMPauseStreamReq(pHead, contLen, &reqPause);
    if (encoded < 0) {
      code = (terrno != 0) ? terrno : TSDB_CODE_FAILED;
      mError("failed to serialize pause-stream req for stream:%s", pStream->name);
      rpcFreeCont(pHead);
      sdbRelease(pSdb, pStream);
      continue;
    }

    SRpcMsg rpcMsg = {
        .msgType = TDMT_MND_PAUSE_STREAM,
        .pCont = pHead,
        .contLen = contLen,
        .info = *info,
    };

    code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
    mInfo("receive pause stream:%s, %s, %" PRId64 ", because grant expired, code:%s", pStream->name, reqPause.name,
          pStream->uid, tstrerror(code));

    sdbRelease(pSdb, pStream);
  }

  return code;
}

/**
 * Process a stream-meta heartbeat sent by a vnode/snode.
 *
 * Steps, all performed while holding execInfo.lock once the message is decoded:
 *  1. reject heartbeats from a vgId that is not in execInfo.pNodeList;
 *  2. flag nodes listed in req.pUpdateNodes as needing an epset update;
 *  3. for every reported task: collect unknown tasks as orphans, register consensus
 *     checkpoint info, detect stage changes, copy the reported status, and collect
 *     failed checkpoints;
 *  4. request a task reset for each failed checkpoint when all vgroups are ready (or the
 *     snode changed);
 *  5. request removal of orphan tasks and queue an update-checkpoint-info message.
 *
 * The heartbeat response is sent explicitly through doSendHbMsgRsp().
 *
 * @param pReq rpc message carrying an encoded SStreamHbMsg
 * @return the last status code produced while processing
 */
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  SStreamHbMsg req = {0};
  SArray      *pFailedChkpt = NULL;
  SArray      *pOrphanTasks = NULL;
  int32_t      code = 0;

  if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
    if (suspendAllStreams(pMnode, &pReq->info) < 0) {
      return code;
    }
  }

  SDecoder decoder = {0};
  tDecoderInit(&decoder, pReq->pCont, pReq->contLen);

  if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
    tCleanupStreamHbMsg(&req);
    tDecoderClear(&decoder);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }
  tDecoderClear(&decoder);

  mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks,
         req.msgId);

  pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
  pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
  if (pFailedChkpt == NULL || pOrphanTasks == NULL) {
    // Also release the decoded heartbeat, not only the arrays.
    cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
    TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
  }

  streamMutexLock(&execInfo.lock);

  mndInitStreamExecInfo(pMnode, &execInfo);
  if (!validateHbMsg(execInfo.pNodeList, req.vgId)) {
    mError("vgId:%d not exists in nodeList buf, discarded", req.vgId);

    // Report the actual reason to the sender instead of whatever terrno happens to hold.
    doSendHbMsgRsp(TSDB_CODE_INVALID_MSG, &pReq->info, req.vgId, req.msgId);

    streamMutexUnlock(&execInfo.lock);
    cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
    TAOS_RETURN(TSDB_CODE_INVALID_MSG);
  }

  int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
  if (numOfUpdated > 0) {
    mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
    (void)setNodeEpsetExpiredFlag(req.pUpdateNodes);
  }

  bool snodeChanged = false;
  for (int32_t i = 0; i < req.numOfTasks; ++i) {
    STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
    if (p == NULL) {
      continue;
    }

    STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
    if (pTaskEntry == NULL) {
      mError("s-task:0x%" PRIx64 " not found in mnode task list, added into orphan task list", p->id.taskId);

      SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
      if (taosArrayPush(pOrphanTasks, &oTask) == NULL) {
        mError("failed to put task into list, taskId:0x%" PRIx64, p->id.taskId);
      }
      continue;
    }

    STaskCkptInfo *pChkInfo = &p->checkpointInfo;
    if (pChkInfo->consensusChkptId != 0) {
      SRestoreCheckpointInfo cp = {
          .streamId = p->id.streamId,
          .taskId = p->id.taskId,
          .checkpointId = p->checkpointInfo.latestId,
          .startTs = pChkInfo->consensusTs,
      };

      SStreamObj *pStream = NULL;
      code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
      if (code || pStream == NULL) {
        // Without the stream object the task count cannot be determined; skip this task.
        mError("failed to acquire the stream:0x%" PRIx64 " for consensus checkpoint-info, s-task:0x%" PRIx64,
               p->id.streamId, p->id.taskId);
        continue;
      }

      int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);

      SCheckpointConsensusInfo *pInfo = NULL;
      code = mndGetConsensusInfo(execInfo.pStreamConsensus, p->id.streamId, numOfTasks, &pInfo);
      if (code == 0) {
        mndAddConsensusTasks(pInfo, &cp);
      } else {
        mError("failed to get consensus checkpoint-info");
      }

      mndReleaseStream(pMnode, pStream);
    }

    if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
      updateStageInfo(pTaskEntry, p->stage);
      if (pTaskEntry->nodeId == SNODE_HANDLE) {
        snodeChanged = true;
      }
    } else {
      streamTaskStatusCopy(pTaskEntry, p);

      if ((pChkInfo->activeId != 0) && pChkInfo->failed) {
        mError("stream task:0x%" PRIx64 " checkpointId:%" PRId64 " transId:%d failed, kill it", p->id.taskId,
               pChkInfo->activeId, pChkInfo->activeTransId);

        SFailedCheckpointInfo info = {
            .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
        addIntoCheckpointList(pFailedChkpt, &info);

        // remove failed trans from pChkptStreams
        code = taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId));
        if (code) {
          mError("failed to remove stream:0x%" PRIx64 " in checkpoint stream list", p->id.streamId);
        }
      }
    }

    if (p->status != TASK_STATUS__READY) {
      mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId,
             streamTaskGetStatusStr(p->status));
    }
  }

  // current checkpoint is failed, rollback from the checkpoint trans
  // kill the checkpoint trans and then set all tasks status to be normal
  int32_t numOfFailed = taosArrayGetSize(pFailedChkpt);
  if (numOfFailed > 0) {
    bool allReady = true;

    if (pMnode != NULL) {
      SArray *pSnapshot = NULL;

      code = mndTakeVgroupSnapshot(pMnode, &allReady, &pSnapshot);
      taosArrayDestroy(pSnapshot);  // only the allReady flag is needed here
      if (code) {
        mError("failed to get the vgroup snapshot, ignore it and continue");
      }
    } else {
      allReady = false;
    }

    if (allReady || snodeChanged) {
      // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
      for (int32_t i = 0; i < numOfFailed; ++i) {
        SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedChkpt, i);
        if (pInfo == NULL) {
          continue;
        }

        mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
              pInfo->checkpointId, pInfo->transId);

        code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId);
        if (code) {
          mError("failed to create reset task trans, code:%s", tstrerror(code));
        }
      }
    } else {
      mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
    }
  }

  // handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
  if (taosArrayGetSize(pOrphanTasks) > 0) {
    code = mndSendDropOrphanTasksMsg(pMnode, pOrphanTasks);
    if (code) {
      mError("failed to send drop orphan tasks msg, code:%s, try next time", tstrerror(code));
    }
  }

  if (pMnode != NULL) {  // make sure that the unit test case can work
    // Best effort: the callee already logs a failure.
    (void)mndStreamSendUpdateChkptInfoMsg(pMnode);
  }

  streamMutexUnlock(&execInfo.lock);

  doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId);

  cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
  return code;
}

/**
 * Check that the heartbeat sender is a known node.
 *
 * @param pNodeList list of SNodeEntry
 * @param vgId      vgroup id carried by the heartbeat
 * @return true if an entry with nodeId == vgId exists in pNodeList
 */
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId) {
  int32_t numOfNodes = taosArrayGetSize(pNodeList);

  for (int32_t i = 0; i < numOfNodes; ++i) {
    SNodeEntry *pEntry = taosArrayGet(pNodeList, i);
    if ((pEntry) && (pEntry->nodeId == vgId)) {
      return true;
    }
  }

  return false;
}

/**
 * Release everything allocated while handling one heartbeat: the decoded message content
 * and the two scratch lists.
 */
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks) {
  tCleanupStreamHbMsg(pReq);
  taosArrayDestroy(pFailedChkptList);
  taosArrayDestroy(pOrphanTasks);
}

/**
 * Send the heartbeat response explicitly and clear pRpcInfo->handle afterwards
 * ("disable auto rsp").
 *
 * If the response buffer cannot be allocated nothing is sent and the handle is left
 * untouched.
 * NOTE(review): leaving the handle set presumably lets the rpc layer's automatic
 * response path answer with the handler's return code -- confirm.
 *
 * @param code     status code placed in the response
 * @param pRpcInfo rpc handle info of the incoming heartbeat; its handle is cleared on success
 * @param vgId     vgroup id echoed back (network byte order) in the response head
 * @param msgId    heartbeat message id echoed back
 */
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) {
  SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)};

  rsp.pCont = rpcMallocCont(rsp.contLen);
  if (rsp.pCont == NULL) {
    mError("vgId:%d failed to allocate hb rsp msg, msgId:%d", vgId, msgId);
    return;
  }

  SMStreamHbRspMsg *pMsg = rsp.pCont;
  pMsg->head.vgId = htonl(vgId);
  pMsg->msgId = msgId;

  tmsgSendRsp(&rsp);
  pRpcInfo->handle = NULL;  // disable auto rsp
}