Skip to content

Commit 8e47191

Browse files
authored
Refactor route handling and server (#78)
* Refactor route handling and server * Improve variable names * Use localhost instead of ip * http.Server Addr is the TCP address for the server to listen on * Just provide the port for the http server * Add localhost to url in test * Don't reuse the server Addr when making request * Use LookupHost to get localhost ip * Try with a sleep * Use listen and server separately * Move log line, as Serve blocks
1 parent 550b9e0 commit 8e47191

File tree

5 files changed

+81
-94
lines changed

5 files changed

+81
-94
lines changed

apm-lambda-extension/extension/http_server.go

Lines changed: 14 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,50 +18,27 @@
1818
package extension
1919

2020
import (
21+
"log"
2122
"net"
2223
"net/http"
23-
"time"
2424
)
2525

26-
type serverHandler struct {
27-
data chan AgentData
28-
config *extensionConfig
29-
}
30-
31-
func (handler *serverHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
32-
if r.URL.Path == "/intake/v2/events" {
33-
handleIntakeV2Events(handler, w, r)
34-
return
35-
}
36-
37-
if r.URL.Path == "/" {
38-
handleInfoRequest(handler, w, r)
39-
return
40-
}
41-
42-
// if we have not yet returned, 404
43-
w.WriteHeader(http.StatusNotFound)
44-
w.Write([]byte("404"))
45-
46-
}
26+
var agentDataServer *http.Server
4727

48-
func NewHttpServer(dataChannel chan AgentData, config *extensionConfig) *http.Server {
49-
var handler = serverHandler{data: dataChannel, config: config}
50-
timeout := time.Duration(config.dataReceiverTimeoutSeconds) * time.Second
51-
s := &http.Server{
52-
Addr: config.dataReceiverServerPort,
53-
Handler: &handler,
54-
ReadTimeout: timeout,
55-
WriteTimeout: timeout,
56-
MaxHeaderBytes: 1 << 20,
57-
}
28+
func StartHttpServer(agentDataChan chan AgentData, config *extensionConfig) (err error) {
29+
mux := http.NewServeMux()
30+
mux.HandleFunc("/", handleInfoRequest(config.apmServerUrl))
31+
mux.HandleFunc("/intake/v2/events", handleIntakeV2Events(agentDataChan))
32+
agentDataServer = &http.Server{Addr: config.dataReceiverServerPort, Handler: mux}
5833

59-
addr := s.Addr
60-
ln, err := net.Listen("tcp", addr)
34+
ln, err := net.Listen("tcp", agentDataServer.Addr)
6135
if err != nil {
62-
return s
36+
return
6337
}
64-
go s.Serve(ln)
6538

66-
return s
39+
go func() {
40+
log.Printf("Extension listening for apm data on %s", agentDataServer.Addr)
41+
agentDataServer.Serve(ln)
42+
}()
43+
return nil
6744
}

apm-lambda-extension/extension/http_server_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package extension
1919

2020
import (
2121
"io/ioutil"
22+
"net"
2223
"net/http"
2324
"net/http/httptest"
2425
"testing"
@@ -46,15 +47,17 @@ func TestInfoProxy(t *testing.T) {
4647
apmServerUrl: apmServer.URL,
4748
apmServerSecretToken: "foo",
4849
apmServerApiKey: "bar",
49-
dataReceiverServerPort: "127.0.0.1:1234",
50+
dataReceiverServerPort: ":1234",
5051
dataReceiverTimeoutSeconds: 15,
5152
}
52-
extensionServer := NewHttpServer(dataChannel, &config)
53-
defer extensionServer.Close()
53+
54+
StartHttpServer(dataChannel, &config)
55+
defer agentDataServer.Close()
56+
57+
hosts, _ := net.LookupHost("localhost")
58+
url := "http://" + hosts[0] + ":1234"
5459

5560
// Create a request to send to the extension
56-
client := &http.Client{}
57-
url := "http://" + extensionServer.Addr
5861
req, err := http.NewRequest("GET", url, nil)
5962
if err != nil {
6063
t.Logf("Could not create request")
@@ -64,9 +67,10 @@ func TestInfoProxy(t *testing.T) {
6467
}
6568

6669
// Send the request to the extension
70+
client := &http.Client{}
6771
resp, err := client.Do(req)
6872
if err != nil {
69-
t.Logf("Error fetching %s, [%v]", extensionServer.Addr, err)
73+
t.Logf("Error fetching %s, [%v]", agentDataServer.Addr, err)
7074
t.Fail()
7175
} else {
7276
body, _ := ioutil.ReadAll(resp.Body)

apm-lambda-extension/extension/process_events.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
func ProcessShutdown() {
2727
log.Println("Received SHUTDOWN event")
2828
log.Println("Exiting")
29+
agentDataServer.Close()
2930
}
3031

3132
func FlushAPMData(client *http.Client, dataChannel chan AgentData, config *extensionConfig) {

apm-lambda-extension/extension/route_handlers.go

Lines changed: 55 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -29,68 +29,73 @@ type AgentData struct {
2929
}
3030

3131
// URL: http://server/
32-
func handleInfoRequest(handler *serverHandler, w http.ResponseWriter, r *http.Request) {
33-
client := &http.Client{}
32+
func handleInfoRequest(apmServerUrl string) func(w http.ResponseWriter, r *http.Request) {
33+
return func(w http.ResponseWriter, r *http.Request) {
34+
client := &http.Client{}
3435

35-
req, err := http.NewRequest(r.Method, handler.config.apmServerUrl, nil)
36-
//forward every header received
37-
for name, values := range r.Header {
38-
// Loop over all values for the name.
39-
for _, value := range values {
40-
req.Header.Set(name, value)
36+
req, err := http.NewRequest(r.Method, apmServerUrl, nil)
37+
//forward every header received
38+
for name, values := range r.Header {
39+
// Loop over all values for the name.
40+
for _, value := range values {
41+
req.Header.Set(name, value)
42+
}
43+
}
44+
if err != nil {
45+
log.Printf("could not create request object for %s:%s: %v", r.Method, apmServerUrl, err)
46+
return
4147
}
42-
}
43-
if err != nil {
44-
log.Printf("could not create request object for %s:%s: %v", r.Method, handler.config.apmServerUrl, err)
45-
return
46-
}
4748

48-
resp, err := client.Do(req)
49-
if err != nil {
50-
log.Printf("error forwarding info request (`/`) to APM Server: %v", err)
51-
return
52-
}
53-
defer resp.Body.Close()
54-
body, err := ioutil.ReadAll(resp.Body)
55-
if err != nil {
56-
log.Printf("could not read info request response to APM Server: %v", err)
57-
return
58-
}
49+
resp, err := client.Do(req)
50+
if err != nil {
51+
log.Printf("error forwarding info request (`/`) to APM Server: %v", err)
52+
return
53+
}
54+
defer resp.Body.Close()
55+
body, err := ioutil.ReadAll(resp.Body)
56+
if err != nil {
57+
log.Printf("could not read info request response to APM Server: %v", err)
58+
return
59+
}
5960

60-
// send status code
61-
w.WriteHeader(resp.StatusCode)
61+
// send status code
62+
w.WriteHeader(resp.StatusCode)
6263

63-
// send every header received
64-
for name, values := range resp.Header {
65-
// Loop over all values for the name.
66-
for _, value := range values {
67-
w.Header().Add(name, value)
64+
// send every header received
65+
for name, values := range resp.Header {
66+
// Loop over all values for the name.
67+
for _, value := range values {
68+
w.Header().Add(name, value)
69+
}
6870
}
71+
// send body
72+
w.Write([]byte(body))
6973
}
70-
// send body
71-
w.Write([]byte(body))
7274
}
7375

7476
// URL: http://server/intake/v2/events
75-
func handleIntakeV2Events(handler *serverHandler, w http.ResponseWriter, r *http.Request) {
76-
w.WriteHeader(http.StatusAccepted)
77-
w.Write([]byte("ok"))
77+
func handleIntakeV2Events(agentDataChan chan AgentData) func(w http.ResponseWriter, r *http.Request) {
7878

79-
if r.Body == nil {
80-
log.Println("Could not get bytes from agent request body")
81-
return
82-
}
79+
return func(w http.ResponseWriter, r *http.Request) {
80+
w.WriteHeader(http.StatusAccepted)
81+
w.Write([]byte("ok"))
8382

84-
rawBytes, err := ioutil.ReadAll(r.Body)
85-
if err != nil {
86-
log.Println("Could not read bytes from agent request body")
87-
return
88-
}
83+
if r.Body == nil {
84+
log.Println("No body in agent request")
85+
return
86+
}
8987

90-
agentData := AgentData{
91-
Data: rawBytes,
92-
ContentEncoding: r.Header.Get("Content-Encoding"),
88+
rawBytes, err := ioutil.ReadAll(r.Body)
89+
if err != nil {
90+
log.Println("Could not read bytes from agent request body")
91+
return
92+
}
93+
94+
agentData := AgentData{
95+
Data: rawBytes,
96+
ContentEncoding: r.Header.Get("Content-Encoding"),
97+
}
98+
log.Println("Adding agent data to buffer to be sent to apm server")
99+
agentDataChan <- agentData
93100
}
94-
log.Println("Adding agent data to buffer to be sent to apm server")
95-
handler.data <- agentData
96101
}

apm-lambda-extension/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func main() {
6464
// and get a channel to listen for that data
6565
agentDataChannel := make(chan extension.AgentData, 100)
6666

67-
extension.NewHttpServer(agentDataChannel, config)
67+
extension.StartHttpServer(agentDataChannel, config)
6868

6969
// Make channel for collecting logs and create a HTTP server to listen for them
7070
logsChannel := make(chan logsapi.LogEvent)

0 commit comments

Comments
 (0)