Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,12 @@ public synchronized void removeEventListener(StoreEventListener<K, V> eventListe
}
delegate.removeEventListener(eventListener);
}

@Override
public void listenerModified() {
delegate.listenerModified();
}

@Override
public void addEventFilter(StoreEventFilter<K, V> eventFilter) {
delegate.addEventFilter(eventFilter);
Expand Down
10 changes: 9 additions & 1 deletion ehcache-api/src/main/java/org/ehcache/event/EventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

package org.ehcache.event;

import java.util.EnumSet;
import java.util.Set;

/**
* The different event types.
*/
Expand Down Expand Up @@ -45,6 +48,11 @@ public enum EventType {
/**
* Represents an existing {@link org.ehcache.Cache.Entry cache entry} being updated for a given key
*/
UPDATED,
UPDATED;

private static final Set<EventType> ALL_EVENT_TYPES = EnumSet.allOf(EventType.class);

public static Set<EventType> allAsSet() {
return ALL_EVENT_TYPES;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.ehcache.event.EventType;

import java.util.EnumSet;
import java.util.Set;

/**
* Internal wrapper for {@link CacheEventListener} and their configuration.
Expand Down Expand Up @@ -87,8 +88,8 @@ public void onEvent(CacheEvent<? extends K, ? extends V> event) {
return listener;
}

public boolean isForEventType(EventType type) {
return forEvents.contains(type);
public Set<EventType> getEventTypes() {
return forEvents;
}

public boolean isOrdered() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public void removeEventListener(StoreEventListener<K, V> eventListener) {
// Do nothing
}

@Override
public void listenerModified() {
// Do nothing
}

@Override
public void addEventFilter(StoreEventFilter<K, V> eventFilter) {
// Do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.ehcache.core.spi.store.events;

import java.util.Set;

import org.ehcache.core.events.StoreEventDispatcher;
import org.ehcache.event.EventType;

/**
* Interface used to register on a {@link StoreEventSource} to get notified of events happening to mappings the
Expand All @@ -37,4 +40,15 @@ public interface StoreEventListener<K, V> {
* @param event the actual {@link StoreEvent}
*/
void onEvent(StoreEvent<K, V> event);

/**
* Specify which Events this Listener is handling.
* <p>
* Defaults return is all values of {@link EventType}
*
* @return Set of the {@link EventType} this listener handles.
*/
default Set<EventType> getEventTypes() {
return EventType.allAsSet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@ public interface StoreEventSource<K, V> {
* @return {@code true} if ordering is on, {@code false} otherwise
*/
boolean isEventOrdering();

/**
* Indicates that a listener was modified
*/
void listenerModified();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,22 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Stream;

import static java.util.Collections.unmodifiableMap;
import static java.util.EnumSet.allOf;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Stream.concat;

/**
* Per-cache component that manages cache event listener registrations, and provides event delivery based on desired
Expand All @@ -56,10 +67,13 @@ public class CacheEventDispatcherImpl<K, V> implements CacheEventDispatcher<K, V
private static final Logger LOGGER = LoggerFactory.getLogger(CacheEventDispatcherImpl.class);
private final ExecutorService unOrderedExectuor;
private final ExecutorService orderedExecutor;
private int listenersCount = 0;
private int orderedListenerCount = 0;
private final List<EventListenerWrapper<K, V>> syncListenersList = new CopyOnWriteArrayList<>();
private final List<EventListenerWrapper<K, V>> aSyncListenersList = new CopyOnWriteArrayList<>();

private final Map<EventType, List<EventListenerWrapper<K, V>>> syncListenersList = unmodifiableMap(allOf(EventType.class).stream()
.collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class))));
private final Map<EventType, List<EventListenerWrapper<K, V>>> asyncListenersList = unmodifiableMap(allOf(EventType.class).stream()
.collect(toMap(identity(), t -> new CopyOnWriteArrayList<>(), (a, b) -> { throw new AssertionError(); }, () -> new EnumMap<>(EventType.class))));
private final Set<EventType> registeredEventTypes = EnumSet.noneOf(EventType.class);

private final StoreEventListener<K, V> eventListener = new StoreListener();

private volatile Cache<K, V> listenerSource;
Expand Down Expand Up @@ -95,69 +109,91 @@ public void registerCacheEventListener(CacheEventListener<? super K, ? super V>
* @param wrapper the listener wrapper to register
*/
private synchronized void registerCacheEventListener(EventListenerWrapper<K, V> wrapper) {
if(aSyncListenersList.contains(wrapper) || syncListenersList.contains(wrapper)) {

if(allListeners().anyMatch(wrapper::equals)) {
throw new IllegalStateException("Cache Event Listener already registered: " + wrapper.getListener());
}

if (wrapper.isOrdered() && orderedListenerCount++ == 0) {
boolean firstListener = !allListeners().findAny().isPresent();

if (wrapper.isOrdered() && (firstListener || allListeners().noneMatch(EventListenerWrapper::isOrdered))) {
storeEventSource.setEventOrdering(true);
}

registeredEventTypes.addAll(wrapper.getEventTypes()); // add EventType of new wrapper to list of relevant EntryTypes

switch (wrapper.getFiringMode()) {
case ASYNCHRONOUS:
aSyncListenersList.add(wrapper);
wrapper.getEventTypes().forEach(type -> asyncListenersList.get(type).add(wrapper));
break;
case SYNCHRONOUS:
if (syncListenersList.isEmpty()) {
if (!syncListeners().findAny().isPresent()) {
storeEventSource.setSynchronous(true);
}
syncListenersList.add(wrapper);
wrapper.getEventTypes().forEach(type -> syncListenersList.get(type).add(wrapper));
break;
default:
throw new AssertionError("Unhandled EventFiring value: " + wrapper.getFiringMode());
}

if (listenersCount++ == 0) {
if (firstListener) {
storeEventSource.addEventListener(eventListener);
} else {
storeEventSource.listenerModified();
}
}

private Stream<EventListenerWrapper<K, V>> allListeners() {
return concat(asyncListeners(), syncListeners());
}

private Stream<EventListenerWrapper<K, V>> syncListeners() {
return syncListenersList.values().stream().flatMap(Collection::stream);
}

private Stream<EventListenerWrapper<K, V>> asyncListeners() {
return asyncListenersList.values().stream().flatMap(Collection::stream);
}

/**
* {@inheritDoc}
*/
@Override
public void deregisterCacheEventListener(CacheEventListener<? super K, ? super V> listener) {
public synchronized void deregisterCacheEventListener(CacheEventListener<? super K, ? super V> listener) {
EventListenerWrapper<K, V> wrapper = new EventListenerWrapper<>(listener);

if (!removeWrapperFromList(wrapper, aSyncListenersList)) {
if (!removeWrapperFromList(wrapper, syncListenersList)) {
throw new IllegalStateException("Unknown cache event listener: " + listener);
}
boolean removed = Stream.of(asyncListenersList, syncListenersList)
.flatMap(list -> list.values().stream())
.map(list -> list.remove(wrapper))
.reduce((a, b) -> a || b).orElse(false);

if (!removed) {
throw new IllegalStateException("Unknown cache event listener: " + listener);
}
}

/**
* Synchronized to make sure listener removal is atomic
*
* @param wrapper the listener wrapper to unregister
* @param listenersList the listener list to remove from
*/
private synchronized boolean removeWrapperFromList(EventListenerWrapper<K, V> wrapper, List<EventListenerWrapper<K, V>> listenersList) {
int index = listenersList.indexOf(wrapper);
if (index != -1) {
EventListenerWrapper<K, V> containedWrapper = listenersList.remove(index);
if(containedWrapper.isOrdered() && --orderedListenerCount == 0) {
refreshRegisteredEventTypes();

if (!allListeners().findAny().isPresent()) {
storeEventSource.setSynchronous(false);
storeEventSource.setEventOrdering(false);
storeEventSource.removeEventListener(eventListener);
} else {
if (allListeners().noneMatch(EventListenerWrapper::isOrdered)) {
storeEventSource.setEventOrdering(false);
}
if (--listenersCount == 0) {
storeEventSource.removeEventListener(eventListener);
}
if (syncListenersList.isEmpty()) {
if (!syncListeners().findAny().isPresent()) {
storeEventSource.setSynchronous(false);
}
return true;
storeEventSource.listenerModified();
}
return false;
}

private void refreshRegisteredEventTypes() {
// collect all registered EventTypes
EnumSet<EventType> newRegisteredEventTypes = EnumSet.noneOf(EventType.class);
allListeners().forEach(listener -> newRegisteredEventTypes.addAll(listener.getEventTypes()));
// drop irrelevant EventTypes
registeredEventTypes.retainAll(newRegisteredEventTypes);
}

/**
Expand All @@ -168,8 +204,8 @@ public synchronized void shutdown() {
storeEventSource.removeEventListener(eventListener);
storeEventSource.setEventOrdering(false);
storeEventSource.setSynchronous(false);
syncListenersList.clear();
aSyncListenersList.clear();
syncListenersList.values().forEach(Collection::clear);
asyncListenersList.values().forEach(Collection::clear);
unOrderedExectuor.shutdown();
orderedExecutor.shutdown();
}
Expand All @@ -183,21 +219,30 @@ public synchronized void setListenerSource(Cache<K, V> source) {
}

void onEvent(CacheEvent<K, V> event) {
ExecutorService executor;
List<EventListenerWrapper<K, V>> asyncTargets = asyncListenersList.get(event.getType());
List<EventListenerWrapper<K, V>> syncTargets = syncListenersList.get(event.getType());
if (storeEventSource.isEventOrdering()) {
executor = orderedExecutor;
if (!asyncTargets.isEmpty()) {
orderedExecutor.submit(new EventDispatchTask<>(event, asyncTargets));
}
if (!syncTargets.isEmpty()) {
Future<?> future = orderedExecutor.submit(new EventDispatchTask<>(event, syncTargets));
try {
future.get();
} catch (Exception e) {
LOGGER.error("Exception received as result from synchronous listeners", e);
}
}
} else {
executor = unOrderedExectuor;
}
if (!aSyncListenersList.isEmpty()) {
executor.submit(new EventDispatchTask<>(event, aSyncListenersList));
}
if (!syncListenersList.isEmpty()) {
Future<?> future = executor.submit(new EventDispatchTask<>(event, syncListenersList));
try {
future.get();
} catch (Exception e) {
LOGGER.error("Exception received as result from synchronous listeners", e);
if (!asyncTargets.isEmpty()) {
unOrderedExectuor.submit(new EventDispatchTask<>(event, asyncTargets));
}
if (!syncTargets.isEmpty()) {
try {
new EventDispatchTask<>(event, syncTargets).run();
} catch (Exception e) {
LOGGER.error("Exception received as result from synchronous listeners", e);
}
}
}
}
Expand Down Expand Up @@ -244,8 +289,12 @@ public void onEvent(StoreEvent<K, V> event) {
throw new AssertionError("Unexpected StoreEvent value: " + event.getType());
}
}
}

@Override
public Set<EventType> getEventTypes() {
return registeredEventTypes;
}
}
/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ class EventDispatchTask<K, V> implements Runnable {
@Override
public void run() {
for(EventListenerWrapper<K, V> listenerWrapper : listenerWrappers) {
if (listenerWrapper.isForEventType(cacheEvent.getType())) {
try {
listenerWrapper.onEvent(cacheEvent);
} catch (Exception e) {
LOGGER.warn(listenerWrapper.getListener() + " Failed to fire Event due to ", e);
}
try {
listenerWrapper.onEvent(cacheEvent);
} catch (Exception e) {
LOGGER.warn(listenerWrapper.getListener() + " Failed to fire Event due to ", e);
}
}
}
Expand Down
Loading