Kurlyk
Loading...
Searching...
No Matches
HttpRequestManager.hpp
Go to the documentation of this file.
1#pragma once
2#ifndef _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
3#define _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
4
7
8#include <type_traits>
9
16
17
18namespace kurlyk {
19
24 public:
25
29 static HttpRequestManager* instance = new HttpRequestManager();
30 return *instance;
31 }
32
38 std::unique_ptr<HttpRequest> request_ptr,
39 HttpResponseCallback callback) {
40 return submit_request(std::move(request_ptr), std::move(callback)).accepted;
41 }
42
48 std::unique_ptr<HttpRequest> request_ptr,
49 HttpResponseCallback callback) {
50 std::lock_guard<std::mutex> lock(m_mutex);
51 if (m_shutdown) {
53 }
54
55 const std::size_t queue_limit = m_max_pending_requests.load();
56 if (queue_limit && m_pending_requests.size() >= queue_limit) {
58 }
59
60# if __cplusplus >= 201402L
61 auto context = std::make_unique<HttpRequestContext>(std::move(request_ptr), std::move(callback));
62# else
63 auto context = std::unique_ptr<HttpRequestContext>(
64 new HttpRequestContext(std::move(request_ptr), std::move(callback)));
65# endif
66 context->in_flight_token = m_next_in_flight_token.fetch_add(1, std::memory_order_relaxed);
67 m_pending_requests.push_back(std::move(context));
68 return SubmitResult{true, std::error_code()};
69 }
70
77 HttpRateLimitHandlePtr create_rate_limit(long requests_per_period, long period_ms, bool sequential = false) {
78 return m_rate_limiter.create_limit_handle(requests_per_period, period_ms, sequential);
79 }
80
84 return m_rate_limiter.get_limit(limit_id);
85 }
86
90 bool remove_limit(long limit_id) {
91 return m_rate_limiter.remove_limit(limit_id);
92 }
93
97 return m_rate_limiter.remove_limit(limit);
98 }
99
108 const HttpRateLimitHandlePtr& general_limit,
109 const HttpRateLimitHandlePtr& specific_limit,
110 uint64_t in_flight_token,
111 const std::string& general_key,
112 const std::string& specific_key) {
113 return m_rate_limiter.allow_request(
114 general_limit, specific_limit, in_flight_token, general_key, specific_key);
115 }
116
124 const HttpRateLimitHandlePtr& general_limit,
125 const HttpRateLimitHandlePtr& specific_limit,
126 uint64_t in_flight_token,
127 const std::string& general_key,
128 const std::string& specific_key) {
129 m_rate_limiter.release_request(
130 general_limit, specific_limit, in_flight_token, general_key, specific_key);
131 }
132
140 template<typename Duration = std::chrono::milliseconds>
142 const HttpRateLimitHandlePtr& general_limit,
143 const HttpRateLimitHandlePtr& specific_limit,
144 const std::string& general_key,
145 const std::string& specific_key
146 ) {
147 auto delay = m_rate_limiter.time_until_next_allowed<Duration>(
148 general_limit, specific_limit, general_key, specific_key);
150 if constexpr (std::is_same_v<decltype(delay), RateLimitDelay<Duration>>) {
151 result = delay;
152 } else {
153 result.duration = delay;
154 result.sequential_blocked = (delay == Duration::max());
155 }
156 return result;
157 }
158
162 return m_request_id_counter++;
163 }
164
167 uint64_t generate_group_id() {
168 return m_group_id_counter++;
169 }
170
176
179 std::size_t max_pending_requests() const {
180 return m_max_pending_requests.load();
181 }
182
186 bool has_requests_by_group_id(uint64_t group_id) const {
187 return group_request_count(group_id) != 0;
188 }
189
193 std::size_t group_request_count(uint64_t group_id) const {
194 std::lock_guard<std::mutex> lock(m_mutex);
195 return group_request_count_unlocked(group_id);
196 }
197
201 void wait_requests_by_group_id(uint64_t group_id, std::function<void()> callback) {
202 if (m_shutdown || group_id == 0) {
203 if (callback) callback();
204 return;
205 }
206
207 bool invoke_now = false;
208 {
209 std::lock_guard<std::mutex> lock(m_mutex);
210 if (group_request_count_unlocked(group_id) == 0) {
211 invoke_now = true;
212 } else {
213 m_group_waiters[group_id].push_back(std::move(callback));
214 }
215 }
216
217 if (invoke_now && callback) {
218 callback();
219 }
220 }
221
225 void cancel_request_by_id(uint64_t request_id, std::function<void()> callback) {
226 if (m_shutdown || request_id == 0) {
227 if (callback) callback();
228 return;
229 }
230 std::lock_guard<std::mutex> lock(m_mutex);
231 m_requests_to_cancel_by_id[request_id].push_back(std::move(callback));
232 }
233
237 void cancel_requests_by_group_id(uint64_t group_id, std::function<void()> callback) {
238 if (m_shutdown || group_id == 0) {
239 if (callback) callback();
240 return;
241 }
242 bool invoke_now = false;
243 {
244 std::lock_guard<std::mutex> lock(m_mutex);
245 if (group_request_count_unlocked(group_id) == 0) {
246 invoke_now = true;
247 } else {
248 m_groups_to_cancel[group_id].push_back(std::move(callback));
249 }
250 }
251 if (invoke_now && callback) {
252 callback();
253 }
254 }
255
270
273 void shutdown() override {
274 if (m_shutdown.exchange(true)) {
276 return;
277 }
282 }
283
286 const bool is_loaded() const override {
287 std::lock_guard<std::mutex> lock(m_mutex);
288 return
289 !m_pending_requests.empty() ||
290 !m_failed_requests.empty() ||
291 !m_active_request_batches.empty() ||
293 !m_groups_to_cancel.empty();
294 }
295
296 private:
297 mutable std::mutex m_mutex;
298 std::list<std::unique_ptr<HttpRequestContext>> m_pending_requests;
299 std::list<std::unique_ptr<HttpRequestContext>> m_failed_requests;
300 std::list<std::unique_ptr<HttpBatchRequestHandler>> m_active_request_batches;
301 using callback_list_t = std::list<std::function<void()>>;
302 using cancel_map_t = std::unordered_map<uint64_t, callback_list_t>;
307 std::atomic<uint64_t> m_next_in_flight_token{1};
308 std::atomic<uint64_t> m_request_id_counter = ATOMIC_VAR_INIT(1);
309 std::atomic<uint64_t> m_group_id_counter = ATOMIC_VAR_INIT(1);
310 std::atomic<bool> m_shutdown = ATOMIC_VAR_INIT(false);
311 std::atomic<std::size_t> m_max_pending_requests = ATOMIC_VAR_INIT(0);
312
313 std::size_t group_request_count_unlocked(uint64_t group_id) const {
314 if (group_id == 0) return 0;
315
316 std::size_t count = 0;
317 for (const auto& context : m_pending_requests) {
318 if (context && context->request && context->request->group_id == group_id) {
319 ++count;
320 }
321 }
322 for (const auto& context : m_failed_requests) {
323 if (context && context->request && context->request->group_id == group_id) {
324 ++count;
325 }
326 }
327 for (const auto& batch : m_active_request_batches) {
328 if (batch) {
329 count += batch->group_request_count(group_id);
330 }
331 }
332 return count;
333 }
334
336 cancel_map_t ready_waiters;
337 {
338 std::lock_guard<std::mutex> lock(m_mutex);
339 auto it = m_group_waiters.begin();
340 while (it != m_group_waiters.end()) {
341 if (group_request_count_unlocked(it->first) != 0) {
342 ++it;
343 continue;
344 }
345 ready_waiters.emplace(it->first, std::move(it->second));
346 it = m_group_waiters.erase(it);
347 }
348 }
349
350 for (const auto& item : ready_waiters) {
351 for (const auto& callback : item.second) {
352 if (callback) callback();
353 }
354 }
355 }
356
358 cancel_map_t waiters;
359 {
360 std::lock_guard<std::mutex> lock(m_mutex);
361 waiters = std::move(m_group_waiters);
362 m_group_waiters.clear();
363 }
364
365 for (const auto& item : waiters) {
366 for (const auto& callback : item.second) {
367 if (callback) callback();
368 }
369 }
370 }
371
374 std::unique_lock<std::mutex> lock(m_mutex);
375 if (m_pending_requests.empty()) return;
376
377 std::vector<std::unique_ptr<HttpRequestContext>> pending_request;
378 std::vector<std::unique_ptr<HttpRequestContext>> failed_requests;
379
380 auto it = m_pending_requests.begin();
381 while (it != m_pending_requests.end()) {
382 auto& context = *it;
383 auto& request = context->request;
384 // Check if the request is valid.
385 if (!request) {
386 failed_requests.push_back(std::move(context));
387 it = m_pending_requests.erase(it);
388 continue;
389 }
390
391 // Set up completion callback before allow_request so it is armed
392 // even if an exception occurs after the token is committed.
393 auto general_limit = request->general_rate_limit;
394 auto specific_limit = request->specific_rate_limit;
395 uint64_t token = context->in_flight_token;
396 auto general_key = request->general_rate_limit_key;
397 auto specific_key = request->specific_rate_limit_key;
398
399 // Preserve any previous on_complete so a retry that already owns
400 // a sequential token does not lose its cleanup callback.
401 auto old_on_complete = std::move(context->on_complete);
402 context->on_complete = [this, general_limit, specific_limit, token, general_key, specific_key]() {
403 m_rate_limiter.release_request(general_limit, specific_limit, token, general_key, specific_key);
404 };
405
406 // Check if the request is allowed by the rate limiter.
407 const bool allowed = m_rate_limiter.allow_request(
408 general_limit,
409 specific_limit,
410 token,
411 request->general_rate_limit_key,
412 request->specific_rate_limit_key);
413 if (!allowed) {
414 context->on_complete = std::move(old_on_complete);
415 ++it;
416 continue;
417 }
418
419 pending_request.push_back(std::move(context));
420 it = m_pending_requests.erase(it);
421 }
422 lock.unlock();
423
424 // Handle failed requests by calling their callback with a 400 status.
425 if (!failed_requests.empty()) {
426 for (const auto &context : failed_requests) {
427# if __cplusplus >= 201402L
428 auto response = std::make_unique<HttpResponse>();
429# else
430 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
431# endif
432 const long BAD_REQUEST = 400;
433 response->error_code = utils::make_error_code(CURLE_OK);
434 response->status_code = BAD_REQUEST;
435 response->ready = true;
436 context->callback(std::move(response));
437 context->complete();
438 }
439 failed_requests.clear();
440 }
441
442 // If there are ready requests, create a new HttpBatchRequestHandler to manage them.
443 if (pending_request.empty()) return;
444 try {
445# if __cplusplus >= 201402L
446 m_active_request_batches.push_back(std::make_unique<HttpBatchRequestHandler>(pending_request));
447# else
448 m_active_request_batches.push_back(std::unique_ptr<HttpBatchRequestHandler>(new HttpBatchRequestHandler(pending_request)));
449# endif
450 } catch (...) {
451 for (auto& context : pending_request) {
452 if (!context || !context->callback) continue;
453# if __cplusplus >= 201402L
454 auto response = std::make_unique<HttpResponse>();
455# else
456 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
457# endif
459 response->status_code = 499; // Client closed request
460 response->ready = true;
461 context->callback(std::move(response));
462 context->complete();
463 }
464 return;
465 }
466 }
467
470 auto it = m_active_request_batches.begin();
471 while (it != m_active_request_batches.end()) {
472 auto& request = *it;
473 if (!request->process()) {
474 ++it;
475 continue;
476 }
477 auto failed_requests = request->extract_failed_requests();
478 it = m_active_request_batches.erase(it);
479 for (auto& request : failed_requests) {
480 m_failed_requests.push_back(std::move(request));
481 }
482 }
483 }
484
487 auto it = m_failed_requests.begin();
488 while (it != m_failed_requests.end()) {
489 auto& request_context = *it;
490 if (!request_context || !request_context->request) {
491 it = m_failed_requests.erase(it);
492 continue;
493 }
494
495 const auto now = std::chrono::steady_clock::now();
496 const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - request_context->start_time);
497 const auto& retry_delay_ms = request_context->request->retry_delay_ms;
498 if (duration.count() >= retry_delay_ms) {
499 std::unique_lock<std::mutex> lock(m_mutex);
500 m_pending_requests.push_back(std::move(request_context));
501 lock.unlock();
502 it = m_failed_requests.erase(it);
503 continue;
504 }
505 ++it;
506 }
507 }
508
511 std::unique_lock<std::mutex> lock(m_mutex);
512 if (m_requests_to_cancel_by_id.empty() && m_groups_to_cancel.empty()) return;
513
514 auto requests_to_cancel = std::move(m_requests_to_cancel_by_id);
515 auto groups_to_cancel = std::move(m_groups_to_cancel);
517 m_groups_to_cancel.clear();
518
519 std::list<std::unique_ptr<HttpRequestContext>> canceled_pending_requests;
520 auto pending_it = m_pending_requests.begin();
521 while (pending_it != m_pending_requests.end()) {
522 const auto& ctx = *pending_it;
523 if (!matches_cancel(ctx, requests_to_cancel, groups_to_cancel)) {
524 ++pending_it;
525 continue;
526 }
527 canceled_pending_requests.push_back(std::move(*pending_it));
528 pending_it = m_pending_requests.erase(pending_it);
529 }
530 lock.unlock();
531
532 for (const auto& request_context : canceled_pending_requests) {
533 request_context->callback(make_cancelled_response());
534 request_context->complete();
535 }
536
537 auto failed_it = m_failed_requests.begin();
538 while (failed_it != m_failed_requests.end()) {
539 const auto& request_context = *failed_it;
540 if (!matches_cancel(request_context, requests_to_cancel, groups_to_cancel)) {
541 ++failed_it;
542 continue;
543 }
544 request_context->callback(make_cancelled_response());
545 request_context->complete();
546 failed_it = m_failed_requests.erase(failed_it);
547 }
548
549 for (const auto &handler : m_active_request_batches) {
550 handler->cancel_requests(requests_to_cancel, groups_to_cancel);
551 }
552
553 invoke_cancel_callbacks(requests_to_cancel);
554 invoke_cancel_callbacks(groups_to_cancel);
555 }
556
557 static bool matches_cancel(
558 const std::unique_ptr<HttpRequestContext>& ctx,
559 const cancel_map_t& requests_to_cancel,
560 const cancel_map_t& groups_to_cancel) {
561 if (!ctx || !ctx->request) return false;
562 const auto request_id = ctx->request->request_id;
563 const auto group_id = ctx->request->group_id;
564 return
565 (request_id != 0 && requests_to_cancel.count(request_id) > 0) ||
566 (group_id != 0 && groups_to_cancel.count(group_id) > 0);
567 }
568
570# if __cplusplus >= 201402L
571 auto response = std::make_unique<HttpResponse>();
572# else
573 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
574# endif
576 response->status_code = 499;
577 response->ready = true;
578 return response;
579 }
580
581 static void invoke_cancel_callbacks(const cancel_map_t& requests_to_cancel) {
582 for (const auto& request : requests_to_cancel) {
583 for (const auto& callback : request.second) {
584 if (callback) callback();
585 }
586 }
587 }
588
591 std::unique_lock<std::mutex> lock(m_mutex);
592 auto pending_requests = std::move(m_pending_requests);
593 auto failed_requests = std::move(m_failed_requests);
594 m_pending_requests.clear();
595 m_failed_requests.clear();
596 lock.unlock();
597
598 for (const auto &request_context : pending_requests) {
599# if __cplusplus >= 201402L
600 auto response = std::make_unique<HttpResponse>();
601# else
602 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
603# endif
604 const long CANCELED_REQUEST_CODE = 499;
605 response->error_code = utils::make_error_code(CURLE_OK);
606 response->status_code = CANCELED_REQUEST_CODE;
607 response->ready = true;
608 request_context->callback(std::move(response));
609 request_context->complete();
610 }
611 for (const auto &request_context : failed_requests) {
612# if __cplusplus >= 201402L
613 auto response = std::make_unique<HttpResponse>();
614# else
615 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
616# endif
617 const long CANCELED_REQUEST_CODE = 499;
618 response->error_code = utils::make_error_code(CURLE_OK);
619 response->status_code = CANCELED_REQUEST_CODE;
620 response->ready = true;
621 request_context->callback(std::move(response));
622 request_context->complete();
623 }
624 }
625
628 curl_global_init(CURL_GLOBAL_ALL);
629 }
630
633 curl_global_cleanup();
634 }
635
638
641
642 }; // HttpRequestManager
643
644}; // namespace kurlyk
645
646#endif // _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
Manages multiple asynchronous HTTP requests using libcurl's multi interface.
Defines the RateLimitDelay result type for time-until-allowed queries.
Defines RAII handle for keeping HTTP rate limits alive.
Defines the HttpRateLimiter class for managing rate limits on HTTP requests.
Defines the HttpRequestContext class for managing HTTP request context, including retries and timing.
Handles multiple asynchronous HTTP requests using libcurl's multi interface.
Manages rate limits for HTTP requests.
Represents the context of an HTTP request, including the request object, callback function,...
HttpRequestManager(const HttpRequestManager &)=delete
Deleted copy constructor to enforce the singleton pattern.
cancel_map_t m_group_waiters
Map of group IDs to callbacks waiting until a group becomes idle.
std::size_t max_pending_requests() const
Returns the current maximum pending request count.
std::atomic< bool > m_shutdown
Flag indicating if shutdown has been requested.
static bool matches_cancel(const std::unique_ptr< HttpRequestContext > &ctx, const cancel_map_t &requests_to_cancel, const cancel_map_t &groups_to_cancel)
uint64_t generate_group_id()
Generates a new group ID.
bool has_requests_by_group_id(uint64_t group_id) const
Checks whether pending, failed, or active requests exist for a group.
void set_max_pending_requests(std::size_t max_pending_requests)
Sets the maximum number of pending requests accepted into the global queue.
cancel_map_t m_groups_to_cancel
Map of group IDs to their associated cancellation callbacks.
std::list< std::unique_ptr< HttpRequestContext > > m_failed_requests
List of failed HTTP requests for retrying.
void process_active_requests()
Processes active requests, moving failed ones to the failed requests list for retrying.
std::atomic< uint64_t > m_group_id_counter
Atomic counter for group IDs.
bool remove_limit(const HttpRateLimitHandlePtr &limit)
Releases manager-owned handle for the specified rate-limit handle.
std::size_t group_request_count(uint64_t group_id) const
Counts pending, failed, and active requests for a group.
void cancel_request_by_id(uint64_t request_id, std::function< void()> callback)
Cancels one request by request ID.
std::atomic< uint64_t > m_next_in_flight_token
Atomic counter for sequential rate-limit tokens.
static HttpRequestManager & get_instance()
Get the singleton instance of HttpRequestManager.
std::list< std::unique_ptr< HttpBatchRequestHandler > > m_active_request_batches
List of currently active HTTP request batches.
std::atomic< uint64_t > m_request_id_counter
Atomic counter for unique request IDs.
void process_retry_failed_requests()
Attempts to retry failed requests if their retry delay has passed.
HttpRateLimitHandlePtr get_rate_limit(long limit_id)
Returns a handle for a registered rate limit ID.
std::atomic< std::size_t > m_max_pending_requests
Maximum number of requests accepted into the pending queue, or zero if unbounded.
void wait_requests_by_group_id(uint64_t group_id, std::function< void()> callback)
Registers a callback invoked after all requests from a group finish.
virtual ~HttpRequestManager()
Private destructor to clean up global resources.
RateLimitDelay< Duration > time_until_next_allowed(const HttpRateLimitHandlePtr &general_limit, const HttpRateLimitHandlePtr &specific_limit, const std::string &general_key, const std::string &specific_key)
Calculates delay until request is allowed by two handles.
SubmitResult submit_request(std::unique_ptr< HttpRequest > request_ptr, HttpResponseCallback callback)
Attempts to enqueue a new HTTP request and reports the admission result.
HttpRateLimiter m_rate_limiter
Rate limiter for controlling request frequency.
std::list< std::function< void()> > callback_list_t
const bool is_loaded() const override
Checks if there are active, pending, or failed requests.
std::mutex m_mutex
Mutex to protect access to manager state shared with public entry points.
std::size_t group_request_count_unlocked(uint64_t group_id) const
void process_cancel_requests()
Processes and cancels HTTP requests based on their request or group IDs.
bool remove_limit(long limit_id)
Removes an existing rate limit with the specified identifier.
void cancel_requests_by_group_id(uint64_t group_id, std::function< void()> callback)
Cancels all requests with the specified group ID.
bool allow_request(const HttpRateLimitHandlePtr &general_limit, const HttpRateLimitHandlePtr &specific_limit, uint64_t in_flight_token, const std::string &general_key, const std::string &specific_key)
Checks if a request is allowed by two optional rate-limit handles.
cancel_map_t m_requests_to_cancel_by_id
Map of request IDs to their associated cancellation callbacks.
static HttpResponsePtr make_cancelled_response()
void release_request(const HttpRateLimitHandlePtr &general_limit, const HttpRateLimitHandlePtr &specific_limit, uint64_t in_flight_token, const std::string &general_key, const std::string &specific_key)
Releases in-flight tokens for sequential rate limits.
std::unordered_map< uint64_t, callback_list_t > cancel_map_t
uint64_t generate_request_id()
Generates a new unique request ID.
bool add_request(std::unique_ptr< HttpRequest > request_ptr, HttpResponseCallback callback)
Adds a new HTTP request to the manager.
void shutdown() override
Shuts down the request manager, clearing all active and pending requests.
HttpRequestManager & operator=(const HttpRequestManager &)=delete
Deleted copy assignment operator to enforce the singleton pattern.
static void invoke_cancel_callbacks(const cancel_map_t &requests_to_cancel)
HttpRequestManager()
Private constructor to initialize global resources (e.g., cURL).
void cleanup_pending_requests()
Cleans up pending requests, marking each as failed and invoking its callback.
HttpRateLimitHandlePtr create_rate_limit(long requests_per_period, long period_ms, bool sequential=false)
Creates a rate-limit handle with specified parameters.
void process() override
Processes all requests in the manager.
void process_pending_requests()
Processes all pending requests, moving valid requests to active batches or marking them as failed.
std::list< std::unique_ptr< HttpRequestContext > > m_pending_requests
List of pending HTTP requests awaiting processing.
Represents an HTTP response.
Interface for modules managed by NetworkWorker (e.g., HTTP, WebSocket).
@ ShuttingDown
Operation was rejected because the owning subsystem is shutting down.
@ AbortedDuringDestruction
Request handler was destroyed before completion, causing the request to abort.
@ QueueLimitExceeded
Operation was rejected because the bounded queue is already full.
@ CancelledByUser
Request was cancelled explicitly by the user via cancel().
std::error_code make_error_code(ClientError e)
Creates a std::error_code from a ClientError value.
Primary namespace for the Kurlyk library, encompassing initialization, request management,...
std::function< void(HttpResponsePtr response)> HttpResponseCallback
Callback invoked with an HTTP response.
std::unique_ptr< HttpResponse > HttpResponsePtr
Owning pointer to an HTTP response.
std::shared_ptr< HttpRateLimitHandle > HttpRateLimitHandlePtr
Shared RAII handle for HTTP rate limits.
Result type for time-until-allowed queries.
Represents the synchronous result of trying to enqueue or submit work.
bool accepted
Indicates whether the work item was accepted for processing.