TDengine/source/dnode/snode/src/snode.c

271 lines
8.4 KiB
C
Raw Normal View History

2021-09-22 09:27:48 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-03-16 10:29:31 +00:00
#include "executor.h"
2021-12-27 10:43:27 +00:00
#include "sndInt.h"
2022-03-15 07:54:32 +00:00
#include "tuuid.h"
2022-08-02 15:44:47 +00:00
/*SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return NULL; }*/
/*void sndClose(SSnode *pSnode) {}*/
2022-08-02 14:23:33 +00:00
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; }
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { return 0; }
2021-09-22 09:27:48 +00:00
2021-12-28 08:20:48 +00:00
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
2022-03-25 16:29:53 +00:00
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
2022-03-16 10:29:31 +00:00
if (pSnode == NULL) {
return NULL;
}
2022-03-21 11:08:25 +00:00
pSnode->msgCb = pOption->msgCb;
2022-08-02 15:44:47 +00:00
#if 0
2022-03-16 10:29:31 +00:00
pSnode->pMeta = sndMetaNew();
if (pSnode->pMeta == NULL) {
2022-03-25 16:29:53 +00:00
taosMemoryFree(pSnode);
2022-03-16 10:29:31 +00:00
return NULL;
}
2022-08-02 15:44:47 +00:00
#endif
2021-12-27 10:43:27 +00:00
return pSnode;
2021-09-22 09:27:48 +00:00
}
2022-03-16 10:29:31 +00:00
void sndClose(SSnode *pSnode) {
2022-08-02 15:44:47 +00:00
/*sndMetaDelete(pSnode->pMeta);*/
2022-03-25 16:29:53 +00:00
taosMemoryFree(pSnode);
2022-03-16 10:29:31 +00:00
}
2021-12-27 10:43:27 +00:00
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
2022-08-02 15:44:47 +00:00
#if 0
2022-03-16 10:29:31 +00:00
SStreamMeta *sndMetaNew() {
2022-03-25 16:29:53 +00:00
SStreamMeta *pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
2022-03-16 10:29:31 +00:00
if (pMeta == NULL) {
return NULL;
}
pMeta->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMeta->pHash == NULL) {
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMeta);
2022-03-16 10:29:31 +00:00
return NULL;
}
return pMeta;
}
void sndMetaDelete(SStreamMeta *pMeta) {
taosHashCleanup(pMeta->pHash);
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMeta);
2022-03-16 10:29:31 +00:00
}
int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
2022-05-23 08:51:12 +00:00
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
2022-03-16 10:29:31 +00:00
return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
}
2022-03-16 11:40:48 +00:00
SStreamTask *sndMetaGetTask(SStreamMeta *pMeta, int32_t taskId) {
return taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
}
2022-03-16 10:29:31 +00:00
int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
SStreamTask *pTask = taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
if (pTask == NULL) {
2022-03-15 07:54:32 +00:00
return -1;
}
2022-03-25 16:29:53 +00:00
taosMemoryFree(pTask->exec.qmsg);
2022-03-16 10:29:31 +00:00
// TODO:free executor
2022-03-25 16:29:53 +00:00
taosMemoryFree(pTask);
2022-03-16 10:29:31 +00:00
return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
2022-03-15 07:54:32 +00:00
}
2022-06-20 06:29:18 +00:00
static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
2022-06-20 11:53:48 +00:00
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
2022-06-20 06:29:18 +00:00
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
}
tDecoderClear(&decoder);
2022-06-21 03:32:36 +00:00
pTask->execStatus = TASK_EXEC_STATUS__IDLE;
2022-06-20 06:29:18 +00:00
pTask->inputQueue = streamQueueOpen();
pTask->outputQueue = streamQueueOpen();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_INPUT_STATUS__NORMAL;
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) goto FAIL;
pTask->pMsgCb = &pNode->msgCb;
2022-06-20 11:53:48 +00:00
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
2022-06-20 06:29:18 +00:00
ASSERT(pTask->exec.executor);
streamSetupTrigger(pTask);
2022-07-08 10:00:03 +00:00
qInfo("deploy stream: stream id %" PRId64 " task id %d child id %d on snode", pTask->streamId, pTask->taskId,
2022-06-22 09:56:46 +00:00
pTask->selfChildId);
2022-06-20 06:29:18 +00:00
2022-06-20 11:53:48 +00:00
taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void *));
2022-06-20 06:29:18 +00:00
return 0;
FAIL:
if (pTask->inputQueue) streamQueueClose(pTask->inputQueue);
if (pTask->outputQueue) streamQueueClose(pTask->outputQueue);
2022-06-20 11:53:48 +00:00
if (pTask) taosMemoryFree(pTask);
2022-06-20 06:29:18 +00:00
return -1;
}
static int32_t sndProcessTaskRunReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRunReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
2022-06-20 11:53:48 +00:00
streamProcessRunReq(pTask);
2022-03-16 11:40:48 +00:00
return 0;
}
2022-06-20 06:29:18 +00:00
static int32_t sndProcessTaskDispatchReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
2022-08-02 14:23:33 +00:00
streamProcessDispatchReq(pTask, &req, &rsp, true);
2022-06-20 06:29:18 +00:00
return 0;
}
static int32_t sndProcessTaskRecoverReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRecoverReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
2022-06-20 11:53:48 +00:00
streamProcessRecoverReq(pTask, pReq, pMsg);
2022-06-20 06:29:18 +00:00
return 0;
}
static int32_t sndProcessTaskDispatchRsp(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
2022-06-20 11:53:48 +00:00
streamProcessDispatchRsp(pTask, pRsp);
2022-06-20 06:29:18 +00:00
return 0;
}
static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRecoverRsp *pRsp = pMsg->pCont;
2022-07-28 07:30:11 +00:00
int32_t taskId = pRsp->rspTaskId;
2022-06-20 06:29:18 +00:00
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
streamProcessRecoverRsp(pTask, pRsp);
return 0;
}
static int32_t sndProcessTaskDropReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
char *msg = pMsg->pCont;
int32_t msgLen = pMsg->contLen;
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
int32_t code = taosHashRemove(pMeta->pHash, &pReq->taskId, sizeof(int32_t));
ASSERT(code == 0);
if (code == 0) {
// sendrsp
}
return code;
}
2022-06-22 09:56:46 +00:00
static int32_t sndProcessTaskRetrieveReq(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamRetrieveReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
if (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) {
return 0;
}
SRpcMsg rsp = {
.info = pMsg->info,
.code = 0,
};
streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
}
static int32_t sndProcessTaskRetrieveRsp(SSnode *pNode, SRpcMsg *pMsg) {
//
return 0;
}
2022-06-20 06:29:18 +00:00
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
2022-03-16 10:29:31 +00:00
// stream deploy
2022-03-14 12:32:19 +00:00
// stream stop/resume
// operator exec
2022-06-20 06:29:18 +00:00
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY:
return sndProcessTaskDeployReq(pSnode, pMsg);
2022-06-21 03:32:36 +00:00
case TDMT_STREAM_TASK_DROP:
2022-06-20 06:29:18 +00:00
return sndProcessTaskDropReq(pSnode, pMsg);
default:
2022-03-18 06:54:08 +00:00
ASSERT(0);
2022-03-16 10:29:31 +00:00
}
2022-06-20 06:29:18 +00:00
return 0;
2022-03-14 12:32:19 +00:00
}
2022-06-20 06:29:18 +00:00
int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN:
return sndProcessTaskRunReq(pSnode, pMsg);
case TDMT_STREAM_TASK_DISPATCH:
return sndProcessTaskDispatchReq(pSnode, pMsg);
case TDMT_STREAM_TASK_RECOVER:
return sndProcessTaskRecoverReq(pSnode, pMsg);
2022-06-22 09:56:46 +00:00
case TDMT_STREAM_RETRIEVE:
return sndProcessTaskRecoverReq(pSnode, pMsg);
2022-06-20 06:29:18 +00:00
case TDMT_STREAM_TASK_DISPATCH_RSP:
return sndProcessTaskDispatchRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_RECOVER_RSP:
return sndProcessTaskRecoverRsp(pSnode, pMsg);
2022-06-22 09:56:46 +00:00
case TDMT_STREAM_RETRIEVE_RSP:
2022-06-25 08:12:00 +00:00
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
2022-06-20 06:29:18 +00:00
default:
ASSERT(0);
}
return 0;
2022-03-14 12:32:19 +00:00
}
2022-08-02 14:23:33 +00:00
#endif