Updates to RSocket[Strategies|Requester] defaults

1. RSocketStrategies hooks in the basic codecs from spring-core by
default. Now that we have support for composite metadata, it makes
sense to have multiple codecs available.

2. RSocketStrategies is pre-configured with NettyDataBufferFactory.

3. DefaultRSocketRequesterBuilder configures RSocket with a frame
decoder that matches the DataBufferFactory choice, i.e. ensuring
consistency of zero copy vs default (copy) choice.

4. DefaultRSocketRequesterBuilder now tries to find a single non-basic
decoder to select a default data MimeType (e.g. CBOR), or otherwise
fall back on the first default decoder (e.g. String).

See gh-23314
master
Rossen Stoyanchev 5 years ago
parent c3c152f806
commit a780cad12e
  1. 49
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java
  2. 46
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java
  3. 33
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java
  4. 53
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java
  5. 190
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java

@ -18,9 +18,9 @@ package org.springframework.messaging.rsocket;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
@ -29,6 +29,9 @@ import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
@ -110,7 +113,10 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
rsocketFactory.dataMimeType(dataMimeType.toString());
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
}
this.rsocketFactoryConfigurers.forEach(configurer -> {
configurer.configureWithStrategies(rsocketStrategies);
@ -139,16 +145,35 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
if (this.dataMimeType != null) {
return this.dataMimeType;
}
return Stream
.concat(
strategies.encoders().stream()
.flatMap(encoder -> encoder.getEncodableMimeTypes().stream()),
strategies.decoders().stream()
.flatMap(encoder -> encoder.getDecodableMimeTypes().stream())
)
.filter(MimeType::isConcrete)
.findFirst()
.orElseThrow(() -> new IllegalArgumentException("Failed to select data MimeType to use."));
// Look for non-basic Decoder (e.g. CBOR, Protobuf)
MimeType selected = null;
List<Decoder<?>> decoders = strategies.decoders();
for (Decoder<?> candidate : decoders) {
if (!isCoreCodec(candidate) && !candidate.getDecodableMimeTypes().isEmpty()) {
Assert.state(selected == null,
() -> "Cannot select default data MimeType based on configured decoders: " + decoders);
selected = getMimeType(candidate);
}
}
if (selected != null) {
return selected;
}
// Fall back on 1st decoder (e.g. String)
for (Decoder<?> decoder : decoders) {
if (!decoder.getDecodableMimeTypes().isEmpty()) {
return getMimeType(decoder);
}
}
throw new IllegalArgumentException("Failed to select data MimeType to use.");
}
private static boolean isCoreCodec(Object codec) {
return codec.getClass().getPackage().equals(StringDecoder.class.getPackage());
}
private static MimeType getMimeType(Decoder<?> decoder) {
MimeType mimeType = decoder.getDecodableMimeTypes().get(0);
return new MimeType(mimeType, Collections.emptyMap());
}
}

@ -22,11 +22,21 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import io.netty.buffer.PooledByteBufAllocator;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.ByteArrayDecoder;
import org.springframework.core.codec.ByteArrayEncoder;
import org.springframework.core.codec.ByteBufferDecoder;
import org.springframework.core.codec.ByteBufferEncoder;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.DataBufferDecoder;
import org.springframework.core.codec.DataBufferEncoder;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@ -90,18 +100,33 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
private ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
@Nullable
private DataBufferFactory dataBufferFactory;
private DataBufferFactory bufferFactory;
DefaultRSocketStrategiesBuilder() {
// Order of decoders may be significant for default data MimeType
// selection in RSocketRequester.Builder
public DefaultRSocketStrategiesBuilder() {
this.decoders.add(StringDecoder.allMimeTypes());
this.decoders.add(new ByteBufferDecoder());
this.decoders.add(new ByteArrayDecoder());
this.decoders.add(new DataBufferDecoder());
this.encoders.add(CharSequenceEncoder.allMimeTypes());
this.encoders.add(new ByteBufferEncoder());
this.encoders.add(new ByteArrayEncoder());
this.encoders.add(new DataBufferEncoder());
}
public DefaultRSocketStrategiesBuilder(RSocketStrategies other) {
DefaultRSocketStrategiesBuilder(RSocketStrategies other) {
this.encoders.addAll(other.encoders());
this.decoders.addAll(other.decoders());
this.adapterRegistry = other.reactiveAdapterRegistry();
this.dataBufferFactory = other.dataBufferFactory();
this.bufferFactory = other.dataBufferFactory();
}
@Override
public Builder encoder(Encoder<?>... encoders) {
this.encoders.addAll(Arrays.asList(encoders));
@ -135,14 +160,19 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
@Override
public Builder dataBufferFactory(DataBufferFactory bufferFactory) {
this.dataBufferFactory = bufferFactory;
this.bufferFactory = bufferFactory;
return this;
}
@Override
public RSocketStrategies build() {
return new DefaultRSocketStrategies(this.encoders, this.decoders, this.adapterRegistry,
this.dataBufferFactory != null ? this.dataBufferFactory : new DefaultDataBufferFactory());
return new DefaultRSocketStrategies(
this.encoders, this.decoders, this.adapterRegistry, initBufferFactory());
}
private DataBufferFactory initBufferFactory() {
return this.bufferFactory != null ? this.bufferFactory :
new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT);
}
}

@ -28,6 +28,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.codec.Decoder;
import org.springframework.lang.Nullable;
import org.springframework.messaging.rsocket.annotation.support.AnnotationClientResponderConfigurer;
import org.springframework.util.MimeType;
@ -125,32 +126,32 @@ public interface RSocketRequester {
interface Builder {
/**
* Configure the MimeType for payload data which is then specified
* on the {@code SETUP} frame and applies to the whole connection.
* <p>By default this is set to the first concrete mime type supported
* by the configured encoders and decoders.
* @param mimeType the data MimeType to use
* Configure the payload data MimeType to specify on the {@code SETUP}
* frame that applies to the whole connection.
* <p>If this is not set, the builder will try to select the mime type
* based on the presence of a single
* {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default}
* {@code Decoder}, or the first default decoder otherwise
* (i.e. {@code String}) if no others are configured.
*/
RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType);
/**
* Configure the MimeType for payload metadata which is then specified
* on the {@code SETUP} frame and applies to the whole connection.
* Configure the payload metadata MimeType to specify on the {@code SETUP}
* frame and applies to the whole connection.
* <p>By default this is set to
* {@code "message/x.rsocket.composite-metadata.v0"} in which case the
* route, if provided, is encoded as a
* {@code "message/x.rsocket.routing.v0"} metadata entry, potentially
* with other metadata entries added too. If this is set to any other
* mime type, and a route is provided, it is assumed the mime type is
* for the route.
* @param mimeType the data MimeType to use
* {@code "message/x.rsocket.routing.v0"} composite metadata entry.
* For any other MimeType, it is assumed to be the MimeType for the
* route, if provided.
*/
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
/**
* Set the {@link RSocketStrategies} to use for access to encoders,
* decoders, and a factory for {@code DataBuffer's}.
* @param strategies the codecs strategies to use
* Set the {@link RSocketStrategies} to use.
* <p>By default this is set to {@code RSocketStrategies.builder().build()}
* but may be further customized via {@link #rsocketStrategies(Consumer)}.
*/
RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);
@ -159,7 +160,6 @@ public interface RSocketRequester {
* <p>By default this starts out with an empty builder, i.e.
* {@link RSocketStrategies#builder()}, but the strategies can also be
* set via {@link #rsocketStrategies(RSocketStrategies)}.
* @param configurer the configurer to apply
*/
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
@ -172,7 +172,6 @@ public interface RSocketRequester {
* {@code ClientRSocketFactory}. Use the shortcuts on this builder
* instead since the created {@code RSocketRequester} needs to be aware
* of those settings.
* @param configurer consumer to customize the factory
* @see AnnotationClientResponderConfigurer
*/
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);

@ -19,13 +19,17 @@ package org.springframework.messaging.rsocket;
import java.util.List;
import java.util.function.Consumer;
import io.netty.buffer.PooledByteBufAllocator;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory.ClientRSocketFactory;
import io.rsocket.RSocketFactory.ServerRSocketFactory;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.MimeType;
@ -120,24 +124,28 @@ public interface RSocketStrategies {
interface Builder {
/**
* Add encoders to use for serializing Objects.
* <p>By default this is empty.
* Append to the list of encoders to use for serializing Objects to the
* data or metadata of a {@link Payload}.
* <p>By default this is initialized with encoders for {@code String},
* {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}.
*/
Builder encoder(Encoder<?>... encoder);
/**
* Access and manipulate the list of configured {@link #encoder encoders}.
* Apply the consumer to the list of configured encoders, immediately.
*/
Builder encoders(Consumer<List<Encoder<?>>> consumer);
/**
* Add decoders for de-serializing Objects.
* <p>By default this is empty.
* Append to the list of decoders to use for de-serializing Objects from
* the data or metadata of a {@link Payload}.
* <p>By default this is initialized with decoders for {@code String},
* {@code byte[]}, {@code ByteBuffer}, and {@code DataBuffer}.
*/
Builder decoder(Decoder<?>... decoder);
/**
* Access and manipulate the list of configured {@link #encoder decoders}.
* Apply the consumer to the list of configured decoders, immediately.
*/
Builder decoders(Consumer<List<Decoder<?>>> consumer);
@ -146,28 +154,23 @@ public interface RSocketStrategies {
* to adapt to, and/or determine the semantics of a given
* {@link org.reactivestreams.Publisher Publisher}.
* <p>By default this {@link ReactiveAdapterRegistry#getSharedInstance()}.
* @param registry the registry to use
*/
Builder reactiveAdapterStrategy(ReactiveAdapterRegistry registry);
/**
* Configure the DataBufferFactory to use for allocating buffers, for
* example when preparing requests or when responding. The choice here
* must be aligned with the frame decoder configured in
* {@link io.rsocket.RSocketFactory}.
* <p>By default this property is an instance of
* {@link org.springframework.core.io.buffer.DefaultDataBufferFactory
* DefaultDataBufferFactory} matching to the default frame decoder in
* {@link io.rsocket.RSocketFactory} which copies the payload. This
* comes at cost to performance but does not require reference counting
* and eliminates possibility for memory leaks.
* <p>To switch to a zero-copy strategy,
* <a href="https://github.com/rsocket/rsocket-java#zero-copy">configure RSocket</a>
* accordingly, and then configure this property with an instance of
* {@link org.springframework.core.io.buffer.NettyDataBufferFactory
* NettyDataBufferFactory} with a pooled allocator such as
* {@link PooledByteBufAllocator#DEFAULT}.
* @param bufferFactory the DataBufferFactory to use
* Configure the DataBufferFactory to use for allocating buffers when
* preparing requests or creating responses.
* <p>By default this is set to {@link NettyDataBufferFactory} with
* pooled, allocated buffers for zero copy. RSocket must also be
* <a href="https://github.com/rsocket/rsocket-java#zero-copy">configured</a>
* for zero copy. For client setup, {@link RSocketRequester.Builder}
* adapts automatically to the {@code DataBufferFactory} configured
* here, and sets the frame decoder in {@link ClientRSocketFactory
* ClientRSocketFactory} accordingly. For server setup, the
* {@link ServerRSocketFactory ServerRSocketFactory} must be configured
* accordingly too for zero copy.
* <p>If using {@link DefaultDataBufferFactory} instead, there is no
* need for related config changes in RSocket.
*/
Builder dataBufferFactory(DataBufferFactory bufferFactory);

@ -17,11 +17,16 @@
package org.springframework.messaging.rsocket;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
import org.junit.Before;
import org.junit.Test;
@ -29,13 +34,19 @@ import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.codec.CharSequenceEncoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.BDDMockito.given;
@ -52,6 +63,8 @@ public class DefaultRSocketRequesterBuilderTests {
private ClientTransport transport;
private final TestRSocketFactoryConfigurer rsocketFactoryConfigurer = new TestRSocketFactoryConfigurer();
@Before
public void setup() {
@ -63,53 +76,113 @@ public class DefaultRSocketRequesterBuilderTests {
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizationsAtSubscription() {
ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketFactory(factoryConfigurer)
.rsocketFactory(this.rsocketFactoryConfigurer)
.rsocketStrategies(strategiesConfigurer)
.connect(this.transport);
verifyZeroInteractions(this.transport, factoryConfigurer, strategiesConfigurer);
verifyZeroInteractions(this.transport);
assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNull();
}
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizations() {
RSocketStrategies strategies = RSocketStrategies.builder()
.encoder(CharSequenceEncoder.allMimeTypes())
.decoder(StringDecoder.allMimeTypes())
.build();
ClientRSocketFactoryConfigurer factoryConfigurer = mock(ClientRSocketFactoryConfigurer.class);
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
Consumer<RSocketStrategies.Builder> rsocketStrategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketStrategies(strategies)
.rsocketFactory(factoryConfigurer)
.rsocketStrategies(strategiesConfigurer)
.rsocketFactory(this.rsocketFactoryConfigurer)
.rsocketStrategies(rsocketStrategiesConfigurer)
.connect(this.transport)
.block();
// RSocketStrategies and RSocketFactory configurers should have been called
verify(this.transport).connect(anyInt());
verify(factoryConfigurer).configureWithStrategies(any(RSocketStrategies.class));
verify(factoryConfigurer).configure(any(RSocketFactory.ClientRSocketFactory.class));
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
verify(rsocketStrategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
assertThat(this.rsocketFactoryConfigurer.rsocketStrategies()).isNotNull();
assertThat(this.rsocketFactoryConfigurer.rsocketFactory()).isNotNull();
}
@Test
public void defaultDataMimeType() {
RSocketRequester requester = RSocketRequester.builder()
.connect(this.transport)
.block();
assertThat(requester.dataMimeType())
.as("Default data MimeType, based on the first configured Decoder")
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
}
@Test
public void dataMimeType() throws NoSuchFieldException {
public void defaultDataMimeTypeWithCustomDecoderRegitered() {
RSocketStrategies strategies = RSocketStrategies.builder()
.encoder(CharSequenceEncoder.allMimeTypes())
.decoder(StringDecoder.allMimeTypes())
.decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.connect(this.transport)
.block();
assertThat(requester.dataMimeType())
.as("Default data MimeType, based on the first configured, non-default Decoder")
.isEqualTo(MimeTypeUtils.APPLICATION_JSON);
}
@Test
public void defaultDataMimeTypeWithMultipleCustomDecoderRegitered() {
RSocketStrategies strategies = RSocketStrategies.builder()
.decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON))
.decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_XML))
.build();
assertThatThrownBy(() ->
RSocketRequester
.builder()
.rsocketStrategies(strategies)
.connect(this.transport)
.block())
.hasMessageContaining("Cannot select default data MimeType");
}
@Test
public void dataMimeTypeSet() {
RSocketRequester requester = RSocketRequester.builder()
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.connect(this.transport)
.block();
Field field = DefaultRSocketRequester.class.getDeclaredField("dataMimeType");
assertThat(requester.dataMimeType()).isEqualTo(MimeTypeUtils.APPLICATION_JSON);
}
@Test
public void frameDecoderMatchesDataBufferFactory() throws Exception {
testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY);
testFrameDecoder(new DefaultDataBufferFactory(), PayloadDecoder.DEFAULT);
}
private void testFrameDecoder(DataBufferFactory bufferFactory, PayloadDecoder frameDecoder)
throws NoSuchFieldException {
RSocketStrategies strategies = RSocketStrategies.builder()
.dataBufferFactory(bufferFactory)
.build();
RSocketRequester.builder()
.rsocketStrategies(strategies)
.rsocketFactory(this.rsocketFactoryConfigurer)
.connect(this.transport)
.block();
RSocketFactory.ClientRSocketFactory factory = this.rsocketFactoryConfigurer.rsocketFactory();
assertThat(factory).isNotNull();
Field field = RSocketFactory.ClientRSocketFactory.class.getDeclaredField("payloadDecoder");
ReflectionUtils.makeAccessible(field);
MimeType dataMimeType = (MimeType) ReflectionUtils.getField(field, requester);
assertThat(dataMimeType).isEqualTo(MimeTypeUtils.APPLICATION_JSON);
PayloadDecoder decoder = (PayloadDecoder) ReflectionUtils.getField(field, factory);
assertThat(decoder).isSameAs(frameDecoder);
}
@ -135,4 +208,75 @@ public class DefaultRSocketRequesterBuilderTests {
}
}
static class TestRSocketFactoryConfigurer implements ClientRSocketFactoryConfigurer {
private RSocketStrategies strategies;
private RSocketFactory.ClientRSocketFactory rsocketFactory;
public RSocketStrategies rsocketStrategies() {
return this.strategies;
}
public RSocketFactory.ClientRSocketFactory rsocketFactory() {
return this.rsocketFactory;
}
@Override
public void configureWithStrategies(RSocketStrategies strategies) {
this.strategies = strategies;
}
@Override
public void configure(RSocketFactory.ClientRSocketFactory rsocketFactory) {
this.rsocketFactory = rsocketFactory;
}
}
static class TestJsonDecoder implements Decoder<Object> {
private final MimeType mimeType;
TestJsonDecoder(MimeType mimeType) {
this.mimeType = mimeType;
}
@Override
public List<MimeType> getDecodableMimeTypes() {
return Collections.singletonList(this.mimeType);
}
@Override
public boolean canDecode(ResolvableType elementType, MimeType mimeType) {
return false;
}
@Override
public Mono<Object> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
throw new UnsupportedOperationException();
}
@Override
public Flux<Object> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
MimeType mimeType, Map<String, Object> hints) {
throw new UnsupportedOperationException();
}
@Override
public Object decode(DataBuffer buffer, ResolvableType targetType, MimeType mimeType,
Map<String, Object> hints) throws DecodingException {
throw new UnsupportedOperationException();
}
}
}

Loading…
Cancel
Save