TDengine/source/dnode/mnode/impl/src/mndScheduler.c

246 lines
7.4 KiB
C
Raw Normal View History

2022-03-07 08:30:28 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
2022-03-22 10:00:03 +00:00
#include "mndSnode.h"
2022-03-07 08:30:28 +00:00
#include "mndStb.h"
2022-03-15 12:04:19 +00:00
#include "mndStream.h"
2022-03-07 08:30:28 +00:00
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
2022-03-15 12:04:19 +00:00
#include "tuuid.h"
2022-03-23 02:44:32 +00:00
// Scheduling switch read by mndScheduleStream: when true, tasks above level 0 are
// assigned to the last vgroup used at level 0; when false, they are assigned to an snode.
extern bool tsStreamSchedV;
2022-03-23 06:47:24 +00:00
/*
 * Encode a stream task and append it to a transaction as a redo action.
 *
 * The message is an SMsgHead followed by the task as encoded by
 * tEncodeSStreamTask.
 *
 * @param pTrans  transaction that receives the redo action
 * @param pTask   task to serialize
 * @param pEpSet  endpoint set copied into the action
 * @param type    message type stored in the action
 * @param nodeId  value written, in network byte order, into the message head
 * @return 0 on success; -1 on failure. terrno is set to
 *         TSDB_CODE_OUT_OF_MEMORY when the message buffer cannot be allocated.
 */
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
  SCoder encoder;

  // First pass: run the encoder without a buffer, only to learn the encoded size.
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
  tEncodeSStreamTask(&encoder, pTask);
  int32_t size = encoder.pos;
  int32_t tlen = sizeof(SMsgHead) + size;
  tCoderClear(&encoder);

  void* buf = malloc(tlen);
  if (buf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  ((SMsgHead*)buf)->streamTaskId = htonl(nodeId);

  // Second pass: encode the task right after the message head.
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
  tEncodeSStreamTask(&encoder, pTask);
  tCoderClear(&encoder);

  STransAction action = {0};
  memcpy(&action.epSet, pEpSet, sizeof(SEpSet));
  action.pCont = buf;
  action.contLen = tlen;
  action.msgType = type;

  if (mndTransAppendRedoAction(pTrans, &action) != 0) {
    // buf was obtained from malloc(), so it must be released with free(),
    // not with the RPC content deallocator.
    free(buf);
    return -1;
  }

  // On success the buffer stays attached to the appended action.
  return 0;
}
/*
 * Bind a subplan to a vgroup, serialize it into the task, and persist a
 * TDMT_VND_TASK_DEPLOY request for that vgroup in the transaction.
 *
 * @param pMnode   mnode context used to resolve the vgroup endpoint set
 * @param pTrans   transaction that receives the deploy request
 * @param pTask    task whose qmsg is filled with the serialized subplan
 * @param plan     subplan; its execNode is overwritten with the vgroup id/epset
 * @param pVgroup  target vgroup (must not be NULL)
 * @return 0 on success; -1 if the subplan cannot be serialized
 *         (terrno = TSDB_CODE_QRY_INVALID_INPUT) or the deploy request cannot
 *         be persisted.
 */
int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
  int32_t msgLen;

  plan->execNode.nodeId = pVgroup->vgId;
  plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

  if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Report a failed persist to the caller instead of silently returning success.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId) < 0) {
    return -1;
  }
  return 0;
}
2022-03-22 10:00:03 +00:00
/*
 * Fetch the first snode object from the SDB.
 *
 * sdbFetch returns an iterator handle and stores the fetched object through
 * its out-parameter, so the two must be kept in separate variables.
 *
 * @param pMnode  mnode context owning the SDB
 * @return the fetched snode, or NULL when none is available. The caller
 *         releases a non-NULL result with sdbRelease.
 */
SSnodeObj* mndSchedFetchSnode(SMnode* pMnode) {
  SSnodeObj* pObj = NULL;
  void*      pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, NULL, (void**)&pObj);

  // Only the first entry is needed; end the iteration here.
  if (pIter != NULL) {
    sdbCancelFetch(pMnode->pSdb, pIter);
  }
  return pObj;
}
2022-03-21 08:35:27 +00:00
/*
 * Bind a subplan to an snode, serialize it into the task, and persist a
 * TDMT_SND_TASK_DEPLOY request in the transaction.
 *
 * @param pMnode  mnode context used to resolve the snode endpoint set
 * @param pTrans  transaction that receives the deploy request
 * @param pTask   task whose qmsg is filled with the serialized subplan
 * @param plan    subplan; its execNode is overwritten with the snode id/epset
 * @param pSnode  target snode (must not be NULL)
 * @return 0 on success; -1 if the subplan cannot be serialized
 *         (terrno = TSDB_CODE_QRY_INVALID_INPUT) or the deploy request cannot
 *         be persisted.
 */
int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SSubplan* plan,
                             const SSnodeObj* pSnode) {
  int32_t msgLen;

  plan->execNode.nodeId = pSnode->id;
  plan->execNode.epSet = mndAcquireEpFromSnode(pMnode, pSnode);

  if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  // Report a failed persist to the caller instead of silently returning success.
  if (mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0) < 0) {
    return -1;
  }
  return 0;
}
2022-03-16 10:29:31 +00:00
/*
 * Build the task list of a stream from its physical plan and persist the
 * deploy request of every task in the transaction.
 *
 * Each plan level must contain exactly one subplan.
 *   - Level 0 (asserted to be a scan subplan): one task is created per vgroup
 *     whose dbUid matches the stream, and assigned to that vgroup.
 *   - Higher levels: one task per level, assigned either to the last vgroup
 *     used at level 0 (tsStreamSchedV true) or to an snode.
 * The tasks of each level are collected in an array that is pushed onto
 * pStream->tasks.
 *
 * @param pMnode   mnode context
 * @param pTrans   transaction that receives the deploy requests
 * @param pStream  stream object; vgNum must be 0 on entry, tasks is (re)created
 * @return 0 on success, -1 on failure. terrno is set to
 *         TSDB_CODE_QRY_INVALID_INPUT here when the physical plan cannot be
 *         parsed; on the other failure paths it is left to the failing callee.
 */
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }
  ASSERT(pStream->vgNum == 0);

  int32_t totLevel = LIST_LENGTH(pPlan->pSubplans);
  pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));

  // Remembered so that higher-level tasks can be placed on a vgroup of this stream.
  int32_t lastUsedVgId = 0;

  for (int32_t level = 0; level < totLevel; level++) {
    SArray*        taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);

    int32_t opNum = LIST_LENGTH(inner->pNodeList);
    ASSERT(opNum == 1);

    SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

    if (level == 0) {
      ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);

      void* pIter = NULL;
      while (1) {
        pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
        if (pIter == NULL) break;

        // Skip vgroups that belong to another database.
        if (pVgroup->dbUid != pStream->dbUid) {
          sdbRelease(pSdb, pVgroup);
          continue;
        }

        lastUsedVgId = pVgroup->vgId;
        pStream->vgNum++;

        // send to vnode
        SStreamTask* pTask = streamTaskNew(pStream->uid, level);
        pTask->pipeSource = 1;
        pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
        pTask->parallelizable = 1;

        // TODO: set to
        if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
          sdbRelease(pSdb, pVgroup);
          // Leaving the loop early: end the unfinished iteration.
          sdbCancelFetch(pSdb, pIter);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
        taosArrayPush(taskOneLevel, pTask);

        // Drop the reference taken by sdbFetch before moving to the next vgroup.
        sdbRelease(pSdb, pVgroup);
      }
    } else {
      SStreamTask* pTask = streamTaskNew(pStream->uid, level);
      pTask->pipeSource = 0;
      pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
      pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
      pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;

      if (tsStreamSchedV) {
        ASSERT(lastUsedVgId != 0);
        SVgObj* pVg = mndAcquireVgroup(pMnode, lastUsedVgId);
        if (pVg == NULL) {
          // The vgroup is no longer available; fail instead of dereferencing NULL.
          qDestroyQueryPlan(pPlan);
          return -1;
        }
        if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVg) < 0) {
          sdbRelease(pSdb, pVg);
          qDestroyQueryPlan(pPlan);
          return -1;
        }
        sdbRelease(pSdb, pVg);
      } else {
        SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
        if (pSnode != NULL) {
          if (mndAssignTaskToSnode(pMnode, pTrans, pTask, plan, pSnode) < 0) {
            sdbRelease(pSdb, pSnode);
            qDestroyQueryPlan(pPlan);
            return -1;
          }
          sdbRelease(pMnode->pSdb, pSnode);
        } else {
          // TODO: assign to one vg
          ASSERT(0);
        }
      }

      taosArrayPush(taskOneLevel, pTask);
    }

    taosArrayPush(pStream->tasks, taskOneLevel);
  }

  // The plan is only needed while tasks are being built; release it on success too.
  qDestroyQueryPlan(pPlan);
  return 0;
}
2022-03-07 08:30:28 +00:00
/*
 * Populate the unassigned-vgroup list of a subscription from a topic's
 * physical plan.
 *
 * The plan must have exactly one level containing exactly one subplan. For
 * every vgroup whose dbUid matches the topic, the subplan is bound to that
 * vgroup, serialized, and stored in an SMqConsumerEp (consumerId = -1) that is
 * pushed onto pSub->unassignedVg.
 *
 * @param pMnode  mnode context
 * @param pTopic  topic providing the physical plan and database uid
 * @param pSub    subscription; vgNum must be 0 on entry and is incremented per
 *                matching vgroup
 * @return 0 on success; -1 on failure with terrno set to
 *         TSDB_CODE_QRY_INVALID_INPUT (plan cannot be parsed or serialized) or
 *         TSDB_CODE_MND_UNSUPPORTED_TOPIC (plan shape not supported).
 */
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
  SSdb*       pSdb = pMnode->pSdb;
  SVgObj*     pVgroup = NULL;
  SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan);
  if (pPlan == NULL) {
    terrno = TSDB_CODE_QRY_INVALID_INPUT;
    return -1;
  }

  ASSERT(pSub->vgNum == 0);

  int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
  if (levelNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }

  SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, 0);

  int32_t opNum = LIST_LENGTH(inner->pNodeList);
  if (opNum != 1) {
    qDestroyQueryPlan(pPlan);
    terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
    return -1;
  }

  SSubplan* plan = nodesListGetNode(inner->pNodeList, 0);

  void* pIter = NULL;
  while (1) {
    pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
    if (pIter == NULL) break;

    // Skip vgroups that belong to another database.
    if (pVgroup->dbUid != pTopic->dbUid) {
      sdbRelease(pSdb, pVgroup);
      continue;
    }

    pSub->vgNum++;
    plan->execNode.nodeId = pVgroup->vgId;
    plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);

    SMqConsumerEp consumerEp = {0};
    consumerEp.status = 0;
    consumerEp.consumerId = -1;
    consumerEp.epSet = plan->execNode.epSet;
    consumerEp.vgId = plan->execNode.nodeId;

    int32_t msgLen;
    if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) {
      sdbRelease(pSdb, pVgroup);
      // Leaving the loop early: end the unfinished iteration.
      sdbCancelFetch(pSdb, pIter);
      qDestroyQueryPlan(pPlan);
      terrno = TSDB_CODE_QRY_INVALID_INPUT;
      return -1;
    }
    taosArrayPush(pSub->unassignedVg, &consumerEp);

    // Drop the reference taken by sdbFetch before moving to the next vgroup.
    sdbRelease(pSdb, pVgroup);
  }

  qDestroyQueryPlan(pPlan);

  return 0;
}