Kurlyk
Loading...
Searching...
No Matches
NetworkWorker.hpp
Go to the documentation of this file.
1#pragma once
2#ifndef _KURLYK_NETWORK_WORKER_HPP_INCLUDED
3#define _KURLYK_NETWORK_WORKER_HPP_INCLUDED
4
7
8#define KURLYK_HANDLE_ERROR(e, msg) \
9 ::kurlyk::core::NetworkWorker::get_instance().handle_error((e), (msg), __FILE__, __LINE__, __FUNCTION__)
10
11namespace kurlyk::core {
12
19 public:
20 using ErrorHandler = std::function<void(const std::exception&, const char*, const char*, int, const char*)>;
21
25 static NetworkWorker* instance = new NetworkWorker();
26 return *instance;
27 }
28
32 std::lock_guard<std::mutex> lock(m_error_handlers_mutex);
33 m_error_handlers.push_back(std::move(handler));
34 }
35
43 const std::exception& e,
44 const char* msg,
45 const char* file,
46 int line,
47 const char* func) {
48
49 std::unique_lock<std::mutex> lock(m_error_handlers_mutex);
50 if (m_error_handlers.empty()) return;
51 std::vector<ErrorHandler> handlers = m_error_handlers;
52 lock.unlock();
53
54 for (const auto& handler : handlers) {
55 try {
56 handler(e, msg, file, line, func);
57 } catch (...) {
58 // Never let handler crash the system
59 }
60 }
61 }
62
70 std::exception_ptr eptr,
71 const char* msg,
72 const char* file,
73 int line,
74 const char* func) {
75
76 if (!eptr) return;
77
78 try {
79 std::rethrow_exception(eptr);
80 } catch (const std::exception& e) {
81 handle_error(e, msg, file, line, func);
82 } catch (...) {
83 const std::runtime_error unknown("Unknown non-std::exception");
84 handle_error(unknown, msg, file, line, func);
85 }
86 }
87
90 void add_task(std::function<void()> task) {
91 std::unique_lock<std::mutex> lock(m_tasks_list_mutex);
92 m_tasks_list.push_back(std::move(task));
93 lock.unlock();
94 notify();
95 }
96
100 std::lock_guard<std::mutex> lock(m_managers_mutex);
101 if (std::find(m_managers.begin(), m_managers.end(), manager) == m_managers.end()) {
102 m_managers.push_back(manager);
103 }
104 }
105
109 void process() {
110 std::unique_lock<std::mutex> lock(m_managers_mutex);
111 for (auto* m : m_managers) m->process();
112 lock.unlock();
114 }
115
119 void notify() {
120 std::lock_guard<std::mutex> locker(m_notify_mutex);
121 m_notify_condition.notify_one();
122 m_notify = true;
123 }
124
130 void start(const bool use_async) {
131 std::unique_lock<std::mutex> locker(m_is_worker_started_mutex);
132 if (m_is_worker_started) return;
133 m_is_worker_started = true;
134 if (!use_async) return;
135 locker.unlock();
136
137 m_future = std::async(
138 std::launch::async,
139 [this] {
140 for (;;) {
141 std::unique_lock<std::mutex> locker(m_notify_mutex);
142 m_notify_condition.wait(locker, [this]() { return m_notify; });
143 m_notify = false;
144 locker.unlock();
145
146 if (m_shutdown) {
147 shutdown();
148 return;
149 }
150
151 while (is_loaded()) {
152 process();
153 if (m_shutdown) {
154 shutdown();
155 return;
156 }
157
158 std::unique_lock<std::mutex> locker(m_notify_mutex);
159 m_notify_condition.wait_for(locker, std::chrono::milliseconds(1), [this] {
160 return m_notify || m_shutdown;
161 });
162 m_notify = false;
163 locker.unlock();
164
165 if (m_shutdown) {
166 shutdown();
167 return;
168 }
169 }
170
171 if (m_shutdown) {
172 shutdown();
173 return;
174 }
175 }
176 }).share();
177 }
178
182 void stop() {
183 if (!m_future.valid()) return;
184 m_shutdown = true;
185 notify();
186 try {
187 m_future.wait();
188 m_future.get();
189 } catch(...) {
190 KURLYK_HANDLE_ERROR(std::current_exception(), "Exception during NetworkWorker shutdown");
191 };
192 }
193
197 void shutdown() {
198 std::unique_lock<std::mutex> lock(m_managers_mutex);
199 for (auto* m : m_managers) m->shutdown();
200 lock.unlock();
202 }
203
204 private:
205 std::shared_future<void> m_future;
206 std::atomic<bool> m_shutdown = ATOMIC_VAR_INIT(false);
207 std::mutex m_notify_mutex;
208 std::condition_variable m_notify_condition;
209 bool m_notify = false;
211 bool m_is_worker_started = false;
212 mutable std::mutex m_tasks_list_mutex;
213 std::list<std::function<void()>> m_tasks_list;
214 mutable std::mutex m_managers_mutex;
215 std::vector<INetworkTaskManager*> m_managers;
217 std::vector<ErrorHandler> m_error_handlers;
218
219
225
228 stop();
229 }
230
232 NetworkWorker(const NetworkWorker&) = delete;
233
236
241 std::unique_lock<std::mutex> lock(m_tasks_list_mutex);
242 if (m_tasks_list.empty()) return;
243 auto tasks_list = std::move(m_tasks_list);
244 m_tasks_list.clear();
245 lock.unlock();
246 for (auto &item : tasks_list) {
247 item();
248 }
249 tasks_list.clear();
250 }
251
254 const bool has_pending_tasks() const {
255 std::lock_guard<std::mutex> lock(m_tasks_list_mutex);
256 return !m_tasks_list.empty();
257 }
258
263 const bool is_loaded() const {
264 std::unique_lock<std::mutex> lock(m_managers_mutex);
265 for (auto* m : m_managers) {
266 if (m->is_loaded()) return true;
267 }
268 lock.unlock();
269 return has_pending_tasks();
270 }
271
272 }; // NetworkWorker
273
274}; // namespace kurlyk
275
276#endif // _KURLYK_NETWORK_WORKER_HPP_INCLUDED
#define KURLYK_HANDLE_ERROR(e, msg)
Interface for modules managed by NetworkWorker (e.g., HTTP, WebSocket).
void stop()
Stops the worker thread, ensuring all tasks are completed.
void notify()
Notifies the worker to begin processing requests or tasks.
bool m_is_worker_started
Flag indicating if the worker thread is started.
const bool is_loaded() const
Checks if the NetworkWorker has pending tasks or active network events.
static NetworkWorker & get_instance()
Get the singleton instance of NetworkWorker.
void add_task(std::function< void()> task)
Adds a task to the queue and notifies the worker thread.
void handle_error(const std::exception &e, const char *msg, const char *file, int line, const char *func)
Dispatches an exception to all registered error handlers.
std::atomic< bool > m_shutdown
Flag indicating if shutdown has been requested.
NetworkWorker(const NetworkWorker &)=delete
Deleted copy constructor to enforce the singleton pattern.
void handle_error(std::exception_ptr eptr, const char *msg, const char *file, int line, const char *func)
Handles an exception captured as exception_ptr.
void process()
Processes all queued tasks and active HTTP and WebSocket requests.
std::condition_variable m_notify_condition
Condition variable for notifying the worker.
NetworkWorker & operator=(const NetworkWorker &)=delete
Deleted copy assignment operator to enforce the singleton pattern.
std::shared_future< void > m_future
Future for managing asynchronous worker execution.
std::vector< INetworkTaskManager * > m_managers
List of registered network task managers.
void process_tasks()
Processes all tasks in the task list, then clears the list.
void start(const bool use_async)
Starts the worker thread for asynchronous task processing.
bool m_notify
Flag indicating whether a notification is pending.
std::list< std::function< void()> > m_tasks_list
List of tasks queued for processing by the worker.
const bool has_pending_tasks() const
Checks if there are any pending tasks in the task list.
void register_manager(INetworkTaskManager *manager)
Registers a network task manager to be managed by the NetworkWorker.
void add_error_handler(ErrorHandler handler)
Registers a callback for handling network errors.
std::mutex m_managers_mutex
Mutex protecting access to registered managers.
~NetworkWorker()
Private destructor, ensuring the worker thread stops on destruction.
void shutdown()
Shuts down the worker, clearing all active requests and pending tasks.
std::mutex m_tasks_list_mutex
Mutex for protecting access to the task list.
std::mutex m_is_worker_started_mutex
Mutex to control worker thread initialization.
NetworkWorker()
Private constructor to enforce singleton pattern.
std::mutex m_error_handlers_mutex
Mutex guarding the error handler list.
std::function< void(const std::exception &, const char *, const char *, int, const char *)> ErrorHandler
std::mutex m_notify_mutex
Mutex for managing worker notifications.
std::vector< ErrorHandler > m_error_handlers
Collection of registered error handlers.
bool is_valid_email_id(const std::string &str)
Validates an email address format.
std::string convert_user_agent_to_sec_ch_ua(const std::string &user_agent)
Converts a User-Agent string to a sec-ch-ua header value.