TDengine/source/client/src/clientMsgHandler.c

227 lines
7.1 KiB
C
Raw Normal View History

2021-12-14 07:08:56 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-24 10:02:57 +00:00
#include "catalog.h"
2021-12-14 07:08:56 +00:00
#include "clientInt.h"
2021-12-14 07:51:57 +00:00
#include "clientLog.h"
2022-04-24 10:02:57 +00:00
#include "os.h"
2022-03-01 11:48:21 +00:00
#include "query.h"
2022-04-24 10:02:57 +00:00
#include "tdef.h"
#include "tname.h"
2021-12-14 07:08:56 +00:00
2022-02-15 08:56:07 +00:00
// Per-message-type response handlers, indexed by TMSG_INDEX(msgType).
// Slots are filled in by initMsgHandleFp(); a slot left NULL makes buildMsgInfoImpl()
// fall back to genericRspCallback().
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
2021-12-14 07:08:56 +00:00
2021-12-30 02:37:39 +00:00
/* Stores `code` in terrno and on the request object, so both report the same status. */
static void setErrno(SRequestObj* pRequest, int32_t code) {
  terrno = code;
  pRequest->code = code;
}
2022-02-15 08:56:07 +00:00
int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
2021-12-30 02:37:39 +00:00
setErrno(pRequest, code);
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMsg->pData);
tsem_post(&pRequest->body.rspSem);
2021-12-30 02:37:39 +00:00
return code;
}
2022-02-15 08:56:07 +00:00
int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
2021-12-27 07:19:10 +00:00
if (code != TSDB_CODE_SUCCESS) {
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMsg->pData);
2021-12-30 02:37:39 +00:00
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
2021-12-27 07:19:10 +00:00
return code;
}
2022-02-16 03:45:44 +00:00
STscObj* pTscObj = pRequest->pTscObj;
2021-12-14 07:08:56 +00:00
2022-02-16 03:45:44 +00:00
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
2022-04-24 10:02:57 +00:00
/*assert(connectRsp.epSet.numOfEps > 0);*/
if (connectRsp.epSet.numOfEps == 0) {
taosMemoryFree(pMsg->pData);
setErrno(pRequest, TSDB_CODE_MND_APP_ERROR);
tsem_post(&pRequest->body.rspSem);
return code;
}
2021-12-14 07:08:56 +00:00
2022-02-16 03:45:44 +00:00
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
}
2022-02-16 03:45:44 +00:00
for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
2022-02-15 08:56:07 +00:00
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
2022-02-16 03:45:44 +00:00
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
2021-12-14 07:08:56 +00:00
}
2022-02-16 03:45:44 +00:00
pTscObj->connId = connectRsp.connId;
pTscObj->acctId = connectRsp.acctId;
tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
2021-12-14 07:08:56 +00:00
// update the appInstInfo
2022-02-16 03:45:44 +00:00
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
2021-12-14 07:08:56 +00:00
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
2022-04-14 06:42:51 +00:00
pTscObj->connType = connectRsp.connType;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
2022-01-14 02:48:05 +00:00
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
2022-02-16 03:45:44 +00:00
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
2021-12-27 02:57:31 +00:00
pTscObj->pAppInfo->numOfConns);
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMsg->pData);
tsem_post(&pRequest->body.rspSem);
return 0;
}
2021-12-14 07:08:56 +00:00
2022-04-24 10:02:57 +00:00
/*
 * Allocates and fills the SMsgSendInfo used to send `pRequest`.
 *
 * The response callback is the handler registered in handleRequestRspFp for the
 * request's message type, or genericRspCallback when none is registered.
 *
 * pRequest : request to send; must not be NULL
 *
 * Returns the newly allocated send-info, or NULL if the allocation fails.
 */
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
  // Validate before the first dereference; checking afterwards cannot catch anything.
  assert(pRequest != NULL);

  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    return NULL;
  }

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->param = pRequest;
  pMsgSendInfo->msgType = pRequest->type;
  pMsgSendInfo->msgInfo = pRequest->body.requestMsg;

  pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)
                         ? genericRspCallback
                         : handleRequestRspFp[TMSG_INDEX(pRequest->type)];
  return pMsgSendInfo;
}
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
2021-12-17 08:42:26 +00:00
// todo rsp with the vnode id list
SRequestObj* pRequest = param;
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMsg->pData);
2022-03-21 09:39:28 +00:00
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
}
tsem_post(&pRequest->body.rspSem);
2022-03-21 09:39:28 +00:00
return code;
2021-12-17 08:42:26 +00:00
}
2021-12-17 03:22:31 +00:00
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
2021-12-30 02:37:39 +00:00
SRequestObj* pRequest = param;
2022-03-01 11:48:21 +00:00
if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
2022-04-24 10:02:57 +00:00
struct SCatalog* pCatalog = NULL;
2022-03-01 11:48:21 +00:00
if (usedbRsp.vgVersion >= 0) {
2022-04-15 04:09:27 +00:00
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) {
2022-04-14 06:42:51 +00:00
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
2022-04-15 04:09:27 +00:00
tstrerror(code1));
2022-03-01 11:48:21 +00:00
} else {
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
}
}
2022-04-14 06:42:51 +00:00
tFreeSUsedbRsp(&usedbRsp);
2022-03-01 11:48:21 +00:00
}
2021-12-30 02:37:39 +00:00
if (code != TSDB_CODE_SUCCESS) {
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMsg->pData);
2021-12-30 02:37:39 +00:00
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
2022-02-14 09:39:08 +00:00
SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
SName name = {0};
2022-04-14 06:42:51 +00:00
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
2022-02-14 09:39:08 +00:00
2022-03-01 11:48:21 +00:00
SUseDbOutput output = {0};
code = queryBuildUseDbOutput(&output, &usedbRsp);
if (code != 0) {
terrno = code;
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
2022-03-25 16:29:53 +00:00
taosMemoryFreeClear(output.dbVgroup);
2022-03-01 11:48:21 +00:00
tscError("failed to build use db output since %s", terrstr());
2022-04-22 09:48:12 +00:00
} else if (output.dbVgroup) {
2022-04-14 06:42:51 +00:00
struct SCatalog* pCatalog = NULL;
2022-04-15 04:09:27 +00:00
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) {
2022-04-14 06:42:51 +00:00
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
2022-04-15 04:09:27 +00:00
tstrerror(code1));
2022-04-22 09:48:12 +00:00
taosMemoryFreeClear(output.dbVgroup);
2022-03-01 11:48:21 +00:00
} else {
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
}
}
2022-02-14 09:39:08 +00:00
tFreeSUsedbRsp(&usedbRsp);
char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(&name, db);
setConnectionDB(pRequest->pTscObj, db);
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMsg->pData);
tsem_post(&pRequest->body.rspSem);
return 0;
}
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
2021-12-30 02:37:39 +00:00
assert(pMsg != NULL && param != NULL);
SRequestObj* pRequest = param;
2021-12-30 02:37:39 +00:00
2022-03-25 16:29:53 +00:00
taosMemoryFree(pMsg->pData);
2021-12-30 02:37:39 +00:00
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
tsem_post(&pRequest->body.rspSem);
2021-12-30 02:37:39 +00:00
return code;
}
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
2021-12-30 02:38:32 +00:00
if (code != TSDB_CODE_SUCCESS) {
setErrno(pRequest, code);
tsem_post(&pRequest->body.rspSem);
return code;
}
2022-02-14 09:39:08 +00:00
SDropDbRsp dropdbRsp = {0};
tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
2022-01-26 09:42:31 +00:00
2022-02-14 09:39:08 +00:00
struct SCatalog* pCatalog = NULL;
2022-01-26 09:42:31 +00:00
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
2022-02-14 09:39:08 +00:00
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
2022-01-26 09:42:31 +00:00
tsem_post(&pRequest->body.rspSem);
2021-12-30 02:38:32 +00:00
return code;
}
2021-12-14 07:08:56 +00:00
void initMsgHandleFp() {
2022-04-19 06:23:02 +00:00
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
2022-04-15 04:09:27 +00:00
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
2022-04-19 06:23:02 +00:00
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
2022-01-14 02:48:05 +00:00
}