TDengine/source/dnode/vnode/src/vnd/vnodeSvr.c

917 lines
28 KiB
C
Raw Normal View History

2022-04-14 08:42:50 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-26 11:04:26 +00:00
#include "vnd.h"
2022-04-14 08:42:50 +00:00
/* Handlers dispatched from vnodeProcessWriteReq(); all share one signature. */

/* meta */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* data / sma */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* alter */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);

/* delete (takes the whole rpc message rather than the header-skipped payload) */
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp);
2022-04-14 10:21:06 +00:00
2022-05-23 08:38:05 +00:00
/**
 * @brief Patch a write request in place before it is processed.
 *
 * TDMT_VND_CREATE_TABLE: for every create-table entry, a uid obtained from
 * tGenIdPI64() and the current millisecond timestamp are written into the
 * encoded message directly after the table name.
 *
 * TDMT_VND_SUBMIT: for every block that carries an embedded create-table
 * request (schemaLen > 0), the uid already registered under that table name
 * (or a new one from tGenIdPI64() when the lookup yields 0) and the current
 * timestamp are written after the table name; the uid is also stored
 * big-endian in the block header.
 *
 * Any other message type is left untouched.
 *
 * @param pVnode vnode whose meta store is used for the name -> uid lookup
 * @param pMsg   request to patch; the buffer behind pMsg->pCont is modified
 * @return 0 on success, TSDB_CODE_INVALID_MSG when the message cannot be decoded
 */
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
  int32_t  code = 0;
  SDecoder dc = {0};

  switch (pMsg->msgType) {
    case TDMT_VND_CREATE_TABLE: {
      int64_t ctime = taosGetTimestampMs();
      int32_t nReqs = 0;

      tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
      if (tStartDecode(&dc) < 0 || tDecodeI32v(&dc, &nReqs) < 0) {
        code = TSDB_CODE_INVALID_MSG;
      }

      for (int32_t iReq = 0; code == 0 && iReq < nReqs; iReq++) {
        tb_uid_t uid = tGenIdPI64();
        char    *name = NULL;

        if (tStartDecode(&dc) < 0 || tDecodeI32v(&dc, NULL) < 0 || tDecodeCStr(&dc, &name) < 0) {
          code = TSDB_CODE_INVALID_MSG;
          break;
        }

        // the uid and ctime slots follow the name; memcpy avoids an unaligned int64 store
        memcpy(dc.data + dc.pos, &uid, sizeof(int64_t));
        memcpy(dc.data + dc.pos + 8, &ctime, sizeof(int64_t));
        tEndDecode(&dc);
      }

      if (code == 0) {
        tEndDecode(&dc);
      }
      tDecoderClear(&dc);
    } break;

    case TDMT_VND_SUBMIT: {
      SSubmitMsgIter msgIter = {0};
      SSubmitReq    *pSubmitReq = (SSubmitReq *)pMsg->pCont;
      SSubmitBlk    *pBlock = NULL;
      int64_t        ctime = taosGetTimestampMs();

      if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
        code = TSDB_CODE_INVALID_MSG;
        break;
      }

      for (;;) {
        if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
          code = TSDB_CODE_INVALID_MSG;
          break;
        }
        if (pBlock == NULL) break;
        if (msgIter.schemaLen <= 0) continue;  // no embedded create-table request

        char    *name = NULL;
        tb_uid_t uid;

        tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
        if (tStartDecode(&dc) < 0 || tDecodeI32v(&dc, NULL) < 0 || tDecodeCStr(&dc, &name) < 0) {
          tDecoderClear(&dc);
          code = TSDB_CODE_INVALID_MSG;
          break;
        }

        // reuse the uid of an existing table with this name, otherwise generate one
        uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
        if (uid == 0) {
          uid = tGenIdPI64();
        }

        memcpy(dc.data + dc.pos, &uid, sizeof(int64_t));
        memcpy(dc.data + dc.pos + 8, &ctime, sizeof(int64_t));
        pBlock->uid = htobe64(uid);

        tEndDecode(&dc);
        tDecoderClear(&dc);
      }
    } break;

    default:
      break;
  }

  return code;
}
/**
 * @brief Apply one write request to the vnode.
 *
 * Records `version` as the applied version, dispatches the message to the
 * handler for its type, pushes the message to TQ and commits the vnode when
 * vnodeShouldCommit() says so.
 *
 * @param pVnode  target vnode
 * @param pMsg    request; most handlers receive the payload after SMsgHead,
 *                while SUBMIT, DELETE and MQ_VG_DELETE receive the full content
 * @param version version (index) of this request
 * @param pRsp    response filled in by the handler
 * @return 0 on success, -1 if the handler or the TQ push failed
 */
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
  void   *pReq;
  int32_t len;

  vTrace("vgId:%d, start to process write request %s, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         version);

  pVnode->state.applied = version;

  // skip header
  pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
  len = pMsg->contLen - sizeof(SMsgHead);

  switch (pMsg->msgType) {
    /* META */
    case TDMT_VND_CREATE_STB:
      if (vnodeProcessCreateStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_STB:
      if (vnodeProcessAlterStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_STB:
      if (vnodeProcessDropStbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_TABLE:
      if (vnodeProcessCreateTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_TABLE:
      if (vnodeProcessAlterTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DROP_TABLE:
      if (vnodeProcessDropTbReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_CREATE_SMA:
      if (vnodeProcessCreateTSmaReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;

    /* TSDB */
    case TDMT_VND_SUBMIT:
      // the submit handler takes the whole content, header included
      if (vnodeProcessSubmitReq(pVnode, version, pMsg->pCont, pMsg->contLen, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_DELETE:
      if (vnodeProcessWriteMsg(pVnode, version, pMsg, pRsp) < 0) goto _err;
      break;

    /* TQ */
    case TDMT_VND_MQ_VG_CHANGE:
      if (tqProcessVgChangeReq(pVnode->pTq, pReq, len) < 0) goto _err;
      break;
    case TDMT_VND_MQ_VG_DELETE:
      // unlike the other TQ requests this one is passed with its header
      if (tqProcessVgDeleteReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) goto _err;
      break;
    case TDMT_VND_MQ_COMMIT_OFFSET:
      if (tqProcessOffsetCommitReq(pVnode->pTq, pReq, len) < 0) goto _err;
      break;
    case TDMT_STREAM_TASK_DEPLOY:
      if (tqProcessTaskDeploy(pVnode->pTq, pReq, len) < 0) goto _err;
      break;

    /* ALTER */
    case TDMT_VND_ALTER_CONFIRM:
      if (vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_HASHRANGE:
      if (vnodeProcessAlterHasnRangeReq(pVnode, version, pReq, len, pRsp) < 0) goto _err;
      break;
    case TDMT_VND_ALTER_CONFIG:
      break;

    default:
      ASSERT(0);
      break;
  }

  vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);

  if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
    vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // commit if need
  if (vnodeShouldCommit(pVnode)) {
    vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
    // commit current change
    vnodeCommit(pVnode);
    // start a new one
    vnodeBegin(pVnode);
  }

  return 0;

_err:
  vError("vgId:%d, process %s request failed since %s, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
         tstrerror(terrno), version);
  return -1;
}
/**
 * @brief Pre-process a query-queue message.
 *
 * Only TDMT_VND_QUERY messages are forwarded to the query worker; every other
 * type is accepted as-is.
 *
 * @return 0 for non-query messages, otherwise the result of
 *         qWorkerPreprocessQueryMsg()
 */
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  return (pMsg->msgType == TDMT_VND_QUERY) ? qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg) : 0;
}
/**
 * @brief Dispatch a message taken from the vnode query queue.
 *
 * @return the query worker's result for TDMT_VND_QUERY and
 *         TDMT_VND_QUERY_CONTINUE, TSDB_CODE_VND_APP_ERROR for any other type
 */
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
  vTrace("message in vnode query queue is processing");

  // read handle handed to the query worker
  SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

  if (pMsg->msgType == TDMT_VND_QUERY) {
    return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
  }
  if (pMsg->msgType == TDMT_VND_QUERY_CONTINUE) {
    return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0);
  }

  vError("unknown msg type:%d in query queue", pMsg->msgType);
  return TSDB_CODE_VND_APP_ERROR;
}
/**
 * @brief Dispatch a message taken from the vnode fetch queue.
 *
 * Query-worker, table-meta, consume and stream-task messages are each
 * forwarded to their dedicated handler.
 *
 * @param pVnode vnode the message is addressed to
 * @param pMsg   message to process
 * @param pInfo  queue info; only workerId is used (for TDMT_VND_CONSUME)
 * @return the handler's result, or TSDB_CODE_VND_APP_ERROR for an unknown type
 */
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
  vTrace("message in fetch queue is processing");

  switch (pMsg->msgType) {
    /* query worker */
    case TDMT_VND_FETCH:
      return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_FETCH_RSP:
      return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_CANCEL_TASK:
      return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_DROP_TASK:
      return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg, 0);
    case TDMT_VND_QUERY_HEARTBEAT:
      return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);

    /* meta */
    case TDMT_VND_TABLE_META:
      return vnodeGetTableMeta(pVnode, pMsg);

    /* tq / stream */
    case TDMT_VND_CONSUME:
      return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
    case TDMT_STREAM_TASK_RUN:
      return tqProcessTaskRunReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH:
      return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER:
      return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_DISPATCH_RSP:
      return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
    case TDMT_STREAM_TASK_RECOVER_RSP:
      return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);

    default:
      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}
2022-06-06 12:59:36 +00:00
/**
 * @brief Process a write-queue message that is executed by the query worker.
 *
 * Currently only TDMT_VND_DELETE is handled.
 *
 * @param pVnode  target vnode
 * @param version request version (not used by this function)
 * @param pMsg    full rpc message
 * @param pRsp    response passed through to the query worker
 * @return result of qWorkerProcessDeleteMsg(), or TSDB_CODE_VND_APP_ERROR for
 *         any other message type
 */
static int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRpcMsg *pRsp) {
  vTrace("message in write queue is processing");

  SDeleteRes  res = {0};
  SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};

  switch (pMsg->msgType) {
    case TDMT_VND_DELETE:
      return qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, pRsp, &res);
    default:
      vError("unknown msg type:%d in write queue", pMsg->msgType);
      return TSDB_CODE_VND_APP_ERROR;
  }
}
2022-04-14 08:42:50 +00:00
// TODO: remove the function
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
2022-06-07 12:49:27 +00:00
// blockDebugShowData(data, __func__);
tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
2022-04-14 08:42:50 +00:00
}
2022-06-01 12:28:29 +00:00
/**
 * @brief Fill the vnode-specific fields of a table meta response.
 *
 * Sets the vgroup id, database id, timestamp precision and database name from
 * the vnode configuration.
 */
void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
  pMetaRsp->vgId = TD_VID(pVnode);
  pMetaRsp->dbId = pVnode->config.dbId;
  pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
  // NOTE(review): unbounded copy - assumes dbFName is at least as large as config.dbname; confirm
  strcpy(pMetaRsp->dbFName, pVnode->config.dbname);
}
/**
 * @brief Hand a sync (replication) rpc message to the matching sync-node callback.
 *
 * The message is converted to its typed sync representation, passed to the
 * callback for its type and destroyed afterwards.
 *
 * @param pVnode vnode owning the sync node
 * @param pMsg   incoming sync message
 * @param pRsp   not used by this function
 * @return the callback's result, or TAOS_SYNC_OTHER_ERROR when the sync
 *         environment is not started or the message type is unknown
 */
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
  int32_t ret = TAOS_SYNC_OTHER_ERROR;

  if (!syncEnvIsStart()) {
    vError("==vnodeProcessSyncReq== error syncEnv stop");
    return TAOS_SYNC_OTHER_ERROR;
  }

  SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
  assert(pSyncNode != NULL);

  // results are not used; the calls are kept exactly as in the original sequence
  (void)syncGetMyRole(pVnode->sync);
  (void)syncGetMyTerm(pVnode->sync);

  // log the incoming message; the trace line is emitted for one message in ten
  char  logBuf[512] = {0};
  char *syncNodeStr = sync2SimpleStr(pVnode->sync);
  snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);

  static int64_t vndTick = 0;
  if (++vndTick % 10 == 1) {
    vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
  }
  syncRpcMsgLog2(logBuf, pMsg);
  taosMemoryFree(syncNodeStr);

  switch (pMsg->msgType) {
    case TDMT_SYNC_TIMEOUT: {
      SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
      syncTimeoutDestroy(pSyncMsg);
    } break;

    case TDMT_SYNC_PING: {
      SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
      syncPingDestroy(pSyncMsg);
    } break;

    case TDMT_SYNC_PING_REPLY: {
      SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
      syncPingReplyDestroy(pSyncMsg);
    } break;

    case TDMT_SYNC_CLIENT_REQUEST: {
      SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
      syncClientRequestDestroy(pSyncMsg);
    } break;

    case TDMT_SYNC_REQUEST_VOTE: {
      SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
      syncRequestVoteDestroy(pSyncMsg);
    } break;

    case TDMT_SYNC_REQUEST_VOTE_REPLY: {
      SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
      syncRequestVoteReplyDestroy(pSyncMsg);
    } break;

    case TDMT_SYNC_APPEND_ENTRIES: {
      SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
      syncAppendEntriesDestroy(pSyncMsg);
    } break;

    case TDMT_SYNC_APPEND_ENTRIES_REPLY: {
      SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
      assert(pSyncMsg != NULL);
      ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
      syncAppendEntriesReplyDestroy(pSyncMsg);
    } break;

    default:
      vError("==vnodeProcessSyncReq== error msg type:%d", pMsg->msgType);
      ret = TAOS_SYNC_OTHER_ERROR;
      break;
  }

  syncNodeRelease(pSyncNode);
  return ret;
}
/**
 * @brief Handle TDMT_VND_CREATE_STB: decode the request, create the super
 *        table in the meta store and notify the rollup-SMA layer.
 *
 * @param pVnode  target vnode
 * @param version request version passed to the meta store
 * @param pReq    encoded SVCreateStbReq (header already skipped)
 * @param len     length of pReq in bytes
 * @param pRsp    response; code is set to terrno on failure, no payload
 * @return 0 on success, -1 if decoding or creation failed
 */
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       coder;
  int32_t        rc = 0;

  pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateStbReq(&coder, &req) < 0 || metaCreateSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    rc = -1;
  } else {
    tdProcessRSmaCreate(pVnode, &req);
  }

  tDecoderClear(&coder);
  return rc;
}
/**
 * @brief Handle TDMT_VND_CREATE_TABLE: create a batch of tables and build a
 *        per-table response.
 *
 * Each entry is validated against the vnode's hash range and then created in
 * the meta store. A failure of a single entry is recorded in that entry's
 * response code and does not abort the batch. The uids of the created tables
 * are reported to TQ and to the SMA layer.
 *
 * @param pVnode  target vnode
 * @param version request version passed to the meta store
 * @param pReq    encoded SVCreateTbBatchReq (header already skipped)
 * @param len     length of pReq in bytes
 * @param pRsp    response; pCont receives the encoded SVCreateTbBatchRsp
 * @return 0 on success, -1 (with terrno set) when the request cannot be
 *         decoded or memory allocation fails
 */
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SDecoder           decoder = {0};
  // declared up front so that no goto jumps over its initialisation
  SEncoder           encoder = {0};
  int32_t            rcode = 0;
  int32_t            ret = 0;
  SVCreateTbBatchReq req = {0};
  SVCreateTbBatchRsp rsp = {0};
  SVCreateTbRsp      cRsp = {0};
  char               tbName[TSDB_TABLE_FNAME_LEN];
  STbUidStore       *pStore = NULL;
  SArray            *tbUids = NULL;

  pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
    rcode = -1;
    terrno = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  if (rsp.pArray == NULL || tbUids == NULL) {
    rcode = -1;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // loop to create table
  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVCreateTbReq *pCreateReq = req.pReqs + iReq;

    // validate hash
    snprintf(tbName, sizeof(tbName), "%s.%s", pVnode->config.dbname, pCreateReq->name);
    if (vnodeValidateTableHash(pVnode, tbName) < 0) {
      cRsp.code = TSDB_CODE_VND_HASH_MISMATCH;
      taosArrayPush(rsp.pArray, &cRsp);
      continue;
    }

    // do create table
    if (metaCreateTable(pVnode->pMeta, version, pCreateReq) < 0) {
      // "already exists" is not an error when IF NOT EXISTS was requested
      if ((pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS) && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
        cRsp.code = TSDB_CODE_SUCCESS;
      } else {
        cRsp.code = terrno;
      }
    } else {
      cRsp.code = TSDB_CODE_SUCCESS;
      tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid);
      taosArrayPush(tbUids, &pCreateReq->uid);
    }

    taosArrayPush(rsp.pArray, &cRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, true);
  tdUpdateTbUidList(pVnode->pSma, pStore);

  // prepare rsp
  tEncodeSize(tEncodeSVCreateTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    pRsp->contLen = 0;
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    rcode = -1;
    goto _exit;
  }
  tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
  tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
  tEncoderClear(&encoder);

_exit:
  // single cleanup path: everything below is released exactly once
  tdUidStoreFree(pStore);
  taosArrayDestroy(rsp.pArray);
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);
  return rcode;
}
/**
 * @brief Handle TDMT_VND_ALTER_STB: decode the request and alter the super
 *        table in the meta store.
 *
 * @param pVnode  target vnode
 * @param version request version passed to the meta store
 * @param pReq    encoded SVCreateStbReq (header already skipped)
 * @param len     length of pReq in bytes
 * @param pRsp    response; code carries the error on failure, no payload
 * @return 0 on success, -1 if decoding or altering failed
 */
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateStbReq req = {0};
  SDecoder       dc = {0};
  int32_t        rcode = 0;

  pRsp->msgType = TDMT_VND_ALTER_STB_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  tDecoderInit(&dc, pReq, len);

  // decode req
  if (tDecodeSVCreateStbReq(&dc, &req) < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    // report the failure in the response as well, as the create handler does
    pRsp->code = terrno;
    rcode = -1;
    goto _exit;
  }

  if (metaAlterSTable(pVnode->pMeta, version, &req) < 0) {
    pRsp->code = terrno;
    rcode = -1;
    goto _exit;
  }

_exit:
  tDecoderClear(&dc);
  return rcode;
}
/**
 * @brief Handle TDMT_VND_DROP_STB.
 *
 * The request is only decoded; the actual drop in the meta store is currently
 * disabled (see the commented-out call below).
 *
 * @param pVnode  target vnode (unused while the drop is disabled)
 * @param version request version (unused while the drop is disabled)
 * @param pReq    encoded SVDropStbReq (header already skipped)
 * @param len     length of pReq in bytes
 * @param pRsp    response; code is TSDB_CODE_INVALID_MSG when decoding fails
 * @return always 0; the outcome is reported through pRsp->code
 */
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropStbReq req = {0};
  int32_t      rcode = TSDB_CODE_SUCCESS;
  SDecoder     decoder = {0};

  // was TDMT_VND_CREATE_STB_RSP, copied from the create handler
  pRsp->msgType = TDMT_VND_DROP_STB_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  // decode request
  tDecoderInit(&decoder, pReq, len);
  if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
    rcode = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process request
  // TODO: the drop itself is disabled
  // if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
  //   rcode = terrno;
  //   goto _exit;
  // }

  // return rsp
_exit:
  pRsp->code = rcode;
  tDecoderClear(&decoder);
  return 0;
}
/**
 * @brief Handle TDMT_VND_ALTER_TABLE: alter a table in the meta store and
 *        return an SVAlterTbRsp.
 *
 * The result of the operation travels in the code field of the encoded
 * SVAlterTbRsp. When the meta store returns updated schemas, they are attached
 * to the response together with the vnode-specific meta fields.
 *
 * @param pVnode  target vnode
 * @param version request version passed to the meta store
 * @param pReq    encoded SVAlterTbReq (header already skipped)
 * @param len     length of pReq in bytes
 * @param pRsp    response; pCont receives the encoded SVAlterTbRsp
 * @return 0 when a response was produced (even if the alter itself failed),
 *         -1 when the response buffer cannot be allocated
 */
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVAlterTbReq  vAlterTbReq = {0};
  SVAlterTbRsp  vAlterTbRsp = {0};
  SDecoder      dc = {0};
  SEncoder      ec = {0};
  STableMetaRsp vMetaRsp = {0};
  int32_t       ret;

  pRsp->msgType = TDMT_VND_ALTER_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  tDecoderInit(&dc, pReq, len);

  // decode
  if (tDecodeSVAlterTbReq(&dc, &vAlterTbReq) < 0) {
    vAlterTbRsp.code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  // process
  if (metaAlterTable(pVnode->pMeta, version, &vAlterTbReq, &vMetaRsp) < 0) {
    // report the real cause; fall back to INVALID_MSG if the meta layer left terrno unset
    vAlterTbRsp.code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  if (NULL != vMetaRsp.pSchemas) {
    vnodeUpdateMetaRsp(pVnode, &vMetaRsp);
    vAlterTbRsp.pMeta = &vMetaRsp;
  }

_exit:
  tDecoderClear(&dc);

  tEncodeSize(tEncodeSVAlterTbRsp, &vAlterTbRsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
    return -1;
  }
  tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
  tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
  tEncoderClear(&ec);

  // NOTE(review): vMetaRsp.pSchemas is filled by metaAlterTable() and not released here - confirm who owns it
  return 0;
}
/**
 * @brief Handle TDMT_VND_DROP_TABLE: drop a batch of tables and build a
 *        per-table response.
 *
 * A failure of a single entry is recorded in that entry's response code; a
 * missing table is reported as success when the entry has igNotExists set.
 * The uid list filled by metaDropTable() is reported to TQ as removed.
 *
 * @param pVnode  target vnode
 * @param version request version passed to the meta store
 * @param pReq    encoded SVDropTbBatchReq (header already skipped)
 * @param len     length of pReq in bytes
 * @param pRsp    response; pCont receives the encoded SVDropTbBatchRsp
 * @return 0 when a response was produced, -1 when the response buffer cannot
 *         be allocated
 */
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVDropTbBatchReq req = {0};
  SVDropTbBatchRsp rsp = {0};
  SDecoder         decoder = {0};
  SEncoder         encoder = {0};
  int32_t          ret;
  int32_t          rcode = 0;
  SArray          *tbUids = NULL;

  pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;
  pRsp->code = TSDB_CODE_SUCCESS;

  // decode req
  tDecoderInit(&decoder, pReq, len);
  ret = tDecodeSVDropTbBatchReq(&decoder, &req);
  if (ret < 0) {
    terrno = TSDB_CODE_INVALID_MSG;
    pRsp->code = terrno;
    goto _exit;
  }

  // process req
  tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
  rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
  if (tbUids == NULL || rsp.pArray == NULL) {
    // do not report success when nothing was processed
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    goto _exit;
  }

  for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
    SVDropTbReq *pDropTbReq = req.pReqs + iReq;
    SVDropTbRsp  dropTbRsp = {0};

    ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
    if (ret < 0 && !(pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST)) {
      dropTbRsp.code = terrno;
    } else {
      dropTbRsp.code = TSDB_CODE_SUCCESS;
    }

    taosArrayPush(rsp.pArray, &dropTbRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, tbUids, false);

_exit:
  taosArrayDestroy(tbUids);
  tDecoderClear(&decoder);

  tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
  pRsp->pCont = rpcMallocCont(pRsp->contLen);
  if (pRsp->pCont == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    pRsp->code = terrno;
    pRsp->contLen = 0;
    rcode = -1;
  } else {
    tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
    tEncodeSVDropTbBatchRsp(&encoder, &rsp);
    tEncoderClear(&encoder);
  }

  // the response array was previously leaked
  taosArrayDestroy(rsp.pArray);
  return rcode;
}
/**
 * @brief Debug helper: print every row of one submit block.
 *
 * The schema is looked up by the suid recorded in the message iterator and the
 * schema version of the block's first row.
 *
 * @param pMeta   meta store used for the schema lookup
 * @param pBlock  submit block to print
 * @param msgIter iterator positioned on pBlock (supplies suid and uid)
 * @param tags    prefix printed in front of every row
 * @return 0 when the block is empty, TSDB_CODE_SUCCESS after printing,
 *         -1 when no schema is found
 */
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
                                              const char *tags) {
  SSubmitBlkIter blkIter = {0};
  STSRow        *row = NULL;

  tInitSubmitBlkIter(msgIter, pBlock, &blkIter);
  if (blkIter.row == NULL) return 0;

  // TODO: use the real schema
  STSchema *pSchema = metaGetTbTSchema(pMeta, msgIter->suid, TD_ROW_SVER(blkIter.row));
  if (pSchema == NULL) {
    printf("%s:%d no valid schema\n", tags, __LINE__);
    return -1;
  }

  char prefix[128] = {0};
  snprintf(prefix, sizeof(prefix), "%s: uid %" PRIi64 " ", tags, msgIter->uid);
  while ((row = tGetSubmitBlkNext(&blkIter))) {
    tdSRowPrint(row, pSchema, prefix);
  }

  taosMemoryFreeClear(pSchema);
  return TSDB_CODE_SUCCESS;
}
/**
 * @brief Debug helper: print every block of a submit request.
 *
 * @param pVnode vnode whose meta store supplies the schemas
 * @param pMsg   submit request; must not be NULL
 * @param tags   prefix printed in front of every row
 * @return 0 after the last block, -1 if the message iterator fails
 */
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
  ASSERT(pMsg != NULL);

  SSubmitMsgIter iter = {0};
  SSubmitBlk    *pBlk = NULL;
  SMeta         *pMeta = pVnode->pMeta;

  if (tInitSubmitMsgIter(pMsg, &iter) < 0) {
    return -1;
  }

  for (;;) {
    if (tGetSubmitMsgNext(&iter, &pBlk) < 0) {
      return -1;
    }
    if (pBlk == NULL) {
      return 0;
    }
    vnodeDebugPrintSingleSubmitMsg(pMeta, pBlk, &iter, tags);
  }
}
/**
 * @brief Handle TDMT_VND_SUBMIT: insert the rows of every block of a submit
 *        request and build an SSubmitRsp.
 *
 * A block that carries an embedded create-table request (schemaLen > 0) has
 * its table created first; "table already exists" is tolerated. The result of
 * each block is collected in the response. When everything succeeded, the
 * request is also passed to the rollup-SMA layer.
 *
 * @param pVnode  target vnode
 * @param version request version passed to meta and tsdb
 * @param pReq    the full submit message (SSubmitReq, header included)
 * @param len     length of pReq in bytes (not used)
 * @param pRsp    response; pCont receives the encoded SSubmitRsp and code the
 *                first request-level error
 * @return always 0; failures are reported through pRsp->code and the
 *         per-block codes
 */
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SSubmitReq    *pSubmitReq = (SSubmitReq *)pReq;
  SSubmitRsp     submitRsp = {0};
  SSubmitMsgIter msgIter = {0};
  SSubmitBlk    *pBlock = NULL;
  SVCreateTbReq  createTbReq = {0};
  SDecoder       decoder = {0};
  SEncoder       encoder = {0};
  SArray        *newTbUids = NULL;
  int32_t        tsize, ret;

  terrno = TSDB_CODE_SUCCESS;
  pRsp->code = 0;

#ifdef TD_DEBUG_PRINT_ROW
  vnodeDebugPrintSubmitMsg(pVnode, pReq, __func__);
#endif

  if (tsdbScanAndConvertSubmitMsg(pVnode->pTsdb, pSubmitReq) < 0) {
    pRsp->code = terrno;
    goto _exit;
  }

  // handle the request
  if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
    pRsp->code = TSDB_CODE_INVALID_MSG;
    goto _exit;
  }

  submitRsp.pArray = taosArrayInit(msgIter.numOfBlocks, sizeof(SSubmitBlkRsp));
  newTbUids = taosArrayInit(msgIter.numOfBlocks, sizeof(int64_t));
  if (submitRsp.pArray == NULL || newTbUids == NULL) {
    pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    goto _exit;
  }

  // NOTE(review): the early exits inside this loop skip tqUpdateTbUidList() for tables
  // created by earlier blocks (this was already the case) - confirm that is intended
  for (;;) {
    if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) {
      pRsp->code = TSDB_CODE_INVALID_MSG;
      goto _exit;
    }
    if (pBlock == NULL) break;

    SSubmitBlkRsp submitBlkRsp = {0};

    if (msgIter.schemaLen > 0) {
      // create table for auto create table mode
      submitBlkRsp.hashMeta = 1;

      tDecoderInit(&decoder, pBlock->data, msgIter.schemaLen);
      if (tDecodeSVCreateTbReq(&decoder, &createTbReq) < 0) {
        pRsp->code = TSDB_CODE_INVALID_MSG;
        tDecoderClear(&decoder);
        goto _exit;
      }

      if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) {
        if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
          // this block's response is never pushed, so surface the error on the request
          submitBlkRsp.code = terrno;
          pRsp->code = terrno;
          tDecoderClear(&decoder);
          goto _exit;
        }
      }

      // NOTE(review): the uid is recorded even when the table already existed - confirm
      taosArrayPush(newTbUids, &createTbReq.uid);

      submitBlkRsp.uid = createTbReq.uid;

      // only "<dbname>." is written here; the buffer is sized for the table name as well
      size_t fnameCap = strlen(pVnode->config.dbname) + strlen(createTbReq.name) + 2;
      submitBlkRsp.tblFName = taosMemoryMalloc(fnameCap);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        tDecoderClear(&decoder);
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, fnameCap, "%s.", pVnode->config.dbname);

      msgIter.uid = createTbReq.uid;
      if (createTbReq.type == TSDB_CHILD_TABLE) {
        msgIter.suid = createTbReq.ctb.suid;
      } else {
        msgIter.suid = 0;
      }

#ifdef TD_DEBUG_PRINT_ROW
      vnodeDebugPrintSingleSubmitMsg(pVnode->pMeta, pBlock, &msgIter, "real uid");
#endif
      tDecoderClear(&decoder);
    } else {
      submitBlkRsp.tblFName = taosMemoryMalloc(TSDB_TABLE_FNAME_LEN);
      if (submitBlkRsp.tblFName == NULL) {
        pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
        goto _exit;
      }
      snprintf(submitBlkRsp.tblFName, TSDB_TABLE_FNAME_LEN, "%s.", pVnode->config.dbname);
    }

    if (tsdbInsertTableData(pVnode->pTsdb, version, &msgIter, pBlock, &submitBlkRsp) < 0) {
      submitBlkRsp.code = terrno;
    }

    submitRsp.numOfRows += submitBlkRsp.numOfRows;
    submitRsp.affectedRows += submitBlkRsp.affectedRows;
    taosArrayPush(submitRsp.pArray, &submitBlkRsp);
  }

  tqUpdateTbUidList(pVnode->pTq, newTbUids, true);

_exit:
  taosArrayDestroy(newTbUids);

  tEncodeSize(tEncodeSSubmitRsp, &submitRsp, tsize, ret);
  pRsp->pCont = rpcMallocCont(tsize);
  if (pRsp->pCont == NULL) {
    pRsp->contLen = 0;
    if (pRsp->code == TSDB_CODE_SUCCESS) {
      pRsp->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    pRsp->contLen = tsize;
    tEncoderInit(&encoder, pRsp->pCont, tsize);
    tEncodeSSubmitRsp(&encoder, &submitRsp);
    tEncoderClear(&encoder);
  }

  // the per-block table names belong to this function
  for (int32_t i = 0; i < taosArrayGetSize(submitRsp.pArray); i++) {
    taosMemoryFree(((SSubmitBlkRsp *)taosArrayGet(submitRsp.pArray, i))[0].tblFName);
  }
  taosArrayDestroy(submitRsp.pArray);

  // TODO: the partial success scenario and the error case
  // TODO: refactor
  if ((terrno == TSDB_CODE_SUCCESS) && (pRsp->code == TSDB_CODE_SUCCESS)) {
    tdProcessRSmaSubmit(pVnode->pSma, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
  }

  return 0;
}
/**
 * @brief Handle TDMT_VND_CREATE_SMA: decode the request and create the
 *        time-range SMA.
 *
 * The server-side timezone (tsTimezone) is recorded in the request before it
 * is handed to the SMA layer.
 *
 * @param pVnode  target vnode
 * @param version request version passed to the SMA layer
 * @param pReq    encoded SVCreateTSmaReq
 * @param len     length of pReq in bytes
 * @param pRsp    response to fill; may be NULL, in which case no response is
 *                written
 * @return 0 on success, -1 if decoding or creation failed
 */
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  SVCreateTSmaReq req = {0};
  SDecoder        coder = {0};

  if (pRsp) {
    pRsp->msgType = TDMT_VND_CREATE_SMA_RSP;
    pRsp->code = TSDB_CODE_SUCCESS;
    pRsp->pCont = NULL;
    pRsp->contLen = 0;
  }

  // decode and process req
  tDecoderInit(&coder, pReq, len);

  if (tDecodeSVCreateTSmaReq(&coder, &req) < 0) {
    terrno = TSDB_CODE_MSG_DECODE_ERROR;
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  // record current timezone of server side
  req.timezoneInt = tsTimezone;

  if (tdProcessTSmaCreate(pVnode->pSma, version, (const char *)&req) < 0) {
    if (pRsp) pRsp->code = terrno;
    goto _err;
  }

  tDecoderClear(&coder);
  vDebug("vgId:%d, success to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64, TD_VID(pVnode),
         req.indexName, req.indexUid, version, req.tableUid);
  return 0;

_err:
  tDecoderClear(&coder);
  // a space was missing between the version number and "for table"
  vError("vgId:%d, failed to create tsma %s:%" PRIi64 " version %" PRIi64 " for table %" PRIi64 " since %s",
         TD_VID(pVnode), req.indexName, req.indexUid, version, req.tableUid, terrstr(terrno));
  return -1;
}
2022-06-01 07:06:12 +00:00
/**
 * @brief Create a time-range SMA on the SMA destination vnode.
 *
 * Runs the regular create-TSMA handler with a fixed version of 1 and without
 * a response object.
 *
 * @param pVnode  destination vnode
 * @param pCont   encoded SVCreateTSmaReq
 * @param contLen length of pCont in bytes
 * @return int32_t 0 on success, -1 on failure
 */
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
  const int64_t version = 1;
  SRpcMsg      *pNoRsp = NULL;

  return vnodeProcessCreateTSmaReq(pVnode, version, pCont, contLen, pNoRsp);
}
/**
 * @brief Handle TDMT_VND_ALTER_CONFIRM: log the confirmation and answer with
 *        an empty success response.
 *
 * @param pVnode  target vnode (used for logging only)
 * @param version request version (unused)
 * @param pReq    request payload (unused)
 * @param len     payload length (unused)
 * @param pRsp    response; set to TDMT_VND_ALTER_CONFIRM_RSP, success, no payload
 * @return always 0
 */
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter replica confirm msg is processed", TD_VID(pVnode));

  pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
  pRsp->code = TSDB_CODE_SUCCESS;
  pRsp->pCont = NULL;
  pRsp->contLen = 0;

  return 0;
}
2022-06-09 12:54:59 +00:00
/**
 * @brief Handle TDMT_VND_ALTER_HASHRANGE (placeholder).
 *
 * Only logs the request for now; pRsp is left untouched.
 * NOTE(review): "Hasn" in the function name looks like a typo for "Hash";
 * renaming has to touch the forward declaration and the caller as well.
 *
 * @return always 0
 */
static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
  vInfo("vgId:%d, alter hashrange msg will be processed", TD_VID(pVnode));

  /*
   * todo
   *  1. stop work
   *  2. adjust hash range / compact / remove wals / rename vgroups
   *  3. reload sync
   */

  return 0;
}