Skip to content

Commit ff09298

Browse files
authored
Add backoff mechanism to recover from APM Server transport failures (#148)
* Add basic mechanism * Implementing review comments * Implement review comments #2 * Add unit tests * Add recovery test in main unit tests * Implementing code review comments - 3 * Handle Grace Period goroutine leak * Fix tests and implement review comments * Fix main unit tests * Correct log level in tests * Rename HangLock and accelerate TestAPMRecoveryServer * Implement review comments - 5 * Add additional checks in APM Server tests * Fix race condition in TestEnterBackoffFromHealthy * Refactor APMServerTransportState * Add locks on wait period goroutine * Adjusting lock position * Enter backoff refactor * Remove unused channel * Handle invalid transport status
1 parent 6a980e5 commit ff09298

File tree

14 files changed

+457
-105
lines changed

14 files changed

+457
-105
lines changed

apm-lambda-extension/e2e-testing/e2e_test.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ func TestEndToEnd(t *testing.T) {
3232
panic("No config file")
3333
}
3434

35-
extension.InitLogger()
3635
if os.Getenv("ELASTIC_APM_LOG_LEVEL") != "" {
3736
logLevel, _ := logrus.ParseLevel(os.Getenv("ELASTIC_APM_LOG_LEVEL"))
3837
extension.Log.Logger.SetLevel(logLevel)

apm-lambda-extension/extension/apm_server.go

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,49 @@ package extension
2020
import (
2121
"bytes"
2222
"compress/gzip"
23+
"context"
24+
"errors"
2325
"fmt"
2426
"io"
2527
"io/ioutil"
28+
"math"
29+
"math/rand"
2630
"net/http"
2731
"sync"
32+
"time"
2833
)
2934

3035
// bufferPool hands out reusable *bytes.Buffer values; a fresh empty buffer is
// allocated whenever the pool has nothing to offer.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return new(bytes.Buffer)
	},
}
3338

39+
// ApmServerTransportStatusType is the string-backed type of the status values
// (Failing, Pending, Healthy) stored in the APM Server transport state.
type ApmServerTransportStatusType string
40+
41+
// Values an ApmServerTransportStatusType can take.
const (
42+
// Failing is set by PostToApmServer when the request or reading the response
// fails; while it is the current status, PostToApmServer refuses to send.
Failing ApmServerTransportStatusType = "Failing"
43+
// Pending is set by the grace-period goroutine once the grace period that
// started on failure is over.
Pending ApmServerTransportStatusType = "Pending"
44+
// Healthy is the initial status and is set again after a successful post.
Healthy ApmServerTransportStatusType = "Healthy"
45+
)
46+
47+
// ApmServerTransportStateType groups the transport status with the bookkeeping
// used to compute and wait out the grace period after a failure.
type ApmServerTransportStateType struct {
48+
// Embedded mutex; SetApmServerTransportState locks it while it updates the
// fields below.
sync.Mutex
49+
// Status is the current transport status.
Status ApmServerTransportStatusType
50+
// ReconnectionCount is reset to -1 when the status becomes Healthy and is
// incremented each time the status becomes Failing; computeGracePeriod
// derives the grace period from it.
ReconnectionCount int
51+
// GracePeriodTimer is the timer created when the Failing status is entered.
GracePeriodTimer *time.Timer
52+
}
53+
54+
// ApmServerTransportState is the package-level transport state. It starts out
// Healthy with ReconnectionCount at -1, so the first failure brings the count
// to 0.
var ApmServerTransportState = ApmServerTransportStateType{
55+
Status: Healthy,
56+
ReconnectionCount: -1,
57+
}
58+
3459
// todo: can this be a streaming or streaming style call that keeps the
3560
// connection open across invocations?
36-
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig) error {
61+
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig, ctx context.Context) error {
62+
if !IsTransportStatusHealthyOrPending() {
63+
return errors.New("transport status is unhealthy")
64+
}
65+
3766
endpointURI := "intake/v2/events"
3867
encoding := agentData.ContentEncoding
3968

@@ -72,19 +101,75 @@ func PostToApmServer(client *http.Client, agentData AgentData, config *extension
72101
req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken)
73102
}
74103

104+
Log.Debug("Sending data chunk to APM Server")
75105
resp, err := client.Do(req)
76106
if err != nil {
107+
SetApmServerTransportState(Failing, ctx)
77108
return fmt.Errorf("failed to post to APM server: %v", err)
78109
}
79110

80111
//Read the response body
81112
defer resp.Body.Close()
82113
body, err := ioutil.ReadAll(resp.Body)
83114
if err != nil {
115+
SetApmServerTransportState(Failing, ctx)
84116
return fmt.Errorf("failed to read the response body after posting to the APM server")
85117
}
86118

119+
SetApmServerTransportState(Healthy, ctx)
120+
Log.Debug("Transport status set to healthy")
87121
Log.Debugf("APM server response body: %v", string(body))
88122
Log.Debugf("APM server response status code: %v", resp.StatusCode)
89123
return nil
90124
}
125+
126+
func IsTransportStatusHealthyOrPending() bool {
127+
return ApmServerTransportState.Status != Failing
128+
}
129+
130+
func SetApmServerTransportState(status ApmServerTransportStatusType, ctx context.Context) {
131+
switch status {
132+
case Healthy:
133+
ApmServerTransportState.Lock()
134+
ApmServerTransportState.Status = status
135+
Log.Debugf("APM Server Transport status set to %s", status)
136+
ApmServerTransportState.ReconnectionCount = -1
137+
ApmServerTransportState.Unlock()
138+
case Failing:
139+
ApmServerTransportState.Lock()
140+
ApmServerTransportState.Status = status
141+
Log.Debugf("APM Server Transport status set to %s", status)
142+
ApmServerTransportState.ReconnectionCount++
143+
ApmServerTransportState.GracePeriodTimer = time.NewTimer(computeGracePeriod())
144+
Log.Debugf("Grace period entered, reconnection count : %d", ApmServerTransportState.ReconnectionCount)
145+
go func() {
146+
select {
147+
case <-ApmServerTransportState.GracePeriodTimer.C:
148+
Log.Debug("Grace period over - timer timed out")
149+
case <-ctx.Done():
150+
Log.Debug("Grace period over - context done")
151+
}
152+
ApmServerTransportState.Status = Pending
153+
Log.Debugf("APM Server Transport status set to %s", status)
154+
ApmServerTransportState.Unlock()
155+
}()
156+
default:
157+
Log.Errorf("Cannot set APM Server Transport status to %s", status)
158+
}
159+
}
160+
161+
// ComputeGracePeriod https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors
162+
func computeGracePeriod() time.Duration {
163+
gracePeriodWithoutJitter := math.Pow(math.Min(float64(ApmServerTransportState.ReconnectionCount), 6), 2)
164+
jitter := rand.Float64()/5 - 0.1
165+
return time.Duration((gracePeriodWithoutJitter + jitter*gracePeriodWithoutJitter) * float64(time.Second))
166+
}
167+
168+
func EnqueueAPMData(agentDataChannel chan AgentData, agentData AgentData) {
169+
select {
170+
case agentDataChannel <- agentData:
171+
Log.Debug("Adding agent data to buffer to be sent to apm server")
172+
default:
173+
Log.Warn("Channel full: dropping a subset of agent data")
174+
}
175+
}

0 commit comments

Comments
 (0)