Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,16 @@ public class ReactiveAdapterRegistry {

private static final boolean MUTINY_PRESENT;

private static final boolean CONTEXT_PROPAGATION_PRESENT;

static {
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
REACTIVE_STREAMS_PRESENT = ClassUtils.isPresent("org.reactivestreams.Publisher", classLoader);
REACTOR_PRESENT = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader);
RXJAVA_3_PRESENT = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
COROUTINES_REACTOR_PRESENT = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
MUTINY_PRESENT = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader);
CONTEXT_PROPAGATION_PRESENT = ClassUtils.isPresent("io.micrometer.context.ContextSnapshotFactory", classLoader);
}

private final List<ReactiveAdapter> adapters = new ArrayList<>();
Expand Down Expand Up @@ -356,7 +359,9 @@ void registerAdapters(ReactiveAdapterRegistry registry) {

registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(kotlinx.coroutines.flow.Flow.class, kotlinx.coroutines.flow.FlowKt::emptyFlow),
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
CONTEXT_PROPAGATION_PRESENT ?
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source, new PropagationContextElement()) :
source -> kotlinx.coroutines.reactor.ReactorFlowKt.asFlux((kotlinx.coroutines.flow.Flow<?>) source),
kotlinx.coroutines.reactive.ReactiveFlowKt::asFlow);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,18 @@

package org.springframework.core

import io.micrometer.observation.Observation
import io.micrometer.observation.tck.TestObservationRegistry
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.runBlocking
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.reactivestreams.Publisher
Expand All @@ -40,6 +45,8 @@ import kotlin.reflect.KClass
@OptIn(DelicateCoroutinesApi::class)
class ReactiveAdapterRegistryKotlinTests {

private val observationRegistry = TestObservationRegistry.create()

private val registry = ReactiveAdapterRegistry.getSharedInstance()

@Test
Expand Down Expand Up @@ -82,6 +89,23 @@ class ReactiveAdapterRegistryKotlinTests {
assertThat((target as Flow<*>).toList()).contains(1, 2, 3)
}

@Test
fun propagateMicrometerContextToFlow() {
val observation = Observation.createNotStarted("coroutine", observationRegistry)
observation.observe {
val source = flow {
val currentObservation = observationRegistry.currentObservation
assertThat(currentObservation).isNotNull
emit(currentObservation?.context?.name)
}
val target: Publisher<String> = getAdapter(Flow::class).toPublisher(source)
val result = runBlocking(Dispatchers.IO) {
target.awaitSingle()
}
assertThat(result).isEqualTo("coroutine")
}
}

private fun getAdapter(reactiveType: KClass<*>): ReactiveAdapter {
return this.registry.getAdapter(reactiveType.java)!!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,16 @@

package org.springframework.web.reactive.result.method.annotation

import kotlinx.coroutines.Deferred
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import io.micrometer.observation.ObservationRegistry
import io.micrometer.observation.tck.TestObservationRegistry
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.junit.jupiter.api.Assumptions.assumeFalse
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.http.HttpHeaders
Expand Down Expand Up @@ -86,6 +84,15 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
assertThat(entity.body).isEqualTo("foobar")
}

@ParameterizedHttpServerTest
fun `Handler method returning Flow with observation`(httpServer: HttpServer) {
startServer(httpServer)

val entity = performGet("/flow-observation", HttpHeaders.EMPTY, String::class.java)
assertThat(entity.statusCode).isEqualTo(HttpStatus.OK)
assertThat(entity.body).isEqualTo("http.server.requests")
}

@ParameterizedHttpServerTest
fun `Suspending handler method returning Flow`(httpServer: HttpServer) {
startServer(httpServer)
Expand Down Expand Up @@ -135,11 +142,16 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
@Configuration
@EnableWebFlux
@ComponentScan(resourcePattern = "**/CoroutinesIntegrationTests*")
open class WebConfig
open class WebConfig {

@Bean
open fun observationRegistry(): ObservationRegistry = TestObservationRegistry.create()

}

@OptIn(DelicateCoroutinesApi::class)
@RestController
class CoroutinesController {
class CoroutinesController(private val observationRegistry: ObservationRegistry) {

@GetMapping("/suspend")
suspend fun suspendingEndpoint(): String {
Expand Down Expand Up @@ -167,6 +179,15 @@ class CoroutinesIntegrationTests : AbstractRequestMappingIntegrationTests() {
delay(1)
}

@GetMapping("/flow-observation")
fun flowObservationEndpoint(): Flow<String?> {
return flow {
val currentObservation = observationRegistry.currentObservation
assertThat(currentObservation).isNotNull
emit(currentObservation?.context?.name)
}
}

@GetMapping("/suspending-flow")
suspend fun suspendingFlowEndpoint(): Flow<String> {
delay(1)
Expand Down