TDengine/source/dnode/vnode/src/vnd/vnodeModule.c

189 lines
4.7 KiB
C
Raw Normal View History

2022-04-14 05:53:45 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-04-26 11:04:26 +00:00
#include "vnd.h"
2023-08-04 05:31:00 +00:00
#include "vndCos.h"
2022-04-14 05:53:45 +00:00
/*
 * One unit of work queued on a vnode thread pool.
 *
 * Tasks are nodes of a circular doubly-linked list whose sentinel is the
 * `queue` member of the owning pool. A worker unlinks the task, calls
 * execute(arg) (the return value is ignored by the worker), and then frees
 * the task node itself; `arg` is never freed by this module.
 */
typedef struct SVnodeTask {
  struct SVnodeTask* next;  // following task, or the pool's sentinel
  struct SVnodeTask* prev;  // preceding task, or the pool's sentinel
  int (*execute)(void*);    // callback run by a worker thread
  void* arg;                // opaque argument handed to execute()
} SVnodeTask;
2023-06-13 09:16:06 +00:00
typedef struct {
2022-04-14 05:53:45 +00:00
int nthreads;
TdThread* threads;
TdThreadMutex mutex;
TdThreadCond hasTask;
SVnodeTask queue;
2023-06-13 09:16:06 +00:00
} SVnodeThreadPool;
/*
 * Module-wide state shared by vnodeInit(), vnodeCleanup(), the scheduling
 * functions and the worker threads.
 */
struct SVnodeGlobal {
  int8_t           init;   // flipped 0 -> 1 by vnodeInit() and 1 -> 0 by vnodeCleanup() via atomic compare-exchange
  int8_t           stop;   // set to 1 by vnodeCleanup(); workers exit once their queue is empty
  SVnodeThreadPool tp[2];  // [0]: workers named "vnode-commit" (used by vnodeScheduleTask), [1]: "vnode-merge"
};
// The single instance of the module state. As a file-scope object without an
// initializer it starts out zeroed, so `init` and `stop` are 0 before vnodeInit().
struct SVnodeGlobal vnodeGlobal;
// Worker thread entry point; `arg` is the SVnodeThreadPool the thread serves.
// Defined in the static-methods section at the end of this file.
static void* loop(void* arg);
int vnodeInit(int nthreads) {
int8_t init;
int ret;
init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 0, 1);
if (init) {
return 0;
}
2023-06-13 09:16:06 +00:00
vnodeGlobal.stop = 0;
2022-04-14 05:53:45 +00:00
2023-06-13 09:16:06 +00:00
for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) {
taosThreadMutexInit(&vnodeGlobal.tp[i].mutex, NULL);
taosThreadCondInit(&vnodeGlobal.tp[i].hasTask, NULL);
2022-04-14 05:53:45 +00:00
2023-06-13 09:16:06 +00:00
taosThreadMutexLock(&vnodeGlobal.tp[i].mutex);
2022-10-14 05:34:25 +00:00
2023-06-13 09:16:06 +00:00
vnodeGlobal.tp[i].queue.next = &vnodeGlobal.tp[i].queue;
vnodeGlobal.tp[i].queue.prev = &vnodeGlobal.tp[i].queue;
2022-04-14 05:53:45 +00:00
2023-06-13 09:16:06 +00:00
taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex));
2022-10-14 05:34:25 +00:00
2023-06-13 09:16:06 +00:00
vnodeGlobal.tp[i].nthreads = nthreads;
vnodeGlobal.tp[i].threads = taosMemoryCalloc(nthreads, sizeof(TdThread));
if (vnodeGlobal.tp[i].threads == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
vError("failed to init vnode module since:%s", tstrerror(terrno));
return -1;
}
2022-04-14 05:53:45 +00:00
2023-06-13 09:16:06 +00:00
for (int j = 0; j < nthreads; j++) {
taosThreadCreate(&(vnodeGlobal.tp[i].threads[j]), NULL, loop, &vnodeGlobal.tp[i]);
}
2022-04-14 05:53:45 +00:00
}
if (walInit() < 0) {
return -1;
}
2022-06-01 11:57:03 +00:00
if (tqInit() < 0) {
return -1;
}
2023-08-04 05:31:00 +00:00
if (s3Init() < 0) {
return -1;
}
2022-04-14 05:53:45 +00:00
return 0;
}
void vnodeCleanup() {
int8_t init;
init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 1, 0);
if (init == 0) return;
// set stop
vnodeGlobal.stop = 1;
2023-06-13 09:16:06 +00:00
for (int32_t i = 0; i < ARRAY_SIZE(vnodeGlobal.tp); i++) {
taosThreadMutexLock(&(vnodeGlobal.tp[i].mutex));
taosThreadCondBroadcast(&(vnodeGlobal.tp[i].hasTask));
taosThreadMutexUnlock(&(vnodeGlobal.tp[i].mutex));
// wait for threads
for (int j = 0; j < vnodeGlobal.tp[i].nthreads; j++) {
taosThreadJoin(vnodeGlobal.tp[i].threads[j], NULL);
}
2022-04-14 05:53:45 +00:00
2023-06-13 09:16:06 +00:00
// clear source
taosMemoryFreeClear(vnodeGlobal.tp[i].threads);
taosThreadCondDestroy(&(vnodeGlobal.tp[i].hasTask));
taosThreadMutexDestroy(&(vnodeGlobal.tp[i].mutex));
2022-04-14 05:53:45 +00:00
}
2022-06-01 11:57:03 +00:00
walCleanUp();
tqCleanUp();
2022-07-09 07:44:37 +00:00
smaCleanUp();
2023-08-04 05:31:00 +00:00
s3CleanUp();
2022-04-14 05:53:45 +00:00
}
2023-06-13 09:16:06 +00:00
int vnodeScheduleTaskEx(int tpid, int (*execute)(void*), void* arg) {
2022-04-14 05:53:45 +00:00
SVnodeTask* pTask;
ASSERT(!vnodeGlobal.stop);
2022-04-14 05:53:45 +00:00
pTask = taosMemoryMalloc(sizeof(*pTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pTask->execute = execute;
pTask->arg = arg;
2023-06-13 09:16:06 +00:00
taosThreadMutexLock(&(vnodeGlobal.tp[tpid].mutex));
pTask->next = &vnodeGlobal.tp[tpid].queue;
pTask->prev = vnodeGlobal.tp[tpid].queue.prev;
vnodeGlobal.tp[tpid].queue.prev->next = pTask;
vnodeGlobal.tp[tpid].queue.prev = pTask;
taosThreadCondSignal(&(vnodeGlobal.tp[tpid].hasTask));
taosThreadMutexUnlock(&(vnodeGlobal.tp[tpid].mutex));
2022-04-14 05:53:45 +00:00
return 0;
}
2023-06-13 09:16:06 +00:00
/*
 * Queue a task on thread pool 0 (the pool whose workers are named
 * "vnode-commit"). Returns whatever vnodeScheduleTaskEx() returns.
 */
int vnodeScheduleTask(int (*execute)(void*), void* arg) {
  const int defaultPool = 0;
  return vnodeScheduleTaskEx(defaultPool, execute, arg);
}
2022-04-14 05:53:45 +00:00
/* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) {
2023-06-13 09:16:06 +00:00
SVnodeThreadPool* tp = (SVnodeThreadPool*)arg;
SVnodeTask* pTask;
int ret;
2022-04-14 05:53:45 +00:00
2023-06-29 06:03:01 +00:00
if (tp == &vnodeGlobal.tp[0]) {
setThreadName("vnode-commit");
} else if (tp == &vnodeGlobal.tp[1]) {
setThreadName("vnode-merge");
}
2022-04-14 05:53:45 +00:00
for (;;) {
2023-06-13 09:16:06 +00:00
taosThreadMutexLock(&(tp->mutex));
2022-04-14 05:53:45 +00:00
for (;;) {
2023-06-13 09:16:06 +00:00
pTask = tp->queue.next;
if (pTask == &tp->queue) {
2022-04-14 05:53:45 +00:00
// no task
if (vnodeGlobal.stop) {
2023-06-13 09:16:06 +00:00
taosThreadMutexUnlock(&(tp->mutex));
2022-04-14 05:53:45 +00:00
return NULL;
} else {
2023-06-13 09:16:06 +00:00
taosThreadCondWait(&(tp->hasTask), &(tp->mutex));
2022-04-14 05:53:45 +00:00
}
} else {
// has task
pTask->prev->next = pTask->next;
pTask->next->prev = pTask->prev;
break;
}
}
2023-06-13 09:16:06 +00:00
taosThreadMutexUnlock(&(tp->mutex));
2022-04-14 05:53:45 +00:00
pTask->execute(pTask->arg);
taosMemoryFree(pTask);
}
return NULL;
2022-06-01 11:57:03 +00:00
}