
Commit 011667b

Refactor ConcurrencyMetrics struct in handler.go to include additional metrics and locks
1 parent 7baff09 commit 011667b

3 files changed: +136 / -9 lines changed


concurrency/const.go

Lines changed: 19 additions & 0 deletions
@@ -0,0 +1,19 @@
package concurrency

import "time"

const (
	// MaxAcceptableTTFB represents the maximum acceptable Time to First Byte (TTFB).
	// TTFB is the time taken for the server to start sending the first byte of data in response to a request.
	// Adjustments in concurrency will be made if the TTFB exceeds this threshold.
	MaxAcceptableTTFB = 300 * time.Millisecond

	// MaxAcceptableThroughput represents the maximum acceptable network throughput in bytes per second (5 MiB/s).
	// Throughput is the amount of data transferred over the network within a specific time interval.
	// Adjustments in concurrency will be made if the network throughput exceeds this threshold.
	MaxAcceptableThroughput = 5 * 1024 * 1024

	// MaxAcceptableResponseTimeVariability represents the maximum acceptable variability in response times.
	// It is used as a threshold to dynamically adjust concurrency based on fluctuations in response times.
	MaxAcceptableResponseTimeVariability = 500 * time.Millisecond
)
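
Note that MaxAcceptableTTFB and MaxAcceptableResponseTimeVariability are time.Duration values, while MaxAcceptableThroughput is an untyped constant in bytes per second. Below is a minimal, self-contained sketch of how measured values could be compared against these thresholds; the sample numbers and the lowercase mirror constants are assumptions for illustration, not part of this commit.

package main

import (
	"fmt"
	"time"
)

const (
	maxAcceptableTTFB       = 300 * time.Millisecond // mirrors MaxAcceptableTTFB
	maxAcceptableThroughput = 5 * 1024 * 1024        // mirrors MaxAcceptableThroughput (bytes/s)
)

func main() {
	ttfb := 450 * time.Millisecond // assumed sample: time until the first response byte
	bytesRead := int64(12 << 20)   // assumed sample: 12 MiB transferred
	elapsed := 2 * time.Second     // assumed sample: total transfer time

	throughput := float64(bytesRead) / elapsed.Seconds() // bytes per second

	fmt.Println("TTFB over threshold:", ttfb > maxAcceptableTTFB)                   // true: 450ms > 300ms
	fmt.Println("throughput over threshold:", throughput > maxAcceptableThroughput) // true: ~6 MiB/s > 5 MiB/s
}
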

concurrency/handler.go

Lines changed: 21 additions & 9 deletions
@@ -25,16 +25,28 @@ type ConcurrencyHandler struct {
 	Metrics *ConcurrencyMetrics
 }

 // ConcurrencyMetrics captures various metrics related to managing concurrency for the client's interactions with the API.
 type ConcurrencyMetrics struct {
-	TotalRequests        int64
-	TotalRetries         int64
-	TotalRateLimitErrors int64
-	TotalResponseTime    time.Duration
-	AverageResponseTime  time.Duration
-	ErrorRate            float64
-	TokenWaitTime        time.Duration
-	Lock                 sync.Mutex // Protects performance metrics fields
+	TotalRequests        int64         // Total number of requests made
+	TotalRetries         int64         // Total number of retry attempts
+	TotalRateLimitErrors int64         // Total number of rate limit errors encountered
+	TotalResponseTime    time.Duration // Total response time for all requests
+	AverageResponseTime  time.Duration // Average response time across all requests
+	ErrorRate            float64       // Error rate calculated as (TotalRateLimitErrors + 5xxErrors) / TotalRequests
+	TokenWaitTime        time.Duration // Total time spent waiting for tokens
+	TTFB struct { // Metrics related to Time to First Byte (TTFB)
+		Total time.Duration // Total Time to First Byte (TTFB) for all requests
+		Count int64         // Count of requests used for calculating TTFB
+		Lock  sync.Mutex    // Lock for TTFB metrics
+	}
+	Throughput struct { // Metrics related to network throughput
+		Total float64    // Total network throughput for all requests
+		Count int64      // Count of requests used for calculating throughput
+		Lock  sync.Mutex // Lock for throughput metrics
+	}
+	Variance      float64    // Variance of response times
+	ResponseCount int64      // Count of responses used for calculating response time variability
+	Lock          sync.Mutex // Lock for overall metrics fields
 }

 // NewConcurrencyHandler initializes a new ConcurrencyHandler with the given
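
The refactored struct gives the TTFB and Throughput blocks their own sync.Mutex, so those moving-average samples can be recorded without holding the struct-wide Lock. Below is a minimal, self-contained sketch of that nested-lock pattern; the ttfbMetrics type and its record/average helpers are hypothetical stand-ins for illustration, not part of this commit.

package main

import (
	"fmt"
	"sync"
	"time"
)

type ttfbMetrics struct {
	Total time.Duration // accumulated TTFB across samples
	Count int64         // number of samples recorded
	Lock  sync.Mutex    // guards Total and Count only
}

// record adds one TTFB sample under the metric's own lock.
func (m *ttfbMetrics) record(sample time.Duration) {
	m.Lock.Lock()
	defer m.Lock.Unlock()
	m.Total += sample
	m.Count++
}

// average returns the moving average, guarding against division by zero.
func (m *ttfbMetrics) average() time.Duration {
	m.Lock.Lock()
	defer m.Lock.Unlock()
	if m.Count == 0 {
		return 0
	}
	return m.Total / time.Duration(m.Count)
}

func main() {
	var m ttfbMetrics
	var wg sync.WaitGroup
	for _, ms := range []int{120, 250, 410} { // assumed TTFB samples in milliseconds
		wg.Add(1)
		go func(ms int) { // concurrent writers exercise the per-metric lock
			defer wg.Done()
			m.record(time.Duration(ms) * time.Millisecond)
		}(ms)
	}
	wg.Wait()
	fmt.Println("average TTFB:", m.average()) // 260ms
}
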

concurrency/metrics.go

Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
package concurrency

import (
	"math"
	"net/http"
	"time"

	"go.uber.org/zap"
)

// MonitorRateLimitHeaders monitors the rate limit headers (X-RateLimit-Remaining and Retry-After)
// in the HTTP response and adjusts concurrency accordingly.
func (ch *ConcurrencyHandler) MonitorRateLimitHeaders(resp *http.Response) {
	// Extract the X-RateLimit-Remaining and Retry-After headers from the response.
	remaining := resp.Header.Get("X-RateLimit-Remaining")
	retryAfter := resp.Header.Get("Retry-After")

	// TODO: adjust concurrency dynamically based on these header values.
	// The blank assignments keep the stub compiling until that logic lands.
	_ = remaining
	_ = retryAfter
}

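The header-based adjustment itself is left as a TODO above. One possible shape, written here as a standalone, hypothetical helper rather than the package's actual implementation, is to parse X-RateLimit-Remaining and Retry-After and shrink the limit when the remaining quota is low or the server requests a back-off; the "low quota" threshold of 10 and the helper name are assumptions.

package main

import (
	"fmt"
	"net/http"
	"strconv"
)

const minConcurrency = 1 // stand-in for the package's MinConcurrency

// suggestedConcurrency returns a reduced limit when the remaining quota is low
// or the server sent a Retry-After hint; otherwise it keeps the current limit.
func suggestedConcurrency(current int, h http.Header) int {
	remaining, err := strconv.Atoi(h.Get("X-RateLimit-Remaining"))
	lowQuota := err == nil && remaining < 10 // assumed "low quota" threshold
	backOff := h.Get("Retry-After") != ""

	if (lowQuota || backOff) && current > minConcurrency {
		return current - 1
	}
	return current
}

func main() {
	h := http.Header{}
	h.Set("X-RateLimit-Remaining", "3")
	fmt.Println(suggestedConcurrency(5, h)) // prints 4
}
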
// MonitorServerResponseCodes monitors server response codes and adjusts concurrency accordingly.
func (ch *ConcurrencyHandler) MonitorServerResponseCodes(resp *http.Response) {
	statusCode := resp.StatusCode

	// TODO: check for 5xx (server) and 4xx (client) errors, track increases in error
	// rates, and adjust concurrency. The blank assignment keeps the stub compiling.
	_ = statusCode
}

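This stub is also a TODO. A hedged sketch of the kind of bookkeeping it might perform, kept standalone and separate from the ConcurrencyMetrics fields so it stays runnable, classifies responses into 4xx and 5xx buckets and derives a running server-error rate, in the spirit of the ErrorRate comment in handler.go; the errorTracker type and its field names are assumptions, not part of this commit.

package main

import "fmt"

type errorTracker struct {
	totalRequests int64
	serverErrors  int64 // 5xx responses
	clientErrors  int64 // 4xx responses, including 429 rate limits
}

// observe records one response and returns the running server-error rate.
func (t *errorTracker) observe(statusCode int) float64 {
	t.totalRequests++
	switch {
	case statusCode >= 500:
		t.serverErrors++
	case statusCode >= 400:
		t.clientErrors++
	}
	return float64(t.serverErrors) / float64(t.totalRequests)
}

func main() {
	var t errorTracker
	for _, code := range []int{200, 503, 200, 429} {
		fmt.Printf("status %d -> server error rate %.2f\n", code, t.observe(code))
	}
}
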
// MonitorResponseTimeVariability calculates the standard deviation of response times
// and uses moving averages to smooth out fluctuations, adjusting concurrency accordingly.
func (ch *ConcurrencyHandler) MonitorResponseTimeVariability(responseTime time.Duration) {
	ch.Metrics.Lock.Lock()
	defer ch.Metrics.Lock.Unlock()

	// Update TotalResponseTime and ResponseCount for the moving average calculation.
	ch.Metrics.TotalResponseTime += responseTime
	ch.Metrics.ResponseCount++

	// Calculate the average response time.
	averageResponseTime := ch.Metrics.TotalResponseTime / time.Duration(ch.Metrics.ResponseCount)

	// Calculate the standard deviation of response times.
	variance := ch.calculateVariance(averageResponseTime, responseTime)
	stdDev := math.Sqrt(variance)

	// Adjust concurrency based on response time variability.
	if stdDev > MaxAcceptableResponseTimeVariability.Seconds() && len(ch.sem) > MinConcurrency {
		newSize := len(ch.sem) - 1
		ch.logger.Info("Reducing concurrency due to high response time variability", zap.Int("NewSize", newSize))
		ch.ResizeSemaphore(newSize)
	}
}

// calculateVariance calculates the running variance of response times, in seconds squared.
func (ch *ConcurrencyHandler) calculateVariance(averageResponseTime time.Duration, responseTime time.Duration) float64 {
	// Convert the time.Duration values to seconds.
	averageSeconds := averageResponseTime.Seconds()
	responseSeconds := responseTime.Seconds()

	// Update the running variance from the previous variance, the current sample's
	// deviation from the average, and the number of samples seen so far.
	variance := (float64(ch.Metrics.ResponseCount-1)*math.Pow(averageSeconds-responseSeconds, 2) + ch.Metrics.Variance) / float64(ch.Metrics.ResponseCount)
	ch.Metrics.Variance = variance
	return variance
}

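To see what this update does, walk it through with assumed sample values: with ResponseCount = 4, an average of 0.25 s, a current sample of 0.45 s, and a previous Variance of 0.01, the update gives (3 × (0.25 − 0.45)² + 0.01) / 4 = (0.12 + 0.01) / 4 = 0.0325, so the standard deviation is √0.0325 ≈ 0.18 s. That is below the 0.5 s MaxAcceptableResponseTimeVariability threshold, so concurrency would be left unchanged.
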
// MonitorNetworkLatency measures Time to First Byte (TTFB) and monitors network throughput,
// adjusting concurrency based on changes in network latency and throughput.
func (ch *ConcurrencyHandler) MonitorNetworkLatency(ttfb time.Duration, throughput float64) {
	ch.Metrics.Lock.Lock()
	defer ch.Metrics.Lock.Unlock()

	// Calculate the TTFB moving average.
	ch.Metrics.TTFB.Lock.Lock()
	defer ch.Metrics.TTFB.Lock.Unlock()
	ch.Metrics.TTFB.Total += ttfb
	ch.Metrics.TTFB.Count++
	ttfbMovingAverage := ch.Metrics.TTFB.Total / time.Duration(ch.Metrics.TTFB.Count)

	// Calculate the throughput moving average.
	ch.Metrics.Throughput.Lock.Lock()
	defer ch.Metrics.Throughput.Lock.Unlock()
	ch.Metrics.Throughput.Total += throughput
	ch.Metrics.Throughput.Count++
	throughputMovingAverage := ch.Metrics.Throughput.Total / float64(ch.Metrics.Throughput.Count)

	// Adjust concurrency based on the TTFB and throughput moving averages.
	if ttfbMovingAverage > MaxAcceptableTTFB && len(ch.sem) > MinConcurrency {
		newSize := len(ch.sem) - 1
		ch.logger.Info("Reducing concurrency due to high TTFB", zap.Int("NewSize", newSize))
		ch.ResizeSemaphore(newSize)
	} else if throughputMovingAverage > MaxAcceptableThroughput && len(ch.sem) < MaxConcurrency {
		newSize := len(ch.sem) + 1
		ch.logger.Info("Increasing concurrency due to high throughput", zap.Int("NewSize", newSize))
		ch.ResizeSemaphore(newSize)
	}
}
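
These monitors take already-measured inputs: MonitorResponseTimeVariability a response time, and MonitorNetworkLatency a TTFB plus a throughput in bytes per second. The wiring that produces those measurements is not part of this commit; the self-contained sketch below shows one way a caller could obtain them with net/http/httptrace. The URL, the error handling, and the final print are placeholders.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptrace"
	"time"
)

func main() {
	req, err := http.NewRequest(http.MethodGet, "https://example.com", nil)
	if err != nil {
		panic(err)
	}

	var start, firstByte time.Time
	trace := &httptrace.ClientTrace{
		GotFirstResponseByte: func() { firstByte = time.Now() }, // marks TTFB
	}
	req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))

	start = time.Now()
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	n, _ := io.Copy(io.Discard, resp.Body) // read the body to completion
	elapsed := time.Since(start)

	ttfb := firstByte.Sub(start)
	throughput := float64(n) / elapsed.Seconds() // bytes per second

	// These two values are what MonitorNetworkLatency expects as arguments:
	//   ch.MonitorNetworkLatency(ttfb, throughput)
	fmt.Printf("TTFB: %v, throughput: %.0f B/s\n", ttfb, throughput)
}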
