TDengine/source/util/src/tqueue.c

467 lines
11 KiB
C
Raw Normal View History

2020-02-25 07:26:29 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2022-01-25 08:40:07 +00:00
#define _DEFAULT_SOURCE
2020-02-25 07:26:29 +00:00
#include "tqueue.h"
2022-01-25 08:40:07 +00:00
#include "taoserror.h"
2022-02-27 03:05:46 +00:00
#include "tlog.h"
typedef struct STaosQnode STaosQnode;
2020-02-25 07:26:29 +00:00
2020-05-29 13:09:01 +00:00
typedef struct STaosQnode {
STaosQnode *next;
2022-01-25 09:14:24 +00:00
STaosQueue *queue;
char item[];
2020-03-07 14:57:25 +00:00
} STaosQnode;
2020-05-29 13:09:01 +00:00
typedef struct STaosQueue {
int32_t itemSize;
int32_t numOfItems;
2022-01-25 09:14:24 +00:00
int32_t threadId;
2022-02-28 06:16:50 +00:00
STaosQnode *head;
STaosQnode *tail;
STaosQueue *next; // for queue set
STaosQset *qset; // for queue set
void *ahandle; // for queue set
2022-01-25 09:14:24 +00:00
FItem itemFp;
FItems itemsFp;
2022-03-19 16:47:45 +00:00
TdThreadMutex mutex;
2020-03-07 14:57:25 +00:00
} STaosQueue;
2020-05-29 13:09:01 +00:00
typedef struct STaosQset {
2022-02-28 06:16:50 +00:00
STaosQueue *head;
STaosQueue *current;
2022-03-19 16:47:45 +00:00
TdThreadMutex mutex;
2021-10-29 09:11:15 +00:00
int32_t numOfQueues;
int32_t numOfItems;
tsem_t sem;
2020-03-07 14:57:25 +00:00
} STaosQset;
2020-05-29 13:09:01 +00:00
typedef struct STaosQall {
2021-10-29 09:11:15 +00:00
STaosQnode *current;
STaosQnode *start;
int32_t itemSize;
int32_t numOfItems;
} STaosQall;
STaosQueue *taosOpenQueue() {
2022-03-25 16:29:53 +00:00
STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue));
2020-03-07 14:57:25 +00:00
if (queue == NULL) {
2021-11-11 09:31:52 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
2020-03-07 14:57:25 +00:00
return NULL;
2020-02-25 07:26:29 +00:00
}
2022-03-19 16:47:45 +00:00
if (taosThreadMutexInit(&queue->mutex, NULL) != 0) {
2022-01-25 08:40:07 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
2022-01-25 08:40:07 +00:00
uDebug("queue:%p is opened", queue);
2020-03-07 14:57:25 +00:00
return queue;
}
2020-02-25 07:26:29 +00:00
2022-01-25 09:14:24 +00:00
void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp) {
if (queue == NULL) return;
2021-11-01 11:49:44 +00:00
queue->itemFp = itemFp;
queue->itemsFp = itemsFp;
2021-10-29 09:11:15 +00:00
}
void taosCloseQueue(STaosQueue *queue) {
if (queue == NULL) return;
2020-03-07 14:57:25 +00:00
STaosQnode *pTemp;
2022-02-28 06:16:50 +00:00
STaosQset *qset;
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2021-10-29 09:11:15 +00:00
STaosQnode *pNode = queue->head;
2020-03-07 14:57:25 +00:00
queue->head = NULL;
qset = queue->qset;
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2020-02-25 07:26:29 +00:00
2022-01-25 08:40:07 +00:00
if (queue->qset) {
taosRemoveFromQset(qset, queue);
}
2020-02-25 07:26:29 +00:00
2020-03-07 14:57:25 +00:00
while (pNode) {
pTemp = pNode;
pNode = pNode->next;
2022-03-25 16:29:53 +00:00
taosMemoryFree(pTemp);
2020-02-25 07:26:29 +00:00
}
2022-03-19 16:47:45 +00:00
taosThreadMutexDestroy(&queue->mutex);
2022-03-25 16:29:53 +00:00
taosMemoryFree(queue);
2020-08-01 11:49:58 +00:00
2022-01-25 08:40:07 +00:00
uDebug("queue:%p is closed", queue);
2020-03-07 14:57:25 +00:00
}
2020-02-25 07:26:29 +00:00
bool taosQueueEmpty(STaosQueue *queue) {
if (queue == NULL) return true;
2021-11-04 09:03:24 +00:00
bool empty = false;
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2021-11-04 09:03:24 +00:00
if (queue->head == NULL && queue->tail == NULL) {
empty = true;
}
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2021-11-04 09:03:24 +00:00
return empty;
}
int32_t taosQueueSize(STaosQueue *queue) {
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
int32_t numOfItems = queue->numOfItems;
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
return numOfItems;
}
void *taosAllocateQitem(int32_t size) {
2022-03-25 16:29:53 +00:00
STaosQnode *pNode = taosMemoryCalloc(1, sizeof(STaosQnode) + size);
2022-01-25 08:40:07 +00:00
if (pNode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
2021-10-29 09:11:15 +00:00
uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
2020-03-15 05:44:07 +00:00
return (void *)pNode->item;
}
2020-02-25 07:26:29 +00:00
2022-01-25 09:14:24 +00:00
void taosFreeQitem(void *pItem) {
if (pItem == NULL) return;
2020-03-15 05:44:07 +00:00
2022-01-25 09:14:24 +00:00
char *temp = pItem;
2020-03-15 05:44:07 +00:00
temp -= sizeof(STaosQnode);
2022-01-25 09:14:24 +00:00
uTrace("item:%p, node:%p is freed", pItem, temp);
2022-03-25 16:29:53 +00:00
taosMemoryFree(temp);
2020-03-15 05:44:07 +00:00
}
2020-02-25 07:26:29 +00:00
2022-03-28 09:25:12 +00:00
void taosWriteQitem(STaosQueue *queue, void *pItem) {
STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode));
pNode->next = NULL;
2020-02-25 07:26:29 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2020-02-25 07:26:29 +00:00
2020-03-07 14:57:25 +00:00
if (queue->tail) {
queue->tail->next = pNode;
queue->tail = pNode;
} else {
queue->head = pNode;
2021-10-29 09:11:15 +00:00
queue->tail = pNode;
2020-03-07 14:57:25 +00:00
}
queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
uTrace("item:%p is put into queue:%p, items:%d", pItem, queue, queue->numOfItems);
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2020-03-08 02:28:30 +00:00
if (queue->qset) tsem_post(&queue->qset->sem);
2020-02-25 07:26:29 +00:00
}
int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {
2020-03-07 14:57:25 +00:00
STaosQnode *pNode = NULL;
int32_t code = 0;
2020-02-25 07:26:29 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2020-02-25 07:26:29 +00:00
2020-03-07 14:57:25 +00:00
if (queue->head) {
2021-10-29 09:11:15 +00:00
pNode = queue->head;
*ppItem = pNode->item;
2021-10-29 09:11:15 +00:00
queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1;
2022-01-25 08:40:07 +00:00
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
2021-10-29 09:11:15 +00:00
}
2020-02-25 07:26:29 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2020-02-25 07:26:29 +00:00
2020-03-07 14:57:25 +00:00
return code;
}
2020-02-25 07:26:29 +00:00
2022-03-25 16:29:53 +00:00
STaosQall *taosAllocateQall() { return taosMemoryCalloc(1, sizeof(STaosQall)); }
2022-03-25 16:29:53 +00:00
void taosFreeQall(STaosQall *qall) { taosMemoryFree(qall); }
int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
int32_t code = 0;
bool empty;
2020-03-07 14:57:25 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2020-03-07 14:57:25 +00:00
2021-07-15 05:03:25 +00:00
empty = queue->head == NULL;
if (!empty) {
memset(qall, 0, sizeof(STaosQall));
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
2021-07-15 05:03:25 +00:00
}
2020-02-25 07:26:29 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2021-07-15 05:03:25 +00:00
// if source queue is empty, we set destination qall to empty too.
if (empty) {
qall->current = NULL;
qall->start = NULL;
qall->numOfItems = 0;
}
return code;
2020-03-07 14:57:25 +00:00
}
2020-02-25 07:26:29 +00:00
int32_t taosGetQitem(STaosQall *qall, void **ppItem) {
2020-03-07 14:57:25 +00:00
STaosQnode *pNode;
int32_t num = 0;
2020-02-25 07:26:29 +00:00
2020-03-07 14:57:25 +00:00
pNode = qall->current;
2021-10-29 09:11:15 +00:00
if (pNode) qall->current = pNode->next;
2020-03-07 14:57:25 +00:00
if (pNode) {
*ppItem = pNode->item;
2020-03-07 14:57:25 +00:00
num = 1;
uTrace("item:%p is fetched", *ppItem);
2020-02-25 07:26:29 +00:00
}
2020-03-07 14:57:25 +00:00
return num;
2020-02-25 07:26:29 +00:00
}
void taosResetQitems(STaosQall *qall) { qall->current = qall->start; }
2020-03-07 14:57:25 +00:00
STaosQset *taosOpenQset() {
2022-03-25 16:29:53 +00:00
STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);
2020-03-07 14:57:25 +00:00
if (qset == NULL) {
2021-11-11 09:31:52 +00:00
terrno = TSDB_CODE_OUT_OF_MEMORY;
2020-03-07 14:57:25 +00:00
return NULL;
2020-02-25 07:26:29 +00:00
}
2022-03-19 16:47:45 +00:00
taosThreadMutexInit(&qset->mutex, NULL);
tsem_init(&qset->sem, 0, 0);
2020-02-25 07:26:29 +00:00
2022-01-25 08:40:07 +00:00
uDebug("qset:%p is opened", qset);
2020-03-07 14:57:25 +00:00
return qset;
}
2020-02-25 07:26:29 +00:00
void taosCloseQset(STaosQset *qset) {
if (qset == NULL) return;
// remove all the queues from qset
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&qset->mutex);
while (qset->head) {
STaosQueue *queue = qset->head;
qset->head = qset->head->next;
queue->qset = NULL;
queue->next = NULL;
}
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&qset->mutex);
2022-03-19 16:47:45 +00:00
taosThreadMutexDestroy(&qset->mutex);
tsem_destroy(&qset->sem);
2022-03-25 16:29:53 +00:00
taosMemoryFree(qset);
2022-01-25 08:40:07 +00:00
uDebug("qset:%p is closed", qset);
2020-03-07 14:57:25 +00:00
}
2020-05-20 06:02:47 +00:00
// tsem_post 'qset->sem', so that reader threads waiting for it
// resumes execution and return, should only be used to signal the
// thread to exit.
void taosQsetThreadResume(STaosQset *qset) {
2020-10-11 07:51:17 +00:00
uDebug("qset:%p, it will exit", qset);
2020-05-20 06:02:47 +00:00
tsem_post(&qset->sem);
}
int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle) {
2021-10-29 09:11:15 +00:00
if (queue->qset) return -1;
2020-03-07 14:57:25 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&qset->mutex);
2020-03-07 14:57:25 +00:00
queue->next = qset->head;
2020-03-15 05:44:07 +00:00
queue->ahandle = ahandle;
2020-03-07 14:57:25 +00:00
qset->head = queue;
qset->numOfQueues++;
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2020-03-07 14:57:25 +00:00
atomic_add_fetch_32(&qset->numOfItems, queue->numOfItems);
queue->qset = qset;
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2020-03-07 14:57:25 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&qset->mutex);
2020-02-25 07:26:29 +00:00
2020-08-01 11:49:58 +00:00
uTrace("queue:%p is added into qset:%p", queue, qset);
2020-02-25 07:26:29 +00:00
return 0;
}
void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) {
STaosQueue *tqueue = NULL;
2020-03-07 14:57:25 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&qset->mutex);
2020-03-07 14:57:25 +00:00
if (qset->head) {
if (qset->head == queue) {
qset->head = qset->head->next;
tqueue = queue;
2020-03-07 14:57:25 +00:00
} else {
STaosQueue *prev = qset->head;
tqueue = qset->head->next;
while (tqueue) {
2020-08-01 11:49:58 +00:00
assert(tqueue->qset);
2021-10-29 09:11:15 +00:00
if (tqueue == queue) {
2020-03-07 14:57:25 +00:00
prev->next = tqueue->next;
break;
2020-03-07 14:57:25 +00:00
} else {
prev = tqueue;
tqueue = tqueue->next;
}
}
}
if (tqueue) {
if (qset->current == queue) qset->current = tqueue->next;
qset->numOfQueues--;
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
atomic_sub_fetch_32(&qset->numOfItems, queue->numOfItems);
queue->qset = NULL;
queue->next = NULL;
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
}
2021-10-29 09:11:15 +00:00
}
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&qset->mutex);
2020-08-01 11:49:58 +00:00
2022-01-25 08:40:07 +00:00
uDebug("queue:%p is removed from qset:%p", queue, qset);
2020-03-07 14:57:25 +00:00
}
int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; }
2020-03-07 14:57:25 +00:00
2022-01-25 09:14:24 +00:00
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp) {
2020-03-07 14:57:25 +00:00
STaosQnode *pNode = NULL;
int32_t code = 0;
2021-10-29 09:11:15 +00:00
tsem_wait(&qset->sem);
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&qset->mutex);
2020-03-07 14:57:25 +00:00
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
2021-10-29 09:11:15 +00:00
if (qset->current == NULL) qset->current = qset->head;
2020-03-07 14:57:25 +00:00
STaosQueue *queue = qset->current;
2020-03-08 02:28:30 +00:00
if (queue) qset->current = queue->next;
if (queue == NULL) break;
if (queue->head == NULL) continue;
2020-03-07 14:57:25 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2020-03-07 14:57:25 +00:00
if (queue->head) {
2021-10-29 09:11:15 +00:00
pNode = queue->head;
*ppItem = pNode->item;
2021-10-29 09:11:15 +00:00
if (ahandle) *ahandle = queue->ahandle;
2021-11-01 11:49:44 +00:00
if (itemFp) *itemFp = queue->itemFp;
2022-01-25 08:40:07 +00:00
2021-10-29 09:11:15 +00:00
queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1;
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
2021-10-29 09:11:15 +00:00
}
2020-03-07 14:57:25 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2020-03-07 14:57:25 +00:00
if (pNode) break;
}
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&qset->mutex);
2020-04-11 14:28:58 +00:00
2021-10-29 09:11:15 +00:00
return code;
2020-03-07 14:57:25 +00:00
}
2022-01-25 09:14:24 +00:00
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) {
2020-03-07 14:57:25 +00:00
STaosQueue *queue;
int32_t code = 0;
2020-03-07 14:57:25 +00:00
tsem_wait(&qset->sem);
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&qset->mutex);
2020-04-11 14:28:58 +00:00
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
if (qset->current == NULL) qset->current = qset->head;
2020-03-07 14:57:25 +00:00
queue = qset->current;
2020-03-08 02:28:30 +00:00
if (queue) qset->current = queue->next;
if (queue == NULL) break;
if (queue->head == NULL) continue;
2020-03-07 14:57:25 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2020-03-07 14:57:25 +00:00
if (queue->head) {
qall->current = queue->head;
qall->start = queue->head;
qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize;
code = qall->numOfItems;
2021-10-29 09:11:15 +00:00
if (ahandle) *ahandle = queue->ahandle;
2021-11-01 11:49:44 +00:00
if (itemsFp) *itemsFp = queue->itemsFp;
2021-10-29 09:11:15 +00:00
queue->head = NULL;
queue->tail = NULL;
queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
2022-01-25 10:29:47 +00:00
for (int32_t j = 1; j < qall->numOfItems; ++j) {
tsem_wait(&qset->sem);
}
}
2020-03-07 14:57:25 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2020-03-07 14:57:25 +00:00
if (code != 0) break;
2020-03-07 14:57:25 +00:00
}
2020-02-25 07:26:29 +00:00
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&qset->mutex);
2020-03-07 14:57:25 +00:00
return code;
}
2020-02-25 07:26:29 +00:00
2022-01-25 09:14:24 +00:00
void taosResetQsetThread(STaosQset *qset, void *pItem) {
if (pItem == NULL) return;
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&qset->mutex);
2022-01-25 10:29:47 +00:00
for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
tsem_post(&qset->sem);
}
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&qset->mutex);
2022-01-25 09:14:24 +00:00
}
int32_t taosGetQueueItemsNumber(STaosQueue *queue) {
2021-07-15 05:03:25 +00:00
if (!queue) return 0;
int32_t num;
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&queue->mutex);
2021-07-15 05:03:25 +00:00
num = queue->numOfItems;
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&queue->mutex);
2021-07-15 05:03:25 +00:00
return num;
2020-02-25 07:26:29 +00:00
}
int32_t taosGetQsetItemsNumber(STaosQset *qset) {
2021-07-15 05:03:25 +00:00
if (!qset) return 0;
int32_t num = 0;
2022-03-19 16:47:45 +00:00
taosThreadMutexLock(&qset->mutex);
2021-07-15 05:03:25 +00:00
num = qset->numOfItems;
2022-03-19 16:47:45 +00:00
taosThreadMutexUnlock(&qset->mutex);
2021-07-15 05:03:25 +00:00
return num;
2020-03-07 14:57:25 +00:00
}