| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- // ThreadPool unit tests
- // Set a short idle timeout for faster shrink tests
#define CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT 1
#include <httplib.h>

#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

using namespace httplib;
- TEST(ThreadPoolTest, BasicTaskExecution) {
- ThreadPool pool(4);
- std::atomic<int> count(0);
- for (int i = 0; i < 10; i++) {
- pool.enqueue([&count]() { count++; });
- }
- pool.shutdown();
- EXPECT_EQ(10, count.load());
- }
- TEST(ThreadPoolTest, FixedPoolWhenMaxEqualsBase) {
- // max_n == 0 means max = base (fixed pool behavior)
- ThreadPool pool(4);
- std::atomic<int> count(0);
- for (int i = 0; i < 100; i++) {
- pool.enqueue([&count]() { count++; });
- }
- pool.shutdown();
- EXPECT_EQ(100, count.load());
- }
// Verifies dynamic scale-up: with base=2 and max=8, blocking both base
// threads must not stall the pool — the extra tasks should be picked up
// by additional (dynamic) threads, observed via the peak active count.
TEST(ThreadPoolTest, DynamicScaleUp) {
  // base=2, max=8: block 2 base threads, then enqueue more tasks
  ThreadPool pool(2, 8);
  std::atomic<int> active(0);      // tasks currently inside their body
  std::atomic<int> max_active(0);  // observed peak of `active`
  std::atomic<int> completed(0);
  std::mutex barrier_mutex;
  std::condition_variable barrier_cv;
  bool release = false;  // guarded by barrier_mutex
  // Occupy all base threads with blocking tasks
  for (int i = 0; i < 2; i++) {
    pool.enqueue([&]() {
      active++;
      {
        std::unique_lock<std::mutex> lock(barrier_mutex);
        // Predicate form guards against spurious wakeups.
        barrier_cv.wait(lock, [&] { return release; });
      }
      active--;
      completed++;
    });
  }
  // Wait for base threads to be occupied
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
  // These should trigger dynamic thread creation
  for (int i = 0; i < 4; i++) {
    pool.enqueue([&]() {
      int cur = ++active;
      // Track peak active count via a lock-free CAS max-update loop:
      // retry until `max_active` holds a value >= cur.
      int prev = max_active.load();
      while (cur > prev && !max_active.compare_exchange_weak(prev, cur)) {}
      // Keep the task alive briefly so concurrent tasks overlap.
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
      active--;
      completed++;
    });
  }
  // Wait for dynamic tasks to complete
  std::this_thread::sleep_for(std::chrono::milliseconds(500));
  // Release the blocking tasks
  {
    std::unique_lock<std::mutex> lock(barrier_mutex);
    release = true;
  }
  barrier_cv.notify_all();
  pool.shutdown();
  EXPECT_EQ(6, completed.load());
  // More than 2 threads were active simultaneously
  EXPECT_GT(max_active.load(), 2);
}
- TEST(ThreadPoolTest, DynamicShrinkAfterIdle) {
- // CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT is set to 1 second
- ThreadPool pool(2, 8);
- std::atomic<int> completed(0);
- // Enqueue tasks that require dynamic threads
- for (int i = 0; i < 8; i++) {
- pool.enqueue([&]() {
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- completed++;
- });
- }
- // Wait for all tasks to complete + idle timeout + margin
- std::this_thread::sleep_for(std::chrono::milliseconds(2500));
- // Now enqueue a simple task to verify the pool still works
- // (base threads are still alive)
- std::atomic<bool> final_task_done(false);
- pool.enqueue([&]() { final_task_done = true; });
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- pool.shutdown();
- EXPECT_EQ(8, completed.load());
- EXPECT_TRUE(final_task_done.load());
- }
- TEST(ThreadPoolTest, ShutdownWithActiveDynamicThreads) {
- ThreadPool pool(2, 8);
- std::atomic<int> started(0);
- std::mutex block_mutex;
- std::condition_variable block_cv;
- bool release = false;
- // Start tasks on dynamic threads that block until released
- for (int i = 0; i < 6; i++) {
- pool.enqueue([&]() {
- started++;
- std::unique_lock<std::mutex> lock(block_mutex);
- block_cv.wait(lock, [&] { return release; });
- });
- }
- // Wait for tasks to start
- std::this_thread::sleep_for(std::chrono::milliseconds(200));
- EXPECT_GE(started.load(), 2);
- // Release all blocked threads, then shutdown
- {
- std::unique_lock<std::mutex> lock(block_mutex);
- release = true;
- }
- block_cv.notify_all();
- pool.shutdown();
- }
- TEST(ThreadPoolTest, MaxQueuedRequests) {
- // base=2, max=2 (fixed), mqr=3
- ThreadPool pool(2, 2, 3);
- std::mutex block_mutex;
- std::condition_variable block_cv;
- bool release = false;
- // Block both threads
- for (int i = 0; i < 2; i++) {
- EXPECT_TRUE(pool.enqueue([&]() {
- std::unique_lock<std::mutex> lock(block_mutex);
- block_cv.wait(lock, [&] { return release; });
- }));
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- // Fill the queue up to max_queued_requests
- EXPECT_TRUE(pool.enqueue([]() {}));
- EXPECT_TRUE(pool.enqueue([]() {}));
- EXPECT_TRUE(pool.enqueue([]() {}));
- // This should fail - queue is full
- EXPECT_FALSE(pool.enqueue([]() {}));
- // Release blocked threads
- {
- std::unique_lock<std::mutex> lock(block_mutex);
- release = true;
- }
- block_cv.notify_all();
- pool.shutdown();
- }
- #ifndef CPPHTTPLIB_NO_EXCEPTIONS
- TEST(ThreadPoolTest, InvalidMaxThreadsThrows) {
- // max_n < n should throw
- EXPECT_THROW(ThreadPool(8, 4), std::invalid_argument);
- }
- #endif
- TEST(ThreadPoolTest, EnqueueAfterShutdownReturnsFalse) {
- ThreadPool pool(2);
- pool.shutdown();
- EXPECT_FALSE(pool.enqueue([]() {}));
- }
- TEST(ThreadPoolTest, ConcurrentEnqueue) {
- ThreadPool pool(4, 16);
- std::atomic<int> count(0);
- const int num_producers = 4;
- const int tasks_per_producer = 100;
- std::vector<std::thread> producers;
- for (int p = 0; p < num_producers; p++) {
- producers.emplace_back([&]() {
- for (int i = 0; i < tasks_per_producer; i++) {
- pool.enqueue([&count]() { count++; });
- }
- });
- }
- for (auto &t : producers) {
- t.join();
- }
- pool.shutdown();
- EXPECT_EQ(num_producers * tasks_per_producer, count.load());
- }
|