diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 2f81a6c959..c0d8edbf9d 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -26,6 +26,9 @@ import reactor.netty.NettyOutbound; import reactor.netty.http.client.HttpClient; import reactor.netty.http.client.HttpClientRequest; import reactor.netty.http.client.HttpClientResponse; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; +import reactor.netty.tcp.TcpClient; import org.springframework.http.HttpMethod; import org.springframework.util.Assert; @@ -39,21 +42,53 @@ import org.springframework.util.Assert; */ public class ReactorClientHttpConnector implements ClientHttpConnector { + private final static Function defaultInitializer = HttpClient::compress; + + private final HttpClient httpClient; /** - * Create a Reactor Netty {@link ClientHttpConnector} - * with default configuration and HTTP compression support enabled. + * Default constructor that initializes an {@link HttpClient} with: + *
+	 * HttpClient.create().compress()
+	 * 
*/ public ReactorClientHttpConnector() { - this.httpClient = HttpClient.create().compress(); + this.httpClient = defaultInitializer.apply(HttpClient.create()); + } + + /** + * Constructor with externally managed Reactor Netty resources, including + * {@link LoopResources} for event loop threads, and {@link ConnectionProvider} + * for the connection pool. + *

This constructor should be used only when you don't want the client + * to participate in the Reactor Netty global resources. By default the + * client participates in the Reactor Netty global resources held in + * {@link reactor.netty.http.HttpResources}, which is recommended since + * fixed, shared resources are favored for event loop concurrency. However, + * consider declaring a {@link ReactorResourceFactory} bean with + * {@code globaResources=true} in order to ensure the Reactor Netty global + * resources are shut down when the Spring ApplicationContext is closed. + * @param factory the resource factory to obtain the resources from + * @param mapper a mapper for further initialization of the created client + * @since 5.1 + */ + public ReactorClientHttpConnector(ReactorResourceFactory factory, Function mapper) { + this.httpClient = defaultInitializer.andThen(mapper).apply(initHttpClient(factory)); + } + + private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory) { + ConnectionProvider provider = resourceFactory.getConnectionProvider(); + LoopResources resources = resourceFactory.getLoopResources(); + Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?"); + Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?"); + return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources)).compress(); } /** - * Create a Reactor Netty {@link ClientHttpConnector} with a fully - * configured {@code HttpClient}. - * @param httpClient the client instance to use + * Constructor with a pre-configured {@code HttpClient} instance. + * @param httpClient the client to use * @since 5.1 */ public ReactorClientHttpConnector(HttpClient httpClient) { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java new file mode 100644 index 0000000000..6cafa276ce --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorResourceFactory.java @@ -0,0 +1,153 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.http.client.reactive; + +import reactor.netty.http.HttpResources; +import reactor.netty.resources.ConnectionProvider; +import reactor.netty.resources.LoopResources; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Factory to manage Reactor Netty resources, i.e. {@link LoopResources} for + * event loop threads, and {@link ConnectionProvider} for the connection pool, + * within the lifecycle of a Spring {@code ApplicationContext}. + * + *

This factory implements {@link InitializingBean} and {@link DisposableBean} + * and is expected typically to be declared as a Spring-managed bean. + * + * @author Rossen Stoyanchev + * @since 5.1 + */ +public class ReactorResourceFactory implements InitializingBean, DisposableBean { + + private boolean globalResources = true; + + @Nullable + private ConnectionProvider connectionProvider; + + @Nullable + private LoopResources loopResources; + + private String threadPrefix = "reactor-http"; + + + /** + * Whether to expose and manage the global Reactor Netty resources from the + * {@link HttpResources} holder. + *

Default is "true" in which case this factory helps to configure and + * shut down the global Reactor Netty resources within the lifecycle of a + * Spring {@code ApplicationContext}. + *

If set to "false" then the factory creates and manages its own + * {@link LoopResources} and {@link ConnectionProvider}, independent of the + * global ones in the {@link HttpResources} holder. + * @param globalResources whether to expose and manage the global resources + */ + public void setGlobalResources(boolean globalResources) { + this.globalResources = globalResources; + } + + /** + * Configure the {@link ConnectionProvider} to use. + *

By default, initialized with {@link ConnectionProvider#elastic(String)}. + * @param connectionProvider the connection provider to use + */ + public void setConnectionProvider(@Nullable ConnectionProvider connectionProvider) { + this.connectionProvider = connectionProvider; + } + + /** + * Configure the {@link LoopResources} to use. + *

By default, initialized with {@link LoopResources#create(String)}. + * @param loopResources the loop resources to use + */ + public void setLoopResources(@Nullable LoopResources loopResources) { + this.loopResources = loopResources; + } + + /** + * Configure the thread prefix to initialize {@link LoopResources} with. This + * is used only when a {@link LoopResources} instance isn't + * {@link #setLoopResources(LoopResources) provided}. + *

By default set to "reactor-http". + * @param threadPrefix the thread prefix to use + */ + public void setThreadPrefix(String threadPrefix) { + Assert.notNull(threadPrefix, "Thread prefix is required"); + this.threadPrefix = threadPrefix; + } + + + /** + * Whether this factory exposes the global + * {@link reactor.netty.http.HttpResources HttpResources} holder. + */ + public boolean isGlobalResources() { + return this.globalResources; + } + + /** + * Return the configured {@link ConnectionProvider}. + */ + @Nullable + public ConnectionProvider getConnectionProvider() { + return this.connectionProvider; + } + + /** + * Return the configured {@link LoopResources}. + */ + @Nullable + public LoopResources getLoopResources() { + return this.loopResources; + } + + /** + * Return the configured prefix for event loop threads. + */ + public String getThreadPrefix() { + return this.threadPrefix; + } + + + @Override + public void afterPropertiesSet() throws Exception { + if (this.loopResources == null) { + this.loopResources = LoopResources.create(this.threadPrefix); + } + if (this.connectionProvider == null) { + this.connectionProvider = ConnectionProvider.elastic("http"); + } + if (this.globalResources) { + HttpResources.set(this.loopResources); + HttpResources.set(this.connectionProvider); + } + } + + @Override + public void destroy() { + + Assert.notNull(this.connectionProvider, "No ConnectionProvider"); + this.connectionProvider.dispose(); + + Assert.notNull(this.loopResources, "No LoopResources"); + this.loopResources.dispose(); + } + +}