Kurlyk
Loading...
Searching...
No Matches
EmscriptenWebSocketClientAdapter.hpp
Go to the documentation of this file.
1/*
2* kurlyk - C++ library for easy HTTP requests
3*
4* Copyright (c) 2021-2024 Elektro Yar. Email: git.electroyar@gmail.com
5*
6* Permission is hereby granted, free of charge, to any person obtaining a copy
7* of this software and associated documentation files (the "Software"), to deal
8* in the Software without restriction, including without limitation the rights
9* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10* copies of the Software, and to permit persons to whom the Software is
11* furnished to do so, subject to the following conditions:
12*
13* The above copyright notice and this permission notice shall be included in
14* all copies or substantial portions of the Software.
15*
16* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22* SOFTWARE.
23*/
24#pragma once
25#ifndef _KURLYK_EMSCRIPTEN_WEB_SOCKET_CLIENT_ADAPTER_HPP_INCLUDED
26#define _KURLYK_EMSCRIPTEN_WEB_SOCKET_CLIENT_ADAPTER_HPP_INCLUDED
27
28#include <functional>
29#include <string>
30#include <emscripten/fetch.h>
31#include <emscripten/websocket.h>
32#include <system_error>
33
34namespace kurlyk {
35
39 public:
40
45
48 reset();
49 }
50
53
60 std::function<void(std::unique_ptr<WebSocketEventData>)>& event_handler() override final {
61 return m_on_event;
62 }
63
67 bool set_config(std::unique_ptr<WebSocketConfig> config) override final {
69 std::lock_guard<std::mutex> lock1(m_client_mutex, std::adopt_lock);
70 std::lock_guard<std::mutex> lock2(m_config_mutex, std::adopt_lock);
71
72 m_pending_config = std::move(config);
74
75 // If the connection is active, update the configuration immediately.
77 m_is_running = true;
79 }
80 return true;
81 }
82
84 void connect() override final {
85 std::lock_guard<std::mutex> lock(m_client_mutex);
88 m_is_running = true;
89 }
90
92 void disconnect() override final {
93 std::lock_guard<std::mutex> lock(m_client_mutex);
96 }
97
100 const bool is_connected() override final {
101 std::lock_guard<std::mutex> lock(m_client_mutex);
103 }
104
107 const bool is_running() override final {
108 return m_is_running;
109 }
110
117 const std::string &message,
118 const RateLimitType& rate_limit_type = RateLimitType::General,
119 std::function<void(const std::error_code& ec)> callback = nullptr) override final {
120 if (message.empty()) return false;
121 if (!is_connected()) return false;
122 std::lock_guard<std::mutex> lock(m_message_queue_mutex);
123# if __cplusplus >= 201402L
124 m_message_queue.push_back(std::make_shared<WebSocketSendInfo>(message, rate_limit_type, false, 0, callback));
125# else
126 m_message_queue.push_back(std::shared_ptr<WebSocketSendInfo>(new WebSocketSendInfo(message, rate_limit_type, false, 0, callback)));
127# endif
128 return true;
129 }
130
137 const int status = 1000,
138 const std::string &reason = std::string(),
139 std::function<void(const std::error_code& ec)> callback = nullptr) override final {
140 if (!is_connected()) return false;
141 std::lock_guard<std::mutex> lock(m_message_queue_mutex);
142# if __cplusplus >= 201402L
143 m_message_queue.push_back(std::make_shared<WebSocketSendInfo>(reason, RateLimitType::General, true, status, callback));
144# else
145 m_message_queue.push_back(std::shared_ptr<WebSocketSendInfo>(new WebSocketSendInfo(reason, RateLimitType::General, true, status, callback)));
146# endif
147 return true;
148 }
149
152 std::list<std::unique_ptr<WebSocketEventData>> receive_events() override final {
153 std::lock_guard<std::mutex> lock(m_client_mutex);
154 auto event_queue = std::move(m_event_queue);
155 m_event_queue.clear();
156 return event_queue;
157 }
158
161 std::unique_ptr<WebSocketEventData> receive_event() override final {
162 std::lock_guard<std::mutex> lock(m_client_mutex);
163 if (m_event_queue.empty()) return nullptr;
164 std::unique_ptr<WebSocketEventData> event = std::move(*m_event_queue.begin());
165 m_event_queue.erase(m_event_queue.begin());
166 return event;
167 }
168
170 void process() override final {
171 process_update_config();
175 }
176
178 void reset() override final {
181 }
182
183 private:
184 using time_point_t = std::chrono::steady_clock::time_point;
185 using event_data_ptr_t = std::unique_ptr<WebSocketEventData>;
186 using send_info_ptr_t = std::shared_ptr<WebSocketSendInfo>;
187
196
205
215
216 std::function<void(std::unique_ptr<WebSocketEventData>)> m_on_event;
218 std::unique_ptr<WebSocketConfig> m_config;
219 std::unique_ptr<WebSocketConfig> m_pending_config;
220 std::mutex m_config_mutex;
222
223 //
226 EMSCRIPTEN_WEBSOCKET_T m_client_ws;
227 std::mutex m_client_mutex;
228
229 std::shared_ptr<WsClient::Connection> m_ws_connection;
230 std::shared_ptr<WssClient::Connection> m_wss_connection;
231 std::list<event_data_ptr_t> m_event_queue;
232
234 std::list<send_info_ptr_t> m_message_queue;
235
// Brace initialization replaces ATOMIC_VAR_INIT, which is deprecated in C++20 and removed in C++23.
// NOTE(review): presumably tracks whether the socket is currently open - confirm against the open/close handlers.
std::atomic<bool> m_is_connection_active{false};
// Flag returned by is_running(); set to true by connect().
std::atomic<bool> m_is_running{false};
239
241 // Проверка поддержки Websocket
242 if (!emscripten_websocket_is_supported()) {
243 send_event_websocket_error(std::error_code(10022, std::system_category()));
244 return false;
245 }
246
247 // Валидация url
248 if (!utils::is_valid_url(m_config->url, {"ws", "wss"})) {
249 send_event_websocket_error(std::error_code(10022, std::system_category()));
250 return false;
251 }
252
253 // Инициализация протоколов
254
255 // Добавляем протоколы, если они уже есть в m_config->protocols
256 std::vector<const char*> protocols_c_str;
257 for (const auto& protocol : m_config->protocols) {
258 protocols_c_str.push_back(protocol.c_str());
259 }
260
261 // Если нет протоколов в m_config, пытаемся извлечь их из заголовка "Sec-WebSocket-Protocol"
262 if (m_config->m_config->protocols.empty() &&
263 m_config->headers.find("Sec-WebSocket-Protocol") != m_config->headers.end()) {
264 // Извлекаем строку протоколов
265 std::string protocol_header = m_config->headers["Sec-WebSocket-Protocol"];
266
267 // Разбиваем строку по запятым
268 std::stringstream ss(protocol_header);
269 std::string protocol;
270 while (std::getline(ss, protocol, ',')) {
271 // Удаляем лишние пробелы в начале и конце протоколов
272 protocol.erase(0, protocol.find_first_not_of(' '));
273 protocol.erase(protocol.find_last_not_of(' ') + 1);
274
275 protocols_c_str.push_back(protocol.c_str());
276 }
277 }
278
279 protocols_c_str.push_back(nullptr);
280
281 EmscriptenWebSocketCreateAttributes ws_attrs = {
282 m_config->url.c_str(),
283 protocols_c_str.data(),
284 EM_TRUE
285 };
286
287 m_client_ws = emscripten_websocket_new(&ws_attrs);
288 if (m_client_ws <= 0) {
289 send_event_websocket_error(std::error_code(10022, std::system_category()));
290 return false;
291 }
292
293 emscripten_websocket_set_onopen_callback(m_client_ws, this, on_open_cb);
294 emscripten_websocket_set_onerror_callback(m_client_ws, this, on_close_cb);
295 emscripten_websocket_set_onclose_callback(m_client_ws, this, on_error_cb);
296 emscripten_websocket_set_onmessage_callback(m_client_ws, this, on_message_cb);
297 return true;
298 }
299
301 std::unique_lock<std::mutex> lock_message(m_message_queue_mutex);
302 m_message_queue.clear();
303 lock_message.unlock();
304
305 std::unique_lock<std::mutex> lock_client(m_client_mutex);
306 emscripten_websocket_delete(m_client_ws);
307
314 lock_client.unlock();
315 //send_event_websocket_error(std::make_error_code(std::errc::connection_reset));
316 send_event_websocket_error(std::error_code(995, std::system_category()));
317 }
318 }
319
322 if (m_client_ws <= 0) return;
323
324 std::unique_lock<std::mutex> lock(m_message_queue_mutex);
325 if (m_message_queue.empty()) return;
326
327 std::list<send_info_ptr_t> message_queue;
328 auto it = m_message_queue.begin();
329 while (it != m_message_queue.end()) {
330 auto& send_info = *it;
331 // Check if the message is allowed by the rate limiter.
332 if (!m_rate_limiter.allow_request(send_info->rate_limit_type)) {
333 ++it;
334 continue;
335 }
336 message_queue.push_back(std::move(send_info));
337 it = m_message_queue.erase(it);
338 }
339 lock.unlock();
340 if (message_queue.empty()) return;
341
342 // Обрабатываем очередь сообщений на отправку
343 EMSCRIPTEN_RESULT result;
344 for (const auto &send_info : message_queue) {
345 std::unique_lock<std::mutex> lock(m_client_mutex);
347 lock.unlock();
348 if (!send_info->callback) continue;
349 send_info->callback(std::make_error_code(std::errc::not_connected));
350 continue;
351 }
352 lock.unlock();
353
354 // Закрываем соединение
355 if (send_info->is_send_close) {
356 std::unique_lock<std::mutex> lock(m_client_mutex);
357 result = emscripten_websocket_close(m_client_ws, send_info->status, "normal closure");
358 lock.unlock();
359 if (!send_info->callback) continue;
360 if (result != EMSCRIPTEN_RESULT_SUCCESS) {
361 send_info->callback(std::make_error_code(std::errc::not_connected));
362 } else {
363 std::error_code ec;
364 send_info->callback(ec);
365 }
366 continue;
367 }
368
369 // Отправляем сообщение
370 std::unique_lock<std::mutex> lock(m_client_mutex);
371 result = emscripten_websocket_send_binary(m_client_ws, send_info->message.c_str(), send_info->message.size());
372 lock.unlock();
373 if (!send_info->callback) continue;
374 if (result != EMSCRIPTEN_RESULT_SUCCESS) {
375 send_info->callback(std::make_error_code(std::errc::not_connected));
376 } else {
377 std::error_code ec;
378 send_info->callback(ec);
379 }
380 }
381 }
382
385 if (!m_on_event) return;
386
387 std::unique_lock<std::mutex> lock(m_client_mutex);
388 if (m_event_queue.empty()) return;
389 auto event_queue = std::move(m_event_queue);
390 m_event_queue.clear();
391 lock.unlock();
392
393 for (auto &event : event_queue) {
394 m_on_event(std::move(event));
395 }
396 }
397
399 std::unique_lock<std::mutex> lock(m_client_mutex);
400 ws_state = m_ws_state;
401 lock.unlock();
402
403 switch (ws_state) {
406 break;
407 }
409
410 break;
411 }
413
414 break;
415 }
417
418 break;
419 }
420 default:
421
422 break;
423 };
424 }
425
427 if (!m_fsm_event_queue.has_events()) return;
428 switch (m_fsm_event_queue.pop_event()) {
430 if (!init_websocket()) {
432 break;
433 }
435 break;
436 default:
437 break;
438 };
439 }
440
442 if (!m_fsm_event_queue.has_events()) return;
443 switch (m_fsm_event_queue.pop_event()) {
447 break;
451 if (!m_config) {
453 break;
454 }
455 if (!m_config->reconnect) {
457 break;
458 }
460 m_start_time = std::chrono::steady_clock::now();
462 break;
467 break;
471 if (!init_websocket()) {
473 break;
474 }
476 break;
477 default:
478 break;
479 };
480 }
481
482 //----------------------------------------------------------------------
483
484 static EM_BOOL on_open_cb(
485 int event_type,
486 const EmscriptenWebSocketOpenEvent* event_data,
487 void* user_data) {
488 auto* client = static_cast<EmscriptenWebSocketClientAdapter*>(user_data);
489 if (client) client->on_open(event_data);
490 return EM_TRUE;
491 }
492
493 static EM_BOOL on_close_cb(
494 int event_type,
495 const EmscriptenWebSocketCloseEvent* event_data,
496 void* user_data) {
497 auto* client = static_cast<EmscriptenWebSocketClientAdapter*>(user_data);
498 if (client) client->on_close(event_data);
499 return EM_TRUE;
500 }
501
502 static EM_BOOL on_error_cb(
503 int event_type,
504 const EmscriptenWebSocketErrorEvent* event_data,
505 void* user_data) {
506 auto* client = static_cast<EmscriptenWebSocketClientAdapter*>(user_data);
507 if (client) client->on_error(event_data);
508 return EM_TRUE;
509 }
510
511 static EM_BOOL on_message_cb(
512 int event_type,
513 const EmscriptenWebSocketMessageEvent* event_data,
514 void* user_data) {
515 auto* client = static_cast<EmscriptenWebSocketClientAdapter*>(user_data);
516 if (client) client->on_message(event_data);
517 return EM_TRUE;
518 }
519
520 //----------------------------------------------------------------------
521
522 void on_open(const EmscriptenWebSocketOpenEvent* event_data) {
523 std::lock_guard<std::mutex> lock(m_client_mutex);
526 m_event_queue.push_back(create_websocket_event(event_data));
527 }
528
529 void on_message(const EmscriptenWebSocketMessageEvent* event_data) {
530 std::lock_guard<std::mutex> lock(m_client_mutex);
531 m_event_queue.push_back(create_websocket_event(event_data));
532 }
533
534 void on_error(const EmscriptenWebSocketErrorEvent* event_data) {
535 std::lock_guard<std::mutex> lock(m_client_mutex);
537 m_event_queue.push_back(create_websocket_event(event_data));
538 }
541 }
542
543 void on_close(const EmscriptenWebSocketCloseEvent* event_data) {
544 std::lock_guard<std::mutex> lock(m_client_mutex);
547 m_event_queue.push_back(create_websocket_event(event_data));
548 }
549
550 //----------------------------------------------------------------------
551
552
553 std::unique_ptr<WebSocketEventData> create_websocket_event() {
554# if __cplusplus >= 201402L
555 auto event = std::make_unique<WebSocketEventData>();
556# else
557 auto event = std::unique_ptr<WebSocketEventData>(new WebSocketEventData());
558# endif
559 event->sender = shared_from_this();
560 return event;
561 }
562
563 std::unique_ptr<WebSocketEventData> create_websocket_event(
564 const EmscriptenWebSocketOpenEvent* event_data) {
565 auto event = create_websocket_event();
566 event->event_type = WebSocketEventType::Open;
567 return event;
568 }
569
570 std::unique_ptr<WebSocketEventData> create_websocket_event(
571 const EmscriptenWebSocketMessageEvent* event_data) {
572 auto event = create_websocket_event();
573 event->event_type = WebSocketEventType::Message;
574 event->message.assign(event_data->data, event_data->numBytes);
575 return event;
576 }
577
578 std::unique_ptr<WebSocketEventData> create_websocket_event(
579 const EmscriptenWebSocketErrorEvent* event_data) {
580 auto event = create_websocket_event();
581 event->event_type = WebSocketEventType::Error;
582 event->error_code = std::error_code(995, std::system_category());
583 return event;
584 }
585
586 std::unique_ptr<WebSocketEventData> create_websocket_event(
587 const EmscriptenWebSocketCloseEvent* event_data) {
588 auto event = create_websocket_event();
589 event->event_type = WebSocketEventType::Close;
590 event->reason = event_data->reason;
591 event->status_code = event_data->code;
592 return event;
593 }
594 };
595
596};
597
598#endif // _KURLYK_EMSCRIPTEN_WEB_SOCKET_CLIENT_ADAPTER_HPP_INCLUDED
std::unique_ptr< WebSocketEventData > create_websocket_event(const EmscriptenWebSocketErrorEvent *event_data)
bool send_message(const std::string &message, const RateLimitType &rate_limit_type=RateLimitType::General, std::function< void(const std::error_code &ec)> callback=nullptr) override final
Send a message through the WebSocket.
std::unique_ptr< WebSocketEventData > create_websocket_event(const EmscriptenWebSocketMessageEvent *event_data)
void on_error(const EmscriptenWebSocketErrorEvent *event_data)
std::unique_ptr< WebSocketEventData > receive_event() override final
Retrieve the next pending event.
void process() override final
Process pending operations such as connecting, sending, and handling events.
void disconnect() override final
Disconnect from the WebSocket server.
std::shared_ptr< WssClient::Connection > m_wss_connection
void operator=(const EmscriptenWebSocketClientAdapter &)=delete
std::unique_ptr< WebSocketEventData > create_websocket_event()
static EM_BOOL on_message_cb(int event_type, const EmscriptenWebSocketMessageEvent *event_data, void *user_data)
std::function< void(std::unique_ptr< WebSocketEventData >)> & event_handler() override final
Accessor for the event handler function.
void on_message(const EmscriptenWebSocketMessageEvent *event_data)
static EM_BOOL on_close_cb(int event_type, const EmscriptenWebSocketCloseEvent *event_data, void *user_data)
static EM_BOOL on_error_cb(int event_type, const EmscriptenWebSocketErrorEvent *event_data, void *user_data)
void reset() override final
Reset the WebSocket client state.
~EmscriptenWebSocketClientAdapter() override final
Destructor: Cleans up resources and resets the WebSocket client state.
enum kurlyk::EmscriptenWebSocketClientAdapter::WebSocketState m_ws_state
bool set_config(std::unique_ptr< WebSocketConfig > config) override final
Sets the WebSocket configuration.
void connect() override final
Initiate connection to the WebSocket server.
std::list< std::unique_ptr< WebSocketEventData > > receive_events() override final
Retrieve all pending events.
static EM_BOOL on_open_cb(int event_type, const EmscriptenWebSocketOpenEvent *event_data, void *user_data)
bool send_close(const int status=1000, const std::string &reason=std::string(), std::function< void(const std::error_code &ec)> callback=nullptr) override final
Send a close request through the WebSocket.
enum kurlyk::EmscriptenWebSocketClientAdapter::FsmState m_fsm_state
EmscriptenWebSocketClientAdapter(const EmscriptenWebSocketClientAdapter &)=delete
void on_close(const EmscriptenWebSocketCloseEvent *event_data)
std::unique_ptr< WebSocketEventData > create_websocket_event(const EmscriptenWebSocketCloseEvent *event_data)
FsmState
Finite state machine states controlling client workflow.
void process_message_queue()
Process the queue of messages waiting to be sent.
const bool is_connected() override final
Checks if the WebSocket is connected.
EmscriptenWebSocketClientAdapter()
Constructor: Initializes the WebSocket client.
std::function< void(std::unique_ptr< WebSocketEventData >)> m_on_event
FsmEvent
Represents events in the finite state machine.
std::shared_ptr< WsClient::Connection > m_ws_connection
void on_open(const EmscriptenWebSocketOpenEvent *event_data)
std::unique_ptr< WebSocketEventData > create_websocket_event(const EmscriptenWebSocketOpenEvent *event_data)
const bool is_running() override final
Checks if the client is currently running.
Interface for a WebSocket client, providing methods for connection management, configuration,...
Encapsulates data for a WebSocket event, providing information about event type, message,...
Manages rate limiting for WebSocket requests based on predefined limits.
Holds information for sending a WebSocket message, including rate limiting, close status,...
A thread-safe event queue that supports blocking and non-blocking event retrieval.
bool is_valid_url(const std::string &url, const std::vector< std::string > &protocol)
Validates if a URL is correctly formatted.
Primary namespace for the Kurlyk library, encompassing initialization, request management,...
RateLimitType
Defines rate limit scope categories.
Definition enums.hpp:24