feat: KeyedNormalizer + AsyncPollingConsumer.PollOnceAsync + OutboxStoreExtensions#335
Conversation
…cal> Key-based O(1) dispatch variant of Normalizer. Builder throws on duplicate key registration (safer than last-writer-wins). Dictionary snapshot taken at Build() so the built instance is fully immutable after construction. 8 test scenarios cover: happy path, multiple keys, default fallback, missing-key-throws, async handler, cancellation, duplicate-key guard, and build-twice semantics. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Single-shot poll cycle that skips the run loop, interval, jitter, and back-off — designed for caller-driven polling (workflow steps, cron jobs, AWS Lambda, Azure Functions). Returns the raw message from the source or null on empty poll. Does not invoke any run-loop handler. 4 new test scenarios: returns item, returns null on empty, respects cancellation, confirms handler is not invoked. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…sync Convenience extension on IOutboxStore<object> for callers without compile-time payload type knowledge (e.g., generic workflow orchestrators). Accepts an untyped payload + IReadOnlyDictionary<string,string> headers, projects to MessageHeaders, and delegates to EnqueueAsync. Null or empty headers map to MessageHeaders.Empty (no allocation). 3 test scenarios: with payload+headers, null headers, cancellation. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dependency Review✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.Scanned FilesNone |
There was a problem hiding this comment.
Pull request overview
This PR introduces three new messaging primitives in PatternKit.Core to support the WorkflowFramework adoption plan (Phase 1): a keyed normalizer for O(1) dispatch, a single-shot polling API for AsyncPollingConsumer<T>, and an outbox convenience extension for untyped payloads.
Changes:
- Added
KeyedNormalizer<TKey,TRaw,TCanonical>with a fluent builder, default handler support, and duplicate-key validation. - Added
AsyncPollingConsumer<T>.PollOnceAsyncfor caller-driven single-iteration polling. - Added
OutboxStoreExtensions.EnqueueObjectAsyncto enqueueobjectpayloads with optional string headers, plus new tests for all three deliverables.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.cs |
Adds dictionary-dispatch normalizer with builder, default handler, and snapshot immutability. |
src/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs |
Adds PollOnceAsync single-shot polling API. |
src/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.cs |
Adds EnqueueObjectAsync convenience overload for IOutboxStore<object>. |
test/PatternKit.Tests/Messaging/Transformation/KeyedNormalizerTests.cs |
Adds scenario-based test coverage for keyed normalizer behavior and builder semantics. |
test/PatternKit.Tests/Messaging/Consumers/AsyncPollingConsumerTests.cs |
Adds tests for PollOnceAsync behavior and cancellation. |
test/PatternKit.Tests/Messaging/Reliability/OutboxStoreExtensionsTests.cs |
Adds tests for EnqueueObjectAsync payload/headers behavior and cancellation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| [Scenario("PollOnceAsync DoesNotInvokeRunLoopHandler")] | ||
| [Fact] | ||
| public async Task PollOnceAsync_DoesNotInvokeRunLoopHandler() | ||
| { | ||
| var handlerInvoked = false; | ||
|
|
||
| var consumer = AsyncPollingConsumer<string>.Create() | ||
| .WithSource(async (_, _) => { await Task.CompletedTask; return Message<string>.Create("msg"); }) | ||
| .Build(); | ||
|
|
||
| // PollOnceAsync takes no handler argument — the run-loop handler is never called. | ||
| var result = await consumer.PollOnceAsync(); | ||
|
|
||
| ScenarioExpect.False(handlerInvoked); | ||
| ScenarioExpect.NotNull(result); // the source did produce a message |
| ScenarioExpect.Equal(0, record.Message.Headers.Count); | ||
| } | ||
|
|
| /// <param name="ct">Cancellation token propagated directly to the poll source.</param> | ||
| /// <returns> | ||
| /// The message returned by the source, or <see langword="null"/> when the source | ||
| /// returned an empty poll. | ||
| /// </returns> | ||
| public ValueTask<Message<TPayload>?> PollOnceAsync(CancellationToken ct = default) | ||
| { | ||
| ct.ThrowIfCancellationRequested(); | ||
| return _source(MessageContext.Empty, ct); |
Test Results799 tests 799 ✅ 28s ⏱️ Results for commit 71c1e4a. ♻️ This comment has been updated with latest results. |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #335 +/- ##
==========================================
+ Coverage 89.66% 95.64% +5.97%
==========================================
Files 484 486 +2
Lines 39951 40002 +51
Branches 5744 5756 +12
==========================================
+ Hits 35824 38261 +2437
+ Misses 1873 1741 -132
+ Partials 2254 0 -2254
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
🔍 PR Validation ResultsVersion: `` ✅ Validation Steps
📊 ArtifactsDry-run artifacts have been uploaded and will be available for 7 days. This comment was automatically generated by the PR validation workflow. |
- PollOnceAsync: add optional context parameter so callers can supply MessageContext instead of always using Empty; cancellation flows via the explicit ct parameter as before - PollOnceAsync test DoesNotInvokeRunLoopHandler: rewrite to count source invocations (observable) instead of checking an unused flag - OutboxStoreExtensionsTests: assert ReferenceEquals(MessageHeaders.Empty) for null headers; add empty-dictionary scenario to verify same guarantee Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Code Coverage |
Summary
Ships PatternKit v0.113.0 with three new primitives targeting the WorkflowFramework adoption plan (Phase 1 of patternkit-iteration-2.md).
Deliverable 1 —
KeyedNormalizer<TKey,TRaw,TCanonical>src/PatternKit.Core/Messaging/Transformation/KeyedNormalizer.csNormalizer<TRaw,TCanonical>(predicate-first)ArgumentExceptionon duplicate key registration (safer than last-writer-wins)Defaulthandler; throwsKeyNotFoundExceptionif neither key nor default matches (error message format matches predicate-based Normalizer)Build()— fully immutable after constructionKeyedNormalizerTests.csDeliverable 2 —
AsyncPollingConsumer<T>.PollOnceAsyncsrc/PatternKit.Core/Messaging/Consumers/AsyncPollingConsumer.cs(edited)Message<TPayload>?from the source — caller owns what to do with itAsyncPollingConsumerTests.csDeliverable 3 —
OutboxStoreExtensions.EnqueueObjectAsyncsrc/PatternKit.Core/Messaging/Reliability/OutboxStoreExtensions.csIOutboxStore<object>for callers without compile-time payload type knowledgeobject payload+IReadOnlyDictionary<string,string>? headersMessageHeaders.Empty(no allocation)OutboxStoreExtensionsTests.csTest plan
[Scenario]/[Fact]tests pass on net8, net9, net10 TFMsNormalizerTests,AsyncPollingConsumerTests,IOutboxStoreTestsPatternKit.Corebuilds clean across all target frameworks (netstandard2.0/2.1, net8/9/10)🤖 Generated with Claude Code