mirror of
https://github.com/taosdata/TDengine
synced 2026-05-24 10:09:01 +00:00
1102 lines
39 KiB
C++
1102 lines
39 KiB
C++
|
|
/**
 * Unit tests for SQueryAutoQWorkerPool self-healing mechanisms.
 *
 * Tests verify that the worker pool can:
 * 1. Prevent the running counter from going negative (atomicSafeDecRunning)
 * 2. Detect and correct negative running counters (healRunning)
 * 3. Survive cross-pool beforeBlocking/afterRecoverFromBlocking abuse
 * 4. Continue processing messages after self-healing
 */
|
|||
|
|
|
|||
|
|
#include <gtest/gtest.h>
|
|||
|
|
#include <atomic>
|
|||
|
|
#include <chrono>
|
|||
|
|
#include <thread>
|
|||
|
|
#include <vector>
|
|||
|
|
|
|||
|
|
extern "C" {
|
|||
|
|
#include "os.h"
|
|||
|
|
#include "tqueue.h"
|
|||
|
|
#include "tworker.h"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
extern int64_t tsQueueMemoryAllowed;
|
|||
|
|
|
|||
|
|
// Helpers for the packed 64-bit `activeRunningN` word used by the tests:
// the high 32 bits carry the "active" count, the low 32 bits the "running" count.
// Every macro argument and every expansion is fully parenthesized so the macros
// compose safely with arbitrary expressions (e.g. GET_RUNNING_N(a && b)).

// Extracts the high 32 bits (active count) as a signed 32-bit value.
#define GET_ACTIVE_N(int64_val) ((int32_t)((int64_val) >> 32))

// Extracts the low 32 bits (running count) as a signed 32-bit value, so a
// corrupted counter such as 0xFFFFFFFF reads back as -1.
#define GET_RUNNING_N(int64_val) ((int32_t)((int64_val) & 0xFFFFFFFF))

// Packs (active, running) into one 64-bit word. The shift is performed on an
// unsigned value so that it is well defined for every 32-bit input.
#define MAKE_ACTIVE_RUNNING(active, running) \
  ((int64_t)(((uint64_t)(uint32_t)(active) << 32) | (uint64_t)(uint32_t)(running)))
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test fixture
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
class QueryAutoQWorkerTest : public ::testing::Test {
|
|||
|
|
public:
|
|||
|
|
SQueryAutoQWorkerPool pool{};
|
|||
|
|
STaosQueue *queue{nullptr};
|
|||
|
|
std::atomic<int32_t> processedCount{0};
|
|||
|
|
std::atomic<int32_t> blockingCallCount{0};
|
|||
|
|
|
|||
|
|
void SetUp() override {
|
|||
|
|
tsQueueMemoryAllowed = 1024 * 1024;
|
|||
|
|
memset(&pool, 0, sizeof(pool));
|
|||
|
|
pool.num = 4;
|
|||
|
|
pool.max = 8;
|
|||
|
|
pool.min = 2;
|
|||
|
|
pool.name = "test-worker";
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
void TearDown() override {
|
|||
|
|
// Cleanup is done per-test where pool was initialized
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Initialize pool and allocate queue with given FItem
|
|||
|
|
bool initPool(FItem fp, void *ahandle = nullptr) {
|
|||
|
|
int32_t code = tQueryAutoQWorkerInit(&pool);
|
|||
|
|
if (code != TSDB_CODE_SUCCESS) return false;
|
|||
|
|
queue = tQueryAutoQWorkerAllocQueue(&pool, ahandle ? ahandle : (void *)this, fp);
|
|||
|
|
return queue != nullptr;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
void cleanupPool() { tQueryAutoQWorkerCleanup(&pool); }
|
|||
|
|
|
|||
|
|
// Submit N messages to the pool's queue
|
|||
|
|
bool submitMessages(int32_t count) {
|
|||
|
|
for (int32_t i = 0; i < count; ++i) {
|
|||
|
|
void *qitem = nullptr;
|
|||
|
|
int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem);
|
|||
|
|
if (code != 0) return false;
|
|||
|
|
*(int32_t *)qitem = i;
|
|||
|
|
code = taosWriteQitem(queue, qitem);
|
|||
|
|
if (code != 0) return false;
|
|||
|
|
}
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Wait for processedCount to reach target, with timeout
|
|||
|
|
bool waitForProcessed(int32_t target, int32_t timeoutMs = 5000) {
|
|||
|
|
auto start = std::chrono::steady_clock::now();
|
|||
|
|
while (processedCount.load() < target) {
|
|||
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
|||
|
|
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
|
|||
|
|
std::chrono::steady_clock::now() - start)
|
|||
|
|
.count();
|
|||
|
|
if (elapsed > timeoutMs) return false;
|
|||
|
|
}
|
|||
|
|
return true;
|
|||
|
|
}
|
|||
|
|
};
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Simple message processor for basic tests
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
// Minimal message callback: bumps the owning fixture's processedCount and
// releases the queue item. The fixture is recovered from pQInfo->ahandle.
static void simpleProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  auto *fixture = static_cast<QueryAutoQWorkerTest *>(pQInfo->ahandle);
  ++fixture->processedCount;
  taosFreeQitem(pMsg);
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 1: beforeBlocking on correct pool doesn't go negative
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
// Calls beforeBlocking() followed by afterRecoverFromBlocking() once, through
// the pool's own callback table and on the pool's own pPool, and checks that
// both succeed and that the running counter is non-negative after each call.
TEST_F(QueryAutoQWorkerTest, BeforeBlockingCorrectPool) {
  ASSERT_TRUE(initPool(simpleProcessFp));

  // Let pool start workers by submitting messages
  ASSERT_TRUE(submitMessages(10));
  ASSERT_TRUE(waitForProcessed(10));

  // Directly call beforeBlocking on the correct pool.
  // This simulates a normal blocking operation.
  int32_t code = pool.pCb->beforeBlocking(pool.pCb->pPool);
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);

  // running should not be negative
  int32_t runningAfter = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningAfter, 0) << "running went negative after beforeBlocking";

  // Recover
  code = pool.pCb->afterRecoverFromBlocking(pool.pCb->pPool);
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);

  int32_t runningRecovered = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningRecovered, 0) << "running went negative after recoverFromBlocking";

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 2: Repeated beforeBlocking without matching recovery (simulates cross-pool abuse)
|
|||
|
|
// Running counter must never go below 0
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
// Issues 20 unmatched beforeBlocking() calls and checks after every single
// call that the running counter has not dropped below zero.
TEST_F(QueryAutoQWorkerTest, RepeatedBeforeBlockingFloorProtection) {
  ASSERT_TRUE(initPool(simpleProcessFp));

  // Run a handful of messages through the pool before abusing it.
  ASSERT_TRUE(submitMessages(4));
  ASSERT_TRUE(waitForProcessed(4));

  // Far more beforeBlocking calls than there are running threads; the return
  // code is deliberately not inspected, only the counter is.
  constexpr int32_t kAbuseCalls = 20;
  for (int32_t callNo = 1; callNo <= kAbuseCalls; ++callNo) {
    pool.pCb->beforeBlocking(pool.pCb->pPool);
    const int32_t runningNow = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(runningNow, 0) << "running went negative after " << callNo << " beforeBlocking calls";
  }

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 3: Direct injection of negative running, verify healRunning via
|
|||
|
|
// afterRecoverFromBlocking
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
// Overwrites the packed counter so that running == -5, then expects a single
// afterRecoverFromBlocking() call to succeed and leave running >= 0.
TEST_F(QueryAutoQWorkerTest, HealNegativeRunningViaRecover) {
  ASSERT_TRUE(initPool(simpleProcessFp));

  // Keep the current active count (or force it to 1 when it is not positive)
  // and plant a negative running count next to it.
  const int32_t activeNow = GET_ACTIVE_N(pool.activeRunningN);
  const int32_t injectedActive = (activeNow > 0) ? activeNow : 1;
  pool.activeRunningN = MAKE_ACTIVE_RUNNING(injectedActive, -5);

  // Sanity check: the injected value must read back unchanged.
  int32_t runningNow = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_EQ(runningNow, -5) << "injection failed";

  const int32_t rc = pool.pCb->afterRecoverFromBlocking(pool.pCb->pPool);
  ASSERT_EQ(rc, TSDB_CODE_SUCCESS);

  runningNow = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningNow, 0) << "healRunning failed to correct negative running";

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 4: Direct injection of negative running, verify pool can still
|
|||
|
|
// process messages (end-to-end self-healing)
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
// Message callback that wraps a 5 ms sleep in beforeBlocking() /
// afterRecoverFromBlocking() (when the queue info carries a worker callback),
// then counts the message on the owning fixture and frees it.
static void blockingProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  auto *fixture = static_cast<QueryAutoQWorkerTest *>(pQInfo->ahandle);

  auto *poolCb = static_cast<SQueryAutoQWorkerPoolCB *>(pQInfo->workerCb);
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    // Stand-in for an I/O wait.
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }

  ++fixture->processedCount;
  taosFreeQitem(pMsg);
}
|
|||
|
|
|
|||
|
|
// End-to-end check: after running is forced to -3, a second batch of eight
// blocking messages must still be processed and running must end up >= 0.
TEST_F(QueryAutoQWorkerTest, PoolRecoverAfterNegativeRunningInjection) {
  ASSERT_TRUE(initPool(blockingProcessFp));

  // First batch, processed with untouched counters.
  ASSERT_TRUE(submitMessages(8));
  ASSERT_TRUE(waitForProcessed(8));

  const int32_t processedSoFar = processedCount.load();
  const int32_t expectedTotal = processedSoFar + 8;

  // Plant the anomalous running value; fall back to pool.num for the active
  // half when the current active count is not positive.
  const int32_t activeNow = GET_ACTIVE_N(pool.activeRunningN);
  const int32_t injectedActive = (activeNow > 0) ? activeNow : pool.num;
  pool.activeRunningN = MAKE_ACTIVE_RUNNING(injectedActive, -3);

  // Second batch: the pool is expected to heal itself and drain it.
  ASSERT_TRUE(submitMessages(8));
  ASSERT_TRUE(waitForProcessed(expectedTotal, 10000))
      << "Pool failed to recover: only processed " << processedCount.load()
      << " of expected " << expectedTotal;

  const int32_t runningNow = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningNow, 0) << "running still negative after self-healing";

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 5: Concurrent cross-pool abuse simulation
|
|||
|
|
// Multiple threads call beforeBlocking on a pool that's not theirs
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
// Eight threads each issue 50 unmatched beforeBlocking() calls on the pool.
// Every observation of a negative running counter is tallied; the tally and
// the final counter value must both show that running never went negative.
TEST_F(QueryAutoQWorkerTest, ConcurrentCrossPoolAbuse) {
  ASSERT_TRUE(initPool(simpleProcessFp));

  // Get the workers going first.
  ASSERT_TRUE(submitMessages(4));
  ASSERT_TRUE(waitForProcessed(4));

  std::atomic<int32_t> negativeDetected{0};

  // Body of one abusing thread: beforeBlocking without any matching recovery.
  auto abuse = [this, &negativeDetected]() {
    for (int call = 0; call < 50; ++call) {
      pool.pCb->beforeBlocking(pool.pCb->pPool);
      if (GET_RUNNING_N(pool.activeRunningN) < 0) {
        ++negativeDetected;
      }
    }
  };

  std::vector<std::thread> abusers;
  for (int t = 0; t < 8; ++t) {
    abusers.emplace_back(abuse);
  }
  for (std::thread &worker : abusers) {
    worker.join();
  }

  // With self-healing, running should never have gone negative
  ASSERT_EQ(negativeDetected.load(), 0)
      << "running went negative " << negativeDetected.load() << " times during concurrent abuse";

  const int32_t runningAtEnd = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningAtEnd, 0) << "final running is negative";

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 6: Paired beforeBlocking/afterRecoverFromBlocking maintains invariant
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
// Runs 20 correctly paired beforeBlocking()/afterRecoverFromBlocking() cycles
// from the test thread; both calls must succeed and running must be >= 0
// after each half of every cycle.
TEST_F(QueryAutoQWorkerTest, PairedBlockingRecoveryMaintainsInvariant) {
  ASSERT_TRUE(initPool(simpleProcessFp));

  ASSERT_TRUE(submitMessages(4));
  ASSERT_TRUE(waitForProcessed(4));

  auto *poolCb = pool.pCb;
  for (int i = 0; i < 20; ++i) {
    // Enter the blocking section.
    const int32_t enterRc = poolCb->beforeBlocking(poolCb->pPool);
    ASSERT_EQ(enterRc, TSDB_CODE_SUCCESS);
    const int32_t runningWhileBlocked = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(runningWhileBlocked, 0) << "running negative after beforeBlocking, iteration " << i;

    // Leave the blocking section.
    const int32_t leaveRc = poolCb->afterRecoverFromBlocking(poolCb->pPool);
    ASSERT_EQ(leaveRc, TSDB_CODE_SUCCESS);
    const int32_t runningAfterRecover = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(runningAfterRecover, 0) << "running negative after recovery, iteration " << i;
  }

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 7: Two pools - verify cross-pool callback doesn't corrupt either pool
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
static std::atomic<int32_t> g_pool1Processed{0};
|
|||
|
|
static std::atomic<int32_t> g_pool2Processed{0};
|
|||
|
|
|
|||
|
|
static void pool1ProcessFp(SQueueInfo *pQInfo, void *pMsg) {
|
|||
|
|
g_pool1Processed.fetch_add(1);
|
|||
|
|
taosFreeQitem(pMsg);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
static void pool2ProcessFp(SQueueInfo *pQInfo, void *pMsg) {
|
|||
|
|
g_pool2Processed.fetch_add(1);
|
|||
|
|
taosFreeQitem(pMsg);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Runs two pools side by side, then calls pool 1's beforeBlocking callback
// ten times with pool 2's pPool. Afterwards neither pool's running counter
// may be negative and both pools must still process a fresh batch of messages.
//
// pool2 lives on this function's stack, so a scope guard cleans it up on
// every exit path: a fatal ASSERT_* returns immediately, and without the
// guard pool2's worker threads would outlive the struct they operate on.
TEST_F(QueryAutoQWorkerTest, TwoPoolsCrossCallbackProtection) {
  g_pool1Processed = 0;
  g_pool2Processed = 0;

  // Pool 1 (uses the fixture pool)
  ASSERT_TRUE(initPool(pool1ProcessFp));

  // Pool 2
  SQueryAutoQWorkerPool pool2{};
  memset(&pool2, 0, sizeof(pool2));
  pool2.num = 4;
  pool2.max = 8;
  pool2.min = 2;
  pool2.name = "test-worker-2";
  ASSERT_EQ(tQueryAutoQWorkerInit(&pool2), TSDB_CODE_SUCCESS);

  // Cleans up pool2 when the test leaves early; disarmed (target = nullptr)
  // right before the explicit cleanup at the end of the test.
  struct Pool2Guard {
    SQueryAutoQWorkerPool *target;
    ~Pool2Guard() {
      if (target != nullptr) tQueryAutoQWorkerCleanup(target);
    }
  } pool2Guard{&pool2};

  STaosQueue *q2 = tQueryAutoQWorkerAllocQueue(&pool2, nullptr, pool2ProcessFp);
  ASSERT_NE(q2, nullptr);

  // Writes `count` int32 messages to pool 2's queue. Returns false on the
  // first failure; an item that could not be written is freed, not leaked.
  auto submitToPool2 = [q2](int32_t count) -> bool {
    for (int32_t i = 0; i < count; ++i) {
      void *qitem = nullptr;
      if (taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem) != 0) return false;
      *static_cast<int32_t *>(qitem) = i;
      if (taosWriteQitem(q2, qitem) != 0) {
        taosFreeQitem(qitem);
        return false;
      }
    }
    return true;
  };

  // Polls every 10 ms until both counters reach `target` or more than
  // `timeoutMs` milliseconds have elapsed. The caller asserts on the counters.
  auto waitForBoth = [](int32_t target, int64_t timeoutMs) {
    const auto begin = std::chrono::steady_clock::now();
    while (g_pool1Processed.load() < target || g_pool2Processed.load() < target) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                               std::chrono::steady_clock::now() - begin)
                               .count();
      if (elapsed > timeoutMs) break;
    }
  };

  // Submit messages to both pools and wait for them to be processed
  ASSERT_TRUE(submitMessages(10));
  ASSERT_TRUE(submitToPool2(10));
  waitForBoth(10, 5000);
  ASSERT_GE(g_pool1Processed.load(), 10);
  ASSERT_GE(g_pool2Processed.load(), 10);

  // Cross-pool abuse: call pool1's beforeBlocking with pool2's pPool
  // This is the exact scenario that caused the original deadlock
  for (int i = 0; i < 10; i++) {
    pool.pCb->beforeBlocking(pool2.pCb->pPool);  // pool1's cb on pool2's data
  }

  // Pool2's running should not go negative
  int32_t pool2Running = GET_RUNNING_N(pool2.activeRunningN);
  ASSERT_GE(pool2Running, 0) << "pool2 running went negative from cross-pool abuse";

  // Pool1's running should be unaffected
  int32_t pool1Running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(pool1Running, 0) << "pool1 running went negative";

  // Both pools should still be able to process messages
  g_pool1Processed = 0;
  g_pool2Processed = 0;

  ASSERT_TRUE(submitMessages(5));
  ASSERT_TRUE(submitToPool2(5));
  waitForBoth(5, 10000);

  ASSERT_GE(g_pool1Processed.load(), 5) << "pool1 stopped processing after cross-pool abuse";
  ASSERT_GE(g_pool2Processed.load(), 5) << "pool2 stopped processing after cross-pool abuse";

  // Normal exit: disarm the guard and clean up in the original order.
  pool2Guard.target = nullptr;
  tQueryAutoQWorkerCleanup(&pool2);
  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 8: activeRunningN bit-packing correctness
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
// Round-trips (active, running) pairs through MAKE_ACTIVE_RUNNING and the two
// GET_* macros. No pool is started by this test.
TEST_F(QueryAutoQWorkerTest, ActiveRunningBitPacking) {
  // Both halves positive.
  const int64_t bothPositive = MAKE_ACTIVE_RUNNING(5, 3);
  ASSERT_EQ(GET_ACTIVE_N(bothPositive), 5);
  ASSERT_EQ(GET_RUNNING_N(bothPositive), 3);

  // Both halves zero.
  const int64_t allZero = MAKE_ACTIVE_RUNNING(0, 0);
  ASSERT_EQ(GET_ACTIVE_N(allZero), 0);
  ASSERT_EQ(GET_RUNNING_N(allZero), 0);

  // Negative running, i.e. the corrupted state the pool has to heal:
  // -1 is stored as 0xFFFFFFFF in the low half and GET_RUNNING_N casts the
  // extracted bits back to int32_t.
  const int64_t negativeRunning = MAKE_ACTIVE_RUNNING(4, -1);
  const int32_t decodedRunning = GET_RUNNING_N(negativeRunning);
  ASSERT_EQ(decodedRunning, -1) << "negative running encoding/decoding mismatch";

  // Larger counts.
  const int64_t largeCounts = MAKE_ACTIVE_RUNNING(100, 50);
  ASSERT_EQ(GET_ACTIVE_N(largeCounts), 100);
  ASSERT_EQ(GET_RUNNING_N(largeCounts), 50);
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 9: Stress test - heavy concurrent blocking/recovery with fault injection
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
static std::atomic<int32_t> g_stressProcessed{0};
|
|||
|
|
|
|||
|
|
static void stressProcessFp(SQueueInfo *pQInfo, void *pMsg) {
|
|||
|
|
g_stressProcessed.fetch_add(1);
|
|||
|
|
|
|||
|
|
if (pQInfo->workerCb) {
|
|||
|
|
auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
|
|||
|
|
cb->beforeBlocking(cb->pPool);
|
|||
|
|
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
|||
|
|
cb->afterRecoverFromBlocking(cb->pPool);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
taosFreeQitem(pMsg);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Submits 100 blocking messages while a background thread periodically
// overwrites the packed counter with running == -2. All messages must be
// processed within 30 s and running must be >= 0 at the end.
TEST_F(QueryAutoQWorkerTest, StressConcurrentBlockingWithFaultInjection) {
  g_stressProcessed = 0;

  ASSERT_TRUE(initPool(stressProcessFp));

  const int32_t totalMsgs = 100;
  ASSERT_TRUE(submitMessages(totalMsgs));

  // Fault injector: wakes every 5 ms and corrupts the counter on every tenth
  // wake-up (including the very first one) until told to stop. The write is a
  // plain, unsynchronized store on purpose.
  std::atomic<bool> stopFaults{false};
  std::thread       faultInjector([this, &stopFaults]() {
    for (int iteration = 0; !stopFaults.load(); ++iteration) {
      if (iteration % 10 == 0) {
        int32_t activeNow = GET_ACTIVE_N(pool.activeRunningN);
        if (activeNow <= 0) activeNow = pool.num;
        pool.activeRunningN = MAKE_ACTIVE_RUNNING(activeNow, -2);
      }
      std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
  });

  // Poll every 50 ms until everything is processed or 30 s have passed.
  const auto begin = std::chrono::steady_clock::now();
  for (;;) {
    if (g_stressProcessed.load() >= totalMsgs) break;
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                             std::chrono::steady_clock::now() - begin)
                             .count();
    if (elapsed > 30000) break;  // 30s timeout
  }

  // The injector is always stopped and joined before any assertion can return.
  stopFaults = true;
  faultInjector.join();

  ASSERT_GE(g_stressProcessed.load(), totalMsgs)
      << "Pool failed to process all messages under fault injection: "
      << g_stressProcessed.load() << "/" << totalMsgs;

  // NOTE(review): the injector may write -2 after the last message has been
  // processed but before stopFaults is observed; whether the pool heals that
  // without further work cannot be told from this file — confirm this check
  // is not timing-sensitive.
  const int32_t runningNow = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningNow, 0) << "running still negative after stress test";

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 10: Real-usage happy path — every message does beforeBlocking +
|
|||
|
|
// afterRecoverFromBlocking inside the worker thread (no fault injection)
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
static std::atomic<int32_t> g_happyProcessed{0};
|
|||
|
|
|
|||
|
|
static void happyBlockingProcessFp(SQueueInfo *pQInfo, void *pMsg) {
|
|||
|
|
if (pQInfo->workerCb) {
|
|||
|
|
auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
|
|||
|
|
int32_t code = cb->beforeBlocking(cb->pPool);
|
|||
|
|
EXPECT_EQ(code, TSDB_CODE_SUCCESS);
|
|||
|
|
// Simulate I/O blocking (RPC wait, disk read, etc.)
|
|||
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
|||
|
|
code = cb->afterRecoverFromBlocking(cb->pPool);
|
|||
|
|
EXPECT_EQ(code, TSDB_CODE_SUCCESS);
|
|||
|
|
}
|
|||
|
|
g_happyProcessed.fetch_add(1);
|
|||
|
|
taosFreeQitem(pMsg);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Pushes 2000 messages through a pool (min 4, num 4, max 32) whose callback
// blocks on every message, with no fault injection. running is sampled on
// every 10 ms poll and must never be negative; the whole batch has 60 s.
TEST_F(QueryAutoQWorkerTest, RealUsageBlockingWorkload) {
  g_happyProcessed = 0;

  pool.min = 4;
  pool.num = 4;
  pool.max = 32;
  ASSERT_TRUE(initPool(happyBlockingProcessFp));

  const int32_t totalMsgs = 2000;
  ASSERT_TRUE(submitMessages(totalMsgs));

  const auto begin = std::chrono::steady_clock::now();
  auto       elapsedMs = [&begin]() {
    return std::chrono::duration_cast<std::chrono::milliseconds>(
               std::chrono::steady_clock::now() - begin)
        .count();
  };

  while (g_happyProcessed.load() < totalMsgs) {
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    const auto elapsed = elapsedMs();
    ASSERT_LT(elapsed, 60000) << "timeout: processed " << g_happyProcessed.load()
                              << "/" << totalMsgs;
    // Invariant sampled while the workload is in flight.
    const int32_t runningSample = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(runningSample, 0) << "running went negative during normal workload";
  }

  ASSERT_EQ(g_happyProcessed.load(), totalMsgs);

  // Final state: neither half of the packed counter may be negative.
  const int32_t runningAtEnd = GET_RUNNING_N(pool.activeRunningN);
  const int32_t activeAtEnd = GET_ACTIVE_N(pool.activeRunningN);
  ASSERT_GE(runningAtEnd, 0);
  ASSERT_GE(activeAtEnd, 0);

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 11: Sustained workload — multiple rounds of submit + drain, simulating
|
|||
|
|
// a long-running service that continuously processes queries
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
static std::atomic<int32_t> g_sustainedProcessed{0};
|
|||
|
|
|
|||
|
|
static void sustainedProcessFp(SQueueInfo *pQInfo, void *pMsg) {
|
|||
|
|
if (pQInfo->workerCb) {
|
|||
|
|
auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
|
|||
|
|
cb->beforeBlocking(cb->pPool);
|
|||
|
|
std::this_thread::sleep_for(std::chrono::microseconds(500));
|
|||
|
|
cb->afterRecoverFromBlocking(cb->pPool);
|
|||
|
|
}
|
|||
|
|
g_sustainedProcessed.fetch_add(1);
|
|||
|
|
taosFreeQitem(pMsg);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Runs 50 rounds of "submit 100 messages, wait until all are processed".
// Each round has a 10 s budget, and running must be >= 0 between rounds.
TEST_F(QueryAutoQWorkerTest, SustainedMultiRoundWorkload) {
  g_sustainedProcessed = 0;

  pool.min = 4;
  pool.num = 4;
  pool.max = 32;
  ASSERT_TRUE(initPool(sustainedProcessFp));

  const int32_t rounds = 50;
  const int32_t msgsPerRound = 100;

  for (int32_t r = 0; r < rounds; ++r) {
    const int32_t processedBefore = g_sustainedProcessed.load();
    const int32_t roundTarget = processedBefore + msgsPerRound;

    ASSERT_TRUE(submitMessages(msgsPerRound))
        << "failed to submit messages in round " << r;

    // Drain this round, polling every 10 ms.
    const auto roundStart = std::chrono::steady_clock::now();
    while (g_sustainedProcessed.load() < roundTarget) {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                               std::chrono::steady_clock::now() - roundStart)
                               .count();
      ASSERT_LT(elapsed, 10000)
          << "round " << r << " timeout: processed "
          << (g_sustainedProcessed.load() - processedBefore) << "/" << msgsPerRound;
    }

    // Between rounds: invariant must hold
    const int32_t runningNow = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(runningNow, 0) << "running negative after round " << r;
  }

  ASSERT_EQ(g_sustainedProcessed.load(), rounds * msgsPerRound);
  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 12: Concurrent producers — multiple threads submit messages while
|
|||
|
|
// workers are actively processing with blocking
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
static std::atomic<int32_t> g_concurrentProducerProcessed{0};
|
|||
|
|
|
|||
|
|
static void concurrentProducerProcessFp(SQueueInfo *pQInfo, void *pMsg) {
|
|||
|
|
if (pQInfo->workerCb) {
|
|||
|
|
auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
|
|||
|
|
cb->beforeBlocking(cb->pPool);
|
|||
|
|
std::this_thread::sleep_for(std::chrono::microseconds(200));
|
|||
|
|
cb->afterRecoverFromBlocking(cb->pPool);
|
|||
|
|
}
|
|||
|
|
g_concurrentProducerProcessed.fetch_add(1);
|
|||
|
|
taosFreeQitem(pMsg);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Sixteen producer threads each write 500 messages to the pool's queue while
// the workers process them with a blocking callback. Every submission must
// succeed, all 8000 messages must be processed within 120 s, and running is
// sampled on every poll and must stay >= 0.
TEST_F(QueryAutoQWorkerTest, ConcurrentProducersWithBlockingWorkers) {
  g_concurrentProducerProcessed = 0;

  pool.min = 4;
  pool.num = 4;
  pool.max = 64;
  ASSERT_TRUE(initPool(concurrentProducerProcessFp));

  const int32_t numProducers = 16;
  const int32_t msgsPerProducer = 500;
  const int32_t totalMsgs = numProducers * msgsPerProducer;

  std::atomic<int32_t> submitFailures{0};

  // Body of one producer. Allocation and write failures are only counted;
  // an item that could not be written is freed on the spot.
  auto produce = [this, msgsPerProducer, &submitFailures]() {
    for (int32_t seq = 0; seq < msgsPerProducer; ++seq) {
      void *qitem = nullptr;
      if (taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem) != 0) {
        ++submitFailures;
        continue;
      }
      *static_cast<int32_t *>(qitem) = seq;
      if (taosWriteQitem(queue, qitem) != 0) {
        ++submitFailures;
        taosFreeQitem(qitem);
      }
    }
  };

  std::vector<std::thread> producers;
  for (int p = 0; p < numProducers; ++p) {
    producers.emplace_back(produce);
  }
  for (std::thread &producer : producers) {
    producer.join();
  }
  ASSERT_EQ(submitFailures.load(), 0) << "some messages failed to submit";

  // Drain: poll every 20 ms until everything has been processed.
  const auto begin = std::chrono::steady_clock::now();
  while (g_concurrentProducerProcessed.load() < totalMsgs) {
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                             std::chrono::steady_clock::now() - begin)
                             .count();
    ASSERT_LT(elapsed, 120000)
        << "timeout: processed " << g_concurrentProducerProcessed.load()
        << "/" << totalMsgs;
    const int32_t runningSample = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(runningSample, 0) << "running went negative during concurrent produce";
  }

  ASSERT_EQ(g_concurrentProducerProcessed.load(), totalMsgs);

  const int32_t runningAtEnd = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningAtEnd, 0);

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 13: Pool scaling — start with min workers, submit burst to trigger
|
|||
|
|
// dynamic scaling, verify pool grows and all messages are processed
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
static std::atomic<int32_t> g_scalingProcessed{0};
|
|||
|
|
|
|||
|
|
static void scalingProcessFp(SQueueInfo *pQInfo, void *pMsg) {
|
|||
|
|
if (pQInfo->workerCb) {
|
|||
|
|
auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
|
|||
|
|
cb->beforeBlocking(cb->pPool);
|
|||
|
|
// Long blocking to force pool to scale up
|
|||
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
|||
|
|
cb->afterRecoverFromBlocking(cb->pPool);
|
|||
|
|
}
|
|||
|
|
g_scalingProcessed.fetch_add(1);
|
|||
|
|
taosFreeQitem(pMsg);
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
// Starts a pool with num = min = 2 and max = 64, submits a burst of 200
// long-blocking messages and expects (a) all of them processed within 30 s,
// (b) running >= 0 on every poll, and (c) pool.num to have grown beyond 2.
TEST_F(QueryAutoQWorkerTest, PoolScalingUnderLoad) {
  g_scalingProcessed = 0;

  pool.min = 2;
  pool.num = 2;   // start small
  pool.max = 64;  // allow significant scaling
  ASSERT_TRUE(initPool(scalingProcessFp));

  const int32_t burstSize = 200;
  ASSERT_TRUE(submitMessages(burstSize));

  // Poll every 50 ms until the burst has been drained.
  const auto begin = std::chrono::steady_clock::now();
  while (g_scalingProcessed.load() < burstSize) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                             std::chrono::steady_clock::now() - begin)
                             .count();
    ASSERT_LT(elapsed, 30000)
        << "timeout: processed " << g_scalingProcessed.load() << "/" << burstSize;

    // running must stay non-negative during scaling
    const int32_t runningSample = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(runningSample, 0) << "running went negative during pool scaling";
  }

  ASSERT_EQ(g_scalingProcessed.load(), burstSize);

  // Pool should have scaled beyond initial num=2
  ASSERT_GT(pool.num, 2) << "pool did not scale up under load";

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
|
|||
|
|
// Test 14: Mixed blocking and non-blocking — some messages block, others don't.
|
|||
|
|
// Verifies the pool handles mixed workloads correctly.
|
|||
|
|
// ============================================================================
|
|||
|
|
|
|||
|
|
static std::atomic<int32_t> g_mixedProcessed{0};
|
|||
|
|
|
|||
|
|
// Queue handler for the mixed-workload test. The int32_t payload decides the
// path: even values go through beforeBlocking / 3ms sleep /
// afterRecoverFromBlocking (only when a worker callback is attached), odd
// values are counted immediately. The item is always freed here.
static void mixedProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  const int32_t payload = *(int32_t *)pMsg;
  const bool    isEven = (payload % 2 == 0);

  // Even-numbered messages block, odd-numbered ones don't
  if (isEven && pQInfo->workerCb) {
    auto *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
    void *owner = poolCb->pPool;

    poolCb->beforeBlocking(owner);
    std::this_thread::sleep_for(std::chrono::milliseconds(3));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }

  g_mixedProcessed.fetch_add(1);
  taosFreeQitem(pMsg);
}
|
|||
|
|
|
|||
|
|
// Pushes 2000 messages through a pool (min=num=4, max=32) whose handler blocks
// on every even payload. Polls every 10ms (60s budget) and checks on each poll,
// and once more at the end, that the running counter is non-negative.
TEST_F(QueryAutoQWorkerTest, MixedBlockingAndNonBlocking) {
  using Clock = std::chrono::steady_clock;

  g_mixedProcessed = 0;

  pool.min = 4;
  pool.num = 4;
  pool.max = 32;
  ASSERT_TRUE(initPool(mixedProcessFp));

  const int32_t totalMsgs = 2000;
  ASSERT_TRUE(submitMessages(totalMsgs));

  const auto start = Clock::now();
  for (;;) {
    if (g_mixedProcessed.load() >= totalMsgs) break;

    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start).count();
    ASSERT_LT(elapsed, 60000) << "timeout: processed " << g_mixedProcessed.load() << "/" << totalMsgs;

    const int32_t running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running went negative during mixed workload";
  }

  ASSERT_EQ(g_mixedProcessed.load(), totalMsgs);

  // Final snapshot after the queue has drained.
  const int32_t running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(running, 0);

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
// Test 15: Two independent pools processing concurrently — simulates the real
// mnode scenario with queryWorker and mqueryWorker both active.
// ============================================================================
|
|||
|
|
|
|||
|
|
// Messages handled by dualQueryProcessFp (the fixture "query" pool in the
// DualPoolConcurrentProcessing test); reset to 0 when that test starts.
static std::atomic<int32_t> g_dualQueryProcessed{0};
|
|||
|
|
// Messages handled by dualMqueryProcessFp (the second, test-local "mquery"
// pool in DualPoolConcurrentProcessing); reset to 0 when that test starts.
static std::atomic<int32_t> g_dualMqueryProcessed{0};
|
|||
|
|
|
|||
|
|
// Handler for the "query" pool of the dual-pool test: when a worker callback
// is attached, wraps a 500us sleep in beforeBlocking/afterRecoverFromBlocking,
// then counts the message and frees the queue item.
static void dualQueryProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  auto *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    const std::chrono::microseconds pause(500);
    std::this_thread::sleep_for(pause);
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }

  g_dualQueryProcessed.fetch_add(1);
  taosFreeQitem(pMsg);
}
|
|||
|
|
|
|||
|
|
// Handler for the "mquery" pool of the dual-pool test. Same shape as
// dualQueryProcessFp but with a longer 5ms blocking section; counts the
// message and frees the queue item afterwards.
static void dualMqueryProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  auto *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    // mquery tasks are heavier (merge)
    const std::chrono::milliseconds pause(5);
    std::this_thread::sleep_for(pause);
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }

  g_dualMqueryProcessed.fetch_add(1);
  taosFreeQitem(pMsg);
}
|
|||
|
|
|
|||
|
|
// Runs two pools side by side: the fixture pool ("mnode-query", 4..16 workers)
// and a test-local pool ("mnode-mquery", 2..4 workers). One producer thread per
// pool submits messages; the test then polls until both counters reach their
// targets, checking on every poll that neither pool's running counter is
// negative.
//
// Allocation/enqueue failures are counted and asserted on (instead of being
// ignored), and the test-local pool is cleaned up on every exit path so that a
// failed fatal assertion cannot leave it uncleaned when the stack-local pool
// object goes out of scope.
TEST_F(QueryAutoQWorkerTest, DualPoolConcurrentProcessing) {
  g_dualQueryProcessed = 0;
  g_dualMqueryProcessed = 0;

  // Pool 1: queryWorker (uses fixture pool, larger)
  pool.min = 4;
  pool.num = 4;
  pool.max = 16;
  pool.name = "mnode-query";
  ASSERT_TRUE(initPool(dualQueryProcessFp));

  // Pool 2: mqueryWorker (smaller, like real mnode)
  SQueryAutoQWorkerPool mqueryPool{};
  memset(&mqueryPool, 0, sizeof(mqueryPool));
  mqueryPool.min = 2;
  mqueryPool.num = 2;
  mqueryPool.max = 4;
  mqueryPool.name = "mnode-mquery";
  ASSERT_EQ(tQueryAutoQWorkerInit(&mqueryPool), TSDB_CODE_SUCCESS);

  // mqueryPool lives on this stack frame. A fatal ASSERT_* returns from the
  // test body immediately, so make sure the pool is cleaned up before the
  // object is destroyed. pPool is cleared once the normal-path cleanup ran.
  struct MqueryPoolGuard {
    SQueryAutoQWorkerPool *pPool;
    ~MqueryPoolGuard() {
      if (pPool != nullptr) tQueryAutoQWorkerCleanup(pPool);
    }
  } mqueryGuard{&mqueryPool};

  STaosQueue *mq = tQueryAutoQWorkerAllocQueue(&mqueryPool, nullptr, dualMqueryProcessFp);
  ASSERT_NE(mq, nullptr);

  // Submit messages to both pools concurrently
  const int32_t queryMsgs = 1000;
  const int32_t mqueryMsgs = 300;

  // Shared producer body: writes `count` int32_t items (0..count-1) to `target`.
  // Failures are counted rather than dereferencing a null item or silently
  // dropping a message (which would only surface as a 30s timeout below).
  std::atomic<int32_t> submitFailures{0};
  auto produce = [&submitFailures](STaosQueue *target, int32_t count) {
    for (int32_t i = 0; i < count; ++i) {
      void   *qitem = nullptr;
      int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem);
      if (code != 0 || qitem == nullptr) {
        submitFailures.fetch_add(1);
        continue;
      }
      *(int32_t *)qitem = i;
      code = taosWriteQitem(target, qitem);
      if (code != 0) {
        submitFailures.fetch_add(1);
        taosFreeQitem(qitem);
      }
    }
  };

  std::thread queryProducer(produce, queue, queryMsgs);
  std::thread mqueryProducer(produce, mq, mqueryMsgs);

  queryProducer.join();
  mqueryProducer.join();
  ASSERT_EQ(submitFailures.load(), 0) << "submit failures detected";

  // Wait for both pools to finish
  auto start = std::chrono::steady_clock::now();
  while (g_dualQueryProcessed.load() < queryMsgs ||
         g_dualMqueryProcessed.load() < mqueryMsgs) {
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                       std::chrono::steady_clock::now() - start)
                       .count();
    ASSERT_LT(elapsed, 30000)
        << "timeout: query=" << g_dualQueryProcessed.load() << "/" << queryMsgs
        << " mquery=" << g_dualMqueryProcessed.load() << "/" << mqueryMsgs;

    // Neither pool should have negative running
    int32_t r1 = GET_RUNNING_N(pool.activeRunningN);
    int32_t r2 = GET_RUNNING_N(mqueryPool.activeRunningN);
    ASSERT_GE(r1, 0) << "queryPool running negative";
    ASSERT_GE(r2, 0) << "mqueryPool running negative";
  }

  ASSERT_EQ(g_dualQueryProcessed.load(), queryMsgs);
  ASSERT_EQ(g_dualMqueryProcessed.load(), mqueryMsgs);

  // Normal path: keep the original order (mquery pool first, then the fixture
  // pool) and disarm the guard so cleanup is not run twice.
  tQueryAutoQWorkerCleanup(&mqueryPool);
  mqueryGuard.pPool = nullptr;
  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
// Test 16: Heavy stress — 32 producer threads × 1000 messages each = 32000 msgs,
// pool max=128 workers, every message does blocking I/O.
// Runs for an extended time to expose race conditions.
// ============================================================================
|
|||
|
|
|
|||
|
|
// Messages handled by heavyStressProcessFp; reset to 0 at the start of the
// HeavyStress32Producers test and polled by it until all are processed.
static std::atomic<int32_t> g_heavyProcessed{0};
|
|||
|
|
// Smallest GET_RUNNING_N(pool->activeRunningN) value observed from inside
// heavyStressProcessFp. Starts at INT32_MAX; the test asserts it ends >= 0.
static std::atomic<int32_t> g_heavyMinRunning{INT32_MAX};
|
|||
|
|
|
|||
|
|
// Handler for the heavy-stress test. When a worker callback is attached it:
//   1. wraps a payload-dependent sleep (50-249us) in
//      beforeBlocking/afterRecoverFromBlocking, and
//   2. folds the pool's current running count into g_heavyMinRunning.
// The message is then counted and the queue item freed.
//
// The min-running tracking lives inside the workerCb guard: the callback
// pointer is only known to be non-null there, so dereferencing it outside the
// guard (as before) would crash when no callback is attached. It also runs
// before g_heavyProcessed is bumped, so that once the test sees the final
// count every worker's observation is already recorded.
static void heavyStressProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (cb != nullptr) {
    cb->beforeBlocking(cb->pPool);
    // Variable blocking time to create diverse scheduling patterns
    int32_t val = *(int32_t *)pMsg;
    int32_t delayUs = 50 + (val % 200);  // 50-249us
    std::this_thread::sleep_for(std::chrono::microseconds(delayUs));
    cb->afterRecoverFromBlocking(cb->pPool);

    // Track minimum running observed by any worker
    auto   *pool = (SQueryAutoQWorkerPool *)cb->pPool;
    int32_t running = GET_RUNNING_N(pool->activeRunningN);
    int32_t curMin = g_heavyMinRunning.load();
    // compare_exchange_weak reloads curMin on failure, so the loop ends as
    // soon as another worker has published a value <= running.
    while (running < curMin) {
      if (g_heavyMinRunning.compare_exchange_weak(curMin, running)) break;
    }
  }

  g_heavyProcessed.fetch_add(1);
  taosFreeQitem(pMsg);
}
|
|||
|
|
|
|||
|
|
// 32 producer threads each enqueue 1000 uniquely numbered messages into a pool
// configured as min=num=4, max=128. After the producers are joined, the test
// polls every 50ms (120s budget) asserting that both halves of activeRunningN
// are non-negative, then checks the total processed count and the minimum
// running value recorded by the workers, and prints a summary line.
TEST_F(QueryAutoQWorkerTest, HeavyStress32Producers) {
  using Clock = std::chrono::steady_clock;

  g_heavyProcessed = 0;
  g_heavyMinRunning = INT32_MAX;

  pool.min = 4;
  pool.num = 4;
  pool.max = 128;
  pool.name = "heavy-stress";
  ASSERT_TRUE(initPool(heavyStressProcessFp));

  const int32_t numProducers = 32;
  const int32_t msgsPerProducer = 1000;
  const int32_t totalMsgs = numProducers * msgsPerProducer;

  // Launch producers
  std::atomic<int32_t> submitFailures{0};
  auto produce = [this, msgsPerProducer, &submitFailures](int producerIdx) {
    const int32_t base = producerIdx * msgsPerProducer;
    for (int32_t offset = 0; offset < msgsPerProducer; ++offset) {
      void *qitem = nullptr;
      if (taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem) != 0) {
        submitFailures.fetch_add(1);
        continue;
      }
      *(int32_t *)qitem = base + offset;
      if (taosWriteQitem(queue, qitem) != 0) {
        submitFailures.fetch_add(1);
        taosFreeQitem(qitem);
      }
    }
  };

  std::vector<std::thread> producers;
  for (int p = 0; p < numProducers; ++p) {
    producers.emplace_back(produce, p);
  }
  for (auto &worker : producers) {
    worker.join();
  }
  ASSERT_EQ(submitFailures.load(), 0) << "submit failures detected";

  // Wait with continuous invariant checking
  const auto start = Clock::now();
  int64_t    checkCount = 0;
  for (;;) {
    if (g_heavyProcessed.load() >= totalMsgs) break;

    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    ++checkCount;

    const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start).count();
    ASSERT_LT(elapsed, 120000) << "timeout after " << elapsed << "ms: processed " << g_heavyProcessed.load() << "/"
                               << totalMsgs;

    const int32_t running = GET_RUNNING_N(pool.activeRunningN);
    const int32_t active = GET_ACTIVE_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running=" << running << " at check #" << checkCount;
    ASSERT_GE(active, 0) << "active=" << active << " at check #" << checkCount;
  }

  ASSERT_EQ(g_heavyProcessed.load(), totalMsgs);
  ASSERT_GE(g_heavyMinRunning.load(), 0) << "minimum running observed by workers was " << g_heavyMinRunning.load();

  const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(Clock::now() - start).count();
  printf(" Heavy stress: %d msgs, %d producers, %d max workers, %" PRId64 "ms, "
         "%" PRId64 " invariant checks passed\n",
         totalMsgs, numProducers, pool.num, (int64_t)elapsed, (int64_t)checkCount);

  cleanupPool();
}
|
|||
|
|
|
|||
|
|
// ============================================================================
// Test 17: Endurance — sustained load for >= 10 seconds with continuous
// producer pressure. 16 producers keep submitting at a steady rate.
// Verifies no drift, no leak, no livelock over extended operation.
// ============================================================================
|
|||
|
|
|
|||
|
|
// Messages handled by enduranceProcessFp; reset to 0 at the start of the
// EnduranceSustainedLoad test and compared against the submitted total.
static std::atomic<int64_t> g_enduranceProcessed{0};
|
|||
|
|
// Number of times enduranceProcessFp read a negative running count right
// after afterRecoverFromBlocking; the test asserts this stays 0.
static std::atomic<int32_t> g_enduranceNegativeDetected{0};
|
|||
|
|
|
|||
|
|
// Handler for the endurance test. With a worker callback attached it wraps a
// payload-dependent sleep of 100 + (payload % 400) microseconds in
// beforeBlocking/afterRecoverFromBlocking and then records whether the pool's
// running count reads negative. The message is always counted and freed.
static void enduranceProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  auto *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);

    // Realistic RPC-like latency: 100-500us
    const int32_t payload = *(int32_t *)pMsg;
    const std::chrono::microseconds latency(100 + (payload % 400));
    std::this_thread::sleep_for(latency);

    poolCb->afterRecoverFromBlocking(poolCb->pPool);

    auto *owner = (SQueryAutoQWorkerPool *)poolCb->pPool;
    if (GET_RUNNING_N(owner->activeRunningN) < 0) {
      g_enduranceNegativeDetected.fetch_add(1);
    }
  }

  g_enduranceProcessed.fetch_add(1);
  taosFreeQitem(pMsg);
}
|
|||
|
|
|
|||
|
|
// Keeps 16 producers submitting into a pool (min=num=4, max=64) for 10 seconds
// while the test thread samples activeRunningN every 100ms. Afterwards the
// producers are stopped, the queue is drained (60s budget) and the test checks
// that every submitted message was processed, that no worker saw a negative
// running count, and that no submit failed.
//
// Producer threads hold references to locals of this function, and a joinable
// std::thread must not be destroyed. A fatal ASSERT_* inside the monitor loop
// returns from the test body, so a scope guard stops and joins the producers
// on every exit path.
TEST_F(QueryAutoQWorkerTest, EnduranceSustainedLoad) {
  g_enduranceProcessed = 0;
  g_enduranceNegativeDetected = 0;

  pool.min = 4;
  pool.num = 4;
  pool.max = 64;
  pool.name = "endurance";
  ASSERT_TRUE(initPool(enduranceProcessFp));

  const int32_t        numProducers = 16;
  const int64_t        durationMs = 10000;  // run for 10 seconds
  std::atomic<bool>    stopProducers{false};
  std::atomic<int64_t> totalSubmitted{0};
  std::atomic<int32_t> submitFailures{0};

  std::vector<std::thread> producers;

  // Declared after everything the producers reference, so it is destroyed
  // first: raises the stop flag and joins whatever is still joinable.
  struct ProducerJoiner {
    std::atomic<bool>        &stop;
    std::vector<std::thread> &threads;
    ~ProducerJoiner() {
      stop = true;
      for (auto &t : threads) {
        if (t.joinable()) t.join();
      }
    }
  } producerJoiner{stopProducers, producers};

  // Producers: each continuously submits messages until told to stop
  for (int p = 0; p < numProducers; ++p) {
    producers.emplace_back([this, p, &stopProducers, &totalSubmitted, &submitFailures]() {
      int32_t seq = 0;
      while (!stopProducers.load()) {
        void   *qitem = nullptr;
        int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem);
        if (code != 0) {
          submitFailures.fetch_add(1);
          continue;
        }
        *(int32_t *)qitem = p * 100000 + seq++;
        code = taosWriteQitem(queue, qitem);
        if (code != 0) {
          submitFailures.fetch_add(1);
          taosFreeQitem(qitem);
          continue;
        }
        totalSubmitted.fetch_add(1);
        // Steady-rate: ~1 msg every 100-300us per producer
        std::this_thread::sleep_for(std::chrono::microseconds(100 + (seq % 200)));
      }
    });
  }

  // Monitor: check invariants throughout the run
  auto    start = std::chrono::steady_clock::now();
  int64_t checkCount = 0;
  while (true) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    ++checkCount;
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                       std::chrono::steady_clock::now() - start)
                       .count();
    if (elapsed >= durationMs) break;

    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    int32_t active = GET_ACTIVE_N(pool.activeRunningN);
    // Safe to use fatal assertions here: producerJoiner stops and joins the
    // producers if either of these returns from the test body.
    ASSERT_GE(running, 0) << "running=" << running << " at " << elapsed << "ms";
    ASSERT_GE(active, 0) << "active=" << active << " at " << elapsed << "ms";
  }

  // Stop producers
  stopProducers = true;
  for (auto &t : producers) t.join();

  int64_t submitted = totalSubmitted.load();
  ASSERT_GT(submitted, 0) << "no messages were submitted";

  // Wait for remaining messages to drain
  auto drainStart = std::chrono::steady_clock::now();
  while (g_enduranceProcessed.load() < submitted) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                       std::chrono::steady_clock::now() - drainStart)
                       .count();
    ASSERT_LT(elapsed, 60000)
        << "drain timeout: processed " << g_enduranceProcessed.load()
        << "/" << submitted;
  }

  ASSERT_EQ(g_enduranceProcessed.load(), submitted);
  ASSERT_EQ(g_enduranceNegativeDetected.load(), 0)
      << "negative running detected " << g_enduranceNegativeDetected.load()
      << " times inside workers";
  ASSERT_EQ(submitFailures.load(), 0) << "submit failures: " << submitFailures.load();

  int32_t finalRunning = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(finalRunning, 0);

  auto totalElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                          std::chrono::steady_clock::now() - start)
                          .count();
  printf(" Endurance: %" PRId64 " msgs submitted/processed, %d producers, "
         "%d max workers, %" PRId64 "ms total, %" PRId64 " invariant checks\n",
         submitted, numProducers, pool.num, (int64_t)totalElapsed, checkCount);

  cleanupPool();
}
|