diff --git a/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala b/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala index 6f731d41eb..fff897eddf 100644 --- a/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/TopicSuite.scala @@ -185,4 +185,39 @@ class TopicSuite extends Fs2Suite { TestControl.executeEmbed(program) // will fail if program is deadlocked } + + // https://github.com/typelevel/fs2/issues/3642 + test("subscribe and close concurrently".flaky) { + val check: IO[Unit] = + for { + t <- Topic[IO, Int] + fiber <- t + .subscribe(maxQueued = 1) + .compile + .toList + .start // let the subscription race with closing + _ <- t.close + _ <- fiber.join.timeout(5.seconds) // checking termination of the subscription stream + } yield () + + check.replicateA_(100000) + } + + // https://github.com/typelevel/fs2/issues/3642 + test("subscribeAwait and close concurrently".flaky) { + val check: IO[Unit] = + for { + t <- Topic[IO, Int] + fiber <- Stream + .resource(t.subscribeAwait(maxQueued = 1)) + .flatten + .compile + .toList + .start // let the subscription race with closing + _ <- t.close + _ <- fiber.join.timeout(5.seconds) // checking termination of the subscription stream + } yield () + + check.replicateA_(100000) + } }