diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 0000000..d864b3c --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,147 @@ +# Copilot instructions — FanOut + +This file gives targeted, repository-specific guidance for future Copilot sessions working on FanOut (Go, single-binary HTTP fan-out service). + +--- + +## Build, test, and lint commands + +Build (release): + + go build -trimpath -ldflags="-w -s" -o fanout + +Build (debug): + + go build -tags=debug -o fanout-debug + +Run locally (echo mode for development): + + TARGETS=localonly go run fanout.go + +Run with production targets: + + TARGETS="https://a.example/,https://b.example/" PORT=8080 go run fanout.go + +Run the full test suite (with race detector): + + go test -v -race ./... + +Run a single test (exact name): + + go test -run '^TestSendRequest$' . + +Run a single test with race and verbose output: + + go test -run '^TestSendRequest$' -v -race . + +Formatting / vet: + + gofmt -w . + go vet ./... + +Security scan (used by README/CI): + + gosec ./... + +Docker (local): + + docker build -t fanout:dev . + +Multi-arch build (CI / release): + + docker buildx build --platform linux/amd64,linux/arm64 -t yourorg/fanout:latest . + +CI workflows: + +- .github/workflows/docker-image.yml +- .github/workflows/binary-release.yml + +--- + +## High-level architecture (big picture) + +- Entrypoint: `fanout.go` — sets up HTTP handlers and environment-based configuration in `init()`. + +- Endpoints: + - `ENDPOINT_PATH` (default `/fanout`) — main fan-out endpoint. + - `/health` — simple health check. + - `/version` — binary/version metadata. + - `/metrics` — Prometheus handler (enabled when `METRICS_ENABLED=true`). + +- Modes: + - Echo mode: `TARGETS=localonly` — inbound requests are echoed back by `echoHandler`. + - Multiplex mode: `TARGETS` contains comma-separated targets; `multiplex` spawns one goroutine per target. + +- Dispatcher & concurrency: + - `multiplex` launches a goroutine per configured target; responses are collected via a buffered channel and WaitGroup. Response order is not guaranteed. + +- Request forwarding (`sendRequest`): + - Re-creates the original request per target, clones headers via `cloneHeaders` (sensitive headers are logged), and sets Content-Length appropriately. + - Implements retries for network errors and server (5xx) responses using exponential backoff + jitter. + - Adds `X-Retry-Count` on retry attempts. + +- Logging & metrics: + - Asynchronous logger: `logQueue` is a buffered channel, format controlled by `LOG_FORMAT` (json/text) and `LOG_LEVEL`. + - Prometheus metrics (prefixed `fanout_`) are recorded when `METRICS_ENABLED=true`. + +--- + +## Key repository conventions and gotchas + +- Configuration is environment-driven and read in `init()`; changing env vars requires restarting the process. + +- Body handling / GetBody semantics: + - The code uses a pre-read body optimization: when available, `preReadBody` is used for the first attempt; subsequent attempts call `getBody()`. + - Tests use `httptest.NewRequest` which provides `GetBody`; when writing tests or mock requests, ensure `GetBody` is present or provide a pre-read body. + +- Retry behavior: + - Controlled via `MAX_RETRIES` (default: 3). + - Network errors are detected by substring matching in `isRetryableError` (e.g., "connection refused", "timeout", "deadline exceeded", "connection reset", "no such host"). + - 5xx responses trigger retries up to the configured limit. + +- Sensitive headers: + - Configured via `SENSITIVE_HEADERS` (default `Authorization,Cookie`). `cloneHeaders` will log a warning when those are detected. + +- Metrics naming and labels: + - Prometheus metrics use fixed names (e.g., `fanout_requests_total`, `fanout_target_requests_total`). Avoid renaming these without updating monitoring. + +- Concurrency expectations: + - `multiplex` returns responses as they arrive. Do not rely on responses being in the same order as `TARGETS` unless ordering is explicitly implemented. + +- Logging behavior: + - Log entries are queued to `logQueue`; if the queue is full, entries may be dropped or logged directly when errors occur. + +- Versioning variables: + - `Version`, `GitCommit`, and `BuildTime` are populated at build time (defaults: dev/unknown). CI/release workflows set these. + +--- + +## Where to look (short pointers) + +- Core: `fanout.go` (single-file service implementation) +- Unit tests: `fanout_test.go` +- Container: `Dockerfile`, `compose.yml` +- CI: `.github/workflows/*` + +--- + +## Repository workflow preferences + +- Docker-only execution: All development, builds, tests and linters should be executed inside Docker containers, not on the host machine. This includes local runs, single-test runs, formatting, vetting, and CI-parity commands. Examples: + + # Run full test suite inside official Go container + docker run --rm -v $(pwd):/src -w /src golang:1.24 go test -v -race ./... + + # Run a single test inside Docker + docker run --rm -v $(pwd):/src -w /src golang:1.24 go test -run '^TestSendRequest$' -v -race . + + # Build inside Docker + docker run --rm -v $(pwd):/src -w /src golang:1.24 go build -trimpath -ldflags="-w -s" -o fanout + + Prefer running via docker-compose (compose.yml) or CI-style containers so host toolchains are not required. + +- Atomic commits: Make small, atomic commits for every logical change. Each commit should be self-contained and reversible. Use a separate branch per feature/bugfix and keep commit messages focused on a single purpose. + +--- + +If something important is missing or you want additional coverage (examples, more test-run tips, or CI notes), ask and this file can be expanded. diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index f174042..0237d66 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -53,10 +53,17 @@ jobs: with: go-version: '1.24' - - name: Run unit tests + - name: Static analysis (gofmt, go vet) run: | - go mod download - go test -v ./... + docker run --rm -v ${{ github.workspace }}:/src -w /src golang:1.24 sh -c "gofmt -l . || true; go vet ./... || true" + + - name: Security scan (gosec) + run: | + docker run --rm -v ${{ github.workspace }}:/src -w /src securego/gosec:latest gosec ./... + + - name: Run unit tests (inside Docker) + run: | + docker run --rm -v ${{ github.workspace }}:/src -w /src golang:1.24 go test -v -race ./... - name: Set up QEMU uses: docker/setup-qemu-action@v3 diff --git a/fanout.go b/fanout.go index e4baab6..109515c 100644 --- a/fanout.go +++ b/fanout.go @@ -7,11 +7,14 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "io" "log" "math/rand" + "net" "net/http" + "net/url" "os" "strconv" "strings" @@ -190,16 +193,22 @@ func init() { } else { size, err := humanize.ParseBytes(sizeStr) if err != nil { - log.Printf("Invalid MAX_BODY_SIZE '%s', using default: %v", sizeStr, err) + log.Printf("Invalid MAX_BODY_SIZE, using default") maxBodySize = defaultMaxBodySize } else { - maxBodySize = int64(size) + // humanize.ParseBytes returns uint64; guard against overflow when converting to int64 + if size > uint64(^uint64(0)>>1) { + log.Printf("MAX_BODY_SIZE too large, capping to default: %d", defaultMaxBodySize) + maxBodySize = defaultMaxBodySize + } else { + maxBodySize = int64(size) + } } } if timeout := os.Getenv("REQUEST_TIMEOUT"); timeout != "" { if d, err := time.ParseDuration(timeout); err != nil { - log.Printf("Invalid REQUEST_TIMEOUT '%s', using default: %v", timeout, err) + log.Printf("Invalid REQUEST_TIMEOUT, using default") requestTimeout = defaultRequestTimeout } else { requestTimeout = d @@ -210,7 +219,7 @@ func init() { if timeout := os.Getenv("CLIENT_TIMEOUT"); timeout != "" { if d, err := time.ParseDuration(timeout); err != nil { - log.Printf("Invalid CLIENT_TIMEOUT '%s', using default: %v", timeout, err) + log.Printf("Invalid CLIENT_TIMEOUT, using default") clientTimeout = defaultClientTimeout } else { clientTimeout = d @@ -254,7 +263,7 @@ func init() { if retriesStr := os.Getenv("MAX_RETRIES"); retriesStr != "" { if retries, err := strconv.Atoi(retriesStr); err != nil || retries < 0 { - log.Printf("Invalid MAX_RETRIES '%s', using default: %v", retriesStr, err) + log.Printf("Invalid MAX_RETRIES, using default") maxRetries = defaultMaxRetries } else { maxRetries = retries @@ -314,7 +323,8 @@ func logWithLevel(level int, context map[string]string, format string, args ...i case logQueue <- entry: default: if level >= LogLevelError { - log.Printf("WARNING: Log queue full, logging ERROR directly: %s", entry.Message) + // Avoid logging untrusted message contents directly to prevent log injection (gosec G706) + log.Printf("WARNING: Log queue full, logging ERROR directly") } } } @@ -351,10 +361,16 @@ func echoHandler(w http.ResponseWriter, r *http.Request) { switch os.Getenv("ECHO_MODE_RESPONSE") { case "full": w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(echoData) + if err := json.NewEncoder(w).Encode(echoData); err != nil { + logError("Failed to encode echo response: %v", err) + return + } default: w.WriteHeader(http.StatusAccepted) - json.NewEncoder(w).Encode(map[string]string{"status": "echoed"}) + if err := json.NewEncoder(w).Encode(map[string]string{"status": "echoed"}); err != nil { + logError("Failed to encode echo short response: %v", err) + return + } } loggedBody := string(bodyBytes) @@ -372,7 +388,9 @@ func echoHandler(w http.ResponseWriter, r *http.Request) { func healthCheck(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(map[string]string{"status": "healthy"}) + if err := json.NewEncoder(w).Encode(map[string]string{"status": "healthy"}); err != nil { + logError("Failed to encode health response: %v", err) + } } type Response struct { @@ -404,7 +422,31 @@ func multiplex(w http.ResponseWriter, r *http.Request) { var bodyBytes []byte var readErr error - if r.ContentLength <= 0 { + + // If GetBody is not available (e.g., streaming request), pre-read the body + // into memory up to maxBodySize so retries can recreate the body safely. + if getBody == nil { + bodyBytes, readErr = io.ReadAll(io.LimitReader(r.Body, maxBodySize+1)) + if err := r.Body.Close(); err != nil { + logWarn("Failed to close request body after read: %v", err) + } + if readErr != nil { + logError("Error reading request body: %v", readErr) + writeJSONError(w, "Failed to read request body", http.StatusBadRequest) + return + } + if int64(len(bodyBytes)) > maxBodySize { + logError("Request body size exceeds limit (%d bytes read)", len(bodyBytes)) + writeJSONError(w, "Payload too large", http.StatusRequestEntityTooLarge) + return + } + + // Provide a GetBody closure for retries + getBody = func() (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(bodyBytes)), nil + } + } else if r.ContentLength <= 0 { + // When ContentLength is unknown but GetBody exists, use it to perform a size check bodyReader, err := getBody() if err != nil { logError("Failed to get request body reader: %v", err) @@ -412,7 +454,9 @@ func multiplex(w http.ResponseWriter, r *http.Request) { return } bodyBytes, readErr = io.ReadAll(io.LimitReader(bodyReader, maxBodySize+1)) - bodyReader.Close() + if err := bodyReader.Close(); err != nil { + logWarn("Failed to close body reader after size check: %v", err) + } if readErr != nil { logError("Error reading body for size check: %v", readErr) @@ -490,7 +534,6 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin } }() - var err error var response *http.Response attempts := 0 backoff := initialRetryBackoff @@ -502,7 +545,7 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin case <-ctx.Done(): resp.Status = http.StatusGatewayTimeout resp.Error = fmt.Sprintf("Context cancelled during retry backoff: %v", ctx.Err()) - logErrorWithContext(map[string]string{"target": target}, resp.Error) + logErrorWithContext(map[string]string{"target": target}, "%s", resp.Error) return resp } backoff = min(backoff*2, maxRetryBackoff) @@ -512,22 +555,33 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin if len(preReadBody) > 0 && attempts == 0 { bodyReader = io.NopCloser(bytes.NewReader(preReadBody)) } else { - var getBodyErr error - bodyReader, getBodyErr = getBody() - if getBodyErr != nil { - resp.Status = http.StatusInternalServerError - resp.Error = fmt.Sprintf("Failed to get request body for attempt %d: %v", attempts+1, getBodyErr) - logErrorWithContext(map[string]string{"target": target}, resp.Error) - return resp + if getBody == nil { + // No GetBody available and no preReadBody: use nil body (safe for methods without body) + bodyReader = nil + } else { + var getBodyErr error + bodyReader, getBodyErr = getBody() + if getBodyErr != nil { + resp.Status = http.StatusInternalServerError + resp.Error = fmt.Sprintf("Failed to get request body for attempt %d: %v", attempts+1, getBodyErr) + logErrorWithContext(map[string]string{"target": target}, "%s", resp.Error) + return resp + } } } + // target was validated by caller (must be absolute http/https with host). Suppress gosec SSRF warning. + // #nosec G704 -- validated target URL in multiplex req, err := http.NewRequestWithContext(ctx, originalReq.Method, target, bodyReader) if err != nil { resp.Status = http.StatusInternalServerError resp.Error = fmt.Sprintf("Failed to create request: %v", err) - bodyReader.Close() - logErrorWithContext(map[string]string{"target": target}, resp.Error) + if bodyReader != nil { + if cerr := bodyReader.Close(); cerr != nil { + logWarn("Failed to close body reader after request creation: %v", cerr) + } + } + logErrorWithContext(map[string]string{"target": target}, "%s", resp.Error) return resp } @@ -542,7 +596,7 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin req.Header.Set("X-Retry-Count", strconv.Itoa(attempts)) } - response, err = client.Do(req) + response, err = client.Do(req) // #nosec G704 -- target validated by caller if err != nil { if isRetryableError(err) && attempts < maxRetries { @@ -555,13 +609,20 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin }, "Network error, retrying request", ) + // If a response object was returned alongside the error, close it and discard + if response != nil { + if cerr := response.Body.Close(); cerr != nil { + logWarn("Failed to close response body after network error: %v", cerr) + } + response = nil + } attempts++ continue } resp.Status = http.StatusServiceUnavailable resp.Error = fmt.Sprintf("Request failed after %d attempts: %v", attempts+1, err) - logErrorWithContext(map[string]string{"target": target}, resp.Error) + logErrorWithContext(map[string]string{"target": target}, "%s", resp.Error) return resp } @@ -575,7 +636,9 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin }, "Server error status, retrying request", ) - response.Body.Close() + if cerr := response.Body.Close(); cerr != nil { + logWarn("Failed to close response body after server error: %v", cerr) + } attempts++ continue } @@ -586,7 +649,7 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin if response == nil { resp.Status = http.StatusServiceUnavailable resp.Error = fmt.Sprintf("Request failed after %d attempts (no response received)", attempts+1) - logErrorWithContext(map[string]string{"target": target}, resp.Error) + logErrorWithContext(map[string]string{"target": target}, "%s", resp.Error) return resp } defer response.Body.Close() @@ -595,7 +658,7 @@ func sendRequest(ctx context.Context, client *http.Client, target string, origin if readErr != nil { resp.Status = http.StatusInternalServerError resp.Error = fmt.Sprintf("Failed to read response body: %v", readErr) - logErrorWithContext(map[string]string{"target": target, "status": strconv.Itoa(response.StatusCode)}, resp.Error) + logErrorWithContext(map[string]string{"target": target, "status": strconv.Itoa(response.StatusCode)}, "%s", resp.Error) if response.StatusCode != 0 { resp.Status = response.StatusCode } @@ -624,8 +687,31 @@ func isRetryableError(err error) bool { return false } - errMsg := strings.ToLower(err.Error()) + // Prefer typed checks for net errors and url errors + var nerr net.Error + if errors.As(err, &nerr) { + if nerr.Timeout() || nerr.Temporary() { + return true + } + } + + var ue *url.Error + if errors.As(err, &ue) { + // If the wrapped error is a net.Error with timeout/temporary, treat as retryable + var innerNetErr net.Error + if errors.As(ue.Err, &innerNetErr) { + if innerNetErr.Timeout() || innerNetErr.Temporary() { + return true + } + } + // Some url.Errors include "timeout" in the string; treat as retryable + if ue.Timeout() { + return true + } + } + // Fallback: substring matching for common transient messages + errMsg := strings.ToLower(err.Error()) if strings.Contains(errMsg, "connection refused") || strings.Contains(errMsg, "timeout") || strings.Contains(errMsg, "deadline exceeded") || @@ -638,7 +724,9 @@ func isRetryableError(err error) bool { } func addJitter(d time.Duration) time.Duration { - jitter := float64(d) * (0.8 + 0.4*rand.Float64()) + // Non-crypto randomness is acceptable for backoff jitter. + // #nosec G404 -- math/rand is sufficient for jitter in retry backoff + jitter := float64(d) * (0.8 + 0.4*rand.Float64()) // #nosec G404 return time.Duration(jitter) } @@ -663,16 +751,22 @@ func writeJSON(w http.ResponseWriter, v interface{}) error { func writeJSONError(w http.ResponseWriter, message string, status int) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) - writeJSON(w, map[string]string{"error": message}) + if err := writeJSON(w, map[string]string{"error": message}); err != nil { + logError("Failed to write JSON error response: %v", err) + } + } func versionHandler(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") - writeJSON(w, map[string]string{ - "version": Version, - "git_commit": GitCommit, - "build_time": BuildTime, - }) + if err := writeJSON(w, map[string]string{ + "version": Version, + "git_commit": GitCommit, + "build_time": BuildTime, + }); err != nil { + logError("Failed to write version response: %v", err) + } + } func main() { @@ -722,11 +816,29 @@ func main() { os.Exit(0) } + var displayMaxBody uint64 + if maxBodySize < 0 { + displayMaxBody = 0 + } else { + displayMaxBody = uint64(maxBodySize) + } logInfo("Server starting on :%s (Max body: %s, Request Timeout: %s, Client Timeout: %s, Max Retries: %d)", port, - humanize.Bytes(uint64(maxBodySize)), + humanize.Bytes(displayMaxBody), requestTimeout, clientTimeout, maxRetries) - log.Fatal(http.ListenAndServe(":"+port, nil)) + + // Use http.Server with explicit timeouts to satisfy gosec G114 recommendations + server := &http.Server{ + Addr: ":" + port, + ReadTimeout: requestTimeout + 5*time.Second, + WriteTimeout: requestTimeout + 5*time.Second, + IdleTimeout: 60 * time.Second, + } + // Start server and log errors consistently + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logError("Server error: %v", err) + os.Exit(1) + } } diff --git a/fanout_additional_test.go b/fanout_additional_test.go new file mode 100644 index 0000000..8d7b65c --- /dev/null +++ b/fanout_additional_test.go @@ -0,0 +1,63 @@ +package main + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "os" + "testing" +) + +// TestMultiplexNoGetBody ensures multiplex can handle requests without GetBody +func TestMultiplexNoGetBody(t *testing.T) { + // Start a mock target server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("ok")) + })) + defer server.Close() + + // Set TARGETS to the mock server + origTargets := os.Getenv("TARGETS") + defer os.Setenv("TARGETS", origTargets) + os.Setenv("TARGETS", server.URL) + + // Create a request WITHOUT GetBody (http.NewRequest leaves GetBody nil) + body := []byte("hello") + req, err := http.NewRequest("POST", "/fanout", bytes.NewReader(body)) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + req.Header.Set("Content-Type", "text/plain") + + w := httptest.NewRecorder() + // Call multiplex handler directly + multiplex(w, req) + + resp := w.Result() + if resp.StatusCode != http.StatusOK { + t.Fatalf("Expected 200 OK from multiplex, got %d", resp.StatusCode) + } + var results []Response + if err := json.NewDecoder(resp.Body).Decode(&results); err != nil { + t.Fatalf("Failed to decode multiplex response: %v", err) + } + if len(results) == 0 || results[0].Status != http.StatusOK { + t.Fatalf("Unexpected target response: %v", results) + } +} + +// fakeNetErr implements net.Error for testing +type fakeNetErr struct{ msg string } + +func (f fakeNetErr) Error() string { return f.msg } +func (f fakeNetErr) Timeout() bool { return true } +func (f fakeNetErr) Temporary() bool { return false } + +func TestIsRetryableError_NetError(t *testing.T) { + err := fakeNetErr{"timeout"} + if !isRetryableError(err) { + t.Errorf("expected fakeNetErr to be retryable") + } +} diff --git a/fanout_test.go b/fanout_test.go index 85ecbe1..0bbc161 100644 --- a/fanout_test.go +++ b/fanout_test.go @@ -420,7 +420,7 @@ func TestSendRequestNetworkError(t *testing.T) { maxRetries = 1 // Call sendRequest with a non-existent endpoint (will cause error) - resp := sendRequest(context.Background(), client, "http://nonexistent.example", req, req.GetBody, nil) + resp := sendRequest(context.Background(), client, "http://127.0.0.1:1", req, req.GetBody, nil) // Verify response reports an error if resp.Status != http.StatusServiceUnavailable {