TDengine/source/dnode/vnode/impl/src/vnodeBufferPool.c

186 lines
5.1 KiB
C
Raw Normal View History

2021-11-12 02:53:52 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
/* ------------------------ STRUCTURES ------------------------ */
2021-11-16 03:41:53 +00:00
// Number of SVMemAllocator instances created by vnodeOpenBufPool(); the
// configured write-buffer size (config.wsize) is divided evenly among them.
#define VNODE_BUF_POOL_SHARDS 3
2021-11-12 02:53:52 +00:00
struct SVBufPool {
2021-12-14 07:26:38 +00:00
pthread_mutex_t mutex;
pthread_cond_t hasFree;
2021-12-13 13:06:33 +00:00
TD_DLIST(SVMemAllocator) free;
TD_DLIST(SVMemAllocator) incycle;
2021-12-13 08:31:39 +00:00
SVMemAllocator *inuse;
2021-12-16 07:24:33 +00:00
// MAF for submodules to use
SMemAllocatorFactory *pMAF;
2021-11-12 02:53:52 +00:00
};
2021-12-16 08:20:46 +00:00
// Callbacks installed into the pool's SMemAllocatorFactory by vnodeOpenBufPool();
// both are defined in the STATIC METHODS section of this file.
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF);
static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA);
2021-12-13 08:31:39 +00:00
/**
 * Create the buffer pool of a vnode.
 *
 * Allocates the pool, creates VNODE_BUF_POOL_SHARDS allocators (each sized
 * config.wsize / VNODE_BUF_POOL_SHARDS) on the `free` list, and installs the
 * memory-allocator factory that submodules obtain through vBufPoolGetMAF().
 *
 * @param pVnode vnode whose pBufPool is to be created
 * @return 0 on success; -1 on allocation failure, in which case everything
 *         allocated so far is released and pVnode->pBufPool is left NULL
 */
int vnodeOpenBufPool(SVnode *pVnode) {
  SVBufPool *pPool = (SVBufPool *)calloc(1, sizeof(SVBufPool));

  pVnode->pBufPool = pPool;
  if (pPool == NULL) {
    return -1;
  }

  TD_DLIST_INIT(&(pPool->free));
  TD_DLIST_INIT(&(pPool->incycle));
  pPool->inuse = NULL;

  uint64_t capacity = pVnode->config.wsize / VNODE_BUF_POOL_SHARDS;
  for (int i = 0; i < VNODE_BUF_POOL_SHARDS; i++) {
    SVMemAllocator *pVMA = vmaCreate(capacity, pVnode->config.ssize, pVnode->config.lsize);
    if (pVMA == NULL) {
      goto _err;
    }

    TD_DLIST_APPEND(&(pPool->free), pVMA);
  }

  pPool->pMAF = (SMemAllocatorFactory *)malloc(sizeof(SMemAllocatorFactory));
  if (pPool->pMAF == NULL) {
    goto _err;
  }

  // The factory carries the vnode so the callbacks can reach the pool.
  pPool->pMAF->impl = pVnode;
  pPool->pMAF->create = vBufPoolCreateMA;
  pPool->pMAF->destroy = vBufPoolDestroyMA;

  return 0;

_err:
  // Undo the partial construction: only the `free` list can hold allocators
  // at this point, and pMAF is either unallocated or its allocation failed.
  while (true) {
    SVMemAllocator *pVMA = TD_DLIST_HEAD(&(pPool->free));
    if (pVMA == NULL) break;

    TD_DLIST_POP(&(pPool->free), pVMA);
    vmaDestroy(pVMA);
  }

  free(pPool);
  pVnode->pBufPool = NULL;
  return -1;
}
/**
 * Destroy the buffer pool of a vnode and everything it owns: the allocator
 * in use, every allocator on the `incycle` and `free` lists, the allocator
 * factory, and the pool itself. Does nothing when the vnode has no pool.
 *
 * @param pVnode vnode whose pBufPool is to be released; pBufPool is set to
 *               NULL afterwards
 */
void vnodeCloseBufPool(SVnode *pVnode) {
  SVBufPool *pPool = pVnode->pBufPool;
  if (pPool == NULL) {
    return;
  }

  SVMemAllocator *pVMA;

  // `inuse` may be NULL here; it is passed through unconditionally, exactly
  // as before. NOTE(review): relies on vmaDestroy() tolerating NULL - confirm.
  vmaDestroy(pPool->inuse);

  while ((pVMA = TD_DLIST_HEAD(&(pPool->incycle))) != NULL) {
    TD_DLIST_POP(&(pPool->incycle), pVMA);
    vmaDestroy(pVMA);
  }

  while ((pVMA = TD_DLIST_HEAD(&(pPool->free))) != NULL) {
    TD_DLIST_POP(&(pPool->free), pVMA);
    vmaDestroy(pVMA);
  }

  // The factory is malloc'ed by vnodeOpenBufPool() and owned by the pool;
  // it was previously leaked. free(NULL) is a no-op.
  free(pPool->pMAF);
  free(pPool);
  pVnode->pBufPool = NULL;
}
2021-12-14 06:32:07 +00:00
/**
 * Retire the allocator currently in use: clear the `inuse` slot and queue the
 * allocator at the tail of the `incycle` list. The next vnodeMalloc() will
 * pick a fresh allocator from the `free` list.
 *
 * @param pVnode vnode owning the buffer pool
 * @return always 0
 */
int vnodeBufPoolSwitch(SVnode *pVnode) {
  SVBufPool      *pPool = pVnode->pBufPool;
  SVMemAllocator *pvma = pPool->inuse;

  pPool->inuse = NULL;

  // Nothing was in use (e.g. no allocation since open or since the last
  // switch): there is no node to queue, and appending NULL must be avoided.
  if (pvma != NULL) {
    TD_DLIST_APPEND(&(pPool->incycle), pvma);
  }

  return 0;
}
/**
 * Recycle the allocator at the head of the `incycle` list: detach it, reset
 * it with vmaReset(), and append it to the `free` list.
 *
 * The `incycle` list must not be empty (checked with ASSERT).
 *
 * @param pVnode vnode owning the buffer pool
 * @return always 0
 */
int vnodeBufPoolRecycle(SVnode *pVnode) {
  SVBufPool      *pool = pVnode->pBufPool;
  SVMemAllocator *pvma = TD_DLIST_HEAD(&(pool->incycle));

  ASSERT(pvma != NULL);

  // incycle -> (reset) -> free
  TD_DLIST_POP(&(pool->incycle), pvma);
  vmaReset(pvma);
  TD_DLIST_APPEND(&(pool->free), pvma);

  return 0;
}
2021-12-13 08:31:39 +00:00
/**
 * Allocate `size` bytes from the vnode's buffer pool.
 *
 * When no allocator is in use, the head of the `free` list is taken and made
 * the allocator in use. If the `free` list is empty this loop spins until an
 * allocator shows up there.
 *
 * @param pVnode vnode owning the buffer pool
 * @param size   number of bytes requested
 * @return whatever vmaMalloc() returns for the allocator in use
 */
void *vnodeMalloc(SVnode *pVnode, uint64_t size) {
  SVBufPool *pool = pVnode->pBufPool;

  // TODO: add sem_wait and sem_post (currently a busy-wait on the free list)
  while (pool->inuse == NULL) {
    pool->inuse = TD_DLIST_HEAD(&(pool->free));
    if (pool->inuse != NULL) {
      TD_DLIST_POP(&(pool->free), pool->inuse);
    }
  }

  return vmaMalloc(pool->inuse, size);
}
/**
 * Tell whether the allocator currently in use is full.
 *
 * @param pVnode vnode owning the buffer pool
 * @return false when no allocator is in use; otherwise the result of
 *         vmaIsFull() on the allocator in use
 */
bool vnodeBufPoolIsFull(SVnode *pVnode) {
  SVMemAllocator *current = pVnode->pBufPool->inuse;

  if (current != NULL) {
    return vmaIsFull(current);
  }
  return false;
}
2021-12-16 07:24:33 +00:00
/**
 * Get the memory-allocator factory of the vnode's buffer pool.
 *
 * @param pVnode vnode owning the buffer pool
 * @return the factory stored in the pool (pBufPool->pMAF)
 */
SMemAllocatorFactory *vBufPoolGetMAF(SVnode *pVnode) {
  SVBufPool *pool = pVnode->pBufPool;
  return pool->pMAF;
}
2021-12-16 08:20:46 +00:00
/* ------------------------ STATIC METHODS ------------------------ */
2021-11-17 07:02:42 +00:00
typedef struct {
2021-12-16 08:20:46 +00:00
SVnode * pVnode;
SVMemAllocator *pVMA;
2021-11-17 07:02:42 +00:00
} SVMAWrapper;
2021-12-16 08:20:46 +00:00
/**
 * Malloc callback of the SMemAllocator objects built by vBufPoolCreateMA():
 * forwards the request to the pool allocator recorded in the wrapper.
 *
 * @param pMA  allocator whose `impl` points to an SVMAWrapper
 * @param size number of bytes requested
 * @return whatever vmaMalloc() returns
 */
static FORCE_INLINE void *vmaMaloocCb(SMemAllocator *pMA, uint64_t size) {
  SVMAWrapper    *wrap = (SVMAWrapper *)(pMA->impl);
  SVMemAllocator *backing = wrap->pVMA;

  return vmaMalloc(backing, size);
}
2021-12-16 08:20:46 +00:00
// TODO: Add atomic operations here
/**
 * Factory `create` callback: build an SMemAllocator bound to the pool
 * allocator that is currently in use, and take a reference on it.
 *
 * The SMemAllocator and its SVMAWrapper are obtained with a single calloc();
 * the wrapper is located at POINTER_SHIFT(pMA, sizeof(*pMA)), so freeing pMA
 * releases both.
 *
 * @param pMAF factory whose `impl` is the owning SVnode
 * @return the new allocator, or NULL when memory allocation fails or when the
 *         pool has no allocator in use to bind to
 */
static SMemAllocator *vBufPoolCreateMA(SMemAllocatorFactory *pMAF) {
  SVnode         *pVnode = (SVnode *)(pMAF->impl);
  SVMemAllocator *pVMA = pVnode->pBufPool->inuse;
  SMemAllocator  *pMA;
  SVMAWrapper    *pWrapper;

  // `inuse` is NULL after vnodeOpenBufPool() and after vnodeBufPoolSwitch()
  // until vnodeMalloc() selects an allocator; fail instead of dereferencing it.
  if (pVMA == NULL) {
    return NULL;
  }

  pMA = (SMemAllocator *)calloc(1, sizeof(*pMA) + sizeof(SVMAWrapper));
  if (pMA == NULL) {
    return NULL;
  }

  // TODO: Add atomic operations here
  pVMA->_ref.val++;

  pWrapper = POINTER_SHIFT(pMA, sizeof(*pMA));
  pWrapper->pVnode = pVnode;
  pWrapper->pVMA = pVMA;

  pMA->impl = pWrapper;
  TD_MA_MALLOC_FUNC(pMA) = vmaMaloocCb;

  return pMA;
}
2021-12-16 08:20:46 +00:00
/**
 * Factory `destroy` callback: free an allocator made by vBufPoolCreateMA()
 * and drop its reference on the underlying pool allocator. When the last
 * reference goes away the pool allocator is moved from the `incycle` list to
 * the `free` list.
 *
 * @param pMAF factory the allocator came from (not used)
 * @param pMA  allocator to destroy; its `impl` points to an SVMAWrapper
 */
static void vBufPoolDestroyMA(SMemAllocatorFactory *pMAF, SMemAllocator *pMA) {
  SVMAWrapper *wrap = (SVMAWrapper *)(pMA->impl);

  // Copy what is needed out of the wrapper first: it is released together
  // with pMA by the free() below.
  SVBufPool      *pool = wrap->pVnode->pBufPool;
  SVMemAllocator *backing = wrap->pVMA;

  free(pMA);

  backing->_ref.val--;
  if (backing->_ref.val != 0) {
    return;
  }

  // NOTE(review): assumes `backing` is on the incycle list at this point
  // (i.e. vnodeBufPoolSwitch() already retired it) - confirm with callers.
  TD_DLIST_POP(&(pool->incycle), backing);
  TD_DLIST_APPEND(&(pool->free), backing);
}