@ -47,7 +47,7 @@ import org.springframework.util.ReflectionUtils;
*
*
* < p > By default , depending on classpath availability , adapters are registered
* < p > By default , depending on classpath availability , adapters are registered
* for Reactor , RxJava 1 , RxJava 2 types , { @link CompletableFuture } , Java 9 +
* for Reactor , RxJava 1 , RxJava 2 types , { @link CompletableFuture } , Java 9 +
* { @code Flow . Publisher } and Kotlin Coroutines { @code Deferred } .
* { @code Flow . Publisher } and Kotlin Coroutines { @code Deferred } and { @code Flow } .
*
*
* @author Rossen Stoyanchev
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @author Sebastien Deleuze
@ -97,13 +97,9 @@ public class ReactiveAdapterRegistry {
// We can fall back on "reactive-streams-flow-bridge" (once released)
// We can fall back on "reactive-streams-flow-bridge" (once released)
// Coroutines
// Coroutines
if ( this . reactorPresent & & ClassUtils . isPresent ( "kotlinx.coroutines.Deferred " , classLoader ) ) {
if ( this . reactorPresent & & ClassUtils . isPresent ( "kotlinx.coroutines.reactor.MonoKt" , classLoader ) & & ClassUtils . isPresent ( "kotlinx.coroutines.reactive.flow.PublisherAsFlowKt " , classLoader ) ) {
new CoroutinesRegistrar ( ) . registerAdapters ( this ) ;
new CoroutinesRegistrar ( ) . registerAdapters ( this ) ;
}
}
// TODO Use a single CoroutinesRegistrar when Flow will be not experimental anymore
if ( this . reactorPresent & & ClassUtils . isPresent ( "kotlinx.coroutines.flow.Flow" , classLoader ) ) {
new CoroutinesFlowRegistrar ( ) . registerAdapters ( this ) ;
}
}
}
@ -353,13 +349,7 @@ public class ReactiveAdapterRegistry {
( ) - > CompletableDeferredKt . CompletableDeferred ( null ) ) ,
( ) - > CompletableDeferredKt . CompletableDeferred ( null ) ) ,
source - > CoroutinesUtils . deferredToMono ( ( Deferred < ? > ) source ) ,
source - > CoroutinesUtils . deferredToMono ( ( Deferred < ? > ) source ) ,
source - > CoroutinesUtils . monoToDeferred ( Mono . from ( source ) ) ) ;
source - > CoroutinesUtils . monoToDeferred ( Mono . from ( source ) ) ) ;
}
}
private static class CoroutinesFlowRegistrar {
void registerAdapters ( ReactiveAdapterRegistry registry ) {
registry . registerReactiveType (
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( kotlinx . coroutines . flow . Flow . class , FlowKt : : emptyFlow ) ,
ReactiveTypeDescriptor . multiValue ( kotlinx . coroutines . flow . Flow . class , FlowKt : : emptyFlow ) ,
source - > FlowAsPublisherKt . from ( ( kotlinx . coroutines . flow . Flow < ? > ) source ) ,
source - > FlowAsPublisherKt . from ( ( kotlinx . coroutines . flow . Flow < ? > ) source ) ,
@ -367,5 +357,4 @@ public class ReactiveAdapterRegistry {
) ;
) ;
}
}
}
}
}
}