TDengine/source/dnode/mnode/impl/src/mndScheduler.c

558 lines
16 KiB
C
Raw Normal View History

2022-03-07 08:30:28 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
2022-03-22 10:00:03 +00:00
#include "mndSnode.h"
2022-03-07 08:30:28 +00:00
#include "mndStb.h"
2022-03-15 12:04:19 +00:00
#include "mndStream.h"
2022-03-07 08:30:28 +00:00
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
2022-03-15 12:04:19 +00:00
#include "tuuid.h"
2022-03-23 02:44:32 +00:00
extern bool tsStreamSchedV;
2022-04-21 06:19:58 +00:00
/**
 * Turn a serialized AST into the string form of its single physical subplan.
 *
 * The AST is parsed with nodesStringToNode(), planned with qCreateQueryPlan()
 * using a context that has streamQuery and rSmaQuery enabled (topicQuery
 * disabled), and the resulting plan must hold exactly one level containing
 * exactly one subplan. That subplan is serialized with qSubPlanToString().
 *
 * @param ast          serialized AST string
 * @param triggerType  copied into the plan context
 * @param watermark    copied into the plan context
 * @param pStr         receives the serialized subplan
 *                     (NOTE(review): presumably owned by the caller afterwards - confirm)
 * @param pLen         receives the length reported by qSubPlanToString()
 * @return the value of terrno on exit: it is reset to TSDB_CODE_SUCCESS on entry
 *         and set to TSDB_CODE_QRY_INVALID_INPUT whenever a step fails.
 */
int32_t mndConvertRSmaTask(const char* ast, int8_t triggerType, int64_t watermark, char** pStr, int32_t* pLen) {
  SNode*      pRoot = NULL;
  SQueryPlan* pQueryPlan = NULL;

  terrno = TSDB_CODE_SUCCESS;

  // Single-pass block: any failure records the error and falls through to the cleanup below.
  do {
    if (nodesStringToNode(ast, &pRoot) < 0) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      break;
    }

    SPlanContext planCtx = {
        .pAstRoot = pRoot,
        .topicQuery = false,
        .streamQuery = true,
        .rSmaQuery = true,
        .triggerType = triggerType,
        .watermark = watermark,
    };
    if (qCreateQueryPlan(&planCtx, &pQueryPlan, NULL) < 0) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      break;
    }

    // Exactly one level ...
    int32_t numLevels = LIST_LENGTH(pQueryPlan->pSubplans);
    if (numLevels != 1) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      break;
    }

    // ... holding exactly one subplan.
    SNodeListNode* pLevel = nodesListGetNode(pQueryPlan->pSubplans, 0);
    int32_t        numSubplans = LIST_LENGTH(pLevel->pNodeList);
    if (numSubplans != 1) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      break;
    }

    SSubplan* pSubplan = nodesListGetNode(pLevel->pNodeList, 0);
    if (qSubPlanToString(pSubplan, pStr, pLen) < 0) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      break;
    }
  } while (0);

  if (pRoot != NULL) {
    nodesDestroyNode(pRoot);
  }
  if (pQueryPlan != NULL) {
    nodesDestroyNode(pQueryPlan);
  }
  return terrno;
}
2022-03-23 06:47:24 +00:00
/**
 * Encode a stream task into a deploy message and append it to the transaction
 * as a redo action.
 *
 * The task is encoded twice. The first pass runs against a NULL buffer and the
 * encoder position afterwards is taken as the encoded size; the second pass
 * writes into a heap buffer laid out as [SMsgHead][encoded task]. Only the
 * vgId field of the header is filled in here (network byte order).
 *
 * @param pTrans  transaction receiving the redo action
 * @param pTask   task to serialize
 * @param pEpSet  endpoint set copied into the action
 * @param type    message type stored in the action
 * @param nodeId  id written into the message header
 * @return 0 on success; -1 if the buffer cannot be allocated (terrno set to
 *         TSDB_CODE_OUT_OF_MEMORY) or if the action cannot be appended, in
 *         which case the buffer is freed here.
 */
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
  SEncoder coder;

  // Pass 1: no output buffer, only the resulting position is of interest.
  tEncoderInit(&coder, NULL, 0);
  tEncodeSStreamTask(&coder, pTask);
  int32_t bodyLen = coder.pos;
  int32_t msgLen = sizeof(SMsgHead) + bodyLen;
  tEncoderClear(&coder);

  void* pMsg = taosMemoryMalloc(msgLen);
  if (pMsg == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  SMsgHead* pHead = pMsg;
  pHead->vgId = htonl(nodeId);

  // Pass 2: encode right behind the header.
  void* pBody = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
  tEncoderInit(&coder, pBody, bodyLen);
  tEncodeSStreamTask(&coder, pTask);
  tEncoderClear(&coder);

  STransAction action = {0};
  memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
  action.pCont = pMsg;
  action.contLen = msgLen;
  action.msgType = type;

  if (mndTransAppendRedoAction(pTrans, &action) == 0) {
    return 0;
  }

  // The message was not handed over, so it is still ours to free.
  taosMemoryFree(pMsg);
  return -1;
}
/**
 * Bind a stream task and its subplan to a vgroup and queue the deploy request.
 *
 * The task and the subplan's exec node both receive the vgroup id and the
 * vgroup's endpoint set, the subplan is serialized into pTask->exec.qmsg, and
 * a TDMT_VND_TASK_DEPLOY request is appended to the transaction.
 *
 * @param pMnode   mnode used to resolve the vgroup endpoint set
 * @param pTrans   transaction that receives the deploy request
 * @param pTask    task being assigned (modified)
 * @param plan     subplan executed by the task (exec node modified)
 * @param pVgroup  target vgroup; not released here
 * @return 0 on success; -1 if the subplan cannot be serialized (terrno set to
 *         TSDB_CODE_QRY_INVALID_INPUT) or if the deploy request cannot be
 *         queued (failure of mndPersistTaskDeployReq is propagated).
 */
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen;

  pTask->nodeId = pVgroup->vgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  plan->execNode.nodeId = pVgroup->vgId;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // A task whose deploy request never reached the transaction must not be reported as assigned.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId) < 0) {
    return -1;
  }
  return 0;
}
2022-03-22 10:00:03 +00:00
/**
 * Fetch the first snode object from the sdb.
 *
 * Every other sdbFetch() call in this file treats the return value as the
 * iteration cursor and the last argument as the fetched object, so the cursor
 * is kept in its own variable instead of being assigned over the object.
 *
 * @return the fetched snode, or NULL when sdbFetch() stores no object.
 *         NOTE(review): the caller presumably has to release the object, and
 *         the cursor is not cancelled here - confirm against the sdb API.
 */
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  void*      pIter = NULL;

  pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void**)&pObj);
  if (pIter == NULL) {
    return NULL;
  }
  return pObj;
}
2022-03-21 08:35:27 +00:00
/**
 * Bind a stream task and its subplan to an snode and queue the deploy request.
 *
 * The task and the subplan's exec node get node id 0 and the endpoint set
 * obtained from mndAcquireEpFromSnode(); the subplan is serialized into
 * pTask->exec.qmsg and a TDMT_SND_TASK_DEPLOY request is appended to the
 * transaction.
 *
 * @param pMnode  mnode used to resolve the snode endpoint set
 * @param pTrans  transaction that receives the deploy request
 * @param pTask   task being assigned (modified)
 * @param plan    subplan executed by the task (exec node modified)
 * @param pSnode  target snode
 * @return 0 on success; -1 if the subplan cannot be serialized (terrno set to
 *         TSDB_CODE_QRY_INVALID_INPUT) or if the deploy request cannot be
 *         queued (failure of mndPersistTaskDeployReq is propagated).
 */
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
  int32_t msgLen;

  pTask->nodeId = 0;
  pTask->epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  plan->execNode.nodeId = 0;
  plan->execNode.epSet = pTask->epSet;

  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Do not report success when the deploy request could not be queued.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0) < 0) {
    return -1;
  }
  return 0;
}
2022-03-26 06:50:26 +00:00
/**
 * Return the first vgroup found whose dbUid equals the given one.
 *
 * Vgroups that do not match are released as the scan moves on. The matching
 * vgroup is returned without being released, so the caller must release it.
 *
 * @param pMnode  mnode whose sdb is scanned
 * @param dbUid   database uid to look for
 * @return the matching vgroup, or NULL when none matches. NULL is returned
 *         explicitly so that a vgroup already released during the scan can
 *         never be handed back.
 *         NOTE(review): the cursor is not cancelled on the early return -
 *         confirm whether the sdb API requires that.
 */
SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
  void* pIter = NULL;

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) {
      break;
    }
    if (pVgroup->dbUid == dbUid) {
      return pVgroup;
    }
    sdbRelease(pMnode->pSdb, pVgroup);
  }

  return NULL;
}
2022-03-29 02:40:13 +00:00
/**
 * Create one sink task per vgroup of the stream's database and queue a deploy
 * request for each of them.
 *
 * Every task is appended to the first (and, as asserted, only) level of
 * pStream->tasks. A task has a MERGE source, no exec stage, no dispatcher, and
 * a sink that is either the SMA (stream created by an SMA) or the target super
 * table with a clone of the stream's output schema.
 *
 * @param pMnode   mnode whose sdb is scanned for vgroups
 * @param pTrans   transaction that receives the deploy requests
 * @param pStream  stream being scheduled; pStream->tasks must already contain
 *                 exactly one level
 * @return 0 on success; -1 if a task cannot be allocated (terrno set to
 *         TSDB_CODE_OUT_OF_MEMORY) or a deploy request cannot be queued.
 *         Tasks appended before the failure stay in pStream->tasks.
 */
int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  SSdb* pSdb = pMnode->pSdb;
  void* pIter = NULL;

  ASSERT(taosArrayGetSize(pStream->tasks) == 1);
  SArray* tasks = taosArrayGetP(pStream->tasks, 0);

  while (1) {
    SVgObj* pVgroup = NULL;
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->dbUid != pStream->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SStreamTask* pTask = tNewSStreamTask(pStream->uid);
    if (pTask == NULL) {
      // Give back the vgroup reference before bailing out.
      sdbRelease(pSdb, pVgroup);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    taosArrayPush(tasks, &pTask);

    pTask->nodeId = pVgroup->vgId;
    pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

    // source
    pTask->sourceType = TASK_SOURCE__MERGE;

    // exec
    pTask->execType = TASK_EXEC__NONE;

    // sink
    if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
      pTask->sinkType = TASK_SINK__SMA;
      pTask->smaSink.smaId = pStream->smaId;
    } else {
      pTask->sinkType = TASK_SINK__TABLE;
      pTask->tbSink.stbUid = pStream->targetStbUid;
      pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
      ASSERT(pTask->tbSink.pSchemaWrapper);
    }

    // dispatch
    pTask->dispatchType = TASK_DISPATCH__NONE;

    int32_t code = mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);

    // The task keeps its own copy of the id and endpoint set, so the vgroup
    // reference obtained from sdbFetch is no longer needed.
    sdbRelease(pSdb, pVgroup);

    if (code < 0) {
      return -1;
    }
  }

  return 0;
}
2022-03-29 02:40:13 +00:00
/**
 * Create the single sink task that runs on the stream's fixed sink vgroup and
 * queue its deploy request.
 *
 * The task is appended to the first level of pStream->tasks. It has a MERGE
 * source, no exec stage, no dispatcher, and a sink that is either the SMA
 * (stream created by an SMA) or the target super table with a clone of the
 * stream's output schema.
 *
 * The vgroup is acquired before the task is created so that a failed lookup
 * does not leave a half-initialised task in pStream->tasks, and the vgroup
 * reference is released on every path once it has been acquired.
 *
 * @param pMnode   mnode used to acquire the vgroup
 * @param pTrans   transaction that receives the deploy request
 * @param pStream  stream being scheduled; fixedSinkVgId must be non-zero
 * @return 0 on success; -1 if the vgroup cannot be acquired, the task cannot
 *         be allocated (terrno set to TSDB_CODE_OUT_OF_MEMORY), or the deploy
 *         request cannot be queued.
 */
int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  ASSERT(pStream->fixedSinkVgId != 0);
  SArray* tasks = taosArrayGetP(pStream->tasks, 0);

  SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
  if (pVgroup == NULL) {
    return -1;
  }

  SStreamTask* pTask = tNewSStreamTask(pStream->uid);
  if (pTask == NULL) {
    sdbRelease(pMnode->pSdb, pVgroup);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  taosArrayPush(tasks, &pTask);

  pTask->nodeId = pStream->fixedSinkVgId;
  pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);

  // source
  pTask->sourceType = TASK_SOURCE__MERGE;

  // exec
  pTask->execType = TASK_EXEC__NONE;

  // sink
  if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
    pTask->sinkType = TASK_SINK__SMA;
    pTask->smaSink.smaId = pStream->smaId;
  } else {
    pTask->sinkType = TASK_SINK__TABLE;
    pTask->tbSink.stbUid = pStream->targetStbUid;
    pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
  }

  // dispatch
  pTask->dispatchType = TASK_DISPATCH__NONE;

  int32_t code = mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);

  // The task holds its own copy of the id and endpoint set.
  sdbRelease(pMnode->pSdb, pVgroup);

  return (code < 0) ? -1 : 0;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
2022-03-15 12:04:19 +00:00
SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
ASSERT(pStream->vgNum == 0);
2022-03-16 10:29:31 +00:00
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
2022-03-26 06:50:26 +00:00
ASSERT(totLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*));
2022-03-15 12:04:19 +00:00
2022-03-28 12:31:05 +00:00
bool hasExtraSink = false;
if (totLevel == 2) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
hasExtraSink = true;
2022-03-29 02:40:13 +00:00
if (pStream->fixedSinkVgId == 0) {
mndAddShuffledSinkToStream(pMnode, pTrans, pStream);
} else {
mndAddFixedSinkToStream(pMnode, pTrans, pStream);
}
2022-03-28 12:31:05 +00:00
}
2022-03-26 06:50:26 +00:00
for (int32_t level = 0; level < totLevel; level++) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
ASSERT(LIST_LENGTH(inner->pNodeList) == 1);
2022-03-15 12:04:19 +00:00
2022-03-23 09:00:21 +00:00
SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);
2022-03-26 06:50:26 +00:00
// if (level == totLevel - 1 /* or no snode */) {
if (level == totLevel - 1) {
// last level, source, must assign to vnode
// must be scan type
2022-03-23 06:47:24 +00:00
ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
2022-03-26 06:50:26 +00:00
// replicate task to each vnode
2022-03-15 12:04:19 +00:00
void* pIter = NULL;
while (1) {
2022-03-26 06:50:26 +00:00
SVgObj* pVgroup;
2022-03-15 12:04:19 +00:00
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
2022-03-26 06:50:26 +00:00
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
// source part
pTask->sourceType = TASK_SOURCE__SCAN;
2022-03-15 12:04:19 +00:00
2022-03-26 06:50:26 +00:00
// sink part
if (level == 0) {
// only for inplace
2022-04-28 07:53:12 +00:00
pTask->sinkType = TASK_SINK__NONE;
2022-03-28 12:31:05 +00:00
if (!hasExtraSink) {
2022-03-29 02:40:13 +00:00
#if 1
if (pStream->createdBy == STREAM_CREATED_BY__SMA) {
2022-03-28 12:31:05 +00:00
pTask->sinkType = TASK_SINK__SMA;
2022-03-29 02:40:13 +00:00
pTask->smaSink.smaId = pStream->smaId;
2022-03-28 12:31:05 +00:00
} else {
pTask->sinkType = TASK_SINK__TABLE;
2022-05-07 15:19:05 +00:00
pTask->tbSink.stbUid = pStream->targetStbUid;
2022-05-05 10:57:09 +00:00
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
2022-03-28 12:31:05 +00:00
}
2022-03-29 02:40:13 +00:00
#endif
2022-03-28 06:40:22 +00:00
}
2022-03-26 06:50:26 +00:00
} else {
pTask->sinkType = TASK_SINK__NONE;
}
2022-03-16 10:29:31 +00:00
2022-03-26 06:50:26 +00:00
// dispatch part
if (level == 0) {
pTask->dispatchType = TASK_DISPATCH__NONE;
} else {
// add fixed ep dispatcher
int32_t lastLevel = level - 1;
ASSERT(lastLevel == 0);
2022-03-28 12:31:05 +00:00
if (hasExtraSink) lastLevel++;
2022-03-26 06:50:26 +00:00
SArray* pArray = taosArrayGetP(pStream->tasks, lastLevel);
// one merge only
ASSERT(taosArrayGetSize(pArray) == 1);
2022-03-28 12:31:05 +00:00
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
2022-03-26 06:50:26 +00:00
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
pTask->dispatchType = TASK_DISPATCH__FIXED;
2022-03-28 03:47:39 +00:00
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
2022-03-26 06:50:26 +00:00
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
// exec part
pTask->execType = TASK_EXEC__PIPE;
2022-03-25 12:19:09 +00:00
pTask->exec.parallelizable = 1;
2022-03-21 08:35:27 +00:00
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
2022-03-16 10:29:31 +00:00
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
2022-03-26 06:50:26 +00:00
sdbRelease(pSdb, pVgroup);
taosArrayPush(taskOneLevel, &pTask);
2022-03-15 12:04:19 +00:00
}
2022-03-21 08:35:27 +00:00
} else {
2022-03-26 06:50:26 +00:00
// merge plan
// TODO if has snode, assign to snode
// else, assign to vnode
ASSERT(plan->subplanType == SUBPLAN_TYPE_MERGE);
2022-03-25 12:19:09 +00:00
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
2022-03-26 06:50:26 +00:00
// source part, currently only support multi source
pTask->sourceType = TASK_SOURCE__PIPE;
// sink part
2022-04-28 07:53:12 +00:00
pTask->sinkType = TASK_SINK__NONE;
2022-03-26 06:50:26 +00:00
// dispatch part
2022-03-28 12:31:05 +00:00
ASSERT(hasExtraSink);
/*pTask->dispatchType = TASK_DISPATCH__NONE;*/
#if 1
if (hasExtraSink) {
// add dispatcher
2022-03-29 02:40:13 +00:00
if (pStream->fixedSinkVgId == 0) {
pTask->dispatchType = TASK_DISPATCH__SHUFFLE;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
2022-04-28 05:36:43 +00:00
SDbObj* pDb = mndAcquireDb(pMnode, pStream->sourceDb);
2022-03-29 02:40:13 +00:00
ASSERT(pDb);
if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) {
sdbRelease(pSdb, pDb);
qDestroyQueryPlan(pPlan);
return -1;
}
2022-03-28 12:31:05 +00:00
sdbRelease(pSdb, pDb);
2022-03-29 02:40:13 +00:00
// put taskId to useDbRsp
// TODO: optimize
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
for (int32_t i = 0; i < sz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
/*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->taskId;
/*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/
break;
}
2022-03-28 12:31:05 +00:00
}
}
2022-03-29 02:40:13 +00:00
} else {
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
ASSERT(taosArrayGetSize(pArray) == 1);
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
2022-03-28 12:31:05 +00:00
}
2022-03-28 09:22:18 +00:00
}
#endif
2022-03-26 06:50:26 +00:00
// exec part
pTask->execType = TASK_EXEC__MERGE;
pTask->exec.parallelizable = 0;
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->dbUid);
ASSERT(pVgroup);
if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pVgroup);
taosArrayPush(taskOneLevel, &pTask);
}
taosArrayPush(pStream->tasks, &taskOneLevel);
}
if (totLevel == 2) {
void* pIter = NULL;
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pStream->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
2022-03-17 09:48:21 +00:00
}
2022-03-26 06:50:26 +00:00
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
2022-03-23 02:44:32 +00:00
2022-03-26 06:50:26 +00:00
// source part
pTask->sourceType = TASK_SOURCE__MERGE;
// sink part
2022-04-28 07:53:12 +00:00
pTask->sinkType = TASK_SINK__NONE;
2022-03-26 06:50:26 +00:00
// dispatch part
pTask->dispatchType = TASK_DISPATCH__NONE;
// exec part
pTask->execType = TASK_EXEC__NONE;
pTask->exec.parallelizable = 0;
2022-03-15 12:04:19 +00:00
}
}
2022-03-26 06:50:26 +00:00
// free memory
2022-03-24 08:18:08 +00:00
qDestroyQueryPlan(pPlan);
2022-03-26 06:50:26 +00:00
2022-03-15 12:04:19 +00:00
return 0;
}
2022-03-07 08:30:28 +00:00
/**
 * Fill pSub->unassignedVgs with one SMqVgEp per vgroup of the topic's
 * database.
 *
 * For TOPIC_SUB_TYPE__TABLE topics the topic's physical plan is parsed first
 * and must consist of exactly one level with exactly one subplan; that subplan
 * is re-targeted at each vgroup and serialized into the entry's qmsg. For
 * other subscription types qmsg is an empty string.
 *
 * @param pMnode  mnode providing sdb access
 * @param pTopic  topic being subscribed to
 * @param pSub    subscription to initialise; unassignedVgs must exist and
 *                consumerHash must be empty (asserted). vgNum is incremented
 *                once per entry added.
 * @return 0 on success; -1 on failure with terrno set to
 *         TSDB_CODE_QRY_INVALID_INPUT (plan cannot be parsed or serialized),
 *         TSDB_CODE_MND_INVALID_TOPIC_QUERY (plan shape is not 1x1) or
 *         TSDB_CODE_OUT_OF_MEMORY (allocation failure). Entries added before
 *         the failure remain in pSub->unassignedVgs.
 *         NOTE(review): the sdb cursor is not cancelled on the early returns
 *         inside the scan - confirm whether the sdb API requires that.
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = NULL;
  SSubplan*   plan = NULL;

  if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
    pPlan = qStringToQueryPlan(pTopic->physicalPlan);
    if (pPlan == NULL) {
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }

    int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
    if (levelNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }

    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);

    int32_t opNum = LIST_LENGTH(inner->pNodeList);
    if (opNum != 1) {
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_MND_INVALID_TOPIC_QUERY;
      return -1;
    }
    plan = nodesListGetNode(inner->pNodeList, 0);
  }

  ASSERT(pSub->unassignedVgs);
  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;

    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
    if (pVgEp == NULL) {
      sdbRelease(pSdb, pVgroup);
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    // The entry becomes reachable through the array before qmsg is produced;
    // never expose an indeterminate pointer there.
    pVgEp->qmsg = NULL;

    pSub->vgNum++;
    pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
    pVgEp->vgId = pVgroup->vgId;
    taosArrayPush(pSub->unassignedVgs, &pVgEp);

    mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);

    if (pTopic->subType == TOPIC_SUB_TYPE__TABLE) {
      int32_t msgLen;
      plan->execNode.epSet = pVgEp->epSet;
      plan->execNode.nodeId = pVgEp->vgId;
      if (qSubPlanToString(plan, &pVgEp->qmsg, &msgLen) < 0) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_QRY_INVALID_INPUT;
        return -1;
      }
    } else {
      pVgEp->qmsg = strdup("");
      if (pVgEp->qmsg == NULL) {
        sdbRelease(pSdb, pVgroup);
        qDestroyQueryPlan(pPlan);
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
    }

    // The entry holds its own copy of the id and endpoint set, so the
    // reference obtained from sdbFetch can be dropped.
    sdbRelease(pSdb, pVgroup);
  }

  ASSERT(pSub->unassignedVgs->size > 0);
  ASSERT(taosHashGetSize(pSub->consumerHash) == 0);

  qDestroyQueryPlan(pPlan);

  return 0;
}