From 7777806da340dfef4dff06f80b7a9ee8cf0c11df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Wed, 27 May 2026 14:51:23 +0200 Subject: [PATCH 1/2] feat(core): require TransactionRunner in OutboxScheduler/OutboxPurger (breaking) Drop the nullable default and the silent non-transactional fallback in tick(). Under JDBC auto-commit, FOR UPDATE SKIP LOCKED releases its row lock at the end of the claim SELECT and concurrent processor instances deliver the same entry multiple times. Make the invariant explicit at the type level in okapi-core, complementing #49's adapter-layer fix. Closes #51 --- CHANGELOG.md | 25 +++++++++-- .../softwaremill/okapi/core/OutboxPurger.kt | 8 ++-- .../okapi/core/OutboxScheduler.kt | 10 +++-- .../okapi/core/OutboxPurgerTest.kt | 45 +++++++++++++++++++ .../okapi/core/OutboxSchedulerTest.kt | 26 +++++------ 5 files changed, 88 insertions(+), 26 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f012b83..f825d76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,15 +25,34 @@ Until `1.0.0`, breaking changes may appear in any release and are flagged with * history; the resulting schema is unchanged. Existing installations from an earlier release: the `outbox:001` changeset checksum changed — they must start on a fresh okapi schema, or clear okapi's rows from `okapi_databasechangelog`, before upgrading. +- **`OutboxScheduler` / `OutboxPurger` in `okapi-core` now require a non-null + `TransactionRunner`** constructor parameter. The previous nullable default + caused `tick()` to silently fall back to a non-transactional path; under JDBC + auto-commit, `FOR UPDATE SKIP LOCKED` releases its row lock at the end of the + claim `SELECT` and concurrent processor instances deliver the same entry + multiple times. Spring Boot users are unaffected. Direct constructor users + (Ktor, manual Spring wiring, plain Java/Kotlin) must supply a + `TransactionRunner`; adapters with their own transaction primitive wrap it + in a thin implementation, e.g. + + ```kotlin + object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = transaction { block() } + } + ``` + + (`TransactionRunner` is intentionally a plain interface, not a `fun interface`, + because its abstract method is generic — Kotlin forbids SAM conversion for + generic abstract methods.) + ([#51](https://github.com/softwaremill/okapi/issues/51)) - **`OutboxProcessorScheduler` / `OutboxPurgerScheduler` constructors now require a non-null `TransactionRunner`** (previously a nullable `TransactionTemplate?`, with `OutboxPurgerScheduler`'s parameter defaulted to `null`). Spring Boot autoconfig users are unaffected — the autoconfig derives a `TransactionRunner` from any `PlatformTransactionManager` on the classpath. Users constructing the schedulers directly must supply a `TransactionRunner` (e.g. `SpringTransactionRunner(template)` or - a thin lambda wrapping their framework's native transaction primitive). The previous - null-default silently degraded `FOR UPDATE SKIP LOCKED` to JDBC auto-commit, permitting - duplicate delivery across processor instances. ([#49](https://github.com/softwaremill/okapi/pull/49)) + a thin lambda wrapping their framework's native transaction primitive). + ([#49](https://github.com/softwaremill/okapi/pull/49)) - **`okapi-spring-boot` autoconfig refuses to start when it cannot verify the PlatformTransactionManager↔outbox-DataSource binding** in a multi-DataSource context. Specifically: if `extractDataSource` cannot determine the PTM's DataSource (e.g. JTA, diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 71dd56a..e4b30d0 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -10,6 +10,8 @@ import java.util.concurrent.atomic.AtomicBoolean /** * Periodically removes DELIVERED outbox entries older than [OutboxPurgerConfig.retention]. * + * Each batch delete runs inside [transactionRunner]. + * * Runs on a single daemon thread with explicit [start]/[stop] lifecycle. * [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown. * [AtomicBoolean] guards against accidental double-start, not restart. @@ -18,7 +20,7 @@ import java.util.concurrent.atomic.AtomicBoolean */ class OutboxPurger @JvmOverloads constructor( private val outboxStore: OutboxStore, - private val transactionRunner: TransactionRunner? = null, + private val transactionRunner: TransactionRunner, private val config: OutboxPurgerConfig = OutboxPurgerConfig(), private val clock: Clock = Clock.systemUTC(), ) { @@ -59,9 +61,9 @@ class OutboxPurger @JvmOverloads constructor( var totalDeleted = 0 var batches = 0 do { - val deleted = transactionRunner?.runInTransaction { + val deleted = transactionRunner.runInTransaction { outboxStore.removeDeliveredBefore(cutoff, config.batchSize) - } ?: outboxStore.removeDeliveredBefore(cutoff, config.batchSize) + } totalDeleted += deleted batches++ } while (deleted == config.batchSize && batches < MAX_BATCHES_PER_TICK) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt index 1de4ebe..f2bbb2e 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt @@ -9,7 +9,10 @@ import java.util.concurrent.atomic.AtomicBoolean /** * Standalone scheduler that periodically calls [OutboxProcessor.processNext]. * - * Each tick is optionally wrapped in a transaction via [transactionRunner]. + * Each tick runs inside [transactionRunner]. The runner is required: without a surrounding + * transaction, `FOR UPDATE SKIP LOCKED` releases its row lock at the end of the claim + * SELECT and concurrent processor instances deliver the same entry multiple times. + * * Runs on a single daemon thread with explicit [start]/[stop] lifecycle. * [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown. * [AtomicBoolean] guards against accidental double-start, not restart. @@ -20,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean */ class OutboxScheduler @JvmOverloads constructor( private val outboxProcessor: OutboxProcessor, - private val transactionRunner: TransactionRunner? = null, + private val transactionRunner: TransactionRunner, private val config: OutboxSchedulerConfig = OutboxSchedulerConfig(), ) { private val running = AtomicBoolean(false) @@ -50,8 +53,7 @@ class OutboxScheduler @JvmOverloads constructor( private fun tick() { try { - transactionRunner?.runInTransaction { outboxProcessor.processNext(config.batchSize) } - ?: outboxProcessor.processNext(config.batchSize) + transactionRunner.runInTransaction { outboxProcessor.processNext(config.batchSize) } logger.debug("Outbox processor tick completed [batchSize={}]", config.batchSize) } catch (e: Exception) { logger.error("Outbox processor tick failed, will retry at next scheduled interval", e) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt index 17a7711..d6c7b09 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt @@ -31,6 +31,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig( retention = ofDays(7), interval = ofMillis(50), @@ -62,6 +63,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), clock = fixedClock, ) @@ -83,6 +85,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), clock = fixedClock, ) @@ -105,6 +108,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), clock = fixedClock, ) @@ -126,6 +130,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), clock = fixedClock, ) @@ -141,6 +146,7 @@ class OutboxPurgerTest : FunSpec({ val store = stubStore(onRemove = { _, _ -> 0 }) val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMinutes(1), batchSize = 100), clock = fixedClock, ) @@ -155,6 +161,7 @@ class OutboxPurgerTest : FunSpec({ test("start after stop throws") { val purger = OutboxPurger( outboxStore = stubStore(), + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMinutes(1), batchSize = 100), clock = fixedClock, ) @@ -166,8 +173,46 @@ class OutboxPurgerTest : FunSpec({ purger.start() }.message shouldBe "OutboxPurger cannot be restarted after stop()" } + + test("transactionRunner wraps each batch delete") { + val txInvocations = AtomicInteger(0) + val storeInvocations = AtomicInteger(0) + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { _, _ -> + val count = storeInvocations.incrementAndGet() + if (count == 1) { + 100 + } else { + latch.countDown() + 42 + } + }) + val txRunner = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T { + txInvocations.incrementAndGet() + return block() + } + } + + val purger = OutboxPurger( + outboxStore = store, + transactionRunner = txRunner, + config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), + clock = fixedClock, + ) + purger.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + purger.stop() + + storeInvocations.get() shouldBe 2 + txInvocations.get() shouldBe storeInvocations.get() + } }) +private fun noOpTransactionRunner() = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} + private fun stubStore(onRemove: (Instant, Int) -> Int = { _, _ -> 0 }) = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry override fun claimPending(limit: Int) = emptyList() diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt index 6239750..cf29568 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt @@ -22,6 +22,7 @@ class OutboxSchedulerTest : FunSpec({ val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMillis(50), batchSize = 25), ) @@ -43,6 +44,7 @@ class OutboxSchedulerTest : FunSpec({ val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMillis(50)), ) @@ -63,6 +65,7 @@ class OutboxSchedulerTest : FunSpec({ val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMillis(50)), ) @@ -78,6 +81,7 @@ class OutboxSchedulerTest : FunSpec({ val processor = stubProcessor { _ -> } val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMinutes(1)), ) @@ -92,6 +96,7 @@ class OutboxSchedulerTest : FunSpec({ val processor = stubProcessor { _ -> } val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMinutes(1)), ) @@ -103,7 +108,7 @@ class OutboxSchedulerTest : FunSpec({ }.message shouldBe "OutboxScheduler cannot be restarted after stop()" } - test("transactionRunner wraps tick when provided") { + test("transactionRunner wraps tick") { val txInvoked = AtomicBoolean(false) val latch = CountDownLatch(1) val processor = stubProcessor { _ -> latch.countDown() } @@ -126,23 +131,12 @@ class OutboxSchedulerTest : FunSpec({ txInvoked.get() shouldBe true } - - test("tick runs without transactionRunner") { - val latch = CountDownLatch(1) - val processor = stubProcessor { _ -> latch.countDown() } - - val scheduler = OutboxScheduler( - outboxProcessor = processor, - transactionRunner = null, - config = OutboxSchedulerConfig(interval = ofMillis(50)), - ) - - scheduler.start() - latch.await(2, TimeUnit.SECONDS) shouldBe true - scheduler.stop() - } }) +private fun noOpTransactionRunner() = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} + private fun stubProcessor(onProcessNext: (Int) -> Unit): OutboxProcessor { val store = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry From df18c3dc8932e0e09ab12069f61eadf2bc523a70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andrzej=20Kobyli=C5=84ski?= Date: Fri, 29 May 2026 09:24:42 +0200 Subject: [PATCH 2/2] docs(core): refine OutboxPurger KDoc, trim CHANGELOG note MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the restating "runs inside transactionRunner" line with the purger-specific rationale (per-batch transaction boundary: bounded rollback and no multi-minute sweep transaction) — the why the type signature can't convey. Drop the CHANGELOG parenthetical about plain-vs-fun interface; the Kotlin compiler already rejects fun interface on a generic abstract method, so the note documents an absence the language enforces. --- CHANGELOG.md | 3 --- .../main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt | 3 ++- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f825d76..452f812 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,9 +41,6 @@ Until `1.0.0`, breaking changes may appear in any release and are flagged with * } ``` - (`TransactionRunner` is intentionally a plain interface, not a `fun interface`, - because its abstract method is generic — Kotlin forbids SAM conversion for - generic abstract methods.) ([#51](https://github.com/softwaremill/okapi/issues/51)) - **`OutboxProcessorScheduler` / `OutboxPurgerScheduler` constructors now require a non-null `TransactionRunner`** (previously a nullable `TransactionTemplate?`, with diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index e4b30d0..1a0aad9 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -10,7 +10,8 @@ import java.util.concurrent.atomic.AtomicBoolean /** * Periodically removes DELIVERED outbox entries older than [OutboxPurgerConfig.retention]. * - * Each batch delete runs inside [transactionRunner]. + * Each batch delete is its own transaction -- a mid-batch failure rolls back only + * that batch, and a long retention sweep does not hold one multi-minute transaction. * * Runs on a single daemon thread with explicit [start]/[stop] lifecycle. * [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown.