11package lerna .akka .entityreplication .typed
22
33import akka .actor .testkit .typed .scaladsl .ActorTestKit
4- import akka .actor .typed .ActorRef
4+ import akka .actor .typed .{ ActorRef , ActorSystem }
55import akka .actor .typed .receptionist .{ Receptionist , ServiceKey }
66import akka .actor .typed .scaladsl .Behaviors
77import akka .actor .typed .scaladsl .adapter ._
88import akka .remote .testconductor .RoleName
99import akka .remote .testkit .{ MultiNodeConfig , MultiNodeSpec }
1010import com .typesafe .config .{ ConfigFactory , ConfigValueFactory }
1111import lerna .akka .entityreplication .raft .routing .MemberIndex
12+ import lerna .akka .entityreplication .util .AtLeastOnceComplete
1213import lerna .akka .entityreplication .{ STMultiNodeSerializable , STMultiNodeSpec }
1314
15+ import java .util .UUID
1416import java .util .concurrent .atomic .AtomicInteger
1517import scala .jdk .CollectionConverters ._
1618
@@ -53,7 +55,7 @@ class ReplicatedEntityMultiNodeSpec extends MultiNodeSpec(ReplicatedEntityMultiN
5355 import ReplicatedEntityMultiNodeSpec ._
5456 import ReplicatedEntityMultiNodeSpecConfig ._
5557
56- private [ this ] val typedSystem = system.toTyped
58+ private implicit val typedSystem : ActorSystem [_] = system.toTyped
5759
5860 private [this ] val clusterReplication = ClusterReplication (typedSystem)
5961
@@ -90,28 +92,30 @@ class ReplicatedEntityMultiNodeSpec extends MultiNodeSpec(ReplicatedEntityMultiN
9092 val probe = actorTestKit.createTestProbe[Receptionist .Listing ]()
9193
9294 runOn(node1) {
93- val entity = clusterReplication.entityRefFor(PingPongEntity .typeKey, createSeqEntityId())
94- val replyTo = actorTestKit.createTestProbe[PingPongEntity .Pong ]()
95+ val entity = clusterReplication.entityRefFor(PingPongEntity .typeKey, createSeqEntityId())
9596
96- entity ! PingPongEntity .Ping (replyTo.ref)
97- replyTo.receiveMessage().count should be(1 )
98- entity ! PingPongEntity .Ping (replyTo.ref)
99- replyTo.receiveMessage().count should be(2 )
97+ def ping (requestId : String ): PingPongEntity .Pong = {
98+ AtLeastOnceComplete
99+ .askTo(entity, PingPongEntity .Ping (_, requestId), retryInterval = remainingOrDefault / 5 ).await
100+ }
101+
102+ ping(createUniqueRequestId()).count should be(1 )
103+ ping(createUniqueRequestId()).count should be(2 )
100104 }
101105 runOn(node1, node2, node3) {
102106 // find entity
103107 var actor : ActorRef [PingPongEntity .Command ] = null
104108 awaitAssert {
105109 typedSystem.receptionist ! Receptionist .Subscribe (PingPongEntity .serviceKey, probe.ref)
106- val listing = probe.receiveMessage()
110+ val listing = probe.receiveMessage(max = remainingOrDefault / 5 )
107111 actor = listing.serviceInstances(PingPongEntity .serviceKey).head
108112 typedSystem.receptionist ! Receptionist .Deregister (PingPongEntity .serviceKey, actor)
109113 }
110114 // check state
111115 val stateReceiver = actorTestKit.createTestProbe[PingPongEntity .State ]()
112116 awaitAssert {
113117 actor ! PingPongEntity .UnsafeGetState (stateReceiver.ref)
114- stateReceiver.receiveMessage().count should be(2 )
118+ stateReceiver.receiveMessage(max = remainingOrDefault / 5 ).count should be(2 )
115119 }
116120 }
117121 }
@@ -120,18 +124,17 @@ class ReplicatedEntityMultiNodeSpec extends MultiNodeSpec(ReplicatedEntityMultiN
120124 clusterReplication.init(PingPongEntity ())
121125
122126 runOn(node1) {
123- val entity = clusterReplication.entityRefFor(PingPongEntity .typeKey, createSeqEntityId())
124- val replyTo = actorTestKit.createTestProbe[PingPongEntity .Pong ]()
127+ val entity = clusterReplication.entityRefFor(PingPongEntity .typeKey, createSeqEntityId())
125128
126- entity ! PingPongEntity .Ping (replyTo.ref)
127- replyTo.receiveMessage().count should be(1 )
128- entity ! PingPongEntity .Ping (replyTo.ref)
129- replyTo.receiveMessage().count should be(2 )
130- entity ! PingPongEntity .Break ()
131- awaitAssert {
132- entity ! PingPongEntity .Ping (replyTo.ref)
133- replyTo.receiveMessage(max = remainingOrDefault / 5 ).count should be(3 )
129+ def ping (requestId : String ): PingPongEntity .Pong = {
130+ AtLeastOnceComplete
131+ .askTo(entity, PingPongEntity .Ping (_, requestId), retryInterval = remainingOrDefault / 5 ).await
134132 }
133+
134+ ping(createUniqueRequestId()).count should be(1 )
135+ ping(createUniqueRequestId()).count should be(2 )
136+ entity ! PingPongEntity .Break ()
137+ ping(createUniqueRequestId()).count should be(3 )
135138 }
136139 }
137140
@@ -147,7 +150,7 @@ class ReplicatedEntityMultiNodeSpec extends MultiNodeSpec(ReplicatedEntityMultiN
147150 runOn(node1, node2, node3) {
148151 awaitAssert {
149152 typedSystem.receptionist ! Receptionist .Subscribe (EphemeralEntity .serviceKey, probe.ref)
150- val listing = probe.receiveMessage()
153+ val listing = probe.receiveMessage(max = remainingOrDefault / 5 )
151154 actor = listing.serviceInstances(EphemeralEntity .serviceKey).head
152155 typedSystem.receptionist ! Receptionist .Deregister (EphemeralEntity .serviceKey, actor)
153156 }
@@ -182,7 +185,7 @@ class ReplicatedEntityMultiNodeSpec extends MultiNodeSpec(ReplicatedEntityMultiN
182185 val subscriber = actorTestKit.createTestProbe[Receptionist .Listing ]()
183186 awaitAssert {
184187 typedSystem.receptionist ! Receptionist .Subscribe (EphemeralEntity .serviceKey, subscriber.ref)
185- val listing = subscriber.receiveMessage()
188+ val listing = subscriber.receiveMessage(max = remainingOrDefault / 5 )
186189 actor = listing.serviceInstances(EphemeralEntity .serviceKey).head
187190 typedSystem.receptionist ! Receptionist .Deregister (EphemeralEntity .serviceKey, actor)
188191 }
@@ -204,6 +207,8 @@ class ReplicatedEntityMultiNodeSpec extends MultiNodeSpec(ReplicatedEntityMultiN
204207
205208 private [this ] val idGenerator = new AtomicInteger (0 )
206209 private [this ] def createSeqEntityId (): String = s " replication- ${idGenerator.incrementAndGet()}"
210+
211+ private def createUniqueRequestId (): String = UUID .randomUUID().toString
207212}
208213
209214object ReplicatedEntityMultiNodeSpec {
@@ -212,23 +217,27 @@ object ReplicatedEntityMultiNodeSpec {
212217 val typeKey : ReplicatedEntityTypeKey [Command ] = ReplicatedEntityTypeKey (" PingPong" )
213218 val serviceKey : ServiceKey [Command ] = ServiceKey [Command ](" PingPongService" )
214219
215- sealed trait Command extends STMultiNodeSerializable
216- final case class Ping (replyTo : ActorRef [Pong ]) extends Command
217- final case class Pong (count : Int ) extends STMultiNodeSerializable
218- final case class Break () extends Command
219- final case class UnsafeGetState (replyTo : ActorRef [State ]) extends Command
220+ sealed trait Command extends STMultiNodeSerializable
221+ final case class Ping (replyTo : ActorRef [Pong ], requestId : String ) extends Command
222+ final case class Pong (count : Int ) extends STMultiNodeSerializable
223+ final case class Break () extends Command
224+ final case class UnsafeGetState (replyTo : ActorRef [State ]) extends Command
220225
221- sealed trait Event extends STMultiNodeSerializable
222- final case class CountUp () extends Event
226+ sealed trait Event extends STMultiNodeSerializable
227+ final case class CountUp (requestId : String ) extends Event
223228
224- final case class State (count : Int ) extends STMultiNodeSerializable {
229+ final case class State (count : Int , processedRequests : Set [ String ] ) extends STMultiNodeSerializable {
225230
226231 def onMessage (message : Command ): Effect [Event , State ] =
227232 message match {
228- case Ping (replyTo) =>
229- Effect
230- .replicate(CountUp ())
231- .thenReply(replyTo)(s => Pong (s.count))
233+ case Ping (replyTo, requestId) =>
234+ if (processedRequests.contains(requestId)) {
235+ Effect .reply(replyTo)(Pong (count))
236+ } else {
237+ Effect
238+ .replicate(CountUp (requestId))
239+ .thenReply(replyTo)(s => Pong (s.count))
240+ }
232241 case Break () =>
233242 Effect
234243 .none[Event , State ]
@@ -243,7 +252,7 @@ object ReplicatedEntityMultiNodeSpec {
243252
244253 def applyEvent (event : Event ): State =
245254 event match {
246- case _ => copy(count = count + 1 )
255+ case CountUp (requestId) => copy(count = count + 1 , processedRequests = processedRequests + requestId )
247256 }
248257 }
249258
@@ -253,7 +262,7 @@ object ReplicatedEntityMultiNodeSpec {
253262 context.system.receptionist ! Receptionist .Register (serviceKey, context.self)
254263 ReplicatedEntityBehavior [Command , Event , State ](
255264 entityContext = entityContext,
256- emptyState = State (count = 0 ),
265+ emptyState = State (count = 0 , processedRequests = Set .empty ),
257266 commandHandler = _ onMessage _,
258267 eventHandler = _ applyEvent _,
259268 )
0 commit comments