// test_thread_pool.cc
// ThreadPool unit tests
// Set a short idle timeout for faster shrink tests
// (must be defined before including httplib.h)
#define CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT 1
#include <httplib.h>

#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

using namespace httplib;
  11. TEST(ThreadPoolTest, BasicTaskExecution) {
  12. ThreadPool pool(4);
  13. std::atomic<int> count(0);
  14. for (int i = 0; i < 10; i++) {
  15. pool.enqueue([&count]() { count++; });
  16. }
  17. pool.shutdown();
  18. EXPECT_EQ(10, count.load());
  19. }
  20. TEST(ThreadPoolTest, FixedPoolWhenMaxEqualsBase) {
  21. // max_n == 0 means max = base (fixed pool behavior)
  22. ThreadPool pool(4);
  23. std::atomic<int> count(0);
  24. for (int i = 0; i < 100; i++) {
  25. pool.enqueue([&count]() { count++; });
  26. }
  27. pool.shutdown();
  28. EXPECT_EQ(100, count.load());
  29. }
  30. TEST(ThreadPoolTest, DynamicScaleUp) {
  31. // base=2, max=8: block 2 base threads, then enqueue more tasks
  32. ThreadPool pool(2, 8);
  33. std::atomic<int> active(0);
  34. std::atomic<int> max_active(0);
  35. std::atomic<int> completed(0);
  36. std::mutex barrier_mutex;
  37. std::condition_variable barrier_cv;
  38. bool release = false;
  39. // Occupy all base threads with blocking tasks
  40. for (int i = 0; i < 2; i++) {
  41. pool.enqueue([&]() {
  42. active++;
  43. {
  44. std::unique_lock<std::mutex> lock(barrier_mutex);
  45. barrier_cv.wait(lock, [&] { return release; });
  46. }
  47. active--;
  48. completed++;
  49. });
  50. }
  51. // Wait for base threads to be occupied
  52. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  53. // These should trigger dynamic thread creation
  54. for (int i = 0; i < 4; i++) {
  55. pool.enqueue([&]() {
  56. int cur = ++active;
  57. // Track peak active count
  58. int prev = max_active.load();
  59. while (cur > prev && !max_active.compare_exchange_weak(prev, cur)) {}
  60. std::this_thread::sleep_for(std::chrono::milliseconds(50));
  61. active--;
  62. completed++;
  63. });
  64. }
  65. // Wait for dynamic tasks to complete
  66. std::this_thread::sleep_for(std::chrono::milliseconds(500));
  67. // Release the blocking tasks
  68. {
  69. std::unique_lock<std::mutex> lock(barrier_mutex);
  70. release = true;
  71. }
  72. barrier_cv.notify_all();
  73. pool.shutdown();
  74. EXPECT_EQ(6, completed.load());
  75. // More than 2 threads were active simultaneously
  76. EXPECT_GT(max_active.load(), 2);
  77. }
  78. TEST(ThreadPoolTest, DynamicShrinkAfterIdle) {
  79. // CPPHTTPLIB_THREAD_POOL_IDLE_TIMEOUT is set to 1 second
  80. ThreadPool pool(2, 8);
  81. std::atomic<int> completed(0);
  82. // Enqueue tasks that require dynamic threads
  83. for (int i = 0; i < 8; i++) {
  84. pool.enqueue([&]() {
  85. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  86. completed++;
  87. });
  88. }
  89. // Wait for all tasks to complete + idle timeout + margin
  90. std::this_thread::sleep_for(std::chrono::milliseconds(2500));
  91. // Now enqueue a simple task to verify the pool still works
  92. // (base threads are still alive)
  93. std::atomic<bool> final_task_done(false);
  94. pool.enqueue([&]() { final_task_done = true; });
  95. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  96. pool.shutdown();
  97. EXPECT_EQ(8, completed.load());
  98. EXPECT_TRUE(final_task_done.load());
  99. }
  100. TEST(ThreadPoolTest, ShutdownWithActiveDynamicThreads) {
  101. ThreadPool pool(2, 8);
  102. std::atomic<int> started(0);
  103. std::mutex block_mutex;
  104. std::condition_variable block_cv;
  105. bool release = false;
  106. // Start tasks on dynamic threads that block until released
  107. for (int i = 0; i < 6; i++) {
  108. pool.enqueue([&]() {
  109. started++;
  110. std::unique_lock<std::mutex> lock(block_mutex);
  111. block_cv.wait(lock, [&] { return release; });
  112. });
  113. }
  114. // Wait for tasks to start
  115. std::this_thread::sleep_for(std::chrono::milliseconds(200));
  116. EXPECT_GE(started.load(), 2);
  117. // Release all blocked threads, then shutdown
  118. {
  119. std::unique_lock<std::mutex> lock(block_mutex);
  120. release = true;
  121. }
  122. block_cv.notify_all();
  123. pool.shutdown();
  124. }
  125. TEST(ThreadPoolTest, MaxQueuedRequests) {
  126. // base=2, max=2 (fixed), mqr=3
  127. ThreadPool pool(2, 2, 3);
  128. std::mutex block_mutex;
  129. std::condition_variable block_cv;
  130. bool release = false;
  131. // Block both threads
  132. for (int i = 0; i < 2; i++) {
  133. EXPECT_TRUE(pool.enqueue([&]() {
  134. std::unique_lock<std::mutex> lock(block_mutex);
  135. block_cv.wait(lock, [&] { return release; });
  136. }));
  137. }
  138. std::this_thread::sleep_for(std::chrono::milliseconds(100));
  139. // Fill the queue up to max_queued_requests
  140. EXPECT_TRUE(pool.enqueue([]() {}));
  141. EXPECT_TRUE(pool.enqueue([]() {}));
  142. EXPECT_TRUE(pool.enqueue([]() {}));
  143. // This should fail - queue is full
  144. EXPECT_FALSE(pool.enqueue([]() {}));
  145. // Release blocked threads
  146. {
  147. std::unique_lock<std::mutex> lock(block_mutex);
  148. release = true;
  149. }
  150. block_cv.notify_all();
  151. pool.shutdown();
  152. }
  153. #ifndef CPPHTTPLIB_NO_EXCEPTIONS
  154. TEST(ThreadPoolTest, InvalidMaxThreadsThrows) {
  155. // max_n < n should throw
  156. EXPECT_THROW(ThreadPool(8, 4), std::invalid_argument);
  157. }
  158. #endif
  159. TEST(ThreadPoolTest, EnqueueAfterShutdownReturnsFalse) {
  160. ThreadPool pool(2);
  161. pool.shutdown();
  162. EXPECT_FALSE(pool.enqueue([]() {}));
  163. }
  164. TEST(ThreadPoolTest, ConcurrentEnqueue) {
  165. ThreadPool pool(4, 16);
  166. std::atomic<int> count(0);
  167. const int num_producers = 4;
  168. const int tasks_per_producer = 100;
  169. std::vector<std::thread> producers;
  170. for (int p = 0; p < num_producers; p++) {
  171. producers.emplace_back([&]() {
  172. for (int i = 0; i < tasks_per_producer; i++) {
  173. pool.enqueue([&count]() { count++; });
  174. }
  175. });
  176. }
  177. for (auto &t : producers) {
  178. t.join();
  179. }
  180. pool.shutdown();
  181. EXPECT_EQ(num_producers * tasks_per_producer, count.load());
  182. }