@@ -18,47 +18,49 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
1818 private lazy val serialization = SerializationExtension (system)
1919
2020 // Manifests
21+ // make them public for testing purposes
2122 // raft
22- private val BegunNewTermManifest = " AA"
23- private val VotedManifest = " AB"
24- private val DetectedNewTermManifest = " AC"
25- private val AppendedEntriesManifest = " AD"
26- private val AppendedEventManifest = " AE"
27- private val CompactionCompletedManifest = " AF"
28- private val SnapshotSyncCompletedManifest = " AG"
29- private val PersistentStateManifest = " AH"
30- private val CommandManifest = " AI"
31- private val ForwardedCommandManifest = " AJ"
23+ val BegunNewTermManifest = " AA"
24+ val VotedManifest = " AB"
25+ val DetectedNewTermManifest = " AC"
26+ val AppendedEntriesManifest = " AD"
27+ val AppendedEventManifest = " AE"
28+ val CompactionCompletedManifest = " AF"
29+ val SnapshotSyncCompletedManifest = " AG"
30+ val PersistentStateManifest = " AH"
31+ val CommandManifest = " AI"
32+ val ForwardedCommandManifest = " AJ"
33+ val SnapshotSyncStartedManifest = " AK"
3234 // raft.eventsourced
33- private val CommitLogStoreInternalEventManifest = " BA"
34- private val CommitLogStoreSaveManifest = " BB"
35- private val CommitLogStoreActorStateManifest = " BC"
36- private val CommitLogStoreAppendCommittedEntriesManifest = " BD"
37- private val CommitLogStoreAppendCommittedEntriesResponseManifest = " BE"
35+ val CommitLogStoreInternalEventManifest = " BA"
36+ val CommitLogStoreSaveManifest = " BB"
37+ val CommitLogStoreActorStateManifest = " BC"
38+ val CommitLogStoreAppendCommittedEntriesManifest = " BD"
39+ val CommitLogStoreAppendCommittedEntriesResponseManifest = " BE"
3840 // raft.protocol
39- private val RequestVoteManifest = " CA"
40- private val RequestVoteAcceptedManifest = " CB"
41- private val RequestVoteDeniedManifest = " CC"
42- private val AppendEntriesManifest = " CD"
43- private val AppendEntriesSucceededManifest = " CE"
44- private val AppendEntriesFailedManifest = " CF"
45- private val InstallSnapshotManifest = " CG"
46- private val InstallSnapshotSucceededManifest = " CH"
47- private val SuspendEntityManifest = " CI"
48- private val TryCreateEntityManifest = " CJ"
41+ val RequestVoteManifest = " CA"
42+ val RequestVoteAcceptedManifest = " CB"
43+ val RequestVoteDeniedManifest = " CC"
44+ val AppendEntriesManifest = " CD"
45+ val AppendEntriesSucceededManifest = " CE"
46+ val AppendEntriesFailedManifest = " CF"
47+ val InstallSnapshotManifest = " CG"
48+ val InstallSnapshotSucceededManifest = " CH"
49+ val SuspendEntityManifest = " CI"
50+ val TryCreateEntityManifest = " CJ"
4951 // raft.snapshot
50- private val EntitySnapshotManifest = " DA"
52+ val EntitySnapshotManifest = " DA"
5153 // raft.snapshot.sync
52- private val SyncCompletedManifest = " EA"
53- private val SyncProgressManifest = " EB"
54- private val NoOffsetManifest = " EC"
55- private val SequenceManifest = " ED"
56- private val TimeBasedUUIDManifest = " EE"
57- private val SnapshotCopiedManifest = " EF"
54+ val SyncCompletedManifest = " EA"
55+ val SyncProgressManifest = " EB"
56+ val NoOffsetManifest = " EC"
57+ val SequenceManifest = " ED"
58+ val TimeBasedUUIDManifest = " EE"
59+ val SnapshotCopiedManifest = " EF"
5860 // raft.model
59- private val NoOpManifest = " FA"
61+ val NoOpManifest = " FA"
6062 // typed
61- private val ReplicationEnvelopeManifest = " GA"
63+ val ReplicationEnvelopeManifest = " GA"
6264
6365 // Manifest -> fromBinary
6466 private val fromBinaryMap = HashMap [String , Array [Byte ] => ClusterReplicationSerializable ](
@@ -69,6 +71,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
6971 AppendedEntriesManifest -> appendedEntriesFromBinary,
7072 AppendedEventManifest -> appendedEventFromBinary,
7173 CompactionCompletedManifest -> compactionCompletedFromBinary,
74+ SnapshotSyncStartedManifest -> snapshotSyncStartedFromBinary,
7275 SnapshotSyncCompletedManifest -> snapshotSyncCompletedFromBinary,
7376 PersistentStateManifest -> persistentStateFromBinary,
7477 CommandManifest -> commandFromBinary,
@@ -141,6 +144,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
141144 case _ : raft.RaftActor .AppendedEntries => AppendedEntriesManifest
142145 case _ : raft.RaftActor .AppendedEvent => AppendedEventManifest
143146 case _ : raft.RaftActor .CompactionCompleted => CompactionCompletedManifest
147+ case _ : raft.RaftActor .SnapshotSyncStarted => SnapshotSyncStartedManifest
144148 case _ : raft.RaftActor .SnapshotSyncCompleted => SnapshotSyncCompletedManifest
145149 case _ : raft.PersistentStateData .PersistentState => PersistentStateManifest
146150 case _ : raft.RaftProtocol .Command => CommandManifest
@@ -187,6 +191,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
187191 case m : raft.RaftActor .AppendedEntries => appendedEntriesToBinary(m)
188192 case m : raft.RaftActor .AppendedEvent => appendedEventToBinary(m)
189193 case m : raft.RaftActor .CompactionCompleted => compactionCompletedToBinary(m)
194+ case m : raft.RaftActor .SnapshotSyncStarted => snapshotSyncStartedToBinary(m)
190195 case m : raft.RaftActor .SnapshotSyncCompleted => snapshotSyncCompletedToBinary(m)
191196 case m : raft.PersistentStateData .PersistentState => persistentStateToBinary(m)
192197 case m : raft.RaftProtocol .Command => commandToBinary(m)
@@ -327,6 +332,22 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
327332 )
328333 }
329334
335+ private def snapshotSyncStartedToBinary (message : raft.RaftActor .SnapshotSyncStarted ): Array [Byte ] = {
336+ msg.SnapshotSyncStarted
337+ .of(
338+ snapshotLastLogTerm = termToProto(message.snapshotLastLogTerm),
339+ snapshotLastLogIndex = logEntryIndexToProto(message.snapshotLastLogIndex),
340+ ).toByteArray
341+ }
342+
343+ private def snapshotSyncStartedFromBinary (bytes : Array [Byte ]): raft.RaftActor .SnapshotSyncStarted = {
344+ val proto = msg.SnapshotSyncStarted .parseFrom(bytes)
345+ raft.RaftActor .SnapshotSyncStarted (
346+ snapshotLastLogTerm = termFromProto(proto.snapshotLastLogTerm),
347+ snapshotLastLogIndex = logEntryIndexFromProto(proto.snapshotLastLogIndex),
348+ )
349+ }
350+
330351 private def snapshotSyncCompletedToBinary (message : raft.RaftActor .SnapshotSyncCompleted ): Array [Byte ] = {
331352 msg.SnapshotSyncCompleted
332353 .of(
@@ -976,13 +997,18 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
976997 msg.SnapshotStatus .of(
977998 snapshotLastTerm = termToProto(message.snapshotLastTerm),
978999 snapshotLastLogIndex = logEntryIndexToProto(message.snapshotLastLogIndex),
1000+ targetSnapshotLastTerm = Option (termToProto(message.targetSnapshotLastTerm)),
1001+ targetSnapshotLastLogIndex = Option (logEntryIndexToProto(message.targetSnapshotLastLogIndex)),
9791002 )
9801003 }
9811004
9821005 private def snapshotStatusFromProto (proto : msg.SnapshotStatus ): raft.model.SnapshotStatus = {
9831006 raft.model.SnapshotStatus (
9841007 snapshotLastTerm = termFromProto(proto.snapshotLastTerm),
9851008 snapshotLastLogIndex = logEntryIndexFromProto(proto.snapshotLastLogIndex),
1009+ targetSnapshotLastTerm = termFromProto(proto.targetSnapshotLastTerm.getOrElse(proto.snapshotLastTerm)),
1010+ targetSnapshotLastLogIndex =
1011+ logEntryIndexFromProto(proto.targetSnapshotLastLogIndex.getOrElse(proto.snapshotLastLogIndex)),
9861012 )
9871013 }
9881014
0 commit comments