Skip to content

Commit f3c1b03

Browse files
authored
Merge branch 'master' into test-state-after-append-entries-handling
2 parents 2e13329 + 14a6cc2 commit f3c1b03

File tree

15 files changed

+1260
-80
lines changed

15 files changed

+1260
-80
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
5555
- `InstallSnapshot` can miss snapshots to copy [PR#128](https://github.com/lerna-stack/akka-entity-replication/pull/128)
5656

5757
⚠️ This change adds a new persistence event. This might don't allow downgrading after upgrading.
58+
- Moving a leader during snapshot synchronization can delete committed log entries [#133](https://github.com/lerna-stack/akka-entity-replication/issues/133)
59+
60+
⚠️ This change adds a new persistence event. This might don't allow downgrading after upgrading.
5861

5962
## [v2.0.0] - 2021-07-16
6063
[v2.0.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v1.0.0...v2.0.0
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# It is safe to exclude the following since the case classes used in protobuf is not an API intend to use by users
2+
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.<init>$default$3")
3+
ProblemFilters.exclude[IncompatibleSignatureProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.unapply")
4+
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.apply$default$3")
5+
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.apply")
6+
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.of")
7+
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.copy")
8+
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.copy$default$3")
9+
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.this")
10+
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.<init>$default$3")
11+
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.of")
12+
ProblemFilters.exclude[DirectMissingMethodProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.apply")
13+
ProblemFilters.exclude[IncompatibleResultTypeProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.apply$default$3")
14+
ProblemFilters.exclude[IncompatibleSignatureProblem]("lerna.akka.entityreplication.protobuf.msg.SnapshotStatus.unapply")

src/main/protobuf/cluster_replication.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
// Style Guide
22
// https://developers.google.com/protocol-buffers/docs/style
3+
//
4+
// Please see the rules described on the following to maintain compatibility:
5+
// https://developers.google.com/protocol-buffers/docs/proto#updating
36

47
syntax = "proto2";
58

@@ -42,6 +45,11 @@ message CompactionCompleted {
4245
repeated NormalizedEntityId entity_ids = 5;
4346
}
4447

48+
message SnapshotSyncStarted {
49+
required Term snapshot_last_log_term = 1;
50+
required LogEntryIndex snapshot_last_log_index = 2;
51+
}
52+
4553
message SnapshotSyncCompleted {
4654
required Term snapshot_last_log_term = 1;
4755
required LogEntryIndex snapshot_last_log_index = 2;
@@ -262,6 +270,8 @@ message ReplicatedLog {
262270
message SnapshotStatus {
263271
required Term snapshot_last_term = 1;
264272
required LogEntryIndex snapshot_last_log_index = 2;
273+
optional Term target_snapshot_last_term = 3;
274+
optional LogEntryIndex target_snapshot_last_log_index = 4;
265275
}
266276

267277
message Term {

src/main/scala/lerna/akka/entityreplication/protobuf/ClusterReplicationSerializer.scala

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,47 +18,49 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
1818
private lazy val serialization = SerializationExtension(system)
1919

2020
// Manifests
21+
// make them public for testing purposes
2122
// raft
22-
private val BegunNewTermManifest = "AA"
23-
private val VotedManifest = "AB"
24-
private val DetectedNewTermManifest = "AC"
25-
private val AppendedEntriesManifest = "AD"
26-
private val AppendedEventManifest = "AE"
27-
private val CompactionCompletedManifest = "AF"
28-
private val SnapshotSyncCompletedManifest = "AG"
29-
private val PersistentStateManifest = "AH"
30-
private val CommandManifest = "AI"
31-
private val ForwardedCommandManifest = "AJ"
23+
val BegunNewTermManifest = "AA"
24+
val VotedManifest = "AB"
25+
val DetectedNewTermManifest = "AC"
26+
val AppendedEntriesManifest = "AD"
27+
val AppendedEventManifest = "AE"
28+
val CompactionCompletedManifest = "AF"
29+
val SnapshotSyncCompletedManifest = "AG"
30+
val PersistentStateManifest = "AH"
31+
val CommandManifest = "AI"
32+
val ForwardedCommandManifest = "AJ"
33+
val SnapshotSyncStartedManifest = "AK"
3234
// raft.eventsourced
33-
private val CommitLogStoreInternalEventManifest = "BA"
34-
private val CommitLogStoreSaveManifest = "BB"
35-
private val CommitLogStoreActorStateManifest = "BC"
36-
private val CommitLogStoreAppendCommittedEntriesManifest = "BD"
37-
private val CommitLogStoreAppendCommittedEntriesResponseManifest = "BE"
35+
val CommitLogStoreInternalEventManifest = "BA"
36+
val CommitLogStoreSaveManifest = "BB"
37+
val CommitLogStoreActorStateManifest = "BC"
38+
val CommitLogStoreAppendCommittedEntriesManifest = "BD"
39+
val CommitLogStoreAppendCommittedEntriesResponseManifest = "BE"
3840
// raft.protocol
39-
private val RequestVoteManifest = "CA"
40-
private val RequestVoteAcceptedManifest = "CB"
41-
private val RequestVoteDeniedManifest = "CC"
42-
private val AppendEntriesManifest = "CD"
43-
private val AppendEntriesSucceededManifest = "CE"
44-
private val AppendEntriesFailedManifest = "CF"
45-
private val InstallSnapshotManifest = "CG"
46-
private val InstallSnapshotSucceededManifest = "CH"
47-
private val SuspendEntityManifest = "CI"
48-
private val TryCreateEntityManifest = "CJ"
41+
val RequestVoteManifest = "CA"
42+
val RequestVoteAcceptedManifest = "CB"
43+
val RequestVoteDeniedManifest = "CC"
44+
val AppendEntriesManifest = "CD"
45+
val AppendEntriesSucceededManifest = "CE"
46+
val AppendEntriesFailedManifest = "CF"
47+
val InstallSnapshotManifest = "CG"
48+
val InstallSnapshotSucceededManifest = "CH"
49+
val SuspendEntityManifest = "CI"
50+
val TryCreateEntityManifest = "CJ"
4951
// raft.snapshot
50-
private val EntitySnapshotManifest = "DA"
52+
val EntitySnapshotManifest = "DA"
5153
// raft.snapshot.sync
52-
private val SyncCompletedManifest = "EA"
53-
private val SyncProgressManifest = "EB"
54-
private val NoOffsetManifest = "EC"
55-
private val SequenceManifest = "ED"
56-
private val TimeBasedUUIDManifest = "EE"
57-
private val SnapshotCopiedManifest = "EF"
54+
val SyncCompletedManifest = "EA"
55+
val SyncProgressManifest = "EB"
56+
val NoOffsetManifest = "EC"
57+
val SequenceManifest = "ED"
58+
val TimeBasedUUIDManifest = "EE"
59+
val SnapshotCopiedManifest = "EF"
5860
// raft.model
59-
private val NoOpManifest = "FA"
61+
val NoOpManifest = "FA"
6062
// typed
61-
private val ReplicationEnvelopeManifest = "GA"
63+
val ReplicationEnvelopeManifest = "GA"
6264

6365
// Manifest -> fromBinary
6466
private val fromBinaryMap = HashMap[String, Array[Byte] => ClusterReplicationSerializable](
@@ -69,6 +71,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
6971
AppendedEntriesManifest -> appendedEntriesFromBinary,
7072
AppendedEventManifest -> appendedEventFromBinary,
7173
CompactionCompletedManifest -> compactionCompletedFromBinary,
74+
SnapshotSyncStartedManifest -> snapshotSyncStartedFromBinary,
7275
SnapshotSyncCompletedManifest -> snapshotSyncCompletedFromBinary,
7376
PersistentStateManifest -> persistentStateFromBinary,
7477
CommandManifest -> commandFromBinary,
@@ -141,6 +144,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
141144
case _: raft.RaftActor.AppendedEntries => AppendedEntriesManifest
142145
case _: raft.RaftActor.AppendedEvent => AppendedEventManifest
143146
case _: raft.RaftActor.CompactionCompleted => CompactionCompletedManifest
147+
case _: raft.RaftActor.SnapshotSyncStarted => SnapshotSyncStartedManifest
144148
case _: raft.RaftActor.SnapshotSyncCompleted => SnapshotSyncCompletedManifest
145149
case _: raft.PersistentStateData.PersistentState => PersistentStateManifest
146150
case _: raft.RaftProtocol.Command => CommandManifest
@@ -187,6 +191,7 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
187191
case m: raft.RaftActor.AppendedEntries => appendedEntriesToBinary(m)
188192
case m: raft.RaftActor.AppendedEvent => appendedEventToBinary(m)
189193
case m: raft.RaftActor.CompactionCompleted => compactionCompletedToBinary(m)
194+
case m: raft.RaftActor.SnapshotSyncStarted => snapshotSyncStartedToBinary(m)
190195
case m: raft.RaftActor.SnapshotSyncCompleted => snapshotSyncCompletedToBinary(m)
191196
case m: raft.PersistentStateData.PersistentState => persistentStateToBinary(m)
192197
case m: raft.RaftProtocol.Command => commandToBinary(m)
@@ -327,6 +332,22 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
327332
)
328333
}
329334

335+
private def snapshotSyncStartedToBinary(message: raft.RaftActor.SnapshotSyncStarted): Array[Byte] = {
336+
msg.SnapshotSyncStarted
337+
.of(
338+
snapshotLastLogTerm = termToProto(message.snapshotLastLogTerm),
339+
snapshotLastLogIndex = logEntryIndexToProto(message.snapshotLastLogIndex),
340+
).toByteArray
341+
}
342+
343+
private def snapshotSyncStartedFromBinary(bytes: Array[Byte]): raft.RaftActor.SnapshotSyncStarted = {
344+
val proto = msg.SnapshotSyncStarted.parseFrom(bytes)
345+
raft.RaftActor.SnapshotSyncStarted(
346+
snapshotLastLogTerm = termFromProto(proto.snapshotLastLogTerm),
347+
snapshotLastLogIndex = logEntryIndexFromProto(proto.snapshotLastLogIndex),
348+
)
349+
}
350+
330351
private def snapshotSyncCompletedToBinary(message: raft.RaftActor.SnapshotSyncCompleted): Array[Byte] = {
331352
msg.SnapshotSyncCompleted
332353
.of(
@@ -976,13 +997,18 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
976997
msg.SnapshotStatus.of(
977998
snapshotLastTerm = termToProto(message.snapshotLastTerm),
978999
snapshotLastLogIndex = logEntryIndexToProto(message.snapshotLastLogIndex),
1000+
targetSnapshotLastTerm = Option(termToProto(message.targetSnapshotLastTerm)),
1001+
targetSnapshotLastLogIndex = Option(logEntryIndexToProto(message.targetSnapshotLastLogIndex)),
9791002
)
9801003
}
9811004

9821005
private def snapshotStatusFromProto(proto: msg.SnapshotStatus): raft.model.SnapshotStatus = {
9831006
raft.model.SnapshotStatus(
9841007
snapshotLastTerm = termFromProto(proto.snapshotLastTerm),
9851008
snapshotLastLogIndex = logEntryIndexFromProto(proto.snapshotLastLogIndex),
1009+
targetSnapshotLastTerm = termFromProto(proto.targetSnapshotLastTerm.getOrElse(proto.snapshotLastTerm)),
1010+
targetSnapshotLastLogIndex =
1011+
logEntryIndexFromProto(proto.targetSnapshotLastLogIndex.getOrElse(proto.snapshotLastLogIndex)),
9861012
)
9871013
}
9881014

src/main/scala/lerna/akka/entityreplication/raft/Candidate.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,9 @@ private[raft] trait Candidate { this: RaftActor =>
132132
case appendEntries: AppendEntries if appendEntries.term.isOlderThan(currentData.currentTerm) =>
133133
sender() ! AppendEntriesFailed(currentData.currentTerm, selfMemberIndex)
134134

135+
case appendEntries: AppendEntries if currentData.lastSnapshotStatus.isDirty =>
136+
rejectAppendEntriesSinceSnapshotsAreDirty(appendEntries)
137+
135138
case appendEntries: AppendEntries =>
136139
if (currentData.hasMatchLogEntry(appendEntries.prevLogIndex, appendEntries.prevLogTerm)) {
137140
if (log.isDebugEnabled) log.debug("=== [Candidate] append {} ===", appendEntries)

src/main/scala/lerna/akka/entityreplication/raft/Follower.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ private[raft] trait Follower { this: RaftActor =>
8989
case appendEntries: AppendEntries if appendEntries.term.isOlderThan(currentData.currentTerm) =>
9090
sender() ! AppendEntriesFailed(currentData.currentTerm, selfMemberIndex)
9191

92+
case appendEntries: AppendEntries if currentData.lastSnapshotStatus.isDirty =>
93+
rejectAppendEntriesSinceSnapshotsAreDirty(appendEntries)
94+
9295
case appendEntries: AppendEntries =>
9396
if (currentData.hasMatchLogEntry(appendEntries.prevLogIndex, appendEntries.prevLogTerm)) {
9497
if (log.isDebugEnabled) log.debug("=== [Follower] append {} ===", appendEntries)

0 commit comments

Comments
 (0)