Kurlyk
Loading...
Searching...
No Matches
NetworkWorker.hpp
Go to the documentation of this file.
1#pragma once
2#ifndef _KURLYK_NETWORK_WORKER_HPP_INCLUDED
3#define _KURLYK_NETWORK_WORKER_HPP_INCLUDED
4
7
// Convenience macro: forwards `e` and `msg` to handle_error() on the NetworkWorker
// singleton, appending the call site's __FILE__, __LINE__ and __FUNCTION__.
// handle_error() has overloads taking `const std::exception&` and `std::exception_ptr`,
// so `e` may be either.
8#define KURLYK_HANDLE_ERROR(e, msg) \
9 ::kurlyk::core::NetworkWorker::get_instance().handle_error((e), (msg), __FILE__, __LINE__, __FUNCTION__)
10
11namespace kurlyk::core {
12
19 public:
// Callback type for registered error handlers. handle_error() invokes each handler
// as handler(e, msg, file, line, func): the exception, a message, and the
// file / line / function values passed to handle_error().
20 using ErrorHandler = std::function<void(const std::exception&, const char*, const char*, int, const char*)>;
21
25 static NetworkWorker* instance = new NetworkWorker();
26 return *instance;
27 }
28
32 std::lock_guard<std::mutex> lock(m_error_handlers_mutex);
33 m_error_handlers.push_back(std::move(handler));
34 }
35
43 const std::exception& e,
44 const char* msg,
45 const char* file,
46 int line,
47 const char* func) {
48
49 std::unique_lock<std::mutex> lock(m_error_handlers_mutex);
50 if (m_error_handlers.empty()) return;
51 std::vector<ErrorHandler> handlers = m_error_handlers;
52 lock.unlock();
53
54 for (const auto& handler : handlers) {
55 try {
56 handler(e, msg, file, line, func);
57 } catch (...) {
58 // Never let handler crash the system
59 }
60 }
61 }
62
70 std::exception_ptr eptr,
71 const char* msg,
72 const char* file,
73 int line,
74 const char* func) {
75
76 if (!eptr) return;
77
78 try {
79 std::rethrow_exception(eptr);
80 } catch (const std::exception& e) {
81 handle_error(e, msg, file, line, func);
82 } catch (...) {
83 const std::runtime_error unknown("Unknown non-std::exception");
84 handle_error(unknown, msg, file, line, func);
85 }
86 }
87
90 void add_task(std::function<void()> task) {
91 std::unique_lock<std::mutex> lock(m_tasks_list_mutex);
92 m_tasks_list.push_back(std::move(task));
93 lock.unlock();
94 notify();
95 }
96
100 std::lock_guard<std::mutex> lock(m_managers_mutex);
101 if (std::find(m_managers.begin(), m_managers.end(), manager) == m_managers.end()) {
102 m_managers.push_back(manager);
103 }
104 }
105
// Runs one processing pass: while holding m_managers_mutex, calls process() on
// every manager stored in m_managers, then releases the mutex.
109 void process() {
110 std::unique_lock<std::mutex> lock(m_managers_mutex);
111 for (auto* m : m_managers) m->process();
112 lock.unlock();
// NOTE(review): listing line 113 is absent from this extract. The index describes
// process() as also processing queued tasks, so presumably a process_tasks() call
// belongs here (after the managers mutex is released) -- confirm against the real header.
114 }
115
119 void notify() {
120 std::lock_guard<std::mutex> locker(m_notify_mutex);
121 m_notify_condition.notify_one();
122 m_notify = true;
123 }
124
130 void start(const bool use_async) {
131 std::unique_lock<std::mutex> locker(m_is_worker_started_mutex);
132 if (m_is_worker_started) return;
133 m_is_worker_started = true;
134 if (!use_async) return;
135 locker.unlock();
136
137 m_future = std::async(
138 std::launch::async,
139 [this] {
140 for (;;) {
141 std::unique_lock<std::mutex> locker(m_notify_mutex);
142 m_notify_condition.wait(locker, [this]() { return m_notify; });
143 m_notify = false;
144 locker.unlock();
145
146 if (m_shutdown) {
147 shutdown();
148 return;
149 }
150
151 while (is_loaded()) {
152 process();
153 if (m_shutdown) {
154 shutdown();
155 return;
156 }
157
158 std::unique_lock<std::mutex> locker(m_notify_mutex);
159 m_notify_condition.wait_for(locker, std::chrono::milliseconds(1), [this] {
160 return m_notify || m_shutdown;
161 });
162 m_notify = false;
163 locker.unlock();
164
165 if (m_shutdown) {
166 shutdown();
167 return;
168 }
169 }
170
171 if (m_shutdown) {
172 shutdown();
173 return;
174 }
175 }
176 }).share();
177 }
178
182 void stop() {
183 {
184 std::lock_guard<std::mutex> locker(m_is_worker_started_mutex);
185 if (!m_is_worker_started && !m_future.valid()) return;
186 }
187
188 const bool was_shutdown = m_shutdown.exchange(true);
189 if (!m_future.valid()) {
190 if (!was_shutdown) shutdown();
191 return;
192 }
193
194 notify();
195 try {
196 m_future.wait();
197 m_future.get();
198 } catch(...) {
199 KURLYK_HANDLE_ERROR(std::current_exception(), "Exception during NetworkWorker shutdown");
200 };
201 }
202
// Shuts the managers down: while holding m_managers_mutex, calls shutdown() on
// every manager stored in m_managers, then releases the mutex.
206 void shutdown() {
207 std::unique_lock<std::mutex> lock(m_managers_mutex);
208 for (auto* m : m_managers) m->shutdown();
209 lock.unlock();
// NOTE(review): listing line 210 is absent from this extract. The index describes
// shutdown() as also clearing pending tasks, so presumably a further statement
// belongs here -- confirm against the real header.
211 }
212
213 private:
// Future for managing asynchronous worker execution (assigned in start(), awaited in stop()).
214 std::shared_future<void> m_future;
// Flag indicating if shutdown has been requested.
215 std::atomic<bool> m_shutdown = ATOMIC_VAR_INIT(false);
// Mutex for managing worker notifications; guards m_notify.
216 std::mutex m_notify_mutex;
// Condition variable for notifying the worker.
217 std::condition_variable m_notify_condition;
// Flag indicating whether a notification is pending.
218 bool m_notify = false;
// NOTE(review): listing line 219 is absent from this extract; the index lists
// `std::mutex m_is_worker_started_mutex` (mutex to control worker thread
// initialization), which start()/stop() lock -- presumably declared here.
// Flag indicating if the worker thread is started.
220 bool m_is_worker_started = false;
// Mutex for protecting access to the task list.
221 mutable std::mutex m_tasks_list_mutex;
// List of tasks queued for processing by the worker.
222 std::list<std::function<void()>> m_tasks_list;
// Mutex protecting access to registered managers.
223 mutable std::mutex m_managers_mutex;
// List of registered network task managers (raw pointers as stored by register_manager()).
224 std::vector<INetworkTaskManager*> m_managers;
// NOTE(review): listing line 225 is absent from this extract; the index lists
// `std::mutex m_error_handlers_mutex` (mutex guarding the error handler list)
// -- presumably declared here.
// Collection of registered error handlers.
226 std::vector<ErrorHandler> m_error_handlers;
227
228
234
237 stop();
238 }
239
// Deleted copy constructor to enforce the singleton pattern.
241 NetworkWorker(const NetworkWorker&) = delete;
242
245
250 std::unique_lock<std::mutex> lock(m_tasks_list_mutex);
251 if (m_tasks_list.empty()) return;
252 auto tasks_list = std::move(m_tasks_list);
253 m_tasks_list.clear();
254 lock.unlock();
255 for (auto &item : tasks_list) {
256 item();
257 }
258 tasks_list.clear();
259 }
260
263 const bool has_pending_tasks() const {
264 std::lock_guard<std::mutex> lock(m_tasks_list_mutex);
265 return !m_tasks_list.empty();
266 }
267
272 const bool is_loaded() const {
273 std::unique_lock<std::mutex> lock(m_managers_mutex);
274 for (auto* m : m_managers) {
275 if (m->is_loaded()) return true;
276 }
277 lock.unlock();
278 return has_pending_tasks();
279 }
280
281 }; // NetworkWorker
282
283}; // namespace kurlyk
284
285#endif // _KURLYK_NETWORK_WORKER_HPP_INCLUDED
#define KURLYK_HANDLE_ERROR(e, msg)
INetworkTaskManager
Interface for modules managed by NetworkWorker (e.g., HTTP, WebSocket).
void stop()
Stops the worker thread, ensuring all tasks are completed.
void notify()
Notifies the worker to begin processing requests or tasks.
bool m_is_worker_started
Flag indicating if the worker thread is started.
const bool is_loaded() const
Checks if the NetworkWorker has pending tasks or active network events.
static NetworkWorker & get_instance()
Get the singleton instance of NetworkWorker.
void add_task(std::function< void()> task)
Adds a task to the queue and notifies the worker thread.
void handle_error(const std::exception &e, const char *msg, const char *file, int line, const char *func)
Dispatches an exception to all registered error handlers.
std::atomic< bool > m_shutdown
Flag indicating if shutdown has been requested.
NetworkWorker(const NetworkWorker &)=delete
Deleted copy constructor to enforce the singleton pattern.
void handle_error(std::exception_ptr eptr, const char *msg, const char *file, int line, const char *func)
Handles an exception captured as exception_ptr.
void process()
Processes all queued tasks and active HTTP and WebSocket requests.
std::condition_variable m_notify_condition
Condition variable for notifying the worker.
NetworkWorker & operator=(const NetworkWorker &)=delete
Deleted copy assignment operator to enforce the singleton pattern.
std::shared_future< void > m_future
Future for managing asynchronous worker execution.
std::vector< INetworkTaskManager * > m_managers
List of registered network task managers.
void process_tasks()
Processes all tasks in the task list, then clears the list.
void start(const bool use_async)
Starts the worker thread for asynchronous task processing.
bool m_notify
Flag indicating whether a notification is pending.
std::list< std::function< void()> > m_tasks_list
List of tasks queued for processing by the worker.
const bool has_pending_tasks() const
Checks if there are any pending tasks in the task list.
void register_manager(INetworkTaskManager *manager)
Registers a network task manager to be managed by the NetworkWorker.
void add_error_handler(ErrorHandler handler)
Registers a callback for handling network errors.
std::mutex m_managers_mutex
Mutex protecting access to registered managers.
~NetworkWorker()
Private destructor, ensuring the worker thread stops on destruction.
void shutdown()
Shuts down the worker, clearing all active requests and pending tasks.
std::mutex m_tasks_list_mutex
Mutex for protecting access to the task list.
std::mutex m_is_worker_started_mutex
Mutex to control worker thread initialization.
NetworkWorker()
Private constructor to enforce singleton pattern.
std::mutex m_error_handlers_mutex
Mutex guarding the error handler list.
std::function< void(const std::exception &, const char *, const char *, int, const char *)> ErrorHandler
std::mutex m_notify_mutex
Mutex for managing worker notifications.
std::vector< ErrorHandler > m_error_handlers
Collection of registered error handlers.
bool is_valid_email_id(const std::string &str)
Validates an email address format.
std::string convert_user_agent_to_sec_ch_ua(const std::string &user_agent)
Converts a User-Agent string to a sec-ch-ua header value.