Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added `JobCancelTx`, which marks a running job as cancelled atomically within a caller-supplied transaction, mirroring `JobCompleteTx` for the cancel path. [PR #1219](https://github.com/riverqueue/river/pull/1219)
- Added `JobFailTx`, which records an attempt error and transitions the job to retryable, available, or discarded (based on attempts remaining) atomically within a caller-supplied transaction. The returned job's `State` indicates whether the job will retry, letting callers keep external state (e.g. a web-app status row) in sync with the job's outcome. [PR #1219](https://github.com/riverqueue/river/pull/1219)

## [0.35.0] - 2026-04-18

### Changed
Expand Down
88 changes: 88 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7054,6 +7054,94 @@ func Test_Client_JobCompletion(t *testing.T) {
require.Equal(t, rivertype.JobStateCompleted, reloadedJob.State)
require.Equal(t, updatedJob.FinalizedAt, reloadedJob.FinalizedAt)
})

// Exercises a worker that cancels its own job with JobCancelTx inside a
// transaction that the worker itself opens and commits. The assertions check
// that the subscription event, the job returned by JobCancelTx, and the row
// reloaded through JobGet all agree on the cancelled state and on
// FinalizedAt, and that exactly one attempt error was recorded.
t.Run("JobThatIsCancelledManuallyIsNotTouchedByCompleter", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t, newTestConfig(t, ""))

// Stub the client's clock; the returned instant is compared against
// FinalizedAt further down.
now := client.baseService.Time.StubNow(time.Now().UTC())

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

// Assigned inside the worker and only inspected after the subscription
// event for the job has been received.
var updatedJob *Job[JobArgs]
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
tx, err := bundle.dbPool.Begin(ctx)
require.NoError(t, err)

// Cancel the job within the worker-owned transaction, keeping the
// returned job for the assertions below.
updatedJob, err = JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("no longer needed"))
require.NoError(t, err)

// The worker's return value is whatever the commit returns.
return tx.Commit(ctx)
}))

insertRes, err := client.Insert(ctx, JobArgs{}, nil)
require.NoError(t, err)

// The event and the job returned by JobCancelTx must both report the
// cancelled state with a FinalizedAt equal (to the microsecond) to the
// stubbed time.
event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
require.Equal(t, insertRes.Job.ID, event.Job.ID)
require.Equal(t, rivertype.JobStateCancelled, event.Job.State)
require.NotNil(t, updatedJob)
require.Equal(t, rivertype.JobStateCancelled, updatedJob.State)
require.NotNil(t, event.Job.FinalizedAt)
require.NotNil(t, updatedJob.FinalizedAt)
require.WithinDuration(t, now, *updatedJob.FinalizedAt, time.Microsecond)
require.WithinDuration(t, *updatedJob.FinalizedAt, *event.Job.FinalizedAt, time.Microsecond)

reloadedJob, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)

// The stored row still carries the state and FinalizedAt produced by
// JobCancelTx, plus a single attempt error whose message is the supplied
// error prefixed with "JobCancelError: ".
require.Equal(t, rivertype.JobStateCancelled, reloadedJob.State)
require.Equal(t, updatedJob.FinalizedAt, reloadedJob.FinalizedAt)
require.Len(t, reloadedJob.Errors, 1)
require.Equal(t, "JobCancelError: no longer needed", reloadedJob.Errors[0].Error)
})

// Exercises a worker that fails its own job with JobFailTx inside a
// transaction that the worker itself opens and commits. The job is inserted
// with MaxAttempts: 1 and the test expects it to end up discarded. The
// assertions check that the subscription event, the job returned by
// JobFailTx, and the row reloaded through JobGet all agree on the discarded
// state and on FinalizedAt, and that exactly one attempt error was recorded.
t.Run("JobThatIsFailedManuallyIsNotTouchedByCompleter", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t, newTestConfig(t, ""))

// Stub the client's clock; the returned instant is compared against
// FinalizedAt further down.
now := client.baseService.Time.StubNow(time.Now().UTC())

type JobArgs struct {
testutil.JobArgsReflectKind[JobArgs]
}

// Assigned inside the worker and only inspected after the subscription
// event for the job has been received.
var updatedJob *Job[JobArgs]
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
tx, err := bundle.dbPool.Begin(ctx)
require.NoError(t, err)

// Record the failure within the worker-owned transaction, keeping the
// returned job for the assertions below.
updatedJob, err = JobFailTx[*riverpgxv5.Driver](ctx, tx, job, errors.New("final failure"))
require.NoError(t, err)

// The worker's return value is whatever the commit returns.
return tx.Commit(ctx)
}))

// A single allowed attempt, so the first failure is also the last one.
insertRes, err := client.Insert(ctx, JobArgs{}, &InsertOpts{MaxAttempts: 1})
require.NoError(t, err)

// The event and the job returned by JobFailTx must both report the
// discarded state with a FinalizedAt equal (to the microsecond) to the
// stubbed time.
event := riversharedtest.WaitOrTimeout(t, bundle.subscribeChan)
require.Equal(t, insertRes.Job.ID, event.Job.ID)
require.Equal(t, rivertype.JobStateDiscarded, event.Job.State)
require.NotNil(t, updatedJob)
require.Equal(t, rivertype.JobStateDiscarded, updatedJob.State)
require.NotNil(t, event.Job.FinalizedAt)
require.NotNil(t, updatedJob.FinalizedAt)
require.WithinDuration(t, now, *updatedJob.FinalizedAt, time.Microsecond)
require.WithinDuration(t, *updatedJob.FinalizedAt, *event.Job.FinalizedAt, time.Microsecond)

reloadedJob, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)

// The stored row still carries the state and FinalizedAt produced by
// JobFailTx, plus a single attempt error holding the supplied error's
// message unchanged.
require.Equal(t, rivertype.JobStateDiscarded, reloadedJob.State)
require.Equal(t, updatedJob.FinalizedAt, reloadedJob.FinalizedAt)
require.Len(t, reloadedJob.Errors, 1)
require.Equal(t, "final failure", reloadedJob.Errors[0].Error)
})
}

// unregisteredJobArgs is an empty job args type.
// NOTE(review): judging by the name, no worker is registered for it and it
// backs tests of the unknown-job-kind path — confirm against its usages.
type unregisteredJobArgs struct{}
Expand Down
194 changes: 194 additions & 0 deletions example_cancel_job_within_tx_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package river_test

import (
"context"
"errors"
"fmt"
"log/slog"
"os"

"github.com/jackc/pgx/v5/pgxpool"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
)

// UploadCancelArgs are the job arguments for processing a user-initiated
// upload that the worker may find to be unrecoverable. The web app creates a
// separate "uploads" row in the same transaction that enqueues the job, and
// the job keeps that external status aligned with River's view of the job's
// outcome.
type UploadCancelArgs struct {
	// UploadID identifies the row in the external "uploads" table that this
	// job operates on.
	UploadID int64 `json:"upload_id"`

	// Valid simulates the result of validation. A false value makes the
	// worker treat the upload as permanently unprocessable, so it cancels
	// the job instead of letting it retry.
	Valid bool `json:"valid"`
}

// Kind returns the unique string that identifies this type of job.
func (UploadCancelArgs) Kind() string {
	return "example_upload_cancel_worker"
}

// UploadCancelWorker processes an upload. When the upload is permanently
// invalid, the worker records the failure with JobCancelTx, so the job is
// not retried regardless of MaxAttempts. In the same transaction it promotes
// the external "uploads" row's status to "cancelled", so the web app sees a
// consistent final state.
type UploadCancelWorker struct {
// NOTE(review): presumably supplies default implementations of the
// optional worker methods so that only Work needs defining here — confirm
// against the river package documentation.
river.WorkerDefaults[UploadCancelArgs]

// dbPool is the connection pool from which Work begins its transaction.
dbPool *pgxpool.Pool
// schema is the database schema that holds the "uploads" table; Work
// interpolates it into the SQL statements it runs.
schema string
}

// Work runs one attempt for the upload referenced by job.Args.UploadID. All
// database changes, both to the external "uploads" row and to the River job,
// are made in a single transaction so they commit or roll back together.
//
// For an invalid upload the job is cancelled via JobCancelTx and the
// validation error is returned after a successful commit. Any earlier
// failure is returned as-is with the transaction rolled back, leaving River
// to retry the job under its normal policy. The numbered exits below are
// called out in order.
func (w *UploadCancelWorker) Work(ctx context.Context, job *river.Job[UploadCancelArgs]) error {
	tx, err := w.dbPool.Begin(ctx)
	if err != nil {
		// Exit 1: no transaction could be started. Nothing has been done
		// yet, so River simply retries per its policy.
		return err
	}
	defer tx.Rollback(ctx)

	uploadID := job.Args.UploadID

	// Bump the external "attempts" counter so the web app can tell that the
	// worker ran.
	bumpAttemptsSQL := fmt.Sprintf(`UPDATE %q.uploads SET attempts = attempts + 1 WHERE id = $1`, w.schema)
	if _, err := tx.Exec(ctx, bumpAttemptsSQL, uploadID); err != nil {
		// Exit 2: the counter UPDATE failed. The tx rolls back and River
		// retries.
		return err
	}

	if job.Args.Valid {
		// The normal success path is left out for brevity. A real worker
		// would do its work here, optionally finalize the job with
		// JobCompleteTx inside this same tx, and then commit.
		return errors.New("success path not shown in this example")
	}

	// Validation failed in a way that retrying cannot fix, so the job is
	// cancelled rather than retried.
	validationErr := errors.New("upload rejected: invalid content")

	markCancelledSQL := fmt.Sprintf(`UPDATE %q.uploads SET status = 'cancelled', last_error = $1 WHERE id = $2`, w.schema)
	if _, err := tx.Exec(ctx, markCancelledSQL, validationErr.Error(), uploadID); err != nil {
		// Exit 3: marking the external row as cancelled failed. The tx
		// rolls back along with everything in it and River retries the job
		// normally. The next run hits the same validation error and, once
		// JobCancelTx is reached successfully, cancels for real.
		return err
	}

	if _, err := river.JobCancelTx[*riverpgxv5.Driver](ctx, tx, job, validationErr); err != nil {
		// Exit 4: JobCancelTx itself failed. The tx rolls back.
		return err
	}

	if err := tx.Commit(ctx); err != nil {
		// Exit 5: the commit failed, so nothing was persisted. River
		// retries.
		return err
	}

	// Exit 6: the cancellation is stored and the external row matches it.
	// The validation error is still returned so that registered HookWorkEnd
	// hooks (and the ErrorHandler) get to see it and can log or trace it, or
	// attach metadata via river.RecordOutput. The executor's later metadata
	// merge is *not* guarded by state, so such metadata lands asynchronously
	// after this tx has committed. Hooks and handlers can no longer steer
	// the job's outcome, though: the state change and any new AttemptError
	// are guarded by state = 'running' in the SQL, and JobCancelTx has
	// already moved the row out of that state.
	return validationErr
}

// Example_cancelJobWithinTx shows a worker transactionally cancelling its
// own job once it decides further retries are pointless, while an external
// status row is kept in step with the job's outcome.
func Example_cancelJobWithinTx() {
	ctx := context.Background()

	pool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	schema := riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(pool), nil)

	// Create the external "uploads" table that the worker keeps in sync.
	createTableSQL := fmt.Sprintf(`
CREATE TABLE %q.uploads (
id bigint PRIMARY KEY,
status text NOT NULL,
attempts int NOT NULL DEFAULT 0,
last_error text
)`, schema)
	if _, err := pool.Exec(ctx, createTableSQL); err != nil {
		panic(err)
	}

	// Seed the single upload row that the job will operate on.
	seedSQL := fmt.Sprintf(`INSERT INTO %q.uploads (id, status) VALUES (99, 'working')`, schema)
	if _, err := pool.Exec(ctx, seedSQL); err != nil {
		panic(err)
	}

	workers := river.NewWorkers()
	river.AddWorker(workers, &UploadCancelWorker{dbPool: pool, schema: schema})

	logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level:       slog.LevelWarn,
		ReplaceAttr: slogutil.NoLevelTime,
	})
	client, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
		Logger: slog.New(logHandler),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Schema:   schema,
		TestOnly: true,
		Workers:  workers,
	})
	if err != nil {
		panic(err)
	}

	// Listen for job-cancelled events so the example can block until the job
	// has been fully processed.
	cancelledEvents, unsubscribe := client.Subscribe(river.EventKindJobCancelled)
	defer unsubscribe()

	if err := client.Start(ctx); err != nil {
		panic(err)
	}

	// MaxAttempts is 5 to make it obvious that the job stops after its first
	// run because of the cancel; the attempts remaining are irrelevant.
	jobArgs := UploadCancelArgs{UploadID: 99, Valid: false}
	if _, err := client.Insert(ctx, jobArgs, &river.InsertOpts{MaxAttempts: 5}); err != nil {
		panic(err)
	}

	riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), cancelledEvents, 1)

	if err := client.Stop(ctx); err != nil {
		panic(err)
	}

	// Read back the external row to show what the worker left behind.
	var (
		status    string
		attempts  int
		lastError *string // nullable column, hence the pointer
	)
	selectSQL := fmt.Sprintf(`SELECT status, attempts, last_error FROM %q.uploads WHERE id = 99`, schema)
	if err := pool.QueryRow(ctx, selectSQL).Scan(&status, &attempts, &lastError); err != nil {
		panic(err)
	}

	var lastErrTxt string
	if lastError != nil {
		lastErrTxt = *lastError
	}
	fmt.Printf("upload 99: status=%q attempts=%d last_error=%q\n", status, attempts, lastErrTxt)

	// Output:
	// upload 99: status="cancelled" attempts=1 last_error="upload rejected: invalid content"
}
Loading
Loading