
Commit 31fc4cc

Merge branch 'master' into ci-coverage

2 parents 32303e1 + 0352e28

4 files changed: 32 additions (+), 42 deletions (−)

CHANGELOG.md

Lines changed: 6 additions & 2 deletions

@@ -18,7 +18,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
   This feature is enabled only by using `typed.ClusterReplication`.
   It is highly recommended that you switch using the typed API since the classic API was deprecated.

-- Raft actors track the progress of the event sourcing [#136](https://github.com/lerna-stack/akka-entity-replication/issues/136).
+- Raft actors track the progress of the event sourcing
+  [#136](https://github.com/lerna-stack/akka-entity-replication/issues/136),
+  [PR#137](https://github.com/lerna-stack/akka-entity-replication/pull/137),
+  [PR#142](https://github.com/lerna-stack/akka-entity-replication/pull/142).

   This feature ensures that
   - Event Sourcing won't halt even if the event-sourcing store is unavailable for a long period.

@@ -36,8 +39,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

   It requires that
   `lerna.akka.entityreplication.raft.compaction.preserve-log-size` is less than
-  `lerna.akka.entityreplication.raft.compaction.log-size-threshold`.
+  `lerna.akka.entityreplication.raft.compaction.log-size-threshold`.

+- Compaction warns if it might not delete enough entries [PR#142](https://github.com/lerna-stack/akka-entity-replication/pull/142)

 ### Changed
 - Bump up Akka version to 2.6.17 [PR#98](https://github.com/lerna-stack/akka-entity-replication/pull/98)
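A minimal, self-contained sketch (hypothetical names, not the library's API) of why the changelog requires `preserve-log-size` to stay below `log-size-threshold`: compaction keeps at least `preserve-log-size` of the applied entries, so the estimated post-compaction size can only fall below the threshold when that inequality holds.

object CompactionThresholdSketch extends App {
  // Compaction keeps at least `preserveLogSize` of the compactable entries,
  // so the estimated post-compaction size can never drop below it.
  def estimatedSizeAfterCompaction(logSize: Int, compactableEntries: Int, preserveLogSize: Int): Int =
    logSize - math.max(0, compactableEntries - preserveLogSize)

  val logSizeThreshold = 3
  // preserveLogSize >= logSizeThreshold: the estimate never falls below the
  // threshold, so every SnapshotTick would warn and re-trigger compaction.
  assert(estimatedSizeAfterCompaction(logSize = 3, compactableEntries = 3, preserveLogSize = 3) >= logSizeThreshold)
  // preserveLogSize < logSizeThreshold: the log can shrink below the threshold.
  assert(estimatedSizeAfterCompaction(logSize = 3, compactableEntries = 3, preserveLogSize = 1) < logSizeThreshold)
}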

src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala

Lines changed: 12 additions & 9 deletions

@@ -465,31 +465,34 @@ private[raft] class RaftActor(
       }
       commitLogStore ! CommitLogStoreActor.AppendCommittedEntries(shardId, Seq.empty)

-      if (
-        currentData.replicatedLog.entries.size >= settings.compactionLogSizeThreshold
-        && currentData.hasLogEntriesThatCanBeCompacted
-      ) {
+      if (currentData.replicatedLog.entries.size >= settings.compactionLogSizeThreshold) {
         val estimatedCompactedLogSize: Int =
           currentData.estimatedReplicatedLogSizeAfterCompaction(settings.compactionPreserveLogSize)
         if (estimatedCompactedLogSize >= settings.compactionLogSizeThreshold) {
+          // This warning might also happen when the first SnapshotTick expires if
+          //   * there are already enough entries (>= compactionLogSizeThreshold), and eventSourcingIndex is unknown yet.
+          //   * there is no leader (e.g., split vote), and no entries are applied.
+          //   * there is the leader, but only a few entries are applied (the replication is too slow).
           if (log.isWarningEnabled) {
             log.warning(
-              "[{}] Skipping compaction since compaction might not delete enough entries " +
-              "(even if this compaction continues, the remaining entries will trigger new compaction at the next tick). " +
+              "[{}] Compaction might not delete enough entries, but will continue to reduce log size as possible " +
+              "(even if this compaction continues, the remaining entries might trigger new compaction at the next tick). " +
               s"Estimated compacted log size is [{}] entries (lastApplied [{}], eventSourcingIndex [{}], preserveLogSize [${settings.compactionPreserveLogSize}]), " +
               s"however compaction.log-size-threshold is [${settings.compactionLogSizeThreshold}] entries. " +
-              "This warning happens if event sourcing is too slow or compaction is too fast.",
+              "This warning might happen if event sourcing is too slow or compaction is too fast (or too slow). " +
+              "If this warning continues, please consult settings related to event sourcing and compaction.",
               currentState,
               estimatedCompactedLogSize,
               currentData.lastApplied,
               currentData.eventSourcingIndex,
             )
           }
-        } else if (snapshotSynchronizationIsInProgress) {
+        }
+        if (snapshotSynchronizationIsInProgress) {
           // Snapshot updates during synchronizing snapshot will break consistency
           if (log.isInfoEnabled)
             log.info("Skipping compaction because snapshot synchronization is in progress")
-        } else {
+        } else if (currentData.hasAppliedLogEntries) {
           val (term, logEntryIndex, entityIds) = currentData.resolveSnapshotTargets()
           applyDomainEvent(SnapshottingStarted(term, logEntryIndex, entityIds)) { _ =>
             if (log.isInfoEnabled)
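To summarize the control-flow change above, here is a minimal standalone sketch (simplified parameters and println stand-ins, not the actor's actual code): the "might not delete enough entries" case is now a warning only, and compaction still proceeds unless snapshot synchronization is in progress or nothing applied is left to compact.

object CompactionFlowSketch extends App {
  def onSnapshotTick(
      logSize: Int,
      estimatedCompactedLogSize: Int,
      logSizeThreshold: Int,
      snapshotSyncInProgress: Boolean,
      hasAppliedLogEntries: Boolean,
  ): String =
    if (logSize < logSizeThreshold) "below threshold: no compaction"
    else {
      if (estimatedCompactedLogSize >= logSizeThreshold)
        println("warn: compaction might not delete enough entries") // warn, but keep going
      if (snapshotSyncInProgress) "skip: snapshot synchronization in progress"
      else if (hasAppliedLogEntries) "start compaction (SnapshottingStarted)"
      else "skip: no applied entries to compact"
    }

  // Before this commit the first branch skipped compaction entirely;
  // now it only warns, and compaction still starts when possible:
  println(onSnapshotTick(3, 3, 3, snapshotSyncInProgress = false, hasAppliedLogEntries = true))
}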

src/main/scala/lerna/akka/entityreplication/raft/RaftMemberData.scala

Lines changed: 2 additions & 1 deletion

@@ -397,7 +397,8 @@ private[entityreplication] trait RaftMemberData
     prevLogIndex == lastSnapshotStatus.targetSnapshotLastLogIndex
   }

-  def hasLogEntriesThatCanBeCompacted: Boolean = {
+  /** Returns true if [[replicatedLog]] has entries that have been already applied */
+  def hasAppliedLogEntries: Boolean = {
     replicatedLog.sliceEntriesFromHead(lastApplied).nonEmpty
   }
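For intuition, a self-contained sketch (simplified log type; the index comparison stands in for `sliceEntriesFromHead(lastApplied).nonEmpty`) of what the renamed predicate checks: whether any log entry has already been applied, i.e. whether compaction has anything to remove.

object HasAppliedLogEntriesSketch extends App {
  final case class LogEntry(index: Long)

  // Simplified stand-in for replicatedLog.sliceEntriesFromHead(lastApplied).nonEmpty
  def hasAppliedLogEntries(entries: Seq[LogEntry], lastApplied: Long): Boolean =
    entries.exists(_.index <= lastApplied)

  val log = (1L to 5L).map(i => LogEntry(i))
  assert(hasAppliedLogEntries(log, lastApplied = 3))  // entries 1..3 are compactable
  assert(!hasAppliedLogEntries(log, lastApplied = 0)) // nothing applied yet
}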

src/test/scala/lerna/akka/entityreplication/raft/RaftActorSpec.scala

Lines changed: 12 additions & 30 deletions

@@ -75,7 +75,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
     "ログが追加された後にログの長さがしきい値を超えている場合はスナップショットがとられる" in {
       val snapshotStore    = TestProbe()
       val replicationActor = TestProbe()
-      val commitLogStore   = TestProbe()
       val shardId          = createUniqueShardId()
       val followerMemberIndex = createUniqueMemberIndex()
       val follower = createRaftActor(

@@ -84,7 +83,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
         shardSnapshotStore = snapshotStore.ref,
         replicationActor = replicationActor.ref,
         settings = RaftSettings(raftConfig),
-        commitLogStore = commitLogStore.ref,
       )

       val leaderMemberIndex = createUniqueMemberIndex()

@@ -105,10 +103,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
         leaderCommit = applicableIndex,
       )

-      // To ensure compaction starts, CommitLogStore should handle AppendCommittedEntries.
-      commitLogStore.expectMsg(CommitLogStoreActor.AppendCommittedEntries(shardId, Seq.empty))
-      commitLogStore.reply(CommitLogStoreActor.AppendCommittedEntriesResponse(applicableIndex))
-
       val command =
         replicationActor.fishForSpecificMessage() {
           case msg: TakeSnapshot => msg

@@ -375,7 +369,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
     "prevent to start snapshot synchronization during compaction" in {
       val snapshotStore    = TestProbe()
       val replicationActor = TestProbe()
-      val commitLogStore   = TestProbe()
       val shardId          = createUniqueShardId()
       val followerMemberIndex = createUniqueMemberIndex()
       val follower = createRaftActor(

@@ -384,7 +377,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
         shardSnapshotStore = snapshotStore.ref,
         replicationActor = replicationActor.ref,
         settings = RaftSettings(raftConfig),
-        commitLogStore = commitLogStore.ref,
       )

       val leaderMemberIndex = createUniqueMemberIndex()

@@ -413,10 +405,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
         leaderCommit = applicableIndex,
       )

-      // To ensure compaction starts, CommitLogStore should handle AppendCommittedEntries.
-      commitLogStore.expectMsg(CommitLogStoreActor.AppendCommittedEntries(shardId, Seq.empty))
-      commitLogStore.reply(CommitLogStoreActor.AppendCommittedEntriesResponse(applicableIndex))
-
       // wait for starting compaction
       val takeSnapshot =
         replicationActor.fishForSpecificMessage() {

@@ -440,7 +428,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
     "not persist snapshots that have already been persisted in the next compaction" in {
       val snapshotStore    = TestProbe()
       val replicationActor = TestProbe()
-      val commitLogStore   = TestProbe()
       val shardId          = createUniqueShardId()
       val followerMemberIndex = createUniqueMemberIndex()
       val follower = createRaftActor(

@@ -449,7 +436,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
         shardSnapshotStore = snapshotStore.ref,
         replicationActor = replicationActor.ref,
         settings = RaftSettings(raftConfig),
-        commitLogStore = commitLogStore.ref,
       )

       val leaderMemberIndex = createUniqueMemberIndex()

@@ -470,10 +456,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
         leaderCommit = LogEntryIndex(4),
       )

-      // To ensure compaction starts, CommitLogStore should handle AppendCommittedEntries.
-      commitLogStore.expectMsg(CommitLogStoreActor.AppendCommittedEntries(shardId, Seq.empty))
-      commitLogStore.reply(CommitLogStoreActor.AppendCommittedEntriesResponse(LogEntryIndex(4)))
-
       replicationActor.fishForSpecificMessage() {
         case msg: TakeSnapshot =>
           msg.metadata.entityId should be(entityId1)

@@ -499,10 +481,6 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
         leaderCommit = LogEntryIndex(7),
       )

-      // To ensure compaction starts, CommitLogStore should handle AppendCommittedEntries.
-      commitLogStore.expectMsg(CommitLogStoreActor.AppendCommittedEntries(shardId, Seq.empty))
-      commitLogStore.reply(CommitLogStoreActor.AppendCommittedEntriesResponse(LogEntryIndex(7)))
-
       // the snapshot should be only for entity2
       replicationActor.fishForSpecificMessage() {
         case msg: TakeSnapshot =>

@@ -516,16 +494,18 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
       }
     }

-    "not start if the compaction might not delete enough log entries" in {
+    "warn and continue if the compaction might not delete enough log entries" in {
       val commitLogStore      = TestProbe()
       val shardId             = createUniqueShardId()
       val followerMemberIndex = createUniqueMemberIndex()
       val raftSettings        = RaftSettings(raftConfig)
+      val replicationActor    = TestProbe()
       val follower = createRaftActor(
         shardId = shardId,
         selfMemberIndex = followerMemberIndex,
         commitLogStore = commitLogStore.ref,
         settings = raftSettings,
+        replicationActor = replicationActor.ref,
       )

       // The compaction cannot delete any entries by setting eventSourcingIndex to 0

@@ -536,11 +516,12 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
       assume(raftSettings.compactionLogSizeThreshold == 3)
       LoggingTestKit
         .warn(
-          "[Follower] Skipping compaction since compaction might not delete enough entries " +
-          "(even if this compaction continues, the remaining entries will trigger new compaction at the next tick). " +
+          "[Follower] Compaction might not delete enough entries, but will continue to reduce log size as possible " +
+          "(even if this compaction continues, the remaining entries might trigger new compaction at the next tick). " +
           "Estimated compacted log size is [3] entries (lastApplied [3], eventSourcingIndex [Some(0)], preserveLogSize [1]), " +
           "however compaction.log-size-threshold is [3] entries. " +
-          "This warning happens if event sourcing is too slow or compaction is too fast.",
+          "This warning might happen if event sourcing is too slow or compaction is too fast (or too slow). " +
+          "If this warning continues, please consult settings related to event sourcing and compaction.",
         ).expect {
           val leaderMemberIndex = createUniqueMemberIndex()
           val entityId          = NormalizedEntityId.from("entity1")

@@ -556,11 +537,12 @@ class RaftActorSpec extends TestKit(ActorSystem()) with RaftActorSpecBase {
           entries = logEntries,
           leaderCommit = LogEntryIndex(3),
         )
-        // A few seconds later (`compaction.log-size-check-interval`),
-        // the follower will output a warn log and not start the compaction
-        // since the estimated compacted entries size is larger than the threshold (`compaction.log-size-threshold`).
-      }

+        val takeSnapshot = replicationActor.fishForSpecificMessage() {
+          case msg: TakeSnapshot => msg
+        }
+        takeSnapshot.metadata shouldBe EntitySnapshotMetadata(entityId, LogEntryIndex(3))
+      }
     }

   }
