Kurlyk
Loading...
Searching...
No Matches
BaseWebSocketClient.hpp
Go to the documentation of this file.
1#pragma once
2#ifndef _KURLYK_BASE_WEB_SOCKET_CLIENT_HPP_INCLUDED
3#define _KURLYK_BASE_WEB_SOCKET_CLIENT_HPP_INCLUDED
4
7
12
13namespace kurlyk {
14
17 class BaseWebSocketClient : public IWebSocketClient, public std::enable_shared_from_this<BaseWebSocketClient> {
18 public:
19
22
24 virtual ~BaseWebSocketClient() = default;
25
30 std::function<void(std::unique_ptr<WebSocketEventData>)>& event_handler() override final {
31 return m_on_event;
32 }
33
39 std::function<void()>& notify_handler() override final {
40 return m_on_event_notify;
41 }
42
46 void set_config(std::unique_ptr<WebSocketConfig> config, std::function<void(bool)> callback) override final {
47 m_fsm_event_queue.push_event(FSMEventData(FsmEvent::UpdateConfig, std::move(config), std::move(callback)));
48 }
49
52 void connect(std::function<void(bool)> callback) override final {
53 m_fsm_event_queue.push_event(FSMEventData(FsmEvent::RequestConnect, std::move(callback)));
54 }
55
58 void disconnect(std::function<void(bool)> callback) override final {
59 m_fsm_event_queue.push_event(FSMEventData(FsmEvent::RequestDisconnect, std::move(callback)));
60 }
61
64 bool is_connected() const override final {
65 return m_is_connected;
66 }
67
70 bool is_running() const override final {
71 return m_is_running || m_fsm_event_queue.has_events();
72 }
73
78 std::list<std::unique_ptr<WebSocketEventData>> receive_events() const override final {
79 std::lock_guard<std::mutex> lock(m_event_queue_mutex);
80 auto event_queue = std::move(m_event_queue);
81 m_event_queue.clear();
82 return event_queue;
83 }
84
89 std::unique_ptr<WebSocketEventData> receive_event() const override final {
90 std::lock_guard<std::mutex> lock(m_event_queue_mutex);
91 if (m_event_queue.empty()) return nullptr;
92 std::unique_ptr<WebSocketEventData> event = std::move(*m_event_queue.begin());
93 m_event_queue.erase(m_event_queue.begin());
94 return event;
95 }
96
103 const std::string &message,
104 long rate_limit_id,
105 std::function<void(const std::error_code& ec)> callback = nullptr) override final {
106 if (message.empty() || !is_connected()) return false;
107 std::lock_guard<std::mutex> lock(m_message_queue_mutex);
108# if __cplusplus >= 201402L
109 m_message_queue.push_back(std::make_shared<WebSocketSendInfo>(message, rate_limit_id, false, 0, std::move(callback)));
110# else
111 m_message_queue.push_back(std::shared_ptr<WebSocketSendInfo>(new WebSocketSendInfo(message, rate_limit_id, false, 0, std::move(callback))));
112# endif
113 return true;
114 }
115
122 const int status = 1000,
123 const std::string &reason = std::string(),
124 std::function<void(const std::error_code& ec)> callback = nullptr) override final {
125 if (!is_connected()) return false;
126 std::lock_guard<std::mutex> lock(m_message_queue_mutex);
127# if __cplusplus >= 201402L
128 m_message_queue.push_back(std::make_shared<WebSocketSendInfo>(reason, 0, true, status, std::move(callback)));
129# else
130 m_message_queue.push_back(std::shared_ptr<WebSocketSendInfo>(new WebSocketSendInfo(reason, 0, true, status, std::move(callback))));
131# endif
132 return true;
133 }
134
135
/// \brief Processes internal operations such as event handling and state updates.
/// NOTE(review): the body statements (original lines 141-143) are not present
/// in this listing; consult the real header before relying on this text.
140 void process() override final {
144 }
145
/// \brief Shuts down the WebSocket client.
/// NOTE(review): several body statements (original lines 149, 151-152) are not
/// present in this listing; only the wait loop below is visible.
148 void shutdown() override final {
// Keeps looping for as long as is_running() reports true.
150 while (is_running()) {
153 }
154 }
155
156 protected:
157
158 std::unique_ptr<WebSocketConfig> m_config;
159
168
179
180
/// \brief Initializes the WebSocket connection. Must be implemented in derived classes.
/// \return The state handlers treat a false result as a failed attempt.
182 virtual bool init_websocket() = 0;
183
/// \brief Deinitializes the WebSocket connection. Must be implemented in derived classes.
185 virtual void deinit_websocket() = 0;
186
/// \brief Sends a WebSocket message.
/// \param send_info Queued send item taken from the message queue.
189 virtual void send_message(std::shared_ptr<WebSocketSendInfo>& send_info) = 0;
190
/// \brief Sends a close request.
/// \param send_info Queued send item whose is_send_close flag is set.
193 virtual void send_close(std::shared_ptr<WebSocketSendInfo>& send_info) = 0;
194
197 std::unique_ptr<WebSocketEventData> create_websocket_event() {
198# if __cplusplus >= 201402L
199 auto event = std::make_unique<WebSocketEventData>();
200# else
201 auto event = std::unique_ptr<WebSocketEventData>(new WebSocketEventData());
202# endif
203 event->sender = shared_from_this();
204 return event;
205 }
206
213 std::unique_ptr<WebSocketEventData> create_websocket_close_event(
214 const std::string& reason = "Normal Closure",
215 int status_code = 1000) {
216 auto websocket_event = create_websocket_event();
217 websocket_event->event_type = WebSocketEventType::WS_CLOSE;
218 websocket_event->message = reason;
219 websocket_event->status_code = status_code;
220 return websocket_event;
221 }
222
228 std::unique_ptr<WebSocketEventData> create_websocket_error_event(
229 const std::error_code& error_code) {
230 auto websocket_event = create_websocket_event();
231 websocket_event->event_type = WebSocketEventType::WS_ERROR;
232 websocket_event->error_code = error_code;
233 return websocket_event;
234 }
235
240 const std::error_code& error_code,
241 const std::function<void(const std::error_code& ec)> &callback) {
242 std::lock_guard<std::mutex> lock(m_send_callback_queue_mutex);
243 m_send_callback_queue.push_back(std::make_pair(error_code, callback));
244 }
245
/// \brief Adds an FSM event to the event queue.
/// \param event_type Kind of FSM event to enqueue.
/// \param event_data WebSocket event data; ownership moves into the queued event.
/// NOTE(review): original line 251 is missing from this listing. The generated
/// documentation says this method also triggers the notify handler; presumably
/// that call lived on the missing line - confirm against the real header.
249 void add_fsm_event(FsmEvent event_type, std::unique_ptr<WebSocketEventData> event_data) {
250 m_fsm_event_queue.push_event(FSMEventData(event_type, std::move(event_data)));
252 }
253
254 private:
255
256 std::function<void(std::unique_ptr<WebSocketEventData>)> m_on_event;
257 std::function<void()> m_on_event_notify;
258
263 std::unique_ptr<WebSocketEventData> event_data;
264 std::unique_ptr<WebSocketConfig> config_data;
265 std::function<void(bool)> callback;
266
269 FSMEventData(FSMEventData&& other) noexcept
270 : event_type(other.event_type),
271 event_data(std::move(other.event_data)),
272 config_data(std::move(other.config_data)),
273 callback(std::move(other.callback)) {
274 }
275
280 if (this != &other) {
281 event_type = other.event_type;
282 event_data = std::move(other.event_data);
283 config_data = std::move(other.config_data);
284 callback = std::move(other.callback);
285 }
286 return *this;
287 }
288
290 FSMEventData(const FSMEventData&) = delete;
291
294
300 std::unique_ptr<WebSocketEventData> &&event_data) :
302 event_data(std::move(event_data)) {
303 }
304
311 std::unique_ptr<WebSocketConfig> &&config_data,
312 std::function<void(bool)> &&callback) :
315 callback(std::move(callback)) {
316 }
317
323 std::function<void(bool)> &&callback) :
325 callback(std::move(callback)) {
326 }
327
333
334 }; // FSMEventData
335
// Brace initialization replaces ATOMIC_VAR_INIT, which is deprecated since C++20.
/// Atomic flag indicating if the client is running.
std::atomic<bool> m_is_running{false};
/// Atomic flag indicating if the client is connected.
std::atomic<bool> m_is_connected{false};
340
342 std::chrono::steady_clock::time_point m_close_time;
343
344 mutable std::mutex m_event_queue_mutex;
345 using event_data_ptr_t = std::unique_ptr<WebSocketEventData>;
346 mutable std::list<event_data_ptr_t> m_event_queue;
347
349 using send_info_ptr_t = std::shared_ptr<WebSocketSendInfo>;
350 std::list<send_info_ptr_t> m_message_queue;
351
353 using send_callback_t = std::pair<std::error_code, std::function<void(const std::error_code& ec)>>;
354 std::list<send_callback_t> m_send_callback_queue;
355
358 switch (m_fsm_state) {
359 case FsmState::INIT:
361 break;
364 break;
367 break;
370 break;
373 break;
374 };
375 }
376
379 if (!m_fsm_event_queue.has_events()) return;
380
381 auto event = m_fsm_event_queue.pop_event();
382 switch (event.event_type) {
384 if (!m_config) {
386 if (event.callback) event.callback(false);
388 break;
389 }
390 if (!init_websocket()) {
392 if (event.callback) event.callback(false);
394 break;
395 }
396 m_is_running = true;
397 if (event.callback) event.callback(true);
399 break;
401 m_config = std::move(event.config_data);
402 if (m_config) {
403 m_rate_limiter.set_limit(m_config->rate_limits);
404 if (event.callback) event.callback(true);
405 } else {
406 if (event.callback) event.callback(false);
407 }
408 break;
409 default:
410 if (event.callback) event.callback(false);
411 break;
412 };
413 }
414
417 if (!m_fsm_event_queue.has_events()) return;
418
419 auto event = m_fsm_event_queue.pop_event();
420 switch (event.event_type) {
422 handle_open_event(std::move(event.event_data));
424 m_is_running = true;
426 break;
428 handle_error_event(std::move(event.event_data));
431 if (event.event_type == FsmEvent::ConnectionClosed) {
432 handle_close_event(std::move(event.event_data));
433 } else {
435 }
436
437 if (!m_config->reconnect) {
438 m_is_running = false;
440 break;
441 }
442
444 m_close_time = std::chrono::steady_clock::now();
445 m_is_running = true;
447 break;
451
453 m_is_running = false;
454 if (event.callback) event.callback(true);
456 break;
457 }
461
462 m_config = std::move(event.config_data);
463 if (!m_config) {
465 if (event.callback) event.callback(false);
467 break;
468 }
469 m_rate_limiter.set_limit(m_config->rate_limits);
470
472 if (!init_websocket()) {
474 if (event.callback) event.callback(false);
476 break;
477 }
478
479 m_is_running = true;
480 if (event.callback) event.callback(true);
482 break;
483 default:
484 if (event.callback) event.callback(false);
485 break;
486 };
487 }
488
491 bool is_message;
492 while (m_fsm_event_queue.has_events()) {
493 is_message = false;
494 auto event = m_fsm_event_queue.pop_event();
495 switch (event.event_type) {
499
501 m_is_running = false;
502 if (event.callback) event.callback(true);
504 break;
506 handle_error_event(std::move(event.event_data));
509 if (event.event_type == FsmEvent::ConnectionClosed) {
510 handle_close_event(std::move(event.event_data));
511 } else {
513 }
514
515 if (!m_config->reconnect) {
516 m_is_running = false;
518 break;
519 }
520
522 m_close_time = std::chrono::steady_clock::now();
523 m_is_running = true;
525 break;
529
530 m_config = std::move(event.config_data);
531 if (!m_config) {
533 if (event.callback) event.callback(false);
535 break;
536 }
537 m_rate_limiter.set_limit(m_config->rate_limits);
538
540 if (!init_websocket()) {
542 if (event.callback) event.callback(false);
544 break;
545 }
546
547 m_is_running = true;
548 if (event.callback) event.callback(true);
550 break;
552 handle_message_event(std::move(event.event_data));
553 is_message = true;
554 break;
555 default:
556 if (event.callback) event.callback(false);
557 break;
558 };
559 if (!is_message) break;
560 }
561 }
562
565 if (m_fsm_event_queue.has_events()){
566 auto event = m_fsm_event_queue.pop_event();
567 switch (event.event_type) {
569 m_is_running = false;
570 if (event.callback) event.callback(true);
572 break;
574 m_config = std::move(event.config_data);
575 if (!m_config) {
577 if (event.callback) event.callback(false);
579 break;
580 }
581 m_rate_limiter.set_limit(m_config->rate_limits);
582
584 if (!init_websocket()) {
586 if (event.callback) event.callback(false);
588 break;
589 }
590
591 m_is_running = true;
592 if (event.callback) event.callback(true);
594 break;
596 handle_message_event(std::move(event.event_data));
597 break;
598 default:
599 if (event.callback) event.callback(false);
600 break;
601 };
602 }
603
604 if (!m_config) {
607 return;
608 }
609
610 if (m_config->reconnect) {
611 if (m_config->reconnect_attempts &&
612 m_reconnect_attempt >= m_config->reconnect_attempts) {
613 m_is_running = false;
615 return;
616 }
617
618 auto now = std::chrono::steady_clock::now();
619 auto duration = std::chrono::duration_cast<std::chrono::seconds>(now - m_close_time);
620 if (duration.count() >= m_config->reconnect_delay) {
621 if (!init_websocket()) {
624 return;
625 }
626 m_is_running = true;
628 }
629 return;
630 }
631
632 m_is_running = false;
634 }
635
638 if (!m_fsm_event_queue.has_events()) return;
639
640 auto event = m_fsm_event_queue.pop_event();
641 switch (event.event_type) {
643 if (!m_config) {
645 if (event.callback) event.callback(false);
647 break;
648 }
649 if (!init_websocket()) {
651 if (event.callback) event.callback(false);
653 break;
654 }
655 m_is_running = true;
656 if (event.callback) event.callback(true);
658 break;
660 m_config = std::move(event.config_data);
661 if (!m_config) {
663 if (event.callback) event.callback(false);
665 break;
666 }
667 m_rate_limiter.set_limit(m_config->rate_limits);
668
670 if (!init_websocket()) {
672 if (event.callback) event.callback(false);
674 break;
675 }
676
677 m_is_running = true;
678 if (event.callback) event.callback(true);
680 break;
681 default:
682 if (event.callback) event.callback(false);
683 break;
684 };
685 }
686
691 std::unique_lock<std::mutex> lock(m_message_queue_mutex);
692 if (m_message_queue.empty()) return;
693
694 std::list<send_info_ptr_t> message_queue;
695 auto it = m_message_queue.begin();
696 while (it != m_message_queue.end()) {
697 auto& send_info = *it;
698 // Check if the message is allowed by the rate limiter.
699 if (!m_rate_limiter.allow_request(send_info->rate_limit_id)) {
700 ++it;
701 continue;
702 }
703 message_queue.push_back(std::move(send_info));
704 it = m_message_queue.erase(it);
705 }
706 lock.unlock();
707 if (message_queue.empty()) return;
708
709 for (auto &send_info : message_queue) {
710 if (!send_info->is_send_close) {
711 send_message(send_info);
712 continue;
713 }
714 send_close(send_info);
715 }
716 }
717
722 std::unique_lock<std::mutex> lock(m_send_callback_queue_mutex);
723 if (m_send_callback_queue.empty()) return;
724 auto send_callback_queue = std::move(m_send_callback_queue);
725 m_send_callback_queue.clear();
726
727 for (auto &item : send_callback_queue) {
728 item.second(item.first);
729 }
730 }
731
736 void handle_open_event(std::unique_ptr<WebSocketEventData> event) {
737 if (!m_is_connected) {
738 m_is_connected = true;
739 if (m_on_event) {
740 m_on_event(std::move(event));
741 } else {
742 std::lock_guard<std::mutex> lock(m_event_queue_mutex);
743 m_event_queue.push_back(std::move(event));
744 }
745 }
746 }
747
/// \brief Handles the event when the WebSocket connection is closed.
/// \param event Close event to deliver; defaults to nullptr.
752 void handle_close_event(std::unique_ptr<WebSocketEventData> event = nullptr) {
// NOTE(review): the body of this branch (original line 754) is missing from
// this listing; presumably it substitutes a default close event - confirm
// against the real header.
753 if (!event) {
755 }
// Only a client currently marked as connected reports a close; the flag is
// cleared before the event is delivered.
756 if (m_is_connected) {
757 m_is_connected = false;
// An installed handler receives the event directly; otherwise the event is
// appended to the event queue under its mutex.
758 if (m_on_event) {
759 m_on_event(std::move(event));
760 } else {
761 std::lock_guard<std::mutex> lock(m_event_queue_mutex);
762 m_event_queue.push_back(std::move(event));
763 }
764 }
765 }
766
770 void handle_error_event(std::unique_ptr<WebSocketEventData> event) {
771 if (m_on_event) {
772 m_on_event(std::move(event));
773 return;
774 }
775 std::lock_guard<std::mutex> lock(m_event_queue_mutex);
776 m_event_queue.push_back(std::move(event));
777 }
778
/// \brief Overloaded method to handle WebSocket error events using an error code.
/// \param error_code Error code describing the failure.
/// NOTE(review): the body statement (original line 782) is not present in this
/// listing; consult the real header for what this overload does.
781 void handle_error_event(const std::error_code& error_code) {
783 }
784
788 void handle_message_event(std::unique_ptr<WebSocketEventData> event) {
789 if (m_on_event) {
790 m_on_event(std::move(event));
791 return;
792 }
793 std::lock_guard<std::mutex> lock(m_event_queue_mutex);
794 m_event_queue.push_back(std::move(event));
795 }
796
797 }; // BaseWebSocketClient
798
799}; // namespace kurlyk
800
801
802#endif // _KURLYK_BASE_WEB_SOCKET_CLIENT_HPP_INCLUDED
Defines an interface for WebSocket client functionality, including connection management,...
Defines the interface for a WebSocket sender, providing methods for sending messages and managing con...
Defines the WebSocketEventData class, which encapsulates data related to WebSocket events.
Defines WebSocket rate limiting to control the frequency of WebSocket requests.
virtual ~BaseWebSocketClient()=default
Virtual destructor.
void process_state_working()
Handles the WORKING state. Processes incoming events and manages connection health.
std::chrono::steady_clock::time_point m_close_time
Timestamp of the last WebSocket close event, used for reconnection timing.
virtual bool init_websocket()=0
Initializes the WebSocket connection. Must be implemented in derived classes.
void handle_message_event(std::unique_ptr< WebSocketEventData > event)
Handles incoming WebSocket message events and queues them if no event handler is set.
bool send_message(const std::string &message, long rate_limit_id, std::function< void(const std::error_code &ec)> callback=nullptr) override final
Send a message through the WebSocket.
std::unique_ptr< WebSocketEventData > event_data_ptr_t
Alias for unique pointers to WebSocketEventData.
std::unique_ptr< WebSocketEventData > create_websocket_event()
Creates a generic WebSocket event.
BaseWebSocketClient()=default
Default constructor.
bool is_connected() const override final
Checks if the WebSocket client is currently connected.
void connect(std::function< void(bool)> callback) override final
Initiates a connection to the WebSocket server.
std::mutex m_message_queue_mutex
Mutex for synchronizing access to the message queue.
std::mutex m_send_callback_queue_mutex
Mutex for synchronizing access to the send callback queue.
std::mutex m_event_queue_mutex
Mutex for synchronizing access to the event queue.
bool is_running() const override final
Checks if the WebSocket client is actively running.
void process_state_connecting()
Handles the CONNECTING state. Manages connection attempt, errors, or disconnection.
void process_state_init()
Handles the INIT state. Initializes connection or updates configuration.
enum kurlyk::BaseWebSocketClient::FsmState m_fsm_state
bool send_close(const int status=1000, const std::string &reason=std::string(), std::function< void(const std::error_code &ec)> callback=nullptr) override final
Send a close request through the WebSocket.
void handle_error_event(const std::error_code &error_code)
Overloaded method to handle WebSocket error events using an error code.
WebSocketRateLimiter m_rate_limiter
Rate limiter for controlling the frequency of message sending.
utils::EventQueue< FSMEventData > m_fsm_event_queue
Queue for FSM events, managing the event sequence for the FSM.
void handle_close_event(std::unique_ptr< WebSocketEventData > event=nullptr)
Handles the event when the WebSocket connection is closed.
virtual void deinit_websocket()=0
Deinitializes the WebSocket connection. Must be implemented in derived classes.
void process_state_stopped()
Processes the STOPPED state in the FSM.
std::list< event_data_ptr_t > m_event_queue
Queue holding pending WebSocket events.
std::unique_ptr< WebSocketEventData > create_websocket_close_event(const std::string &reason="Normal Closure", int status_code=1000)
Creates a WebSocket close event with a specified reason and status code.
std::pair< std::error_code, std::function< void(const std::error_code &ec)> > send_callback_t
Alias for callback pairs with error codes.
void set_config(std::unique_ptr< WebSocketConfig > config, std::function< void(bool)> callback) override final
Sets the configuration for the WebSocket client.
void handle_error_event(std::unique_ptr< WebSocketEventData > event)
Handles WebSocket error events and queues them if no event handler is set.
void process_message_queue()
Processes the queue of messages to be sent over the WebSocket.
virtual void send_message(std::shared_ptr< WebSocketSendInfo > &send_info)=0
Sends a WebSocket message.
std::unique_ptr< WebSocketEventData > receive_event() const override final
Retrieves the next available WebSocket event, if any.
FsmState
Finite State Machine (FSM) states for the WebSocket connection.
@ WORKING
Connection active and working.
void process_fsm_state()
Processes the current FSM state and transitions to the appropriate next state.
std::function< void(std::unique_ptr< WebSocketEventData >)> m_on_event
Function to handle WebSocket events. Called when a new event is received.
std::list< std::unique_ptr< WebSocketEventData > > receive_events() const override final
Retrieves all pending WebSocket events in a batch.
void process_send_callback_queue()
Processes the queue of send callbacks.
std::function< void(std::unique_ptr< WebSocketEventData >)> & event_handler() override final
Accessor for the event handler function.
void process_state_reconnecting()
Handles the RECONNECTING state. Attempts to reconnect based on configuration settings.
void add_send_callback(const std::error_code &error_code, const std::function< void(const std::error_code &ec)> &callback)
Adds a send callback to the queue.
std::atomic< bool > m_is_connected
Atomic flag indicating if the client is connected.
void process() override final
Processes internal operations such as event handling and state updates.
void disconnect(std::function< void(bool)> callback) override final
Closes the connection to the WebSocket server.
std::list< send_callback_t > m_send_callback_queue
Queue holding send callbacks with their respective error codes.
void add_fsm_event(FsmEvent event_type, std::unique_ptr< WebSocketEventData > event_data)
Adds an FSM event to the event queue and triggers the notify handler.
std::unique_ptr< WebSocketConfig > m_config
Current configuration for the WebSocket.
std::shared_ptr< WebSocketSendInfo > send_info_ptr_t
Alias for shared pointers to WebSocketSendInfo.
FsmEvent
Represents events in the finite state machine.
@ MessageReceived
Incoming WebSocket message.
@ ConnectionOpened
Connection opened successfully.
std::function< void()> & notify_handler() override final
Accesses the notification handler for WebSocket events.
std::list< send_info_ptr_t > m_message_queue
Queue holding messages to be sent over the WebSocket.
virtual void send_close(std::shared_ptr< WebSocketSendInfo > &send_info)=0
Sends a close request.
std::unique_ptr< WebSocketEventData > create_websocket_error_event(const std::error_code &error_code)
Creates a WebSocket error event with a specified error code.
long m_reconnect_attempt
Counter for the number of reconnection attempts.
std::atomic< bool > m_is_running
Atomic flag indicating if the client is running.
std::function< void()> m_on_event_notify
Function to notify about new events in the FSM.
void handle_open_event(std::unique_ptr< WebSocketEventData > event)
Handles the event when the WebSocket connection is opened.
void shutdown() override final
Shuts down the WebSocket client, disconnecting and clearing all pending events.
IWebSocketClient()=default
Default constructor.
Encapsulates data for a WebSocket event, providing information about event type, message,...
Manages rate limiting for WebSocket requests based on predefined limits.
Holds information for sending a WebSocket message, including rate limiting, close status,...
A thread-safe event queue that supports blocking and non-blocking event retrieval.
@ InvalidConfiguration
Provided configuration is incomplete or invalid.
std::error_code make_error_code(ClientError e)
Creates a std::error_code from a ClientError value.
Primary namespace for the Kurlyk library, encompassing initialization, request management,...
@ WS_ERROR
Error occurred.
Definition enums.hpp:35
@ WS_CLOSE
Connection closed.
Definition enums.hpp:34
Enables use of ClientError with std::error_code.
Represents an event in the finite state machine (FSM) with optional associated data and callback.
std::function< void(bool)> callback
Optional callback function to execute on event completion.
FSMEventData & operator=(const FSMEventData &)=delete
Deleted copy assignment operator to prevent copying.
FSMEventData(FsmEvent event_type, std::unique_ptr< WebSocketEventData > &&event_data)
Constructs FSMEventData with an event type and WebSocket event data.
FSMEventData(const FSMEventData &)=delete
Deleted copy constructor to prevent copying.
std::unique_ptr< WebSocketEventData > event_data
Optional WebSocket event data associated with the FSM event.
FSMEventData(FsmEvent event_type)
Constructs FSMEventData with only an event type.
FSMEventData(FSMEventData &&other) noexcept
Move constructor for FSMEventData.
FSMEventData(FsmEvent event_type, std::function< void(bool)> &&callback)
Constructs FSMEventData with an event type and a callback.
FSMEventData & operator=(FSMEventData &&other) noexcept
Move assignment operator for FSMEventData.
FSMEventData(FsmEvent event_type, std::unique_ptr< WebSocketConfig > &&config_data, std::function< void(bool)> &&callback)
Constructs FSMEventData with an event type, configuration data, and a callback.
FsmEvent event_type
The type of the FSM event.
std::unique_ptr< WebSocketConfig > config_data
Optional configuration data for FSM settings.