Add create shortcut to RSocketStrategies

Now that RSocketStrategies has default settings it makes sense to have
a create() shortcut vs builder().build().

This commit also updates tests to take advantage of improvements in this
and the previous two commits.

See gh-23314
master
Rossen Stoyanchev 5 years ago
parent 91b040d0bf
commit c456950bc3
  1. 24
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java
  2. 15
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketBufferLeakTests.java
  3. 17
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java
  4. 25
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

@ -114,20 +114,30 @@ public interface RSocketStrategies {
*/
ReactiveAdapterRegistry reactiveAdapterRegistry();
/**
* Return a builder to create a new {@link RSocketStrategies} instance
* replicated from the current instance.
*/
default Builder mutate() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(this);
}
/**
* Return a builder to build a new {@code RSocketStrategies} instance.
* Create an {@code RSocketStrategies} instance with default settings.
* Equivalent to {@code RSocketStrategies.builder().build()}.
*/
static Builder builder() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder();
static RSocketStrategies create() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder().build();
}
/**
* Return a builder to create a new {@link RSocketStrategies} instance
* replicated from the current instance.
* Return a builder to build a new {@code RSocketStrategies} instance.
* The builder applies default settings, see individual builder methods for
* details.
*/
default Builder mutate() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder(this);
static Builder builder() {
return new DefaultRSocketStrategies.DefaultRSocketStrategiesBuilder();
}

@ -26,6 +26,7 @@ import io.netty.util.ReferenceCounted;
import io.rsocket.AbstractRSocket;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.transport.netty.server.CloseableChannel;
@ -44,8 +45,6 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.Resource;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
@ -74,21 +73,21 @@ public class RSocketBufferLeakTests {
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
context = new AnnotationConfigApplicationContext(ServerConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.addResponderPlugin(payloadInterceptor) // intercept responding
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block();
requester = RSocketRequester.builder()
.rsocketFactory(factory -> {
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
factory.addRequesterPlugin(payloadInterceptor); // intercept outgoing requests
})
.rsocketFactory(factory -> factory.addRequesterPlugin(payloadInterceptor))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", 7000)
.block();
@ -215,8 +214,6 @@ public class RSocketBufferLeakTests {
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new LeakAwareNettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
}

@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket;
import java.time.Duration;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
@ -34,9 +34,6 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
@ -63,18 +60,20 @@ public class RSocketClientToServerIntegrationTests {
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
context = new AnnotationConfigApplicationContext(ServerConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
server = RSocketFactory.receive()
.addResponderPlugin(interceptor)
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block();
requester = RSocketRequester.builder()
.rsocketFactory(factory -> factory.frameDecoder(PayloadDecoder.ZERO_COPY))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", 7000)
.block();
@ -266,11 +265,7 @@ public class RSocketClientToServerIntegrationTests {
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
return RSocketStrategies.create();
}
}

@ -18,8 +18,8 @@ package org.springframework.messaging.rsocket;
import java.time.Duration;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
@ -37,9 +37,6 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer;
@ -62,11 +59,14 @@ public class RSocketServerToClientIntegrationTests {
@BeforeClass
@SuppressWarnings("ConstantConditions")
public static void setupOnce() {
context = new AnnotationConfigApplicationContext(RSocketConfig.class);
RSocketMessageHandler messageHandler = context.getBean(RSocketMessageHandler.class);
SocketAcceptor responder = messageHandler.serverResponder();
server = RSocketFactory.receive()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.acceptor(context.getBean(RSocketMessageHandler.class).serverResponder())
.acceptor(responder)
.transport(TcpServerTransport.create("localhost", 0))
.start()
.block();
@ -103,21 +103,16 @@ public class RSocketServerToClientIntegrationTests {
ServerController serverController = context.getBean(ServerController.class);
serverController.reset();
RSocketStrategies strategies = context.getBean(RSocketStrategies.class);
RSocketRequester requester = null;
try {
ClientRSocketFactoryConfigurer responderConfigurer =
AnnotationClientResponderConfigurer.withHandlers(new ClientHandler());
requester = RSocketRequester.builder()
.rsocketFactory(factory -> {
factory.metadataMimeType("text/plain");
factory.setupPayload(ByteBufPayload.create("", connectionRoute));
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
})
.rsocketFactory(responderConfigurer)
.rsocketStrategies(strategies)
.rsocketFactory(AnnotationClientResponderConfigurer.withHandlers(new ClientHandler()))
.rsocketStrategies(context.getBean(RSocketStrategies.class))
.connectTcp("localhost", server.address().getPort())
.block();
@ -268,11 +263,7 @@ public class RSocketServerToClientIntegrationTests {
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.allMimeTypes())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT))
.build();
return RSocketStrategies.create();
}
}

Loading…
Cancel
Save