Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Ignore errors like `tls: failed to send closeNotify alert (but connection was closed anyway)` when closing listeners. [PR #1216](https://github.com/riverqueue/river/pull/1216).

### Fixed

- Fixed leader election to track explicit database-issued leadership terms, reducing handoff flakiness and same-client reacquisition edge cases while making reelection and resign target the current leadership lease instead of a stale one. [PR #1213](https://github.com/riverqueue/river/pull/1213).
Expand Down
29 changes: 28 additions & 1 deletion internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"log/slog"
"slices"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -262,14 +263,40 @@ func (n *Notifier) listenerClose(ctx context.Context, skipLock bool) {

n.Logger.DebugContext(ctx, n.Name+": Listener closing")
if err := n.listener.Close(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
if shouldIgnoreListenerError(err) {
n.Logger.ErrorContext(ctx, n.Name+": Error closing listener", "err", err)
}
}

n.isConnected = false
}

// shouldIgnoreListenerError returns true if the error is a certain type of
// common error that we see when trying to close a listener.
//
// It's probably not strictly necessary to log errors on listener close, but it
// seems not ideal to ignore them completely either. If this function were ever
// to become to unwieldy though, we might want to go back to the drawing board.
func shouldIgnoreListenerError(err error) bool {
if errors.Is(err, context.Canceled) {
return true
}

// In practice, this occurs a fair bit in some systems. See:
//
// https://github.com/riverqueue/river/issues/256
//
// There's been an issue opened to make this a well-known error type, but
// it's not right now:
//
// https://github.com/golang/go/issues/75600
if strings.Contains(err.Error(), "tls: failed to send closeNotify alert") {
return true
}

return false
}

const listenerTimeout = 10 * time.Second

func (n *Notifier) listenerConnect(ctx context.Context, skipLock bool) error {
Expand Down
28 changes: 28 additions & 0 deletions internal/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,34 @@ func topicAndPayloadNotifyFunc(notifyChan chan TopicAndPayload) NotifyFunc {
}
}

func TestShouldIgnoreListenerError(t *testing.T) {
t.Parallel()

t.Run("ContextCanceled", func(t *testing.T) {
t.Parallel()

require.True(t, shouldIgnoreListenerError(context.Canceled))
})

t.Run("WrappedContextCanceled", func(t *testing.T) {
t.Parallel()

require.True(t, shouldIgnoreListenerError(fmt.Errorf("wrapped: %w", context.Canceled)))
})

t.Run("TLSCloseNotifyAlert", func(t *testing.T) {
t.Parallel()

require.True(t, shouldIgnoreListenerError(errors.New("tls: failed to send closeNotify alert (but connection was closed anyway): write tcp 10.0.5.131:40192->10.0.7.96:5432: i/o timeout")))
})

t.Run("ArbitraryError", func(t *testing.T) {
t.Parallel()

require.False(t, shouldIgnoreListenerError(errors.New("some other error")))
})
}

func sendNotification(ctx context.Context, t *testing.T, exec riverdriver.Executor, schema, topic string, payload string) {
t.Helper()

Expand Down
Loading