TDengine/source/dnode/mnode/impl/src/mndConsumer.c

182 lines
5.9 KiB
C
Raw Normal View History

2021-12-29 09:53:43 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndConsumer.h"
2022-02-15 10:17:18 +00:00
#include "mndAuth.h"
2021-12-29 09:53:43 +00:00
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
2022-01-06 02:28:34 +00:00
#include "tcompare.h"
2021-12-29 09:53:43 +00:00
#include "tname.h"
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64
2022-01-25 07:17:02 +00:00
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
2022-03-15 12:53:29 +00:00
static int32_t mndProcessConsumerMetaMsg(SNodeMsg *pMsg);
static int32_t mndRetrieveConsumer(SNodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
2022-01-25 07:17:02 +00:00
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
2021-12-29 09:53:43 +00:00
/**
 * Register the consumer table (SDB_CONSUMER, keyed by int64) and its
 * encode/decode/insert/update/delete callbacks with the mnode's SDB.
 *
 * @param pMnode  mnode whose pSdb receives the table description.
 * @return the value returned by sdbSetTable().
 */
int32_t mndInitConsumer(SMnode *pMnode) {
  SSdbTable consumerTable = {0};

  consumerTable.sdbType = SDB_CONSUMER;
  consumerTable.keyType = SDB_KEY_INT64;
  consumerTable.encodeFp = (SdbEncodeFp)mndConsumerActionEncode;
  consumerTable.decodeFp = (SdbDecodeFp)mndConsumerActionDecode;
  consumerTable.insertFp = (SdbInsertFp)mndConsumerActionInsert;
  consumerTable.updateFp = (SdbUpdateFp)mndConsumerActionUpdate;
  consumerTable.deleteFp = (SdbDeleteFp)mndConsumerActionDelete;

  return sdbSetTable(pMnode->pSdb, consumerTable);
}
/* Teardown counterpart of mndInitConsumer(); it currently performs no work. */
void mndCleanupConsumer(SMnode *pMnode) {
  (void)pMnode; /* unused */
}
2022-02-15 10:17:18 +00:00
SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup) {
2022-03-25 16:29:53 +00:00
SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
2022-02-10 03:22:50 +00:00
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
2022-02-15 10:17:18 +00:00
pConsumer->recentRemovedTopics = taosArrayInit(1, sizeof(char *));
2022-02-10 03:22:50 +00:00
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
2022-02-11 10:09:25 +00:00
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT);
2022-02-10 03:22:50 +00:00
strcpy(pConsumer->cgroup, cgroup);
taosInitRWLatch(&pConsumer->lock);
return pConsumer;
}
2022-01-19 08:28:13 +00:00
/**
 * Serialize a consumer object into a newly allocated SDB raw record.
 *
 * Record layout: int32 payload length, the payload produced by
 * tEncodeSMqConsumerObj(), then MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * @param pConsumer  consumer to serialize.
 * @return the raw record on success (terrno = TSDB_CODE_SUCCESS); NULL on
 *         failure, with the partially built record released.
 */
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
  /* Pessimistic default: any early exit below is reported as out-of-memory
   * unless a callee stored a more specific code. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  void   *payload = NULL;
  int32_t payloadLen = tEncodeSMqConsumerObj(NULL, pConsumer); /* sizing pass */
  int32_t rawSize = sizeof(int32_t) + payloadLen + MND_CONSUMER_RESERVE_SIZE;

  SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, rawSize);
  if (pRaw == NULL) goto ENCODE_OVER;

  payload = taosMemoryMalloc(payloadLen);
  if (payload == NULL) goto ENCODE_OVER;

  /* The encoder receives a separate cursor so that `payload` keeps pointing
   * at the start of the buffer. */
  void *cursor = payload;
  tEncodeSMqConsumerObj(&cursor, pConsumer);

  int32_t dataPos = 0;
  SDB_SET_INT32(pRaw, dataPos, payloadLen, ENCODE_OVER);
  SDB_SET_BINARY(pRaw, dataPos, payload, payloadLen, ENCODE_OVER);
  SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, ENCODE_OVER);
  SDB_SET_DATALEN(pRaw, dataPos, ENCODE_OVER);

  terrno = TSDB_CODE_SUCCESS;

ENCODE_OVER:
  taosMemoryFreeClear(payload);

  if (terrno == 0) {
    mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
    return pRaw;
  }

  mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
2022-01-19 08:28:13 +00:00
/**
 * Rebuild a consumer row from an SDB raw record.
 *
 * The record must carry soft version MND_CONSUMER_VER_NUMBER. Its body is
 * read as an int32 payload length, the payload itself (handed to
 * tDecodeSMqConsumerObj()) and MND_CONSUMER_RESERVE_SIZE reserved bytes.
 *
 * @param pRaw  raw record to decode.
 * @return the decoded row on success (terrno = TSDB_CODE_SUCCESS); NULL on
 *         failure, with the row released and terrno describing the error.
 */
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
  /* Pessimistic default: any early exit below is reported as out-of-memory
   * unless a more specific code is stored first. */
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  /* Everything the cleanup label touches is initialised up front, so every
   * `goto CM_DECODE_OVER` reaches it with well-defined values. */
  void           *buf = NULL;
  SSdbRow        *pRow = NULL;
  SMqConsumerObj *pConsumer = NULL;
  int32_t         dataPos = 0;
  int32_t         len = 0;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;

  if (sver != MND_CONSUMER_VER_NUMBER) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto CM_DECODE_OVER;
  }

  pRow = sdbAllocRow(sizeof(SMqConsumerObj));
  if (pRow == NULL) goto CM_DECODE_OVER;

  pConsumer = sdbGetRowObj(pRow);
  if (pConsumer == NULL) goto CM_DECODE_OVER;

  SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
  buf = taosMemoryMalloc(len);
  if (buf == NULL) goto CM_DECODE_OVER;
  SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
  SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);

  if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
    goto CM_DECODE_OVER;
  }

  terrno = TSDB_CODE_SUCCESS;

CM_DECODE_OVER:
  taosMemoryFreeClear(buf);
  if (terrno != TSDB_CODE_SUCCESS) {
    /* pConsumer is still NULL when the failure happened before the row
     * object was obtained; log id 0 in that case instead of dereferencing. */
    mError("consumer:%" PRId64 ", failed to decode from raw:%p since %s",
           pConsumer == NULL ? (int64_t)0 : pConsumer->consumerId, pRaw, terrstr());
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  return pRow;
}
2022-01-06 02:28:34 +00:00
/* SDB insert hook for consumer rows: emits a trace line and reports success. */
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  (void)pSdb; /* unused */

  mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId);
  return 0;
}
2022-01-06 02:28:34 +00:00
/* SDB delete hook for consumer rows: emits a trace line and reports success. */
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
  (void)pSdb; /* unused */

  mTrace("consumer:%" PRId64 ", perform delete action", pConsumer->consumerId);
  return 0;
}
2022-01-06 02:28:34 +00:00
/**
 * SDB update hook for consumer rows.
 *
 * Traces the event and atomically increments the epoch of the existing
 * object. Nothing is copied from pNewConsumer.
 *
 * @return always 0.
 */
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
  (void)pSdb;         /* unused */
  (void)pNewConsumer; /* unused */

  mTrace("consumer:%" PRId64 ", perform update action", pOldConsumer->consumerId);

  /* The increment relies on the atomic primitive alone; pOldConsumer->lock
   * is not taken around it. */
  atomic_add_fetch_32(&pOldConsumer->epoch, 1);

  return 0;
}
2022-01-25 07:17:02 +00:00
/**
 * Look up a consumer by id in the mnode's SDB via sdbAcquire().
 *
 * @param pMnode      mnode whose SDB is searched.
 * @param consumerId  key of the consumer row.
 * @return the object returned by sdbAcquire(), or NULL with terrno set to
 *         TSDB_CODE_MND_CONSUMER_NOT_EXIST when the lookup yields nothing.
 */
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
  SMqConsumerObj *pFound = sdbAcquire(pMnode->pSdb, SDB_CONSUMER, &consumerId);
  if (pFound != NULL) {
    return pFound;
  }

  terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
  return NULL;
}
2022-01-06 02:28:34 +00:00
/* Hand a consumer object back to the mnode's SDB through sdbRelease(). */
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
  sdbRelease(pMnode->pSdb, pConsumer);
}