TDengine/source/dnode/vnode/src/tq/tqOffset.c

151 lines
4.5 KiB
C
Raw Normal View History

2022-02-15 02:11:34 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
2022-04-26 11:04:26 +00:00
#include "tq.h"
2022-02-15 02:11:34 +00:00
struct STqOffsetStore {
char* fname;
2022-06-14 05:58:40 +00:00
STQ* pTq;
SHashObj* pHash; // SHashObj<subscribeKey, offset>
2022-02-15 02:11:34 +00:00
};
/**
 * Build the path of the offset file located under `path`.
 *
 * @param path NUL-terminated directory path.
 * @return newly allocated string "<path>/offset" that the caller releases with
 *         taosMemoryFree(), or NULL when the allocation fails.
 */
static char* buildFileName(const char* path) {
  // 20 spare bytes cover the "/offset" suffix and the terminating NUL
  int32_t cap = (int32_t)strlen(path) + 20;
  char*   fname = taosMemoryCalloc(1, cap);
  if (fname == NULL) {
    // do not hand a NULL destination to snprintf
    return NULL;
  }
  snprintf(fname, cap, "%s/offset", path);
  return fname;
}
2022-06-14 05:58:40 +00:00
/**
 * Create the offset store of `pTq` and load any offsets previously written to
 * the "<pTq->path>/offset" file.
 *
 * The file is a sequence of records, each an STqOffsetHead (carrying the body
 * size in network byte order) followed by an encoded STqOffset. A missing file
 * is not an error: the store simply starts empty.
 *
 * On success pTq->pOffsetStore points at the returned store. On failure every
 * resource acquired here is released, pTq->pOffsetStore is reset to NULL and
 * NULL is returned.
 *
 * @param pTq owning TQ instance.
 * @return the new store, or NULL on allocation failure or a malformed file.
 */
STqOffsetStore* tqOffsetOpen(STQ* pTq) {
  TdFilePtr pFile = NULL;
  void*     memBuf = NULL;
  char*     fname = NULL;

  STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
  if (pStore == NULL) {
    return NULL;
  }
  pStore->pTq = pTq;
  pTq->pOffsetStore = pStore;

  pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
  if (pStore->pHash == NULL) {
    goto _err;
  }

  fname = buildFileName(pStore->pTq->path);
  if (fname == NULL) {
    goto _err;
  }
  pFile = taosOpenFile(fname, TD_FILE_READ);
  taosMemoryFree(fname);
  if (pFile == NULL) {
    // nothing persisted yet: return the empty store
    return pStore;
  }

  while (1) {
    STqOffsetHead head = {0};
    int64_t       nRead = taosReadFile(pFile, &head, sizeof(STqOffsetHead));
    if (nRead == 0) {
      // clean end of file
      break;
    }
    if (nRead != (int64_t)sizeof(STqOffsetHead)) {
      ASSERT(0);
      goto _err;
    }

    // the writer stores the size with htonl(); the 32-bit swap is its own inverse
    int32_t size = htonl(head.size);
    if (size <= 0) {
      ASSERT(0);
      goto _err;
    }

    memBuf = taosMemoryCalloc(1, size);
    if (memBuf == NULL) {
      goto _err;
    }
    if (taosReadFile(pFile, memBuf, size) != size) {
      ASSERT(0);
      goto _err;
    }

    STqOffset offset = {0};
    SDecoder  decoder;
    tDecoderInit(&decoder, memBuf, size);
    int32_t decodeCode = tDecodeSTqOffset(&decoder, &offset);
    tDecoderClear(&decoder);
    if (decodeCode < 0) {
      ASSERT(0);
      goto _err;
    }

    if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
      ASSERT(0);
      goto _err;
    }

    // the record buffer is only needed while decoding; release it per record
    taosMemoryFree(memBuf);
    memBuf = NULL;
  }

  taosCloseFile(&pFile);
  return pStore;

_err:
  if (pFile != NULL) {
    taosCloseFile(&pFile);
  }
  if (memBuf != NULL) {
    taosMemoryFree(memBuf);
  }
  if (pStore->pHash != NULL) {
    taosHashCleanup(pStore->pHash);
  }
  // do not leave the TQ pointing at the store that is about to be freed
  pTq->pOffsetStore = NULL;
  taosMemoryFree(pStore);
  return NULL;
}
2022-06-14 05:58:40 +00:00
/**
 * Persist the current offsets, then destroy the store.
 *
 * Accepts NULL (e.g. when tqOffsetOpen() failed) and does nothing in that case.
 * A failed snapshot is logged; the store is released regardless.
 *
 * @param pStore store returned by tqOffsetOpen(), or NULL.
 */
void tqOffsetClose(STqOffsetStore* pStore) {
  if (pStore == NULL) {
    return;
  }
  if (tqOffsetSnapshot(pStore) < 0) {
    tqError("failed to snapshot offset store on close");
  }
  taosHashCleanup(pStore->pHash);
  taosMemoryFree(pStore);
}
/**
 * Look up the offset stored under `subscribeKey`.
 *
 * @return whatever taosHashGet() yields for that key, viewed as an STqOffset.
 */
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
  size_t keyLen = strlen(subscribeKey);
  void*  pEntry = taosHashGet(pStore->pHash, subscribeKey, keyLen);
  return (STqOffset*)pEntry;
}
/**
 * Store `pOffset` in the in-memory table under its own subKey.
 *
 * @return the result of taosHashPut().
 */
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
  const char* key = pOffset->subKey;
  size_t      keyLen = strlen(key);
  return taosHashPut(pStore->pHash, key, keyLen, pOffset, sizeof(STqOffset));
}
2022-07-01 06:39:21 +00:00
/**
 * Remove the entry stored under `subscribeKey` from the in-memory table.
 *
 * @return the result of taosHashRemove().
 */
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
  size_t  keyLen = strlen(subscribeKey);
  int32_t rc = taosHashRemove(pStore->pHash, subscribeKey, keyLen);
  return rc;
}
2022-06-14 05:58:40 +00:00
/**
 * Write every offset currently held in the store to "<pTq->path>/offset".
 *
 * Each entry is emitted as one record: an STqOffsetHead whose `size` is the
 * encoded body length in network byte order, followed by the encoded STqOffset.
 * The file handle and the per-record buffer are released on every path.
 *
 * @param pStore store to persist.
 * @return 0 on success, -1 if the file cannot be opened, memory cannot be
 *         allocated, an entry cannot be encoded, or a write is incomplete.
 */
int32_t tqOffsetSnapshot(STqOffsetStore* pStore) {
  // TODO file name should be with a version
  char* fname = buildFileName(pStore->pTq->path);
  if (fname == NULL) {
    return -1;
  }
  // NOTE(review): the file is opened in append mode, so records from earlier
  // snapshots stay in the file — confirm whether truncate/rename is intended.
  TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
  taosMemoryFree(fname);
  if (pFile == NULL) {
    ASSERT(0);
    return -1;
  }

  int32_t ret = 0;
  void*   pIter = NULL;
  while (1) {
    pIter = taosHashIterate(pStore->pHash, pIter);
    if (pIter == NULL) break;
    STqOffset* pOffset = (STqOffset*)pIter;

    // first pass: compute the encoded size of this entry
    int32_t bodyLen = 0;
    int32_t code = 0;
    tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
    if (code < 0) {
      ASSERT(0);
      ret = -1;
      break;
    }

    int32_t totLen = sizeof(STqOffsetHead) + bodyLen;
    void*   buf = taosMemoryCalloc(1, totLen);
    if (buf == NULL) {
      ret = -1;
      break;
    }
    void* abuf = POINTER_SHIFT(buf, sizeof(STqOffsetHead));
    ((STqOffsetHead*)buf)->size = htonl(bodyLen);

    // second pass: encode the body right after the header
    SEncoder encoder;
    tEncoderInit(&encoder, abuf, bodyLen);
    if (tEncodeSTqOffset(&encoder, pOffset) < 0) {
      ASSERT(0);
      taosMemoryFree(buf);
      ret = -1;
      break;
    }

    // write file
    int64_t writeLen = taosWriteFile(pFile, buf, totLen);
    taosMemoryFree(buf);
    if (writeLen != totLen) {
      // log before asserting so the message is emitted even when the assert traps
      tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
      ASSERT(0);
      ret = -1;
      break;
    }
  }

  if (ret != 0 && pIter != NULL) {
    // iteration was abandoned midway
    taosHashCancelIterate(pStore->pHash, pIter);
  }

  // close and rename file
  taosCloseFile(&pFile);
  return ret;
}