Kurlyk
Loading...
Searching...
No Matches
HttpRequestManager.hpp
Go to the documentation of this file.
1#pragma once
2#ifndef _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
3#define _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
4
7
13
14namespace kurlyk {
15
19 public:
20
// Body of get_instance() (per the Doxygen summary; the signature line is not
// present in this listing). The manager is created with `new` the first time
// this function runs and the same object is returned by reference afterwards.
// This function never deletes the instance.
24 static HttpRequestManager* instance = new HttpRequestManager();
25 return *instance;
26 }
27
/// \brief Queues an HTTP request together with its response callback.
/// \param request_ptr Request to queue; ownership is moved into a new HttpRequestContext.
/// \param callback Response callback; moved into the same context.
/// \return false if m_shutdown is set (nothing is queued); true once the
///         context has been appended to m_pending_requests.
// NOTE(review): the top-level `const` on the by-value `bool` return type has no
// effect for callers.
32 const bool add_request(
33 std::unique_ptr<HttpRequest> request_ptr,
34 HttpResponseCallback callback) {
// The shutdown flag is atomic and is read before m_mutex is taken.
35 if (m_shutdown) return false;
36 std::lock_guard<std::mutex> lock(m_mutex);
// std::make_unique exists only from C++14 on; older standards use `new` directly.
37# if __cplusplus >= 201402L
38 m_pending_requests.push_back(std::make_unique<HttpRequestContext>(std::move(request_ptr), std::move(callback)));
39# else
40 m_pending_requests.push_back(std::unique_ptr<HttpRequestContext>(
41 new HttpRequestContext(std::move(request_ptr), std::move(callback))));
42# endif
43 return true;
44 }
45
/// \brief Creates a rate limit by forwarding to m_rate_limiter.create_limit().
/// \param requests_per_period Passed through unchanged as the first argument.
/// \param period_ms Passed through unchanged as the second argument
///        (presumably a period length in milliseconds - confirm in HttpRateLimiter).
/// \return The value returned by create_limit(); presumably the identifier that
///         remove_limit() later accepts - confirm in HttpRateLimiter.
50 const long create_rate_limit(long requests_per_period, long period_ms) {
51 return m_rate_limiter.create_limit(requests_per_period, period_ms);
52 }
53
/// \brief Removes a rate limit by forwarding to m_rate_limiter.remove_limit().
/// \param limit_id Identifier handed to the rate limiter unchanged.
/// \return The boolean result of m_rate_limiter.remove_limit(limit_id).
57 bool remove_limit(long limit_id) {
58 return m_rate_limiter.remove_limit(limit_id);
59 }
60
// Body of generate_request_id() (per the Doxygen summary; the signature line is
// not present in this listing). Atomic post-increment: returns the counter's
// current value and advances it by one. The counter is initialized to 1.
64 return m_request_id_counter++;
65 }
66
/// \brief Registers a cancellation for the request with the given ID.
/// \param request_id ID used as the key in m_requests_to_cancel.
/// \param callback Completion callback; may be empty.
///
/// If m_shutdown is already set, nothing is recorded: the callback (when
/// non-empty) is invoked immediately on the calling thread. Otherwise the
/// callback is appended, under m_mutex, to the list stored for request_id.
70 void cancel_request_by_id(uint64_t request_id, std::function<void()> callback) {
71 if (m_shutdown) {
72 if (callback) callback();
73 return;
74 }
75 std::lock_guard<std::mutex> lock(m_mutex);
// operator[] creates the list for this ID on first use; an empty callback is
// stored as-is.
76 m_requests_to_cancel[request_id].push_back(std::move(callback));
77 }
78
88
/// \brief Sets the m_shutdown flag.
///
/// After the flag is set, add_request() returns false and
/// cancel_request_by_id() invokes its callback immediately.
// NOTE(review): listing lines 93-95 are not present in this rendering, so any
// further work done by this function is not shown here.
91 void shutdown() override {
92 m_shutdown = true;
96 }
97
/// \brief Reports whether the manager still holds any work.
/// \return true if at least one of m_pending_requests, m_failed_requests,
///         m_active_request_batches or m_requests_to_cancel is non-empty.
// All four containers are inspected while holding m_mutex.
// NOTE(review): other code in this listing modifies m_failed_requests and
// m_active_request_batches without taking m_mutex - confirm those paths run on
// the same thread as callers of is_loaded().
100 const bool is_loaded() const override {
101 std::lock_guard<std::mutex> lock(m_mutex);
102 return
103 !m_pending_requests.empty() ||
104 !m_failed_requests.empty() ||
105 !m_active_request_batches.empty() ||
106 !m_requests_to_cancel.empty();
107 }
108
109 private:
// Mutex taken around accesses to m_pending_requests and m_requests_to_cancel;
// `mutable` so the const member is_loaded() can lock it.
110 mutable std::mutex m_mutex;
// Requests queued by add_request() that have not yet been handed to a batch.
111 std::list<std::unique_ptr<HttpRequestContext>> m_pending_requests;
// Requests extracted from finished batches, kept until their retry delay elapses.
112 std::list<std::unique_ptr<HttpRequestContext>> m_failed_requests;
// Batches of requests currently being processed.
113 std::list<std::unique_ptr<HttpBatchRequestHandler>> m_active_request_batches;
// Callbacks registered for one request ID by cancel_request_by_id().
114 using callback_list_t = std::list<std::function<void()>>;
// Request ID -> cancellation callbacks waiting to be processed.
115 std::unordered_map<uint64_t, callback_list_t> m_requests_to_cancel;
// (Listing line 116 is not present in this rendering.)
// Source of request IDs; starts at 1.
// NOTE(review): ATOMIC_VAR_INIT is deprecated in C++20; plain `{1}` / `{false}`
// initializers would be equivalent.
117 std::atomic<uint64_t> m_request_id_counter = ATOMIC_VAR_INIT(1);
// Set by shutdown(); read by add_request() and cancel_request_by_id().
118 std::atomic<bool> m_shutdown = ATOMIC_VAR_INIT(false);
119
// Body of process_pending_requests() (per the Doxygen summary; the signature
// line is not present in this listing).
//
// Phase 1 (under m_mutex): walk m_pending_requests and move each context into
//   - failed_requests, if its request pointer is null;
//   - pending_request, if the rate limiter allows it;
//   and leave it queued otherwise.
// Phase 2 (lock released): answer each invalid context with a 400 response.
// Phase 3 (lock released): wrap the accepted contexts in a new
//   HttpBatchRequestHandler appended to m_active_request_batches.
122 std::unique_lock<std::mutex> lock(m_mutex);
123 if (m_pending_requests.empty()) return;
124
125 std::vector<std::unique_ptr<HttpRequestContext>> pending_request;
126 std::vector<std::unique_ptr<HttpRequestContext>> failed_requests;
127
128 auto it = m_pending_requests.begin();
129 while (it != m_pending_requests.end()) {
130 auto& context = *it;
131 auto& request = context->request;
132 // Check if the request is valid.
133 if (!request) {
134 failed_requests.push_back(std::move(context));
// erase() returns the iterator to the next element, so the loop does not advance again.
135 it = m_pending_requests.erase(it);
136 continue;
137 }
138
139 // Check if the request is allowed by the rate limiter.
140 const bool allowed = m_rate_limiter.allow_request(
141 request->general_rate_limit_id,
142 request->specific_rate_limit_id);
143 if (!allowed) {
// Not allowed yet: the context stays in m_pending_requests for a later pass.
144 ++it;
145 continue;
146 }
147 pending_request.push_back(std::move(context));
148 it = m_pending_requests.erase(it);
149 }
// Callbacks below run without m_mutex held.
150 lock.unlock();
151
152 // Handle failed requests by calling their callback with a 400 status.
153 if (!failed_requests.empty()) {
154 for (const auto &context : failed_requests) {
155# if __cplusplus >= 201402L
156 auto response = std::make_unique<HttpResponse>();
157# else
158 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
159# endif
160 const long BAD_REQUEST = 400;
// The error code is built from CURLE_OK; the failure is reported through status_code only.
161 response->error_code = utils::make_error_code(CURLE_OK);
162 response->status_code = BAD_REQUEST;
163 response->ready = true;
// NOTE(review): the callback is invoked without checking that it is non-empty;
// calling an empty std::function throws std::bad_function_call.
164 context->callback(std::move(response));
165 }
166 failed_requests.clear();
167 }
168
169 // If there are ready requests, create a new HttpBatchRequestHandler to manage them.
170 if (pending_request.empty()) return;
171# if __cplusplus >= 201402L
172 m_active_request_batches.push_back(std::make_unique<HttpBatchRequestHandler>(pending_request));
173# else
174 m_active_request_batches.push_back(std::unique_ptr<HttpBatchRequestHandler>(new HttpBatchRequestHandler(pending_request)));
175# endif
176 }
177
// Body of process_active_requests() (per the Doxygen summary; the signature
// line is not present in this listing).
//
// Calls process() on every active batch. A batch whose process() returns false
// stays in the list. A batch whose process() returns true is removed, and the
// contexts it reports via extract_failed_requests() are appended to
// m_failed_requests. No mutex is taken in these lines.
180 auto it = m_active_request_batches.begin();
181 while (it != m_active_request_batches.end()) {
182 auto& request = *it;
183 if (!request->process()) {
184 ++it;
185 continue;
186 }
// The failed contexts are taken out before the batch is destroyed by erase().
187 auto failed_requests = request->extract_failed_requests();
188 it = m_active_request_batches.erase(it);
// This loop variable shadows the outer `request` (the batch reference).
189 for (auto& request : failed_requests) {
190 m_failed_requests.push_back(std::move(request));
191 }
192 }
193 }
194
// Body of process_retry_failed_requests() (per the Doxygen summary; the
// signature line is not present in this listing).
//
// Walks m_failed_requests. Entries that are null or have a null request are
// dropped. An entry is moved back to m_pending_requests once the time elapsed
// since its start_time is at least its request's retry_delay_ms; the rest stay
// in the list.
197 auto it = m_failed_requests.begin();
198 while (it != m_failed_requests.end()) {
199 auto& request_context = *it;
200 if (!request_context || !request_context->request) {
// Dropped without invoking any callback.
201 it = m_failed_requests.erase(it);
202 continue;
203 }
204
205 const auto now = std::chrono::steady_clock::now();
// Elapsed time in whole milliseconds since start_time (presumably the time of
// the previous attempt - confirm in HttpRequestContext).
206 const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - request_context->start_time);
207 const auto& retry_delay_ms = request_context->request->retry_delay_ms;
208 if (duration.count() >= retry_delay_ms) {
// m_mutex is held only for the push into m_pending_requests.
209 std::unique_lock<std::mutex> lock(m_mutex);
210 m_pending_requests.push_back(std::move(request_context));
211 lock.unlock();
212 it = m_failed_requests.erase(it);
213 continue;
214 }
215 ++it;
216 }
217 }
218
// Body of process_cancel_requests() (per the Doxygen summary; the signature
// line is not present in this listing).
//
// 1. Under m_mutex, take the whole m_requests_to_cancel map and leave it empty.
// 2. For every entry in m_failed_requests whose ID is in the map, invoke its
//    callback with a 499 response, then remove those entries.
// 3. Pass the map to every active batch via cancel_request_by_id().
// 4. Invoke every non-empty cancellation callback stored in the map.
221 std::unique_lock<std::mutex> lock(m_mutex);
222 if (m_requests_to_cancel.empty()) return;
223
224 auto requests_to_cancel = std::move(m_requests_to_cancel);
// clear() puts the moved-from map into a known empty state.
225 m_requests_to_cancel.clear();
// Everything below runs without m_mutex held.
226 lock.unlock();
227
228 for (const auto &request_context : m_failed_requests) {
// NOTE(review): request_context and request_context->request are dereferenced
// here without a null check, while the remove_if below does guard against
// both being null - confirm null entries cannot reach this loop.
229 if (!requests_to_cancel.count(request_context->request->request_id)) continue;
230# if __cplusplus >= 201402L
231 auto response = std::make_unique<HttpResponse>();
232# else
233 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
234# endif
235 const long CANCELED_REQUEST_CODE = 499;
236 response->error_code = utils::make_error_code(CURLE_OK);
237 response->status_code = CANCELED_REQUEST_CODE;
238 response->ready = true;
239 request_context->callback(std::move(response));
240 }
241
// Second pass: drop the entries that were just answered.
242 m_failed_requests.remove_if([&](const std::unique_ptr<HttpRequestContext>& ctx) {
243 return ctx && ctx->request && requests_to_cancel.count(ctx->request->request_id) > 0;
244 });
245
246
247 for (const auto &handler : m_active_request_batches) {
248 handler->cancel_request_by_id(requests_to_cancel);
249 }
250
// Every stored callback runs, whether or not a matching request was found above.
251 for (const auto &request : requests_to_cancel) {
252 for (const auto &callback : request.second) {
253 if (callback) callback();
254 }
255 }
256 }
257
// Body of cleanup_pending_requests() (per the Doxygen summary; the signature
// line is not present in this listing).
//
// Takes the whole pending list under m_mutex, then - with the lock released -
// invokes the callback of every pending context and of every context in
// m_failed_requests with a 499 response (error_code built from CURLE_OK,
// ready = true).
260 std::unique_lock<std::mutex> lock(m_mutex);
261 auto pending_requests = std::move(m_pending_requests);
// clear() puts the moved-from list into a known empty state.
262 m_pending_requests.clear();
263 lock.unlock();
264
265 for (const auto &request_context : pending_requests) {
266# if __cplusplus >= 201402L
267 auto response = std::make_unique<HttpResponse>();
268# else
269 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
270# endif
271 const long CANCELED_REQUEST_CODE = 499;
272 response->error_code = utils::make_error_code(CURLE_OK);
273 response->status_code = CANCELED_REQUEST_CODE;
274 response->ready = true;
275 request_context->callback(std::move(response));
276 }
// NOTE(review): m_failed_requests is iterated without m_mutex and is not
// cleared in the lines shown here, so its entries remain after their callbacks
// have run - confirm this is intended.
277 for (const auto &request_context : m_failed_requests) {
278# if __cplusplus >= 201402L
279 auto response = std::make_unique<HttpResponse>();
280# else
281 auto response = std::unique_ptr<HttpResponse>(new HttpResponse());
282# endif
283 const long CANCELED_REQUEST_CODE = 499;
284 response->error_code = utils::make_error_code(CURLE_OK);
285 response->status_code = CANCELED_REQUEST_CODE;
286 response->ready = true;
287 request_context->callback(std::move(response));
288 }
289 }
290
// Body of the HttpRequestManager constructor (per the Doxygen summary; the
// signature line is not present in this listing). Initializes libcurl's global
// state; the return value of curl_global_init() is not checked.
293 curl_global_init(CURL_GLOBAL_ALL);
294 }
295
// Body of the HttpRequestManager destructor (per the Doxygen summary; the
// signature line is not present in this listing). Releases libcurl's global
// state, pairing with the curl_global_init() call in the constructor.
298 curl_global_cleanup();
299 }
300
303
306
307 }; // HttpRequestManager
308
309}; // namespace kurlyk
310
311#endif // _KURLYK_HTTP_REQUEST_MANAGER_HPP_INCLUDED
Manages multiple asynchronous HTTP requests using libcurl's multi interface.
Defines the HttpRateLimiter class for managing rate limits on HTTP requests.
Defines the HttpRequestContext class for managing HTTP request context, including retries and timing.
Handles multiple asynchronous HTTP requests using libcurl's multi interface.
Manages rate limits for HTTP requests, ensuring compliance with set limits.
Represents the context of an HTTP request, including the request object, callback function,...
HttpRequestManager(const HttpRequestManager &)=delete
Deleted copy constructor to enforce the singleton pattern.
std::atomic< bool > m_shutdown
Flag indicating if shutdown has been requested.
const bool add_request(std::unique_ptr< HttpRequest > request_ptr, HttpResponseCallback callback)
Adds a new HTTP request to the manager.
std::unordered_map< uint64_t, callback_list_t > m_requests_to_cancel
Map of request IDs to their associated cancellation callbacks.
const long create_rate_limit(long requests_per_period, long period_ms)
Creates a rate limit with specified parameters.
std::list< std::unique_ptr< HttpRequestContext > > m_failed_requests
List of failed HTTP requests for retrying.
void process_active_requests()
Processes active requests, moving failed ones to the failed requests list for retrying.
void cancel_request_by_id(uint64_t request_id, std::function< void()> callback)
Cancels a request by its unique identifier.
static HttpRequestManager & get_instance()
Get the singleton instance of HttpRequestManager.
std::list< std::unique_ptr< HttpBatchRequestHandler > > m_active_request_batches
List of currently active HTTP request batches.
std::atomic< uint64_t > m_request_id_counter
Atomic counter for unique request IDs.
void process_retry_failed_requests()
Attempts to retry failed requests if their retry delay has passed.
virtual ~HttpRequestManager()
Private destructor to clean up global resources.
HttpRateLimiter m_rate_limiter
Rate limiter for controlling request frequency.
std::list< std::function< void()> > callback_list_t
const bool is_loaded() const override
Checks if there are active, pending, or failed requests.
std::mutex m_mutex
Mutex to protect access to the pending requests list and requests-to-cancel map.
void process_cancel_requests()
Processes and cancels HTTP requests based on their IDs.
bool remove_limit(long limit_id)
Removes an existing rate limit with the specified identifier.
uint64_t generate_request_id()
Generates a new unique request ID.
void shutdown() override
Shuts down the request manager, clearing all active and pending requests.
HttpRequestManager & operator=(const HttpRequestManager &)=delete
Deleted copy assignment operator to enforce the singleton pattern.
HttpRequestManager()
Private constructor to initialize global resources (e.g., cURL).
void cleanup_pending_requests()
Cleans up pending and failed requests, answering each with a canceled (status 499) response via its callback.
void process() override
Processes all requests in the manager.
void process_pending_requests()
Processes all pending requests, moving valid requests to active batches or marking them as failed.
std::list< std::unique_ptr< HttpRequestContext > > m_pending_requests
List of pending HTTP requests awaiting processing.
Represents the response received from an HTTP request, including headers, content,...
Interface for modules managed by NetworkWorker (e.g., HTTP, WebSocket).
std::error_code make_error_code(ClientError e)
Creates a std::error_code from a ClientError value.
Primary namespace for the Kurlyk library, encompassing initialization, request management,...
std::function< void(HttpResponsePtr response)> HttpResponseCallback
Type definition for the callback function used to handle HTTP responses.