diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java
index 2d07124367..253033d5f2 100644
--- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java
+++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActor.java
@@ -70,6 +70,7 @@
import org.eclipse.ditto.connectivity.api.OutboundSignalFactory;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.FilteredTopic;
+import org.eclipse.ditto.connectivity.model.Topic;
import org.eclipse.ditto.connectivity.model.LogCategory;
import org.eclipse.ditto.connectivity.model.LogType;
import org.eclipse.ditto.connectivity.model.MetricDirection;
@@ -103,7 +104,6 @@
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.placeholders.ExpressionResolver;
-import org.eclipse.ditto.placeholders.HeadersPlaceholder;
import org.eclipse.ditto.placeholders.PipelineElement;
import org.eclipse.ditto.placeholders.PlaceholderFactory;
import org.eclipse.ditto.placeholders.PlaceholderResolver;
@@ -149,7 +149,6 @@ public final class OutboundMappingProcessorActor
private static final ResourcePlaceholder RESOURCE_PLACEHOLDER = ResourcePlaceholder.getInstance();
private static final TimePlaceholder TIME_PLACEHOLDER = TimePlaceholder.getInstance();
private static final ThingJsonPlaceholder THING_JSON_PLACEHOLDER = ThingJsonPlaceholder.getInstance();
- private static final HeadersPlaceholder HEADERS_PLACEHOLDER = PlaceholderFactory.newHeadersPlaceholder();
private final ActorRef clientActor;
private final Connection connection;
@@ -396,7 +395,7 @@ protected void messageDiscarded(final OutboundSignal message, final QueueOfferRe
outboundPair.first(),
outboundMappingProcessor,
Source.single(outboundPair.first())
- .via(splitByTargetExtraFieldsFlow())
+ .via(pairTargetsWithTopicsFlow())
.mapAsync(mappingConfig.getParallelism(), this::enrichAndFilterSignal)
.mapConcat(x -> x)
.map(outbound -> handleOutboundSignal(outbound, outboundMappingProcessor))
@@ -411,61 +410,53 @@ protected void messageDiscarded(final OutboundSignal message, final QueueOfferRe
* Create a flow that splits 1 outbound signal into many as follows.
*
* -
- * Targets with matching filtered topics without extra fields are grouped into 1 outbound signal, followed by
+ * For each target without extra fields, it produces a pair of outbound signal and empty set of topics.
+ * As these targets have already passed pre-filtering in an early stage, no more filtering is needed.
+ * -
+ * For each target containing any extra field in its topics, it produces a pair of outbound signal and a set of its target topics.
+ * As the filter could include extra fields, an additional filtering must be performed after extracting the extra fields.
+ * This filtering should pass the first match only to not duplicate the outbound signal for the same target.
+ * The outbound signal should be enriched with only those extra fields, which are listed in the topic matched the filter.
*
- * - one outbound signal for each target with a matching filtered topic with extra fields.
- *
- * The matching filtered topic is attached in the latter case.
- * Consequently, for each outbound signal leaving this flow, if it has a filtered topic attached,
- * then it has 1 unique target with a matching topic with extra fields.
+ *
* This satisfies the precondition of {@code this#enrichAndFilterSignal}.
*
* @return the flow.
*/
- private static Flow, NotUsed> splitByTargetExtraFieldsFlow() {
+ private static Flow>, NotUsed> pairTargetsWithTopicsFlow() {
return Flow.create()
.mapConcat(outboundSignal -> {
- final Pair, List>> splitTargets =
- splitTargetsByExtraFields(outboundSignal);
-
- final boolean shouldSendSignalWithoutExtraFields =
- !splitTargets.first().isEmpty() ||
- isCommandResponseWithReplyTarget(outboundSignal.getSource()) ||
- outboundSignal.getTargets().isEmpty(); // no target - this is an error response
- final Stream> outboundSignalWithoutExtraFields =
- shouldSendSignalWithoutExtraFields
- ? Stream.of(Pair.create(outboundSignal.setTargets(splitTargets.first()), null))
- : Stream.empty();
-
- final Stream> outboundSignalWithExtraFields =
- splitTargets.second().stream()
- .map(targetAndSelector -> Pair.create(
- outboundSignal.setTargets(
- Collections.singletonList(targetAndSelector.first())),
- targetAndSelector.second()));
-
- return Stream.concat(outboundSignalWithoutExtraFields, outboundSignalWithExtraFields).toList();
+ final boolean shouldSendSignalDirectly =
+ isCommandResponseWithReplyTarget(outboundSignal.getSource()) ||
+ outboundSignal.getTargets().isEmpty(); // no target - this is an error response
+ return shouldSendSignalDirectly
+ ? List.of(Pair.create(outboundSignal, Collections.emptySet()))
+ : pairTargetsWithTopics(outboundSignal).stream()
+ .map(targetAndSelector -> Pair.create(
+ outboundSignal.setTargets(Collections.singletonList(targetAndSelector.first())),
+ targetAndSelector.second()))
+ .toList();
});
}
-
// Called inside stream; must be thread-safe
- // precondition: whenever filteredTopic != null, it contains an extra fields
private CompletionStage> enrichAndFilterSignal(
- final Pair outboundSignalWithExtraFields) {
-
+ final Pair> outboundSignalWithExtraFields) {
final OutboundSignalWithSender outboundSignal = outboundSignalWithExtraFields.first();
- final FilteredTopic filteredTopic = outboundSignalWithExtraFields.second();
- final ExpressionResolver expressionResolver =
- Resolvers.forSignal(outboundSignal.getSource(), connection.getId());
- final Optional extraFieldsOptional = getExtraFields(expressionResolver, filteredTopic);
- if (extraFieldsOptional.isEmpty()) {
+ final Set topics = outboundSignalWithExtraFields.second();
+
+ final List allExtraFields = topics.stream()
+ .map(FilteredTopic::getExtraFields)
+ .flatMap(Optional::stream)
+ .toList();
+ final boolean topicWithNoFilterNoExtraFieldsExists = topics.stream().anyMatch(topic -> topic.getFilter().isEmpty() && topic.getExtraFields().isEmpty());
+ if (allExtraFields.isEmpty() || topicWithNoFilterNoExtraFieldsExists) {
+ // Pre-filtering already did the job
return CompletableFuture.completedFuture(Collections.singletonList(outboundSignal));
}
- final JsonFieldSelector extraFields = extraFieldsOptional.get();
- final Target target = outboundSignal.getTargets().get(0);
-
+ final boolean topicWithNoFilterExists = topics.stream().anyMatch(topic -> topic.getFilter().isEmpty());
+ final Target target = outboundSignal.getTargets().getFirst();
final DittoHeaders headers = DittoHeaders.newBuilder()
.authorizationContext(target.getAuthorizationContext())
// the correlation-id MUST NOT be set! as the DittoHeaders are used as a caching key in the Caffeine
@@ -474,21 +465,40 @@ private CompletionStage> enrichAndFilterSig
.schemaVersion(JsonSchemaVersion.LATEST)
.build();
- return extractEntityId(outboundSignal.delegate.getSource())
+ final ExpressionResolver expressionResolver =
+ Resolvers.forSignal(outboundSignal.getSource(), connection.getId());
+ final Optional allExtraFieldsOptional = getExtraFields(expressionResolver, allExtraFields);
+
+ // Avoid multiple calls to 'retrievePartialThing' (for each topic with extra fields) by combining extra fields from all topics
+ final Optional> partialThingOptional = extractEntityId(outboundSignal.delegate.getSource())
.filter(ThingId.class::isInstance)
.map(ThingId.class::cast)
- .map(thingId ->
- signalEnrichmentFacade.retrievePartialThing(
- thingId,
- extraFields,
- headers,
- outboundSignal.getSource())
- )
- .map(partialThingCompletionStage -> partialThingCompletionStage
- .thenApply(outboundSignal::setExtra)
- .thenApply(outboundSignalWithExtra -> applyFilter(outboundSignalWithExtra, filteredTopic)))
- .orElse(CompletableFuture.completedFuture(outboundSignal)
- .thenApply(outboundSignalWithExtra -> applyFilter(outboundSignalWithExtra, filteredTopic)))
+ .flatMap(thingId -> allExtraFieldsOptional
+ .map(resolvedExtraFields ->
+ signalEnrichmentFacade.retrievePartialThing(
+ thingId,
+ resolvedExtraFields,
+ headers,
+ outboundSignal.getSource())));
+
+ return partialThingOptional
+ .map(partialThing -> partialThing
+ .>thenApply(extra -> {
+ final Thing enrichedThing = ThingEventToThingConverter.mergeThingWithExtraFields(
+ outboundSignal.getSource(),
+ allExtraFieldsOptional.get(),
+ extra).orElse(null);
+ return topics.stream()
+ .filter(_ -> enrichedThing != null || topicWithNoFilterExists)
+ .flatMap(topic -> applyFilter(outboundSignal, enrichedThing, topic)
+ .map(signal -> setTrimmedExtra(signal, topic, expressionResolver,
+ extra, allExtraFieldsOptional.get()))
+ .stream())
+ .findFirst()
+ .map(Collections::singletonList)
+ .orElse(Collections.emptyList());
+ }))
+ .orElseGet(() -> CompletableFuture.completedFuture(Collections.singletonList(outboundSignal)))
.exceptionally(error -> {
logger.withCorrelationId(outboundSignal.getSource())
.warning("Could not retrieve extra data due to: {} {}", error.getClass().getSimpleName(),
@@ -498,17 +508,35 @@ private CompletionStage> enrichAndFilterSig
});
}
+ private static OutboundSignalWithSender setTrimmedExtra(final OutboundSignalWithSender signal,
+ final FilteredTopic topic,
+ final ExpressionResolver expressionResolver,
+ final JsonObject extra,
+ final JsonFieldSelector allExtraFields) {
+
+ return topic.getExtraFields()
+ .flatMap(fields -> getExtraFields(expressionResolver, Collections.singletonList(fields)))
+ .map(neededFields -> {
+ final var builder = extra.toBuilder();
+ allExtraFields.getPointers().stream()
+ .filter(pointer -> !neededFields.getPointers().contains(pointer))
+ .forEach(pointer -> pointer.getRoot().ifPresent(builder::remove));
+ return signal.setExtra(builder.build());
+ })
+ .orElse(signal);
+ }
+
private static Optional getExtraFields(final ExpressionResolver expressionResolver,
- @Nullable final FilteredTopic filteredTopic) {
-
- return Optional.ofNullable(filteredTopic)
- .flatMap(FilteredTopic::getExtraFields)
- .map(extraFields -> extraFields.getPointers().stream()
- .map(JsonPointer::toString)
- .map(expressionResolver::resolve)
- .flatMap(PipelineElement::toStream)
- .map(JsonPointer::of)
- .toList())
+ final List extraFieldsSelectors) {
+
+ return Optional.of(
+ extraFieldsSelectors.stream()
+ .flatMap(selector -> selector.getPointers().stream())
+ .map(JsonPointer::toString)
+ .map(expressionResolver::resolve)
+ .flatMap(PipelineElement::toStream)
+ .map(JsonPointer::of)
+ .toList())
.filter(jsonPointers -> !jsonPointers.isEmpty())
.map(JsonFactory::newFieldSelector)
.map(ThingFieldSelector::fromJsonFieldSelector);
@@ -736,7 +764,7 @@ private CompletionStage> toMultiMappe
logger);
return List.of();
} else {
- final ActorRef sender = outboundSignals.get(0).sender;
+ final ActorRef sender = outboundSignals.getFirst().sender;
final List targetsToPublishAt = outboundSignals.stream()
.map(OutboundSignal::getTargets)
.flatMap(List::stream)
@@ -787,15 +815,22 @@ private static Stream filterFailedEnrichments(
});
}
- private Collection applyFilter(final OutboundSignalWithSender outboundSignalWithExtra,
- final FilteredTopic filteredTopic) {
+ private Optional applyFilter(final OutboundSignalWithSender outboundSignal,
+ @Nullable final Thing thing, final FilteredTopic topic) {
+
+ final Signal> signal = outboundSignal.getSource();
+ final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
+
+ if (!topicMatchesTopicPath(topicPath, topic.getTopic())) {
+ return Optional.empty();
+ }
- final Optional filter = filteredTopic.getFilter();
- final Optional extraFields = filteredTopic.getExtraFields();
- if (filter.isPresent() && extraFields.isPresent()) {
+ final Optional filter = topic.getFilter();
+ if (filter.isPresent()) {
+ if (thing == null) {
+ return Optional.empty();
+ }
// evaluate filter criteria again if signal enrichment is involved.
- final Signal> signal = outboundSignalWithExtra.getSource();
- final TopicPath topicPath = DITTO_PROTOCOL_ADAPTER.toTopicPath(signal);
final PlaceholderResolver topicPathPlaceholderResolver = PlaceholderFactory
.newPlaceholderResolver(TOPIC_PATH_PLACEHOLDER, topicPath);
final PlaceholderResolver entityIdPlaceholderResolver = PlaceholderFactory
@@ -815,27 +850,36 @@ private Collection applyFilter(final OutboundSignalWit
topicPathPlaceholderResolver, entityIdPlaceholderResolver, thingPlaceholderResolver,
featurePlaceholderResolver, resourcePlaceholderResolver, timePlaceholderResolver
).filterCriteria(filter.get(), dittoHeaders);
- return outboundSignalWithExtra.getExtra()
- .flatMap(extra -> ThingEventToThingConverter
- .mergeThingWithExtraFields(signal, extraFields.get(), extra)
- .filter(thing -> {
- final PlaceholderResolver thingJsonPlaceholderResolver = PlaceholderFactory
- .newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing);
- return ThingPredicateVisitor.apply(criteria, topicPathPlaceholderResolver,
- entityIdPlaceholderResolver, thingPlaceholderResolver,
- featurePlaceholderResolver, resourcePlaceholderResolver,
- timePlaceholderResolver, thingJsonPlaceholderResolver)
- .test(thing);
- })
- .map(thing -> outboundSignalWithExtra))
- .map(Collections::singletonList)
- .orElse(List.of());
+ final PlaceholderResolver thingJsonPlaceholderResolver = PlaceholderFactory
+ .newPlaceholderResolver(THING_JSON_PLACEHOLDER, thing);
+ final var result = Optional.of(outboundSignal)
+ .filter(_ -> ThingPredicateVisitor
+ .apply(criteria, topicPathPlaceholderResolver,
+ entityIdPlaceholderResolver, thingPlaceholderResolver,
+ featurePlaceholderResolver, resourcePlaceholderResolver,
+ timePlaceholderResolver, thingJsonPlaceholderResolver)
+ .test(thing));
+ return result;
} else {
// no signal enrichment: filtering is already done in SignalFilter since there is no ignored field
- return Collections.singletonList(outboundSignalWithExtra);
+ return Optional.of(outboundSignal);
}
}
+ private static boolean topicMatchesTopicPath(final TopicPath topicPath, final Topic topic) {
+ return switch (topic) {
+ case TWIN_EVENTS -> topicPath.isChannel(TopicPath.Channel.TWIN)
+ && topicPath.isCriterion(TopicPath.Criterion.EVENTS);
+ case LIVE_EVENTS -> topicPath.isChannel(TopicPath.Channel.LIVE)
+ && topicPath.isCriterion(TopicPath.Criterion.EVENTS);
+ case LIVE_COMMANDS -> topicPath.isChannel(TopicPath.Channel.LIVE)
+ && topicPath.isCriterion(TopicPath.Criterion.COMMANDS);
+ case LIVE_MESSAGES -> topicPath.isChannel(TopicPath.Channel.LIVE)
+ && topicPath.isCriterion(TopicPath.Criterion.MESSAGES);
+ default -> false;
+ };
+ }
+
private static String stackTraceAsString(final DittoRuntimeException exception) {
final StringWriter stringWriter = new StringWriter();
exception.printStackTrace(new PrintWriter(stringWriter));
@@ -848,38 +892,39 @@ private static boolean isSuccessResponse(final CommandResponse> response) {
}
/**
- * Split the targets of an outbound signal into 2 parts: those without extra fields and those with.
+ * Pairs each target of an outbound signal with its topics, if any with an extra field.
*
* @param outboundSignal The outbound signal.
- * @return A pair of lists. The first list contains targets without matching extra fields.
- * The second list contains targets together with their extra fields matching the outbound signal.
+ * @return A list of pairs, one per target.
+ * If the target has at least one topic with extra fields, the target is paired with a set of its topics.
+ * Otherwise (no extra fields), it is paired with an empty set.
+ * If the signal has no streaming type, all targets are paired with an empty set.
*/
- private static Pair, List>> splitTargetsByExtraFields(
+ private static List>> pairTargetsWithTopics(
final OutboundSignal outboundSignal) {
final Optional streamingTypeOptional = StreamingType.fromSignal(outboundSignal.getSource());
if (streamingTypeOptional.isPresent()) {
// Find targets with a matching topic with extra fields
final StreamingType streamingType = streamingTypeOptional.get();
- final List targetsWithoutExtraFields = new ArrayList<>(outboundSignal.getTargets().size());
- final List> targetsWithExtraFields =
+ final List>> targetsPairedWithTopics =
new ArrayList<>(outboundSignal.getTargets().size());
+
for (final Target target : outboundSignal.getTargets()) {
- final Optional matchingExtraFields = target.getTopics()
- .stream()
- .filter(filteredTopic -> filteredTopic.getExtraFields().isPresent() &&
- streamingType == StreamingType.fromTopic(filteredTopic.getTopic().getPubSubTopic()))
- .findAny();
- if (matchingExtraFields.isPresent()) {
- targetsWithExtraFields.add(Pair.create(target, matchingExtraFields.get()));
+ if (target.getTopics().stream()
+ .anyMatch(filteredTopic -> filteredTopic.getExtraFields().isPresent() &&
+ streamingType == StreamingType.fromTopic(filteredTopic.getTopic().getPubSubTopic()))) {
+ targetsPairedWithTopics.add(Pair.create(target, target.getTopics()));
} else {
- targetsWithoutExtraFields.add(target);
+ targetsPairedWithTopics.add(Pair.create(target, Collections.emptySet()));
}
}
- return Pair.create(targetsWithoutExtraFields, targetsWithExtraFields);
+ return targetsPairedWithTopics;
} else {
// The outbound signal has no streaming type: Do not attach extra fields.
- return Pair.create(outboundSignal.getTargets(), Collections.emptyList());
+ return outboundSignal.getTargets().stream()
+ .map(target -> Pair.create(target, Collections.emptySet()))
+ .toList();
}
}
diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/MessageMappingProcessorActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/MessageMappingProcessorActorTest.java
index f5d47d6281..c20d5b59ea 100644
--- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/MessageMappingProcessorActorTest.java
+++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/MessageMappingProcessorActorTest.java
@@ -176,15 +176,15 @@ public void testSignalEnrichment() {
proxyActorProbe.reply(
RetrieveThingResponse.of(retrieveThing.getEntityId(), extra, retrieveThing.getDittoHeaders()));
- // THEN: a mapped signal without enrichment arrives first
+ // THEN: a mapped signal with enrichment arrives first
final BaseClientActor.PublishMappedMessage
publishMappedMessage = expectMsgClass(BaseClientActor.PublishMappedMessage.class);
int i = 0;
- expectPublishedMappedMessage(publishMappedMessage, i++, signal, targetWithoutEnrichment);
-
- // THEN: Receive an outbound signal with extra fields.
- expectPublishedMappedMessage(publishMappedMessage, i, signal, targetWithEnrichment,
+ expectPublishedMappedMessage(publishMappedMessage, i++, signal, targetWithEnrichment,
mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra));
+
+ // THEN: Receive an outbound signal without extra fields.
+ expectPublishedMappedMessage(publishMappedMessage, i, signal, targetWithoutEnrichment);
}};
}
@@ -287,65 +287,65 @@ public void testSignalEnrichmentWithPayloadMappedTargets() {
publishMappedMessage = expectMsgClass(BaseClientActor.PublishMappedMessage.class);
int i = 0;
+ // THEN: Receive an outbound signal with extra fields.
+ expectPublishedMappedMessage(publishMappedMessage, i++, signal, targetWithEnrichment,
+ mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra),
+ mapped -> assertThat(mapped.getExternalMessage().getHeaders()).containsOnlyKeys("content-type")
+ );
+
// THEN: the first mapped signal is without enrichment
expectPublishedMappedMessage(publishMappedMessage, i++, signal, targetWithoutEnrichment,
mapped -> assertThat(mapped.getExternalMessage().getHeaders()).containsOnlyKeys("content-type")
);
- // THEN: the second mapped signal is without enrichment and applied 1 payload mapper arrives
+ // THEN: Receive an outbound signal with extra fields and with mapped payload.
expectPublishedMappedMessage(publishMappedMessage, i++, signal,
- targetWithoutEnrichmentAnd1PayloadMapper,
+ targetWithEnrichmentAnd1PayloadMapper,
+ mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra),
mapped -> assertThat(mapped.getExternalMessage().getHeaders())
.contains(AddHeaderMessageMapper.OUTBOUND_HEADER)
);
- // THEN: a mapped signal without enrichment and applied 2 payload mappers arrives causing 3 messages
- // as 1 mapper duplicates the message
+ // THEN: the second mapped signal is without enrichment and applied 1 payload mapper arrives
expectPublishedMappedMessage(publishMappedMessage, i++, signal,
- targetWithoutEnrichmentAnd2PayloadMappers,
+ targetWithoutEnrichmentAnd1PayloadMapper,
mapped -> assertThat(mapped.getExternalMessage().getHeaders())
.contains(AddHeaderMessageMapper.OUTBOUND_HEADER)
);
+
+ // THEN: a mapped signal with enrichment and applied 2 payload mappers arrives causing 3 messages
+ // as 1 mapper duplicates the message
expectPublishedMappedMessage(publishMappedMessage, i++, signal,
- targetWithoutEnrichmentAnd2PayloadMappers,
+ targetWithEnrichmentAnd2PayloadMappers,
+ mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra),
mapped -> assertThat(mapped.getExternalMessage().getHeaders()).containsOnlyKeys("content-type")
);
expectPublishedMappedMessage(publishMappedMessage, i++, signal,
- targetWithoutEnrichmentAnd2PayloadMappers,
- mapped -> assertThat(mapped.getExternalMessage().getHeaders()).containsOnlyKeys("content-type")
- );
-
- // THEN: Receive an outbound signal with extra fields.
- expectPublishedMappedMessage(publishMappedMessage, i++, signal, targetWithEnrichment,
+ targetWithEnrichmentAnd2PayloadMappers,
mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra),
mapped -> assertThat(mapped.getExternalMessage().getHeaders()).containsOnlyKeys("content-type")
);
-
- // THEN: Receive an outbound signal with extra fields and with mapped payload.
expectPublishedMappedMessage(publishMappedMessage, i++, signal,
- targetWithEnrichmentAnd1PayloadMapper,
+ targetWithEnrichmentAnd2PayloadMappers,
mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra),
mapped -> assertThat(mapped.getExternalMessage().getHeaders())
.contains(AddHeaderMessageMapper.OUTBOUND_HEADER)
);
- // THEN: a mapped signal with enrichment and applied 2 payload mappers arrives causing 3 messages
+ // THEN: a mapped signal without enrichment and applied 2 payload mappers arrives causing 3 messages
// as 1 mapper duplicates the message
expectPublishedMappedMessage(publishMappedMessage, i++, signal,
- targetWithEnrichmentAnd2PayloadMappers,
- mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra),
- mapped -> assertThat(mapped.getExternalMessage().getHeaders()).containsOnlyKeys("content-type")
+ targetWithoutEnrichmentAnd2PayloadMappers,
+ mapped -> assertThat(mapped.getExternalMessage().getHeaders())
+ .contains(AddHeaderMessageMapper.OUTBOUND_HEADER)
);
expectPublishedMappedMessage(publishMappedMessage, i++, signal,
- targetWithEnrichmentAnd2PayloadMappers,
- mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra),
+ targetWithoutEnrichmentAnd2PayloadMappers,
mapped -> assertThat(mapped.getExternalMessage().getHeaders()).containsOnlyKeys("content-type")
);
expectPublishedMappedMessage(publishMappedMessage, i, signal,
- targetWithEnrichmentAnd2PayloadMappers,
- mapped -> assertThat(mapped.getAdaptable().getPayload().getExtra()).contains(extra),
- mapped -> assertThat(mapped.getExternalMessage().getHeaders())
- .contains(AddHeaderMessageMapper.OUTBOUND_HEADER)
+ targetWithoutEnrichmentAnd2PayloadMappers,
+ mapped -> assertThat(mapped.getExternalMessage().getHeaders()).containsOnlyKeys("content-type")
);
}};
}
diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java
index c10f1f5d5b..44577181ca 100644
--- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java
+++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/OutboundMappingProcessorActorTest.java
@@ -41,21 +41,30 @@
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
+import org.eclipse.ditto.connectivity.model.FilteredTopic;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.model.Topic;
import org.eclipse.ditto.internal.utils.pekko.ActorSystemResource;
import org.eclipse.ditto.internal.utils.protocol.ProtocolAdapterProvider;
import org.eclipse.ditto.internal.utils.tracing.DittoTracingInitResource;
+import org.eclipse.ditto.json.JsonKey;
import org.eclipse.ditto.json.JsonObject;
+import org.eclipse.ditto.json.JsonPointer;
+import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.Attributes;
+import org.eclipse.ditto.things.model.Feature;
+import org.eclipse.ditto.things.model.FeatureProperties;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingFieldSelector;
import org.eclipse.ditto.things.model.ThingId;
+import org.eclipse.ditto.things.model.ThingsModelFactory;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThingResponse;
+import org.eclipse.ditto.things.model.signals.events.FeatureModified;
import org.eclipse.ditto.things.model.signals.events.ThingModified;
+import org.jspecify.annotations.NonNull;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.FixMethodOrder;
@@ -75,6 +84,7 @@ public final class OutboundMappingProcessorActorTest {
DittoTracingInitResource.disableDittoTracing();
private static final Connection CONNECTION = createTestConnection();
+ public static final Thing THING = createTestThing();
@Rule
public final ActorSystemResource actorSystemResource = ActorSystemResource.newInstance(
@@ -100,10 +110,7 @@ public void setUp() {
@Test
public void sendWeakAckForAllSourcesAndTargetsWhenDroppedByAllTargets() {
new TestKit(actorSystemResource.getActorSystem()) {{
- final Props props =
- OutboundMappingProcessorActor.props(clientActorProbe.ref(), getProcessors(), CONNECTION,
- TestConstants.CONNECTIVITY_CONFIG, 3);
- final ActorRef underTest = actorSystemResource.newActor(props);
+ final ActorRef underTest = getTestActorRef(CONNECTION);
// WHEN: mapping processor actor receives outbound signal whose every authorized target is filtered out
final OutboundSignal outboundSignal = outboundTwinEvent(
@@ -129,12 +136,7 @@ public void sendWeakAckForAllSourcesAndTargetsWhenDroppedByAllTargets() {
@Test
public void eventsWithFailedEnrichmentIssueFailedAcks() {
new TestKit(actorSystemResource.getActorSystem()) {{
- final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(),
- getProcessors(),
- CONNECTION,
- TestConstants.CONNECTIVITY_CONFIG,
- 3);
- final ActorRef underTest = actorSystemResource.newActor(props);
+ final ActorRef underTest = getTestActorRef(CONNECTION);
final OutboundSignal outboundSignal = outboundTwinEvent(
Attributes.newBuilder().set("target2", "wayne").build(),
@@ -156,12 +158,7 @@ public void eventsWithFailedEnrichmentIssueFailedAcks() {
@Test
public void sendWeakAckWhenDroppedBySomeTarget() {
new TestKit(actorSystemResource.getActorSystem()) {{
- final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(),
- getProcessors(),
- CONNECTION,
- TestConstants.CONNECTIVITY_CONFIG,
- 3);
- final ActorRef underTest = actorSystemResource.newActor(props);
+ final ActorRef underTest = getTestActorRef(CONNECTION);
// WHEN: mapping processor actor receives outbound signal with 2 authorized targets,
// 1 of which drops it via RQL filter after enrichment
@@ -188,12 +185,7 @@ public void sendWeakAckWhenDroppedBySomeTarget() {
@Test
public void sendWeakAckWhenDroppedByMapper() {
new TestKit(actorSystemResource.getActorSystem()) {{
- final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(),
- getProcessors(),
- CONNECTION,
- TestConstants.CONNECTIVITY_CONFIG,
- 3);
- final ActorRef underTest = actorSystemResource.newActor(props);
+ final ActorRef underTest = getTestActorRef(CONNECTION);
// WHEN: mapping processor actor receives outbound signal with 2 authorized targets,
// 1 of which drops it via payload mapping
@@ -218,12 +210,7 @@ public void sendWeakAckWhenDroppedByMapper() {
@Test
public void doNotSendWeakAckForLiveResponse() {
new TestKit(actorSystemResource.getActorSystem()) {{
- final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(),
- getProcessors(),
- CONNECTION,
- TestConstants.CONNECTIVITY_CONFIG,
- 3);
- final ActorRef underTest = actorSystemResource.newActor(props);
+ final ActorRef underTest = getTestActorRef(CONNECTION);
// WHEN: mapping processor actor receives outbound signal with 3 authorized targets,
// 2 of which drops it via payload mapping, 1 of which issues live-response
@@ -248,12 +235,7 @@ public void doNotSendWeakAckForLiveResponse() {
@Test
public void expectNoTargetIssuedAckRequestInPublishedSignals() {
new TestKit(actorSystemResource.getActorSystem()) {{
- final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(),
- getProcessors(),
- CONNECTION,
- TestConstants.CONNECTIVITY_CONFIG,
- 3);
- final ActorRef underTest = actorSystemResource.newActor(props);
+ final ActorRef underTest = getTestActorRef(CONNECTION);
// WHEN: mapping processor actor receives outbound signal
// with requests for source-declared and target-issued acks
@@ -274,19 +256,226 @@ public void expectNoTargetIssuedAckRequestInPublishedSignals() {
}};
}
- private List getProcessors() {
+ private List getProcessors(Connection connection) {
return List.of(
- OutboundMappingProcessor.of(CONNECTION,
- TestConstants.CONNECTIVITY_CONFIG, actorSystemResource.getActorSystem(),
- protocolAdapterProvider.getProtocolAdapter("test"),
- AbstractMessageMappingProcessorActorTest.mockLoggingAdapter(), null)
+ OutboundMappingProcessor.of(connection,
+ TestConstants.CONNECTIVITY_CONFIG, actorSystemResource.getActorSystem(),
+ protocolAdapterProvider.getProtocolAdapter("test"),
+ AbstractMessageMappingProcessorActorTest.mockLoggingAdapter(), null)
);
}
+ @Test
+ public void multipleTopicsWithExtraFieldsFirstTopic() {
+ new TestKit(actorSystemResource.getActorSystem()) {{
+ // Create a target with multiple FilteredTopics for the same streaming type with different extraFields
+ List targets = List.of(createTestTargetMultiTopics(Set.of(topic4(), topic3(), topic5())));
+ final Connection connection = CONNECTION.toBuilder().setTargets(targets).build();
+ final ActorRef underTest = getTestActorRef(connection);
+
+ // Now test a signal that should match the second subscription (no extraFields)
+ final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING,
+ Feature.newBuilder().withId("feature4")
+ .build().setProperties(FeatureProperties.newBuilder().set("size", "large").build()),
+ List.of("multipleExtraFields"), targets, getRef());
+
+ underTest.tell(outboundSignal, getRef());
+ partialRetrieveAndResponse();
+
+ // Expect a publish message with enriched signal
+ final BaseClientActor.PublishMappedMessage publishWithPolicy =
+ clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class);
+
+ // Verify this is for the target with extraFields
+ assertThat(publishWithPolicy.getOutboundSignal().first().getTargets())
+ .contains(targets.get(0));
+
+ assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra())
+ .isPresent()
+ .hasValueSatisfying(extra -> {
+ assertThat(extra.contains(JsonKey.of("definition")))
+ .as("Outbound signal does not contain the requested extra fields.")
+ .isTrue();
+ assertThat(extra.contains(JsonKey.of("attributes")))
+ .as("Redundant extra fields exists, which must not exist.")
+ .isFalse();
+ });
+ }};
+ }
+
+ @Test
+ public void multipleTopicsWithExtraFieldsMidTopic() {
+ new TestKit(actorSystemResource.getActorSystem()) {{
+ // Create a target with multiple FilteredTopics for the same streaming type with different extraFields
+ List targets = List.of(createTestTargetMultiTopics(Set.of(topic4(), topic3(), topic5())));
+ final Connection connection = CONNECTION.toBuilder().setTargets(targets).build();
+ final ActorRef underTest = getTestActorRef(connection);
+
+ // Now test a signal that should match the second subscription (no extraFields)
+ final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING,
+ Feature.newBuilder().withId("feature3")
+ .build().setProperties(FeatureProperties.newBuilder().set("level", "full").build()),
+ List.of("multipleExtraFields"), targets, getRef());
+
+ underTest.tell(outboundSignal, getRef());
+ partialRetrieveAndResponse();
+
+ // Expect a publish message with enriched signal
+ final BaseClientActor.PublishMappedMessage publishWithPolicy =
+ clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class);
+
+ // Verify this is for the target with extraFields
+ assertThat(publishWithPolicy.getOutboundSignal().first().getTargets())
+ .contains(targets.get(0));
+
+ assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra())
+ .as("Outbound signal must not contain any extra field.")
+ .isEmpty();
+ }};
+ }
+
+ @Test
+ public void multipleTopicsWithExtraFieldsLastTopic() {
+ new TestKit(actorSystemResource.getActorSystem()) {{
+ // Create a target with multiple FilteredTopics for the same streaming type with different extraFields
+ List targets = List.of(createTestTargetMultiTopics(Set.of(topic4(), topic3(), topic5())));
+ final Connection connection = CONNECTION.toBuilder().setTargets(targets).build();
+ final ActorRef underTest = getTestActorRef(connection);
+
+ final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING,
+ Feature.newBuilder().withId("feature5")
+ .build().setProperties(FeatureProperties.newBuilder().set("level", "full").build()),
+ List.of("multipleExtraFields"), targets, getRef());
+
+ underTest.tell(outboundSignal, getRef());
+ partialRetrieveAndResponse();
+
+ final BaseClientActor.PublishMappedMessage publishWithPolicy =
+ clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class);
+
+ assertThat(publishWithPolicy.getOutboundSignal().first().getTargets())
+ .contains(targets.get(0));
+
+ assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra())
+ .isPresent()
+ .hasValueSatisfying(extra -> {
+ assertThat(extra.contains(JsonKey.of("attributes")))
+ .as("Outbound signal does not contain the requested extra fields.")
+ .isTrue();
+ assertThat(extra.contains(JsonKey.of("definition")))
+ .as("Redundant extra fields exists, which must not exist.")
+ .isFalse();
+ });
+ }};
+ }
+
+ @Test
+ public void multipleTopicsWithoutExtraFieldsFastProcessed() {
+ new TestKit(actorSystemResource.getActorSystem()) {{
+ // Create a target with multiple topics all without extra fields.
+ // Must send outbound signal, skipping filtering as the pre-filtering has already done the job
+ List targets = List.of(createTestTargetMultiTopics(Set.of(topic3(), topic3a())));
+ final Connection connection = CONNECTION.toBuilder().targets(targets).build();
+ final ActorRef underTest = getTestActorRef(connection);
+
+ final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING,
+ Feature.newBuilder().withId("water-tank").build(),
+ List.of("multipleWithoutExtraFields"), targets, getRef());
+ underTest.tell(outboundSignal, getRef());
+
+ final BaseClientActor.PublishMappedMessage publishWithPolicy =
+ clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class);
+
+ assertThat(publishWithPolicy.getOutboundSignal().first().getTargets())
+ .contains(targets.get(0));
+
+ assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra())
+ .isEmpty();
+ }};
+ }
+
+ @Test
+ public void multipleTopicsExtraFieldsFilterless() {
+ new TestKit(actorSystemResource.getActorSystem()) {{
+ // Create a target with multiple topics with some extra fields, but with a topic without a filter
+ // Must send outbound signal, skipping filtering but enriching with the requested extra fields
+ List targets = List.of(createTestTargetMultiTopics(Set.of(topic3(), topic5(), topic2())));
+ final Connection connection = CONNECTION.toBuilder().targets(targets).build();
+ final ActorRef underTest = getTestActorRef(connection);
+
+ final OutboundSignal outboundSignal = outboundFeatureTwinEvent(THING,
+ Feature.newBuilder().withId("some-feature").build(),
+ List.of("multipleWithExtraFields"), targets, getRef());
+ underTest.tell(outboundSignal, getRef());
+ partialRetrieveAndResponse();
+
+ final BaseClientActor.PublishMappedMessage publishWithPolicy =
+ clientActorProbe.expectMsgClass(BaseClientActor.PublishMappedMessage.class);
+
+ assertThat(publishWithPolicy.getOutboundSignal().first().getTargets())
+ .contains(targets.get(0));
+
+ assertThat(publishWithPolicy.getOutboundSignal().first().getAdaptable().getPayload().getExtra())
+ .isPresent()
+ .hasValueSatisfying(extra -> assertThat(extra.contains(JsonKey.of("definition")))
+ .as("Outbound signal does not contain the requested extra fields.")
+ .isTrue());
+
+ }};
+ }
+
+ private void partialRetrieveAndResponse() {
+ // Expect enrichment request for all fields in all topics
+ final RetrieveThing retrieveEnrichedThing = proxyActorProbe.expectMsgClass(RetrieveThing.class);
+ assertThat(retrieveEnrichedThing.getSelectedFields())
+ .isPresent()
+ .hasValueSatisfying(fields ->
+ assertThat(fields.getPointers())
+ .contains(JsonPointer.of("/attributes"), JsonPointer.of("/definition")));
+
+ // Reply with enriched Thing
+ final Thing enrichedThing = Thing.newBuilder()
+ .setAttributes(Attributes.newBuilder().set("attr1", "attrValue1").build())
+ .setDefinition(ThingsModelFactory.newDefinition("testNamespace:TestDefinition:1.2.3"))
+ .build();
+ proxyActorProbe.reply(RetrieveThingResponse.of(thingId(), enrichedThing, null, null, DittoHeaders.empty()));
+ }
+
+ private ActorRef getTestActorRef(Connection connection) {
+ final Props props = OutboundMappingProcessorActor.props(clientActorProbe.ref(),
+ getProcessors(connection),
+ connection,
+ TestConstants.CONNECTIVITY_CONFIG,
+ 3);
+ return actorSystemResource.newActor(props);
+ }
+
+ private static OutboundSignal outboundFeatureTwinEvent(final Thing thing, final Feature feature, final Collection requestedAcks,
+ final List targets, final ActorRef testRef) {
+ final List readGrantedSubjects = targets.stream()
+ .map(Target::getAuthorizationContext)
+ .flatMap(authContext -> authContext.getAuthorizationSubjects().stream())
+ .distinct()
+ .toList();
+
+ final FeatureModified featureModified = FeatureModified.of(thing.getEntityId().get(), feature, 2L, Instant.EPOCH,
+ DittoHeaders.newBuilder()
+ .acknowledgementRequests(requestedAcks.stream()
+ .map(AcknowledgementRequest::parseAcknowledgementRequest)
+ .toList())
+ .readGrantedSubjects(readGrantedSubjects)
+ .putHeader(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey(),
+ testRef.path().toSerializationFormat())
+ .build(),
+ Metadata.newMetadata(JsonObject.empty()));
+ return OutboundSignalFactory.newOutboundSignal(featureModified, targets);
+ }
+
private static OutboundSignal outboundTwinEvent(final Attributes attributes, final Collection requestedAcks,
final List targets, final ActorRef testRef) {
final Thing thing = Thing.newBuilder().setId(thingId())
.setAttributes(attributes)
+ .setPolicyId(PolicyId.of("test1:policy1"))
.build();
final List readGrantedSubjects = targets.stream()
.map(Target::getAuthorizationContext)
@@ -333,6 +522,56 @@ private static Connection createTestConnection() {
.build();
}
+ private static @NonNull Thing createTestThing() {
+ return Thing.newBuilder().setId(thingId())
+ .setAttributes(Attributes.newBuilder().set("attr1", "attrValue1").build())
+ .setPolicyId(PolicyId.of("test1:policy1"))
+ .build();
+ }
+
+ private static FilteredTopic topic1() {
+ return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS).build();
+ }
+
+ private static FilteredTopic topic2() {
+ return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS)
+ .withExtraFields(ThingFieldSelector.fromString("definition")).build();
+
+ }
+
+ private static FilteredTopic topic3() {
+ return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS)
+ .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/features/feature3\"))").build();
+ }
+
+ private static FilteredTopic topic3a() {
+ return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS)
+ .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/features/feature3a\"))").build();
+ }
+
+ private static FilteredTopic topic4() {
+ return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS)
+ .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/features/feature4\"))")
+ .withExtraFields(ThingFieldSelector.fromString("definition")).build();
+
+ }
+
+ private static FilteredTopic topic5() {
+ return ConnectivityModelFactory.newFilteredTopicBuilder(Topic.TWIN_EVENTS)
+ .withFilter("and(eq(topic:action,\"modified\"),eq(resource:path,\"/features/feature5\"))")
+ .withExtraFields(ThingFieldSelector.fromString("attributes")).build();
+
+ }
+
+ private static @NonNull Target createTestTargetMultiTopics(Set topics) {
+ return ConnectivityModelFactory.newTargetBuilder()
+ .address("multipleExtraFieldsTarget")
+ .authorizationContext(singletonContext(target1Subject()))
+ .topics(topics)
+ .issuedAcknowledgementLabel(AcknowledgementLabel.of("multipleExtraFields"))
+ .build();
+ }
+
private static Source createTestSource() {
return ConnectivityModelFactory.newSourceBuilder()
.address("source1")