| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234 |
- //
- // ssecli-stream.cc
- //
- // Copyright (c) 2026 Yuji Hirose. All rights reserved.
- // MIT License
- //
- // SSE (Server-Sent Events) client example using Streaming API
- // with automatic reconnection support (similar to JavaScript's EventSource)
- //
- #include <httplib.h>
- #include <chrono>
- #include <iostream>
- #include <string>
- #include <thread>
- //------------------------------------------------------------------------------
- // SSE Event Parser
- //------------------------------------------------------------------------------
- // Parses SSE events from the stream according to the SSE specification.
- // SSE format:
- // event: <event-type> (optional, defaults to "message")
- // data: <payload> (can have multiple lines)
- // id: <event-id> (optional, used for reconnection)
- // retry: <milliseconds> (optional, reconnection interval)
- // <blank line> (signals end of event)
- //
- struct SSEEvent {
- std::string event = "message"; // Event type (default: "message")
- std::string data; // Event payload
- std::string id; // Event ID for Last-Event-ID header
- void clear() {
- event = "message";
- data.clear();
- id.clear();
- }
- };
- // Parse a single SSE field line (e.g., "data: hello")
- // Returns true if this line ends an event (blank line)
- bool parse_sse_line(const std::string &line, SSEEvent &event, int &retry_ms) {
- // Blank line signals end of event
- if (line.empty() || line == "\r") { return true; }
- // Find the colon separator
- auto colon_pos = line.find(':');
- if (colon_pos == std::string::npos) {
- // Line with no colon is treated as field name with empty value
- return false;
- }
- std::string field = line.substr(0, colon_pos);
- std::string value;
- // Value starts after colon, skip optional single space
- if (colon_pos + 1 < line.size()) {
- size_t value_start = colon_pos + 1;
- if (line[value_start] == ' ') { value_start++; }
- value = line.substr(value_start);
- // Remove trailing \r if present
- if (!value.empty() && value.back() == '\r') { value.pop_back(); }
- }
- // Handle known fields
- if (field == "event") {
- event.event = value;
- } else if (field == "data") {
- // Multiple data lines are concatenated with newlines
- if (!event.data.empty()) { event.data += "\n"; }
- event.data += value;
- } else if (field == "id") {
- // Empty id is valid (clears the last event ID)
- event.id = value;
- } else if (field == "retry") {
- // Parse retry interval in milliseconds
- try {
- retry_ms = std::stoi(value);
- } catch (...) {
- // Invalid retry value, ignore
- }
- }
- // Unknown fields are ignored per SSE spec
- return false;
- }
- //------------------------------------------------------------------------------
- // Main - SSE Client with Auto-Reconnection
- //------------------------------------------------------------------------------
- int main(void) {
- // Configuration
- const std::string host = "http://localhost:1234";
- const std::string path = "/event1";
- httplib::Client cli(host);
- // State for reconnection (persists across connections)
- std::string last_event_id; // Sent as Last-Event-ID header on reconnect
- int retry_ms = 3000; // Reconnection delay (server can override via retry:)
- int connection_count = 0;
- std::cout << "SSE Client starting...\n";
- std::cout << "Target: " << host << path << "\n";
- std::cout << "Press Ctrl+C to exit\n\n";
- //----------------------------------------------------------------------------
- // Main reconnection loop
- // This mimics JavaScript's EventSource behavior:
- // - Automatically reconnects on connection failure
- // - Sends Last-Event-ID header to resume from last received event
- // - Respects server's retry interval
- //----------------------------------------------------------------------------
- while (true) {
- connection_count++;
- std::cout << "[Connection #" << connection_count << "] Connecting...\n";
- // Build headers, including Last-Event-ID if we have one
- httplib::Headers headers;
- if (!last_event_id.empty()) {
- headers.emplace("Last-Event-ID", last_event_id);
- std::cout << "[Connection #" << connection_count
- << "] Resuming from event ID: " << last_event_id << "\n";
- }
- // Open streaming connection
- auto result = httplib::stream::Get(cli, path, headers);
- //--------------------------------------------------------------------------
- // Connection error handling
- //--------------------------------------------------------------------------
- if (!result) {
- std::cerr << "[Connection #" << connection_count
- << "] Failed: " << httplib::to_string(result.error()) << "\n";
- std::cerr << "Reconnecting in " << retry_ms << "ms...\n\n";
- std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms));
- continue;
- }
- if (result.status() != 200) {
- std::cerr << "[Connection #" << connection_count
- << "] HTTP error: " << result.status() << "\n";
- // For certain errors, don't reconnect
- if (result.status() == 204 || // No Content - server wants us to stop
- result.status() == 404 || // Not Found
- result.status() == 401 || // Unauthorized
- result.status() == 403) { // Forbidden
- std::cerr << "Permanent error, not reconnecting.\n";
- return 1;
- }
- std::cerr << "Reconnecting in " << retry_ms << "ms...\n\n";
- std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms));
- continue;
- }
- // Verify Content-Type (optional but recommended)
- auto content_type = result.get_header_value("Content-Type");
- if (content_type.find("text/event-stream") == std::string::npos) {
- std::cerr << "[Warning] Content-Type is not text/event-stream: "
- << content_type << "\n";
- }
- std::cout << "[Connection #" << connection_count << "] Connected!\n\n";
- //--------------------------------------------------------------------------
- // Event receiving loop
- // Reads chunks from the stream and parses SSE events
- //--------------------------------------------------------------------------
- std::string buffer;
- SSEEvent current_event;
- int event_count = 0;
- // Read data from stream using httplib::stream API
- while (result.next()) {
- buffer.append(result.data(), result.size());
- // Process complete lines in the buffer
- size_t line_start = 0;
- size_t newline_pos;
- while ((newline_pos = buffer.find('\n', line_start)) !=
- std::string::npos) {
- std::string line = buffer.substr(line_start, newline_pos - line_start);
- line_start = newline_pos + 1;
- // Parse the line and check if event is complete
- bool event_complete = parse_sse_line(line, current_event, retry_ms);
- if (event_complete && !current_event.data.empty()) {
- // Event received - process it
- event_count++;
- std::cout << "--- Event #" << event_count << " ---\n";
- std::cout << "Type: " << current_event.event << "\n";
- std::cout << "Data: " << current_event.data << "\n";
- if (!current_event.id.empty()) {
- std::cout << "ID: " << current_event.id << "\n";
- }
- std::cout << "\n";
- // Update last_event_id for reconnection
- // Note: Empty id clears the last event ID per SSE spec
- if (!current_event.id.empty()) { last_event_id = current_event.id; }
- current_event.clear();
- }
- }
- // Keep unprocessed data in buffer
- buffer.erase(0, line_start);
- }
- //--------------------------------------------------------------------------
- // Connection ended - check why
- //--------------------------------------------------------------------------
- if (result.read_error() != httplib::Error::Success) {
- std::cerr << "\n[Connection #" << connection_count
- << "] Error: " << httplib::to_string(result.read_error())
- << "\n";
- } else {
- std::cout << "\n[Connection #" << connection_count
- << "] Stream ended normally\n";
- }
- std::cout << "Received " << event_count << " events in this connection\n";
- std::cout << "Reconnecting in " << retry_ms << "ms...\n\n";
- std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms));
- }
- return 0;
- }
|