Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ internal enum class ObjectOperationAction(val code: Int) {
CounterCreate(3),
CounterInc(4),
ObjectDelete(5),
MapClear(6),
Unknown(-1); // code for unknown value during deserialization
}

Expand Down Expand Up @@ -109,6 +110,13 @@ internal data class CounterInc(
*/
internal object ObjectDelete

/**
* Payload for MAP_CLEAR operation.
* Spec: MCL*
* No fields - action is sufficient
*/
internal object MapClear

/**
* Payload for MAP_CREATE_WITH_OBJECT_ID operation.
* Spec: MCRO*
Expand Down Expand Up @@ -176,7 +184,13 @@ internal data class ObjectsMap(
* The map entries, indexed by key.
* Spec: OMP3b
*/
val entries: Map<String, ObjectsMapEntry>? = null
val entries: Map<String, ObjectsMapEntry>? = null,

/**
* The serial value of the last MAP_CLEAR operation applied to the map.
* Spec: OMP3c
*/
val clearTimeserial: String? = null,
)

/**
Expand Down Expand Up @@ -255,6 +269,12 @@ internal data class ObjectOperation(
* Spec: OOP3q
*/
val counterCreateWithObjectId: CounterCreateWithObjectId? = null,

/**
* Payload for MAP_CLEAR operation.
* Spec: OOP3r
*/
val mapClear: MapClear? = null,
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.ably.lib.objects.MapCreate
import io.ably.lib.objects.MapCreateWithObjectId
import io.ably.lib.objects.MapRemove
import io.ably.lib.objects.MapSet
import io.ably.lib.objects.MapClear
import io.ably.lib.objects.ObjectDelete
import io.ably.lib.objects.ObjectsMapSemantics
import io.ably.lib.objects.ObjectsCounter
Expand Down Expand Up @@ -174,6 +175,7 @@ private fun ObjectOperation.writeMsgpack(packer: MessagePacker) {
if (objectDelete != null) fieldCount++
if (mapCreateWithObjectId != null) fieldCount++
if (counterCreateWithObjectId != null) fieldCount++
if (mapClear != null) fieldCount++

packer.packMapHeader(fieldCount)

Expand Down Expand Up @@ -224,6 +226,11 @@ private fun ObjectOperation.writeMsgpack(packer: MessagePacker) {
counterCreateWithObjectId.writeMsgpack(packer)
}

if (mapClear != null) {
packer.packString("mapClear")
packer.packMapHeader(0) // empty map, no fields
}

}

/**
Expand All @@ -242,6 +249,7 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation {
var objectDelete: ObjectDelete? = null
var mapCreateWithObjectId: MapCreateWithObjectId? = null
var counterCreateWithObjectId: CounterCreateWithObjectId? = null
var mapClear: MapClear? = null

for (i in 0 until fieldCount) {
val fieldName = unpacker.unpackString().intern()
Expand Down Expand Up @@ -271,6 +279,10 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation {
}
"mapCreateWithObjectId" -> mapCreateWithObjectId = readMapCreateWithObjectId(unpacker)
"counterCreateWithObjectId" -> counterCreateWithObjectId = readCounterCreateWithObjectId(unpacker)
"mapClear" -> {
unpacker.skipValue() // empty map, consume it
mapClear = MapClear
}
else -> unpacker.skipValue()
}
}
Expand All @@ -290,6 +302,7 @@ private fun readObjectOperation(unpacker: MessageUnpacker): ObjectOperation {
objectDelete = objectDelete,
mapCreateWithObjectId = mapCreateWithObjectId,
counterCreateWithObjectId = counterCreateWithObjectId,
mapClear = mapClear,
)
}

Expand Down Expand Up @@ -631,6 +644,7 @@ private fun ObjectsMap.writeMsgpack(packer: MessagePacker) {

if (semantics != null) fieldCount++
if (entries != null) fieldCount++
if (clearTimeserial != null) fieldCount++

packer.packMapHeader(fieldCount)

Expand All @@ -647,6 +661,11 @@ private fun ObjectsMap.writeMsgpack(packer: MessagePacker) {
value.writeMsgpack(packer)
}
}

if (clearTimeserial != null) {
packer.packString("clearTimeserial")
packer.packString(clearTimeserial)
}
}

/**
Expand All @@ -657,6 +676,7 @@ private fun readObjectMap(unpacker: MessageUnpacker): ObjectsMap {

var semantics: ObjectsMapSemantics? = null
var entries: Map<String, ObjectsMapEntry>? = null
var clearTimeserial: String? = null

for (i in 0 until fieldCount) {
val fieldName = unpacker.unpackString().intern()
Expand Down Expand Up @@ -684,11 +704,12 @@ private fun readObjectMap(unpacker: MessageUnpacker): ObjectsMap {
}
entries = tempMap
}
"clearTimeserial" -> clearTimeserial = unpacker.unpackString()
else -> unpacker.skipValue()
}
}

return ObjectsMap(semantics = semantics, entries = entries)
return ObjectsMap(semantics = semantics, entries = entries, clearTimeserial = clearTimeserial)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ internal class DefaultLiveMap private constructor(
*/
internal val data = ConcurrentHashMap<String, LiveMapEntry>()

/** @spec RTLM25 */
internal var clearTimeserial: String? = null

/**
* LiveMapManager instance for managing LiveMap operations
*/
Expand Down Expand Up @@ -174,6 +177,7 @@ internal class DefaultLiveMap private constructor(
}

override fun clearData(): LiveMapUpdate {
clearTimeserial = null // RTLM4
return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap())
.apply { data.clear() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
liveMap.createOperationIsMerged = false // RTLM6b
liveMap.data.clear()

liveMap.clearTimeserial = objectState.map?.clearTimeserial // RTLM6i

objectState.map?.entries?.forEach { (key, entry) ->
liveMap.data[key] = LiveMapEntry(
isTombstoned = entry.tombstone ?: false,
Expand Down Expand Up @@ -83,6 +85,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
liveMap.notifyUpdated(update)
true // RTLM15d5b
}
ObjectOperationAction.MapClear -> {
val update = applyMapClear(serial) // RTLM15d8
liveMap.notifyUpdated(update) // RTLM15d8a
true // RTLM15d8b
}
else -> {
Log.w(tag, "Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4
false
Expand Down Expand Up @@ -118,6 +125,14 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
mapSet: MapSet, // RTLM7d1
timeSerial: String?, // RTLM7d2
): LiveMapUpdate {
// RTLM7h - skip if operation is older than the last MAP_CLEAR
val clearSerial = liveMap.clearTimeserial
if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) {
Log.v(tag,
"Skipping MAP_SET for key=\"${mapSet.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId")
return noOpMapUpdate
}

val existingEntry = liveMap.data[mapSet.key]

// RTLM7a
Expand Down Expand Up @@ -170,6 +185,14 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
timeSerial: String?, // RTLM8c2
timeStamp: Long?, // RTLM8c3
): LiveMapUpdate {
// RTLM8g - skip if operation is older than the last MAP_CLEAR
val clearSerial = liveMap.clearTimeserial
if (clearSerial != null && (timeSerial == null || clearSerial >= timeSerial)) {
Log.v(tag,
"Skipping MAP_REMOVE for key=\"${mapRemove.key}\": op serial $timeSerial <= clear serial $clearSerial; objectId=$objectId")
return noOpMapUpdate
}

val existingEntry = liveMap.data[mapRemove.key]

// RTLM8a
Expand Down Expand Up @@ -212,6 +235,40 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChang
return LiveMapUpdate(mapOf(mapRemove.key to LiveMapUpdate.Change.REMOVED))
}

/**
* @spec RTLM24 - Applies MAP_CLEAR operation to LiveMap
*/
private fun applyMapClear(timeSerial: String?): LiveMapUpdate {
val clearSerial = liveMap.clearTimeserial

// RTLM24c - skip if existing clear serial is strictly newer than incoming op serial
if (clearSerial != null && (timeSerial == null || clearSerial > timeSerial)) {
Log.v(tag,
"Skipping MAP_CLEAR: op serial $timeSerial <= current clear serial $clearSerial; objectId=$objectId")
return noOpMapUpdate
}

Log.v(tag,
"Updating clearTimeserial; previous=$clearSerial, new=$timeSerial; objectId=$objectId")
liveMap.clearTimeserial = timeSerial // RTLM24d

val update = mutableMapOf<String, LiveMapUpdate.Change>()

// RTLM24e - remove all entries whose serial is older than (or equal to missing) the clear serial
liveMap.data.entries.removeIf {
val (key, entry) = it
val entrySerial = entry.timeserial
if (entrySerial == null || (timeSerial != null && timeSerial > entrySerial)) {
update[key] = LiveMapUpdate.Change.REMOVED
true
} else {
false
}
}

return LiveMapUpdate(update)
}

/**
* For Lww CRDT semantics (the only supported LiveMap semantic) an operation
* Should only be applied if incoming serial is strictly greater than existing entry's serial.
Expand Down
Loading
Loading