TDengine/source/dnode/vnode/src/meta/metaQuery.c

1474 lines
36 KiB
C
Raw Normal View History

2022-04-19 07:09:58 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-26 11:04:26 +00:00
#include "meta.h"
2022-04-19 13:10:03 +00:00
2022-04-26 11:04:26 +00:00
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
2022-04-24 05:24:55 +00:00
memset(pReader, 0, sizeof(*pReader));
pReader->flags = flags;
2022-04-26 11:04:26 +00:00
pReader->pMeta = pMeta;
if (!(flags & META_READER_NOLOCK)) {
metaRLock(pMeta);
}
2022-04-24 05:24:55 +00:00
}
2022-04-22 05:30:15 +00:00
void metaReaderReleaseLock(SMetaReader *pReader) {
if (pReader->pMeta && !(pReader->flags & META_READER_NOLOCK)) {
metaULock(pReader->pMeta);
pReader->flags |= META_READER_NOLOCK;
}
}
2022-04-24 05:38:56 +00:00
void metaReaderClear(SMetaReader *pReader) {
if (pReader->pMeta && !(pReader->flags & META_READER_NOLOCK)) {
2022-05-11 14:20:14 +00:00
metaULock(pReader->pMeta);
}
2022-05-07 10:03:06 +00:00
tDecoderClear(&pReader->coder);
2022-04-28 07:02:49 +00:00
tdbFree(pReader->pBuf);
2022-04-22 05:30:15 +00:00
}
2022-04-24 05:24:55 +00:00
int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t uid) {
2022-07-04 08:26:16 +00:00
SMeta *pMeta = pReader->pMeta;
2022-04-22 11:55:21 +00:00
STbDbKey tbDbKey = {.version = version, .uid = uid};
2022-04-22 05:30:15 +00:00
// query table.db
2022-05-18 07:57:29 +00:00
if (tdbTbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
2022-04-26 13:51:01 +00:00
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
2022-04-22 05:30:15 +00:00
goto _err;
}
// decode the entry
2022-05-07 10:03:06 +00:00
tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
2022-04-22 05:30:15 +00:00
if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
goto _err;
}
return 0;
_err:
return -1;
}
2022-08-17 07:28:49 +00:00
// int metaGetTableEntryByUidTest(void* meta, SArray *uidList) {
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// SArray* readerList = taosArrayInit(taosArrayGetSize(uidList), sizeof(SMetaReader));
// SArray* uidVersion = taosArrayInit(taosArrayGetSize(uidList), sizeof(STbDbKey));
// SMeta *pMeta = meta;
// int64_t version;
// SHashObj *uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// int64_t stt1 = taosGetTimestampUs();
// for(int i = 0; i < taosArrayGetSize(uidList); i++) {
// void* ppVal = NULL;
// int vlen = 0;
// uint64_t * uid = taosArrayGet(uidList, i);
// // query uid.idx
// if (tdbTbGet(pMeta->pUidIdx, uid, sizeof(*uid), &ppVal, &vlen) < 0) {
// continue;
// }
// version = *(int64_t *)ppVal;
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// STbDbKey tbDbKey = {.version = version, .uid = *uid};
// taosArrayPush(uidVersion, &tbDbKey);
// taosHashPut(uHash, uid, sizeof(int64_t), ppVal, sizeof(int64_t));
// }
// int64_t stt2 = taosGetTimestampUs();
// qDebug("metaGetTableEntryByUidTest1 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt2-stt1);
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// TBC *pCur = NULL;
// tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
// tdbTbcMoveToFirst(pCur);
// void *pKey = NULL;
// int kLen = 0;
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// while(1){
// SMetaReader pReader = {0};
// int32_t ret = tdbTbcNext(pCur, &pKey, &kLen, &pReader.pBuf, &pReader.szBuf);
// if (ret < 0) break;
// STbDbKey *tmp = (STbDbKey*)pKey;
// int64_t *ver = (int64_t*)taosHashGet(uHash, &tmp->uid, sizeof(int64_t));
// if(ver == NULL || *ver != tmp->version) continue;
// taosArrayPush(readerList, &pReader);
// }
// tdbTbcClose(pCur);
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// taosArrayClear(readerList);
// int64_t stt3 = taosGetTimestampUs();
// qDebug("metaGetTableEntryByUidTest2 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt3-stt2);
// for(int i = 0; i < taosArrayGetSize(uidVersion); i++) {
// SMetaReader pReader = {0};
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// STbDbKey *tbDbKey = taosArrayGet(uidVersion, i);
// // query table.db
// if (tdbTbGet(pMeta->pTbDb, tbDbKey, sizeof(STbDbKey), &pReader.pBuf, &pReader.szBuf) < 0) {
// continue;
// }
// taosArrayPush(readerList, &pReader);
// }
// int64_t stt4 = taosGetTimestampUs();
// qDebug("metaGetTableEntryByUidTest3 rows:%d, cost:%ld us", taosArrayGetSize(uidList), stt4-stt3);
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// for(int i = 0; i < taosArrayGetSize(readerList); i++){
// SMetaReader* pReader = taosArrayGet(readerList, i);
// metaReaderInit(pReader, meta, 0);
// // decode the entry
// tDecoderInit(&pReader->coder, pReader->pBuf, pReader->szBuf);
2022-08-16 06:48:31 +00:00
//
2022-08-17 07:28:49 +00:00
// if (metaDecodeEntry(&pReader->coder, &pReader->me) < 0) {
// }
// metaReaderClear(pReader);
// }
// int64_t stt5 = taosGetTimestampUs();
// qDebug("metaGetTableEntryByUidTest4 rows:%d, cost:%ld us", taosArrayGetSize(readerList), stt5-stt4);
// return 0;
// }
2022-08-11 10:29:28 +00:00
2022-08-19 03:52:43 +00:00
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid) {
// query uid.idx
metaRLock(pMeta);
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), NULL, NULL) < 0) {
metaULock(pMeta);
return false;
}
metaULock(pMeta);
return true;
}
2022-04-24 05:24:55 +00:00
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
2022-07-04 08:26:16 +00:00
SMeta *pMeta = pReader->pMeta;
int64_t version1;
2022-04-22 05:30:15 +00:00
// query uid.idx
2022-05-18 07:57:29 +00:00
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
2022-04-26 13:51:01 +00:00
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
2022-04-22 05:30:15 +00:00
return -1;
}
version1 = ((SUidIdxVal *)pReader->pBuf)[0].version;
return metaGetTableEntryByVersion(pReader, version1, uid);
2022-04-22 05:30:15 +00:00
}
2022-04-24 05:24:55 +00:00
int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
2022-07-04 08:26:16 +00:00
SMeta *pMeta = pReader->pMeta;
2022-04-22 05:30:15 +00:00
tb_uid_t uid;
// query name.idx
2022-05-18 07:57:29 +00:00
if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
2022-04-26 13:51:01 +00:00
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
2022-04-22 05:30:15 +00:00
return -1;
}
uid = *(tb_uid_t *)pReader->pBuf;
2022-04-24 05:24:55 +00:00
return metaGetTableEntryByUid(pReader, uid);
2022-04-22 05:30:15 +00:00
}
2022-05-27 03:38:09 +00:00
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
2022-07-04 08:26:16 +00:00
void *pData = NULL;
2022-05-27 03:38:09 +00:00
int nData = 0;
tb_uid_t uid = 0;
2022-06-10 09:53:13 +00:00
metaRLock(pMeta);
2022-05-27 03:38:09 +00:00
if (tdbTbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pData, &nData) == 0) {
uid = *(tb_uid_t *)pData;
tdbFree(pData);
}
2022-06-10 09:53:13 +00:00
metaULock(pMeta);
return uid;
2022-05-27 03:38:09 +00:00
}
2022-07-30 02:30:58 +00:00
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName) {
int code = 0;
2022-07-23 06:55:12 +00:00
SMetaReader mr = {0};
2022-07-30 02:30:58 +00:00
metaReaderInit(&mr, (SMeta *)meta, 0);
code = metaGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
return -1;
}
2022-07-23 06:55:12 +00:00
STR_TO_VARSTR(tbName, mr.me.name);
metaReaderClear(&mr);
return 0;
}
2022-11-27 05:50:36 +00:00
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
code = metaGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
return -1;
}
strncpy(tbName, mr.me.name, TSDB_TABLE_NAME_LEN);
metaReaderClear(&mr);
return 0;
}
2022-10-05 10:32:30 +00:00
int metaGetTableUidByName(void *meta, char *tbName, int64_t *uid) {
2022-09-28 14:07:16 +00:00
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
2022-09-29 03:07:32 +00:00
SMetaReader *pReader = &mr;
// query name.idx
2022-10-01 05:49:22 +00:00
if (tdbTbGet(pReader->pMeta->pNameIdx, tbName, strlen(tbName) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
2022-09-29 03:07:32 +00:00
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
2022-09-28 14:07:16 +00:00
metaReaderClear(&mr);
return -1;
}
2022-09-29 03:07:32 +00:00
*uid = *(tb_uid_t *)pReader->pBuf;
2022-10-01 05:49:22 +00:00
2022-09-28 14:07:16 +00:00
metaReaderClear(&mr);
return 0;
}
2022-07-23 06:55:12 +00:00
2022-09-30 01:31:19 +00:00
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
2022-10-01 05:49:22 +00:00
code = metaGetTableEntryByName(&mr, tbName);
if (code == 0) *tbType = mr.me.type;
2022-09-30 01:31:19 +00:00
metaReaderClear(&mr);
2022-10-01 05:49:22 +00:00
return code;
2022-09-30 01:31:19 +00:00
}
2022-04-24 05:38:56 +00:00
int metaReadNext(SMetaReader *pReader) {
2022-04-24 06:19:12 +00:00
SMeta *pMeta = pReader->pMeta;
2022-04-24 05:38:56 +00:00
// TODO
2022-04-24 06:19:12 +00:00
2022-04-24 05:38:56 +00:00
return 0;
}
#if 1 // ===================================================
2022-04-19 13:10:03 +00:00
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
SMTbCursor *pTbCur = NULL;
pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
if (pTbCur == NULL) {
return NULL;
}
2022-04-26 11:04:26 +00:00
metaReaderInit(&pTbCur->mr, pMeta, 0);
2022-04-24 06:19:12 +00:00
2022-05-18 07:57:29 +00:00
tdbTbcOpen(pMeta->pUidIdx, &pTbCur->pDbc, NULL);
2022-04-19 13:10:03 +00:00
2022-05-18 07:57:29 +00:00
tdbTbcMoveToFirst(pTbCur->pDbc);
2022-04-29 10:02:36 +00:00
2022-04-19 13:10:03 +00:00
return pTbCur;
}
void metaCloseTbCursor(SMTbCursor *pTbCur) {
if (pTbCur) {
2022-04-28 07:02:49 +00:00
tdbFree(pTbCur->pKey);
tdbFree(pTbCur->pVal);
2022-04-24 06:19:12 +00:00
metaReaderClear(&pTbCur->mr);
2022-04-19 13:10:03 +00:00
if (pTbCur->pDbc) {
2022-05-18 07:57:29 +00:00
tdbTbcClose(pTbCur->pDbc);
2022-04-19 13:10:03 +00:00
}
taosMemoryFree(pTbCur);
}
}
2022-04-24 06:19:12 +00:00
int metaTbCursorNext(SMTbCursor *pTbCur) {
2022-04-19 13:10:03 +00:00
int ret;
2022-07-04 08:26:16 +00:00
void *pBuf;
2022-04-19 13:10:03 +00:00
STbCfg tbCfg;
for (;;) {
2022-05-18 07:57:29 +00:00
ret = tdbTbcNext(pTbCur->pDbc, &pTbCur->pKey, &pTbCur->kLen, &pTbCur->pVal, &pTbCur->vLen);
2022-04-24 06:19:12 +00:00
if (ret < 0) {
return -1;
2022-04-19 13:10:03 +00:00
}
2022-07-11 06:44:28 +00:00
tDecoderClear(&pTbCur->mr.coder);
2022-08-16 06:22:55 +00:00
metaGetTableEntryByVersion(&pTbCur->mr, ((SUidIdxVal *)pTbCur->pVal)[0].version, *(tb_uid_t *)pTbCur->pKey);
2022-04-26 11:04:26 +00:00
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
2022-04-24 06:19:12 +00:00
continue;
}
2022-04-24 06:30:35 +00:00
break;
2022-04-19 13:10:03 +00:00
}
2022-04-24 06:19:12 +00:00
return 0;
2022-04-19 13:10:03 +00:00
}
2022-10-10 03:00:55 +00:00
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
2022-07-04 08:26:16 +00:00
void *pData = NULL;
2022-05-28 07:10:59 +00:00
int nData = 0;
int64_t version;
SSchemaWrapper schema = {0};
SSchemaWrapper *pSchema = NULL;
SDecoder dc = {0};
2022-10-10 03:00:55 +00:00
if (lock) {
metaRLock(pMeta);
}
2022-06-07 07:56:33 +00:00
_query:
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
goto _err;
}
2022-04-19 13:10:03 +00:00
2022-08-16 06:22:55 +00:00
version = ((SUidIdxVal *)pData)[0].version;
2022-04-19 13:10:03 +00:00
2022-06-07 07:56:33 +00:00
tdbTbGet(pMeta->pTbDb, &(STbDbKey){.uid = uid, .version = version}, sizeof(STbDbKey), &pData, &nData);
SMetaEntry me = {0};
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &me);
if (me.type == TSDB_SUPER_TABLE) {
if (sver == -1 || sver == me.stbEntry.schemaRow.version) {
2022-05-28 07:10:59 +00:00
pSchema = tCloneSSchemaWrapper(&me.stbEntry.schemaRow);
2022-06-07 07:56:33 +00:00
tDecoderClear(&dc);
goto _exit;
2022-05-28 07:10:59 +00:00
}
{ // Traverse to find the previous qualified data
TBC *pCur;
tdbTbcOpen(pMeta->pTbDb, &pCur, NULL);
STbDbKey key = {.version = sver, .uid = INT64_MAX};
int c = 0;
tdbTbcMoveTo(pCur, &key, sizeof(key), &c);
if (c < 0) {
tdbTbcMoveToPrev(pCur);
}
void *pKey = NULL;
void *pVal = NULL;
int vLen = 0, kLen = 0;
while (1) {
int32_t ret = tdbTbcPrev(pCur, &pKey, &kLen, &pVal, &vLen);
if (ret < 0) break;
STbDbKey *tmp = (STbDbKey *)pKey;
if (tmp->uid != uid) {
continue;
}
SDecoder dcNew = {0};
SMetaEntry meNew = {0};
tDecoderInit(&dcNew, pVal, vLen);
metaDecodeEntry(&dcNew, &meNew);
pSchema = tCloneSSchemaWrapper(&meNew.stbEntry.schemaRow);
tDecoderClear(&dcNew);
tdbTbcClose(pCur);
tdbFree(pKey);
tdbFree(pVal);
goto _exit;
}
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
}
2022-06-07 07:56:33 +00:00
} else if (me.type == TSDB_CHILD_TABLE) {
uid = me.ctbEntry.suid;
2022-05-28 07:10:59 +00:00
tDecoderClear(&dc);
2022-06-07 07:56:33 +00:00
goto _query;
2022-05-28 07:10:59 +00:00
} else {
2022-06-07 07:56:33 +00:00
if (sver == -1 || sver == me.ntbEntry.schemaRow.version) {
pSchema = tCloneSSchemaWrapper(&me.ntbEntry.schemaRow);
tDecoderClear(&dc);
goto _exit;
2022-05-28 07:10:59 +00:00
}
2022-06-07 07:56:33 +00:00
}
tDecoderClear(&dc);
2022-04-19 13:10:03 +00:00
2022-06-07 07:56:33 +00:00
// query from skm db
if (tdbTbGet(pMeta->pSkmDb, &(SSkmDbKey){.uid = uid, .sver = sver}, sizeof(SSkmDbKey), &pData, &nData) < 0) {
goto _err;
2022-05-28 07:10:59 +00:00
}
2022-04-25 12:23:00 +00:00
2022-06-07 07:56:33 +00:00
tDecoderInit(&dc, pData, nData);
2022-07-18 05:31:13 +00:00
tDecodeSSchemaWrapperEx(&dc, &schema);
2022-06-07 07:56:33 +00:00
pSchema = tCloneSSchemaWrapper(&schema);
tDecoderClear(&dc);
_exit:
tDecoderClear(&dc);
2022-10-10 03:00:55 +00:00
if (lock) {
metaULock(pMeta);
}
2022-05-28 07:10:59 +00:00
tdbFree(pData);
return pSchema;
2022-04-25 12:23:00 +00:00
2022-05-28 07:10:59 +00:00
_err:
tDecoderClear(&dc);
2022-10-10 03:00:55 +00:00
if (lock) {
metaULock(pMeta);
}
2022-05-28 07:10:59 +00:00
tdbFree(pData);
return NULL;
2022-04-19 13:10:03 +00:00
}
2022-07-04 08:26:16 +00:00
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
TBC *pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
if (ret < 0) {
return ret;
}
STtlIdxKey ttlKey = {0};
ttlKey.dtime = ttl;
ttlKey.uid = INT64_MAX;
int c = 0;
tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
if (c < 0) {
tdbTbcMoveToPrev(pCur);
}
void *pKey = NULL;
2022-07-04 08:26:16 +00:00
int kLen = 0;
while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) {
break;
}
2022-07-04 08:26:16 +00:00
ttlKey = *(STtlIdxKey *)pKey;
taosArrayPush(uidList, &ttlKey.uid);
}
2022-09-19 08:34:07 +00:00
tdbFree(pKey);
tdbTbcClose(pCur);
return 0;
}
2022-04-26 01:42:43 +00:00
struct SMCtbCursor {
2022-07-04 08:26:16 +00:00
SMeta *pMeta;
TBC *pCur;
2022-04-26 01:42:43 +00:00
tb_uid_t suid;
2022-07-04 08:26:16 +00:00
void *pKey;
void *pVal;
2022-04-26 01:42:43 +00:00
int kLen;
int vLen;
};
2022-10-10 03:00:55 +00:00
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
2022-04-19 13:10:03 +00:00
SMCtbCursor *pCtbCur = NULL;
2022-04-29 10:02:36 +00:00
SCtbIdxKey ctbIdxKey;
2022-06-23 06:19:21 +00:00
int ret = 0;
int c = 0;
2022-04-19 13:10:03 +00:00
2022-04-26 01:42:43 +00:00
pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
if (pCtbCur == NULL) {
return NULL;
}
2022-04-19 13:10:03 +00:00
2022-05-12 03:34:38 +00:00
pCtbCur->pMeta = pMeta;
2022-04-26 01:42:43 +00:00
pCtbCur->suid = uid;
2022-10-10 03:00:55 +00:00
if (lock) {
metaRLock(pMeta);
}
2022-05-12 03:34:38 +00:00
2022-05-18 07:57:29 +00:00
ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
2022-04-26 01:42:43 +00:00
if (ret < 0) {
2022-05-12 03:34:38 +00:00
metaULock(pMeta);
2022-04-26 01:42:43 +00:00
taosMemoryFree(pCtbCur);
return NULL;
}
2022-04-19 13:10:03 +00:00
2022-04-29 10:02:36 +00:00
// move to the suid
ctbIdxKey.suid = uid;
ctbIdxKey.uid = INT64_MIN;
2022-05-18 07:57:29 +00:00
tdbTbcMoveTo(pCtbCur->pCur, &ctbIdxKey, sizeof(ctbIdxKey), &c);
2022-04-29 10:02:36 +00:00
if (c > 0) {
2022-05-18 07:57:29 +00:00
tdbTbcMoveToNext(pCtbCur->pCur);
2022-04-29 10:02:36 +00:00
}
2022-04-19 13:10:03 +00:00
return pCtbCur;
}
2022-10-10 03:00:55 +00:00
void metaCloseCtbCursor(SMCtbCursor *pCtbCur, int lock) {
2022-04-26 01:42:43 +00:00
if (pCtbCur) {
2022-10-10 03:00:55 +00:00
if (pCtbCur->pMeta && lock) metaULock(pCtbCur->pMeta);
2022-04-26 01:42:43 +00:00
if (pCtbCur->pCur) {
2022-05-18 07:57:29 +00:00
tdbTbcClose(pCtbCur->pCur);
2022-04-19 13:10:03 +00:00
2022-04-28 07:02:49 +00:00
tdbFree(pCtbCur->pKey);
tdbFree(pCtbCur->pVal);
2022-04-26 01:42:43 +00:00
}
2022-04-19 13:10:03 +00:00
2022-04-26 01:42:43 +00:00
taosMemoryFree(pCtbCur);
}
2022-04-19 13:10:03 +00:00
}
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
2022-04-26 01:42:43 +00:00
int ret;
SCtbIdxKey *pCtbIdxKey;
2022-04-19 13:10:03 +00:00
2022-05-18 07:57:29 +00:00
ret = tdbTbcNext(pCtbCur->pCur, &pCtbCur->pKey, &pCtbCur->kLen, &pCtbCur->pVal, &pCtbCur->vLen);
2022-04-26 01:42:43 +00:00
if (ret < 0) {
return 0;
}
2022-04-19 13:10:03 +00:00
2022-04-26 01:42:43 +00:00
pCtbIdxKey = pCtbCur->pKey;
2022-04-29 10:02:36 +00:00
if (pCtbIdxKey->suid > pCtbCur->suid) {
return 0;
}
2022-04-19 13:10:03 +00:00
2022-04-26 01:42:43 +00:00
return pCtbIdxKey->uid;
2022-04-19 13:10:03 +00:00
}
2022-06-26 10:44:49 +00:00
struct SMStbCursor {
SMeta *pMeta;
TBC *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
int kLen;
int vLen;
};
SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) {
SMStbCursor *pStbCur = NULL;
int ret = 0;
int c = 0;
pStbCur = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pStbCur));
if (pStbCur == NULL) {
2022-06-27 06:47:14 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-06-26 10:44:49 +00:00
return NULL;
}
pStbCur->pMeta = pMeta;
pStbCur->suid = suid;
metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pSuidIdx, &pStbCur->pCur, NULL);
if (ret < 0) {
2022-06-27 06:47:14 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-06-26 10:44:49 +00:00
metaULock(pMeta);
taosMemoryFree(pStbCur);
return NULL;
}
// move to the suid
tdbTbcMoveTo(pStbCur->pCur, &suid, sizeof(suid), &c);
if (c > 0) {
tdbTbcMoveToNext(pStbCur->pCur);
}
return pStbCur;
}
void metaCloseStbCursor(SMStbCursor *pStbCur) {
if (pStbCur) {
if (pStbCur->pMeta) metaULock(pStbCur->pMeta);
if (pStbCur->pCur) {
tdbTbcClose(pStbCur->pCur);
tdbFree(pStbCur->pKey);
tdbFree(pStbCur->pVal);
}
taosMemoryFree(pStbCur);
}
}
tb_uid_t metaStbCursorNext(SMStbCursor *pStbCur) {
int ret;
ret = tdbTbcNext(pStbCur->pCur, &pStbCur->pKey, &pStbCur->kLen, &pStbCur->pVal, &pStbCur->vLen);
if (ret < 0) {
return 0;
}
2022-07-04 08:26:16 +00:00
return *(tb_uid_t *)pStbCur->pKey;
2022-06-26 10:44:49 +00:00
}
2022-10-10 03:00:55 +00:00
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, int lock) {
2022-06-07 07:56:33 +00:00
// SMetaReader mr = {0};
2022-07-04 08:26:16 +00:00
STSchema *pTSchema = NULL;
2022-04-25 12:23:00 +00:00
SSchemaWrapper *pSW = NULL;
STSchemaBuilder sb = {0};
2022-07-04 08:26:16 +00:00
SSchema *pSchema;
2022-04-19 13:10:03 +00:00
2022-10-10 03:00:55 +00:00
pSW = metaGetTableSchema(pMeta, uid, sver, lock);
2022-05-11 13:24:58 +00:00
if (!pSW) return NULL;
2022-05-12 03:34:38 +00:00
tdInitTSchemaBuilder(&sb, pSW->version);
2022-04-19 13:10:03 +00:00
for (int i = 0; i < pSW->nCols; i++) {
pSchema = pSW->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
}
pTSchema = tdGetSchemaFromBuilder(&sb);
2022-05-23 08:30:26 +00:00
2022-04-19 13:10:03 +00:00
tdDestroyTSchemaBuilder(&sb);
2022-04-25 12:23:00 +00:00
taosMemoryFree(pSW->pSchema);
taosMemoryFree(pSW);
2022-04-19 13:10:03 +00:00
return pTSchema;
}
2022-07-04 08:26:16 +00:00
int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema **ppTSchema) {
2022-08-17 07:08:44 +00:00
int32_t code = 0;
2022-07-04 08:26:16 +00:00
void *pData = NULL;
int nData = 0;
2022-08-17 07:08:44 +00:00
SSkmDbKey skmDbKey;
if (sver <= 0) {
SMetaInfo info;
if (metaGetInfo(pMeta, suid ? suid : uid, &info) == 0) {
sver = info.skmVer;
} else {
TBC *pSkmDbC = NULL;
int c;
skmDbKey.uid = suid ? suid : uid;
skmDbKey.sver = INT32_MAX;
tdbTbcOpen(pMeta->pSkmDb, &pSkmDbC, NULL);
metaRLock(pMeta);
if (tdbTbcMoveTo(pSkmDbC, &skmDbKey, sizeof(skmDbKey), &c) < 0) {
metaULock(pMeta);
tdbTbcClose(pSkmDbC);
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
ASSERT(c);
if (c < 0) {
tdbTbcMoveToPrev(pSkmDbC);
}
const void *pKey = NULL;
int32_t nKey = 0;
tdbTbcGet(pSkmDbC, &pKey, &nKey, NULL, NULL);
2022-08-20 14:24:07 +00:00
if (((SSkmDbKey *)pKey)->uid != skmDbKey.uid) {
2022-08-17 07:08:44 +00:00
metaULock(pMeta);
tdbTbcClose(pSkmDbC);
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
sver = ((SSkmDbKey *)pKey)->sver;
metaULock(pMeta);
tdbTbcClose(pSkmDbC);
}
}
2022-07-04 08:26:16 +00:00
2022-08-17 07:08:44 +00:00
ASSERT(sver > 0);
skmDbKey.uid = suid ? suid : uid;
skmDbKey.sver = sver;
2022-07-04 08:26:16 +00:00
metaRLock(pMeta);
2022-08-17 07:08:44 +00:00
if (tdbTbGet(pMeta->pSkmDb, &skmDbKey, sizeof(SSkmDbKey), &pData, &nData) < 0) {
2022-07-04 08:26:16 +00:00
metaULock(pMeta);
2022-08-17 07:08:44 +00:00
code = TSDB_CODE_NOT_FOUND;
goto _exit;
2022-07-04 08:26:16 +00:00
}
metaULock(pMeta);
// decode
SDecoder dc = {0};
SSchemaWrapper schema;
SSchemaWrapper *pSchemaWrapper = &schema;
tDecoderInit(&dc, pData, nData);
2022-10-17 07:28:10 +00:00
(void)tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
2022-07-04 08:26:16 +00:00
tDecoderClear(&dc);
2022-07-07 08:04:12 +00:00
tdbFree(pData);
2022-07-04 08:26:16 +00:00
// convert
STSchemaBuilder sb = {0};
tdInitTSchemaBuilder(&sb, pSchemaWrapper->version);
for (int i = 0; i < pSchemaWrapper->nCols; i++) {
SSchema *pSchema = pSchemaWrapper->pSchema + i;
tdAddColToSchema(&sb, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes);
}
2022-10-19 05:38:01 +00:00
2022-08-17 07:08:44 +00:00
STSchema *pTSchema = tdGetSchemaFromBuilder(&sb);
2022-10-19 05:38:01 +00:00
if (pTSchema == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
2022-07-04 08:26:16 +00:00
tdDestroyTSchemaBuilder(&sb);
*ppTSchema = pTSchema;
taosMemoryFree(pSchemaWrapper->pSchema);
2022-08-17 07:08:44 +00:00
_exit:
2022-07-04 08:26:16 +00:00
return code;
}
// N.B. Called by statusReq per second
int64_t metaGetTbNum(SMeta *pMeta) {
// num of child tables (excluding normal tables , stables and others)
/* int64_t num = 0; */
/* vnodeGetAllCtbNum(pMeta->pVnode, &num); */
return pMeta->pVnode->config.vndStats.numOfCTables + pMeta->pVnode->config.vndStats.numOfNTables;
}
// N.B. Called by statusReq per second
2022-07-18 05:31:13 +00:00
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
// sum of (number of columns of stable - 1) * number of ctables (excluding timestamp column)
if (pMeta->pVnode->config.vndStats.numOfTimeSeries <= 0 ||
++pMeta->pVnode->config.vndStats.itvTimeSeries % (60 * 5) == 0) {
int64_t num = 0;
vnodeGetTimeSeriesNum(pMeta->pVnode, &num);
pMeta->pVnode->config.vndStats.numOfTimeSeries = num;
pMeta->pVnode->config.vndStats.itvTimeSeries = (TD_VID(pMeta->pVnode) % 100) * 2;
}
2022-08-25 08:22:55 +00:00
return pMeta->pVnode->config.vndStats.numOfTimeSeries + pMeta->pVnode->config.vndStats.numOfNTimeSeries;
2022-05-16 15:55:17 +00:00
}
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
typedef struct {
2022-07-04 08:26:16 +00:00
SMeta *pMeta;
TBC *pCur;
2022-05-16 15:55:17 +00:00
tb_uid_t uid;
2022-07-04 08:26:16 +00:00
void *pKey;
void *pVal;
2022-05-16 15:55:17 +00:00
int kLen;
int vLen;
} SMSmaCursor;
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) {
SMSmaCursor *pSmaCur = NULL;
SSmaIdxKey smaIdxKey;
int ret;
int c;
pSmaCur = (SMSmaCursor *)taosMemoryCalloc(1, sizeof(*pSmaCur));
if (pSmaCur == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-04-19 13:10:03 +00:00
return NULL;
}
2022-05-16 15:55:17 +00:00
pSmaCur->pMeta = pMeta;
pSmaCur->uid = uid;
metaRLock(pMeta);
2022-05-18 07:57:29 +00:00
ret = tdbTbcOpen(pMeta->pSmaIdx, &pSmaCur->pCur, NULL);
2022-05-16 15:55:17 +00:00
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pSmaCur);
2022-04-19 13:10:03 +00:00
return NULL;
}
2022-05-16 15:55:17 +00:00
// move to the suid
smaIdxKey.uid = uid;
smaIdxKey.smaUid = INT64_MIN;
2022-05-18 07:57:29 +00:00
tdbTbcMoveTo(pSmaCur->pCur, &smaIdxKey, sizeof(smaIdxKey), &c);
2022-05-16 15:55:17 +00:00
if (c > 0) {
2022-05-18 07:57:29 +00:00
tdbTbcMoveToNext(pSmaCur->pCur);
2022-05-16 15:55:17 +00:00
}
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
return pSmaCur;
}
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
void metaCloseSmaCursor(SMSmaCursor *pSmaCur) {
if (pSmaCur) {
if (pSmaCur->pMeta) metaULock(pSmaCur->pMeta);
if (pSmaCur->pCur) {
2022-05-18 07:57:29 +00:00
tdbTbcClose(pSmaCur->pCur);
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
tdbFree(pSmaCur->pKey);
tdbFree(pSmaCur->pVal);
}
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
taosMemoryFree(pSmaCur);
}
}
tb_uid_t metaSmaCursorNext(SMSmaCursor *pSmaCur) {
int ret;
SSmaIdxKey *pSmaIdxKey;
2022-05-18 07:57:29 +00:00
ret = tdbTbcNext(pSmaCur->pCur, &pSmaCur->pKey, &pSmaCur->kLen, &pSmaCur->pVal, &pSmaCur->vLen);
2022-05-16 15:55:17 +00:00
if (ret < 0) {
return 0;
}
pSmaIdxKey = pSmaCur->pKey;
if (pSmaIdxKey->uid > pSmaCur->uid) {
return 0;
}
return pSmaIdxKey->uid;
}
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
STSmaWrapper *pSW = NULL;
2022-07-04 08:26:16 +00:00
SArray *pSmaIds = NULL;
2022-05-16 15:55:17 +00:00
if (!(pSmaIds = metaGetSmaIdsByTable(pMeta, uid))) {
return NULL;
}
pSW = taosMemoryCalloc(1, sizeof(*pSW));
if (!pSW) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pSW->number = taosArrayGetSize(pSmaIds);
pSW->tSma = taosMemoryCalloc(pSW->number, sizeof(STSma));
if (!pSW->tSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
int64_t smaId;
int smaIdx = 0;
2022-07-04 08:26:16 +00:00
STSma *pTSma = NULL;
2022-05-16 15:55:17 +00:00
for (int i = 0; i < pSW->number; ++i) {
smaId = *(tb_uid_t *)taosArrayGet(pSmaIds, i);
if (metaGetTableEntryByUid(&mr, smaId) < 0) {
tDecoderClear(&mr.coder);
2022-08-02 08:48:49 +00:00
metaWarn("vgId:%d, no entry for tbId:%" PRIi64 ", smaId:%" PRIi64, TD_VID(pMeta->pVnode), uid, smaId);
2022-05-16 15:55:17 +00:00
continue;
}
tDecoderClear(&mr.coder);
2022-05-16 15:55:17 +00:00
pTSma = pSW->tSma + smaIdx;
memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
if (deepCopy) {
if (pTSma->exprLen > 0) {
if (!(pTSma->expr = taosMemoryCalloc(1, pTSma->exprLen))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
2022-05-18 05:46:16 +00:00
memcpy((void *)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
2022-04-19 13:10:03 +00:00
}
2022-05-16 15:55:17 +00:00
if (pTSma->tagsFilterLen > 0) {
if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
2022-04-19 13:10:03 +00:00
}
2022-05-18 05:46:16 +00:00
memcpy((void *)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
2022-05-16 15:55:17 +00:00
} else {
pTSma->exprLen = 0;
pTSma->expr = NULL;
pTSma->tagsFilterLen = 0;
pTSma->tagsFilter = NULL;
2022-04-19 13:10:03 +00:00
}
2022-05-18 05:46:16 +00:00
2022-05-16 15:55:17 +00:00
++smaIdx;
2022-04-19 13:10:03 +00:00
}
2022-05-16 15:55:17 +00:00
if (smaIdx <= 0) goto _err;
pSW->number = smaIdx;
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
metaReaderClear(&mr);
taosArrayDestroy(pSmaIds);
2022-04-19 13:10:03 +00:00
return pSW;
2022-05-16 15:55:17 +00:00
_err:
metaReaderClear(&mr);
taosArrayDestroy(pSmaIds);
2022-06-01 11:06:58 +00:00
tFreeTSmaWrapper(pSW, deepCopy);
2022-04-19 13:10:03 +00:00
return NULL;
}
2022-05-16 15:55:17 +00:00
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
2022-07-04 08:26:16 +00:00
STSma *pTSma = NULL;
2022-05-16 15:55:17 +00:00
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
if (metaGetTableEntryByUid(&mr, indexUid) < 0) {
2022-08-02 08:48:49 +00:00
metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
2022-05-16 15:55:17 +00:00
metaReaderClear(&mr);
return NULL;
}
pTSma = (STSma *)taosMemoryMalloc(sizeof(STSma));
if (!pTSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaReaderClear(&mr);
return NULL;
}
memcpy(pTSma, mr.me.smaEntry.tsma, sizeof(STSma));
metaReaderClear(&mr);
return pTSma;
2022-04-19 13:10:03 +00:00
}
2022-05-16 15:55:17 +00:00
SArray *metaGetSmaIdsByTable(SMeta *pMeta, tb_uid_t uid) {
2022-07-04 08:26:16 +00:00
SArray *pUids = NULL;
2022-05-16 15:55:17 +00:00
SSmaIdxKey *pSmaIdxKey = NULL;
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
if (!pCur) {
2022-04-19 13:10:03 +00:00
return NULL;
}
2022-05-16 15:55:17 +00:00
while (1) {
tb_uid_t id = metaSmaCursorNext(pCur);
if (id == 0) {
break;
}
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
if (!pUids) {
pUids = taosArrayInit(16, sizeof(tb_uid_t));
2022-04-19 13:10:03 +00:00
if (!pUids) {
2022-05-16 15:55:17 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
return NULL;
2022-04-19 13:10:03 +00:00
}
2022-05-16 15:55:17 +00:00
}
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
pSmaIdxKey = (SSmaIdxKey *)pCur->pKey;
2022-04-19 13:10:03 +00:00
2022-09-28 10:54:39 +00:00
if (!taosArrayPush(pUids, &pSmaIdxKey->smaUid)) {
2022-05-16 15:55:17 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
taosArrayDestroy(pUids);
return NULL;
2022-04-19 13:10:03 +00:00
}
}
metaCloseSmaCursor(pCur);
return pUids;
}
2022-05-16 15:55:17 +00:00
SArray *metaGetSmaTbUids(SMeta *pMeta) {
2022-07-04 08:26:16 +00:00
SArray *pUids = NULL;
2022-05-16 15:55:17 +00:00
SSmaIdxKey *pSmaIdxKey = NULL;
tb_uid_t lastUid = 0;
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, 0);
if (!pCur) {
2022-04-19 13:10:03 +00:00
return NULL;
}
2022-05-16 15:55:17 +00:00
while (1) {
tb_uid_t uid = metaSmaCursorNext(pCur);
if (uid == 0) {
break;
}
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
if (lastUid == uid) {
continue;
}
2022-04-19 13:10:03 +00:00
2022-05-16 15:55:17 +00:00
lastUid = uid;
if (!pUids) {
pUids = taosArrayInit(16, sizeof(tb_uid_t));
if (!pUids) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
return NULL;
}
}
2022-09-28 10:54:39 +00:00
if (!taosArrayPush(pUids, &uid)) {
2022-05-16 15:55:17 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
metaCloseSmaCursor(pCur);
taosArrayDestroy(pUids);
return NULL;
}
2022-04-19 13:10:03 +00:00
}
2022-05-16 15:55:17 +00:00
metaCloseSmaCursor(pCur);
return pUids;
2022-04-22 05:30:15 +00:00
}
2022-04-28 08:31:35 +00:00
#endif
2022-05-12 11:21:44 +00:00
const void *metaGetTableTagVal(void *pTag, int16_t type, STagVal *val) {
2022-08-17 07:28:49 +00:00
STag *tag = (STag *)pTag;
2022-06-02 08:36:48 +00:00
if (type == TSDB_DATA_TYPE_JSON) {
2022-05-31 09:49:33 +00:00
return tag;
}
2022-05-31 15:38:21 +00:00
bool find = tTagGet(tag, val);
2022-06-02 08:36:48 +00:00
if (!find) {
2022-05-31 15:38:21 +00:00
return NULL;
}
2022-08-19 07:12:04 +00:00
2022-08-23 09:28:49 +00:00
#ifdef TAG_FILTER_DEBUG
2022-08-19 07:12:04 +00:00
if (IS_VAR_DATA_TYPE(val->type)) {
char *buf = taosMemoryCalloc(val->nData + 1, 1);
2022-08-19 07:12:04 +00:00
memcpy(buf, val->pData, val->nData);
metaDebug("metaTag table val varchar index:%d cid:%d type:%d value:%s", 1, val->cid, val->type, buf);
taosMemoryFree(buf);
} else {
double dval = 0;
GET_TYPED_DATA(dval, double, val->type, &val->i64);
metaDebug("metaTag table val number index:%d cid:%d type:%d value:%f", 1, val->cid, val->type, dval);
}
SArray *pTagVals = NULL;
tTagToValArray((STag *)pTag, &pTagVals);
2022-08-19 07:12:04 +00:00
for (int i = 0; i < taosArrayGetSize(pTagVals); i++) {
STagVal *pTagVal = (STagVal *)taosArrayGet(pTagVals, i);
2022-08-19 07:12:04 +00:00
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
char *buf = taosMemoryCalloc(pTagVal->nData + 1, 1);
2022-08-19 07:12:04 +00:00
memcpy(buf, pTagVal->pData, pTagVal->nData);
metaDebug("metaTag table varchar index:%d cid:%d type:%d value:%s", i, pTagVal->cid, pTagVal->type, buf);
taosMemoryFree(buf);
} else {
double dval = 0;
GET_TYPED_DATA(dval, double, pTagVal->type, &pTagVal->i64);
metaDebug("metaTag table number index:%d cid:%d type:%d value:%f", i, pTagVal->cid, pTagVal->type, dval);
}
}
2022-08-23 09:28:49 +00:00
#endif
2022-08-19 07:12:04 +00:00
2022-05-31 15:38:21 +00:00
return val;
2022-05-31 10:34:17 +00:00
}
2022-06-01 01:58:58 +00:00
typedef struct {
2022-07-04 08:26:16 +00:00
SMeta *pMeta;
TBC *pCur;
2022-06-01 01:58:58 +00:00
tb_uid_t suid;
int16_t cid;
int16_t type;
2022-07-04 08:26:16 +00:00
void *pKey;
void *pVal;
2022-06-01 01:58:58 +00:00
int32_t kLen;
int32_t vLen;
} SIdxCursor;
2022-10-19 09:54:06 +00:00
int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t ret = 0;
SIdxCursor *pCursor = NULL;
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
pCursor->pMeta = pMeta;
pCursor->suid = param->suid;
pCursor->cid = param->cid;
pCursor->type = param->type;
metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL);
if (ret != 0) {
goto END;
}
int64_t uidLimit = param->reverse ? INT64_MAX : 0;
SCtimeIdxKey ctimeKey = {.ctime = *(int64_t *)(param->val), .uid = uidLimit};
SCtimeIdxKey *pCtimeKey = &ctimeKey;
int cmp = 0;
if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) {
goto END;
}
2022-10-24 10:59:12 +00:00
2022-10-19 09:54:06 +00:00
int32_t valid = 0;
while (1) {
void *entryKey = NULL;
int32_t nEntryKey = -1;
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL);
if (valid < 0) break;
SCtimeIdxKey *p = entryKey;
2022-10-20 13:12:52 +00:00
2022-10-19 09:54:06 +00:00
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type);
if (cmp == 0) taosArrayPush(pUids, &p->uid);
2022-10-24 10:35:59 +00:00
if (param->reverse == false) {
if (cmp == -1) break;
} else if (param->reverse) {
if (cmp == 1) break;
}
2022-10-19 09:54:06 +00:00
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) break;
}
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
taosMemoryFree(pCursor);
return ret;
}
int32_t metaFilterTableName(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t ret = 0;
char *buf = NULL;
STagIdxKey *pKey = NULL;
int32_t nKey = 0;
SIdxCursor *pCursor = NULL;
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
pCursor->pMeta = pMeta;
pCursor->suid = param->suid;
pCursor->cid = param->cid;
pCursor->type = param->type;
2022-10-20 05:53:04 +00:00
char *pName = param->val;
2022-10-19 09:54:06 +00:00
metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pNameIdx, &pCursor->pCur, NULL);
2022-10-20 05:53:04 +00:00
if (ret != 0) {
goto END;
}
int cmp = 0;
if (tdbTbcMoveTo(pCursor->pCur, pName, strlen(pName) + 1, &cmp) < 0) {
goto END;
}
bool first = true;
int32_t valid = 0;
while (1) {
void *pEntryKey = NULL, *pEntryVal = NULL;
int32_t nEntryKey = -1, nEntryVal = 0;
valid = tdbTbcGet(pCursor->pCur, (const void **)pEntryKey, &nEntryKey, (const void **)&pEntryVal, &nEntryVal);
if (valid < 0) break;
char *pTableKey = (char *)pEntryKey;
int32_t cmp = (*param->filterFunc)(pTableKey, pName, pCursor->type);
if (cmp == 0) {
tb_uid_t tuid = *(tb_uid_t *)pEntryVal;
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// next
} else {
break;
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) {
break;
}
}
2022-10-19 09:54:06 +00:00
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
taosMemoryFree(buf);
taosMemoryFree(pKey);
taosMemoryFree(pCursor);
return ret;
}
int32_t metaFilterTtl(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t ret = 0;
char *buf = NULL;
STtlIdxKey *pKey = NULL;
int32_t nKey = 0;
SIdxCursor *pCursor = NULL;
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
pCursor->pMeta = pMeta;
pCursor->suid = param->suid;
pCursor->cid = param->cid;
pCursor->type = param->type;
metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL);
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
taosMemoryFree(buf);
taosMemoryFree(pKey);
taosMemoryFree(pCursor);
return ret;
// impl later
return 0;
}
2022-07-23 07:32:18 +00:00
int32_t metaFilterTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
int32_t ret = 0;
char *buf = NULL;
2022-06-01 01:58:58 +00:00
2022-07-23 07:32:18 +00:00
STagIdxKey *pKey = NULL;
int32_t nKey = 0;
SIdxCursor *pCursor = NULL;
2022-06-01 01:58:58 +00:00
pCursor = (SIdxCursor *)taosMemoryCalloc(1, sizeof(SIdxCursor));
pCursor->pMeta = pMeta;
pCursor->suid = param->suid;
pCursor->cid = param->cid;
pCursor->type = param->type;
metaRLock(pMeta);
2022-10-19 09:54:06 +00:00
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL);
2022-06-01 01:58:58 +00:00
if (ret < 0) {
goto END;
}
2022-07-23 07:32:18 +00:00
int32_t maxSize = 0;
2022-06-01 01:58:58 +00:00
int32_t nTagData = 0;
2022-07-04 08:26:16 +00:00
void *tagData = NULL;
2022-06-02 08:36:48 +00:00
2022-06-10 10:51:48 +00:00
if (param->val == NULL) {
2022-06-17 06:02:51 +00:00
metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode));
2022-10-19 09:54:06 +00:00
ret = -1;
2022-10-17 08:29:50 +00:00
goto END;
2022-06-02 08:36:48 +00:00
} else {
2022-06-10 10:51:48 +00:00
if (IS_VAR_DATA_TYPE(param->type)) {
tagData = varDataVal(param->val);
nTagData = varDataLen(param->val);
if (param->type == TSDB_DATA_TYPE_NCHAR) {
maxSize = 4 * nTagData + 1;
buf = taosMemoryCalloc(1, maxSize);
if (false == taosMbsToUcs4(tagData, nTagData, (TdUcs4 *)buf, maxSize, &maxSize)) {
goto END;
}
2022-06-10 10:24:04 +00:00
2022-06-10 10:51:48 +00:00
tagData = buf;
nTagData = maxSize;
}
} else {
tagData = param->val;
nTagData = tDataTypes[param->type].bytes;
2022-06-10 10:24:04 +00:00
}
2022-06-01 01:58:58 +00:00
}
2022-06-02 08:36:48 +00:00
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, tagData, nTagData, pCursor->type,
2022-06-01 01:58:58 +00:00
param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
2022-06-10 10:24:04 +00:00
2022-06-01 01:58:58 +00:00
if (ret != 0) {
goto END;
}
int cmp = 0;
if (tdbTbcMoveTo(pCursor->pCur, pKey, nKey, &cmp) < 0) {
goto END;
}
2022-06-02 08:36:48 +00:00
2022-06-14 05:44:13 +00:00
bool first = true;
2022-07-23 07:32:18 +00:00
int32_t valid = 0;
2022-06-01 01:58:58 +00:00
while (1) {
2022-07-23 07:32:18 +00:00
void *entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal;
2022-06-01 01:58:58 +00:00
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
if (valid < 0) {
2022-10-20 05:53:04 +00:00
tdbFree(entryVal);
2022-06-01 01:58:58 +00:00
break;
}
STagIdxKey *p = entryKey;
2022-10-23 04:25:52 +00:00
if (p == NULL) break;
2022-06-14 05:44:13 +00:00
if (p->type != pCursor->type) {
if (first) {
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) break;
continue;
} else {
break;
}
}
2022-10-23 04:25:52 +00:00
if (p->suid != pKey->suid) {
2022-08-17 07:28:49 +00:00
break;
}
2022-06-14 05:44:13 +00:00
first = false;
2022-10-17 08:29:50 +00:00
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
if (cmp == 0) {
// match
tb_uid_t tuid = 0;
if (IS_VAR_DATA_TYPE(pKey->type)) {
tuid = *(tb_uid_t *)(p->data + varDataTLen(p->data));
2022-06-01 01:58:58 +00:00
} else {
2022-10-17 08:29:50 +00:00
tuid = *(tb_uid_t *)(p->data + tDataTypes[pCursor->type].bytes);
2022-06-01 01:58:58 +00:00
}
2022-10-17 08:29:50 +00:00
taosArrayPush(pUids, &tuid);
} else if (cmp == 1) {
// not match but should continue to iter
} else {
// not match and no more result
break;
2022-06-01 01:58:58 +00:00
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) {
break;
}
}
2022-07-23 07:32:18 +00:00
2022-06-01 01:58:58 +00:00
END:
if (pCursor->pMeta) metaULock(pCursor->pMeta);
if (pCursor->pCur) tdbTbcClose(pCursor->pCur);
2022-06-10 10:24:04 +00:00
taosMemoryFree(buf);
2022-07-23 07:32:18 +00:00
taosMemoryFree(pKey);
2022-06-01 01:58:58 +00:00
taosMemoryFree(pCursor);
return ret;
2022-06-02 08:36:48 +00:00
}
2022-08-16 06:22:55 +00:00
2022-10-05 10:32:30 +00:00
static int32_t metaGetTableTagByUid(SMeta *pMeta, int64_t suid, int64_t uid, void **tag, int32_t *len, bool lock) {
2022-10-01 05:49:22 +00:00
int ret = 0;
if (lock) {
metaRLock(pMeta);
}
SCtbIdxKey ctbIdxKey = {.suid = suid, .uid = uid};
ret = tdbTbGet(pMeta->pCtbIdx, &ctbIdxKey, sizeof(SCtbIdxKey), tag, len);
if (lock) {
metaULock(pMeta);
}
return ret;
}
2022-10-05 10:32:30 +00:00
int32_t metaGetTableTagsByUids(SMeta *pMeta, int64_t suid, SArray *uidList, SHashObj *tags) {
2022-10-01 05:49:22 +00:00
const int32_t LIMIT = 128;
int32_t isLock = false;
2022-09-29 15:13:54 +00:00
int32_t sz = uidList ? taosArrayGetSize(uidList) : 0;
for (int i = 0; i < sz; i++) {
2022-10-01 05:49:22 +00:00
tb_uid_t *id = taosArrayGet(uidList, i);
if (i % LIMIT == 0) {
if (isLock) metaULock(pMeta);
metaRLock(pMeta);
isLock = true;
}
if (taosHashGet(tags, id, sizeof(tb_uid_t)) == NULL) {
void *val = NULL;
int32_t len = 0;
if (metaGetTableTagByUid(pMeta, suid, *id, &val, &len, false) == 0) {
taosHashPut(tags, id, sizeof(tb_uid_t), val, len);
tdbFree(val);
2022-10-05 10:32:30 +00:00
} else {
metaError("vgId:%d, failed to table IDs, suid: %" PRId64 ", uid: %" PRId64 "", TD_VID(pMeta->pVnode), suid,
*id);
2022-10-01 05:49:22 +00:00
}
2022-09-29 15:13:54 +00:00
}
}
2022-10-01 05:49:22 +00:00
if (isLock) metaULock(pMeta);
2022-09-29 15:13:54 +00:00
return 0;
}
2022-10-01 05:49:22 +00:00
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList, SHashObj *tags) {
2022-10-10 03:00:55 +00:00
SMCtbCursor *pCur = metaOpenCtbCursor(pMeta, suid, 1);
SHashObj *uHash = NULL;
2022-08-17 07:28:49 +00:00
size_t len = taosArrayGetSize(uidList); // len > 0 means there already have uids
if (len > 0) {
uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
2022-08-17 07:28:49 +00:00
for (int i = 0; i < len; i++) {
int64_t *uid = taosArrayGet(uidList, i);
taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
}
}
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
break;
}
2022-08-17 03:09:49 +00:00
if (len > 0 && taosHashGet(uHash, &id, sizeof(int64_t)) == NULL) {
continue;
2022-08-17 07:28:49 +00:00
} else if (len == 0) {
taosArrayPush(uidList, &id);
}
taosHashPut(tags, &id, sizeof(int64_t), pCur->pVal, pCur->vLen);
}
taosHashCleanup(uHash);
2022-10-10 03:00:55 +00:00
metaCloseCtbCursor(pCur, 1);
return TSDB_CODE_SUCCESS;
}
2022-08-17 03:09:49 +00:00
2022-08-19 03:52:43 +00:00
int32_t metaCacheGet(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo);
2022-08-16 06:22:55 +00:00
int32_t metaGetInfo(SMeta *pMeta, int64_t uid, SMetaInfo *pInfo) {
int32_t code = 0;
void *pData = NULL;
int nData = 0;
metaRLock(pMeta);
// search cache
if (metaCacheGet(pMeta, uid, pInfo) == 0) {
metaULock(pMeta);
goto _exit;
}
// search TDB
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData) < 0) {
// not found
metaULock(pMeta);
code = TSDB_CODE_NOT_FOUND;
goto _exit;
}
metaULock(pMeta);
pInfo->uid = uid;
pInfo->suid = ((SUidIdxVal *)pData)->suid;
pInfo->version = ((SUidIdxVal *)pData)->version;
pInfo->skmVer = ((SUidIdxVal *)pData)->skmVer;
// upsert the cache
metaWLock(pMeta);
metaCacheUpsert(pMeta, pInfo);
metaULock(pMeta);
_exit:
tdbFree(pData);
return code;
}
int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) {
int32_t code = 0;
metaRLock(pMeta);
// fast path: search cache
if (metaStatsCacheGet(pMeta, uid, pInfo) == TSDB_CODE_SUCCESS) {
metaULock(pMeta);
goto _exit;
}
// slow path: search TDB
int64_t ctbNum = 0;
vnodeGetCtbNum(pMeta->pVnode, uid, &ctbNum);
metaULock(pMeta);
pInfo->uid = uid;
pInfo->ctbNum = ctbNum;
// upsert the cache
metaWLock(pMeta);
metaStatsCacheUpsert(pMeta, pInfo);
metaULock(pMeta);
_exit:
return code;
}
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta) {
SMetaStbStats stats = {0};
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
stats.ctbNum += delta;
metaStatsCacheUpsert(pMeta, &stats);
}
}