diff --git a/CHANGELOG.md b/CHANGELOG.md index 159020b0..0ce328e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Added `JobCancelTx` which marks a running job as cancelled atomically within a caller-supplied transaction, mirroring `JobCompleteTx` for the cancel path. [PR #1219](https://github.com/riverqueue/river/pull/1219) +- Added `JobFailTx` which records an attempt error and transitions the job to retryable, available, or discarded (based on attempts remaining) atomically within a caller-supplied transaction. The returned job's `State` indicates whether the job will retry, letting callers keep external state (e.g. a web-app status row) in sync with the job's outcome. [PR #1219](https://github.com/riverqueue/river/pull/1219) + ## [0.35.0] - 2026-04-18 ### Changed diff --git a/client_test.go b/client_test.go index f8d035c4..27159c0d 100644 --- a/client_test.go +++ b/client_test.go @@ -7054,6 +7054,94 @@ func Test_Client_JobCompletion(t *testing.T) { require.Equal(t, rivertype.JobStateCompleted, reloadedJob.State) require.Equal(t, updatedJob.FinalizedAt, reloadedJob.FinalizedAt) }) + + t.Run("JobThatIsCancelledManuallyIsNotTouchedByCompleter", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t, newTestConfig(t, "")) + + now := client.baseService.Time.StubNow(time.Now().UTC()) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var updatedJob *Job[JobArgs] + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + + updatedJob, err = JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("no longer needed")) + require.NoError(t, err) + + return tx.Commit(ctx) + })) + + insertRes, err := client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateCancelled, event.Job.State) + require.NotNil(t, updatedJob) + require.Equal(t, rivertype.JobStateCancelled, updatedJob.State) + require.NotNil(t, event.Job.FinalizedAt) + require.NotNil(t, updatedJob.FinalizedAt) + require.WithinDuration(t, now, *updatedJob.FinalizedAt, time.Microsecond) + require.WithinDuration(t, *updatedJob.FinalizedAt, *event.Job.FinalizedAt, time.Microsecond) + + reloadedJob, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + + require.Equal(t, rivertype.JobStateCancelled, reloadedJob.State) + require.Equal(t, updatedJob.FinalizedAt, reloadedJob.FinalizedAt) + require.Len(t, reloadedJob.Errors, 1) + require.Equal(t, "JobCancelError: no longer needed", reloadedJob.Errors[0].Error) + }) + + t.Run("JobThatIsFailedManuallyIsNotTouchedByCompleter", func(t *testing.T) { + t.Parallel() + + client, bundle := setup(t, newTestConfig(t, "")) + + now := client.baseService.Time.StubNow(time.Now().UTC()) + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var updatedJob *Job[JobArgs] + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + + updatedJob, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("final failure")) + require.NoError(t, err) + + return tx.Commit(ctx) + })) + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 1}) + 
require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateDiscarded, event.Job.State) + require.NotNil(t, updatedJob) + require.Equal(t, rivertype.JobStateDiscarded, updatedJob.State) + require.NotNil(t, event.Job.FinalizedAt) + require.NotNil(t, updatedJob.FinalizedAt) + require.WithinDuration(t, now, *updatedJob.FinalizedAt, time.Microsecond) + require.WithinDuration(t, *updatedJob.FinalizedAt, *event.Job.FinalizedAt, time.Microsecond) + + reloadedJob, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + + require.Equal(t, rivertype.JobStateDiscarded, reloadedJob.State) + require.Equal(t, updatedJob.FinalizedAt, reloadedJob.FinalizedAt) + require.Len(t, reloadedJob.Errors, 1) + require.Equal(t, "final failure", reloadedJob.Errors[0].Error) + }) } type unregisteredJobArgs struct{} diff --git a/example_cancel_job_within_tx_test.go b/example_cancel_job_within_tx_test.go new file mode 100644 index 00000000..f42a32c2 --- /dev/null +++ b/example_cancel_job_within_tx_test.go @@ -0,0 +1,194 @@ +package river_test + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/slogutil" + "github.com/riverqueue/river/rivershared/util/testutil" +) + +// UploadCancelArgs represents work on a user-initiated upload that the +// worker may discover to be unrecoverable. A separate "uploads" row is +// created by the web app in the same transaction that enqueues this job; +// the job keeps the external status in sync with River's view of +// the job's outcome. +type UploadCancelArgs struct { + UploadID int64 `json:"upload_id"` + // Valid is used to simulate validation outcomes. When false, the + // worker treats the upload as permanently unprocessable and cancels + // the job rather than retrying. + Valid bool `json:"valid"` +} + +func (UploadCancelArgs) Kind() string { return "example_upload_cancel_worker" } + +// UploadCancelWorker processes an upload. If the upload is permanently +// invalid, the worker uses JobCancelTx to record the failure: the job will +// not be retried regardless of MaxAttempts. The external "uploads" row's +// status is promoted to "cancelled" in the same transaction so the web +// app sees a consistent final state. +type UploadCancelWorker struct { + river.WorkerDefaults[UploadCancelArgs] + + dbPool *pgxpool.Pool + schema string +} + +func (w *UploadCancelWorker) Work(ctx context.Context, job *river.Job[UploadCancelArgs]) error { + tx, err := w.dbPool.Begin(ctx) + if err != nil { + // return err #1: could not begin a tx. Nothing has been done yet; + // River will retry per its policy. + return err + } + defer tx.Rollback(ctx) + + // Increment the external "attempts" counter so the web app can see + // that the worker ran. + if _, err := tx.Exec(ctx, + fmt.Sprintf(`UPDATE %q.uploads SET attempts = attempts + 1 WHERE id = $1`, w.schema), + job.Args.UploadID, + ); err != nil { + // return err #2: external UPDATE failed; tx rolls back. River retries. + return err + } + + // Perform validation. If the upload is unrecoverably invalid there's + // no point retrying - cancel the job instead. 
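+	// Both the external status write and the job's cancellation ride in the
+	// same tx below, so either both commit or neither does.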
+ if !job.Args.Valid { + validationErr := errors.New("upload rejected: invalid content") + + if _, err := tx.Exec(ctx, + fmt.Sprintf(`UPDATE %q.uploads SET status = 'cancelled', last_error = $1 WHERE id = $2`, w.schema), + validationErr.Error(), job.Args.UploadID, + ); err != nil { + // return err #3: the external "mark cancelled" UPDATE failed. + // Tx rolls back with everything in it. River will retry the + // job normally - which on the next run will hit the same + // validation error and (if we then reach JobCancelTx + // successfully) cancel for real. + return err + } + + if _, err := river.JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, validationErr); err != nil { + // return err #4: JobCancelTx itself errored. Tx rolls back. + return err + } + + if err := tx.Commit(ctx); err != nil { + // return err #5: commit failed. Nothing persisted. River retries. + return err + } + + // return validationErr #6: the cancellation is recorded and the + // external row is in sync. Return the error up so that any + // registered HookWorkEnd hooks (and ErrorHandler) still see it + // and can log/trace it or add metadata via river.RecordOutput. + // Metadata merges on the executor's subsequent UPDATE are *not* + // guarded by state, so those land asynchronously after our tx + // commits. Hooks and handlers cannot steer the job's outcome at + // this point: the state change and any new AttemptError are + // guarded by state = 'running' in the SQL and won't apply now + // that JobCancelTx has moved the row out of that state. + return validationErr + } + + // Normal success path omitted for brevity; a real worker would do its + // work here, optionally call JobCompleteTx to finalize the job within + // this same tx, then commit. + return errors.New("success path not shown in this example") +} + +// Example_cancelJobWithinTx demonstrates how to transactionally cancel a +// job when the worker determines that further retries are pointless, +// while keeping an external status row in sync. +func Example_cancelJobWithinTx() { + ctx := context.Background() + + dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) + if err != nil { + panic(err) + } + defer dbPool.Close() + + schema := riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil) + + if _, err := dbPool.Exec(ctx, fmt.Sprintf(` + CREATE TABLE %q.uploads ( + id bigint PRIMARY KEY, + status text NOT NULL, + attempts int NOT NULL DEFAULT 0, + last_error text + )`, schema)); err != nil { + panic(err) + } + if _, err := dbPool.Exec(ctx, fmt.Sprintf(`INSERT INTO %q.uploads (id, status) VALUES (99, 'working')`, schema)); err != nil { + panic(err) + } + + workers := river.NewWorkers() + river.AddWorker(workers, &UploadCancelWorker{dbPool: dbPool, schema: schema}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + Schema: schema, + TestOnly: true, + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Subscribe to the cancelled-event stream so the example can wait for + // the job to be fully processed. 
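+	// A cancelled event is still delivered even though the job was already
+	// finalized inside the worker's own transaction.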
+ subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCancelled) + defer subscribeCancel() + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + // MaxAttempts=5 to make clear the job stops on the first run because + // of the cancel - attempts remaining doesn't matter. + if _, err := riverClient.Insert(ctx, UploadCancelArgs{UploadID: 99, Valid: false}, &river.InsertOpts{MaxAttempts: 5}); err != nil { + panic(err) + } + + riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) + + if err := riverClient.Stop(ctx); err != nil { + panic(err) + } + + var ( + status string + attempts int + lastError *string + ) + if err := dbPool.QueryRow(ctx, + fmt.Sprintf(`SELECT status, attempts, last_error FROM %q.uploads WHERE id = 99`, schema), + ).Scan(&status, &attempts, &lastError); err != nil { + panic(err) + } + lastErrTxt := "" + if lastError != nil { + lastErrTxt = *lastError + } + fmt.Printf("upload 99: status=%q attempts=%d last_error=%q\n", status, attempts, lastErrTxt) + + // Output: + // upload 99: status="cancelled" attempts=1 last_error="upload rejected: invalid content" +} diff --git a/example_fail_job_within_tx_test.go b/example_fail_job_within_tx_test.go new file mode 100644 index 00000000..6d89eea7 --- /dev/null +++ b/example_fail_job_within_tx_test.go @@ -0,0 +1,246 @@ +package river_test + +import ( + "context" + "errors" + "fmt" + "log/slog" + "os" + "time" + + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/util/slogutil" + "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivertype" +) + +// UploadFailArgs represents work on a user-initiated upload. A separate +// "uploads" row is created by the web app in the same transaction that +// enqueues this job, and its "status" column is what the web app polls to +// show progress to the user. The job is responsible for keeping that +// external status in sync with River's view of the job's final outcome. +type UploadFailArgs struct { + UploadID int64 `json:"upload_id"` +} + +func (UploadFailArgs) Kind() string { return "example_upload_fail_worker" } + +// UploadFailWorker simulates a worker processing an upload. The work always +// fails (to make the example deterministic). Each attempt increments the +// external "uploads.attempts" counter so the web app can show progress. +// On the final attempt, the external "uploads.status" is promoted to +// "failed" so the user sees a permanent failure; on non-final attempts, +// status stays "working". +type UploadFailWorker struct { + river.WorkerDefaults[UploadFailArgs] + + dbPool *pgxpool.Pool + schema string +} + +// NextRetry retries immediately so the example finishes quickly. +func (w *UploadFailWorker) NextRetry(*river.Job[UploadFailArgs]) time.Time { + return time.Now() +} + +// Work uses a named return value (err) plus a single deferred failure +// handler. The "real" body below just returns any error it encounters - +// persistence is handled in one place by the defer. The attempts counter +// is incremented up front (outside the defer) so it happens on every run. +func (w *UploadFailWorker) Work(ctx context.Context, job *river.Job[UploadFailArgs]) (err error) { + tx, err := w.dbPool.Begin(ctx) + if err != nil { + // return err #1: could not begin a tx. 
Nothing has been done;
+		// River will retry per its policy.
+		return err
+	}
+	// Rolls back if nothing else commits; harmless after a successful commit.
+	defer tx.Rollback(ctx)
+
+	// Always increment the external "attempts" counter at the start of a
+	// run so the web app can display progress across retries.
+	if _, err = tx.Exec(ctx,
+		fmt.Sprintf(`UPDATE %q.uploads SET attempts = attempts + 1 WHERE id = $1`, w.schema),
+		job.Args.UploadID,
+	); err != nil {
+		// return err #2: the external UPDATE failed. Tx rolls back; River retries.
+		return err
+	}
+
+	// One deferred handler for both success and failure. On success
+	// (err == nil) it just commits the tx. On failure it records the
+	// error via JobFailTx, synchronizes the external status with
+	// River's decision (retry vs final), and commits. err stays set so
+	// hooks and any ErrorHandler still observe it; see the comment at
+	// the end of this defer.
+	defer func() {
+		if err == nil {
+			if commitErr := tx.Commit(ctx); commitErr != nil {
+				err = commitErr
+			}
+			return
+		}
+
+		// Record the failure atomically with the attempts increment.
+		// JobFailTx picks retryable, available, or discarded based on
+		// how many attempts remain, using the same retry-policy logic
+		// the executor's own error path uses.
+		jobAfter, failErr := river.JobFailTx[*riverpgxv5.Driver](ctx, tx, job, err)
+		if failErr != nil {
+			// JobFailTx itself failed (rare - e.g. JSON marshal of
+			// metadata updates, or a DB error). Tx rolls back and the
+			// executor's own error path will record an AttemptError for
+			// the original err. Surface both via errors.Join.
+			err = errors.Join(err, failErr)
+			return
+		}
+
+		// Only promote the external status to "failed" on the final
+		// attempt. On non-final attempts the status stays "working"
+		// (only the attempts counter changed) and the user sees the
+		// job as still in progress.
+		if jobAfter.State == rivertype.JobStateDiscarded {
+			if _, syncErr := tx.Exec(ctx,
+				fmt.Sprintf(`UPDATE %q.uploads SET status = 'failed', last_error = $1 WHERE id = $2`, w.schema),
+				err.Error(), job.Args.UploadID,
+			); syncErr != nil {
+				err = errors.Join(err, syncErr)
+				return
+			}
+		}
+
+		if commitErr := tx.Commit(ctx); commitErr != nil {
+			err = errors.Join(err, commitErr)
+			return
+		}
+
+		// Leave err set and let it propagate back to River. The failure
+		// is already recorded on the job and the external row is in sync,
+		// so this isn't about persisting the error - it's about giving
+		// any registered HookWorkEnd hooks (and ErrorHandler) a chance
+		// to still observe the error, log/trace it, and merge additional
+		// metadata (e.g. via river.RecordOutput). Metadata merges on the
+		// executor's subsequent UPDATE are *not* guarded by state, so
+		// those land asynchronously after our tx commits. Hooks cannot
+		// change the job's final state or append a second AttemptError
+		// because those columns in that UPDATE are guarded by state =
+		// 'running', which no longer holds.
+	}()
+
+	// A real worker does its actual work here. On success it would call
+	// river.JobCompleteTx and then `return nil`; the deferred handler
+	// commits the tx. Any error returned is handed to the defer. For
+	// this deterministic example we simulate a transient failure:
+	return errors.New("transient upstream failure")
+}
+
+// Example_failJobWithinTx demonstrates how to transactionally record a job
+// failure while keeping an external "uploads" row in sync.
The example runs +// the job with MaxAttempts=2 so you can see both paths: after the first +// failure the external status stays "working" (and attempts=1); after the +// final failure the status flips to "failed" (and attempts=2). +func Example_failJobWithinTx() { + ctx := context.Background() + + dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL()) + if err != nil { + panic(err) + } + defer dbPool.Close() + + // Isolated schema for this example (only necessary for the example test). + schema := riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil) + + // Stand in for the web app's own "uploads" table. In a real application + // this lives alongside your domain tables. + if _, err := dbPool.Exec(ctx, fmt.Sprintf(` + CREATE TABLE %q.uploads ( + id bigint PRIMARY KEY, + status text NOT NULL, + attempts int NOT NULL DEFAULT 0, + last_error text + )`, schema)); err != nil { + panic(err) + } + // The web app inserts this row in the same tx as the River job (not + // shown here); here we just create it first. + if _, err := dbPool.Exec(ctx, fmt.Sprintf(`INSERT INTO %q.uploads (id, status) VALUES (42, 'working')`, schema)); err != nil { + panic(err) + } + + workers := river.NewWorkers() + river.AddWorker(workers, &UploadFailWorker{dbPool: dbPool, schema: schema}) + + riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{ + Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})), + Queues: map[string]river.QueueConfig{ + river.QueueDefault: {MaxWorkers: 100}, + }, + Schema: schema, + TestOnly: true, // suitable only for use in tests; remove for live environments + Workers: workers, + }) + if err != nil { + panic(err) + } + + // Subscribe to the failed-event stream so the example can wait for the + // job to be fully processed. + subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobFailed) + defer subscribeCancel() + + if err := riverClient.Start(ctx); err != nil { + panic(err) + } + + // MaxAttempts: 2 so the example exercises both the "retry remaining" + // path (status stays working) and the "final attempt" path (status + // becomes failed). + if _, err := riverClient.Insert(ctx, UploadFailArgs{UploadID: 42}, &river.InsertOpts{MaxAttempts: 2}); err != nil { + panic(err) + } + + // Wait for the job to reach a terminal Discarded state. Use a generous + // timeout so the retry loop (attempt 1 -> Available -> fetcher picks it + // up -> attempt 2) completes reliably even under the race detector or + // heavy CI parallelism. 
+	deadline := time.After(30 * time.Second)
+waitForDiscard:
+	for {
+		select {
+		case event := <-subscribeChan:
+			if event.Job.State == rivertype.JobStateDiscarded {
+				break waitForDiscard
+			}
+		case <-deadline:
+			panic("timed out waiting for job to be discarded")
+		}
+	}
+
+	if err := riverClient.Stop(ctx); err != nil {
+		panic(err)
+	}
+
+	var (
+		status    string
+		attempts  int
+		lastError *string
+	)
+	if err := dbPool.QueryRow(ctx,
+		fmt.Sprintf(`SELECT status, attempts, last_error FROM %q.uploads WHERE id = 42`, schema),
+	).Scan(&status, &attempts, &lastError); err != nil {
+		panic(err)
+	}
+	lastErrTxt := ""
+	if lastError != nil {
+		lastErrTxt = *lastError
+	}
+	fmt.Printf("upload 42: status=%q attempts=%d last_error=%q\n", status, attempts, lastErrTxt)
+
+	// Output:
+	// upload 42: status="failed" attempts=2 last_error="transient upstream failure"
+}
diff --git a/internal/jobexecutor/job_executor.go b/internal/jobexecutor/job_executor.go
index dab44f39..114f9dd9 100644
--- a/internal/jobexecutor/job_executor.go
+++ b/internal/jobexecutor/job_executor.go
@@ -59,10 +59,22 @@ var errExecutorDefaultCancel = errors.New("context cancelled as executor finishe
 type contextKey string
 
 // ContextKeyMetadataUpdates is the context key for the metadata updates map
-// stored in the context. It's exposed from this internal package solely so
-// that it can be used in tests for JobCompleteTx.
+// stored in the context. It's exposed from this internal package so it can
+// be read and mutated by JobCompleteTx, JobFailTx, and JobCancelTx (and by
+// their tests), as well as by RecordOutput and the riverlog middleware.
 const ContextKeyMetadataUpdates contextKey = "river_metadata_updates"
 
+// ContextKeyNextRetry is the context key for the NextRetryFunc closure stored
+// in the context by the executor. It's exposed from this internal package
+// solely so that it can be used by JobFailTx.
+const ContextKeyNextRetry contextKey = "river_next_retry"
+
+// NextRetryFunc returns the scheduled-at time for the next retry of the
+// current job, and whether the job should transition via JobStateAvailable
+// (true) instead of JobStateRetryable (false). The Available branch applies
+// when the next retry is within the client's scheduler interval.
+type NextRetryFunc func(ctx context.Context, now time.Time, jobRow *rivertype.JobRow) (scheduledAt time.Time, useAvailable bool)
+
 // MetadataUpdatesFromWorkContext returns the metadata updates stored in the
 // work context, if any.
 //
@@ -79,6 +91,25 @@ func MetadataUpdatesFromWorkContext(ctx context.Context) (map[string]any, bool)
 	return typedMetadataUpdates, true
 }
 
+// NextRetryFromWorkContext returns a function that computes the next retry
+// time for the current job using the same logic the executor uses when
+// reporting an error from a worker: the worker's NextRetry override (if any),
+// falling back to the client retry policy, falling back to the default retry
+// policy if the result is in the past.
+//
+// When run on a non-work context, it returns nil, false.
+func NextRetryFromWorkContext(ctx context.Context) (NextRetryFunc, bool) { + nextRetry := ctx.Value(ContextKeyNextRetry) + if nextRetry == nil { + return nil, false + } + typedNextRetry, ok := nextRetry.(NextRetryFunc) + if !ok { + return nil, false + } + return typedNextRetry, true +} + type jobExecutorResult struct { Err error MetadataUpdates map[string]any @@ -175,6 +206,7 @@ func (e *JobExecutor) Execute(ctx context.Context) { func (e *JobExecutor) execute(ctx context.Context) (res *jobExecutorResult) { metadataUpdates := make(map[string]any) ctx = context.WithValue(ctx, ContextKeyMetadataUpdates, metadataUpdates) + ctx = context.WithValue(ctx, ContextKeyNextRetry, NextRetryFunc(e.nextRetry)) defer func() { if recovery := recover(); recovery != nil { @@ -478,6 +510,34 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, return } + nextRetryScheduledAt, useAvailable := e.nextRetry(ctx, now, jobRow) + + // Normally, errored jobs are set `retryable` for the future and it's the + // scheduler's job to set them back to `available` so they can be reworked. + // This isn't friendly for smaller retry times though because it means that + // effectively no retry time smaller than the scheduler's run interval is + // respected. Here, we offset that with a branch that makes jobs immediately + // `available` if their retry was smaller than the scheduler's run interval. + var params *riverdriver.JobSetStateIfRunningParams + if useAvailable { + params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates) + } else { + params = riverdriver.JobSetStateErrorRetryable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates) + } + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil { + e.Logger.ErrorContext(ctx, e.Name+": Failed to report error for job", logAttrs...) + } +} + +// nextRetry computes the next retry time for the given job using the worker's +// NextRetry override (if any), the client retry policy, and a default policy +// fallback when the result is in the past. It also returns useAvailable=true +// when the retry is within the scheduler interval, signaling that the job +// should transition directly to JobStateAvailable instead of JobStateRetryable. +// +// This is used by reportError when an error is returned from a worker and by +// JobFailTx via NextRetryFromWorkContext. +func (e *JobExecutor) nextRetry(ctx context.Context, now time.Time, jobRow *rivertype.JobRow) (time.Time, bool) { var nextRetryScheduledAt time.Time if e.WorkUnit != nil { nextRetryScheduledAt = e.WorkUnit.NextRetry() @@ -495,21 +555,8 @@ func (e *JobExecutor) reportError(ctx context.Context, jobRow *rivertype.JobRow, nextRetryScheduledAt = e.DefaultClientRetryPolicy.NextRetry(jobRow) } - // Normally, errored jobs are set `retryable` for the future and it's the - // scheduler's job to set them back to `available` so they can be reworked. - // This isn't friendly for smaller retry times though because it means that - // effectively no retry time smaller than the scheduler's run interval is - // respected. Here, we offset that with a branch that makes jobs immediately - // `available` if their retry was smaller than the scheduler's run interval. 
- var params *riverdriver.JobSetStateIfRunningParams - if nextRetryScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval { - params = riverdriver.JobSetStateErrorAvailable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates) - } else { - params = riverdriver.JobSetStateErrorRetryable(jobRow.ID, nextRetryScheduledAt, errData, metadataUpdates) - } - if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil { - e.Logger.ErrorContext(ctx, e.Name+": Failed to report error for job", logAttrs...) - } + useAvailable := nextRetryScheduledAt.Sub(e.Time.Now()) <= e.SchedulerInterval + return nextRetryScheduledAt, useAvailable } type withJobsAndErrorsByID interface { diff --git a/job_cancel_tx.go b/job_cancel_tx.go new file mode 100644 index 00000000..7d35db1a --- /dev/null +++ b/job_cancel_tx.go @@ -0,0 +1,127 @@ +package river + +import ( + "context" + "encoding/json" + "errors" + "time" + + "github.com/riverqueue/river/internal/execution" + "github.com/riverqueue/river/internal/jobexecutor" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivertype" +) + +// JobCancelTx marks the job as cancelled as part of transaction tx, appending +// jobErr as an AttemptError on the job. If tx is rolled back, the cancellation +// will be as well. The job will not be retried regardless of how many attempts +// remain, mirroring the behavior of returning JobCancel(jobErr) from a +// Worker's Work method but keeping the state change in sync with other +// database writes in tx. +// +// The function needs to know the type of the River database driver, which is +// the same as the one in use by Client, but the other generic parameters can be +// inferred. An invocation should generally look like: +// +// _, err := river.JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, jobErr) +// if err != nil { +// // handle error +// } +// +// Returns the updated, cancelled job. +func JobCancelTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs], jobErr error) (*Job[TArgs], error) { + if job.State != rivertype.JobStateRunning { + return nil, errors.New("job must be running") + } + + client := ClientFromContext[TTx](ctx) + if client == nil { + return nil, errors.New("client not found in context, can only work within a River worker") + } + + metadataUpdatesBytes, err := marshalMetadataUpdatesFromContext(ctx) + if err != nil { + return nil, err + } + + // Wrap jobErr in JobCancel so the recorded AttemptError.Error matches + // what the executor persists for a worker returning river.JobCancel(err). + // If the caller already passed a JobCancelError (e.g. via river.JobCancel + // themselves), keep it as-is to avoid double-wrapping. 
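+	// errors.As is used (rather than errors.Is) so the check doesn't depend
+	// on JobCancelError implementing an Is method.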
+	var alreadyCancelErr *rivertype.JobCancelError
+	if !errors.As(jobErr, &alreadyCancelErr) {
+		jobErr = rivertype.JobCancel(jobErr)
+	}
+
+	now := client.baseService.Time.Now()
+	errData, err := marshalAttemptError(job.Attempt, now, jobErr)
+	if err != nil {
+		return nil, err
+	}
+
+	driver := client.Driver()
+	pilot := client.Pilot()
+	execTx := driver.UnwrapExecutor(tx)
+
+	params := riverdriver.JobSetStateCancelled(job.ID, now, errData, metadataUpdatesBytes)
+	rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, setStateIfRunningManyParamsFromOne(client.config.Schema, params))
+	if err != nil {
+		return nil, err
+	}
+	if len(rows) == 0 {
+		if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker {
+			panic("to use JobCancelTx in a rivertest.Worker, the job must be inserted into the database first")
+		}
+
+		return nil, rivertype.ErrNotFound
+	}
+	updatedJob := &Job[TArgs]{JobRow: rows[0]}
+
+	if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil {
+		return nil, err
+	}
+
+	return updatedJob, nil
+}
+
+// marshalMetadataUpdatesFromContext extracts metadata updates from the
+// jobexecutor work context and JSON-marshals them for a JobSetState* call.
+// Returns nil bytes when there are no updates. This is shared logic for
+// JobCompleteTx, JobFailTx, and JobCancelTx.
+func marshalMetadataUpdatesFromContext(ctx context.Context) ([]byte, error) {
+	metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
+	if !hasMetadataUpdates || len(metadataUpdates) == 0 {
+		return nil, nil
+	}
+	return json.Marshal(metadataUpdates)
+}
+
+// marshalAttemptError builds and JSON-marshals a rivertype.AttemptError from
+// jobErr. A nil jobErr is allowed and produces AttemptError.Error == "".
+func marshalAttemptError(attempt int, now time.Time, jobErr error) ([]byte, error) {
+	errStr := ""
+	if jobErr != nil {
+		errStr = jobErr.Error()
+	}
+	return json.Marshal(rivertype.AttemptError{
+		At:      now,
+		Attempt: attempt,
+		Error:   errStr,
+	})
+}
+
+// setStateIfRunningManyParamsFromOne wraps a single JobSetStateIfRunningParams
+// into the "many" variant used by pilot.JobSetStateIfRunningMany. Shared by
+// JobCompleteTx, JobFailTx, and JobCancelTx.
+func setStateIfRunningManyParamsFromOne(schema string, params *riverdriver.JobSetStateIfRunningParams) *riverdriver.JobSetStateIfRunningManyParams { + return &riverdriver.JobSetStateIfRunningManyParams{ + ID: []int64{params.ID}, + Attempt: []*int{params.Attempt}, + ErrData: [][]byte{params.ErrData}, + FinalizedAt: []*time.Time{params.FinalizedAt}, + MetadataDoMerge: []bool{params.MetadataDoMerge}, + MetadataUpdates: [][]byte{params.MetadataUpdates}, + ScheduledAt: []*time.Time{params.ScheduledAt}, + Schema: schema, + State: []rivertype.JobState{params.State}, + } +} diff --git a/job_cancel_tx_test.go b/job_cancel_tx_test.go new file mode 100644 index 00000000..80285665 --- /dev/null +++ b/job_cancel_tx_test.go @@ -0,0 +1,183 @@ +package river + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/jackc/pgx/v5" + "github.com/stretchr/testify/require" + + "github.com/riverqueue/river/internal/execution" + "github.com/riverqueue/river/internal/jobexecutor" + "github.com/riverqueue/river/internal/rivercommon" + "github.com/riverqueue/river/riverdbtest" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivershared/riversharedtest" + "github.com/riverqueue/river/rivershared/testfactory" + "github.com/riverqueue/river/rivershared/util/ptrutil" + "github.com/riverqueue/river/rivershared/util/testutil" + "github.com/riverqueue/river/rivertype" +) + +func TestJobCancelTx(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + type testBundle struct { + client *Client[pgx.Tx] + exec riverdriver.Executor + tx pgx.Tx + } + + setup := func(ctx context.Context, t *testing.T) (context.Context, *testBundle) { + t.Helper() + + tx := riverdbtest.TestTxPgx(ctx, t) + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + ctx = context.WithValue(ctx, rivercommon.ContextKeyClient{}, client) + + return ctx, &testBundle{ + client: client, + exec: riverpgxv5.New(nil).UnwrapExecutor(tx), + tx: tx, + } + } + + t.Run("CancelsJob", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + cancelErr := errors.New("cancelled by user") + cancelledJob, err := JobCancelTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, cancelErr) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, cancelledJob.State) + require.WithinDuration(t, time.Now(), *cancelledJob.FinalizedAt, 2*time.Second) + require.Len(t, cancelledJob.Errors, 1) + require.Equal(t, "JobCancelError: cancelled by user", cancelledJob.Errors[0].Error) + require.Equal(t, job.Attempt, cancelledJob.Errors[0].Attempt) + + updatedJob, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ + ID: job.ID, + Schema: "", + }) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, updatedJob.State) + }) + + t.Run("CancelsJobWithNilError", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + cancelledJob, err := JobCancelTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, nil) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, cancelledJob.State) + 
require.Len(t, cancelledJob.Errors, 1) + require.Equal(t, "JobCancelError: ", cancelledJob.Errors[0].Error) + }) + + t.Run("CancelsJobWithMetadataUpdates", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + metadataUpdates := map[string]any{"foo": "bar"} + ctx = context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, metadataUpdates) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + cancelledJob, err := JobCancelTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, cancelledJob.State) + + updatedJob, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ + ID: job.ID, + Schema: "", + }) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, updatedJob.State) + require.JSONEq(t, `{"foo":"bar"}`, string(updatedJob.Metadata)) + }) + + t.Run("ErrorIfMetadataMarshallingFails", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + ctx = context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, map[string]any{"foo": make(chan int)}) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + _, err := JobCancelTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.ErrorContains(t, err, "unsupported type: chan int") + }) + + t.Run("ErrorIfNotRunning", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) + + _, err := JobCancelTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.EqualError(t, err, "job must be running") + }) + + t.Run("ErrorIfJobDoesntExist", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + State: ptrutil.Ptr(rivertype.JobStateAvailable), + }) + + _, err := bundle.exec.JobDelete(ctx, &riverdriver.JobDeleteParams{ID: job.ID}) + require.NoError(t, err) + + job.State = rivertype.JobStateRunning + _, err = JobCancelTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) + + t.Run("PanicsIfCalledInTestWorkerWithoutInsertingJob", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + ctx = context.WithValue(ctx, execution.ContextKeyInsideTestWorker{}, true) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + _, err := bundle.client.JobDeleteTx(ctx, bundle.tx, job.ID) + require.NoError(t, err) + job.State = rivertype.JobStateRunning + + require.PanicsWithValue(t, "to use JobCancelTx in a rivertest.Worker, the job must be inserted into the database first", func() { + _, err := JobCancelTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.NoError(t, err) + }) + }) +} diff --git a/job_complete_tx.go b/job_complete_tx.go index 5c1a75eb..7c30b026 100644 --- a/job_complete_tx.go +++ b/job_complete_tx.go @@ -4,10 +4,8 @@ import ( "context" "encoding/json" "errors" - "time" "github.com/riverqueue/river/internal/execution" - "github.com/riverqueue/river/internal/jobexecutor" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivertype" ) @@ -35,36 +33,17 @@ func 
JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx return nil, errors.New("client not found in context, can only work within a River worker") } - driver := client.Driver() - pilot := client.Pilot() - - // extract metadata updates from context - metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx) - hasMetadataUpdates = hasMetadataUpdates && len(metadataUpdates) > 0 - var ( - metadataUpdatesBytes []byte - err error - ) - if hasMetadataUpdates { - metadataUpdatesBytes, err = json.Marshal(metadataUpdates) - if err != nil { - return nil, err - } + metadataUpdatesBytes, err := marshalMetadataUpdatesFromContext(ctx) + if err != nil { + return nil, err } + driver := client.Driver() + pilot := client.Pilot() execTx := driver.UnwrapExecutor(tx) - params := riverdriver.JobSetStateCompleted(job.ID, client.baseService.Time.Now(), nil) - rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, &riverdriver.JobSetStateIfRunningManyParams{ - ID: []int64{params.ID}, - Attempt: []*int{params.Attempt}, - ErrData: [][]byte{params.ErrData}, - FinalizedAt: []*time.Time{params.FinalizedAt}, - MetadataDoMerge: []bool{hasMetadataUpdates}, - MetadataUpdates: [][]byte{metadataUpdatesBytes}, - ScheduledAt: []*time.Time{params.ScheduledAt}, - Schema: client.config.Schema, - State: []rivertype.JobState{params.State}, - }) + + params := riverdriver.JobSetStateCompleted(job.ID, client.baseService.Time.Now(), metadataUpdatesBytes) + rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, setStateIfRunningManyParamsFromOne(client.config.Schema, params)) if err != nil { return nil, err } diff --git a/job_fail_tx.go b/job_fail_tx.go new file mode 100644 index 00000000..2924298f --- /dev/null +++ b/job_fail_tx.go @@ -0,0 +1,99 @@ +package river + +import ( + "context" + "encoding/json" + "errors" + + "github.com/riverqueue/river/internal/execution" + "github.com/riverqueue/river/internal/jobexecutor" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivertype" +) + +// JobFailTx records jobErr as an attempt error and transitions the job to a +// failure state as part of transaction tx. If more attempts remain, the job +// is set to JobStateRetryable (or JobStateAvailable if the next retry is +// within the client's scheduler interval). If this was the final attempt +// (job.Attempt >= job.MaxAttempts), the job is set to JobStateDiscarded. If +// tx is rolled back, the state change will be as well. +// +// The returned job reflects the new state. Callers that keep external state +// in sync with job outcomes can inspect updatedJob.State - a state of +// JobStateDiscarded (or similar terminal state) indicates the job will not +// run again and any follow-up work (e.g. marking a web-app row "failed") can +// proceed inside the same transaction. +// +// The function needs to know the type of the River database driver, which is +// the same as the one in use by Client, but the other generic parameters can +// be inferred. An invocation should generally look like: +// +// updatedJob, err := river.JobFailTx[*riverpgxv5.Driver](ctx, tx, job, jobErr) +// if err != nil { +// // handle error +// } +// if updatedJob.State == rivertype.JobStateDiscarded { +// // job will not be retried; update external state accordingly +// } +// +// Returns the updated, failed job. 
+func JobFailTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx context.Context, tx TTx, job *Job[TArgs], jobErr error) (*Job[TArgs], error) {
+	if job.State != rivertype.JobStateRunning {
+		return nil, errors.New("job must be running")
+	}
+
+	client := ClientFromContext[TTx](ctx)
+	if client == nil {
+		return nil, errors.New("client not found in context, can only work within a River worker")
+	}
+
+	metadataUpdatesBytes, err := marshalMetadataUpdatesFromContext(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	now := client.baseService.Time.Now()
+	errData, err := marshalAttemptError(job.Attempt, now, jobErr)
+	if err != nil {
+		return nil, err
+	}
+
+	var params *riverdriver.JobSetStateIfRunningParams
+	if job.Attempt >= job.MaxAttempts {
+		params = riverdriver.JobSetStateDiscarded(job.ID, now, errData, metadataUpdatesBytes)
+	} else {
+		nextRetryFunc, ok := jobexecutor.NextRetryFromWorkContext(ctx)
+		if !ok {
+			return nil, errors.New("next retry function not found in context, can only work within a River worker")
+		}
+		scheduledAt, useAvailable := nextRetryFunc(ctx, now, job.JobRow)
+		if useAvailable {
+			params = riverdriver.JobSetStateErrorAvailable(job.ID, scheduledAt, errData, metadataUpdatesBytes)
+		} else {
+			params = riverdriver.JobSetStateErrorRetryable(job.ID, scheduledAt, errData, metadataUpdatesBytes)
+		}
+	}
+
+	driver := client.Driver()
+	pilot := client.Pilot()
+	execTx := driver.UnwrapExecutor(tx)
+
+	rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, setStateIfRunningManyParamsFromOne(client.config.Schema, params))
+	if err != nil {
+		return nil, err
+	}
+	if len(rows) == 0 {
+		if _, isInsideTestWorker := ctx.Value(execution.ContextKeyInsideTestWorker{}).(bool); isInsideTestWorker {
+			panic("to use JobFailTx in a rivertest.Worker, the job must be inserted into the database first")
+		}
+
+		return nil, rivertype.ErrNotFound
+	}
+	updatedJob := &Job[TArgs]{JobRow: rows[0]}
+
+	if err := json.Unmarshal(updatedJob.EncodedArgs, &updatedJob.Args); err != nil {
+		return nil, err
+	}
+
+	return updatedJob, nil
+}
diff --git a/job_fail_tx_integration_test.go b/job_fail_tx_integration_test.go
new file mode 100644
index 00000000..d05ff775
--- /dev/null
+++ b/job_fail_tx_integration_test.go
@@ -0,0 +1,553 @@
+package river
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/jackc/pgx/v5/pgxpool"
+	"github.com/stretchr/testify/require"
+
+	"github.com/riverqueue/river/internal/riverinternaltest/retrypolicytest"
+	"github.com/riverqueue/river/riverdbtest"
+	"github.com/riverqueue/river/riverdriver/riverpgxv5"
+	"github.com/riverqueue/river/rivershared/riversharedtest"
+	"github.com/riverqueue/river/rivershared/util/testutil"
+	"github.com/riverqueue/river/rivertype"
+)
+
+// TestJobFailTxIntegration exercises edge cases drawn from the matrix of
+// (Work returns nil / err / panic / JobCancelError) *
+// (Tx fn called: none / JobFailTx / JobCancelTx / JobCompleteTx) *
+// (tx commit or rollback), asserting the job's final DB state for each
+// covered combination.
+func TestJobFailTxIntegration(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + type testBundle struct { + dbPool *pgxpool.Pool + subscribeChan <-chan *Event + } + + setup := func(t *testing.T, config *Config) (*Client[pgx.Tx], *testBundle) { + t.Helper() + + var ( + dbPool = riversharedtest.DBPool(ctx, t) + driver = riverpgxv5.New(dbPool) + schema = riverdbtest.TestSchema(ctx, t, driver, nil) + ) + config.Schema = schema + // Use a predictable retry policy so a retryable job isn't moved back + // to available by the scheduler before assertions run. + config.RetryPolicy = &retrypolicytest.RetryPolicySlow{} + + client := newTestClient(t, dbPool, config) + startClient(ctx, t, client) + + subscribeChan, cancel := client.Subscribe(EventKindJobCancelled, EventKindJobCompleted, EventKindJobFailed) + t.Cleanup(cancel) + + return client, &testBundle{ + dbPool: dbPool, + subscribeChan: subscribeChan, + } + } + + // Work returns nil, no Tx fn called. + t.Run("NilReturnNoTxCallCompletes", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + + client, bundle := setup(t, config) + + insertRes, err := client.Insert(ctx, JobArgs{}, nil) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateCompleted, event.Job.State) + }) + + // Work returns nil, JobFailTx committed (non-final attempt). + t.Run("NilReturnFailTxCommittedNonFinal", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + _, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("boom")) + require.NoError(t, err) + + return tx.Commit(ctx) + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateRetryable, event.Job.State) + + reloaded, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, reloaded.State) + require.Len(t, reloaded.Errors, 1) + require.Equal(t, "boom", reloaded.Errors[0].Error) + }) + + // Final attempt -> Discarded. 
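+	// With MaxAttempts: 1, job.Attempt == job.MaxAttempts on the first run,
+	// so JobFailTx takes the discard branch without consulting the retry
+	// policy.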
+ t.Run("NilReturnFailTxCommittedFinalAttempt", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + _, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("final boom")) + require.NoError(t, err) + + return tx.Commit(ctx) + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 1}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateDiscarded, event.Job.State) + require.NotNil(t, event.Job.FinalizedAt) + }) + + // Work returns nil, JobFailTx called but tx rolled back. + //nolint:dupl // matrix test; near-identical structure is the point + t.Run("NilReturnFailTxRolledBack", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + + _, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("will be rolled back")) + require.NoError(t, err) + + return tx.Rollback(ctx) + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 1}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateCompleted, event.Job.State) + }) + + // Work returns nil, JobCancelTx committed. + t.Run("NilReturnCancelTxCommitted", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + _, err = JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("abort")) + require.NoError(t, err) + + return tx.Commit(ctx) + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateCancelled, event.Job.State) + }) + + // Work returns nil, JobCancelTx called but tx rolled back. 
+ //nolint:dupl // matrix test; near-identical structure is the point + t.Run("NilReturnCancelTxRolledBack", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + + _, err = JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("will be rolled back")) + require.NoError(t, err) + + return tx.Rollback(ctx) + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateCompleted, event.Job.State) + }) + + // Work returns err, JobFailTx committed. + // JobFailTx's state wins; the executor's subsequent state/errors + // writes are guarded by state = 'running' and don't take effect. + t.Run("ErrReturnFailTxCommitted", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + _, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("from JobFailTx")) + require.NoError(t, err) + + if err := tx.Commit(ctx); err != nil { + return err + } + return errors.New("post-tx err") + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateRetryable, event.Job.State) + + reloaded, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, reloaded.State) + require.Len(t, reloaded.Errors, 1) + // The error from JobFailTx wins; the subsequent work-returned + // error is not appended because the executor's errors-update is + // guarded by state = 'running'. + require.Equal(t, "from JobFailTx", reloaded.Errors[0].Error) + }) + + // Work returns err, JobFailTx called but tx rolled back. + // Executor's error path records the work error as usual. 
+ //nolint:dupl // matrix test; near-identical structure is the point + t.Run("ErrReturnFailTxRolledBack", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + + _, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("ignored by rollback")) + require.NoError(t, err) + + if err := tx.Rollback(ctx); err != nil { + return err + } + return errors.New("from worker") + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateRetryable, event.Job.State) + require.Len(t, event.Job.Errors, 1) + require.Equal(t, "from worker", event.Job.Errors[0].Error) + }) + + // Work returns err, JobCancelTx committed. + t.Run("ErrReturnCancelTxCommitted", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + _, err = JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("from JobCancelTx")) + require.NoError(t, err) + + if err := tx.Commit(ctx); err != nil { + return err + } + return errors.New("post-tx err") + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateCancelled, event.Job.State) + }) + + // Work returns err, JobCancelTx called but tx rolled back. + //nolint:dupl // matrix test; near-identical structure is the point + t.Run("ErrReturnCancelTxRolledBack", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + + _, err = JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("ignored by rollback")) + require.NoError(t, err) + + if err := tx.Rollback(ctx); err != nil { + return err + } + return errors.New("from worker") + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateRetryable, event.Job.State) + require.Len(t, event.Job.Errors, 1) + require.Equal(t, "from worker", event.Job.Errors[0].Error) + }) + + // Work panics after JobFailTx committed. 
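+	// The executor rescues the panic, but its attempt to record it is
+	// guarded by state = 'running', so the row keeps what JobFailTx
+	// committed.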
+ t.Run("PanicAfterFailTxCommitted", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + _, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("from JobFailTx (pre-panic)")) + require.NoError(t, err) + + if err := tx.Commit(ctx); err != nil { + return err + } + panic("boom") + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateRetryable, event.Job.State) + + reloaded, err := client.JobGet(ctx, insertRes.Job.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, reloaded.State) + require.Len(t, reloaded.Errors, 1) + require.Equal(t, "from JobFailTx (pre-panic)", reloaded.Errors[0].Error) + }) + + // Work returns JobCancelError after calling JobFailTx. + // DB state ends at whatever JobFailTx set (retryable); the + // executor's cancel branch emits JobSetStateCancelled but the + // state change is guarded by state = 'running' and doesn't apply. + t.Run("JobCancelErrorReturnedAfterFailTxCommitted", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, "") + + type JobArgs struct { + testutil.JobArgsReflectKind[JobArgs] + } + + var bundle *testBundle + AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + tx, err := bundle.dbPool.Begin(ctx) + require.NoError(t, err) + defer tx.Rollback(ctx) + + _, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("from JobFailTx")) + require.NoError(t, err) + + if err := tx.Commit(ctx); err != nil { + return err + } + return JobCancel(errors.New("ignored cancel")) + })) + + client, b := setup(t, config) + bundle = b + + insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3}) + require.NoError(t, err) + + event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan) + require.Equal(t, insertRes.Job.ID, event.Job.ID) + require.Equal(t, rivertype.JobStateRetryable, event.Job.State) + }) + + // JobFailTx then JobCompleteTx in the same tx. The second call + // sees the row in a non-running state: pilot.JobSetStateIfRunningMany's + // SQL returns the row unchanged (IfRunning check fails in the SQL), so + // JobCompleteTx returns a job whose State is *not* Completed, without + // error. The DB state after commit reflects what JobFailTx set. 
+	t.Run("FailTxThenCompleteTxSameTx", func(t *testing.T) {
+		t.Parallel()
+
+		config := newTestConfig(t, "")
+
+		type JobArgs struct {
+			testutil.JobArgsReflectKind[JobArgs]
+		}
+
+		var (
+			bundle             *testBundle
+			secondCallJobState rivertype.JobState
+			secondCallErr      error
+			secondCallDone     = make(chan struct{})
+		)
+		AddWorker(config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
+			defer close(secondCallDone)
+
+			tx, err := bundle.dbPool.Begin(ctx)
+			require.NoError(t, err)
+			defer tx.Rollback(ctx)
+
+			_, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("first"))
+			require.NoError(t, err)
+
+			var secondJob *Job[JobArgs]
+			secondJob, secondCallErr = JobCompleteTx[*riverpgxv5.Driver](ctx, tx, job)
+			if secondJob != nil {
+				secondCallJobState = secondJob.State
+			}
+
+			return tx.Commit(ctx)
+		}))
+
+		client, b := setup(t, config)
+		bundle = b
+
+		insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 3})
+		require.NoError(t, err)
+
+		event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
+		require.Equal(t, insertRes.Job.ID, event.Job.ID)
+		require.Equal(t, rivertype.JobStateRetryable, event.Job.State)
+
+		<-secondCallDone
+		require.NoError(t, secondCallErr)
+		require.Equal(t, rivertype.JobStateRetryable, secondCallJobState)
+
+		reloaded, err := client.JobGet(ctx, insertRes.Job.ID)
+		require.NoError(t, err)
+		require.Equal(t, rivertype.JobStateRetryable, reloaded.State)
+	})
+}
diff --git a/job_fail_tx_test.go b/job_fail_tx_test.go
new file mode 100644
index 00000000..54593257
--- /dev/null
+++ b/job_fail_tx_test.go
@@ -0,0 +1,260 @@
+package river
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/jackc/pgx/v5"
+	"github.com/stretchr/testify/require"
+
+	"github.com/riverqueue/river/internal/execution"
+	"github.com/riverqueue/river/internal/jobexecutor"
+	"github.com/riverqueue/river/internal/rivercommon"
+	"github.com/riverqueue/river/riverdbtest"
+	"github.com/riverqueue/river/riverdriver"
+	"github.com/riverqueue/river/riverdriver/riverpgxv5"
+	"github.com/riverqueue/river/rivershared/riversharedtest"
+	"github.com/riverqueue/river/rivershared/testfactory"
+	"github.com/riverqueue/river/rivershared/util/ptrutil"
+	"github.com/riverqueue/river/rivershared/util/testutil"
+	"github.com/riverqueue/river/rivertype"
+)
+
+func TestJobFailTx(t *testing.T) {
+	t.Parallel()
+
+	ctx := context.Background()
+
+	type JobArgs struct {
+		testutil.JobArgsReflectKind[JobArgs]
+	}
+
+	type testBundle struct {
+		client *Client[pgx.Tx]
+		exec   riverdriver.Executor
+		tx     pgx.Tx
+	}
+
+	// injectRetry injects a NextRetryFunc into the context so JobFailTx can
+	// use it like the executor would. Tests choose scheduledAt/useAvailable
+	// to exercise the Retryable vs Available branch.
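+	// For example, as the tests below do:
+	//
+	//	ctx = injectRetry(ctx, time.Now().Add(time.Hour), false) // -> retryable
+	//	ctx = injectRetry(ctx, time.Now(), true)                 // -> available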
+ injectRetry := func(ctx context.Context, scheduledAt time.Time, useAvailable bool) context.Context { + fn := jobexecutor.NextRetryFunc(func(ctx context.Context, now time.Time, jobRow *rivertype.JobRow) (time.Time, bool) { + return scheduledAt, useAvailable + }) + return context.WithValue(ctx, jobexecutor.ContextKeyNextRetry, fn) + } + + setup := func(ctx context.Context, t *testing.T) (context.Context, *testBundle) { + t.Helper() + + tx := riverdbtest.TestTxPgx(ctx, t) + client, err := NewClient(riverpgxv5.New(nil), &Config{ + Logger: riversharedtest.Logger(t), + }) + require.NoError(t, err) + ctx = context.WithValue(ctx, rivercommon.ContextKeyClient{}, client) + + return ctx, &testBundle{ + client: client, + exec: riverpgxv5.New(nil).UnwrapExecutor(tx), + tx: tx, + } + } + + t.Run("RetryableOnNonFinalAttempt", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + nextRetry := time.Now().Add(time.Hour) + ctx = injectRetry(ctx, nextRetry, false) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(1), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + failErr := errors.New("temporary failure") + failedJob, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, failErr) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, failedJob.State) + require.WithinDuration(t, nextRetry, failedJob.ScheduledAt, time.Second) + require.Nil(t, failedJob.FinalizedAt) + require.Len(t, failedJob.Errors, 1) + require.Equal(t, "temporary failure", failedJob.Errors[0].Error) + require.Equal(t, job.Attempt, failedJob.Errors[0].Attempt) + }) + + t.Run("AvailableOnNonFinalAttemptShortRetry", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + nextRetry := time.Now().Add(50 * time.Millisecond) + ctx = injectRetry(ctx, nextRetry, true) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(1), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + failedJob, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("short retry")) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateAvailable, failedJob.State) + require.Nil(t, failedJob.FinalizedAt) + }) + + t.Run("DiscardsJobOnFinalAttempt", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + // Final attempt: attempt == max_attempts. No retry fn lookup should + // be needed on the discard path. 
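+		// There's deliberately no injectRetry call here, unlike the
+		// retryable/available cases above; a sketch of the expected result:
+		//
+		//	failedJob, _ := JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("permanent"))
+		//	// failedJob.State == rivertype.JobStateDiscarded, FinalizedAt set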
+ job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(3), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + failErr := errors.New("permanent failure") + failedJob, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, failErr) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, failedJob.State) + require.WithinDuration(t, time.Now(), *failedJob.FinalizedAt, 2*time.Second) + require.Len(t, failedJob.Errors, 1) + require.Equal(t, "permanent failure", failedJob.Errors[0].Error) + + reloaded, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, reloaded.State) + }) + + t.Run("FailsJobWithNilError", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(3), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + failedJob, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, nil) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, failedJob.State) + require.Len(t, failedJob.Errors, 1) + require.Equal(t, "", failedJob.Errors[0].Error) + }) + + t.Run("FailsJobWithMetadataUpdates", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + metadataUpdates := map[string]any{"foo": "bar"} + ctx = context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, metadataUpdates) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(3), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + failedJob, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, failedJob.State) + + reloaded, err := bundle.exec.JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: job.ID}) + require.NoError(t, err) + require.JSONEq(t, `{"foo":"bar"}`, string(reloaded.Metadata)) + }) + + t.Run("ErrorIfMetadataMarshallingFails", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + ctx = context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, map[string]any{"foo": make(chan int)}) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(3), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + _, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.ErrorContains(t, err, "unsupported type: chan int") + }) + + t.Run("ErrorIfNotRunning", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{}) + + _, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.EqualError(t, err, "job must be running") + }) + + t.Run("ErrorIfNoNextRetryFuncOnNonFinalAttempt", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(1), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateRunning), + }) + + _, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.EqualError(t, err, "next 
retry function not found in context, can only work within a River worker") + }) + + t.Run("ErrorIfJobDoesntExist", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(3), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateAvailable), + }) + + _, err := bundle.exec.JobDelete(ctx, &riverdriver.JobDeleteParams{ID: job.ID}) + require.NoError(t, err) + + job.State = rivertype.JobStateRunning + _, err = JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.ErrorIs(t, err, rivertype.ErrNotFound) + }) + + t.Run("PanicsIfCalledInTestWorkerWithoutInsertingJob", func(t *testing.T) { + t.Parallel() + + ctx, bundle := setup(ctx, t) + ctx = context.WithValue(ctx, execution.ContextKeyInsideTestWorker{}, true) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ + Attempt: ptrutil.Ptr(3), + MaxAttempts: ptrutil.Ptr(3), + State: ptrutil.Ptr(rivertype.JobStateAvailable), + }) + _, err := bundle.client.JobDeleteTx(ctx, bundle.tx, job.ID) + require.NoError(t, err) + job.State = rivertype.JobStateRunning + + require.PanicsWithValue(t, "to use JobFailTx in a rivertest.Worker, the job must be inserted into the database first", func() { + _, err := JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, &Job[JobArgs]{JobRow: job}, errors.New("oops")) + require.NoError(t, err) + }) + }) +} diff --git a/rivertest/worker_test.go b/rivertest/worker_test.go index 1e0d5680..74bcde9d 100644 --- a/rivertest/worker_test.go +++ b/rivertest/worker_test.go @@ -540,6 +540,54 @@ func TestWorker_WorkJob(t *testing.T) { require.Equal(t, rivertype.JobStateCompleted, updatedJob.State) }) + t.Run("JobCancelTxWithInsertedJobRow", func(t *testing.T) { + t.Parallel() + + testWorker, bundle := setup(t) + + args := testArgs{} + insertRes, err := bundle.client.InsertTx(ctx, bundle.tx, args, nil) + require.NoError(t, err) + + bundle.workFunc = func(ctx context.Context, job *river.Job[testArgs]) error { + _, err := river.JobCancelTx[*riverpgxv5.Driver](ctx, bundle.tx, job, errors.New("cancelled")) + require.NoError(t, err) + return nil + } + + res, err := testWorker.WorkJob(ctx, t, bundle.tx, insertRes.Job) + require.NoError(t, err) + require.Equal(t, river.EventKindJobCancelled, res.EventKind) + + updatedJob, err := bundle.driver.UnwrapExecutor(bundle.tx).JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: insertRes.Job.ID, Schema: ""}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCancelled, updatedJob.State) + }) + + t.Run("JobFailTxWithInsertedJobRow", func(t *testing.T) { + t.Parallel() + + testWorker, bundle := setup(t) + + args := testArgs{} + insertRes, err := bundle.client.InsertTx(ctx, bundle.tx, args, &river.InsertOpts{MaxAttempts: 1}) + require.NoError(t, err) + + bundle.workFunc = func(ctx context.Context, job *river.Job[testArgs]) error { + _, err := river.JobFailTx[*riverpgxv5.Driver](ctx, bundle.tx, job, errors.New("failed")) + require.NoError(t, err) + return nil + } + + res, err := testWorker.WorkJob(ctx, t, bundle.tx, insertRes.Job) + require.NoError(t, err) + require.Equal(t, river.EventKindJobFailed, res.EventKind) + + updatedJob, err := bundle.driver.UnwrapExecutor(bundle.tx).JobGetByID(ctx, &riverdriver.JobGetByIDParams{ID: insertRes.Job.ID, Schema: ""}) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateDiscarded, updatedJob.State) + }) + t.Run("ErrorsWhenGivenAlreadyCompletedJob", func(t *testing.T) { 
t.Parallel()