Fix the handling of subscriptions with multiple topics and extra fields#2425
Fix the handling of subscriptions with multiple topics and extra fields#2425abalarev wants to merge 1 commit intoeclipse-ditto:masterfrom
Conversation
f26bbaf to
01fd8d8
Compare
|
@abalarev will you also provide a system-test which tests this fix - seems important enough to me to cover it with an integration-test |
|
@thjaeckle Sounds reasonable, I'll add some system test(s) |
|
@abalarev thanks for the added system tests 👍 |
|
@thjaeckle Please run the system tests again, I've pushed a fix. |
Signed-off-by: Andrey Balarev <andrey.balarev@bosch.com>
01fd8d8 to
340ea56
Compare
thjaeckle
left a comment
There was a problem hiding this comment.
Thanks for tackling this — the refactor direction looks right and the headline scenarios are well-covered. A few correctness concerns and polish items inline.
One overarching note on test coverage: the four correctness concerns flagged below (shared-root trimming, cross-streaming-type early return, non-deterministic findFirst over a Set, and catch-all / extras precedence) are not yet exercised by the new tests. Once the intended behavior for each is settled, please add regression tests that lock it in.
| final var builder = extra.toBuilder(); | ||
| allExtraFields.getPointers().stream() | ||
| .filter(pointer -> !neededFields.getPointers().contains(pointer)) | ||
| .forEach(pointer -> pointer.getRoot().ifPresent(builder::remove)); |
There was a problem hiding this comment.
The trim removes by pointer.getRoot(), which deletes the whole top-level object. With allExtraFields = ["/features/f1", "/features/f2", "/attributes"] and neededFields = ["/features/f1", "/attributes"], the loop hits /features/f2, removes the features root, and also wipes /features/f1 that the matched topic actually requested.
The trimming should operate at full-pointer depth (remove the value at the exact pointer, or rebuild the JSON containing only neededFields) rather than at the root segment.
| .map(FilteredTopic::getExtraFields) | ||
| .flatMap(Optional::stream) | ||
| .toList(); | ||
| boolean topicWithNoFilterNoExtraFieldsExists = topics.stream().anyMatch(topic -> topic.getFilter().isEmpty() && topic.getExtraFields().isEmpty()); |
There was a problem hiding this comment.
topics is target.getTopics() unfiltered by streaming type — pairTargetsWithTopics now passes all of a target's topics whenever any one of them has extra fields. So a target with e.g. a TWIN_EVENTS extras-topic plus a LIVE_MESSAGES topic with no filter and no extras would short-circuit enrichment for a TWIN_EVENTS signal, even though the LIVE_MESSAGES topic is irrelevant to that signal.
This anyMatch should apply the same streamingType == StreamingType.fromTopic(topic.getTopic().getPubSubTopic()) guard used in pairTargetsWithTopics.
| .flatMap(Optional::stream) | ||
| .toList(); | ||
| boolean topicWithNoFilterNoExtraFieldsExists = topics.stream().anyMatch(topic -> topic.getFilter().isEmpty() && topic.getExtraFields().isEmpty()); | ||
| if (allExtraFields.isEmpty() || topicWithNoFilterNoExtraFieldsExists) { |
There was a problem hiding this comment.
Behavior change worth pinning down: a target that mixes { TWIN_EVENTS, no filter, no extras } with { TWIN_EVENTS, filter=X, extras=definition } will now be delivered without extras (the catch-all wins via this early return). Previously the same signal also produced an enriched copy.
Deduplication is good, but is it intentional that the catch-all topic wins over the topic that explicitly requested enrichment? Either way, please document the precedence rule in the javadoc of enrichAndFilterSignal so the semantics are unambiguous.
| .map(signal -> setTrimmedExtra(signal, topic, expressionResolver, | ||
| extra, allExtraFieldsOptional.get())) | ||
| .stream()) | ||
| .findFirst() |
There was a problem hiding this comment.
topics is a Set<FilteredTopic>, which has no defined iteration order. If a signal matches more than one topic on the target, findFirst() picks an arbitrary one, so the trimmed extras delivered downstream become non-deterministic — the very symptom this PR sets out to fix.
Consider iterating in a deterministic order: derive from target.getTopics() via a structure that preserves insertion order, or sort topics by a stable key, so the chosen match is reproducible.
|
|
||
|
|
||
| // Called inside stream; must be thread-safe | ||
| // precondition: whenever filteredTopic != null, it contains an extra fields |
There was a problem hiding this comment.
This precondition comment is now stale — the parameter is Set<FilteredTopic>, not a single nullable FilteredTopic. Please update or remove.
| * As these targets have already passed pre-filtering in an early stage, no more filtering is needed.</li> | ||
| * <li> | ||
| * For each target containing any extra field in its topics, it produces a pair of outbound signal and a set of its target topics. | ||
| * As the filter could incLude extra fields, an additional filtering must be performed after extracting the extra fields. |
| if (extraFieldsOptional.isEmpty()) { | ||
| final Set<FilteredTopic> topics = outboundSignalWithExtraFields.second(); | ||
|
|
||
| List<JsonFieldSelector> allExtraFields = topics.stream() |
There was a problem hiding this comment.
The rest of this file is consistently final on every local. Please add final to the new locals introduced in this method: allExtraFields (here), topicWithNoFilterNoExtraFieldsExists, topicWithNoFilterExists, allExtraFieldsOptional, and partialThingOptional.
| public void multipleTopicsWithoutExtraFieldsFastProcessed() { | ||
| new TestKit(actorSystemResource.getActorSystem()) {{ | ||
| // Create a target with multiple topics all without extra fields. | ||
| // Must send outbound signal, skipping filtering as the pe-filtering has already done the job |
There was a problem hiding this comment.
Typo: pe-filtering → pre-filtering.
| public void multipleTopicsWithExtraFieldsFirstTopic() { | ||
| new TestKit(actorSystemResource.getActorSystem()) {{ | ||
| // Create a target with multiple FilteredTopics for the same streaming type with different extraFields | ||
| List <Target> targets = List.of(createTestTargetMultiTopics(Set.of(topic4(), topic3(), topic5()))); |
There was a problem hiding this comment.
Stray space between List and <Target> (also at lines 310, 341, 377, 402). Should be List<Target> to match the project style.
Summary
Resolves incorrect handling of subscriptions for multiple topics, containing at least one extra field.
Issue
What's changed
OutboundMaooingProcessorActor flow changed as follows:
Added system tests: eclipse-ditto/ditto-testing#33