Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
65d8c0f
Obsolete resource handling for read-cache-after-write
csviri Mar 8, 2026
da70c5f
wip
csviri Mar 8, 2026
a390fe6
latest resource version by informers
csviri Mar 9, 2026
8ef46ad
wip
csviri Mar 9, 2026
18ee1c8
wip
csviri Mar 9, 2026
de8267d
wip
csviri Mar 9, 2026
a5631c5
latest resync version
csviri Mar 9, 2026
c689711
scheduled cleanup of obsolete resources, improved get
csviri Mar 9, 2026
5d7de15
obsolete handling unit test
csviri Mar 9, 2026
9b49b9f
additional unit tests
csviri Mar 9, 2026
720b8ca
wip
csviri Mar 9, 2026
5421153
wip
csviri Mar 9, 2026
68861ab
test fix
csviri Mar 9, 2026
6d0bfac
logger config
csviri Mar 9, 2026
ffd050f
wip
csviri Mar 9, 2026
386e280
wip
csviri Mar 9, 2026
706d162
hardedning obsolete cleanup conditions
csviri Mar 10, 2026
9e27114
wip
csviri Mar 10, 2026
71a7e10
wip
csviri Mar 10, 2026
771049d
wip
csviri Mar 10, 2026
66b9665
remove interval doublecheck
csviri Mar 10, 2026
f834fb6
wip
csviri Mar 10, 2026
d7d484e
wip
csviri Mar 10, 2026
aefcbc6
explicit scheduled executor service
csviri Mar 10, 2026
46cd7ee
cleanup configs
csviri Mar 10, 2026
3069759
wip
csviri Mar 10, 2026
1415048
naming: ghost resources instead obsolete
csviri Mar 10, 2026
67ecebd
cleanup
csviri Mar 10, 2026
ec96713
test fix
csviri Mar 10, 2026
6060af0
wip
csviri Mar 10, 2026
dd0e326
wip
csviri Mar 10, 2026
3f22f3e
wip
csviri Mar 10, 2026
3586c2d
wip
csviri Mar 10, 2026
b507acb
test class naming
csviri Mar 11, 2026
0685672
additional unit tests
csviri Mar 11, 2026
f850a83
ghost clear concurrency tests
csviri Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand All @@ -40,6 +41,7 @@ public class ExecutorServiceManager {
private ExecutorService executor;
private ExecutorService workflowExecutor;
private ExecutorService cachingExecutorService;
private ScheduledExecutorService scheduledExecutorService;
private boolean started;
private ConfigurationService configurationService;

Expand Down Expand Up @@ -126,10 +128,15 @@ public ExecutorService cachingExecutorService() {
return cachingExecutorService;
}

public ScheduledExecutorService scheduledExecutorService() {
return scheduledExecutorService;
}

public void start(ConfigurationService configurationService) {
if (!started) {
this.configurationService = configurationService; // used to lazy init workflow executor
this.cachingExecutorService = Executors.newCachedThreadPool();
this.scheduledExecutorService = Executors.newScheduledThreadPool(0);
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
started = true;
Comment on lines 135 to 141
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduledExecutorService is initialized with Executors.newScheduledThreadPool(0), which creates a scheduler with 0 core threads; scheduled tasks won’t run. This also isn’t included in stop() shutdown, so it would leak threads once fixed. Use a scheduler with at least 1 thread (or a shared scheduler) and ensure it’s shut down in stop() alongside the other executors.

Copilot uses AI. Check for mistakes.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not true.

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_LONG_VALUE_SET;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET;

Expand Down Expand Up @@ -139,4 +141,13 @@
* @since 5.3.0
*/
boolean comparableResourceVersions() default DEFAULT_COMPARABLE_RESOURCE_VERSION;

/**
* For read-cache-after-write consistency there are some corner cases where we need to check the
* caches see {@link TemporaryResourceCache} periodically. This is the period in milliseconds.
* Applicable only if {@link #comparableResourceVersions()} is true.
*
Comment on lines +146 to +149
Copy link

Copilot AI Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Javadoc has a mismatched brace in the link ({@link #comparableResourceVersions()}}) which will render incorrectly and can produce Javadoc warnings. Also, linking to TemporaryResourceCache#checkObsoleteResources() references a private method, which may not be linkable in generated docs; consider linking to the class or describing the behavior without referencing the private method.

Copilot uses AI. Check for mistakes.
* @since 5.3.0
*/
long ghostResourceCacheCheckInterval() default DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.api.config.informer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -53,7 +54,8 @@ public class InformerConfiguration<R extends HasMetadata> {
private ItemStore<R> itemStore;
private Long informerListLimit;
private FieldSelector fieldSelector;
private boolean comparableResourceVersions;
private Boolean comparableResourceVersions;
private Duration ghostResourceCacheCheckInterval;

protected InformerConfiguration(
Class<R> resourceClass,
Expand All @@ -68,7 +70,8 @@ protected InformerConfiguration(
ItemStore<R> itemStore,
Long informerListLimit,
FieldSelector fieldSelector,
boolean comparableResourceVersions) {
Boolean comparableResourceVersions,
Duration ghostResourceCacheCheckInterval) {
this(resourceClass);
this.name = name;
this.namespaces = namespaces;
Expand All @@ -82,6 +85,7 @@ protected InformerConfiguration(
this.informerListLimit = informerListLimit;
this.fieldSelector = fieldSelector;
this.comparableResourceVersions = comparableResourceVersions;
this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval;
}

private InformerConfiguration(Class<R> resourceClass) {
Expand Down Expand Up @@ -117,7 +121,8 @@ public static <R extends HasMetadata> InformerConfiguration<R>.Builder builder(
original.itemStore,
original.informerListLimit,
original.fieldSelector,
original.comparableResourceVersions)
original.comparableResourceVersions,
original.ghostResourceCacheCheckInterval)
.builder;
}

Expand Down Expand Up @@ -296,6 +301,10 @@ public boolean isComparableResourceVersions() {
return comparableResourceVersions;
}

public Duration getGhostResourceCacheCheckInterval() {
return ghostResourceCacheCheckInterval;
}

@SuppressWarnings("UnusedReturnValue")
public class Builder {

Expand All @@ -310,6 +319,13 @@ public InformerConfiguration<R> buildForController() {
}
// to avoid potential NPE
followControllerNamespaceChanges = false;
if (comparableResourceVersions == null) {
comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION;
}

if (ghostResourceCacheCheckInterval == null) {
ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL;
}
return InformerConfiguration.this;
}

Expand All @@ -321,6 +337,14 @@ public InformerConfiguration<R> build() {
if (followControllerNamespaceChanges == null) {
followControllerNamespaceChanges = DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES;
}
if (comparableResourceVersions == null) {
comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION;
}

if (ghostResourceCacheCheckInterval == null) {
ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL;
}

return InformerConfiguration.this;
}

Expand Down Expand Up @@ -368,6 +392,8 @@ public InformerConfiguration<R>.Builder initFromAnnotation(
.map(f -> new FieldSelector.Field(f.path(), f.value(), f.negated()))
.toList()));
withComparableResourceVersions(informerConfig.comparableResourceVersions());
withGhostResourceCacheCheckInterval(
Duration.ofMillis(informerConfig.ghostResourceCacheCheckInterval()));
}
return this;
}
Expand Down Expand Up @@ -473,5 +499,10 @@ public Builder withComparableResourceVersions(boolean comparableResourceVersions
InformerConfiguration.this.comparableResourceVersions = comparableResourceVersions;
return this;
}

public Builder withGhostResourceCacheCheckInterval(Duration ghostResourceCacheCheckInterval) {
InformerConfiguration.this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.api.config.informer;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
Expand All @@ -33,7 +34,6 @@
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.SAME_AS_CONTROLLER_NAMESPACES_SET;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACE_SET;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE_SET;
Expand Down Expand Up @@ -97,21 +97,18 @@ class DefaultInformerEventSourceConfiguration<R extends HasMetadata>
private final GroupVersionKind groupVersionKind;
private final InformerConfiguration<R> informerConfig;
private final KubernetesClient kubernetesClient;
private final boolean comparableResourceVersion;

protected DefaultInformerEventSourceConfiguration(
GroupVersionKind groupVersionKind,
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
InformerConfiguration<R> informerConfig,
KubernetesClient kubernetesClient,
boolean comparableResourceVersion) {
KubernetesClient kubernetesClient) {
this.informerConfig = Objects.requireNonNull(informerConfig);
this.groupVersionKind = groupVersionKind;
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
this.kubernetesClient = kubernetesClient;
this.comparableResourceVersion = comparableResourceVersion;
}

@Override
Expand Down Expand Up @@ -139,11 +136,6 @@ public Optional<GroupVersionKind> getGroupVersionKind() {
public Optional<KubernetesClient> getKubernetesClient() {
return Optional.ofNullable(kubernetesClient);
}

@Override
public boolean comparableResourceVersion() {
return this.comparableResourceVersion;
}
}

@SuppressWarnings({"unused", "UnusedReturnValue"})
Expand All @@ -157,7 +149,6 @@ class Builder<R extends HasMetadata> {
private PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private KubernetesClient kubernetesClient;
private boolean comparableResourceVersion = DEFAULT_COMPARABLE_RESOURCE_VERSION;

private Builder(Class<R> resourceClass, Class<? extends HasMetadata> primaryResourceClass) {
this(resourceClass, primaryResourceClass, null);
Expand Down Expand Up @@ -296,7 +287,13 @@ public Builder<R> withFieldSelector(FieldSelector fieldSelector) {
}

public Builder<R> withComparableResourceVersion(boolean comparableResourceVersion) {
this.comparableResourceVersion = comparableResourceVersion;
config.withComparableResourceVersions(comparableResourceVersion);
return this;
}

public Builder<R> withGhostResourceCacheCheckInterval(
Duration ghostResourceCacheCheckInterval) {
config.withGhostResourceCacheCheckInterval(ghostResourceCacheCheckInterval);
return this;
}

Expand Down Expand Up @@ -339,10 +336,7 @@ public InformerEventSourceConfiguration<R> build() {
HasMetadata.getKind(primaryResourceClass),
false)),
config.build(),
kubernetesClient,
comparableResourceVersion);
kubernetesClient);
}
}

boolean comparableResourceVersion();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.api.reconciler;

import java.time.Duration;
import java.util.Collections;
import java.util.Set;

Expand Down Expand Up @@ -43,5 +44,9 @@ public final class Constants {
public static final boolean DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES = true;
public static final boolean DEFAULT_COMPARABLE_RESOURCE_VERSION = true;

public static final long DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS = 3 * 60 * 1000;
public static final Duration DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL =
Duration.ofMillis(DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS);

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ public class ControllerEventSource<T extends HasMetadata>

@SuppressWarnings({"unchecked", "rawtypes"})
public ControllerEventSource(Controller<T> controller) {
super(
NAME,
controller.getCRClient(),
controller.getConfiguration(),
controller.getConfiguration().getInformerConfig().isComparableResourceVersions());
super(NAME, controller.getCRClient(), controller.getConfiguration());
this.controller = controller;

final var config = controller.getConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;

/**
* Wraps informer(s) so they are connected to the eventing system of the framework. Note that since
* this is built on top of Fabric8 client Informers, it also supports caching resources using
Expand All @@ -58,29 +56,18 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>

public InformerEventSource(
InformerEventSourceConfiguration<R> configuration, EventSourceContext<P> context) {
this(
configuration,
configuration.getKubernetesClient().orElse(context.getClient()),
configuration.comparableResourceVersion());
}

InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
this(configuration, client, DEFAULT_COMPARABLE_RESOURCE_VERSION);
this(configuration, configuration.getKubernetesClient().orElse(context.getClient()));
}

@SuppressWarnings({"unchecked", "rawtypes"})
private InformerEventSource(
InformerEventSourceConfiguration<R> configuration,
KubernetesClient client,
boolean comparableResourceVersions) {
InformerEventSource(InformerEventSourceConfiguration<R> configuration, KubernetesClient client) {
super(
configuration.name(),
configuration
.getGroupVersionKind()
.map(gvk -> client.genericKubernetesResources(gvk.apiVersion(), gvk.getKind()))
.orElseGet(() -> (MixedOperation) client.resources(configuration.getResourceClass())),
configuration,
comparableResourceVersions);
configuration);
// If there is a primary to secondary mapper there is no need for primary to secondary index.
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
if (useSecondaryToPrimaryIndex()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,27 @@ private boolean isWatchingAllNamespaces() {
return sources.containsKey(WATCH_ALL_NAMESPACES);
}

public boolean isWatchingNamespace(String namespace) {
// for cluster scoped resources we can assume
// that we watch the whole cluster
if (namespace == null) {
return true;
}
if (isWatchingAllNamespaces()) {
return true;
}
return sources.containsKey(namespace);
}

private Optional<InformerWrapper<R>> getSource(String namespace) {
namespace = isWatchingAllNamespaces() || namespace == null ? WATCH_ALL_NAMESPACES : namespace;
return Optional.ofNullable(sources.get(namespace));
}

String lastSyncResourceVersion(String namespace) {
return getSource(namespace).orElseThrow().getLastSyncResourceVersion();
}

@Override
public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
this.indexers.putAll(indexers);
Expand Down
Loading
Loading