diff --git a/.gitignore b/.gitignore index ac2f31bf8d..dfced3185e 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ dist/ .DS_Store .idea *.iml +.claude/ diff --git a/guides/deploy/build.md b/guides/deploy/build.md index f8055bbd3e..63ac0c83c2 100644 --- a/guides/deploy/build.md +++ b/guides/deploy/build.md @@ -25,7 +25,7 @@ Build tasks are derived from the CDS configuration and project context. By defau - _db/_, _srv/_, _app/_ — default root folders of a CAP project - _fts/_ and its subfolders when using [feature toggles](../extensibility/feature-toggles#enable-feature-toggles) - CDS model folders and files defined by [required services](../../node.js/cds-env#services) - - Built-in examples: [persistent queue](../../node.js/queue#persistent-queue) or [MTX-related services](../multitenancy/mtxs#mtx-services-reference) + - Built-in examples: [event queue](../../node.js/event-queues#configuration) or [MTX-related services](../multitenancy/mtxs#mtx-services-reference) - Explicit `src` folder configured in the build task diff --git a/guides/events/_menu.md b/guides/events/_menu.md index ac3d5b3c11..dfaef0c700 100644 --- a/guides/events/_menu.md +++ b/guides/events/_menu.md @@ -2,8 +2,8 @@ # [Core Concepts](core-concepts) # [Event Queues](event-queues) # [Messaging](messaging) -# [Apache Kafka](../../../guides/events/apache-kafka) -# [Advanced Event Mesh](is-aem) -# [SAP Event Mesh](event-mesh) -# [SAP Event Hub](event-hub) -# [Events from S/4](s4) +# [• Apache Kafka](../../../guides/events/apache-kafka) +# [• Advanced Event Mesh](is-aem) +# [• SAP Event Mesh](event-mesh) +# [• SAP Event Hub](event-hub) +# [• Events from S/4](s4) diff --git a/guides/events/assets/event-queues-motivation.drawio b/guides/events/assets/event-queues-motivation.drawio new file mode 100644 index 0000000000..b747dd3679 --- /dev/null +++ b/guides/events/assets/event-queues-motivation.drawio @@ -0,0 +1,106 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/guides/events/assets/event-queues-motivation.drawio.svg b/guides/events/assets/event-queues-motivation.drawio.svg new file mode 100644 index 0000000000..8d85dd7b82 --- /dev/null +++ b/guides/events/assets/event-queues-motivation.drawio.svg @@ -0,0 +1,4 @@ + + + +








High Risk of Inconsistent Data
High Risk of Inconsistent Data...








Eventual Consistency
Eventual Consistency...
Database
Database
Service A
Service A
Service B
Service B
Database
Database
Database



Database...
Service A
Service A
Service B
Service B
Database
Database
Event Queue     
Event Queue     
\ No newline at end of file diff --git a/guides/events/event-queues.md b/guides/events/event-queues.md index 89f14eff84..87ef425dda 100644 --- a/guides/events/event-queues.md +++ b/guides/events/event-queues.md @@ -1,62 +1,668 @@ --- synopsis: > - Transactional Event Queues allow the scheduling of events and background tasks for asynchronous exactly once processing and with ultimate resilience. + Transactional Event Queues let you persist events and scheduled tasks in the same database transaction as your business data, then process them asynchronously with retries and a dead letter queue. status: released --- # Transactional Event Queues +The *'Transactional Outbox'* Pattern, generalized {.subtitle} -{{ $frontmatter.synopsis }} +Persist events and scheduled tasks in the same database transaction as your business data, then process them asynchronously with retries and a dead letter queue. +{.abstract} +> [!tip] Transactional Event Queues – Guiding Principles +> +> 1. Queued work is written in the same transaction as your business data → *no phantom events, no lost events* +> 2. A background runner dispatches it after commit, not during the request → *fast request handling, durable side effects* +> 3. Failed work is retried with exponential backoff; unrecoverable entries become dead letters → *ultimate resilience* +> +> => Application developers stay focused on the domain, not on failure modes. + +[toc]:./ [[toc]] -## Event Queues: Concept +## Motivation + +Distributed side effects are hard to get right. +An application may commit local data, but a follow-up remote call can still fail because of network errors, service outages, or a process crash. Two-phase commits across a database and a remote service or message broker aren't an option in modern cloud architectures, so applications instead aim for **eventual consistency**: the local state and the remote state may diverge briefly, but converge after the dispatch completes (or after compensation, if it fails permanently). + +*Transactional Event Queues* are CAP's mechanism for that. They store the follow-up work in the database as part of the **same transaction** as your business data. Once the transaction commits, a background runner reads pending messages and dispatches them — retrying with exponentially increasing delays on failure, and moving the message to a dead letter queue after a configurable number of attempts. + +![Two side-by-side diagrams contrasting two integration patterns. On the left, on a red background labelled 'High Risk of Inconsistent Data', Service A calls Service B directly and each service writes to its own database — if either step fails after the other has committed, the two databases diverge. On the right, on a green background labelled 'Eventual Consistency', Service A writes to its own database and into an event queue inside that same database in one transaction; the queued message is then dispatched to Service B asynchronously. Service B still writes to its own database. Either both writes on the Service-A side commit together or neither does, and the call to Service B is retried until it succeeds.](assets/event-queues-motivation.drawio.svg) + +Because the queued message and your business data share the same database transaction, you get two core guarantees: + +- **No phantom events** — if the transaction rolls back, no message is sent. +- **No lost events** — if the transaction commits, the queued work is persisted and processed eventually. CAP avoids duplicate execution under normal operation, but handlers should still be idempotent to tolerate rare crash windows or external side effects. + +This pattern is widely known as the [*'Transactional Outbox'*](https://microservices.io/patterns/data/transactional-outbox.html), but CAP's event queues go beyond outbound messages. They cover three use cases: + +- **Outbox** — defer outbound calls to remote services and emits to message brokers until the transaction succeeds. +- **Inbox** — acknowledge inbound messages immediately and process them asynchronously. +- **Scheduled Tasks** — run periodic or delayed work such as data replication. + +### Pub/Sub vs. Event Queues + +These are sometimes confused but solve different problems. + +**Pub/sub** — typically realized through a message broker — *loosely couples microservices*. A producer publishes events without knowing who consumes them; consumers subscribe by topic. The unit of trust is the broker. + +**Event queues** address *async workload processing within one service*. They turn a piece of work into a database row that survives commit, restart, and retry, then dispatch it later — to the same service in process, to a remote service, or to a message broker. The unit of trust is the database transaction. + +The two compose: when the dispatch target *is* a message broker, the event queue is the transactional bridge that makes pub/sub safe across the local commit. The [Inbox](#inbox) does the mirror image on the receiving side. + +> [!info] Related patterns +> [*Event Sourcing*](https://microservices.io/patterns/data/event-sourcing.html) solves the same atomic-state-change-and-publish problem by making an append-only event log the source of truth. Event queues persist messages only until processed and then delete them — they're a transactional bridge to remote systems, not the system of record. + +> [!tip] When not to use event queues +> If you need an immediate, synchronous response from a remote system, use a normal service call. Queued calls execute asynchronously and discard the direct return value — for purely local logic that finishes inside the current request, an event queue adds nothing. + + +## Outbox + +The outbox defers outbound calls to remote services and emits to message brokers until the main transaction succeeds. +This prevents sending requests or messages to external systems when your transaction might still roll back. + + +### Programmatic Use + +**Example:** In the *xtravels* application, when an agent creates a `Bookings` record (a flight booking on a travel), the application also has to notify *xflights* of the booking. The straightforward implementation is to call *xflights* directly from an `after CREATE` handler: + +```js +const xflights = await cds.connect.to('xflights') + +this.after('CREATE', 'Bookings', async (_, req) => { + const { flight_ID: flight, flight_date: date } = req.data + // Anti-pattern: the remote call happens before the local commit is safe // [!code --] + await xflights.send('POST', 'BookingCreated', { flight, date }) // [!code --] +}) +``` + +This works in the happy case, but it's not safe: if the surrounding transaction later fails, the external booking may already exist while the local `Bookings` row gets rolled back. + +The outbox fixes this. Wrap the remote service in `cds.queued()` (Node.js) or `OutboxService.outboxed()` (Java) and dispatch as before — the call is now persisted within the current transaction and sent after commit: + +::: code-group +```js [Node.js] +const xflights = await cds.connect.to('xflights') +const qd_xflights = cds.queued(xflights) + +this.after('CREATE', 'Bookings', async (_, req) => { + const { flight_ID: flight, flight_date: date } = req.data + // Persisted within the current transaction, sent after commit // [!code ++] + await qd_xflights.send('POST', 'BookingCreated', { flight, date }) // [!code ++] +}) +``` +```java [Java] +@Autowired @Qualifier("XFlightsOutbox") +OutboxService outbox; + +@Autowired @Qualifier(CqnService.DEFAULT_NAME) +CqnService xflights; + +@After(event = CqnService.EVENT_CREATE, entity = Bookings_.CDS_NAME) +void notifyXFlights(List bookings) { + AsyncCqnService outboxedXFlights = AsyncCqnService.of(xflights, outbox); + bookings.forEach(b -> outboxedXFlights.emit("BookingCreated", + Map.of("flight", b.getFlightId(), "date", b.getFlightDate()))); +} +``` +::: + +If the transaction rolls back, no booking request is sent. + +> [!tip] Enabled by default +> Event queues are enabled by default — there's nothing to install or activate. The persistent queue starts with your application; the configuration shown later is only for tuning. + +The `xflights` connection here stands in for any remote service you've configured under `cds.requires`. The complete setup of the *xtravels* application and the *xflights* service it consumes lives in the [@capire/xtravels](https://github.com/capire/xtravels) sample. + +A queued call changes *when* work happens and *what the caller can expect back*: + +- A **direct** call returns the remote service's result (or error) and only then commits the local transaction. +- A **queued** call writes the message to the queue inside the local transaction and returns. The actual remote dispatch happens after commit, in the background. + +> [!warning] Queued calls discard the direct return value +> A queued service persists the request and returns after the message is stored, not after the remote operation finishes. Any return value from `send()` or `run()` is therefore not available to the caller. To act on the outcome, register a [callback handler](#callbacks) on `#succeeded` or `#failed`. + +> [!tip] `await` is still needed +> Even though processing is asynchronous, you still need to `await` because the message is written to the database within the current transaction. + +In Java, you can also wrap a service at runtime through the service catalog rather than wiring through Spring: + +```java +OutboxService outbox = runtime.getServiceCatalog() + .getService(OutboxService.class, "XFlightsOutbox"); +CqnService xflights = runtime.getServiceCatalog() + .getService(CqnService.class, "xflights"); + +AsyncCqnService queued = AsyncCqnService.of(xflights, outbox); +queued.emit("BookingCreated", Map.of("flight", "AA017", "date", "2026-07-15")); +``` + +To unwrap a queued service back to its synchronous original: + +::: code-group +```js [Node.js] +const xflights = cds.unqueued(qd_xflights) +``` +```java [Java] +CqnService xflights = OutboxService.unboxed(outboxedXFlights); +``` +::: + + +### By Configuration + +A service can also be outboxed centrally — without touching handler code — by setting a flag on its configuration. Every call from your handlers is then queued automatically. + +::: code-group +```json [Node.js — package.json] +{ + "cds": { + "requires": { + "messaging": { + "outboxed": true + } + } + } +} +``` +```yaml [Java — application.yaml] +cds: + outbox: + services: + DefaultOutboxUnordered: + maxAttempts: 10 +``` +::: + +This is the typical setup for **technical services** — messaging and audit logging — where every emit must be durable. CAP enables it by default for those services (see [*Auto-Outboxed Services*](#auto-outboxed-services) below). + +For **business services**, however, a class-level flag is usually too coarse. Remote integrations called from domain handlers typically need *some* calls outboxed — the post-commit notification to *xflights*, say — while others stay synchronous (a read-through query, a probe before commit). For that finer control, prefer the programmatic path with `cds.queued()` or `srv.schedule()`. + + +### Auto-Outboxed Services + +Some services are outboxed automatically — you don't need to wrap or configure them: + +| Service | Description | +|---------|-------------| +| `cds.MessagingService` | All messaging services | +| `cds.AuditLogService` | Audit log events | + +This ensures that messaging and audit log events are sent reliably and never lost because of transaction rollbacks; they use the persistent queue by default. + +[Learn more about auto-outboxed services in Node.js.](../../node.js/event-queues#queueing-a-service){.learn-more} +[Learn more about the outbox in Java.](../../java/event-queues#default-outbox-services){.learn-more} + + +### Callbacks + +Because queued calls return after the message is *stored* — not after the remote operation completes — you can't use the return value of `send()` or `run()` to react to success or failure. Instead, register a callback handler on the queued service: + +- `/#succeeded` — fires when processing completes successfully. +- `/#failed` — fires when the message becomes a dead letter (after all retries are exhausted). + +**Example:** After *xflights* successfully processes a `BookingCreated` event, the *xtravels* application replicates the booking confirmation back into its own database. If the booking fails, the application updates the local `Bookings` row to surface the error in its UI. + +```js +const xflights = await cds.connect.to('xflights') + +// Called when the queued booking succeeds +xflights.after('BookingCreated/#succeeded', async (result, req) => { + console.log('Flight booked successfully:', result) + // Replicate booking details from remote +}) + +// Called when the queued booking fails after max retries +xflights.after('BookingCreated/#failed', async (error, req) => { + console.log('Flight booking failed:', error) + // Trigger compensation logic +}) +``` + +This is also the foundation for [SAGA-style](https://microservices.io/patterns/data/saga.html) compensation across distributed systems: two-phase commits aren't possible once an outboxed call has gone out, so you keep consistency by reacting to outcomes and compensating where needed. + +> [!note] Node.js only +> Callback events `#succeeded` and `#failed` are currently available in Node.js only. Java doesn't have an equivalent yet, but it's on the roadmap. + +> [!tip] Register on specific events +> Callback handlers must be registered for the specific `#succeeded` or `#failed` events. +> The `*` wildcard handler is not called for these events. + + +## Inbox + +The inbox mirrors the [*'Outbox'* pattern](#outbox) for inbound messages. +When a message arrives from a broker, the messaging service immediately persists it to the database, acknowledges it to the broker, and schedules its processing. + +This brings two advantages: + +- **Quick acknowledgment** — the broker doesn't have to wait for your processing to complete, which keeps consumer throughput high under load. +- **Flatten the curve** — if a burst of messages arrives, they are queued in your database and processed at a controlled pace. + +> [!note] Especially useful when broker redelivery doesn't fit +> Some message brokers don't allow retriggering delivery or correcting message payloads. Others have aggressive or unconfigurable redelivery timeouts that misfire when your processing legitimately takes longer than the broker's window. With the inbox, the broker's job ends at acknowledgement and failures are handled inside your app via the [dead letter queue](#dead-letter-queue), where you have full control over retry timing, payload correction, and discard. + +Enable the inbox in your configuration: + +::: code-group +```json [Node.js — package.json] +{ + "cds": { + "requires": { + "messaging": { + "inboxed": true + } + } + } +} +``` +```yaml [Java — application.yaml] +cds: + messaging: + services: + - name: messaging-name + inbox: + enabled: true +``` +::: + +> [!warning] Inboxing shifts failure handling to your application +> With inboxing enabled, the broker considers the message delivered as soon as your app stores it. +> If later processing fails, recovery no longer happens in the broker; it happens in your application's retry and dead letter queue flow. + + +## Scheduled Tasks + +Event queues are not limited to outbound calls and messaging. +You can schedule arbitrary work such as data replication, cache refresh, or garbage collection. + +A scheduled task is identified by its event name and exists only once: a subsequent `schedule()` call with the same name overwrites the previous schedule (tasks are upserted, not deduplicated). This makes scheduling idempotent — convenient during application startup, where the same registration code may run on every boot. + +**Example:** Replicate airport master data from the xflights service every 10 minutes. + +::: code-group +```js [Node.js] +const xflights = await cds.connect.to('xflights') +await xflights.schedule('replicate', { entity: 'Airports' }).every('10m') +``` +```java [Java] +@Autowired @Qualifier("DefaultOutboxUnordered") +OutboxService outbox; + +@Autowired +RemoteService xflights; + +Schedulable.of(xflights, outbox) + .scheduled(Schedule.create().taskName("replicate-airports") + .every(Duration.ofMinutes(10))) + .emit("replicate", Map.of("entity", "Airports")); +``` +::: + +The `schedule()` method queues like `cds.queued(srv).send(event, data)` — within the current transaction, dispatched after commit — but it **upserts** a singleton task keyed by event name (or by `.as(name)`) instead of inserting a new entry on every call. It also accepts optional timing: + +```js +// Execute once, as soon as possible +await xflights.schedule('cleanup', { olderThan: '30d' }) + +// Execute once, after a delay +await xflights.schedule('cleanup', { olderThan: '30d' }) + .after('1h') // [!code highlight] + +// Execute repeatedly — supports time strings and cron expressions +await xflights.schedule('replicate', { entity: 'Airports' }) + .every('10m') // [!code highlight] +await xflights.schedule('replicate', { entity: 'Airports' }) + .every('*/10 * * * *') // [!code highlight] + +// Remove a previously scheduled task +await xflights.unschedule('replicate') +``` + +`.after()` accepts milliseconds (as a number) or a time string such as `'1s'`, `'10m'`, `'1h'`. +`.every()` accepts the same plus a five-field cron expression. +The fluent calls can be combined in any order; `.as()` is typically chained last. + +To schedule the same event with different payloads under separate identities, give each its own task name with `.as()`: + +```js +await xflights.schedule('replicate', { entity: 'Airports' }).every('10m') + .as('airports') // [!code highlight] +await xflights.schedule('replicate', { entity: 'Airlines' }).every('1h') + .as('airlines') // [!code highlight] + +// Each can be removed independently by its task name +await xflights.unschedule('airports') +await xflights.unschedule('airlines') +``` + +> [!tip] Real-world example: data federation +> The [data federation guide](../integration/data-federation) uses `srv.schedule().every()` to implement polling-based replication, fetching incremental updates from remote services on a regular interval. + + +## End-to-End Example + +The following example ties together queueing, callbacks, and local state updates. +It shows a common pattern: create local business data first, then trigger remote work asynchronously, then react to its outcome. + +```js [srv/travel-service.js] +const cds = require('@sap/cds') +const LOG = cds.log('TravelService') + +module.exports = class TravelService extends cds.ApplicationService { + async init() { + const xflights = await cds.connect.to('xflights') + const qd_xflights = cds.queued(xflights) + + this.after('CREATE', 'Bookings', async (_, req) => { + const { flight_ID: flight, flight_date: date } = req.data + await qd_xflights.send('POST', 'BookingCreated', { flight, date }) + }) + + xflights.after('BookingCreated/#succeeded', async (_, req) => { + await UPDATE('Bookings') + .set({ status: 'Booked' }) + .where({ ID: req.data.id }) + }) + + xflights.after('BookingCreated/#failed', async (err, req) => { + await UPDATE('Bookings') + .set({ status: 'BookingFailed' }) + .where({ ID: req.data.id }) + LOG.warn(`Flight booking permanently failed: ${err.message}`) + }) + + await super.init() + } +} +``` + +This example highlights an important design rule: +use callbacks or persisted status updates for outcomes, not direct return values. + + +## Configuration + +The persistent queue is enabled by default — messages are stored in the `cds.outbox.Messages` table within the current transaction (the `outbox` namespace is historical; the table backs all three patterns). You only configure the queue when you want to deviate from the defaults. + +::: code-group +```json [Node.js — package.json] +{ + "cds": { + "requires": { + "queue": { + "maxAttempts": 11 //> default: 10 + } + } + } +} +``` +```yaml [Java — application.yaml] +cds: + outbox: + services: + DefaultOutboxUnordered: + maxAttempts: 11 #> default: 10 +``` +::: + +::: details Node.js — `cds.requires.queue` + +| Option | Default | Description | +|--------|---------|-------------| +| `maxAttempts` | `10` | Maximum retries before a message becomes a dead letter | +| `timeout` | `"1h"` | Time after which a `processing` message is considered abandoned and eligible for reprocessing | + +::: + +::: details Java — per outbox service + +| Option | Default | Description | +|--------|---------|-------------| +| `maxAttempts` | `10` | Maximum retries before the entry becomes a dead letter | +| `enabled` | `true` | Set to `false` to disable an outbox service | + +A separate, runtime-global setting controls how long a `processing` entry can be held before another instance may pick it up: + +```yaml +cds.outbox.persistent.statusLock.timeout: PT1H # default +``` + +::: + +To disable event queues entirely, set `cds.requires.queue: false`. + +To disable queueing for a specific service in Node.js, set `outboxed: false` on it (for example, `cds.requires.messaging.outboxed: false`). In Java, set `cds.outbox.services..enabled: false`. + + +## Operations + +Production concerns: how runners coordinate across instances, how authorization carries over the queue boundary, retries, dead letters, and metrics. + +### Locking + +CAP uses **application-level locking** to coordinate processors across application instances. When a runner picks up a message, it sets the message's `status` to `processing`; other runners skip messages in that state. After processing, the row lock is released; the message is deleted (on success) or rescheduled (on failure) in the processing transaction. + +> [!warning] Migrating across `@sap/cds` major versions +> This guide describes the implementation in `@sap/cds` 10+. Older versions select messages differently: +> +> - **`@sap/cds` 8** does **not** check the `status` column at all. +> - **`@sap/cds` 9** checks `status` but holds a row-level lock for the duration of processing (`legacyLocking: true` is the default in cds 9). +> - **`@sap/cds` 10** uses application-level locking via `status` and releases the row lock after selection. +> +> A rolling upgrade from `@sap/cds` 8 directly to 10 can therefore lead to **double-processing of messages**, because cds 8 instances pick up messages that a cds 10 instance has already marked `processing`. Plan downtime, drain the queue before upgrading, or upgrade through cds 9 first. + +### Authorization + +When an event is processed asynchronously, the original HTTP request context is no longer available. +CAP handles this as follows: + +- The **user ID** is stored with the queued message and re-created when the message is processed. +- **User roles, attributes, and tokens** are *not* stored. Asynchronous processing always runs in privileged mode. + +There is no principal propagation across the queue boundary, by design — that would require CAP to persist authentication tokens in some encrypted form, and those tokens often expire long before the queued work runs. + +As a consequence, outbox calls reach their target system in the context of a *technical user* of the calling application, not the original end user. Outbox only those calls that the target system can authorize for a technical user — for example, service-to-service calls that don't depend on the end-user identity. If a queued handler still needs request-time information, encode it in the event payload or derive it from persisted business data. + +### Error Handling + +When processing fails, the system retries the message with exponentially increasing delays. +After a configurable maximum number of attempts, the message is moved to the dead letter queue. + +Some errors are identified as *unrecoverable* — for example, when a topic is forbidden by the broker. +These messages are immediately moved to the dead letter queue without further retries. + +To mark your own errors as unrecoverable in Node.js — for example, when *xflights* rejects a `replicate` request with a permanent 4xx response: + +```js +xflights.on('replicate', async (req) => { + try { + // call xflights to fetch the delta for the entity + // and write the result to the database + } catch (e) { + if (e.code >= 400 && e.code < 500) { // [!code highlight] + // semantic error — don't retry // [!code highlight] + e.unrecoverable = true // [!code highlight] + } // [!code highlight] + throw e + } +}) +``` + +In Java, suppress retries by catching the error and calling `context.setCompleted()`: + +```java +@On(service = "XFlightsOutbox", event = "replicate") +void replicate(OutboxMessageEventContext context) { + try { + // call xflights to fetch the delta for the entity + // and write the result to the database + } catch (HttpClientErrorException e) { + if (e.getStatusCode().is4xxClientError()) { // [!code highlight] + // semantic error — don't retry // [!code highlight] + context.setCompleted(); // [!code highlight] + return; // [!code highlight] + } // [!code highlight] + throw e; // transient — let the runner retry + } +} +``` + +### Dead Letter Queue + +Messages that exceed the maximum retry count remain in the `cds.outbox.Messages` database table with their error information intact. +These entries form the *dead letter queue* and require manual intervention — either to fix the underlying issue and retry, or to discard the message. + +> [!warning] Increasing `maxAttempts` between deployments +> You can raise `maxAttempts` between deployments. Older entries that had reached the previous maximum will be retried automatically after the new deployment — if the dead letter queue is large, this can cause unintended load on the system. + +For triage, query the table directly: + +```sql +SELECT ID, target, status, attempts, lastAttemptTimestamp, lastError + FROM cds_outbox_Messages + ORDER BY timestamp DESC; +``` + +You can also expose a CDS service to manage dead-letter entries with bound *revive* and *delete* actions: + +**1. Define the service** + +```cds [srv/outbox-dead-letter-queue-service.cds] +using from '@sap/cds/srv/outbox'; + +@requires: 'internal-user' +service OutboxDeadLetterQueueService { + + @readonly + entity DeadOutboxMessages as projection on cds.outbox.Messages + actions { + action revive(); + action delete(); + }; + +} +``` + +> [!warning] Restrict access +> The dead letter queue contains sensitive data. +> Ensure the service is accessible only to internal users. + +**2. Filter for dead entries** + +As `maxAttempts` is configurable, its value cannot be added as a static filter to the projection, but must be applied programmatically. + +::: code-group +```js [Node.js — srv/outbox-dead-letter-queue-service.js] +const cds = require('@sap/cds') -The _Outbox Pattern_ is a reliable strategy used in distributed systems to ensure that messages or events are consistently recorded and delivered, even in the face of failures. _Event Queues_ not only apply this pattern to _outbound_ messages, but also to _inbound_ messages and to _internal_ background tasks. So, event queues can be used for four different use cases: +module.exports = class OutboxDeadLetterQueueService extends cds.ApplicationService { + async init() { + this.before('READ', 'DeadOutboxMessages', function (req) { + const { maxAttempts } = cds.env.requires.queue + req.query.where('attempts >= ', maxAttempts) + }) + await super.init() + } +} +``` +```java [Java — DeadOutboxMessagesHandler.java] +@Component +@ServiceName(OutboxDeadLetterQueueService_.CDS_NAME) +public class DeadOutboxMessagesHandler implements EventHandler { -* **Outbox** → for outbound calls to remote services -* **Inbox** → for asynchronously handling inbound requests -* **Background tasks** → e.g. scheduled periodically -* **Remote Callbacks** → implementing SAGA patterns + private final PersistenceService db; -The core principle remains the same: + public DeadOutboxMessagesHandler( + @Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) { + this.db = db; + } -Instead of being sent directly to receivers, event messages are persisted in an _Event Queue_ table in the database -- **within the same transaction** as the triggering action, if applicable. + @Before(event = CqnService.EVENT_READ, entity = DeadOutboxMessages_.CDS_NAME) + public void addDeadEntryFilter(CdsReadEventContext context) { + Optional outboxFilters = createOutboxFilters(context.getCdsRuntime()); + outboxFilters.ifPresent(filter -> { + CqnSelect modified = copy(context.getCqn(), new Modifier() { + @Override + public CqnPredicate where(Predicate where) { + return filter.and(where); + } + }); + context.setCqn(modified); + }); + } +} +``` +::: -Later on, these event messages are read from the database and actually sent to the receiving services, hence **processed asynchronously** -- with retries, if necessary, so guaranteeing **ultimate resilience**. +**3. Implement bound actions** +Entries in the dead letter queue can be *revived* by resetting the retry counter to zero, or *deleted* permanently. +::: code-group +```js [Node.js — srv/outbox-dead-letter-queue-service.js] +this.on('revive', 'DeadOutboxMessages', async function (req) { + await UPDATE(req.subject).set({ attempts: 0 }) +}) +this.on('delete', 'DeadOutboxMessages', async function (req) { + await DELETE.from(req.subject) +}) +``` +```java [Java — DeadOutboxMessagesHandler.java] +@On +public void reviveOutboxMessage(DeadOutboxMessagesReviveContext context) { + CqnAnalyzer analyzer = CqnAnalyzer.create(context.getModel()); + Map key = analyzer.analyze(context.getCqn()).rootKeys(); + Messages msg = Messages.create((String) key.get(Messages.ID)); + msg.setAttempts(0); + db.run(Update.entity(Messages_.class).entry(key).data(msg)); + context.setCompleted(); +} -## Outbox { #outbox } +@On +public void deleteOutboxEntry(DeadOutboxMessagesDeleteContext context) { + CqnAnalyzer analyzer = CqnAnalyzer.create(context.getModel()); + Map key = analyzer.analyze(context.getCqn()).rootKeys(); + db.run(Delete.from(Messages_.class).byId(key.get(Messages.ID))); + context.setCompleted(); +} +``` +::: -Regarding the _outbox_, please see the following existing documentation: -- [Transactional Outbox](../../java/outbox) in CAP Java -- [Outboxing with `cds.queued`](../../node.js/queue) in CAP Node.js +### Observability -## Inbox { #inbox } +Both stacks export queue KPIs through OpenTelemetry, observed against the `cds.outbox.Messages` table: -Through the _inbox_, inbound messages can be accepted as asynchronous tasks. -That is, the messaging service persists the message to the database, acknowledges its receipt to the message broker, and schedules its processing. +| Metric | Description | Type | +|---|---|---| +| `cold` (`com.sap.cds.outbox.coldEntries`) | Entries that exhausted retries and won't be retried — the dead letter queue size. | Gauge | +| `remaining` (`com.sap.cds.outbox.remainingEntries`) | Entries pending delivery. | Gauge | +| `min` / `med` / `max storage time` (`com.sap.cds.outbox.{min,med,max}StorageTimeSeconds`) | How long entries have been sitting in the outbox, in seconds. | Gauge | +| `incoming` (`com.sap.cds.outbox.incomingMessages`) | Messages submitted to the outbox. | Counter | +| `outgoing` (`com.sap.cds.outbox.outgoingMessages`) | Messages successfully dispatched. | Counter | -Simply configure your messaging service for Node.js as cds.requires.messaging.inboxed = true and for CAP Java as cds.messaging.services=[{"name": "messaging-name", "inbox": {"enabled": true}}] +Metrics are scoped per microservice instance, outbox name, and tenant. The Java integration is built in. For Node.js, add `@cap-js/telemetry` to your dependencies and queue metrics start emitting alongside CAP's other telemetry signals. -**Inboxing moves the dead letter queue into your CAP app❗️** +[Learn more about Java OpenTelemetry integration.](../../java/operating-applications/observability#open-telemetry){.learn-more} +[Learn more about `@cap-js/telemetry`.](https://github.com/cap-js/telemetry#queue){.learn-more} -With the inbox, all messages are acknowledged with the message broker regardless of whether they can be processed or not. -Hence, failures need to be managed via the dead letter queue built on `cds.outbox.Messages`. -[Learn more about the dead letter queue in Node.js.](../../node.js/queue#managing-the-dead-letter-queue){.learn-more} -[Learn more about the dead letter queue in Java.](../../java/outbox#outbox-dead-letter-queue){.learn-more} +## Next Steps -Inboxing is especially beneficial in case the message broker does not allow to trigger redelivery and/ or "fix" the message payload. +For stack-specific APIs, configuration keys, and troubleshooting: +- [Event Queues in Node.js](../../node.js/event-queues) — `cds.queued`, `cds.unqueued`, `cds.flush`, `srv.schedule` (incl. `#succeeded` / `#failed` callbacks), queue configuration, troubleshooting. +- [Event Queues in Java](../../java/event-queues) — `OutboxService`, `AsyncCqnService`, custom outbox services, the technical outbox API, error-handling patterns, and event versioning for blue/green deployments. +Most real-world event-queue use comes through messaging or remote services. From here you'll likely want to look at: -## More to Come +- [Messaging](messaging) — emitting and consuming events between CAP applications and via brokers; messaging services are auto-outboxed. +- [CAP-Level Service Integration](../integration/calesi) — consuming remote services as if they were local; outboxing them centrally with `outboxed: true`. +- [CAP-Level Data Federation](../integration/data-federation) — using `srv.schedule().every()` for polling-based replication from remote services. -This documentation is not complete yet, or the APIs are not released for general availability. -Stay tuned to upcoming releases for further updates. diff --git a/guides/security/data-protection.md b/guides/security/data-protection.md index 1292969bbc..574b657755 100644 --- a/guides/security/data-protection.md +++ b/guides/security/data-protection.md @@ -576,7 +576,7 @@ Design your CDS services exposed to web adapters on need-to-know basis. Be espec #### CAP Service Runtime Open transactions are expensive as they bind many resources such as a database connection as well as memory buffers. -To minimize the amount of time a transaction must be kept open, the CAP runtime offers an [Outbox Service](../../java/outbox) that allows to schedule asynchronous remote calls in the business transaction. +To minimize the amount of time a transaction must be kept open, the CAP runtime offers an [Outbox Service](../../java/event-queues) that allows to schedule asynchronous remote calls in the business transaction. Hence, the request time to process a business query, which requires a remote call (such as to an audit log server or messaging broker), is minimized and independent from the response time of the remote service. ::: tip diff --git a/java/_menu.md b/java/_menu.md index 9846736087..624c2e5c23 100644 --- a/java/_menu.md +++ b/java/_menu.md @@ -16,11 +16,11 @@ ## [Indicating Errors](event-handlers/indicating-errors) ## [Request Contexts](event-handlers/request-contexts) ## [ChangeSet Contexts](event-handlers/changeset-contexts) +# [Event Queues](event-queues) # [Fiori Drafts](fiori-drafts) # [Messaging](messaging) # [Audit Logging](auditlog) # [Change Tracking](change-tracking) -# [Transactional Outbox](outbox) # [Multitenancy](multitenancy) # [Security](security) # [Spring Boot Integration](spring-boot-integration) diff --git a/java/auditlog.md b/java/auditlog.md index 68c85e338d..91f5cc1828 100644 --- a/java/auditlog.md +++ b/java/auditlog.md @@ -30,7 +30,7 @@ The following events can be emitted with the [AuditLogService](https://javadoc.i - [Configuration changes](#config-change) - [Security events](#security-event) -AuditLog events typically are bound to business transactions. In order to handle the events transactionally and also to decouple the request from outbound calls to a consumer, for example a central audit log service, the AuditLog service leverages the [outbox](./outbox) service internally which allows [deferred](#deferred) sending of events. +AuditLog events typically are bound to business transactions. In order to handle the events transactionally and also to decouple the request from outbound calls to a consumer, for example a central audit log service, the AuditLog service leverages the [outbox](./event-queues) service internally which allows [deferred](#deferred) sending of events. ### Use AuditLogService @@ -102,13 +102,11 @@ auditLogService.logSecurityEvent(action, data); ### Deferred AuditLog Events { #deferred} -Instead of processing the audit log events synchronously in the [audit log handler](#auditlog-handlers), the `AuditLogService` can store the event in the [outbox](./outbox). This is done in the *same* transaction of the business request. Hence, a cancelled business transaction will not send any audit log events that are bound to it. To gain fine-grained control, for example to isolate a specific event from the current transaction, you may refine the transaction scope. See [ChangeSetContext API](./event-handlers/changeset-contexts#defining-changeset-contexts) for more information. +Instead of processing the audit log events synchronously in the [audit log handler](#auditlog-handlers), the `AuditLogService` can store the event in the [outbox](./event-queues). This is done in the *same* transaction of the business request. Hence, a cancelled business transaction will not send any audit log events that are bound to it. To gain fine-grained control, for example to isolate a specific event from the current transaction, you may refine the transaction scope. See [ChangeSetContext API](./event-handlers/changeset-contexts#defining-changeset-contexts) for more information. As the stored events are processed asynchronously, the business request is also decoupled from the audit log handler which typically sends the events synchronously to a central audit log service. This improves resilience and performance. -By default, the outbox comes in an [in-memory](./outbox#in-memory) flavour which has the drawback that it can't guarantee that the all events are processed after the transaction has been successfully closed. - -To close this gap, a sophisticated [persistent outbox](./outbox#persistent) service can be configured. +The audit-log service uses the [`DefaultOutboxUnordered`](./event-queues#default-outbox-services) outbox by default. By default, not all events are send asynchronously via (persistent) outbox. * [Security events](#security-event) are always send synchronously. diff --git a/java/event-queues.md b/java/event-queues.md new file mode 100644 index 0000000000..c133f2efea --- /dev/null +++ b/java/event-queues.md @@ -0,0 +1,448 @@ +--- +synopsis: > + Java APIs and configuration for CAP's Transactional Event Queues — `OutboxService`, `AsyncCqnService`, scheduling, custom outbox services, error handling, and event versioning. +status: released +--- + +# Event Queues in Java + +For concepts, use cases, and guarantees, see the [Transactional Event Queues](../guides/events/event-queues) guide. This page covers the Java-specific APIs and configuration on top of that. + +In Java, event queues are exposed as **outbox services**. The runtime ships two default outboxes — `DefaultOutboxOrdered` (used by messaging services; processes entries in submission order) and `DefaultOutboxUnordered` (used by the audit-log service; may process entries in parallel). You can register custom outbox services for advanced isolation, scaling, or shared-database scenarios. + +[[toc]] + + +## Programmatic API + +### Outboxing a Service + +Wrap any CAP service with outbox handling. Events triggered on the returned wrapper are stored in the outbox first and executed asynchronously after commit. Relevant information from the `RequestContext` is stored with the event data; the user context is downgraded to a system user context. + +```java +OutboxService myCustomOutbox = ...; +CqnService remoteS4 = ...; +CqnService outboxedS4 = myCustomOutbox.outboxed(remoteS4); +``` + +If a method on the outboxed service has a return value, it returns `null` — the call is asynchronous. To make this explicit at the type level, use the variant that wraps the service with an asynchronous-suited API: + +```java +OutboxService myCustomOutbox = ...; +CqnService remoteS4 = ...; +AsyncCqnService outboxedS4 = myCustomOutbox.outboxed(remoteS4, AsyncCqnService.class); +``` + +`AsyncCqnService.of()` is a convenience for the common case: + +```java +OutboxService myCustomOutbox = ...; +CqnService remoteS4 = ...; +AsyncCqnService outboxedS4 = AsyncCqnService.of(remoteS4, myCustomOutbox); +``` + +The outboxed service is thread-safe and can be cached. Any service that implements the `Service` interface can be outboxed, and each call is asynchronously executed if the API method internally calls `Service.emit(EventContext)`. + +To recover the synchronous service from a wrapped one: + +```java +CqnService synchronous = OutboxService.unboxed(outboxedS4); +``` + +::: tip Custom asynchronous-suited APIs +When defining your own asynchronous-suited interface, it must provide the same method signatures as the interface of the outboxed service, except for the return types — those should be `void`. +::: + +::: warning Java Proxy +A service wrapped by an outbox is a [Java Proxy](https://docs.oracle.com/javase/8/docs/technotes/guides/reflection/proxy.html). It only implements the *interfaces* of the underlying object — you can't cast an outboxed service proxy back to its concrete implementation class. +::: + + +### Scheduling + +CAP Java offers two ways to schedule a queued event, both controlled by a `Schedule` builder. + +**Option 1 — pass a `Schedule` to `submit`** on a regular outbox, per call: + +```java +@Autowired @Qualifier("DefaultOutboxUnordered") +OutboxService outbox; + +OutboxMessage message = OutboxMessage.create(); +message.setParams(Map.of("entity", "Airports")); + +outbox.submit("replicate", message, + Schedule.create().taskName("replicate-airports") + .every(Duration.ofMinutes(10))); +``` + +**Option 2 — wrap a service with `Schedulable`** so all subsequent emits use a fixed schedule: + +```java +@Autowired @Qualifier("DefaultOutboxUnordered") +OutboxService outbox; + +@Autowired +RemoteService xflights; + +Schedulable scheduled = Schedulable.of(xflights, outbox) + .scheduled(Schedule.create().taskName("replicate-airports") + .every(Duration.ofMinutes(10))); + +scheduled.emit("replicate", Map.of("entity", "Airports")); +``` + +Every outboxed service is guaranteed to implement `Schedulable` — its single method `scheduled(Schedule)` returns the same service typed to use the given schedule on every subsequent emit. + +#### `Schedule` Options + +`Schedule` is a small builder with three mutually exclusive timing options: + +```java +// Execute once, after a delay +Schedule.create().taskName("cleanup").after(Duration.ofHours(1)); + +// Execute repeatedly, with a fixed delay between successful runs +Schedule.create().taskName("replicate-airports").every(Duration.ofMinutes(10)); + +// Execute repeatedly, on a Spring cron expression +Schedule.create().taskName("nightly-cleanup").cron("0 0 3 * * *"); +``` + +`after` and `every` accept any [`java.time.Duration`](https://docs.oracle.com/javase/8/docs/api/java/time/Duration.html). `cron` follows the [Spring cron syntax](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/scheduling/support/CronExpression.html) (six fields including seconds; differs from Node.js's five-field cron). The three are mutually exclusive — combining them throws `IllegalArgumentException`. + +A scheduled task with a name is a **singleton**: a subsequent submission with the same task name overwrites the previous schedule (tasks are upserted, not deduplicated). This makes scheduling idempotent — convenient during application startup, where the same registration code may run on every boot. Without a task name, every submission creates a new scheduled entry. + +To remove a previously scheduled task, submit a cancellation with the same task name: + +```java +outbox.submit("replicate", OutboxMessage.create(), + Schedule.create().taskName("replicate-airports").cancel()); +``` + +Cancellation requires a task name — it's how the runtime locates the entry to delete. + + +### Technical Outbox API + +The technical API outboxes custom messages for arbitrary events or processing logic. The `OutboxMessage` instance is serialized to JSON and stored in the database, so all data must be JSON-serializable. + +```java +OutboxService outboxService = runtime.getServiceCatalog() + .getService(OutboxService.class, ""); + +OutboxMessage message = OutboxMessage.create(); +message.setParams(Map.of("name", "John", "lastname", "Doe")); + +outboxService.submit("myEvent", message); +``` + +Register an `@On` handler on the outbox service to perform the processing logic when the message is published: + +```java +@On(service = "", event = "myEvent") +void processMyEvent(OutboxMessageEventContext context) { + OutboxMessage message = context.getMessage(); + Map params = message.getParams(); + String name = (String) params.get("name"); + String lastname = (String) params.get("lastname"); + + // Perform processing logic for myEvent + + context.setCompleted(); +} +``` + +The handler must complete the context after executing the processing logic. + +[Learn more about event handlers.](./event-handlers/){.learn-more} + + +#### Custom Serialization + +The outbox has no information about the structure or data types being serialized. If your custom messages use non-default data types — or you need extra context properties — register `@Before` and `@On` handlers to customize serialization and deserialization. *(This isn't required for CDS-model-based services.)* + +```java [srv/src/main/java/com/myapp/CustomOutboxHandler.java] +@Component +@ServiceName(value = "*", type = OutboxService.class) +public class CustomOutboxHandler implements EventHandler { + + @On + void publishedByOutbox(OutboxMessageEventContext context) { + // Restore custom values from context only + if (Boolean.FALSE.equals(context.getIsInbound())) { + return; + } + + // custom deserialization logic + Long date = (Long) context.getMessage().getParams().get("orderDate"); + context.getMessage().getParams().put("orderDate", Instant.ofEpochSecond(date)); + } + + @Before(event = "*") + void prepareOutboxMessage(OutboxMessageEventContext context) { + // prepare outbox message for storage only + if (Boolean.TRUE.equals(context.getIsInbound())) { + return; + } + + // custom serialization logic + Instant date = (Instant) context.getMessage().getParams().get("orderDate"); + context.getMessage().getParams().put("orderDate", date.getEpochSecond()); + } +} +``` + +> [!warning] Don't complete the context in either of those handlers +> Calling `setCompleted` here breaks the chain — the next handler isn't called and processing fails. + + +### Error Handling + +By default the outbox retries publishing a message on error until it reaches `maxAttempts`. This makes applications resilient against unavailability of external systems. + +Some errors aren't worth retrying — for example, a `400 Bad Request` from a downstream service indicates a *semantic* error that the same payload will reproduce on every attempt. Wrap the processing in a try/catch and call `context.setCompleted()` to remove the message from the queue without further retries: + +```java +@On(service = "", event = "myEvent") +void processMyEvent(OutboxMessageEventContext context) { + try { + // Perform processing logic for myEvent + } catch (Exception e) { + if (isUnrecoverableSemanticError(e)) { + // Perform application-specific counter-measures + context.setCompleted(); // indicate message deletion to outbox + } else { + throw e; // indicate error to outbox + } + } +} +``` + +If the original processing logic isn't yours and you need to wrap its error handling, use `EventContext.proceed()`: + +```java +@On(service = OutboxService.PERSISTENT_ORDERED_NAME, event = AuditLogService.DEFAULT_NAME) +void handleAuditLogProcessingErrors(OutboxMessageEventContext context) { + try { + context.proceed(); // wrap default logic + } catch (Exception e) { + if (isUnrecoverableSemanticError(e)) { + // Perform application-specific counter-measures + context.setCompleted(); + } else { + throw e; + } + } +} +``` + +[Learn more about `EventContext.proceed()`.](./event-handlers/#proceed-on){.learn-more} + +> [!note] Callbacks not yet available +> The `#succeeded` / `#failed` callback events documented for Node.js have no Java equivalent yet — see [Callbacks](../guides/events/event-queues#callbacks) in the common guide. + + +## Configuration + +### Default Outbox Services + +CAP Java ships two default outbox services: + +- **`DefaultOutboxOrdered`** — used by [messaging services](messaging) by default. Processes entries in submission order. +- **`DefaultOutboxUnordered`** — used by the [AuditLog service](auditlog) by default. May process entries in parallel across application instances. + +The configuration of both can be overridden in *application.yaml*: + +::: code-group +```yaml [srv/src/main/resources/application.yaml] +cds: + outbox: + services: + DefaultOutboxOrdered: + maxAttempts: 10 + DefaultOutboxUnordered: + maxAttempts: 10 +``` +::: + +| Option | Default | Description | +|---|---|---| +| `maxAttempts` | `10` | Number of unsuccessful emits until the message is ignored. It still remains in the database. | +| `enabled` | `true` | Set to `false` to disable an outbox service. | + +A separate, runtime-global setting controls how long a `processing` entry can be held before another instance may pick it up — useful when an instance crashes mid-processing: + +```yaml [srv/src/main/resources/application.yaml] +cds: + outbox: + persistent: + statusLock: + timeout: PT1H # default +``` + +The outbox stores the last error in the `lastError` element of `cds.outbox.Messages`. + + +### Custom Outbox Services + +Configure custom persistent outboxes in *application.yaml*: + +::: code-group +```yaml [srv/src/main/resources/application.yaml] +cds: + outbox: + services: + MyCustomOutbox: + maxAttempts: 5 + MyOtherCustomOutbox: + maxAttempts: 10 +``` +::: + +Access them either via the service catalog: + +```java +OutboxService myCustomOutbox = cdsRuntime.getServiceCatalog() + .getService(OutboxService.class, "MyCustomOutbox"); +``` + +or by Spring injection: + +```java +@Component +public class MySpringComponent { + private final OutboxService myCustomOutbox; + + public MySpringComponent(@Qualifier("MyCustomOutbox") OutboxService myCustomOutbox) { + this.myCustomOutbox = myCustomOutbox; + } +} +``` + +::: warning Removing a custom outbox +Before removing a custom outbox from the configuration, ensure no unprocessed entries remain in `cds.outbox.Messages` for it. Removing the outbox configuration does not delete the entries — they remain in the table and aren't processed anymore. +::: + + +### Shared Databases + +> [!warning] Workaround for unsupported scenario +> CAP Java does not yet support microservices with a shared database out of the box: the two static-named default outboxes (`DefaultOutboxOrdered`, `DefaultOutboxUnordered`) would be shared across all services and introduce conflicts. + +The manual workaround uses isolated custom outboxes with service-specific names: + +**1. Deactivate the default outboxes and create service-specific ones** + +```yaml +cds: + outbox: + services: + # deactivate default outboxes + DefaultOutboxUnordered.enabled: false + DefaultOutboxOrdered.enabled: false + # custom outboxes with unique names + Service1CustomOutboxOrdered: + maxAttempts: 10 + Service1CustomOutboxUnordered: + maxAttempts: 10 +``` + +**2. Adapt audit log configuration** + +The default audit-log outbox is `DefaultOutboxUnordered`. Point it at the new custom outbox: + +```yaml +cds: + auditlog: + outbox.name: Service1CustomOutboxUnordered +``` + +**3. Adapt messaging configuration** + +For *each* messaging service in the application, point it at the new ordered outbox: + +```yaml +cds: + messaging: + services: + MessagingService1: + outbox.name: Service1CustomOutboxOrdered + MessagingService2: + outbox.name: Service1CustomOutboxOrdered +``` + +::: tip Important +Both deactivating the defaults *and* using unique outbox namespaces are required to achieve service isolation in a shared-DB scenario. +::: + + +### Event Versions + +In blue/green scenarios, outbox collectors of an older deployment may not be able to process events emitted by a newer deployment. Configure each deployment with an *event version* so older collectors skip newer events: + +[`cds.environment.deployment.version: 2`](./developing-applications/properties#cds-environment-deployment-version) + +::: warning Ascending versions only +Configured deployment versions must increase. Messages are processed by an outbox collector only if the event version is less than or equal to the deployment version. +::: + +To automate versioning from the Maven app version, enable resource filtering in *srv/pom.xml*: + +::: code-group +```xml [srv/pom.xml] + + ... + + + src/main/resources + true + + + ... +``` +::: + +Then use the `${project.version}` placeholder: + +[`cds.environment.deployment.version: ${project.version}`](./developing-applications/properties#cds-environment-deployment-version) + +A startup log entry shows the configured version: + +```bash +2024-12-19T11:21:33.253+01:00 INFO 3420 --- [main] cds.services.impl.utils.BuildInfo : application.deployment.version: 1.0.0-SNAPSHOT +``` + +To opt a specific custom outbox out of the version check entirely, set [`cds.outbox.services.MyCustomOutbox.checkVersion: false`](./developing-applications/properties#cds-outbox-services--checkVersion). + + +## Troubleshooting + +### Inspecting `cds.outbox.Messages` + +To see what's currently queued, query `cds.outbox.Messages` directly through the `PersistenceService`. The columns most useful for triage are `status`, `attempts`, `target`, `lastError`, and `lastAttemptTimestamp`: + +```java +@Autowired @Qualifier(PersistenceService.DEFAULT_NAME) +PersistenceService db; + +Result result = db.run(Select.from(Messages_.class) + .columns(m -> m.ID(), m -> m.target(), m -> m.status(), + m -> m.attempts(), m -> m.lastAttemptTimestamp(), m -> m.lastError()) + .orderBy(m -> m.timestamp().desc())); +``` + +For a managed view with bound *revive* and *delete* actions, see [*Dead Letter Queue*](../guides/events/event-queues#dead-letter-queue) in the common guide. + +### Deleting Entries + +To clear stuck messages programmatically: + +```java +db.run(Delete.from(Messages_.class)); +``` + + +--- + +Working in Node.js? See [Event Queues in Node.js](../node.js/event-queues). diff --git a/java/messaging.md b/java/messaging.md index 3d1ff977df..0f101c9107 100644 --- a/java/messaging.md +++ b/java/messaging.md @@ -85,9 +85,9 @@ As shown in the example, there are two flavors of sending messages with the mess In section [CDS-Declared Events](#cds-declared-events), we show how to declare events in CDS models and by this let CAP generate EventContext interfaces especially tailored for the defined payload, that allows type safe access to the payload. ::: tip Using an outbox -The messages are sent once the transaction is successful. Per default, an in-memory outbox is used, but there's also support for a [persistent outbox](./outbox#persistent). +The messages are sent once the transaction is successful. Per default, an in-memory outbox is used, but there's also support for a [persistent outbox](./event-queues#default-outbox-services). -You can configure a [custom outbox](./outbox#custom-outboxes) for a messaging service by setting the property +You can configure a [custom outbox](./event-queues#custom-outbox-services) for a messaging service by setting the property `cds.messaging.services..outbox.name` to the name of the custom outbox. This specifically makes sense when [using multiple channels](../guides/events/messaging#using-multiple-channels). ::: diff --git a/java/outbox.md b/java/outbox.md deleted file mode 100644 index 0cba54d407..0000000000 --- a/java/outbox.md +++ /dev/null @@ -1,576 +0,0 @@ ---- -synopsis: > - Find here information about the Outbox service in CAP Java. -status: released ---- - -# Transactional Outbox - - -{{ $frontmatter.synopsis }} - -## Concepts - -Usually the emit of messages should be delayed until the main transaction succeeded, otherwise recipients also receive messages in case of a rollback. -To solve this problem, a transactional outbox can be used to defer the emit of messages until the success of the current transaction. - -The outbox is typically not used directly, but rather through the [messaging service](../java/messaging), the [AuditLog service](../java/auditlog) or to [outbox CAP service events](#outboxing-cap-service-events). - -## In-Memory Outbox (Default) { #in-memory} - -The in-memory outbox is used per default and the messages are emitted when the current transaction is successful. Until then, messages are kept in memory. - - -## Persistent Outbox { #persistent} - -The persistent outbox requires a persistence layer to persist the messages before emitting them. Here, the to-be-emitted message is stored in a database table first. The same database transaction is used as for other operations, therefore transactional consistency is guaranteed. - -Once the transaction succeeds, the messages are read from the database table and are emitted. - -- If an emit was successful, the respective message is deleted from the database table. -- If an emit wasn't successful, there will be a retry after some (exponentially growing) waiting time. After a maximum number of attempts, the message is ignored for processing and remains in the database table. Even if the app crashes the messages can be redelivered after successful application startup. - -To enable the persistence for the outbox, you need to add the service `outbox` of kind `persistent-outbox` to the `cds.requires` section in the _package.json_ or _cdsrc.json_, which will automatically enhance your CDS model in order to support the persistent outbox. - -```jsonc -{ - // ... - "cds": { - "requires": { - "outbox": { - "kind": "persistent-outbox" - } - } - } -} -``` - -::: warning -Be aware that you need to migrate the database schemas of all tenants after you've enhanced your model with an outbox version from `@sap/cds` version 6.0.0 or later. -::: - -For a multitenancy scenario, make sure that the required configuration is also done in the MTX sidecar service. Make sure that the base model in all tenants is updated to activate the outbox. - -::: info Option: Add outbox to your base model -Alternatively, you can add `using from '@sap/cds/srv/outbox';` to your base model. In this case, you need to update the tenant models after deployment but you don't need to update MTX Sidecar. -::: - -If enabled, CAP Java provides two persistent outbox services by default: - -- `DefaultOutboxOrdered` - is used by default by [messaging services](../java/messaging) -- `DefaultOutboxUnordered` - is used by default by the [AuditLog service](../java/auditlog) - -The default configuration for both outboxes can be overridden using the `cds.outbox.services` section, for example in the _application.yaml_: -::: code-group -```yaml [srv/src/main/resources/application.yaml] -cds: - outbox: - services: - DefaultOutboxOrdered: - maxAttempts: 10 - # ordered: true - DefaultOutboxUnordered: - maxAttempts: 10 - # ordered: false -``` -::: -You have the following configuration options: -- `maxAttempts` (default `10`): The number of unsuccessful emits until the message is ignored. It still remains in the database table. -- `ordered` (default `true`): If this flag is enabled, the outbox instance processes the entries in the order they have been submitted to it. Otherwise, the outbox may process entries randomly and in parallel, by leveraging outbox processors running in multiple application instances. This option can't be changed for the default persistent outboxes. - -The persistent outbox stores the last error that occurred, when trying to emit the message of an entry. The error is stored in the element `lastError` of the entity `cds.outbox.Messages`. - -### Configuring Custom Outboxes { #custom-outboxes} - -Custom persistent outboxes can be configured using the `cds.outbox.services` section, for example in the _application.yaml_: -::: code-group -```yaml [srv/src/main/resources/application.yaml] -cds: - outbox: - services: - MyCustomOutbox: - maxAttempts: 5 - MyOtherCustomOutbox: - maxAttempts: 10 -``` -::: -Afterward you can access the outbox instances from the service catalog: - -```java -OutboxService myCustomOutbox = cdsRuntime.getServiceCatalog().getService(OutboxService.class, "MyCustomOutbox"); -OutboxService myOtherCustomOutbox = cdsRuntime.getServiceCatalog().getService(OutboxService.class, "MyOtherCustomOutbox"); -``` - -Alternatively it's possible to inject them into a Spring component: - -```java -@Component -public class MySpringComponent { - private final OutboxService myCustomOutbox; - - public MySpringComponent(@Qualifier("MyCustomOutbox") OutboxService myCustomOutbox) { - this.myCustomOutbox = myCustomOutbox; - } -} -``` - -::: warning When removing a custom outbox ... -... it must be ensured that there are no unprocessed entries left. - -Removing a custom outbox from the `cds.outbox.services` section doesn't remove the -entries from the `cds.outbox.Messages` table. The entries remain in the `cds.outbox.Messages` table and aren't -processed anymore. - -::: - -### Outbox Event Versions - -In scenarios with multiple deployment versions (blue/green), situations may arise in which the outbox collectors of the older deployment cannot process the events generated by a newer deployment. In this case, the event can get stuck in the outbox, with all the resulting problems. - -To avoid this problem, you can configure the outbox to use an event version that prevents the outbox collectors from using the newer events. For this purpose, you can set the parameter [cds.environment.deployment.version: 2](../java/developing-applications/properties#cds-environment-deployment-version). - -::: warning Ascending Versions -The configured deployment versions must be in ascending order. The messages are only processed by the outbox collector if the event version is less than or equal to the deployment version. -::: - -To make things easier, you can automate versioning by using the Maven app version. This requires you to increment the version for each new deployment. - -To do this, the Maven `resource.filtering` configuration in the `srv/pom.xml` must be activated as follows, so that the app version placeholder `${project.version}` can be used in [cds.environment.deployment.version: ${project.version}](../java/developing-applications/properties#cds-environment-deployment-version). - -::: code-group -```xml [srv/pom.xml] - - ... - - - src/main/resources - true - - - ... -``` -::: - -To be sure that the deployment version has been set correctly, you can find a log entry at startup that shows the configured version: - -```bash -2024-12-19T11:21:33.253+01:00 INFO 3420 --- [main] cds.services.impl.utils.BuildInfo : application.deployment.version: 1.0.0-SNAPSHOT -``` - -And finally, if for some reason you don't want to use a version check for a particular outbox collector, you can switch it off via the outbox configuration [cds.outbox.services.MyCustomOutbox.checkVersion: false](../java/developing-applications/properties#cds-outbox-services--checkVersion). - -### Outbox for Shared Databases - -Currently, CAP Java does not yet support microservices with shared database out of the box, as this can lead to unexpected behavior when different isolated services use the same outboxes. -Since CAP automatically creates two outboxes with a static name — **DefaultOutboxOrdered** and **DefaultOutboxUnordered** — these would be shared across all services which introduces conflicts. - -To avoid this, you can apply a manual workaround as follows: - - 1. Customize the outbox configuration and isolating them via distinct namespaces for each service. - 2. Adapt the Audit Log outbox configuration. - 3. Adapt the messaging outbox configuration per service. - - These steps are described in the following sections. - -#### Deactivate Default Outboxes - -First, deactivate the two default outboxes and create custom outboxes with configurations tailored to your needs. - -```yaml -cds: - outbox: - services: - # deactivate default outboxes - DefaultOutboxUnordered.enabled: false - DefaultOutboxOrdered.enabled: false - # custom outboxes with unique names - Service1CustomOutboxOrdered: - maxAttempts: 10 - storeLastError: true - ordered: true - Service1CustomOutboxUnordered: - maxAttempts: 10 - storeLastError: true - ordered: false - -``` - -#### Adapt Audit Log Configuration - -The **DefaultOutboxUnordered** outbox is automatically used for audit logging. Therefore, you must update the audit log configuration to point to the custom one. - -```yaml -cds: - ... - auditlog: - outbox.name: Service1CustomOutboxUnordered -``` - -#### Adapt Messaging Configuration - -Next, adapt the messaging configuration of **every** messaging service in the application so that they use the custom-defined outboxes. - -```yaml -cds: - messaging: - services: - MessagingService1: - outbox.name: Service1CustomOutboxOrdered - MessagingService2: - outbox.name: Service1CustomOutboxOrdered -``` - - -::: tip Important Note -It is crucial to **deactivate** the default outboxes, and ensure **unique outbox namespaces** in order to achieve proper isolation between services in a shared DB scenario. -::: - - -## Outboxing CAP Service Events - -Outbox services support outboxing of arbitrary CAP services. A typical use case is to outbox remote OData -service calls, but also calls to other CAP services can be decoupled from the business logic flow. - -The API `OutboxService.outboxed(Service)` is used to wrap services with outbox handling. Events triggered -on the returned wrapper are stored in the outbox first, and executed asynchronously. Relevant information from -the `RequestContext` is stored with the event data, however the user context is downgraded to a system user context. - -The following example shows you how to outbox a service: - -```java -OutboxService myCustomOutbox = ...; -CqnService remoteS4 = ...; -CqnService outboxedS4 = myCustomOutbox.outboxed(remoteS4); -``` - -If a method on the outboxed service has a return value, it will always return `null` since it's executed asynchronously. A common example for this are the `CqnService.run(...)` methods. -To improve this the API `OutboxService.outboxed(Service, Class)` can be used, which wraps a service with an asynchronous suited API while outboxing it. -This can be used together with the interface `AsyncCqnService` to outbox remote OData services: - -```java -OutboxService myCustomOutbox = ...; -CqnService remoteS4 = ...; -AsyncCqnService outboxedS4 = myCustomOutbox.outboxed(remoteS4, AsyncCqnService.class); -``` - -The method `AsyncCqnService.of()` can be used alternatively to achieve the same for CqnServices: - -```java -OutboxService myCustomOutbox = ...; -CqnService remoteS4 = ...; -AsyncCqnService outboxedS4 = AsyncCqnService.of(remoteS4, myCustomOutbox); -``` - -::: tip Custom asynchronous suited API -When defining your own custom asynchronous suited API, the interface must provide the same method signatures as the interface of the outboxed service, except for the return types which should be `void`. -::: - -The outboxed service is thread-safe and can be cached. -Any service that implements the `Service` interface can be outboxed. -Each call to the outboxed service is asynchronously executed, if the API method internally calls the method `Service.emit(EventContext)`. - -A service wrapped by an outbox can be unboxed by calling the API `OutboxService.unboxed(Service)`. Method calls to the unboxed -service are executed synchronously without storing the event in an outbox. - -::: warning Java Proxy -A service wrapped by an outbox is a [Java Proxy](https://docs.oracle.com/javase/8/docs/technotes/guides/reflection/proxy.html). Such a proxy only implements the _interfaces_ of the object that it's wrapping. This means an outboxed service proxy can't be casted to the class implementing the underlying service object. -::: - -::: tip Custom outbox for scaling -The default outbox services can be used for outboxing arbitrary CAP services. If you detect a scaling issue, -you can define custom outboxes that can be used for outboxing. -::: - -## Technical Outbox API { #technical-outbox-api } - -Outbox services provide the technical API `OutboxService.submit(String, OutboxMessage)` that can be used to outbox custom messages for an arbitrary event or processing logic. -When submitting a custom message, an `OutboxMessage` that can optionally contain parameters for the event needs to be provided. -As the `OutboxMessage` instance is serialized and stored in the database, all data provided in that message -must be serializable and deserializable to/from JSON. The following example shows the submission of a custom message to an outbox: - -```java -OutboxService outboxService = runtime.getServiceCatalog().getService(OutboxService.class, ""); - -OutboxMessage message = OutboxMessage.create(); -message.setParams(Map.of("name", "John", "lastname", "Doe")); - -outboxService.submit("myEvent", message); -``` - -A handler for the custom message must be registered on the outbox service. This handler performs the processing logic when the message is published by the outbox: - -```java -@On(service = "", event = "myEvent") -void processMyEvent(OutboxMessageEventContext context) { - OutboxMessage message = context.getMessage(); - Map params = message.getParams(); - String name = (String) param.get("name"); - String lastname = (String) param.get("lastname"); - - // Perform processing logic for myEvent - - context.setCompleted(); -} -``` - -You must ensure that the handler is completing the context, after executing the processing logic. - -[Learn more about event handlers.](./event-handlers/){.learn-more} - -::: tip Customizing Outbox Entries - -The outbox has no information regarding the structure and the data types that -shall be serialized and deserialized to and from the outbox. - -Special handling is needed to avoid serialization and deserialization errors in custom outbox handlers if custom data types are used, **or** if additional context properties are required. _Special handling isn't required for CDS model-based services._ - -```java [srv/src/main/java/com/myapp/CustomOutboxHandler.java] -@Component -@ServiceName(value = "*", type = OutboxService.class) -public class CustomOutboxHandler implements EventHandler { - - @On - void publishedByOutbox(OutboxMessageEventContext context) { - // Restore custom values from context only - if (Boolean.FALSE.equals(context.getIsInbound())) { - return; - } - - // custom deserialization logic - Long date = (Long) context.getMessage().getParams().get("orderDate"); - context.getMessage().getParams().put("orderDate", Instant.ofEpochSecond(date)); - } - - @Before(event = "*") - void prepareOutboxMessage(OutboxMessageEventContext context) { - // prepare outbox message for storage only - if (Boolean.TRUE.equals(context.getIsInbound())) { - return; - } - - // custom serialization logic - Instant date = (Instant) context.getMessage().getParams().get("orderDate"); - context.getMessage().getParams().put("orderDate", new Long(date.getEpochSecond())); - } -} -``` - -**Don't complete the context in any of those two handlers, otherwise other -handlers aren't called and functionality is broken.** - -::: - -## Handling Outbox Errors { #handling-outbox-errors } - -The outbox by default retries publishing a message, if an error occurs during processing, until the message has reached the maximum number of attempts. -This behavior makes applications resilient against unavailability of external systems, which is a typical use case for outbox message processing. - -However, there might also be situations in which it is not reasonable to retry publishing a message. -For example, when the processed message causes a semantic error - typically due to a `400 Bad request` - on the external system. -Outbox messages causing such errors should be removed from the outbox message table before reaching the maximum number of retry attempts and instead application-specific -counter-measures should be taken to correct the semantic error or ignore the message altogether. - -A simple try-catch block around the message processing can be used to handle errors: -- If an error should cause a retry, the original exception should be (re)thrown (default behavior). -- If an error should not cause a retry, the exception should be suppressed and additional steps can be performed. - -```java -@On(service = "", event = "myEvent") -void processMyEvent(OutboxMessageEventContext context) { - try { - // Perform processing logic for myEvent - } catch (Exception e) { - if (isUnrecoverableSemanticError(e)) { - // Perform application-specific counter-measures - context.setCompleted(); // indicate message deletion to outbox - } else { - throw e; // indicate error to outbox - } - } -} -``` - -In some situations, the original outbox processing logic is not implemented by you but the processing needs to be extended with additional error handling. -In that case, wrap the `EventContext.proceed()` method, which executes the underlying processing logic: - -```java -@On(service = OutboxService.PERSISTENT_ORDERED_NAME, event = AuditLogService.DEFAULT_NAME) -void handleAuditLogProcessingErrors(OutboxMessageEventContext context) { - try { - context.proceed(); // wrap default logic - } catch (Exception e) { - if (isUnrecoverableSemanticError(e)) { - // Perform application-specific counter-measures - context.setCompleted(); // indicate message deletion to outbox - } else { - throw e; // indicate error to outbox - } - } -} -``` - -[Learn more about `EventContext.proceed()`.](./event-handlers/#proceed-on){.learn-more} - -## Outbox Dead Letter Queue - -The transactional outbox tries to process each entry a specific number of times. The number of attempts is configurable per outbox by setting the configuration `cds.outbox.services..maxAttempts`. - -[Learn more about CDS Properties.](./developing-applications/properties){.learn-more} - -Once the maximum number of attempts is exceeded, the corresponding entry is not touched anymore and hence it can be regarded as dead. Dead outbox entries are not deleted automatically. They remain in the database and it's up to the application to take care of the entries. By defining a CDS service, the dead entries can be managed conveniently. Let's have a look, how you can develop a Dead Letter Queue for the transactional outbox. - -::: warning Changing configuration between deployments - -It's possible to increase the value of the configuration `cds.outbox.services..maxAttempts` in between of deployments. Older entries which have reached their max attempts in the past would be retried automatically after deployment of the new microservice version. If the dead letter queue has a large size, this leads to unintended load on the system. - -::: - - -### Define the Service - -::: code-group - -```cds [srv/outbox-dead-letter-queue-service.cds] -using from '@sap/cds/srv/outbox'; - -@requires: 'internal-user' -service OutboxDeadLetterQueueService { - - @readonly - entity DeadOutboxMessages as projection on cds.outbox.Messages - actions { - action revive(); - action delete(); - }; - -} -``` - -::: - -The `OutboxDeadLetterQueueService` provides an entity `DeadOutboxMessages` which is a projection on the outbox table `cds.outbox.Messages` that has two bound actions: - -- `revive()` sets the number of attempts to `0` such that the outbox entry is going to be processed again. -- `delete()` deletes the outbox entry from the database. - -Filters can be applied as for any other CDS defined entity, for example, to filter for a specific outbox where the outbox name is stored in the field `target` of the entity `cds.outbox.Messages`. - -::: warning `OutboxDeadLetterQueueService` for internal users only - -It is crucial to make the service `OutboxDeadLetterQueueService` accessible for internal users only as it contains sensitive data that could be exploited for malicious purposes if unauthorized changes are performed. - -[Learn more about pseudo roles](../guides/security/cap-users#pseudo-roles){.learn-more} - -::: - -### Reading Dead Entries - -Filtering the dead entries is done by adding an appropriate `where`-clause to all `READ`-queries which matches all outbox message entries that have been retried for the maximum number of times. The following code provides an example handler implementation defining this behavior for the `DeadLetterQueueService`: - -```java -@Component -@ServiceName(OutboxDeadLetterQueueService_.CDS_NAME) -public class DeadOutboxMessagesHandler implements EventHandler { - - private final PersistenceService db; - - public DeadOutboxMessagesHandler(@Qualifier(PersistenceService.DEFAULT_NAME) PersistenceService db) { - this.db = db; - } - - @Before(entity = DeadOutboxMessages_.CDS_NAME) - public void addDeadEntryFilter(CdsReadEventContext context) { - Optional outboxFilters = this.createOutboxFilters(context.getCdsRuntime()); - - if (outboxFilters.isPresent()) { - CqnSelect modifiedCqn = - copy( - context.getCqn(), - new Modifier() { - @Override - public CqnPredicate where(Predicate where) { - return outboxFilters.get().and(where); - } - }); - context.setCqn(modifiedCqn); - } - } - - private Optional createOutboxFilters(CdsRuntime runtime) { - CdsProperties.Outbox outboxConfigs = runtime.getEnvironment().getCdsProperties().getOutbox(); - - return runtime.getServiceCatalog().getServices(OutboxService.class) - .map(service -> { - OutboxServiceConfig config = outboxConfigs.getService(service.getName()); - return CQL.get(Messages.TARGET).eq(service.getName()) - .and(CQL.get(Messages.ATTEMPTS).ge(config.getMaxAttempts())); - }) - .reduce(Predicate::or); - } -} -``` - -[Learn more about event handlers.](./event-handlers/){.learn-more} - -### Implement Bound Actions - -```java -@Autowired -@Qualifier(PersistenceService.DEFAULT_NAME) -private PersistenceService db; - -@On -public void reviveOutboxMessage(DeadOutboxMessagesReviveContext context) { - CqnAnalyzer analyzer = CqnAnalyzer.create(context.getModel()); - AnalysisResult analysisResult = analyzer.analyze(context.getCqn()); - Map key = analysisResult.rootKeys(); - Messages deadOutboxMessage = Messages.create((String) key.get(Messages.ID)); - - deadOutboxMessage.setAttempts(0); - - this.db.run(Update.entity(Messages_.class).entry(key).data(deadOutboxMessage)); - context.setCompleted(); -} - -@On -public void deleteOutboxEntry(DeadOutboxMessagesDeleteContext context) { - CqnAnalyzer analyzer = CqnAnalyzer.create(context.getModel()); - AnalysisResult analysisResult = analyzer.analyze(context.getCqn()); - Map key = analysisResult.rootKeys(); - - this.db.run(Delete.from(Messages_.class).byId(key.get(Messages.ID))); - context.setCompleted(); -} -``` - -The injected `PersistenceService` instance is used to perform the operations on the `Messages` entity since the entity `DeadOutboxMessages` is read-only. Both handlers first retrieve the ID of the entry and then they perform the corresponding operation on the database. - -[Learn more about CQL statement inspection.](./working-with-cql/query-introspection#cqnanalyzer){.learn-more} - -::: tip Use paging logic -Avoid reading all outbox entries at once in case entries which have large request payloads are present. Prefer `READ`-queries with paging instead. -::: - -## Observability using Open Telemetry - -The transactional outbox integrates Open Telemetry for logging telemetry data. - -[Learn more about observability with Open Telemetry.](./operating-applications/observability#open-telemetry){.learn-more} - -The following KPIs are logged in addition to the spans described in the [observability chapter](./operating-applications/observability): - -| KPI Name | Description | KPI Type | -| ------------------------------------------ | ------------------------------------------------------------------------------------------------------ | -------- | -| `com.sap.cds.outbox.coldEntries` | Number of entries that could not be delivered after repeated attempts and will not be retried anymore. | Gauge | -| `com.sap.cds.outbox.remainingEntries` | Number of entries which are pending for delivery. | Gauge | -| `com.sap.cds.outbox.maxStorageTimeSeconds` | Maximum time in seconds an entry was residing in the outbox. | Gauge | -| `com.sap.cds.outbox.medStorageTimeSeconds` | Median time in seconds of an entry stored in the outbox." | Gauge | -| `com.sap.cds.outbox.minStorageTimeSeconds` | Minimal time in seconds an entry was stored in the outbox. | Gauge | -| `com.sap.cds.outbox.incomingMessages` | Number of incoming messages of the outbox. | Counter | -| `com.sap.cds.outbox.outgoingMessages` | Number of outgoing messages of the outbox. | Counter | - -The KPIs are logged per microservice instance (in case of horizontal scaling), outbox, and tenant. diff --git a/node.js/_menu.md b/node.js/_menu.md index bca3500453..d74179cdbf 100644 --- a/node.js/_menu.md +++ b/node.js/_menu.md @@ -39,7 +39,6 @@ ## [Class cds. Event](events#cds-event) ## [Class cds. Request](events#cds-request) ## [Error Handling](events#req-reject) - ## [Event Queues](queue) # [cds. Queries](cds-ql) @@ -55,6 +54,7 @@ # [cds. env](cds-env) # [cds. utils](cds-utils) +# [Event Queues](event-queues) # [Serving Fiori UIs](fiori) # [Transactions](cds-tx) # [Security](authentication) diff --git a/node.js/assets/dead-letter-queue-1.js b/node.js/assets/dead-letter-queue-1.js index 75becf250f..f5218040a5 100644 --- a/node.js/assets/dead-letter-queue-1.js +++ b/node.js/assets/dead-letter-queue-1.js @@ -3,7 +3,7 @@ const cds = require('@sap/cds') module.exports = class OutboxDeadLetterQueueService extends cds.ApplicationService { async init() { this.before('READ', 'DeadOutboxMessages', function (req) { - const { maxAttempts } = cds.env.requires.outbox + const { maxAttempts } = cds.env.requires.queue req.query.where('attempts >= ', maxAttempts) }) diff --git a/node.js/event-queues.md b/node.js/event-queues.md new file mode 100644 index 0000000000..160f7e9ed8 --- /dev/null +++ b/node.js/event-queues.md @@ -0,0 +1,259 @@ +--- +synopsis: > + Node.js APIs and configuration for CAP's Transactional Event Queues — `cds.queued`, `cds.unqueued`, `srv.schedule`, `cds.flush`, callbacks, and queue configuration. +status: released +--- + +# Event Queues in Node.js + +For concepts, use cases, and guarantees, see the [Transactional Event Queues](../guides/events/event-queues) guide. This page covers the Node.js-specific APIs and configuration on top of that. + +In Node.js, you wrap a service with `cds.queued()` to queue its events, or enable queueing through configuration. The persistent queue is the default for all queued services. + +> [!info] Event queues vs. `cds.spawn` +> [`cds.spawn`](cds-tx#cds-spawn) runs a *detached continuation* — an in-memory background job in a fresh root transaction, optionally with `every` / `after` recurrence. It does not persist anything: a crash before the job completes loses it, and concurrent app instances each run their own copy. +> +> Reach for `cds.spawn` when the work is in-process, idempotent, and tolerates being dropped — for example, a periodic cache refresh. Use an event queue when you need **transactional integration with the calling request** (the message lives or dies with the surrounding commit) or **persistence and retries across restarts and instances**. + +[[toc]] + + +## Programmatic API + +### Queueing a Service + +#### `cds.queued(srv)` { .method } + +```tsx +function cds.queued ( srv: Service ) => QueuedService +``` + +Wrap a non-database service in `cds.queued()` to obtain a queued proxy. All `emit` / `send` / `run` calls on the proxy are persisted in the current transaction and dispatched after commit: + +```js +const srv = await cds.connect.to('yourService') +const qd_srv = cds.queued(srv) + +await qd_srv.emit('someEvent', { some: 'message' }) // persisted, dispatched async +await qd_srv.send('someEvent', { some: 'message' }) +``` + +::: tip `await` is still needed +The persistent queue writes the message to the database within the current transaction; you still need to `await` to keep that write inside the transaction. +::: + +For backwards compatibility, `cds.outboxed(srv)` works as a synonym. + +#### `cds.unqueued(srv)` { .method } + +```tsx +function cds.unqueued ( srv: QueuedService ) => Service +``` + +Get back the original synchronous service from a queued proxy: + +```js +const srv = cds.unqueued(qd_srv) +``` + +This is useful when a service is queued through configuration and you need a synchronous call site. For backwards compatibility, `cds.unboxed(srv)` works as a synonym. + +#### Queueing through Configuration + +You can outbox any *outbound* service through configuration without changing code. The `outboxed` flag on the service config is the trigger: + +```json +{ + "requires": { + "yourService": { + "kind": "odata", + "outboxed": true + } + } +} +``` + +Some services — `cds.MessagingService` and `cds.AuditLogService` — are outboxed by default; see [*Auto-Outboxed Services*](../guides/events/event-queues#auto-outboxed-services) in the common guide. + + +### Scheduling + +`srv.schedule()` queues like `cds.queued(srv).send()` — within the current transaction, dispatched after commit — but it **upserts** a singleton task keyed by event name (or by `.as(name)`) instead of inserting a new entry on every call. It accepts optional timing: + +```js +await srv.schedule('someEvent', { some: 'message' }) // execute asap +await srv.schedule('someEvent', { some: 'message' }).after('1h') // delay +await srv.schedule('someEvent', { some: 'message' }).every('10m') // recurrence +await srv.schedule('someEvent', { some: 'message' }).every('*/10 * * * *') // cron + +await srv.unschedule('someEvent') // remove +``` + +`.after()` accepts milliseconds (as a number) or a time string such as `'1s'`, `'10m'`, `'1h'`. `.every()` accepts the same plus a five-field cron expression. + +A scheduled task is identified by its event name and exists only once. A subsequent `schedule()` call with the same name overwrites the previous schedule (tasks are upserted, not deduplicated) — convenient for idempotent registration during application startup. + +To schedule the same event under separate identities (for example, with different payloads), give each its own task name with `.as()`: + +```js +await srv.schedule('replicate', { entity: 'Airports' }).as('airports').every('10m') +await srv.schedule('replicate', { entity: 'Airlines' }).as('airlines').every('1 hour') + +await srv.unschedule('airports') // remove by task name +``` + + +### Callbacks + +> [!note] Node.js only +> Callback events have no Java equivalent yet, but they're on the roadmap. + +Once a queued message has been successfully processed, the runtime emits `/#succeeded` on the same service: + +```js +srv.after('someEvent/#succeeded', (data, req) => { + // `data` is the result of the event processor + console.log('Message successfully processed:', data) +}) +``` + +Similarly, when a message becomes a dead letter (after all retries are exhausted), the runtime emits `/#failed`: + +```js +srv.after('someEvent/#failed', (data, req) => { + // `data` is the error from the event processor + console.log('Message could not be processed:', data) +}) +``` + +::: tip Register on specific events +Callback handlers must be registered for the specific `#succeeded` or `#failed` events. The `*` wildcard handler is not called for these events. +::: + + +### Manual Processing + +> [!note] Node.js only +> `cds.flush()` is a Node.js API; both stacks have built-in recovery mechanisms that pick up pending messages automatically. + +You rarely need to trigger processing manually — both single-tenant and multi-tenant runners pick up pending messages automatically. The most common use case is recovery after an application crash, where another emit for the same tenant and service would otherwise be needed to restart processing: + +```js +// Flush a specific queue +const srv = await cds.connect.to('yourService') +await cds.flush(srv.name) + +// Flush all queues +await cds.flush() +``` + +The argument is a **queue name** (typically `srv.name`); `cds.flush()` without an argument flushes all queues. The returned promise resolves once a processing pass has completed for the targeted queues. It's safe to call when the runner is already active — the runner's own scheduling logic handles overlap. + + +## Configuration + +The persistent queue is enabled by default. Messages are stored in the `cds.outbox.Messages` table within the current transaction. `cds.requires.queue` resolves to its default config automatically via `cds.env`; specify it only when tuning. + +```json +{ + "requires": { + "queue": { + "maxAttempts": 10, + "timeout": "1h" + } + } +} +``` + +> [!warning] Rolling upgrades and `legacyLocking` +> The `legacyLocking` flag controls cross-version compatibility for the queue's status check. See [*Locking*](../guides/events/event-queues#locking) in the common guide for the version-by-version behavior and the rolling-upgrade caveat. + +::: details Queue options + +`cds.requires.queue`: + +| Option | Default | Description | +|--------|---------|-------------| +| `maxAttempts` | `10` | Maximum retries before a message becomes a dead letter | +| `timeout` | `"1h"` | Time after which a `processing` message is considered abandoned and eligible for reprocessing | +| `legacyLocking` | `false` | Backward compatibility with `@sap/cds` v9; to be removed in a future release | + +::: + + +### Disabling the Queue + +Disable event queues globally: + +```json +{ "cds": { "requires": { "queue": false } } } +``` + +Or disable queueing for a specific service — for example to make `cds.MessagingService` emit immediately: + +```json +{ + "requires": { + "messaging": { + "kind": "enterprise-messaging", + "outboxed": false + } + } +} +``` + + +## Troubleshooting + +### Inspecting `cds.outbox.Messages` + +To see what's currently queued, query `cds.outbox.Messages` directly. The columns most useful for triage are `status`, `attempts`, `target`, `lastError`, and `lastAttemptTimestamp`: + +```js +const db = await cds.connect.to('db') +const messages = await SELECT.from('cds.outbox.Messages') + .columns('ID', 'target', 'status', 'attempts', 'lastAttemptTimestamp', 'lastError') + .orderBy('timestamp desc') +``` + +For a managed view with bound *revive* and *delete* actions, see [*Dead Letter Queue*](../guides/events/event-queues#dead-letter-queue) in the common guide. + + +### Deleting Entries + +To clear stuck messages programmatically: + +```js +const db = await cds.connect.to('db') +await DELETE.from('cds.outbox.Messages') +``` + + +### Messages Table Not Found + +If the `cds.outbox.Messages` table is missing from the database, the most common cause is insufficient model configuration in *package.json*. If you've overwritten `requires.db.model`, add the outbox model path: + +```jsonc +"requires": { + "db": { ... + "model": [..., "@sap/cds/srv/outbox"] + } +} +``` + +For projects on `@sap/cds < 6.7.0` with custom build tasks that override `options.model`, add the path there too: + +```jsonc +"build": { + "tasks": [{ ... + "options": { "model": [..., "@sap/cds/srv/outbox"] } + }] +} +``` + +Note that the model configuration isn't required for CAP projects using the standard project layout with `db`, `srv`, and `app` folders. + + +--- + +Working in Java? See [Event Queues in Java](../java/event-queues). diff --git a/node.js/messaging.md b/node.js/messaging.md index 9335ad2a4c..36503b5e67 100644 --- a/node.js/messaging.md +++ b/node.js/messaging.md @@ -265,7 +265,7 @@ this.after(['CREATE', 'UPDATE', 'DELETE'], 'Reviews', async (_, req) => { ``` ::: tip The messages are sent once the transaction is successful. -Per default, a persistent queue is used. See [Messaging - Queue](./queue) for more information. +Per default, a persistent queue is used. See [Messaging - Queue](./event-queues) for more information. ::: ## Receiving Events @@ -300,7 +300,7 @@ In general, messages don't contain user information but operate with a technical ### Inbox -You can store received messages in an inbox before they're processed. Under the hood, it uses the [task queue](./queue) for reliable asynchronous processing. +You can store received messages in an inbox before they're processed. Under the hood, it uses the [task queue](./event-queues) for reliable asynchronous processing. Enable it by setting the `inboxed` option to `true`, for example: ```js diff --git a/node.js/queue.md b/node.js/queue.md deleted file mode 100644 index 1d10ba32fe..0000000000 --- a/node.js/queue.md +++ /dev/null @@ -1,342 +0,0 @@ ---- -synopsis: > - Learn details about the task queue feature. -status: released ---- - -# Queueing with `cds.queued` - -[[toc]] - - - -## Overview - -The _task queue_ feature allows you to defer event processing. - -A common use case is the outbox pattern, where remote operations are deferred until the main transaction has been successfully committed. -This prevents accidental execution of remote calls in case the transaction is rolled back. - -Every non-database CAP service can be _queued_, meaning that event dispatching becomes _asynchronous_. - -::: tip -The _task queue_ feature can be disabled globally via cds.requires.queue = false. -::: - - -## Queueing a Service - - -### cds. queued (srv) {.method} - -```tsx -function cds.queued ( srv: Service ) => QueuedService -``` - -Programmatically, you can get the queued service as follows: - -```js -const srv = await cds.connect.to('yourService') -const qd_srv = cds.queued(srv) - -await qd_srv.emit('someEvent', { some: 'message' }) // asynchronous -await qd_srv.send('someEvent', { some: 'message' }) // asynchronous -``` - -::: tip `await` needed -You still need to `await` these operations because they're asynchronous. In case of a persistent queue, which is the default, messages are stored in the database, within the current transaction. -::: - -For backwards compatibility, `cds.outboxed(srv)` works as a synonym. - - -### cds. unqueued (srv) {.method} - -```tsx -function cds.unqueued ( srv: QueuedService ) => Service -``` - -Use this on a queued service to get back to the original service: - -```js -const srv = cds.unqueued(qd_srv) -``` - -This is useful if your service is outboxed (that is, queued) per configuration. - -For backwards compatibility, `cds.unboxed(srv)` works as a synonym. - - -### Per Configuration - -Some services are outboxed by default; these include [`cds.MessagingService`](messaging) and `cds.AuditLogService`. -You can configure the outbox behavior by specifying the `outboxed` option in your service configuration. - -```json -{ - "requires": { - "yourService": { - "kind": "odata", - "outboxed": true - } - } -} -``` - -For transactional safety, you're encouraged to use the [persistent queue](#persistent-queue), which is enabled by default. - - - -## Persistent Queue (Default) {#persistent-queue} - -The persistent queue is the default configuration. - -Using the persistent queue, the to-be-emitted message is stored in a database table within the current transaction, therefore transactional consistency is guaranteed. - -::: details You can use the following configuration options: - -```json -{ - "requires": { - "queue": { - "kind": "persistent-queue", - "maxAttempts": 20, - "storeLastError": true, - "legacyLocking": true, - "timeout": "1h" - } - } -} -``` - -The optional parameters are: - -- `maxAttempts` (default `20`): The number of unsuccessful emits until the message is considered unprocessable. The message will remain in the database table! -- `storeLastError` (default `true`): Specifies whether error information of the last failed emit is stored in the tasks table. -- `legacyLocking` (default `true`): If set to `false`, database locks are only used to set the status of the message to `processing` to prevent long-kept database locks. Although this is the recommended approach, it is incompatible with task runners still on `@sap/cds^8`. -- `timeout` (default `"1h"`): The time after which a message with `status === "processing"` is considered to be abandoned and eligable to be processed again. Only for `legacyLocking === false`. - -::: - -Once the transaction succeeds, the messages are read from the database table and dispatched. -If processing was successful, the respective message is deleted from the database table. -If processing failed, the system retries the message after exponentially increasing delays. -After a maximum number of attempts, the message is ignored for processing and remains in the database, which -therefore also acts as a dead letter queue. -See [Managing the Dead Letter Queue](#managing-the-dead-letter-queue), to learn about how to handle such messages. - -There is only one active message processor per service, tenant, app instance, and message. -This ensures that no duplicate emits happen, except in the highly unlikely case of an app crash right after successful processing but before the message could be deleted. - -::: tip Unrecoverable errors -Some errors during the emit are identified as unrecoverable, for example in [SAP Event Mesh](../guides/events/event-mesh) if the used topic is forbidden. -The respective message is then updated and the `attempts` field is set to `maxAttempts` to prevent further processing. -[Programming errors](./best-practices#error-types) crash the server instance and must be fixed. -To mark your own errors as unrecoverable, you can set `unrecoverable = true` on the error object. -::: - - -Your database model is automatically extended by the entity `cds.outbox.Messages`: - -```cds -namespace cds.outbox; - -entity Messages { - key ID : UUID; - timestamp : Timestamp; - target : String; - msg : LargeString; - attempts : Integer default 0; - partition : Integer default 0; - lastError : LargeString; - lastAttemptTimestamp : Timestamp @cds.on.update: $now; - status : String(23); -} -``` - -In your CDS model, you can refer to the entity `cds.outbox.Messages` using the path `@sap/cds/srv/outbox`, for example to expose it in a service (cf. [Managing the Dead Letter Queue](#managing-the-dead-letter-queue)). - - -### Known Limitations - -- If the app crashes, another emit for the respective tenant and service is necessary to restart the message processing. It can be triggered manually using the `flush` method. -- The service that handles the queued event must not rely on user roles and attributes, as they are not stored with the message. In other words, asynchronous task are always processed in a privileged mode. However, the user ID is stored to re-create the correct context. - - -### Managing the Dead Letter Queue - -You can manage the dead letter queue by implementing a service that exposes a read-only projection on entity `cds.outbox.Messages` as well as bound actions to either revive or delete the respective message. - -::: tip -See [Outbox Dead Letter Queue](../java/outbox#outbox-dead-letter-queue) in the CAP Java documentation for additional considerations while we work on a general outbox guide. -::: - -#### 1. Define the Service - -::: code-group -```cds [srv/outbox-dead-letter-queue-service.cds] -using from '@sap/cds/srv/outbox'; - -@requires: 'internal-user' -service OutboxDeadLetterQueueService { - - @readonly - entity DeadOutboxMessages as projection on cds.outbox.Messages - actions { - action revive(); - action delete(); - }; - -} -``` -::: - -#### 2. Filter for Dead Entries - -As `maxAttempts` is configurable, its value cannot be added as a static filter to projection `DeadOutboxMessages`, but must be considered programmatically. - -::: code-group -<<< ./assets/dead-letter-queue-1.js#snippet{5-8} [srv/outbox-dead-letter-queue-service.js] -::: - -#### 3. Implement Bound Actions - -Finally, entries in the dead letter queue can either be _revived_ by resetting the number of attempts (that is, `SET attempts = 0`) or _deleted_. - -::: code-group -<<< ./assets/dead-letter-queue-2.js#snippet{10-12,14-16} [srv/outbox-dead-letter-queue-service.js] -::: - - -### Additional APIs - -#### Task Scheduling - -You can use the `schedule` method as a shortcut for `cds.queued(srv).send()`, with optional scheduling options `after` and `every`: - -```js -await srv.schedule('someEvent', { some: 'message' }) -await srv.schedule('someEvent', { some: 'message' }).after('1h') // after one hour -await srv.schedule('someEvent', { some: 'message' }).every('1h') // every hour after each processing -``` - -#### Task Processing - -To manually trigger the message processing, for example if your server is restarted, you can use the `flush` method. - -```js -const srv = await cds.connect.to('yourService') -cds.queued(srv).flush() -``` - -#### Task Callbacks - -Once a message has been successfully processed, it triggers the `/#succeeded` handlers. - -```js -srv.after('someEvent/#succeeded', (data, req) => { - // `data` is the result of the event processor - console.log('Message successfully processed:', data) -}) -``` - -Similarly, you can use the `/#failed` event to handle failed messages (once the maximum retry count is reached). - -```js -srv.after('someEvent/#failed', (data, req) => { - // `data` is the error from the event processor - console.log('Message could not be processed:', data) -}) -``` - -::: tip Register on specific events -Event handlers have to be registered for these specific events. The `*` wildcard handler is not called for these. -::: - - - -## In-Memory Queue - -You can enable the in-memory queue globally with: - -```json -{ - "requires": { - "queue": { - "kind": "in-memory-queue" - } - } -} -``` - -Messages are emitted only after the current transaction is successfully committed. Until then, messages are only kept in memory. -This is similar to the following code if done manually: - -```js -cds.context.on('succeeded', () => this.emit(msg)) -``` - -::: warning No retry mechanism -The message is lost if the emit fails. There's no retry mechanism. -::: - - - -## Immediate Emit - -To disable deferred emitting for a particular service only, you can set the `outboxed` option of that service to `false`: - -```json -{ - "requires": { - "messaging": { - "kind": "enterprise-messaging", - "outboxed": false - } - } -} -``` - - - -## Troubleshooting - - -### Delete Entries in the Messages Table - -To manually delete entries in the table `cds.outbox.Messages`, you can either -expose it in a service, see [Managing the Dead Letter Queue](#managing-the-dead-letter-queue), or programmatically modify it using the `cds.outbox.Messages` -entity: - -```js -const db = await cds.connect.to('db') -await DELETE.from('cds.outbox.Messages') -``` - - -### Messages Table Not Found - -If the messages table is not found on the database, this can be caused by insufficient configuration data in _package.json_. - -In case you have overwritten `requires.db.model` there, make sure to add the outbox model path `@sap/cds/srv/outbox`: - -```jsonc -"requires": { - "db": { ... - "model": [..., "@sap/cds/srv/outbox"] - } -} -``` - -The following is only relevant if you're using @sap/cds version < 6.7.0 and you've configured `options.model` in custom build tasks. -Add the model path accordingly: - -```jsonc -"build": { - "tasks": [{ ... - "options": { "model": [..., "@sap/cds/srv/outbox"] } - }] -} -``` - -Note that model configuration isn't required for CAP projects using the standard project layout with `db`, `srv`, and `app` folders. In this case, you can delete the entire `model` configuration. diff --git a/redirects.md b/redirects.md index cc6ad860eb..2c7c01f270 100644 --- a/redirects.md +++ b/redirects.md @@ -88,6 +88,7 @@ - [java/indicating-errors](java/event-handlers/indicating-errors) - [java/messaging-foundation](java/messaging) - [java/observability](java/operating-applications/observability) +- [java/outbox](java/event-queues) - [java/overview](java/getting-started) - [java/persistence-services](java/cqn-services/persistence-services) - [java/provisioning-api](java/event-handlers) @@ -104,7 +105,8 @@ - [node.js/cds-dk](tools/apis/cds-import) - [node.js/middlewares](node.js/cds-serve) -- [node.js/outbox](node.js/queue) +- [node.js/outbox](node.js/event-queues) +- [node.js/queue](node.js/event-queues) - [node.js/protocols](node.js/cds-serve) - [node.js/requests](node.js/events) - [node.js/services](node.js/core-services)