getPreviousResource() {
return Optional.ofNullable(previousResource);
}
+ public Boolean getLastStateUnknow() {
+ return lastStateUnknow;
+ }
+
@Override
public String toString() {
- return "ExtendedResourceEvent{"
+ return "GenericResourceEvent{"
+ getPreviousResource()
.map(r -> "previousResourceVersion=" + r.getMetadata().getResourceVersion())
.orElse("")
@@ -61,7 +67,7 @@ public String toString() {
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
- ExtendedResourceEvent that = (ExtendedResourceEvent) o;
+ GenericResourceEvent that = (GenericResourceEvent) o;
return Objects.equals(previousResource, that.previousResource);
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
index 93d3eb5e80..ea2ab89c2d 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java
@@ -33,7 +33,6 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
-import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
/**
* Wraps informer(s) so they are connected to the eventing system of the framework. Note that since
@@ -123,8 +122,15 @@ public synchronized void onDelete(R resource, boolean deletedFinalStateUnknown)
log.debug(
"On delete event received. deletedFinalStateUnknown: {}", deletedFinalStateUnknown);
}
+ var resultEvent =
+ temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
+ if (resultEvent.isEmpty()) {
+ return;
+ }
+ if (resultEvent.orElseThrow().getAction() != ResourceAction.DELETED) {
+ log.warn("Non delete event received on onDelete handling. This should not happen.");
+ }
primaryToSecondaryIndex.onDelete(resource);
- temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
if (acceptedByDeleteFilters(resource, deletedFinalStateUnknown)) {
propagateEvent(resource);
}
@@ -152,22 +158,26 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol
primaryToSecondaryIndex.onAddOrUpdate(newObject);
var resourceID = ResourceID.fromResource(newObject);
- var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
+ var resultEvent = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);
- if (eventHandling != EventHandling.NEW) {
- log.debug(
- "{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping");
+ if (resultEvent.isEmpty()) {
+ log.debug("Deferring event propagation");
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
log.debug(
- "Propagating event for {}, resource with same version not result of a reconciliation.",
+ "Propagating event for {}, resource with same version not result of a our update.",
action);
- propagateEvent(newObject);
+ var event = resultEvent.get();
+ handleEvent(
+ event.getAction(),
+ (R) event.getResource().orElseThrow(),
+ (R) event.getPreviousResource().orElse(null),
+ event.getLastStateUnknow());
} else {
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
}
}
- private void propagateEvent(R object) {
+ protected void propagateEvent(R object) {
var primaryResourceIdSet =
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
if (primaryResourceIdSet.isEmpty()) {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
index f021101229..c56a42d27c 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
@@ -46,7 +46,6 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.*;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
-import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
@SuppressWarnings("rawtypes")
public abstract class ManagedInformerEventSource<
@@ -101,45 +100,19 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
try {
temporaryResourceCache.startEventFilteringModify(id);
updatedResource = updateMethod.apply(resourceToUpdate);
- log.debug("Resource update successful");
handleRecentResourceUpdate(id, updatedResource, resourceToUpdate);
+ log.debug("Caching resource update successful");
return updatedResource;
} finally {
- var res =
- temporaryResourceCache.doneEventFilterModify(
- id,
- updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion());
- var updatedForLambda = updatedResource;
+ var res = temporaryResourceCache.doneEventFilterModify(id);
res.ifPresentOrElse(
r -> {
- R latestResource = (R) r.getResource().orElseThrow();
- // as previous resource version we use the one from successful update, since
- // we process new event here only if that is more recent then the event from our update.
- // Note that this is equivalent with the scenario when an informer watch connection
- // would reconnect and loose some events in between.
- // If that update was not successful we still record the previous version from the
- // actual event in the ExtendedResourceEvent.
- R extendedResourcePrevVersion =
- (r instanceof ExtendedResourceEvent)
- ? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
- : null;
- R prevVersionOfResource =
- updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
- if (log.isDebugEnabled()) {
- log.debug(
- "Previous resource version: {} resource from update present: {}"
- + " extendedPrevResource present: {}",
- prevVersionOfResource.getMetadata().getResourceVersion(),
- updatedForLambda != null,
- extendedResourcePrevVersion != null);
- }
+ log.debug("Propagating not own event");
handleEvent(
r.getAction(),
- latestResource,
- prevVersionOfResource,
- (r instanceof ResourceDeleteEvent)
- ? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown()
- : null);
+ (R) r.getResource().orElseThrow(),
+ (R) r.getPreviousResource().orElse(null),
+ r.getLastStateUnknow());
},
() -> log.debug("No new event present after the filtering update"));
}
@@ -173,9 +146,15 @@ public synchronized void stop() {
@Override
public void onList(String resourceVersion, boolean remainedEmpty) {
+ temporaryResourceCache.setRelistFinished();
temporaryResourceCache.checkGhostResources();
}
+ @Override
+ public void onBeforeList(String lastSyncResourceVersion) {
+ temporaryResourceCache.setOngoingRelist();
+ }
+
@Override
public void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
index 405f52cc8d..5e757317ad 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java
@@ -29,8 +29,6 @@
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
-import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;
-import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
/**
* Temporal cache is used to solve the problem for {@link KubernetesDependentResource} that is, when
@@ -50,6 +48,15 @@
*
* If comparable resource versions are disabled, then this cache is effectively disabled.
*
+ *
Some principles to realize with the current filtering algorithm:
+ *
+ *
+ * - We propagate events only if we received an event that has the same resourceVersion or newer
+ * than resource version from update
+ *
- The propagated event should correspond to a possible real world scenario - considering also
+ * ones that could happen if the Informer does a re-list.
+ *
+ *
* @param resource to cache.
*/
public class TemporaryResourceCache {
@@ -59,15 +66,10 @@ public class TemporaryResourceCache {
private final Map cache = new ConcurrentHashMap<>();
private final Map activeUpdates = new HashMap<>();
private final boolean comparableResourceVersions;
+ private boolean informerOngoingRelist = false;
private final ManagedInformerEventSource managedInformerEventSource;
- public enum EventHandling {
- DEFER,
- OBSOLETE,
- NEW
- }
-
public TemporaryResourceCache(
boolean comparableResourceVersions,
ManagedInformerEventSource managedInformerEventSource) {
@@ -79,62 +81,50 @@ public synchronized void startEventFilteringModify(ResourceID resourceID) {
if (!comparableResourceVersions) {
return;
}
- var ed = activeUpdates.computeIfAbsent(resourceID, id -> new EventFilterDetails());
+ var ed =
+ activeUpdates.computeIfAbsent(
+ resourceID, id -> new EventFilterDetails(informerOngoingRelist));
ed.increaseActiveUpdates();
}
- public synchronized Optional doneEventFilterModify(
- ResourceID resourceID, String updatedResourceVersion) {
+ public synchronized Optional doneEventFilterModify(ResourceID resourceID) {
if (!comparableResourceVersions) {
return Optional.empty();
}
var ed = activeUpdates.get(resourceID);
- if (ed == null || !ed.decreaseActiveUpdates(updatedResourceVersion)) {
- log.debug(
- "Active updates {} for resource id: {}",
- ed != null ? ed.getActiveUpdates() : 0,
- resourceID);
+ if (ed == null) return Optional.empty();
+ if (!ed.decreaseActiveUpdates()) {
+ log.debug("Active updates {} for resource id: {}", ed.getActiveUpdates(), resourceID);
return Optional.empty();
}
- activeUpdates.remove(resourceID);
- var res = ed.getLatestEventAfterLastUpdateEvent();
- log.debug(
- "Zero active updates for resource id: {}; event after update event: {}; updated resource"
- + " version: {}",
- resourceID,
- res.isPresent(),
- updatedResourceVersion);
- return res;
+ return finaleEventHandlingAndCleanup(resourceID, ed);
}
- public void onDeleteEvent(T resource, boolean unknownState) {
- onEvent(ResourceAction.DELETED, resource, null, unknownState, true);
+ public Optional onDeleteEvent(T resource, boolean unknownState) {
+ return onEvent(ResourceAction.DELETED, resource, null, unknownState);
}
- public EventHandling onAddOrUpdateEvent(
+ public Optional onAddOrUpdateEvent(
ResourceAction action, T resource, T prevResourceVersion) {
- return onEvent(action, resource, prevResourceVersion, false, false);
+ return onEvent(action, resource, prevResourceVersion, null);
}
- private synchronized EventHandling onEvent(
- ResourceAction action,
- T resource,
- T prevResourceVersion,
- boolean unknownState,
- boolean delete) {
+ private synchronized Optional onEvent(
+ ResourceAction action, T resource, T prevResourceVersion, Boolean unknownState) {
+ GenericResourceEvent actualEvent =
+ toGenericResourceEvent(action, resource, prevResourceVersion, unknownState);
if (!comparableResourceVersions) {
- return EventHandling.NEW;
+ return Optional.of(actualEvent);
}
-
var resourceId = ResourceID.fromResource(resource);
if (log.isDebugEnabled()) {
log.debug("Processing event");
}
var cached = cache.get(resourceId);
- EventHandling result = EventHandling.NEW;
+ Optional result = Optional.of(actualEvent);
if (cached != null) {
int comp = ReconcilerUtilsInternal.compareResourceVersions(resource, cached);
- if (comp >= 0 || unknownState) {
+ if (comp >= 0 || Boolean.TRUE.equals(unknownState)) {
log.debug(
"Removing resource from temp cache. comparison: {} unknown state: {}",
comp,
@@ -143,24 +133,38 @@ private synchronized EventHandling onEvent(
// we propagate event only for our update or newer other can be discarded since we know we
// will receive
// additional event
- result = comp == 0 ? EventHandling.OBSOLETE : EventHandling.NEW;
+ if (comp == 0) {
+ result = Optional.empty();
+ }
} else {
- result = EventHandling.OBSOLETE;
+ // in this case we received an event that might be in some edge case that was
+ // already used in reconciler or after that, but before our updated resource version.
+ // That would be hard to distinguish, so for those we are propagating the event further.
+ log.debug("Received intermediate event.");
}
}
- var ed = activeUpdates.get(resourceId);
- if (ed != null && result != EventHandling.OBSOLETE) {
- log.debug("Setting last event for id: {} delete: {}", resourceId, delete);
- ed.setLastEvent(
- delete
- ? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState)
- : new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion));
- return EventHandling.DEFER;
+ var au = activeUpdates.get(resourceId);
+ if (au != null) {
+ log.debug("Recording relevant event");
+ au.addRelatedEvent(
+ new GenericResourceEvent(action, resource, prevResourceVersion, unknownState));
+ // this is to cover the situation when we finished the filtering and caching update but
+ // did not receive events for our own updates yet.
+ if (au.isNoActiveUpdate() && au.newerOrEqualEventReceivedForOwnLastUpdate()) {
+ return finaleEventHandlingAndCleanup(resourceId, au);
+ }
+ return Optional.empty();
} else {
- return result;
+ log.debug("No active recording, event handling: {}", result);
+ return informerOngoingRelist ? Optional.of(actualEvent) : result;
}
}
+ static GenericResourceEvent toGenericResourceEvent(
+ ResourceAction action, T resource, T prevResourceVersion, Boolean unknownState) {
+ return new GenericResourceEvent(action, resource, prevResourceVersion, unknownState);
+ }
+
/** put the item into the cache if it's for a later state than what has already been observed. */
public synchronized void putResource(T newResource) {
if (!comparableResourceVersions) {
@@ -208,6 +212,9 @@ public synchronized void putResource(T newResource) {
// also make sure that we're later than the existing temporary entry
var cachedResource = getResourceFromCache(resourceId).orElse(null);
+ Optional.ofNullable(activeUpdates.get(resourceId))
+ .ifPresent(
+ au -> au.addToOwnResourceVersions(newResource.getMetadata().getResourceVersion()));
if (cachedResource == null
|| ReconcilerUtilsInternal.compareResourceVersions(newResource, cachedResource) > 0) {
@@ -231,7 +238,7 @@ private String getLastSyncResourceVersion(String namespace) {
* explicitly add resources to this cache. Those are cleaned up by this check, which is triggered
* by the informer's onList callback.
*/
- public void checkGhostResources() {
+ public synchronized void checkGhostResources() {
log.debug("Checking for ghost resources.");
var iterator = cache.entrySet().iterator();
while (iterator.hasNext()) {
@@ -246,23 +253,36 @@ public void checkGhostResources() {
e.getKey(),
ns);
iterator.remove();
+ activeUpdates.remove(e.getKey());
continue;
}
if ((ReconcilerUtilsInternal.compareResourceVersions(
e.getValue().getMetadata().getResourceVersion(), getLastSyncResourceVersion(ns))
< 0)
// making sure we have the situation where resource is missing from the cache
- && managedInformerEventSource
- .manager()
- .get(ResourceID.fromResource(e.getValue()))
- .isEmpty()) {
+ && managedInformerEventSource.manager().get(e.getKey()).isEmpty()) {
+ log.debug("Removing ghost resource with ID: {}", e.getKey());
iterator.remove();
+ activeUpdates.remove(e.getKey());
managedInformerEventSource.handleEvent(ResourceAction.DELETED, e.getValue(), null, true);
- log.debug("Removing ghost resource with ID: {}", e.getKey());
}
}
}
+ private Optional finaleEventHandlingAndCleanup(
+ ResourceID resourceID, EventFilterDetails ed) {
+ if (ed.newerOrEqualEventReceivedForOwnLastUpdate()) {
+ activeUpdates.remove(resourceID);
+ if (ed.isAffectedByReList()) {
+ return ed.summaryEventForReList();
+ } else {
+ return ed.summaryEvent();
+ }
+ } else {
+ return Optional.empty();
+ }
+ }
+
public synchronized Optional getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}
@@ -274,4 +294,18 @@ synchronized boolean isEmpty() {
synchronized Map getResources() {
return Collections.unmodifiableMap(cache);
}
+
+ // for testing purposes
+ synchronized Map getActiveUpdates() {
+ return Collections.unmodifiableMap(activeUpdates);
+ }
+
+ public synchronized void setOngoingRelist() {
+ this.informerOngoingRelist = true;
+ activeUpdates.values().forEach(EventFilterDetails::affectedByReList);
+ }
+
+ public synchronized void setRelistFinished() {
+ this.informerOngoingRelist = false;
+ }
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java
index 4528fa8a83..a7765da4fa 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java
@@ -35,12 +35,14 @@
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
+import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
import static io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils.withResourceVersion;
@@ -227,6 +229,74 @@ void eventFilteringExceptionDuringUpdate() {
expectHandleEvent(2, 1);
}
+ @Test
+ void propagatesIntermediateEventForExternalUpdateDuringFiltering() {
+ // Causal-dependency scenario: a third party updated the resource between our read and
+ // our write. The informer delivers that update during our active filter; since its
+ // resource version is NOT one of our own writes, it must be propagated.
+ var src = new TestableControllerEventSource(new TestController(null, null, null));
+ setUpSource(src, true, controllerConfig);
+
+ var resourceId = ResourceID.fromResource(TestUtils.testCustomResource1());
+
+ // first filter writes rv 4 (our own); a second concurrent filter keeps the
+ // active-updates window open while the event below is processed
+ var latch1 = sendForEventFilteringUpdate(4);
+ var latch2 = sendForEventFilteringUpdate(testResourceWithVersion(4), 5);
+
+ latch1.countDown();
+ awaitCachedResourceVersion(src.tempCache(), resourceId, "4");
+
+ // external update with rv 3 (older than our cached rv 4) — must propagate
+ source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3));
+ latch2.countDown();
+ source.onUpdate(testResourceWithVersion(3), testResourceWithVersion(5));
+
+ await().untilAsserted(() -> verify(eventHandler, times(1)).handleEvent(any()));
+ }
+
+ @Test
+ void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() {
+ // Two consecutive own writes (rv 3 then rv 4) within an open filter window: an event
+ // for the older own version must be deferred since it's recognized as our own. A
+ // third concurrent filter keeps the active-updates window open while the event below
+ // is processed.
+ var src = new TestableControllerEventSource(new TestController(null, null, null));
+ setUpSource(src, true, controllerConfig);
+
+ var resourceId = ResourceID.fromResource(TestUtils.testCustomResource1());
+
+ var latch1 = sendForEventFilteringUpdate(3);
+ var latch2 = sendForEventFilteringUpdate(testResourceWithVersion(3), 4);
+ var latch3 = sendForEventFilteringUpdate(testResourceWithVersion(4), 5);
+
+ latch1.countDown();
+ awaitCachedResourceVersion(src.tempCache(), resourceId, "3");
+ latch2.countDown();
+ awaitCachedResourceVersion(src.tempCache(), resourceId, "4");
+
+ // event for our own rv 3 (older than cached rv 4) — must be deferred
+ source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3));
+
+ verify(eventHandler, never()).handleEvent(any());
+
+ latch3.countDown();
+ }
+
+ private void awaitCachedResourceVersion(
+ TemporaryResourceCache cache,
+ ResourceID resourceId,
+ String resourceVersion) {
+ await()
+ .untilAsserted(
+ () ->
+ assertThat(
+ cache
+ .getResourceFromCache(resourceId)
+ .map(r -> r.getMetadata().getResourceVersion()))
+ .hasValue(resourceVersion));
+ }
+
private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {
await()
.untilAsserted(
@@ -247,7 +317,7 @@ private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {
.isEqualTo("" + oldResourceVersion);
return true;
}),
- isNull());
+ any());
});
}
@@ -330,4 +400,15 @@ public TestConfiguration(
false);
}
}
+
+ private static class TestableControllerEventSource
+ extends ControllerEventSource {
+ TestableControllerEventSource(Controller controller) {
+ super(controller);
+ }
+
+ TemporaryResourceCache tempCache() {
+ return temporaryResourceCache;
+ }
+ }
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetailsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetailsTest.java
new file mode 100644
index 0000000000..f108c58d11
--- /dev/null
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetailsTest.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.processing.event.source.informer;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
+
+class EventFilterDetailsTest {
+
+ private EventFilterDetails details;
+
+ @BeforeEach
+ void setup() {
+ details = new EventFilterDetails(false);
+ }
+
+ @Test
+ void activeUpdatesCounter() {
+ assertThat(details.isNoActiveUpdate()).isTrue();
+ assertThat(details.getActiveUpdates()).isZero();
+
+ details.increaseActiveUpdates();
+ details.increaseActiveUpdates();
+ assertThat(details.getActiveUpdates()).isEqualTo(2);
+ assertThat(details.isNoActiveUpdate()).isFalse();
+
+ assertThat(details.decreaseActiveUpdates()).isFalse();
+ assertThat(details.getActiveUpdates()).isEqualTo(1);
+
+ assertThat(details.decreaseActiveUpdates()).isTrue();
+ assertThat(details.isNoActiveUpdate()).isTrue();
+ }
+
+ @Test
+ void summaryEmptyWhenAllRelatedEventsAreOwn() {
+ details.addToOwnResourceVersions("2");
+ details.addToOwnResourceVersions("3");
+ details.addRelatedEvent(updatedEvent("2", null));
+ details.addRelatedEvent(updatedEvent("3", "2"));
+
+ assertThat(details.summaryEvent()).isEmpty();
+ }
+
+ @Test
+ void summaryReturnsSingleNonOwnEvent() {
+ var thirdParty = updatedEvent("4", "3");
+ details.addToOwnResourceVersions("2");
+ details.addRelatedEvent(thirdParty);
+
+ var summary = details.summaryEvent();
+
+ assertThat(summary).contains(thirdParty);
+ }
+
+ @Test
+ void summaryReturnsLastEventWhenItIsDelete() {
+ var firstUpdate = updatedEvent("3", "2");
+ var deleteAtEnd = deleteEvent("4");
+ details.addRelatedEvent(firstUpdate);
+ details.addRelatedEvent(deleteAtEnd);
+
+ var summary = details.summaryEvent();
+
+ assertThat(summary).contains(deleteAtEnd);
+ }
+
+ @Test
+ void summaryDoesNotReturnDeleteWhenItIsNotLast() {
+ // simulates a delete-then-recreate sequence inside the filter window:
+ // returning the DELETE would mask the fact that the resource exists again.
+ var deleteEvent = deleteEvent("3");
+ var recreate = addedEvent("4");
+ details.addRelatedEvent(deleteEvent);
+ details.addRelatedEvent(recreate);
+
+ var summary = details.summaryEvent();
+
+ assertThat(summary).isPresent();
+ assertThat(summary.get().getAction()).isEqualTo(ResourceAction.UPDATED);
+ assertThat(summary.get().getResource().orElseThrow()).isEqualTo(recreate.getResource().get());
+ }
+
+ @Test
+ void summarySynthesizesUpdatedFromFirstPreviousToLastResource() {
+ var first = updatedEvent("3", "2");
+ var middle = updatedEvent("4", "3");
+ var last = updatedEvent("5", "4");
+ details.addRelatedEvent(first);
+ details.addRelatedEvent(middle);
+ details.addRelatedEvent(last);
+
+ var summary = details.summaryEvent().orElseThrow();
+
+ assertThat(summary.getAction()).isEqualTo(ResourceAction.UPDATED);
+ assertThat(summary.getResource().orElseThrow()).isEqualTo(last.getResource().get());
+ assertThat(summary.getPreviousResource().orElseThrow())
+ .isEqualTo(first.getPreviousResource().get());
+ assertThat(summary.getLastStateUnknow()).isNull();
+ }
+
+ @Test
+ void summaryUsesFirstResourceAsPreviousWhenFirstEventHasNoPrevious() {
+ // first event is ADD (no previous resource); synthesis must fall back to the resource itself.
+ var added = addedEvent("3");
+ var updated = updatedEvent("4", "3");
+ details.addRelatedEvent(added);
+ details.addRelatedEvent(updated);
+
+ var summary = details.summaryEvent().orElseThrow();
+
+ assertThat(summary.getAction()).isEqualTo(ResourceAction.UPDATED);
+ assertThat(summary.getResource().orElseThrow()).isEqualTo(updated.getResource().get());
+ assertThat(summary.getPreviousResource().orElseThrow()).isEqualTo(added.getResource().get());
+ }
+
+ @Test
+ void summarySkipsOwnFilterWhenAtLeastOneEventIsForeign() {
+ // even with own rvs in the mix, presence of a non-own event must surface a summary.
+ details.addToOwnResourceVersions("3");
+ var ownEvent = updatedEvent("3", "2");
+ var foreign = updatedEvent("4", "3");
+ details.addRelatedEvent(ownEvent);
+ details.addRelatedEvent(foreign);
+
+ var summary = details.summaryEvent().orElseThrow();
+
+ assertThat(summary.getAction()).isEqualTo(ResourceAction.UPDATED);
+ assertThat(summary.getResource().orElseThrow()).isEqualTo(foreign.getResource().get());
+ assertThat(summary.getPreviousResource().orElseThrow())
+ .isEqualTo(ownEvent.getPreviousResource().get());
+ }
+
+ @Test
+ void newerOrEqualReturnsTrueWhenNoOwnVersions() {
+ assertThat(details.newerOrEqualEventReceivedForOwnLastUpdate()).isTrue();
+ details.addRelatedEvent(updatedEvent("2", null));
+ assertThat(details.newerOrEqualEventReceivedForOwnLastUpdate()).isTrue();
+ }
+
+ @Test
+ void newerOrEqualReturnsFalseWhenNoRelatedEventsYet() {
+ details.addToOwnResourceVersions("3");
+
+ assertThat(details.newerOrEqualEventReceivedForOwnLastUpdate()).isFalse();
+ }
+
+ @Test
+ void newerOrEqualReturnsFalseWhenAllRelatedAreOlderThanLastOwn() {
+ details.addToOwnResourceVersions("5");
+ details.addRelatedEvent(updatedEvent("3", "2"));
+ details.addRelatedEvent(updatedEvent("4", "3"));
+
+ assertThat(details.newerOrEqualEventReceivedForOwnLastUpdate()).isFalse();
+ }
+
+ @Test
+ void newerOrEqualReturnsTrueWhenRelatedMatchesLastOwn() {
+ details.addToOwnResourceVersions("3");
+ details.addToOwnResourceVersions("5");
+ details.addRelatedEvent(updatedEvent("5", "4"));
+
+ assertThat(details.newerOrEqualEventReceivedForOwnLastUpdate()).isTrue();
+ }
+
+ @Test
+ void newerOrEqualReturnsTrueWhenRelatedNewerThanLastOwn() {
+ details.addToOwnResourceVersions("3");
+ details.addRelatedEvent(updatedEvent("7", "3"));
+
+ assertThat(details.newerOrEqualEventReceivedForOwnLastUpdate()).isTrue();
+ }
+
+ @Test
+ void summaryEventReturnsEmptyWhenNoRelatedEvents() {
+ assertThat(details.summaryEvent()).isEmpty();
+ }
+
+ @Test
+ void summaryEventForReListReturnsEmptyWhenNoRelatedEventsAndMarksSent() {
+ var reListDetails = new EventFilterDetails(true);
+
+ assertThat(reListDetails.summaryEventForReList()).isEmpty();
+ assertThat(reListDetails.isReListSummaryEventSent()).isTrue();
+ }
+
+ @Test
+ void summaryEventForReListReturnsSummaryAndMarksSent() {
+ var reListDetails = new EventFilterDetails(true);
+ var event = updatedEvent("3", "2");
+ reListDetails.addRelatedEvent(event);
+
+ var summary = reListDetails.summaryEventForReList();
+
+ assertThat(summary).contains(event);
+ assertThat(reListDetails.isReListSummaryEventSent()).isTrue();
+ }
+
+ @Test
+ void summaryEventForReListThrowsWhenNotAffectedByReList() {
+ details.addRelatedEvent(updatedEvent("3", "2"));
+
+ assertThatIllegalStateException().isThrownBy(() -> details.summaryEventForReList());
+ }
+
+ @Test
+ void summaryEventForReListThrowsWhenAlreadySent() {
+ var reListDetails = new EventFilterDetails(true);
+ reListDetails.addRelatedEvent(updatedEvent("3", "2"));
+ reListDetails.summaryEventForReList();
+
+ assertThatIllegalStateException().isThrownBy(() -> reListDetails.summaryEventForReList());
+ }
+
+ @Test
+ void affectedByReListFlagCanBeSet() {
+ assertThat(details.isAffectedByReList()).isFalse();
+
+ details.affectedByReList();
+
+ assertThat(details.isAffectedByReList()).isTrue();
+ }
+
+ private static GenericResourceEvent addedEvent(String resourceVersion) {
+ return new GenericResourceEvent(ResourceAction.ADDED, resource(resourceVersion), null, null);
+ }
+
+ private static GenericResourceEvent updatedEvent(
+ String resourceVersion, String previousResourceVersion) {
+ var prev = previousResourceVersion == null ? null : resource(previousResourceVersion);
+ return new GenericResourceEvent(ResourceAction.UPDATED, resource(resourceVersion), prev, null);
+ }
+
+ private static GenericResourceEvent deleteEvent(String resourceVersion) {
+ return new GenericResourceEvent(ResourceAction.DELETED, resource(resourceVersion), null, null);
+ }
+
+ private static ConfigMap resource(String resourceVersion) {
+ return new ConfigMapBuilder()
+ .withMetadata(
+ new ObjectMetaBuilder()
+ .withName("test")
+ .withNamespace("default")
+ .withUid("test-uid")
+ .withResourceVersion(resourceVersion)
+ .build())
+ .build();
+ }
+}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
index fe78bd3147..b39d1af6a6 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java
@@ -25,11 +25,11 @@
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apps.Deployment;
-import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.javaoperatorsdk.operator.MockKubernetesClient;
@@ -46,7 +46,6 @@
import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
-import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
@@ -71,7 +70,8 @@
class InformerEventSourceTest {
private static final String PREV_RESOURCE_VERSION = "0";
- private static final String DEFAULT_RESOURCE_VERSION = "1";
+ private static final String DEFAULT_RESOURCE_VERSION = "2";
+ public static final int REPEAT_COUNT = 5;
private InformerEventSource informerEventSource;
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
@@ -112,43 +112,6 @@ public synchronized void start() {}
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
}
- @Test
- void skipsEventPropagation() {
- when(temporaryResourceCache.getResourceFromCache(any()))
- .thenReturn(Optional.of(testDeployment()));
-
- when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any()))
- .thenReturn(EventHandling.OBSOLETE);
-
- informerEventSource.onAdd(testDeployment());
- informerEventSource.onUpdate(testDeployment(), testDeployment());
-
- verify(eventHandlerMock, never()).handleEvent(any());
- }
-
- @Test
- void processEventPropagationWithoutAnnotation() {
- when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any()))
- .thenReturn(EventHandling.NEW);
- informerEventSource.onUpdate(testDeployment(), testDeployment());
-
- verify(eventHandlerMock, times(1)).handleEvent(any());
- }
-
- @Test
- void processEventPropagationWithIncorrectAnnotation() {
- when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any()))
- .thenReturn(EventHandling.NEW);
- informerEventSource.onAdd(
- new DeploymentBuilder(testDeployment())
- .editMetadata()
- .addToAnnotations(InformerEventSource.PREVIOUS_ANNOTATION_KEY, "invalid")
- .endMetadata()
- .build());
-
- verify(eventHandlerMock, times(1)).handleEvent(any());
- }
-
@Test
void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() {
withRealTemporaryResourceCache();
@@ -206,145 +169,272 @@ void filtersOnDeleteEvents() {
}
@Test
+ void deletePropagatesEventWhenTempCacheReturnsDeleteEvent() {
+ var resource = testDeployment();
+ when(temporaryResourceCache.onDeleteEvent(resource, false))
+ .thenReturn(
+ Optional.of(new GenericResourceEvent(ResourceAction.DELETED, resource, null, false)));
+
+ informerEventSource.onDelete(resource, false);
+
+ verify(eventHandlerMock, times(1)).handleEvent(any());
+ }
+
+ @Test
+ void deleteDoesNotPropagateWhenTempCacheReturnsEmpty() {
+ var resource = testDeployment();
+ when(temporaryResourceCache.onDeleteEvent(resource, false)).thenReturn(Optional.empty());
+
+ informerEventSource.onDelete(resource, false);
+
+ verify(eventHandlerMock, never()).handleEvent(any());
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
void handlesPrevResourceVersionForUpdate() {
withRealTemporaryResourceCache();
- CountDownLatch latch = sendForEventFilteringUpdate(2);
+ CountDownLatch latch = sendForEventFilteringUpdate(3);
informerEventSource.onUpdate(
- deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
+ deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
latch.countDown();
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
- expectHandleEvent(3, 2);
+ expectHandleAddEvent(3, 1);
+ expectNoActiveUpdates();
}
- @Test
+ @RepeatedTest(REPEAT_COUNT)
void handlesPrevResourceVersionForUpdateInCaseOfException() {
withRealTemporaryResourceCache();
- CountDownLatch latch =
- EventFilterTestUtils.sendForEventFilteringUpdate(
- informerEventSource,
- testDeployment(),
- r -> {
- throw new KubernetesClientException("fake");
- });
+ CountDownLatch latch = sendForExceptionThrowingUpdate();
informerEventSource.onUpdate(
deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
latch.countDown();
- expectHandleEvent(2, 1);
+ expectHandleAddEvent(2, 1);
+ expectNoActiveUpdates();
}
- @Test
- void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() {
+ @RepeatedTest(REPEAT_COUNT)
+ void failedUpdate_withNoEventsDuringWindow_propagatesNothing() {
+ // No event arrives between start and the thrown exception. doneEventFilterModify
+ // sees an empty filter window with no own writes — summary must be empty.
withRealTemporaryResourceCache();
- var deployment = testDeployment();
- CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2);
+ CountDownLatch latch = sendForExceptionThrowingUpdate();
+ latch.countDown();
+
+ assertNoEventProduced();
+ expectNoActiveUpdates();
+ assertThat(temporaryResourceCache.getResources()).isEmpty();
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
+ void failedUpdate_withMultipleEventsDuringWindow_synthesizesSummary() {
+ // Multiple foreign updates arrive while we are about to fail. Since no own write
+ // happened, every related event is foreign and must be folded into one summary
+ // event spanning first.previous → last.resource.
+ withRealTemporaryResourceCache();
+
+ CountDownLatch latch = sendForExceptionThrowingUpdate();
informerEventSource.onUpdate(
- withResourceVersion(testDeployment(), 2), withResourceVersion(testDeployment(), 3));
+ deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
informerEventSource.onUpdate(
- withResourceVersion(testDeployment(), 3), withResourceVersion(testDeployment(), 4));
+ deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
latch.countDown();
- expectHandleEvent(4, 2);
+ expectHandleAddEvent(3, 1);
+ expectNoActiveUpdates();
}
- @Test
- void doesNotPropagateEventIfReceivedBeforeUpdate() {
+ @RepeatedTest(REPEAT_COUNT)
+ void failedUpdate_withDeleteEventDuringWindow_propagatesDelete() {
+ // delete arrives during the (failing) filter window — must surface as DELETE.
withRealTemporaryResourceCache();
- CountDownLatch latch = sendForEventFilteringUpdate(2);
+ CountDownLatch latch = sendForExceptionThrowingUpdate();
+ informerEventSource.onDelete(deploymentWithResourceVersion(2), false);
+ latch.countDown();
+
+ expectHandleDeleteEvent(2);
+ expectNoActiveUpdates();
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
+ void failedUpdate_withUpdateThenDelete_propagatesDelete() {
+ // Update followed by delete inside a failing filter window: last event is DELETE,
+ // so the summary must surface the delete (not a synthesized update).
+ withRealTemporaryResourceCache();
+
+ CountDownLatch latch = sendForExceptionThrowingUpdate();
informerEventSource.onUpdate(
deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
+ informerEventSource.onDelete(deploymentWithResourceVersion(3), false);
latch.countDown();
- assertNoEventProduced();
+ expectHandleDeleteEvent(3);
+ expectNoActiveUpdates();
}
- @Test
- void filterAddEventBeforeUpdate() {
+ @RepeatedTest(REPEAT_COUNT)
+ void failedUpdate_doesNotPopulateTempCache() {
+ // putResource is only called from handleRecentResourceUpdate, which never runs
+ // when updateMethod throws. The temp cache must therefore stay empty.
+ withRealTemporaryResourceCache();
+
+ CountDownLatch latch = sendForExceptionThrowingUpdate();
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
+ latch.countDown();
+
+ expectHandleAddEvent(2, 1);
+ expectNoActiveUpdates();
+ assertThat(temporaryResourceCache.getResources()).isEmpty();
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
+ void eventReceivedAfterFailedUpdate_isPropagatedNormally() {
+ // After the exception unwinds and the filter window is fully closed, subsequent
+ // events must propagate via the regular non-filtered path.
+ withRealTemporaryResourceCache();
+
+ CountDownLatch latch = sendForExceptionThrowingUpdate();
+ latch.countDown();
+ expectNoActiveUpdates();
+
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
+
+ expectHandleAddEvent(2, 1);
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
+ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() {
+ withRealTemporaryResourceCache();
+
+ var deployment = testDeployment();
+ CountDownLatch latch = sendForEventFilteringUpdate(deployment, 2);
+ informerEventSource.onUpdate(
+ withResourceVersion(testDeployment(), 2), withResourceVersion(testDeployment(), 3));
+ informerEventSource.onUpdate(
+ withResourceVersion(testDeployment(), 3), withResourceVersion(testDeployment(), 4));
+ latch.countDown();
+
+ expectHandleAddEvent(4, 2);
+ expectNoActiveUpdates();
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
+ void doesNotPropagateEventIfReceivedBeforeUpdate() {
withRealTemporaryResourceCache();
CountDownLatch latch = sendForEventFilteringUpdate(2);
- informerEventSource.onAdd(deploymentWithResourceVersion(1));
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
latch.countDown();
assertNoEventProduced();
+ expectNoActiveUpdates();
}
- @Test
+ @RepeatedTest(REPEAT_COUNT)
void multipleCachingFilteringUpdates() {
withRealTemporaryResourceCache();
- CountDownLatch latch = sendForEventFilteringUpdate(2);
+ CountDownLatch latch = sendForEventFilteringUpdate(3);
CountDownLatch latch2 =
- sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3);
+ sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4);
informerEventSource.onUpdate(
- deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
+ deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
latch.countDown();
latch2.countDown();
informerEventSource.onUpdate(
- deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
+ deploymentWithResourceVersion(3), deploymentWithResourceVersion(4));
assertNoEventProduced();
+ expectNoActiveUpdates();
}
- @Test
+ @RepeatedTest(REPEAT_COUNT)
void multipleCachingFilteringUpdates_variant2() {
withRealTemporaryResourceCache();
- CountDownLatch latch = sendForEventFilteringUpdate(2);
+ CountDownLatch latch = sendForEventFilteringUpdate(3);
CountDownLatch latch2 =
- sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3);
+ sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4);
informerEventSource.onUpdate(
- deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
+ deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
latch.countDown();
informerEventSource.onUpdate(
- deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
+ deploymentWithResourceVersion(3), deploymentWithResourceVersion(4));
latch2.countDown();
assertNoEventProduced();
+ expectNoActiveUpdates();
}
- @Test
+ @RepeatedTest(REPEAT_COUNT)
void multipleCachingFilteringUpdates_variant3() {
withRealTemporaryResourceCache();
- CountDownLatch latch = sendForEventFilteringUpdate(2);
+ CountDownLatch latch = sendForEventFilteringUpdate(3);
CountDownLatch latch2 =
- sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3);
+ sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4);
latch.countDown();
- informerEventSource.onUpdate(
- deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
informerEventSource.onUpdate(
deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(4), deploymentWithResourceVersion(4));
latch2.countDown();
assertNoEventProduced();
+ expectNoActiveUpdates();
}
- @Test
+ @RepeatedTest(REPEAT_COUNT)
void multipleCachingFilteringUpdates_variant4() {
withRealTemporaryResourceCache();
- CountDownLatch latch = sendForEventFilteringUpdate(2);
+ CountDownLatch latch = sendForEventFilteringUpdate(3);
CountDownLatch latch2 =
- sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3);
+ sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4);
- informerEventSource.onUpdate(
- deploymentWithResourceVersion(1), deploymentWithResourceVersion(2));
informerEventSource.onUpdate(
deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(3), deploymentWithResourceVersion(4));
latch.countDown();
latch2.countDown();
assertNoEventProduced();
+ expectNoActiveUpdates();
}
- @Test
+ @RepeatedTest(REPEAT_COUNT)
+ void multipleCachingFilteringUpdates_variant5() {
+ withRealTemporaryResourceCache();
+
+ CountDownLatch latch = sendForEventFilteringUpdate(3);
+ CountDownLatch latch2 =
+ sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4);
+ latch.countDown();
+ latch2.countDown();
+
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(3), deploymentWithResourceVersion(4));
+
+ assertNoEventProduced();
+ expectNoActiveUpdates();
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
void ghostCheckRemovesCachedResourceDuringFilteringUpdate() {
var mes = mock(ManagedInformerEventSource.class);
var mim = mock(InformerManager.class);
@@ -370,11 +460,11 @@ void ghostCheckRemovesCachedResourceDuringFilteringUpdate() {
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
// complete the filtering update - the resource should not reappear
- temporaryResourceCache.doneEventFilterModify(resourceId, "2");
+ temporaryResourceCache.doneEventFilterModify(resourceId);
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
}
- @Test
+ @RepeatedTest(REPEAT_COUNT)
void ghostCheckRunsConcurrentlyWithPutResource() {
var mes = mock(ManagedInformerEventSource.class);
var mim = mock(InformerManager.class);
@@ -405,7 +495,7 @@ void ghostCheckRunsConcurrentlyWithPutResource() {
.isPresent();
}
- @Test
+ @RepeatedTest(REPEAT_COUNT)
void filteringUpdateAndGhostCheckWithNamespaceChange() {
var mes = mock(ManagedInformerEventSource.class);
var mim = mock(InformerManager.class);
@@ -430,7 +520,7 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
// complete the filtering update
- var doneResult = temporaryResourceCache.doneEventFilterModify(resourceId, "2");
+ var doneResult = temporaryResourceCache.doneEventFilterModify(resourceId);
// resource was already cleaned by ghost check, so no deferred event
assertThat(doneResult).isEmpty();
@@ -439,16 +529,181 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
}
+ @RepeatedTest(REPEAT_COUNT)
+ void ghostCheckDuringOpenFilteringUpdate_cleansUpAndDoneIsNoOp() {
+ // Combines the real eventFilteringUpdateAndCacheResource flow with a ghost-resource
+ // cleanup happening while a second filter window is still open. The ghost check
+ // must clear cache + activeUpdates and fire a synthetic DELETE; the still-open
+ // filter's later doneEventFilterModify must complete cleanly (no NPE on the
+ // already-removed EventFilterDetails) and not propagate any further events.
+ var mes = mock(ManagedInformerEventSource.class);
+ var mim = mock(InformerManager.class);
+ when(mes.manager()).thenReturn(mim);
+ when(mim.isWatchingNamespace(any())).thenReturn(true);
+ when(mim.lastSyncResourceVersion(any())).thenReturn("1");
+ when(mim.get(any())).thenReturn(Optional.empty());
+
+ temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
+ informerEventSource.setTemporalResourceCache(temporaryResourceCache);
+
+ var resourceId = ResourceID.fromResource(testDeployment());
+
+ // first filter completes and caches rv 2; second filter keeps the window open
+ var latch1 = sendForEventFilteringUpdate(2);
+ var latch2 = sendForEventFilteringUpdate(deploymentWithResourceVersion(2), 3);
+
+ latch1.countDown();
+ awaitCachedResourceVersion(resourceId, "2");
+
+ // simulate watch disconnect + relist while the second filter is still open:
+ // lastSync moved well past our cached rv, informer no longer has the resource
+ when(mim.lastSyncResourceVersion(any())).thenReturn("10");
+
+ temporaryResourceCache.checkGhostResources();
+
+ // ghost cleanup wiped both cache and activeUpdates
+ assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();
+ assertThat(temporaryResourceCache.getActiveUpdates()).isEmpty();
+
+ // synthetic DELETE fired through the cache's manager reference
+ verify(mes, times(1)).handleEvent(eq(ResourceAction.DELETED), any(), isNull(), eq(true));
+
+ // closing the still-open filter must not NPE on the missing EventFilterDetails
+ // and must not propagate anything
+ latch2.countDown();
+
+ assertNoEventProduced();
+ expectNoActiveUpdates();
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
+ void propagatesIntermediateEventForExternalUpdateDuringFiltering() {
+ // Causal-dependency fix: another controller updated the resource between our read
+ // and our write. The informer delivers that update during our active filter; since
+ // its resource version is NOT one of our own writes, it must be propagated.
+ withRealTemporaryResourceCache();
+
+ var resourceId = ResourceID.fromResource(testDeployment());
+
+ // first filter writes rv 4 (our own); a second concurrent filter keeps the
+ // active-updates window open so the event below hits the active path
+ var latch1 = sendForEventFilteringUpdate(4);
+ var latch2 = sendForEventFilteringUpdate(deploymentWithResourceVersion(4), 5);
+
+ latch1.countDown();
+ awaitCachedResourceVersion(resourceId, "4");
+
+ // external update with rv 3 (older than our cached rv 4) — must propagate
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
+ latch2.countDown();
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(4), deploymentWithResourceVersion(5));
+
+ expectHandleAddEvent(5, 2);
+ expectNoActiveUpdates();
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
+ void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() {
+ // Two consecutive own writes (rv 3 then rv 4) within an open filter window: an
+ // event for the older own version must be deferred since it's recognized as our own.
+ // A third concurrent filter keeps the active-updates window open while the event
+ // below is processed.
+ withRealTemporaryResourceCache();
+
+ var resourceId = ResourceID.fromResource(testDeployment());
+
+ var latch1 = sendForEventFilteringUpdate(3);
+ var latch2 = sendForEventFilteringUpdate(deploymentWithResourceVersion(3), 4);
+ var latch3 = sendForEventFilteringUpdate(deploymentWithResourceVersion(4), 5);
+
+ latch1.countDown();
+ awaitCachedResourceVersion(resourceId, "3");
+ latch2.countDown();
+ awaitCachedResourceVersion(resourceId, "4");
+
+ // event for our own rv 3 (older than cached rv 4) — must be deferred
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(2), deploymentWithResourceVersion(3));
+
+ verify(eventHandlerMock, never()).handleEvent(any());
+
+ latch3.countDown();
+ awaitCachedResourceVersion(resourceId, "5");
+ // drain the filter with the event for our own rv 5 — all events are now own,
+ // summary must be empty and no event propagated.
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(4), deploymentWithResourceVersion(5));
+
+ assertNoEventProduced();
+ expectNoActiveUpdates();
+ }
+
+ @RepeatedTest(REPEAT_COUNT)
+ void deleteEventPropagatedIfItWasTheLastEvent() {
+ // Within an open filter window, an external UPDATE arrives followed by a DELETE.
+ // The summary must surface the DELETE since it represents the final state.
+ withRealTemporaryResourceCache();
+
+ var latch = sendForEventFilteringUpdate(3);
+
+ informerEventSource.onUpdate(
+ deploymentWithResourceVersion(3), deploymentWithResourceVersion(4));
+ informerEventSource.onDelete(deploymentWithResourceVersion(5), false);
+
+ latch.countDown();
+
+ expectHandleDeleteEvent(5);
+ expectNoActiveUpdates();
+ }
+
+ private void awaitCachedResourceVersion(ResourceID resourceId, String resourceVersion) {
+ await()
+ .untilAsserted(
+ () ->
+ assertThat(
+ temporaryResourceCache
+ .getResourceFromCache(resourceId)
+ .map(d -> d.getMetadata().getResourceVersion()))
+ .hasValue(resourceVersion));
+ }
+
private void assertNoEventProduced() {
await()
- .pollDelay(Duration.ofMillis(50))
- .timeout(Duration.ofMillis(51))
+ .pollDelay(Duration.ofMillis(70))
+ .timeout(Duration.ofMillis(71))
+ .untilAsserted(() -> verify(informerEventSource, never()).propagateEvent(any()));
+ }
+
+ private void expectNoActiveUpdates() {
+ await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(() -> assertThat(temporaryResourceCache.getActiveUpdates()).isEmpty());
+ }
+
+ private void expectHandleAddEvent(int newResourceVersion) {
+ await()
+ .atMost(Duration.ofSeconds(1))
.untilAsserted(
- () -> verify(informerEventSource, never()).handleEvent(any(), any(), any(), any()));
+ () -> {
+ verify(informerEventSource, times(1))
+ .handleEvent(
+ eq(ResourceAction.ADDED),
+ argThat(
+ newResource -> {
+ assertThat(newResource.getMetadata().getResourceVersion())
+ .isEqualTo("" + newResourceVersion);
+ return true;
+ }),
+ isNull(),
+ any());
+ });
}
- private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {
+ private void expectHandleAddEvent(int newResourceVersion, int oldResourceVersion) {
await()
+ .atMost(Duration.ofSeconds(1))
.untilAsserted(
() -> {
verify(informerEventSource, times(1))
@@ -466,7 +721,26 @@ private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) {
.isEqualTo("" + oldResourceVersion);
return true;
}),
- isNull());
+ any());
+ });
+ }
+
+ private void expectHandleDeleteEvent(int resourceVersion) {
+ await()
+ .atMost(Duration.ofSeconds(1))
+ .untilAsserted(
+ () -> {
+ verify(informerEventSource, times(1))
+ .handleEvent(
+ eq(ResourceAction.DELETED),
+ argThat(
+ newResource -> {
+ assertThat(newResource.getMetadata().getResourceVersion())
+ .isEqualTo("" + resourceVersion);
+ return true;
+ }),
+ isNull(),
+ any());
});
}
@@ -479,10 +753,20 @@ private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int re
informerEventSource, deployment, r -> withResourceVersion(deployment, resourceVersion));
}
+ private CountDownLatch sendForExceptionThrowingUpdate() {
+ return EventFilterTestUtils.sendForEventFilteringUpdate(
+ informerEventSource,
+ testDeployment(),
+ r -> {
+ throw new KubernetesClientException("fake");
+ });
+ }
+
private void withRealTemporaryResourceCache() {
var mes = mock(ManagedInformerEventSource.class);
var mim = mock(InformerManager.class);
when(mes.manager()).thenReturn(mim);
+ when(mim.isWatchingNamespace(any())).thenReturn(true);
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java
index 9a58b83f88..edae142770 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java
@@ -25,7 +25,6 @@
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
-import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -155,7 +154,7 @@ void eventReceivedDuringFiltering() {
.isEmpty();
var doneRes =
- temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2");
+ temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource));
assertThat(doneRes).isEmpty();
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
@@ -179,7 +178,7 @@ void newerEventDuringFiltering() {
.isEmpty();
var doneRes =
- temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2");
+ temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource));
assertThat(doneRes).isPresent();
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
@@ -197,7 +196,7 @@ void eventAfterFiltering() {
.isPresent();
var doneRes =
- temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2");
+ temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource));
assertThat(doneRes).isEmpty();
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)))
@@ -215,14 +214,14 @@ void putBeforeEvent() {
// first ensure an event is not known
var result =
temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null);
- assertThat(result).isEqualTo(EventHandling.NEW);
+ assertThat(result).isPresent();
var nextResource = testResource();
nextResource.getMetadata().setResourceVersion("3");
temporaryResourceCache.putResource(nextResource);
result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null);
- assertThat(result).isEqualTo(EventHandling.OBSOLETE);
+ assertThat(result).isEmpty();
}
@Test
@@ -232,7 +231,7 @@ void putBeforeEventWithEventFiltering() {
// first ensure an event is not known
var result =
temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null);
- assertThat(result).isEqualTo(EventHandling.NEW);
+ assertThat(result).isPresent();
latestSyncVersion = RESOURCE_VERSION;
var nextResource = testResource();
@@ -241,11 +240,11 @@ void putBeforeEventWithEventFiltering() {
temporaryResourceCache.startEventFilteringModify(resourceId);
temporaryResourceCache.putResource(nextResource);
- temporaryResourceCache.doneEventFilterModify(resourceId, "3");
+ temporaryResourceCache.doneEventFilterModify(resourceId);
latestSyncVersion = "3";
result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null);
- assertThat(result).isEqualTo(EventHandling.OBSOLETE);
+ assertThat(result).isEmpty();
}
@Test
@@ -255,7 +254,14 @@ void putAfterEventWithEventFilteringNoPost() {
// first ensure an event is not known
var result =
temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null);
- assertThat(result).isEqualTo(EventHandling.NEW);
+ assertThat(result)
+ .hasValueSatisfying(
+ v -> {
+ assertThat(v.getAction()).isEqualTo(ResourceAction.ADDED);
+ assertThat(v.getPreviousResource()).isEmpty();
+ assertThat(v.getResource()).contains(testResource);
+ assertThat(v.getLastStateUnknow()).isNull();
+ });
var nextResource = testResource();
nextResource.getMetadata().setResourceVersion("3");
@@ -265,10 +271,10 @@ void putAfterEventWithEventFilteringNoPost() {
result =
temporaryResourceCache.onAddOrUpdateEvent(
ResourceAction.UPDATED, nextResource, testResource);
- // the result is deferred
- assertThat(result).isEqualTo(EventHandling.DEFER);
+ assertThat(result).isEmpty();
+
temporaryResourceCache.putResource(nextResource);
- var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId, "3");
+ var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId);
// there is no post event because the done call claimed responsibility for rv 3
assertTrue(postEvent.isEmpty());
@@ -280,20 +286,88 @@ void putAfterEventWithEventFilteringWithPost() {
var resourceId = ResourceID.fromResource(testResource);
temporaryResourceCache.startEventFilteringModify(resourceId);
- // this should be a corner case - watch had a hard reset since the start of the
+ // this should be a corner case - watch had a hard reset since the start
// of the update operation, such that 4 rv event is seen prior to the update
// completing with the 3 rv.
var nextResource = testResource();
nextResource.getMetadata().setResourceVersion("4");
var result =
temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, nextResource, null);
- assertThat(result).isEqualTo(EventHandling.DEFER);
+ assertThat(result).isEmpty();
- var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId, "3");
+ var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId);
assertTrue(postEvent.isPresent());
}
+ @Test
+ void intermediateEventPropagatedWhenNoActiveUpdate() {
+ // Cache holds a newer version from a prior own write; no active filter is in progress.
+ // An older event arriving used to be OBSOLETE; now it must be propagated as INTERMEDIATE
+ // so callers can react to changes that happened between read and write.
+ var olderEvent = testResource();
+ var newer = testResource();
+ newer.getMetadata().setResourceVersion("3");
+
+ temporaryResourceCache.putResource(newer);
+ assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(olderEvent)))
+ .isPresent();
+
+ var result =
+ temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, olderEvent, null);
+
+ assertThat(result)
+ .hasValueSatisfying(
+ e -> {
+ assertThat(e.getResource().orElseThrow()).isEqualTo(olderEvent);
+ assertThat(e.getPreviousResource()).isNotPresent();
+ assertThat(e.getAction()).isEqualTo(ResourceAction.UPDATED);
+ });
+ }
+
+ @Test
+ void intermediateEventRecorded() {
+ // Causal-dependency scenario: a third party updated the resource between our read and
+ // our write. Its version arrives as an event but is NOT in our own resource versions,
+ // so it must be propagated (INTERMEDIATE), not deferred.
+ var external = testResource(); // rv=2 — written by another controller
+ var resourceId = ResourceID.fromResource(external);
+
+ temporaryResourceCache.startEventFilteringModify(resourceId);
+
+ var ourUpdate = testResource();
+ ourUpdate.getMetadata().setResourceVersion("3");
+ temporaryResourceCache.putResource(ourUpdate);
+
+ var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, external, null);
+
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ void intermediateEventDeferredWhenItIsOurOwnIntermediateUpdate() {
+ // Two consecutive own writes within the same filter window: the older one's event
+ // arrives after the newer one is cached. Because the version is recorded as our own,
+ // the event must be DEFERred rather than propagated.
+ var testResource = testResource();
+ var resourceId = ResourceID.fromResource(testResource);
+
+ temporaryResourceCache.startEventFilteringModify(resourceId);
+
+ var ourFirst = testResource(); // rv=2
+ temporaryResourceCache.putResource(ourFirst);
+
+ var ourSecond = testResource();
+ ourSecond.getMetadata().setResourceVersion("3");
+
+ temporaryResourceCache.startEventFilteringModify(resourceId);
+ temporaryResourceCache.putResource(ourSecond);
+
+ var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, ourFirst, null);
+
+ assertThat(result).isEmpty();
+ }
+
@Test
void rapidDeletion() {
var testResource = testResource();
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateCustomResource.java
new file mode 100644
index 0000000000..60d09f82f8
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateCustomResource.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.deletionduringstatusupdate;
+
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.ShortNames;
+import io.fabric8.kubernetes.model.annotation.Version;
+
+@Group("sample.javaoperatorsdk")
+@Version("v1")
+@ShortNames("ddsu")
+public class DeletionDuringStatusUpdateCustomResource
+ extends CustomResource implements Namespaced {}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateIT.java
new file mode 100644
index 0000000000..3012c9538c
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateIT.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.deletionduringstatusupdate;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Regression test for: deletion event dropped when resource is deleted concurrently with a status
+ * update.
+ */
+class DeletionDuringStatusUpdateIT {
+
+ static final String RESOURCE_NAME = "test-resource";
+
+ @RegisterExtension
+ LocallyRunOperatorExtension extension =
+ LocallyRunOperatorExtension.builder()
+ .withReconciler(new DeletionDuringStatusUpdateReconciler())
+ .build();
+
+ @AfterEach
+ void forceCleanup() {
+ // If the test failed, remove the finalizer so the resource can be deleted
+ var res = extension.get(DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME);
+ if (res != null) {
+ res.getMetadata().setFinalizers(Collections.emptyList());
+ extension.replace(res);
+ extension.delete(res);
+ }
+ }
+
+ @Test
+ void deletionDuringStatusUpdateTriggersCleanup() throws InterruptedException {
+ var reconciler = extension.getReconcilerOfType(DeletionDuringStatusUpdateReconciler.class);
+
+ extension.create(testResource());
+
+ // Wait until the reconciler is inside the update operation (active-update window is open)
+ assertThat(reconciler.patchStartedLatch.await(30, TimeUnit.SECONDS))
+ .as("reconciler should enter the patch update operation")
+ .isTrue();
+
+ // Issue delete — K8s sets deletionTimestamp while the active-update window is open
+ extension.delete(testResource());
+
+ // Wait for deletionTimestamp to be confirmed on the resource in K8s
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .until(
+ () -> {
+ var res =
+ extension.get(DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME);
+ return res != null && res.isMarkedForDeletion();
+ });
+
+ // Signal the reconciler to proceed with the actual PATCH. K8s will merge deletionTimestamp
+ // into the response - the deletion event (lower RV) is now deferred and will be dropped
+ // without the fix.
+ reconciler.deleteConfirmedLatch.countDown();
+
+ // cleanup() must be called — the deletion must not be silently lost
+ assertThat(reconciler.cleanupCalledLatch.await(30, TimeUnit.SECONDS))
+ .as("cleanup() must be called after the status update that races with the delete")
+ .isTrue();
+
+ // Resource must eventually disappear (finalizer removed)
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(
+ () ->
+ assertThat(
+ extension.get(
+ DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME))
+ .isNull());
+ }
+
+ DeletionDuringStatusUpdateCustomResource testResource() {
+ var resource = new DeletionDuringStatusUpdateCustomResource();
+ resource.setMetadata(new ObjectMetaBuilder().withName(RESOURCE_NAME).build());
+ return resource;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java
new file mode 100644
index 0000000000..feb0509e72
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.deletionduringstatusupdate;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+@ControllerConfiguration
+public class DeletionDuringStatusUpdateReconciler
+ implements Reconciler,
+ Cleaner {
+
+ final CountDownLatch patchStartedLatch = new CountDownLatch(1);
+ final CountDownLatch deleteConfirmedLatch = new CountDownLatch(1);
+ final CountDownLatch cleanupCalledLatch = new CountDownLatch(1);
+
+ @Override
+ public UpdateControl reconcile(
+ DeletionDuringStatusUpdateCustomResource resource,
+ Context context)
+ throws InterruptedException {
+ if (resource.isMarkedForDeletion()) {
+ return UpdateControl.noUpdate();
+ }
+
+ var status = new DeletionDuringStatusUpdateStatus();
+ status.setReady(true);
+ resource.setStatus(status);
+
+ context
+ .resourceOperations()
+ .resourcePatch(
+ resource,
+ r -> {
+ patchStartedLatch.countDown();
+ try {
+ if (!deleteConfirmedLatch.await(30, TimeUnit.SECONDS)) {
+ throw new RuntimeException("Timed out waiting for delete confirmation");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ r.getMetadata().setResourceVersion(null);
+ return context.getClient().resource(r).patchStatus();
+ },
+ context.eventSourceRetriever().getControllerEventSource());
+
+ return UpdateControl.noUpdate();
+ }
+
+ @Override
+ public DeleteControl cleanup(
+ DeletionDuringStatusUpdateCustomResource resource,
+ Context context) {
+ cleanupCalledLatch.countDown();
+ return DeleteControl.defaultDelete();
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateStatus.java
new file mode 100644
index 0000000000..c7acedce20
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/deletionduringstatusupdate/DeletionDuringStatusUpdateStatus.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.deletionduringstatusupdate;
+
+public class DeletionDuringStatusUpdateStatus {
+
+ private boolean ready;
+
+ public boolean isReady() {
+ return ready;
+ }
+
+ public void setReady(boolean ready) {
+ this.ready = ready;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateCustomResource.java
new file mode 100644
index 0000000000..dd28ca9254
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateCustomResource.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalsecondaryupdate;
+
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.ShortNames;
+import io.fabric8.kubernetes.model.annotation.Version;
+
+@Group("sample.javaoperatorsdk")
+@Version("v1")
+@ShortNames("esu")
+public class ExternalSecondaryUpdateCustomResource
+ extends CustomResource implements Namespaced {}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateIT.java
new file mode 100644
index 0000000000..04b1565654
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateIT.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalsecondaryupdate;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
+
+import static io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalsecondaryupdate.ExternalSecondaryUpdateReconciler.EXTERNAL_LABEL_KEY;
+import static io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalsecondaryupdate.ExternalSecondaryUpdateReconciler.EXTERNAL_LABEL_VALUE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Verifies that when a secondary resource (a ConfigMap owned by the primary) is modified externally
+ * between two caching+filtering updates from the controller, the external change is NOT silently
+ * absorbed: a later reconciliation must observe it through the merged temp/informer cache.
+ */
+class ExternalSecondaryUpdateIT {
+
+ static final String RESOURCE_NAME = "test-resource";
+
+ ExternalSecondaryUpdateReconciler reconciler = new ExternalSecondaryUpdateReconciler();
+
+ @RegisterExtension
+ LocallyRunOperatorExtension operator =
+ LocallyRunOperatorExtension.builder().withReconciler(reconciler).build();
+
+ @Test
+ void externalUpdateOnSecondaryDuringFilteringUpdatePropagates() throws InterruptedException {
+ operator.create(testResource());
+
+ // wait for the reconciler to enter the first reconciliation and create the secondary CM
+ assertThat(reconciler.firstReconcileEntered.await(30, TimeUnit.SECONDS))
+ .as("reconciler must enter first reconciliation")
+ .isTrue();
+
+ // a third party adds a label to the secondary CM while we are mid-reconcile
+ var cm =
+ operator
+ .getKubernetesClient()
+ .configMaps()
+ .inNamespace(operator.getNamespace())
+ .withName(RESOURCE_NAME)
+ .get();
+ assertThat(cm).as("secondary CM created by reconciler").isNotNull();
+ var labels = new HashMap();
+ if (cm.getMetadata().getLabels() != null) {
+ labels.putAll(cm.getMetadata().getLabels());
+ }
+ labels.put(EXTERNAL_LABEL_KEY, EXTERNAL_LABEL_VALUE);
+ cm.getMetadata().setLabels(labels);
+ operator.getKubernetesClient().resource(cm).inNamespace(operator.getNamespace()).replace();
+
+ // signal the reconciler to issue the second caching+filtering SSA
+ reconciler.externalUpdateApplied.countDown();
+
+ // a later reconciliation, triggered by the external label event, must see the label
+ // through the cache (informer + temp cache merge).
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(
+ () -> {
+ assertThat(reconciler.numberOfExecutions.get())
+ .as("external CM update must trigger a fresh reconciliation")
+ .isGreaterThanOrEqualTo(2);
+ assertThat(reconciler.externalLabelSeenInLaterReconciliation.get())
+ .as("a later reconciliation must observe the externally-applied label")
+ .isTrue();
+ });
+
+ // the second SSA from the reconciler did go through and was captured
+ assertThat(reconciler.rvAfterCachingFilteringUpdate.get()).isNotNull();
+ var finalCm =
+ operator
+ .getKubernetesClient()
+ .configMaps()
+ .inNamespace(operator.getNamespace())
+ .withName(RESOURCE_NAME)
+ .get();
+ assertThat(finalCm.getMetadata().getLabels())
+ .as("external label preserved on the secondary after the SSA")
+ .containsEntry(EXTERNAL_LABEL_KEY, EXTERNAL_LABEL_VALUE);
+ }
+
+ ExternalSecondaryUpdateCustomResource testResource() {
+ var r = new ExternalSecondaryUpdateCustomResource();
+ r.setMetadata(new ObjectMetaBuilder().withName(RESOURCE_NAME).build());
+ return r;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateReconciler.java
new file mode 100644
index 0000000000..0dac8cae33
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateReconciler.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalsecondaryupdate;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
+
+@ControllerConfiguration
+public class ExternalSecondaryUpdateReconciler
+ implements Reconciler {
+
+ static final String CM_DATA_KEY = "managed-by";
+ static final String CM_DATA_VALUE = "operator";
+ static final String EXTERNAL_LABEL_KEY = "externally-set";
+ static final String EXTERNAL_LABEL_VALUE = "yes";
+
+ final AtomicInteger numberOfExecutions = new AtomicInteger();
+ final CountDownLatch firstReconcileEntered = new CountDownLatch(1);
+ final CountDownLatch externalUpdateApplied = new CountDownLatch(1);
+ // Whether a later reconciliation (after the external label appeared) actually saw the label
+ // through the informer/temp cache.
+ final AtomicBoolean externalLabelSeenInLaterReconciliation = new AtomicBoolean();
+ final AtomicReference rvAfterCachingFilteringUpdate = new AtomicReference<>();
+
+ private InformerEventSource
+ configMapEventSource;
+
+ @Override
+ public UpdateControl reconcile(
+ ExternalSecondaryUpdateCustomResource resource,
+ Context context)
+ throws InterruptedException {
+ int execution = numberOfExecutions.incrementAndGet();
+
+ if (execution == 1) {
+ // first reconciliation: create the secondary CM via SSA, then ask the test to apply
+ // an external metadata change BEFORE we issue our second SSA on it.
+ context.resourceOperations().serverSideApply(prepareCM(resource, 1), configMapEventSource);
+
+ firstReconcileEntered.countDown();
+ if (!externalUpdateApplied.await(30, TimeUnit.SECONDS)) {
+ throw new RuntimeException("timed out waiting for external CM update");
+ }
+
+ // Second SSA on the secondary, with DIFFERENT data so it actually mutates the resource
+ // and bumps rv beyond the external label change. Without distinct data the SSA would be
+ // idempotent and return the rv produced by the external update — which would then be
+ // recorded as our own and incorrectly filter out the external event.
+ var updated =
+ context
+ .resourceOperations()
+ .serverSideApply(prepareCM(resource, 2), configMapEventSource);
+ rvAfterCachingFilteringUpdate.set(updated.getMetadata().getResourceVersion());
+ } else {
+ // any subsequent reconciliation must be able to see the external label through the
+ // informer cache (merged with the temp cache).
+ var cached = context.getSecondaryResource(ConfigMap.class).orElse(null);
+ if (cached != null
+ && cached.getMetadata().getLabels() != null
+ && EXTERNAL_LABEL_VALUE.equals(
+ cached.getMetadata().getLabels().get(EXTERNAL_LABEL_KEY))) {
+ externalLabelSeenInLaterReconciliation.set(true);
+ }
+ }
+ return UpdateControl.noUpdate();
+ }
+
+ @Override
+ public List> prepareEventSources(
+ EventSourceContext context) {
+ configMapEventSource =
+ new InformerEventSource<>(
+ InformerEventSourceConfiguration.from(
+ ConfigMap.class, ExternalSecondaryUpdateCustomResource.class)
+ .build(),
+ context);
+ return List.of(configMapEventSource);
+ }
+
+ private static ConfigMap prepareCM(ExternalSecondaryUpdateCustomResource p, int iteration) {
+ var cm =
+ new ConfigMapBuilder()
+ .withMetadata(
+ new ObjectMetaBuilder()
+ .withName(p.getMetadata().getName())
+ .withNamespace(p.getMetadata().getNamespace())
+ .build())
+ .withData(Map.of(CM_DATA_KEY, CM_DATA_VALUE, "iteration", "" + iteration))
+ .build();
+ cm.addOwnerReference(p);
+ return cm;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateStatus.java
new file mode 100644
index 0000000000..70c555f7aa
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalsecondaryupdate/ExternalSecondaryUpdateStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalsecondaryupdate;
+
+public class ExternalSecondaryUpdateStatus {
+
+ private Integer reconciliations;
+
+ public Integer getReconciliations() {
+ return reconciliations;
+ }
+
+ public ExternalSecondaryUpdateStatus setReconciliations(Integer reconciliations) {
+ this.reconciliations = reconciliations;
+ return this;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateCustomResource.java
new file mode 100644
index 0000000000..3621c72c8f
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateCustomResource.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalupdateduringownupdate;
+
+import io.fabric8.kubernetes.api.model.Namespaced;
+import io.fabric8.kubernetes.client.CustomResource;
+import io.fabric8.kubernetes.model.annotation.Group;
+import io.fabric8.kubernetes.model.annotation.ShortNames;
+import io.fabric8.kubernetes.model.annotation.Version;
+
+@Group("sample.javaoperatorsdk")
+@Version("v1")
+@ShortNames("eudou")
+public class ExternalUpdateDuringOwnUpdateCustomResource
+ extends CustomResource implements Namespaced {}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateIT.java
new file mode 100644
index 0000000000..ed3330358e
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateIT.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalupdateduringownupdate;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
+
+import static io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalupdateduringownupdate.ExternalUpdateDuringOwnUpdateReconciler.EXTERNAL_LABEL_KEY;
+import static io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalupdateduringownupdate.ExternalUpdateDuringOwnUpdateReconciler.EXTERNAL_LABEL_VALUE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Verifies that an external update arriving while the controller's own filter window is open is NOT
+ * mistakenly filtered. The third-party event must propagate as a fresh reconciliation in which the
+ * reconciler observes the externally-applied change.
+ */
+class ExternalUpdateDuringOwnUpdateIT {
+
+ static final String RESOURCE_NAME = "test-resource";
+
+ ExternalUpdateDuringOwnUpdateReconciler reconciler =
+ new ExternalUpdateDuringOwnUpdateReconciler();
+
+ @RegisterExtension
+ LocallyRunOperatorExtension extension =
+ LocallyRunOperatorExtension.builder().withReconciler(reconciler).build();
+
+ @Test
+ void externalUpdateDuringOwnUpdateTriggersFreshReconciliation() throws InterruptedException {
+ extension.create(testResource());
+
+ assertThat(reconciler.updateStartedLatch.await(30, TimeUnit.SECONDS))
+ .as("reconciler should enter the patch update operation")
+ .isTrue();
+
+ // external party modifies a label while our filter window is still open
+ var current = extension.get(ExternalUpdateDuringOwnUpdateCustomResource.class, RESOURCE_NAME);
+ var labels = new HashMap();
+ if (current.getMetadata().getLabels() != null) {
+ labels.putAll(current.getMetadata().getLabels());
+ }
+ labels.put(EXTERNAL_LABEL_KEY, EXTERNAL_LABEL_VALUE);
+ current.getMetadata().setLabels(labels);
+ extension.replace(current);
+
+ // signal reconciler to complete its own status update
+ reconciler.externalUpdateDoneLatch.countDown();
+
+ // the external update event must NOT be silently absorbed by the filter window;
+ // a fresh reconciliation must observe the external label.
+ await()
+ .atMost(Duration.ofSeconds(30))
+ .untilAsserted(
+ () -> {
+ assertThat(reconciler.numberOfExecutions.get()).isGreaterThanOrEqualTo(2);
+ assertThat(reconciler.externalLabelSeenInLaterReconciliation.get())
+ .as("a later reconciliation must observe the externally-applied label")
+ .isTrue();
+ });
+ }
+
+ ExternalUpdateDuringOwnUpdateCustomResource testResource() {
+ var r = new ExternalUpdateDuringOwnUpdateCustomResource();
+ r.setMetadata(new ObjectMetaBuilder().withName(RESOURCE_NAME).build());
+ return r;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateReconciler.java
new file mode 100644
index 0000000000..e5c956dc4e
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateReconciler.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalupdateduringownupdate;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
+import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+
+@ControllerConfiguration(generationAwareEventProcessing = false)
+public class ExternalUpdateDuringOwnUpdateReconciler
+ implements Reconciler {
+
+ static final String EXTERNAL_LABEL_KEY = "externally-set";
+ static final String EXTERNAL_LABEL_VALUE = "yes";
+ static final String STATUS_VALUE = "ready";
+
+ final AtomicInteger numberOfExecutions = new AtomicInteger();
+ final CountDownLatch updateStartedLatch = new CountDownLatch(1);
+ final CountDownLatch externalUpdateDoneLatch = new CountDownLatch(1);
+ final AtomicBoolean externalLabelSeenInLaterReconciliation = new AtomicBoolean();
+
+ @Override
+ public UpdateControl reconcile(
+ ExternalUpdateDuringOwnUpdateCustomResource resource,
+ Context context) {
+ int execution = numberOfExecutions.incrementAndGet();
+
+ if (execution == 1) {
+ var status = new ExternalUpdateDuringOwnUpdateStatus().setValue(STATUS_VALUE);
+ resource.setStatus(status);
+
+ // wrap our own status update in resourcePatch with a hook that lets the test
+ // perform an external metadata update WHILE our filter window is still open.
+ context
+ .resourceOperations()
+ .resourcePatch(
+ resource,
+ r -> {
+ updateStartedLatch.countDown();
+ try {
+ if (!externalUpdateDoneLatch.await(30, TimeUnit.SECONDS)) {
+ throw new RuntimeException("timed out waiting for external update");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ // server-side state moved due to the external label change; drop our stale rv
+ r.getMetadata().setResourceVersion(null);
+ return context.getClient().resource(r).patchStatus();
+ },
+ context.eventSourceRetriever().getControllerEventSource());
+ } else {
+ var labels = resource.getMetadata().getLabels();
+ if (labels != null && EXTERNAL_LABEL_VALUE.equals(labels.get(EXTERNAL_LABEL_KEY))) {
+ externalLabelSeenInLaterReconciliation.set(true);
+ }
+ }
+ return UpdateControl.noUpdate();
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateStatus.java
new file mode 100644
index 0000000000..b059a6ee5e
--- /dev/null
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/externalupdateduringownupdate/ExternalUpdateDuringOwnUpdateStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright Java Operator SDK Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.externalupdateduringownupdate;
+
+public class ExternalUpdateDuringOwnUpdateStatus {
+
+ private String value;
+
+ public String getValue() {
+ return value;
+ }
+
+ public ExternalUpdateDuringOwnUpdateStatus setValue(String value) {
+ this.value = value;
+ return this;
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventIT.java
similarity index 98%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventIT.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventIT.java
index 6f27925e21..398cdcf864 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventIT.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventIT.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.javaoperatorsdk.operator.baseapi.filterpatchevent;
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.filterpatchevent;
import java.time.Duration;
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestCustomResource.java
similarity index 93%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestCustomResource.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestCustomResource.java
index 7f8b4838de..f228c0caf4 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestCustomResource.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestCustomResource.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.javaoperatorsdk.operator.baseapi.filterpatchevent;
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.filterpatchevent;
import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestCustomResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestCustomResourceStatus.java
similarity index 91%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestCustomResourceStatus.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestCustomResourceStatus.java
index 1c7aeafadd..b1828f0241 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestCustomResourceStatus.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestCustomResourceStatus.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.javaoperatorsdk.operator.baseapi.filterpatchevent;
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.filterpatchevent;
public class FilterPatchEventTestCustomResourceStatus {
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestReconciler.java
similarity index 91%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestReconciler.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestReconciler.java
index e7599a2881..1f19015dcd 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/filterpatchevent/FilterPatchEventTestReconciler.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/filterpatchevent/FilterPatchEventTestReconciler.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.javaoperatorsdk.operator.baseapi.filterpatchevent;
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.filterpatchevent;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -23,7 +23,7 @@
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
-import static io.javaoperatorsdk.operator.baseapi.filterpatchevent.FilterPatchEventIT.UPDATED;
+import static io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.filterpatchevent.FilterPatchEventIT.UPDATED;
@ControllerConfiguration(generationAwareEventProcessing = false)
public class FilterPatchEventTestReconciler
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesCustomResource.java
similarity index 80%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateCustomResource.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesCustomResource.java
index 0d25bbfdd4..f12431c3ea 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateCustomResource.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesCustomResource.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate;
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.readownupdates;
import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
@@ -23,6 +23,6 @@
@Group("sample.javaoperatorsdk")
@Version("v1")
-@ShortNames("cfu")
-public class CachingFilteringUpdateCustomResource
- extends CustomResource implements Namespaced {}
+@ShortNames("rou")
+public class ReadOwnUpdatesCustomResource extends CustomResource
+ implements Namespaced {}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesIT.java
similarity index 77%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateIT.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesIT.java
index c62c8ca186..0fe2e79102 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateIT.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesIT.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate;
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.readownupdates;
import java.time.Duration;
@@ -26,10 +26,10 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
-class CachingFilteringUpdateIT {
+class ReadOwnUpdatesIT {
public static final int RESOURCE_NUMBER = 250;
- CachingFilteringUpdateReconciler reconciler = new CachingFilteringUpdateReconciler();
+ ReadOwnUpdatesReconciler reconciler = new ReadOwnUpdatesReconciler();
@RegisterExtension
LocallyRunOperatorExtension operator =
@@ -52,26 +52,25 @@ void testResourceAccessAfterUpdate() {
// Use a single representative resource to detect that updates have completed.
var res =
operator.get(
- CachingFilteringUpdateCustomResource.class,
- "resource" + (RESOURCE_NUMBER - 1));
+ ReadOwnUpdatesCustomResource.class, "resource" + (RESOURCE_NUMBER - 1));
return res != null
&& res.getStatus() != null
&& Boolean.TRUE.equals(res.getStatus().getUpdated());
});
- if (operator.getReconcilerOfType(CachingFilteringUpdateReconciler.class).isIssueFound()) {
+ if (operator.getReconcilerOfType(ReadOwnUpdatesReconciler.class).isIssueFound()) {
throw new IllegalStateException("Error already found.");
}
for (int i = 0; i < RESOURCE_NUMBER; i++) {
- var res = operator.get(CachingFilteringUpdateCustomResource.class, "resource" + i);
+ var res = operator.get(ReadOwnUpdatesCustomResource.class, "resource" + i);
assertThat(res.getStatus()).isNotNull();
assertThat(res.getStatus().getUpdated()).isTrue();
}
}
- public CachingFilteringUpdateCustomResource createCustomResource(int i) {
- CachingFilteringUpdateCustomResource resource = new CachingFilteringUpdateCustomResource();
+ public ReadOwnUpdatesCustomResource createCustomResource(int i) {
+ ReadOwnUpdatesCustomResource resource = new ReadOwnUpdatesCustomResource();
resource.setMetadata(
new ObjectMetaBuilder()
.withName("resource" + i)
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesReconciler.java
similarity index 83%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesReconciler.java
index c8fc206106..545916d7f2 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesReconciler.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate;
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.readownupdates;
import java.util.List;
import java.util.Map;
@@ -33,18 +33,16 @@
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
@ControllerConfiguration
-public class CachingFilteringUpdateReconciler
- implements Reconciler {
+public class ReadOwnUpdatesReconciler implements Reconciler {
public static final String RESOURCE_VERSION_INDEX = "resourceVersionIndex";
private final AtomicBoolean issueFound = new AtomicBoolean(false);
- private InformerEventSource configMapEventSource;
+ private InformerEventSource configMapEventSource;
@Override
- public UpdateControl reconcile(
- CachingFilteringUpdateCustomResource resource,
- Context context) {
+ public UpdateControl reconcile(
+ ReadOwnUpdatesCustomResource resource, Context context) {
try {
var updated = context.resourceOperations().serverSideApply(prepareCM(resource, 1));
var cachedCM = context.getSecondaryResource(ConfigMap.class);
@@ -104,7 +102,7 @@ private void checkListContainsCM(ConfigMap updated) {
}
}
- private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p, int num) {
+ private static ConfigMap prepareCM(ReadOwnUpdatesCustomResource p, int num) {
var cm =
new ConfigMapBuilder()
.withMetadata(
@@ -119,12 +117,12 @@ private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p, int n
}
@Override
- public List> prepareEventSources(
- EventSourceContext context) {
+ public List> prepareEventSources(
+ EventSourceContext context) {
configMapEventSource =
new InformerEventSource<>(
InformerEventSourceConfiguration.from(
- ConfigMap.class, CachingFilteringUpdateCustomResource.class)
+ ConfigMap.class, ReadOwnUpdatesCustomResource.class)
.build(),
context);
configMapEventSource.addIndexers(
@@ -132,10 +130,10 @@ public List> prepareEventSo
return List.of(configMapEventSource);
}
- private void ensureStatusExists(CachingFilteringUpdateCustomResource resource) {
- CachingFilteringUpdateStatus status = resource.getStatus();
+ private void ensureStatusExists(ReadOwnUpdatesCustomResource resource) {
+ ReadOwnUpdatesStatus status = resource.getStatus();
if (status == null) {
- status = new CachingFilteringUpdateStatus();
+ status = new ReadOwnUpdatesStatus();
resource.setStatus(status);
}
}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesStatus.java
similarity index 86%
rename from operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateStatus.java
rename to operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesStatus.java
index 80b6c4ba54..7c5e6d3c8d 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateStatus.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/readcacheafterwrite/readownupdates/ReadOwnUpdatesStatus.java
@@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package io.javaoperatorsdk.operator.baseapi.cachingfilteringupdate;
+package io.javaoperatorsdk.operator.baseapi.readcacheafterwrite.readownupdates;
-public class CachingFilteringUpdateStatus {
+public class ReadOwnUpdatesStatus {
private Boolean updated;
diff --git a/pom.xml b/pom.xml
index 92152494de..c9962e7086 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,7 +71,7 @@
https://sonarcloud.io
jdk
6.1.0
- 7.7.0
+ 999-SNAPSHOT
2.0.18
2.26.0
5.23.0