From 5f6c319a84bcaf1ab21ea5b59e8c14ca87a42073 Mon Sep 17 00:00:00 2001 From: Brandur Date: Thu, 4 Jun 2026 12:25:00 +0200 Subject: [PATCH] Add `Pilot.JobRescueMany` for active job rescue This one's presented as an alternative to #1256, with its own alternative Pro implementation, and may provide some superior properties around potential table bloat in `river_job`. Changes: * Pilot picks up a new `JobRescueMany` function. In the `StandardPilot`, this just goes directly to the function of the same name in the driver's `Executor`. * We now inject `ProducerReportInterval` in `PilotInitParams`. This value is useful to an underlying pilot in knowing what age of producer row it should respect when performing a job rescue. --- client.go | 8 +++--- client_pilot_test.go | 7 ++++-- internal/maintenance/job_rescuer.go | 14 ++++++++++- internal/maintenance/job_rescuer_test.go | 31 ++++++++++++++++++++++++ metadata_test.go | 3 ++- rivershared/riverpilot/pilot.go | 6 +++++ rivershared/riverpilot/standard_pilot.go | 4 +++ 7 files changed, 66 insertions(+), 7 deletions(-) diff --git a/client.go b/client.go index 1fb7260b..3fe931a3 100644 --- a/client.go +++ b/client.go @@ -867,9 +867,10 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client client.pilot = &riverpilot.StandardPilot{} } client.pilot.PilotInit(archetype, (&riverpilot.PilotInitParams{ - Insert: client.insertMany, - NotifyNonTxJobInsert: client.notifyProducerWithoutListenerJobFetch, - WorkerMetadata: workerMetadata, + Insert: client.insertMany, + NotifyNonTxJobInsert: client.notifyProducerWithoutListenerJobFetch, + ProducerReportInterval: producerReportIntervalDefault, + WorkerMetadata: workerMetadata, }).Validate()) pluginPilot, _ := client.pilot.(pilotPlugin) @@ -942,6 +943,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client { jobRescuer := maintenance.NewRescuer(archetype, &maintenance.JobRescuerConfig{ ClientRetryPolicy: config.RetryPolicy, 
+ Pilot: client.pilot, RescueAfter: config.RescueStuckJobsAfter, Schema: config.Schema, WorkUnitFactoryFunc: func(kind string) workunit.WorkUnitFactory { diff --git a/client_pilot_test.go b/client_pilot_test.go index 7f47cd4b..55e81c1a 100644 --- a/client_pilot_test.go +++ b/client_pilot_test.go @@ -39,7 +39,7 @@ type pilotSpyTestSignals struct { PeriodicJobGetAll testsignal.TestSignal[struct{}] PeriodicJobKeepAlive testsignal.TestSignal[struct{}] PeriodicJobUpsertMany testsignal.TestSignal[struct{}] - PilotInit testsignal.TestSignal[struct{}] + PilotInit testsignal.TestSignal[*riverpilot.PilotInitParams] ProducerInit testsignal.TestSignal[struct{}] ProducerKeepAlive testsignal.TestSignal[struct{}] ProducerShutdown testsignal.TestSignal[struct{}] @@ -106,7 +106,7 @@ func (p *pilotSpy) PeriodicJobUpsertMany(ctx context.Context, exec riverdriver.E func (p *pilotSpy) PilotInit(archetype *baseservice.Archetype, params *riverpilot.PilotInitParams) { p.pilotInitCalls.Add(1) - p.testSignals.PilotInit.Signal(struct{}{}) + p.testSignals.PilotInit.Signal(params) p.StandardPilot.PilotInit(archetype, params) } @@ -150,6 +150,7 @@ func Test_Client_PilotUsage(t *testing.T) { } pilot := &pilotSpy{} + pilot.testSignals.Init(t) pluginDriver := newDriverWithPlugin(t, dbPool) pluginDriver.pilot = pilot @@ -187,6 +188,8 @@ func Test_Client_PilotUsage(t *testing.T) { require.NotNil(t, client) require.Equal(t, int64(1), pilot.jobCleanerQueuesExcludedCalls.Load()) require.Equal(t, int64(1), pilot.pilotInitCalls.Load()) + pilotInitParams := pilot.testSignals.PilotInit.WaitOrTimeout() + require.Equal(t, producerReportIntervalDefault, pilotInitParams.ProducerReportInterval) }) t.Run("JobCancelUsesPilot", func(t *testing.T) { diff --git a/internal/maintenance/job_rescuer.go b/internal/maintenance/job_rescuer.go index c7353b36..22adebc0 100644 --- a/internal/maintenance/job_rescuer.go +++ b/internal/maintenance/job_rescuer.go @@ -14,6 +14,7 @@ import ( 
"github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/circuitbreaker" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/startstop" "github.com/riverqueue/river/rivershared/testsignal" @@ -50,6 +51,9 @@ type JobRescuerConfig struct { // Interval is the amount of time to wait between runs of the rescuer. Interval time.Duration + // Pilot controls driver-level behavior that can be customized by plugins. + Pilot riverpilot.Pilot + // RescueAfter is the amount of time for a job to be active before it is // considered stuck and should be rescued. RescueAfter time.Duration @@ -70,6 +74,9 @@ func (c *JobRescuerConfig) mustValidate() *JobRescuerConfig { if c.Interval <= 0 { panic("RescuerConfig.Interval must be above zero") } + if c.Pilot == nil { + panic("RescuerConfig.Pilot must be set") + } if c.RescueAfter <= 0 { panic("RescuerConfig.JobDuration must be above zero") } @@ -103,12 +110,17 @@ type JobRescuer struct { func NewRescuer(archetype *baseservice.Archetype, config *JobRescuerConfig, exec riverdriver.Executor) *JobRescuer { batchSizes := config.WithDefaults() + pilot := config.Pilot + if pilot == nil { + pilot = &riverpilot.StandardPilot{} + } return baseservice.Init(archetype, &JobRescuer{ Config: (&JobRescuerConfig{ BatchSizes: batchSizes, ClientRetryPolicy: config.ClientRetryPolicy, Interval: cmp.Or(config.Interval, JobRescuerIntervalDefault), + Pilot: pilot, RescueAfter: cmp.Or(config.RescueAfter, JobRescuerRescueAfterDefault), Schema: config.Schema, WorkUnitFactoryFunc: config.WorkUnitFactoryFunc, @@ -253,7 +265,7 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error) } if len(rescueManyParams.ID) > 0 { - _, err = s.exec.JobRescueMany(ctx, &rescueManyParams) + _, err = s.Config.Pilot.JobRescueMany(ctx, s.exec, &rescueManyParams) if err != 
nil { return nil, fmt.Errorf("error rescuing stuck jobs: %w", err) } diff --git a/internal/maintenance/job_rescuer_test.go b/internal/maintenance/job_rescuer_test.go index d1f44ffd..6de6b32e 100644 --- a/internal/maintenance/job_rescuer_test.go +++ b/internal/maintenance/job_rescuer_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math" + "sync/atomic" "testing" "time" @@ -14,6 +15,7 @@ import ( "github.com/riverqueue/river/riverdbtest" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riverpilot" "github.com/riverqueue/river/rivershared/riversharedmaintenance" "github.com/riverqueue/river/rivershared/riversharedtest" "github.com/riverqueue/river/rivershared/startstoptest" @@ -57,6 +59,19 @@ func (p *SimpleClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time { return job.AttemptedAt.Add(timeutil.SecondsAsDuration(retrySeconds)) } +type jobRescueManyPilotSpy struct { + riverpilot.StandardPilot + + calls atomic.Int64 + params *riverdriver.JobRescueManyParams +} + +func (p *jobRescueManyPilotSpy) JobRescueMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRescueManyParams) (*struct{}, error) { + p.calls.Add(1) + p.params = params + return p.StandardPilot.JobRescueMany(ctx, exec, params) +} + func TestJobRescuer(t *testing.T) { t.Parallel() @@ -304,6 +319,22 @@ func TestJobRescuer(t *testing.T) { riversharedtest.WaitOrTimeout(t, stopped) }) + t.Run("UsesPilotJobRescueMany", func(t *testing.T) { + t.Parallel() + + rescuer, bundle := setup(t) + pilot := &jobRescueManyPilotSpy{} + rescuer.Config.Pilot = pilot + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr(rescuerJobKind), State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(bundle.rescueHorizon.Add(-1 * time.Hour)), MaxAttempts: ptrutil.Ptr(5)}) + + _, err := rescuer.runOnce(ctx) + require.NoError(t, err) + + require.Equal(t, int64(1), 
pilot.calls.Load()) + require.Equal(t, []int64{job.ID}, pilot.params.ID) + }) + t.Run("CanRunMultipleTimes", func(t *testing.T) { t.Parallel() diff --git a/metadata_test.go b/metadata_test.go index 2736b0c8..94298915 100644 --- a/metadata_test.go +++ b/metadata_test.go @@ -4,8 +4,9 @@ import ( "context" "testing" - "github.com/riverqueue/river/internal/jobexecutor" "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/internal/jobexecutor" ) func TestMetadataSet(t *testing.T) { diff --git a/rivershared/riverpilot/pilot.go b/rivershared/riverpilot/pilot.go index e42ef797..ce3d7824 100644 --- a/rivershared/riverpilot/pilot.go +++ b/rivershared/riverpilot/pilot.go @@ -41,6 +41,8 @@ type Pilot interface { params *riverdriver.JobInsertFastManyParams, ) ([]*riverdriver.JobInsertFastResult, error) + JobRescueMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRescueManyParams) (*struct{}, error) + JobRetry(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error) JobSetStateIfRunningMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) @@ -75,6 +77,10 @@ type PilotInitParams struct { // latency between job insert and when a job is worked. NotifyNonTxJobInsert func(ctx context.Context, res []*rivertype.JobInsertResult) + // ProducerReportInterval is the amount of time between periodic reports of + // producer status. + ProducerReportInterval time.Duration + // WorkerMetadata is metadata about registered workers as received from the // client's worker bundle. Only available when a client will work jobs (i.e. 
// has Workers configured), so while it's safe to assume the presence of diff --git a/rivershared/riverpilot/standard_pilot.go b/rivershared/riverpilot/standard_pilot.go index fb6e7d80..86695344 100644 --- a/rivershared/riverpilot/standard_pilot.go +++ b/rivershared/riverpilot/standard_pilot.go @@ -39,6 +39,10 @@ func (p *StandardPilot) JobInsertMany( return exec.JobInsertFastMany(ctx, params) } +func (p *StandardPilot) JobRescueMany(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRescueManyParams) (*struct{}, error) { + return exec.JobRescueMany(ctx, params) +} + func (p *StandardPilot) JobRetry(ctx context.Context, exec riverdriver.Executor, params *riverdriver.JobRetryParams) (*rivertype.JobRow, error) { return exec.JobRetry(ctx, params) }