TDengine/source/dnode/vnode/src/vnd/vnodeCommit.c

404 lines
9.4 KiB
C
Raw Normal View History

2021-09-27 02:49:36 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-26 11:04:26 +00:00
#include "vnd.h"
2021-09-27 02:49:36 +00:00
2022-07-19 08:30:49 +00:00
// Name of the committed vnode info file inside the vnode directory.
#define VND_INFO_FNAME "vnode.json"
2022-04-15 05:47:57 +00:00
// Name of the temporary info file that vnodeSaveInfo() writes and
// vnodeCommitInfo() later renames to VND_INFO_FNAME.
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
2022-04-15 06:27:04 +00:00
// Serialize an SVnodeInfo into a JSON string returned through *ppData.
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
2022-04-15 07:07:44 +00:00
// Parse a JSON text buffer into an SVnodeInfo.
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
2022-01-06 06:34:20 +00:00
// Placeholders; both are currently empty stubs in this file.
static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode);
2022-04-24 07:34:53 +00:00
// Task body handed to vnodeScheduleTask() by vnodeAsyncCommit().
static int vnodeCommitImpl(void *arg);
2022-01-06 06:34:20 +00:00
// Waits on the vnode's canCommit semaphore.
static void vnodeWaitCommit(SVnode *pVnode);
2021-09-27 02:49:36 +00:00
2022-04-19 13:10:03 +00:00
// Open a new write round on the vnode.
//
// Detaches the head of the free buffer-pool list as the in-use pool (waiting
// on poolNotEmpty while the list is empty), increments the commit ID, then
// calls the begin hook of meta, tsdb, the optional rsma1/rsma2 tsdbs and sma.
//
// Returns 0 on success, or -1 when meta, tsdb, rsma1 or rsma2 fails to begin.
// The return value of smaBegin() is not checked.
int vnodeBegin(SVnode *pVnode) {
  // alloc buffer pool: everything below up to the unlock runs under pVnode->mutex
  taosThreadMutexLock(&pVnode->mutex);
  for (;;) {
    if (pVnode->pPool != NULL) {
      break;
    }
    taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex);
  }
  pVnode->inUse = pVnode->pPool;
  pVnode->pPool = pVnode->inUse->next;  // pop the head off the free list
  pVnode->inUse->next = NULL;
  pVnode->inUse->nRef = 1;
  taosThreadMutexUnlock(&pVnode->mutex);

  ++pVnode->state.commitID;

  // begin meta
  if (metaBegin(pVnode->pMeta) < 0) {
    vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin tsdb
  if (tsdbBegin(pVnode->pTsdb) < 0) {
    vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin the rollup tsdbs, each only when present
  if (pVnode->pSma != NULL) {
    if (VND_RSMA1(pVnode)) {
      if (tsdbBegin(VND_RSMA1(pVnode)) < 0) {
        vError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
        return -1;
      }
    }

    if (VND_RSMA2(pVnode)) {
      if (tsdbBegin(VND_RSMA2(pVnode)) < 0) {
        vError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
        return -1;
      }
    }
  }

  // begin sma
  // TODO: refactor to include the rsma1/rsma2 tsdbBegin() after tsdb_refact branch merged
  smaBegin(pVnode->pSma);

  return 0;
}
2022-06-26 10:44:49 +00:00
// Report whether the in-use buffer pool has grown past one third of the
// configured buffer size (config.szBuf). Yields false when no pool is in use.
int vnodeShouldCommit(SVnode *pVnode) {
  if (!pVnode->inUse) {
    return false;
  }

  return pVnode->inUse->size > pVnode->config.szBuf / 3;
}
2022-04-20 08:50:45 +00:00
2022-04-15 05:47:57 +00:00
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
TdFilePtr pFile;
2022-04-15 06:27:04 +00:00
char *data;
2022-04-15 05:47:57 +00:00
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
// encode info
data = NULL;
2022-04-15 06:27:04 +00:00
if (vnodeEncodeInfo(pInfo, &data) < 0) {
2022-04-15 05:47:57 +00:00
return -1;
}
// save info to a vnode_tmp.json
pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
2022-08-02 07:57:37 +00:00
vError("failed to open info file:%s for write:%s", fname, terrstr());
2022-04-15 05:47:57 +00:00
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
2022-04-15 06:27:04 +00:00
if (taosWriteFile(pFile, data, strlen(data)) < 0) {
2022-08-02 07:57:37 +00:00
vError("failed to write info file:%s data:%s", fname, terrstr());
2022-04-15 05:47:57 +00:00
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pFile) < 0) {
2022-08-02 07:57:37 +00:00
vError("failed to fsync info file:%s error:%s", fname, terrstr());
2022-04-15 05:47:57 +00:00
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFile);
// free info binary
taosMemoryFree(data);
2022-08-02 07:57:37 +00:00
vInfo("vgId:%d, vnode info is saved, fname:%s", pInfo->config.vgId, fname);
2022-04-15 05:47:57 +00:00
return 0;
_err:
taosCloseFile(&pFile);
taosMemoryFree(data);
return -1;
}
int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
char tfname[TSDB_FILENAME_LEN];
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
if (taosRenameFile(tfname, fname) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
2022-06-02 05:57:39 +00:00
vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId);
2022-04-15 05:47:57 +00:00
return 0;
}
2022-04-15 07:07:44 +00:00
// Read "<dir>/vnode.json" into memory and decode it into pInfo.
//
// The whole file is read into a heap buffer sized from taosFStatFile() plus one
// byte for the terminating NUL, then handed to vnodeDecodeInfo().
//
// Returns 0 on success and -1 on failure. terrno is set from errno for
// open/stat/read failures and to TSDB_CODE_OUT_OF_MEMORY when the buffer
// cannot be allocated.
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  char      path[TSDB_FILENAME_LEN];
  TdFilePtr fp = NULL;
  char     *text = NULL;
  int64_t   nBytes;
  int       rc;

  snprintf(path, sizeof(path), "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // read info
  fp = taosOpenFile(path, TD_FILE_READ);
  if (fp == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFStatFile(fp, &nBytes, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _fail;
  }

  text = taosMemoryMalloc(nBytes + 1);
  if (text == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _fail;
  }

  if (taosReadFile(fp, text, nBytes) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _fail;
  }
  text[nBytes] = '\0';  // the decoder receives a NUL-terminated string

  taosCloseFile(&fp);

  // decode info; the buffer is released whatever the outcome
  rc = (vnodeDecodeInfo(text, pInfo) < 0) ? -1 : 0;
  taosMemoryFree(text);
  return rc;

_fail:
  taosCloseFile(&fp);
  taosMemoryFree(text);
  return -1;
}
2021-09-27 02:49:36 +00:00
// Queue a commit task for the vnode.
//
// First waits on the canCommit semaphore (via vnodeWaitCommit), then hands
// vnodeCommitImpl to vnodeScheduleTask(); vnodeCommitImpl posts the semaphore
// when it finishes. The scheduler's return value is not inspected.
// Always returns 0.
int vnodeAsyncCommit(SVnode *pVnode) {
  vnodeWaitCommit(pVnode);

  // Disabled steps kept from the original for reference:
  //   vnodeBufPoolSwitch(pVnode);
  //   tsdbPrepareCommit(pVnode->pTsdb);

  vnodeScheduleTask(vnodeCommitImpl, pVnode);

  return 0;
}
2021-09-27 02:49:36 +00:00
2022-01-06 06:34:20 +00:00
// Run a commit and block until it has finished.
// Always returns 0.
int vnodeSyncCommit(SVnode *pVnode) {
  // schedule the commit task
  vnodeAsyncCommit(pVnode);

  // the task posts canCommit when done, so this wait returns after it ran
  vnodeWaitCommit(pVnode);

  // hand the semaphore back so the next commit can proceed
  tsem_post(&pVnode->canCommit);

  return 0;
}
2022-04-24 07:34:53 +00:00
// Commit the vnode's current state.
//
// Sequence:
//   1. release the in-use buffer pool and record the applied term as commit term
//   2. write a snapshot of config/state to the temporary info file
//   3. begin the WAL snapshot and run the sma pre-commit hook
//   4. commit meta, then tsdb (or sma + the three rsma tsdbs), then tq
//   5. rename the temporary info file over the live one
//   6. publish the committed version, run the sma post-commit hook and end the
//      WAL snapshot
//
// Any failing step trips ASSERT(0) and returns -1; returns 0 on success.
int vnodeCommit(SVnode *pVnode) {
  SVnodeInfo newInfo = {0};
  char       vnodeDir[TSDB_FILENAME_LEN];

  vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
        pVnode->state.applied);

  vnodeBufPoolUnRef(pVnode->inUse);
  pVnode->inUse = NULL;

  pVnode->state.commitTerm = pVnode->state.applyTerm;

  // save info
  newInfo.config = pVnode->config;
  newInfo.state.committed = pVnode->state.applied;
  newInfo.state.commitTerm = pVnode->state.applyTerm;
  newInfo.state.commitID = pVnode->state.commitID;
  snprintf(vnodeDir, sizeof(vnodeDir), "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  if (vnodeSaveInfo(vnodeDir, &newInfo) < 0) {
    ASSERT(0);
    return -1;
  }
  walBeginSnapshot(pVnode->pWal, pVnode->state.applied);

  // preCommit (the synchronous variant smaSyncPreCommit is not used)
  smaAsyncPreCommit(pVnode->pSma);

  // commit each sub-system
  if (metaCommit(pVnode->pMeta) < 0) {
    ASSERT(0);
    return -1;
  }

  if (!VND_IS_RSMA(pVnode)) {
    if (tsdbCommit(pVnode->pTsdb) < 0) {
      ASSERT(0);
      return -1;
    }
  } else {
    smaAsyncCommit(pVnode->pSma);
    // short-circuit keeps the rsma0 -> rsma1 -> rsma2 order and stops at the first failure
    if (tsdbCommit(VND_RSMA0(pVnode)) < 0 || tsdbCommit(VND_RSMA1(pVnode)) < 0 ||
        tsdbCommit(VND_RSMA2(pVnode)) < 0) {
      ASSERT(0);
      return -1;
    }
  }

  if (tqCommit(pVnode->pTq) < 0) {
    ASSERT(0);
    return -1;
  }

  // walCommit (TODO)

  // commit info
  if (vnodeCommitInfo(vnodeDir, &newInfo) < 0) {
    ASSERT(0);
    return -1;
  }

  pVnode->state.committed = newInfo.state.committed;

  // postCommit (the synchronous variant smaSyncPostCommit is not used)
  smaAsyncPostCommit(pVnode->pSma);

  // apply the commit (TODO)
  walEndSnapshot(pVnode->pWal);

  vInfo("vgId:%d, commit end", TD_VID(pVnode));

  return 0;
}
static int vnodeCommitImpl(void *arg) {
2021-12-14 06:32:07 +00:00
SVnode *pVnode = (SVnode *)arg;
2021-09-27 02:49:36 +00:00
2022-04-06 06:37:37 +00:00
// metaCommit(pVnode->pMeta);
2021-12-17 08:47:30 +00:00
tqCommit(pVnode->pTq);
2022-06-23 03:08:19 +00:00
// tsdbCommit(pVnode->pTsdb, );
2021-12-17 08:47:30 +00:00
2022-04-20 06:56:34 +00:00
// vnodeBufPoolRecycle(pVnode);
2021-12-20 08:52:39 +00:00
tsem_post(&(pVnode->canCommit));
2021-09-27 02:49:36 +00:00
return 0;
}
// Placeholder for commit start-up work; currently does nothing and returns 0.
static int vnodeStartCommit(SVnode *pVnode) {
  (void)pVnode;  // TODO: not implemented yet
  return 0;
}
// Placeholder for commit tear-down work; currently does nothing and returns 0.
static int vnodeEndCommit(SVnode *pVnode) {
  (void)pVnode;  // TODO: not implemented yet
  return 0;
}
2022-04-15 05:47:57 +00:00
// Wait on the vnode's canCommit semaphore.
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) {
  tsem_wait(&(pVnode->canCommit));
}
2022-04-15 06:27:04 +00:00
// JSON encoder callback for SVState.
//
// Adds the "commit version", "commit ID" and "commit term" integers to pJson,
// in that order. pObj must point to an SVState.
// Returns 0 on success, -1 as soon as one field cannot be added.
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *st = (const SVState *)pObj;

  if (tjsonAddIntegerToObject(pJson, "commit version", st->committed) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit ID", st->commitID) < 0 ||
      tjsonAddIntegerToObject(pJson, "commit term", st->commitTerm) < 0) {
    return -1;
  }

  return 0;
}
static int vnodeDecodeState(const SJson *pJson, void *pObj) {
SVState *pState = (SVState *)pObj;
int32_t code;
tjsonGetNumberValue(pJson, "commit version", pState->committed, code);
2022-06-21 07:13:12 +00:00
if (code < 0) return -1;
2022-06-23 03:08:19 +00:00
tjsonGetNumberValue(pJson, "commit ID", pState->commitID, code);
if (code < 0) return -1;
2022-07-06 09:46:14 +00:00
tjsonGetNumberValue(pJson, "commit term", pState->commitTerm, code);
if (code < 0) return -1;
2022-04-15 06:27:04 +00:00
return 0;
}
// Serialize an SVnodeInfo to a JSON string.
//
// Builds an object with a "config" member (vnodeEncodeConfig) and a "state"
// member (vnodeEncodeState) and renders it with tjsonToString().
//
// *ppData is set to NULL up front and, on success, to the rendered string;
// vnodeSaveInfo() releases it with taosMemoryFree().
// Returns 0 on success, -1 on any failure. The JSON object is always deleted.
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
  int    rc = -1;
  char  *text = NULL;
  SJson *root;

  *ppData = NULL;

  root = tjsonCreateObject();
  if (root == NULL) {
    return -1;
  }

  if (tjsonAddObject(root, "config", vnodeEncodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonAddObject(root, "state", vnodeEncodeState, (void *)&pInfo->state) >= 0) {
    text = tjsonToString(root);
    if (text != NULL) {
      *ppData = text;
      rc = 0;
    }
  }

  tjsonDelete(root);
  return rc;
}
2022-04-15 07:07:44 +00:00
// Parse JSON text into an SVnodeInfo.
//
// pData is passed straight to tjsonParse(); the caller in this file
// (vnodeLoadInfo) NUL-terminates the buffer first. The "config" member is
// decoded with vnodeDecodeConfig and the "state" member with vnodeDecodeState.
// Returns 0 on success, -1 if parsing or either member fails to decode.
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  int    rc = -1;
  SJson *root = tjsonParse(pData);

  if (root == NULL) {
    return -1;
  }

  if (tjsonToObject(root, "config", vnodeDecodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonToObject(root, "state", vnodeDecodeState, (void *)&pInfo->state) >= 0) {
    rc = 0;
  }

  tjsonDelete(root);
  return rc;
}