diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index bb07c5a6cb..a66ed19abd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -23,6 +23,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Function; @@ -40,6 +41,7 @@ public class ExecutorServiceManager { private ExecutorService executor; private ExecutorService workflowExecutor; private ExecutorService cachingExecutorService; + private ScheduledExecutorService scheduledExecutorService; private boolean started; private ConfigurationService configurationService; @@ -126,10 +128,15 @@ public ExecutorService cachingExecutorService() { return cachingExecutorService; } + public ScheduledExecutorService scheduledExecutorService() { + return scheduledExecutorService; + } + public void start(ConfigurationService configurationService) { if (!started) { this.configurationService = configurationService; // used to lazy init workflow executor this.cachingExecutorService = Executors.newCachedThreadPool(); + this.scheduledExecutorService = Executors.newScheduledThreadPool(0); this.executor = new InstrumentedExecutorService(configurationService.getExecutorService()); started = true; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java index e6655641a2..c8da5c0c7c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/Informer.java @@ -27,9 +27,11 @@ import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; +import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES; +import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS; import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_LONG_VALUE_SET; import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET; @@ -139,4 +141,13 @@ * @since 5.3.0 */ boolean comparableResourceVersions() default DEFAULT_COMPARABLE_RESOURCE_VERSION; + + /** + * For read-cache-after-write consistency there are some corner cases where we need to check the + * caches see {@link TemporaryResourceCache} periodically. This is the period in milliseconds. + * Applicable only if {@link #comparableResourceVersions()} is true. + * + * @since 5.3.0 + */ + long ghostResourceCacheCheckInterval() default DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java index f6caa4fe4d..a3637b6929 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.api.config.informer; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -53,7 +54,8 @@ public class InformerConfiguration { private ItemStore itemStore; private Long informerListLimit; private FieldSelector fieldSelector; - private boolean comparableResourceVersions; + private Boolean comparableResourceVersions; + private Duration ghostResourceCacheCheckInterval; protected InformerConfiguration( Class resourceClass, @@ -68,7 +70,8 @@ protected InformerConfiguration( ItemStore itemStore, Long informerListLimit, FieldSelector fieldSelector, - boolean comparableResourceVersions) { + Boolean comparableResourceVersions, + Duration ghostResourceCacheCheckInterval) { this(resourceClass); this.name = name; this.namespaces = namespaces; @@ -82,6 +85,7 @@ protected InformerConfiguration( this.informerListLimit = informerListLimit; this.fieldSelector = fieldSelector; this.comparableResourceVersions = comparableResourceVersions; + this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval; } private InformerConfiguration(Class resourceClass) { @@ -117,7 +121,8 @@ public static InformerConfiguration.Builder builder( original.itemStore, original.informerListLimit, original.fieldSelector, - original.comparableResourceVersions) + original.comparableResourceVersions, + original.ghostResourceCacheCheckInterval) .builder; } @@ -296,6 +301,10 @@ public boolean isComparableResourceVersions() { return comparableResourceVersions; } + public Duration getGhostResourceCacheCheckInterval() { + return ghostResourceCacheCheckInterval; + } + @SuppressWarnings("UnusedReturnValue") public class Builder { @@ -310,6 +319,13 @@ public InformerConfiguration buildForController() { } // to avoid potential NPE followControllerNamespaceChanges = false; + if (comparableResourceVersions == null) { + comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION; + } + + if (ghostResourceCacheCheckInterval == null) { + ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL; + } return InformerConfiguration.this; } @@ -321,6 +337,14 @@ public InformerConfiguration build() { if (followControllerNamespaceChanges == null) { followControllerNamespaceChanges = DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES; } + if (comparableResourceVersions == null) { + comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION; + } + + if (ghostResourceCacheCheckInterval == null) { + ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL; + } + return InformerConfiguration.this; } @@ -368,6 +392,8 @@ public InformerConfiguration.Builder initFromAnnotation( .map(f -> new FieldSelector.Field(f.path(), f.value(), f.negated())) .toList())); withComparableResourceVersions(informerConfig.comparableResourceVersions()); + withGhostResourceCacheCheckInterval( + Duration.ofMillis(informerConfig.ghostResourceCacheCheckInterval())); } return this; } @@ -473,5 +499,10 @@ public Builder withComparableResourceVersions(boolean comparableResourceVersions InformerConfiguration.this.comparableResourceVersions = comparableResourceVersions; return this; } + + public Builder withGhostResourceCacheCheckInterval(Duration ghostResourceCacheCheckInterval) { + InformerConfiguration.this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval; + return this; + } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java index 69903e805f..31851044d9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerEventSourceConfiguration.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.api.config.informer; +import java.time.Duration; import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -33,7 +34,6 @@ import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; -import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION; import static io.javaoperatorsdk.operator.api.reconciler.Constants.SAME_AS_CONTROLLER_NAMESPACES_SET; import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACE_SET; import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE_SET; @@ -97,21 +97,18 @@ class DefaultInformerEventSourceConfiguration private final GroupVersionKind groupVersionKind; private final InformerConfiguration informerConfig; private final KubernetesClient kubernetesClient; - private final boolean comparableResourceVersion; protected DefaultInformerEventSourceConfiguration( GroupVersionKind groupVersionKind, PrimaryToSecondaryMapper primaryToSecondaryMapper, SecondaryToPrimaryMapper secondaryToPrimaryMapper, InformerConfiguration informerConfig, - KubernetesClient kubernetesClient, - boolean comparableResourceVersion) { + KubernetesClient kubernetesClient) { this.informerConfig = Objects.requireNonNull(informerConfig); this.groupVersionKind = groupVersionKind; this.primaryToSecondaryMapper = primaryToSecondaryMapper; this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; this.kubernetesClient = kubernetesClient; - this.comparableResourceVersion = comparableResourceVersion; } @Override @@ -139,11 +136,6 @@ public Optional getGroupVersionKind() { public Optional getKubernetesClient() { return Optional.ofNullable(kubernetesClient); } - - @Override - public boolean comparableResourceVersion() { - return this.comparableResourceVersion; - } } @SuppressWarnings({"unused", "UnusedReturnValue"}) @@ -157,7 +149,6 @@ class Builder { private PrimaryToSecondaryMapper primaryToSecondaryMapper; private SecondaryToPrimaryMapper secondaryToPrimaryMapper; private KubernetesClient kubernetesClient; - private boolean comparableResourceVersion = DEFAULT_COMPARABLE_RESOURCE_VERSION; private Builder(Class resourceClass, Class primaryResourceClass) { this(resourceClass, primaryResourceClass, null); @@ -296,7 +287,13 @@ public Builder withFieldSelector(FieldSelector fieldSelector) { } public Builder withComparableResourceVersion(boolean comparableResourceVersion) { - this.comparableResourceVersion = comparableResourceVersion; + config.withComparableResourceVersions(comparableResourceVersion); + return this; + } + + public Builder withGhostResourceCacheCheckInterval( + Duration ghostResourceCacheCheckInterval) { + config.withGhostResourceCacheCheckInterval(ghostResourceCacheCheckInterval); return this; } @@ -339,10 +336,7 @@ public InformerEventSourceConfiguration build() { HasMetadata.getKind(primaryResourceClass), false)), config.build(), - kubernetesClient, - comparableResourceVersion); + kubernetesClient); } } - - boolean comparableResourceVersion(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java index 7330a407c1..5d5f7a70cb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Constants.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.api.reconciler; +import java.time.Duration; import java.util.Collections; import java.util.Set; @@ -43,5 +44,9 @@ public final class Constants { public static final boolean DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES = true; public static final boolean DEFAULT_COMPARABLE_RESOURCE_VERSION = true; + public static final long DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS = 3 * 60 * 1000; + public static final Duration DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL = + Duration.ofMillis(DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS); + private Constants() {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index e0682d5808..07d59e039a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -48,11 +48,7 @@ public class ControllerEventSource @SuppressWarnings({"unchecked", "rawtypes"}) public ControllerEventSource(Controller controller) { - super( - NAME, - controller.getCRClient(), - controller.getConfiguration(), - controller.getConfiguration().getInformerConfig().isComparableResourceVersions()); + super(NAME, controller.getCRClient(), controller.getConfiguration()); this.controller = controller; final var config = controller.getConfiguration(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index fcec8ae68b..70d83e640e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -35,8 +35,6 @@ import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; -import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION; - /** * Wraps informer(s) so they are connected to the eventing system of the framework. Note that since * this is built on top of Fabric8 client Informers, it also supports caching resources using @@ -58,29 +56,18 @@ public class InformerEventSource public InformerEventSource( InformerEventSourceConfiguration configuration, EventSourceContext

context) { - this( - configuration, - configuration.getKubernetesClient().orElse(context.getClient()), - configuration.comparableResourceVersion()); - } - - InformerEventSource(InformerEventSourceConfiguration configuration, KubernetesClient client) { - this(configuration, client, DEFAULT_COMPARABLE_RESOURCE_VERSION); + this(configuration, configuration.getKubernetesClient().orElse(context.getClient())); } @SuppressWarnings({"unchecked", "rawtypes"}) - private InformerEventSource( - InformerEventSourceConfiguration configuration, - KubernetesClient client, - boolean comparableResourceVersions) { + InformerEventSource(InformerEventSourceConfiguration configuration, KubernetesClient client) { super( configuration.name(), configuration .getGroupVersionKind() .map(gvk -> client.genericKubernetesResources(gvk.apiVersion(), gvk.getKind())) .orElseGet(() -> (MixedOperation) client.resources(configuration.getResourceClass())), - configuration, - comparableResourceVersions); + configuration); // If there is a primary to secondary mapper there is no need for primary to secondary index. primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); if (useSecondaryToPrimaryIndex()) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 42e06c9d9a..6632ce631e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -230,11 +230,27 @@ private boolean isWatchingAllNamespaces() { return sources.containsKey(WATCH_ALL_NAMESPACES); } + public boolean isWatchingNamespace(String namespace) { + // for cluster scoped resources we can assume + // that we watch the whole cluster + if (namespace == null) { + return true; + } + if (isWatchingAllNamespaces()) { + return true; + } + return sources.containsKey(namespace); + } + private Optional> getSource(String namespace) { namespace = isWatchingAllNamespaces() || namespace == null ? WATCH_ALL_NAMESPACES : namespace; return Optional.ofNullable(sources.get(namespace)); } + String lastSyncResourceVersion(String namespace) { + return getSource(namespace).orElseThrow().getLastSyncResourceVersion(); + } + @Override public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 2fc67c4892..978deda333 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -67,10 +67,10 @@ public abstract class ManagedInformerEventSource< protected TemporaryResourceCache temporaryResourceCache; protected MixedOperation client; - protected ManagedInformerEventSource( - String name, MixedOperation client, C configuration, boolean comparableResourceVersions) { + protected ManagedInformerEventSource(String name, MixedOperation client, C configuration) { super(configuration.getResourceClass(), name); - this.comparableResourceVersions = comparableResourceVersions; + this.comparableResourceVersions = + configuration.getInformerConfig().isComparableResourceVersions(); this.client = client; this.configuration = configuration; } @@ -153,7 +153,15 @@ public synchronized void start() { if (isRunning()) { return; } - temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions); + temporaryResourceCache = + new TemporaryResourceCache<>( + comparableResourceVersions, + configuration.getInformerConfig().getGhostResourceCacheCheckInterval().toMillis(), + controllerConfiguration + .getConfigurationService() + .getExecutorServiceManager() + .scheduledExecutorService(), + this); this.cache = new InformerManager<>(client, configuration, this); cache.setControllerConfiguration(controllerConfiguration); cache.addIndexers(indexers); @@ -183,18 +191,14 @@ public void handleRecentResourceCreate(ResourceID resourceID, R resource) { @Override public Optional get(ResourceID resourceID) { - // The order of these two lookups matters. If we queried the informer cache first, - // a race condition could occur: we might not find the resource there yet, then - // process an informer event that evicts the temporary resource cache entry. At that - // point the resource would already be present in the informer cache, but we would - // have missed it in both caches during this call. Optional resource = temporaryResourceCache.getResourceFromCache(resourceID); var res = cache.get(resourceID); if (comparableResourceVersions && resource.isPresent() - && res.filter( - r -> ReconcilerUtilsInternal.compareResourceVersions(r, resource.orElseThrow()) > 0) - .isEmpty()) { + && ReconcilerUtilsInternal.compareResourceVersions( + resource.get().getMetadata().getResourceVersion(), + manager().lastSyncResourceVersion(resource.get().getMetadata().getNamespace())) + > 0) { log.debug("Latest resource found in temporary cache for Resource ID: {}", resourceID); return resource; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 43d9dc1fab..1044720d59 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -19,6 +19,8 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,10 +58,10 @@ public class TemporaryResourceCache { private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); private final Map cache = new ConcurrentHashMap<>(); + private final Map activeUpdates = new HashMap<>(); private final boolean comparableResourceVersions; - private String latestResourceVersion; - private final Map activeUpdates = new HashMap<>(); + private final ManagedInformerEventSource managedInformerEventSource; public enum EventHandling { DEFER, @@ -67,8 +69,20 @@ public enum EventHandling { NEW } - public TemporaryResourceCache(boolean comparableResourceVersions) { + public TemporaryResourceCache( + boolean comparableResourceVersions, + long ghostResourceCheckInterval, + ScheduledExecutorService ghostCheckExecutor, + ManagedInformerEventSource managedInformerEventSource) { this.comparableResourceVersions = comparableResourceVersions; + this.managedInformerEventSource = managedInformerEventSource; + if (comparableResourceVersions) { + ghostCheckExecutor.scheduleWithFixedDelay( + this::checkGhostResources, + ghostResourceCheckInterval, + ghostResourceCheckInterval, + TimeUnit.MILLISECONDS); + } } public synchronized void startEventFilteringModify(ResourceID resourceID) { @@ -126,10 +140,6 @@ private synchronized EventHandling onEvent( if (log.isDebugEnabled()) { log.debug("Processing event"); } - if (!unknownState) { - latestResourceVersion = resource.getMetadata().getResourceVersion(); - log.debug("Setting latest resource version to: {}", latestResourceVersion); - } var cached = cache.get(resourceId); EventHandling result = EventHandling.NEW; if (cached != null) { @@ -178,20 +188,29 @@ public synchronized void putResource(T newResource) { return; } + var ns = newResource.getMetadata().getNamespace(); + // this can happen when we dynamically change the followed namespace list + if (!managedInformerEventSource.manager().isWatchingNamespace(ns)) { + log.debug( + "Skipping caching of resource: {} since namespace is now watched: {}", resourceId, ns); + return; + } + // check against the latestResourceVersion processed by the TemporaryResourceCache // If the resource is older, then we can safely ignore. // // this also prevents resurrecting recently deleted entities for which the delete event // has already been processed - if (latestResourceVersion != null + var latestRV = getLastSyncResourceVersion(ns); + if (latestRV != null && ReconcilerUtilsInternal.compareResourceVersions( - latestResourceVersion, newResource.getMetadata().getResourceVersion()) + latestRV, newResource.getMetadata().getResourceVersion()) > 0) { log.debug( "Resource {}: resourceVersion {} is not later than latest {}", resourceId, newResource.getMetadata().getResourceVersion(), - latestResourceVersion); + latestRV); return; } @@ -208,6 +227,49 @@ public synchronized void putResource(T newResource) { } } + private String getLastSyncResourceVersion(String namespace) { + return managedInformerEventSource.manager().lastSyncResourceVersion(namespace); + } + + /** + * There are (probably extremely rare) circumstances, when we can miss a delete event related to a + * resources: when we create a resource that is deleted right after by third party and the related + * informer have a disconnected watch and this watch needs to do a re-list when connected again. + * In this case neither the ADD nor DELETE event will be propagated to the informer, but we + * explicitly add resources to this cache. Those are cleaned up by this check. + */ + private void checkGhostResources() { + log.debug("Checking for ghost resources."); + var iterator = cache.entrySet().iterator(); + while (iterator.hasNext()) { + var e = iterator.next(); + + var ns = e.getValue().getMetadata().getNamespace(); + // this can happen if followed namespaces are changed dynamically + if (!managedInformerEventSource.manager().isWatchingNamespace(ns)) { + log.debug( + "Removing resource: {} from cache as part of ghost cleanup. Namespace is not followed" + + " anymore: {}", + e.getKey(), + ns); + iterator.remove(); + continue; + } + if ((ReconcilerUtilsInternal.compareResourceVersions( + e.getValue().getMetadata().getResourceVersion(), getLastSyncResourceVersion(ns)) + < 0) + // making sure we have the situation where resource is missing from the cache + && managedInformerEventSource + .manager() + .get(ResourceID.fromResource(e.getValue())) + .isEmpty()) { + iterator.remove(); + managedInformerEventSource.handleEvent(ResourceAction.DELETED, e.getValue(), null, true); + log.debug("Removing ghost resource with ID: {}", e.getKey()); + } + } + } + public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index df450b29a6..820768d6a2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -30,6 +30,7 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.ResolvedControllerConfiguration; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.processing.Controller; @@ -60,8 +61,11 @@ class ControllerEventSourceTest @BeforeEach public void setup() { - when(controllerConfig.getConfigurationService()).thenReturn(new BaseConfigurationService()); + var ic = mock(InformerConfiguration.class); + when(controllerConfig.getInformerConfig()).thenReturn(ic); + when(ic.getGhostResourceCacheCheckInterval()) + .thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL); setUpSource(new ControllerEventSource<>(testController), true, controllerConfig); } @@ -325,6 +329,7 @@ public TestConfiguration( .withOnUpdateFilter(onUpdateFilter) .withGenericFilter(genericFilter) .withComparableResourceVersions(true) + .withGhostResourceCacheCheckInterval(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL) .buildForController(), false); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 12c85ee342..e60ac02280 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -19,6 +19,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -36,6 +38,7 @@ import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils; @@ -86,6 +89,8 @@ void setup() { when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig); when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class); + when(informerConfig.getGhostResourceCacheCheckInterval()) + .thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL); informerEventSource = spy( new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { @@ -335,6 +340,116 @@ void multipleCachingFilteringUpdates_variant4() { assertNoEventProduced(); } + @Test + void ghostCheckRemovesCachedResourceDuringFilteringUpdate() { + var mes = mock(ManagedInformerEventSource.class); + var mim = mock(InformerManager.class); + when(mes.manager()).thenReturn(mim); + when(mim.isWatchingNamespace(any())).thenReturn(true); + when(mim.lastSyncResourceVersion(any())).thenReturn("1"); + when(mim.get(any())).thenReturn(Optional.empty()); + + var ghostCheckExecutor = Executors.newScheduledThreadPool(1); + temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes)); + informerEventSource.setTemporalResourceCache(temporaryResourceCache); + + // put resource in cache and start a filtering update + var deployment = deploymentWithResourceVersion(2); + temporaryResourceCache.putResource(deployment); + var resourceId = ResourceID.fromResource(deployment); + temporaryResourceCache.startEventFilteringModify(resourceId); + + // advance sync version so ghost check considers the cached resource outdated + when(mim.lastSyncResourceVersion(any())).thenReturn("3"); + + // ghost check should remove the cached resource + await() + .untilAsserted( + () -> assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty()); + + // complete the filtering update - the resource should not reappear + temporaryResourceCache.doneEventFilterModify(resourceId, "2"); + assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); + + ghostCheckExecutor.shutdownNow(); + } + + @Test + void ghostCheckRunsConcurrentlyWithPutResource() { + var mes = mock(ManagedInformerEventSource.class); + var mim = mock(InformerManager.class); + when(mes.manager()).thenReturn(mim); + when(mim.isWatchingNamespace(any())).thenReturn(true); + when(mim.lastSyncResourceVersion(any())).thenReturn("1"); + when(mim.get(any())).thenReturn(Optional.empty()); + + var ghostCheckExecutor = Executors.newScheduledThreadPool(1); + temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes)); + informerEventSource.setTemporalResourceCache(temporaryResourceCache); + + // put a resource that will become a ghost + var deployment = deploymentWithResourceVersion(2); + temporaryResourceCache.putResource(deployment); + + // advance sync version so ghost check removes it + when(mim.lastSyncResourceVersion(any())).thenReturn("3"); + + await() + .untilAsserted( + () -> + assertThat( + temporaryResourceCache.getResourceFromCache( + ResourceID.fromResource(deployment))) + .isEmpty()); + + // now put a newer resource - should succeed even after ghost removal + var newerDeployment = deploymentWithResourceVersion(4); + temporaryResourceCache.putResource(newerDeployment); + assertThat( + temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(newerDeployment))) + .isPresent(); + + ghostCheckExecutor.shutdownNow(); + } + + @Test + void filteringUpdateAndGhostCheckWithNamespaceChange() { + var mes = mock(ManagedInformerEventSource.class); + var mim = mock(InformerManager.class); + when(mes.manager()).thenReturn(mim); + when(mim.isWatchingNamespace(any())).thenReturn(true); + when(mim.lastSyncResourceVersion(any())).thenReturn("1"); + when(mim.get(any())).thenReturn(Optional.empty()); + + var ghostCheckExecutor = Executors.newScheduledThreadPool(1); + temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes)); + informerEventSource.setTemporalResourceCache(temporaryResourceCache); + + // start filtering update and put resource + var deployment = deploymentWithResourceVersion(2); + var resourceId = ResourceID.fromResource(deployment); + temporaryResourceCache.startEventFilteringModify(resourceId); + temporaryResourceCache.putResource(deployment); + + // namespace becomes unwatched - ghost check should clean up + when(mim.isWatchingNamespace(any())).thenReturn(false); + + await() + .untilAsserted( + () -> assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty()); + + // complete the filtering update + var doneResult = temporaryResourceCache.doneEventFilterModify(resourceId, "2"); + // resource was already cleaned by ghost check, so no deferred event + assertThat(doneResult).isEmpty(); + + // put should be rejected since namespace is no longer watched + temporaryResourceCache.putResource(deploymentWithResourceVersion(3)); + assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); + + ghostCheckExecutor.shutdownNow(); + } + private void assertNoEventProduced() { await() .pollDelay(Duration.ofMillis(50)) @@ -376,7 +491,13 @@ private CountDownLatch sendForEventFilteringUpdate(Deployment deployment, int re } private void withRealTemporaryResourceCache() { - temporaryResourceCache = spy(new TemporaryResourceCache<>(true)); + var mes = mock(ManagedInformerEventSource.class); + var mim = mock(InformerManager.class); + when(mes.manager()).thenReturn(mim); + when(mim.lastSyncResourceVersion(any())).thenReturn("1"); + + temporaryResourceCache = + spy(new TemporaryResourceCache<>(true, 100, mock(ScheduledExecutorService.class), mes)); informerEventSource.setTemporalResourceCache(temporaryResourceCache); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java similarity index 67% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index 592a552433..f50063ccde 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -15,7 +15,10 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.time.Duration; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -28,17 +31,38 @@ import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -class TemporaryPrimaryResourceCacheTest { +@SuppressWarnings({"unchecked", "rawtypes"}) +class TemporaryResourceCacheTest { public static final String RESOURCE_VERSION = "2"; + public static final int GHOST_RESOURCE_CHECK_INTERVAL = 100; private TemporaryResourceCache temporaryResourceCache; + private volatile String latestSyncVersion; + private ManagedInformerEventSource managedInformerEventSource = + mock(ManagedInformerEventSource.class); @BeforeEach void setup() { - temporaryResourceCache = new TemporaryResourceCache<>(true); + latestSyncVersion = "1"; + managedInformerEventSource = mock(ManagedInformerEventSource.class); + var mim = mock(InformerManager.class); + when(managedInformerEventSource.manager()).thenReturn(mim); + when(mim.isWatchingNamespace(any())).thenReturn(true); + when(mim.lastSyncResourceVersion(any())).then(a -> latestSyncVersion); + temporaryResourceCache = + new TemporaryResourceCache<>( + true, 0, mock(ScheduledExecutorService.class), managedInformerEventSource); } @Test @@ -61,7 +85,7 @@ void updateNotAddsTheResourceIntoCacheIfLaterVersionKnown() { ResourceAction.ADDED, testResource.toBuilder().editMetadata().withResourceVersion("3").endMetadata().build(), null); - + latestSyncVersion = "3"; temporaryResourceCache.putResource(testResource); var cached = temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource)); @@ -107,14 +131,16 @@ void removesResourceFromCache() { .endMetadata() .build(), null); - + latestSyncVersion = "3"; assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isNotPresent(); } @Test void nonComparableResourceVersionsDisables() { - this.temporaryResourceCache = new TemporaryResourceCache<>(false); + this.temporaryResourceCache = + new TemporaryResourceCache<>( + false, 0, mock(ScheduledExecutorService.class), mock(ManagedInformerEventSource.class)); this.temporaryResourceCache.putResource(testResource()); @@ -123,7 +149,7 @@ void nonComparableResourceVersionsDisables() { } @Test - void eventReceivedDuringFiltering() throws Exception { + void eventReceivedDuringFiltering() { var testResource = testResource(); temporaryResourceCache.startEventFilteringModify(ResourceID.fromResource(testResource)); @@ -203,7 +229,6 @@ void putBeforeEvent() { nextResource.getMetadata().setResourceVersion("3"); temporaryResourceCache.putResource(nextResource); - // the result is obsolete result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); assertThat(result).isEqualTo(EventHandling.OBSOLETE); } @@ -216,6 +241,7 @@ void putBeforeEventWithEventFiltering() { var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); assertThat(result).isEqualTo(EventHandling.NEW); + latestSyncVersion = RESOURCE_VERSION; var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("3"); @@ -225,7 +251,7 @@ void putBeforeEventWithEventFiltering() { temporaryResourceCache.putResource(nextResource); temporaryResourceCache.doneEventFilterModify(resourceId, "3"); - // the result is obsolete + latestSyncVersion = "3"; result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); assertThat(result).isEqualTo(EventHandling.OBSOLETE); } @@ -288,12 +314,122 @@ void rapidDeletion() { .endMetadata() .build(), false); + latestSyncVersion = "3"; temporaryResourceCache.putResource(testResource); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) .isEmpty(); } + @Test + void removalOfGhostResources() { + withTemporaryResourceCacheForGhostHandling(); + + var tr = testResource(); + this.temporaryResourceCache.putResource(tr); + + await() + .pollDelay(Duration.ofMillis(2 * GHOST_RESOURCE_CHECK_INTERVAL)) + .untilAsserted( + () -> + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(tr))) + .isPresent()); + + latestSyncVersion = "3"; + + await() + .untilAsserted( + () -> + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(tr))) + .isEmpty()); + verify(managedInformerEventSource, times(1)) + .handleEvent(eq(ResourceAction.DELETED), eq(tr), isNull(), eq(true)); + } + + @Test + void checksGhostOnlyWithCertainDelay() { + withTemporaryResourceCacheForGhostHandling(); + this.temporaryResourceCache.putResource(testResource()); + latestSyncVersion = "3"; + await() + .pollDelay(Duration.ofMillis(GHOST_RESOURCE_CHECK_INTERVAL / 5)) + .untilAsserted( + () -> + assertThat( + temporaryResourceCache.getResourceFromCache( + ResourceID.fromResource(testResource()))) + .isPresent()); + + await() + .untilAsserted( + () -> + assertThat( + temporaryResourceCache.getResourceFromCache( + ResourceID.fromResource(testResource()))) + .isEmpty()); + } + + @Test + void ghostResourceIsNotRemovedIfLatestSyncVersionIsOlder() { + withTemporaryResourceCacheForGhostHandling(); + this.temporaryResourceCache.putResource(testResource()); + latestSyncVersion = "1"; + + await() + .pollDelay(Duration.ofMillis(GHOST_RESOURCE_CHECK_INTERVAL * 2)) + .untilAsserted( + () -> + assertThat( + temporaryResourceCache.getResourceFromCache( + ResourceID.fromResource(testResource()))) + .isPresent()); + } + + @Test + void ghostRemovalRemovesResourcesOnNotFollowedNamespaces() { + withTemporaryResourceCacheForGhostHandling(); + + var tr = testResource(); + temporaryResourceCache.putResource(tr); + + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(tr))) + .isPresent(); + + // simulate namespace no longer being watched + var mim = managedInformerEventSource.manager(); + when(mim.isWatchingNamespace(tr.getMetadata().getNamespace())).thenReturn(false); + + await() + .untilAsserted( + () -> + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(tr))) + .isEmpty()); + + // no delete event should be fired for resources removed due to namespace change + verify(managedInformerEventSource, times(0)) + .handleEvent(any(), any(), any(), any(Boolean.class)); + } + + @Test + void doNotCacheResourceOnPutIfNamespaceIsNotFollowedAnymore() { + var mim = managedInformerEventSource.manager(); + when(mim.isWatchingNamespace("default")).thenReturn(false); + + var tr = testResource(); + temporaryResourceCache.putResource(tr); + + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(tr))).isEmpty(); + } + + private void withTemporaryResourceCacheForGhostHandling() { + this.temporaryResourceCache = + new TemporaryResourceCache<>( + true, + GHOST_RESOURCE_CHECK_INTERVAL, + Executors.newScheduledThreadPool(1), + managedInformerEventSource); + } + private ConfigMap propagateTestResourceToCache() { var testResource = testResource(); temporaryResourceCache.putResource(testResource); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentdifferentnamespace/ConfigMapDependentResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentdifferentnamespace/ConfigMapDependentResource.java index 1feb5b5ecd..4f3921d509 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentdifferentnamespace/ConfigMapDependentResource.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/dependent/dependentdifferentnamespace/ConfigMapDependentResource.java @@ -19,9 +19,13 @@ import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.javaoperatorsdk.operator.api.config.informer.Informer; +import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDNoGCKubernetesDependentResource; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent; +@KubernetesDependent(informer = @Informer(namespaces = Constants.WATCH_ALL_NAMESPACES)) public class ConfigMapDependentResource extends CRUDNoGCKubernetesDependentResource< ConfigMap, DependentDifferentNamespaceCustomResource> { diff --git a/sample-operators/leader-election/pom.xml b/sample-operators/leader-election/pom.xml index 8194b433fc..4f896485d1 100644 --- a/sample-operators/leader-election/pom.xml +++ b/sample-operators/leader-election/pom.xml @@ -57,11 +57,6 @@ log4j-core compile - - org.takes - takes - 1.25.0 - org.awaitility awaitility diff --git a/sample-operators/leader-election/src/test/resources/log4j2.xml b/sample-operators/leader-election/src/test/resources/log4j2.xml index 8b1c5ca270..593f120e0b 100644 --- a/sample-operators/leader-election/src/test/resources/log4j2.xml +++ b/sample-operators/leader-election/src/test/resources/log4j2.xml @@ -19,11 +19,14 @@ - + - + + + + diff --git a/sample-operators/metrics-processing/src/main/resources/log4j2.xml b/sample-operators/metrics-processing/src/main/resources/log4j2.xml index 2979258355..593f120e0b 100644 --- a/sample-operators/metrics-processing/src/main/resources/log4j2.xml +++ b/sample-operators/metrics-processing/src/main/resources/log4j2.xml @@ -23,7 +23,10 @@ - + + + +