Kurlyk
Loading...
Searching...
No Matches
HttpRequestManager.hpp
Go to the documentation of this file.
1#pragma once
2#ifndef _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
3#define _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
4
7
13
14namespace kurlyk {
15
19 public:
20
24 static HttpRequestManager* instance = new HttpRequestManager();
25 return *instance;
26 }
27
32 const bool add_request(
33 std::unique_ptr<HttpRequest> request_ptr,
34 HttpResponseCallback callback) {
35 return submit_request(std::move(request_ptr), std::move(callback)).accepted;
36 }
37
43 std::unique_ptr<HttpRequest> request_ptr,
44 HttpResponseCallback callback) {
45 std::lock_guard<std::mutex> lock(m_mutex);
46 if (m_shutdown) {
48 }
49
50 const std::size_t queue_limit = m_max_pending_requests.load();
51 if (queue_limit && m_pending_requests.size() >= queue_limit) {
53 }
54
55# if __cplusplus >= 201402L
56 m_pending_requests.push_back(std::make_unique<HttpRequestContext>(std::move(request_ptr), std::move(callback)));
57# else
58 m_pending_requests.push_back(std::unique_ptr<HttpRequestContext>(
59 new HttpRequestContext(std::move(request_ptr), std::move(callback))));
60# endif
61 return SubmitResult{true, std::error_code()};
62 }
63
68 const long create_rate_limit(long requests_per_period, long period_ms) {
69 return m_rate_limiter.create_limit(requests_per_period, period_ms);
70 }
71
75 bool remove_limit(long limit_id) {
76 return m_rate_limiter.remove_limit(limit_id);
77 }
78
82 return m_request_id_counter++;
83 }
84
90
93 std::size_t max_pending_requests() const {
94 return m_max_pending_requests.load();
95 }
96
100 void cancel_request_by_id(uint64_t request_id, std::function<void()> callback) {
101 if (m_shutdown) {
102 if (callback) callback();
103 return;
104 }
105 std::lock_guard<std::mutex> lock(m_mutex);
106 m_requests_to_cancel[request_id].push_back(std::move(callback));
107 }
108
118
121 void shutdown() override {
122 m_shutdown = true;
126 }
127
130 const bool is_loaded() const override {
131 std::lock_guard<std::mutex> lock(m_mutex);
132 return
133 !m_pending_requests.empty() ||
134 !m_failed_requests.empty() ||
135 !m_active_request_batches.empty() ||
136 !m_requests_to_cancel.empty();
137 }
138
139 private:
140 mutable std::mutex m_mutex;
141 std::list<std::unique_ptr<HttpRequestContext>> m_pending_requests;
142 std::list<std::unique_ptr<HttpRequestContext>> m_failed_requests;
143 std::list<std::unique_ptr<HttpBatchRequestHandler>> m_active_request_batches;
144 using callback_list_t = std::list<std::function<void()>>;
145 std::unordered_map<uint64_t, callback_list_t> m_requests_to_cancel;
147 std::atomic<uint64_t> m_request_id_counter = ATOMIC_VAR_INIT(1);
148 std::atomic<bool> m_shutdown = ATOMIC_VAR_INIT(false);
149 std::atomic<std::size_t> m_max_pending_requests = ATOMIC_VAR_INIT(0);
150
153 std::unique_lock<std::mutex> lock(m_mutex);
154 if (m_pending_requests.empty()) return;
155
156 std::vector<std::unique_ptr<HttpRequestContext>> pending_request;
157 std::vector<std::unique_ptr<HttpRequestContext>> failed_requests;
158
159 auto it = m_pending_requests.begin();
160 while (it != m_pending_requests.end()) {
161 auto& context = *it;
162 auto& request = context->request;
163 // Check if the request is valid.
164 if (!request) {
165 failed_requests.push_back(std::move(context));
166 it = m_pending_requests.erase(it);
167 continue;
168 }
169
170 // Check if the request is allowed by the rate limiter.
171 const bool allowed = m_rate_limiter.allow_request(
172 request->general_rate_limit_id,
173 request->specific_rate_limit_id);
174 if (!allowed) {
175 ++it;
176 continue;
177 }
178 pending_request.push_back(std::move(context));
179 it = m_pending_requests.erase(it);
180 }
181 lock.unlock();
182
183 // Handle failed requests by calling their callback with a 400 status.
184 if (!failed_requests.empty()) {
185 for (const auto &context : failed_requests) {
186# if __cplusplus >= 201402L
187 auto response = std::make_unique<HttpResponse>();
188# else
189 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
190# endif
191 const long BAD_REQUEST = 400;
192 response->error_code = utils::make_error_code(CURLE_OK);
193 response->status_code = BAD_REQUEST;
194 response->ready = true;
195 context->callback(std::move(response));
196 }
197 failed_requests.clear();
198 }
199
200 // If there are ready requests, create a new HttpBatchRequestHandler to manage them.
201 if (pending_request.empty()) return;
202# if __cplusplus >= 201402L
203 m_active_request_batches.push_back(std::make_unique<HttpBatchRequestHandler>(pending_request));
204# else
205 m_active_request_batches.push_back(std::unique_ptr<HttpBatchRequestHandler>(new HttpBatchRequestHandler(pending_request)));
206# endif
207 }
208
211 auto it = m_active_request_batches.begin();
212 while (it != m_active_request_batches.end()) {
213 auto& request = *it;
214 if (!request->process()) {
215 ++it;
216 continue;
217 }
218 auto failed_requests = request->extract_failed_requests();
219 it = m_active_request_batches.erase(it);
220 for (auto& request : failed_requests) {
221 m_failed_requests.push_back(std::move(request));
222 }
223 }
224 }
225
228 auto it = m_failed_requests.begin();
229 while (it != m_failed_requests.end()) {
230 auto& request_context = *it;
231 if (!request_context || !request_context->request) {
232 it = m_failed_requests.erase(it);
233 continue;
234 }
235
236 const auto now = std::chrono::steady_clock::now();
237 const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - request_context->start_time);
238 const auto& retry_delay_ms = request_context->request->retry_delay_ms;
239 if (duration.count() >= retry_delay_ms) {
240 std::unique_lock<std::mutex> lock(m_mutex);
241 m_pending_requests.push_back(std::move(request_context));
242 lock.unlock();
243 it = m_failed_requests.erase(it);
244 continue;
245 }
246 ++it;
247 }
248 }
249
252 std::unique_lock<std::mutex> lock(m_mutex);
253 if (m_requests_to_cancel.empty()) return;
254
255 auto requests_to_cancel = std::move(m_requests_to_cancel);
256 m_requests_to_cancel.clear();
257 lock.unlock();
258
259 for (const auto &request_context : m_failed_requests) {
260 if (!requests_to_cancel.count(request_context->request->request_id)) continue;
261# if __cplusplus >= 201402L
262 auto response = std::make_unique<HttpResponse>();
263# else
264 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
265# endif
266 const long CANCELED_REQUEST_CODE = 499;
267 response->error_code = utils::make_error_code(CURLE_OK);
268 response->status_code = CANCELED_REQUEST_CODE;
269 response->ready = true;
270 request_context->callback(std::move(response));
271 }
272
273 m_failed_requests.remove_if([&](const std::unique_ptr<HttpRequestContext>& ctx) {
274 return ctx && ctx->request && requests_to_cancel.count(ctx->request->request_id) > 0;
275 });
276
277
278 for (const auto &handler : m_active_request_batches) {
279 handler->cancel_request_by_id(requests_to_cancel);
280 }
281
282 for (const auto &request : requests_to_cancel) {
283 for (const auto &callback : request.second) {
284 if (callback) callback();
285 }
286 }
287 }
288
291 std::unique_lock<std::mutex> lock(m_mutex);
292 auto pending_requests = std::move(m_pending_requests);
293 m_pending_requests.clear();
294 lock.unlock();
295
296 for (const auto &request_context : pending_requests) {
297# if __cplusplus >= 201402L
298 auto response = std::make_unique<HttpResponse>();
299# else
300 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
301# endif
302 const long CANCELED_REQUEST_CODE = 499;
303 response->error_code = utils::make_error_code(CURLE_OK);
304 response->status_code = CANCELED_REQUEST_CODE;
305 response->ready = true;
306 request_context->callback(std::move(response));
307 }
308 for (const auto &request_context : m_failed_requests) {
309# if __cplusplus >= 201402L
310 auto response = std::make_unique<HttpResponse>();
311# else
312 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
313# endif
314 const long CANCELED_REQUEST_CODE = 499;
315 response->error_code = utils::make_error_code(CURLE_OK);
316 response->status_code = CANCELED_REQUEST_CODE;
317 response->ready = true;
318 request_context->callback(std::move(response));
319 }
320 }
321
324 curl_global_init(CURL_GLOBAL_ALL);
325 }
326
329 curl_global_cleanup();
330 }
331
334
337
338 }; // HttpRequestManager
339
340}; // namespace kurlyk
341
342#endif // _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
Manages multiple asynchronous HTTP requests using libcurl's multi interface.
Defines the HttpRateLimiter class for managing rate limits on HTTP requests.
Defines the HttpRequestContext class for managing HTTP request context, including retries and timing.
Handles multiple asynchronous HTTP requests using libcurl's multi interface.
Manages rate limits for HTTP requests, ensuring compliance with set limits.
Represents the context of an HTTP request, including the request object, callback function,...
HttpRequestManager(const HttpRequestManager &)=delete
Deleted copy constructor to enforce the singleton pattern.
std::size_t max_pending_requests() const
Returns the current maximum pending request count.
std::atomic< bool > m_shutdown
Flag indicating if shutdown has been requested.
const bool add_request(std::unique_ptr< HttpRequest > request_ptr, HttpResponseCallback callback)
Adds a new HTTP request to the manager.
std::unordered_map< uint64_t, callback_list_t > m_requests_to_cancel
Map of request IDs to their associated cancellation callbacks.
void set_max_pending_requests(std::size_t max_pending_requests)
Sets the maximum number of pending requests accepted into the global queue.
const long create_rate_limit(long requests_per_period, long period_ms)
Creates a rate limit with specified parameters.
std::list< std::unique_ptr< HttpRequestContext > > m_failed_requests
List of failed HTTP requests for retrying.
void process_active_requests()
Processes active requests, moving failed ones to the failed requests list for retrying.
void cancel_request_by_id(uint64_t request_id, std::function< void()> callback)
Cancels a request by its unique identifier.
static HttpRequestManager & get_instance()
Get the singleton instance of HttpRequestManager.
std::list< std::unique_ptr< HttpBatchRequestHandler > > m_active_request_batches
List of currently active HTTP request batches.
std::atomic< uint64_t > m_request_id_counter
Atomic counter for unique request IDs.
void process_retry_failed_requests()
Attempts to retry failed requests if their retry delay has passed.
std::atomic< std::size_t > m_max_pending_requests
Maximum number of requests accepted into the pending queue, or zero if unbounded.
virtual ~HttpRequestManager()
Private destructor to clean up global resources.
SubmitResult submit_request(std::unique_ptr< HttpRequest > request_ptr, HttpResponseCallback callback)
Attempts to enqueue a new HTTP request and reports the admission result.
HttpRateLimiter m_rate_limiter
Rate limiter for controlling request frequency.
std::list< std::function< void()> > callback_list_t
const bool is_loaded() const override
Checks if there are active, pending, or failed requests.
std::mutex m_mutex
Mutex to protect access to the pending requests list and requests-to-cancel map.
void process_cancel_requests()
Processes and cancels HTTP requests based on their IDs.
bool remove_limit(long limit_id)
Removes an existing rate limit with the specified identifier.
uint64_t generate_request_id()
Generates a new unique request ID.
void shutdown() override
Shuts down the request manager, clearing all active and pending requests.
HttpRequestManager & operator=(const HttpRequestManager &)=delete
Deleted copy assignment operator to enforce the singleton pattern.
HttpRequestManager()
Private constructor to initialize global resources (e.g., cURL).
void cleanup_pending_requests()
Cleans up pending requests, marking each as failed and invoking its callback.
void process() override
Processes all requests in the manager.
void process_pending_requests()
Processes all pending requests, moving valid requests to active batches or marking them as failed.
std::list< std::unique_ptr< HttpRequestContext > > m_pending_requests
List of pending HTTP requests awaiting processing.
Represents the response received from an HTTP request, including headers, content,...
Interface for modules managed by NetworkWorker (e.g., HTTP, WebSocket).
@ ShuttingDown
Operation was rejected because the owning subsystem is shutting down.
@ QueueLimitExceeded
Operation was rejected because the bounded queue is already full.
std::error_code make_error_code(ClientError e)
Creates a std::error_code from a ClientError value.
Primary namespace for the Kurlyk library, encompassing initialization, request management,...
std::function< void(HttpResponsePtr response)> HttpResponseCallback
Type definition for the callback function used to handle HTTP responses.
Represents the synchronous result of trying to enqueue or submit work.
bool accepted
Indicates whether the work item was accepted for processing.