queue is a queue and workflow library with pluggable backends and runtime extensions.
go get github.com/goforj/queue
import (
"context"
"fmt"
"github.com/goforj/queue"
)
func main() {
q, _ := queue.NewWorkerpool(
queue.WithWorkers(2), // optional; default: runtime.NumCPU() (min 1)
)
type EmailPayload struct {
To string `json:"to"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
_ = m.Bind(&payload)
fmt.Println("send to", payload.To)
return nil
})
_ = q.StartWorkers(context.Background())
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
queue.NewJob("emails:send").
Payload(EmailPayload{To: "user@example.com"}),
)
}
SQL-backed queues (sqlite, mysql, postgres) are durable and convenient, but they trade throughput for operational simplicity. They default to 1 worker, and increasing concurrency may require DB tuning (indexes, connection pool, lock contention). Prefer broker-backed drivers for higher-throughput workloads.
Use root constructors for in-process backends, and driver-module constructors for external backends. See the Driver Constructors API section below for full constructor shapes (New(...) and NewWithConfig(...)).
Driver backends live in separate packages so applications only import/link the optional backend dependencies they actually use (smaller builds, less dependency overhead, cleaner deploys).
package main
import (
"github.com/goforj/queue"
"github.com/goforj/queue/driver/mysqlqueue"
"github.com/goforj/queue/driver/natsqueue"
"github.com/goforj/queue/driver/postgresqueue"
"github.com/goforj/queue/driver/rabbitmqqueue"
"github.com/goforj/queue/driver/redisqueue"
"github.com/goforj/queue/driver/sqlitequeue"
"github.com/goforj/queue/driver/sqsqueue"
)
func main() {
queue.NewSync() // in-process sync
queue.NewWorkerpool() // in-process worker pool
queue.NewNull() // drop-only / disabled mode
sqlitequeue.New("file:queue.db?_busy_timeout=5000") // SQL durable queue (SQLite)
mysqlqueue.New("user:pass@tcp(127.0.0.1:3306)/app") // SQL durable queue (MySQL)
postgresqueue.New("postgres://user:pass@127.0.0.1:5432/app?sslmode=disable") // SQL durable queue (Postgres)
redisqueue.New("127.0.0.1:6379") // Redis/Asynq
natsqueue.New("nats://127.0.0.1:4222") // NATS
sqsqueue.New("us-east-1") // SQS
rabbitmqqueue.New("amqp://guest:guest@127.0.0.1:5672/") // RabbitMQ
}
import (
"context"
"github.com/goforj/queue"
)
type EmailPayload struct {
ID int `json:"id"`
}
func main() {
q, _ := queue.NewWorkerpool()
q.Register("reports:generate", func(ctx context.Context, m queue.Message) error {
return nil
})
q.Register("reports:upload", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
return nil
})
q.Register("users:notify_report_ready", func(ctx context.Context, m queue.Message) error {
return nil
})
_ = q.StartWorkers(context.Background())
defer q.Shutdown(context.Background())
chainID, _ := q.Chain(
// 1) generate report data
queue.NewJob("reports:generate").Payload(map[string]any{"report_id": "rpt_123"}),
// 2) upload report artifact after generate succeeds
queue.NewJob("reports:upload").Payload(EmailPayload{ID: 123}),
// 3) notify user only after upload succeeds
queue.NewJob("users:notify_report_ready").Payload(map[string]any{"user_id": 123}),
).OnQueue("critical").Dispatch(context.Background())
_ = chainID
}
Use Run(ctx) for long-lived workers: it starts processing, waits for shutdown signals, and performs graceful termination.
import (
"context"
"log"
"os/signal"
"syscall"
"github.com/goforj/queue"
)
func main() {
q, _ := queue.NewWorkerpool()
// Register handlers before starting workers.
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
return nil
})
// Create a context that is canceled on SIGINT/SIGTERM (Ctrl+C, container stop).
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()
// Run starts workers, blocks until ctx is canceled, then gracefully shuts down.
if err := q.Run(ctx); err != nil {
log.Fatal(err)
}
}
Job: Typed work unit for app handlers.
_, _ = q.Dispatch(
queue.NewJob("emails:send").Payload(EmailPayload{To: "user@example.com"}),
)
Chain: Ordered workflow (A then B then C).
_, _ = q.Chain(
queue.NewJob("reports:generate"),
queue.NewJob("reports:upload"),
queue.NewJob("users:notify_report_ready"),
).Dispatch(context.Background())
Batch: Parallel workflow with callbacks.
_, _ = q.Batch(
queue.NewJob("emails:send"),
queue.NewJob("sms:send"),
).Then(queue.NewJob("notifications:done")).Dispatch(context.Background())
Middleware: Cross-cutting execution policy.
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool},
queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)
Events: Lifecycle hooks and observability.
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool, Observer: queue.NewStatsCollector()},
)
Backends: Driver/runtime transport selection.
q, _ := queue.NewWorkerpool()
rq, _ := redisqueue.New("127.0.0.1:6379")
_, _ = q, rq
// Define a struct for your job payload.
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
// Fluent builder pattern for job options.
job := queue.NewJob("emails:send").
// Payload can be bytes, structs, maps, or JSON-marshalable values.
// Default payload is empty.
Payload(EmailPayload{ID: 123, To: "user@example.com"}).
// OnQueue sets the queue name.
// Default is empty; broker-style drivers expect an explicit queue.
OnQueue("default").
// Timeout sets per-job execution timeout.
// Default is unset; some drivers may apply driver/runtime defaults.
Timeout(20 * time.Second).
// Retry sets max retries.
// Default is 0, which means one total attempt.
Retry(3).
// Backoff sets retry delay.
// Default is unset; Redis dispatch returns ErrBackoffUnsupported.
Backoff(500 * time.Millisecond).
// Delay schedules first execution in the future.
// Default is 0 (run immediately).
Delay(2 * time.Second).
// UniqueFor deduplicates Type+Payload for a TTL window.
// Default is 0 (no dedupe).
UniqueFor(45 * time.Second)
// Dispatch the job to the queue.
_, _ = q.Dispatch(job)
// In handlers, use Bind to decode payload into a struct.
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
return nil
})
Run local + integration-backed benchmarks (requires Docker/testcontainers):
cd docs && GOWORK=off INTEGRATION_BACKEND=all GOCACHE=/tmp/queue-gocache go test -tags=benchrender ./bench -run '^TestRenderBenchmarks$'
| Class | Driver | ns/op | ops/s | B/op | allocs/op |
|---|---|---|---|---|---|
| External | nats | 774 | 1291823 | 1258 | 13 |
| External | redis | 95295 | 10494 | 2113 | 33 |
| External | rabbitmq | 165780 | 6032 | 1882 | 57 |
| External | sqlite | 202380 | 4941 | 1931 | 47 |
| External | postgres | 1056731 | 946 | 3809 | 78 |
| External | sqs | 1873911 | 534 | 94784 | 1082 |
| External | mysql | 2286406 | 437 | 3303 | 62 |
| Local | null | 37 | 26673780 | 128 | 1 |
| Local | sync | 282 | 3539823 | 408 | 6 |
| Local | workerpool | 650 | 1538462 | 456 | 7 |
Use queue.WithMiddleware(...) to apply cross-cutting behavior to workflow job execution (logging, filtering, and error policy).
Common patterns:
- wrap handler execution (before/after logging, timing, tracing)
- skip jobs conditionally (maintenance mode, feature flags)
- convert matched errors into terminal failures (no retry)
var errValidation = errors.New("validation failed")
maintenanceMode := false
audit := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
log.Printf("start job=%s", m.JobType)
err := next(ctx, m)
log.Printf("done job=%s err=%v", m.JobType, err)
return err
})
skipMaintenance := queue.SkipWhen{
Predicate: func(context.Context, queue.Message) bool {
return maintenanceMode
},
}
fatalValidation := queue.FailOnError{
When: func(err error) bool {
return errors.Is(err, errValidation)
},
}
q, _ := queue.New(
queue.Config{Driver: queue.DriverWorkerpool},
queue.WithMiddleware(audit, skipMaintenance, fatalValidation),
)
_ = q
Use queue.Observer implementations to capture normalized runtime events across drivers.
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
collector,
queue.ObserverFunc(func(event queue.Event) {
_ = event.Kind
}),
)
q, _ := queue.New(queue.Config{
Driver: queue.DriverWorkerpool,
Observer: observer,
})
_ = q
- StatsCollector counters are process-local and event-driven.
- In multi-process deployments, aggregate metrics externally (OTel/Prometheus/etc.).
- Prefer backend-native stats when available.
- queue.SupportsNativeStats(q) indicates native driver snapshot support.
- queue.Snapshot(ctx, q, collector) merges native + collector where possible.
events := make(chan queue.Event, 100)
collector := queue.NewStatsCollector()
observer := queue.MultiObserver(
collector,
queue.ChannelObserver{
Events: events,
DropIfFull: true,
},
queue.ObserverFunc(func(e queue.Event) {
_ = e
}),
)
q, _ := queue.New(queue.Config{
Driver: queue.DriverWorkerpool,
Observer: observer,
})
_ = q
Runnable example: examples/observeall/main.go
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
runtimeObserver := queue.ObserverFunc(func(event queue.Event) {
attemptInfo := fmt.Sprintf("attempt=%d/%d", event.Attempt, event.MaxRetry+1)
jobInfo := fmt.Sprintf("job=%s key=%s queue=%s driver=%s", event.JobType, event.JobKey, event.Queue, event.Driver)
switch event.Kind {
case queue.EventEnqueueAccepted:
logger.Info("Accepted dispatch", "msg", fmt.Sprintf("Accepted %s", jobInfo), "scheduled", event.Scheduled, "at", event.Time.Format(time.RFC3339Nano))
case queue.EventEnqueueRejected:
logger.Error("Dispatch failed", "msg", fmt.Sprintf("Rejected %s", jobInfo), "error", event.Err)
case queue.EventEnqueueDuplicate:
logger.Warn("Skipped duplicate job", "msg", fmt.Sprintf("Duplicate %s", jobInfo))
case queue.EventEnqueueCanceled:
logger.Warn("Canceled dispatch", "msg", fmt.Sprintf("Canceled %s", jobInfo), "error", event.Err)
case queue.EventProcessStarted:
logger.Info("Started processing job", "msg", fmt.Sprintf("Started %s (%s)", jobInfo, attemptInfo), "at", event.Time.Format(time.RFC3339Nano))
case queue.EventProcessSucceeded:
logger.Info("Processed job", "msg", fmt.Sprintf("Processed %s in %s (%s)", jobInfo, event.Duration, attemptInfo))
case queue.EventProcessFailed:
logger.Error("Processing failed", "msg", fmt.Sprintf("Failed %s after %s (%s)", jobInfo, event.Duration, attemptInfo), "error", event.Err)
case queue.EventProcessRetried:
logger.Warn("Retrying job", "msg", fmt.Sprintf("Retry scheduled for %s (%s)", jobInfo, attemptInfo), "error", event.Err)
case queue.EventProcessArchived:
logger.Error("Archived failed job", "msg", fmt.Sprintf("Archived %s after final failure (%s)", jobInfo, attemptInfo), "error", event.Err)
case queue.EventQueuePaused:
logger.Info("Paused queue", "msg", fmt.Sprintf("Paused queue=%s driver=%s", event.Queue, event.Driver))
case queue.EventQueueResumed:
logger.Info("Resumed queue", "msg", fmt.Sprintf("Resumed queue=%s driver=%s", event.Queue, event.Driver))
default:
logger.Info("Queue event", "msg", fmt.Sprintf("kind=%s %s", event.Kind, jobInfo))
}
})
workflowObserver := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
logger.Info("workflow event",
"kind", event.Kind,
"dispatch_id", event.DispatchID,
"job_id", event.JobID,
"chain_id", event.ChainID,
"batch_id", event.BatchID,
"job_type", event.JobType,
"queue", event.Queue,
"attempt", event.Attempt,
"duration", event.Duration,
"err", event.Err,
)
})
q, _ := queue.New(
queue.Config{
Driver: queue.DriverSync,
Observer: runtimeObserver,
},
queue.WithObserver(workflowObserver),
)
_ = q
| Type | EventKind | Meaning |
|---|---|---|
| queue | enqueue_accepted | Job accepted by driver for enqueue. |
| queue | enqueue_rejected | Job enqueue failed. |
| queue | enqueue_duplicate | Duplicate job rejected due to uniqueness key. |
| queue | enqueue_canceled | Context cancellation prevented enqueue. |
| queue | process_started | Worker began processing job. |
| queue | process_succeeded | Handler returned success. |
| queue | process_failed | Handler returned error. |
| queue | process_retried | Driver scheduled retry attempt. |
| queue | process_archived | Job moved to terminal failure state. |
| queue | queue_paused | Queue was paused (driver supports pause). |
| queue | queue_resumed | Queue was resumed. |
| workflow | dispatch_started | Workflow runtime accepted a dispatch request and created a dispatch record. |
| workflow | dispatch_succeeded | Dispatch was successfully enqueued to the underlying queue runtime. |
| workflow | dispatch_failed | Dispatch failed before job execution could start. |
| workflow | job_started | A workflow job handler started execution. |
| workflow | job_succeeded | A workflow job handler completed successfully. |
| workflow | job_failed | A workflow job handler returned an error. |
| workflow | chain_started | A chain workflow was created and started. |
| workflow | chain_advanced | Chain progressed from one node to the next node. |
| workflow | chain_completed | Chain reached terminal success. |
| workflow | chain_failed | Chain reached terminal failure. |
| workflow | batch_started | A batch workflow was created and started. |
| workflow | batch_progressed | Batch state changed as jobs completed/failed. |
| workflow | batch_completed | Batch reached terminal success (or allowed-failure completion). |
| workflow | batch_failed | Batch reached terminal failure. |
| workflow | batch_cancelled | Batch was cancelled before normal completion. |
| workflow | callback_started | Chain/batch callback execution started. |
| workflow | callback_succeeded | Chain/batch callback completed successfully. |
| workflow | callback_failed | Chain/batch callback returned an error. |
Runnable examples live in the separate examples module (./examples).
They are not included when applications import github.com/goforj/queue, which keeps dependency graphs and build/link overhead smaller.
The API section below is autogenerated; do not edit between the markers.
New creates the high-level Queue API based on Config.Driver.
q, err := queue.New(queue.Config{Driver: queue.DriverWorkerpool})
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
_ = payload
return nil
})
_ = q.WithWorkers(1).StartWorkers(context.Background()) // optional; default: runtime.NumCPU() (min 1)
defer q.Shutdown(context.Background())
_, _ = q.Dispatch(
queue.NewJob("emails:send").
Payload(EmailPayload{ID: 1}).
OnQueue("default"),
)NewNull creates a Queue on the null backend.
q, err := queue.NewNull()
if err != nil {
return
}NewStatsCollector creates an event collector for queue counters.
collector := queue.NewStatsCollector()NewSync creates a Queue on the synchronous in-process backend.
q, err := queue.NewSync()
if err != nil {
return
}NewWorkerpool creates a Queue on the in-process workerpool backend.
q, err := queue.NewWorkerpool()
if err != nil {
return
}Backoff sets delay between retries.
job := queue.NewJob("emails:send").Backoff(500 * time.Millisecond)Bind unmarshals job payload JSON into dst.
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
}
job := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "user@example.com",
})
var payload EmailPayload
if err := job.Bind(&payload); err != nil {
return
}
_ = payload.ToDelay defers execution by duration.
job := queue.NewJob("emails:send").Delay(300 * time.Millisecond)NewJob creates a job value with a required job type.
job := queue.NewJob("emails:send")OnQueue sets the target queue name.
job := queue.NewJob("emails:send").OnQueue("critical")Payload sets job payload from common value types.
Example: payload bytes
jobBytes := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))Example: payload struct
type Meta struct {
Nested bool `json:"nested"`
}
type EmailPayload struct {
ID int `json:"id"`
To string `json:"to"`
Meta Meta `json:"meta"`
}
jobStruct := queue.NewJob("emails:send").Payload(EmailPayload{
ID: 1,
To: "user@example.com",
Meta: Meta{Nested: true},
})Example: payload map
jobMap := queue.NewJob("emails:send").Payload(map[string]any{
"id": 1,
"to": "user@example.com",
"meta": map[string]any{"nested": true},
})PayloadBytes returns a copy of job payload bytes.
job := queue.NewJob("emails:send").Payload([]byte(`{"id":1}`))
payload := job.PayloadBytes()PayloadJSON marshals payload as JSON.
job := queue.NewJob("emails:send").PayloadJSON(map[string]int{"id": 1})Retry sets max retry attempts.
job := queue.NewJob("emails:send").Retry(4)Timeout sets per-job execution timeout.
job := queue.NewJob("emails:send").Timeout(10 * time.Second)UniqueFor enables uniqueness dedupe within the given TTL.
job := queue.NewJob("emails:send").UniqueFor(45 * time.Second)Active returns active count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Active: 2},
},
}
fmt.Println(snapshot.Active("default"))
// Output: 2Archived returns archived count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Archived: 7},
},
}
fmt.Println(snapshot.Archived("default"))
// Output: 7Failed returns failed count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Failed: 2},
},
}
fmt.Println(snapshot.Failed("default"))
// Output: 2MultiObserver fans out events to multiple observers.
events := make(chan queue.Event, 2)
observer := queue.MultiObserver(
queue.ChannelObserver{Events: events},
queue.ObserverFunc(func(queue.Event) {}),
)
observer.Observe(queue.Event{Kind: queue.EventEnqueueAccepted})
fmt.Println(len(events))
// Output: 1Observe forwards an event to the configured channel.
ch := make(chan queue.Event, 1)
observer := queue.ChannelObserver{Events: ch}
observer.Observe(queue.Event{Kind: queue.EventProcessStarted, Queue: "default"})
event := <-chObserve handles a queue runtime event.
var observer queue.Observer
observer.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
})Observe calls the wrapped function.
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
observer := queue.ObserverFunc(func(event queue.Event) {
logger.Info("queue event",
"kind", event.Kind,
"driver", event.Driver,
"queue", event.Queue,
"job_type", event.JobType,
"attempt", event.Attempt,
"max_retry", event.MaxRetry,
"duration", event.Duration,
"err", event.Err,
)
})
observer.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobType: "emails:send",
})Observe records an event and updates normalized counters.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})Pause pauses queue consumption for drivers that support it.
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 1Paused returns paused count for a queue.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventQueuePaused,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
fmt.Println(snapshot.Paused("default"))
// Output: 1Pending returns pending count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Pending: 3},
},
}
fmt.Println(snapshot.Pending("default"))
// Output: 3Processed returns processed count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Processed: 11},
},
}
fmt.Println(snapshot.Processed("default"))
// Output: 11Queue returns queue counters for a queue name.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, ok := snapshot.Queue("default")
fmt.Println(ok, counters.Pending)
// Output: true 1Queues returns sorted queue names present in the snapshot.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "critical",
Time: time.Now(),
})
snapshot := collector.Snapshot()
names := snapshot.Queues()
fmt.Println(len(names), names[0])
// Output: 1 criticalResume resumes queue consumption for drivers that support it.
q, _ := queue.NewSync()
_ = queue.Pause(context.Background(), q, "default")
_ = queue.Resume(context.Background(), q, "default")
snapshot, _ := queue.Snapshot(context.Background(), q, nil)
fmt.Println(snapshot.Paused("default"))
// Output: 0RetryCount returns retry count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Retry: 1},
},
}
fmt.Println(snapshot.RetryCount("default"))
// Output: 1SafeObserve delivers an event to an observer and recovers observer panics.
This is an advanced helper intended for driver-module implementations.
Scheduled returns scheduled count for a queue.
snapshot := queue.StatsSnapshot{
ByQueue: map[string]queue.QueueCounters{
"default": {Scheduled: 4},
},
}
fmt.Println(snapshot.Scheduled("default"))
// Output: 4Snapshot returns driver-native stats, falling back to collector data.
q, _ := queue.NewSync()
snapshot, _ := q.Stats(context.Background())
_, ok := snapshot.Queue("default")
fmt.Println(ok)
// Output: trueSnapshot returns a copy of collected counters.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventEnqueueAccepted,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessStarted,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Time: time.Now(),
})
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
JobKey: "job-1",
Duration: 12 * time.Millisecond,
Time: time.Now(),
})
snapshot := collector.Snapshot()
counters, _ := snapshot.Queue("default")
throughput, _ := snapshot.Throughput("default")
fmt.Printf("queues=%v\n", snapshot.Queues())
fmt.Printf("counters=%+v\n", counters)
fmt.Printf("hour=%+v\n", throughput.Hour)
// Output:
// queues=[default]
// counters={Pending:0 Active:0 Scheduled:0 Retry:0 Archived:0 Processed:1 Failed:0 Paused:0 AvgWait:0s AvgRun:12ms}
// hour={Processed:1 Failed:0}SupportsNativeStats reports whether a queue runtime exposes native stats snapshots.
q, _ := queue.NewSync()
fmt.Println(queue.SupportsNativeStats(q))
// Output: trueSupportsPause reports whether a queue runtime supports Pause/Resume.
q, _ := queue.NewSync()
fmt.Println(queue.SupportsPause(q))
// Output: trueThroughput returns rolling throughput windows for a queue name.
collector := queue.NewStatsCollector()
collector.Observe(queue.Event{
Kind: queue.EventProcessSucceeded,
Driver: queue.DriverSync,
Queue: "default",
Time: time.Now(),
})
snapshot := collector.Snapshot()
throughput, ok := snapshot.Throughput("default")
fmt.Printf("ok=%v hour=%+v day=%+v week=%+v\n", ok, throughput.Hour, throughput.Day, throughput.Week)
// Output: ok=true hour={Processed:1 Failed:0} day={Processed:1 Failed:0} week={Processed:1 Failed:0}Batch creates a batch builder for fan-out workflow execution.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Batch(
queue.NewJob("emails:send").Payload(map[string]any{"id": 1}),
queue.NewJob("emails:send").Payload(map[string]any{"id": 2}),
).Name("send-emails").OnQueue("default").Dispatch(context.Background())Chain creates a chain builder for sequential workflow execution.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
q.Register("second", func(ctx context.Context, m queue.Message) error { return nil })
_, _ = q.Chain(
queue.NewJob("first"),
queue.NewJob("second"),
).OnQueue("default").Dispatch(context.Background())Dispatch enqueues a high-level job using context.Background.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
job := queue.NewJob("emails:send").Payload(map[string]any{"id": 1}).OnQueue("default")
_, _ = q.Dispatch(job)DispatchCtx enqueues a high-level job using the provided context.
Driver reports the configured backend driver for the underlying queue runtime.
q, err := queue.NewSync()
if err != nil {
return
}
fmt.Println(q.Driver())
// Output: syncFindBatch returns current batch state by ID.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
batchID, err := q.Batch(queue.NewJob("emails:send")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindBatch(context.Background(), batchID)FindChain returns current chain state by ID.
q, err := queue.NewSync()
if err != nil {
return
}
q.Register("first", func(ctx context.Context, m queue.Message) error { return nil })
chainID, err := q.Chain(queue.NewJob("first")).Dispatch(context.Background())
if err != nil {
return
}
_, _ = q.FindChain(context.Background(), chainID)Pause pauses consumption for a queue when supported by the underlying driver. See the README "Queue Backends" table for Pause/Resume support and docs/backend-guarantees.md (Capability Matrix) for broader backend differences.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
_ = q.Pause(context.Background(), "default")
}Prune deletes old workflow state records.
q, err := queue.NewSync()
if err != nil {
return
}
_ = q.Prune(context.Background(), time.Now().Add(-24*time.Hour))Register binds a handler for a high-level job type.
q, err := queue.NewSync()
if err != nil {
return
}
type EmailPayload struct {
ID int `json:"id"`
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error {
var payload EmailPayload
if err := m.Bind(&payload); err != nil {
return err
}
_ = payload
return nil
})Resume resumes consumption for a queue when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsPause(q) {
_ = q.Resume(context.Background(), "default")
}Run starts worker processing, blocks until ctx is canceled, then gracefully shuts down.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.Register("emails:send", func(ctx context.Context, m queue.Message) error { return nil })
go func() {
time.Sleep(100 * time.Millisecond)
cancel()
}()
_ = q.Run(ctx)Shutdown drains workers and closes underlying resources.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q.StartWorkers(context.Background())
_ = q.Shutdown(context.Background())StartWorkers starts worker processing.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
_ = q.StartWorkers(context.Background())Stats returns a normalized snapshot when supported by the underlying driver.
q, err := queue.NewSync()
if err != nil {
return
}
if queue.SupportsNativeStats(q) {
_, _ = q.Stats(context.Background())
}WithClock overrides the workflow runtime clock.
q, err := queue.New(
queue.Config{Driver: queue.DriverSync},
queue.WithClock(func() time.Time { return time.Unix(0, 0) }),
)
if err != nil {
return
}WithMiddleware appends queue workflow middleware.
mw := queue.MiddlewareFunc(func(ctx context.Context, m queue.Message, next queue.Next) error {
return next(ctx, m)
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithMiddleware(mw))
if err != nil {
return
}WithObserver installs a workflow lifecycle observer.
observer := queue.WorkflowObserverFunc(func(event queue.WorkflowEvent) {
_ = event.Kind
})
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithObserver(observer))
if err != nil {
return
}WithStore overrides the workflow orchestration store.
var store queue.WorkflowStore
q, err := queue.New(queue.Config{Driver: queue.DriverSync}, queue.WithStore(store))
if err != nil {
return
}WithWorkers sets desired worker concurrency before StartWorkers. It applies to high-level queue constructors (for example NewWorkerpool/New/NewSync).
q, err := queue.NewWorkerpool(
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}WithWorkers sets desired worker concurrency before StartWorkers.
q, err := queue.NewWorkerpool()
if err != nil {
return
}
q.WithWorkers(4) // optional; default: runtime.NumCPU() (min 1)AssertCount fails when dispatch count is not expected.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertCount(t, 1)AssertDispatched fails when jobType was not dispatched.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatched(t, "emails:send")AssertDispatchedOn fails when jobType was not dispatched on queueName.
fake := queue.NewFake()
_ = fake.Dispatch(
queue.NewJob("emails:send").
OnQueue("critical"),
)
fake.AssertDispatchedOn(t, "critical", "emails:send")AssertDispatchedTimes fails when jobType dispatch count does not match expected.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertDispatchedTimes(t, "emails:send", 2)AssertNotDispatched fails when jobType was dispatched.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send"))
fake.AssertNotDispatched(t, "emails:cancel")AssertNothingDispatched fails when any dispatch was recorded.
fake := queue.NewFake()
fake.AssertNothingDispatched(t)Dispatch records a typed job payload in-memory using the fake default queue.
fake := queue.NewFake()
err := fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))DispatchCtx submits a typed job payload using the provided context.
fake := queue.NewFake()
ctx := context.Background()
err := fake.DispatchCtx(ctx, queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(err == nil)
// Output: trueDriver returns the active queue driver.
fake := queue.NewFake()
driver := fake.Driver()NewFake creates a queue fake that records dispatches and provides assertions.
fake := queue.NewFake()
_ = fake.Dispatch(
queue.NewJob("emails:send").
Payload(map[string]any{"id": 1}).
OnQueue("critical"),
)
records := fake.Records()
fmt.Println(len(records), records[0].Queue, records[0].Job.Type)
// Output: 1 critical emails:sendRecords returns a copy of all dispatch records.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
records := fake.Records()
fmt.Println(len(records), records[0].Job.Type)
// Output: 1 emails:sendRegister associates a handler with a job type.
fake := queue.NewFake()
fake.Register("emails:send", func(context.Context, queue.Job) error { return nil })Reset clears all recorded dispatches.
fake := queue.NewFake()
_ = fake.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
fmt.Println(len(fake.Records()))
fake.Reset()
fmt.Println(len(fake.Records()))
// Output:
// 1
// 0Shutdown drains running work and releases resources.
fake := queue.NewFake()
err := fake.Shutdown(context.Background())StartWorkers starts worker execution.
fake := queue.NewFake()
err := fake.StartWorkers(context.Background())Workers sets desired worker concurrency before StartWorkers.
fake := queue.NewFake()
q := fake.Workers(4)
fmt.Println(q != nil)
// Output: true

New creates a high-level Queue using the MySQL SQL backend.
q, err := mysqlqueue.New(
"user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit MySQL SQL driver config.
q, err := mysqlqueue.NewWithConfig(
mysqlqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "user:pass@tcp(127.0.0.1:3306)/queue?parseTime=true", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

New creates a high-level Queue using the NATS backend.
q, err := natsqueue.New(
"nats://127.0.0.1:4222",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit NATS driver config.
q, err := natsqueue.NewWithConfig(
natsqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
URL: "nats://127.0.0.1:4222", // required
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

New creates a high-level Queue using the Postgres SQL backend.
q, err := postgresqueue.New(
"postgres://user:pass@127.0.0.1:5432/queue?sslmode=disable",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit Postgres SQL driver config.
q, err := postgresqueue.NewWithConfig(
postgresqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "postgres://user:pass@127.0.0.1:5432/queue?sslmode=disable", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

New creates a high-level Queue using the RabbitMQ backend.
q, err := rabbitmqqueue.New(
"amqp://guest:guest@127.0.0.1:5672/",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit RabbitMQ driver config.
q, err := rabbitmqqueue.NewWithConfig(
rabbitmqqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
URL: "amqp://guest:guest@127.0.0.1:5672/", // required
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

New creates a high-level Queue using the Redis backend.
q, err := redisqueue.New(
"127.0.0.1:6379",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit Redis driver config.
q, err := redisqueue.NewWithConfig(
redisqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
Addr: "127.0.0.1:6379", // required
Password: "", // optional; default empty
DB: 0, // optional; default 0
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

New creates a high-level Queue using the SQLite SQL backend.
q, err := sqlitequeue.New(
"file:queue.db?_busy_timeout=5000",
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit SQLite SQL driver config.
q, err := sqlitequeue.NewWithConfig(
sqlitequeue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
DB: nil, // optional; provide *sql.DB instead of DSN
DSN: "file:queue.db?_busy_timeout=5000", // optional if DB is set
ProcessingRecoveryGrace: 2 * time.Second, // default if <=0: 2s
ProcessingLeaseNoTimeout: 5 * time.Minute, // default if <=0: 5m
},
queue.WithWorkers(4), // optional; default: 1 worker
)
if err != nil {
return
}

New creates a high-level Queue using the SQS backend.
q, err := sqsqueue.New(
"us-east-1",
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

NewWithConfig creates a high-level Queue using an explicit SQS driver config.
q, err := sqsqueue.NewWithConfig(
sqsqueue.Config{
DriverBaseConfig: queueconfig.DriverBaseConfig{
DefaultQueue: "critical", // default if empty: "default"
Observer: nil, // default: nil
},
Region: "us-east-1", // default if empty: "us-east-1"
Endpoint: "", // optional; set for LocalStack/custom endpoint
AccessKey: "", // optional; static credentials
SecretKey: "", // optional; static credentials
},
queue.WithWorkers(4), // optional; default: runtime.NumCPU() (min 1)
)
if err != nil {
return
}

Examples in this section assume they run inside a test, where t is a *testing.T (or any testing.TB).
AssertBatchCount fails if total recorded workflow batch count does not match n.
f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil)).Dispatch(nil)
f.AssertBatchCount(t, 1)

AssertBatched fails unless at least one recorded workflow batch matches predicate.
f := queuefake.New()
_, _ = f.Workflow().Batch(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(nil)
f.AssertBatched(t, func(spec bus.BatchSpec) bool { return len(spec.JobTypes) == 2 })

AssertChained fails if no recorded workflow chain matches expected job type order.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil), bus.NewJob("b", nil)).Dispatch(nil)
f.AssertChained(t, []string{"a", "b"})

AssertCount fails when total dispatch count is not expected.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
f.AssertCount(t, 2)

AssertDispatched fails when jobType was not dispatched.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
f.AssertDispatched(t, "emails:send")

AssertDispatchedOn fails when jobType was not dispatched on queueName.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
f.AssertDispatchedOn(t, "critical", "emails:send")

AssertDispatchedTimes fails when jobType dispatch count does not match expected.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
f.AssertDispatchedTimes(t, "emails:send", 2)

AssertNotDispatched fails when jobType was dispatched.
f := queuefake.New()
f.AssertNotDispatched(t, "emails:send")

AssertNothingBatched fails if any workflow batch was recorded.
f := queuefake.New()
f.AssertNothingBatched(t)

AssertNothingDispatched fails when any dispatch was recorded.
f := queuefake.New()
f.AssertNothingDispatched(t)

AssertNothingWorkflowDispatched fails when any workflow dispatch was recorded.
f := queuefake.New()
f.AssertNothingWorkflowDispatched(t)

AssertWorkflowDispatched fails when jobType was not workflow-dispatched.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).Dispatch(nil)
f.AssertWorkflowDispatched(t, "a")

AssertWorkflowDispatchedOn fails when jobType was not workflow-dispatched on queueName.
f := queuefake.New()
_, _ = f.Workflow().Chain(bus.NewJob("a", nil)).OnQueue("critical").Dispatch(nil)
f.AssertWorkflowDispatchedOn(t, "critical", "a")

AssertWorkflowDispatchedTimes fails when workflow dispatch count for jobType does not match expected.
f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(nil)
_, _ = wf.Chain(bus.NewJob("a", nil)).Dispatch(nil)
f.AssertWorkflowDispatchedTimes(t, "a", 2)

AssertWorkflowNotDispatched fails when jobType was workflow-dispatched.
f := queuefake.New()
f.AssertWorkflowNotDispatched(t, "emails:send")

Count returns the total number of recorded dispatches.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("a"))
_ = q.Dispatch(queue.NewJob("b"))
_ = f.Count()

CountJob returns how many times a job type was dispatched.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = q.Dispatch(queue.NewJob("emails:send"))
_ = f.CountJob("emails:send")

CountOn returns how many times a job type was dispatched on a queue.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("critical"))
_ = f.CountOn("critical", "emails:send")

New creates a fake queue harness backed by queue.NewFake().
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))
f.AssertDispatched(t, "emails:send")
f.AssertCount(t, 1)

Queue returns the queue fake to inject into code under test.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send").OnQueue("default"))

Records returns a copy of recorded dispatches.
f := queuefake.New()
_ = f.Queue().Dispatch(queue.NewJob("emails:send"))
records := f.Records()

Reset clears recorded dispatches.
f := queuefake.New()
q := f.Queue()
_ = q.Dispatch(queue.NewJob("emails:send"))
f.Reset()
f.AssertNothingDispatched(t)

Workflow returns the workflow/orchestration fake for chain/batch assertions.
f := queuefake.New()
wf := f.Workflow()
_, _ = wf.Chain(
bus.NewJob("a", nil),
bus.NewJob("b", nil),
).Dispatch(context.Background())
f.AssertChained(t, []string{"a", "b"})

Unit tests (root module):
go test ./...

Integration tests (separate integration module):
go test -tags=integration ./integration/...

Select specific backends with INTEGRATION_BACKEND (comma-separated), for example:
INTEGRATION_BACKEND=sqlite go test -tags=integration ./integration/...
INTEGRATION_BACKEND=redis,rabbitmq go test -tags=integration ./integration/... -count=1
INTEGRATION_BACKEND=all go test -tags=integration ./integration/... -count=1

Matrix status and backend integration notes are tracked in docs/integration-scenarios.md.
