diff --git a/system/src/test/java/org/eclipse/ditto/testing/system/connectivity/AbstractConnectivityITestCases.java b/system/src/test/java/org/eclipse/ditto/testing/system/connectivity/AbstractConnectivityITestCases.java index 54a5f69..a0d8b4a 100644 --- a/system/src/test/java/org/eclipse/ditto/testing/system/connectivity/AbstractConnectivityITestCases.java +++ b/system/src/test/java/org/eclipse/ditto/testing/system/connectivity/AbstractConnectivityITestCases.java @@ -1538,6 +1538,369 @@ public void publishEnrichedSignalsWithFeatureWildcard() { ); } + @Test + @Connections({CONNECTION_WITH_EXTRA_FIELDS, CONNECTION1}) + public void publishEnrichedSignalsWithMultipleTopicsPerTarget() { + // A single target subscribes to twin events AND live events, each with different extraFields. + // The outbound mapping must apply the correct extraFields per topic and not duplicate signals. + final String correlationIdPrefix = "publishMultiTopics-"; + final String correlationId = correlationIdPrefix + createNewCorrelationId(); + final DittoHeaders headers = createDittoHeaders(correlationId); + + final C consumer = initTargetsConsumer(cf.connectionNameWithExtraFields); + + final ThingId thingId = generateThingId(); + final int initialCounterValue = 0; + final String counterKey = "counter"; + final Thing thing = Thing.newBuilder() + .setId(thingId) + .setAttribute(JsonPointer.of(counterKey), JsonValue.of(initialCounterValue)) + .setAttribute(JsonPointer.of("filter"), JsonValue.of(false)) + .build(); + final Policy policy = Policy.newBuilder() + .forLabel("DEFAULT") + .setSubject(testingContextWithRandomNs.getOAuthClient().getDefaultSubject()) + .setSubject(connectionSubject(cf.connectionName1)) + .setSubject(connectionSubject(cf.connectionNameWithExtraFields)) + .setGrantedPermissions(PoliciesResourceType.thingResource("/"), READ, WRITE) + .setGrantedPermissions(PoliciesResourceType.policyResource("/"), READ, WRITE) + .setGrantedPermissions(PoliciesResourceType.messageResource("/"), READ, WRITE) + .build(); + final CreateThing createThing = CreateThing.of(thing, policy.toJson(), headers); + + sendSignal(cf.connectionName1, createThing); + final M thingCreatedMessage = consumeFromTarget(cf.connectionNameWithExtraFields, consumer); + assertThat(thingCreatedMessage).describedAs("twinEvent (thingCreated)").isNotNull(); + final Adaptable thingCreatedAdaptable = jsonifiableAdaptableFrom(thingCreatedMessage); + assertThat(thingCreatedAdaptable.getTopicPath().getChannel()).isEqualTo(TopicPath.Channel.TWIN); + assertThat(thingCreatedAdaptable.getTopicPath().getAction()).contains(TopicPath.Action.CREATED); + assertThat(thingCreatedAdaptable.getPayload().getExtra()).describedAs("thingCreated/extra") + .contains(JsonObject.newBuilder() + .set(JsonPointer.of("attributes/" + counterKey), initialCounterValue) + .build()); + + // Set counter to 20 to pass filters on live commands/events topics + final DittoHeaders headers2 = headers.toBuilder() + .correlationId(correlationIdPrefix + "set20-" + createNewCorrelationId()) + .build(); + final ModifyAttribute setCounter20 = + ModifyAttribute.of(thingId, JsonPointer.of(counterKey), JsonValue.of(20), headers2); + sendSignal(cf.connectionName1, setCounter20); + final M setCounter20Message = consumeFromTarget(cf.connectionNameWithExtraFields, consumer); + assertThat(setCounter20Message).describedAs("twinEvent (attributeModified[c=20])").isNotNull(); + final Adaptable setCounter20Adaptable = jsonifiableAdaptableFrom(setCounter20Message); + assertThat(setCounter20Adaptable.getPayload().getExtra()).describedAs("attributeModified[c=20]/extra") + .contains(JsonObject.newBuilder().set(JsonPointer.of("attributes/counter"), 20).build()); + + // Set filter=true to pass live events filter + final DittoHeaders headers3 = headers.toBuilder() + .correlationId(correlationIdPrefix + "setFilter-" + createNewCorrelationId()) + .build(); + final ModifyAttribute setFilterTrue = + ModifyAttribute.of(thingId, JsonPointer.of("filter"), JsonValue.of(true), headers3); + sendSignal(cf.connectionName1, setFilterTrue); + final M setFilterMessage = consumeFromTarget(cf.connectionNameWithExtraFields, consumer); + assertThat(setFilterMessage).describedAs("twinEvent (filter=true)").isNotNull(); + + // Send live command - should be received since counter=20 matches live commands filter + final DittoHeaders liveHeaders = headers.toBuilder() + .correlationId(correlationIdPrefix + "live-cmd-" + createNewCorrelationId()) + .responseRequired(false) + .channel(TopicPath.Channel.LIVE.getName()).build(); + final ModifyAttribute liveModifyAttribute = + ModifyAttribute.of(thingId, JsonPointer.of(counterKey), JsonValue.of(99), liveHeaders); + sendSignal(cf.connectionName1, liveModifyAttribute); + final M liveCommandMessage = consumeFromTarget(cf.connectionNameWithExtraFields, consumer); + assertThat(liveCommandMessage).describedAs("liveCommand").isNotNull(); + final Adaptable liveCommandAdaptable = jsonifiableAdaptableFrom(liveCommandMessage); + assertThat(liveCommandAdaptable.getTopicPath().getChannel()).isEqualTo(TopicPath.Channel.LIVE); + assertThat(liveCommandAdaptable.getTopicPath().getAction()).contains(TopicPath.Action.MODIFY); + assertThat(liveCommandAdaptable.getPayload().getExtra()).describedAs("liveCommand/extra") + .contains(JsonObject.newBuilder().set(JsonPointer.of("attributes/counter"), 20).build()); + + // Send live event - should be received since counter=20 AND filter=true matches live events filter + final DittoHeaders liveHeaders2 = headers.toBuilder() + .correlationId(correlationIdPrefix + "live-evt-" + createNewCorrelationId()) + .responseRequired(false) + .channel(TopicPath.Channel.LIVE.getName()).build(); + final AttributeModified liveEvent = + AttributeModified.of(thingId, JsonPointer.of("filter"), JsonValue.of(true), 300L, Instant.now(), + liveHeaders2, null); + sendSignal(cf.connectionName1, liveEvent); + final M liveEventMessage = consumeFromTarget(cf.connectionNameWithExtraFields, consumer); + assertThat(liveEventMessage).describedAs("liveEvent").isNotNull(); + final Adaptable liveEventAdaptable = jsonifiableAdaptableFrom(liveEventMessage); + assertThat(liveEventAdaptable.getTopicPath().getChannel()).isEqualTo(TopicPath.Channel.LIVE); + assertThat(liveEventAdaptable.getTopicPath().getAction()).contains(TopicPath.Action.MODIFIED); + assertThat(liveEventAdaptable.getPayload().getExtra()).describedAs("liveEvent/extra") + .contains(JsonObject.newBuilder().set(JsonPointer.of("attributes/counter"), 20).build()); + } + + @Test + @Connections(NONE) + public void overlappingTwinTopicFiltersDeliverSignalOnce() throws Exception { + // A single target with 2 twin event topics whose filters BOTH match the same signal. + // Verifies findFirst() semantics: signal delivered exactly once, extra fields from one topic only. + final String connectionName = "overlap-twin-" + UUID.randomUUID(); + final Connection baseConnection = cf.getSingleConnection(connectionName); + if (baseConnection.getConnectionType() == ConnectionType.AMQP_10 || + baseConnection.getConnectionType() == ConnectionType.AMQP_091) { + LOGGER.info("Skipping overlappingTwinTopicFiltersDeliverSignalOnce for {} - " + + "requires protocol-specific setup for dynamically created connections", + baseConnection.getConnectionType()); + return; + } + + final String targetAddress = cf.defaultTargetAddress(connectionName); + final Target baseTarget = baseConnection.getTargets().get(0); + + // Broad topic: matches ALL attribute modifications, enriched with definition + final FilteredTopic topicBroad = ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("eq(topic:action,\"modified\")") + .withExtraFields(ThingFieldSelector.fromString("definition")) + .build(); + // Narrow topic: matches only /attributes/specific modifications, enriched with features + final FilteredTopic topicNarrow = ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/attributes/specific\"))") + .withExtraFields(ThingFieldSelector.fromString("features")) + .build(); + + final Target target = ConnectivityModelFactory.newTargetBuilder(baseTarget) + .address(targetAddress) + .topics(Set.of(topicBroad, topicNarrow)) + .build(); + + final Connection connection = baseConnection.toBuilder() + .setTargets(Collections.singletonList(target)) + .build(); + + cf.asyncCreateConnection(connection).get(60, TimeUnit.SECONDS); + final C consumer = initTargetsConsumer(connectionName, targetAddress); + + final ThingId thingId = generateThingId(); + final PolicyId policyId = putPolicyForThing(thingId, connectionName); + final FeatureProperties sensorProps = ThingsModelFactory.newFeaturePropertiesBuilder() + .set("value", 42) + .build(); + final Thing thing = Thing.newBuilder() + .setId(thingId) + .setPolicyId(policyId) + .setDefinition(ThingsModelFactory.newDefinition("test:TestModel:1.0.0")) + .setAttribute(JsonPointer.of("specific"), JsonValue.of("initial")) + .setAttribute(JsonPointer.of("other"), JsonValue.of("initial")) + .setFeature("sensor", sensorProps) + .build(); + + putThing(2, thing, JsonSchemaVersion.V_2) + .withJWT(testingContextWithRandomNs.getOAuthClient().getAccessToken()) + .expectingHttpStatus(HttpStatus.CREATED) + .fire(); + Thread.sleep(1000); + + // --- Step 1: modify /attributes/specific → matches BOTH topics → signal once, no duplication --- + putAttribute(2, thingId, "specific", "\"fired\"") + .withJWT(testingContextWithRandomNs.getOAuthClient().getAccessToken()) + .expectingHttpStatus(HttpStatus.NO_CONTENT) + .fire(); + + final M messageOverlap = consumeFromTarget(connectionName, consumer); + assertThat(messageOverlap).describedAs("event for overlapping modification").isNotNull(); + final Adaptable adaptableOverlap = jsonifiableAdaptableFrom(messageOverlap); + assertThat(adaptableOverlap.getTopicPath().getChannel()).isEqualTo(TopicPath.Channel.TWIN); + assertThat(adaptableOverlap.getTopicPath().getAction()).contains(TopicPath.Action.MODIFIED); + assertThat(adaptableOverlap.getPayload().getExtra()) + .describedAs("overlapping match must produce extra from exactly one topic") + .isPresent() + .hasValueSatisfying(extra -> { + final boolean hasDefinition = extra.contains(JsonKey.of("definition")); + final boolean hasFeatures = extra.contains(JsonKey.of("features")); + assertThat(hasDefinition || hasFeatures) + .describedAs("extra must contain definition OR features") + .isTrue(); + assertThat(hasDefinition && hasFeatures) + .describedAs("extra must NOT contain both — findFirst should pick one topic only") + .isFalse(); + }); + + // Verify no duplicate signal for the same modification + final M possibleDuplicate = consumeFromTarget(connectionName, consumer); + + // --- Step 2: modify /attributes/other → matches only broad topic → deterministic extra --- + putAttribute(2, thingId, "other", "\"fired\"") + .withJWT(testingContextWithRandomNs.getOAuthClient().getAccessToken()) + .expectingHttpStatus(HttpStatus.NO_CONTENT) + .fire(); + + // If step 1 produced a duplicate, it arrives here; consume until we get the "other" event + M messageOther = possibleDuplicate; + if (messageOther != null) { + final Adaptable dup = jsonifiableAdaptableFrom(messageOther); + if ("/attributes/specific".equals(dup.getPayload().getPath().toString())) { + fail("Duplicate signal received for overlapping filter match on /attributes/specific"); + } + } else { + messageOther = consumeFromTarget(connectionName, consumer); + } + + assertThat(messageOther).describedAs("event for /attributes/other modification").isNotNull(); + final Adaptable adaptableOther = jsonifiableAdaptableFrom(messageOther); + assertThat(adaptableOther.getTopicPath().getChannel()).isEqualTo(TopicPath.Channel.TWIN); + assertThat(adaptableOther.getPayload().getPath().toString()) + .isEqualTo(JsonPointer.of("attributes/other").toString()); + assertThat(adaptableOther.getPayload().getExtra()) + .describedAs("only broad topic matches → extra must contain definition") + .isPresent() + .hasValueSatisfying(extra -> { + assertThat(extra.contains(JsonKey.of("definition"))) + .describedAs("extra must include definition") + .isTrue(); + assertThat(extra.contains(JsonKey.of("features"))) + .describedAs("extra must NOT include features (narrow topic doesn't match)") + .isFalse(); + }); + } + + @Test + @Connections(NONE) + public void multipleTwinTopicsWithDifferentFiltersAndExtraFields() throws Exception { + // A single target with 3 twin event topics, each with a different RQL filter and different extra fields. + // Verifies that for each modification, only the matching topic's filter passes, + // and only the matching topic's extra fields are included in the outbound signal. + // Extra fields use different root keys (definition vs features) because setTrimmedExtra + // removes by root key — same-root fields (e.g. attributes/x, attributes/y) would collide. + final String connectionName = "multi-twin-" + UUID.randomUUID(); + final Connection baseConnection = cf.getSingleConnection(connectionName); + if (baseConnection.getConnectionType() == ConnectionType.AMQP_10 || + baseConnection.getConnectionType() == ConnectionType.AMQP_091) { + LOGGER.info("Skipping multipleTwinTopicsWithDifferentFiltersAndExtraFields for {} - " + + "requires protocol-specific setup for dynamically created connections", + baseConnection.getConnectionType()); + return; + } + + final String targetAddress = cf.defaultTargetAddress(connectionName); + final Target baseTarget = baseConnection.getTargets().get(0); + + // Topic A: matches modifications to /attributes/triggerA, enriched with definition + final FilteredTopic topicA = ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/attributes/triggerA\"))") + .withExtraFields(ThingFieldSelector.fromString("definition")) + .build(); + // Topic B: matches modifications to /attributes/triggerB, enriched with features + final FilteredTopic topicB = ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/attributes/triggerB\"))") + .withExtraFields(ThingFieldSelector.fromString("features")) + .build(); + // Topic C: matches modifications to /attributes/triggerC, no extra fields + final FilteredTopic topicC = ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS) + .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/attributes/triggerC\"))") + .build(); + + final Target target = ConnectivityModelFactory.newTargetBuilder(baseTarget) + .address(targetAddress) + .topics(Set.of(topicA, topicB, topicC)) + .build(); + + final Connection connection = baseConnection.toBuilder() + .setTargets(Collections.singletonList(target)) + .build(); + + cf.asyncCreateConnection(connection).get(60, TimeUnit.SECONDS); + final C consumer = initTargetsConsumer(connectionName, targetAddress); + + final ThingId thingId = generateThingId(); + final PolicyId policyId = putPolicyForThing(thingId, connectionName); + final FeatureProperties sensorProps = ThingsModelFactory.newFeaturePropertiesBuilder() + .set("value", 42) + .build(); + final Thing thing = Thing.newBuilder() + .setId(thingId) + .setPolicyId(policyId) + .setDefinition(ThingsModelFactory.newDefinition("test:TestModel:1.0.0")) + .setAttribute(JsonPointer.of("triggerA"), JsonValue.of("initial")) + .setAttribute(JsonPointer.of("triggerB"), JsonValue.of("initial")) + .setAttribute(JsonPointer.of("triggerC"), JsonValue.of("initial")) + .setFeature("sensor", sensorProps) + .build(); + + putThing(2, thing, JsonSchemaVersion.V_2) + .withJWT(testingContextWithRandomNs.getOAuthClient().getAccessToken()) + .expectingHttpStatus(HttpStatus.CREATED) + .fire(); + Thread.sleep(1000); + + // --- Step 1: modify triggerA → matches topicA → extra should contain definition only --- + putAttribute(2, thingId, "triggerA", "\"fired\"") + .withJWT(testingContextWithRandomNs.getOAuthClient().getAccessToken()) + .expectingHttpStatus(HttpStatus.NO_CONTENT) + .fire(); + + final M messageA = consumeFromTarget(connectionName, consumer); + assertThat(messageA).describedAs("event for triggerA modification").isNotNull(); + final Adaptable adaptableA = jsonifiableAdaptableFrom(messageA); + assertThat(adaptableA.getTopicPath().getChannel()).isEqualTo(TopicPath.Channel.TWIN); + assertThat(adaptableA.getTopicPath().getAction()).contains(TopicPath.Action.MODIFIED); + assertThat(adaptableA.getPayload().getPath().toString()) + .describedAs("event path should be /attributes/triggerA") + .isEqualTo(JsonPointer.of("attributes/triggerA").toString()); + assertThat(adaptableA.getPayload().getExtra()).describedAs("topicA extra should contain definition") + .isPresent() + .hasValueSatisfying(extra -> { + assertThat(extra.contains(JsonKey.of("definition"))) + .describedAs("topicA extra must include definition") + .isTrue(); + assertThat(extra.contains(JsonKey.of("features"))) + .describedAs("topicA extra must NOT include features") + .isFalse(); + }); + + // --- Step 2: modify triggerB → matches topicB → extra should contain features only --- + putAttribute(2, thingId, "triggerB", "\"fired\"") + .withJWT(testingContextWithRandomNs.getOAuthClient().getAccessToken()) + .expectingHttpStatus(HttpStatus.NO_CONTENT) + .fire(); + + final M messageB = consumeFromTarget(connectionName, consumer); + assertThat(messageB).describedAs("event for triggerB modification").isNotNull(); + final Adaptable adaptableB = jsonifiableAdaptableFrom(messageB); + assertThat(adaptableB.getTopicPath().getChannel()).isEqualTo(TopicPath.Channel.TWIN); + assertThat(adaptableB.getTopicPath().getAction()).contains(TopicPath.Action.MODIFIED); + assertThat(adaptableB.getPayload().getPath().toString()) + .describedAs("event path should be /attributes/triggerB") + .isEqualTo(JsonPointer.of("attributes/triggerB").toString()); + assertThat(adaptableB.getPayload().getExtra()).describedAs("topicB extra should contain features") + .isPresent() + .hasValueSatisfying(extra -> { + assertThat(extra.contains(JsonKey.of("features"))) + .describedAs("topicB extra must include features") + .isTrue(); + assertThat(extra.getValue(JsonPointer.of("features/sensor/properties/value"))) + .describedAs("topicB extra must include sensor feature value") + .contains(JsonValue.of(42)); + assertThat(extra.contains(JsonKey.of("definition"))) + .describedAs("topicB extra must NOT include definition") + .isFalse(); + }); + + // --- Step 3: modify triggerC → matches topicC → no extra fields --- + putAttribute(2, thingId, "triggerC", "\"fired\"") + .withJWT(testingContextWithRandomNs.getOAuthClient().getAccessToken()) + .expectingHttpStatus(HttpStatus.NO_CONTENT) + .fire(); + + final M messageC = consumeFromTarget(connectionName, consumer); + assertThat(messageC).describedAs("event for triggerC modification").isNotNull(); + final Adaptable adaptableC = jsonifiableAdaptableFrom(messageC); + assertThat(adaptableC.getTopicPath().getChannel()).isEqualTo(TopicPath.Channel.TWIN); + assertThat(adaptableC.getTopicPath().getAction()).contains(TopicPath.Action.MODIFIED); + assertThat(adaptableC.getPayload().getPath().toString()) + .describedAs("event path should be /attributes/triggerC") + .isEqualTo(JsonPointer.of("attributes/triggerC").toString()); + assertThat(adaptableC.getPayload().getExtra()) + .describedAs("topicC extra should be empty (topic has no extraFields)") + .isEmpty(); + } + @Test @Category(RequireSource.class) @Connections({CONNECTION1, CONNECTION2})