@@ -49,6 +49,8 @@ import java.util.concurrent.TimeUnit
4949import kotlin.test.assertEquals
5050import kotlin.time.Duration.Companion.seconds
5151
52+ private val testIterations: Int =
53+ System .getProperties().getProperty(" io.github.nomisrev.kafka.TEST_ITERATIONS" )?.toIntOrNull() ? : 1
5254
5355@TestInstance(TestInstance .Lifecycle .PER_CLASS )
5456abstract class KafkaSpec {
@@ -62,7 +64,7 @@ abstract class KafkaSpec {
6264 fun destroy () {
6365 kafka.stop()
6466 }
65-
67+
6668 @BeforeAll
6769 @JvmStatic
6870 fun setup () {
@@ -85,10 +87,10 @@ abstract class KafkaSpec {
8587 withEnv(" KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND" , " true" )
8688 withReuse(true )
8789 }
88-
90+
8991 fun KafkaReceiver (): KafkaReceiver <String , String > =
9092 KafkaReceiver (receiverSetting())
91-
93+
9294 fun receiverSetting (): ReceiverSettings <String , String > =
9395 ReceiverSettings (
9496 bootstrapServers = kafka.bootstrapServers,
@@ -168,15 +170,17 @@ abstract class KafkaSpec {
168170 partitions : Int = 4,
169171 replicationFactor : Short = 1,
170172 test : suspend TopicTestScope .(NewTopic ) -> Unit
171- ): Unit = runTest {
172- val topic = NewTopic (nextTopicName(), partitions, replicationFactor).configs(topicConfig)
173- admin {
174- createTopic(topic)
175- try {
176- TopicTestScope (topic, this @runTest).test(topic)
177- } finally {
178- topic.shouldBeEmpty()
179- deleteTopic(topic.name())
173+ ): Unit = repeat(testIterations) {
174+ runTest {
175+ val topic = NewTopic (nextTopicName(), partitions, replicationFactor).configs(topicConfig)
176+ admin {
177+ createTopic(topic)
178+ try {
179+ TopicTestScope (topic, this @runTest).test(topic)
180+ } finally {
181+ topic.shouldBeEmpty()
182+ deleteTopic(topic.name())
183+ }
180184 }
181185 }
182186 }
@@ -295,6 +299,9 @@ abstract class KafkaSpec {
295299 {
296300 val producer = KafkaProducer (it.properties(), it.keySerializer, it.valueSerializer)
297301 object : Producer <String , String > {
302+ override fun clientInstanceId (p0 : Duration ? ): Uuid =
303+ producer.clientInstanceId(p0)
304+
298305 override fun close () {}
299306
300307 override fun close (timeout : Duration ? ) {}
0 commit comments