Skip to content

Commit 718302b

Browse files
committed
sized down concurrency package
1 parent 799e33c commit 718302b

File tree

4 files changed

+33
-35
lines changed

4 files changed

+33
-35
lines changed

concurrency/resize.go

Lines changed: 0 additions & 35 deletions
This file was deleted.

concurrency/scale.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,36 @@ func (ch *ConcurrencyHandler) ScaleUp() {
3535
ch.logger.Info("Concurrency already at maximum level; cannot increase further", zap.Int("currentSize", currentSize))
3636
}
3737
}
38+
39+
// ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new
40+
// semaphore with the specified new size and closes the old semaphore to ensure that no further tokens can
41+
// be acquired from it. This approach helps manage the transition from the old concurrency level to the new one
42+
// without affecting ongoing operations significantly.
43+
//
44+
// Parameters:
45+
// - newSize: The new size for the semaphore, representing the updated limit on concurrent requests.
46+
//
47+
// This function should be called from within synchronization contexts, such as AdjustConcurrency, to avoid
48+
// race conditions and ensure that changes to the semaphore are consistent with the observed metrics.
49+
func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int) {
50+
newSem := make(chan struct{}, newSize)
51+
52+
// Transfer tokens from the old semaphore to the new one.
53+
for {
54+
select {
55+
case token := <-ch.sem:
56+
select {
57+
case newSem <- token:
58+
// Token transferred to new semaphore.
59+
default:
60+
// New semaphore is full, put token back to the old one to allow ongoing operations to complete.
61+
ch.sem <- token
62+
}
63+
default:
64+
// No more tokens to transfer.
65+
close(ch.sem)
66+
ch.sem = newSem
67+
return
68+
}
69+
}
70+
}

0 commit comments

Comments
 (0)