Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
Expand Down Expand Up @@ -84,28 +82,16 @@ public CdcEventsIgniteApplier(IgniteEx ignite, int maxBatchSize, IgniteLogger lo

/** {@inheritDoc} */
@Override protected KeyCacheObject toKey(CdcEvent evt) {
Object key = evt.key();

if (key instanceof KeyCacheObject)
return (KeyCacheObject)key;
else
return new KeyCacheObjectImpl(key, null, evt.partition());
return evt.keyCacheObject();
}

/** {@inheritDoc} */
@Override protected GridCacheDrInfo toValue(int cacheId, CdcEvent evt, GridCacheVersion ver) {
CacheObject cacheObj;

Object val = evt.value();

if (val instanceof CacheObject)
cacheObj = (CacheObject)val;
else
cacheObj = new CacheObjectImpl(val, null);
CacheObject val = evt.valueCacheObject();

return evt.expireTime() != EXPIRE_TIME_ETERNAL ?
new GridCacheDrExpirationInfo(cacheObj, ver, TTL_ETERNAL, evt.expireTime()) :
new GridCacheDrInfo(cacheObj, ver);
new GridCacheDrExpirationInfo(val, ver, TTL_ETERNAL, evt.expireTime()) :
new GridCacheDrInfo(val, ver);
}

/** @return Cache. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@

package org.apache.ignite.cdc.conflictresolve;

import java.util.Objects;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
Expand Down Expand Up @@ -88,11 +94,12 @@ public CacheVersionConflictResolverImpl(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry,
Object prevStateMeta,
boolean atomicVerComparator
) {
GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);

boolean useNew = isUseNew(ctx, oldEntry, newEntry);
boolean useNew = isUseNew(ctx, oldEntry, newEntry, prevStateMeta);

if (log.isDebugEnabled())
debugResolve(ctx, useNew, oldEntry, newEntry);
Expand All @@ -117,7 +124,8 @@ public CacheVersionConflictResolverImpl(
protected <K, V> boolean isUseNew(
CacheObjectValueContext ctx,
GridCacheVersionedEntryEx<K, V> oldEntry,
GridCacheVersionedEntryEx<K, V> newEntry
GridCacheVersionedEntryEx<K, V> newEntry,
Object prevStateMeta
) {
if (newEntry.dataCenterId() == clusterId) // Update made on the local cluster always win.
return true;
Expand All @@ -139,8 +147,8 @@ protected <K, V> boolean isUseNew(
}

if (conflictResolveFieldEnabled) {
Object oldVal = oldEntry.value(ctx);
Object newVal = newEntry.value(ctx);
Object oldVal = oldEntry.value(ctx);

if (oldVal != null && newVal != null) {
try {
Expand All @@ -153,6 +161,17 @@ protected <K, V> boolean isUseNew(
);
}
}

Object field = oldVal != null ? value(oldVal) : null;

if (Objects.equals(field, prevStateMeta)) // Previous value synchronized.
return true;
}
else {
GridCacheVersion oldVer = oldEntry.value(ctx) != null ? oldEntry.version() : null; // TODO null value version (entry vs row)

if (Objects.equals(oldVer, prevStateMeta)) // Previous value synchronized.
return true;
}

log.error("Conflict can't be resolved, " + (newEntry.value(ctx) == null ? "remove" : "update") + " ignored " +
Expand All @@ -162,6 +181,30 @@ protected <K, V> boolean isUseNew(
return false;
}

/**
* {@inheritDoc}
*/
@Override public Object previousStateMetadata(GridCacheEntryEx entry) {
if (conflictResolveFieldEnabled) {
CacheObjectValueContext ctx = entry.context().cacheObjectContext();
CacheObject val = entry.rawGet();

return val != null ?
value(CacheObjectUtils.unwrapBinaryIfNeeded(ctx, val, true, true, null)) :
null;
}
else {
try {
GridCacheVersion ver = entry.version();

return ver != null ? ver.conflictVersion() : null;
}
catch (GridCacheEntryRemovedException e) { // TODO
throw new RuntimeException(e);
}
}
}

/** @return Conflict resolve field value. */
protected Comparable value(Object val) {
return (val instanceof BinaryObject)
Expand Down
Loading