@@ -4,7 +4,7 @@ import akka.actor.typed.scaladsl.Behaviors
44import akka .actor .typed .scaladsl .adapter ._
55import lerna .akka .entityreplication .raft .RaftProtocol
66import lerna .akka .entityreplication .raft .RaftProtocol .EntityCommand
7- import lerna .akka .entityreplication .raft .model .LogEntryIndex
7+ import lerna .akka .entityreplication .raft .model .{ LogEntry , LogEntryIndex }
88import lerna .akka .entityreplication .raft .protocol .{ FetchEntityEvents , FetchEntityEventsResponse }
99import lerna .akka .entityreplication .raft .snapshot .SnapshotProtocol
1010import lerna .akka .entityreplication .raft .snapshot .SnapshotProtocol .EntitySnapshot
@@ -29,6 +29,8 @@ private[entityreplication] class Recovering[Command, Event, State](
2929
3030 import Recovering ._
3131
32+ override def stateName : String = " Recovering"
33+
3234 def createBehavior (
3335 shardSnapshotStore : ActorRef [SnapshotProtocol .Command ],
3436 recoveryIndex : LogEntryIndex ,
@@ -45,12 +47,27 @@ private[entityreplication] class Recovering[Command, Event, State](
4547 case FetchEntityEventsResponse (events) => RaftProtocol .RecoveryState (events, snapshot)
4648 }
4749
50+ if (context.log.isTraceEnabled) {
51+ context.log.trace(
52+ " [{}] Sending FetchSnapshot: entityId=[{}], to=[{}]" ,
53+ stateName,
54+ setup.replicationId.entityId.raw,
55+ shardSnapshotStore,
56+ )
57+ }
4858 shardSnapshotStore ! SnapshotProtocol .FetchSnapshot (
4959 setup.replicationId.entityId,
5060 replyTo = fetchSnapshotResponseMapper.toClassic,
5161 )
5262
5363 Behaviors .withTimers { scheduler =>
64+ if (context.log.isTraceEnabled) {
65+ context.log.trace(
66+ " [{}] Starting single RecoveryTimeoutTimer: delay=[{}]" ,
67+ stateName,
68+ setup.settings.recoveryEntityTimeout,
69+ )
70+ }
5471 scheduler.startSingleTimer(
5572 RecoveryTimeoutTimer ,
5673 RaftProtocol .RecoveryTimeout ,
@@ -59,35 +76,103 @@ private[entityreplication] class Recovering[Command, Event, State](
5976 Behaviors
6077 .receiveMessage[EntityCommand ] {
6178 case command : RaftProtocol .ApplySnapshot =>
79+ if (context.log.isTraceEnabled) {
80+ context.log.trace(
81+ " [{}] Received ApplySnapshot: index=[{}], entityId=[{}], stateType=[{}]" ,
82+ stateName,
83+ command.entitySnapshot.map(_.metadata.logEntryIndex),
84+ command.entitySnapshot.map(_.metadata.entityId.raw),
85+ command.entitySnapshot.map(_.state.underlying.getClass.getName),
86+ )
87+ }
6288 val snapshotIndex = command.entitySnapshot match {
6389 case Some (snapshot) => snapshot.metadata.logEntryIndex
6490 case None => LogEntryIndex .initial()
6591 }
92+ val replyTo =
93+ fetchEntityEventsResponseMapper(command.entitySnapshot)
94+ if (context.log.isTraceEnabled) {
95+ context.log.trace(
96+ " [{}] Sending FetchEntityEvents: entityId=[{}], fromIndex=[{}], toIndex=[{}], replyTo=[{}], to=[{}]" ,
97+ stateName,
98+ setup.replicationId.entityId.raw,
99+ snapshotIndex.next(),
100+ recoveryIndex,
101+ replyTo,
102+ setup.shard,
103+ )
104+ }
66105 setup.shard ! FetchEntityEvents (
67106 setup.replicationId.entityId,
68107 from = snapshotIndex.next(),
69108 to = recoveryIndex,
70- fetchEntityEventsResponseMapper(command.entitySnapshot) ,
109+ replyTo ,
71110 )
72111 Behaviors .same
73112 case command : RaftProtocol .RecoveryState =>
113+ if (context.log.isTraceEnabled) {
114+ def toLogMessage (logEntry : LogEntry ): String = {
115+ val entityId = logEntry.event.entityId.map(_.raw)
116+ val eventType = logEntry.event.event.getClass.getName
117+ s " index= ${logEntry.index}, term= ${logEntry.term.term}, entityId= $entityId, eventType= $eventType"
118+ }
119+ context.log.trace(
120+ " [{}] Received RecoveryState: " +
121+ " snapshot.index=[{}], snapshot.entityId=[{}], snapshot.stateType=[{}], " +
122+ " events.size=[{}], events.head=[{}], events.last=[{}]" ,
123+ stateName,
124+ command.snapshot.map(_.metadata.logEntryIndex),
125+ command.snapshot.map(_.metadata.entityId.raw),
126+ command.snapshot.map(_.state.underlying.getClass),
127+ command.events.size,
128+ command.events.headOption.map(toLogMessage),
129+ command.events.lastOption.map(toLogMessage),
130+ )
131+ }
74132 scheduler.cancel(RecoveryTimeoutTimer )
75133 receiveRecoveryState(command)
76134 case RaftProtocol .RecoveryTimeout =>
77135 if (context.log.isInfoEnabled)
78136 context.log.info(
79- " Entity (name: {}) recovering timed out. It will be retried later." ,
137+ " [{}] Entity (name: [{}]) recovering timed out. It will be retried later." ,
138+ stateName,
80139 setup.entityContext.entityId,
81140 )
82141 // TODO: Enable backoff to prevent cascade failures
83142 throw RaftProtocol .EntityRecoveryTimeoutException (context.self.path)
84143 case command : RaftProtocol .ProcessCommand =>
144+ if (context.log.isTraceEnabled) {
145+ context.log.trace(
146+ " [{}] Stashing ProcessCommand: commandType=[{}]" ,
147+ stateName,
148+ command.command.getClass.getName,
149+ )
150+ }
85151 setup.stashBuffer.stash(command)
86152 Behaviors .same
87153 case command : RaftProtocol .Replica =>
154+ if (context.log.isTraceEnabled) {
155+ context.log.trace(
156+ " [{}] Stashing Replica: index=[{}], term=[{}], entityId=[{}], eventType=[{}]" ,
157+ stateName,
158+ command.logEntry.index,
159+ command.logEntry.term.term,
160+ command.logEntry.event.entityId.map(_.raw),
161+ command.logEntry.event.event.getClass.getName,
162+ )
163+ }
88164 setup.stashBuffer.stash(command)
89165 Behaviors .same
90166 case command : RaftProtocol .TakeSnapshot =>
167+ if (context.log.isTraceEnabled) {
168+ setup.context.log.trace(
169+ " [{}] Stashing TakeSnapshot: index=[{}], entityId=[{}], replyTo=[{}]" ,
170+ stateName,
171+ command.metadata.logEntryIndex,
172+ command.metadata.entityId.raw,
173+ command.replyTo,
174+ )
175+ }
91176 setup.stashBuffer.stash(command)
92177 Behaviors .same
93178 case _ : RaftProtocol .Activate => Behaviors .unhandled
@@ -107,10 +192,26 @@ private[entityreplication] class Recovering[Command, Event, State](
107192 }
108193 val snapshotAppliedState =
109194 ReadyState (entityState, lastAppliedLogIndex)
195+ if (context.log.isTraceEnabled) {
196+ context.log.trace(
197+ " [{}] Recovering with initial state: index=[{}], stateType=[{}]" ,
198+ stateName,
199+ snapshotAppliedState.lastAppliedLogEntryIndex,
200+ snapshotAppliedState.entityState.getClass.getName,
201+ )
202+ }
110203 val eventAppliedState =
111204 command.events.foldLeft(snapshotAppliedState)((state, entry) =>
112205 state.applyEvent(setup, entry.event.event, entry.index),
113206 )
207+ if (context.log.isTraceEnabled) {
208+ context.log.trace(
209+ " [{}] Recovered with state: index=[{}], stateType=[{}]" ,
210+ stateName,
211+ eventAppliedState.lastAppliedLogEntryIndex,
212+ eventAppliedState.entityState.getClass.getName,
213+ )
214+ }
114215 Ready .behavior(setup, eventAppliedState)
115216 }
116217}
0 commit comments