Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/redacted-headers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"transports/http": minor
---

Add `Config.String()` that redacts `Headers` values so an accidental `log.Info(cfg)` or `fmt.Sprintf("%v", cfg)` can't leak credentials passed via `Authorization` / `X-API-Key` / similar headers. Header keys stay visible for debuggability. Mirrors the redaction shape already used by `transports/datadog`.

`defaultCheckRedirect` now compares hosts case-insensitively, so legitimate same-host redirects with mixed-case URLs (`Example.COM` → `example.com`) aren't refused. Cross-host refusal still applies; ports are still compared exactly.

New `Config.ShutdownTimeout` (default 5s) bounds how long `Close` waits for in-flight requests to finish during shutdown. When the timeout elapses, the worker's outbound HTTP requests are cancelled via context so `Close` can return even if the endpoint is wedged; previously a stuck endpoint could pin `Close` for up to the per-request `Client.Timeout` (30s default), and the parent `flushTransports`'s 5s timeout would leak the close goroutine. Outbound requests are now built via `http.NewRequestWithContext`.
15 changes: 8 additions & 7 deletions docs/src/public/llms-full.txt
Original file line number Diff line number Diff line change
Expand Up @@ -727,14 +727,15 @@ tr := httptr.New(httptr.Config{
URL: "https://logs.example.com/ingest",
Method: "POST", // default POST
Headers: map[string]string{"Authorization": "Bearer " + token},
BatchSize: 100, // default
BatchInterval: 5 * time.Second, // default
BufferSize: 1024, // channel capacity
Client: &http.Client{Timeout: 30 * time.Second},
Encoder: httptr.JSONArrayEncoder,
OnError: func(err error, entries []httptr.Entry) { /* report */ },
BatchSize: 100, // default
BatchInterval: 5 * time.Second, // default
BufferSize: 1024, // channel capacity
ShutdownTimeout: 5 * time.Second, // bounds Close; cancels in-flight requests on overflow
Client: &http.Client{Timeout: 30 * time.Second},
Encoder: httptr.JSONArrayEncoder,
OnError: func(err error, entries []httptr.Entry) { /* report */ },
})
defer tr.Close() // flush pending entries
defer tr.Close() // flush pending entries; bounded by ShutdownTimeout
```

### File (Lumberjack)
Expand Down
10 changes: 9 additions & 1 deletion docs/src/transports/http.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Config struct {
BatchInterval time.Duration // default 5s
BufferSize int // default 1024

ShutdownTimeout time.Duration // default 5s

OnError func(err error, entries []Entry) // default writes to os.Stderr
}
```
Expand Down Expand Up @@ -142,6 +144,12 @@ Size of the internal channel buffering entries between `SendToLogger` and the wo

When the buffer is full, entries are **dropped** and `OnError(ErrBufferFull, [entry])` is called. The dispatch path (the `log.Info(...)` caller's goroutine) never blocks.

### `ShutdownTimeout`

Caps how long `Close` waits for in-flight HTTP requests to finish during shutdown. Defaults to 5 seconds, matching loglayer's default `Config.TransportCloseTimeout`. When the timeout elapses, the worker's outbound HTTP requests are cancelled via context so `Close` can return even if the endpoint is wedged; queued-but-unsent entries surface via `OnError` as `context.Canceled`.

Without this bound, a stuck endpoint could pin `Close` for up to the per-request `Client.Timeout` (30s by default) per pending batch, and the parent `flushTransports` (`loglayer.Config.TransportCloseTimeout`, 5s default) would abandon the close goroutine on overflow rather than tear it down.

### `OnError`

Called when something goes wrong:
Expand Down Expand Up @@ -183,7 +191,7 @@ log := loglayer.New(loglayer.Config{Transport: tr, ...})
defer tr.Close()
```

After `Close`, subsequent `SendToLogger` calls drop the entry and invoke `OnError(ErrClosed, nil)`. `Close` is idempotent.
After `Close`, subsequent `SendToLogger` calls drop the entry and invoke `OnError(ErrClosed, nil)`. `Close` is idempotent and bounded by [`ShutdownTimeout`](#shutdowntimeout).

## Custom Encoder Example

Expand Down
8 changes: 8 additions & 0 deletions docs/src/whats-new.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ Initial release. New [CLI transport](/transports/cli) tuned for command-line app

New `Config.TableColumnOrder []string` knob pins the leading column order for slice-of-map metadata table rendering. Keys named here render in the listed order; the rest sort lexicographically and follow. Empty / nil keeps the previous fully-lexicographic behavior. See [Pinning column order](/transports/cli#pinning-column-order).

`transports/http`:

New `Config.String()` redacts `Headers` values so an accidental `log.Info(cfg)` or `fmt.Sprintf("%v", cfg)` can't leak credentials passed via `Authorization` / `X-API-Key` / similar headers. Header keys stay visible for debuggability. Mirrors the redaction shape already used by `transports/datadog`.

`defaultCheckRedirect` now compares hosts case-insensitively, so legitimate same-host redirects with mixed-case URLs aren't refused. Cross-host refusal still applies; ports are still compared exactly.

New `Config.ShutdownTimeout` (default 5s) bounds how long `Close` waits for in-flight requests to finish during shutdown. When the timeout elapses, the worker's outbound HTTP requests are cancelled via context so `Close` can return even if the endpoint is wedged; previously a stuck endpoint could pin `Close` for up to the per-request `Client.Timeout` (30s default), and the parent `flushTransports`'s 5s timeout would leak the close goroutine.

## Apr 30, 2026

`transports/gcplogging`:
Expand Down
20 changes: 20 additions & 0 deletions transports/datadog/datadog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,26 @@ func TestDatadog_ConfigStringRedactsAPIKey(t *testing.T) {
}
}

// The embedded HTTP config's Headers map is the realistic
// credential-leak surface for downstream wrappers (callers reach for it
// to add custom auth headers alongside DD-API-KEY). httptr.Config.String
// redacts those values; assert that an accidental
// fmt.Sprintf("%v", cfg.HTTP) doesn't ship them.
func TestDatadog_ConfigHTTPHeadersRedacted(t *testing.T) {
const secret = "Bearer custom-auth-token-keep-me-out-of-logs"
httpCfg := httptr.Config{
Headers: map[string]string{"Authorization": secret},
}

v := fmt.Sprintf("%v", httpCfg)
if strings.Contains(v, secret) {
t.Errorf("HTTP.Headers value leaked through %%v: %s", v)
}
if !strings.Contains(v, "Authorization") {
t.Errorf("HTTP.Headers key should be preserved for debuggability: %s", v)
}
}

// The APIKey field is tagged json:"-" so it's never included in JSON
// output. Closes the defense-in-depth leak path where a user does
// log.WithMetadata(cfg).Info(...) through a JSON-emitting transport.
Expand Down
72 changes: 66 additions & 6 deletions transports/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ package httptransport

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -27,10 +29,11 @@ import (
)

const (
defaultBatchSize = 100
defaultBatchInterval = 5 * time.Second
defaultBufferSize = 1024
defaultClientTimeout = 30 * time.Second
defaultBatchSize = 100
defaultBatchInterval = 5 * time.Second
defaultBufferSize = 1024
defaultClientTimeout = 30 * time.Second
defaultShutdownTimeout = 5 * time.Second
)

// Encoder serializes a batch of entries into the HTTP request body. The
Expand Down Expand Up @@ -120,12 +123,47 @@ type Config struct {
// 1024.
BufferSize int

// ShutdownTimeout caps how long Close waits for in-flight requests to
// finish during shutdown. Once exceeded, the worker's outbound HTTP
// requests are cancelled via context so Close can return even if the
// endpoint is wedged; queued-but-unsent entries surface via OnError as
// context.Canceled. Defaults to 5 seconds, matching loglayer's default
// Config.TransportCloseTimeout. Zero or negative uses the default.
ShutdownTimeout time.Duration

// OnError is called when a batch fails to encode or send. The default
// writes a one-line error to os.Stderr. Use this to plumb send errors
// into a separate logger or metrics counter.
OnError func(err error, entries []Entry)
}

// String returns a redacted form of the config so that an accidental
// log.Info(cfg) (or fmt.Sprintf("%v", cfg)) can't ship Headers values
// (Authorization, X-API-Key, etc.). Header keys are preserved so the
// call site stays debuggable; values are replaced with a fixed mask
// regardless of length.
//
// Note: Go's fmt verbs %+v and %#v intentionally bypass Stringer and
// always print struct fields. Code that uses those verbs against
// Config will see the raw Headers. Reserve %+v / %#v for debugger-style
// inspection, never for production logs.
func (c Config) String() string {
var maskedHeaders map[string]string
if len(c.Headers) > 0 {
maskedHeaders = make(map[string]string, len(c.Headers))
for k := range c.Headers {
maskedHeaders[k] = "***redacted***"
}
}
// Spell out the fields explicitly rather than %+v on `c` so a future
// field addition doesn't silently expose new sensitive content. Keep
// the order matching the struct for readability.
return fmt.Sprintf(
"httptransport.Config{URL:%q Method:%q Headers:%v BatchSize:%d BatchInterval:%v BufferSize:%d ShutdownTimeout:%v}",
c.URL, c.Method, maskedHeaders, c.BatchSize, c.BatchInterval, c.BufferSize, c.ShutdownTimeout,
)
}

// Transport implements loglayer.Transport with batched HTTP delivery.
type Transport struct {
transport.BaseTransport
Expand All @@ -135,6 +173,13 @@ type Transport struct {
wg sync.WaitGroup
closed atomic.Bool
closeMu sync.RWMutex // SendToLogger holds RLock; Close takes Lock to drain in-flight sends.
// reqCtx is intentionally a struct field: its lifetime is the
// transport's lifetime, not a single request's. It exists only so
// Close can cancel any in-flight Client.Do via cancelReq once
// ShutdownTimeout elapses; outbound requests never carry a
// caller-provided context (the dispatch path is async).
reqCtx context.Context
cancelReq context.CancelFunc
}

// New constructs an HTTP Transport and starts its background worker.
Expand Down Expand Up @@ -176,15 +221,21 @@ func Build(cfg Config) (*Transport, error) {
if cfg.BufferSize <= 0 {
cfg.BufferSize = defaultBufferSize
}
if cfg.ShutdownTimeout <= 0 {
cfg.ShutdownTimeout = defaultShutdownTimeout
}
if cfg.OnError == nil {
cfg.OnError = defaultOnError
}

reqCtx, cancelReq := context.WithCancel(context.Background())
t := &Transport{
BaseTransport: transport.NewBaseTransport(cfg.BaseConfig),
cfg: cfg,
queue: make(chan Entry, cfg.BufferSize),
done: make(chan struct{}),
reqCtx: reqCtx,
cancelReq: cancelReq,
}
t.wg.Add(1)
go t.worker()
Expand Down Expand Up @@ -239,14 +290,23 @@ func (t *Transport) SendToLogger(params loglayer.TransportParams) {
// to complete; once they have, no new SendToLogger can land an entry in the
// queue (the closed flag is set under the same lock), so the worker's
// drainAndFlush sees a stable, finite queue.
//
// Close is bounded by Config.ShutdownTimeout: when that elapses, in-flight
// HTTP requests are cancelled via context so wg.Wait can't pin Close past
// the configured bound when the endpoint is wedged. Queued-but-unsent
// entries surface via OnError as context.Canceled.
func (t *Transport) Close() error {
if !t.closed.CompareAndSwap(false, true) {
return nil
}
t.closeMu.Lock()
close(t.done)
t.closeMu.Unlock()

timer := time.AfterFunc(t.cfg.ShutdownTimeout, t.cancelReq)
t.wg.Wait()
timer.Stop()
t.cancelReq() // idempotent; ensures the request context is released.
return nil
}

Expand Down Expand Up @@ -335,7 +395,7 @@ func (t *Transport) flush(entries []Entry) {
return
}

req, err := http.NewRequest(t.cfg.Method, t.cfg.URL, bytes.NewReader(body))
req, err := http.NewRequestWithContext(t.reqCtx, t.cfg.Method, t.cfg.URL, bytes.NewReader(body))
if err != nil {
t.cfg.OnError(fmt.Errorf("loglayer/transports/http: build request: %w", err), entries)
return
Expand Down Expand Up @@ -381,7 +441,7 @@ func defaultCheckRedirect(req *http.Request, via []*http.Request) error {
if len(via) == 0 {
return nil
}
if req.URL.Host != via[0].URL.Host {
if !strings.EqualFold(req.URL.Host, via[0].URL.Host) {
return fmt.Errorf("loglayer/transports/http: refusing cross-host redirect from %q to %q", via[0].URL.Host, req.URL.Host)
}
return nil
Expand Down
100 changes: 100 additions & 0 deletions transports/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package httptransport_test
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -623,3 +624,102 @@ func TestHTTP_EntryCarriesGroups(t *testing.T) {
t.Errorf("entry 1 Groups: got %v, want nil", captured[1])
}
}

// Close is bounded by Config.ShutdownTimeout: when the endpoint is wedged,
// in-flight HTTP requests are cancelled via context so wg.Wait can't pin
// Close past the configured bound (otherwise Close would wait for the
// underlying Client.Timeout, 30s by default).
func TestHTTP_CloseBoundedByShutdownTimeout(t *testing.T) {
started := make(chan struct{}, 1)
block := make(chan struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, _ = io.Copy(io.Discard, r.Body)
select {
case started <- struct{}{}:
default:
}
<-block // pin the request until the test ends
}))
// Defer LIFO: srv.Close runs after close(block) so the pinned handler
// is released first; otherwise srv.Close would deadlock waiting for
// the handler to return.
defer srv.Close()
defer close(block)

var sendErrs atomic.Int32
tr := httptr.New(httptr.Config{
URL: srv.URL,
BatchSize: 1,
BatchInterval: 10 * time.Millisecond,
ShutdownTimeout: 100 * time.Millisecond,
OnError: func(_ error, _ []httptr.Entry) {
sendErrs.Add(1)
},
})
log := loglayer.New(loglayer.Config{Transport: tr, DisableFatalExit: true})
log.Info("trigger send")

select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("server didn't receive the request within 2s")
}

closeStart := time.Now()
if err := tr.Close(); err != nil {
t.Fatalf("Close: %v", err)
}
elapsed := time.Since(closeStart)

// 1s upper bound rejects regression to "blocks until Client.Timeout"
// (30s) while leaving headroom for the cancel-and-drain handshake on
// slow CI.
if elapsed > time.Second {
t.Errorf("Close took %v; expected ~ShutdownTimeout (100ms), upper bound 1s", elapsed)
}
if got := sendErrs.Load(); got == 0 {
t.Errorf("expected OnError to surface the cancelled request, got 0")
}
}

// Config.String redacts Headers values so an accidental log.Info(cfg) or
// fmt.Sprintf("%v", cfg) can't ship Authorization / X-API-Key values. Keys
// stay visible so the call site is debuggable.
func TestHTTP_ConfigStringRedactsHeaders(t *testing.T) {
const (
secretAuth = "Bearer deadbeef-secret-keep-me-out-of-logs"
secretKey = "another-secret-shhh"
)
cfg := httptr.Config{
URL: "https://example.com/logs",
Method: "POST",
Headers: map[string]string{
"Authorization": secretAuth,
"X-API-Key": secretKey,
},
}

s := cfg.String()
for _, secret := range []string{secretAuth, secretKey} {
if strings.Contains(s, secret) {
t.Errorf("header value leaked through String(): %s", s)
}
}
if !strings.Contains(s, "redacted") {
t.Errorf("String() should mark header values as redacted: %s", s)
}
for _, key := range []string{"Authorization", "X-API-Key"} {
if !strings.Contains(s, key) {
t.Errorf("header key %q should be preserved for debuggability: %s", key, s)
}
}

// fmt.Sprintf("%v", cfg) picks up String() automatically since the
// receiver is a value type.
v := fmt.Sprintf("%v", cfg)
for _, secret := range []string{secretAuth, secretKey} {
if strings.Contains(v, secret) {
t.Errorf("header value leaked through %%v: %s", v)
}
}
}
Loading
Loading