|
@@ -2975,98 +2975,30 @@ namespace stream {
|
|
|
|
|
|
|
|
class Result {
|
|
class Result {
|
|
|
public:
|
|
public:
|
|
|
- Result() : chunk_size_(8192) {}
|
|
|
|
|
-
|
|
|
|
|
- explicit Result(ClientImpl::StreamHandle &&handle, size_t chunk_size = 8192)
|
|
|
|
|
- : handle_(std::move(handle)), chunk_size_(chunk_size) {}
|
|
|
|
|
-
|
|
|
|
|
- Result(Result &&other) noexcept
|
|
|
|
|
- : handle_(std::move(other.handle_)), buffer_(std::move(other.buffer_)),
|
|
|
|
|
- current_size_(other.current_size_), chunk_size_(other.chunk_size_),
|
|
|
|
|
- finished_(other.finished_) {
|
|
|
|
|
- other.current_size_ = 0;
|
|
|
|
|
- other.finished_ = true;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- Result &operator=(Result &&other) noexcept {
|
|
|
|
|
- if (this != &other) {
|
|
|
|
|
- handle_ = std::move(other.handle_);
|
|
|
|
|
- buffer_ = std::move(other.buffer_);
|
|
|
|
|
- current_size_ = other.current_size_;
|
|
|
|
|
- chunk_size_ = other.chunk_size_;
|
|
|
|
|
- finished_ = other.finished_;
|
|
|
|
|
- other.current_size_ = 0;
|
|
|
|
|
- other.finished_ = true;
|
|
|
|
|
- }
|
|
|
|
|
- return *this;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+ Result();
|
|
|
|
|
+ explicit Result(ClientImpl::StreamHandle &&handle, size_t chunk_size = 8192);
|
|
|
|
|
+ Result(Result &&other) noexcept;
|
|
|
|
|
+ Result &operator=(Result &&other) noexcept;
|
|
|
Result(const Result &) = delete;
|
|
Result(const Result &) = delete;
|
|
|
Result &operator=(const Result &) = delete;
|
|
Result &operator=(const Result &) = delete;
|
|
|
|
|
|
|
|
- // Check if the result is valid (connection succeeded and response received)
|
|
|
|
|
- bool is_valid() const { return handle_.is_valid(); }
|
|
|
|
|
- explicit operator bool() const { return is_valid(); }
|
|
|
|
|
-
|
|
|
|
|
- // Response status code
|
|
|
|
|
- int status() const {
|
|
|
|
|
- return handle_.response ? handle_.response->status : -1;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Response headers
|
|
|
|
|
- const Headers &headers() const {
|
|
|
|
|
- static const Headers empty_headers;
|
|
|
|
|
- return handle_.response ? handle_.response->headers : empty_headers;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
|
|
+ // Response info
|
|
|
|
|
+ bool is_valid() const;
|
|
|
|
|
+ explicit operator bool() const;
|
|
|
|
|
+ int status() const;
|
|
|
|
|
+ const Headers &headers() const;
|
|
|
std::string get_header_value(const std::string &key,
|
|
std::string get_header_value(const std::string &key,
|
|
|
- const char *def = "") const {
|
|
|
|
|
- return handle_.response ? handle_.response->get_header_value(key, def)
|
|
|
|
|
- : def;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- bool has_header(const std::string &key) const {
|
|
|
|
|
- return handle_.response ? handle_.response->has_header(key) : false;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Error information
|
|
|
|
|
- Error error() const { return handle_.error; }
|
|
|
|
|
- Error read_error() const { return handle_.get_read_error(); }
|
|
|
|
|
- bool has_read_error() const { return handle_.has_read_error(); }
|
|
|
|
|
-
|
|
|
|
|
- // Streaming iteration API
|
|
|
|
|
- // Call next() to read the next chunk, then access data via data()/size()
|
|
|
|
|
- // Returns true if data was read, false when stream is exhausted
|
|
|
|
|
- bool next() {
|
|
|
|
|
- if (!handle_.is_valid() || finished_) { return false; }
|
|
|
|
|
-
|
|
|
|
|
- if (buffer_.size() < chunk_size_) { buffer_.resize(chunk_size_); }
|
|
|
|
|
-
|
|
|
|
|
- ssize_t n = handle_.read(&buffer_[0], chunk_size_);
|
|
|
|
|
- if (n > 0) {
|
|
|
|
|
- current_size_ = static_cast<size_t>(n);
|
|
|
|
|
- return true;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- current_size_ = 0;
|
|
|
|
|
- finished_ = true;
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Pointer to current chunk data (valid after next() returns true)
|
|
|
|
|
- const char *data() const { return buffer_.data(); }
|
|
|
|
|
-
|
|
|
|
|
- // Size of current chunk (valid after next() returns true)
|
|
|
|
|
- size_t size() const { return current_size_; }
|
|
|
|
|
|
|
+ const char *def = "") const;
|
|
|
|
|
+ bool has_header(const std::string &key) const;
|
|
|
|
|
+ Error error() const;
|
|
|
|
|
+ Error read_error() const;
|
|
|
|
|
+ bool has_read_error() const;
|
|
|
|
|
|
|
|
- // Convenience method: read all remaining data into a string
|
|
|
|
|
- std::string read_all() {
|
|
|
|
|
- std::string result;
|
|
|
|
|
- while (next()) {
|
|
|
|
|
- result.append(data(), size());
|
|
|
|
|
- }
|
|
|
|
|
- return result;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Stream reading
|
|
|
|
|
+ bool next();
|
|
|
|
|
+ const char *data() const;
|
|
|
|
|
+ size_t size() const;
|
|
|
|
|
+ std::string read_all();
|
|
|
|
|
|
|
|
private:
|
|
private:
|
|
|
ClientImpl::StreamHandle handle_;
|
|
ClientImpl::StreamHandle handle_;
|
|
@@ -3331,13 +3263,8 @@ struct SSEMessage {
|
|
|
std::string data; // Event payload
|
|
std::string data; // Event payload
|
|
|
std::string id; // Event ID for Last-Event-ID header
|
|
std::string id; // Event ID for Last-Event-ID header
|
|
|
|
|
|
|
|
- SSEMessage() : event("message") {}
|
|
|
|
|
-
|
|
|
|
|
- void clear() {
|
|
|
|
|
- event = "message";
|
|
|
|
|
- data.clear();
|
|
|
|
|
- id.clear();
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ SSEMessage();
|
|
|
|
|
+ void clear();
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
class SSEClient {
|
|
class SSEClient {
|
|
@@ -3346,288 +3273,411 @@ public:
|
|
|
using ErrorHandler = std::function<void(Error)>;
|
|
using ErrorHandler = std::function<void(Error)>;
|
|
|
using OpenHandler = std::function<void()>;
|
|
using OpenHandler = std::function<void()>;
|
|
|
|
|
|
|
|
- SSEClient(Client &client, const std::string &path)
|
|
|
|
|
- : client_(client), path_(path) {}
|
|
|
|
|
-
|
|
|
|
|
- SSEClient(Client &client, const std::string &path, const Headers &headers)
|
|
|
|
|
- : client_(client), path_(path), headers_(headers) {}
|
|
|
|
|
-
|
|
|
|
|
- ~SSEClient() { stop(); }
|
|
|
|
|
|
|
+ SSEClient(Client &client, const std::string &path);
|
|
|
|
|
+ SSEClient(Client &client, const std::string &path, const Headers &headers);
|
|
|
|
|
+ ~SSEClient();
|
|
|
|
|
|
|
|
SSEClient(const SSEClient &) = delete;
|
|
SSEClient(const SSEClient &) = delete;
|
|
|
SSEClient &operator=(const SSEClient &) = delete;
|
|
SSEClient &operator=(const SSEClient &) = delete;
|
|
|
|
|
|
|
|
// Event handlers
|
|
// Event handlers
|
|
|
- SSEClient &on_message(MessageHandler handler) {
|
|
|
|
|
- on_message_ = std::move(handler);
|
|
|
|
|
- return *this;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ SSEClient &on_message(MessageHandler handler);
|
|
|
|
|
+ SSEClient &on_event(const std::string &type, MessageHandler handler);
|
|
|
|
|
+ SSEClient &on_open(OpenHandler handler);
|
|
|
|
|
+ SSEClient &on_error(ErrorHandler handler);
|
|
|
|
|
+ SSEClient &set_reconnect_interval(int ms);
|
|
|
|
|
+ SSEClient &set_max_reconnect_attempts(int n);
|
|
|
|
|
|
|
|
- SSEClient &on_event(const std::string &type, MessageHandler handler) {
|
|
|
|
|
- event_handlers_[type] = std::move(handler);
|
|
|
|
|
- return *this;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // State accessors
|
|
|
|
|
+ bool is_connected() const;
|
|
|
|
|
+ const std::string &last_event_id() const;
|
|
|
|
|
|
|
|
- SSEClient &on_open(OpenHandler handler) {
|
|
|
|
|
- on_open_ = std::move(handler);
|
|
|
|
|
- return *this;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Blocking start - runs event loop with auto-reconnect
|
|
|
|
|
+ void start();
|
|
|
|
|
|
|
|
- SSEClient &on_error(ErrorHandler handler) {
|
|
|
|
|
- on_error_ = std::move(handler);
|
|
|
|
|
- return *this;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Non-blocking start - runs in background thread
|
|
|
|
|
+ void start_async();
|
|
|
|
|
|
|
|
- SSEClient &set_reconnect_interval(int ms) {
|
|
|
|
|
- reconnect_interval_ms_ = ms;
|
|
|
|
|
- return *this;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Stop the client (thread-safe)
|
|
|
|
|
+ void stop();
|
|
|
|
|
|
|
|
- SSEClient &set_max_reconnect_attempts(int n) {
|
|
|
|
|
- max_reconnect_attempts_ = n;
|
|
|
|
|
- return *this;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+private:
|
|
|
|
|
+ bool parse_sse_line(const std::string &line, SSEMessage &msg, int &retry_ms);
|
|
|
|
|
+ void run_event_loop();
|
|
|
|
|
+ void dispatch_event(const SSEMessage &msg);
|
|
|
|
|
+ bool should_reconnect(int count) const;
|
|
|
|
|
+ void wait_for_reconnect();
|
|
|
|
|
|
|
|
- // State accessors
|
|
|
|
|
- bool is_connected() const { return connected_.load(); }
|
|
|
|
|
- const std::string &last_event_id() const { return last_event_id_; }
|
|
|
|
|
|
|
+ // Client and path
|
|
|
|
|
+ Client &client_;
|
|
|
|
|
+ std::string path_;
|
|
|
|
|
+ Headers headers_;
|
|
|
|
|
|
|
|
- // Blocking start - runs event loop with auto-reconnect
|
|
|
|
|
- void start() {
|
|
|
|
|
- running_.store(true);
|
|
|
|
|
- run_event_loop();
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Callbacks
|
|
|
|
|
+ MessageHandler on_message_;
|
|
|
|
|
+ std::map<std::string, MessageHandler> event_handlers_;
|
|
|
|
|
+ OpenHandler on_open_;
|
|
|
|
|
+ ErrorHandler on_error_;
|
|
|
|
|
|
|
|
- // Non-blocking start - runs in background thread
|
|
|
|
|
- void start_async() {
|
|
|
|
|
- running_.store(true);
|
|
|
|
|
- async_thread_ = std::thread([this]() { run_event_loop(); });
|
|
|
|
|
|
|
+ // Configuration
|
|
|
|
|
+ int reconnect_interval_ms_ = 3000;
|
|
|
|
|
+ int max_reconnect_attempts_ = 0; // 0 = unlimited
|
|
|
|
|
+
|
|
|
|
|
+ // State
|
|
|
|
|
+ std::atomic<bool> running_{false};
|
|
|
|
|
+ std::atomic<bool> connected_{false};
|
|
|
|
|
+ std::string last_event_id_;
|
|
|
|
|
+
|
|
|
|
|
+ // Async support
|
|
|
|
|
+ std::thread async_thread_;
|
|
|
|
|
+};
|
|
|
|
|
+
|
|
|
|
|
+} // namespace sse
|
|
|
|
|
+
|
|
|
|
|
+// ----------------------------------------------------------------------------
|
|
|
|
|
+
|
|
|
|
|
+/*
|
|
|
|
|
+ * Implementation that will be part of the .cc file if split into .h + .cc.
|
|
|
|
|
+ */
|
|
|
|
|
+
|
|
|
|
|
+namespace stream {
|
|
|
|
|
+
|
|
|
|
|
+// stream::Result implementations
|
|
|
|
|
+inline Result::Result() : chunk_size_(8192) {}
|
|
|
|
|
+
|
|
|
|
|
+inline Result::Result(ClientImpl::StreamHandle &&handle, size_t chunk_size)
|
|
|
|
|
+ : handle_(std::move(handle)), chunk_size_(chunk_size) {}
|
|
|
|
|
+
|
|
|
|
|
+inline Result::Result(Result &&other) noexcept
|
|
|
|
|
+ : handle_(std::move(other.handle_)), buffer_(std::move(other.buffer_)),
|
|
|
|
|
+ current_size_(other.current_size_), chunk_size_(other.chunk_size_),
|
|
|
|
|
+ finished_(other.finished_) {
|
|
|
|
|
+ other.current_size_ = 0;
|
|
|
|
|
+ other.finished_ = true;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+inline Result &Result::operator=(Result &&other) noexcept {
|
|
|
|
|
+ if (this != &other) {
|
|
|
|
|
+ handle_ = std::move(other.handle_);
|
|
|
|
|
+ buffer_ = std::move(other.buffer_);
|
|
|
|
|
+ current_size_ = other.current_size_;
|
|
|
|
|
+ chunk_size_ = other.chunk_size_;
|
|
|
|
|
+ finished_ = other.finished_;
|
|
|
|
|
+ other.current_size_ = 0;
|
|
|
|
|
+ other.finished_ = true;
|
|
|
}
|
|
}
|
|
|
|
|
+ return *this;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Stop the client (thread-safe)
|
|
|
|
|
- void stop() {
|
|
|
|
|
- running_.store(false);
|
|
|
|
|
- client_.stop(); // Cancel any pending operations
|
|
|
|
|
- if (async_thread_.joinable()) { async_thread_.join(); }
|
|
|
|
|
|
|
+inline bool Result::is_valid() const { return handle_.is_valid(); }
|
|
|
|
|
+inline Result::operator bool() const { return is_valid(); }
|
|
|
|
|
+
|
|
|
|
|
+inline int Result::status() const {
|
|
|
|
|
+ return handle_.response ? handle_.response->status : -1;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+inline const Headers &Result::headers() const {
|
|
|
|
|
+ static const Headers empty_headers;
|
|
|
|
|
+ return handle_.response ? handle_.response->headers : empty_headers;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+inline std::string Result::get_header_value(const std::string &key,
|
|
|
|
|
+ const char *def) const {
|
|
|
|
|
+ return handle_.response ? handle_.response->get_header_value(key, def) : def;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+inline bool Result::has_header(const std::string &key) const {
|
|
|
|
|
+ return handle_.response ? handle_.response->has_header(key) : false;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+inline Error Result::error() const { return handle_.error; }
|
|
|
|
|
+inline Error Result::read_error() const { return handle_.get_read_error(); }
|
|
|
|
|
+inline bool Result::has_read_error() const { return handle_.has_read_error(); }
|
|
|
|
|
+
|
|
|
|
|
+inline bool Result::next() {
|
|
|
|
|
+ if (!handle_.is_valid() || finished_) { return false; }
|
|
|
|
|
+
|
|
|
|
|
+ if (buffer_.size() < chunk_size_) { buffer_.resize(chunk_size_); }
|
|
|
|
|
+
|
|
|
|
|
+ ssize_t n = handle_.read(&buffer_[0], chunk_size_);
|
|
|
|
|
+ if (n > 0) {
|
|
|
|
|
+ current_size_ = static_cast<size_t>(n);
|
|
|
|
|
+ return true;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-private:
|
|
|
|
|
- // Parse a single SSE field line
|
|
|
|
|
- // Returns true if this line ends an event (blank line)
|
|
|
|
|
- bool parse_sse_line(const std::string &line, SSEMessage &msg, int &retry_ms) {
|
|
|
|
|
- // Blank line signals end of event
|
|
|
|
|
- if (line.empty() || line == "\r") { return true; }
|
|
|
|
|
-
|
|
|
|
|
- // Lines starting with ':' are comments (ignored)
|
|
|
|
|
- if (!line.empty() && line[0] == ':') { return false; }
|
|
|
|
|
-
|
|
|
|
|
- // Find the colon separator
|
|
|
|
|
- auto colon_pos = line.find(':');
|
|
|
|
|
- if (colon_pos == std::string::npos) {
|
|
|
|
|
- // Line with no colon is treated as field name with empty value
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ current_size_ = 0;
|
|
|
|
|
+ finished_ = true;
|
|
|
|
|
+ return false;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- auto field = line.substr(0, colon_pos);
|
|
|
|
|
- std::string value;
|
|
|
|
|
-
|
|
|
|
|
- // Value starts after colon, skip optional single space
|
|
|
|
|
- if (colon_pos + 1 < line.size()) {
|
|
|
|
|
- auto value_start = colon_pos + 1;
|
|
|
|
|
- if (line[value_start] == ' ') { value_start++; }
|
|
|
|
|
- value = line.substr(value_start);
|
|
|
|
|
- // Remove trailing \r if present
|
|
|
|
|
- if (!value.empty() && value.back() == '\r') { value.pop_back(); }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // Handle known fields
|
|
|
|
|
- if (field == "event") {
|
|
|
|
|
- msg.event = value;
|
|
|
|
|
- } else if (field == "data") {
|
|
|
|
|
- // Multiple data lines are concatenated with newlines
|
|
|
|
|
- if (!msg.data.empty()) { msg.data += "\n"; }
|
|
|
|
|
- msg.data += value;
|
|
|
|
|
- } else if (field == "id") {
|
|
|
|
|
- // Empty id is valid (clears the last event ID)
|
|
|
|
|
- msg.id = value;
|
|
|
|
|
- } else if (field == "retry") {
|
|
|
|
|
- // Parse retry interval in milliseconds
|
|
|
|
|
- {
|
|
|
|
|
- int v = 0;
|
|
|
|
|
- auto res =
|
|
|
|
|
- detail::from_chars(value.data(), value.data() + value.size(), v);
|
|
|
|
|
- if (res.ec == std::errc{}) { retry_ms = v; }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- // Unknown fields are ignored per SSE spec
|
|
|
|
|
|
|
+inline const char *Result::data() const { return buffer_.data(); }
|
|
|
|
|
+inline size_t Result::size() const { return current_size_; }
|
|
|
|
|
|
|
|
- return false;
|
|
|
|
|
|
|
+inline std::string Result::read_all() {
|
|
|
|
|
+ std::string result;
|
|
|
|
|
+ while (next()) {
|
|
|
|
|
+ result.append(data(), size());
|
|
|
}
|
|
}
|
|
|
|
|
+ return result;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Main event loop with auto-reconnect
|
|
|
|
|
- void run_event_loop() {
|
|
|
|
|
- auto reconnect_count = 0;
|
|
|
|
|
|
|
+} // namespace stream
|
|
|
|
|
|
|
|
- while (running_.load()) {
|
|
|
|
|
- // Build headers, including Last-Event-ID if we have one
|
|
|
|
|
- auto request_headers = headers_;
|
|
|
|
|
- if (!last_event_id_.empty()) {
|
|
|
|
|
- request_headers.emplace("Last-Event-ID", last_event_id_);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+namespace sse {
|
|
|
|
|
|
|
|
- // Open streaming connection
|
|
|
|
|
- auto result = stream::Get(client_, path_, request_headers);
|
|
|
|
|
|
|
+// SSEMessage implementations
|
|
|
|
|
+inline SSEMessage::SSEMessage() : event("message") {}
|
|
|
|
|
|
|
|
- // Connection error handling
|
|
|
|
|
- if (!result) {
|
|
|
|
|
- connected_.store(false);
|
|
|
|
|
- if (on_error_) { on_error_(result.error()); }
|
|
|
|
|
|
|
+inline void SSEMessage::clear() {
|
|
|
|
|
+ event = "message";
|
|
|
|
|
+ data.clear();
|
|
|
|
|
+ id.clear();
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- if (!should_reconnect(reconnect_count)) { break; }
|
|
|
|
|
- wait_for_reconnect();
|
|
|
|
|
- reconnect_count++;
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+// SSEClient implementations
|
|
|
|
|
+inline SSEClient::SSEClient(Client &client, const std::string &path)
|
|
|
|
|
+ : client_(client), path_(path) {}
|
|
|
|
|
|
|
|
- if (result.status() != 200) {
|
|
|
|
|
- connected_.store(false);
|
|
|
|
|
- // For certain errors, don't reconnect
|
|
|
|
|
- if (result.status() == 204 || // No Content - server wants us to stop
|
|
|
|
|
- result.status() == 404 || // Not Found
|
|
|
|
|
- result.status() == 401 || // Unauthorized
|
|
|
|
|
- result.status() == 403) { // Forbidden
|
|
|
|
|
- if (on_error_) { on_error_(Error::Connection); }
|
|
|
|
|
- break;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+inline SSEClient::SSEClient(Client &client, const std::string &path,
|
|
|
|
|
+ const Headers &headers)
|
|
|
|
|
+ : client_(client), path_(path), headers_(headers) {}
|
|
|
|
|
|
|
|
- if (on_error_) { on_error_(Error::Connection); }
|
|
|
|
|
|
|
+inline SSEClient::~SSEClient() { stop(); }
|
|
|
|
|
|
|
|
- if (!should_reconnect(reconnect_count)) { break; }
|
|
|
|
|
- wait_for_reconnect();
|
|
|
|
|
- reconnect_count++;
|
|
|
|
|
- continue;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+inline SSEClient &SSEClient::on_message(MessageHandler handler) {
|
|
|
|
|
+ on_message_ = std::move(handler);
|
|
|
|
|
+ return *this;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Connection successful
|
|
|
|
|
- connected_.store(true);
|
|
|
|
|
- reconnect_count = 0;
|
|
|
|
|
- if (on_open_) { on_open_(); }
|
|
|
|
|
|
|
+inline SSEClient &SSEClient::on_event(const std::string &type,
|
|
|
|
|
+ MessageHandler handler) {
|
|
|
|
|
+ event_handlers_[type] = std::move(handler);
|
|
|
|
|
+ return *this;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Event receiving loop
|
|
|
|
|
- std::string buffer;
|
|
|
|
|
- SSEMessage current_msg;
|
|
|
|
|
|
|
+inline SSEClient &SSEClient::on_open(OpenHandler handler) {
|
|
|
|
|
+ on_open_ = std::move(handler);
|
|
|
|
|
+ return *this;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- while (running_.load() && result.next()) {
|
|
|
|
|
- buffer.append(result.data(), result.size());
|
|
|
|
|
|
|
+inline SSEClient &SSEClient::on_error(ErrorHandler handler) {
|
|
|
|
|
+ on_error_ = std::move(handler);
|
|
|
|
|
+ return *this;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Process complete lines in the buffer
|
|
|
|
|
- size_t line_start = 0;
|
|
|
|
|
- size_t newline_pos;
|
|
|
|
|
|
|
+inline SSEClient &SSEClient::set_reconnect_interval(int ms) {
|
|
|
|
|
+ reconnect_interval_ms_ = ms;
|
|
|
|
|
+ return *this;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- while ((newline_pos = buffer.find('\n', line_start)) !=
|
|
|
|
|
- std::string::npos) {
|
|
|
|
|
- auto line = buffer.substr(line_start, newline_pos - line_start);
|
|
|
|
|
- line_start = newline_pos + 1;
|
|
|
|
|
|
|
+inline SSEClient &SSEClient::set_max_reconnect_attempts(int n) {
|
|
|
|
|
+ max_reconnect_attempts_ = n;
|
|
|
|
|
+ return *this;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Parse the line and check if event is complete
|
|
|
|
|
- auto event_complete =
|
|
|
|
|
- parse_sse_line(line, current_msg, reconnect_interval_ms_);
|
|
|
|
|
|
|
+inline bool SSEClient::is_connected() const { return connected_.load(); }
|
|
|
|
|
|
|
|
- if (event_complete && !current_msg.data.empty()) {
|
|
|
|
|
- // Update last_event_id for reconnection
|
|
|
|
|
- if (!current_msg.id.empty()) { last_event_id_ = current_msg.id; }
|
|
|
|
|
|
|
+inline const std::string &SSEClient::last_event_id() const {
|
|
|
|
|
+ return last_event_id_;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Dispatch event to appropriate handler
|
|
|
|
|
- dispatch_event(current_msg);
|
|
|
|
|
|
|
+inline void SSEClient::start() {
|
|
|
|
|
+ running_.store(true);
|
|
|
|
|
+ run_event_loop();
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- current_msg.clear();
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+inline void SSEClient::start_async() {
|
|
|
|
|
+ running_.store(true);
|
|
|
|
|
+ async_thread_ = std::thread([this]() { run_event_loop(); });
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Keep unprocessed data in buffer
|
|
|
|
|
- buffer.erase(0, line_start);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+inline void SSEClient::stop() {
|
|
|
|
|
+ running_.store(false);
|
|
|
|
|
+ client_.stop(); // Cancel any pending operations
|
|
|
|
|
+ if (async_thread_.joinable()) { async_thread_.join(); }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+inline bool SSEClient::parse_sse_line(const std::string &line, SSEMessage &msg,
|
|
|
|
|
+ int &retry_ms) {
|
|
|
|
|
+ // Blank line signals end of event
|
|
|
|
|
+ if (line.empty() || line == "\r") { return true; }
|
|
|
|
|
+
|
|
|
|
|
+ // Lines starting with ':' are comments (ignored)
|
|
|
|
|
+ if (!line.empty() && line[0] == ':') { return false; }
|
|
|
|
|
+
|
|
|
|
|
+ // Find the colon separator
|
|
|
|
|
+ auto colon_pos = line.find(':');
|
|
|
|
|
+ if (colon_pos == std::string::npos) {
|
|
|
|
|
+ // Line with no colon is treated as field name with empty value
|
|
|
|
|
+ return false;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ auto field = line.substr(0, colon_pos);
|
|
|
|
|
+ std::string value;
|
|
|
|
|
+
|
|
|
|
|
+ // Value starts after colon, skip optional single space
|
|
|
|
|
+ if (colon_pos + 1 < line.size()) {
|
|
|
|
|
+ auto value_start = colon_pos + 1;
|
|
|
|
|
+ if (line[value_start] == ' ') { value_start++; }
|
|
|
|
|
+ value = line.substr(value_start);
|
|
|
|
|
+ // Remove trailing \r if present
|
|
|
|
|
+ if (!value.empty() && value.back() == '\r') { value.pop_back(); }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Handle known fields
|
|
|
|
|
+ if (field == "event") {
|
|
|
|
|
+ msg.event = value;
|
|
|
|
|
+ } else if (field == "data") {
|
|
|
|
|
+ // Multiple data lines are concatenated with newlines
|
|
|
|
|
+ if (!msg.data.empty()) { msg.data += "\n"; }
|
|
|
|
|
+ msg.data += value;
|
|
|
|
|
+ } else if (field == "id") {
|
|
|
|
|
+ // Empty id is valid (clears the last event ID)
|
|
|
|
|
+ msg.id = value;
|
|
|
|
|
+ } else if (field == "retry") {
|
|
|
|
|
+ // Parse retry interval in milliseconds
|
|
|
|
|
+ {
|
|
|
|
|
+ int v = 0;
|
|
|
|
|
+ auto res =
|
|
|
|
|
+ detail::from_chars(value.data(), value.data() + value.size(), v);
|
|
|
|
|
+ if (res.ec == std::errc{}) { retry_ms = v; }
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+ // Unknown fields are ignored per SSE spec
|
|
|
|
|
+
|
|
|
|
|
+ return false;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+inline void SSEClient::run_event_loop() {
|
|
|
|
|
+ auto reconnect_count = 0;
|
|
|
|
|
+
|
|
|
|
|
+ while (running_.load()) {
|
|
|
|
|
+ // Build headers, including Last-Event-ID if we have one
|
|
|
|
|
+ auto request_headers = headers_;
|
|
|
|
|
+ if (!last_event_id_.empty()) {
|
|
|
|
|
+ request_headers.emplace("Last-Event-ID", last_event_id_);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Open streaming connection
|
|
|
|
|
+ auto result = stream::Get(client_, path_, request_headers);
|
|
|
|
|
|
|
|
- // Connection ended
|
|
|
|
|
|
|
+ // Connection error handling
|
|
|
|
|
+ if (!result) {
|
|
|
connected_.store(false);
|
|
connected_.store(false);
|
|
|
|
|
+ if (on_error_) { on_error_(result.error()); }
|
|
|
|
|
|
|
|
- if (!running_.load()) { break; }
|
|
|
|
|
|
|
+ if (!should_reconnect(reconnect_count)) { break; }
|
|
|
|
|
+ wait_for_reconnect();
|
|
|
|
|
+ reconnect_count++;
|
|
|
|
|
+ continue;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // Check for read errors
|
|
|
|
|
- if (result.has_read_error()) {
|
|
|
|
|
- if (on_error_) { on_error_(result.read_error()); }
|
|
|
|
|
|
|
+ if (result.status() != 200) {
|
|
|
|
|
+ connected_.store(false);
|
|
|
|
|
+ // For certain errors, don't reconnect
|
|
|
|
|
+ if (result.status() == 204 || // No Content - server wants us to stop
|
|
|
|
|
+ result.status() == 404 || // Not Found
|
|
|
|
|
+ result.status() == 401 || // Unauthorized
|
|
|
|
|
+ result.status() == 403) { // Forbidden
|
|
|
|
|
+ if (on_error_) { on_error_(Error::Connection); }
|
|
|
|
|
+ break;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ if (on_error_) { on_error_(Error::Connection); }
|
|
|
|
|
+
|
|
|
if (!should_reconnect(reconnect_count)) { break; }
|
|
if (!should_reconnect(reconnect_count)) { break; }
|
|
|
wait_for_reconnect();
|
|
wait_for_reconnect();
|
|
|
reconnect_count++;
|
|
reconnect_count++;
|
|
|
|
|
+ continue;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- connected_.store(false);
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Connection successful
|
|
|
|
|
+ connected_.store(true);
|
|
|
|
|
+ reconnect_count = 0;
|
|
|
|
|
+ if (on_open_) { on_open_(); }
|
|
|
|
|
|
|
|
- // Dispatch event to appropriate handler
|
|
|
|
|
- void dispatch_event(const SSEMessage &msg) {
|
|
|
|
|
- // Check for specific event type handler first
|
|
|
|
|
- auto it = event_handlers_.find(msg.event);
|
|
|
|
|
- if (it != event_handlers_.end()) {
|
|
|
|
|
- it->second(msg);
|
|
|
|
|
- return;
|
|
|
|
|
|
|
+ // Event receiving loop
|
|
|
|
|
+ std::string buffer;
|
|
|
|
|
+ SSEMessage current_msg;
|
|
|
|
|
+
|
|
|
|
|
+ while (running_.load() && result.next()) {
|
|
|
|
|
+ buffer.append(result.data(), result.size());
|
|
|
|
|
+
|
|
|
|
|
+ // Process complete lines in the buffer
|
|
|
|
|
+ size_t line_start = 0;
|
|
|
|
|
+ size_t newline_pos;
|
|
|
|
|
+
|
|
|
|
|
+ while ((newline_pos = buffer.find('\n', line_start)) !=
|
|
|
|
|
+ std::string::npos) {
|
|
|
|
|
+ auto line = buffer.substr(line_start, newline_pos - line_start);
|
|
|
|
|
+ line_start = newline_pos + 1;
|
|
|
|
|
+
|
|
|
|
|
+ // Parse the line and check if event is complete
|
|
|
|
|
+ auto event_complete =
|
|
|
|
|
+ parse_sse_line(line, current_msg, reconnect_interval_ms_);
|
|
|
|
|
+
|
|
|
|
|
+ if (event_complete && !current_msg.data.empty()) {
|
|
|
|
|
+ // Update last_event_id for reconnection
|
|
|
|
|
+ if (!current_msg.id.empty()) { last_event_id_ = current_msg.id; }
|
|
|
|
|
+
|
|
|
|
|
+ // Dispatch event to appropriate handler
|
|
|
|
|
+ dispatch_event(current_msg);
|
|
|
|
|
+
|
|
|
|
|
+ current_msg.clear();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // Keep unprocessed data in buffer
|
|
|
|
|
+ buffer.erase(0, line_start);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Fall back to generic message handler
|
|
|
|
|
- if (on_message_) { on_message_(msg); }
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ // Connection ended
|
|
|
|
|
+ connected_.store(false);
|
|
|
|
|
|
|
|
- // Check if we should attempt to reconnect
|
|
|
|
|
- bool should_reconnect(int count) const {
|
|
|
|
|
- if (!running_.load()) { return false; }
|
|
|
|
|
- if (max_reconnect_attempts_ == 0) { return true; } // unlimited
|
|
|
|
|
- return count < max_reconnect_attempts_;
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ if (!running_.load()) { break; }
|
|
|
|
|
|
|
|
- // Wait for reconnect interval
|
|
|
|
|
- void wait_for_reconnect() {
|
|
|
|
|
- // Use small increments to check running_ flag frequently
|
|
|
|
|
- auto waited = 0;
|
|
|
|
|
- while (running_.load() && waited < reconnect_interval_ms_) {
|
|
|
|
|
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
|
- waited += 100;
|
|
|
|
|
|
|
+ // Check for read errors
|
|
|
|
|
+ if (result.has_read_error()) {
|
|
|
|
|
+ if (on_error_) { on_error_(result.read_error()); }
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ if (!should_reconnect(reconnect_count)) { break; }
|
|
|
|
|
+ wait_for_reconnect();
|
|
|
|
|
+ reconnect_count++;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Client and path
|
|
|
|
|
- Client &client_;
|
|
|
|
|
- std::string path_;
|
|
|
|
|
- Headers headers_;
|
|
|
|
|
|
|
+ connected_.store(false);
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Callbacks
|
|
|
|
|
- MessageHandler on_message_;
|
|
|
|
|
- std::map<std::string, MessageHandler> event_handlers_;
|
|
|
|
|
- OpenHandler on_open_;
|
|
|
|
|
- ErrorHandler on_error_;
|
|
|
|
|
|
|
+inline void SSEClient::dispatch_event(const SSEMessage &msg) {
|
|
|
|
|
+ // Check for specific event type handler first
|
|
|
|
|
+ auto it = event_handlers_.find(msg.event);
|
|
|
|
|
+ if (it != event_handlers_.end()) {
|
|
|
|
|
+ it->second(msg);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- // Configuration
|
|
|
|
|
- int reconnect_interval_ms_ = 3000;
|
|
|
|
|
- int max_reconnect_attempts_ = 0; // 0 = unlimited
|
|
|
|
|
|
|
+ // Fall back to generic message handler
|
|
|
|
|
+ if (on_message_) { on_message_(msg); }
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // State
|
|
|
|
|
- std::atomic<bool> running_{false};
|
|
|
|
|
- std::atomic<bool> connected_{false};
|
|
|
|
|
- std::string last_event_id_;
|
|
|
|
|
|
|
+inline bool SSEClient::should_reconnect(int count) const {
|
|
|
|
|
+ if (!running_.load()) { return false; }
|
|
|
|
|
+ if (max_reconnect_attempts_ == 0) { return true; } // unlimited
|
|
|
|
|
+ return count < max_reconnect_attempts_;
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
- // Async support
|
|
|
|
|
- std::thread async_thread_;
|
|
|
|
|
-};
|
|
|
|
|
|
|
+inline void SSEClient::wait_for_reconnect() {
|
|
|
|
|
+ // Use small increments to check running_ flag frequently
|
|
|
|
|
+ auto waited = 0;
|
|
|
|
|
+ while (running_.load() && waited < reconnect_interval_ms_) {
|
|
|
|
|
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
|
|
|
+ waited += 100;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
|
|
|
} // namespace sse
|
|
} // namespace sse
|
|
|
|
|
|
|
|
-// ----------------------------------------------------------------------------
|
|
|
|
|
-
|
|
|
|
|
-/*
|
|
|
|
|
- * Implementation that will be part of the .cc file if split into .h + .cc.
|
|
|
|
|
- */
|
|
|
|
|
-
|
|
|
|
|
#ifdef CPPHTTPLIB_SSL_ENABLED
|
|
#ifdef CPPHTTPLIB_SSL_ENABLED
|
|
|
/*
|
|
/*
|
|
|
* TLS abstraction layer - internal function declarations
|
|
* TLS abstraction layer - internal function declarations
|