Skip to content

Commit ea74936

Browse files
authored
Merge pull request #100 from lerna-stack/fix-crashing-replicated-entity-behavior-testkit
Fix TestKit throws "Shard received unexpected message" after the entity passivated
2 parents 6e9abc5 + 09d05b7 commit ea74936

File tree

3 files changed

+58
-18
lines changed

3 files changed

+58
-18
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1313
This change will show you deserialization warnings during the rolling update, it's safe to ignore.
1414
For more details, see [Akka 2.6.16 release note](https://akka.io/blog/news/2021/08/19/akka-2.6.16-released#rolling-upgrades)
1515

16+
### Fixed
17+
- TestKit throws "Shard received unexpected message" exception after the entity passivated [PR#100](https://github.com/lerna-stack/akka-entity-replication/pull/100)
18+
1619
## [v2.0.0] - 2021-07-16
1720
[v2.0.0]: https://github.com/lerna-stack/akka-entity-replication/compare/v1.0.0...v2.0.0
1821

src/main/scala/lerna/akka/entityreplication/typed/internal/testkit/ReplicatedEntityBehaviorTestKitImpl.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package lerna.akka.entityreplication.typed.internal.testkit
33
import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, SerializationTestKit }
44
import akka.actor.typed.{ ActorRef, Behavior }
55
import akka.actor.typed.scaladsl.adapter._
6+
import lerna.akka.entityreplication.ReplicationRegion
67
import lerna.akka.entityreplication.model.NormalizedEntityId
78
import lerna.akka.entityreplication.raft.RaftProtocol
89
import lerna.akka.entityreplication.raft.model.{ EntityEvent, NoOp, ReplicatedLog, Term }
@@ -11,6 +12,7 @@ import lerna.akka.entityreplication.typed.ClusterReplication.ShardCommand
1112
import lerna.akka.entityreplication.typed.{ ReplicatedEntityContext, ReplicatedEntityTypeKey }
1213
import lerna.akka.entityreplication.typed.testkit.ReplicatedEntityBehaviorTestKit
1314

15+
import scala.annotation.tailrec
1416
import scala.util.control.NonFatal
1517

1618
private[entityreplication] class ReplicatedEntityBehaviorTestKitImpl[Command, Event, State](
@@ -103,18 +105,22 @@ private[entityreplication] class ReplicatedEntityBehaviorTestKitImpl[Command, Ev
103105
}
104106

105107
private[this] def handleReplicatedEvent(): Option[Event] = {
106-
val shardCommand: Option[ShardCommand] =
107-
try {
108-
Option(shardProbe.receiveMessage())
109-
} catch {
110-
case NonFatal(_) => None // It wasn't replicated until the timeout
111-
}
112-
val replicateCommand =
108+
@tailrec
109+
def findReplicateCommand(): Option[RaftProtocol.Replicate] = {
110+
val shardCommand: Option[ShardCommand] =
111+
try {
112+
Option(shardProbe.receiveMessage())
113+
} catch {
114+
case NonFatal(_) => None // It wasn't replicated until the timeout
115+
}
113116
shardCommand match {
114-
case Some(r: RaftProtocol.Replicate) => Option(r)
115-
case Some(m) => throw new IllegalStateException(s"Shard received unexpected message [$m]")
116-
case None => None
117+
case Some(r: RaftProtocol.Replicate) => Option(r)
118+
case Some(r: ReplicationRegion.Passivate) => findReplicateCommand() // retry
119+
case Some(m) => throw new IllegalStateException(s"Shard received unexpected message [$m]")
120+
case None => None
117121
}
122+
}
123+
val replicateCommand = findReplicateCommand()
118124
replicateCommand.foreach { replicate =>
119125
// side effects
120126
replicatedLog = replicatedLog.append(EntityEvent(Option(normalizedEntityId), replicate.event), term)

src/test/scala/lerna/akka/entityreplication/typed/testkit/ReplicatedEntityBehaviorTestKitSpec.scala

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,17 @@ class ReplicatedEntityBehaviorTestKitSpec extends FlatSpec with Matchers with Be
7272

7373
behavior of "ReplicatedEntityBehaviorTestKit"
7474

75+
it should "still work after passivation which is triggered by handling message with 'tell' pattern" in {
76+
// check no exception raised
77+
replicatedEntityTestKit.runCommand(TellAndPassivate())
78+
replicatedEntityTestKit.runCommand(TellAndReplicateAEventAndPassivate(inc = 10))
79+
replicatedEntityTestKit.runCommand(TellAndReplicateAEvent(inc = 10))
80+
}
81+
it should "still work after passivation which is triggered by handling message with 'ask' pattern" in {
82+
// check no exception raised
83+
replicatedEntityTestKit.runCommand(AskAndReplicateAEventAndPassivate(inc = 10, _))
84+
replicatedEntityTestKit.runCommand(AskAndReplicateAEvent(inc = 10, _))
85+
}
7586
it should "report the command that could not be serialized" in {
7687
val ex = intercept[AssertionError] {
7788
replicatedEntityTestKit.runCommand(FailCommandSerialization())
@@ -237,14 +248,19 @@ object ReplicatedEntityBehaviorTestKitSpec {
237248

238249
sealed trait Command
239250
final case class AskAndReplicateAEvent(inc: Int, replyTo: ActorRef[Reply]) extends Command with KryoSerializable
240-
final case class AskAndNoReply(replyTo: ActorRef[Reply]) extends Command with KryoSerializable
241-
final case class AskAndNoEvents(replyTo: ActorRef[Reply]) extends Command with KryoSerializable
242-
final case class TellAndNoEvents() extends Command with KryoSerializable
243-
final case class TellAndReplicateAEvent(inc: Int) extends Command with KryoSerializable
244-
final case class FailCommandSerialization() extends Command
245-
final case class FailReplySerialization(replyTo: ActorRef[Reply]) extends Command with KryoSerializable
246-
final case class FailStateSerialization() extends Command with KryoSerializable
247-
final case class FailEventSerialization() extends Command with KryoSerializable
251+
final case class AskAndReplicateAEventAndPassivate(inc: Int, replyTo: ActorRef[Reply])
252+
extends Command
253+
with KryoSerializable
254+
final case class AskAndNoReply(replyTo: ActorRef[Reply]) extends Command with KryoSerializable
255+
final case class AskAndNoEvents(replyTo: ActorRef[Reply]) extends Command with KryoSerializable
256+
final case class TellAndNoEvents() extends Command with KryoSerializable
257+
final case class TellAndPassivate() extends Command with KryoSerializable
258+
final case class TellAndReplicateAEvent(inc: Int) extends Command with KryoSerializable
259+
final case class TellAndReplicateAEventAndPassivate(inc: Int) extends Command with KryoSerializable
260+
final case class FailCommandSerialization() extends Command
261+
final case class FailReplySerialization(replyTo: ActorRef[Reply]) extends Command with KryoSerializable
262+
final case class FailStateSerialization() extends Command with KryoSerializable
263+
final case class FailEventSerialization() extends Command with KryoSerializable
248264

249265
trait Reply
250266
final case class SerializableReply(inc: Int, total: Int) extends Reply with KryoSerializable
@@ -277,6 +293,15 @@ object ReplicatedEntityBehaviorTestKitSpec {
277293
}
278294
.thenReply(replyTo)(state => SerializableReply(inc, state.total))
279295

296+
case AskAndReplicateAEventAndPassivate(inc, replyTo) =>
297+
Effect
298+
.replicate(SerializableEvent(inc))
299+
.thenRun { _: State =>
300+
context.log.info("complete depositing")
301+
}
302+
.thenPassivate()
303+
.thenReply(replyTo)(state => SerializableReply(inc, state.total))
304+
280305
case AskAndNoReply(_) =>
281306
Effect.noReply
282307

@@ -289,9 +314,15 @@ object ReplicatedEntityBehaviorTestKitSpec {
289314
context.log.info("receive a command")
290315
}.thenNoReply()
291316

317+
case TellAndPassivate() =>
318+
Effect.passivate().thenNoReply()
319+
292320
case TellAndReplicateAEvent(inc: Int) =>
293321
Effect.replicate(SerializableEvent(inc)).thenNoReply()
294322

323+
case TellAndReplicateAEventAndPassivate(inc: Int) =>
324+
Effect.replicate(SerializableEvent(inc)).thenPassivate().thenNoReply()
325+
295326
case FailCommandSerialization() =>
296327
Effect.noReply
297328

0 commit comments

Comments
 (0)