@@ -2,77 +2,96 @@ package lerna.akka.entityreplication.raft.snapshot
22
33import java .util .concurrent .atomic .AtomicInteger
44import akka .actor .{ ActorRef , ActorSystem }
5+ import akka .persistence .testkit .scaladsl .SnapshotTestKit
6+ import akka .persistence .testkit .{ PersistenceTestKitPlugin , PersistenceTestKitSnapshotPlugin }
57import akka .testkit .TestKit
68import com .typesafe .config .{ Config , ConfigFactory }
79import lerna .akka .entityreplication .model .{ NormalizedEntityId , TypeName }
810import lerna .akka .entityreplication .raft .model .LogEntryIndex
911import lerna .akka .entityreplication .raft .routing .MemberIndex
10- import lerna .akka .entityreplication .raft .snapshot .ShardSnapshotStoreFailureSpecBase ._
12+ import lerna .akka .entityreplication .raft .snapshot .ShardSnapshotStoreFailureSpec ._
1113import lerna .akka .entityreplication .raft .snapshot .SnapshotProtocol ._
1214import lerna .akka .entityreplication .raft .{ ActorSpec , RaftSettings }
1315import lerna .akka .entityreplication .testkit .KryoSerializable
1416
15- // snapshot-store のスタブを利用して snapshot の読み込みを失敗させる
16- class ShardSnapshotStoreLoadingFailureSpec
17- extends ShardSnapshotStoreFailureSpecBase (
18- SnapshotPluginStub .brokenLoadingSnapshotConfig.withFallback(ConfigFactory .load()),
19- ) {
17+ object ShardSnapshotStoreFailureSpec {
18+ final case object DummyState extends KryoSerializable
19+
20+ def configWithPersistenceTestKits : Config = {
21+ PersistenceTestKitPlugin .config
22+ .withFallback(PersistenceTestKitSnapshotPlugin .config)
23+ .withFallback(raftPersistenceConfigWithPersistenceTestKits)
24+ .withFallback(ConfigFactory .load())
25+ }
26+
27+ private val raftPersistenceConfigWithPersistenceTestKits : Config = ConfigFactory .parseString(
28+ s """
29+ |lerna.akka.entityreplication.raft.persistence {
30+ | journal.plugin = ${PersistenceTestKitPlugin .PluginId }
31+ | snapshot-store.plugin = ${PersistenceTestKitSnapshotPlugin .PluginId }
32+ | # Might be possible to use PersistenceTestKitReadJournal
33+ | // query.plugin = ""
34+ |}
35+ | """ .stripMargin,
36+ )
37+
38+ }
39+
40+ class ShardSnapshotStoreFailureSpec
41+ extends TestKit (
42+ ActorSystem (" ShardSnapshotStoreFailureSpec" , ShardSnapshotStoreFailureSpec .configWithPersistenceTestKits),
43+ )
44+ with ActorSpec {
45+
46+ private val snapshotTestKit = SnapshotTestKit (system)
47+
48+ override def beforeEach (): Unit = {
49+ super .beforeEach()
50+ snapshotTestKit.clearAll()
51+ snapshotTestKit.resetPolicy()
52+ }
53+
54+ def createShardSnapshotStore (): ActorRef =
55+ planAutoKill {
56+ childActorOf(
57+ ShardSnapshotStore .props(
58+ TypeName .from(" test" ),
59+ RaftSettings (system.settings.config),
60+ MemberIndex (" test-role" ),
61+ ),
62+ )
63+ }
64+
65+ val entityIdSeq = new AtomicInteger (0 )
66+
67+ def generateUniqueEntityId (): NormalizedEntityId =
68+ NormalizedEntityId .from(s " test-entity- ${entityIdSeq.incrementAndGet()}" )
2069
2170 " ShardSnapshotStore(読み込みの異常)" should {
2271
2372 " FetchSnapshot に失敗した場合は応答無し(クライアント側でタイムアウトの実装が必要)" in {
2473 val entityId = generateUniqueEntityId()
2574 val shardSnapshotStore = createShardSnapshotStore()
2675
76+ snapshotTestKit.failNextRead()
2777 shardSnapshotStore ! FetchSnapshot (entityId, replyTo = testActor)
2878 expectNoMessage()
2979 }
3080 }
31- }
32-
33- // snapshot-store のスタブを利用して snapshot の永続化を失敗させる
34- class ShardSnapshotStoreSavingFailureSpec
35- extends ShardSnapshotStoreFailureSpecBase (
36- SnapshotPluginStub .brokenSavingSnapshotConfig.withFallback(ConfigFactory .load()),
37- ) {
38-
39- private [this ] val dummyEntityState = EntityState (DummyState )
4081
4182 " ShardSnapshotStore(書き込みの異常)" should {
4283
4384 " SaveSnapshot に失敗した場合は SaveSnapshotFailure が返信される" in {
4485 val entityId = generateUniqueEntityId()
4586 val shardSnapshotStore = createShardSnapshotStore()
4687 val metadata = EntitySnapshotMetadata (entityId, LogEntryIndex .initial())
88+ val dummyEntityState = EntityState (DummyState )
4789 val snapshot = EntitySnapshot (metadata, dummyEntityState)
4890
91+ snapshotTestKit.failNextPersisted()
4992 shardSnapshotStore ! SaveSnapshot (snapshot, replyTo = testActor)
5093 expectMsg(SaveSnapshotFailure (metadata))
5194 }
5295 }
53- }
5496
55- object ShardSnapshotStoreFailureSpecBase {
56- final case object DummyState extends KryoSerializable
57- }
58-
59- abstract class ShardSnapshotStoreFailureSpecBase (config : Config )
60- extends TestKit (ActorSystem (" ShardSnapshotStoreFailureSpec" , config))
61- with ActorSpec {
62-
63- def createShardSnapshotStore (): ActorRef =
64- planAutoKill {
65- childActorOf(
66- ShardSnapshotStore .props(
67- TypeName .from(" test" ),
68- RaftSettings (system.settings.config),
69- MemberIndex (" test-role" ),
70- ),
71- )
72- }
73-
74- val entityIdSeq = new AtomicInteger (0 )
75-
76- def generateUniqueEntityId (): NormalizedEntityId =
77- NormalizedEntityId .from(s " test-entity- ${entityIdSeq.incrementAndGet()}" )
7897}
0 commit comments