TDengine/source/util/test/queryAutoQWorkerTest.cpp

1101 lines
39 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* Unit tests for SQueryAutoQWorkerPool self-healing mechanisms.
*
* Tests verify that the worker pool can:
* 1. Prevent running counter from going negative (atomicSafeDecRunning)
* 2. Detect and correct negative running counters (healRunning)
* 3. Survive cross-pool beforeBlocking/afterRecoverFromBlocking abuse
* 4. Continue processing messages after self-healing
*/
#include <gtest/gtest.h>
#include <atomic>
#include <chrono>
#include <thread>
#include <vector>
extern "C" {
#include "os.h"
#include "tqueue.h"
#include "tworker.h"
}
extern int64_t tsQueueMemoryAllowed;
// Accessors for the packed 64-bit `activeRunningN` word: the high 32 bits carry
// the active count, the low 32 bits carry the running count. The running half is
// stored as its 32-bit two's-complement pattern, so a negative running value
// survives a MAKE_ACTIVE_RUNNING / GET_RUNNING_N round trip.
// Every argument and every expansion is fully parenthesized so the macros stay
// correct when handed a compound expression or used inside a larger one.
#define GET_ACTIVE_N(int64_val) ((int32_t)((int64_val) >> 32))
#define GET_RUNNING_N(int64_val) ((int32_t)((int64_val) & 0xFFFFFFFF))
#define MAKE_ACTIVE_RUNNING(active, running) (((int64_t)(active) << 32) | ((int64_t)(uint32_t)(running)))
// ============================================================================
// Test fixture
// ============================================================================
class QueryAutoQWorkerTest : public ::testing::Test {
public:
SQueryAutoQWorkerPool pool{};
STaosQueue *queue{nullptr};
std::atomic<int32_t> processedCount{0};
std::atomic<int32_t> blockingCallCount{0};
void SetUp() override {
tsQueueMemoryAllowed = 1024 * 1024;
memset(&pool, 0, sizeof(pool));
pool.num = 4;
pool.max = 8;
pool.min = 2;
pool.name = "test-worker";
}
void TearDown() override {
// Cleanup is done per-test where pool was initialized
}
// Initialize pool and allocate queue with given FItem
bool initPool(FItem fp, void *ahandle = nullptr) {
int32_t code = tQueryAutoQWorkerInit(&pool);
if (code != TSDB_CODE_SUCCESS) return false;
queue = tQueryAutoQWorkerAllocQueue(&pool, ahandle ? ahandle : (void *)this, fp);
return queue != nullptr;
}
void cleanupPool() { tQueryAutoQWorkerCleanup(&pool); }
// Submit N messages to the pool's queue
bool submitMessages(int32_t count) {
for (int32_t i = 0; i < count; ++i) {
void *qitem = nullptr;
int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem);
if (code != 0) return false;
*(int32_t *)qitem = i;
code = taosWriteQitem(queue, qitem);
if (code != 0) return false;
}
return true;
}
// Wait for processedCount to reach target, with timeout
bool waitForProcessed(int32_t target, int32_t timeoutMs = 5000) {
auto start = std::chrono::steady_clock::now();
while (processedCount.load() < target) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start)
.count();
if (elapsed > timeoutMs) return false;
}
return true;
}
};
// ============================================================================
// Simple message processor for basic tests
// ============================================================================
/** Queue callback: bumps the owning fixture's processedCount, then frees the item. */
static void simpleProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  QueryAutoQWorkerTest *fixture = (QueryAutoQWorkerTest *)pQInfo->ahandle;
  ++fixture->processedCount;
  taosFreeQitem(pMsg);
}
// ============================================================================
// Test 1: beforeBlocking on correct pool doesn't go negative
// ============================================================================
TEST_F(QueryAutoQWorkerTest, BeforeBlockingCorrectPool) {
  ASSERT_TRUE(initPool(simpleProcessFp));
  // Push a few messages through first so the pool has been exercised.
  ASSERT_TRUE(submitMessages(10));
  ASSERT_TRUE(waitForProcessed(10));

  // Call the pool's own beforeBlocking on its own pPool, as a normal blocking
  // operation would. It must succeed and leave the running half >= 0.
  int32_t code = pool.pCb->beforeBlocking(pool.pCb->pPool);
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);
  int32_t runningAfter = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningAfter, 0) << "running went negative after beforeBlocking";

  // The matching recovery must also succeed and keep running >= 0.
  code = pool.pCb->afterRecoverFromBlocking(pool.pCb->pPool);
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);
  int32_t runningRecovered = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(runningRecovered, 0) << "running went negative after recoverFromBlocking";

  cleanupPool();
}
// ============================================================================
// Test 2: Repeated beforeBlocking without matching recovery (simulates cross-pool abuse)
// Running counter must never go below 0
// ============================================================================
TEST_F(QueryAutoQWorkerTest, RepeatedBeforeBlockingFloorProtection) {
  ASSERT_TRUE(initPool(simpleProcessFp));
  // Get the pool going with a handful of messages.
  ASSERT_TRUE(submitMessages(4));
  ASSERT_TRUE(waitForProcessed(4));

  // 20 unmatched beforeBlocking calls, checking after each one that the
  // running half of the counter has not dropped below zero.
  int32_t calls = 0;
  while (calls < 20) {
    pool.pCb->beforeBlocking(pool.pCb->pPool);
    ++calls;
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running went negative after " << calls << " beforeBlocking calls";
  }

  cleanupPool();
}
// ============================================================================
// Test 3: Direct injection of negative running, verify healRunning via
// afterRecoverFromBlocking
// ============================================================================
TEST_F(QueryAutoQWorkerTest, HealNegativeRunningViaRecover) {
  ASSERT_TRUE(initPool(simpleProcessFp));

  // Overwrite the packed counter so its running half reads -5, keeping the
  // current active value (or 1 when that is not positive).
  int32_t active = GET_ACTIVE_N(pool.activeRunningN);
  if (active <= 0) {
    active = 1;
  }
  pool.activeRunningN = MAKE_ACTIVE_RUNNING(active, -5);
  int32_t running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_EQ(running, -5) << "injection failed";

  // A recover call is expected to bring the running half back to >= 0.
  int32_t code = pool.pCb->afterRecoverFromBlocking(pool.pCb->pPool);
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);
  running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(running, 0) << "healRunning failed to correct negative running";

  cleanupPool();
}
// ============================================================================
// Test 4: Direct injection of negative running, verify pool can still
// process messages (end-to-end self-healing)
// ============================================================================
/**
 * Queue callback that wraps a 5 ms sleep in the worker callback's
 * beforeBlocking / afterRecoverFromBlocking pair (when a worker callback is
 * present), then counts the message on the fixture and frees it.
 */
static void blockingProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  QueryAutoQWorkerTest *fixture = (QueryAutoQWorkerTest *)pQInfo->ahandle;
  if (pQInfo->workerCb != nullptr) {
    SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
    poolCb->beforeBlocking(poolCb->pPool);
    // Stand-in for an I/O wait.
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }
  ++fixture->processedCount;
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, PoolRecoverAfterNegativeRunningInjection) {
  ASSERT_TRUE(initPool(blockingProcessFp));

  // Healthy phase: 8 messages go through the blocking callback.
  ASSERT_TRUE(submitMessages(8));
  ASSERT_TRUE(waitForProcessed(8));
  int32_t baseline = processedCount.load();

  // Corrupt the counter: running := -3, active kept (or pool.num if not positive).
  int32_t active = GET_ACTIVE_N(pool.activeRunningN);
  if (active <= 0) {
    active = pool.num;
  }
  pool.activeRunningN = MAKE_ACTIVE_RUNNING(active, -3);

  // Another 8 messages must still be processed within 10 s.
  ASSERT_TRUE(submitMessages(8));
  ASSERT_TRUE(waitForProcessed(baseline + 8, 10000))
      << "Pool failed to recover: only processed " << processedCount.load()
      << " of expected " << (baseline + 8);

  // And the running half must have returned to a non-negative value.
  int32_t running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(running, 0) << "running still negative after self-healing";

  cleanupPool();
}
// ============================================================================
// Test 5: Concurrent cross-pool abuse simulation
// Multiple threads call beforeBlocking on a pool that's not theirs
// ============================================================================
TEST_F(QueryAutoQWorkerTest, ConcurrentCrossPoolAbuse) {
  ASSERT_TRUE(initPool(simpleProcessFp));
  // Get the pool going.
  ASSERT_TRUE(submitMessages(4));
  ASSERT_TRUE(waitForProcessed(4));

  // Each abuser issues 50 unmatched beforeBlocking calls and records every
  // time it reads a negative running value right after a call.
  std::atomic<int32_t> negativeDetected{0};
  auto abuse = [this, &negativeDetected]() {
    int j = 0;
    while (j < 50) {
      pool.pCb->beforeBlocking(pool.pCb->pPool);
      int32_t running = GET_RUNNING_N(pool.activeRunningN);
      if (running < 0) {
        negativeDetected.fetch_add(1);
      }
      ++j;
    }
  };

  // Eight abusers run concurrently.
  std::vector<std::thread> abusers;
  for (int i = 0; i < 8; i++) {
    abusers.emplace_back(abuse);
  }
  for (std::thread &worker : abusers) {
    worker.join();
  }

  // No abuser may have observed a negative value, and the final value must be >= 0.
  ASSERT_EQ(negativeDetected.load(), 0)
      << "running went negative " << negativeDetected.load() << " times during concurrent abuse";
  int32_t finalRunning = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(finalRunning, 0) << "final running is negative";

  cleanupPool();
}
// ============================================================================
// Test 6: Paired beforeBlocking/afterRecoverFromBlocking maintains invariant
// ============================================================================
TEST_F(QueryAutoQWorkerTest, PairedBlockingRecoveryMaintainsInvariant) {
  ASSERT_TRUE(initPool(simpleProcessFp));
  ASSERT_TRUE(submitMessages(4));
  ASSERT_TRUE(waitForProcessed(4));

  // 20 well-formed cycles: each beforeBlocking is followed by its recovery,
  // both must succeed, and running must be >= 0 after each half.
  int i = 0;
  while (i < 20) {
    int32_t code = pool.pCb->beforeBlocking(pool.pCb->pPool);
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running negative after beforeBlocking, iteration " << i;

    code = pool.pCb->afterRecoverFromBlocking(pool.pCb->pPool);
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
    running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running negative after recovery, iteration " << i;

    ++i;
  }

  cleanupPool();
}
// ============================================================================
// Test 7: Two pools - verify cross-pool callback doesn't corrupt either pool
// ============================================================================
// Per-pool message counters for the two-pool test; reset by the test itself.
static std::atomic<int32_t> g_pool1Processed{0};
static std::atomic<int32_t> g_pool2Processed{0};

/** Queue callback for pool 1: counts the message and frees it. */
static void pool1ProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  (void)pQInfo;  // queue info is not needed here
  ++g_pool1Processed;
  taosFreeQitem(pMsg);
}

/** Queue callback for pool 2: counts the message and frees it. */
static void pool2ProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  (void)pQInfo;  // queue info is not needed here
  ++g_pool2Processed;
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, TwoPoolsCrossCallbackProtection) {
  using Clock = std::chrono::steady_clock;
  using Millis = std::chrono::milliseconds;

  g_pool1Processed = 0;
  g_pool2Processed = 0;

  // First pool: the fixture's own.
  ASSERT_TRUE(initPool(pool1ProcessFp));

  // Second pool: a local one with the same sizing as the fixture default.
  SQueryAutoQWorkerPool pool2{};
  memset(&pool2, 0, sizeof(pool2));
  pool2.num = 4;
  pool2.max = 8;
  pool2.min = 2;
  pool2.name = "test-worker-2";
  ASSERT_EQ(tQueryAutoQWorkerInit(&pool2), TSDB_CODE_SUCCESS);
  STaosQueue *q2 = tQueryAutoQWorkerAllocQueue(&pool2, nullptr, pool2ProcessFp);
  ASSERT_NE(q2, nullptr);

  // Phase 1: feed 10 messages to each pool.
  ASSERT_TRUE(submitMessages(10));
  for (int32_t seq = 0; seq < 10; seq++) {
    void *qitem = nullptr;
    ASSERT_EQ(taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem), 0);
    *(int32_t *)qitem = seq;
    ASSERT_EQ(taosWriteQitem(q2, qitem), 0);
  }

  // Give both pools up to 5 s to drain, then require that they did.
  auto start = Clock::now();
  for (;;) {
    if (g_pool1Processed.load() >= 10 && g_pool2Processed.load() >= 10) break;
    std::this_thread::sleep_for(Millis(10));
    auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
    if (elapsed > 5000) break;
  }
  ASSERT_GE(g_pool1Processed.load(), 10);
  ASSERT_GE(g_pool2Processed.load(), 10);

  // Cross-pool abuse: pool 1's beforeBlocking callback is handed pool 2's
  // pPool, ten times, with no matching recovery.
  for (int i = 0; i < 10; i++) {
    pool.pCb->beforeBlocking(pool2.pCb->pPool);
  }

  // Neither pool may end up with a negative running value.
  int32_t pool2Running = GET_RUNNING_N(pool2.activeRunningN);
  ASSERT_GE(pool2Running, 0) << "pool2 running went negative from cross-pool abuse";
  int32_t pool1Running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(pool1Running, 0) << "pool1 running went negative";

  // Phase 2: both pools must still process a fresh batch of 5 messages each.
  g_pool1Processed = 0;
  g_pool2Processed = 0;
  ASSERT_TRUE(submitMessages(5));
  for (int32_t seq = 0; seq < 5; seq++) {
    void *qitem = nullptr;
    ASSERT_EQ(taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem), 0);
    *(int32_t *)qitem = seq;
    ASSERT_EQ(taosWriteQitem(q2, qitem), 0);
  }

  // This time allow up to 10 s.
  start = Clock::now();
  for (;;) {
    if (g_pool1Processed.load() >= 5 && g_pool2Processed.load() >= 5) break;
    std::this_thread::sleep_for(Millis(10));
    auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
    if (elapsed > 10000) break;
  }
  ASSERT_GE(g_pool1Processed.load(), 5) << "pool1 stopped processing after cross-pool abuse";
  ASSERT_GE(g_pool2Processed.load(), 5) << "pool2 stopped processing after cross-pool abuse";

  tQueryAutoQWorkerCleanup(&pool2);
  cleanupPool();
}
// ============================================================================
// Test 8: activeRunningN bit-packing correctness
// ============================================================================
TEST_F(QueryAutoQWorkerTest, ActiveRunningBitPacking) {
  // Round-trips several (active, running) pairs through the packing macros.
  int64_t val;

  // Both halves positive.
  val = MAKE_ACTIVE_RUNNING(5, 3);
  ASSERT_EQ(GET_ACTIVE_N(val), 5);
  ASSERT_EQ(GET_RUNNING_N(val), 3);

  // Both halves zero.
  val = MAKE_ACTIVE_RUNNING(0, 0);
  ASSERT_EQ(GET_ACTIVE_N(val), 0);
  ASSERT_EQ(GET_RUNNING_N(val), 0);

  // Negative running: stored as the 32-bit pattern 0xFFFFFFFF and read back
  // as -1 because GET_RUNNING_N converts to int32_t.
  val = MAKE_ACTIVE_RUNNING(4, -1);
  int32_t running = GET_RUNNING_N(val);
  ASSERT_EQ(running, -1) << "negative running encoding/decoding mismatch";

  // Larger values.
  val = MAKE_ACTIVE_RUNNING(100, 50);
  ASSERT_EQ(GET_ACTIVE_N(val), 100);
  ASSERT_EQ(GET_RUNNING_N(val), 50);
}
// ============================================================================
// Test 9: Stress test - heavy concurrent blocking/recovery with fault injection
// ============================================================================
// Messages seen by stressProcessFp; reset by the stress test.
static std::atomic<int32_t> g_stressProcessed{0};

/**
 * Queue callback for the fault-injection stress test: counts the message
 * first, then (when a worker callback is present) wraps a 100 us sleep in
 * beforeBlocking / afterRecoverFromBlocking, and finally frees the item.
 */
static void stressProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  ++g_stressProcessed;
  SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    std::this_thread::sleep_for(std::chrono::microseconds(100));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, StressConcurrentBlockingWithFaultInjection) {
  using Clock = std::chrono::steady_clock;
  using Millis = std::chrono::milliseconds;

  g_stressProcessed = 0;
  ASSERT_TRUE(initPool(stressProcessFp));
  const int32_t totalMsgs = 100;
  ASSERT_TRUE(submitMessages(totalMsgs));

  // Background injector: wakes every 5 ms and, on every 10th wake-up
  // (starting with the first), overwrites the counter with running = -2 while
  // keeping the active value (or pool.num when that is not positive).
  std::atomic<bool> stopFaults{false};
  std::thread faultInjector([this, &stopFaults]() {
    for (int iteration = 0; !stopFaults.load(); ++iteration) {
      if (iteration % 10 == 0) {
        int32_t active = GET_ACTIVE_N(pool.activeRunningN);
        pool.activeRunningN = MAKE_ACTIVE_RUNNING(active > 0 ? active : pool.num, -2);
      }
      std::this_thread::sleep_for(Millis(5));
    }
  });

  // Poll every 50 ms until everything was processed or 30 s have passed.
  auto start = Clock::now();
  for (;;) {
    if (g_stressProcessed.load() >= totalMsgs) break;
    std::this_thread::sleep_for(Millis(50));
    auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
    if (elapsed > 30000) break;  // 30s timeout
  }

  // Stop the injector before judging the outcome.
  stopFaults = true;
  faultInjector.join();

  ASSERT_GE(g_stressProcessed.load(), totalMsgs)
      << "Pool failed to process all messages under fault injection: "
      << g_stressProcessed.load() << "/" << totalMsgs;
  int32_t running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(running, 0) << "running still negative after stress test";

  cleanupPool();
}
// ============================================================================
// Test 10: Real-usage happy path — every message does beforeBlocking +
// afterRecoverFromBlocking inside the worker thread (no fault injection)
// ============================================================================
// Messages completed by happyBlockingProcessFp; reset by its test.
static std::atomic<int32_t> g_happyProcessed{0};

/**
 * Queue callback for the happy-path workload: when a worker callback is
 * present it wraps a 2 ms sleep in beforeBlocking / afterRecoverFromBlocking
 * and expects both calls to succeed; it then counts and frees the message.
 */
static void happyBlockingProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    int32_t code = poolCb->beforeBlocking(poolCb->pPool);
    EXPECT_EQ(code, TSDB_CODE_SUCCESS);
    // Stand-in for blocking I/O such as an RPC wait or a disk read.
    std::this_thread::sleep_for(std::chrono::milliseconds(2));
    code = poolCb->afterRecoverFromBlocking(poolCb->pPool);
    EXPECT_EQ(code, TSDB_CODE_SUCCESS);
  }
  ++g_happyProcessed;
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, RealUsageBlockingWorkload) {
  using Clock = std::chrono::steady_clock;
  using Millis = std::chrono::milliseconds;

  g_happyProcessed = 0;
  // Pool sizing for this test: 4 workers initially, up to 32.
  pool.min = 4;
  pool.num = 4;
  pool.max = 32;
  ASSERT_TRUE(initPool(happyBlockingProcessFp));

  const int32_t totalMsgs = 2000;
  ASSERT_TRUE(submitMessages(totalMsgs));

  // Poll every 10 ms (60 s budget); on every poll the running half of the
  // counter must be non-negative.
  auto start = Clock::now();
  for (;;) {
    if (g_happyProcessed.load() >= totalMsgs) break;
    std::this_thread::sleep_for(Millis(10));
    auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
    ASSERT_LT(elapsed, 60000) << "timeout: processed " << g_happyProcessed.load()
                              << "/" << totalMsgs;
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running went negative during normal workload";
  }
  ASSERT_EQ(g_happyProcessed.load(), totalMsgs);

  // Final state: both halves of the counter are non-negative.
  int32_t running = GET_RUNNING_N(pool.activeRunningN);
  int32_t active = GET_ACTIVE_N(pool.activeRunningN);
  ASSERT_GE(running, 0);
  ASSERT_GE(active, 0);

  cleanupPool();
}
// ============================================================================
// Test 11: Sustained workload — multiple rounds of submit + drain, simulating
// a long-running service that continuously processes queries
// ============================================================================
// Messages completed by sustainedProcessFp; reset by its test.
static std::atomic<int32_t> g_sustainedProcessed{0};

/**
 * Queue callback for the multi-round workload: wraps a 500 us sleep in
 * beforeBlocking / afterRecoverFromBlocking when a worker callback is present,
 * then counts and frees the message.
 */
static void sustainedProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    std::this_thread::sleep_for(std::chrono::microseconds(500));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }
  ++g_sustainedProcessed;
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, SustainedMultiRoundWorkload) {
  using Clock = std::chrono::steady_clock;
  using Millis = std::chrono::milliseconds;

  g_sustainedProcessed = 0;
  pool.min = 4;
  pool.num = 4;
  pool.max = 32;
  ASSERT_TRUE(initPool(sustainedProcessFp));

  const int32_t rounds = 50;
  const int32_t msgsPerRound = 100;

  // Each round submits a batch and waits (up to 10 s) for exactly that batch
  // to drain before the next round starts.
  int32_t r = 0;
  while (r < rounds) {
    int32_t baseline = g_sustainedProcessed.load();
    ASSERT_TRUE(submitMessages(msgsPerRound))
        << "failed to submit messages in round " << r;

    auto start = Clock::now();
    for (;;) {
      if (g_sustainedProcessed.load() >= baseline + msgsPerRound) break;
      std::this_thread::sleep_for(Millis(10));
      auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
      ASSERT_LT(elapsed, 10000)
          << "round " << r << " timeout: processed "
          << (g_sustainedProcessed.load() - baseline) << "/" << msgsPerRound;
    }

    // With the round drained, the running half must be non-negative.
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running negative after round " << r;

    ++r;
  }

  ASSERT_EQ(g_sustainedProcessed.load(), rounds * msgsPerRound);
  cleanupPool();
}
// ============================================================================
// Test 12: Concurrent producers — multiple threads submit messages while
// workers are actively processing with blocking
// ============================================================================
// Messages completed by concurrentProducerProcessFp; reset by its test.
static std::atomic<int32_t> g_concurrentProducerProcessed{0};

/**
 * Queue callback for the concurrent-producer test: wraps a 200 us sleep in
 * beforeBlocking / afterRecoverFromBlocking when a worker callback is present,
 * then counts and frees the message.
 */
static void concurrentProducerProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    std::this_thread::sleep_for(std::chrono::microseconds(200));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }
  ++g_concurrentProducerProcessed;
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, ConcurrentProducersWithBlockingWorkers) {
  using Clock = std::chrono::steady_clock;
  using Millis = std::chrono::milliseconds;

  g_concurrentProducerProcessed = 0;
  pool.min = 4;
  pool.num = 4;
  pool.max = 64;
  ASSERT_TRUE(initPool(concurrentProducerProcessFp));

  const int32_t numProducers = 16;
  const int32_t msgsPerProducer = 500;
  const int32_t totalMsgs = numProducers * msgsPerProducer;

  // Every producer writes msgsPerProducer items (payload = its loop index) to
  // the fixture queue. A failed allocation or write is tallied, and an item
  // whose write failed is freed.
  std::atomic<int32_t> submitFailures{0};
  auto produce = [this, msgsPerProducer, &submitFailures]() {
    for (int32_t i = 0; i < msgsPerProducer; ++i) {
      void *qitem = nullptr;
      if (taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem) != 0) {
        submitFailures.fetch_add(1);
        continue;
      }
      *(int32_t *)qitem = i;
      if (taosWriteQitem(queue, qitem) != 0) {
        submitFailures.fetch_add(1);
        taosFreeQitem(qitem);
      }
    }
  };

  std::vector<std::thread> producers;
  for (int p = 0; p < numProducers; ++p) {
    producers.emplace_back(produce);
  }
  for (std::thread &producer : producers) {
    producer.join();
  }
  ASSERT_EQ(submitFailures.load(), 0) << "some messages failed to submit";

  // Poll every 20 ms (120 s budget), checking the running half on each poll.
  auto start = Clock::now();
  for (;;) {
    if (g_concurrentProducerProcessed.load() >= totalMsgs) break;
    std::this_thread::sleep_for(Millis(20));
    auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
    ASSERT_LT(elapsed, 120000)
        << "timeout: processed " << g_concurrentProducerProcessed.load()
        << "/" << totalMsgs;
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running went negative during concurrent produce";
  }

  ASSERT_EQ(g_concurrentProducerProcessed.load(), totalMsgs);
  int32_t running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(running, 0);

  cleanupPool();
}
// ============================================================================
// Test 13: Pool scaling — start with min workers, submit burst to trigger
// dynamic scaling, verify pool grows and all messages are processed
// ============================================================================
// Messages completed by scalingProcessFp; reset by its test.
static std::atomic<int32_t> g_scalingProcessed{0};

/**
 * Queue callback for the scaling test: wraps a deliberately long 50 ms sleep
 * in beforeBlocking / afterRecoverFromBlocking when a worker callback is
 * present, then counts and frees the message.
 */
static void scalingProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    // Long block, intended to push the pool into adding workers.
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }
  ++g_scalingProcessed;
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, PoolScalingUnderLoad) {
  using Clock = std::chrono::steady_clock;
  using Millis = std::chrono::milliseconds;

  g_scalingProcessed = 0;
  // Start with only two workers but leave plenty of room to grow.
  pool.min = 2;
  pool.num = 2;
  pool.max = 64;
  ASSERT_TRUE(initPool(scalingProcessFp));

  // One burst of long-blocking messages.
  const int32_t burstSize = 200;
  ASSERT_TRUE(submitMessages(burstSize));

  // Poll every 50 ms (30 s budget); running must stay non-negative throughout.
  auto start = Clock::now();
  for (;;) {
    if (g_scalingProcessed.load() >= burstSize) break;
    std::this_thread::sleep_for(Millis(50));
    auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
    ASSERT_LT(elapsed, 30000)
        << "timeout: processed " << g_scalingProcessed.load() << "/" << burstSize;
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running went negative during pool scaling";
  }
  ASSERT_EQ(g_scalingProcessed.load(), burstSize);

  // pool.num must have grown past the initial value of 2.
  ASSERT_GT(pool.num, 2) << "pool did not scale up under load";

  cleanupPool();
}
// ============================================================================
// Test 14: Mixed blocking and non-blocking — some messages block, others don't.
// Verifies the pool handles mixed workloads correctly.
// ============================================================================
// Messages completed by mixedProcessFp; reset by its test.
static std::atomic<int32_t> g_mixedProcessed{0};

/**
 * Queue callback for the mixed workload: messages with an even int32 payload
 * block for 3 ms inside beforeBlocking / afterRecoverFromBlocking (when a
 * worker callback is present); odd ones skip that. Every message is then
 * counted and freed.
 */
static void mixedProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  const int32_t payload = *(int32_t *)pMsg;
  const bool    shouldBlock = (payload % 2 == 0) && (pQInfo->workerCb != nullptr);
  if (shouldBlock) {
    SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
    poolCb->beforeBlocking(poolCb->pPool);
    std::this_thread::sleep_for(std::chrono::milliseconds(3));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }
  ++g_mixedProcessed;
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, MixedBlockingAndNonBlocking) {
  using Clock = std::chrono::steady_clock;
  using Millis = std::chrono::milliseconds;

  g_mixedProcessed = 0;
  pool.min = 4;
  pool.num = 4;
  pool.max = 32;
  ASSERT_TRUE(initPool(mixedProcessFp));

  // Payloads are 0..1999, so blocking and non-blocking messages alternate.
  const int32_t totalMsgs = 2000;
  ASSERT_TRUE(submitMessages(totalMsgs));

  // Poll every 10 ms (60 s budget), checking the running half on each poll.
  auto start = Clock::now();
  for (;;) {
    if (g_mixedProcessed.load() >= totalMsgs) break;
    std::this_thread::sleep_for(Millis(10));
    auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
    ASSERT_LT(elapsed, 60000)
        << "timeout: processed " << g_mixedProcessed.load() << "/" << totalMsgs;
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running went negative during mixed workload";
  }

  ASSERT_EQ(g_mixedProcessed.load(), totalMsgs);
  int32_t running = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(running, 0);

  cleanupPool();
}
// ============================================================================
// Test 15: Two independent pools processing concurrently — simulates the real
// mnode scenario with queryWorker and mqueryWorker both active
// ============================================================================
// Per-pool message counters for the dual-pool test; reset by the test itself.
static std::atomic<int32_t> g_dualQueryProcessed{0};
static std::atomic<int32_t> g_dualMqueryProcessed{0};

/**
 * Queue callback for the "query" pool: wraps a 500 us sleep in
 * beforeBlocking / afterRecoverFromBlocking when a worker callback is present,
 * then counts and frees the message.
 */
static void dualQueryProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    std::this_thread::sleep_for(std::chrono::microseconds(500));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }
  ++g_dualQueryProcessed;
  taosFreeQitem(pMsg);
}

/**
 * Queue callback for the "mquery" pool: same shape as dualQueryProcessFp but
 * with a longer 5 ms sleep, modelling heavier (merge) tasks.
 */
static void dualMqueryProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  SQueryAutoQWorkerPoolCB *poolCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (poolCb != nullptr) {
    poolCb->beforeBlocking(poolCb->pPool);
    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    poolCb->afterRecoverFromBlocking(poolCb->pPool);
  }
  ++g_dualMqueryProcessed;
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, DualPoolConcurrentProcessing) {
  g_dualQueryProcessed = 0;
  g_dualMqueryProcessed = 0;

  // Pool 1: queryWorker (uses fixture pool, larger)
  pool.min = 4;
  pool.num = 4;
  pool.max = 16;
  pool.name = "mnode-query";
  ASSERT_TRUE(initPool(dualQueryProcessFp));

  // Pool 2: mqueryWorker (smaller, like real mnode)
  SQueryAutoQWorkerPool mqueryPool{};
  memset(&mqueryPool, 0, sizeof(mqueryPool));
  mqueryPool.min = 2;
  mqueryPool.num = 2;
  mqueryPool.max = 4;
  mqueryPool.name = "mnode-mquery";
  ASSERT_EQ(tQueryAutoQWorkerInit(&mqueryPool), TSDB_CODE_SUCCESS);
  STaosQueue *mq = tQueryAutoQWorkerAllocQueue(&mqueryPool, nullptr, dualMqueryProcessFp);
  ASSERT_NE(mq, nullptr);

  // Submit messages to both pools concurrently.
  const int32_t queryMsgs = 1000;
  const int32_t mqueryMsgs = 300;

  // Shared producer body. Allocation and write results are checked so a failed
  // allocation is never dereferenced and a failed write does not leak the
  // item. Failures are tallied and reported right after the producers join,
  // instead of surfacing later as an unexplained processing timeout.
  std::atomic<int32_t> submitFailures{0};
  auto produce = [&submitFailures](STaosQueue *target, int32_t count) {
    for (int32_t i = 0; i < count; ++i) {
      void   *qitem = nullptr;
      int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem);
      if (code != 0 || qitem == nullptr) {
        submitFailures.fetch_add(1);
        continue;
      }
      *(int32_t *)qitem = i;
      code = taosWriteQitem(target, qitem);
      if (code != 0) {
        submitFailures.fetch_add(1);
        taosFreeQitem(qitem);
      }
    }
  };
  std::thread queryProducer(produce, queue, queryMsgs);
  std::thread mqueryProducer(produce, mq, mqueryMsgs);
  queryProducer.join();
  mqueryProducer.join();
  ASSERT_EQ(submitFailures.load(), 0) << "some messages failed to submit";

  // Wait for both pools to finish (20 ms polls, 30 s budget).
  auto start = std::chrono::steady_clock::now();
  while (g_dualQueryProcessed.load() < queryMsgs ||
         g_dualMqueryProcessed.load() < mqueryMsgs) {
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                       std::chrono::steady_clock::now() - start)
                       .count();
    ASSERT_LT(elapsed, 30000)
        << "timeout: query=" << g_dualQueryProcessed.load() << "/" << queryMsgs
        << " mquery=" << g_dualMqueryProcessed.load() << "/" << mqueryMsgs;
    // Neither pool should have negative running
    int32_t r1 = GET_RUNNING_N(pool.activeRunningN);
    int32_t r2 = GET_RUNNING_N(mqueryPool.activeRunningN);
    ASSERT_GE(r1, 0) << "queryPool running negative";
    ASSERT_GE(r2, 0) << "mqueryPool running negative";
  }

  ASSERT_EQ(g_dualQueryProcessed.load(), queryMsgs);
  ASSERT_EQ(g_dualMqueryProcessed.load(), mqueryMsgs);

  tQueryAutoQWorkerCleanup(&mqueryPool);
  cleanupPool();
}
// ============================================================================
// Test 16: Heavy stress — 32 producer threads × 1000 messages each = 32000 msgs,
// pool max=128 workers, every message does blocking I/O.
// Runs for extended time to expose race conditions.
// ============================================================================
// Messages completed by heavyStressProcessFp, and the smallest running value
// any worker has read after finishing a message. Both are reset by the test.
static std::atomic<int32_t> g_heavyProcessed{0};
static std::atomic<int32_t> g_heavyMinRunning{INT32_MAX};

/**
 * Queue callback for the heavy stress test.
 *
 * When a worker callback is present it wraps a payload-dependent sleep
 * (50-249 us) in beforeBlocking / afterRecoverFromBlocking, then folds the
 * pool's current running value into g_heavyMinRunning. Both steps are skipped
 * when there is no worker callback, so a null workerCb is never dereferenced.
 * The message is counted only after the observation has been recorded, so a
 * test that sees the final count also sees every worker's minimum.
 */
static void heavyStressProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (cb) {
    cb->beforeBlocking(cb->pPool);
    // Variable blocking time to create diverse scheduling patterns
    int32_t val = *(int32_t *)pMsg;
    int32_t delayUs = 50 + (val % 200);
    std::this_thread::sleep_for(std::chrono::microseconds(delayUs));
    cb->afterRecoverFromBlocking(cb->pPool);

    // Track the minimum running value observed by any worker. On a failed
    // compare-exchange curMin is refreshed with the stored value, so the loop
    // ends as soon as another worker has published something <= running.
    auto   *pool = (SQueryAutoQWorkerPool *)cb->pPool;
    int32_t running = GET_RUNNING_N(pool->activeRunningN);
    int32_t curMin = g_heavyMinRunning.load();
    while (running < curMin) {
      if (g_heavyMinRunning.compare_exchange_weak(curMin, running)) break;
    }
  }
  g_heavyProcessed.fetch_add(1);
  taosFreeQitem(pMsg);
}
TEST_F(QueryAutoQWorkerTest, HeavyStress32Producers) {
  using Clock = std::chrono::steady_clock;
  using Millis = std::chrono::milliseconds;

  g_heavyProcessed = 0;
  g_heavyMinRunning = INT32_MAX;
  pool.min = 4;
  pool.num = 4;
  pool.max = 128;
  pool.name = "heavy-stress";
  ASSERT_TRUE(initPool(heavyStressProcessFp));

  const int32_t numProducers = 32;
  const int32_t msgsPerProducer = 1000;
  const int32_t totalMsgs = numProducers * msgsPerProducer;

  // Producer p writes payloads p*msgsPerProducer .. p*msgsPerProducer + 999,
  // so every message carries a distinct value. Failed allocations and writes
  // are tallied; an item whose write failed is freed.
  std::vector<std::thread> producers;
  std::atomic<int32_t>     submitFailures{0};
  for (int p = 0; p < numProducers; ++p) {
    producers.emplace_back([this, msgsPerProducer, p, &submitFailures]() {
      int32_t i = 0;
      while (i < msgsPerProducer) {
        const int32_t payload = p * msgsPerProducer + i;
        ++i;
        void *qitem = nullptr;
        if (taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem) != 0) {
          submitFailures.fetch_add(1);
          continue;
        }
        *(int32_t *)qitem = payload;
        if (taosWriteQitem(queue, qitem) != 0) {
          submitFailures.fetch_add(1);
          taosFreeQitem(qitem);
        }
      }
    });
  }
  for (std::thread &producer : producers) {
    producer.join();
  }
  ASSERT_EQ(submitFailures.load(), 0) << "submit failures detected";

  // Poll every 50 ms (120 s budget). Each poll counts as one invariant check
  // and requires both halves of the counter to be non-negative.
  auto    start = Clock::now();
  int64_t checkCount = 0;
  for (;;) {
    if (g_heavyProcessed.load() >= totalMsgs) break;
    std::this_thread::sleep_for(Millis(50));
    ++checkCount;
    auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
    ASSERT_LT(elapsed, 120000)
        << "timeout after " << elapsed << "ms: processed "
        << g_heavyProcessed.load() << "/" << totalMsgs;
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    int32_t active = GET_ACTIVE_N(pool.activeRunningN);
    ASSERT_GE(running, 0) << "running=" << running << " at check #" << checkCount;
    ASSERT_GE(active, 0) << "active=" << active << " at check #" << checkCount;
  }

  ASSERT_EQ(g_heavyProcessed.load(), totalMsgs);
  ASSERT_GE(g_heavyMinRunning.load(), 0)
      << "minimum running observed by workers was " << g_heavyMinRunning.load();

  // Summary line.
  // NOTE(review): the "%d max workers" field is fed pool.num, not pool.max —
  // confirm which of the two the label is meant to report.
  auto elapsed = std::chrono::duration_cast<Millis>(Clock::now() - start).count();
  printf(" Heavy stress: %d msgs, %d producers, %d max workers, %" PRId64 "ms, "
         "%" PRId64 " invariant checks passed\n",
         totalMsgs, numProducers, pool.num, (int64_t)elapsed, (int64_t)checkCount);

  cleanupPool();
}
// ============================================================================
// Test 17: Endurance — sustained load for >= 10 seconds with continuous
// producer pressure. 16 producers keep submitting at steady rate.
// Verifies no drift, no leak, no livelock over extended operation.
// ============================================================================
// Number of messages fully handled by enduranceProcessFp; reset to 0 at the
// start of the EnduranceSustainedLoad test.
static std::atomic<int64_t> g_enduranceProcessed{0};
// Number of times a worker read a negative running count from the pool right
// after returning from its simulated blocking call.
static std::atomic<int32_t> g_enduranceNegativeDetected{0};
/**
 * Queue item handler used by the endurance test.
 *
 * When the queue carries a worker callback, the handler brackets a short
 * sleep with beforeBlocking/afterRecoverFromBlocking and then records whether
 * the pool's running counter reads negative. The message is always counted in
 * g_enduranceProcessed and freed.
 *
 * @param pQInfo queue info; its workerCb may be null
 * @param pMsg   queue item whose payload is read as an int32_t
 */
static void enduranceProcessFp(SQueueInfo *pQInfo, void *pMsg) {
  auto *workerCb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb;
  if (workerCb != nullptr) {
    workerCb->beforeBlocking(workerCb->pPool);

    // Realistic RPC-like latency: 100-499us, derived from the payload
    const int32_t payload = *(int32_t *)pMsg;
    const std::chrono::microseconds latency(100 + (payload % 400));
    std::this_thread::sleep_for(latency);

    workerCb->afterRecoverFromBlocking(workerCb->pPool);

    // Sample the running counter as seen from inside a worker
    auto *workerPool = (SQueryAutoQWorkerPool *)workerCb->pPool;
    const int32_t runningNow = GET_RUNNING_N(workerPool->activeRunningN);
    if (runningNow < 0) {
      g_enduranceNegativeDetected.fetch_add(1);
    }
  }

  g_enduranceProcessed.fetch_add(1);
  taosFreeQitem(pMsg);
}
/**
 * Endurance: 16 producers submit messages at a steady rate for 10 seconds
 * while the test thread samples pool.activeRunningN every 100ms and checks
 * that neither the running nor the active half is negative. After the
 * producers stop, the test waits (60s budget) for the queue to drain and
 * verifies that every submitted message was processed, that no worker saw a
 * negative running count, and that no submission failed.
 */
TEST_F(QueryAutoQWorkerTest, EnduranceSustainedLoad) {
  g_enduranceProcessed = 0;
  g_enduranceNegativeDetected = 0;
  pool.min = 4;
  pool.num = 4;
  pool.max = 64;
  pool.name = "endurance";
  ASSERT_TRUE(initPool(enduranceProcessFp));

  const int32_t numProducers = 16;
  const int64_t durationMs = 10000;  // run for 10 seconds
  std::atomic<bool>    stopProducers{false};
  std::atomic<int64_t> totalSubmitted{0};
  std::atomic<int32_t> submitFailures{0};
  std::vector<std::thread> producers;

  // Stops and joins the producers on every exit path. A fatal ASSERT_* in the
  // monitor loop returns from this function while the producers are still
  // running; destroying a joinable std::thread calls std::terminate, and the
  // producers hold references to the locals above. The guard is declared after
  // those locals so that it is destroyed before them.
  struct ProducerGuard {
    std::atomic<bool>        &stop;
    std::vector<std::thread> &threads;
    ProducerGuard(std::atomic<bool> &s, std::vector<std::thread> &t) : stop(s), threads(t) {}
    ProducerGuard(const ProducerGuard &) = delete;
    ProducerGuard &operator=(const ProducerGuard &) = delete;
    ~ProducerGuard() {
      stop = true;
      for (auto &t : threads) {
        if (t.joinable()) t.join();
      }
    }
  } producerGuard(stopProducers, producers);

  // Producers: each continuously submits messages until told to stop
  producers.reserve(numProducers);
  for (int p = 0; p < numProducers; ++p) {
    producers.emplace_back([this, p, &stopProducers, &totalSubmitted, &submitFailures]() {
      int32_t seq = 0;
      while (!stopProducers.load()) {
        void   *qitem = nullptr;
        int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem);
        if (code != 0) {
          submitFailures.fetch_add(1);
          continue;
        }
        *(int32_t *)qitem = p * 100000 + seq++;
        code = taosWriteQitem(queue, qitem);
        if (code != 0) {
          // The item was not enqueued, so it has to be released here.
          submitFailures.fetch_add(1);
          taosFreeQitem(qitem);
          continue;
        }
        totalSubmitted.fetch_add(1);
        // Steady-rate: ~1 msg every 100-300us per producer
        std::this_thread::sleep_for(std::chrono::microseconds(100 + (seq % 200)));
      }
    });
  }

  // Monitor: check invariants throughout the run
  auto    start = std::chrono::steady_clock::now();
  int64_t checkCount = 0;
  while (true) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    ++checkCount;
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                       std::chrono::steady_clock::now() - start)
                       .count();
    if (elapsed >= durationMs) break;
    int32_t running = GET_RUNNING_N(pool.activeRunningN);
    int32_t active = GET_ACTIVE_N(pool.activeRunningN);
    // Safe to use fatal assertions here: producerGuard joins the producers
    // if either check makes the test return early.
    ASSERT_GE(running, 0) << "running=" << running << " at " << elapsed << "ms";
    ASSERT_GE(active, 0) << "active=" << active << " at " << elapsed << "ms";
  }

  // Stop producers (the guard's later join becomes a no-op)
  stopProducers = true;
  for (auto &t : producers) t.join();
  int64_t submitted = totalSubmitted.load();
  ASSERT_GT(submitted, 0) << "no messages were submitted";

  // Wait for remaining messages to drain
  auto drainStart = std::chrono::steady_clock::now();
  while (g_enduranceProcessed.load() < submitted) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                       std::chrono::steady_clock::now() - drainStart)
                       .count();
    ASSERT_LT(elapsed, 60000)
        << "drain timeout: processed " << g_enduranceProcessed.load()
        << "/" << submitted;
  }

  ASSERT_EQ(g_enduranceProcessed.load(), submitted);
  ASSERT_EQ(g_enduranceNegativeDetected.load(), 0)
      << "negative running detected " << g_enduranceNegativeDetected.load()
      << " times inside workers";
  ASSERT_EQ(submitFailures.load(), 0) << "submit failures: " << submitFailures.load();
  int32_t finalRunning = GET_RUNNING_N(pool.activeRunningN);
  ASSERT_GE(finalRunning, 0);

  auto totalElapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                          std::chrono::steady_clock::now() - start)
                          .count();
  printf(" Endurance: %" PRId64 " msgs submitted/processed, %d producers, "
         "%d max workers, %" PRId64 "ms total, %" PRId64 " invariant checks\n",
         submitted, numProducers, pool.num, (int64_t)totalElapsed, checkCount);

  cleanupPool();
}