/** * Unit tests for SQueryAutoQWorkerPool self-healing mechanisms. * * Tests verify that the worker pool can: * 1. Prevent running counter from going negative (atomicSafeDecRunning) * 2. Detect and correct negative running counters (healRunning) * 3. Survive cross-pool beforeBlocking/afterRecoverFromBlocking abuse * 4. Continue processing messages after self-healing */ #include #include #include #include #include extern "C" { #include "os.h" #include "tqueue.h" #include "tworker.h" } extern int64_t tsQueueMemoryAllowed; #define GET_ACTIVE_N(int64_val) (int32_t)((int64_val) >> 32) #define GET_RUNNING_N(int64_val) (int32_t)(int64_val & 0xFFFFFFFF) #define MAKE_ACTIVE_RUNNING(active, running) (((int64_t)(active) << 32) | ((int64_t)(uint32_t)(running))) // ============================================================================ // Test fixture // ============================================================================ class QueryAutoQWorkerTest : public ::testing::Test { public: SQueryAutoQWorkerPool pool{}; STaosQueue *queue{nullptr}; std::atomic processedCount{0}; std::atomic blockingCallCount{0}; void SetUp() override { tsQueueMemoryAllowed = 1024 * 1024; memset(&pool, 0, sizeof(pool)); pool.num = 4; pool.max = 8; pool.min = 2; pool.name = "test-worker"; } void TearDown() override { // Cleanup is done per-test where pool was initialized } // Initialize pool and allocate queue with given FItem bool initPool(FItem fp, void *ahandle = nullptr) { int32_t code = tQueryAutoQWorkerInit(&pool); if (code != TSDB_CODE_SUCCESS) return false; queue = tQueryAutoQWorkerAllocQueue(&pool, ahandle ? ahandle : (void *)this, fp); return queue != nullptr; } void cleanupPool() { tQueryAutoQWorkerCleanup(&pool); } // Submit N messages to the pool's queue bool submitMessages(int32_t count) { for (int32_t i = 0; i < count; ++i) { void *qitem = nullptr; int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem); if (code != 0) return false; *(int32_t *)qitem = i; code = taosWriteQitem(queue, qitem); if (code != 0) return false; } return true; } // Wait for processedCount to reach target, with timeout bool waitForProcessed(int32_t target, int32_t timeoutMs = 5000) { auto start = std::chrono::steady_clock::now(); while (processedCount.load() < target) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); if (elapsed > timeoutMs) return false; } return true; } }; // ============================================================================ // Simple message processor for basic tests // ============================================================================ static void simpleProcessFp(SQueueInfo *pQInfo, void *pMsg) { auto *self = (QueryAutoQWorkerTest *)pQInfo->ahandle; self->processedCount.fetch_add(1); taosFreeQitem(pMsg); } // ============================================================================ // Test 1: beforeBlocking on correct pool doesn't go negative // ============================================================================ TEST_F(QueryAutoQWorkerTest, BeforeBlockingCorrectPool) { ASSERT_TRUE(initPool(simpleProcessFp)); // Let pool start workers by submitting messages ASSERT_TRUE(submitMessages(10)); ASSERT_TRUE(waitForProcessed(10)); // Directly call beforeBlocking on the correct pool // This simulates a normal blocking operation int32_t runningBefore = GET_RUNNING_N(pool.activeRunningN); int32_t code = pool.pCb->beforeBlocking(pool.pCb->pPool); ASSERT_EQ(code, TSDB_CODE_SUCCESS); // running should not be negative int32_t runningAfter = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(runningAfter, 0) << "running went negative after beforeBlocking"; // Recover code = pool.pCb->afterRecoverFromBlocking(pool.pCb->pPool); ASSERT_EQ(code, TSDB_CODE_SUCCESS); int32_t runningRecovered = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(runningRecovered, 0) << "running went negative after recoverFromBlocking"; cleanupPool(); } // ============================================================================ // Test 2: Repeated beforeBlocking without matching recovery (simulates cross-pool abuse) // Running counter must never go below 0 // ============================================================================ TEST_F(QueryAutoQWorkerTest, RepeatedBeforeBlockingFloorProtection) { ASSERT_TRUE(initPool(simpleProcessFp)); // Submit and wait for some messages to get the pool active ASSERT_TRUE(submitMessages(4)); ASSERT_TRUE(waitForProcessed(4)); // Abuse: call beforeBlocking many more times than there are running threads // In the old code this would drive running to large negative values for (int32_t i = 0; i < 20; i++) { pool.pCb->beforeBlocking(pool.pCb->pPool); int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running went negative after " << (i + 1) << " beforeBlocking calls"; } cleanupPool(); } // ============================================================================ // Test 3: Direct injection of negative running, verify healRunning via // afterRecoverFromBlocking // ============================================================================ TEST_F(QueryAutoQWorkerTest, HealNegativeRunningViaRecover) { ASSERT_TRUE(initPool(simpleProcessFp)); // Inject negative running directly int32_t active = GET_ACTIVE_N(pool.activeRunningN); pool.activeRunningN = MAKE_ACTIVE_RUNNING(active > 0 ? active : 1, -5); int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_EQ(running, -5) << "injection failed"; // afterRecoverFromBlocking should trigger healRunning int32_t code = pool.pCb->afterRecoverFromBlocking(pool.pCb->pPool); ASSERT_EQ(code, TSDB_CODE_SUCCESS); running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "healRunning failed to correct negative running"; cleanupPool(); } // ============================================================================ // Test 4: Direct injection of negative running, verify pool can still // process messages (end-to-end self-healing) // ============================================================================ static void blockingProcessFp(SQueueInfo *pQInfo, void *pMsg) { auto *self = (QueryAutoQWorkerTest *)pQInfo->ahandle; // Simulate a blocking operation using the pool's callbacks if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); // Simulate some I/O wait std::this_thread::sleep_for(std::chrono::milliseconds(5)); cb->afterRecoverFromBlocking(cb->pPool); } self->processedCount.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, PoolRecoverAfterNegativeRunningInjection) { ASSERT_TRUE(initPool(blockingProcessFp)); // Process some messages normally first ASSERT_TRUE(submitMessages(8)); ASSERT_TRUE(waitForProcessed(8)); int32_t baseline = processedCount.load(); // Inject anomalous negative running int32_t active = GET_ACTIVE_N(pool.activeRunningN); pool.activeRunningN = MAKE_ACTIVE_RUNNING(active > 0 ? active : pool.num, -3); // Submit more messages - pool should self-heal and process them ASSERT_TRUE(submitMessages(8)); ASSERT_TRUE(waitForProcessed(baseline + 8, 10000)) << "Pool failed to recover: only processed " << processedCount.load() << " of expected " << (baseline + 8); // Verify running is non-negative int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running still negative after self-healing"; cleanupPool(); } // ============================================================================ // Test 5: Concurrent cross-pool abuse simulation // Multiple threads call beforeBlocking on a pool that's not theirs // ============================================================================ TEST_F(QueryAutoQWorkerTest, ConcurrentCrossPoolAbuse) { ASSERT_TRUE(initPool(simpleProcessFp)); // Start workers ASSERT_TRUE(submitMessages(4)); ASSERT_TRUE(waitForProcessed(4)); // Simulate concurrent cross-pool abuse: multiple threads calling // beforeBlocking without matching afterRecoverFromBlocking std::vector abusers; std::atomic negativeDetected{0}; for (int i = 0; i < 8; i++) { abusers.emplace_back([this, &negativeDetected]() { for (int j = 0; j < 50; j++) { pool.pCb->beforeBlocking(pool.pCb->pPool); int32_t running = GET_RUNNING_N(pool.activeRunningN); if (running < 0) { negativeDetected.fetch_add(1); } } }); } for (auto &t : abusers) t.join(); // With self-healing, running should never have gone negative ASSERT_EQ(negativeDetected.load(), 0) << "running went negative " << negativeDetected.load() << " times during concurrent abuse"; int32_t finalRunning = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(finalRunning, 0) << "final running is negative"; cleanupPool(); } // ============================================================================ // Test 6: Paired beforeBlocking/afterRecoverFromBlocking maintains invariant // ============================================================================ TEST_F(QueryAutoQWorkerTest, PairedBlockingRecoveryMaintainsInvariant) { ASSERT_TRUE(initPool(simpleProcessFp)); ASSERT_TRUE(submitMessages(4)); ASSERT_TRUE(waitForProcessed(4)); // Properly paired blocking/recovery cycles for (int i = 0; i < 20; i++) { int32_t code = pool.pCb->beforeBlocking(pool.pCb->pPool); ASSERT_EQ(code, TSDB_CODE_SUCCESS); int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running negative after beforeBlocking, iteration " << i; code = pool.pCb->afterRecoverFromBlocking(pool.pCb->pPool); ASSERT_EQ(code, TSDB_CODE_SUCCESS); running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running negative after recovery, iteration " << i; } cleanupPool(); } // ============================================================================ // Test 7: Two pools - verify cross-pool callback doesn't corrupt either pool // ============================================================================ static std::atomic g_pool1Processed{0}; static std::atomic g_pool2Processed{0}; static void pool1ProcessFp(SQueueInfo *pQInfo, void *pMsg) { g_pool1Processed.fetch_add(1); taosFreeQitem(pMsg); } static void pool2ProcessFp(SQueueInfo *pQInfo, void *pMsg) { g_pool2Processed.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, TwoPoolsCrossCallbackProtection) { g_pool1Processed = 0; g_pool2Processed = 0; // Pool 1 (uses the fixture pool) ASSERT_TRUE(initPool(pool1ProcessFp)); // Pool 2 SQueryAutoQWorkerPool pool2{}; memset(&pool2, 0, sizeof(pool2)); pool2.num = 4; pool2.max = 8; pool2.min = 2; pool2.name = "test-worker-2"; ASSERT_EQ(tQueryAutoQWorkerInit(&pool2), TSDB_CODE_SUCCESS); STaosQueue *q2 = tQueryAutoQWorkerAllocQueue(&pool2, nullptr, pool2ProcessFp); ASSERT_NE(q2, nullptr); // Submit messages to both pools ASSERT_TRUE(submitMessages(10)); // Submit to pool2 for (int32_t i = 0; i < 10; i++) { void *qitem = nullptr; ASSERT_EQ(taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem), 0); *(int32_t *)qitem = i; ASSERT_EQ(taosWriteQitem(q2, qitem), 0); } // Wait for messages to be processed auto start = std::chrono::steady_clock::now(); while (g_pool1Processed.load() < 10 || g_pool2Processed.load() < 10) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); if (elapsed > 5000) break; } ASSERT_GE(g_pool1Processed.load(), 10); ASSERT_GE(g_pool2Processed.load(), 10); // Cross-pool abuse: call pool1's beforeBlocking with pool2's pPool // This is the exact scenario that caused the original deadlock for (int i = 0; i < 10; i++) { pool.pCb->beforeBlocking(pool2.pCb->pPool); // pool1's cb on pool2's data } // Pool2's running should not go negative int32_t pool2Running = GET_RUNNING_N(pool2.activeRunningN); ASSERT_GE(pool2Running, 0) << "pool2 running went negative from cross-pool abuse"; // Pool1's running should be unaffected int32_t pool1Running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(pool1Running, 0) << "pool1 running went negative"; // Both pools should still be able to process messages g_pool1Processed = 0; g_pool2Processed = 0; ASSERT_TRUE(submitMessages(5)); for (int32_t i = 0; i < 5; i++) { void *qitem = nullptr; ASSERT_EQ(taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem), 0); *(int32_t *)qitem = i; ASSERT_EQ(taosWriteQitem(q2, qitem), 0); } start = std::chrono::steady_clock::now(); while (g_pool1Processed.load() < 5 || g_pool2Processed.load() < 5) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); if (elapsed > 10000) break; } ASSERT_GE(g_pool1Processed.load(), 5) << "pool1 stopped processing after cross-pool abuse"; ASSERT_GE(g_pool2Processed.load(), 5) << "pool2 stopped processing after cross-pool abuse"; tQueryAutoQWorkerCleanup(&pool2); cleanupPool(); } // ============================================================================ // Test 8: activeRunningN bit-packing correctness // ============================================================================ TEST_F(QueryAutoQWorkerTest, ActiveRunningBitPacking) { // Verify the bit-packing macros work correctly int64_t val; // Case 1: Both positive val = MAKE_ACTIVE_RUNNING(5, 3); ASSERT_EQ(GET_ACTIVE_N(val), 5); ASSERT_EQ(GET_RUNNING_N(val), 3); // Case 2: active=0, running=0 val = MAKE_ACTIVE_RUNNING(0, 0); ASSERT_EQ(GET_ACTIVE_N(val), 0); ASSERT_EQ(GET_RUNNING_N(val), 0); // Case 3: Negative running (what happens when bug corrupts it) // -1 in 32-bit = 0xFFFFFFFF, but GET_RUNNING_N casts to int32_t val = MAKE_ACTIVE_RUNNING(4, -1); int32_t running = GET_RUNNING_N(val); ASSERT_EQ(running, -1) << "negative running encoding/decoding mismatch"; // Case 4: Large values val = MAKE_ACTIVE_RUNNING(100, 50); ASSERT_EQ(GET_ACTIVE_N(val), 100); ASSERT_EQ(GET_RUNNING_N(val), 50); } // ============================================================================ // Test 9: Stress test - heavy concurrent blocking/recovery with fault injection // ============================================================================ static std::atomic g_stressProcessed{0}; static void stressProcessFp(SQueueInfo *pQInfo, void *pMsg) { g_stressProcessed.fetch_add(1); if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); std::this_thread::sleep_for(std::chrono::microseconds(100)); cb->afterRecoverFromBlocking(cb->pPool); } taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, StressConcurrentBlockingWithFaultInjection) { g_stressProcessed = 0; ASSERT_TRUE(initPool(stressProcessFp)); const int32_t totalMsgs = 100; // Submit messages ASSERT_TRUE(submitMessages(totalMsgs)); // Concurrently inject faults while messages are being processed std::atomic stopFaults{false}; std::thread faultInjector([this, &stopFaults]() { int iteration = 0; while (!stopFaults.load()) { if (iteration % 10 == 0) { // Inject negative running int32_t active = GET_ACTIVE_N(pool.activeRunningN); if (active <= 0) active = pool.num; pool.activeRunningN = MAKE_ACTIVE_RUNNING(active, -2); } std::this_thread::sleep_for(std::chrono::milliseconds(5)); iteration++; } }); // Wait for all messages to be processed auto start = std::chrono::steady_clock::now(); while (g_stressProcessed.load() < totalMsgs) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); if (elapsed > 30000) break; // 30s timeout } stopFaults = true; faultInjector.join(); ASSERT_GE(g_stressProcessed.load(), totalMsgs) << "Pool failed to process all messages under fault injection: " << g_stressProcessed.load() << "/" << totalMsgs; int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running still negative after stress test"; cleanupPool(); } // ============================================================================ // Test 10: Real-usage happy path — every message does beforeBlocking + // afterRecoverFromBlocking inside the worker thread (no fault injection) // ============================================================================ static std::atomic g_happyProcessed{0}; static void happyBlockingProcessFp(SQueueInfo *pQInfo, void *pMsg) { if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; int32_t code = cb->beforeBlocking(cb->pPool); EXPECT_EQ(code, TSDB_CODE_SUCCESS); // Simulate I/O blocking (RPC wait, disk read, etc.) std::this_thread::sleep_for(std::chrono::milliseconds(2)); code = cb->afterRecoverFromBlocking(cb->pPool); EXPECT_EQ(code, TSDB_CODE_SUCCESS); } g_happyProcessed.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, RealUsageBlockingWorkload) { g_happyProcessed = 0; pool.min = 4; pool.num = 4; pool.max = 32; ASSERT_TRUE(initPool(happyBlockingProcessFp)); const int32_t totalMsgs = 2000; ASSERT_TRUE(submitMessages(totalMsgs)); // Wait for all messages auto start = std::chrono::steady_clock::now(); while (g_happyProcessed.load() < totalMsgs) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); ASSERT_LT(elapsed, 60000) << "timeout: processed " << g_happyProcessed.load() << "/" << totalMsgs; // Invariant: running must always be >= 0 int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running went negative during normal workload"; } ASSERT_EQ(g_happyProcessed.load(), totalMsgs); // Final state check int32_t running = GET_RUNNING_N(pool.activeRunningN); int32_t active = GET_ACTIVE_N(pool.activeRunningN); ASSERT_GE(running, 0); ASSERT_GE(active, 0); cleanupPool(); } // ============================================================================ // Test 11: Sustained workload — multiple rounds of submit + drain, simulating // a long-running service that continuously processes queries // ============================================================================ static std::atomic g_sustainedProcessed{0}; static void sustainedProcessFp(SQueueInfo *pQInfo, void *pMsg) { if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); std::this_thread::sleep_for(std::chrono::microseconds(500)); cb->afterRecoverFromBlocking(cb->pPool); } g_sustainedProcessed.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, SustainedMultiRoundWorkload) { g_sustainedProcessed = 0; pool.min = 4; pool.num = 4; pool.max = 32; ASSERT_TRUE(initPool(sustainedProcessFp)); const int32_t rounds = 50; const int32_t msgsPerRound = 100; for (int32_t r = 0; r < rounds; ++r) { int32_t baseline = g_sustainedProcessed.load(); ASSERT_TRUE(submitMessages(msgsPerRound)) << "failed to submit messages in round " << r; auto start = std::chrono::steady_clock::now(); while (g_sustainedProcessed.load() < baseline + msgsPerRound) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); ASSERT_LT(elapsed, 10000) << "round " << r << " timeout: processed " << (g_sustainedProcessed.load() - baseline) << "/" << msgsPerRound; } // Between rounds: invariant must hold int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running negative after round " << r; } ASSERT_EQ(g_sustainedProcessed.load(), rounds * msgsPerRound); cleanupPool(); } // ============================================================================ // Test 12: Concurrent producers — multiple threads submit messages while // workers are actively processing with blocking // ============================================================================ static std::atomic g_concurrentProducerProcessed{0}; static void concurrentProducerProcessFp(SQueueInfo *pQInfo, void *pMsg) { if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); std::this_thread::sleep_for(std::chrono::microseconds(200)); cb->afterRecoverFromBlocking(cb->pPool); } g_concurrentProducerProcessed.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, ConcurrentProducersWithBlockingWorkers) { g_concurrentProducerProcessed = 0; pool.min = 4; pool.num = 4; pool.max = 64; ASSERT_TRUE(initPool(concurrentProducerProcessFp)); const int32_t numProducers = 16; const int32_t msgsPerProducer = 500; const int32_t totalMsgs = numProducers * msgsPerProducer; // Launch producer threads std::vector producers; std::atomic submitFailures{0}; for (int p = 0; p < numProducers; ++p) { producers.emplace_back([this, msgsPerProducer, &submitFailures]() { for (int32_t i = 0; i < msgsPerProducer; ++i) { void *qitem = nullptr; int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem); if (code != 0) { submitFailures.fetch_add(1); continue; } *(int32_t *)qitem = i; code = taosWriteQitem(queue, qitem); if (code != 0) { submitFailures.fetch_add(1); taosFreeQitem(qitem); } } }); } for (auto &t : producers) t.join(); ASSERT_EQ(submitFailures.load(), 0) << "some messages failed to submit"; // Wait for all messages to be processed auto start = std::chrono::steady_clock::now(); while (g_concurrentProducerProcessed.load() < totalMsgs) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); ASSERT_LT(elapsed, 120000) << "timeout: processed " << g_concurrentProducerProcessed.load() << "/" << totalMsgs; int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running went negative during concurrent produce"; } ASSERT_EQ(g_concurrentProducerProcessed.load(), totalMsgs); int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0); cleanupPool(); } // ============================================================================ // Test 13: Pool scaling — start with min workers, submit burst to trigger // dynamic scaling, verify pool grows and all messages are processed // ============================================================================ static std::atomic g_scalingProcessed{0}; static void scalingProcessFp(SQueueInfo *pQInfo, void *pMsg) { if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); // Long blocking to force pool to scale up std::this_thread::sleep_for(std::chrono::milliseconds(50)); cb->afterRecoverFromBlocking(cb->pPool); } g_scalingProcessed.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, PoolScalingUnderLoad) { g_scalingProcessed = 0; pool.min = 2; pool.num = 2; // start small pool.max = 64; // allow significant scaling ASSERT_TRUE(initPool(scalingProcessFp)); // Submit a burst of long-blocking messages to force worker scaling const int32_t burstSize = 200; ASSERT_TRUE(submitMessages(burstSize)); // Wait for processing auto start = std::chrono::steady_clock::now(); while (g_scalingProcessed.load() < burstSize) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); ASSERT_LT(elapsed, 30000) << "timeout: processed " << g_scalingProcessed.load() << "/" << burstSize; // running must stay non-negative during scaling int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running went negative during pool scaling"; } ASSERT_EQ(g_scalingProcessed.load(), burstSize); // Pool should have scaled beyond initial num=2 ASSERT_GT(pool.num, 2) << "pool did not scale up under load"; cleanupPool(); } // ============================================================================ // Test 14: Mixed blocking and non-blocking — some messages block, others don't. // Verifies the pool handles mixed workloads correctly. // ============================================================================ static std::atomic g_mixedProcessed{0}; static void mixedProcessFp(SQueueInfo *pQInfo, void *pMsg) { int32_t val = *(int32_t *)pMsg; // Even-numbered messages block, odd-numbered ones don't if (val % 2 == 0 && pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); std::this_thread::sleep_for(std::chrono::milliseconds(3)); cb->afterRecoverFromBlocking(cb->pPool); } g_mixedProcessed.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, MixedBlockingAndNonBlocking) { g_mixedProcessed = 0; pool.min = 4; pool.num = 4; pool.max = 32; ASSERT_TRUE(initPool(mixedProcessFp)); const int32_t totalMsgs = 2000; ASSERT_TRUE(submitMessages(totalMsgs)); auto start = std::chrono::steady_clock::now(); while (g_mixedProcessed.load() < totalMsgs) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); ASSERT_LT(elapsed, 60000) << "timeout: processed " << g_mixedProcessed.load() << "/" << totalMsgs; int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running went negative during mixed workload"; } ASSERT_EQ(g_mixedProcessed.load(), totalMsgs); int32_t running = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(running, 0); cleanupPool(); } // ============================================================================ // Test 15: Two independent pools processing concurrently — simulates the real // mnode scenario with queryWorker and mqueryWorker both active // ============================================================================ static std::atomic g_dualQueryProcessed{0}; static std::atomic g_dualMqueryProcessed{0}; static void dualQueryProcessFp(SQueueInfo *pQInfo, void *pMsg) { if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); std::this_thread::sleep_for(std::chrono::microseconds(500)); cb->afterRecoverFromBlocking(cb->pPool); } g_dualQueryProcessed.fetch_add(1); taosFreeQitem(pMsg); } static void dualMqueryProcessFp(SQueueInfo *pQInfo, void *pMsg) { if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); // mquery tasks are heavier (merge) std::this_thread::sleep_for(std::chrono::milliseconds(5)); cb->afterRecoverFromBlocking(cb->pPool); } g_dualMqueryProcessed.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, DualPoolConcurrentProcessing) { g_dualQueryProcessed = 0; g_dualMqueryProcessed = 0; // Pool 1: queryWorker (uses fixture pool, larger) pool.min = 4; pool.num = 4; pool.max = 16; pool.name = "mnode-query"; ASSERT_TRUE(initPool(dualQueryProcessFp)); // Pool 2: mqueryWorker (smaller, like real mnode) SQueryAutoQWorkerPool mqueryPool{}; memset(&mqueryPool, 0, sizeof(mqueryPool)); mqueryPool.min = 2; mqueryPool.num = 2; mqueryPool.max = 4; mqueryPool.name = "mnode-mquery"; ASSERT_EQ(tQueryAutoQWorkerInit(&mqueryPool), TSDB_CODE_SUCCESS); STaosQueue *mq = tQueryAutoQWorkerAllocQueue(&mqueryPool, nullptr, dualMqueryProcessFp); ASSERT_NE(mq, nullptr); // Submit messages to both pools concurrently const int32_t queryMsgs = 1000; const int32_t mqueryMsgs = 300; std::thread queryProducer([this, queryMsgs]() { for (int32_t i = 0; i < queryMsgs; ++i) { void *qitem = nullptr; (void)taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem); *(int32_t *)qitem = i; (void)taosWriteQitem(queue, qitem); } }); std::thread mqueryProducer([mq, mqueryMsgs]() { for (int32_t i = 0; i < mqueryMsgs; ++i) { void *qitem = nullptr; (void)taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem); *(int32_t *)qitem = i; (void)taosWriteQitem(mq, qitem); } }); queryProducer.join(); mqueryProducer.join(); // Wait for both pools to finish auto start = std::chrono::steady_clock::now(); while (g_dualQueryProcessed.load() < queryMsgs || g_dualMqueryProcessed.load() < mqueryMsgs) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); ASSERT_LT(elapsed, 30000) << "timeout: query=" << g_dualQueryProcessed.load() << "/" << queryMsgs << " mquery=" << g_dualMqueryProcessed.load() << "/" << mqueryMsgs; // Neither pool should have negative running int32_t r1 = GET_RUNNING_N(pool.activeRunningN); int32_t r2 = GET_RUNNING_N(mqueryPool.activeRunningN); ASSERT_GE(r1, 0) << "queryPool running negative"; ASSERT_GE(r2, 0) << "mqueryPool running negative"; } ASSERT_EQ(g_dualQueryProcessed.load(), queryMsgs); ASSERT_EQ(g_dualMqueryProcessed.load(), mqueryMsgs); tQueryAutoQWorkerCleanup(&mqueryPool); cleanupPool(); } // ============================================================================ // Test 16: Heavy stress — 32 producer threads × 1000 messages each = 32000 msgs, // pool max=128 workers, every message does blocking I/O. // Runs for extended time to expose race conditions. // ============================================================================ static std::atomic g_heavyProcessed{0}; static std::atomic g_heavyMinRunning{INT32_MAX}; static void heavyStressProcessFp(SQueueInfo *pQInfo, void *pMsg) { if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); // Variable blocking time to create diverse scheduling patterns int32_t val = *(int32_t *)pMsg; int32_t delayUs = 50 + (val % 200); // 50-250us std::this_thread::sleep_for(std::chrono::microseconds(delayUs)); cb->afterRecoverFromBlocking(cb->pPool); } g_heavyProcessed.fetch_add(1); // Track minimum running observed by any worker auto *pool = (SQueryAutoQWorkerPool *)((SQueryAutoQWorkerPoolCB *)pQInfo->workerCb)->pPool; int32_t running = GET_RUNNING_N(pool->activeRunningN); int32_t curMin = g_heavyMinRunning.load(); while (running < curMin) { if (g_heavyMinRunning.compare_exchange_weak(curMin, running)) break; } taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, HeavyStress32Producers) { g_heavyProcessed = 0; g_heavyMinRunning = INT32_MAX; pool.min = 4; pool.num = 4; pool.max = 128; pool.name = "heavy-stress"; ASSERT_TRUE(initPool(heavyStressProcessFp)); const int32_t numProducers = 32; const int32_t msgsPerProducer = 1000; const int32_t totalMsgs = numProducers * msgsPerProducer; // Launch producers std::vector producers; std::atomic submitFailures{0}; for (int p = 0; p < numProducers; ++p) { producers.emplace_back([this, msgsPerProducer, p, &submitFailures]() { for (int32_t i = 0; i < msgsPerProducer; ++i) { void *qitem = nullptr; int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem); if (code != 0) { submitFailures.fetch_add(1); continue; } *(int32_t *)qitem = p * msgsPerProducer + i; code = taosWriteQitem(queue, qitem); if (code != 0) { submitFailures.fetch_add(1); taosFreeQitem(qitem); } } }); } for (auto &t : producers) t.join(); ASSERT_EQ(submitFailures.load(), 0) << "submit failures detected"; // Wait with continuous invariant checking auto start = std::chrono::steady_clock::now(); int64_t checkCount = 0; while (g_heavyProcessed.load() < totalMsgs) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); ++checkCount; auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); ASSERT_LT(elapsed, 120000) << "timeout after " << elapsed << "ms: processed " << g_heavyProcessed.load() << "/" << totalMsgs; int32_t running = GET_RUNNING_N(pool.activeRunningN); int32_t active = GET_ACTIVE_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running=" << running << " at check #" << checkCount; ASSERT_GE(active, 0) << "active=" << active << " at check #" << checkCount; } ASSERT_EQ(g_heavyProcessed.load(), totalMsgs); ASSERT_GE(g_heavyMinRunning.load(), 0) << "minimum running observed by workers was " << g_heavyMinRunning.load(); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); printf(" Heavy stress: %d msgs, %d producers, %d max workers, %" PRId64 "ms, " "%" PRId64 " invariant checks passed\n", totalMsgs, numProducers, pool.num, (int64_t)elapsed, (int64_t)checkCount); cleanupPool(); } // ============================================================================ // Test 17: Endurance — sustained load for >= 10 seconds with continuous // producer pressure. 16 producers keep submitting at steady rate. // Verifies no drift, no leak, no livelock over extended operation. // ============================================================================ static std::atomic g_enduranceProcessed{0}; static std::atomic g_enduranceNegativeDetected{0}; static void enduranceProcessFp(SQueueInfo *pQInfo, void *pMsg) { if (pQInfo->workerCb) { auto *cb = (SQueryAutoQWorkerPoolCB *)pQInfo->workerCb; cb->beforeBlocking(cb->pPool); // Realistic RPC-like latency: 100-500us int32_t val = *(int32_t *)pMsg; std::this_thread::sleep_for(std::chrono::microseconds(100 + (val % 400))); cb->afterRecoverFromBlocking(cb->pPool); auto *pool = (SQueryAutoQWorkerPool *)cb->pPool; int32_t running = GET_RUNNING_N(pool->activeRunningN); if (running < 0) g_enduranceNegativeDetected.fetch_add(1); } g_enduranceProcessed.fetch_add(1); taosFreeQitem(pMsg); } TEST_F(QueryAutoQWorkerTest, EnduranceSustainedLoad) { g_enduranceProcessed = 0; g_enduranceNegativeDetected = 0; pool.min = 4; pool.num = 4; pool.max = 64; pool.name = "endurance"; ASSERT_TRUE(initPool(enduranceProcessFp)); const int32_t numProducers = 16; const int64_t durationMs = 10000; // run for 10 seconds std::atomic stopProducers{false}; std::atomic totalSubmitted{0}; std::atomic submitFailures{0}; // Producers: each continuously submits messages until told to stop std::vector producers; for (int p = 0; p < numProducers; ++p) { producers.emplace_back([this, p, &stopProducers, &totalSubmitted, &submitFailures]() { int32_t seq = 0; while (!stopProducers.load()) { void *qitem = nullptr; int32_t code = taosAllocateQitem(sizeof(int32_t), DEF_QITEM, 0, &qitem); if (code != 0) { submitFailures.fetch_add(1); continue; } *(int32_t *)qitem = p * 100000 + seq++; code = taosWriteQitem(queue, qitem); if (code != 0) { submitFailures.fetch_add(1); taosFreeQitem(qitem); continue; } totalSubmitted.fetch_add(1); // Steady-rate: ~1 msg every 100-300us per producer std::this_thread::sleep_for(std::chrono::microseconds(100 + (seq % 200))); } }); } // Monitor: check invariants throughout the run auto start = std::chrono::steady_clock::now(); int64_t checkCount = 0; while (true) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); ++checkCount; auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); if (elapsed >= durationMs) break; int32_t running = GET_RUNNING_N(pool.activeRunningN); int32_t active = GET_ACTIVE_N(pool.activeRunningN); ASSERT_GE(running, 0) << "running=" << running << " at " << elapsed << "ms"; ASSERT_GE(active, 0) << "active=" << active << " at " << elapsed << "ms"; } // Stop producers stopProducers = true; for (auto &t : producers) t.join(); int64_t submitted = totalSubmitted.load(); ASSERT_GT(submitted, 0) << "no messages were submitted"; // Wait for remaining messages to drain auto drainStart = std::chrono::steady_clock::now(); while (g_enduranceProcessed.load() < submitted) { std::this_thread::sleep_for(std::chrono::milliseconds(50)); auto elapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - drainStart) .count(); ASSERT_LT(elapsed, 60000) << "drain timeout: processed " << g_enduranceProcessed.load() << "/" << submitted; } ASSERT_EQ(g_enduranceProcessed.load(), submitted); ASSERT_EQ(g_enduranceNegativeDetected.load(), 0) << "negative running detected " << g_enduranceNegativeDetected.load() << " times inside workers"; ASSERT_EQ(submitFailures.load(), 0) << "submit failures: " << submitFailures.load(); int32_t finalRunning = GET_RUNNING_N(pool.activeRunningN); ASSERT_GE(finalRunning, 0); auto totalElapsed = std::chrono::duration_cast( std::chrono::steady_clock::now() - start) .count(); printf(" Endurance: %" PRId64 " msgs submitted/processed, %d producers, " "%d max workers, %" PRId64 "ms total, %" PRId64 " invariant checks\n", submitted, numProducers, pool.num, (int64_t)totalElapsed, checkCount); cleanupPool(); }