TDengine/source/libs/sync/src/syncRespMgr.c

167 lines
5.6 KiB
C
Raw Normal View History

2022-04-18 13:50:56 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncRespMgr.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
2022-04-18 13:50:56 +00:00
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
memset(pObj, 0, sizeof(SSyncRespMgr));
pObj->pRespHash =
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
2022-06-21 08:02:36 +00:00
ASSERT(pObj->pRespHash != NULL);
2022-04-18 13:50:56 +00:00
pObj->ttl = ttl;
pObj->data = data;
pObj->seqNum = 0;
taosThreadMutexInit(&(pObj->mutex), NULL);
return pObj;
}
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
2022-06-27 08:53:02 +00:00
if (pObj != NULL) {
taosThreadMutexLock(&(pObj->mutex));
taosHashCleanup(pObj->pRespHash);
taosThreadMutexUnlock(&(pObj->mutex));
taosThreadMutexDestroy(&(pObj->mutex));
taosMemoryFree(pObj);
}
2022-04-18 13:50:56 +00:00
}
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosThreadMutexLock(&(pObj->mutex));
uint64_t keyCode = ++(pObj->seqNum);
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
2022-06-14 12:39:53 +00:00
SSyncNode *pSyncNode = pObj->data;
2022-06-18 07:17:58 +00:00
char eventLog[128];
2022-07-08 10:00:03 +00:00
snprintf(eventLog, sizeof(eventLog), "resp mgr add, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p",
2022-06-18 07:17:58 +00:00
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
syncNodeEventLog(pSyncNode, eventLog);
2022-06-14 12:39:53 +00:00
2022-04-18 13:50:56 +00:00
taosThreadMutexUnlock(&(pObj->mutex));
return keyCode;
}
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index) {
taosThreadMutexLock(&(pObj->mutex));
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex));
return 0;
}
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
taosThreadMutexLock(&(pObj->mutex));
void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
if (pTmp != NULL) {
memcpy(pStub, pTmp, sizeof(SRespStub));
2022-06-14 12:33:57 +00:00
SSyncNode *pSyncNode = pObj->data;
2022-06-18 07:17:58 +00:00
char eventLog[128];
2022-07-08 10:00:03 +00:00
snprintf(eventLog, sizeof(eventLog), "resp mgr get, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p",
2022-06-18 07:17:58 +00:00
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
syncNodeEventLog(pSyncNode, eventLog);
2022-06-14 12:33:57 +00:00
2022-04-18 13:50:56 +00:00
taosThreadMutexUnlock(&(pObj->mutex));
return 1; // get one object
}
taosThreadMutexUnlock(&(pObj->mutex));
return 0; // get none object
}
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
taosThreadMutexLock(&(pObj->mutex));
void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
if (pTmp != NULL) {
memcpy(pStub, pTmp, sizeof(SRespStub));
2022-06-14 12:33:57 +00:00
SSyncNode *pSyncNode = pObj->data;
2022-06-18 07:17:58 +00:00
char eventLog[128];
2022-07-08 10:00:03 +00:00
snprintf(eventLog, sizeof(eventLog), "resp mgr get-and-del, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p",
2022-06-18 07:17:58 +00:00
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
syncNodeEventLog(pSyncNode, eventLog);
2022-06-14 12:33:57 +00:00
2022-04-18 13:50:56 +00:00
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
2022-05-21 09:01:18 +00:00
taosThreadMutexUnlock(&(pObj->mutex));
2022-04-18 13:50:56 +00:00
return 1; // get one object
}
taosThreadMutexUnlock(&(pObj->mutex));
return 0; // get none object
}
void syncRespClean(SSyncRespMgr *pObj) {
taosThreadMutexLock(&(pObj->mutex));
syncRespCleanByTTL(pObj, pObj->ttl);
taosThreadMutexUnlock(&(pObj->mutex));
}
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
int cnt = 0;
SSyncNode *pSyncNode = pObj->data;
2022-07-12 08:57:19 +00:00
SArray *delIndexArray = taosArrayInit(0, sizeof(uint64_t));
ASSERT(delIndexArray != NULL);
while (pStub) {
2022-07-12 08:57:19 +00:00
size_t len;
void * key = taosHashGetKey(pStub, &len);
2022-07-12 08:57:19 +00:00
uint64_t *pSeqNum = (uint64_t *)key;
int64_t nowMS = taosGetTimestampMs();
if (nowMS - pStub->createTime > ttl) {
2022-07-12 08:57:19 +00:00
taosArrayPush(delIndexArray, pSeqNum);
cnt++;
2022-07-12 08:57:19 +00:00
SFsmCbMeta cbMeta = {0};
cbMeta.index = SYNC_INDEX_INVALID;
cbMeta.lastConfigIndex = SYNC_INDEX_INVALID;
cbMeta.isWeak = false;
cbMeta.code = TSDB_CODE_SYN_TIMEOUT;
cbMeta.state = pSyncNode->state;
cbMeta.seqNum = *pSeqNum;
cbMeta.term = SYNC_TERM_INVALID;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.flag = 0;
2022-07-12 12:23:31 +00:00
pStub->rpcMsg.pCont = NULL;
pStub->rpcMsg.contLen = 0;
2022-07-12 08:57:19 +00:00
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
}
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
}
int32_t arraySize = taosArrayGetSize(delIndexArray);
2022-07-12 08:57:19 +00:00
sDebug("vgId:%d, resp mgr clean by ttl, cnt:%d, array-size:%d", pSyncNode->vgId, cnt, arraySize);
for (int32_t i = 0; i < arraySize; ++i) {
2022-07-12 08:57:19 +00:00
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
sDebug("vgId:%d, resp mgr clean by ttl, seq:%d", pSyncNode->vgId, *pSeqNum);
}
taosArrayDestroy(delIndexArray);
}