Skip to content

Commit c60260a

Browse files
committed
🐞fix: TestKit ignore Passivate messages when it handles replicated event
1 parent d1b8533 commit c60260a

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

src/main/scala/lerna/akka/entityreplication/typed/internal/testkit/ReplicatedEntityBehaviorTestKitImpl.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package lerna.akka.entityreplication.typed.internal.testkit
33
import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, SerializationTestKit }
44
import akka.actor.typed.{ ActorRef, Behavior }
55
import akka.actor.typed.scaladsl.adapter._
6+
import lerna.akka.entityreplication.ReplicationRegion
67
import lerna.akka.entityreplication.model.NormalizedEntityId
78
import lerna.akka.entityreplication.raft.RaftProtocol
89
import lerna.akka.entityreplication.raft.model.{ EntityEvent, NoOp, ReplicatedLog, Term }
@@ -11,6 +12,7 @@ import lerna.akka.entityreplication.typed.ClusterReplication.ShardCommand
1112
import lerna.akka.entityreplication.typed.{ ReplicatedEntityContext, ReplicatedEntityTypeKey }
1213
import lerna.akka.entityreplication.typed.testkit.ReplicatedEntityBehaviorTestKit
1314

15+
import scala.annotation.tailrec
1416
import scala.util.control.NonFatal
1517

1618
private[entityreplication] class ReplicatedEntityBehaviorTestKitImpl[Command, Event, State](
@@ -103,18 +105,22 @@ private[entityreplication] class ReplicatedEntityBehaviorTestKitImpl[Command, Ev
103105
}
104106

105107
private[this] def handleReplicatedEvent(): Option[Event] = {
106-
val shardCommand: Option[ShardCommand] =
107-
try {
108-
Option(shardProbe.receiveMessage())
109-
} catch {
110-
case NonFatal(_) => None // It wasn't replicated until the timeout
111-
}
112-
val replicateCommand =
108+
@tailrec
109+
def findReplicateCommand(): Option[RaftProtocol.Replicate] = {
110+
val shardCommand: Option[ShardCommand] =
111+
try {
112+
Option(shardProbe.receiveMessage())
113+
} catch {
114+
case NonFatal(_) => None // It wasn't replicated until the timeout
115+
}
113116
shardCommand match {
114-
case Some(r: RaftProtocol.Replicate) => Option(r)
115-
case Some(m) => throw new IllegalStateException(s"Shard received unexpected message [$m]")
116-
case None => None
117+
case Some(r: RaftProtocol.Replicate) => Option(r)
118+
case Some(r: ReplicationRegion.Passivate) => findReplicateCommand() // retry
119+
case Some(m) => throw new IllegalStateException(s"Shard received unexpected message [$m]")
120+
case None => None
117121
}
122+
}
123+
val replicateCommand = findReplicateCommand()
118124
replicateCommand.foreach { replicate =>
119125
// side effects
120126
replicatedLog = replicatedLog.append(EntityEvent(Option(normalizedEntityId), replicate.event), term)

0 commit comments

Comments
 (0)