// ThreadPool unit tests // Set a short idle timeout for faster shrink tests #define CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT 1 #include #include #include #include #include #include using namespace httplib; TEST(ThreadPoolTest, BasicTaskExecution) { ThreadPool pool(4); std::atomic count(0); for (int i = 0; i < 10; i++) { pool.enqueue([&count]() { count++; }); } pool.shutdown(); EXPECT_EQ(10, count.load()); } TEST(ThreadPoolTest, FixedPoolWhenMaxEqualsBase) { // max_n == 0 means max = base (fixed pool behavior) ThreadPool pool(4); std::atomic count(0); for (int i = 0; i < 100; i++) { pool.enqueue([&count]() { count++; }); } pool.shutdown(); EXPECT_EQ(100, count.load()); } TEST(ThreadPoolTest, DynamicScaleUp) { // base=2, max=8: block 2 base threads, then enqueue more tasks ThreadPool pool(2, 8); std::atomic active(0); std::atomic max_active(0); std::atomic completed(0); std::mutex barrier_mutex; std::condition_variable barrier_cv; bool release = false; // Occupy all base threads with blocking tasks for (int i = 0; i < 2; i++) { pool.enqueue([&]() { active++; { std::unique_lock lock(barrier_mutex); barrier_cv.wait(lock, [&] { return release; }); } active--; completed++; }); } // Wait for base threads to be occupied std::this_thread::sleep_for(std::chrono::milliseconds(100)); // These should trigger dynamic thread creation for (int i = 0; i < 4; i++) { pool.enqueue([&]() { int cur = ++active; // Track peak active count int prev = max_active.load(); while (cur > prev && !max_active.compare_exchange_weak(prev, cur)) {} std::this_thread::sleep_for(std::chrono::milliseconds(50)); active--; completed++; }); } // Wait for dynamic tasks to complete std::this_thread::sleep_for(std::chrono::milliseconds(500)); // Release the blocking tasks { std::unique_lock lock(barrier_mutex); release = true; } barrier_cv.notify_all(); pool.shutdown(); EXPECT_EQ(6, completed.load()); // More than 2 threads were active simultaneously EXPECT_GT(max_active.load(), 2); } TEST(ThreadPoolTest, DynamicShrinkAfterIdle) { // CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT is set to 1 second ThreadPool pool(2, 8); std::atomic completed(0); // Enqueue tasks that require dynamic threads for (int i = 0; i < 8; i++) { pool.enqueue([&]() { std::this_thread::sleep_for(std::chrono::milliseconds(100)); completed++; }); } // Wait for all tasks to complete + idle timeout + margin std::this_thread::sleep_for(std::chrono::milliseconds(2500)); // Now enqueue a simple task to verify the pool still works // (base threads are still alive) std::atomic final_task_done(false); pool.enqueue([&]() { final_task_done = true; }); std::this_thread::sleep_for(std::chrono::milliseconds(100)); pool.shutdown(); EXPECT_EQ(8, completed.load()); EXPECT_TRUE(final_task_done.load()); } TEST(ThreadPoolTest, ShutdownWithActiveDynamicThreads) { ThreadPool pool(2, 8); std::atomic started(0); std::mutex block_mutex; std::condition_variable block_cv; bool release = false; // Start tasks on dynamic threads that block until released for (int i = 0; i < 6; i++) { pool.enqueue([&]() { started++; std::unique_lock lock(block_mutex); block_cv.wait(lock, [&] { return release; }); }); } // Wait for tasks to start std::this_thread::sleep_for(std::chrono::milliseconds(200)); EXPECT_GE(started.load(), 2); // Release all blocked threads, then shutdown { std::unique_lock lock(block_mutex); release = true; } block_cv.notify_all(); pool.shutdown(); } TEST(ThreadPoolTest, MaxQueuedRequests) { // base=2, max=2 (fixed), mqr=3 ThreadPool pool(2, 2, 3); std::mutex block_mutex; std::condition_variable block_cv; bool release = false; // Block both threads for (int i = 0; i < 2; i++) { EXPECT_TRUE(pool.enqueue([&]() { std::unique_lock lock(block_mutex); block_cv.wait(lock, [&] { return release; }); })); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Fill the queue up to max_queued_requests EXPECT_TRUE(pool.enqueue([]() {})); EXPECT_TRUE(pool.enqueue([]() {})); EXPECT_TRUE(pool.enqueue([]() {})); // This should fail - queue is full EXPECT_FALSE(pool.enqueue([]() {})); // Release blocked threads { std::unique_lock lock(block_mutex); release = true; } block_cv.notify_all(); pool.shutdown(); } #ifndef CPPHTTPLIB_NO_EXCEPTIONS TEST(ThreadPoolTest, InvalidMaxThreadsThrows) { // max_n < n should throw EXPECT_THROW(ThreadPool(8, 4), std::invalid_argument); } #endif TEST(ThreadPoolTest, EnqueueAfterShutdownReturnsFalse) { ThreadPool pool(2); pool.shutdown(); EXPECT_FALSE(pool.enqueue([]() {})); } TEST(ThreadPoolTest, ConcurrentEnqueue) { ThreadPool pool(4, 16); std::atomic count(0); const int num_producers = 4; const int tasks_per_producer = 100; std::vector producers; for (int p = 0; p < num_producers; p++) { producers.emplace_back([&]() { for (int i = 0; i < tasks_per_producer; i++) { pool.enqueue([&count]() { count++; }); } }); } for (auto &t : producers) { t.join(); } pool.shutdown(); EXPECT_EQ(num_producers * tasks_per_producer, count.load()); }