  //
  // ssecli-stream.cc
  //
  // Copyright (c) 2025 Yuji Hirose. All rights reserved.
  // MIT License
  //
  // SSE (Server-Sent Events) client example using Streaming API
  // with automatic reconnection support (similar to JavaScript's EventSource)
  //
  #include <httplib.h>

  #include <algorithm>
  #include <chrono>
  #include <iostream>
  #include <stdexcept>
  #include <string>
  #include <thread>
  //------------------------------------------------------------------------------
  // SSE Event Parser
  //------------------------------------------------------------------------------
  // Parses SSE events from the stream according to the SSE specification.
  // SSE format:
  //   event: <event-type>     (optional, defaults to "message")
  //   data: <payload>         (can have multiple lines)
  //   id: <event-id>          (optional, used for reconnection)
  //   retry: <milliseconds>   (optional, reconnection interval)
  //   <blank line>            (signals end of event)
  //
  // One Server-Sent Event, accumulated field by field until a blank line
  // terminates it.
  struct SSEEvent {
    std::string event = "message"; // Event type (default: "message")
    std::string data;              // Event payload; data lines joined with '\n'
    std::string id;                // Event ID for Last-Event-ID header

    // Reset every field to its freshly-constructed value.
    void clear() {
      event = "message";
      data.clear();
      id.clear();
    }
  };

  // Parse a single SSE line (without its terminating '\n') and fold it into
  // `event`.
  //
  //   line     - one line of the stream; a single trailing '\r' is tolerated
  //   event    - accumulator updated by "event", "data" and "id" fields
  //   retry_ms - overwritten by a valid "retry" field, otherwise left untouched
  //
  // Returns true if this line ends an event (blank line), false otherwise.
  bool parse_sse_line(const std::string &line, SSEEvent &event, int &retry_ms) {
    // Ignore one trailing CR so CRLF-terminated streams parse like LF ones,
    // whether or not the line contains a colon.
    std::string::size_type length = line.size();
    if (length > 0 && line[length - 1] == '\r') { --length; }

    // Blank line signals end of event
    if (length == 0) { return true; }

    // A line starting with a colon is a comment (often used as keep-alive)
    if (line[0] == ':') { return false; }

    std::string field;
    std::string value;
    const auto colon_pos = line.find(':');
    if (colon_pos == std::string::npos) {
      // Line with no colon: the whole line is the field name, value is empty
      field = line.substr(0, length);
    } else {
      field = line.substr(0, colon_pos);
      // Value starts after the colon, skipping one optional space
      auto value_start = colon_pos + 1;
      if (value_start < length && line[value_start] == ' ') { ++value_start; }
      value = line.substr(value_start, length - value_start);
    }

    if (field == "event") {
      // An empty type means the default type
      event.event = value.empty() ? "message" : value;
    } else if (field == "data") {
      // Multiple data lines are concatenated with newlines
      if (!event.data.empty()) { event.data += "\n"; }
      event.data += value;
    } else if (field == "id") {
      // An id containing NUL must be ignored; an empty id is stored as-is
      if (value.find('\0') == std::string::npos) { event.id = value; }
    } else if (field == "retry") {
      // Only a value made solely of ASCII digits is a valid interval. This
      // rejects negatives (which would turn the reconnect delay into a busy
      // loop) as well as values with whitespace or trailing junk.
      const bool digits_only =
          !value.empty() && std::all_of(value.begin(), value.end(), [](char c) {
            return c >= '0' && c <= '9';
          });
      if (digits_only) {
        try {
          retry_ms = std::stoi(value);
        } catch (const std::out_of_range &) {
          // Interval does not fit in int: keep the current setting
        }
      }
    }
    // Unknown fields are ignored per SSE spec
    return false;
  }
  //------------------------------------------------------------------------------
  // Main - SSE Client with Auto-Reconnection
  //------------------------------------------------------------------------------
  81. int main(void) {
  82. // Configuration
  83. const std::string host = "http://localhost:1234";
  84. const std::string path = "/event1";
  85. httplib::Client cli(host);
  86. // State for reconnection (persists across connections)
  87. std::string last_event_id; // Sent as Last-Event-ID header on reconnect
  88. int retry_ms = 3000; // Reconnection delay (server can override via retry:)
  89. int connection_count = 0;
  90. std::cout << "SSE Client starting...\n";
  91. std::cout << "Target: " << host << path << "\n";
  92. std::cout << "Press Ctrl+C to exit\n\n";
  93. //----------------------------------------------------------------------------
  94. // Main reconnection loop
  95. // This mimics JavaScript's EventSource behavior:
  96. // - Automatically reconnects on connection failure
  97. // - Sends Last-Event-ID header to resume from last received event
  98. // - Respects server's retry interval
  99. //----------------------------------------------------------------------------
  100. while (true) {
  101. connection_count++;
  102. std::cout << "[Connection #" << connection_count << "] Connecting...\n";
  103. // Build headers, including Last-Event-ID if we have one
  104. httplib::Headers headers;
  105. if (!last_event_id.empty()) {
  106. headers.emplace("Last-Event-ID", last_event_id);
  107. std::cout << "[Connection #" << connection_count
  108. << "] Resuming from event ID: " << last_event_id << "\n";
  109. }
  110. // Open streaming connection
  111. auto result = httplib::stream::Get(cli, path, headers);
  112. //--------------------------------------------------------------------------
  113. // Connection error handling
  114. //--------------------------------------------------------------------------
  115. if (!result) {
  116. std::cerr << "[Connection #" << connection_count
  117. << "] Failed: " << httplib::to_string(result.error()) << "\n";
  118. std::cerr << "Reconnecting in " << retry_ms << "ms...\n\n";
  119. std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms));
  120. continue;
  121. }
  122. if (result.status() != 200) {
  123. std::cerr << "[Connection #" << connection_count
  124. << "] HTTP error: " << result.status() << "\n";
  125. // For certain errors, don't reconnect
  126. if (result.status() == 204 || // No Content - server wants us to stop
  127. result.status() == 404 || // Not Found
  128. result.status() == 401 || // Unauthorized
  129. result.status() == 403) { // Forbidden
  130. std::cerr << "Permanent error, not reconnecting.\n";
  131. return 1;
  132. }
  133. std::cerr << "Reconnecting in " << retry_ms << "ms...\n\n";
  134. std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms));
  135. continue;
  136. }
  137. // Verify Content-Type (optional but recommended)
  138. auto content_type = result.get_header_value("Content-Type");
  139. if (content_type.find("text/event-stream") == std::string::npos) {
  140. std::cerr << "[Warning] Content-Type is not text/event-stream: "
  141. << content_type << "\n";
  142. }
  143. std::cout << "[Connection #" << connection_count << "] Connected!\n\n";
  144. //--------------------------------------------------------------------------
  145. // Event receiving loop
  146. // Reads chunks from the stream and parses SSE events
  147. //--------------------------------------------------------------------------
  148. std::string buffer;
  149. SSEEvent current_event;
  150. int event_count = 0;
  151. // Read data from stream using httplib::stream API
  152. while (result.next()) {
  153. buffer.append(result.data(), result.size());
  154. // Process complete lines in the buffer
  155. size_t line_start = 0;
  156. size_t newline_pos;
  157. while ((newline_pos = buffer.find('\n', line_start)) !=
  158. std::string::npos) {
  159. std::string line = buffer.substr(line_start, newline_pos - line_start);
  160. line_start = newline_pos + 1;
  161. // Parse the line and check if event is complete
  162. bool event_complete = parse_sse_line(line, current_event, retry_ms);
  163. if (event_complete && !current_event.data.empty()) {
  164. // Event received - process it
  165. event_count++;
  166. std::cout << "--- Event #" << event_count << " ---\n";
  167. std::cout << "Type: " << current_event.event << "\n";
  168. std::cout << "Data: " << current_event.data << "\n";
  169. if (!current_event.id.empty()) {
  170. std::cout << "ID: " << current_event.id << "\n";
  171. }
  172. std::cout << "\n";
  173. // Update last_event_id for reconnection
  174. // Note: Empty id clears the last event ID per SSE spec
  175. if (!current_event.id.empty()) { last_event_id = current_event.id; }
  176. current_event.clear();
  177. }
  178. }
  179. // Keep unprocessed data in buffer
  180. buffer.erase(0, line_start);
  181. }
  182. //--------------------------------------------------------------------------
  183. // Connection ended - check why
  184. //--------------------------------------------------------------------------
  185. if (result.read_error() != httplib::Error::Success) {
  186. std::cerr << "\n[Connection #" << connection_count
  187. << "] Error: " << httplib::to_string(result.read_error())
  188. << "\n";
  189. } else {
  190. std::cout << "\n[Connection #" << connection_count
  191. << "] Stream ended normally\n";
  192. }
  193. std::cout << "Received " << event_count << " events in this connection\n";
  194. std::cout << "Reconnecting in " << retry_ms << "ms...\n\n";
  195. std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms));
  196. }
  197. return 0;
  198. }