Skip to content

Commit 2de7628

Browse files
committed
Add AcquireConcurrencyToken function for managing concurrency in HTTP client requests
1 parent 3f0ac05 commit 2de7628

File tree

1 file changed

+63
-31
lines changed

1 file changed

+63
-31
lines changed

httpclient/httpclient_request.go

Lines changed: 63 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -10,35 +10,54 @@ import (
1010

1111
"github.com/deploymenttheory/go-api-http-client/errors"
1212
"github.com/deploymenttheory/go-api-http-client/logger"
13+
"github.com/google/uuid"
1314
"go.uber.org/zap"
1415
)
1516

16-
func (c *Client) executeRequest(req *http.Request, log logger.Logger) (*http.Response, error) {
17-
// Start response time measurement
18-
responseTimeStart := time.Now()
17+
// AcquireConcurrencyToken attempts to acquire a token from the ConcurrencyManager to manage the number of concurrent requests.
18+
// This function is designed to ensure that the HTTP client adheres to predefined concurrency limits, preventing an excessive number of simultaneous requests.
19+
// It creates a new context with a timeout to avoid indefinite blocking in case the concurrency limit is reached.
20+
// Upon successfully acquiring a token, it records the time taken to acquire the token and updates performance metrics accordingly.
21+
// The function then adds the acquired request ID to the context, which can be used for tracking and managing individual requests.
22+
//
23+
// Parameters:
24+
// - ctx: The parent context from which the new context with timeout will be derived. This allows for proper request cancellation and timeout handling.
25+
// - log: An instance of a logger (conforming to the logger.Logger interface), used to log relevant information and errors during the token acquisition process.
26+
//
27+
// Returns:
28+
// - A new context containing the acquired request ID, which should be passed to subsequent operations requiring concurrency control.
29+
// - An error if the token could not be acquired within the timeout period or due to any other issues encountered by the ConcurrencyManager.
30+
//
31+
// Usage:
32+
// This function should be called before making an HTTP request that needs to be controlled for concurrency.
33+
// The returned context should be used for the HTTP request to ensure it is associated with the acquired concurrency token.
34+
func (c *Client) AcquireConcurrencyToken(ctx context.Context, log logger.Logger) (context.Context, error) {
35+
// Measure the token acquisition start time
36+
tokenAcquisitionStart := time.Now()
1937

20-
// Execute the request
21-
resp, err := c.httpClient.Do(req)
38+
// Create a new context with a timeout for acquiring the concurrency token
39+
ctxWithTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
40+
defer cancel()
41+
42+
requestID, err := c.ConcurrencyMgr.Acquire(ctxWithTimeout)
2243
if err != nil {
23-
log.Error("Failed to send request", zap.String("method", req.Method), zap.String("url", req.URL.String()), zap.Error(err))
44+
log.Error("Failed to acquire concurrency token", zap.Error(err))
2445
return nil, err
2546
}
2647

27-
// Compute and update response time
28-
responseDuration := time.Since(responseTimeStart)
48+
// Calculate the duration it took to acquire the token
49+
tokenAcquisitionDuration := time.Since(tokenAcquisitionStart)
50+
51+
// Lock the mutex before updating the performance metrics
2952
c.PerfMetrics.lock.Lock()
30-
c.PerfMetrics.TotalResponseTime += responseDuration
53+
c.PerfMetrics.TokenWaitTime += tokenAcquisitionDuration
3154
c.PerfMetrics.lock.Unlock()
3255

33-
// Check for the presence of a deprecation header in the HTTP response and log if found
34-
CheckDeprecationHeader(resp, log)
56+
// Add the acquired request ID to the context for use in subsequent operations
57+
ctxWithRequestID := context.WithValue(ctx, requestIDKey{}, requestID)
3558

36-
// Basic response status check
37-
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
38-
log.Warn("Received non-success HTTP status", zap.String("method", req.Method), zap.String("url", req.URL.String()), zap.Int("status_code", resp.StatusCode))
39-
}
40-
41-
return resp, nil
59+
// Return the updated context and nil error to indicate success
60+
return ctxWithRequestID, nil
4261
}
4362

4463
// DoRequest constructs and executes a standard HTTP request with support for retry logic.
@@ -78,25 +97,38 @@ func (c *Client) DoRequest(method, endpoint string, body, out interface{}, log l
7897
if err != nil || !valid {
7998
return nil, fmt.Errorf("validity of the authentication token failed with error: %w", err)
8099
}
100+
/*
101+
// Acquire a token for concurrency management with a timeout and measure its acquisition time
102+
tokenAcquisitionStart := time.Now()
103+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
104+
defer cancel()
81105
82-
// Acquire a token for concurrency management with a timeout and measure its acquisition time
83-
tokenAcquisitionStart := time.Now()
84-
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
85-
defer cancel()
106+
requestID, err := c.ConcurrencyMgr.Acquire(ctx)
107+
if err != nil {
108+
return nil, err
109+
}
110+
defer c.ConcurrencyMgr.Release(requestID)
111+
112+
tokenAcquisitionDuration := time.Since(tokenAcquisitionStart)
113+
c.PerfMetrics.lock.Lock()
114+
c.PerfMetrics.TokenWaitTime += tokenAcquisitionDuration
115+
c.PerfMetrics.lock.Unlock()
86116
87-
requestID, err := c.ConcurrencyMgr.Acquire(ctx)
117+
// Add the request ID to the context
118+
ctx = context.WithValue(ctx, requestIDKey{}, requestID)
119+
*/
120+
121+
// Acquire a token for concurrency management
122+
ctx, err := c.AcquireConcurrencyToken(context.Background(), log)
88123
if err != nil {
89124
return nil, err
90125
}
91-
defer c.ConcurrencyMgr.Release(requestID)
92-
93-
tokenAcquisitionDuration := time.Since(tokenAcquisitionStart)
94-
c.PerfMetrics.lock.Lock()
95-
c.PerfMetrics.TokenWaitTime += tokenAcquisitionDuration
96-
c.PerfMetrics.lock.Unlock()
97-
98-
// Add the request ID to the context
99-
ctx = context.WithValue(ctx, requestIDKey{}, requestID)
126+
defer func() {
127+
// Extract the requestID from the context and release the concurrency token
128+
if requestID, ok := ctx.Value(requestIDKey{}).(uuid.UUID); ok {
129+
c.ConcurrencyMgr.Release(requestID)
130+
}
131+
}()
100132

101133
// Determine which set of encoding and content-type request rules to use
102134
apiHandler := c.APIHandler

0 commit comments

Comments
 (0)