diff --git a/CHANGELOG.md b/CHANGELOG.md index f012b83..452f812 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,15 +25,31 @@ Until `1.0.0`, breaking changes may appear in any release and are flagged with * history; the resulting schema is unchanged. Existing installations from an earlier release: the `outbox:001` changeset checksum changed — they must start on a fresh okapi schema, or clear okapi's rows from `okapi_databasechangelog`, before upgrading. +- **`OutboxScheduler` / `OutboxPurger` in `okapi-core` now require a non-null + `TransactionRunner`** constructor parameter. The previous nullable default + caused `tick()` to silently fall back to a non-transactional path; under JDBC + auto-commit, `FOR UPDATE SKIP LOCKED` releases its row lock at the end of the + claim `SELECT` and concurrent processor instances deliver the same entry + multiple times. Spring Boot users are unaffected. Direct constructor users + (Ktor, manual Spring wiring, plain Java/Kotlin) must supply a + `TransactionRunner`; adapters with their own transaction primitive wrap it + in a thin implementation, e.g. + + ```kotlin + object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = transaction { block() } + } + ``` + + ([#51](https://github.com/softwaremill/okapi/issues/51)) - **`OutboxProcessorScheduler` / `OutboxPurgerScheduler` constructors now require a non-null `TransactionRunner`** (previously a nullable `TransactionTemplate?`, with `OutboxPurgerScheduler`'s parameter defaulted to `null`). Spring Boot autoconfig users are unaffected — the autoconfig derives a `TransactionRunner` from any `PlatformTransactionManager` on the classpath. Users constructing the schedulers directly must supply a `TransactionRunner` (e.g. `SpringTransactionRunner(template)` or - a thin lambda wrapping their framework's native transaction primitive). The previous - null-default silently degraded `FOR UPDATE SKIP LOCKED` to JDBC auto-commit, permitting - duplicate delivery across processor instances. ([#49](https://github.com/softwaremill/okapi/pull/49)) + a thin lambda wrapping their framework's native transaction primitive). + ([#49](https://github.com/softwaremill/okapi/pull/49)) - **`okapi-spring-boot` autoconfig refuses to start when it cannot verify the PlatformTransactionManager↔outbox-DataSource binding** in a multi-DataSource context. Specifically: if `extractDataSource` cannot determine the PTM's DataSource (e.g. JTA, diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt index 71dd56a..1a0aad9 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxPurger.kt @@ -10,6 +10,9 @@ import java.util.concurrent.atomic.AtomicBoolean /** * Periodically removes DELIVERED outbox entries older than [OutboxPurgerConfig.retention]. * + * Each batch delete is its own transaction -- a mid-batch failure rolls back only + * that batch, and a long retention sweep does not hold one multi-minute transaction. + * * Runs on a single daemon thread with explicit [start]/[stop] lifecycle. * [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown. * [AtomicBoolean] guards against accidental double-start, not restart. @@ -18,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean */ class OutboxPurger @JvmOverloads constructor( private val outboxStore: OutboxStore, - private val transactionRunner: TransactionRunner? = null, + private val transactionRunner: TransactionRunner, private val config: OutboxPurgerConfig = OutboxPurgerConfig(), private val clock: Clock = Clock.systemUTC(), ) { @@ -59,9 +62,9 @@ class OutboxPurger @JvmOverloads constructor( var totalDeleted = 0 var batches = 0 do { - val deleted = transactionRunner?.runInTransaction { + val deleted = transactionRunner.runInTransaction { outboxStore.removeDeliveredBefore(cutoff, config.batchSize) - } ?: outboxStore.removeDeliveredBefore(cutoff, config.batchSize) + } totalDeleted += deleted batches++ } while (deleted == config.batchSize && batches < MAX_BATCHES_PER_TICK) diff --git a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt index 1de4ebe..f2bbb2e 100644 --- a/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt +++ b/okapi-core/src/main/kotlin/com/softwaremill/okapi/core/OutboxScheduler.kt @@ -9,7 +9,10 @@ import java.util.concurrent.atomic.AtomicBoolean /** * Standalone scheduler that periodically calls [OutboxProcessor.processNext]. * - * Each tick is optionally wrapped in a transaction via [transactionRunner]. + * Each tick runs inside [transactionRunner]. The runner is required: without a surrounding + * transaction, `FOR UPDATE SKIP LOCKED` releases its row lock at the end of the claim + * SELECT and concurrent processor instances deliver the same entry multiple times. + * * Runs on a single daemon thread with explicit [start]/[stop] lifecycle. * [start] and [stop] are single-use -- the internal executor cannot be restarted after shutdown. * [AtomicBoolean] guards against accidental double-start, not restart. @@ -20,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean */ class OutboxScheduler @JvmOverloads constructor( private val outboxProcessor: OutboxProcessor, - private val transactionRunner: TransactionRunner? = null, + private val transactionRunner: TransactionRunner, private val config: OutboxSchedulerConfig = OutboxSchedulerConfig(), ) { private val running = AtomicBoolean(false) @@ -50,8 +53,7 @@ class OutboxScheduler @JvmOverloads constructor( private fun tick() { try { - transactionRunner?.runInTransaction { outboxProcessor.processNext(config.batchSize) } - ?: outboxProcessor.processNext(config.batchSize) + transactionRunner.runInTransaction { outboxProcessor.processNext(config.batchSize) } logger.debug("Outbox processor tick completed [batchSize={}]", config.batchSize) } catch (e: Exception) { logger.error("Outbox processor tick failed, will retry at next scheduled interval", e) diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt index 17a7711..d6c7b09 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxPurgerTest.kt @@ -31,6 +31,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig( retention = ofDays(7), interval = ofMillis(50), @@ -62,6 +63,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), clock = fixedClock, ) @@ -83,6 +85,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), clock = fixedClock, ) @@ -105,6 +108,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), clock = fixedClock, ) @@ -126,6 +130,7 @@ class OutboxPurgerTest : FunSpec({ val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), clock = fixedClock, ) @@ -141,6 +146,7 @@ class OutboxPurgerTest : FunSpec({ val store = stubStore(onRemove = { _, _ -> 0 }) val purger = OutboxPurger( outboxStore = store, + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMinutes(1), batchSize = 100), clock = fixedClock, ) @@ -155,6 +161,7 @@ class OutboxPurgerTest : FunSpec({ test("start after stop throws") { val purger = OutboxPurger( outboxStore = stubStore(), + transactionRunner = noOpTransactionRunner(), config = OutboxPurgerConfig(interval = ofMinutes(1), batchSize = 100), clock = fixedClock, ) @@ -166,8 +173,46 @@ class OutboxPurgerTest : FunSpec({ purger.start() }.message shouldBe "OutboxPurger cannot be restarted after stop()" } + + test("transactionRunner wraps each batch delete") { + val txInvocations = AtomicInteger(0) + val storeInvocations = AtomicInteger(0) + val latch = CountDownLatch(1) + val store = stubStore(onRemove = { _, _ -> + val count = storeInvocations.incrementAndGet() + if (count == 1) { + 100 + } else { + latch.countDown() + 42 + } + }) + val txRunner = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T { + txInvocations.incrementAndGet() + return block() + } + } + + val purger = OutboxPurger( + outboxStore = store, + transactionRunner = txRunner, + config = OutboxPurgerConfig(interval = ofMillis(50), batchSize = 100), + clock = fixedClock, + ) + purger.start() + latch.await(2, TimeUnit.SECONDS) shouldBe true + purger.stop() + + storeInvocations.get() shouldBe 2 + txInvocations.get() shouldBe storeInvocations.get() + } }) +private fun noOpTransactionRunner() = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} + private fun stubStore(onRemove: (Instant, Int) -> Int = { _, _ -> 0 }) = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry override fun claimPending(limit: Int) = emptyList() diff --git a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt index 6239750..cf29568 100644 --- a/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt +++ b/okapi-core/src/test/kotlin/com/softwaremill/okapi/core/OutboxSchedulerTest.kt @@ -22,6 +22,7 @@ class OutboxSchedulerTest : FunSpec({ val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMillis(50), batchSize = 25), ) @@ -43,6 +44,7 @@ class OutboxSchedulerTest : FunSpec({ val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMillis(50)), ) @@ -63,6 +65,7 @@ class OutboxSchedulerTest : FunSpec({ val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMillis(50)), ) @@ -78,6 +81,7 @@ class OutboxSchedulerTest : FunSpec({ val processor = stubProcessor { _ -> } val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMinutes(1)), ) @@ -92,6 +96,7 @@ class OutboxSchedulerTest : FunSpec({ val processor = stubProcessor { _ -> } val scheduler = OutboxScheduler( outboxProcessor = processor, + transactionRunner = noOpTransactionRunner(), config = OutboxSchedulerConfig(interval = ofMinutes(1)), ) @@ -103,7 +108,7 @@ class OutboxSchedulerTest : FunSpec({ }.message shouldBe "OutboxScheduler cannot be restarted after stop()" } - test("transactionRunner wraps tick when provided") { + test("transactionRunner wraps tick") { val txInvoked = AtomicBoolean(false) val latch = CountDownLatch(1) val processor = stubProcessor { _ -> latch.countDown() } @@ -126,23 +131,12 @@ class OutboxSchedulerTest : FunSpec({ txInvoked.get() shouldBe true } - - test("tick runs without transactionRunner") { - val latch = CountDownLatch(1) - val processor = stubProcessor { _ -> latch.countDown() } - - val scheduler = OutboxScheduler( - outboxProcessor = processor, - transactionRunner = null, - config = OutboxSchedulerConfig(interval = ofMillis(50)), - ) - - scheduler.start() - latch.await(2, TimeUnit.SECONDS) shouldBe true - scheduler.stop() - } }) +private fun noOpTransactionRunner() = object : TransactionRunner { + override fun runInTransaction(block: () -> T): T = block() +} + private fun stubProcessor(onProcessNext: (Int) -> Unit): OutboxProcessor { val store = object : OutboxStore { override fun persist(entry: OutboxEntry) = entry