diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index 6c1572aaa8..92eba2b58c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -114,20 +114,30 @@ public interface RSocketStrategies { */ ReactiveAdapterRegistry reactiveAdapterRegistry(); + /** + * Return a builder to create a new {@link RSocketStrategies} instance + * replicated from the current instance. + */ + default Builder mutate() { + return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(this); + } + /** - * Return a builder to build a new {@code RSocketStrategies} instance. + * Create an {@code RSocketStrategies} instance with default settings. + * Equivalent to {@code RSocketStrategies.builder().build()}. */ - static Builder builder() { - return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(); + static RSocketStrategies create() { + return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder().build(); } /** - * Return a builder to create a new {@link RSocketStrategies} instance - * replicated from the current instance. + * Return a builder to build a new {@code RSocketStrategies} instance. + * The builder applies default settings, see individual builder methods for + * details. */ - default Builder mutate() { - return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(this); + static Builder builder() { + return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java index d95218707d..76be98cef1 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java @@ -26,6 +26,7 @@ import io.netty.util.ReferenceCounted; import io.rsocket.AbstractRSocket; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.plugins.RSocketInterceptor; import io.rsocket.transport.netty.server.CloseableChannel; @@ -44,8 +45,6 @@ import reactor.test.StepVerifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; import org.springframework.core.io.Resource; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; @@ -74,21 +73,21 @@ public class RSocketBufferLeakTests { @BeforeClass @SuppressWarnings("ConstantConditions") public static void setupOnce() { + context = new AnnotationConfigApplicationContext(ServerConfig.class); + RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); + SocketAcceptor responder = messageHandler.serverResponder(); server = RSocketFactory.receive() .frameDecoder(PayloadDecoder.ZERO_COPY) .addResponderPlugin(payloadInterceptor) // intercept responding - .acceptor(context.getBean(RSocketMessageHandler.class).serverResponder()) + .acceptor(responder) .transport(TcpServerTransport.create("localhost", 7000)) .start() .block(); requester = RSocketRequester.builder() - .rsocketFactory(factory -> { - factory.frameDecoder(PayloadDecoder.ZERO_COPY); - factory.addRequesterPlugin(payloadInterceptor); // intercept outgoing requests - }) + .rsocketFactory(factory -> factory.addRequesterPlugin(payloadInterceptor)) .rsocketStrategies(context.getBean(RSocketStrategies.class)) .connectTcp("localhost", 7000) .block(); @@ -215,8 +214,6 @@ public class RSocketBufferLeakTests { @Bean public RSocketStrategies rsocketStrategies() { return RSocketStrategies.builder() - .decoder(StringDecoder.allMimeTypes()) - .encoder(CharSequenceEncoder.allMimeTypes()) .dataBufferFactory(new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)) .build(); } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index e6e4e1f2d6..58f807d6c6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket; import java.time.Duration; -import io.netty.buffer.PooledByteBufAllocator; import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; @@ -34,9 +34,6 @@ import reactor.test.StepVerifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.messaging.handler.annotation.MessageExceptionHandler; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; @@ -63,18 +60,20 @@ public class RSocketClientToServerIntegrationTests { @BeforeClass @SuppressWarnings("ConstantConditions") public static void setupOnce() { + context = new AnnotationConfigApplicationContext(ServerConfig.class); + RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); + SocketAcceptor responder = messageHandler.serverResponder(); server = RSocketFactory.receive() .addResponderPlugin(interceptor) .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(context.getBean(RSocketMessageHandler.class).serverResponder()) + .acceptor(responder) .transport(TcpServerTransport.create("localhost", 7000)) .start() .block(); requester = RSocketRequester.builder() - .rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY)) .rsocketStrategies(context.getBean(RSocketStrategies.class)) .connectTcp("localhost", 7000) .block(); @@ -266,11 +265,7 @@ public class RSocketClientToServerIntegrationTests { @Bean public RSocketStrategies rsocketStrategies() { - return RSocketStrategies.builder() - .decoder(StringDecoder.allMimeTypes()) - .encoder(CharSequenceEncoder.allMimeTypes()) - .dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)) - .build(); + return RSocketStrategies.create(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index db66151000..c153112436 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket; import java.time.Duration; -import io.netty.buffer.PooledByteBufAllocator; import io.rsocket.RSocketFactory; +import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; @@ -37,9 +37,6 @@ import reactor.test.StepVerifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.codec.CharSequenceEncoder; -import org.springframework.core.codec.StringDecoder; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.rsocket.annotation.ConnectMapping; import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer; @@ -62,11 +59,14 @@ public class RSocketServerToClientIntegrationTests { @BeforeClass @SuppressWarnings("ConstantConditions") public static void setupOnce() { + context = new AnnotationConfigApplicationContext(RSocketConfig.class); + RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class); + SocketAcceptor responder = messageHandler.serverResponder(); server = RSocketFactory.receive() .frameDecoder(PayloadDecoder.ZERO_COPY) - .acceptor(context.getBean(RSocketMessageHandler.class).serverResponder()) + .acceptor(responder) .transport(TcpServerTransport.create("localhost", 0)) .start() .block(); @@ -103,21 +103,16 @@ public class RSocketServerToClientIntegrationTests { ServerController serverController = context.getBean(ServerController.class); serverController.reset(); - RSocketStrategies strategies = context.getBean(RSocketStrategies.class); RSocketRequester requester = null; try { - ClientRSocketFactoryConfigurer responderConfigurer = - AnnotationClientResponderConfigurer.withHandlers(new ClientHandler()); - requester = RSocketRequester.builder() .rsocketFactory(factory -> { factory.metadataMimeType("text/plain"); factory.setupPayload(ByteBufPayload.create("", connectionRoute)); - factory.frameDecoder(PayloadDecoder.ZERO_COPY); }) - .rsocketFactory(responderConfigurer) - .rsocketStrategies(strategies) + .rsocketFactory(AnnotationClientResponderConfigurer.withHandlers(new ClientHandler())) + .rsocketStrategies(context.getBean(RSocketStrategies.class)) .connectTcp("localhost", server.address().getPort()) .block(); @@ -268,11 +263,7 @@ public class RSocketServerToClientIntegrationTests { @Bean public RSocketStrategies rsocketStrategies() { - return RSocketStrategies.builder() - .decoder(StringDecoder.allMimeTypes()) - .encoder(CharSequenceEncoder.allMimeTypes()) - .dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)) - .build(); + return RSocketStrategies.create(); } }