TDengine/source/dnode/vnode/src/vnd/vnodeOpen.c

196 lines
5.8 KiB
C
Raw Normal View History

2021-11-07 07:58:32 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-26 11:04:26 +00:00
#include "vnd.h"
2021-11-07 07:58:32 +00:00
2022-04-14 12:36:35 +00:00
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
2022-04-15 05:47:57 +00:00
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
2022-04-15 03:48:51 +00:00
2022-04-14 12:36:35 +00:00
// TODO: check if directory exists
// check config
if (vnodeCheckCfg(pCfg) < 0) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno));
2022-04-14 12:36:35 +00:00
return -1;
}
// create vnode env
2022-04-15 05:47:57 +00:00
if (tfsMkdir(pTfs, path) < 0) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to create vnode since: %s", pCfg->vgId, tstrerror(terrno));
2022-04-15 05:47:57 +00:00
return -1;
}
2022-04-14 12:36:35 +00:00
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
2022-04-15 05:47:57 +00:00
info.config = *pCfg;
2022-04-20 11:58:36 +00:00
info.state.committed = -1;
info.state.applied = -1;
2022-04-15 05:47:57 +00:00
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to save vnode config since %s", pCfg->vgId, tstrerror(terrno));
2022-04-14 12:36:35 +00:00
return -1;
}
2022-06-02 05:57:39 +00:00
vInfo("vgId:%d, vnode is created", pCfg->vgId);
2022-04-15 06:27:04 +00:00
2022-04-14 12:36:35 +00:00
return 0;
}
2022-04-16 07:04:25 +00:00
void vnodeDestroy(const char *path, STfs *pTfs) { tfsRmdir(pTfs, path); }
2022-04-16 07:16:10 +00:00
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
SVnode *pVnode = NULL;
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
2022-04-16 10:17:33 +00:00
char tdir[TSDB_FILENAME_LEN * 2];
2022-04-16 07:16:10 +00:00
int ret;
2021-11-08 02:04:24 +00:00
2022-04-16 07:16:10 +00:00
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
// load vnode info
ret = vnodeLoadInfo(dir, &info);
if (ret < 0) {
vError("failed to open vnode from %s since %s", path, tstrerror(terrno));
2021-11-08 02:04:24 +00:00
return NULL;
}
2022-04-16 08:19:21 +00:00
// create handle
2022-04-16 11:22:49 +00:00
pVnode = (SVnode *)taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
2021-11-08 02:04:24 +00:00
if (pVnode == NULL) {
2022-04-16 08:19:21 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open vnode since %s", info.config.vgId, tstrerror(terrno));
2021-11-08 02:04:24 +00:00
return NULL;
}
2022-04-16 11:22:49 +00:00
pVnode->path = (char *)&pVnode[1];
strcpy(pVnode->path, path);
2022-04-16 08:19:21 +00:00
pVnode->config = info.config;
2022-05-21 03:13:58 +00:00
pVnode->state.committed = info.state.committed;
pVnode->state.applied = info.state.committed;
2022-04-16 08:19:21 +00:00
pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb;
2022-06-08 06:44:42 +00:00
pVnode->syncCount = 0;
2022-04-16 08:19:21 +00:00
2022-06-08 06:44:42 +00:00
tsem_init(&pVnode->syncSem, 0, 0);
2022-04-16 08:19:21 +00:00
tsem_init(&(pVnode->canCommit), 0, 1);
2022-04-16 10:17:33 +00:00
// open buffer pool
2022-04-20 06:56:34 +00:00
if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
2022-04-16 10:17:33 +00:00
goto _err;
2021-11-08 06:50:20 +00:00
}
2022-04-16 10:17:33 +00:00
// open meta
2022-04-16 11:22:49 +00:00
if (metaOpen(pVnode, &pVnode->pMeta) < 0) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open vnode meta since %s", TD_VID(pVnode), tstrerror(terrno));
2022-04-16 10:17:33 +00:00
goto _err;
2021-11-08 05:40:45 +00:00
}
2022-04-16 10:17:33 +00:00
// open tsdb
2022-06-01 07:06:12 +00:00
if (!VND_IS_RSMA(pVnode) && tsdbOpen(pVnode, &VND_TSDB(pVnode), VNODE_TSDB_DIR, NULL) < 0) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open sma
if (smaOpen(pVnode)) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
2021-11-08 05:40:45 +00:00
}
2022-04-16 10:17:33 +00:00
// open wal
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_WAL_DIR);
2022-05-19 07:18:18 +00:00
taosRealPath(tdir, NULL, sizeof(tdir));
2022-04-16 10:17:33 +00:00
pVnode->pWal = walOpen(tdir, &(pVnode->config.walCfg));
2021-11-16 07:49:05 +00:00
if (pVnode->pWal == NULL) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open vnode wal since %s", TD_VID(pVnode), tstrerror(terrno));
2022-04-16 10:17:33 +00:00
goto _err;
2021-11-16 07:49:05 +00:00
}
2021-11-08 05:40:45 +00:00
2022-04-16 10:17:33 +00:00
// open tq
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
2022-05-19 07:18:18 +00:00
taosRealPath(tdir, NULL, sizeof(tdir));
2022-04-22 02:55:17 +00:00
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal);
2022-01-20 07:39:28 +00:00
if (pVnode->pTq == NULL) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
2022-04-16 10:17:33 +00:00
goto _err;
2022-01-20 07:39:28 +00:00
}
2022-04-16 10:17:33 +00:00
// open query
2021-12-24 01:41:09 +00:00
if (vnodeQueryOpen(pVnode)) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open vnode query since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-04-16 10:17:33 +00:00
goto _err;
2021-12-24 01:41:09 +00:00
}
2022-04-19 13:10:03 +00:00
// vnode begin
if (vnodeBegin(pVnode) < 0) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to begin since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-04-16 10:17:33 +00:00
goto _err;
}
// open sync
if (vnodeSyncOpen(pVnode, dir)) {
2022-06-02 05:57:39 +00:00
vError("vgId:%d, failed to open sync since %s", TD_VID(pVnode), tstrerror(terrno));
terrno = TSDB_CODE_OUT_OF_MEMORY;
2022-04-16 10:17:33 +00:00
goto _err;
}
return pVnode;
_err:
if (pVnode->pQuery) vnodeQueryClose(pVnode);
if (pVnode->pTq) tqClose(pVnode->pTq);
if (pVnode->pWal) walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
2022-04-16 10:17:33 +00:00
if (pVnode->pMeta) metaClose(pVnode->pMeta);
if (pVnode->pSma) smaClose(pVnode->pSma);
2022-04-16 10:17:33 +00:00
tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode);
return NULL;
2021-11-08 02:17:51 +00:00
}
2022-04-16 10:17:33 +00:00
void vnodeClose(SVnode *pVnode) {
2021-11-08 05:40:45 +00:00
if (pVnode) {
2022-04-24 07:36:26 +00:00
vnodeCommit(pVnode);
2022-04-20 02:54:27 +00:00
vnodeSyncClose(pVnode);
2022-03-09 08:13:46 +00:00
vnodeQueryClose(pVnode);
2022-04-16 10:17:33 +00:00
walClose(pVnode->pWal);
tqClose(pVnode->pTq);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
smaClose(pVnode->pSma);
2022-04-16 10:17:33 +00:00
metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode);
// destroy handle
tsem_destroy(&(pVnode->canCommit));
2022-06-08 06:44:42 +00:00
tsem_destroy(&pVnode->syncSem);
2022-04-16 10:17:33 +00:00
taosMemoryFree(pVnode);
2021-11-08 05:40:45 +00:00
}
2022-01-20 07:18:33 +00:00
}
2022-04-19 09:07:42 +00:00
2022-04-25 07:39:52 +00:00
// start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) {
vnodeSyncStart(pVnode);
return 0;
}
void vnodeStop(SVnode *pVnode) {}
2022-04-20 11:30:18 +00:00
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
2022-06-01 09:26:15 +00:00
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }