CloudEvents-framed domain event publisher to Redpanda/Kafka, with circuit-breaker + outbox fallback, per-topic DLQ, and franz-go backing.
Producer-only. Orthogonal to github.com/LerianStudio/lib-commons/v5/commons/rabbitmq (internal command queues) — neither deprecates the other.
- Module:
github.com/LerianStudio/lib-streaming - Version:
Unreleased(post-v0.2.0) - Go:
1.25.9 - License: Elastic License 2.0 (see LICENSE)
go get github.com/LerianStudio/lib-streaming@main
v0.2.0 is a breaking change. Highlights:
NewProducernow requiresWithCatalog(catalog); build aCatalogofEventDefinitionrecords at bootstrap.EmittakesEmitRequest{DefinitionKey, TenantID, Payload, ...}— the catalog ownsResourceType/EventType/SchemaVersion.Config.EventToggles/STREAMING_EVENT_TOGGLESreplaced byConfig.PolicyOverrides/STREAMING_EVENT_POLICIES.RegisterOutboxHandler(registry, eventTypes...)collapsed toRegisterOutboxRelay(registry)— one stable relay event type.
See CHANGELOG.md for the full list and the outbox-row compatibility notes.
Bootstrap in main.go:
cfg, warnings, err := streaming.LoadConfig()
if err != nil { return err }
for _, warning := range warnings { logger.Log(ctx, log.LevelWarn, warning) }
runtime.InitPanicMetrics(metricsFactory)
assert.InitAssertionMetrics(metricsFactory)
catalog, err := streaming.NewCatalog(streaming.EventDefinition{
Key: "transaction.created",
ResourceType: "transaction",
EventType: "created",
})
if err != nil { return err }
producer, err := streaming.NewProducer(ctx, cfg,
streaming.WithLogger(logger),
streaming.WithMetricsFactory(metricsFactory),
streaming.WithTracer(tracer),
streaming.WithCircuitBreakerManager(cbManager),
streaming.WithOutboxRepository(outboxRepo),
streaming.WithCatalog(catalog),
)
if err != nil { return err }
if err := producer.RegisterOutboxRelay(outboxRegistry); err != nil { return err }
if err := launcher.Add("streaming", producer); err != nil { return err }Service method uses the injected Emitter:
err := emitter.Emit(ctx, streaming.EmitRequest{
DefinitionKey: "transaction.created",
TenantID: "t-abc",
Subject: "tx-123",
Payload: payloadBytes,
})Unit-test with the mock emitter:
mock := streamingtest.NewMockEmitter()
svc := NewMyService(mock)
svc.DoSomething(ctx)
streamingtest.AssertEventEmitted(t, mock, "transaction.created")- CloudEvents 1.0 binary mode. Headers carry
ce-*context attributes; payload is raw JSON. - franz-go backing. Battle-tested Kafka client with automatic batching, retries, and backpressure.
- Circuit breaker. Opens on broker failure; half-open probes recover automatically.
- Policy-driven delivery. Catalog defaults, environment overrides, and call overrides resolve direct publish, outbox, and DLQ behavior per event.
- Outbox fallback. When policy selects outbox (
alwaysor circuit-open fallback) andWithOutboxRepository/WithOutboxWriteris wired,Emitstores a versionedOutboxEnvelopeunder the stablelerian.streaming.publishrelay type. RegisterRegisterOutboxRelayonce with the app outbox dispatcher. - Per-topic DLQ. Failures land on
<source>.dlqwith six structured headers (source topic, error class, error message, retry count, first-failure-at, producer ID) for forensic analysis. - Tenant-aware partitioning.
Event.PartitionKey()returnsTenantIDby default;SystemEvent=trueswitches to"system:" + EventType. - Caller-safe errors.
IsCallerError(err)distinguishes caller-correctable faults (validation, payload shape) from infrastructure faults (broker down, network).
| Implementation | Use case |
|---|---|
*Producer |
Production. Backed by franz-go. Implements commons.App. |
NoopEmitter |
STREAMING_ENABLED=false or empty STREAMING_BROKERS. No-op. |
streamingtest.MockEmitter |
Unit tests. Concurrency-safe. Deep-copies emit requests. Includes Assert* helpers and WaitForEvent. |
streaming.New(ctx, cfg, opts...) returns the right implementation based on Config. streaming.NewProducer(ctx, cfg, opts...) forces *Producer construction.
All variables use the STREAMING_ prefix. See the package godoc (go doc github.com/LerianStudio/lib-streaming) for the full table with defaults, types, and purpose. The reference set also lives in .env.reference.
Elastic License 2.0. See LICENSE.