// TDengine/tools/rocks-reader/rreader.cpp
// 2025-11-18 15:07:56 +08:00
//
// 378 lines
// 11 KiB
// C++
//
//#ifdef USE_ROCKSDB
#include "rocksdb/c.h"
//#endif
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "../../include/client/taos.h"
// Timestamp key type used by the row-key structures declared in this file.
#define TSKEY int64_t
// Evaluates to true when data type `t` is one of the types listed below.
// The deserializers in this file treat such values as a (pData, nData) byte
// payload rather than a scalar `val`.
#define IS_VAR_DATA_TYPE(t) \
(((t) == TSDB_DATA_TYPE_VARCHAR) || ((t) == TSDB_DATA_TYPE_VARBINARY) || ((t) == TSDB_DATA_TYPE_NCHAR) || \
((t) == TSDB_DATA_TYPE_JSON) || ((t) == TSDB_DATA_TYPE_GEOMETRY) || ((t) == TSDB_DATA_TYPE_BLOB) || \
((t) == TSDB_DATA_TYPE_MEDIUMBLOB))
// SColVal ================================
// Possible values of SColVal::flag. Exactly one of the COL_VAL_IS_* predicates
// below holds for a flag that carries one of these three values.
#define CV_FLAG_VALUE ((int8_t)0x0)
#define CV_FLAG_NONE ((int8_t)0x1)
#define CV_FLAG_NULL ((int8_t)0x2)
// Predicates over a pointer to SColVal; each compares CV->flag with one flag constant.
#define COL_VAL_IS_NONE(CV) ((CV)->flag == CV_FLAG_NONE)
#define COL_VAL_IS_NULL(CV) ((CV)->flag == CV_FLAG_NULL)
#define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE)
// Integer type of the `uid` field in SLastKey.
typedef int64_t tb_uid_t;
// A typed value. `type` selects which member of the anonymous union is meaningful:
// the deserializers in this file use (pData, nData) for IS_VAR_DATA_TYPE types and
// `val` otherwise. `val` and the (pData, nData) pair share the same storage.
// tsdbCacheDeserialize copies this struct verbatim out of serialized bytes, so the
// field order and sizes must not be changed.
struct SValue {
int8_t type;
union {
int64_t val;
struct {
uint8_t *pData;
uint32_t nData;
};
};
};
// Capacity of SRowKey::pks.
#define TD_MAX_PK_COLS 2
// Row key: a timestamp plus up to TD_MAX_PK_COLS primary-key values.
// `numOfPKs` is the number of leading entries of `pks` that are in use.
struct SRowKey {
TSKEY ts;
uint8_t numOfPKs;
SValue pks[TD_MAX_PK_COLS];
};
// One column value: column id, a CV_FLAG_* state flag, and the typed value itself.
struct SColVal {
int16_t cid;
int8_t flag;
SValue value;
};
// Execution-mode bit values. Not referenced by any code visible in this file.
typedef enum {
READER_EXEC_DATA = 0x1,
READER_EXEC_ROWS = 0x2,
} EExecMode;
// Version numbers of the extended (post-V0) serialized record.
// tsdbCacheDeserialize reads the cache status only when the stored version is
// at least LAST_COL_VERSION_2.
#define LAST_COL_VERSION_1 (0x1) // add primary key, version
#define LAST_COL_VERSION_2 (0x2) // add cache status
#define LAST_COL_VERSION LAST_COL_VERSION_2
// Status attached to a cached column. Records decoded by tsdbCacheDeserializeV0
// start out as TSDB_LAST_CACHE_VALID.
typedef enum {
TSDB_LAST_CACHE_VALID = 0, // last_cache has valid data
TSDB_LAST_CACHE_NO_CACHE, // last_cache has no data, but tsdb may have data
} ELastCacheStatus;
// Decoded form of one cache record, as produced by tsdbCacheDeserialize:
// the row key, a dirty flag, the column value and the cache status.
typedef struct {
SRowKey rowKey;
int8_t dirty;
SColVal colVal;
ELastCacheStatus cacheStatus;
} SLastCol;
// Layout that tsdbCacheDeserializeV0 overlays on the first bytes of a serialized
// record. Any variable-length payload follows immediately after this struct.
// Note that the nData/pData order inside the union is the reverse of SValue's.
// The layout mirrors stored bytes, so field order and sizes must not be changed.
typedef struct {
TSKEY ts;
int8_t dirty;
struct {
int16_t cid;
int8_t type;
int8_t flag;
union {
int64_t val;
struct {
uint32_t nData;
uint8_t *pData;
};
} value;
} colVal;
} SLastColV0;
static int32_t tsdbCacheDeserializeV0(char const *value, SLastCol *pLastCol) {
SLastColV0 *pLastColV0 = (SLastColV0 *)value;
pLastCol->rowKey.ts = pLastColV0->ts;
pLastCol->rowKey.numOfPKs = 0;
pLastCol->dirty = pLastColV0->dirty;
pLastCol->colVal.cid = pLastColV0->colVal.cid;
pLastCol->colVal.flag = pLastColV0->colVal.flag;
pLastCol->colVal.value.type = pLastColV0->colVal.type;
pLastCol->cacheStatus = TSDB_LAST_CACHE_VALID;
if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) {
pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
pLastCol->colVal.value.pData = NULL;
if (pLastCol->colVal.value.nData > 0) {
pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
}
return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
} else if (pLastCol->colVal.value.type == TSDB_DATA_TYPE_DECIMAL) {
pLastCol->colVal.value.nData = pLastColV0->colVal.value.nData;
pLastCol->colVal.value.pData = (uint8_t *)(&pLastColV0[1]);
return sizeof(SLastColV0) + pLastColV0->colVal.value.nData;
} else {
pLastCol->colVal.value.val = pLastColV0->colVal.value.val;
return sizeof(SLastColV0);
}
}
static int32_t tsdbCacheDeserialize(char const *value, size_t size, SLastCol **ppLastCol) {
if (!value) {
return -1;
}
SLastCol *pLastCol = (SLastCol *)calloc(1, sizeof(SLastCol));
if (NULL == pLastCol) {
return -2;
}
int32_t offset = tsdbCacheDeserializeV0(value, pLastCol);
if (offset == size) {
// version 0
*ppLastCol = pLastCol;
return 0;
} else if (offset > size) {
free(pLastCol);
return -3;
}
// version
int8_t version = *(int8_t *)(value + offset);
offset += sizeof(int8_t);
// numOfPKs
pLastCol->rowKey.numOfPKs = *(uint8_t *)(value + offset);
offset += sizeof(uint8_t);
// pks
for (int32_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) {
pLastCol->rowKey.pks[i] = *(SValue *)(value + offset);
offset += sizeof(SValue);
if (IS_VAR_DATA_TYPE(pLastCol->rowKey.pks[i].type)) {
pLastCol->rowKey.pks[i].pData = NULL;
if (pLastCol->rowKey.pks[i].nData > 0) {
pLastCol->rowKey.pks[i].pData = (uint8_t *)value + offset;
offset += pLastCol->rowKey.pks[i].nData;
}
}
}
if (version >= LAST_COL_VERSION_2) {
pLastCol->cacheStatus = *(ELastCacheStatus *)(value + offset);
}
if (offset > size) {
free(pLastCol);
return -3;
}
*ppLastCol = pLastCol;
return 0;
}
// Values for SLastKey::lflag. myCmp only looks at the LFLAG_LAST bit of the flag.
enum {
LFLAG_LAST_ROW = 0,
LFLAG_LAST = 1,
};
// Key layout of a cache entry. Both myCmp and main interpret raw rocksdb key
// bytes as this struct, so field order and sizes must not be changed.
typedef struct {
tb_uid_t uid;
int16_t cid;
int8_t lflag;
} SLastKey;
/**
 * Name callback for the custom comparator.
 *
 * @param state  comparator state; unused
 * @return the constant string "myCmp"
 */
static const char *myCmpName(void *state) {
  static const char kName[] = "myCmp";
  (void)state;
  return kName;
}
/**
 * Destructor callback for the custom comparator. The comparator keeps no
 * state, so there is nothing to release.
 *
 * @param state  comparator state; unused
 */
static void myCmpDestroy(void *state) {
  (void)state;
}
static int myCmp(void *state, const char *a, size_t alen, const char *b, size_t blen) {
(void)state;
(void)alen;
(void)blen;
SLastKey *lhs = (SLastKey *)a;
SLastKey *rhs = (SLastKey *)b;
if (lhs->uid < rhs->uid) {
return -1;
} else if (lhs->uid > rhs->uid) {
return 1;
}
if (lhs->cid < rhs->cid) {
return -1;
} else if (lhs->cid > rhs->cid) {
return 1;
}
if ((lhs->lflag & LFLAG_LAST) < (rhs->lflag & LFLAG_LAST)) {
return -1;
} else if ((lhs->lflag & LFLAG_LAST) > (rhs->lflag & LFLAG_LAST)) {
return 1;
}
return 0;
}
/**
 * Write the command-line help text to stdout.
 *
 * @param progName  program name substituted into the usage line and the examples
 */
void printUsage(const char *progName) {
  // Header and option list.
  printf("Usage: %s [options]\n", progName);
  fputs("Options:\n", stdout);
  fputs(" -t <type> Specify cache type to print: last, last_row, or all (default: all)\n", stdout);
  fputs(" -p <path> Specify path to cache.rdb file (default: ./cache.rdb)\n", stdout);
  fputs(" -h, --help, -help Show this help message\n", stdout);

  // Example invocations.
  fputs("\nExamples:\n", stdout);
  printf(" %s -t last\n", progName);
  printf(" %s -t last_row\n", progName);
  printf(" %s -t all -p /path/to/cache.rdb\n", progName);
}
int main(int argc, char *argv[]) {
char *err = NULL;
rocksdb_options_t *options = rocksdb_options_create();
char cachePath[256] = "./cache.rdb";
bool printLast = true;
bool printLastRow = true;
// Parse command line arguments
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "-help") == 0) {
printUsage(argv[0]);
rocksdb_options_destroy(options);
return 0;
} else if (strcmp(argv[i], "-t") == 0) {
if (i + 1 < argc) {
i++;
if (strcmp(argv[i], "last") == 0) {
printLast = true;
printLastRow = false;
} else if (strcmp(argv[i], "last_row") == 0) {
printLast = false;
printLastRow = true;
} else if (strcmp(argv[i], "all") == 0) {
printLast = true;
printLastRow = true;
} else {
fprintf(stderr, "Error: Invalid type '%s'. Use: last, last_row, or all\n", argv[i]);
printUsage(argv[0]);
rocksdb_options_destroy(options);
return 1;
}
} else {
fprintf(stderr, "Error: -t option requires an argument\n");
printUsage(argv[0]);
rocksdb_options_destroy(options);
return 1;
}
} else if (strcmp(argv[i], "-p") == 0) {
if (i + 1 < argc) {
i++;
strncpy(cachePath, argv[i], sizeof(cachePath) - 1);
cachePath[sizeof(cachePath) - 1] = '\0';
} else {
fprintf(stderr, "Error: -p option requires an argument\n");
printUsage(argv[0]);
rocksdb_options_destroy(options);
return 1;
}
} else {
fprintf(stderr, "Error: Unknown option '%s'\n", argv[i]);
printUsage(argv[0]);
rocksdb_options_destroy(options);
return 1;
}
}
rocksdb_comparator_t *cmp = rocksdb_comparator_create(NULL, myCmpDestroy, myCmp, myCmpName);
rocksdb_options_set_comparator(options, cmp);
rocksdb_t *db = rocksdb_open(options, cachePath, &err);
if (!db) {
fprintf(stderr, "Failed to open rocksdb at path: %s\n", cachePath);
if (err) {
fprintf(stderr, "Error: %s\n", err);
}
rocksdb_comparator_destroy(cmp);
rocksdb_options_destroy(options);
exit(-1);
}
rocksdb_iterator_t *rocksdb_create_iterator(rocksdb_t * db, const rocksdb_readoptions_t *options);
unsigned char rocksdb_iter_valid(const rocksdb_iterator_t *);
void rocksdb_iter_seek_to_first(rocksdb_iterator_t *);
void rocksdb_iter_seek_to_last(rocksdb_iterator_t *);
void rocksdb_iter_next(rocksdb_iterator_t *);
const char *rocksdb_iter_key(const rocksdb_iterator_t *, size_t *klen);
const char *rocksdb_iter_value(const rocksdb_iterator_t *, size_t *vlen);
void rocksdb_iter_get_error(const rocksdb_iterator_t *, char **errptr);
void rocksdb_iter_destroy(rocksdb_iterator_t *);
rocksdb_readoptions_t *readoptions = rocksdb_readoptions_create();
rocksdb_iterator_t *iter = rocksdb_create_iterator(db, readoptions);
if (!iter) {
fprintf(stderr, "failed to open rocksdb\n");
exit(-1);
}
for (rocksdb_iter_seek_to_first(iter); rocksdb_iter_valid(iter); rocksdb_iter_next(iter)) {
size_t key_len, value_len;
const char *key = rocksdb_iter_key(iter, &key_len);
const char *value = rocksdb_iter_value(iter, &value_len);
SLastCol *pLastCol = NULL;
int32_t code = tsdbCacheDeserialize(value, value_len, &pLastCol);
if (code) {
fprintf(stderr, "rocksdb/err: %d\n", code);
exit(-1);
}
SLastKey *pLastKey = (SLastKey *)key;
// Check if we should print this record based on lflag
bool shouldPrint = false;
const char *cacheType = "";
if (LFLAG_LAST == pLastKey->lflag && printLast) {
shouldPrint = true;
cacheType = "[LAST]";
} else if (LFLAG_LAST_ROW == pLastKey->lflag && printLastRow) {
shouldPrint = true;
cacheType = "[LAST_ROW]";
}
if (shouldPrint) {
if (!COL_VAL_IS_VALUE(&pLastCol->colVal)) {
bool none = COL_VAL_IS_NONE(&pLastCol->colVal);
bool null = COL_VAL_IS_NULL(&pLastCol->colVal);
if (none) {
printf("%s none uid: %" PRId64 ", cid: %" PRId16 "\n", cacheType, pLastKey->uid, pLastKey->cid);
}
if (null) {
printf("%s null uid: %" PRId64 ", cid: %" PRId16 "\n", cacheType, pLastKey->uid, pLastKey->cid);
}
}
}
free(pLastCol);
}
rocksdb_iter_destroy(iter);
rocksdb_readoptions_destroy(readoptions);
rocksdb_comparator_destroy(cmp);
rocksdb_options_destroy(options);
rocksdb_close(db);
if (err) {
fprintf(stderr, "rocksdb/err: %s\n", err);
exit(-1);
}
return 0;
}