TDengine/source/dnode/vnode/src/vnd/vnodeCommit.c

392 lines
8.6 KiB
C
Raw Normal View History

2021-09-27 02:49:36 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-26 11:04:26 +00:00
#include "vnd.h"
2021-09-27 02:49:36 +00:00
2022-04-15 05:47:57 +00:00
// Names of the vnode info files inside a vnode directory:
//  - VND_INFO_FNAME     is the file vnodeLoadInfo() reads and vnodeCommitInfo() renames to.
//  - VND_INFO_FNAME_TMP is the file vnodeSaveInfo() writes and vnodeCommitInfo() renames from.
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
2022-04-15 06:27:04 +00:00
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
2022-04-15 07:07:44 +00:00
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
2022-01-06 06:34:20 +00:00
static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode);
2022-04-24 07:34:53 +00:00
static int vnodeCommitImpl(void *arg);
2022-01-06 06:34:20 +00:00
static void vnodeWaitCommit(SVnode *pVnode);
2021-09-27 02:49:36 +00:00
2022-04-19 13:10:03 +00:00
// Start a new write round on a vnode.
//
// Takes the head of pVnode->pPool as the in-use buffer pool, then calls the
// begin hook of meta and of the tsdb instance(s).
//
// Returns 0 on success. Returns -1 as soon as one sub-system fails to begin;
// the failure is logged together with tstrerror(terrno).
int vnodeBegin(SVnode *pVnode) {
  // alloc buffer pool
  // The mutex/cond-var calls are still placeholders, so this loop simply
  // spins for as long as the free list is empty.
  /* pthread_mutex_lock(); */
  while (pVnode->pPool == NULL) {
    /* pthread_cond_wait(); */
  }

  // pop the head of the free list and make it the in-use pool
  pVnode->inUse = pVnode->pPool;
  pVnode->pPool = pVnode->inUse->next;
  pVnode->inUse->next = NULL;
  /* ref pVnode->inUse buffer pool */
  /* pthread_mutex_unlock(); */

  // begin meta
  if (metaBegin(pVnode->pMeta) < 0) {
    vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  // begin tsdb
  // NOTE(review): this branches on pVnode->pSma while vnodeCommit() branches
  // on VND_IS_RSMA(pVnode) -- confirm the two conditions are meant to agree.
  if (!pVnode->pSma) {
    if (tsdbBegin(pVnode->pTsdb) < 0) {
      vError("vgId:%d, failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
      return -1;
    }
    return 0;
  }

  if (tsdbBegin(VND_RSMA0(pVnode)) < 0) {
    vError("vgId:%d, failed to begin rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  if (tsdbBegin(VND_RSMA1(pVnode)) < 0) {
    vError("vgId:%d, failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  if (tsdbBegin(VND_RSMA2(pVnode)) < 0) {
    vError("vgId:%d, failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
    return -1;
  }

  return 0;
}
2022-06-26 10:44:49 +00:00
// Tell whether the vnode should commit.
//
// Returns non-zero when an in-use buffer pool exists and its size exceeds one
// third of config.szBuf; returns 0 (false) otherwise, including when there is
// no in-use pool.
int vnodeShouldCommit(SVnode *pVnode) {
  if (pVnode->inUse == NULL) {
    return false;
  }

  return pVnode->inUse->size > pVnode->config.szBuf / 3;
}
2022-04-20 08:50:45 +00:00
2022-04-15 05:47:57 +00:00
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
TdFilePtr pFile;
2022-04-15 06:27:04 +00:00
char *data;
2022-04-15 05:47:57 +00:00
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
// encode info
data = NULL;
2022-04-15 06:27:04 +00:00
if (vnodeEncodeInfo(pInfo, &data) < 0) {
2022-04-15 05:47:57 +00:00
return -1;
}
// save info to a vnode_tmp.json
pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
2022-04-15 06:27:04 +00:00
if (taosWriteFile(pFile, data, strlen(data)) < 0) {
2022-04-15 05:47:57 +00:00
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
if (taosFsyncFile(pFile) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
taosCloseFile(&pFile);
// free info binary
taosMemoryFree(data);
2022-06-02 05:57:39 +00:00
vInfo("vgId:%d, vnode info is saved, fname: %s", pInfo->config.vgId, fname);
2022-04-15 05:47:57 +00:00
return 0;
_err:
taosCloseFile(&pFile);
taosMemoryFree(data);
return -1;
}
int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN];
char tfname[TSDB_FILENAME_LEN];
snprintf(fname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);
snprintf(tfname, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME_TMP);
if (taosRenameFile(tfname, fname) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
2022-06-02 05:57:39 +00:00
vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId);
2022-04-15 05:47:57 +00:00
return 0;
}
2022-04-15 07:07:44 +00:00
// Load the vnode info stored in "<dir>/vnode.json" into *pInfo.
//
// The file size is obtained with taosFStatFile(), the whole file is read into
// a heap buffer that is NUL-terminated, and the text is handed to
// vnodeDecodeInfo(). The buffer is released before returning.
//
// Returns 0 on success, -1 on failure. terrno is set from errno on file
// errors and to TSDB_CODE_OUT_OF_MEMORY when the buffer cannot be allocated.
int vnodeLoadInfo(const char *dir, SVnodeInfo *pInfo) {
  char      path[TSDB_FILENAME_LEN];
  TdFilePtr pFile = NULL;
  char     *pBuf = NULL;
  int64_t   fsize;
  int       ret;

  snprintf(path, TSDB_FILENAME_LEN, "%s%s%s", dir, TD_DIRSEP, VND_INFO_FNAME);

  // read info
  pFile = taosOpenFile(path, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  if (taosFStatFile(pFile, &fsize, NULL) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }

  // one extra byte for the terminating NUL
  pBuf = taosMemoryMalloc(fsize + 1);
  if (pBuf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _err;
  }

  if (taosReadFile(pFile, pBuf, fsize) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  pBuf[fsize] = '\0';

  taosCloseFile(&pFile);

  // decode info
  ret = (vnodeDecodeInfo((uint8_t *)pBuf, pInfo) < 0) ? -1 : 0;
  taosMemoryFree(pBuf);
  return ret;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(pBuf);
  return -1;
}
2021-09-27 02:49:36 +00:00
// Schedule a commit of the vnode as a background task.
//
// Waits on the vnode's canCommit semaphore first, then hands
// vnodeCommitImpl() with pVnode as its argument to vnodeScheduleTask().
// The result of vnodeScheduleTask() is not checked; this always returns 0.
int vnodeAsyncCommit(SVnode *pVnode) {
  // take canCommit; vnodeCommitImpl() posts it again when it finishes
  vnodeWaitCommit(pVnode);

  // TODO: vnodeBufPoolSwitch(pVnode) / tsdbPrepareCommit(pVnode->pTsdb)

  vnodeScheduleTask(vnodeCommitImpl, pVnode);

  return 0;
}
2021-09-27 02:49:36 +00:00
2022-01-06 06:34:20 +00:00
// Commit the vnode and wait for the commit task to finish.
//
// Calls vnodeAsyncCommit(), waits on canCommit again (vnodeCommitImpl() posts
// it at its end), and finally posts canCommit. Always returns 0.
int vnodeSyncCommit(SVnode *pVnode) {
  vnodeAsyncCommit(pVnode);

  // wait for the scheduled task, then give the semaphore back
  vnodeWaitCommit(pVnode);
  tsem_post(&pVnode->canCommit);

  return 0;
}
2022-04-24 07:34:53 +00:00
// Commit a vnode.
//
// Steps, in order:
//   1. move the in-use buffer pool to onCommit;
//   2. save the vnode info (config + applied version as the committed
//      version) to the temporary info file;
//   3. commit meta, the tsdb instance(s) and tq;
//   4. rename the temporary info file into place and record the committed
//      version in pVnode->state;
//   5. call smaPostCommit(), reset the committed buffer pool and push it back
//      onto the head of pVnode->pPool.
//
// Returns 0 on success. Any failing step hits ASSERT(0) and returns -1.
int vnodeCommit(SVnode *pVnode) {
  SVnodeInfo vndInfo = {0};
  char       vndDir[TSDB_FILENAME_LEN];

  vInfo("vgId:%d, start to commit, version: %" PRId64, TD_VID(pVnode), pVnode->state.applied);

  // hand the in-use buffer pool over to the commit
  pVnode->onCommit = pVnode->inUse;
  pVnode->inUse = NULL;

  // save info
  vndInfo.config = pVnode->config;
  vndInfo.state.committed = pVnode->state.applied;
  snprintf(vndDir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
  if (vnodeSaveInfo(vndDir, &vndInfo) < 0) {
    goto _err;
  }

  // preCommit
  // TODO

  // commit each sub-system
  if (metaCommit(pVnode->pMeta) < 0) {
    goto _err;
  }

  if (VND_IS_RSMA(pVnode)) {
    // short-circuit keeps the rsma0 -> rsma1 -> rsma2 order and stops at the
    // first failure
    if (tsdbCommit(VND_RSMA0(pVnode)) < 0 || tsdbCommit(VND_RSMA1(pVnode)) < 0 ||
        tsdbCommit(VND_RSMA2(pVnode)) < 0) {
      goto _err;
    }
  } else if (tsdbCommit(pVnode->pTsdb) < 0) {
    goto _err;
  }

  if (tqCommit(pVnode->pTq) < 0) {
    goto _err;
  }

  // walCommit (TODO)

  // commit info
  if (vnodeCommitInfo(vndDir, &vndInfo) < 0) {
    goto _err;
  }
  pVnode->state.committed = vndInfo.state.committed;

  // postCommit
  smaPostCommit(pVnode->pSma);

  // apply the commit (TODO)
  // recycle the committed pool: reset it and push it onto the free list
  vnodeBufPoolReset(pVnode->onCommit);
  pVnode->onCommit->next = pVnode->pPool;
  pVnode->pPool = pVnode->onCommit;
  pVnode->onCommit = NULL;

  vInfo("vgId:%d, commit over", TD_VID(pVnode));

  return 0;

_err:
  ASSERT(0);
  return -1;
}
static int vnodeCommitImpl(void *arg) {
2021-12-14 06:32:07 +00:00
SVnode *pVnode = (SVnode *)arg;
2021-09-27 02:49:36 +00:00
2022-04-06 06:37:37 +00:00
// metaCommit(pVnode->pMeta);
2021-12-17 08:47:30 +00:00
tqCommit(pVnode->pTq);
2021-12-20 08:52:39 +00:00
tsdbCommit(pVnode->pTsdb);
2021-12-17 08:47:30 +00:00
2022-04-20 06:56:34 +00:00
// vnodeBufPoolRecycle(pVnode);
2021-12-20 08:52:39 +00:00
tsem_post(&(pVnode->canCommit));
2021-09-27 02:49:36 +00:00
return 0;
}
// Placeholder for the start-of-commit hook; does nothing and returns 0.
static int vnodeStartCommit(SVnode *pVnode) {
  (void)pVnode;  // TODO: not implemented yet
  return 0;
}
// Placeholder for the end-of-commit hook; does nothing and returns 0.
static int vnodeEndCommit(SVnode *pVnode) {
  (void)pVnode;  // TODO: not implemented yet
  return 0;
}
2022-04-15 05:47:57 +00:00
// Wait on the vnode's canCommit semaphore.
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) {
  tsem_wait(&(pVnode->canCommit));
}
2022-04-15 06:27:04 +00:00
// JSON encoder callback for an SVState (passed as pObj).
//
// Adds the integer fields "commit version" and "applied version" to pJson.
// Returns 0 on success, -1 as soon as one field cannot be added.
static int vnodeEncodeState(const void *pObj, SJson *pJson) {
  const SVState *pState = (const SVState *)pObj;

  // short-circuit: the second field is only added when the first succeeded
  if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0 ||
      tjsonAddIntegerToObject(pJson, "applied version", pState->applied) < 0) {
    return -1;
  }

  return 0;
}
// JSON decoder callback for an SVState (passed as pObj).
//
// Reads "commit version" and "applied version" from pJson into the state.
// Returns 0 on success, -1 as soon as one field reports a negative code.
static int vnodeDecodeState(const SJson *pJson, void *pObj) {
  SVState *pState = (SVState *)pObj;
  int32_t  rc;

  tjsonGetNumberValue(pJson, "commit version", pState->committed, rc);
  if (rc < 0) {
    return -1;
  }

  tjsonGetNumberValue(pJson, "applied version", pState->applied, rc);
  if (rc < 0) {
    return -1;
  }

  return 0;
}
// Encode a vnode info as JSON text.
//
// Builds an object with a "config" member (vnodeEncodeConfig) and a "state"
// member (vnodeEncodeState) and converts it to a string.
//
// ppData - receives the string returned by tjsonToString() on success; it is
//          set to NULL on entry and stays NULL on failure. vnodeSaveInfo()
//          releases the string with taosMemoryFree().
//
// Returns 0 on success, -1 on failure.
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData) {
  int    ret = -1;
  SJson *pRoot;
  char  *pText;

  *ppData = NULL;

  pRoot = tjsonCreateObject();
  if (pRoot == NULL) {
    return -1;
  }

  // "state" is only added when "config" was added successfully
  if (tjsonAddObject(pRoot, "config", vnodeEncodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonAddObject(pRoot, "state", vnodeEncodeState, (void *)&pInfo->state) >= 0) {
    pText = tjsonToString(pRoot);
    if (pText != NULL) {
      *ppData = pText;
      ret = 0;
    }
  }

  // the JSON tree is dropped on both the success and the failure path
  tjsonDelete(pRoot);
  return ret;
}
2022-04-15 07:07:44 +00:00
// Decode a vnode info from JSON text.
//
// Parses pData and fills pInfo->config (vnodeDecodeConfig) and pInfo->state
// (vnodeDecodeState) from the "config" and "state" members.
//
// Returns 0 on success, -1 when parsing or either member fails to decode.
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo) {
  int    ret = -1;
  SJson *pRoot = tjsonParse(pData);

  if (pRoot == NULL) {
    return -1;
  }

  // "state" is only decoded when "config" was decoded successfully
  if (tjsonToObject(pRoot, "config", vnodeDecodeConfig, (void *)&pInfo->config) >= 0 &&
      tjsonToObject(pRoot, "state", vnodeDecodeState, (void *)&pInfo->state) >= 0) {
    ret = 0;
  }

  // the JSON tree is dropped on both the success and the failure path
  tjsonDelete(pRoot);
  return ret;
}