Kurlyk
Loading...
Searching...
No Matches
NetworkWorker.hpp
Go to the documentation of this file.
1#pragma once
2#ifndef _KURLYK_NETWORK_WORKER_HPP_INCLUDED
3#define _KURLYK_NETWORK_WORKER_HPP_INCLUDED
4
7
8#define KURLYK_HANDLE_ERROR(e, msg) \
9 ::kurlyk::core::NetworkWorker::get_instance().handle_error((e), (msg), __FILE__, __LINE__, __FUNCTION__)
10
11#include <thread>
12
13namespace kurlyk::core {
14
21 public:
22 using ErrorHandler = std::function<void(const std::exception&, const char*, const char*, int, const char*)>;
23
27 static NetworkWorker* instance = new NetworkWorker();
28 return *instance;
29 }
30
34 std::lock_guard<std::mutex> lock(m_error_handlers_mutex);
35 m_error_handlers.push_back(std::move(handler));
36 }
37
45 const std::exception& e,
46 const char* msg,
47 const char* file,
48 int line,
49 const char* func) {
50
51 std::unique_lock<std::mutex> lock(m_error_handlers_mutex);
52 if (m_error_handlers.empty()) return;
53 std::vector<ErrorHandler> handlers = m_error_handlers;
54 lock.unlock();
55
56 for (const auto& handler : handlers) {
57 try {
58 handler(e, msg, file, line, func);
59 } catch (...) {
60 // Never let handler crash the system
61 }
62 }
63 }
64
72 std::exception_ptr eptr,
73 const char* msg,
74 const char* file,
75 int line,
76 const char* func) {
77
78 if (!eptr) return;
79
80 try {
81 std::rethrow_exception(eptr);
82 } catch (const std::exception& e) {
83 handle_error(e, msg, file, line, func);
84 } catch (...) {
85 const std::runtime_error unknown("Unknown non-std::exception");
86 handle_error(unknown, msg, file, line, func);
87 }
88 }
89
91 bool is_worker_thread() const {
92 return std::this_thread::get_id() == m_worker_thread_id;
93 }
94
97 void add_task(std::function<void()> task) {
98 std::unique_lock<std::mutex> lock(m_tasks_list_mutex);
99 m_tasks_list.push_back(std::move(task));
100 lock.unlock();
101 notify();
102 }
103
107 std::lock_guard<std::mutex> lock(m_managers_mutex);
108 if (std::find(m_managers.begin(), m_managers.end(), manager) == m_managers.end()) {
109 m_managers.push_back(manager);
110 }
111 }
112
116 void process() {
117 std::unique_lock<std::mutex> lock(m_managers_mutex);
118 for (auto* m : m_managers) m->process();
119 lock.unlock();
121 }
122
126 void notify() {
127 std::lock_guard<std::mutex> locker(m_notify_mutex);
128 m_notify = true;
129 m_notify_condition.notify_one();
130 }
131
137 void start(const bool use_async) {
138 std::unique_lock<std::mutex> locker(m_is_worker_started_mutex);
139 if (m_is_worker_started) return;
140 m_is_worker_started = true;
141 if (!use_async) return;
142 locker.unlock();
143
144 m_future = std::async(
145 std::launch::async,
146 [this] {
147 m_worker_thread_id = std::this_thread::get_id();
148 for (;;) {
149 std::unique_lock<std::mutex> locker(m_notify_mutex);
150 m_notify_condition.wait(locker, [this]() { return m_notify; });
151 m_notify = false;
152 locker.unlock();
153
154 if (m_shutdown) {
155 shutdown();
156 return;
157 }
158
159 while (is_loaded()) {
160 process();
161 if (m_shutdown) {
162 shutdown();
163 return;
164 }
165
166 std::unique_lock<std::mutex> locker(m_notify_mutex);
167 m_notify_condition.wait_for(locker, std::chrono::milliseconds(1), [this] {
168 return m_notify || m_shutdown;
169 });
170 m_notify = false;
171 locker.unlock();
172
173 if (m_shutdown) {
174 shutdown();
175 return;
176 }
177 }
178
179 if (m_shutdown) {
180 shutdown();
181 return;
182 }
183 }
184 }).share();
185 }
186
190 void stop() {
191 {
192 std::lock_guard<std::mutex> locker(m_is_worker_started_mutex);
193 if (!m_is_worker_started && !m_future.valid()) return;
194 }
195
196 const bool was_shutdown = m_shutdown.exchange(true);
197 if (!m_future.valid()) {
198 if (!was_shutdown) shutdown();
199 return;
200 }
201
202 notify();
203 try {
204 m_future.wait();
205 m_future.get();
206 } catch(...) {
207 KURLYK_HANDLE_ERROR(std::current_exception(), "Exception during NetworkWorker shutdown");
208 };
209 }
210
214 void shutdown() {
215 std::unique_lock<std::mutex> lock(m_managers_mutex);
216 for (auto* m : m_managers) m->shutdown();
217 lock.unlock();
219 }
220
221 private:
222 std::shared_future<void> m_future;
223 std::atomic<bool> m_shutdown = ATOMIC_VAR_INIT(false);
224 std::thread::id m_worker_thread_id;
225 std::mutex m_notify_mutex;
226 std::condition_variable m_notify_condition;
227 bool m_notify = false;
229 bool m_is_worker_started = false;
230 mutable std::mutex m_tasks_list_mutex;
231 std::list<std::function<void()>> m_tasks_list;
232 mutable std::mutex m_managers_mutex;
233 std::vector<INetworkTaskManager*> m_managers;
235 std::vector<ErrorHandler> m_error_handlers;
236
237
243
246 stop();
247 }
248
250 NetworkWorker(const NetworkWorker&) = delete;
251
254
259 std::unique_lock<std::mutex> lock(m_tasks_list_mutex);
260 if (m_tasks_list.empty()) return;
261 auto tasks_list = std::move(m_tasks_list);
262 m_tasks_list.clear();
263 lock.unlock();
264 for (auto &item : tasks_list) {
265 item();
266 }
267 tasks_list.clear();
268 }
269
272 const bool has_pending_tasks() const {
273 std::lock_guard<std::mutex> lock(m_tasks_list_mutex);
274 return !m_tasks_list.empty();
275 }
276
281 const bool is_loaded() const {
282 std::unique_lock<std::mutex> lock(m_managers_mutex);
283 for (auto* m : m_managers) {
284 if (m->is_loaded()) return true;
285 }
286 lock.unlock();
287 return has_pending_tasks();
288 }
289
290 }; // NetworkWorker
291
292}; // namespace kurlyk
293
294#endif // _KURLYK_NETWORK_WORKER_HPP_INCLUDED
#define KURLYK_HANDLE_ERROR(e, msg)
Interface for modules managed by NetworkWorker (e.g., HTTP, WebSocket).
void stop()
Stops the worker thread, ensuring all tasks are completed.
void notify()
Notifies the worker to begin processing requests or tasks.
bool m_is_worker_started
Flag indicating if the worker thread is started.
const bool is_loaded() const
Checks if the NetworkWorker has pending tasks or active network events.
static NetworkWorker & get_instance()
Get the singleton instance of NetworkWorker.
void add_task(std::function< void()> task)
Adds a task to the queue and notifies the worker thread.
void handle_error(const std::exception &e, const char *msg, const char *file, int line, const char *func)
Dispatches an exception to all registered error handlers.
std::atomic< bool > m_shutdown
Flag indicating if shutdown has been requested.
NetworkWorker(const NetworkWorker &)=delete
Deleted copy constructor to enforce the singleton pattern.
void handle_error(std::exception_ptr eptr, const char *msg, const char *file, int line, const char *func)
Handles an exception captured as exception_ptr.
void process()
Processes all queued tasks and active HTTP and WebSocket requests.
std::condition_variable m_notify_condition
Condition variable for notifying the worker.
NetworkWorker & operator=(const NetworkWorker &)=delete
Deleted copy assignment operator to enforce the singleton pattern.
std::shared_future< void > m_future
Future for managing asynchronous worker execution.
std::vector< INetworkTaskManager * > m_managers
List of registered network task managers.
void process_tasks()
Processes all tasks in the task list, then clears the list.
void start(const bool use_async)
Starts the worker thread for asynchronous task processing.
bool m_notify
Flag indicating whether a notification is pending. Access is always guarded by m_notify_mutex; atomic...
std::list< std::function< void()> > m_tasks_list
List of tasks queued for processing by the worker.
const bool has_pending_tasks() const
Checks if there are any pending tasks in the task list.
void register_manager(INetworkTaskManager *manager)
Registers a network task manager to be managed by the NetworkWorker.
void add_error_handler(ErrorHandler handler)
Registers a callback for handling network errors.
std::mutex m_managers_mutex
Mutex protecting access to registered managers.
~NetworkWorker()
Private destructor, ensuring the worker thread stops on destruction.
void shutdown()
Shuts down the worker, clearing all active requests and pending tasks.
std::mutex m_tasks_list_mutex
Mutex for protecting access to the task list.
std::mutex m_is_worker_started_mutex
Mutex to control worker thread initialization.
NetworkWorker()
Private constructor to enforce singleton pattern.
bool is_worker_thread() const
Returns true if the calling thread is the worker thread.
std::thread::id m_worker_thread_id
ID of the async worker thread, if started.
std::mutex m_error_handlers_mutex
Mutex guarding the error handler list.
std::function< void(const std::exception &, const char *, const char *, int, const char *)> ErrorHandler
std::mutex m_notify_mutex
Mutex for managing worker notifications.
std::vector< ErrorHandler > m_error_handlers
Collection of registered error handlers.
bool is_valid_email_id(const std::string &str)
Validates an email address format.
std::string convert_user_agent_to_sec_ch_ua(const std::string &user_agent)
Converts a User-Agent string to a sec-ch-ua header value.