Skip to content

Commit 4321e1c

Browse files
authored
Merge branch 'master' into revise-logs-of-sending-replication-results
2 parents 4ae43c2 + c18e940 commit 4321e1c

File tree

9 files changed

+320
-110
lines changed

9 files changed

+320
-110
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111
- Enhance leader's replication response handling [PR#160](https://github.com/lerna-stack/akka-entity-replication/pull/160)
1212
- Change event sourcing log level to debug
1313
[PR#163](https://github.com/lerna-stack/akka-entity-replication/pull/163)
14+
- ReplicationRegionRaftActorStarter uses its FQCN as its logger name
15+
[PR178](https://github.com/lerna-stack/akka-entity-replication/pull/178)
1416
- Add diagnostic logging to CommitLogStoreActor
1517
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164)
1618
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
@@ -43,6 +45,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4345
- Snapshot synchronization could remove committed log entries that not be included in snapshots
4446
[#167](https://github.com/lerna-stack/akka-entity-replication/issues/167)
4547
[#PR168](https://github.com/lerna-stack/akka-entity-replication/pull/168)
48+
- SnapshotStore doesn't reply with SnapshotNotFound sometimes
49+
[#182](https://github.com/lerna-stack/akka-entity-replication/issues/182),
50+
[#PR183](https://github.com/lerna-stack/akka-entity-replication/pull/183)
4651

4752
## [v2.1.0] - 2022-03-24
4853
[v2.1.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v2.0.0...v2.1.0

docs/typed/implementation_guide.md

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,57 @@ Consistency is ensured when it processes operations that can effect outside the
258258
The entity will output results base on the consistent up-to-date state even if under the network partitioning.
259259
The commands will be fail on one side of the partitioned network to keep consistency.
260260

261+
### Detecting data inconsistencies by Entity Implementation
262+
263+
While akka-entity-replication 2.2.0 or above closes some data inconsistency issues,
264+
detecting such inconsistency issues by entity implementation is preferred.
265+
An entity can use the following techniques to detect data inconsistencies:
266+
267+
* To detect an event duplication and miss, use an event number. As the state of the entity, the entity holds the event
268+
number (called LastAppliedEventNumber) of the last event the entity applied itself. Furthermore, the entity puts the
269+
event number (specifically, LastAppliedEventNumber plus one) to an event. The event handler of the entity verifies
270+
that the event has the expected event number (specifically, the event number must be equal to LastAppliedEventNumber
271+
plus one). If this verification fails, either an event duplication or miss has happened.
272+
* To detect an event misdelivery, put the entity ID to an event. The event handler of the entity verifies that the event
273+
has the same entity ID as its own. If this verification fails, an event misdelivery has happened.
274+
275+
The following example illustrates how an entity detects data inconsistencies:
276+
277+
```scala
278+
import lerna.akka.entityreplication.typed._
279+
280+
object MyReplicatedEntity {
281+
final case class Command()
282+
final case class Event(entityId: String, eventNo: Long)
283+
final case class State(lastAppliedEventNo: Long)
284+
285+
def apply(entityContext: ReplicatedEntityContext[Command]): Behavior[Command] =
286+
ReplicatedEntityBehavior[Command, Event, State](
287+
entityContext,
288+
emptyState = State(lastAppliedEventNo = 0),
289+
commandHandler = (state, command) => {
290+
if (??? /* the command is not processed yet */) {
291+
// Replicate an event as below:
292+
// - To detect an event duplication and miss, put the event number (`state.lastAppliedEventNo + 1`) to the event.
293+
// - To detect an event misdelivery, put the entity ID (`entityContext.entityId`) to the event.
294+
Effect.replicate(Event(entityContext.entityId, state.lastAppliedEventNo + 1))
295+
} else {
296+
// Replicate nothing
297+
???
298+
}
299+
},
300+
eventHandler = (state, event) => {
301+
// To detect an event duplication and miss, verifies the event has the expected event number:
302+
require(event.eventNo == state.lastAppliedEventNo + 1)
303+
// To detect an event misdelivery, verifies the event has the expected entity ID:
304+
require(event.entityId == entityContext.entityId)
305+
// The next state must set the event number of the event to LastAppliedEventNo:
306+
State(event.eventNo)
307+
}
308+
)
309+
}
310+
```
311+
261312
### Passivation
262313

263314
You can stop entities that are not used to reduce memory consumption.

src/main/scala/lerna/akka/entityreplication/ReplicationRegionRaftActorStarter.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ private[entityreplication] object ReplicationRegionRaftActorStarter {
3535
): Behavior[Nothing] = {
3636
Behaviors
3737
.setup[Command] { context =>
38+
context.setLoggerName(this.getClass)
3839
val (disableIds, enableIds) = ids.partition(settings.disabledShards.contains)
3940
if (disableIds.nonEmpty) {
4041
context.log.info(

src/main/scala/lerna/akka/entityreplication/raft/snapshot/SnapshotStore.scala

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ private[entityreplication] object SnapshotStore {
1717
selfMemberIndex: MemberIndex,
1818
): Props =
1919
Props(new SnapshotStore(typeName, entityId, settings, selfMemberIndex))
20+
21+
/** Returns a persistence ID of SnapshotStore */
22+
def persistenceId(typeName: TypeName, entityId: NormalizedEntityId, selfMemberIndex: MemberIndex): String =
23+
ActorIds.persistenceId("SnapshotStore", typeName.underlying, entityId.underlying, selfMemberIndex.role)
24+
2025
}
2126

2227
private[entityreplication] class SnapshotStore(
@@ -29,7 +34,7 @@ private[entityreplication] class SnapshotStore(
2934
import SnapshotProtocol._
3035

3136
override def persistenceId: String =
32-
ActorIds.persistenceId("SnapshotStore", typeName.underlying, entityId.underlying, selfMemberIndex.role)
37+
SnapshotStore.persistenceId(typeName, entityId, selfMemberIndex)
3338

3439
override def journalPluginId: String = settings.journalPluginId
3540

@@ -91,8 +96,11 @@ private[entityreplication] class SnapshotStore(
9196
cmd.entityId,
9297
)
9398
case FetchSnapshot(_, replyTo) =>
94-
prevSnapshot.foreach { s =>
95-
replyTo ! SnapshotProtocol.SnapshotFound(s)
99+
prevSnapshot match {
100+
case Some(prevSnapshot) =>
101+
replyTo ! SnapshotProtocol.SnapshotFound(prevSnapshot)
102+
case None =>
103+
replyTo ! SnapshotProtocol.SnapshotNotFound(entityId)
96104
}
97105
}
98106
case _: persistence.SaveSnapshotSuccess =>
Lines changed: 155 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,192 @@
11
package lerna.akka.entityreplication.raft.snapshot
22

3+
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
4+
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
5+
36
import java.util.concurrent.atomic.AtomicInteger
47
import akka.actor.{ ActorRef, ActorSystem }
8+
import akka.persistence.testkit.scaladsl.SnapshotTestKit
9+
import akka.persistence.testkit.{
10+
ProcessingResult,
11+
ProcessingSuccess,
12+
SnapshotOperation,
13+
SnapshotStorage,
14+
WriteSnapshot,
15+
}
516
import akka.testkit.TestKit
6-
import com.typesafe.config.{ Config, ConfigFactory }
717
import lerna.akka.entityreplication.model.{ NormalizedEntityId, TypeName }
818
import lerna.akka.entityreplication.raft.model.LogEntryIndex
919
import lerna.akka.entityreplication.raft.routing.MemberIndex
10-
import lerna.akka.entityreplication.raft.snapshot.ShardSnapshotStoreFailureSpecBase._
1120
import lerna.akka.entityreplication.raft.snapshot.SnapshotProtocol._
1221
import lerna.akka.entityreplication.raft.{ ActorSpec, RaftSettings }
1322
import lerna.akka.entityreplication.testkit.KryoSerializable
1423

15-
// snapshot-store のスタブを利用して snapshot の読み込みを失敗させる
16-
class ShardSnapshotStoreLoadingFailureSpec
17-
extends ShardSnapshotStoreFailureSpecBase(
18-
SnapshotPluginStub.brokenLoadingSnapshotConfig.withFallback(ConfigFactory.load()),
19-
) {
24+
import scala.concurrent.Promise
25+
26+
object ShardSnapshotStoreFailureSpec {
27+
final case object DummyState extends KryoSerializable
28+
}
29+
30+
class ShardSnapshotStoreFailureSpec
31+
extends TestKit(
32+
ActorSystem("ShardSnapshotStoreFailureSpec", ShardSnapshotStoreSpecBase.configWithPersistenceTestKits),
33+
)
34+
with ActorSpec {
35+
import ShardSnapshotStoreFailureSpec._
36+
37+
private val snapshotTestKit = SnapshotTestKit(system)
38+
private val typeName = TypeName.from("test")
39+
private val memberIndex = MemberIndex("test-role")
40+
41+
override def beforeEach(): Unit = {
42+
super.beforeEach()
43+
snapshotTestKit.clearAll()
44+
snapshotTestKit.resetPolicy()
45+
}
46+
47+
def createShardSnapshotStore(): ActorRef =
48+
planAutoKill {
49+
childActorOf(
50+
ShardSnapshotStore.props(
51+
typeName,
52+
RaftSettings(system.settings.config),
53+
memberIndex,
54+
),
55+
)
56+
}
57+
58+
val entityIdSeq = new AtomicInteger(0)
59+
60+
def generateUniqueEntityId(): NormalizedEntityId =
61+
NormalizedEntityId.from(s"test-entity-${entityIdSeq.incrementAndGet()}")
2062

2163
"ShardSnapshotStore(読み込みの異常)" should {
2264

2365
"FetchSnapshot に失敗した場合は応答無し(クライアント側でタイムアウトの実装が必要)" in {
24-
val entityId = generateUniqueEntityId()
25-
val shardSnapshotStore = createShardSnapshotStore()
66+
val entityId = generateUniqueEntityId()
67+
val shardSnapshotStore = createShardSnapshotStore()
68+
val snapshotStorePersistenceId = SnapshotStore.persistenceId(typeName, entityId, memberIndex)
2669

70+
snapshotTestKit.failNextRead(snapshotStorePersistenceId)
2771
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
2872
expectNoMessage()
2973
}
3074
}
31-
}
32-
33-
// snapshot-store のスタブを利用して snapshot の永続化を失敗させる
34-
class ShardSnapshotStoreSavingFailureSpec
35-
extends ShardSnapshotStoreFailureSpecBase(
36-
SnapshotPluginStub.brokenSavingSnapshotConfig.withFallback(ConfigFactory.load()),
37-
) {
38-
39-
private[this] val dummyEntityState = EntityState(DummyState)
4075

4176
"ShardSnapshotStore(書き込みの異常)" should {
4277

4378
"SaveSnapshot に失敗した場合は SaveSnapshotFailure が返信される" in {
44-
val entityId = generateUniqueEntityId()
45-
val shardSnapshotStore = createShardSnapshotStore()
46-
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex.initial())
47-
val snapshot = EntitySnapshot(metadata, dummyEntityState)
79+
val entityId = generateUniqueEntityId()
80+
val shardSnapshotStore = createShardSnapshotStore()
81+
val snapshotStorePersistenceId = SnapshotStore.persistenceId(typeName, entityId, memberIndex)
82+
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex.initial())
83+
val dummyEntityState = EntityState(DummyState)
84+
val snapshot = EntitySnapshot(metadata, dummyEntityState)
4885

86+
snapshotTestKit.failNextPersisted(snapshotStorePersistenceId)
4987
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
5088
expectMsg(SaveSnapshotFailure(metadata))
5189
}
5290
}
53-
}
5491

55-
object ShardSnapshotStoreFailureSpecBase {
56-
final case object DummyState extends KryoSerializable
57-
}
92+
"ShardSnapshotStore (with time-consuming writes)" should {
5893

59-
abstract class ShardSnapshotStoreFailureSpecBase(config: Config)
60-
extends TestKit(ActorSystem("ShardSnapshotStoreFailureSpec", config))
61-
with ActorSpec {
94+
// Emulates a time-consuming write
95+
// Note:
96+
// The promise (`processingResultPromise`) must be fulfilled.
97+
// The succeeding tests will fail unless the promise is fulfilled.
98+
class TimeConsumingWriteSnapshotPolicy extends SnapshotStorage.SnapshotPolicies.PolicyType {
99+
val processingResultPromise = Promise[ProcessingResult]()
100+
override def tryProcess(persistenceId: String, processingUnit: SnapshotOperation): ProcessingResult = {
101+
processingUnit match {
102+
case _: WriteSnapshot => processingResultPromise.future.await
103+
case _ => ProcessingSuccess
104+
}
105+
}
106+
def trySuccess(): Unit = {
107+
processingResultPromise.trySuccess(ProcessingSuccess)
108+
}
109+
}
62110

63-
def createShardSnapshotStore(): ActorRef =
64-
planAutoKill {
65-
childActorOf(
66-
ShardSnapshotStore.props(
67-
TypeName.from("test"),
68-
RaftSettings(system.settings.config),
69-
MemberIndex("test-role"),
70-
),
71-
)
111+
"reply with `SnapshotNotFound` to `FetchSnapshot` if it has no EntitySnapshot and is saving an EntitySnapshot" in {
112+
val entityId = generateUniqueEntityId()
113+
val shardSnapshotStore = createShardSnapshotStore()
114+
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
115+
val snapshot = EntitySnapshot(metadata, EntityState(DummyState))
116+
117+
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
118+
try {
119+
// Prepare: SnapshotStore is saving the snapshot
120+
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
121+
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
122+
123+
// Test:
124+
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
125+
expectMsg(SnapshotNotFound(entityId))
126+
} finally {
127+
// Cleanup:
128+
// The succeeding tests will fail unless the promise is fulfilled.
129+
timeConsumingWriteSnapshotPolicy.trySuccess()
130+
}
72131
}
73132

74-
val entityIdSeq = new AtomicInteger(0)
133+
"reply with `SnapshotFound` to `FetchSnapshot` if it has an EntitySnapshot and is saving a new EntitySnapshot" in {
134+
val entityId = generateUniqueEntityId()
135+
val shardSnapshotStore = createShardSnapshotStore()
136+
137+
val firstSnapshotMetadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
138+
val firstSnapshot =
139+
EntitySnapshot(firstSnapshotMetadata, EntityState(DummyState))
140+
shardSnapshotStore ! SaveSnapshot(firstSnapshot, replyTo = testActor)
141+
expectMsg(SaveSnapshotSuccess(firstSnapshotMetadata))
142+
143+
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
144+
try {
145+
// Prepare: SnapshotStore is saving the second snapshot
146+
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
147+
val secondSnapshot =
148+
EntitySnapshot(EntitySnapshotMetadata(entityId, LogEntryIndex(5)), EntityState(DummyState))
149+
shardSnapshotStore ! SaveSnapshot(secondSnapshot, replyTo = testActor)
150+
151+
// Test:
152+
shardSnapshotStore ! FetchSnapshot(entityId, replyTo = testActor)
153+
expectMsg(SnapshotFound(firstSnapshot))
154+
} finally {
155+
// Cleanup:
156+
// The succeeding tests will fail unless the promise is fulfilled.
157+
timeConsumingWriteSnapshotPolicy.trySuccess()
158+
}
159+
}
160+
161+
"reply with nothing to `SaveSnapshot` and log a warning if it is saving an EntitySnapshot" in {
162+
implicit val typedSystem: akka.actor.typed.ActorSystem[Nothing] = system.toTyped
163+
164+
val entityId = generateUniqueEntityId()
165+
val shardSnapshotStore = createShardSnapshotStore()
166+
val metadata = EntitySnapshotMetadata(entityId, LogEntryIndex(1))
167+
val snapshot = EntitySnapshot(metadata, EntityState(DummyState))
168+
169+
val timeConsumingWriteSnapshotPolicy = new TimeConsumingWriteSnapshotPolicy()
170+
try {
171+
// Prepare: SnapshotStore is saving the snapshot
172+
snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
173+
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
174+
175+
// Test:
176+
LoggingTestKit
177+
.warn(
178+
s"Saving snapshot for an entity ($entityId) currently. Consider to increase log-size-threshold or log-size-check-interval.",
179+
).expect {
180+
shardSnapshotStore ! SaveSnapshot(snapshot, replyTo = testActor)
181+
}
182+
expectNoMessage()
183+
} finally {
184+
// Cleanup:
185+
// The succeeding tests will fail unless the promise is fulfilled.
186+
timeConsumingWriteSnapshotPolicy.trySuccess()
187+
}
188+
}
189+
190+
}
75191

76-
def generateUniqueEntityId(): NormalizedEntityId =
77-
NormalizedEntityId.from(s"test-entity-${entityIdSeq.incrementAndGet()}")
78192
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package lerna.akka.entityreplication.raft.snapshot
2+
3+
import akka.persistence.testkit.{ PersistenceTestKitPlugin, PersistenceTestKitSnapshotPlugin }
4+
import com.typesafe.config.{ Config, ConfigFactory }
5+
6+
object ShardSnapshotStoreSpecBase {
7+
8+
def configWithPersistenceTestKits: Config = {
9+
PersistenceTestKitPlugin.config
10+
.withFallback(PersistenceTestKitSnapshotPlugin.config)
11+
.withFallback(raftPersistenceConfigWithPersistenceTestKits)
12+
.withFallback(ConfigFactory.load())
13+
}
14+
15+
private val raftPersistenceConfigWithPersistenceTestKits: Config = ConfigFactory.parseString(
16+
s"""
17+
|lerna.akka.entityreplication.raft.persistence {
18+
| journal.plugin = ${PersistenceTestKitPlugin.PluginId}
19+
| snapshot-store.plugin = ${PersistenceTestKitSnapshotPlugin.PluginId}
20+
| # Might be possible to use PersistenceTestKitReadJournal
21+
| // query.plugin = ""
22+
|}
23+
|""".stripMargin,
24+
)
25+
26+
}

0 commit comments

Comments
 (0)