diff --git a/lib/src/main/java/io/ably/lib/objects/Adapter.java b/lib/src/main/java/io/ably/lib/objects/Adapter.java index e9a084ae7..76c35cc37 100644 --- a/lib/src/main/java/io/ably/lib/objects/Adapter.java +++ b/lib/src/main/java/io/ably/lib/objects/Adapter.java @@ -2,7 +2,7 @@ import io.ably.lib.realtime.AblyRealtime; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.transport.ConnectionManager; +import io.ably.lib.realtime.Connection; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; @@ -23,8 +23,8 @@ public Adapter(@NotNull AblyRealtime ably) { } @Override - public @NotNull ConnectionManager getConnectionManager() { - return ably.connection.connectionManager; + public @NotNull Connection getConnection() { + return ably.connection; } @Override diff --git a/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java b/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java index 21262942a..b6054e71a 100644 --- a/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java +++ b/lib/src/main/java/io/ably/lib/objects/ObjectsAdapter.java @@ -1,7 +1,7 @@ package io.ably.lib.objects; import io.ably.lib.realtime.ChannelBase; -import io.ably.lib.transport.ConnectionManager; +import io.ably.lib.realtime.Connection; import io.ably.lib.types.AblyException; import io.ably.lib.types.ClientOptions; import org.jetbrains.annotations.Blocking; @@ -18,13 +18,13 @@ public interface ObjectsAdapter { @NotNull ClientOptions getClientOptions(); /** - * Retrieves the connection manager for handling connection state and operations. + * Retrieves the connection instance for handling connection state and operations. * Used to check connection status, obtain error information, and manage * message transmission across the Ably connection. * - * @return the connection manager instance + * @return the connection instance */ - @NotNull ConnectionManager getConnectionManager(); + @NotNull Connection getConnection(); /** * Retrieves the current time in milliseconds from the Ably server. diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 3fab010cb..94ca2b822 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -104,6 +104,7 @@ public class ConnectionManager implements ConnectListener { * This field is initialized only if the LiveObjects plugin is present in the classpath. */ private final LiveObjectsPlugin liveObjectsPlugin; + public Long objectsGCGracePeriod = null; /** * Methods on the channels map owned by the {@link AblyRealtime} instance @@ -1319,6 +1320,7 @@ private synchronized void onConnected(ProtocolMessage message) { connectionStateTtl = connectionDetails.connectionStateTtl; maxMessageSize = connectionDetails.maxMessageSize; siteCode = connectionDetails.siteCode; // CD2j + objectsGCGracePeriod = connectionDetails.objectsGCGracePeriod; /* set the clientId resolved from token, if any */ String clientId = connectionDetails.clientId; diff --git a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java index 587b9241f..6a557a12a 100644 --- a/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java +++ b/lib/src/main/java/io/ably/lib/types/ConnectionDetails.java @@ -81,6 +81,11 @@ public class ConnectionDetails { */ public String siteCode; + /** + * The duration in milliseconds used to retain tombstoned objects at client side. + */ + public Long objectsGCGracePeriod; + ConnectionDetails() { maxIdleInterval = Defaults.maxIdleInterval; connectionStateTtl = Defaults.connectionStateTtl; @@ -124,6 +129,9 @@ ConnectionDetails readMsgpack(MessageUnpacker unpacker) throws IOException { case "siteCode": siteCode = unpacker.unpackString(); break; + case "objectsGCGracePeriod": + objectsGCGracePeriod = unpacker.unpackLong(); + break; default: Log.v(TAG, "Unexpected field: " + fieldName); unpacker.skipValue(); diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java index a64f96daa..5350f0515 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelTest.java @@ -2617,6 +2617,29 @@ public void connect_should_not_rewrite_immediate_attach() throws AblyException { } } + @Test + public void channel_get_objects_throws_exception() throws AblyException { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + try (AblyRealtime ably = new AblyRealtime(opts)) { + + /* wait until connected */ + new ConnectionWaiter(ably.connection).waitFor(ConnectionState.connected); + assertEquals("Verify connected state reached", ably.connection.state, ConnectionState.connected); + + /* create a channel and attach */ + final Channel channel = ably.channels.get("channel"); + channel.attach(); + new ChannelWaiter(channel).waitFor(ChannelState.attached); + assertEquals("Verify attached state reached", channel.state, ChannelState.attached); + + AblyException exception = assertThrows(AblyException.class, channel::getObjects); + assertNotNull(exception); + assertEquals(40019, exception.errorInfo.code); + assertEquals(400, exception.errorInfo.statusCode); + assertTrue(exception.errorInfo.message.contains("LiveObjects plugin hasn't been installed")); + } + } + static class DetachingProtocolListener implements DebugOptions.RawProtocolListener { public Channel theChannel; diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 6a855868c..683971510 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -3,6 +3,8 @@ package io.ably.lib.objects import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.CompletionListener import io.ably.lib.types.Callback +import io.ably.lib.realtime.ConnectionEvent +import io.ably.lib.realtime.ConnectionStateListener import io.ably.lib.types.ChannelMode import io.ably.lib.types.ErrorInfo import io.ably.lib.types.ProtocolMessage @@ -12,6 +14,8 @@ import kotlinx.coroutines.suspendCancellableCoroutine import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException +internal val ObjectsAdapter.connectionManager get() = connection.connectionManager + /** * Spec: RTO15g */ @@ -47,6 +51,16 @@ internal suspend fun ObjectsAdapter.attachAsync(channelName: String) = suspendCa } } +internal fun ObjectsAdapter.onGCGracePeriodUpdated(block : (Long?) -> Unit) : ObjectsSubscription { + connectionManager.objectsGCGracePeriod?.let { block(it) } + // Return new objectsGCGracePeriod whenever connection state changes to connected + val listener: (_: ConnectionStateListener.ConnectionStateChange) -> Unit = { + block(connectionManager.objectsGCGracePeriod) + } + connection.on(ConnectionEvent.connected, listener) + return ObjectsSubscription { connection.off(listener) } +} + /** * Retrieves the channel modes for a specific channel. * This method returns the modes that are set for the specified channel. diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt index e3ba0649d..7f3e9b372 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt @@ -20,6 +20,7 @@ internal enum class ObjectOperationAction(val code: Int) { CounterCreate(3), CounterInc(4), ObjectDelete(5), + MapClear(6), Unknown(-1); // code for unknown value during deserialization } @@ -109,6 +110,13 @@ internal data class CounterInc( */ internal object ObjectDelete +/** + * Payload for MAP_CLEAR operation. + * Spec: MCL* + * No fields - action is sufficient + */ +internal object MapClear + /** * Payload for MAP_CREATE_WITH_OBJECT_ID operation. * Spec: MCRO* @@ -176,7 +184,13 @@ internal data class ObjectsMap( * The map entries, indexed by key. * Spec: OMP3b */ - val entries: Map? = null + val entries: Map? = null, + + /** + * The serial value of the last MAP_CLEAR operation applied to the map. + * Spec: OMP3c + */ + val clearTimeserial: String? = null, ) /** @@ -255,6 +269,12 @@ internal data class ObjectOperation( * Spec: OOP3q */ val counterCreateWithObjectId: CounterCreateWithObjectId? = null, + + /** + * Payload for MAP_CLEAR operation. + * Spec: OOP3r + */ + val mapClear: MapClear? = null, ) /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index 28ee839e0..224cd606f 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -14,6 +14,9 @@ import java.util.concurrent.ConcurrentHashMap internal object ObjectsPoolDefaults { const val GC_INTERVAL_MS = 1000L * 60 * 5 // 5 minutes /** + * The SDK will attempt to use the `objectsGCGracePeriod` value provided by the server in the `connectionDetails` + * object of the `CONNECTED` event. + * If the server does not provide this value, the SDK will fall back to this default value. * Must be > 2 minutes to ensure we keep tombstones long enough to avoid the possibility of receiving an operation * with an earlier serial that would not have been applied if the tombstone still existed. * @@ -49,10 +52,19 @@ internal class ObjectsPool( private val gcScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) private var gcJob: Job // Job for the garbage collection coroutine + @Volatile private var gcGracePeriod = ObjectsPoolDefaults.GC_GRACE_PERIOD_MS + private var gcPeriodSubscription: ObjectsSubscription + init { // RTO3b - Initialize pool with root object pool[ROOT_OBJECT_ID] = DefaultLiveMap.zeroValue(ROOT_OBJECT_ID, realtimeObjects) - // Start garbage collection coroutine + // Start garbage collection coroutine with server-provided grace period if available + gcPeriodSubscription = realtimeObjects.adapter.onGCGracePeriodUpdated { period -> + period?.let { + gcGracePeriod = it + Log.i(tag, "Using objectsGCGracePeriod from server: $gcGracePeriod ms") + } ?: Log.i(tag, "Server did not provide objectsGCGracePeriod, using default: $gcGracePeriod ms") + } gcJob = startGCJob() } @@ -123,9 +135,9 @@ internal class ObjectsPool( */ private fun onGCInterval() { pool.entries.removeIf { (_, obj) -> - if (obj.isEligibleForGc()) { true } // Remove from pool + if (obj.isEligibleForGc(gcGracePeriod)) { true } // Remove from pool else { - obj.onGCInterval() + obj.onGCInterval(gcGracePeriod) false // Keep in pool } } @@ -152,6 +164,7 @@ internal class ObjectsPool( * Should be called when the pool is no longer needed. */ fun dispose() { + gcPeriodSubscription.unsubscribe() gcJob.cancel() gcScope.cancel() pool.clear() diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt index 1980f3c93..2eb10d0bd 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/serialization/MsgpackSerialization.kt @@ -12,6 +12,7 @@ import io.ably.lib.objects.MapCreate import io.ably.lib.objects.MapCreateWithObjectId import io.ably.lib.objects.MapRemove import io.ably.lib.objects.MapSet +import io.ably.lib.objects.MapClear import io.ably.lib.objects.ObjectDelete import io.ably.lib.objects.ObjectsMapSemantics import io.ably.lib.objects.ObjectsCounter @@ -174,6 +175,7 @@ private fun ObjectOperation.writeMsgpack(packer: MessagePacker) { if (objectDelete != null) fieldCount++ if (mapCreateWithObjectId != null) fieldCount++ if (counterCreateWithObjectId != null) fieldCount++ + if (mapClear != null) fieldCount++ packer.packMapHeader(fieldCount) @@ -224,6 +226,11 @@ private fun ObjectOperation.writeMsgpack(packer: MessagePacker) { counterCreateWithObjectId.writeMsgpack(packer) } + if (mapClear != null) { + packer.packString("mapClear") + packer.packMapHeader(0) // empty map, no fields + } + } /** @@ -242,6 +249,7 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation { var objectDelete: ObjectDelete? = null var mapCreateWithObjectId: MapCreateWithObjectId? = null var counterCreateWithObjectId: CounterCreateWithObjectId? = null + var mapClear: MapClear? = null for (i in 0 until fieldCount) { val fieldName = unpacker.unpackString().intern() @@ -271,6 +279,10 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation { } "mapCreateWithObjectId" -> mapCreateWithObjectId = readMapCreateWithObjectId(unpacker) "counterCreateWithObjectId" -> counterCreateWithObjectId = readCounterCreateWithObjectId(unpacker) + "mapClear" -> { + unpacker.skipValue() // empty map, consume it + mapClear = MapClear + } else -> unpacker.skipValue() } } @@ -290,6 +302,7 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation { objectDelete = objectDelete, mapCreateWithObjectId = mapCreateWithObjectId, counterCreateWithObjectId = counterCreateWithObjectId, + mapClear = mapClear, ) } @@ -631,6 +644,7 @@ private fun ObjectsMap.writeMsgpack(packer: MessagePacker) { if (semantics != null) fieldCount++ if (entries != null) fieldCount++ + if (clearTimeserial != null) fieldCount++ packer.packMapHeader(fieldCount) @@ -647,6 +661,11 @@ private fun ObjectsMap.writeMsgpack(packer: MessagePacker) { value.writeMsgpack(packer) } } + + if (clearTimeserial != null) { + packer.packString("clearTimeserial") + packer.packString(clearTimeserial) + } } /** @@ -657,6 +676,7 @@ private fun readObjectMap(unpacker: MessageUnpacker): ObjectsMap { var semantics: ObjectsMapSemantics? = null var entries: Map? = null + var clearTimeserial: String? = null for (i in 0 until fieldCount) { val fieldName = unpacker.unpackString().intern() @@ -684,11 +704,12 @@ private fun readObjectMap(unpacker: MessageUnpacker): ObjectsMap { } entries = tempMap } + "clearTimeserial" -> clearTimeserial = unpacker.unpackString() else -> unpacker.skipValue() } } - return ObjectsMap(semantics = semantics, entries = entries) + return ObjectsMap(semantics = semantics, entries = entries, clearTimeserial = clearTimeserial) } /** diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt index c6f602b5c..2eca29b55 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt @@ -4,7 +4,6 @@ import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.ObjectsOperationSource -import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.objectError import io.ably.lib.objects.type.livecounter.noOpCounterUpdate import io.ably.lib.objects.type.livemap.noOpMapUpdate @@ -138,10 +137,20 @@ internal abstract class BaseRealtimeObject( /** * Checks if the object is eligible for garbage collection. + * + * An object is eligible for garbage collection if it has been tombstoned and + * the time since tombstoning exceeds the specified grace period. + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned objects + * should be kept before being eligible for collection. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * @return true if the object is tombstoned and the grace period has elapsed, + * false otherwise */ - internal fun isEligibleForGc(): Boolean { + internal fun isEligibleForGc(gcGracePeriod: Long): Boolean { val currentTime = System.currentTimeMillis() - return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } /** @@ -198,12 +207,22 @@ internal abstract class BaseRealtimeObject( /** * Called during garbage collection intervals to clean up expired entries. * + * This method is invoked periodically (every 5 minutes) by the ObjectsPool + * to perform cleanup of tombstoned data that has exceeded the grace period. + * * This method should identify and remove entries that: * - Have been marked as tombstoned - * - Have a tombstone timestamp older than the configured grace period + * - Have a tombstone timestamp older than the specified grace period + * + * @param gcGracePeriod The grace period in milliseconds that tombstoned entries + * should be kept before being eligible for removal. + * This value is retrieved from the server's connection details + * or defaults to 24 hours if not provided by the server. + * Must be greater than 2 minutes to ensure proper operation + * ordering and avoid issues with delayed operations. * * Implementations typically use single-pass removal techniques to * efficiently clean up expired data without creating temporary collections. */ - abstract fun onGCInterval() + abstract fun onGCInterval(gcGracePeriod: Long) } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index d242a9bd3..4f1ef28e5 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -111,7 +111,7 @@ internal class DefaultLiveCounter private constructor( liveCounterManager.notify(update as LiveCounterUpdate) } - override fun onGCInterval() { + override fun onGCInterval(gcGracePeriod: Long) { // Nothing to GC for a counter object return } diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index b84c55d76..8e9746d6e 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -38,6 +38,9 @@ internal class DefaultLiveMap private constructor( */ internal val data = ConcurrentHashMap() + /** @spec RTLM25 */ + internal var clearTimeserial: String? = null + /** * LiveMapManager instance for managing LiveMap operations */ @@ -174,6 +177,7 @@ internal class DefaultLiveMap private constructor( } override fun clearData(): LiveMapUpdate { + clearTimeserial = null // RTLM4 return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap()) .apply { data.clear() } } @@ -186,8 +190,8 @@ internal class DefaultLiveMap private constructor( liveMapManager.notify(update as LiveMapUpdate) } - override fun onGCInterval() { - data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() } + override fun onGCInterval(gcGracePeriod: Long) { + data.entries.removeIf { (_, entry) -> entry.isEligibleForGc(gcGracePeriod) } } companion object { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt index df2259583..f12e88d88 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapEntry.kt @@ -3,7 +3,6 @@ package io.ably.lib.objects.type.livemap import io.ably.lib.objects.* import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectsPool -import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.type.BaseRealtimeObject import io.ably.lib.objects.type.ObjectType import io.ably.lib.objects.type.counter.LiveCounter @@ -73,9 +72,9 @@ internal fun LiveMapEntry.getResolvedValue(objectsPool: ObjectsPool): LiveMapVal /** * Extension function to check if a LiveMapEntry is expired and ready for garbage collection */ -internal fun LiveMapEntry.isEligibleForGc(): Boolean { +internal fun LiveMapEntry.isEligibleForGc(gcGracePeriod: Long): Boolean { val currentTime = System.currentTimeMillis() - return isTombstoned && tombstonedAt?.let { currentTime - it >= ObjectsPoolDefaults.GC_GRACE_PERIOD_MS } == true + return isTombstoned && tombstonedAt?.let { currentTime - it >= gcGracePeriod } == true } private fun fromRealtimeObject(realtimeObject: BaseRealtimeObject): LiveMapValue { diff --git a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt index cd1644b38..c8990f06b 100644 --- a/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt +++ b/liveobjects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt @@ -32,6 +32,8 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang liveMap.createOperationIsMerged = false // RTLM6b liveMap.data.clear() + liveMap.clearTimeserial = objectState.map?.clearTimeserial // RTLM6i + objectState.map?.entries?.forEach { (key, entry) -> liveMap.data[key] = LiveMapEntry( isTombstoned = entry.tombstone ?: false, @@ -83,6 +85,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang liveMap.notifyUpdated(update) true // RTLM15d5b } + ObjectOperationAction.MapClear -> { + val update = applyMapClear(serial) // RTLM15d8 + liveMap.notifyUpdated(update) // RTLM15d8a + true // RTLM15d8b + } else -> { Log.w(tag, "Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4 false @@ -118,6 +125,14 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang mapSet: MapSet, // RTLM7d1 timeSerial: String?, // RTLM7d2 ): LiveMapUpdate { + // RTLM7h - skip if operation is older than the last MAP_CLEAR + val clearSerial = liveMap.clearTimeserial + if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) { + Log.v(tag, + "Skipping MAP_SET for key=\"${mapSet.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + val existingEntry = liveMap.data[mapSet.key] // RTLM7a @@ -170,6 +185,14 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang timeSerial: String?, // RTLM8c2 timeStamp: Long?, // RTLM8c3 ): LiveMapUpdate { + // RTLM8g - skip if operation is older than the last MAP_CLEAR + val clearSerial = liveMap.clearTimeserial + if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) { + Log.v(tag, + "Skipping MAP_REMOVE for key=\"${mapRemove.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + val existingEntry = liveMap.data[mapRemove.key] // RTLM8a @@ -212,6 +235,40 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang return LiveMapUpdate(mapOf(mapRemove.key to LiveMapUpdate.Change.REMOVED)) } + /** + * @spec RTLM24 - Applies MAP_CLEAR operation to LiveMap + */ + private fun applyMapClear(timeSerial: String?): LiveMapUpdate { + val clearSerial = liveMap.clearTimeserial + + // RTLM24c - skip if existing clear serial is strictly newer than incoming op serial + if (clearSerial != null && (timeSerial == null || clearSerial > timeSerial)) { + Log.v(tag, + "Skipping MAP_CLEAR: op serial $timeSerial <= current clear serial $clearSerial; objectId=$objectId") + return noOpMapUpdate + } + + Log.v(tag, + "Updating clearTimeserial; previous=$clearSerial, new=$timeSerial; objectId=$objectId") + liveMap.clearTimeserial = timeSerial // RTLM24d + + val update = mutableMapOf() + + // RTLM24e - remove all entries whose serial is older than (or equal to missing) the clear serial + liveMap.data.entries.removeIf { + val (key, entry) = it + val entrySerial = entry.timeserial + if (entrySerial == null || (timeSerial != null && timeSerial > entrySerial)) { + update[key] = LiveMapUpdate.Change.REMOVED + true + } else { + false + } + } + + return LiveMapUpdate(update) + } + /** * For Lww CRDT semantics (the only supported LiveMap semantic) an operation * Should only be applied if incoming serial is strictly greater than existing entry's serial. diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt index 5750046b0..21f5c6792 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/HelpersTest.kt @@ -5,12 +5,10 @@ import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState import io.ably.lib.realtime.ChannelStateListener import io.ably.lib.realtime.CompletionListener -import io.ably.lib.transport.ConnectionManager +import io.ably.lib.realtime.ConnectionEvent +import io.ably.lib.realtime.ConnectionStateListener import io.ably.lib.types.* -import io.mockk.every -import io.mockk.mockk -import io.mockk.slot -import io.mockk.verify +import io.mockk.* import kotlinx.coroutines.test.runTest import org.junit.Assert.* import org.junit.Test @@ -21,11 +19,10 @@ class HelpersTest { // sendAsync @Test fun testSendAsyncShouldQueueAccordingToClientOptions() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions().apply { queueMessages = false } - every { adapter.connectionManager } returns connManager every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -44,11 +41,10 @@ class HelpersTest { @Test fun testSendAsyncErrorPropagatesAblyException() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions() - every { adapter.connectionManager } returns connManager every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } answers { @@ -63,13 +59,61 @@ class HelpersTest { assertEquals(40000, ex.errorInfo.code) } + @Test + fun testOnGCGracePeriodImmediateInvokesBlock() { + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager + connManager.setPrivateField("objectsGCGracePeriod", 123L) + + var value: Long? = null + adapter.onGCGracePeriodUpdated { v -> value = v } + + assertEquals(123L, value) + verify(exactly = 1) { adapter.connection.on(ConnectionEvent.connected, any()) } + } + + @Test + fun testOnGCGracePeriodDeferredInvokesOnConnectedWithValue() { + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager + val connection = adapter.connection + + var value: Long? = null + every { connection.on(ConnectionEvent.connected, any()) } answers { + val listener = secondArg() + connManager.setPrivateField("objectsGCGracePeriod", 456L) + listener.onConnectionStateChanged(mockk(relaxed = true)) + } + + adapter.onGCGracePeriodUpdated { v -> value = v } + + assertEquals(456L, value) + verify(exactly = 1) { connection.on(ConnectionEvent.connected, any()) } + } + + @Test + fun testOnGCGracePeriodDeferredInvokesOnConnectedWithNull() { + val adapter = getMockObjectsAdapter() + val connection = adapter.connection + + var value: Long? = null + every { connection.on(ConnectionEvent.connected, any()) } answers { + val listener = secondArg() + listener.onConnectionStateChanged(mockk(relaxed = true)) + } + + adapter.onGCGracePeriodUpdated { v -> value = v } + + assertNull(value) + verify(exactly = 1) { connection.on(ConnectionEvent.connected, any()) } + } + @Test fun testSendAsyncThrowsWhenConnectionManagerThrows() = runTest { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager val clientOptions = ClientOptions() - every { adapter.connectionManager } returns connManager every { adapter.clientOptions } returns clientOptions every { connManager.send(any(), any(), any()) } throws RuntimeException("send failed hard") @@ -233,7 +277,7 @@ class HelpersTest { every { adapter.getChannel("ch") } returns channel channel.state = ChannelState.attaching - every { channel.once(any()) } answers { + every { channel.once(any()) } answers { val listener = firstArg() val stateChange = mockk(relaxed = true) { setPrivateField("current", ChannelState.attached) @@ -242,28 +286,28 @@ class HelpersTest { } adapter.ensureAttached("ch") - verify(exactly = 1) { channel.once(any()) } + verify(exactly = 1) { channel.once(any()) } } - @Test - fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { - val adapter = mockk(relaxed = true) - val channel = mockk(relaxed = true) - every { adapter.getChannel("ch") } returns channel - channel.state = ChannelState.attaching - every { channel.once(any()) } answers { - val listener = firstArg() - val stateChange = mockk(relaxed = true) { - setPrivateField("current", ChannelState.suspended) - setPrivateField("reason", clientError("Not attached").errorInfo) - } - listener.onChannelStateChanged(stateChange) - } - val ex = assertFailsWith { adapter.ensureAttached("ch") } - assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) - assertTrue(ex.errorInfo.message.contains("Not attached")) - verify(exactly = 1) { channel.once(any()) } - } + @Test + fun testEnsureAttachedAttachingButReceivesNonAttachedEmitsError() = runTest { + val adapter = mockk(relaxed = true) + val channel = mockk(relaxed = true) + every { adapter.getChannel("ch") } returns channel + channel.state = ChannelState.attaching + every { channel.once(any()) } answers { + val listener = firstArg() + val stateChange = mockk(relaxed = true) { + setPrivateField("current", ChannelState.suspended) + setPrivateField("reason", clientError("Not attached").errorInfo) + } + listener.onChannelStateChanged(stateChange) + } + val ex = assertFailsWith { adapter.ensureAttached("ch") } + assertEquals(ErrorCode.ChannelStateError.code, ex.errorInfo.code) + assertTrue(ex.errorInfo.message.contains("Not attached")) + verify(exactly = 1) { channel.once(any()) } + } @Test fun testEnsureAttachedThrowsForInvalidState() = runTest { @@ -344,9 +388,8 @@ class HelpersTest { // throwIfUnpublishableState @Test fun testThrowIfUnpublishableStateInactiveConnection() { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { adapter.connectionManager } returns connManager + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager every { connManager.isActive } returns false every { connManager.stateErrorInfo } returns serverError("not active").errorInfo @@ -357,9 +400,8 @@ class HelpersTest { @Test fun testThrowIfUnpublishableStateChannelFailed() { - val adapter = mockk(relaxed = true) - val connManager = mockk(relaxed = true) - every { adapter.connectionManager } returns connManager + val adapter = getMockObjectsAdapter() + val connManager = adapter.connectionManager every { connManager.isActive } returns true val channel = mockk(relaxed = true) every { adapter.getChannel("ch") } returns channel diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt index 57e8f45ea..4c413649e 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/ObjectMessageSizeTest.kt @@ -16,17 +16,15 @@ import io.ably.lib.objects.ensureMessageSizeWithinLimit import io.ably.lib.objects.size import io.ably.lib.transport.Defaults import io.ably.lib.types.AblyException -import io.mockk.mockk import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith class ObjectMessageSizeTest { - @Test fun testObjectMessageSizeWithinLimit() = runTest { - val mockAdapter = mockk(relaxed = true) + val mockAdapter = getMockObjectsAdapter() mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) @@ -157,7 +155,7 @@ class ObjectMessageSizeTest { @Test fun testObjectMessageSizeAboveLimit() = runTest { - val mockAdapter = mockk(relaxed = true) + val mockAdapter = getMockObjectsAdapter() mockAdapter.connectionManager.maxMessageSize = Defaults.maxMessageSize // 64 kb assertEquals(65536, mockAdapter.connectionManager.maxMessageSize) diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt index 56e4b2d20..17be76951 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt @@ -11,11 +11,13 @@ import io.ably.lib.objects.type.livemap.LiveMapManager import io.ably.lib.realtime.AblyRealtime import io.ably.lib.realtime.Channel import io.ably.lib.realtime.ChannelState +import io.ably.lib.transport.ConnectionManager import io.ably.lib.types.ChannelMode import io.ably.lib.types.ChannelOptions import io.ably.lib.types.ClientOptions import io.mockk.every import io.mockk.mockk +import io.mockk.mockkStatic import io.mockk.spyk import kotlinx.coroutines.CompletableDeferred @@ -46,9 +48,11 @@ internal fun getMockRealtimeChannel( } internal fun getMockObjectsAdapter(): ObjectsAdapter { - val mockkAdapter = mockk(relaxed = true) - every { mockkAdapter.getChannel(any()) } returns getMockRealtimeChannel("testChannelName") - return mockkAdapter + mockkStatic("io.ably.lib.objects.HelpersKt") + return mockk(relaxed = true) { + every { getChannel(any()) } returns getMockRealtimeChannel("testChannelName") + every { connectionManager } returns mockk(relaxed = true) + } } internal fun getMockObjectsPool(): ObjectsPool { diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt index 656b1e7c1..aff4f9d1a 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsPoolTest.kt @@ -19,7 +19,7 @@ class ObjectsPoolTest { @Test fun `(RTO3, RTO3a, RTO3b) An internal ObjectsPool should be used to maintain the list of objects present on a channel`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool assertNotNull(objectsPool) @@ -44,7 +44,7 @@ class ObjectsPoolTest { @Test fun `(RTO6) ObjectsPool should create zero-value objects if not exists`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = spyk(defaultRealtimeObjects.objectsPool) assertEquals(1, objectsPool.size(), "RTO3 - Should only contain the root object initially") @@ -78,7 +78,7 @@ class ObjectsPoolTest { @Test fun `(RTO4b1, RTO4b2) ObjectsPool should reset to initial pool retaining original root map`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool assertEquals(1, objectsPool.size()) val rootMap = objectsPool.get(ROOT_OBJECT_ID) as DefaultLiveMap @@ -107,7 +107,7 @@ class ObjectsPoolTest { @Test fun `(RTO5c2, RTO5c2a) ObjectsPool should delete extra object IDs`() { - val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", mockk(relaxed = true)) + val defaultRealtimeObjects = DefaultRealtimeObjects("dummyChannel", getMockObjectsAdapter()) val objectsPool = defaultRealtimeObjects.objectsPool // Add some objects diff --git a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index 5c5068c8c..adaf4eb81 100644 --- a/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/liveobjects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -1,6 +1,7 @@ package io.ably.lib.objects.unit.type.livemap import io.ably.lib.objects.* +import io.ably.lib.objects.MapClear import io.ably.lib.objects.MapCreate import io.ably.lib.objects.MapRemove import io.ably.lib.objects.MapSet @@ -1129,4 +1130,259 @@ class LiveMapManagerTest { val expectedUpdate = mapOf("key1" to LiveMapUpdate.Change.REMOVED) assertEquals(expectedUpdate, update.update) } + + @Test + fun `(RTLM24) applyMapClear removes entries older than clear serial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial1", + data = ObjectData(string = "value1") + ) + liveMap.data["key2"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial3", + data = ObjectData(string = "value2") + ) + + val operation = ObjectOperation( + action = ObjectOperationAction.MapClear, + objectId = "map:testMap@1", + mapClear = MapClear + ) + + // Apply MAP_CLEAR with serial "serial2" — between serial1 and serial3 + liveMapManager.applyOperation(operation, "serial2", null) + + assertNull(liveMap.data["key1"], "Entry at serial1 should be removed") + assertNotNull(liveMap.data["key2"], "Entry at serial3 should be kept") + assertEquals("serial2", liveMap.clearTimeserial) + } + + @Test + fun `(RTLM24c) applyMapClear skips when existing clearTimeserial is newer`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial1", + data = ObjectData(string = "value1") + ) + liveMap.clearTimeserial = "serial3" + + val operation = ObjectOperation( + action = ObjectOperationAction.MapClear, + objectId = "map:testMap@1", + mapClear = MapClear + ) + + liveMapManager.applyOperation(operation, "serial2", null) + + // clearTimeserial should remain unchanged and data should be untouched + assertEquals("serial3", liveMap.clearTimeserial) + assertNotNull(liveMap.data["key1"], "Entry should not be removed") + } + + @Test + fun `(RTLM25) clearTimeserial is set after MAP_CLEAR`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + assertNull(liveMap.clearTimeserial) + + val operation = ObjectOperation( + action = ObjectOperationAction.MapClear, + objectId = "map:testMap@1", + mapClear = MapClear + ) + + liveMapManager.applyOperation(operation, "serial1", null) + + assertEquals("serial1", liveMap.clearTimeserial) + } + + @Test + fun `(RTLM7h) applyMapSet skips when op serial is less than or equal to clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.clearTimeserial = "serial2" + + val operation = ObjectOperation( + action = ObjectOperationAction.MapSet, + objectId = "map:testMap@1", + mapSet = MapSet(key = "key1", value = ObjectData(string = "value1")) + ) + + liveMapManager.applyOperation(operation, "serial1", null) + + assertNull(liveMap.data["key1"], "Entry should NOT be added when op serial <= clearTimeserial") + } + + @Test + fun `(RTLM7h) applyMapSet applies when op serial is greater than clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.clearTimeserial = "serial1" + + val operation = ObjectOperation( + action = ObjectOperationAction.MapSet, + objectId = "map:testMap@1", + mapSet = MapSet(key = "key1", value = ObjectData(string = "value1")) + ) + + liveMapManager.applyOperation(operation, "serial2", null) + + assertNotNull(liveMap.data["key1"], "Entry should be added when op serial > clearTimeserial") + assertEquals("value1", liveMap.data["key1"]?.data?.string) + } + + @Test + fun `(RTLM8g) applyMapRemove skips when op serial is less than or equal to clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.data["key1"] = LiveMapEntry( + isTombstoned = false, + timeserial = "serial3", + data = ObjectData(string = "value1") + ) + liveMap.clearTimeserial = "serial2" + + val operation = ObjectOperation( + action = ObjectOperationAction.MapRemove, + objectId = "map:testMap@1", + mapRemove = MapRemove(key = "key1") + ) + + liveMapManager.applyOperation(operation, "serial1", null) + + assertFalse(liveMap.data["key1"]?.isTombstoned == true, "Entry should NOT be tombstoned when op serial <= clearTimeserial") + } + + @Test + fun `(RTLM6i) applyState sets clearTimeserial from objectState`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + val objectState = ObjectState( + objectId = "map:testMap@1", + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = emptyMap(), + clearTimeserial = "serial1" + ), + siteTimeserials = emptyMap(), + tombstone = false, + ) + + liveMapManager.applyState(objectState, null) + + assertEquals("serial1", liveMap.clearTimeserial) + } + + @Test + fun `(RTLM6i) applyState resets clearTimeserial to null when objectState has no clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + liveMap.clearTimeserial = "serial1" + + val objectState = ObjectState( + objectId = "map:testMap@1", + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = emptyMap(), + clearTimeserial = null + ), + siteTimeserials = emptyMap(), + tombstone = false, + ) + + liveMapManager.applyState(objectState, null) + + assertNull(liveMap.clearTimeserial) + } + + @Test + fun `(RTLM6i, RTLM6d, RTLM7h) applyState filters createOp entries older than or equal to clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + // createOp has three entries: + // key-null-serial — no timeserial (treated as pre-clear by RTLM7h) + // key-old-serial — serial1, strictly older than the clear serial (serial2) + // key-new-serial — serial3, strictly newer than the clear serial (serial2) + val createOp = ObjectOperation( + action = ObjectOperationAction.MapCreate, + objectId = "map:testMap@1", + mapCreate = MapCreate( + semantics = ObjectsMapSemantics.LWW, + entries = mapOf( + "key-null-serial" to ObjectsMapEntry( + data = ObjectData(string = "nullSerialValue"), + timeserial = null + ), + "key-old-serial" to ObjectsMapEntry( + data = ObjectData(string = "oldSerialValue"), + timeserial = "serial1" + ), + "key-new-serial" to ObjectsMapEntry( + data = ObjectData(string = "newSerialValue"), + timeserial = "serial3" + ) + ) + ) + ) + + val objectState = ObjectState( + objectId = "map:testMap@1", + map = ObjectsMap( + semantics = ObjectsMapSemantics.LWW, + entries = emptyMap(), + clearTimeserial = "serial2" // RTLM6i: set before createOp entries are merged + ), + createOp = createOp, + siteTimeserials = mapOf("site1" to "serial1"), + tombstone = false, + ) + + liveMapManager.applyState(objectState, null) + + // RTLM7h: entries with null or older-than-clear serials must be filtered out + assertNull(liveMap.data["key-null-serial"], "Entry with null serial should be filtered by RTLM7h") + assertNull(liveMap.data["key-old-serial"], "Entry with serial1 <= clearTimeserial serial2 should be filtered by RTLM7h") + // Entry whose serial is strictly newer than clearTimeserial must survive + assertNotNull(liveMap.data["key-new-serial"], "Entry with serial3 > clearTimeserial serial2 should be present") + assertEquals("newSerialValue", liveMap.data["key-new-serial"]?.data?.string) + } + + @Test + fun `(RTLM4) clearData resets clearTimeserial`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + + liveMap.clearTimeserial = "serial1" + liveMap.clearData() + + assertNull(liveMap.clearTimeserial) + } + + @Test + fun `(RTLM15d8) applyOperation returns true for MAP_CLEAR`() { + val liveMap = getDefaultLiveMapWithMockedDeps() + val liveMapManager = liveMap.LiveMapManager + + val operation = ObjectOperation( + action = ObjectOperationAction.MapClear, + objectId = "map:testMap@1", + mapClear = MapClear + ) + + val result = liveMapManager.applyOperation(operation, "serial1", null) + assertTrue(result, "applyOperation should return true for MAP_CLEAR") + } }