Skip to content

Commit 7baff09

Browse files
committed
Refactor concurrency package to include new metrics and remove unused code
1 parent 7389b1b commit 7baff09

File tree

4 files changed

+72
-268
lines changed

4 files changed

+72
-268
lines changed

concurrency/adjust_concurrency.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
// concurrency/adjust_concurrency.go
2+
3+
package concurrency
4+
5+
import (
6+
"time"
7+
8+
"go.uber.org/zap"
9+
)
10+
11+
// AdjustConcurrency dynamically adjusts the number of concurrent requests allowed based on the current
12+
// metrics tracked by the ConcurrencyHandler. This function assesses the average response time and error
13+
// rate to decide whether to increase or decrease the concurrency limits.
14+
//
15+
// If the average response time exceeds one second and the current concurrency is greater than the minimum
16+
// limit, the concurrency level is decreased to help alleviate potential load on the server or network.
17+
// Conversely, if the error rate is below 0.05 and the current concurrency is below the maximum limit,
18+
// the concurrency level is increased to potentially improve throughput.
19+
//
20+
// This function locks the metrics to ensure thread safety and prevent race conditions during the read
21+
// and update operations. The actual adjustment of the semaphore's size is delegated to the ResizeSemaphore
22+
// function.
23+
func (ch *ConcurrencyHandler) AdjustConcurrency() {
24+
ch.Metrics.Lock.Lock()
25+
defer ch.Metrics.Lock.Unlock()
26+
27+
// Example logic based on simplified conditions
28+
if ch.Metrics.AverageResponseTime > time.Second && len(ch.sem) > MinConcurrency {
29+
newSize := len(ch.sem) - 1
30+
ch.logger.Info("Reducing concurrency due to high response time", zap.Int("NewSize", newSize))
31+
ch.ResizeSemaphore(newSize)
32+
} else if ch.Metrics.ErrorRate < 0.05 && len(ch.sem) < MaxConcurrency {
33+
newSize := len(ch.sem) + 1
34+
ch.logger.Info("Increasing concurrency due to low error rates", zap.Int("NewSize", newSize))
35+
ch.ResizeSemaphore(newSize)
36+
}
37+
}
38+
39+
// ResizeSemaphore adjusts the size of the semaphore used to control concurrency. This method creates a new
40+
// semaphore with the specified new size and closes the old semaphore to ensure that no further tokens can
41+
// be acquired from it. This approach helps manage the transition from the old concurrency level to the new one
42+
// without affecting ongoing operations significantly.
43+
//
44+
// Parameters:
45+
// - newSize: The new size for the semaphore, representing the updated limit on concurrent requests.
46+
//
47+
// This function should be called from within synchronization contexts, such as AdjustConcurrency, to avoid
48+
// race conditions and ensure that changes to the semaphore are consistent with the observed metrics.
49+
func (ch *ConcurrencyHandler) ResizeSemaphore(newSize int) {
50+
newSem := make(chan struct{}, newSize)
51+
52+
// Transfer tokens from the old semaphore to the new one.
53+
for {
54+
select {
55+
case token := <-ch.sem:
56+
select {
57+
case newSem <- token:
58+
// Token transferred to new semaphore.
59+
default:
60+
// New semaphore is full, put token back to the old one to allow ongoing operations to complete.
61+
ch.sem <- token
62+
}
63+
default:
64+
// No more tokens to transfer.
65+
close(ch.sem)
66+
ch.sem = newSem
67+
return
68+
}
69+
}
70+
}

concurrency/dynamic_token_adjustment.go

Lines changed: 0 additions & 199 deletions
This file was deleted.

concurrency/handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type ConcurrencyMetrics struct {
3131
TotalRetries int64
3232
TotalRateLimitErrors int64
3333
TotalResponseTime time.Duration
34+
AverageResponseTime time.Duration
35+
ErrorRate float64
3436
TokenWaitTime time.Duration
3537
Lock sync.Mutex // Protects performance metrics fields
3638
}

concurrency/metrics.go

Lines changed: 0 additions & 69 deletions
This file was deleted.

0 commit comments

Comments
 (0)