Skip to content

Commit 32303e1

Browse files
authored
Merge branch 'master' into ci-coverage
2 parents 0ffbf3a + 817cba6 commit 32303e1

File tree

4 files changed

+510
-4
lines changed

4 files changed

+510
-4
lines changed

src/multi-jvm/scala/lerna/akka/entityreplication/typed/MultiSnapshotSyncSpec.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,9 @@ class MultiSnapshotSyncSpec extends MultiNodeSpec(MultiSnapshotSyncSpecConfig) w
154154
/** RaftActor should compact it's log within this timeout */
155155
private val compactionTimeout: FiniteDuration = 10.seconds
156156

157+
/** Delayed Followers should synchronize snapshot within this timeout */
158+
private val snapshotSynchronizationTimeout: FiniteDuration = 10.seconds
159+
157160
/** None if this node doesn't shut it down. */
158161
private var maybeNewSystem: Option[classic.ActorSystem] = None
159162
private var runningNodes: Seq[RoleName] = roles
@@ -241,6 +244,14 @@ class MultiSnapshotSyncSpec extends MultiNodeSpec(MultiSnapshotSyncSpecConfig) w
241244
Thread.sleep(compactionTimeout.toMillis)
242245
}
243246

247+
/** Verifies snapshot synchronization completed by inspecting logging
248+
*/
249+
private def expectSnapshotSynchronizationCompleted(): Unit = {
250+
LoggingTestKit.info("Snapshot synchronization completed").expect {
251+
Thread.sleep(snapshotSynchronizationTimeout.toMillis)
252+
}
253+
}
254+
244255
private def setValue(id: String, value: Int)(implicit timeout: Timeout): Int = {
245256
// NOTE:
246257
// Too short retryInterval (e.g.200ms) will cause premature compaction and make this test unstable
@@ -286,6 +297,13 @@ class MultiSnapshotSyncSpec extends MultiNodeSpec(MultiSnapshotSyncSpecConfig) w
286297
enterBarrier("The cluster has nodes: [4,_,5].")
287298
}
288299

300+
"The leader (which is on node4) installs snapshots to the delayed follower (node5)" in {
301+
runOn(node5) {
302+
expectSnapshotSynchronizationCompleted()
303+
}
304+
enterBarrier("The snapshots were installed")
305+
}
306+
289307
"The leader (which is on node4) replicates some log entries. Nodes ([4,5]) compacts their log entries" in {
290308
object BarrierNames {
291309
val ExpectingCompactionCompleted = "Expecting compaction completed"
@@ -325,6 +343,13 @@ class MultiSnapshotSyncSpec extends MultiNodeSpec(MultiSnapshotSyncSpecConfig) w
325343
enterBarrier("The cluster has nodes: [6,7,8].")
326344
}
327345

346+
"The leader (which is on node8) installs snapshots to the delayed follower (node7)" in {
347+
runOn(node7) {
348+
expectSnapshotSynchronizationCompleted()
349+
}
350+
enterBarrier("The snapshots were installed")
351+
}
352+
328353
"The leader (which is on node8) replicates some log entries" in {
329354
object BarrierNames {
330355
val ExpectingCompactionCompleted = "Expecting compaction completed"

src/test/scala/lerna/akka/entityreplication/raft/RaftActorCandidateSpec.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,10 @@ class RaftActorCandidateSpec extends TestKit(ActorSystem()) with RaftActorSpecBa
309309
candidate ! createAppendEntries(shardId, term2, anotherMemberIndex)
310310
expectMsg(AppendEntriesSucceeded(term2, LogEntryIndex(0), candidateMemberIndex))
311311

312-
getState(candidate).stateName should be(Follower)
312+
val state = getState(candidate)
313+
state.stateName should be(Follower)
314+
state.stateData.currentTerm should be(term2)
315+
state.stateData.leaderMember should contain(anotherMemberIndex)
313316
}
314317

315318
"AppendEntries が同じ Term を持っているときは(先に別のリーダーが当選したということなので)Follower に降格" in {
@@ -324,7 +327,9 @@ class RaftActorCandidateSpec extends TestKit(ActorSystem()) with RaftActorSpecBa
324327
candidate ! createAppendEntries(shardId, term, anotherMemberIndex)
325328
expectMsg(AppendEntriesSucceeded(term, LogEntryIndex(0), candidateMemberIndex))
326329

327-
getState(candidate).stateName should be(Follower)
330+
val state = getState(candidate)
331+
state.stateName should be(Follower)
332+
state.stateData.leaderMember should contain(anotherMemberIndex)
328333
}
329334

330335
"コマンドは保留し、フォロワーに降格した場合はリーダーに転送する" in {
@@ -440,6 +445,10 @@ class RaftActorCandidateSpec extends TestKit(ActorSystem()) with RaftActorSpecBa
440445
setState(candidate, Candidate, candidateData)
441446
candidate ! createAppendEntries(shardId, term1, leaderMemberIndex, index3, term1)
442447
expectMsg(AppendEntriesFailed(term1, candidateMemberIndex))
448+
449+
val state = getState(candidate)
450+
state.stateName should be(Follower)
451+
state.stateData.leaderMember should contain(leaderMemberIndex)
443452
}
444453

445454
"prevLogIndex の Term が prevLogTerm に一致するログエントリでない場合は AppendEntriesFailed を返す" in {
@@ -497,6 +506,7 @@ class RaftActorCandidateSpec extends TestKit(ActorSystem()) with RaftActorSpecBa
497506
val state = getState(candidate)
498507
state.stateName should be(Follower)
499508
state.stateData.currentTerm should be(leaderTerm)
509+
state.stateData.leaderMember should contain(leaderMemberIndex)
500510
}
501511

502512
"leaderCommit > commitIndex となる場合、 commitIndex に min(leaderCommit, 新規エントリの最後のインデックス) を設定" in {

src/test/scala/lerna/akka/entityreplication/raft/RaftActorFollowerSpec.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ class RaftActorFollowerSpec extends TestKit(ActorSystem()) with RaftActorSpecBas
207207
val leaderMemberIndex = candidateMemberIndex
208208
follower ! createAppendEntries(shardId, term, leaderMemberIndex)
209209
expectMsg(AppendEntriesSucceeded(term, LogEntryIndex(0), followerMemberIndex))
210+
211+
val state = getState(follower)
212+
state.stateName should be(Follower)
213+
state.stateData.leaderMember should contain(leaderMemberIndex)
210214
}
211215

212216
"自分が持っている Term より新しい場合は AppendEntries を成功させる" in {
@@ -227,6 +231,11 @@ class RaftActorFollowerSpec extends TestKit(ActorSystem()) with RaftActorSpecBas
227231
val leaderMemberIndex = candidateMemberIndex
228232
follower ! createAppendEntries(shardId, term2, leaderMemberIndex)
229233
expectMsg(AppendEntriesSucceeded(term2, LogEntryIndex(0), followerMemberIndex))
234+
235+
val state = getState(follower)
236+
state.stateName should be(Follower)
237+
state.stateData.currentTerm should be(term2)
238+
state.stateData.leaderMember should contain(leaderMemberIndex)
230239
}
231240

232241
"自分が持っている Term より古い場合は AppendEntries を失敗させる" in {
@@ -494,6 +503,10 @@ class RaftActorFollowerSpec extends TestKit(ActorSystem()) with RaftActorSpecBas
494503
logEntries,
495504
)
496505
expectMsg(AppendEntriesFailed(term1, followerMemberIndex))
506+
507+
val state = getState(follower)
508+
state.stateName should be(Follower)
509+
state.stateData.leaderMember should contain(leaderMemberIndex)
497510
}
498511

499512
"agree to a term if it receives AppendEntries which includes log entries that cannot be merged and newer Term" in {
@@ -598,6 +611,10 @@ class RaftActorFollowerSpec extends TestKit(ActorSystem()) with RaftActorSpecBas
598611
followerLogEntries.last.term.next(),
599612
)
600613
expectMsg(AppendEntriesFailed(term1, followerMemberIndex))
614+
615+
val state = getState(follower)
616+
state.stateName should be(Follower)
617+
state.stateData.leaderMember should contain(leaderMemberIndex)
601618
}
602619

603620
}

0 commit comments

Comments
 (0)