TDengine/source/dnode/vnode/src/tsdb/tsdbCommit.c

1694 lines
51 KiB
C
Raw Normal View History

2021-12-15 06:48:47 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-26 11:04:26 +00:00
#include "tsdb.h"
2021-12-15 06:48:47 +00:00
2022-09-06 09:41:06 +00:00
typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT;
2022-08-26 13:57:02 +00:00
2022-09-30 02:52:17 +00:00
#define USE_STREAM_COMPRESSION 0
2022-09-29 09:06:18 +00:00
2022-08-26 07:35:43 +00:00
typedef struct {
SRBTreeNode n;
SRowInfo r;
2022-08-26 13:57:02 +00:00
EDataIterT type;
2022-08-26 07:35:43 +00:00
union {
struct {
int32_t iTbDataP;
STbDataIter iter;
}; // memory data iter
struct {
2022-09-05 09:31:41 +00:00
int32_t iStt;
SArray *aSttBlk;
int32_t iSttBlk;
2022-08-26 07:35:43 +00:00
SBlockData bData;
int32_t iRow;
2022-09-05 09:31:41 +00:00
}; // stt file data iter
2022-08-26 07:35:43 +00:00
};
} SDataIter;
2022-06-14 06:57:07 +00:00
typedef struct {
2022-06-14 13:19:46 +00:00
STsdb *pTsdb;
2022-06-10 09:49:01 +00:00
/* commit data */
2022-06-23 03:08:19 +00:00
int64_t commitID;
2022-06-10 06:48:21 +00:00
int32_t minutes;
int8_t precision;
2022-06-13 13:13:37 +00:00
int32_t minRow;
int32_t maxRow;
2022-06-24 07:42:17 +00:00
int8_t cmprAlg;
2022-09-08 08:56:29 +00:00
int8_t sttTrigger;
2022-08-23 02:59:30 +00:00
SArray *aTbDataP; // memory
STsdbFS fs; // disk
2022-06-14 13:19:46 +00:00
// --------------
2022-06-20 06:41:17 +00:00
TSKEY nextKey; // reset by each table commit
2022-06-14 13:19:46 +00:00
int32_t commitFid;
2022-12-07 07:11:37 +00:00
int32_t expLevel;
2022-06-14 13:19:46 +00:00
TSKEY minKey;
TSKEY maxKey;
2022-06-10 06:48:21 +00:00
// commit file data
2022-07-30 12:08:04 +00:00
struct {
SDataFReader *pReader;
2022-08-23 05:49:10 +00:00
SArray *aBlockIdx; // SArray<SBlockIdx>
int32_t iBlockIdx;
SBlockIdx *pBlockIdx;
2022-09-02 03:16:23 +00:00
SMapData mBlock; // SMapData<SDataBlk>
2022-08-23 05:49:10 +00:00
SBlockData bData;
2022-07-30 12:08:04 +00:00
} dReader;
2022-08-26 07:35:43 +00:00
struct {
SDataIter *pIter;
SRBTree rbt;
2022-08-26 13:57:02 +00:00
SDataIter dataIter;
2022-09-08 09:58:33 +00:00
SDataIter aDataIter[TSDB_MAX_STT_TRIGGER];
2022-08-27 09:11:00 +00:00
int8_t toLastOnly;
2022-08-26 07:35:43 +00:00
};
2022-07-30 12:08:04 +00:00
struct {
SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx>
2022-09-05 09:31:41 +00:00
SArray *aSttBlk; // SArray<SSttBlk>
2022-09-02 03:16:23 +00:00
SMapData mBlock; // SMapData<SDataBlk>
2022-07-30 12:08:04 +00:00
SBlockData bData;
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-23 08:45:38 +00:00
SDiskDataBuilder *pBuilder;
2022-09-29 09:06:18 +00:00
#else
SBlockData bDatal;
#endif
2022-07-30 12:08:04 +00:00
} dWriter;
SSkmInfo skmTable;
SSkmInfo skmRow;
2022-06-10 09:49:01 +00:00
/* commit del */
2022-06-10 11:50:06 +00:00
SDelFReader *pDelFReader;
SDelFWriter *pDelFWriter;
2022-06-28 07:20:46 +00:00
SArray *aDelIdx; // SArray<SDelIdx>
SArray *aDelIdxN; // SArray<SDelIdx>
SArray *aDelData; // SArray<SDelData>
2022-06-14 06:57:07 +00:00
} SCommitter;
2022-01-08 04:39:12 +00:00
2022-12-04 07:14:04 +00:00
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo);
2022-06-10 06:48:21 +00:00
static int32_t tsdbCommitData(SCommitter *pCommitter);
static int32_t tsdbCommitDel(SCommitter *pCommitter);
static int32_t tsdbCommitCache(SCommitter *pCommitter);
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno);
2022-08-26 11:07:51 +00:00
static int32_t tsdbNextCommitRow(SCommitter *pCommitter);
2022-09-06 09:41:06 +00:00
int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
2022-08-26 11:07:51 +00:00
SRowInfo *pInfo1 = (SRowInfo *)p1;
SRowInfo *pInfo2 = (SRowInfo *)p2;
if (pInfo1->suid < pInfo2->suid) {
return -1;
} else if (pInfo1->suid > pInfo2->suid) {
return 1;
}
if (pInfo1->uid < pInfo2->uid) {
return -1;
} else if (pInfo1->uid > pInfo2->uid) {
return 1;
}
return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
2022-01-08 04:39:12 +00:00
2022-06-07 08:49:25 +00:00
int32_t tsdbBegin(STsdb *pTsdb) {
2022-06-10 06:48:21 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-01 10:34:17 +00:00
2022-06-25 12:46:16 +00:00
if (!pTsdb) return code;
2022-07-19 06:19:01 +00:00
SMemTable *pMemTable;
code = tsdbMemTableCreate(pTsdb, &pMemTable);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-01 10:34:17 +00:00
2022-07-19 06:19:01 +00:00
// lock
2022-09-23 02:40:54 +00:00
if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) {
2022-07-19 06:19:01 +00:00
code = TAOS_SYSTEM_ERROR(code);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-07-19 06:19:01 +00:00
}
pTsdb->mem = pMemTable;
// unlock
2022-09-23 02:40:54 +00:00
if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) {
2022-07-19 06:19:01 +00:00
code = TAOS_SYSTEM_ERROR(code);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-07-19 06:19:01 +00:00
}
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-10 06:48:21 +00:00
return code;
2022-06-01 10:34:17 +00:00
}
2022-12-02 07:44:24 +00:00
int32_t tsdbPrepareCommit(STsdb *pTsdb) {
taosThreadRwlockWrlock(&pTsdb->rwLock);
ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock);
return 0;
}
2022-12-04 07:14:04 +00:00
int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
if (!pTsdb) return 0;
2022-06-14 08:45:11 +00:00
2022-06-04 07:38:27 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-14 06:57:07 +00:00
SCommitter commith;
2022-12-02 07:44:24 +00:00
SMemTable *pMemTable = pTsdb->imem;
2022-06-14 06:57:07 +00:00
// check
2022-06-20 06:41:17 +00:00
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
2022-07-19 06:19:01 +00:00
taosThreadRwlockWrlock(&pTsdb->rwLock);
2022-12-04 06:50:58 +00:00
pTsdb->imem = NULL;
2022-07-19 06:19:01 +00:00
taosThreadRwlockUnlock(&pTsdb->rwLock);
2023-01-10 03:36:40 +00:00
tsdbUnrefMemTable(pMemTable, NULL, true);
2022-06-14 06:57:07 +00:00
goto _exit;
}
2022-01-08 04:39:12 +00:00
2022-06-04 07:38:27 +00:00
// start commit
2022-12-04 07:14:04 +00:00
code = tsdbStartCommit(pTsdb, &commith, pInfo);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-01-08 04:39:12 +00:00
2022-06-07 08:49:25 +00:00
// commit impl
code = tsdbCommitData(&commith);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-07 08:49:25 +00:00
code = tsdbCommitDel(&commith);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-07 08:49:25 +00:00
// end commit
2022-06-07 11:08:41 +00:00
code = tsdbEndCommit(&commith, 0);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-07 08:49:25 +00:00
2022-06-14 06:57:07 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
tsdbEndCommit(&commith, code);
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-07 08:49:25 +00:00
return code;
}
2022-06-10 06:48:21 +00:00
static int32_t tsdbCommitDelStart(SCommitter *pCommitter) {
2022-06-10 11:50:06 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-10 11:50:06 +00:00
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
2022-09-23 02:40:54 +00:00
if ((pCommitter->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
2022-06-28 07:20:46 +00:00
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-28 07:20:46 +00:00
}
2022-09-23 02:40:54 +00:00
if ((pCommitter->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
2022-06-28 07:20:46 +00:00
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-28 07:20:46 +00:00
}
2022-09-23 02:40:54 +00:00
if ((pCommitter->aDelIdxN = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
2022-06-28 07:20:46 +00:00
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-28 07:20:46 +00:00
}
2022-06-14 08:45:11 +00:00
2022-07-21 11:42:42 +00:00
SDelFile *pDelFileR = pCommitter->fs.pDelFile;
2022-06-10 11:50:06 +00:00
if (pDelFileR) {
2022-08-06 12:23:29 +00:00
code = tsdbDelFReaderOpen(&pCommitter->pDelFReader, pDelFileR, pTsdb);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-10 11:50:06 +00:00
2022-08-06 12:23:29 +00:00
code = tsdbReadDelIdx(pCommitter->pDelFReader, pCommitter->aDelIdx);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-10 11:50:06 +00:00
}
2022-06-11 06:36:22 +00:00
// prepare new
2022-06-28 07:20:46 +00:00
SDelFile wDelFile = {.commitID = pCommitter->commitID, .size = 0, .offset = 0};
code = tsdbDelFWriterOpen(&pCommitter->pDelFWriter, &wDelFile, pTsdb);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-10 11:50:06 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-09-23 08:45:38 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
} else {
tsdbDebug("vgId:%d, commit del start", TD_VID(pTsdb->pVnode));
}
2022-06-10 06:48:21 +00:00
return code;
}
2022-06-14 11:46:46 +00:00
static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) {
2022-06-18 02:27:09 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-28 07:20:46 +00:00
SDelData *pDelData;
2022-06-18 02:27:09 +00:00
tb_uid_t suid;
tb_uid_t uid;
2022-06-14 11:46:46 +00:00
if (pTbData) {
2022-06-28 07:20:46 +00:00
suid = pTbData->suid;
uid = pTbData->uid;
2022-06-14 11:46:46 +00:00
2022-06-28 07:20:46 +00:00
if (pTbData->pHead == NULL) {
pTbData = NULL;
}
}
2022-06-14 11:46:46 +00:00
if (pDelIdx) {
2022-06-28 07:20:46 +00:00
suid = pDelIdx->suid;
uid = pDelIdx->uid;
2022-08-06 12:23:29 +00:00
code = tsdbReadDelData(pCommitter->pDelFReader, pDelIdx, pCommitter->aDelData);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-07-07 06:41:05 +00:00
} else {
taosArrayClear(pCommitter->aDelData);
2022-06-14 11:46:46 +00:00
}
2022-06-28 07:20:46 +00:00
if (pTbData == NULL && pDelIdx == NULL) goto _exit;
2022-06-14 11:46:46 +00:00
2022-06-28 07:20:46 +00:00
SDelIdx delIdx = {.suid = suid, .uid = uid};
2022-06-14 11:46:46 +00:00
// memory
2022-06-18 02:27:09 +00:00
pDelData = pTbData ? pTbData->pHead : NULL;
for (; pDelData; pDelData = pDelData->pNext) {
2022-06-28 07:20:46 +00:00
if (taosArrayPush(pCommitter->aDelData, pDelData) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-28 07:20:46 +00:00
}
2022-06-14 11:46:46 +00:00
}
// write
2022-08-06 12:23:29 +00:00
code = tsdbWriteDelData(pCommitter->pDelFWriter, pCommitter->aDelData, &delIdx);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-14 11:46:46 +00:00
// put delIdx
2022-07-07 06:41:05 +00:00
if (taosArrayPush(pCommitter->aDelIdxN, &delIdx) == NULL) {
2022-06-28 07:20:46 +00:00
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-28 07:20:46 +00:00
}
2022-06-14 11:46:46 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-09-23 08:45:38 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-14 11:46:46 +00:00
return code;
}
2022-06-10 06:48:21 +00:00
static int32_t tsdbCommitDelEnd(SCommitter *pCommitter) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-28 07:20:46 +00:00
STsdb *pTsdb = pCommitter->pTsdb;
2022-06-10 09:49:01 +00:00
2022-08-06 12:23:29 +00:00
code = tsdbWriteDelIdx(pCommitter->pDelFWriter, pCommitter->aDelIdxN);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-10 12:54:02 +00:00
2022-06-28 07:20:46 +00:00
code = tsdbUpdateDelFileHdr(pCommitter->pDelFWriter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-28 07:20:46 +00:00
2022-07-21 11:42:42 +00:00
code = tsdbFSUpsertDelFile(&pCommitter->fs, &pCommitter->pDelFWriter->fDel);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-11 06:36:22 +00:00
2022-07-05 03:20:25 +00:00
code = tsdbDelFWriterClose(&pCommitter->pDelFWriter, 1);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-10 12:54:02 +00:00
if (pCommitter->pDelFReader) {
2022-07-05 03:20:25 +00:00
code = tsdbDelFReaderClose(&pCommitter->pDelFReader);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-10 12:54:02 +00:00
}
2022-06-28 07:20:46 +00:00
taosArrayDestroy(pCommitter->aDelIdx);
taosArrayDestroy(pCommitter->aDelData);
taosArrayDestroy(pCommitter->aDelIdxN);
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-09-23 08:45:38 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-10 06:48:21 +00:00
return code;
}
2022-09-06 09:41:06 +00:00
int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) {
2022-08-08 10:06:07 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-08 10:06:07 +00:00
2022-08-17 07:08:44 +00:00
if (suid) {
2022-09-06 09:41:06 +00:00
if (pSkmInfo->suid == suid) {
pSkmInfo->uid = uid;
2022-09-01 06:24:47 +00:00
goto _exit;
}
2022-08-17 07:08:44 +00:00
} else {
2022-09-06 09:41:06 +00:00
if (pSkmInfo->uid == uid) goto _exit;
2022-08-08 10:06:07 +00:00
}
2022-09-06 09:41:06 +00:00
pSkmInfo->suid = suid;
pSkmInfo->uid = uid;
2022-11-23 02:45:58 +00:00
tDestroyTSchema(pSkmInfo->pTSchema);
2022-09-06 09:41:06 +00:00
code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-08 10:06:07 +00:00
_exit:
return code;
}
static int32_t tsdbCommitterUpdateRowSchema(SCommitter *pCommitter, int64_t suid, int64_t uid, int32_t sver) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-08 10:06:07 +00:00
if (pCommitter->skmRow.pTSchema) {
if (pCommitter->skmRow.suid == suid) {
if (suid == 0) {
if (pCommitter->skmRow.uid == uid && sver == pCommitter->skmRow.pTSchema->version) goto _exit;
} else {
if (sver == pCommitter->skmRow.pTSchema->version) goto _exit;
}
}
}
pCommitter->skmRow.suid = suid;
pCommitter->skmRow.uid = uid;
2022-11-23 02:45:58 +00:00
tDestroyTSchema(pCommitter->skmRow.pTSchema);
2022-08-08 10:06:07 +00:00
code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, sver, &pCommitter->skmRow.pTSchema);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-08 10:06:07 +00:00
_exit:
return code;
}
2022-08-04 07:58:53 +00:00
static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-04 07:58:53 +00:00
ASSERT(pCommitter->dReader.pBlockIdx);
pCommitter->dReader.iBlockIdx++;
if (pCommitter->dReader.iBlockIdx < taosArrayGetSize(pCommitter->dReader.aBlockIdx)) {
pCommitter->dReader.pBlockIdx =
(SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx);
2022-09-07 09:06:42 +00:00
code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-04 07:58:53 +00:00
ASSERT(pCommitter->dReader.mBlock.nItem > 0);
} else {
pCommitter->dReader.pBlockIdx = NULL;
}
_exit:
return code;
}
2022-10-14 05:34:25 +00:00
static int32_t tDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
SDataIter *pIter1 = (SDataIter *)((uint8_t *)n1 - offsetof(SDataIter, n));
SDataIter *pIter2 = (SDataIter *)((uint8_t *)n2 - offsetof(SDataIter, n));
return tRowInfoCmprFn(&pIter1->r, &pIter2->r);
}
2022-08-26 11:07:51 +00:00
static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-26 11:07:51 +00:00
pCommitter->pIter = NULL;
2022-10-14 05:34:25 +00:00
tRBTreeCreate(&pCommitter->rbt, tDataIterCmprFn);
2022-08-26 11:07:51 +00:00
// memory
2022-08-27 09:11:00 +00:00
TSDBKEY tKey = {.ts = pCommitter->minKey, .version = VERSION_MIN};
2022-08-26 13:57:02 +00:00
SDataIter *pIter = &pCommitter->dataIter;
pIter->type = MEMORY_DATA_ITER;
2022-08-26 11:07:51 +00:00
pIter->iTbDataP = 0;
for (; pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP); pIter->iTbDataP++) {
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
tsdbTbDataIterOpen(pTbData, &tKey, 0, &pIter->iter);
TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
2022-08-27 09:11:00 +00:00
pRow = NULL;
2022-08-26 11:07:51 +00:00
}
2022-08-27 09:11:00 +00:00
if (pRow == NULL) continue;
2022-08-26 11:07:51 +00:00
pIter->r.suid = pTbData->suid;
pIter->r.uid = pTbData->uid;
pIter->r.row = *pRow;
break;
}
2022-08-27 09:11:00 +00:00
ASSERT(pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP));
2022-08-26 11:07:51 +00:00
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
// disk
2022-09-01 06:24:47 +00:00
pCommitter->toLastOnly = 0;
2022-08-26 13:57:02 +00:00
SDataFReader *pReader = pCommitter->dReader.pReader;
2022-09-01 06:24:47 +00:00
if (pReader) {
2022-09-08 08:56:29 +00:00
if (pReader->pSet->nSttF >= pCommitter->sttTrigger) {
2022-09-01 06:24:47 +00:00
int8_t iIter = 0;
2022-09-05 09:31:41 +00:00
for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
2022-09-01 06:24:47 +00:00
pIter = &pCommitter->aDataIter[iIter];
2022-09-06 09:41:06 +00:00
pIter->type = STT_DATA_ITER;
2022-09-05 09:31:41 +00:00
pIter->iStt = iStt;
2022-09-01 06:24:47 +00:00
2022-09-05 09:31:41 +00:00
code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-01 06:24:47 +00:00
2022-09-05 09:31:41 +00:00
if (taosArrayGetSize(pIter->aSttBlk) == 0) continue;
2022-09-01 06:24:47 +00:00
2022-09-05 09:31:41 +00:00
pIter->iSttBlk = 0;
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0);
2022-09-27 07:37:00 +00:00
code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData);
2022-09-27 08:26:16 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-01 06:24:47 +00:00
pIter->iRow = 0;
pIter->r.suid = pIter->bData.suid;
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pIter);
iIter++;
}
2022-08-27 17:00:23 +00:00
} else {
2022-09-05 09:31:41 +00:00
for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
if (pSttFile->size > pSttFile->offset) {
2022-09-01 06:24:47 +00:00
pCommitter->toLastOnly = 1;
break;
}
}
2022-08-27 09:11:00 +00:00
}
2022-08-26 11:07:51 +00:00
}
code = tsdbNextCommitRow(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 11:07:51 +00:00
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-26 11:07:51 +00:00
return code;
}
2022-06-23 13:23:43 +00:00
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-23 13:23:43 +00:00
STsdb *pTsdb = pCommitter->pTsdb;
SDFileSet *pRSet = NULL;
2022-06-17 12:22:45 +00:00
2022-06-23 13:23:43 +00:00
// memory
2022-08-03 08:50:36 +00:00
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
2022-12-07 07:11:37 +00:00
pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
2022-08-03 08:50:36 +00:00
tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
&pCommitter->maxKey);
#if 0
ASSERT(pCommitter->minKey <= pCommitter->nextKey && pCommitter->maxKey >= pCommitter->nextKey);
#endif
2022-11-23 02:45:58 +00:00
2022-06-23 13:23:43 +00:00
pCommitter->nextKey = TSKEY_MAX;
2022-06-17 12:22:45 +00:00
2022-08-01 10:03:00 +00:00
// Reader
2022-08-27 09:11:00 +00:00
SDFileSet tDFileSet = {.fid = pCommitter->commitFid};
pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &tDFileSet, tDFileSetCmprFn, TD_EQ);
2022-06-23 13:23:43 +00:00
if (pRSet) {
2022-07-30 12:08:04 +00:00
code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-17 12:22:45 +00:00
2022-08-02 09:06:18 +00:00
// data
2022-08-06 12:04:42 +00:00
code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-01 10:03:00 +00:00
2022-08-02 09:06:18 +00:00
pCommitter->dReader.iBlockIdx = 0;
2022-08-27 09:11:00 +00:00
if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) {
pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0);
2022-09-07 09:06:42 +00:00
code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-02 09:06:18 +00:00
} else {
pCommitter->dReader.pBlockIdx = NULL;
}
2022-08-04 05:42:15 +00:00
tBlockDataReset(&pCommitter->dReader.bData);
2022-08-01 10:03:00 +00:00
} else {
2022-08-02 09:06:18 +00:00
pCommitter->dReader.pBlockIdx = NULL;
2022-06-17 06:29:06 +00:00
}
2022-06-17 02:50:25 +00:00
2022-08-01 10:03:00 +00:00
// Writer
2022-08-27 09:11:00 +00:00
SHeadFile fHead = {.commitID = pCommitter->commitID};
SDataFile fData = {.commitID = pCommitter->commitID};
SSmaFile fSma = {.commitID = pCommitter->commitID};
2022-09-05 09:31:41 +00:00
SSttFile fStt = {.commitID = pCommitter->commitID};
2022-08-27 09:11:00 +00:00
SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
2022-06-23 13:23:43 +00:00
if (pRSet) {
2022-09-08 08:56:29 +00:00
ASSERT(pRSet->nSttF <= pCommitter->sttTrigger);
2022-07-21 06:27:32 +00:00
fData = *pRSet->pDataF;
fSma = *pRSet->pSmaF;
2022-08-23 07:47:42 +00:00
wSet.diskId = pRSet->diskId;
2022-09-08 08:56:29 +00:00
if (pRSet->nSttF < pCommitter->sttTrigger) {
2022-09-05 09:31:41 +00:00
for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) {
wSet.aSttF[iStt] = pRSet->aSttF[iStt];
2022-08-27 09:11:00 +00:00
}
2022-09-05 09:31:41 +00:00
wSet.nSttF = pRSet->nSttF + 1;
2022-08-27 09:11:00 +00:00
} else {
2022-09-05 09:31:41 +00:00
wSet.nSttF = 1;
2022-08-23 07:47:42 +00:00
}
2022-06-23 13:23:43 +00:00
} else {
2022-08-23 07:47:42 +00:00
SDiskID did = {0};
2022-12-07 07:11:37 +00:00
if (tfsAllocDisk(pTsdb->pVnode->pTfs, pCommitter->expLevel, &did) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
2022-07-30 07:20:38 +00:00
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
2022-07-29 12:01:45 +00:00
wSet.diskId = did;
2022-09-05 09:31:41 +00:00
wSet.nSttF = 1;
2022-06-17 02:50:25 +00:00
}
2022-09-05 09:31:41 +00:00
wSet.aSttF[wSet.nSttF - 1] = &fStt;
2022-07-30 12:08:04 +00:00
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-17 02:50:25 +00:00
2022-08-01 10:03:00 +00:00
taosArrayClear(pCommitter->dWriter.aBlockIdx);
2022-09-05 09:31:41 +00:00
taosArrayClear(pCommitter->dWriter.aSttBlk);
2022-08-01 10:03:00 +00:00
tMapDataReset(&pCommitter->dWriter.mBlock);
tBlockDataReset(&pCommitter->dWriter.bData);
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-27 10:59:29 +00:00
tDiskDataBuilderClear(pCommitter->dWriter.pBuilder);
2022-09-29 09:06:18 +00:00
#else
2022-08-01 10:03:00 +00:00
tBlockDataReset(&pCommitter->dWriter.bDatal);
2022-09-29 09:06:18 +00:00
#endif
2022-08-01 10:03:00 +00:00
2022-08-26 11:07:51 +00:00
// open iter
code = tsdbOpenCommitIter(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 11:07:51 +00:00
2022-06-23 13:23:43 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-16 12:53:25 +00:00
return code;
2022-06-15 11:13:55 +00:00
}
2022-09-07 09:56:40 +00:00
int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-03 08:50:36 +00:00
2022-09-07 09:56:40 +00:00
if (pBlockData->nRow == 0) return code;
2022-08-03 08:50:36 +00:00
2022-09-07 09:56:40 +00:00
SDataBlk dataBlk;
2022-09-07 01:51:39 +00:00
tDataBlkReset(&dataBlk);
2022-08-04 07:58:53 +00:00
2022-08-06 12:04:42 +00:00
// info
2022-09-07 01:51:39 +00:00
dataBlk.nRow += pBlockData->nRow;
2022-08-05 15:22:05 +00:00
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
2022-08-03 08:50:36 +00:00
2022-08-05 15:22:05 +00:00
if (iRow == 0) {
2022-09-07 01:51:39 +00:00
if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) {
dataBlk.minKey = key;
2022-08-05 15:22:05 +00:00
}
} else {
if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
2022-09-07 01:51:39 +00:00
dataBlk.hasDup = 1;
2022-08-05 15:22:05 +00:00
}
}
2022-09-07 01:51:39 +00:00
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) {
dataBlk.maxKey = key;
2022-08-05 15:22:05 +00:00
}
2022-09-07 01:51:39 +00:00
dataBlk.minVer = TMIN(dataBlk.minVer, key.version);
dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version);
2022-08-05 15:22:05 +00:00
}
// write
2022-09-07 01:51:39 +00:00
dataBlk.nSubBlock++;
2022-09-07 09:56:40 +00:00
code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1],
((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-05 15:22:05 +00:00
2022-09-02 03:16:23 +00:00
// put SDataBlk
2022-09-07 09:56:40 +00:00
code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-03 08:50:36 +00:00
2022-08-05 15:22:05 +00:00
// clear
2022-08-08 05:34:01 +00:00
tBlockDataClear(pBlockData);
2022-08-03 08:50:36 +00:00
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-03 08:50:36 +00:00
return code;
}
2022-09-07 09:56:40 +00:00
int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-09-07 09:56:40 +00:00
SSttBlk sstBlk;
2022-08-05 15:22:05 +00:00
2022-09-07 09:56:40 +00:00
if (pBlockData->nRow == 0) return code;
2022-08-05 15:22:05 +00:00
2022-08-06 12:04:42 +00:00
// info
2022-09-07 01:51:39 +00:00
sstBlk.suid = pBlockData->suid;
sstBlk.nRow = pBlockData->nRow;
sstBlk.minKey = TSKEY_MAX;
sstBlk.maxKey = TSKEY_MIN;
sstBlk.minVer = VERSION_MAX;
sstBlk.maxVer = VERSION_MIN;
2022-08-05 15:22:05 +00:00
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
2022-09-07 01:51:39 +00:00
sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]);
sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]);
sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]);
sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]);
2022-08-05 15:22:05 +00:00
}
2022-09-07 01:51:39 +00:00
sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0];
sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1];
2022-08-03 08:50:36 +00:00
2022-08-05 15:22:05 +00:00
// write
2022-09-07 09:56:40 +00:00
code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-03 08:50:36 +00:00
2022-09-05 09:31:41 +00:00
// push SSttBlk
2022-09-07 09:56:40 +00:00
if (taosArrayPush(aSttBlk, &sstBlk) == NULL) {
2022-08-03 08:50:36 +00:00
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-03 08:50:36 +00:00
}
2022-08-05 15:22:05 +00:00
// clear
2022-08-08 05:34:01 +00:00
tBlockDataClear(pBlockData);
2022-08-03 08:50:36 +00:00
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-05 15:22:05 +00:00
return code;
2022-08-03 08:50:36 +00:00
}
2022-09-23 09:13:14 +00:00
static int32_t tsdbCommitSttBlk(SDataFWriter *pWriter, SDiskDataBuilder *pBuilder, SArray *aSttBlk) {
int32_t code = 0;
int32_t lino = 0;
if (pBuilder->nRow == 0) return code;
// gnrt
2022-09-27 10:59:29 +00:00
const SDiskData *pDiskData;
const SBlkInfo *pBlkInfo;
code = tGnrtDiskData(pBuilder, &pDiskData, &pBlkInfo);
2022-09-23 09:13:14 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-27 10:59:29 +00:00
SSttBlk sttBlk = {.suid = pBuilder->suid,
.minUid = pBlkInfo->minUid,
.maxUid = pBlkInfo->maxUid,
.minKey = pBlkInfo->minKey,
.maxKey = pBlkInfo->maxKey,
.minVer = pBlkInfo->minVer,
2022-09-29 05:23:12 +00:00
.maxVer = pBlkInfo->maxVer,
.nRow = pBuilder->nRow};
2022-09-23 09:13:14 +00:00
// write
2022-09-27 10:59:29 +00:00
code = tsdbWriteDiskData(pWriter, pDiskData, &sttBlk.bInfo, NULL);
2022-09-23 09:13:14 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
// push
if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
2022-08-05 15:22:05 +00:00
2022-09-27 10:59:29 +00:00
// clear
tDiskDataBuilderClear(pBuilder);
2022-09-23 09:13:14 +00:00
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, lino,
tstrerror(code));
2022-09-23 09:13:14 +00:00
}
2022-08-03 08:50:36 +00:00
return code;
}
2022-06-30 06:35:50 +00:00
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-30 06:35:50 +00:00
2022-08-01 10:03:00 +00:00
// write aBlockIdx
2022-08-06 12:04:42 +00:00
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:35:50 +00:00
2022-09-05 09:31:41 +00:00
// write aSttBlk
code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-03 12:16:57 +00:00
2022-06-30 06:35:50 +00:00
// update file header
2022-07-30 12:08:04 +00:00
code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:35:50 +00:00
// upsert SDFileSet
2022-07-30 12:08:04 +00:00
code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:35:50 +00:00
// close and sync
2022-07-30 12:08:04 +00:00
code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:35:50 +00:00
2022-07-30 12:08:04 +00:00
if (pCommitter->dReader.pReader) {
code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:35:50 +00:00
}
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-30 06:35:50 +00:00
return code;
}
2022-08-02 09:06:18 +00:00
static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-02 09:06:18 +00:00
2022-08-26 08:56:42 +00:00
while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) {
2022-08-02 09:06:18 +00:00
SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx;
2022-09-07 09:06:42 +00:00
code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-02 09:06:18 +00:00
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-02 09:06:18 +00:00
}
2022-08-04 07:58:53 +00:00
code = tsdbCommitterNextTableData(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-02 09:06:18 +00:00
}
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-02 09:06:18 +00:00
return code;
}
2022-08-26 07:48:44 +00:00
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter);
2022-06-30 06:35:50 +00:00
static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
2022-06-10 06:48:21 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-10 06:48:21 +00:00
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
2022-06-30 06:35:50 +00:00
// commit file data start
code = tsdbCommitFileDataStart(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:35:50 +00:00
2022-08-26 07:48:44 +00:00
// impl
code = tsdbCommitFileDataImpl(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-23 13:23:43 +00:00
2022-06-14 13:19:46 +00:00
// commit file data end
code = tsdbCommitFileDataEnd(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-10 06:48:21 +00:00
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
tsdbDataFReaderClose(&pCommitter->dReader.pReader);
tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
}
2022-06-10 06:48:21 +00:00
return code;
}
2022-06-14 13:19:46 +00:00
// ----------------------------------------------------------------------------
2022-12-04 07:14:04 +00:00
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter, SCommitInfo *pInfo) {
2022-06-10 06:48:21 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-13 10:44:57 +00:00
2022-06-14 13:19:46 +00:00
memset(pCommitter, 0, sizeof(*pCommitter));
2022-12-02 07:44:24 +00:00
ASSERT(pTsdb->imem && "last tsdb commit incomplete");
2022-06-13 10:44:57 +00:00
2022-06-14 13:19:46 +00:00
pCommitter->pTsdb = pTsdb;
2022-12-04 07:14:04 +00:00
pCommitter->commitID = pInfo->info.state.commitID;
2022-06-20 06:41:17 +00:00
pCommitter->minutes = pTsdb->keepCfg.days;
pCommitter->precision = pTsdb->keepCfg.precision;
2022-12-04 07:14:04 +00:00
pCommitter->minRow = pInfo->info.config.tsdbCfg.minRows;
pCommitter->maxRow = pInfo->info.config.tsdbCfg.maxRows;
pCommitter->cmprAlg = pInfo->info.config.tsdbCfg.compression;
pCommitter->sttTrigger = pInfo->info.config.sttTrigger;
2022-08-15 10:16:07 +00:00
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
if (pCommitter->aTbDataP == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-15 10:16:07 +00:00
}
2022-07-21 11:42:42 +00:00
code = tsdbFSCopy(pTsdb, &pCommitter->fs);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-22 12:03:44 +00:00
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-10 06:48:21 +00:00
return code;
}
2022-06-22 12:03:44 +00:00
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-06-22 12:03:44 +00:00
2022-08-26 14:23:03 +00:00
// reader
2022-07-30 12:08:04 +00:00
pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dReader.aBlockIdx == NULL) {
2022-07-01 15:10:46 +00:00
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-07-01 15:10:46 +00:00
}
2022-08-08 03:22:24 +00:00
code = tBlockDataCreate(&pCommitter->dReader.bData);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-03 08:50:36 +00:00
2022-08-26 14:23:03 +00:00
// merger
2022-09-08 09:58:33 +00:00
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
2022-09-05 09:31:41 +00:00
SDataIter *pIter = &pCommitter->aDataIter[iStt];
pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pIter->aSttBlk == NULL) {
2022-08-26 14:23:03 +00:00
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 14:23:03 +00:00
}
code = tBlockDataCreate(&pIter->bData);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 14:23:03 +00:00
}
// writer
2022-08-01 10:03:00 +00:00
pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dWriter.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-01 10:03:00 +00:00
}
2022-09-05 09:31:41 +00:00
pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pCommitter->dWriter.aSttBlk == NULL) {
2022-08-01 10:03:00 +00:00
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-01 10:03:00 +00:00
}
2022-08-08 03:22:24 +00:00
code = tBlockDataCreate(&pCommitter->dWriter.bData);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-22 12:03:44 +00:00
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-23 08:45:38 +00:00
code = tDiskDataBuilderCreate(&pCommitter->dWriter.pBuilder);
2022-09-29 09:06:18 +00:00
#else
2022-08-08 03:22:24 +00:00
code = tBlockDataCreate(&pCommitter->dWriter.bDatal);
2022-09-29 09:06:18 +00:00
#endif
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-01 10:03:00 +00:00
2022-06-22 12:03:44 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-22 12:03:44 +00:00
return code;
}
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
2022-08-26 14:23:03 +00:00
// reader
2022-07-30 12:08:04 +00:00
taosArrayDestroy(pCommitter->dReader.aBlockIdx);
tMapDataClear(&pCommitter->dReader.mBlock);
2022-12-01 06:24:10 +00:00
tBlockDataDestroy(&pCommitter->dReader.bData);
2022-08-01 10:03:00 +00:00
2022-08-26 14:23:03 +00:00
// merger
2022-09-08 09:58:33 +00:00
for (int32_t iStt = 0; iStt < TSDB_MAX_STT_TRIGGER; iStt++) {
2022-09-05 09:31:41 +00:00
SDataIter *pIter = &pCommitter->aDataIter[iStt];
taosArrayDestroy(pIter->aSttBlk);
2022-12-01 06:24:10 +00:00
tBlockDataDestroy(&pIter->bData);
2022-08-26 14:23:03 +00:00
}
// writer
2022-07-30 12:08:04 +00:00
taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
2022-09-05 09:31:41 +00:00
taosArrayDestroy(pCommitter->dWriter.aSttBlk);
2022-07-30 12:08:04 +00:00
tMapDataClear(&pCommitter->dWriter.mBlock);
2022-12-01 06:24:10 +00:00
tBlockDataDestroy(&pCommitter->dWriter.bData);
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-23 08:45:38 +00:00
tDiskDataBuilderDestroy(pCommitter->dWriter.pBuilder);
2022-09-29 09:06:18 +00:00
#else
2022-12-01 06:24:10 +00:00
tBlockDataDestroy(&pCommitter->dWriter.bDatal);
2022-09-29 09:06:18 +00:00
#endif
2022-11-23 02:45:58 +00:00
tDestroyTSchema(pCommitter->skmTable.pTSchema);
tDestroyTSchema(pCommitter->skmRow.pTSchema);
2022-06-22 12:03:44 +00:00
}
2022-06-14 11:46:46 +00:00
static int32_t tsdbCommitData(SCommitter *pCommitter) {
2022-09-23 02:40:54 +00:00
int32_t code = 0;
int32_t lino = 0;
2022-06-14 11:46:46 +00:00
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
2022-06-11 09:20:29 +00:00
2022-06-14 11:46:46 +00:00
// check
2022-06-15 07:41:14 +00:00
if (pMemTable->nRow == 0) goto _exit;
2022-06-13 06:05:05 +00:00
2022-06-22 12:03:44 +00:00
// start ====================
code = tsdbCommitDataStart(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-22 12:03:44 +00:00
// impl ====================
pCommitter->nextKey = pMemTable->minKey;
2022-06-14 11:46:46 +00:00
while (pCommitter->nextKey < TSKEY_MAX) {
code = tsdbCommitFileData(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-13 06:05:05 +00:00
}
2022-06-11 09:20:29 +00:00
2022-06-22 12:03:44 +00:00
// end ====================
tsdbCommitDataEnd(pCommitter);
2022-06-14 11:46:46 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-06-14 11:46:46 +00:00
return code;
}
2022-06-11 09:20:29 +00:00
2022-06-14 11:46:46 +00:00
static int32_t tsdbCommitDel(SCommitter *pCommitter) {
2022-09-23 02:40:54 +00:00
int32_t code = 0;
int32_t lino = 0;
2022-06-14 11:46:46 +00:00
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
2022-06-11 09:20:29 +00:00
2022-06-14 11:46:46 +00:00
if (pMemTable->nDel == 0) {
goto _exit;
2022-06-14 09:55:04 +00:00
}
2022-06-11 09:20:29 +00:00
2022-06-14 11:46:46 +00:00
// start
code = tsdbCommitDelStart(pCommitter);
if (code) {
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-14 11:46:46 +00:00
}
2022-06-11 09:20:29 +00:00
2022-06-14 11:46:46 +00:00
// impl
2022-06-30 06:44:44 +00:00
int32_t iDelIdx = 0;
int32_t nDelIdx = taosArrayGetSize(pCommitter->aDelIdx);
int32_t iTbData = 0;
2022-08-15 10:16:07 +00:00
int32_t nTbData = taosArrayGetSize(pCommitter->aTbDataP);
2022-06-30 06:44:44 +00:00
STbData *pTbData;
SDelIdx *pDelIdx;
ASSERT(nTbData > 0);
2022-08-15 10:16:07 +00:00
pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
2022-06-30 06:44:44 +00:00
pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
while (true) {
if (pTbData == NULL && pDelIdx == NULL) break;
if (pTbData && pDelIdx) {
int32_t c = tTABLEIDCmprFn(pTbData, pDelIdx);
if (c == 0) {
goto _commit_mem_and_disk_del;
} else if (c < 0) {
goto _commit_mem_del;
} else {
goto _commit_disk_del;
}
} else if (pTbData) {
goto _commit_mem_del;
} else {
goto _commit_disk_del;
}
_commit_mem_del:
code = tsdbCommitTableDel(pCommitter, pTbData, NULL);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:44:44 +00:00
iTbData++;
2022-08-15 10:16:07 +00:00
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
2022-06-30 06:44:44 +00:00
continue;
_commit_disk_del:
code = tsdbCommitTableDel(pCommitter, NULL, pDelIdx);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:44:44 +00:00
iDelIdx++;
pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
continue;
_commit_mem_and_disk_del:
code = tsdbCommitTableDel(pCommitter, pTbData, pDelIdx);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-30 06:44:44 +00:00
iTbData++;
2022-08-15 10:16:07 +00:00
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData) : NULL;
2022-06-30 06:44:44 +00:00
iDelIdx++;
pDelIdx = (iDelIdx < nDelIdx) ? (SDelIdx *)taosArrayGet(pCommitter->aDelIdx, iDelIdx) : NULL;
continue;
2022-06-14 11:46:46 +00:00
}
2022-06-11 09:20:29 +00:00
2022-06-14 11:46:46 +00:00
// end
code = tsdbCommitDelEnd(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-06-11 09:20:29 +00:00
2022-06-14 09:55:04 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
} else {
2022-10-21 02:39:19 +00:00
tsdbDebug("vgId:%d, commit del done, nDel:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nDel);
2022-09-23 02:40:54 +00:00
}
2022-06-11 09:20:29 +00:00
return code;
2022-06-14 11:46:46 +00:00
}
2022-06-14 13:19:46 +00:00
static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno) {
2022-09-23 02:40:54 +00:00
int32_t code = 0;
int32_t lino = 0;
2022-10-18 08:54:53 +00:00
STsdb *pTsdb = pCommitter->pTsdb;
2022-07-21 11:42:42 +00:00
2022-10-18 08:54:53 +00:00
if (eno) {
code = eno;
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbFSPrepareCommit(pCommitter->pTsdb, &pCommitter->fs);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-07-21 11:42:42 +00:00
}
2022-10-18 08:54:53 +00:00
_exit:
2022-07-21 11:42:42 +00:00
tsdbFSDestroy(&pCommitter->fs);
2022-08-15 10:16:07 +00:00
taosArrayDestroy(pCommitter->aTbDataP);
2022-10-20 01:36:15 +00:00
pCommitter->aTbDataP = NULL;
2022-10-18 08:54:53 +00:00
if (code || eno) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-09-23 02:40:54 +00:00
} else {
2022-10-21 02:39:19 +00:00
tsdbInfo("vgId:%d, tsdb end commit", TD_VID(pTsdb->pVnode));
2022-09-23 02:40:54 +00:00
}
2022-06-14 13:19:46 +00:00
return code;
}
2022-08-26 05:33:16 +00:00
2022-08-26 07:35:43 +00:00
// ================================================================================
2022-08-26 05:33:16 +00:00
2022-08-26 09:47:37 +00:00
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL;
2022-08-26 08:56:42 +00:00
}
2022-08-26 09:47:37 +00:00
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
2022-08-26 07:35:43 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-26 07:35:43 +00:00
if (pCommitter->pIter) {
SDataIter *pIter = pCommitter->pIter;
2022-08-31 14:04:14 +00:00
if (pCommitter->pIter->type == MEMORY_DATA_ITER) { // memory
2022-08-26 07:35:43 +00:00
tsdbTbDataIterNext(&pIter->iter);
TSDBROW *pRow = tsdbTbDataIterGet(&pIter->iter);
while (true) {
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow));
pRow = NULL;
}
if (pRow) {
pIter->r.suid = pIter->iter.pTbData->suid;
pIter->r.uid = pIter->iter.pTbData->uid;
pIter->r.row = *pRow;
break;
}
pIter->iTbDataP++;
2022-08-26 11:07:51 +00:00
if (pIter->iTbDataP < taosArrayGetSize(pCommitter->aTbDataP)) {
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, pIter->iTbDataP);
2022-08-26 07:35:43 +00:00
TSDBKEY keyFrom = {.ts = pCommitter->minKey, .version = VERSION_MIN};
tsdbTbDataIterOpen(pTbData, &keyFrom, 0, &pIter->iter);
pRow = tsdbTbDataIterGet(&pIter->iter);
continue;
} else {
pCommitter->pIter = NULL;
break;
}
}
2022-09-06 09:41:06 +00:00
} else if (pCommitter->pIter->type == STT_DATA_ITER) { // last file
2022-08-26 07:35:43 +00:00
pIter->iRow++;
if (pIter->iRow < pIter->bData.nRow) {
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow];
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow);
} else {
2022-09-05 09:31:41 +00:00
pIter->iSttBlk++;
if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) {
SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk);
2022-08-26 07:35:43 +00:00
2022-09-27 07:37:00 +00:00
code = tsdbReadSttBlockEx(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData);
2022-08-26 07:35:43 +00:00
if (code) goto _exit;
pIter->iRow = 0;
pIter->r.suid = pIter->bData.suid;
pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0];
pIter->r.row = tsdbRowFromBlockData(&pIter->bData, 0);
} else {
pCommitter->pIter = NULL;
}
}
} else {
ASSERT(0);
}
// compare with min in RB Tree
pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
if (pCommitter->pIter && pIter) {
int32_t c = tRowInfoCmprFn(&pCommitter->pIter->r, &pIter->r);
if (c > 0) {
tRBTreePut(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
pCommitter->pIter = NULL;
} else {
ASSERT(c);
}
}
}
if (pCommitter->pIter == NULL) {
pCommitter->pIter = (SDataIter *)tRBTreeMin(&pCommitter->rbt);
if (pCommitter->pIter) {
tRBTreeDrop(&pCommitter->rbt, (SRBTreeNode *)pCommitter->pIter);
}
}
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-26 07:35:43 +00:00
return code;
}
2022-09-02 03:16:23 +00:00
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
2022-09-23 02:40:54 +00:00
int32_t code = 0;
int32_t lino = 0;
2022-08-26 09:47:37 +00:00
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
tBlockDataClear(pBlockData);
while (pRowInfo) {
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
code = tsdbNextCommitRow(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo) {
if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
pRowInfo = NULL;
} else {
TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
2022-09-02 03:16:23 +00:00
if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL;
2022-08-26 09:47:37 +00:00
}
}
2022-08-27 16:00:48 +00:00
if (pBlockData->nRow >= pCommitter->maxRow) {
2022-09-07 09:56:40 +00:00
code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
}
}
2022-09-07 09:56:40 +00:00
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-26 09:05:17 +00:00
return code;
}
2022-09-02 03:16:23 +00:00
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
2022-09-23 02:40:54 +00:00
int32_t code = 0;
int32_t lino = 0;
2022-08-26 09:47:37 +00:00
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
SBlockData *pBDataR = &pCommitter->dReader.bData;
SBlockData *pBDataW = &pCommitter->dWriter.bData;
2022-09-02 03:16:23 +00:00
code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
tBlockDataClear(pBDataW);
int32_t iRow = 0;
TSDBROW row = tsdbRowFromBlockData(pBDataR, 0);
TSDBROW *pRow = &row;
while (pRow && pRowInfo) {
int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
if (c < 0) {
code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
iRow++;
if (iRow < pBDataR->nRow) {
row = tsdbRowFromBlockData(pBDataR, iRow);
} else {
pRow = NULL;
}
} else if (c > 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
code = tsdbNextCommitRow(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo) {
if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
pRowInfo = NULL;
} else {
TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
2022-09-02 03:16:23 +00:00
if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL;
2022-08-26 09:47:37 +00:00
}
}
} else {
ASSERT(0 && "dup rows not allowed");
2022-08-26 09:47:37 +00:00
}
2022-08-27 16:00:48 +00:00
if (pBDataW->nRow >= pCommitter->maxRow) {
2022-09-07 09:56:40 +00:00
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
}
}
while (pRow) {
code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
iRow++;
if (iRow < pBDataR->nRow) {
row = tsdbRowFromBlockData(pBDataR, iRow);
} else {
pRow = NULL;
}
2022-08-27 16:00:48 +00:00
if (pBDataW->nRow >= pCommitter->maxRow) {
2022-09-07 09:56:40 +00:00
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
}
}
2022-09-07 09:56:40 +00:00
code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:47:37 +00:00
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-26 09:05:17 +00:00
return code;
}
2022-08-26 08:56:42 +00:00
static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
2022-09-23 02:40:54 +00:00
int32_t code = 0;
int32_t lino = 0;
2022-08-26 08:56:42 +00:00
SBlockIdx *pBlockIdx = pCommitter->dReader.pBlockIdx;
ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
int32_t iBlock = 0;
2022-09-02 03:16:23 +00:00
SDataBlk block;
SDataBlk *pDataBlk = &block;
2022-08-26 08:56:42 +00:00
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);
2022-09-02 03:16:23 +00:00
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
while (pDataBlk && pRowInfo) {
SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
2022-09-02 03:19:34 +00:00
int32_t c = tDataBlkCmprFn(pDataBlk, &tBlock);
2022-08-26 08:56:42 +00:00
if (c < 0) {
2022-09-02 03:16:23 +00:00
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 08:56:42 +00:00
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
2022-09-02 03:16:23 +00:00
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
2022-08-26 08:56:42 +00:00
} else {
2022-09-02 03:16:23 +00:00
pDataBlk = NULL;
2022-08-26 08:56:42 +00:00
}
} else if (c > 0) {
2022-09-02 03:16:23 +00:00
code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:05:17 +00:00
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
2022-08-26 08:56:42 +00:00
} else {
2022-09-02 03:16:23 +00:00
code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 09:05:17 +00:00
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
2022-09-02 03:16:23 +00:00
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
2022-08-26 09:05:17 +00:00
} else {
2022-09-02 03:16:23 +00:00
pDataBlk = NULL;
2022-08-26 09:05:17 +00:00
}
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
2022-08-26 08:56:42 +00:00
}
}
2022-09-02 03:16:23 +00:00
while (pDataBlk) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 08:56:42 +00:00
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
2022-09-02 03:16:23 +00:00
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
2022-08-26 08:56:42 +00:00
} else {
2022-09-02 03:16:23 +00:00
pDataBlk = NULL;
2022-08-26 08:56:42 +00:00
}
}
2022-08-27 09:11:00 +00:00
code = tsdbCommitterNextTableData(pCommitter);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 08:56:42 +00:00
}
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-26 08:56:42 +00:00
return code;
}
2022-09-23 09:13:14 +00:00
static int32_t tsdbInitSttBlockBuilderIfNeed(SCommitter *pCommitter, TABLEID id) {
2022-08-27 16:00:48 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-27 16:00:48 +00:00
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-23 08:45:38 +00:00
SDiskDataBuilder *pBuilder = pCommitter->dWriter.pBuilder;
if (pBuilder->suid || pBuilder->uid) {
if (!TABLE_SAME_SCHEMA(pBuilder->suid, pBuilder->uid, id.suid, id.uid)) {
2022-09-23 09:13:14 +00:00
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pBuilder, pCommitter->dWriter.aSttBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-27 15:34:06 +00:00
tDiskDataBuilderClear(pBuilder);
2022-08-27 16:00:48 +00:00
}
}
2022-09-23 08:45:38 +00:00
if (!pBuilder->suid && !pBuilder->uid) {
2022-08-27 16:19:24 +00:00
ASSERT(pCommitter->skmTable.suid == id.suid);
ASSERT(pCommitter->skmTable.uid == id.uid);
2022-09-27 15:34:06 +00:00
code = tDiskDataBuilderInit(pBuilder, pCommitter->skmTable.pTSchema, &id, pCommitter->cmprAlg, 0);
2022-09-27 08:26:16 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
}
2022-09-29 09:06:18 +00:00
#else
SBlockData *pBData = &pCommitter->dWriter.bDatal;
if (pBData->suid || pBData->uid) {
if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataReset(pBData);
2022-08-27 16:00:48 +00:00
}
}
2022-09-29 09:06:18 +00:00
if (!pBData->suid && !pBData->uid) {
2022-08-27 16:19:24 +00:00
ASSERT(pCommitter->skmTable.suid == id.suid);
ASSERT(pCommitter->skmTable.uid == id.uid);
2022-09-29 09:06:18 +00:00
TABLEID tid = {.suid = id.suid, .uid = id.suid ? 0 : id.uid};
code = tBlockDataInit(pBData, &tid, pCommitter->skmTable.pTSchema, NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
}
2022-09-29 09:06:18 +00:00
#endif
2022-08-27 16:00:48 +00:00
2022-08-27 16:19:24 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-27 16:19:24 +00:00
return code;
}
static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-27 16:19:24 +00:00
SBlockData *pBData = &pCommitter->dWriter.bData;
2022-09-27 15:34:06 +00:00
TABLEID id = {.suid = pBData->suid, .uid = pBData->uid};
2022-08-27 16:19:24 +00:00
2022-09-23 09:13:14 +00:00
code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:19:24 +00:00
2022-08-27 16:00:48 +00:00
for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-27 09:36:02 +00:00
code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &row, NULL, &id);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
2022-09-23 08:45:38 +00:00
if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
2022-09-23 09:13:14 +00:00
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-27 15:34:06 +00:00
code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
}
2022-09-29 09:06:18 +00:00
#else
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &row, NULL, id.uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
}
2022-09-29 09:06:18 +00:00
#endif
2022-08-27 16:00:48 +00:00
}
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-27 16:00:48 +00:00
return code;
}
2022-08-27 07:34:01 +00:00
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
2022-08-27 16:00:48 +00:00
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-26 10:06:27 +00:00
2022-08-27 16:00:48 +00:00
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
2022-08-26 10:06:27 +00:00
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
}
2022-08-27 16:00:48 +00:00
if (pRowInfo == NULL) goto _exit;
2022-08-26 10:06:27 +00:00
2022-08-27 16:00:48 +00:00
if (pCommitter->toLastOnly) {
2022-09-27 15:34:06 +00:00
code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 10:06:27 +00:00
2022-09-23 08:45:38 +00:00
while (pRowInfo) {
STSchema *pTSchema = NULL;
2022-12-01 08:51:36 +00:00
if (pRowInfo->row.type == TSDBROW_ROW_FMT) {
2022-09-23 08:45:38 +00:00
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
TSDB_CHECK_CODE(code, lino, _exit);
pTSchema = pCommitter->skmRow.pTSchema;
}
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-27 09:36:02 +00:00
code = tDiskDataAddRow(pCommitter->dWriter.pBuilder, &pRowInfo->row, pTSchema, &id);
2022-09-29 09:06:18 +00:00
#else
code = tBlockDataAppendRow(&pCommitter->dWriter.bDatal, &pRowInfo->row, pTSchema, id.uid);
#endif
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
2022-09-23 08:45:38 +00:00
code = tsdbNextCommitRow(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
2022-09-23 08:45:38 +00:00
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
}
2022-08-27 16:00:48 +00:00
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-23 08:45:38 +00:00
if (pCommitter->dWriter.pBuilder->nRow >= pCommitter->maxRow) {
2022-09-23 09:13:14 +00:00
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
2022-09-23 08:45:38 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-27 15:34:06 +00:00
code = tsdbInitSttBlockBuilderIfNeed(pCommitter, id);
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-23 08:45:38 +00:00
}
2022-09-29 09:06:18 +00:00
#else
if (pCommitter->dWriter.bDatal.nRow >= pCommitter->maxRow) {
2022-09-29 09:06:18 +00:00
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
}
#endif
2022-08-27 16:00:48 +00:00
}
} else {
2022-09-23 08:45:38 +00:00
SBlockData *pBData = &pCommitter->dWriter.bData;
2022-08-27 16:00:48 +00:00
ASSERT(pBData->nRow == 0);
2022-08-26 10:06:27 +00:00
2022-09-23 08:45:38 +00:00
while (pRowInfo) {
STSchema *pTSchema = NULL;
2022-12-01 08:51:36 +00:00
if (pRowInfo->row.type == TSDBROW_ROW_FMT) {
2022-09-23 08:45:38 +00:00
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-23 08:45:38 +00:00
pTSchema = pCommitter->skmRow.pTSchema;
}
2022-08-27 16:00:48 +00:00
2022-09-23 08:45:38 +00:00
code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
2022-09-23 08:45:38 +00:00
code = tsdbNextCommitRow(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
2022-09-23 08:45:38 +00:00
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
}
2022-08-27 16:00:48 +00:00
2022-09-23 08:45:38 +00:00
if (pBData->nRow >= pCommitter->maxRow) {
2022-09-07 09:56:40 +00:00
code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 16:00:48 +00:00
}
2022-08-26 10:06:27 +00:00
}
2022-09-23 08:45:38 +00:00
if (pBData->nRow) {
if (pBData->nRow > pCommitter->minRow) {
code =
tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbAppendLastBlock(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
}
2022-08-27 16:24:11 +00:00
}
}
2022-08-27 16:00:48 +00:00
_exit:
2022-09-23 02:40:54 +00:00
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-26 08:56:42 +00:00
return code;
}
2022-08-26 07:35:43 +00:00
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t code = 0;
2022-09-23 02:40:54 +00:00
int32_t lino = 0;
2022-08-26 07:35:43 +00:00
2022-08-27 09:11:00 +00:00
SRowInfo *pRowInfo;
2022-08-26 07:35:43 +00:00
TABLEID id = {0};
2022-08-27 09:11:00 +00:00
while ((pRowInfo = tsdbGetCommitRow(pCommitter)) != NULL) {
2022-08-26 09:47:37 +00:00
ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
id.suid = pRowInfo->suid;
id.uid = pRowInfo->uid;
2022-08-27 09:11:00 +00:00
2022-08-26 09:47:37 +00:00
code = tsdbMoveCommitData(pCommitter, id);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 09:11:00 +00:00
// start
2022-08-26 09:47:37 +00:00
tMapDataReset(&pCommitter->dWriter.mBlock);
2022-08-27 09:11:00 +00:00
// impl
2022-09-06 09:41:06 +00:00
code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable);
2022-09-27 08:26:16 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-27 03:02:04 +00:00
code = tBlockDataInit(&pCommitter->dReader.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
2022-09-27 08:26:16 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-09-27 03:02:04 +00:00
code = tBlockDataInit(&pCommitter->dWriter.bData, &id, pCommitter->skmTable.pTSchema, NULL, 0);
2022-09-27 08:26:16 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 07:35:43 +00:00
2022-08-26 08:56:42 +00:00
/* merge with data in .data file */
code = tsdbMergeTableData(pCommitter, id);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 07:35:43 +00:00
2022-08-26 08:56:42 +00:00
/* handle remain table data */
2022-08-27 07:34:01 +00:00
code = tsdbCommitTableData(pCommitter, id);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 10:25:02 +00:00
2022-08-27 09:11:00 +00:00
// end
2022-08-26 10:25:02 +00:00
if (pCommitter->dWriter.mBlock.nItem > 0) {
SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid};
2022-09-07 09:06:42 +00:00
code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 10:25:02 +00:00
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 10:25:02 +00:00
}
}
2022-08-26 07:35:43 +00:00
}
2022-08-27 09:11:00 +00:00
id.suid = INT64_MAX;
id.uid = INT64_MAX;
code = tsdbMoveCommitData(pCommitter, id);
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-27 09:11:00 +00:00
2022-09-29 09:06:18 +00:00
#if USE_STREAM_COMPRESSION
2022-09-23 09:57:38 +00:00
code = tsdbCommitSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.pBuilder, pCommitter->dWriter.aSttBlk);
2022-09-29 09:06:18 +00:00
#else
2022-09-07 09:56:40 +00:00
code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk,
pCommitter->cmprAlg);
2022-09-29 09:06:18 +00:00
#endif
2022-09-23 02:40:54 +00:00
TSDB_CHECK_CODE(code, lino, _exit);
2022-08-26 07:35:43 +00:00
2022-09-23 02:40:54 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
2022-09-23 02:46:05 +00:00
tstrerror(code));
2022-09-23 02:40:54 +00:00
}
2022-08-26 07:35:43 +00:00
return code;
}
2022-10-18 08:54:53 +00:00
int32_t tsdbFinishCommit(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
SMemTable *pMemTable = pTsdb->imem;
2022-08-27 09:11:00 +00:00
2022-10-18 08:54:53 +00:00
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
TSDB_CHECK_CODE(code, lino, _exit);
}
pTsdb->imem = NULL;
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
2022-10-19 02:43:06 +00:00
if (pMemTable) {
2023-01-10 03:36:40 +00:00
tsdbUnrefMemTable(pMemTable, NULL, true);
2022-10-19 02:43:06 +00:00
}
2022-10-18 08:54:53 +00:00
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-10-18 08:54:53 +00:00
} else {
2022-10-21 02:39:19 +00:00
tsdbInfo("vgId:%d, tsdb finish commit", TD_VID(pTsdb->pVnode));
2022-10-18 08:54:53 +00:00
}
2022-08-26 07:35:43 +00:00
return code;
2022-10-18 08:54:53 +00:00
}
2022-08-26 07:35:43 +00:00
2022-10-18 08:54:53 +00:00
int32_t tsdbRollbackCommit(STsdb *pTsdb) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbFSRollback(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
2022-10-21 02:39:19 +00:00
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
2022-10-18 08:54:53 +00:00
} else {
2022-10-21 02:39:19 +00:00
tsdbInfo("vgId:%d, tsdb rollback commit", TD_VID(pTsdb->pVnode));
2022-10-18 08:54:53 +00:00
}
2022-08-26 07:35:43 +00:00
return code;
}