11package lerna .akka .entityreplication .raft .snapshot
22
3+ import akka .actor .testkit .typed .scaladsl .LoggingTestKit
4+ import akka .actor .typed .scaladsl .adapter .ClassicActorSystemOps
5+
36import java .util .concurrent .atomic .AtomicInteger
47import akka .actor .{ ActorRef , ActorSystem }
58import akka .persistence .testkit .scaladsl .SnapshotTestKit
6- import akka .persistence .testkit .{ PersistenceTestKitPlugin , PersistenceTestKitSnapshotPlugin }
9+ import akka .persistence .testkit .{
10+ ProcessingResult ,
11+ ProcessingSuccess ,
12+ SnapshotOperation ,
13+ SnapshotStorage ,
14+ WriteSnapshot ,
15+ }
716import akka .testkit .TestKit
8- import com .typesafe .config .{ Config , ConfigFactory }
917import lerna .akka .entityreplication .model .{ NormalizedEntityId , TypeName }
1018import lerna .akka .entityreplication .raft .model .LogEntryIndex
1119import lerna .akka .entityreplication .raft .routing .MemberIndex
12- import lerna .akka .entityreplication .raft .snapshot .ShardSnapshotStoreFailureSpec ._
1320import lerna .akka .entityreplication .raft .snapshot .SnapshotProtocol ._
1421import lerna .akka .entityreplication .raft .{ ActorSpec , RaftSettings }
1522import lerna .akka .entityreplication .testkit .KryoSerializable
1623
24+ import scala .concurrent .Promise
25+ import scala .util .Using
26+
1727object ShardSnapshotStoreFailureSpec {
1828 final case object DummyState extends KryoSerializable
19-
20- def configWithPersistenceTestKits : Config = {
21- PersistenceTestKitPlugin .config
22- .withFallback(PersistenceTestKitSnapshotPlugin .config)
23- .withFallback(raftPersistenceConfigWithPersistenceTestKits)
24- .withFallback(ConfigFactory .load())
25- }
26-
27- private val raftPersistenceConfigWithPersistenceTestKits : Config = ConfigFactory .parseString(
28- s """
29- |lerna.akka.entityreplication.raft.persistence {
30- | journal.plugin = ${PersistenceTestKitPlugin .PluginId }
31- | snapshot-store.plugin = ${PersistenceTestKitSnapshotPlugin .PluginId }
32- | # Might be possible to use PersistenceTestKitReadJournal
33- | // query.plugin = ""
34- |}
35- | """ .stripMargin,
36- )
37-
3829}
3930
4031class ShardSnapshotStoreFailureSpec
4132 extends TestKit (
42- ActorSystem (" ShardSnapshotStoreFailureSpec" , ShardSnapshotStoreFailureSpec .configWithPersistenceTestKits),
33+ ActorSystem (" ShardSnapshotStoreFailureSpec" , ShardSnapshotStoreSpecBase .configWithPersistenceTestKits),
4334 )
4435 with ActorSpec {
36+ import ShardSnapshotStoreFailureSpec ._
4537
4638 private val snapshotTestKit = SnapshotTestKit (system)
39+ private val typeName = TypeName .from(" test" )
40+ private val memberIndex = MemberIndex (" test-role" )
4741
4842 override def beforeEach (): Unit = {
4943 super .beforeEach()
@@ -55,9 +49,9 @@ class ShardSnapshotStoreFailureSpec
5549 planAutoKill {
5650 childActorOf(
5751 ShardSnapshotStore .props(
58- TypeName .from( " test " ) ,
52+ typeName ,
5953 RaftSettings (system.settings.config),
60- MemberIndex ( " test-role " ) ,
54+ memberIndex ,
6155 ),
6256 )
6357 }
@@ -70,10 +64,11 @@ class ShardSnapshotStoreFailureSpec
7064 " ShardSnapshotStore(読み込みの異常)" should {
7165
7266 " FetchSnapshot に失敗した場合は応答無し(クライアント側でタイムアウトの実装が必要)" in {
73- val entityId = generateUniqueEntityId()
74- val shardSnapshotStore = createShardSnapshotStore()
67+ val entityId = generateUniqueEntityId()
68+ val shardSnapshotStore = createShardSnapshotStore()
69+ val snapshotStorePersistenceId = SnapshotStore .persistenceId(typeName, entityId, memberIndex)
7570
76- snapshotTestKit.failNextRead()
71+ snapshotTestKit.failNextRead(snapshotStorePersistenceId )
7772 shardSnapshotStore ! FetchSnapshot (entityId, replyTo = testActor)
7873 expectNoMessage()
7974 }
@@ -82,16 +77,100 @@ class ShardSnapshotStoreFailureSpec
8277 " ShardSnapshotStore(書き込みの異常)" should {
8378
8479 " SaveSnapshot に失敗した場合は SaveSnapshotFailure が返信される" in {
80+ val entityId = generateUniqueEntityId()
81+ val shardSnapshotStore = createShardSnapshotStore()
82+ val snapshotStorePersistenceId = SnapshotStore .persistenceId(typeName, entityId, memberIndex)
83+ val metadata = EntitySnapshotMetadata (entityId, LogEntryIndex .initial())
84+ val dummyEntityState = EntityState (DummyState )
85+ val snapshot = EntitySnapshot (metadata, dummyEntityState)
86+
87+ snapshotTestKit.failNextPersisted(snapshotStorePersistenceId)
88+ shardSnapshotStore ! SaveSnapshot (snapshot, replyTo = testActor)
89+ expectMsg(SaveSnapshotFailure (metadata))
90+ }
91+ }
92+
93+ " ShardSnapshotStore (with time-consuming writes)" should {
94+
95+ // Emulates a time-consuming write
96+ class TimeConsumingWriteSnapshotPolicy extends SnapshotStorage .SnapshotPolicies .PolicyType with AutoCloseable {
97+ val processingResultPromise = Promise [ProcessingResult ]()
98+ override def tryProcess (persistenceId : String , processingUnit : SnapshotOperation ): ProcessingResult = {
99+ processingUnit match {
100+ case _ : WriteSnapshot => processingResultPromise.future.await
101+ case _ => ProcessingSuccess
102+ }
103+ }
104+ override def close (): Unit = {
105+ processingResultPromise.trySuccess(ProcessingSuccess )
106+ }
107+ }
108+
109+ " reply with `SnapshotNotFound` to `FetchSnapshot` if it has no EntitySnapshot and is saving an EntitySnapshot" ignore {
110+ // TODO Change SnapshotStore.savingSnapshot such that this test passes.
85111 val entityId = generateUniqueEntityId()
86112 val shardSnapshotStore = createShardSnapshotStore()
87- val metadata = EntitySnapshotMetadata (entityId, LogEntryIndex .initial())
88- val dummyEntityState = EntityState (DummyState )
89- val snapshot = EntitySnapshot (metadata, dummyEntityState)
113+ val metadata = EntitySnapshotMetadata (entityId, LogEntryIndex (1 ))
114+ val snapshot = EntitySnapshot (metadata, EntityState (DummyState ))
115+
116+ Using (new TimeConsumingWriteSnapshotPolicy ()) { timeConsumingWriteSnapshotPolicy =>
117+ // Prepare: SnapshotStore is saving the snapshot
118+ snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
119+ shardSnapshotStore ! SaveSnapshot (snapshot, replyTo = testActor)
120+
121+ // Test:
122+ shardSnapshotStore ! FetchSnapshot (entityId, replyTo = testActor)
123+ expectMsg(SnapshotNotFound )
124+ }
125+ }
90126
91- snapshotTestKit.failNextPersisted()
92- shardSnapshotStore ! SaveSnapshot (snapshot, replyTo = testActor)
93- expectMsg(SaveSnapshotFailure (metadata))
127+ " reply with `SnapshotFound` to `FetchSnapshot` if it has an EntitySnapshot and is saving a new EntitySnapshot" in {
128+ val entityId = generateUniqueEntityId()
129+ val shardSnapshotStore = createShardSnapshotStore()
130+
131+ val firstSnapshotMetadata = EntitySnapshotMetadata (entityId, LogEntryIndex (1 ))
132+ val firstSnapshot =
133+ EntitySnapshot (firstSnapshotMetadata, EntityState (DummyState ))
134+ shardSnapshotStore ! SaveSnapshot (firstSnapshot, replyTo = testActor)
135+ expectMsg(SaveSnapshotSuccess (firstSnapshotMetadata))
136+
137+ Using (new TimeConsumingWriteSnapshotPolicy ()) { timeConsumingWriteSnapshotPolicy =>
138+ // Prepare: SnapshotStore is saving the second snapshot
139+ snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
140+ val secondSnapshot =
141+ EntitySnapshot (EntitySnapshotMetadata (entityId, LogEntryIndex (5 )), EntityState (DummyState ))
142+ shardSnapshotStore ! SaveSnapshot (secondSnapshot, replyTo = testActor)
143+
144+ // Test:
145+ shardSnapshotStore ! FetchSnapshot (entityId, replyTo = testActor)
146+ expectMsg(SnapshotFound (firstSnapshot))
147+ }
94148 }
149+
150+ " reply with nothing to `SaveSnapshot` and log a warning if it is saving an EntitySnapshot" in {
151+ implicit val typedSystem : akka.actor.typed.ActorSystem [Nothing ] = system.toTyped
152+
153+ val entityId = generateUniqueEntityId()
154+ val shardSnapshotStore = createShardSnapshotStore()
155+ val metadata = EntitySnapshotMetadata (entityId, LogEntryIndex (1 ))
156+ val snapshot = EntitySnapshot (metadata, EntityState (DummyState ))
157+
158+ Using (new TimeConsumingWriteSnapshotPolicy ()) { timeConsumingWriteSnapshotPolicy =>
159+ // Prepare: SnapshotStore is saving the snapshot
160+ snapshotTestKit.withPolicy(timeConsumingWriteSnapshotPolicy)
161+ shardSnapshotStore ! SaveSnapshot (snapshot, replyTo = testActor)
162+
163+ // Test:
164+ LoggingTestKit
165+ .warn(
166+ s " Saving snapshot for an entity ( $entityId) currently. Consider to increase log-size-threshold or log-size-check-interval. " ,
167+ ).expect {
168+ shardSnapshotStore ! SaveSnapshot (snapshot, replyTo = testActor)
169+ }
170+ expectNoMessage()
171+ }
172+ }
173+
95174 }
96175
97176}
0 commit comments