Skip to content

Commit b7ea5fe

Browse files
committed
Refactor concurrency management in HTTP client
1 parent 2de7628 commit b7ea5fe

File tree

2 files changed

+80
-51
lines changed

2 files changed

+80
-51
lines changed

httpclient/httpclient_concurrency_management.go

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"go.uber.org/zap"
1414
)
1515

16-
//------ Constants and Data Structures:
16+
// Constants and Data Structures:
1717

1818
const (
1919
MaxConcurrency = 10 // Maximum allowed concurrent requests
@@ -31,9 +31,62 @@ type ConcurrencyManager struct {
3131
lastTokenAcquisitionTime time.Time
3232
}
3333

34+
// requestIDKey is type used as a key for storing and retrieving
35+
// request-specific identifiers from a context.Context object. This private
36+
// type ensures that the key is distinct and prevents accidental value
37+
// retrieval or conflicts with other context keys. The value associated
38+
// with this key in a context is typically a UUID that uniquely identifies
39+
// a request being processed by the ConcurrencyManager, allowing for
40+
// fine-grained control and tracking of concurrent HTTP requests.
3441
type requestIDKey struct{}
3542

36-
//------ Constructor and Helper Functions:
43+
// Functions:
44+
45+
// AcquireConcurrencyToken attempts to acquire a token from the ConcurrencyManager to manage the number of concurrent requests.
46+
// This function is designed to ensure that the HTTP client adheres to predefined concurrency limits, preventing an excessive number of simultaneous requests.
47+
// It creates a new context with a timeout to avoid indefinite blocking in case the concurrency limit is reached.
48+
// Upon successfully acquiring a token, it records the time taken to acquire the token and updates performance metrics accordingly.
49+
// The function then adds the acquired request ID to the context, which can be used for tracking and managing individual requests.
50+
//
51+
// Parameters:
52+
// - ctx: The parent context from which the new context with timeout will be derived. This allows for proper request cancellation and timeout handling.
53+
// - log: An instance of a logger (conforming to the logger.Logger interface), used to log relevant information and errors during the token acquisition process.
54+
//
55+
// Returns:
56+
// - A new context containing the acquired request ID, which should be passed to subsequent operations requiring concurrency control.
57+
// - An error if the token could not be acquired within the timeout period or due to any other issues encountered by the ConcurrencyManager.
58+
//
59+
// Usage:
60+
// This function should be called before making an HTTP request that needs to be controlled for concurrency.
61+
// The returned context should be used for the HTTP request to ensure it is associated with the acquired concurrency token.
62+
func (c *Client) AcquireConcurrencyToken(ctx context.Context, log logger.Logger) (context.Context, error) {
63+
// Measure the token acquisition start time
64+
tokenAcquisitionStart := time.Now()
65+
66+
// Create a new context with a timeout for acquiring the concurrency token
67+
ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
68+
defer cancel()
69+
70+
requestID, err := c.ConcurrencyMgr.Acquire(ctxWithTimeout)
71+
if err != nil {
72+
log.Error("Failed to acquire concurrency token", zap.Error(err))
73+
return nil, err
74+
}
75+
76+
// Calculate the duration it took to acquire the token
77+
tokenAcquisitionDuration := time.Since(tokenAcquisitionStart)
78+
79+
// Lock the mutex before updating the performance metrics
80+
c.PerfMetrics.lock.Lock()
81+
c.PerfMetrics.TokenWaitTime += tokenAcquisitionDuration
82+
c.PerfMetrics.lock.Unlock()
83+
84+
// Add the acquired request ID to the context for use in subsequent operations
85+
ctxWithRequestID := context.WithValue(ctx, requestIDKey{}, requestID)
86+
87+
// Return the updated context and nil error to indicate success
88+
return ctxWithRequestID, nil
89+
}
3790

3891
// NewConcurrencyManager initializes a new ConcurrencyManager with the given concurrency limit, logger, and debug mode.
3992
// The ConcurrencyManager ensures no more than a certain number of concurrent requests are made.

httpclient/httpclient_request.go

Lines changed: 25 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,50 +14,20 @@ import (
1414
"go.uber.org/zap"
1515
)
1616

17-
// AcquireConcurrencyToken attempts to acquire a token from the ConcurrencyManager to manage the number of concurrent requests.
18-
// This function is designed to ensure that the HTTP client adheres to predefined concurrency limits, preventing an excessive number of simultaneous requests.
19-
// It creates a new context with a timeout to avoid indefinite blocking in case the concurrency limit is reached.
20-
// Upon successfully acquiring a token, it records the time taken to acquire the token and updates performance metrics accordingly.
21-
// The function then adds the acquired request ID to the context, which can be used for tracking and managing individual requests.
17+
// updatePerformanceMetrics updates the client's performance metrics by recording the duration
18+
// of the HTTP request and incrementing the total request count. This function is thread-safe
19+
// and uses a mutex to synchronize updates to the performance metrics.
2220
//
2321
// Parameters:
24-
// - ctx: The parent context from which the new context with timeout will be derived. This allows for proper request cancellation and timeout handling.
25-
// - log: An instance of a logger (conforming to the logger.Logger interface), used to log relevant information and errors during the token acquisition process.
22+
// - duration: The time duration it took for an HTTP request to complete.
2623
//
27-
// Returns:
28-
// - A new context containing the acquired request ID, which should be passed to subsequent operations requiring concurrency control.
29-
// - An error if the token could not be acquired within the timeout period or due to any other issues encountered by the ConcurrencyManager.
30-
//
31-
// Usage:
32-
// This function should be called before making an HTTP request that needs to be controlled for concurrency.
33-
// The returned context should be used for the HTTP request to ensure it is associated with the acquired concurrency token.
34-
func (c *Client) AcquireConcurrencyToken(ctx context.Context, log logger.Logger) (context.Context, error) {
35-
// Measure the token acquisition start time
36-
tokenAcquisitionStart := time.Now()
37-
38-
// Create a new context with a timeout for acquiring the concurrency token
39-
ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
40-
defer cancel()
41-
42-
requestID, err := c.ConcurrencyMgr.Acquire(ctxWithTimeout)
43-
if err != nil {
44-
log.Error("Failed to acquire concurrency token", zap.Error(err))
45-
return nil, err
46-
}
47-
48-
// Calculate the duration it took to acquire the token
49-
tokenAcquisitionDuration := time.Since(tokenAcquisitionStart)
50-
51-
// Lock the mutex before updating the performance metrics
24+
// This function should be called after each HTTP request to keep track of the client's
25+
// performance over time.
26+
func (c *Client) updatePerformanceMetrics(duration time.Duration) {
5227
c.PerfMetrics.lock.Lock()
53-
c.PerfMetrics.TokenWaitTime += tokenAcquisitionDuration
54-
c.PerfMetrics.lock.Unlock()
55-
56-
// Add the acquired request ID to the context for use in subsequent operations
57-
ctxWithRequestID := context.WithValue(ctx, requestIDKey{}, requestID)
58-
59-
// Return the updated context and nil error to indicate success
60-
return ctxWithRequestID, nil
28+
defer c.PerfMetrics.lock.Unlock()
29+
c.PerfMetrics.TotalResponseTime += duration
30+
c.PerfMetrics.TotalRequests++
6131
}
6232

6333
// DoRequest constructs and executes a standard HTTP request with support for retry logic.
@@ -191,10 +161,13 @@ func (c *Client) DoRequest(method, endpoint string, body, out interface{}, log l
191161
}
192162
// After each request, compute and update response time
193163
responseDuration := time.Since(responseTimeStart)
194-
c.PerfMetrics.lock.Lock()
195-
c.PerfMetrics.TotalResponseTime += responseDuration
196-
c.PerfMetrics.lock.Unlock()
197-
164+
c.updatePerformanceMetrics(responseDuration)
165+
/*
166+
responseDuration := time.Since(responseTimeStart)
167+
c.PerfMetrics.lock.Lock()
168+
c.PerfMetrics.TotalResponseTime += responseDuration
169+
c.PerfMetrics.lock.Unlock()
170+
*/
198171
// Checks for the presence of a deprecation header in the HTTP response and logs if found.
199172
if i == 0 {
200173
CheckDeprecationHeader(resp, log)
@@ -303,12 +276,15 @@ func (c *Client) DoRequest(method, endpoint string, body, out interface{}, log l
303276
return nil, err
304277
}
305278

306-
// After the request, compute and update response time
279+
// After each request, compute and update response time
307280
responseDuration := time.Since(responseTimeStart)
308-
c.PerfMetrics.lock.Lock()
309-
c.PerfMetrics.TotalResponseTime += responseDuration
310-
c.PerfMetrics.lock.Unlock()
311-
281+
c.updatePerformanceMetrics(responseDuration)
282+
/*
283+
responseDuration := time.Since(responseTimeStart)
284+
c.PerfMetrics.lock.Lock()
285+
c.PerfMetrics.TotalResponseTime += responseDuration
286+
c.PerfMetrics.lock.Unlock()
287+
*/
312288
CheckDeprecationHeader(resp, log)
313289

314290
// Handle (unmarshal) response with API Handler

0 commit comments

Comments
 (0)