TDengine/source/dnode/vnode/src/meta/metaCache.c

1603 lines
No EOL
52 KiB
C

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#ifdef TD_ENTERPRISE
extern const char* tkLogStb[];
extern const char* tkAuditStb[];
extern const int tkLogStbNum;
extern const int tkAuditStbNum;
#endif
#define TAG_FILTER_RES_KEY_LEN 32
#define META_CACHE_BASE_BUCKET 1024
#define META_CACHE_STATS_BUCKET 16
// (uid , suid) : child table
// (uid, 0) : normal table
// (suid, suid) : super table
typedef struct SMetaCacheEntry SMetaCacheEntry;
struct SMetaCacheEntry {
SMetaCacheEntry* next;
SMetaInfo info;
};
typedef struct SMetaStbStatsEntry {
struct SMetaStbStatsEntry* next;
SMetaStbStats info;
} SMetaStbStatsEntry;
typedef struct STagFilterResEntry {
SHashObj *set; // the set of md5 digest, extracted from the serialized tag query condition
uint32_t hitTimes; // queried times for current super table
} STagFilterResEntry;
typedef struct STagCondFilterEntry {
SArray* pColIds; // SArray<col_id_t>
SHashObj* set; // SHashObj<digest, SArray<uid>>
uint32_t hitTimes; // queried times for current tag filter condition
} STagCondFilterEntry;
typedef struct STagConds {
SHashObj* set; // SHashObj<tagColIdStr, STagCondFilterEntry>
uint32_t hitTimes; // queried times for current super table
uint32_t numTagDataEntries; // total num of tag data entries in this stable
} STagConds;
struct SMetaCache {
// child, normal, super, table entry cache
struct SEntryCache {
int32_t nEntry;
int32_t nBucket;
SMetaCacheEntry** aBucket;
} sEntryCache;
// stable stats cache
struct SStbStatsCache {
int32_t nEntry;
int32_t nBucket;
SMetaStbStatsEntry** aBucket;
} sStbStatsCache;
// query cache
struct STagFilterResCache {
TdThreadMutex lock;
uint32_t accTimes;
SHashObj* pTableEntry;
SLRUCache* pUidResCache;
} sTagFilterResCache;
// cache table list for tag filter conditions
// that match format "tag1 = v1 AND tag2 = v2 AND ..."
struct SStableTagFilterResCache {
TdThreadRwlock rwlock;
SHashObj* pTableEntry; // HashObj<suid, STagConds>
// access times
uint64_t accTimes;
// hit times
uint64_t hitTimes;
// total num of tag data entries in all stables
uint32_t numTagDataEntries;
} sStableTagFilterResCache;
struct STbGroupResCache {
TdThreadMutex lock;
uint32_t accTimes;
SHashObj* pTableEntry;
SLRUCache* pResCache;
} STbGroupResCache;
struct STbFilterCache {
SHashObj* pStb;
SHashObj* pStbName;
} STbFilterCache;
struct STbRefDbCache {
TdThreadMutex lock;
SHashObj* pStbRefs; // key: suid, value: SHashObj<dbName, refTimes>
} STbRefDbCache;
};
static void entryCacheClose(SMeta* pMeta) {
if (pMeta->pCache) {
// close entry cache
for (int32_t iBucket = 0; iBucket < pMeta->pCache->sEntryCache.nBucket; iBucket++) {
SMetaCacheEntry* pEntry = pMeta->pCache->sEntryCache.aBucket[iBucket];
while (pEntry) {
SMetaCacheEntry* tEntry = pEntry->next;
taosMemoryFree(pEntry);
pEntry = tEntry;
}
}
taosMemoryFree(pMeta->pCache->sEntryCache.aBucket);
}
}
static void statsCacheClose(SMeta* pMeta) {
if (pMeta->pCache) {
// close entry cache
for (int32_t iBucket = 0; iBucket < pMeta->pCache->sStbStatsCache.nBucket; iBucket++) {
SMetaStbStatsEntry* pEntry = pMeta->pCache->sStbStatsCache.aBucket[iBucket];
while (pEntry) {
SMetaStbStatsEntry* tEntry = pEntry->next;
taosMemoryFree(pEntry);
pEntry = tEntry;
}
}
taosMemoryFree(pMeta->pCache->sStbStatsCache.aBucket);
}
}
static void freeCacheEntryFp(void* param) {
STagFilterResEntry** p = param;
taosHashCleanup((*p)->set);
taosMemoryFreeClear(*p);
}
static void freeTagFilterEntryFp(void* param) {
STagCondFilterEntry** p = param;
taosArrayDestroy((*p)->pColIds);
taosHashCleanup((*p)->set);
taosMemoryFreeClear(*p);
}
static void freeTagCondsFp(void* param) {
STagConds** p = param;
taosHashCleanup((*p)->set);
taosMemoryFreeClear(*p);
}
static void freeRefDbFp(void* param) {
SHashObj** p = param;
taosHashCleanup(*p);
*p = NULL;
}
int32_t metaCacheOpen(SMeta* pMeta) {
int32_t code = 0;
int32_t lino;
pMeta->pCache = (SMetaCache*)taosMemoryCalloc(1, sizeof(SMetaCache));
if (pMeta->pCache == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
// open entry cache
pMeta->pCache->sEntryCache.nEntry = 0;
pMeta->pCache->sEntryCache.nBucket = META_CACHE_BASE_BUCKET;
pMeta->pCache->sEntryCache.aBucket =
(SMetaCacheEntry**)taosMemoryCalloc(pMeta->pCache->sEntryCache.nBucket, sizeof(SMetaCacheEntry*));
if (pMeta->pCache->sEntryCache.aBucket == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
// open stats cache
pMeta->pCache->sStbStatsCache.nEntry = 0;
pMeta->pCache->sStbStatsCache.nBucket = META_CACHE_STATS_BUCKET;
pMeta->pCache->sStbStatsCache.aBucket =
(SMetaStbStatsEntry**)taosMemoryCalloc(pMeta->pCache->sStbStatsCache.nBucket, sizeof(SMetaStbStatsEntry*));
if (pMeta->pCache->sStbStatsCache.aBucket == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
// open tag filter cache
pMeta->pCache->sTagFilterResCache.pUidResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
if (pMeta->pCache->sTagFilterResCache.pUidResCache == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
pMeta->pCache->sTagFilterResCache.accTimes = 0;
pMeta->pCache->sTagFilterResCache.pTableEntry =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pMeta->pCache->sTagFilterResCache.pTableEntry == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
taosHashSetFreeFp(pMeta->pCache->sTagFilterResCache.pTableEntry, freeCacheEntryFp);
(void)taosThreadMutexInit(&pMeta->pCache->sTagFilterResCache.lock, NULL);
// open stable tag filter cache
pMeta->pCache->sStableTagFilterResCache.accTimes = 0;
pMeta->pCache->sStableTagFilterResCache.numTagDataEntries = 0;
pMeta->pCache->sStableTagFilterResCache.pTableEntry =
taosHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pMeta->pCache->sStableTagFilterResCache.pTableEntry == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
taosHashSetFreeFp(
pMeta->pCache->sStableTagFilterResCache.pTableEntry, freeTagCondsFp);
TAOS_UNUSED(taosThreadRwlockInit(
&pMeta->pCache->sStableTagFilterResCache.rwlock, NULL));
// open group res cache
pMeta->pCache->STbGroupResCache.pResCache = taosLRUCacheInit(5 * 1024 * 1024, -1, 0.5);
if (pMeta->pCache->STbGroupResCache.pResCache == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
pMeta->pCache->STbGroupResCache.accTimes = 0;
pMeta->pCache->STbGroupResCache.pTableEntry =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pMeta->pCache->STbGroupResCache.pTableEntry == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
taosHashSetFreeFp(pMeta->pCache->STbGroupResCache.pTableEntry, freeCacheEntryFp);
(void)taosThreadMutexInit(&pMeta->pCache->STbGroupResCache.lock, NULL);
// open filter cache
pMeta->pCache->STbFilterCache.pStb =
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pMeta->pCache->STbFilterCache.pStb == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
pMeta->pCache->STbFilterCache.pStbName =
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);
if (pMeta->pCache->STbFilterCache.pStbName == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
// open ref db cache
pMeta->pCache->STbRefDbCache.pStbRefs =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pMeta->pCache->STbRefDbCache.pStbRefs == NULL) {
TSDB_CHECK_CODE(code = terrno, lino, _exit);
}
taosHashSetFreeFp(pMeta->pCache->STbRefDbCache.pStbRefs, freeRefDbFp);
(void)taosThreadMutexInit(&pMeta->pCache->STbRefDbCache.lock, NULL);
_exit:
if (code) {
metaError("vgId:%d, %s failed at %s:%d since %s", TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
metaCacheClose(pMeta);
} else {
metaDebug("vgId:%d, %s success", TD_VID(pMeta->pVnode), __func__);
}
return code;
}
void metaCacheClose(SMeta* pMeta) {
if (pMeta->pCache) {
entryCacheClose(pMeta);
statsCacheClose(pMeta);
taosHashClear(pMeta->pCache->sTagFilterResCache.pTableEntry);
taosLRUCacheCleanup(pMeta->pCache->sTagFilterResCache.pUidResCache);
(void)taosThreadMutexDestroy(&pMeta->pCache->sTagFilterResCache.lock);
taosHashCleanup(pMeta->pCache->sTagFilterResCache.pTableEntry);
(void)taosThreadRwlockDestroy(&pMeta->pCache->sStableTagFilterResCache.rwlock);
taosHashCleanup(pMeta->pCache->sStableTagFilterResCache.pTableEntry);
taosHashClear(pMeta->pCache->STbGroupResCache.pTableEntry);
taosLRUCacheCleanup(pMeta->pCache->STbGroupResCache.pResCache);
(void)taosThreadMutexDestroy(&pMeta->pCache->STbGroupResCache.lock);
taosHashCleanup(pMeta->pCache->STbGroupResCache.pTableEntry);
taosHashCleanup(pMeta->pCache->STbFilterCache.pStb);
taosHashCleanup(pMeta->pCache->STbFilterCache.pStbName);
taosHashClear(pMeta->pCache->STbRefDbCache.pStbRefs);
(void)taosThreadMutexDestroy(&pMeta->pCache->STbRefDbCache.lock);
taosHashCleanup(pMeta->pCache->STbRefDbCache.pStbRefs);
taosMemoryFree(pMeta->pCache);
pMeta->pCache = NULL;
}
}
static void metaRehashCache(SMetaCache* pCache, int8_t expand) {
int32_t code = 0;
int32_t nBucket;
if (expand) {
nBucket = pCache->sEntryCache.nBucket * 2;
} else {
nBucket = pCache->sEntryCache.nBucket / 2;
}
SMetaCacheEntry** aBucket = (SMetaCacheEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaCacheEntry*));
if (aBucket == NULL) {
return;
}
// rehash
for (int32_t iBucket = 0; iBucket < pCache->sEntryCache.nBucket; iBucket++) {
SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
while (pEntry) {
SMetaCacheEntry* pTEntry = pEntry->next;
pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
pEntry = pTEntry;
}
}
// final set
taosMemoryFree(pCache->sEntryCache.aBucket);
pCache->sEntryCache.nBucket = nBucket;
pCache->sEntryCache.aBucket = aBucket;
return;
}
int32_t metaCacheUpsert(SMeta* pMeta, SMetaInfo* pInfo) {
int32_t code = 0;
// meta is wlocked for calling this func.
// search
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
ppEntry = &(*ppEntry)->next;
}
if (*ppEntry) { // update
if (pInfo->suid != (*ppEntry)->info.suid) {
metaError("meta/cache: suid should be same as the one in cache.");
return TSDB_CODE_INVALID_PARA;
}
if (pInfo->version > (*ppEntry)->info.version) {
(*ppEntry)->info.version = pInfo->version;
(*ppEntry)->info.skmVer = pInfo->skmVer;
}
} else { // insert
if (pCache->sEntryCache.nEntry >= pCache->sEntryCache.nBucket) {
metaRehashCache(pCache, 1);
iBucket = TABS(pInfo->uid) % pCache->sEntryCache.nBucket;
}
SMetaCacheEntry* pEntryNew = (SMetaCacheEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
if (pEntryNew == NULL) {
code = terrno;
goto _exit;
}
pEntryNew->info = *pInfo;
pEntryNew->next = pCache->sEntryCache.aBucket[iBucket];
pCache->sEntryCache.aBucket[iBucket] = pEntryNew;
pCache->sEntryCache.nEntry++;
}
_exit:
return code;
}
int32_t metaCacheDrop(SMeta* pMeta, int64_t uid) {
int32_t code = 0;
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
SMetaCacheEntry** ppEntry = &pCache->sEntryCache.aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != uid) {
ppEntry = &(*ppEntry)->next;
}
SMetaCacheEntry* pEntry = *ppEntry;
if (pEntry) {
*ppEntry = pEntry->next;
taosMemoryFree(pEntry);
pCache->sEntryCache.nEntry--;
if (pCache->sEntryCache.nEntry < pCache->sEntryCache.nBucket / 4 &&
pCache->sEntryCache.nBucket > META_CACHE_BASE_BUCKET) {
metaRehashCache(pCache, 0);
}
} else {
code = TSDB_CODE_NOT_FOUND;
}
_exit:
return code;
}
int32_t metaCacheGet(SMeta* pMeta, int64_t uid, SMetaInfo* pInfo) {
int32_t code = 0;
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->sEntryCache.nBucket;
SMetaCacheEntry* pEntry = pCache->sEntryCache.aBucket[iBucket];
while (pEntry && pEntry->info.uid != uid) {
pEntry = pEntry->next;
}
if (pEntry) {
if (pInfo) {
*pInfo = pEntry->info;
}
} else {
code = TSDB_CODE_NOT_FOUND;
}
return code;
}
static int32_t metaRehashStatsCache(SMetaCache* pCache, int8_t expand) {
int32_t code = 0;
int32_t nBucket;
if (expand) {
nBucket = pCache->sStbStatsCache.nBucket * 2;
} else {
nBucket = pCache->sStbStatsCache.nBucket / 2;
}
SMetaStbStatsEntry** aBucket = (SMetaStbStatsEntry**)taosMemoryCalloc(nBucket, sizeof(SMetaStbStatsEntry*));
if (aBucket == NULL) {
code = terrno;
goto _exit;
}
// rehash
for (int32_t iBucket = 0; iBucket < pCache->sStbStatsCache.nBucket; iBucket++) {
SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
while (pEntry) {
SMetaStbStatsEntry* pTEntry = pEntry->next;
pEntry->next = aBucket[TABS(pEntry->info.uid) % nBucket];
aBucket[TABS(pEntry->info.uid) % nBucket] = pEntry;
pEntry = pTEntry;
}
}
// final set
taosMemoryFree(pCache->sStbStatsCache.aBucket);
pCache->sStbStatsCache.nBucket = nBucket;
pCache->sStbStatsCache.aBucket = aBucket;
_exit:
return code;
}
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo) {
int32_t code = 0;
// meta is wlocked for calling this func.
// search
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != pInfo->uid) {
ppEntry = &(*ppEntry)->next;
}
if (*ppEntry) { // update
(*ppEntry)->info.ctbNum = pInfo->ctbNum;
(*ppEntry)->info.colNum = pInfo->colNum;
(*ppEntry)->info.flags = pInfo->flags;
(*ppEntry)->info.keep = pInfo->keep;
} else { // insert
if (pCache->sStbStatsCache.nEntry >= pCache->sStbStatsCache.nBucket) {
TAOS_UNUSED(metaRehashStatsCache(pCache, 1));
iBucket = TABS(pInfo->uid) % pCache->sStbStatsCache.nBucket;
}
SMetaStbStatsEntry* pEntryNew = (SMetaStbStatsEntry*)taosMemoryMalloc(sizeof(*pEntryNew));
if (pEntryNew == NULL) {
code = terrno;
goto _exit;
}
pEntryNew->info = *pInfo;
pEntryNew->next = pCache->sStbStatsCache.aBucket[iBucket];
pCache->sStbStatsCache.aBucket[iBucket] = pEntryNew;
pCache->sStbStatsCache.nEntry++;
}
_exit:
return code;
}
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid) {
int32_t code = 0;
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
SMetaStbStatsEntry** ppEntry = &pCache->sStbStatsCache.aBucket[iBucket];
while (*ppEntry && (*ppEntry)->info.uid != uid) {
ppEntry = &(*ppEntry)->next;
}
SMetaStbStatsEntry* pEntry = *ppEntry;
if (pEntry) {
*ppEntry = pEntry->next;
taosMemoryFree(pEntry);
pCache->sStbStatsCache.nEntry--;
if (pCache->sStbStatsCache.nEntry < pCache->sStbStatsCache.nBucket / 4 &&
pCache->sStbStatsCache.nBucket > META_CACHE_STATS_BUCKET) {
TAOS_UNUSED(metaRehashStatsCache(pCache, 0));
}
} else {
code = TSDB_CODE_NOT_FOUND;
}
_exit:
return code;
}
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
SMetaCache* pCache = pMeta->pCache;
int32_t iBucket = TABS(uid) % pCache->sStbStatsCache.nBucket;
SMetaStbStatsEntry* pEntry = pCache->sStbStatsCache.aBucket[iBucket];
while (pEntry && pEntry->info.uid != uid) {
pEntry = pEntry->next;
}
if (pEntry) {
if (pInfo) {
*pInfo = pEntry->info;
}
} else {
code = TSDB_CODE_NOT_FOUND;
}
return code;
}
static FORCE_INLINE void setMD5DigestInKey(uint64_t* pBuf, const char* key, int32_t keyLen) {
memcpy(&pBuf[2], key, keyLen);
}
// the format of key:
// hash table address(8bytes) + suid(8bytes) + MD5 digest(16bytes)
static void initCacheKey(uint64_t* buf, const SHashObj* pHashMap, uint64_t suid, const char* key, int32_t keyLen) {
buf[0] = (uint64_t)pHashMap;
buf[1] = suid;
setMD5DigestInKey(buf, key, keyLen);
}
int32_t metaGetCachedTableUidList(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray* pList1,
bool* acquireRes) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
// generate the composed key for LRU cache
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
SHashObj* pTableMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
*acquireRes = 0;
uint64_t key[4];
initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
// void* tmp = NULL;
// uint32_t len = 0;
// (void)taosAscii2Hex((const char*)key, 32, &tmp, &len);
// qDebug("metaGetCachedTableUidList %p %"PRId64" key: %s", pTableMap, suid, tmp);
// taosMemoryFree(tmp);
(void)taosThreadMutexLock(pLock);
pMeta->pCache->sTagFilterResCache.accTimes += 1;
LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
if (pHandle == NULL) {
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
// do some book mark work after acquiring the filter result from cache
STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
if (NULL == pEntry) {
metaError("meta/cache: pEntry should not be NULL.");
return TSDB_CODE_NOT_FOUND;
}
*acquireRes = 1;
const char* p = taosLRUCacheValue(pCache, pHandle);
int32_t size = *(int32_t*)p;
// set the result into the buffer
if (taosArrayAddBatch(pList1, p + sizeof(int32_t), size) == NULL) {
return terrno;
}
(*pEntry)->hitTimes += 1;
uint32_t acc = pMeta->pCache->sTagFilterResCache.accTimes;
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
metaInfo("vgId:%d cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
((double)(*pEntry)->hitTimes) / acc);
}
bool ret = taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
int32_t metaStableTagFilterCacheGet(void* pVnode, tb_uid_t suid,
const uint8_t* pTagCondKey, int32_t tagCondKeyLen,
const uint8_t* pKey, int32_t keyLen, SArray* pList1, bool* acquireRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
*acquireRes = 0;
// generate the composed key for LRU cache
SHashObj* pTableMap = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
code = taosThreadRwlockRdlock(pRwlock);
TSDB_CHECK_CODE(code, lino, _end);
pMeta->pCache->sStableTagFilterResCache.accTimes += 1;
STagConds** pTagConds =
(STagConds**)taosHashGet(pTableMap, &suid, sizeof(tb_uid_t));
TSDB_CHECK_NULL(pTagConds, code, lino, _end, TSDB_CODE_SUCCESS);
STagCondFilterEntry** pFilterEntry = (STagCondFilterEntry**)taosHashGet(
(*pTagConds)->set, pTagCondKey, tagCondKeyLen);
TSDB_CHECK_NULL(pFilterEntry, code, lino, _end, TSDB_CODE_SUCCESS);
SArray** pArray = (SArray**)taosHashGet((*pFilterEntry)->set, pKey, keyLen);
TSDB_CHECK_NULL(pArray, code, lino, _end, TSDB_CODE_SUCCESS);
// set the result into the buffer
*acquireRes = 1;
TAOS_UNUSED(taosArrayAddBatch(
pList1, TARRAY_GET_ELEM(*pArray, 0), taosArrayGetSize(*pArray)));
// do some bookmark work after acquiring the filter result from cache
(*pTagConds)->hitTimes += 1;
(*pFilterEntry)->hitTimes += 1;
uint64_t hit = ++pMeta->pCache->sStableTagFilterResCache.hitTimes;
uint64_t acc = pMeta->pCache->sStableTagFilterResCache.accTimes;
if ((*pTagConds)->hitTimes % 1000 == 0 && (*pTagConds)->hitTimes > 0) {
metaDebug(
"vgId:%d, suid:%" PRIu64
", current stable cache hit:%" PRIu32 ", this tag condition hit:%" PRIu32
", total cache hit:%" PRIu64 ", acc:%" PRIu64 ", hit rate:%.2f%%",
vgId, suid, (*pTagConds)->hitTimes, (*pFilterEntry)->hitTimes, hit, acc,
((double)hit / acc * 100));
}
_end:
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s failed at %s:%d since %s",
vgId, __func__, __FILE__, lino, tstrerror(code));
}
// unlock meta
code = taosThreadRwlockUnlock(pRwlock);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s unlock failed at %s:%d since %s",
vgId, __func__, __FILE__, lino, tstrerror(code));
}
return code;
}
static void freeUidCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
if (value == NULL) {
return;
}
const uint64_t* p = key;
if (keyLen != sizeof(int64_t) * 4) {
metaError("key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
return;
}
SHashObj* pHashObj = (SHashObj*)p[0];
STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
if (pEntry != NULL && (*pEntry) != NULL) {
int64_t st = taosGetTimestampUs();
int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
if (code == TSDB_CODE_SUCCESS) {
double el = (taosGetTimestampUs() - st) / 1000.0;
metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", taosHashGetSize((*pEntry)->set),
el);
}
}
metaDebug("free uid cache payload in lru, suid: %" PRIu64
" origin key:%" PRIu64 ",%" PRIu64,
p[1], p[2], p[3]);
taosMemoryFree(value);
}
static int32_t addNewEntry(SHashObj* pTableEntry, const void* pKey, int32_t keyLen, uint64_t suid) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STagFilterResEntry* p = taosMemoryMalloc(sizeof(STagFilterResEntry));
TSDB_CHECK_NULL(p, code, lino, _end, terrno);
p->hitTimes = 0;
p->set = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
TSDB_CHECK_NULL(p->set, code, lino, _end, terrno);
code = taosHashPut(p->set, pKey, keyLen, NULL, 0);
TSDB_CHECK_CODE(code, lino, _end);
code = taosHashPut(pTableEntry, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
if (p != NULL) {
if (p->set != NULL) {
taosHashCleanup(p->set);
}
taosMemoryFree(p);
}
}
return code;
}
// check both the payload size and selectivity ratio
int32_t metaUidFilterCachePut(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen, double selectivityRatio) {
int32_t code = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
if (selectivityRatio > tsSelectivityRatio) {
metaDebug("vgId:%d, suid:%" PRIu64
" failed to add to uid list cache, due to selectivity ratio %.2f less than threshold %.2f",
vgId, suid, selectivityRatio, tsSelectivityRatio);
taosMemoryFree(pPayload);
return TSDB_CODE_SUCCESS;
}
if (payloadLen > tsTagFilterResCacheSize) {
metaDebug("vgId:%d, suid:%" PRIu64
" failed to add to uid list cache, due to payload length %d greater than threshold %d",
vgId, suid, payloadLen, tsTagFilterResCacheSize);
taosMemoryFree(pPayload);
return TSDB_CODE_SUCCESS;
}
SLRUCache* pCache = pMeta->pCache->sTagFilterResCache.pUidResCache;
SHashObj* pTableEntry = pMeta->pCache->sTagFilterResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
uint64_t key[4] = {0};
initCacheKey(key, pTableEntry, suid, pKey, keyLen);
// void* tmp = NULL;
// uint32_t len = 0;
// (void)taosAscii2Hex((const char*)key, 32, &tmp, &len);
// qDebug("metaUidFilterCachePut %p %"PRId64" key: %s", pTableEntry, suid, tmp);
// taosMemoryFree(tmp);
(void)taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) {
code = addNewEntry(pTableEntry, pKey, keyLen, suid);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
} else { // check if it exists or not
code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
if (code == TSDB_CODE_DUP_KEY) {
// we have already found the existed items, no need to added to cache anymore.
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_DUP_KEY;
}
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
}
// add to cache.
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeUidCachePayload, NULL, NULL,
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
(void)taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " list cache added into cache, total:%d, tables:%d", vgId, suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
return code;
}
static void freeSArrayPtr(void* pp) {
SArray* pArray = *(SArray**)pp;
taosArrayDestroy(pArray);
}
int32_t metaStableTagFilterCachePut(
void* pVnode, uint64_t suid, const void* pTagCondKey, int32_t tagCondKeyLen,
const void* pKey, int32_t keyLen, SArray* pUidList, SArray** pTagColIds) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
code = taosThreadRwlockWrlock(pRwlock);
TSDB_CHECK_CODE(code, lino, _end);
STagConds** pTagConds =
(STagConds**)taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pTagConds == NULL) {
// add new (suid -> tag conds) entry
STagConds* pEntry = (STagConds*)taosMemoryMalloc(sizeof(STagConds));
TSDB_CHECK_NULL(pEntry, code, lino, _end, terrno);
pEntry->hitTimes = 0;
pEntry->set = taosHashInit(
1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
false, HASH_NO_LOCK);
taosHashSetFreeFp(pEntry->set, freeTagFilterEntryFp);
TSDB_CHECK_NULL(pEntry->set, code, lino, _end, terrno);
code = taosHashPut(
pTableEntry, &suid, sizeof(uint64_t), &pEntry, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
pTagConds = (STagConds**)taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
TSDB_CHECK_NULL(pTagConds, code, lino, _end, TSDB_CODE_NOT_FOUND);
}
STagCondFilterEntry** pFilterEntry =
(STagCondFilterEntry**)taosHashGet(
(*pTagConds)->set, pTagCondKey, tagCondKeyLen);
if (pFilterEntry == NULL) {
// add new (tag cond -> filter entry) entry
STagCondFilterEntry* pEntry =
(STagCondFilterEntry*)taosMemoryMalloc(sizeof(STagCondFilterEntry));
TSDB_CHECK_NULL(pEntry, code, lino, _end, terrno);
pEntry->hitTimes = 0;
pEntry->set = taosHashInit(
1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
false, HASH_NO_LOCK);
TSDB_CHECK_NULL(pEntry->set, code, lino, _end, terrno);
taosHashSetFreeFp(pEntry->set, freeSArrayPtr);
pEntry->pColIds = *pTagColIds;
*pTagColIds = NULL;
code = taosHashPut(
(*pTagConds)->set, pTagCondKey, tagCondKeyLen, &pEntry, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
pFilterEntry = (STagCondFilterEntry**)taosHashGet(
(*pTagConds)->set, pTagCondKey, tagCondKeyLen);
} else {
// pColIds is already set, so we can destroy the new one
taosArrayDestroy(*pTagColIds);
*pTagColIds = NULL;
}
// add to cache.
SArray* pPayload = taosArrayDup(pUidList, NULL);
code = taosHashPut(
(*pFilterEntry)->set, pKey, keyLen, &pPayload, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
pMeta->pCache->sStableTagFilterResCache.numTagDataEntries += 1;
(*pTagConds)->numTagDataEntries += 1;
_end:
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s failed at %s:%d since %s",
vgId, __func__, __FILE__, lino, tstrerror(code));
} else {
metaDebug("vgId:%d, suid:%" PRIu64 " new tag data filter entry added, "
"uid num:%d, current stable tag conditions num:%d, "
"this tag condition data entries num:%d, "
"cache stable num:%d, total tag data entries num:%" PRIu32 ", "
"total tag data entries num:%" PRIu32,
vgId, suid, (int32_t)taosArrayGetSize(pUidList),
pTagConds ? (int32_t)taosHashGetSize((*pTagConds)->set) : 0,
pFilterEntry ? (int32_t)taosHashGetSize((*pFilterEntry)->set) : 0,
(int32_t)taosHashGetSize(pTableEntry),
pMeta->pCache->sStableTagFilterResCache.numTagDataEntries,
(*pTagConds)->numTagDataEntries);
}
// unlock meta
code = taosThreadRwlockUnlock(pRwlock);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s unlock failed at %s:%d since %s",
vgId, __func__, __FILE__, lino, tstrerror(code));
}
return code;
}
// drop all the cache entries for a super table
int32_t metaStableTagFilterCacheDropSTable(
SMeta* pMeta, tb_uid_t suid) {
if (pMeta == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
code = taosThreadRwlockWrlock(pRwlock);
TSDB_CHECK_CODE(code, lino, _end);
STagConds** pTagConds = taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
if (pTagConds != NULL) {
pMeta->pCache->sStableTagFilterResCache.
numTagDataEntries -= (*pTagConds)->numTagDataEntries;
}
code = taosHashRemove(pTableEntry, &suid, sizeof(tb_uid_t));
TSDB_CHECK_CODE(code, lino, _end);
_end:
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s failed at %s:%d since %s",
TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
metaDebug(
"vgId:%d, suid:%" PRIu64 " stable tag filter cache dropped from cache"
"left stable num:%d, tag conditions num:%" PRIu32,
TD_VID(pMeta->pVnode), suid, (int32_t)taosHashGetSize(pTableEntry),
pMeta->pCache->sStableTagFilterResCache.numTagDataEntries);
}
code = taosThreadRwlockUnlock(pRwlock);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s unlock failed at %s:%d since %s",
TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
}
return code;
}
static int32_t getTagColSize(
const SSchema* pTagSchemas, int32_t nTagCols, col_id_t cid) {
for (int32_t i = 0; i < nTagCols; i++) {
if (pTagSchemas[i].colId == cid) {
return pTagSchemas[i].bytes;
}
}
return 0;
}
// when encode nchar tag into tag data entry key, need to convert it to var type
static FORCE_INLINE int32_t ncharToVar(char *pData, int32_t nData, char **ppOut) {
int32_t code = TSDB_CODE_SUCCESS;
char *t = taosMemoryCalloc(1, nData + VARSTR_HEADER_SIZE);
if (NULL == t) {
return terrno;
}
int32_t len = taosUcs4ToMbs(
(TdUcs4 *)pData, nData, varDataVal(t), NULL);
if (len < 0) {
taosMemoryFree(t);
return TSDB_CODE_SCALAR_CONVERT_ERROR;
}
varDataSetLen(t, len);
*ppOut = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE);
memcpy(*ppOut, t, len + VARSTR_HEADER_SIZE);
_return:
taosMemoryFree(t);
return code;
}
static int32_t buildTagDataEntryKey(const SArray* pColIds, const STag* pTag,
const SSchemaWrapper* pTagScheam, T_MD5_CTX* pContext) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t keyLen = 0;
char* pKey = NULL;
// get length first
for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
if (tTagGet(pTag, &pTagValue)) {
keyLen += sizeof(col_id_t);
if (IS_VAR_DATA_TYPE(pTagValue.type)) {
int32_t varLen = getTagColSize(
pTagScheam->pSchema, pTagScheam->nCols, pTagValue.cid);
code = varLen > 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_NOT_FOUND;
QUERY_CHECK_CODE(code, lino, _end);
keyLen += varLen;
} else {
keyLen += tDataTypes[pTagValue.type].bytes;
}
} else {
// tag value not found
code = TSDB_CODE_NOT_FOUND;
QUERY_CHECK_CODE(code, lino, _end);
}
}
pKey = taosMemoryCalloc(1, keyLen);
if (NULL == pKey) {
code = terrno;
return code;
}
// build the key
char* pStart = pKey;
for (int32_t i = 0; i < taosArrayGetSize(pColIds); i++) {
STagVal pTagValue = {.cid = *(col_id_t*)taosArrayGet(pColIds, i)};
if (tTagGet(pTag, &pTagValue)) {
// copy cid
memcpy(pStart, &pTagValue.cid, sizeof(col_id_t));
pStart += sizeof(col_id_t);
// copy value
if (IS_VAR_DATA_TYPE(pTagValue.type) && pTagValue.pData != NULL) {
if (TSDB_DATA_TYPE_NCHAR == pTagValue.type) {
// need to convert nchar to var
char *pVar = NULL;
code = ncharToVar((char *)pTagValue.pData, pTagValue.nData, &pVar);
QUERY_CHECK_CODE(code, lino, _end);
memcpy(pStart, varDataVal(pVar), varDataLen(pVar));
pStart += varDataLen(pVar);
taosMemoryFree(pVar);
} else {
memcpy(pStart, pTagValue.pData, pTagValue.nData);
pStart += pTagValue.nData;
}
} else {
memcpy(pStart, &pTagValue.i64, tDataTypes[pTagValue.type].bytes);
pStart += tDataTypes[pTagValue.type].bytes;
}
} else {
// tag value not found
code = TSDB_CODE_NOT_FOUND;
QUERY_CHECK_CODE(code, lino, _end);
}
}
// update MD5
tMD5Init(pContext);
tMD5Update(pContext, (uint8_t*)pKey, (uint32_t)keyLen);
tMD5Final(pContext);
_end:
taosMemFreeClear(pKey);
return code;
}
// remove the dropped table uid from all cache entries
// pDroppedTable is the dropped child table meta entry
int32_t metaStableTagFilterCacheUpdateUid(SMeta* pMeta,
const SMetaEntry* pChildTable, const SMetaEntry* pSuperTable,
ETagFilterCacheAction action) {
if (pMeta == NULL || pChildTable == NULL || pSuperTable == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
code = taosThreadRwlockWrlock(pRwlock);
TSDB_CHECK_CODE(code, lino, _end);
tb_uid_t suid = pChildTable->ctbEntry.suid;;
STagConds** pTagConds =
(STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
if (pTagConds != NULL) {
STagCondFilterEntry** ppFilterEntry = NULL;
while ((ppFilterEntry = taosHashIterate((*pTagConds)->set, ppFilterEntry))) {
STagCondFilterEntry* pFilterEntry = *ppFilterEntry;
// rebuild the tagCondKey and check existence
SArray* pColIds = pFilterEntry->pColIds;
// rebuild the tagCondFilterKey
int32_t keyLen = 0;
char* pKey = NULL;
T_MD5_CTX context = {0};
code = buildTagDataEntryKey(pColIds, (STag*)pChildTable->ctbEntry.pTags,
&pSuperTable->stbEntry.schemaTag, &context);
if (code != TSDB_CODE_SUCCESS) {
metaError("vgId:%d, suid:%" PRIu64 " failed to build tag condition"
" key for dropped table uid:%" PRIu64 " since %s",
TD_VID(pMeta->pVnode), suid, pChildTable->uid, tstrerror(code));
goto _end;
}
SArray** pArray = (SArray**)taosHashGet(
pFilterEntry->set, context.digest, tListLen(context.digest));
if (pArray != NULL) {
// check and remove the dropped table uid from the array
// TODO(Tony Zhang): optimize this scan
if (action == STABLE_TAG_FILTER_CACHE_DROP_TABLE) {
for (int32_t i = 0; i < taosArrayGetSize(*pArray); i++) {
uint64_t uid = *(uint64_t*)taosArrayGet(*pArray, i);
if (uid == pChildTable->uid) {
taosArrayRemove(*pArray, i);
metaDebug("vgId:%d, suid:%" PRIu64
" removed dropped table uid:%" PRIu64
" from stable tag filter cache",
TD_VID(pMeta->pVnode), suid, pChildTable->uid);
break;
}
}
} else {
// STABLE_TAG_FILTER_CACHE_ADD_TABLE
void* _tmp = taosArrayPush(*pArray, &pChildTable->uid);
}
}
}
}
_end:
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s failed at %s:%d since %s",
TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
metaDebug(
"vgId:%d, suid:%" PRIu64 " update table uid:%" PRIu64
" in stable tag filter cache, action:%d",
TD_VID(pMeta->pVnode),
pChildTable->ctbEntry.suid, pChildTable->uid, action);
}
code = taosThreadRwlockUnlock(pRwlock);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s unlock failed at %s:%d since %s",
TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
}
return code;
}
int32_t metaStableTagFilterCacheDropTag(
SMeta* pMeta, tb_uid_t suid, col_id_t cid) {
if (pMeta == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t lino = 0;
int32_t code = TSDB_CODE_SUCCESS;
SHashObj* pTableEntry = pMeta->pCache->sStableTagFilterResCache.pTableEntry;
TdThreadRwlock* pRwlock = &pMeta->pCache->sStableTagFilterResCache.rwlock;
code = taosThreadRwlockWrlock(pRwlock);
TSDB_CHECK_CODE(code, lino, _end);
STagConds** pTagConds =
(STagConds**)taosHashGet(pTableEntry, &suid, sizeof(tb_uid_t));
if (pTagConds != NULL) {
void* pIter = taosHashIterate((*pTagConds)->set, NULL);
while (pIter) {
STagCondFilterEntry* pFilterEntry = *(STagCondFilterEntry**)pIter;
bool found = false;
for (int32_t i = 0; i < taosArrayGetSize(pFilterEntry->pColIds); i++) {
col_id_t existCid = *(col_id_t*)taosArrayGet(pFilterEntry->pColIds, i);
if (existCid == cid) {
found = true;
break;
}
}
if (found) {
uint32_t numEntries = taosHashGetSize(pFilterEntry->set);
size_t keyLen = 0;
char *key = (char *)taosHashGetKey(pIter, &keyLen);
code = taosHashRemove((*pTagConds)->set, key, keyLen);
TSDB_CHECK_CODE(code, lino, _end);
(*pTagConds)->numTagDataEntries -= numEntries;
pMeta->pCache->sStableTagFilterResCache.numTagDataEntries -= numEntries;
}
pIter = taosHashIterate((*pTagConds)->set, pIter);
}
}
_end:
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s failed at %s:%d since %s",
TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
} else {
metaDebug(
"vgId:%d, suid:%" PRIu64 " dropped tag cid:%d "
"from stable tag filter cache",
TD_VID(pMeta->pVnode), suid, cid);
}
code = taosThreadRwlockUnlock(pRwlock);
if (TSDB_CODE_SUCCESS != code) {
metaError("vgId:%d, %s unlock failed at %s:%d since %s",
TD_VID(pMeta->pVnode), __func__, __FILE__, lino, tstrerror(code));
}
return code;
}
void metaCacheClear(SMeta* pMeta) {
metaWLock(pMeta);
metaCacheClose(pMeta);
(void)metaCacheOpen(pMeta);
metaULock(pMeta);
}
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
int32_t metaUidCacheClear(SMeta* pMeta, uint64_t suid) {
uint64_t p[4] = {0};
int32_t vgId = TD_VID(pMeta->pVnode);
SHashObj* pEntryHashMap = pMeta->pCache->sTagFilterResCache.pTableEntry;
uint64_t dummy[2] = {0};
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
TdThreadMutex* pLock = &pMeta->pCache->sTagFilterResCache.lock;
(void)taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
(*pEntry)->hitTimes = 0;
char *iter = taosHashIterate((*pEntry)->set, NULL);
while (iter != NULL) {
setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
taosLRUCacheErase(pMeta->pCache->sTagFilterResCache.pUidResCache, p, TAG_FILTER_RES_KEY_LEN);
iter = taosHashIterate((*pEntry)->set, iter);
}
taosHashClear((*pEntry)->set);
(void)taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d suid:%" PRId64 " cached related tag filter uid list cleared", vgId, suid);
return TSDB_CODE_SUCCESS;
}
int32_t metaGetCachedTbGroup(void* pVnode, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
// generate the composed key for LRU cache
SLRUCache* pCache = pMeta->pCache->STbGroupResCache.pResCache;
SHashObj* pTableMap = pMeta->pCache->STbGroupResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
*pList = NULL;
uint64_t key[4];
initCacheKey(key, pTableMap, suid, (const char*)pKey, keyLen);
(void)taosThreadMutexLock(pLock);
pMeta->pCache->STbGroupResCache.accTimes += 1;
LRUHandle* pHandle = taosLRUCacheLookup(pCache, key, TAG_FILTER_RES_KEY_LEN);
if (pHandle == NULL) {
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
STagFilterResEntry** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
if (NULL == pEntry) {
metaDebug("suid %" PRIu64 " not in tb group cache", suid);
return TSDB_CODE_NOT_FOUND;
}
*pList = taosArrayDup(taosLRUCacheValue(pCache, pHandle), NULL);
(*pEntry)->hitTimes += 1;
uint32_t acc = pMeta->pCache->STbGroupResCache.accTimes;
if ((*pEntry)->hitTimes % 5000 == 0 && (*pEntry)->hitTimes > 0) {
metaInfo("vgId:%d tb group cache hit:%d, total acc:%d, rate:%.2f", vgId, (*pEntry)->hitTimes, acc,
((double)(*pEntry)->hitTimes) / acc);
}
bool ret = taosLRUCacheRelease(pCache, pHandle, false);
// unlock meta
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
static void freeTbGroupCachePayload(const void* key, size_t keyLen, void* value, void* ud) {
(void)ud;
if (value == NULL) {
return;
}
const uint64_t* p = key;
if (keyLen != sizeof(int64_t) * 4) {
metaError("tb group key length is invalid, length:%d, expect:%d", (int32_t)keyLen, (int32_t)sizeof(uint64_t) * 2);
return;
}
SHashObj* pHashObj = (SHashObj*)p[0];
STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t));
if (pEntry != NULL && (*pEntry) != NULL) {
int64_t st = taosGetTimestampUs();
int32_t code = taosHashRemove((*pEntry)->set, &p[2], sizeof(uint64_t) * 2);
if (code == TSDB_CODE_SUCCESS) {
double el = (taosGetTimestampUs() - st) / 1000.0;
metaDebug("clear one item in tb group cache, remain cached item:%d, elapsed time:%.2fms",
taosHashGetSize((*pEntry)->set), el);
}
}
taosArrayDestroy((SArray*)value);
}
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload,
int32_t payloadLen) {
int32_t code = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
int32_t vgId = TD_VID(pMeta->pVnode);
if (payloadLen > tsTagFilterResCacheSize) {
metaDebug("vgId:%d, suid:%" PRIu64
" ignore to add to tb group cache, due to payload length %d greater than threshold %d",
vgId, suid, payloadLen, tsTagFilterResCacheSize);
taosArrayDestroy((SArray*)pPayload);
return TSDB_CODE_SUCCESS;
}
SLRUCache* pCache = pMeta->pCache->STbGroupResCache.pResCache;
SHashObj* pTableEntry = pMeta->pCache->STbGroupResCache.pTableEntry;
TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
uint64_t key[4] = {0};
initCacheKey(key, pTableEntry, suid, pKey, keyLen);
(void)taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pTableEntry, &suid, sizeof(uint64_t));
if (pEntry == NULL) {
code = addNewEntry(pTableEntry, pKey, keyLen, suid);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
} else { // check if it exists or not
code = taosHashPut((*pEntry)->set, pKey, keyLen, NULL, 0);
if (code == TSDB_CODE_DUP_KEY) {
// we have already found the existed items, no need to added to cache anymore.
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
}
// add to cache.
(void)taosLRUCacheInsert(pCache, key, TAG_FILTER_RES_KEY_LEN, pPayload, payloadLen, freeTbGroupCachePayload, NULL, NULL,
TAOS_LRU_PRIORITY_LOW, NULL);
_end:
(void)taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d, suid:%" PRIu64 " tb group added into cache, total:%d, tables:%d", vgId, suid,
(int32_t)taosLRUCacheGetUsage(pCache), taosHashGetSize(pTableEntry));
return code;
}
// remove the lru cache that are expired due to the tags value update, or creating, or dropping, of child tables
int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
uint64_t p[4] = {0};
int32_t vgId = TD_VID(pMeta->pVnode);
SHashObj* pEntryHashMap = pMeta->pCache->STbGroupResCache.pTableEntry;
uint64_t dummy[2] = {0};
initCacheKey(p, pEntryHashMap, suid, (char*)&dummy[0], 16);
TdThreadMutex* pLock = &pMeta->pCache->STbGroupResCache.lock;
(void)taosThreadMutexLock(pLock);
STagFilterResEntry** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
if (pEntry == NULL || taosHashGetSize((*pEntry)->set) == 0) {
(void)taosThreadMutexUnlock(pLock);
return TSDB_CODE_SUCCESS;
}
(*pEntry)->hitTimes = 0;
char *iter = taosHashIterate((*pEntry)->set, NULL);
while (iter != NULL) {
setMD5DigestInKey(p, iter, 2 * sizeof(uint64_t));
taosLRUCacheErase(pMeta->pCache->STbGroupResCache.pResCache, p, TAG_FILTER_RES_KEY_LEN);
iter = taosHashIterate((*pEntry)->set, iter);
}
taosHashClear((*pEntry)->set);
(void)taosThreadMutexUnlock(pLock);
metaDebug("vgId:%d suid:%" PRId64 " cached related tb group cleared", vgId, suid);
return TSDB_CODE_SUCCESS;
}
bool metaTbInFilterCache(SMeta* pMeta, const void* key, int8_t type) {
if (type == 0 && taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t))) {
return true;
}
if (type == 1 && taosHashGet(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key))) {
return true;
}
return false;
}
int32_t metaPutTbToFilterCache(SMeta* pMeta, const void* key, int8_t type) {
if (type == 0) {
return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
}
if (type == 1) {
return taosHashPut(pMeta->pCache->STbFilterCache.pStbName, key, strlen(key), NULL, 0);
}
return 0;
}
int32_t metaSizeOfTbFilterCache(SMeta* pMeta, int8_t type) {
if (type == 0) {
return taosHashGetSize(pMeta->pCache->STbFilterCache.pStb);
}
return 0;
}
int32_t metaInitTbFilterCache(SMeta* pMeta) {
#ifdef TD_ENTERPRISE
int32_t tbNum = 0;
const char** pTbArr = NULL;
const char* dbName = NULL;
if (!(dbName = strchr(pMeta->pVnode->config.dbname, '.'))) return 0;
if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
tbNum = tkLogStbNum;
pTbArr = (const char**)&tkLogStb;
} else if (0 == strncmp(dbName, "audit", TSDB_DB_NAME_LEN) || pMeta->pVnode->config.isAudit) {
tbNum = tkAuditStbNum;
pTbArr = (const char**)&tkAuditStb;
}
if (tbNum && pTbArr) {
for (int32_t i = 0; i < tbNum; ++i) {
TAOS_CHECK_RETURN(metaPutTbToFilterCache(pMeta, pTbArr[i], 1));
}
}
#else
#endif
return 0;
}
int64_t metaGetStbKeep(SMeta* pMeta, int64_t uid) {
SMetaStbStats stats = {0};
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
return stats.keep;
}
SMetaEntry* pEntry = NULL;
if (metaFetchEntryByUid(pMeta, uid, &pEntry) == TSDB_CODE_SUCCESS) {
int64_t keep = -1;
if (pEntry->type == TSDB_SUPER_TABLE) {
keep = pEntry->stbEntry.keep;
}
metaFetchEntryFree(&pEntry);
return keep;
}
return -1;
}
int32_t metaRefDbsCacheClear(SMeta* pMeta, uint64_t suid) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pMeta->pVnode);
SHashObj* pEntryHashMap = pMeta->pCache->STbRefDbCache.pStbRefs;
TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
(void)taosThreadMutexLock(pLock);
SHashObj** pEntry = taosHashGet(pEntryHashMap, &suid, sizeof(uint64_t));
if (pEntry == NULL) {
goto _return;
}
code = taosHashRemove(pEntryHashMap, &suid, sizeof(uint64_t));
metaDebug("vgId:%d suid:%" PRId64 " cached virtual stable ref db cleared", vgId, suid);
_return:
(void)taosThreadMutexUnlock(pLock);
return code;
}
int32_t metaGetCachedRefDbs(void* pVnode, tb_uid_t suid, SArray* pList) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t line = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SHashObj* pTableMap = pMeta->pCache->STbRefDbCache.pStbRefs;
TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
(void)taosThreadMutexLock(pLock);
SHashObj** pEntry = taosHashGet(pTableMap, &suid, sizeof(uint64_t));
if (pEntry) {
void *iter = taosHashIterate(*pEntry, NULL);
while (iter != NULL) {
size_t dbNameLen = 0;
char* name = NULL;
char* dbName = NULL;
name = taosHashGetKey(iter, &dbNameLen);
TSDB_CHECK_NULL(name, code, line, _return, terrno);
dbName = taosMemoryMalloc(dbNameLen + 1);
TSDB_CHECK_NULL(dbName, code, line, _return, terrno);
tstrncpy(dbName, name, dbNameLen + 1);
TSDB_CHECK_NULL(taosArrayPush(pList, &dbName), code, line, _return, terrno);
iter = taosHashIterate(*pEntry, iter);
}
}
_return:
if (code) {
metaError("%s failed at line %d since %s", __func__, line, tstrerror(code));
}
(void)taosThreadMutexUnlock(pLock);
return code;
}
static int32_t addRefDbsCacheNewEntry(SHashObj* pRefDbs, uint64_t suid, SHashObj **pEntry) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SHashObj* p = NULL;
p = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
TSDB_CHECK_NULL(p, code, lino, _end, terrno);
code = taosHashPut(pRefDbs, &suid, sizeof(uint64_t), &p, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
*pEntry = p;
_end:
if (code != TSDB_CODE_SUCCESS) {
metaError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t metaPutRefDbsToCache(void* pVnode, tb_uid_t suid, SArray* pList) {
int32_t code = 0;
int32_t line = 0;
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SHashObj* pStbRefs = pMeta->pCache->STbRefDbCache.pStbRefs;
TdThreadMutex* pLock = &pMeta->pCache->STbRefDbCache.lock;
(void)taosThreadMutexLock(pLock);
SHashObj* pEntry = NULL;
SHashObj** find = taosHashGet(pStbRefs, &suid, sizeof(uint64_t));
if (find == NULL) {
code = addRefDbsCacheNewEntry(pStbRefs, suid, &pEntry);
TSDB_CHECK_CODE(code, line, _return);
} else { // check if it exists or not
pEntry = *find;
}
for (int32_t i = 0; i < taosArrayGetSize(pList); i++) {
char* dbName = taosArrayGetP(pList, i);
void* pItem = taosHashGet(pEntry, dbName, strlen(dbName));
if (pItem == NULL) {
code = taosHashPut(pEntry, dbName, strlen(dbName), NULL, 0);
TSDB_CHECK_CODE(code, line, _return);
}
}
_return:
if (code) {
metaError("%s failed at line %d since %s", __func__, line, tstrerror(code));
}
(void)taosThreadMutexUnlock(pLock);
return code;
}