Skip to content

Commit 550b9e0

Browse files
estolfo and marclop authored
Use more straightforward compression approach and buffer pool (#79)
* Use more straightforward compression strategy and buffer pool
* Remove extra request variable and simplify if-else
* Add benchmark, pass http.Client around

Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>

* Use ioutil.Discard

Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>

Co-authored-by: Marc Lopez Rubio <marc5.12@outlook.com>
1 parent 93cfa53 commit 550b9e0

File tree

4 files changed

+74
-33
lines changed

4 files changed

+74
-33
lines changed

apm-lambda-extension/extension/apm_server.go

Lines changed: 28 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -21,53 +21,55 @@ import (
2121
"bytes"
2222
"compress/gzip"
2323
"fmt"
24-
"io"
2524
"io/ioutil"
2625
"log"
2726
"net/http"
27+
"sync"
2828
)
2929

30+
var bufferPool = sync.Pool{New: func() interface{} {
31+
return &bytes.Buffer{}
32+
}}
33+
3034
// todo: can this be a streaming or streaming style call that keeps the
3135
// connection open across invocations?
32-
func PostToApmServer(agentData AgentData, config *extensionConfig) error {
33-
endpointUri := "intake/v2/events"
34-
var req *http.Request
35-
var err error
36+
func PostToApmServer(client *http.Client, agentData AgentData, config *extensionConfig) error {
37+
endpointURI := "intake/v2/events"
38+
encoding := agentData.ContentEncoding
39+
buf := bufferPool.Get().(*bytes.Buffer)
40+
defer func() {
41+
buf.Reset()
42+
bufferPool.Put(buf)
43+
}()
3644

3745
if agentData.ContentEncoding == "" {
38-
pr, pw := io.Pipe()
39-
gw, _ := gzip.NewWriterLevel(pw, gzip.BestSpeed)
40-
41-
go func() {
42-
_, err = io.Copy(gw, bytes.NewReader(agentData.Data))
43-
gw.Close()
44-
pw.Close()
45-
if err != nil {
46-
log.Printf("Failed to compress data: %v", err)
47-
}
48-
}()
49-
50-
req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, pr)
46+
encoding = "gzip"
47+
gw, err := gzip.NewWriterLevel(buf, gzip.BestSpeed)
5148
if err != nil {
52-
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
49+
return err
5350
}
54-
req.Header.Add("Content-Encoding", "gzip")
55-
} else {
56-
req, err = http.NewRequest("POST", config.apmServerUrl+endpointUri, bytes.NewReader(agentData.Data))
57-
if err != nil {
58-
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
51+
if _, err := gw.Write(agentData.Data); err != nil {
52+
log.Printf("Failed to compress data: %v", err)
5953
}
60-
req.Header.Add("Content-Encoding", agentData.ContentEncoding)
54+
if err := gw.Close(); err != nil {
55+
log.Printf("Failed write compressed data to buffer: %v", err)
56+
}
57+
} else {
58+
buf.Write(agentData.Data)
6159
}
6260

61+
req, err := http.NewRequest("POST", config.apmServerUrl+endpointURI, buf)
62+
if err != nil {
63+
return fmt.Errorf("failed to create a new request when posting to APM server: %v", err)
64+
}
65+
req.Header.Add("Content-Encoding", encoding)
6366
req.Header.Add("Content-Type", "application/x-ndjson")
6467
if config.apmServerApiKey != "" {
6568
req.Header.Add("Authorization", "ApiKey "+config.apmServerApiKey)
6669
} else if config.apmServerSecretToken != "" {
6770
req.Header.Add("Authorization", "Bearer "+config.apmServerSecretToken)
6871
}
6972

70-
client := &http.Client{}
7173
resp, err := client.Do(req)
7274
if err != nil {
7375
return fmt.Errorf("failed to post to APM server: %v", err)

apm-lambda-extension/extension/apm_server_test.go

Lines changed: 36 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@ func TestPostToApmServerDataCompressed(t *testing.T) {
5757
apmServerUrl: apmServer.URL + "/",
5858
}
5959

60-
err := PostToApmServer(agentData, &config)
60+
err := PostToApmServer(apmServer.Client(), agentData, &config)
6161
assert.Equal(t, nil, err)
6262
}
6363

@@ -90,6 +90,40 @@ func TestPostToApmServerDataNotCompressed(t *testing.T) {
9090
apmServerUrl: apmServer.URL + "/",
9191
}
9292

93-
err := PostToApmServer(agentData, &config)
93+
err := PostToApmServer(apmServer.Client(), agentData, &config)
9494
assert.Equal(t, nil, err)
9595
}
96+
97+
func BenchmarkPostToAPM(b *testing.B) {
98+
// Copied from https://github.com/elastic/apm-server/blob/master/testdata/intake-v2/transactions.ndjson.
99+
benchBody := []byte(`{"metadata": {"service": {"name": "1234_service-12a3","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "bar@user.com"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}}
100+
{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}}
101+
{"transaction": {"id": "4340a8e0df1906ecbfa9", "trace_id": "0acd456789abcdef0123456789abcdef", "name": "GET /api/types","type": "request","duration": 32.592981,"outcome":"success", "result": "success", "timestamp": 1496170407154000, "sampled": true, "span_count": {"started": 17},"context": {"service": {"runtime": {"version": "7.0"}},"page":{"referer":"http://localhost:8000/test/e2e/","url":"http://localhost:8000/test/e2e/general-usecase/"}, "request": {"socket": {"remote_address": "12.53.12.1","encrypted": true},"http_version": "1.1","method": "POST","url": {"protocol": "https:","full": "https://www.example.com/p/a/t/h?query=string#hash","hostname": "www.example.com","port": "8080","pathname": "/p/a/t/h","search": "?query=string","hash": "#hash","raw": "/p/a/t/h?query=string#hash"},"headers": {"user-agent":["Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36","Mozilla Chrome Edge"],"content-type": "text/html","cookie": "c1=v1, c2=v2","some-other-header": "foo","array": ["foo","bar","baz"]},"cookies": {"c1": "v1","c2": "v2"},"env": {"SERVER_SOFTWARE": "nginx","GATEWAY_INTERFACE": "CGI/1.1"},"body": {"str": "hello world","additional": { "foo": {},"bar": 123,"req": "additional information"}}},"response": {"status_code": 200,"headers": {"content-type": "application/json"},"headers_sent": true,"finished": true,"transfer_size":25.8,"encoded_body_size":26.90,"decoded_body_size":29.90}, "user": {"domain": "ldap://abc","id": "99","username": "foo"},"tags": {"organization_uuid": "9f0e9d64-c185-4d21-a6f4-4673ed561ec8", "tag2": 12, "tag3": 12.45, "tag4": false, "tag5": null },"custom": {"my_key": 1,"some_other_value": "foo bar","and_objects": {"foo": ["bar","baz"]},"(": "not a valid regex and that is fine"}}}}
102+
{"transaction": { "id": "cdef4340a8e0df19", "trace_id": "0acd456789abcdef0123456789abcdef", "type": "request", "duration": 13.980558, "timestamp": 1532976822281000, "sampled": null, "span_count": { "dropped": 55, "started": 436 }, "marks": {"navigationTiming": {"appBeforeBootstrap": 608.9300000000001,"navigationStart": -21},"another_mark": {"some_long": 10,"some_float": 10.0}, "performance": {}}, "context": { "request": { "socket": { "remote_address": "192.0.1", "encrypted": null }, "method": "POST", "headers": { "user-agent": null, "content-type": null, "cookie": null }, "url": { "protocol": null, "full": null, "hostname": null, "port": null, "pathname": null, "search": null, "hash": null, "raw": null } }, "response": { "headers": { "content-type": null } }, "service": {"environment":"testing","name": "service1","node": {"configured_name": "node-ABC"}, "language": {"version": "2.5", "name": "ruby"}, "agent": {"version": "2.2", "name": "elastic-ruby", "ephemeral_id": "justanid"}, "framework": {"version": "5.0", "name": "Rails"}, "version": "2", "runtime": {"version": "2.5", "name": "cruby"}}},"experience":{"cls":1,"fid":2.0,"tbt":3.4,"longtask":{"count":3,"sum":2.5,"max":1}}}}
103+
{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}}
104+
{"transaction": { "name": "july-2021-delete-after-july-31", "type": "lambda", "result": "success", "id": "142e61450efb8574", "trace_id": "eb56529a1f461c5e7e2f66ecb075e983", "subtype": null, "action": null, "duration": 38.853, "timestamp": 1631736666365048, "sampled": true, "context": { "cloud": { "origin": { "account": { "id": "abc123" }, "provider": "aws", "region": "us-east-1", "service": { "name": "serviceName" } } }, "service": { "origin": { "id": "abc123", "name": "service-name", "version": "1.0" } }, "user": {}, "tags": {}, "custom": { } }, "sync": true, "span_count": { "started": 0 }, "outcome": "unknown", "faas": { "coldstart": false, "execution": "2e13b309-23e1-417f-8bf7-074fc96bc683", "trigger": { "request_id": "FuH2Cir_vHcEMUA=", "type": "http" } }, "sample_rate": 1 } }
105+
`)
106+
agentData := AgentData{Data: benchBody, ContentEncoding: ""}
107+
108+
// Create apm server and handler
109+
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
110+
io.Copy(ioutil.Discard, r.Body)
111+
r.Body.Close()
112+
w.WriteHeader(202)
113+
w.Write([]byte(`{}`))
114+
}))
115+
config := extensionConfig{
116+
apmServerUrl: apmServer.URL + "/",
117+
}
118+
119+
client := &http.Client{
120+
Transport: http.DefaultTransport.(*http.Transport).Clone(),
121+
}
122+
b.ResetTimer()
123+
for i := 0; i < b.N; i++ {
124+
err := PostToApmServer(client, agentData, &config)
125+
if err != nil {
126+
b.Fatal(err)
127+
}
128+
}
129+
}

apm-lambda-extension/extension/process_events.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -20,20 +20,21 @@ package extension
2020
import (
2121
"encoding/json"
2222
"log"
23+
"net/http"
2324
)
2425

2526
func ProcessShutdown() {
2627
log.Println("Received SHUTDOWN event")
2728
log.Println("Exiting")
2829
}
2930

30-
func FlushAPMData(dataChannel chan AgentData, config *extensionConfig) {
31+
func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig) {
3132
log.Println("Checking for agent data")
3233
for {
3334
select {
3435
case agentData := <-dataChannel:
3536
log.Println("Processing agent data")
36-
err := PostToApmServer(agentData, config)
37+
err := PostToApmServer(client, agentData, config)
3738
if err != nil {
3839
log.Printf("Error sending to APM server, skipping: %v", err)
3940
}

apm-lambda-extension/main.go

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ package main
2020
import (
2121
"context"
2222
"log"
23+
"net/http"
2324
"os"
2425
"os/signal"
2526
"path/filepath"
@@ -87,6 +88,9 @@ func main() {
8788
}
8889
}
8990

91+
client := &http.Client{
92+
Transport: http.DefaultTransport.(*http.Transport).Clone(),
93+
}
9094
for {
9195
select {
9296
case <-ctx.Done():
@@ -112,7 +116,7 @@ func main() {
112116

113117
// Flush any APM data, in case waiting for the runtimeDone event timed out,
114118
// the agent data wasn't available yet, and we got to the next event
115-
extension.FlushAPMData(agentDataChannel, config)
119+
extension.FlushAPMData(client, agentDataChannel, config)
116120

117121
// Make a channel for signaling that a runtimeDone event has been received
118122
runtimeDone := make(chan struct{})
@@ -130,7 +134,7 @@ func main() {
130134
log.Println("Function invocation is complete, not receiving any more agent data")
131135
return
132136
case agentData := <-agentDataChannel:
133-
err := extension.PostToApmServer(agentData, config)
137+
err := extension.PostToApmServer(client, agentData, config)
134138
if err != nil {
135139
log.Printf("Error sending to APM server, skipping: %v", err)
136140
}
@@ -179,7 +183,7 @@ func main() {
179183
}
180184

181185
// Flush APM data now that the function invocation has completed
182-
extension.FlushAPMData(agentDataChannel, config)
186+
extension.FlushAPMData(client, agentDataChannel, config)
183187

184188
// Signal that the function invocation has completed
185189
close(funcInvocDone)

0 commit comments

Comments
 (0)