Skip to content

Commit 9d0227b

Browse files
author
Taichi Yamakawa
committed
Merge remote-tracking branch 'origin/master' into add-entity-diagnostic-logs
2 parents ae71779 + 0f22a3e commit 9d0227b

File tree

3 files changed

+78
-10
lines changed

3 files changed

+78
-10
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111
- Add diagnostic logs
1212
[PR#164](https://github.com/lerna-stack/akka-entity-replication/pull/164),
13-
[PR#176](https://github.com/lerna-stack/akka-entity-replication/pull/176)
13+
[PR#176](https://github.com/lerna-stack/akka-entity-replication/pull/176),
14+
[PR#177](https://github.com/lerna-stack/akka-entity-replication/pull/177)
1415
- Add function extracting shard id from entity id to lerna.akka.entityreplication.typed.ClusterReplication
1516
[PR#172](https://github.com/lerna-stack/akka-entity-replication/pull/172)
1617
- Add function of Disabling raft actor [PR#173](https://github.com/lerna-stack/akka-entity-replication/pull/173)

src/main/scala/lerna/akka/entityreplication/raft/RaftActor.scala

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -742,6 +742,15 @@ private[raft] class RaftActor(
742742
response match {
743743
case response: SnapshotSyncManager.SyncSnapshotSucceeded =>
744744
applyDomainEvent(SnapshotSyncCompleted(response.snapshotLastLogTerm, response.snapshotLastLogIndex)) { _ =>
745+
if (log.isInfoEnabled) {
746+
log.info(
747+
"[{}] Completed snapshot synchronization: srcMemberIndex=[{}], snapshotLastLogTerm=[{}], snapshotLastLogIndex=[{}]",
748+
currentState,
749+
response.srcMemberIndex,
750+
response.snapshotLastLogTerm.term,
751+
response.snapshotLastLogIndex,
752+
)
753+
}
745754
region ! ReplicationRegion.DeliverTo(
746755
response.srcMemberIndex,
747756
InstallSnapshotSucceeded(
@@ -754,6 +763,15 @@ private[raft] class RaftActor(
754763
}
755764

756765
case response: SnapshotSyncManager.SyncSnapshotAlreadySucceeded =>
766+
if (log.isInfoEnabled) {
767+
log.info(
768+
"[{}] Completed snapshot synchronization already: srcMemberIndex=[{}], snapshotLastLogTerm=[{}], snapshotLastLogIndex=[{}]",
769+
currentState,
770+
response.srcMemberIndex,
771+
response.snapshotLastLogTerm.term,
772+
response.snapshotLastLogIndex,
773+
)
774+
}
757775
region ! ReplicationRegion.DeliverTo(
758776
response.srcMemberIndex,
759777
InstallSnapshotSucceeded(
@@ -764,7 +782,11 @@ private[raft] class RaftActor(
764782
),
765783
)
766784

767-
case _: SnapshotSyncManager.SyncSnapshotFailed => // ignore
785+
case _: SnapshotSyncManager.SyncSnapshotFailed =>
786+
// ignore
787+
if (log.isWarningEnabled) {
788+
log.warning("[{}] Failed snapshot synchronization", currentState)
789+
}
768790
}
769791

770792
private val snapshotSyncManagerName: String = ActorIds.actorName(
@@ -777,7 +799,8 @@ private[raft] class RaftActor(
777799
// Snapshot updates during compaction will break consistency
778800
if (log.isInfoEnabled)
779801
log.info(
780-
"Skipping snapshot synchronization because compaction is in progress (remaining: {}/{})",
802+
"[{}] Skipping snapshot synchronization because compaction is in progress (remaining: {}/{})",
803+
currentState,
781804
currentData.snapshottingProgress.inProgressEntities.size,
782805
currentData.snapshottingProgress.inProgressEntities.size + currentData.snapshottingProgress.completedEntities.size,
783806
)
@@ -796,6 +819,17 @@ private[raft] class RaftActor(
796819
snapshotSyncManagerName,
797820
)
798821
}
822+
if (log.isDebugEnabled) {
823+
log.debug(
824+
s"[${currentState}] Starting snapshot synchronization " +
825+
"(srcLatestSnapshotLastLogTerm=[{}], srcLatestSnapshotLastLogIndex=[{}], " +
826+
"dstLatestSnapshotLastLogTerm=[{}], dstLatestSnapshotLastLogIndex=[{}])",
827+
installSnapshot.srcLatestSnapshotLastLogTerm.term,
828+
installSnapshot.srcLatestSnapshotLastLogLogIndex,
829+
currentData.lastSnapshotStatus.snapshotLastTerm.term,
830+
currentData.lastSnapshotStatus.snapshotLastLogIndex,
831+
)
832+
}
799833
snapshotSyncManager ! SnapshotSyncManager.SyncSnapshot(
800834
srcLatestSnapshotLastLogTerm = installSnapshot.srcLatestSnapshotLastLogTerm,
801835
srcLatestSnapshotLastLogIndex = installSnapshot.srcLatestSnapshotLastLogLogIndex,

src/main/scala/lerna/akka/entityreplication/raft/snapshot/sync/SnapshotSyncManager.scala

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package lerna.akka.entityreplication.raft.snapshot.sync
33
import akka.actor.{ ActorLogging, ActorRef, Props, Status }
44
import akka.pattern.extended.ask
55
import akka.pattern.pipe
6-
import akka.persistence.{ PersistentActor, RuntimePluginConfig, SnapshotOffer }
6+
import akka.persistence.{ PersistentActor, RecoveryCompleted, RuntimePluginConfig, SnapshotOffer }
77
import akka.persistence.query.{ EventEnvelope, Offset, PersistenceQuery }
88
import akka.persistence.query.scaladsl.CurrentEventsByTagQuery
99
import akka.stream.{ KillSwitches, UniqueKillSwitch }
@@ -222,10 +222,18 @@ private[entityreplication] class SnapshotSyncManager(
222222

223223
override def receiveRecover: Receive = {
224224

225-
case SnapshotOffer(_, snapshot: SyncProgress) =>
225+
case SnapshotOffer(metadata, snapshot: SyncProgress) =>
226+
if (log.isInfoEnabled) {
227+
log.info("Loaded snapshot: metadata=[{}], snapshot=[{}]", metadata, snapshot)
228+
}
226229
this.state = snapshot
227230

228231
case event: Event => updateState(event)
232+
233+
case RecoveryCompleted =>
234+
if (log.isInfoEnabled) {
235+
log.info("Recovery completed: state=[{}]", this.state)
236+
}
229237
}
230238

231239
private[this] var state = SyncProgress(Offset.noOffset)
@@ -286,10 +294,16 @@ private[entityreplication] class SnapshotSyncManager(
286294
s"(typeName: $typeName, memberIndex: $dstMemberIndex, snapshotLastLogTerm: ${dstLatestSnapshotLastLogTerm.term}, snapshotLastLogIndex: $dstLatestSnapshotLastLogIndex)",
287295
)
288296

289-
case _: akka.persistence.SaveSnapshotSuccess =>
297+
case akka.persistence.SaveSnapshotSuccess(metadata) =>
298+
if (log.isInfoEnabled) {
299+
log.info("Succeeded to save snapshot synchronization progress: metadata=[{}]", metadata)
300+
}
290301
context.stop(self)
291302

292-
case _: akka.persistence.SaveSnapshotFailure =>
303+
case akka.persistence.SaveSnapshotFailure(metadata, cause) =>
304+
if (log.isWarningEnabled) {
305+
log.warning("Failed to save snapshot synchronization progress: metadata=[{}], cause=[{}]", metadata, cause)
306+
}
293307
context.stop(self)
294308
}
295309

@@ -300,7 +314,10 @@ private[entityreplication] class SnapshotSyncManager(
300314
dstLatestSnapshotLastLogIndex: LogEntryIndex,
301315
): Receive = {
302316

303-
case _: SyncSnapshot => // ignore
317+
case syncSnapshot: SyncSnapshot =>
318+
if (log.isDebugEnabled) {
319+
log.debug("Dropping [{}] since the snapshot synchronization is running.", syncSnapshot)
320+
}
304321

305322
case syncStatus: SyncStatus =>
306323
this.killSwitch = None
@@ -318,6 +335,13 @@ private[entityreplication] class SnapshotSyncManager(
318335
updateState(event)
319336
if (event.snapshotLastLogIndex < srcLatestSnapshotLastLogIndex) {
320337
// complete partially
338+
if (log.isDebugEnabled) {
339+
log.debug(
340+
"Snapshot synchronization partially completed and continues: {} -> {}",
341+
s"(typeName: $typeName, memberIndex: $srcMemberIndex, snapshotLastLogIndex: ${event.snapshotLastLogIndex}/${srcLatestSnapshotLastLogIndex})",
342+
s"(typeName: $typeName, memberIndex: $dstMemberIndex, snapshotLastLogTerm: ${dstLatestSnapshotLastLogTerm.term}, snapshotLastLogIndex: $dstLatestSnapshotLastLogIndex)",
343+
)
344+
}
321345
startSnapshotSynchronizationBatch(
322346
srcLatestSnapshotLastLogIndex,
323347
dstLatestSnapshotLastLogTerm,
@@ -374,8 +398,17 @@ private[entityreplication] class SnapshotSyncManager(
374398
)
375399
context.stop(self)
376400

377-
case _: akka.persistence.SaveSnapshotSuccess => // ignore: previous execution result
378-
case _: akka.persistence.SaveSnapshotFailure => // ignore: previous execution result
401+
case saveSnapshotSuccess: akka.persistence.SaveSnapshotSuccess =>
402+
// ignore: previous execution result
403+
if (log.isDebugEnabled) {
404+
log.debug("Dropping [{}] of the previous synchronization.", saveSnapshotSuccess)
405+
}
406+
407+
case saveSnapshotFailure: akka.persistence.SaveSnapshotFailure =>
408+
// ignore: previous execution result
409+
if (log.isDebugEnabled) {
410+
log.debug("Dropping [{}] of the previous synchronization.", saveSnapshotFailure)
411+
}
379412
}
380413

381414
def updateState(event: Event): Unit =

0 commit comments

Comments
 (0)