Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.GenericResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException;
import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*;
Expand Down Expand Up @@ -84,7 +84,7 @@ protected synchronized void handleEvent(
try {
if (log.isDebugEnabled()) {
log.debug("Event received with action: {}", action);
log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource);
log.debug("Event Old resource: {},\n new resource: {}", oldResource, resource);
}
MDCUtils.addResourceInfo(resource);
controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource);
Expand Down Expand Up @@ -141,11 +141,22 @@ private void handleOnAddOrUpdate(
ResourceAction action, T oldCustomResource, T newCustomResource) {
var handling =
temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource);
if (handling == EventHandling.NEW) {
handleEvent(action, newCustomResource, oldCustomResource, null);
} else if (log.isDebugEnabled()) {
log.debug("{} event propagation for action: {}", handling, action);
}
handling.ifPresentOrElse(
this::handleEvent,
() -> {
if (log.isDebugEnabled()) {
log.debug("{} event propagation for action: {}", handling, action);
}
});
}

@SuppressWarnings("unchecked")
private void handleEvent(GenericResourceEvent r) {
handleEvent(
r.getAction(),
(T) r.getResource().orElseThrow(),
(T) r.getPreviousResource().orElse(null),
r.getLastStateUnknow());
}

@Override
Expand All @@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown)
resource,
ResourceAction.DELETED,
() -> {
temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown);
// delete event is quite special here, that requires special care, since we clean up
// caches on delete event.
handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown);
res.ifPresent(this::handleEvent);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.ReconcilerUtilsInternal;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;

class EventFilterDetails {

private int activeUpdates = 0;
private ResourceEvent lastEvent;
private String lastOwnUpdatedResourceVersion;
private final List<GenericResourceEvent> relatedEvents = new ArrayList<>(5);
private final Set<String> allOwnResourceVersions = new HashSet<>(5);

public void increaseActiveUpdates() {
activeUpdates = activeUpdates + 1;
Expand All @@ -37,36 +42,70 @@ public void increaseActiveUpdates() {
* controller to prevent race condition and send event from {@link
* ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)}
*/
public boolean decreaseActiveUpdates(String updatedResourceVersion) {
if (updatedResourceVersion != null
&& (lastOwnUpdatedResourceVersion == null
|| ReconcilerUtilsInternal.compareResourceVersions(
updatedResourceVersion, lastOwnUpdatedResourceVersion)
> 0)) {
lastOwnUpdatedResourceVersion = updatedResourceVersion;
}

public boolean decreaseActiveUpdates() {
activeUpdates = activeUpdates - 1;
return activeUpdates == 0;
}

public void setLastEvent(ResourceEvent event) {
lastEvent = event;
public int getActiveUpdates() {
return activeUpdates;
}

public boolean isNoActiveUpdate() {
return activeUpdates == 0;
}

void addToOwnResourceVersions(String updateVersion) {
allOwnResourceVersions.add(updateVersion);
}

public void addRelatedEvent(GenericResourceEvent event) {
relatedEvents.add(event);
}

public Optional<ResourceEvent> getLatestEventAfterLastUpdateEvent() {
if (lastEvent != null
&& (lastOwnUpdatedResourceVersion == null
|| ReconcilerUtilsInternal.compareResourceVersions(
lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(),
lastOwnUpdatedResourceVersion)
> 0)) {
return Optional.of(lastEvent);
public Optional<GenericResourceEvent> prepareSummaryEventIfNotOwnEventsPresent() {
if (relatedEvents.isEmpty()) {
return Optional.empty();
}
if (allOwnResourceVersions.containsAll(relatedEventResourceVersions())) {
return Optional.empty();
}
var deleteEvent =
relatedEvents.stream().filter(e -> e.getAction() == ResourceAction.DELETED).findFirst();
if (deleteEvent.isPresent()) {
return deleteEvent;
}
return Optional.empty();
if (relatedEvents.size() == 1) {
return Optional.of(relatedEvents.get(0));
}
var firstEvent = relatedEvents.get(0);
var firstResource =
firstEvent.getPreviousResource().orElseGet(() -> firstEvent.getResource().orElseThrow());

return Optional.of(
new GenericResourceEvent(
ResourceAction.UPDATED,
relatedEvents.get(relatedEvents.size() - 1).getResource().orElseThrow(),
firstResource,
null));
}

public int getActiveUpdates() {
return activeUpdates;
private Set<String> relatedEventResourceVersions() {
return relatedEvents.stream()
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
.collect(Collectors.toSet());
}

public boolean newerOrEqualEventReceivedForOwnLastUpdate() {
if (allOwnResourceVersions.isEmpty()) {
return true;
}
String lastOwn =
allOwnResourceVersions.stream()
.reduce((a, b) -> ReconcilerUtilsInternal.compareResourceVersions(a, b) >= 0 ? a : b)
.orElseThrow();
return relatedEvents.stream()
.map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion())
.anyMatch(rv -> ReconcilerUtilsInternal.compareResourceVersions(rv, lastOwn) >= 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,32 @@
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;

/** Used only for resource event filtering. */
public class ExtendedResourceEvent extends ResourceEvent {
public class GenericResourceEvent extends ResourceEvent {

private final HasMetadata previousResource;
private final Boolean lastStateUnknow;

public ExtendedResourceEvent(
public GenericResourceEvent(
ResourceAction action,
ResourceID resourceID,
HasMetadata latestResource,
HasMetadata previousResource) {
super(action, resourceID, latestResource);
HasMetadata previousResource,
Boolean lastStateUnknow) {
super(action, ResourceID.fromResource(latestResource), latestResource);
this.previousResource = previousResource;
this.lastStateUnknow = lastStateUnknow;
}

public Optional<HasMetadata> getPreviousResource() {
return Optional.ofNullable(previousResource);
}

public Boolean getLastStateUnknow() {
return lastStateUnknow;
}

@Override
public String toString() {
return "ExtendedResourceEvent{"
return "GenericResourceEvent{"
+ getPreviousResource()
.map(r -> "previousResourceVersion=" + r.getMetadata().getResourceVersion())
.orElse("")
Expand All @@ -61,7 +67,7 @@ public String toString() {
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ExtendedResourceEvent that = (ExtendedResourceEvent) o;
GenericResourceEvent that = (GenericResourceEvent) o;
return Objects.equals(previousResource, that.previousResource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling;

/**
* Wraps informer(s) so they are connected to the eventing system of the framework. Note that since
Expand Down Expand Up @@ -154,20 +153,24 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol

var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject);

if (eventHandling != EventHandling.NEW) {
log.debug(
"{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping");
if (eventHandling.isEmpty()) {
log.debug("Deferring event propagation");
} else if (eventAcceptedByFilter(action, newObject, oldObject)) {
log.debug(
"Propagating event for {}, resource with same version not result of a reconciliation.",
"Propagating event for {}, resource with same version not result of a our update.",
action);
propagateEvent(newObject);
var event = eventHandling.get();
handleEvent(
event.getAction(),
(R) event.getResource().orElseThrow(),
(R) event.getPreviousResource().orElse(null),
event.getLastStateUnknow());
} else {
log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID);
}
}

private void propagateEvent(R object) {
protected void propagateEvent(R object) {
var primaryResourceIdSet =
configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object);
if (primaryResourceIdSet.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.*;
import io.javaoperatorsdk.operator.processing.event.source.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent;

@SuppressWarnings("rawtypes")
public abstract class ManagedInformerEventSource<
Expand Down Expand Up @@ -101,45 +100,19 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
try {
temporaryResourceCache.startEventFilteringModify(id);
updatedResource = updateMethod.apply(resourceToUpdate);
log.debug("Resource update successful");
handleRecentResourceUpdate(id, updatedResource, resourceToUpdate);
log.debug("Caching resource update successful");
return updatedResource;
} finally {
var res =
temporaryResourceCache.doneEventFilterModify(
id,
updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion());
var updatedForLambda = updatedResource;
var res = temporaryResourceCache.doneEventFilterModify(id);
res.ifPresentOrElse(
r -> {
R latestResource = (R) r.getResource().orElseThrow();
// as previous resource version we use the one from successful update, since
// we process new event here only if that is more recent then the event from our update.
// Note that this is equivalent with the scenario when an informer watch connection
// would reconnect and loose some events in between.
// If that update was not successful we still record the previous version from the
// actual event in the ExtendedResourceEvent.
R extendedResourcePrevVersion =
(r instanceof ExtendedResourceEvent)
? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null)
: null;
R prevVersionOfResource =
updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion;
if (log.isDebugEnabled()) {
log.debug(
"Previous resource version: {} resource from update present: {}"
+ " extendedPrevResource present: {}",
prevVersionOfResource.getMetadata().getResourceVersion(),
updatedForLambda != null,
extendedResourcePrevVersion != null);
}
log.debug("Propagating not own event");
handleEvent(
r.getAction(),
latestResource,
prevVersionOfResource,
(r instanceof ResourceDeleteEvent)
? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown()
: null);
(R) r.getResource().orElseThrow(),
(R) r.getPreviousResource().orElse(null),
r.getLastStateUnknow());
},
() -> log.debug("No new event present after the filtering update"));
}
Expand Down
Loading
Loading