TDengine/source/dnode/mgmt/impl/src/dndQnode.c

376 lines
10 KiB
C
Raw Normal View History

2021-12-27 12:33:23 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dndQnode.h"
2022-01-06 11:51:44 +00:00
#include "dndMgmt.h"
2021-12-27 12:33:23 +00:00
#include "dndTransport.h"
2021-12-28 03:57:24 +00:00
#include "dndWorker.h"
2021-12-27 12:33:23 +00:00
2021-12-28 03:57:24 +00:00
// Forward declaration: queue callback that processes one qnode message (defined later in this file).
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
2021-12-27 12:33:23 +00:00
/*
 * Take a counted reference on the dnode's qnode.
 *
 * Returns the qnode handle when it is deployed, not dropped and opened;
 * otherwise returns NULL and sets terrno to TSDB_CODE_DND_QNODE_NOT_DEPLOYED.
 * Every non-NULL result must be paired with dndReleaseQnode().
 */
static SQnode *dndAcquireQnode(SDnode *pDnode) {
  SQnodeMgmt *pQmgmt = &pDnode->qmgmt;
  SQnode     *pHandle = NULL;
  int32_t     refs = 0;

  taosRLockLatch(&pQmgmt->latch);
  if (!pQmgmt->deployed || pQmgmt->dropped || pQmgmt->pQnode == NULL) {
    terrno = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
  } else {
    // Bump the counter while the latch is held so the state check and the reference are consistent.
    refs = atomic_add_fetch_32(&pQmgmt->refCount, 1);
    pHandle = pQmgmt->pQnode;
  }
  taosRUnLockLatch(&pQmgmt->latch);

  if (pHandle == NULL) {
    return NULL;
  }

  dTrace("acquire qnode, refCount:%d", refs);
  return pHandle;
}
/*
 * Drop a reference previously obtained with dndAcquireQnode().
 * Passing a NULL pQnode is allowed and does nothing.
 */
static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) {
  if (pQnode != NULL) {
    SQnodeMgmt *pQmgmt = &pDnode->qmgmt;
    int32_t     remaining;

    taosRLockLatch(&pQmgmt->latch);
    remaining = atomic_sub_fetch_32(&pQmgmt->refCount, 1);
    taosRUnLockLatch(&pQmgmt->latch);

    dTrace("release qnode, refCount:%d", remaining);
  }
}
/*
 * Load the persisted qnode state from "<dnode dir>/qnode.json" into pDnode->qmgmt.
 *
 * The file is expected to hold a JSON object with numeric "deployed" and
 * "dropped" members. A file that cannot be opened is treated as "nothing
 * persisted yet" and is not an error.
 *
 * Returns 0 on success (including the missing-file case); otherwise returns
 * TSDB_CODE_DND_QNODE_READ_FILE_ERROR. terrno is always set to the returned value.
 */
static int32_t dndReadQnodeFile(SDnode *pDnode) {
  SQnodeMgmt *pMgmt = &pDnode->qmgmt;
  int32_t     code = TSDB_CODE_DND_QNODE_READ_FILE_ERROR;
  int32_t     len = 0;
  int32_t     maxLen = 1024;
  char       *content = NULL;
  cJSON      *root = NULL;
  cJSON      *deployed = NULL;
  cJSON      *dropped = NULL;
  // Initialised up front so the cleanup label never inspects an indeterminate handle.
  TdFilePtr   pFile = NULL;

  char file[PATH_MAX + 20];
  snprintf(file, sizeof(file), "%s/qnode.json", pDnode->dir.dnode);

  // One extra byte so the buffer can always be NUL-terminated after the read.
  content = calloc(1, maxLen + 1);
  if (content == NULL) {
    dError("failed to read %s since out of memory", file);
    goto PRASE_QNODE_OVER;
  }

  pFile = taosOpenFile(file, TD_FILE_READ);
  if (pFile == NULL) {
    dDebug("file %s not exist", file);
    code = 0;
    goto PRASE_QNODE_OVER;
  }

  len = (int32_t)taosReadFile(pFile, content, maxLen);
  if (len <= 0) {
    dError("failed to read %s since content is null", file);
    goto PRASE_QNODE_OVER;
  }

  content[len] = 0;
  root = cJSON_Parse(content);
  if (root == NULL) {
    dError("failed to read %s since invalid json format", file);
    goto PRASE_QNODE_OVER;
  }

  deployed = cJSON_GetObjectItem(root, "deployed");
  if (!deployed || deployed->type != cJSON_Number) {
    dError("failed to read %s since deployed not found", file);
    goto PRASE_QNODE_OVER;
  }
  pMgmt->deployed = deployed->valueint;

  dropped = cJSON_GetObjectItem(root, "dropped");
  if (!dropped || dropped->type != cJSON_Number) {
    dError("failed to read %s since dropped not found", file);
    goto PRASE_QNODE_OVER;
  }
  pMgmt->dropped = dropped->valueint;

  code = 0;
  dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);

PRASE_QNODE_OVER:
  free(content);
  if (root != NULL) cJSON_Delete(root);
  if (pFile != NULL) taosCloseFile(&pFile);

  terrno = code;
  return code;
}
/*
 * Persist the qnode "deployed"/"dropped" flags to "<dnode dir>/qnode.json".
 *
 * The JSON is first written to a temporary "qnode.json.bak" in the same
 * directory and then renamed over the real file, so a failed or interrupted
 * write does not truncate the previously persisted state.
 *
 * Returns 0 on success; on failure returns -1 with terrno set to
 * TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR.
 */
static int32_t dndWriteQnodeFile(SDnode *pDnode) {
  SQnodeMgmt *pMgmt = &pDnode->qmgmt;

  char file[PATH_MAX + 20];
  snprintf(file, sizeof(file), "%s/qnode.json.bak", pDnode->dir.dnode);

  char realfile[PATH_MAX + 20];
  snprintf(realfile, sizeof(realfile), "%s/qnode.json", pDnode->dir.dnode);

  int32_t maxLen = 1024;
  char   *content = calloc(1, maxLen + 1);
  if (content == NULL) {
    terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since out of memory", file);
    return -1;
  }

  TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pFile == NULL) {
    free(content);
    terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  int32_t len = 0;
  len += snprintf(content + len, maxLen - len, "{\n");
  len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
  len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped);
  len += snprintf(content + len, maxLen - len, "}\n");

  // NOTE(review): assumes taosWriteFile reports failure with a negative return - confirm against osFile.h.
  int64_t written = taosWriteFile(pFile, content, len);
  taosFsyncFile(pFile);
  taosCloseFile(&pFile);
  free(content);

  if (written < 0) {
    // Leave the real file untouched when the temporary copy could not be written.
    terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
    dError("failed to write %s since %s", file, terrstr());
    return -1;
  }

  if (taosRenameFile(file, realfile) != 0) {
    terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR;
    dError("failed to rename %s since %s", file, terrstr());
    return -1;
  }

  dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
  return 0;
}
/*
 * Start the two qnode workers ("qnode-query" and "qnode-fetch"); both deliver
 * their messages to dndProcessQnodeQueue.
 *
 * Returns 0 when both workers are running, -1 otherwise. On failure no worker
 * is left running: if the fetch worker cannot be started, the query worker
 * that was already initialised is cleaned up again.
 */
static int32_t dndStartQnodeWorker(SDnode *pDnode) {
  SQnodeMgmt *pMgmt = &pDnode->qmgmt;

  if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, dndProcessQnodeQueue) != 0) {
    dError("failed to start qnode query worker since %s", terrstr());
    return -1;
  }

  if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, dndProcessQnodeQueue) != 0) {
    dError("failed to start qnode fetch worker since %s", terrstr());
    // Undo the first init; the caller only closes the qnode on failure and would otherwise leak this worker.
    dndCleanupWorker(&pMgmt->queryWorker);
    return -1;
  }

  return 0;
}
/*
 * Stop the qnode workers.
 *
 * Clears the deployed flag under the write latch, polls every 10 ms until
 * the reference count is no longer positive, then cleans up both workers.
 */
static void dndStopQnodeWorker(SDnode *pDnode) {
  SQnodeMgmt *pQmgmt = &pDnode->qmgmt;

  taosWLockLatch(&pQmgmt->latch);
  pQmgmt->deployed = 0;
  taosWUnLockLatch(&pQmgmt->latch);

  // Wait for outstanding references to drain before tearing the workers down.
  for (;;) {
    if (pQmgmt->refCount <= 0) {
      break;
    }
    taosMsleep(10);
  }

  dndCleanupWorker(&pQmgmt->queryWorker);
  dndCleanupWorker(&pQmgmt->fetchWorker);
}
/*
 * Fill *pOption with the settings handed to qndOpen(): the owning dnode, the
 * dnode's send/redirect callbacks, the dnode and cluster ids, and the
 * software version. Fields not listed here are left as the caller set them.
 */
static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
  // Callbacks the qnode uses to talk back through this dnode.
  pOption->sendRedirectRspFp = dndSendRedirectRsp;
  pOption->sendReqToMnodeFp = dndSendReqToMnode;
  pOption->sendReqToDnodeFp = dndSendReqToDnode;

  // Identity and version information.
  pOption->pDnode = pDnode;
  pOption->dnodeId = dndGetDnodeId(pDnode);
  pOption->clusterId = dndGetClusterId(pDnode);
  pOption->sver = tsVersion;
}
/*
 * Open the qnode on this dnode, start its workers and persist deployed=1.
 *
 * Fails with TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED when a qnode can already be
 * acquired. Any later failure undoes the steps completed so far.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t dndOpenQnode(SDnode *pDnode) {
  SQnodeMgmt *pQmgmt = &pDnode->qmgmt;

  SQnode *pExisting = dndAcquireQnode(pDnode);
  if (pExisting != NULL) {
    dndReleaseQnode(pDnode, pExisting);
    terrno = TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED;
    dError("failed to create qnode since %s", terrstr());
    return -1;
  }

  SQnodeOpt qndOption = {0};
  dndBuildQnodeOption(pDnode, &qndOption);

  SQnode *pOpened = qndOpen(&qndOption);
  if (pOpened == NULL) {
    dError("failed to open qnode since %s", terrstr());
    return -1;
  }

  if (dndStartQnodeWorker(pDnode) != 0) {
    dError("failed to start qnode worker since %s", terrstr());
    qndClose(pOpened);
    return -1;
  }

  // Mark as deployed before writing so the file records deployed=1; rolled back if the write fails.
  pQmgmt->deployed = 1;
  if (dndWriteQnodeFile(pDnode) != 0) {
    pQmgmt->deployed = 0;
    dError("failed to write qnode file since %s", terrstr());
    dndStopQnodeWorker(pDnode);
    qndClose(pOpened);
    return -1;
  }

  // Publish the handle last; dndAcquireQnode only succeeds once pQnode is non-NULL.
  taosWLockLatch(&pQmgmt->latch);
  pQmgmt->pQnode = pOpened;
  taosWUnLockLatch(&pQmgmt->latch);

  dInfo("qnode open successfully");
  return 0;
}
/*
 * Drop the qnode deployed on this dnode.
 *
 * Sequence: mark it dropped and persist that; then stop the workers, persist
 * deployed=0, unpublish the handle and close the qnode.
 *
 * Returns 0 on success. Returns -1 when no qnode can be acquired, or when the
 * dropped flag cannot be persisted (the flag is rolled back in that case).
 */
static int32_t dndDropQnode(SDnode *pDnode) {
  SQnodeMgmt *pMgmt = &pDnode->qmgmt;

  SQnode *pQnode = dndAcquireQnode(pDnode);
  if (pQnode == NULL) {
    dError("failed to drop qnode since %s", terrstr());
    return -1;
  }

  // The flag is being modified, so take the write latch (readers check it under the read latch).
  taosWLockLatch(&pMgmt->latch);
  pMgmt->dropped = 1;
  taosWUnLockLatch(&pMgmt->latch);

  if (dndWriteQnodeFile(pDnode) != 0) {
    taosWLockLatch(&pMgmt->latch);
    pMgmt->dropped = 0;
    taosWUnLockLatch(&pMgmt->latch);

    dndReleaseQnode(pDnode, pQnode);
    dError("failed to drop qnode since %s", terrstr());
    return -1;
  }

  // Give up our own reference first: dndStopQnodeWorker waits for the count to drain.
  dndReleaseQnode(pDnode, pQnode);
  dndStopQnodeWorker(pDnode);

  pMgmt->deployed = 0;
  // Best effort: the dropped state is already on disk, so the result is deliberately ignored here.
  dndWriteQnodeFile(pDnode);

  // Unpublish the handle before closing it so the shared pointer never refers to a closed qnode.
  taosWLockLatch(&pMgmt->latch);
  pMgmt->pQnode = NULL;
  taosWUnLockLatch(&pMgmt->latch);

  qndClose(pQnode);
  return 0;
}
2022-02-16 09:33:07 +00:00
/*
 * Handle a create-qnode request.
 *
 * Decodes the request from pReq and opens the qnode when the request's
 * dnodeId matches this dnode.
 *
 * Returns the result of dndOpenQnode() on a match. Returns -1 with terrno set
 * to TSDB_CODE_INVALID_MSG when decoding fails, or to
 * TSDB_CODE_DND_QNODE_INVALID_OPTION when the dnodeId does not match.
 */
int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDCreateQnodeReq request = {0};

  if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &request) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (request.dnodeId == dndGetDnodeId(pDnode)) {
    return dndOpenQnode(pDnode);
  }

  terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
  dError("failed to create qnode since %s", terrstr());
  return -1;
}
2022-02-16 09:33:07 +00:00
/*
 * Handle a drop-qnode request.
 *
 * Decodes the request from pReq and drops the qnode when the request's
 * dnodeId matches this dnode.
 *
 * Returns the result of dndDropQnode() on a match. Returns -1 with terrno set
 * to TSDB_CODE_INVALID_MSG when decoding fails, or to
 * TSDB_CODE_DND_QNODE_INVALID_OPTION when the dnodeId does not match.
 */
int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
  SDDropQnodeReq request = {0};

  if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &request) != 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    return -1;
  }

  if (request.dnodeId == dndGetDnodeId(pDnode)) {
    return dndDropQnode(pDnode);
  }

  terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
  dError("failed to drop qnode since %s", terrstr());
  return -1;
}
/*
 * Queue callback for the qnode workers: process one message.
 *
 * The message goes to qndProcessMsg() when the qnode can be acquired. For
 * messages whose msgType has the low bit set, a response is then sent: the
 * one produced by qndProcessMsg() if there is one, otherwise a bare response
 * carrying the error code. The message content and the queue item are always
 * freed before returning.
 */
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
  SRpcMsg *pReply = NULL;
  int32_t  status = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;

  SQnode *pQnode = dndAcquireQnode(pDnode);
  if (pQnode != NULL) {
    status = qndProcessMsg(pQnode, pMsg, &pReply);
  }
  // Safe even when the acquire failed: releasing a NULL handle does nothing.
  dndReleaseQnode(pDnode, pQnode);

  // NOTE(review): the low bit of msgType presumably marks requests that expect a reply - confirm.
  if (pMsg->msgType & 1u) {
    if (pReply == NULL) {
      if (status != 0) {
        status = terrno;
      }
      SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = status};
      rpcSendResponse(&rpcRsp);
    } else {
      pReply->ahandle = pMsg->ahandle;
      rpcSendResponse(pReply);
      free(pReply);
    }
  }

  rpcFreeCont(pMsg->pCont);
  taosFreeQitem(pMsg);
}
2021-12-28 03:57:24 +00:00
/*
 * Hand pMsg over to pWorker, provided the qnode can be acquired.
 *
 * If the hand-over fails (or the qnode is not available), the failure is
 * handled here: messages whose msgType has the low bit set get an error
 * response, and the message content is freed.
 */
static void dndWriteQnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
  int32_t status = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;

  SQnode *pQnode = dndAcquireQnode(pDnode);
  if (pQnode != NULL) {
    status = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
  }
  dndReleaseQnode(pDnode, pQnode);

  if (status == 0) {
    return;
  }

  if (pMsg->msgType & 1u) {
    SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = status};
    rpcSendResponse(&rsp);
  }
  rpcFreeCont(pMsg->pCont);
}
/*
 * Entry point for qnode query messages: forwards pMsg to the qnode query worker.
 * pEpSet is not used.
 */
void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  (void)pEpSet;
  dndWriteQnodeMsgToWorker(pDnode, &(pDnode->qmgmt.queryWorker), pMsg);
}
/*
 * Entry point for qnode fetch messages: forwards pMsg to the qnode fetch worker.
 * pEpSet is not used.
 */
void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
  // Use the dedicated fetch worker rather than the query worker.
  dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.fetchWorker, pMsg);
}
/*
 * Initialise qnode management for this dnode.
 *
 * Sets up the latch, loads the persisted state, and opens the qnode only when
 * that state says it is deployed and not dropped.
 *
 * Returns -1 when the state file cannot be read, 0 when there is nothing to
 * open, otherwise the result of dndOpenQnode().
 */
int32_t dndInitQnode(SDnode *pDnode) {
  SQnodeMgmt *pQmgmt = &pDnode->qmgmt;
  taosInitRWLatch(&pQmgmt->latch);

  if (dndReadQnodeFile(pDnode) != 0) {
    return -1;
  }

  if (pQmgmt->dropped || !pQmgmt->deployed) {
    return 0;
  }

  return dndOpenQnode(pDnode);
}
/*
 * Shut down the qnode if one is open: stop its workers, close it and clear
 * the stored handle. Does nothing when no qnode is open.
 */
void dndCleanupQnode(SDnode *pDnode) {
  SQnodeMgmt *pQmgmt = &pDnode->qmgmt;

  if (!pQmgmt->pQnode) {
    return;
  }

  dndStopQnodeWorker(pDnode);
  qndClose(pQmgmt->pQnode);
  pQmgmt->pQnode = NULL;
}