File tree Expand file tree Collapse file tree 1 file changed +30
-0
lines changed
core/shared/src/test/scala/fs2/concurrent Expand file tree Collapse file tree 1 file changed +30
-0
lines changed Original file line number Diff line number Diff line change @@ -185,4 +185,34 @@ class TopicSuite extends Fs2Suite {
185185
186186 TestControl .executeEmbed(program) // will fail if program is deadlocked
187187 }
188+
189+ // https://github.com/typelevel/fs2/issues/3646
190+ test(
191+ " all subscribers should receive messages in the same order, even on concurrent publishers" .flaky
192+ ) {
193+ val nSubscribers = 100
194+ val check =
195+ Topic [IO , String ]
196+ .flatMap { t =>
197+ t.subscribeAwaitUnbounded.replicateA(nSubscribers).use { subs =>
198+ IO .both(t.publish1(" foo" ), t.publish1(" bar" )) // racing two publishers
199+ .flatMap {
200+ case (Right (()), Right (())) =>
201+ subs
202+ .traverse(s => s.take(2 ).compile.toList)
203+ .map {
204+ case xs :: xss =>
205+ // all subscriptions must have received the events in the same order
206+ xss.foreach(ys => assertEquals(ys, xs))
207+ case Nil =>
208+ fail(s " Impossible, there are $nSubscribers subscribers " )
209+ }
210+ case _ =>
211+ fail(" There's no reason for either publish1 to reject the publication" )
212+ }
213+ }
214+ }
215+
216+ check.replicateA(10000 )
217+ }
188218}
You can’t perform that action at this time.
0 commit comments