diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index 7d66979afb25..5be957ecc2b8 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -71,6 +71,8 @@ public class ReactiveAdapterRegistry { private static final boolean MUTINY_PRESENT; + private static final boolean CONTEXT_PROPAGATION_PRESENT; + static { ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader(); REACTIVE_STREAMS_PRESENT = ClassUtils.isPresent("org.reactivestreams.Publisher", classLoader); @@ -78,6 +80,7 @@ public class ReactiveAdapterRegistry { RXJAVA_3_PRESENT = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader); COROUTINES_REACTOR_PRESENT = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader); MUTINY_PRESENT = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader); + CONTEXT_PROPAGATION_PRESENT = ClassUtils.isPresent("io.micrometer.context.ContextSnapshotFactory", classLoader); } private final List adapters = new ArrayList<>(); @@ -356,7 +359,9 @@ void registerAdapters(ReactiveAdapterRegistry registry) { registry.registerReactiveType( ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow), - source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow) source), + CONTEXT_PROPAGATION_PRESENT ? + source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow) source, new PropagationContextElement()) : + source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow) source), kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow); } } diff --git a/spring-core/src/test/kotlin/org/springframework/core/ReactiveAdapterRegistryKotlinTests.kt b/spring-core/src/test/kotlin/org/springframework/core/ReactiveAdapterRegistryKotlinTests.kt index 817fecbed5b1..540337db6580 100644 --- a/spring-core/src/test/kotlin/org/springframework/core/ReactiveAdapterRegistryKotlinTests.kt +++ b/spring-core/src/test/kotlin/org/springframework/core/ReactiveAdapterRegistryKotlinTests.kt @@ -16,13 +16,18 @@ package org.springframework.core +import io.micrometer.observation.Observation +import io.micrometer.observation.tck.TestObservationRegistry import kotlinx.coroutines.Deferred import kotlinx.coroutines.DelicateCoroutinesApi +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.async import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.reactivestreams.Publisher @@ -40,6 +45,8 @@ import kotlin.reflect.KClass @OptIn(DelicateCoroutinesApi::class) class ReactiveAdapterRegistryKotlinTests { + private val observationRegistry = TestObservationRegistry.create() + private val registry = ReactiveAdapterRegistry.getSharedInstance() @Test @@ -82,6 +89,23 @@ class ReactiveAdapterRegistryKotlinTests { assertThat((target as Flow<*>).toList()).contains(1, 2, 3) } + @Test + fun propagateMicrometerContextToFlow() { + val observation = Observation.createNotStarted("coroutine", observationRegistry) + observation.observe { + val source = flow { + val currentObservation = observationRegistry.currentObservation + assertThat(currentObservation).isNotNull + emit(currentObservation?.context?.name) + } + val target: Publisher = getAdapter(Flow::class).toPublisher(source) + val result = runBlocking(Dispatchers.IO) { + target.awaitSingle() + } + assertThat(result).isEqualTo("coroutine") + } + } + private fun getAdapter(reactiveType: KClass<*>): ReactiveAdapter { return this.registry.getAdapter(reactiveType.java)!! } diff --git a/spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/method/annotation/CoroutinesIntegrationTests.kt b/spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/method/annotation/CoroutinesIntegrationTests.kt index b932e5204b42..6850468bb22a 100644 --- a/spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/method/annotation/CoroutinesIntegrationTests.kt +++ b/spring-webflux/src/test/kotlin/org/springframework/web/reactive/result/method/annotation/CoroutinesIntegrationTests.kt @@ -16,18 +16,16 @@ package org.springframework.web.reactive.result.method.annotation -import kotlinx.coroutines.Deferred -import kotlinx.coroutines.DelicateCoroutinesApi -import kotlinx.coroutines.GlobalScope -import kotlinx.coroutines.async -import kotlinx.coroutines.delay +import io.micrometer.observation.ObservationRegistry +import io.micrometer.observation.tck.TestObservationRegistry +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType -import org.junit.jupiter.api.Assumptions.assumeFalse import org.springframework.context.ApplicationContext import org.springframework.context.annotation.AnnotationConfigApplicationContext +import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan import org.springframework.context.annotation.Configuration import org.springframework.http.HttpHeaders @@ -86,6 +84,15 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() { assertThat(entity.body).isEqualTo("foobar") } + @ParameterizedHttpServerTest + fun `Handler method returning Flow with observation`(httpServer: HttpServer) { + startServer(httpServer) + + val entity = performGet("/flow-observation", HttpHeaders.EMPTY, String::class.java) + assertThat(entity.statusCode).isEqualTo(HttpStatus.OK) + assertThat(entity.body).isEqualTo("http.server.requests") + } + @ParameterizedHttpServerTest fun `Suspending handler method returning Flow`(httpServer: HttpServer) { startServer(httpServer) @@ -135,11 +142,16 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() { @Configuration @EnableWebFlux @ComponentScan(resourcePattern = "**/CoroutinesIntegrationTests*") - open class WebConfig + open class WebConfig { + + @Bean + open fun observationRegistry(): ObservationRegistry = TestObservationRegistry.create() + + } @OptIn(DelicateCoroutinesApi::class) @RestController - class CoroutinesController { + class CoroutinesController(private val observationRegistry: ObservationRegistry) { @GetMapping("/suspend") suspend fun suspendingEndpoint(): String { @@ -167,6 +179,15 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() { delay(1) } + @GetMapping("/flow-observation") + fun flowObservationEndpoint(): Flow { + return flow { + val currentObservation = observationRegistry.currentObservation + assertThat(currentObservation).isNotNull + emit(currentObservation?.context?.name) + } + } + @GetMapping("/suspending-flow") suspend fun suspendingFlowEndpoint(): Flow { delay(1)