LogIt++
Loading...
Searching...
No Matches
TaskExecutor.hpp
Go to the documentation of this file.
1#pragma once
2#ifndef _LOGIT_DETAIL_TASK_EXECUTOR_HPP_INCLUDED
3#define _LOGIT_DETAIL_TASK_EXECUTOR_HPP_INCLUDED
4
7
8#include <thread>
9#include <queue>
10#include <mutex>
11#include <functional>
12#include <condition_variable>
13#include <iostream>
14#include <chrono>
15
16namespace logit { namespace detail {
17
19 enum class QueuePolicy { Drop, Block };
20
21#if defined(__EMSCRIPTEN__)
22
25 class TaskExecutor {
26 public:
27 static TaskExecutor& get_instance() {
28 static TaskExecutor instance;
29 return instance;
30 }
31
32 void add_task(std::function<void()> task) {
33 if (task) task();
34 }
35
36 void wait() {}
37 void shutdown() {}
38
39 void set_max_queue_size(std::size_t) {}
41
42 private:
43 TaskExecutor() = default;
44 ~TaskExecutor() = default;
45 TaskExecutor(const TaskExecutor&) = delete;
46 TaskExecutor& operator=(const TaskExecutor&) = delete;
47 TaskExecutor(TaskExecutor&&) = delete;
49 };
50
51#else
52
59 public:
63 static TaskExecutor* instance = new TaskExecutor();
64 return *instance;
65 }
66
69 void add_task(std::function<void()> task) {
70 std::unique_lock<std::mutex> lock(m_queue_mutex);
71 if (m_stop_flag) return;
72 if (m_max_queue_size > 0 && m_tasks_queue.size() >= m_max_queue_size) {
74 return;
75 }
76 m_queue_condition.wait(lock, [this]() {
78 });
79 if (m_stop_flag) return;
80 }
81 m_tasks_queue.push(std::move(task));
82 lock.unlock();
83 m_queue_condition.notify_one();
84 }
85
87 void wait() {
88 m_queue_condition.notify_one();
89 for (;;) {
90 std::unique_lock<std::mutex> lock(m_queue_mutex);
91 if (m_tasks_queue.empty() || m_stop_flag) break;
92 lock.unlock();
93 std::this_thread::yield();
94 std::this_thread::sleep_for(std::chrono::milliseconds(1));
95 }
96 }
97
100 void shutdown() {
101 std::unique_lock<std::mutex> lock(m_queue_mutex);
102 m_stop_flag = true;
103 lock.unlock();
104 m_queue_condition.notify_all();
105 if (m_worker_thread.joinable()) {
106 m_worker_thread.join();
107 }
108 }
109
112 void set_max_queue_size(std::size_t size) {
113 std::lock_guard<std::mutex> lock(m_queue_mutex);
114 m_max_queue_size = size;
115 }
116
120 std::lock_guard<std::mutex> lock(m_queue_mutex);
121 m_overflow_policy = policy;
122 }
123
124 private:
125 std::queue<std::function<void()>> m_tasks_queue;
126 mutable std::mutex m_queue_mutex;
127 std::condition_variable m_queue_condition;
128 std::thread m_worker_thread;
130 std::size_t m_max_queue_size;
132
135 for (;;) {
136 std::function<void()> task;
137 std::unique_lock<std::mutex> lock(m_queue_mutex);
138 m_queue_condition.wait(lock, [this]() {
139 return !m_tasks_queue.empty() || m_stop_flag;
140 });
141 if (m_stop_flag && m_tasks_queue.empty()) {
142 break;
143 }
144 task = std::move(m_tasks_queue.front());
145 m_tasks_queue.pop();
146 lock.unlock();
147 m_queue_condition.notify_one();
148 task();
149 }
150 }
151
156
159 shutdown();
160 }
161
162 // Delete copy constructor and assignment operators to enforce singleton usage.
163 TaskExecutor(const TaskExecutor&) = delete;
167 };
168
169#endif // defined(__EMSCRIPTEN__)
170
171}} // namespace logit::detail
172
173#endif // _LOGIT_DETAIL_TASK_EXECUTOR_HPP_INCLUDED
std::mutex m_queue_mutex
Mutex to protect access to the task queue.
std::thread m_worker_thread
Worker thread for executing tasks.
void set_max_queue_size(std::size_t size)
Sets the maximum size of the task queue.
TaskExecutor(const TaskExecutor &)=delete
QueuePolicy m_overflow_policy
Policy for handling queue overflow.
TaskExecutor & operator=(const TaskExecutor &)=delete
std::condition_variable m_queue_condition
Condition variable to signal task availability.
void worker_function()
The worker thread function that processes tasks from the queue.
std::queue< std::function< void()> > m_tasks_queue
Queue holding tasks to be executed.
TaskExecutor & operator=(TaskExecutor &&)=delete
TaskExecutor()
Private constructor to enforce the singleton pattern.
bool m_stop_flag
Flag indicating if the worker thread should stop.
void shutdown()
Shuts down the TaskExecutor by stopping the worker thread.
TaskExecutor(TaskExecutor &&)=delete
void set_queue_policy(QueuePolicy policy)
Sets the behavior when the queue is full.
void add_task(std::function< void()> task)
Adds a task to the queue in a thread-safe manner.
void wait()
Waits for all tasks in the queue to be processed.
static TaskExecutor & get_instance()
Get the singleton instance of the TaskExecutor.
~TaskExecutor()
Destructor that stops the worker thread and cleans up resources.
std::size_t m_max_queue_size
Maximum number of tasks in the queue (0 for unlimited).
QueuePolicy
Queue overflow handling policy.
The primary namespace for the LogIt++ library.