Skip to content

Commit 1903769

Browse files
authored
Use copy for info response and add intake test (#83)
* Use copy to forward apm server response body to agent * Remove unncessary check if body is nil and add intake test * Close body after reading * Write headers before body to response
1 parent 01d9177 commit 1903769

File tree

2 files changed

+104
-17
lines changed

2 files changed

+104
-17
lines changed

apm-lambda-extension/extension/http_server_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"net"
2323
"net/http"
2424
"net/http/httptest"
25+
"strings"
2526
"testing"
2627

2728
"gotest.tools/assert"
@@ -37,6 +38,7 @@ func TestInfoProxy(t *testing.T) {
3738
assert.Equal(t, 1, len(r.Header[key]))
3839
assert.Equal(t, headers[key], r.Header[key][0])
3940
}
41+
w.Header().Add("test", "header")
4042
w.Write([]byte(`{"foo": "bar"}`))
4143
}))
4244
defer apmServer.Close()
@@ -75,6 +77,91 @@ func TestInfoProxy(t *testing.T) {
7577
} else {
7678
body, _ := ioutil.ReadAll(resp.Body)
7779
assert.Equal(t, string(body), wantResp)
80+
assert.Equal(t, "header", resp.Header.Get("test"))
7881
resp.Body.Close()
7982
}
8083
}
84+
85+
func TestInfoProxyErrorStatusCode(t *testing.T) {
86+
// Create apm server and handler
87+
apmServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
88+
w.WriteHeader(401)
89+
}))
90+
defer apmServer.Close()
91+
92+
// Create extension config and start the server
93+
dataChannel := make(chan AgentData, 100)
94+
config := extensionConfig{
95+
apmServerUrl: apmServer.URL,
96+
apmServerSecretToken: "foo",
97+
apmServerApiKey: "bar",
98+
dataReceiverServerPort: ":1234",
99+
dataReceiverTimeoutSeconds: 15,
100+
}
101+
102+
StartHttpServer(dataChannel, &config)
103+
defer agentDataServer.Close()
104+
105+
hosts, _ := net.LookupHost("localhost")
106+
url := "http://" + hosts[0] + ":1234"
107+
108+
// Create a request to send to the extension
109+
req, err := http.NewRequest("GET", url, nil)
110+
if err != nil {
111+
t.Logf("Could not create request")
112+
}
113+
114+
// Send the request to the extension
115+
client := &http.Client{}
116+
resp, err := client.Do(req)
117+
if err != nil {
118+
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
119+
t.Fail()
120+
} else {
121+
assert.Equal(t, 401, resp.StatusCode)
122+
}
123+
}
124+
125+
func Test_handleInfoRequest(t *testing.T) {
126+
headers := map[string]string{"Authorization": "test-value"}
127+
// Copied from https://github.com/elastic/apm-server/blob/master/testdata/intake-v2/transactions.ndjson.
128+
agentRequestBody := `{"metadata": {"service": {"name": "1234_service-12a3","node": {"configured_name": "node-123"},"version": "5.1.3","environment": "staging","language": {"name": "ecmascript","version": "8"},"runtime": {"name": "node","version": "8.0.0"},"framework": {"name": "Express","version": "1.2.3"},"agent": {"name": "elastic-node","version": "3.14.0"}},"user": {"id": "123user", "username": "bar", "email": "bar@user.com"}, "labels": {"tag0": null, "tag1": "one", "tag2": 2}, "process": {"pid": 1234,"ppid": 6789,"title": "node","argv": ["node","server.js"]},"system": {"hostname": "prod1.example.com","architecture": "x64","platform": "darwin", "container": {"id": "container-id"}, "kubernetes": {"namespace": "namespace1", "pod": {"uid": "pod-uid", "name": "pod-name"}, "node": {"name": "node-name"}}},"cloud":{"account":{"id":"account_id","name":"account_name"},"availability_zone":"cloud_availability_zone","instance":{"id":"instance_id","name":"instance_name"},"machine":{"type":"machine_type"},"project":{"id":"project_id","name":"project_name"},"provider":"cloud_provider","region":"cloud_region","service":{"name":"lambda"}}}}
129+
{"transaction": { "id": "945254c567a5417e", "trace_id": "0123456789abcdef0123456789abcdef", "parent_id": "abcdefabcdef01234567", "type": "request", "duration": 32.592981, "span_count": { "started": 43 }}}
130+
{"transaction": { "id": "00xxxxFFaaaa1234", "trace_id": "0123456789abcdef0123456789abcdef", "name": "amqp receive", "parent_id": "abcdefabcdef01234567", "type": "messaging", "duration": 3, "span_count": { "started": 1 }, "context": {"message": {"queue": { "name": "new_users"}, "age":{ "ms": 1577958057123}, "headers": {"user_id": "1ax3", "involved_services": ["user", "auth"]}, "body": "user created", "routing_key": "user-created-transaction"}},"session":{"id":"sunday","sequence":123}}}
131+
`
132+
133+
// Create extension config
134+
dataChannel := make(chan AgentData, 100)
135+
config := extensionConfig{
136+
apmServerSecretToken: "foo",
137+
apmServerApiKey: "bar",
138+
dataReceiverServerPort: ":1234",
139+
dataReceiverTimeoutSeconds: 15,
140+
}
141+
142+
// Start extension server
143+
StartHttpServer(dataChannel, &config)
144+
defer agentDataServer.Close()
145+
146+
// Create a request to send to the extension
147+
hosts, _ := net.LookupHost("localhost")
148+
url := "http://" + hosts[0] + ":1234/intake/v2/events"
149+
req, err := http.NewRequest("POST", url, strings.NewReader(agentRequestBody))
150+
if err != nil {
151+
t.Logf("Could not create request")
152+
}
153+
// Add headers to the request
154+
for name, value := range headers {
155+
req.Header.Add(name, value)
156+
}
157+
158+
// Send the request to the extension
159+
client := &http.Client{}
160+
resp, err := client.Do(req)
161+
if err != nil {
162+
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
163+
t.Fail()
164+
} else {
165+
assert.Equal(t, 202, resp.StatusCode)
166+
}
167+
}

apm-lambda-extension/extension/route_handlers.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package extension
1919

2020
import (
21+
"io"
2122
"io/ioutil"
2223
"log"
2324
"net/http"
@@ -46,30 +47,33 @@ func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.
4647
return
4748
}
4849

49-
resp, err := client.Do(req)
50+
// Send request to apm server
51+
serverResp, err := client.Do(req)
5052
if err != nil {
5153
log.Printf("error forwarding info request (`/`) to APM Server: %v", err)
5254
return
5355
}
54-
defer resp.Body.Close()
55-
body, err := ioutil.ReadAll(resp.Body)
56-
if err != nil {
57-
log.Printf("could not read info request response to APM Server: %v", err)
58-
return
59-
}
6056

61-
// send status code
62-
w.WriteHeader(resp.StatusCode)
57+
// If WriteHeader is not called explicitly, the first call to Write
58+
// will trigger an implicit WriteHeader(http.StatusOK).
59+
if serverResp.StatusCode != 200 {
60+
w.WriteHeader(serverResp.StatusCode)
61+
}
6362

6463
// send every header received
65-
for name, values := range resp.Header {
64+
for name, values := range serverResp.Header {
6665
// Loop over all values for the name.
6766
for _, value := range values {
6867
w.Header().Add(name, value)
6968
}
7069
}
71-
// send body
72-
w.Write([]byte(body))
70+
71+
// copy body to request sent back to the agent
72+
_, err = io.Copy(w, serverResp.Body)
73+
if err != nil {
74+
log.Printf("could not read info request response to APM Server: %v", err)
75+
return
76+
}
7377
}
7478
}
7579

@@ -80,12 +84,8 @@ func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWrit
8084
w.WriteHeader(http.StatusAccepted)
8185
w.Write([]byte("ok"))
8286

83-
if r.Body == nil {
84-
log.Println("No body in agent request")
85-
return
86-
}
87-
8887
rawBytes, err := ioutil.ReadAll(r.Body)
88+
defer r.Body.Close()
8989
if err != nil {
9090
log.Println("Could not read bytes from agent request body")
9191
return

0 commit comments

Comments
 (0)