From ebedde2b2f23c5ee09da254b91f22f3095a717c3 Mon Sep 17 00:00:00 2001 From: Brandur Date: Mon, 1 Jun 2026 14:56:33 +0200 Subject: [PATCH] Standardize "hot" operation timeout This one's a simpler alternative to #1263. All we do is take the existing timeout added in #1255 and standardize it with the timeout we use for job completion, putting a comment on the two saying we should revisit. I decided to leave it as 10 seconds after all (instead of changing it to 30 seconds like I had in #1263) after seeing that job completion already uses 10 seconds. This may still not be long enough for a jobs table in serious trouble due to dead tuples, but if it's not, job completion is already going to be in trouble so it's likely things have degenerated. --- CHANGELOG.md | 6 ++---- internal/jobcompleter/job_completer.go | 7 +++---- internal/rivercommon/river_common.go | 11 +++++++++++ rivershared/riverpilot/standard_pilot.go | 6 ++---- 4 files changed, 18 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d734eb3..38a85e0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,14 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259) + ### Added - Added `MetadataSet` to stage job metadata updates from worker middleware, `HookWorkBegin`, workers, or `HookWorkEnd`, with changes persisted when the job is completed. [PR #1269](https://github.com/riverqueue/river/pull/1269) -### Fixed - -⚠️ **Breaking API change:** `rivermigrate.Migrator.Validate` and `rivermigrate.Migrator.ValidateTx` now take a `*rivermigrate.ValidateOpts` parameter. Pass `nil` to preserve previous behavior. We normally endeavor not to make any breaking API changes, but this one will keep the API in a much nicer state, and is on an ancillary function that most installations won't be using. [PR #1259](https://github.com/riverqueue/river/pull/1259) - ### Changed - Added `rivermigrate.ValidateOpts.TargetVersion` so validation can check migrations up to a specific target version, matching the target-version behavior available on `Migrate` and `MigrateTx`. Notably, this is a breaking API change as the validate functions previously didn't take any options. [PR #1259](https://github.com/riverqueue/river/pull/1259) diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 37a4d399..823394c4 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -10,6 +10,7 @@ import ( "golang.org/x/sync/errgroup" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/riverpilot" @@ -578,11 +579,9 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer ) for attempt := 1; attempt <= numRetries; attempt++ { - const timeout = 10 * time.Second - // I've found that we want at least ten seconds for a large batch, // although it usually doesn't need that long. - ctx, cancel := context.WithTimeout(uncancelledCtx, timeout) + ctx, cancel := context.WithTimeout(uncancelledCtx, rivercommon.HotOperationTimeout) defer cancel() retVal, err := retryFunc(ctx) @@ -603,7 +602,7 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer slog.Int("attempt", attempt), slog.String("err", err.Error()), slog.String("sleep_duration", sleepDuration.String()), - slog.String("timeout", timeout.String()), + slog.String("timeout", rivercommon.HotOperationTimeout.String()), ) if !disableSleep { serviceutil.CancellableSleep(logCtx, sleepDuration) diff --git a/internal/rivercommon/river_common.go b/internal/rivercommon/river_common.go index 986583ed..4f769b39 100644 --- a/internal/rivercommon/river_common.go +++ b/internal/rivercommon/river_common.go @@ -3,6 +3,7 @@ package rivercommon import ( "errors" "regexp" + "time" ) // These constants are made available in rivercommon so that they're accessible @@ -17,6 +18,16 @@ const ( QueueDefault = "default" ) +// HotOperationTimeout attempts to standardize timeouts for some "hot" +// operations like locking available jobs or completing finished jobs. It's +// somewhat questionable whether it makes sense to share timing on these +// queries, but for the time being it makes more sense than each part of the +// code randomly choosing its own timing. +// +// We probably want to have another look at this in the not-too-distant future +// to make sure we can't do anything a bit smarter when it comes to timeouts. +const HotOperationTimeout = 10 * time.Second + const ( // MetadataKeyPeriodicJobID is a metadata key inserted with a periodic job // when a configured periodic job has its ID property set. This lets diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index 45e2ff2b..fb6e7d80 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -3,15 +3,13 @@ package riverpilot import ( "context" "sync/atomic" - "time" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivertype" ) -const standardPilotJobGetAvailableTimeoutDefault = 10 * time.Second - type StandardPilot struct { seq atomic.Int64 } @@ -23,7 +21,7 @@ func (p *StandardPilot) JobGetAvailable(ctx context.Context, exec riverdriver.Ex return nil, nil } - ctx, cancel := context.WithTimeoutCause(ctx, standardPilotJobGetAvailableTimeoutDefault, context.DeadlineExceeded) + ctx, cancel := context.WithTimeout(ctx, rivercommon.HotOperationTimeout) defer cancel() return exec.JobGetAvailable(ctx, params)