Skip to content

Commit 71c1cef

Browse files
authored
Merge pull request #128 from lerna-stack/fix-inconsistency-after-installsnapshot
Fix inconsistency after InstallSnapshot
2 parents 48ab644 + c000054 commit 71c1cef

File tree

20 files changed

+1398
-120
lines changed

20 files changed

+1398
-120
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3131
- The Raft leader uses the same previous `LogEntryIndex` and `Term` for all batched `AppendEntries` messages [#123](https://github.com/lerna-stack/akka-entity-replication/issues/123)
3232
- Raft Actors don't accept a `RequestVote(lastLogIndex < log.lastLogIndex, lastLogTerm > log.lastLogTerm)` message [#125](https://github.com/lerna-stack/akka-entity-replication/issues/125)
3333
- A new event is created even though not all past events have been applied [#130](https://github.com/lerna-stack/akka-entity-replication/issues/130)
34+
- `InstallSnapshot` can miss snapshots to copy [PR#128](https://github.com/lerna-stack/akka-entity-replication/pull/128)
35+
36+
⚠️ This change adds a new persistence event. Downgrading might not be possible after upgrading.
3437

3538
## [v2.0.0] - 2021-07-16
3639
[v2.0.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v1.0.0...v2.0.0

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ lazy val lerna = (project in file("."))
5555
// TODO 2.6.x 系に対応できる方法に変更する。
5656
"com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2" % Test,
5757
"com.typesafe.akka" %% "akka-persistence-testkit" % akkaVersion % Test,
58+
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
5859
),
5960
inConfig(MultiJvm)(
6061
// multi-jvm ディレクトリをフォーマットするために必要
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# It is safe to exclude the following since CompactionCompletedTag (which was renamed to EntitySnapshotsUpdatedTag) is package-private.
2+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.persistence.CompactionCompletedTag")
3+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.persistence.CompactionCompletedTag$")
4+
# It is safe to exclude the following since SnapshotSyncManager is package-private.
5+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$CompactionEnvelope")
6+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$CompactionEnvelope$")
7+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SyncCompleteAll")
8+
ProblemFilters.exclude[MissingClassProblem]("lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SyncCompleteAll$")

src/main/protobuf/cluster_replication.proto

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ message TimeBasedUUID {
198198
required int64 least_sig_bits = 2;
199199
}
200200

201+
message SnapshotCopied {
202+
required Offset offset = 1;
203+
required MemberIndex member_index = 2;
204+
required NormalizedShardId shard_id = 3;
205+
required Term snapshot_last_log_term = 4;
206+
required LogEntryIndex snapshot_last_log_index = 5;
207+
repeated NormalizedEntityId entity_ids = 6;
208+
}
201209

202210
// ===
203211
// model

src/main/resources/reference.conf

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ lerna.akka.entityreplication {
5757

5858
// Time to abort operations related to persistence
5959
persistence-operation-timeout = 10s
60+
61+
// Maximum size of a snapshot batch copied from the leader's snapshot store to the local snapshot store
62+
// Note:
63+
// If a single event that updated snapshots refers to more entity IDs than this batch size,
64+
// the batch contains only the snapshots indicated by that single event and exceeds this limit.
65+
// This is because copying snapshots should be executed atomically per event.
66+
max-snapshot-batch-size = 1000
6067
}
6168

6269
sharding = ${akka.cluster.sharding} {
@@ -92,6 +99,7 @@ lerna.akka.entityreplication {
9299
}
93100
event-adapter-bindings {
94101
"lerna.akka.entityreplication.raft.RaftActor$CompactionCompleted" = akka-entity-replication-raft-event-adapter
102+
"lerna.akka.entityreplication.raft.snapshot.sync.SnapshotSyncManager$SnapshotCopied" = akka-entity-replication-raft-event-adapter
95103
}
96104
}
97105

src/main/scala/lerna/akka/entityreplication/protobuf/ClusterReplicationSerializer.scala

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
4646
// raft.snapshot
4747
private val EntitySnapshotManifest = "DA"
4848
// raft.snapshot.sync
49-
private val SyncCompletedManifest = "EA"
50-
private val SyncProgressManifest = "EB"
51-
private val NoOffsetManifest = "EC"
52-
private val SequenceManifest = "ED"
53-
private val TimeBasedUUIDManifest = "EE"
49+
private val SyncCompletedManifest = "EA"
50+
private val SyncProgressManifest = "EB"
51+
private val NoOffsetManifest = "EC"
52+
private val SequenceManifest = "ED"
53+
private val TimeBasedUUIDManifest = "EE"
54+
private val SnapshotCopiedManifest = "EF"
5455
// raft.model
5556
private val NoOpManifest = "FA"
5657
// typed
@@ -87,11 +88,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
8788
// raft.snapshot
8889
EntitySnapshotManifest -> entitySnapshotFromBinary,
8990
// raft.snapshot.sync
90-
SyncCompletedManifest -> syncCompletedFromBinary,
91-
SyncProgressManifest -> syncProgressFromBinary,
92-
NoOffsetManifest -> noOffsetEnvelopeFromBinary,
93-
SequenceManifest -> sequenceEnvelopeFromBinary,
94-
TimeBasedUUIDManifest -> timeBasedUUIDEnvelopeFromBinary,
91+
SyncCompletedManifest -> syncCompletedFromBinary,
92+
SyncProgressManifest -> syncProgressFromBinary,
93+
NoOffsetManifest -> noOffsetEnvelopeFromBinary,
94+
SequenceManifest -> sequenceEnvelopeFromBinary,
95+
TimeBasedUUIDManifest -> timeBasedUUIDEnvelopeFromBinary,
96+
SnapshotCopiedManifest -> snapshotCopiedFromBinary,
9597
// raft.model
9698
NoOpManifest -> noOpFromBinary,
9799
// typed
@@ -156,11 +158,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
156158
// raft.snapshot
157159
case _: raft.snapshot.SnapshotProtocol.EntitySnapshot => EntitySnapshotManifest
158160
// raft.snapshot.sync
159-
case _: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => SyncCompletedManifest
160-
case _: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => SyncProgressManifest
161-
case _: NoOffsetEnvelope.type => NoOffsetManifest
162-
case _: SequenceEnvelope => SequenceManifest
163-
case _: TimeBasedUUIDEnvelope => TimeBasedUUIDManifest
161+
case _: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => SyncCompletedManifest
162+
case _: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => SyncProgressManifest
163+
case _: NoOffsetEnvelope.type => NoOffsetManifest
164+
case _: SequenceEnvelope => SequenceManifest
165+
case _: TimeBasedUUIDEnvelope => TimeBasedUUIDManifest
166+
case _: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied => SnapshotCopiedManifest
164167
// raft.model
165168
case _: raft.model.NoOp.type => NoOpManifest
166169
// typed
@@ -197,11 +200,12 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
197200
// raft.snapshot
198201
case m: raft.snapshot.SnapshotProtocol.EntitySnapshot => entitySnapShotToBinary(m)
199202
// raft.snapshot.sync
200-
case m: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => syncCompletedToBinary(m)
201-
case m: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => syncProgressToBinary(m)
202-
case m: NoOffsetEnvelope.type => noOffsetEnvelopeToBinary(m)
203-
case m: SequenceEnvelope => sequenceEnvelopeToBinary(m)
204-
case m: TimeBasedUUIDEnvelope => timeBasedUUIDEnvelopeToBinary(m)
203+
case m: raft.snapshot.sync.SnapshotSyncManager.SyncCompleted => syncCompletedToBinary(m)
204+
case m: raft.snapshot.sync.SnapshotSyncManager.SyncProgress => syncProgressToBinary(m)
205+
case m: NoOffsetEnvelope.type => noOffsetEnvelopeToBinary(m)
206+
case m: SequenceEnvelope => sequenceEnvelopeToBinary(m)
207+
case m: TimeBasedUUIDEnvelope => timeBasedUUIDEnvelopeToBinary(m)
208+
case m: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied => snapshotCopiedToBinary(m)
205209
// raft.model
206210
case m: raft.model.NoOp.type => noOpToBinary(m)
207211
// typed
@@ -790,6 +794,30 @@ private[entityreplication] final class ClusterReplicationSerializer(val system:
790794
)
791795
}
792796

797+
private def snapshotCopiedToBinary(message: raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied): Array[Byte] = {
798+
msg.SnapshotCopied
799+
.of(
800+
offset = offsetToProto(message.offset),
801+
memberIndex = memberIndexToProto(message.memberIndex),
802+
shardId = normalizedShardIdToProto(message.shardId),
803+
snapshotLastLogTerm = termToProto(message.snapshotLastLogTerm),
804+
snapshotLastLogIndex = logEntryIndexToProto(message.snapshotLastLogIndex),
805+
entityIds = message.entityIds.map(normalizedEntityIdToProto).toSeq,
806+
).toByteArray
807+
}
808+
809+
private def snapshotCopiedFromBinary(bytes: Array[Byte]): raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied = {
810+
val proto = msg.SnapshotCopied.parseFrom(bytes)
811+
raft.snapshot.sync.SnapshotSyncManager.SnapshotCopied(
812+
offset = offsetFromProto(proto.offset),
813+
memberIndex = memberIndexFromProto(proto.memberIndex),
814+
shardId = normalizedShardIdFromProto(proto.shardId),
815+
snapshotLastLogTerm = termFromProto(proto.snapshotLastLogTerm),
816+
snapshotLastLogIndex = logEntryIndexFromProto(proto.snapshotLastLogIndex),
817+
entityIds = proto.entityIds.map(normalizedEntityIdFromProto).toSet,
818+
)
819+
}
820+
793821
// ===
794822
// model
795823
// ===

src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,12 @@ private[raft] class RaftActor(
158158
override val persistenceId: String =
159159
ActorIds.persistenceId("raft", typeName.underlying, shardId.underlying, selfMemberIndex.role)
160160

161+
/**
162+
* NOTE:
163+
* [[RaftActor]] has to use the same journal plugin as [[SnapshotSyncManager]]
164+
* because snapshot synchronization is achieved by reading both the events
165+
* [[CompactionCompleted]] which [[RaftActor]] persisted and SnapshotCopied which [[SnapshotSyncManager]] persisted.
166+
*/
161167
override def journalPluginId: String = settings.journalPluginId
162168

163169
override def journalPluginConfig: Config = settings.journalPluginAdditionalConfig

src/main/scala/lerna/akka/entityreplication/raft/RaftSettings.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ trait RaftSettings {
4646

4747
def snapshotSyncPersistenceOperationTimeout: FiniteDuration
4848

49+
def snapshotSyncMaxSnapshotBatchSize: Int
50+
4951
def clusterShardingConfig: Config
5052

5153
def raftActorAutoStartFrequency: FiniteDuration

src/main/scala/lerna/akka/entityreplication/raft/RaftSettingsImpl.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ private[entityreplication] final case class RaftSettingsImpl(
2525
compactionLogSizeCheckInterval: FiniteDuration,
2626
snapshotSyncCopyingParallelism: Int,
2727
snapshotSyncPersistenceOperationTimeout: FiniteDuration,
28+
snapshotSyncMaxSnapshotBatchSize: Int,
2829
clusterShardingConfig: Config,
2930
raftActorAutoStartFrequency: FiniteDuration,
3031
raftActorAutoStartNumberOfActors: Int,
@@ -140,6 +141,13 @@ private[entityreplication] object RaftSettingsImpl {
140141
val snapshotSyncPersistenceOperationTimeout: FiniteDuration =
141142
config.getDuration("snapshot-sync.persistence-operation-timeout").toScala
142143

144+
val snapshotSyncMaxSnapshotBatchSize: Int =
145+
config.getInt("snapshot-sync.max-snapshot-batch-size")
146+
require(
147+
snapshotSyncMaxSnapshotBatchSize > 0,
148+
s"snapshot-sync.max-snapshot-batch-size (${snapshotSyncMaxSnapshotBatchSize}) should be larger than 0",
149+
)
150+
143151
val clusterShardingConfig: Config = config.getConfig("sharding")
144152

145153
val raftActorAutoStartFrequency: FiniteDuration =
@@ -196,6 +204,7 @@ private[entityreplication] object RaftSettingsImpl {
196204
compactionLogSizeCheckInterval = compactionLogSizeCheckInterval,
197205
snapshotSyncCopyingParallelism = snapshotSyncCopyingParallelism,
198206
snapshotSyncPersistenceOperationTimeout = snapshotSyncPersistenceOperationTimeout,
207+
snapshotSyncMaxSnapshotBatchSize = snapshotSyncMaxSnapshotBatchSize,
199208
clusterShardingConfig = clusterShardingConfig,
200209
raftActorAutoStartFrequency = raftActorAutoStartFrequency,
201210
raftActorAutoStartNumberOfActors = raftActorAutoStartNumberOfActors,

src/main/scala/lerna/akka/entityreplication/raft/persistence/CompactionCompletedTag.scala renamed to src/main/scala/lerna/akka/entityreplication/raft/persistence/EntitySnapshotsUpdatedTag.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,12 @@ package lerna.akka.entityreplication.raft.persistence
33
import lerna.akka.entityreplication.model.NormalizedShardId
44
import lerna.akka.entityreplication.raft.routing.MemberIndex
55

6-
private[entityreplication] final case class CompactionCompletedTag(
6+
private[entityreplication] final case class EntitySnapshotsUpdatedTag(
77
memberIndex: MemberIndex,
88
shardId: NormalizedShardId,
99
) {
1010
private[this] val delimiter = ":"
1111

12+
// Do not change this tag format (including the legacy "CompactionCompleted" prefix), to keep compatibility
1213
override def toString: String = s"CompactionCompleted${delimiter}${shardId.underlying}${delimiter}${memberIndex.role}"
1314
}

0 commit comments

Comments
 (0)