diff --git a/howto/workers.md b/howto/workers.md
index 8ad82ea..0298041 100644
--- a/howto/workers.md
+++ b/howto/workers.md
@@ -169,6 +169,7 @@ The handler receives a `context.Context` for cancellation and a `*WorkerInfo` fo
 | Return value | Long-running worker (no `Every`) | Periodic worker (with `Every`) |
 |---|---|---|
 | `return nil` | Worker stops permanently | Cycle succeeded — next tick fires |
+| `return workers.ErrSkipTick` | Treated like `return error` (not meaningful) | Tick skipped — next tick fires normally |
 | `return error` | Restarts with backoff (if restart enabled) | Restarts with backoff (if restart enabled) |
 | `return ctx.Err()` | Clean shutdown | Clean shutdown |
 | `return workers.ErrDoNotRestart` | Permanent stop | Permanent stop |
@@ -177,6 +178,26 @@ The handler receives a `context.Context` for cancellation and a `*WorkerInfo` fo
 
 **Periodic workers** run the handler once per tick. Return nil for success (next tick fires normally). Return an error to trigger restart. The `Every` wrapper manages the tick loop — your handler just processes one cycle.
 
+### ErrSkipTick
+
+Return `workers.ErrSkipTick` from a periodic handler when a tick fails transiently (DB timeout, network blip) and you want to skip it without triggering a full restart. The timer continues and the next tick fires normally:
+
+```go
+func pollDatabase(ctx context.Context, info *workers.WorkerInfo) error {
+	rows, err := db.QueryContext(ctx, "SELECT ...")
+	if err != nil {
+		if ctx.Err() != nil {
+			return ctx.Err() // context cancelled — clean shutdown
+		}
+		return workers.ErrSkipTick // transient failure, try again next interval
+	}
+	defer rows.Close()
+	return processRows(rows)
+}
+```
+
+Without `ErrSkipTick`, you'd have to swallow errors by returning nil and track them internally. `ErrSkipTick` gives the framework visibility into skipped ticks while keeping the timer going.
+
 ### ErrDoNotRestart
 
 Return `workers.ErrDoNotRestart` from a handler to signal permanent completion — the supervisor will not restart the worker even though restart is enabled by default. `ChannelWorker` and `BatchChannelWorker` return this automatically when their channel is closed.
@@ -197,7 +218,7 @@ func processQueue(ctx context.Context, info *workers.WorkerInfo) error {
 |--------|-------------|---------|
 | `HandlerFunc(fn)` | Set handler from a plain function | — |
 | `Handler(h)` | Set handler from a `CycleHandler` struct | — |
-| `WithRestart(false)` | Disable restart (one-shot worker) | `true` (restart with backoff) |
+| `WithRestart(false)` | Disable restart (one-shot worker). Periodic workers should generally keep the default; use `ErrSkipTick`/`ErrDoNotRestart` instead. | `true` (restart with backoff) |
 | `Every(duration)` | Run periodically on a fixed interval | — |
 | `WithJitter(percent)` | Randomize tick interval by ±percent (requires `Every`) | inherit run-level |
 | `WithInitialDelay(d)` | Delay first tick (requires `Every`) | — |
@@ -373,7 +394,9 @@ middleware.Recover(func(name string, v any) {
 
 ### Tracing
 
-Creates an OTEL span named `worker::cycle` for each tick. Records errors on the span:
+Creates an OTEL span named `worker::cycle` for each tick. Records errors on the span. Worker spans are always sampled regardless of the global sampler — this prevents silent span drops when using `ParentBased(TraceIDRatioBased(...))`, where worker root spans (which have no incoming parent) would otherwise be probabilistically dropped.
+
+The OTEL trace ID is automatically injected into the log context as `trace` for correlation with your tracing backend.
 
 ```go
 middleware.Tracing()
@@ -408,6 +431,18 @@ middleware.DistributedLock(redisLocker,
 )
 ```
 
+For the common case of logging and skipping, use `WithSkipOnNotAcquired`:
+
+```go
+middleware.DistributedLock(redisLocker,
+	middleware.WithSkipOnNotAcquired(func(ctx context.Context, name string) {
+		log.Info(ctx, "msg", "lock held, skipping", "worker", name)
+	}),
+)
+```
+
+**Caution:** If the `WithOnNotAcquired` callback returns a non-nil error, the framework treats it as a cycle failure — for periodic workers, this triggers restart with backoff. Use `WithSkipOnNotAcquired` or return nil from the callback if you want to skip without restart.
+
 The `Locker` interface:
 
 ```go
@@ -417,6 +452,8 @@ type Locker interface {
 }
 ```
 
+If your lock implementation already has these two methods with matching signatures, it satisfies `Locker` directly — no adapter needed.
+
 Release uses `context.WithoutCancel` so that context cancellation does not prevent lock cleanup.
 
 ### Timeout
@@ -468,12 +505,14 @@ Every handler receives a `*WorkerInfo` that carries worker metadata and child ma
 |--------|-------------|
 | `GetName() string` | Worker name |
 | `GetAttempt() int` | Restart attempt (0 on first run) |
+| `GetHandler() CycleHandler` | The worker's handler — use type assertion for handler-specific state |
 | `Add(w *Worker) bool` | Add child worker — returns false if name already exists (no-op) |
 | `Remove(name string)` | Stop child worker by name |
-| `GetChildren() []string` | Names of running child workers |
+| `GetChildren() []string` | Names of running child workers (stopped children auto-pruned) |
 | `GetChild(name string) (Worker, bool)` | Look up a child by name (returns a value copy) |
+| `GetChildCount() int` | Number of running children (cheaper than `len(GetChildren())`) |
 
-Use `Worker.GetName()` and `Worker.GetHandler()` to inspect a child.
+Use `Worker.GetName()`, `Worker.GetHandler()`, `Worker.GetInterval()`, and `Worker.GetRestartOnFail()` to inspect a child.
 
 To replace a running worker, call `Remove` then `Add`. This is not atomic — there is a brief window where the worker is not running.
 
@@ -579,6 +618,45 @@ info.Add(workers.NewWorker("solver").HandlerFunc(makeSolver(newCfg)))
 
 Note: `Remove` + `Add` is not atomic — there is a brief window where the worker is not running.
 
+**Automatic cleanup:** When a child permanently stops (see the [return value table](#handler-return-values) for what triggers permanent stop), it is automatically excluded from `GetChildren` and `GetChild`. The underlying [suture] supervisor is the source of truth — no manual cleanup needed. Note that there may be a brief delay between the child stopping and the change being visible, as stop events are processed asynchronously.
+
+### Example: Config change detection via handler
+
+Instead of maintaining a parallel map to track per-worker state (e.g., config versions), store metadata on your `CycleHandler` implementation and inspect it via `GetChild().GetHandler()` type assertion:
+
+```go
+type solverHandler struct {
+	version int64
+	cfg     SolverConfig
+}
+
+func (h *solverHandler) RunCycle(ctx context.Context, info *workers.WorkerInfo) error {
+	return solve(ctx, h.cfg)
+}
+
+func (h *solverHandler) Close() error { return nil }
+```
+
+In the reconciler, detect config changes without a parallel tracking map:
+
+```go
+for key, desired := range desiredConfigs {
+	child, exists := info.GetChild(key)
+	if exists {
+		if h, ok := child.GetHandler().(*solverHandler); ok && h.version == desired.version {
+			continue // config unchanged, skip
+		}
+		info.Remove(key) // config changed, replace
+	}
+	info.Add(workers.NewWorker(key).Handler(&solverHandler{
+		version: desired.version,
+		cfg:     desired.cfg,
+	}))
+}
+```
+
+`GetChild()` returns a copy of the `Worker` struct, but the handler is stored as a `CycleHandler` interface — use type assertion to access handler-specific fields for change detection or metadata inspection.
+
 ### Example: Fixed children on startup
 
 A worker that spawns N consumer goroutines when it starts: