2#ifndef _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
3#define _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
38 std::unique_ptr<HttpRequest> request_ptr,
48 std::unique_ptr<HttpRequest> request_ptr,
50 std::lock_guard<std::mutex> lock(
m_mutex);
60# if __cplusplus >= 201402L
61 auto context = std::make_unique<HttpRequestContext>(std::move(request_ptr), std::move(callback));
63 auto context = std::unique_ptr<HttpRequestContext>(
78 return m_rate_limiter.create_limit_handle(requests_per_period, period_ms, sequential);
110 uint64_t in_flight_token,
111 const std::string& general_key,
112 const std::string& specific_key) {
114 general_limit, specific_limit, in_flight_token, general_key, specific_key);
126 uint64_t in_flight_token,
127 const std::string& general_key,
128 const std::string& specific_key) {
130 general_limit, specific_limit, in_flight_token, general_key, specific_key);
140 template<
typename Duration = std::chrono::milliseconds>
144 const std::string& general_key,
145 const std::string& specific_key
148 general_limit, specific_limit, general_key, specific_key);
153 result.duration = delay;
154 result.sequential_blocked = (delay == Duration::max());
194 std::lock_guard<std::mutex> lock(
m_mutex);
203 if (callback) callback();
207 bool invoke_now =
false;
209 std::lock_guard<std::mutex> lock(
m_mutex);
217 if (invoke_now && callback) {
227 if (callback) callback();
230 std::lock_guard<std::mutex> lock(
m_mutex);
239 if (callback) callback();
242 bool invoke_now =
false;
244 std::lock_guard<std::mutex> lock(
m_mutex);
251 if (invoke_now && callback) {
287 std::lock_guard<std::mutex> lock(
m_mutex);
314 if (group_id == 0)
return 0;
316 std::size_t count = 0;
318 if (context && context->request && context->request->group_id == group_id) {
323 if (context && context->request && context->request->group_id == group_id) {
329 count += batch->group_request_count(group_id);
338 std::lock_guard<std::mutex> lock(
m_mutex);
345 ready_waiters.emplace(it->first, std::move(it->second));
350 for (
const auto& item : ready_waiters) {
351 for (
const auto& callback : item.second) {
352 if (callback) callback();
360 std::lock_guard<std::mutex> lock(
m_mutex);
365 for (
const auto& item : waiters) {
366 for (
const auto& callback : item.second) {
367 if (callback) callback();
374 std::unique_lock<std::mutex> lock(
m_mutex);
377 std::vector<std::unique_ptr<HttpRequestContext>> pending_request;
378 std::vector<std::unique_ptr<HttpRequestContext>> failed_requests;
383 auto& request = context->request;
386 failed_requests.push_back(std::move(context));
393 auto general_limit = request->general_rate_limit;
394 auto specific_limit = request->specific_rate_limit;
395 uint64_t token = context->in_flight_token;
396 auto general_key = request->general_rate_limit_key;
397 auto specific_key = request->specific_rate_limit_key;
401 auto old_on_complete = std::move(context->on_complete);
402 context->on_complete = [
this, general_limit, specific_limit, token, general_key, specific_key]() {
403 m_rate_limiter.release_request(general_limit, specific_limit, token, general_key, specific_key);
411 request->general_rate_limit_key,
412 request->specific_rate_limit_key);
414 context->on_complete = std::move(old_on_complete);
419 pending_request.push_back(std::move(context));
425 if (!failed_requests.empty()) {
426 for (
const auto &context : failed_requests) {
427# if __cplusplus >= 201402L
428 auto response = std::make_unique<HttpResponse>();
430 auto response = std::unique_ptr<HttpResponse>(
new HttpResponse());
432 const long BAD_REQUEST = 400;
434 response->status_code = BAD_REQUEST;
435 response->ready =
true;
436 context->callback(std::move(response));
439 failed_requests.clear();
443 if (pending_request.empty())
return;
445# if __cplusplus >= 201402L
451 for (
auto& context : pending_request) {
452 if (!context || !context->callback)
continue;
453# if __cplusplus >= 201402L
454 auto response = std::make_unique<HttpResponse>();
456 auto response = std::unique_ptr<HttpResponse>(
new HttpResponse());
459 response->status_code = 499;
460 response->ready =
true;
461 context->callback(std::move(response));
473 if (!request->process()) {
477 auto failed_requests = request->extract_failed_requests();
479 for (
auto& request : failed_requests) {
489 auto& request_context = *it;
490 if (!request_context || !request_context->request) {
495 const auto now = std::chrono::steady_clock::now();
496 const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - request_context->start_time);
497 const auto& retry_delay_ms = request_context->request->retry_delay_ms;
498 if (duration.count() >= retry_delay_ms) {
499 std::unique_lock<std::mutex> lock(
m_mutex);
511 std::unique_lock<std::mutex> lock(
m_mutex);
519 std::list<std::unique_ptr<HttpRequestContext>> canceled_pending_requests;
522 const auto& ctx = *pending_it;
527 canceled_pending_requests.push_back(std::move(*pending_it));
532 for (
const auto& request_context : canceled_pending_requests) {
534 request_context->complete();
539 const auto& request_context = *failed_it;
540 if (!
matches_cancel(request_context, requests_to_cancel, groups_to_cancel)) {
545 request_context->complete();
550 handler->cancel_requests(requests_to_cancel, groups_to_cancel);
558 const std::unique_ptr<HttpRequestContext>& ctx,
561 if (!ctx || !ctx->request)
return false;
562 const auto request_id = ctx->request->request_id;
563 const auto group_id = ctx->request->group_id;
565 (request_id != 0 && requests_to_cancel.count(request_id) > 0) ||
566 (group_id != 0 && groups_to_cancel.count(group_id) > 0);
570# if __cplusplus >= 201402L
571 auto response = std::make_unique<HttpResponse>();
573 auto response = std::unique_ptr<HttpResponse>(
new HttpResponse());
576 response->status_code = 499;
577 response->ready =
true;
582 for (
const auto& request : requests_to_cancel) {
583 for (
const auto& callback : request.second) {
584 if (callback) callback();
591 std::unique_lock<std::mutex> lock(
m_mutex);
598 for (
const auto &request_context : pending_requests) {
599# if __cplusplus >= 201402L
600 auto response = std::make_unique<HttpResponse>();
602 auto response = std::unique_ptr<HttpResponse>(
new HttpResponse());
604 const long CANCELED_REQUEST_CODE = 499;
606 response->status_code = CANCELED_REQUEST_CODE;
607 response->ready =
true;
608 request_context->callback(std::move(response));
609 request_context->complete();
611 for (
const auto &request_context : failed_requests) {
612# if __cplusplus >= 201402L
613 auto response = std::make_unique<HttpResponse>();
615 auto response = std::unique_ptr<HttpResponse>(
new HttpResponse());
617 const long CANCELED_REQUEST_CODE = 499;
619 response->status_code = CANCELED_REQUEST_CODE;
620 response->ready =
true;
621 request_context->callback(std::move(response));
622 request_context->complete();
628 curl_global_init(CURL_GLOBAL_ALL);
633 curl_global_cleanup();
Manages multiple asynchronous HTTP requests using libcurl's multi interface.
Defines the RateLimitDelay result type for time-until-allowed queries.
Defines RAII handle for keeping HTTP rate limits alive.
Defines the HttpRateLimiter class for managing rate limits on HTTP requests.
Defines the HttpRequestContext class for managing HTTP request context, including retries and timing.
Handles multiple asynchronous HTTP requests using libcurl's multi interface.
Manages rate limits for HTTP requests.
Represents the context of an HTTP request, including the request object, callback function,...
HttpRequestManager(const HttpRequestManager &)=delete
Deleted copy constructor to enforce the singleton pattern.
void notify_group_waiters_if_idle()
cancel_map_t m_group_waiters
Map of group IDs to callbacks waiting until a group becomes idle.
std::size_t max_pending_requests() const
Returns the current maximum pending request count.
std::atomic< bool > m_shutdown
Flag indicating if shutdown has been requested.
static bool matches_cancel(const std::unique_ptr< HttpRequestContext > &ctx, const cancel_map_t &requests_to_cancel, const cancel_map_t &groups_to_cancel)
uint64_t generate_group_id()
Generates a new group ID.
bool has_requests_by_group_id(uint64_t group_id) const
Checks whether pending, failed, or active requests exist for a group.
void set_max_pending_requests(std::size_t max_pending_requests)
Sets the maximum number of pending requests accepted into the global queue.
cancel_map_t m_groups_to_cancel
Map of group IDs to their associated cancellation callbacks.
std::list< std::unique_ptr< HttpRequestContext > > m_failed_requests
List of failed HTTP requests for retrying.
void process_active_requests()
Processes active requests, moving failed ones to the failed requests list for retrying.
std::atomic< uint64_t > m_group_id_counter
Atomic counter for group IDs.
bool remove_limit(const HttpRateLimitHandlePtr &limit)
Releases manager-owned handle for the specified rate-limit handle.
std::size_t group_request_count(uint64_t group_id) const
Counts pending, failed, and active requests for a group.
void cancel_request_by_id(uint64_t request_id, std::function< void()> callback)
Cancels one request by request ID.
std::atomic< uint64_t > m_next_in_flight_token
Atomic counter for sequential rate-limit tokens.
static HttpRequestManager & get_instance()
Get the singleton instance of HttpRequestManager.
std::list< std::unique_ptr< HttpBatchRequestHandler > > m_active_request_batches
List of currently active HTTP request batches.
std::atomic< uint64_t > m_request_id_counter
Atomic counter for unique request IDs.
void process_retry_failed_requests()
Attempts to retry failed requests if their retry delay has passed.
HttpRateLimitHandlePtr get_rate_limit(long limit_id)
Returns a handle for a registered rate limit ID.
std::atomic< std::size_t > m_max_pending_requests
Maximum number of requests accepted into the pending queue, or zero if unbounded.
void wait_requests_by_group_id(uint64_t group_id, std::function< void()> callback)
Registers a callback invoked after all requests from a group finish.
virtual ~HttpRequestManager()
Private destructor to clean up global resources.
RateLimitDelay< Duration > time_until_next_allowed(const HttpRateLimitHandlePtr &general_limit, const HttpRateLimitHandlePtr &specific_limit, const std::string &general_key, const std::string &specific_key)
Calculates delay until request is allowed by two handles.
SubmitResult submit_request(std::unique_ptr< HttpRequest > request_ptr, HttpResponseCallback callback)
Attempts to enqueue a new HTTP request and reports the admission result.
HttpRateLimiter m_rate_limiter
Rate limiter for controlling request frequency.
std::list< std::function< void()> > callback_list_t
void notify_all_group_waiters()
const bool is_loaded() const override
Checks if there are active, pending, or failed requests.
std::mutex m_mutex
Mutex to protect access to manager state shared with public entry points.
std::size_t group_request_count_unlocked(uint64_t group_id) const
void process_cancel_requests()
Processes and cancels HTTP requests based on their request or group IDs.
bool remove_limit(long limit_id)
Removes an existing rate limit with the specified identifier.
void cancel_requests_by_group_id(uint64_t group_id, std::function< void()> callback)
Cancels all requests with the specified group ID.
bool allow_request(const HttpRateLimitHandlePtr &general_limit, const HttpRateLimitHandlePtr &specific_limit, uint64_t in_flight_token, const std::string &general_key, const std::string &specific_key)
Checks if a request is allowed by two optional rate-limit handles.
cancel_map_t m_requests_to_cancel_by_id
Map of request IDs to their associated cancellation callbacks.
static HttpResponsePtr make_cancelled_response()
void release_request(const HttpRateLimitHandlePtr &general_limit, const HttpRateLimitHandlePtr &specific_limit, uint64_t in_flight_token, const std::string &general_key, const std::string &specific_key)
Releases in-flight tokens for sequential rate limits.
std::unordered_map< uint64_t, callback_list_t > cancel_map_t
uint64_t generate_request_id()
Generates a new unique request ID.
bool add_request(std::unique_ptr< HttpRequest > request_ptr, HttpResponseCallback callback)
Adds a new HTTP request to the manager.
void shutdown() override
Shuts down the request manager, clearing all active and pending requests.
HttpRequestManager & operator=(const HttpRequestManager &)=delete
Deleted copy assignment operator to enforce the singleton pattern.
static void invoke_cancel_callbacks(const cancel_map_t &requests_to_cancel)
HttpRequestManager()
Private constructor to initialize global resources (e.g., cURL).
void cleanup_pending_requests()
Cleans up pending requests, marking each as failed and invoking its callback.
HttpRateLimitHandlePtr create_rate_limit(long requests_per_period, long period_ms, bool sequential=false)
Creates a rate-limit handle with specified parameters.
void process() override
Processes all requests in the manager.
void process_pending_requests()
Processes all pending requests, moving valid requests to active batches or marking them as failed.
std::list< std::unique_ptr< HttpRequestContext > > m_pending_requests
List of pending HTTP requests awaiting processing.
Represents an HTTP response.
Interface for modules managed by NetworkWorker (e.g., HTTP, WebSocket).
@ ShuttingDown
Operation was rejected because the owning subsystem is shutting down.
@ AbortedDuringDestruction
Request handler was destroyed before completion, causing the request to abort.
@ QueueLimitExceeded
Operation was rejected because the bounded queue is already full.
@ CancelledByUser
Request was cancelled explicitly by the user via cancel().
std::error_code make_error_code(ClientError e)
Creates a std::error_code from a ClientError value.
Primary namespace for the Kurlyk library, encompassing initialization, request management,...
std::function< void(HttpResponsePtr response)> HttpResponseCallback
Callback invoked with an HTTP response.
std::unique_ptr< HttpResponse > HttpResponsePtr
Owning pointer to an HTTP response.
std::shared_ptr< HttpRateLimitHandle > HttpRateLimitHandlePtr
Shared RAII handle for HTTP rate limits.
Result type for time-until-allowed queries.
Represents the synchronous result of trying to enqueue or submit work.
bool accepted
Indicates whether the work item was accepted for processing.