Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
objectsManager.clearSyncObjectsDataPool() // RTO4b3
objectsManager.clearSyncObjectsPool() // RTO4b3
// RTO4b5 removed — buffer already cleared by RTO4d above
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
Expand All @@ -340,7 +340,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
if (state != ChannelState.suspended) {
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
objectsPool.clearObjectsData(false)
objectsManager.clearSyncObjectsDataPool()
objectsManager.clearSyncObjectsPool()
}
}
else -> {
Expand Down
81 changes: 59 additions & 22 deletions liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import kotlinx.coroutines.CompletableDeferred
internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObjects): ObjectsStateCoordinator() {
private val tag = "ObjectsManager"
/**
* @spec RTO5 - Sync objects data pool for collecting sync messages
* @spec RTO5 - Sync objects pool for collecting sync messages
*/
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
private val syncObjectsPool = mutableMapOf<String, ObjectMessage>()
private var currentSyncId: String? = null
/**
* @spec RTO7 - Buffered object operations during sync
Expand Down Expand Up @@ -59,7 +59,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

// RTO5a3 - continue current sync sequence
applyObjectSyncMessages(objectMessages) // RTO5b
applyObjectSyncMessages(objectMessages) // RTO5f

// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
if (syncTracker.hasSyncEnded()) {
Expand All @@ -77,7 +77,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
internal fun startNewSync(syncId: String?) {
Log.v(tag, "Starting new sync sequence: syncId=$syncId")

syncObjectsDataPool.clear() // RTO5a2a
syncObjectsPool.clear() // RTO5a2a
currentSyncId = syncId
syncCompletionWaiter = CompletableDeferred()
stateChange(ObjectsState.Syncing)
Expand All @@ -93,7 +93,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
applySync() // RTO5c1/2/7
applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
bufferedObjectOperations.clear() // RTO5c5
syncObjectsDataPool.clear() // RTO5c4
syncObjectsPool.clear() // RTO5c4
currentSyncId = null // RTO5c3
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
stateChange(ObjectsState.Synced) // RTO5c8
Expand Down Expand Up @@ -124,11 +124,11 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

/**
* Clears the sync objects data pool.
* Clears the sync objects pool.
* Used by DefaultRealtimeObjects.handleStateChange.
*/
internal fun clearSyncObjectsDataPool() {
syncObjectsDataPool.clear()
internal fun clearSyncObjectsPool() {
syncObjectsPool.clear()
}

/**
Expand All @@ -145,7 +145,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
* @spec RTO5c - Processes sync data and updates objects pool
*/
private fun applySync() {
if (syncObjectsDataPool.isEmpty()) {
if (syncObjectsPool.isEmpty()) {
return
}

Expand All @@ -154,8 +154,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
val existingObjectUpdates = mutableListOf<Pair<BaseRealtimeObject, ObjectUpdate>>()

// RTO5c1
for ((objectId, objectMessage) in syncObjectsDataPool) {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
for ((objectId, objectMessage) in syncObjectsPool) {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5f
receivedObjectIds.add(objectId)
val existingObject = realtimeObjects.objectsPool.get(objectId)

Expand All @@ -166,7 +166,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
existingObjectUpdates.add(Pair(existingObject, update))
} else { // RTO5c1b
// RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool
val newObject = createObjectFromState(objectState)
val newObject = createObjectFromState(objectState) ?: continue // RTO5c1b1c - skip unsupported
newObject.applyObjectSync(objectMessage)
realtimeObjects.objectsPool.set(objectId, newObject)
}
Expand Down Expand Up @@ -232,9 +232,9 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

/**
* Applies sync messages to sync data pool.
* Applies sync messages to sync data pool, merging partial sync messages for the same objectId.
*
* @spec RTO5b - Collects object states during sync sequence
* @spec RTO5f - Collects and merges object states during sync sequence
*/
private fun applyObjectSyncMessages(objectMessages: List<ObjectMessage>) {
for (objectMessage in objectMessages) {
Expand All @@ -244,11 +244,44 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
}

val objectState: ObjectState = objectMessage.objectState
if (objectState.counter != null || objectState.map != null) {
syncObjectsDataPool[objectState.objectId] = objectMessage
} else {
// RTO5c1b1c - object state must contain either counter or map data
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
val objectId = objectState.objectId
val existingEntry = syncObjectsPool[objectId]

if (existingEntry == null) {
// RTO5f1 - objectId not in pool, store directly
if (objectState.counter != null || objectState.map != null) {
syncObjectsPool[objectId] = objectMessage
} else {
// RTO5c1b1c - object state must contain either counter or map data
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
}
continue
}

// RTO5f2 - objectId already in pool; this is a partial sync message, merge based on type
when {
objectState.map != null -> {
// RTO5f2a - map object: merge entries
if (objectState.tombstone) {
// RTO5f2a1 - tombstone: replace pool entry entirely
syncObjectsPool[objectId] = objectMessage
} else {
// RTO5f2a2 - merge map entries; server guarantees no duplicate keys across partials
val existingState = existingEntry.objectState!! // non-null for existing entry
val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty()
val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
val mergedState = existingState.copy(map = mergedMap)
syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)
}
}
objectState.counter != null -> {
// RTO5f2b - counter objects must never be split across messages
Log.e(tag, "Received partial sync message for a counter object, skipping: ${objectMessage.id}")
}
else -> {
// RTO5f2c - unsupported type, log warning and skip
Log.w(tag, "Received partial sync message for an unsupported object type, skipping: ${objectMessage.id}")
}
}
}
}
Expand All @@ -258,11 +291,15 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
*
* @spec RTO5c1b - Creates objects from object state based on type
*/
private fun createObjectFromState(objectState: ObjectState): BaseRealtimeObject {
private fun createObjectFromState(objectState: ObjectState): BaseRealtimeObject? {
return when {
objectState.counter != null -> DefaultLiveCounter.zeroValue(objectState.objectId, realtimeObjects) // RTO5c1b1a
objectState.map != null -> DefaultLiveMap.zeroValue(objectState.objectId, realtimeObjects) // RTO5c1b1b
else -> throw clientError("Object state must contain either counter or map data") // RTO5c1b1c
else -> {
// RTO5c1b1c - unsupported object type, skip gracefully
Log.w(tag, "Received unsupported object state during OBJECT_SYNC (no counter or map), skipping objectId: ${objectState.objectId}")
null
}
}
}

Expand All @@ -284,7 +321,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject

internal fun dispose() {
syncCompletionWaiter?.cancel()
syncObjectsDataPool.clear()
syncObjectsPool.clear()
bufferedObjectOperations.clear()
disposeObjectsStateListeners()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ internal abstract class BaseRealtimeObject(
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
*/
internal fun applyObjectSync(objectMessage: ObjectMessage): ObjectUpdate {
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5f
validate(objectState)
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ internal val BaseRealtimeObject.TombstonedAt: Long?
* START - DefaultRealtimeObjects dep mocks
* ======================================
*/
internal val ObjectsManager.SyncObjectsDataPool: Map<String, ObjectState>
get() = this.getPrivateField("syncObjectsDataPool")
internal val ObjectsManager.SyncObjectsPool: Map<String, ObjectMessage>
get() = this.getPrivateField("syncObjectsPool")

internal val ObjectsManager.BufferedObjectOperations: List<ObjectMessage>
get() = this.getPrivateField("bufferedObjectOperations")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import io.ably.lib.objects.type.livemap.DefaultLiveMap
import io.ably.lib.objects.type.livemap.LiveMapEntry
import io.ably.lib.objects.unit.BufferedObjectOperations
import io.ably.lib.objects.unit.ObjectsManager
import io.ably.lib.objects.unit.SyncObjectsDataPool
import io.ably.lib.objects.unit.SyncObjectsPool
import io.ably.lib.objects.unit.getMockObjectsAdapter
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
import io.ably.lib.objects.unit.getMockRealtimeChannel
Expand Down Expand Up @@ -83,7 +83,7 @@ class DefaultRealtimeObjectsTest {
defaultRealtimeObjects.ObjectsManager.endSync()
}

assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsPool.size) // RTO4b3
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4d
assertEquals(1, defaultRealtimeObjects.objectsPool.size()) // RTO4b1 - Only root remains
assertEquals(rootObject, defaultRealtimeObjects.objectsPool.get(ROOT_OBJECT_ID)) // points to previously created root object
Expand Down Expand Up @@ -246,16 +246,16 @@ class DefaultRealtimeObjectsTest {
failCalled.await()

verify(exactly = 0) { defaultRealtimeObjects.objectsPool.clearObjectsData(any()) }
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
}

@Test
fun `(RTO4) handleStateChange(DETACHED) clears objects data and sync pool`() = runTest {
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()

// Use clearSyncObjectsDataPool (the last operation in the coroutine) as the completion signal
// Use clearSyncObjectsPool (the last operation in the coroutine) as the completion signal
val syncPoolCleared = CompletableDeferred<Unit>()
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } answers {
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() } answers {
callOriginal()
syncPoolCleared.complete(Unit)
}
Expand All @@ -265,7 +265,7 @@ class DefaultRealtimeObjectsTest {
syncPoolCleared.await()

verify(exactly = 1) { defaultRealtimeObjects.objectsPool.clearObjectsData(false) }
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
}

@Test
Expand All @@ -281,7 +281,7 @@ class DefaultRealtimeObjectsTest {
operation = ObjectOperation(
action = ObjectOperationAction.CounterInc,
objectId = "counter:test@1",
counterOp = ObjectsCounterOp(amount = 5.0)
counterInc = CounterInc(number = 5.0)
)
)
)
Expand Down Expand Up @@ -313,7 +313,7 @@ class DefaultRealtimeObjectsTest {
operation = ObjectOperation(
action = ObjectOperationAction.CounterInc,
objectId = "counter:test@1",
counterOp = ObjectsCounterOp(amount = 5.0)
counterInc = CounterInc(number = 5.0)
)
)
)
Expand Down Expand Up @@ -370,7 +370,7 @@ class DefaultRealtimeObjectsTest {
operation = ObjectOperation(
action = ObjectOperationAction.CounterInc,
objectId = "counter:test@1",
counterOp = ObjectsCounterOp(amount = 3.0)
counterInc = CounterInc(number = 3.0)
),
serial = "serial-op-1",
siteCode = "site1"
Expand Down
Loading
Loading