连接池用在网络通信场景中的客户端,起到连接复用、资源管理和保护的作用。用到的地方到处都是,数据库JDBC,Apache HttpClient,WebClient,Redis的Jedis和Lettuce连接池等等等。
起因疑问
翻之前的笔记看到WebClient本地有个连接上限是500,其他文章也提到这个连接池上限的问题Spring5的WebClient使用详解 - HelloWorld开发者社区 。有些疑惑,基于Netty做网络层的框架,能连接共享且非阻塞io的情况下为什么还需要设置连接池呢?比如Lettuce默认就是单连接的、还比如Dubbo默认貌似也是单连接的。
难道是因为WebClient没法像Lettuce那样确定自己的对端就是Redis,然后需要多准备几个连接防止不够用,具体的原理不是很理解。感觉每个路由1个连接就够了,难道是为了连接500个不同的路由么?
另外,如果实现了连接共享和非阻塞IO,基于Netty到指定的对端应该就不用连接池了吧,连接的多少跟传输效率到底有没有关系?多连接会比单连接快?还是传输效率跟连接多少没关系只跟带宽有关?《跟着案例学Netty》里边提到的:“创建多个TCP连接,提升消息的收发能力”,“随着硬件性能的不断提升,多处理器多网卡已经成为标配,为了充分利用硬件资源,应用程序通过并发编程、客户端与服务端创建多链路的方式提升性能”,又怎么理解?
下面带着问题,一个一个的搞懂。
单连接VS多连接
先说结论:在不存在链路争用的情况下,如果连接可以在多线程间共享且非阻塞,单连接是和多连接一样快的。
如果客户端与服务端之间出现了某个瓶颈节点,比如路由器交换机之类的,网络中的公共节点为多个链路公用。由于TCP的拥塞控制机制,经过该公共设备的连接的实际速率会趋于平均,所以会出现某个客户端没有充分利用带宽的情况,这样如果在客户端上再开几个连接,由于平均分配机制,那么就会提高该客户端在公共节点的流量占比,从客户端角度看过来就是传输速率变快了。其实可以认为是从其他客户端那儿抢过来的一些公共节点的带宽使用。
如图,客户端1和客户端2如果都是单连接的话,在路由器会被均分带宽,每个是R/2,假设如果客户端带宽也是R的话,那么从客户端的角度来看就没有充分利用带宽;这时候客户端1开两个连接,那么其在路由器相当于会分到2R/3带宽,从其角度来看两连接就比原来的单连接要快了。
连接共享模式
实现连接共享模式的连接池,一般需要客户端与服务端在协议上做出约定,使用类似RequestID这种、服务端在返回Response时带上这个ID,用来识别这个是客户端的哪个请求的响应,后面就可以根据这个ID去做对应的处理了,比如回调对应的handle等等。
或者还有一种情况,比如基于Netty的Redis Lettuce连接池,它可以采用单连接共享模式的原因在于:它对端的服务端是单线程处理的Redis,加之TCP协议本身的顺序性保证,这样一来就能够确保请求发出的顺序和响应回来的顺序是一致的,所以就不需要上面所说的客户端与服务端之间约定的ID了。
连接独占模式
之前笔者错误的认为只有JDBC这种阻塞io基础上制定协议才不得不使用连接独占的模式:1个连接同一时刻只会被1个线程使用,使用完了归还到连接池里,其他线程才能使用。基于Netty做网络层的组件有时候也要使用连接独占模式。比如WebClient这个Http客户端,它采用的是连接独占模式,没法使用共享模式、因为作为一款Http客户端组件,它的服务端完全是不可控制的,服务端不一定会配合客户端进行约定请求ID的支持,这样连接没法共享,就必然要搞多个连接了。
进一步查阅Reactor Netty的官方文档:Reactor Netty Reference Guide (projectreactor.io),HTTP连接池默认的策略是最大500连接,当尝试获取连接时,当前连接数如果不到500就创建连接并交由池管理,如果达到500了以后最多可以允许1000个挂起尝试,再超过的话就拒绝。
最后看看基于Netty的WebClient的连接池设计。
WebClient中的连接池
从配置类开始:
@Configuration
public class ReactiveHttpClientConfig {
@Bean
public WebClient webClient() {
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient()))
.build(); //全局一个单例webClient
}
//设置connect、read、write timeout
public HttpClient httpClient() {
return HttpClient.create()
.tcpConfiguration(tcpClient ->
tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.option(ChannelOption.TCP_NODELAY, true) //禁用Nagle算法,取消延时发送,允许小包发送
.doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(15))
.addHandlerLast(new WriteTimeoutHandler(15))
)
);
}
}
HttpClient.create()方法进去,调用的是create(HttpResources.get())
,create方法声明是public static HttpClient create(ConnectionProvider connectionProvider),使用ConnectionProvider创建HttpClient,所以我们看下HttpResources.get():调用的是getOrCreate(httpResources, null, null, ON_HTTP_NEW, "http")
protected static <T extends TcpResources> T getOrCreate(AtomicReference<T> ref,
@Nullable LoopResources loops,
@Nullable ConnectionProvider provider,
BiFunction<LoopResources, ConnectionProvider, T> onNew,
String name) {
//...
update = create(resources, loops, provider, name, onNew);
//...
return update;
}
来到了TcpResources.create方法:
static <T extends TcpResources> T create(@Nullable T previous,
@Nullable LoopResources loops, @Nullable ConnectionProvider provider,
String name,
BiFunction<LoopResources, ConnectionProvider, T> onNew) {
if (previous == null) {
loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
provider = provider == null ?
ConnectionProvider.builder(name).maxConnections(500).pendingAcquireMaxCount(-1).build() : provider;
}
else {
loops = loops == null ? previous.defaultLoops : loops;
provider = provider == null ? previous.defaultProvider : provider;
}
return onNew.apply(loops, provider);
}
下面这行,使用的是builder模式来构建ConnectionProvider:
ConnectionProvider.builder(name).maxConnections(500).pendingAcquireMaxCount(-1).build()
build()里边:
public ConnectionProvider build() {
return new PooledConnectionProvider(this);
}
到此搞清楚了,reactor-netty里边默认的连接池是用的这个PooledConnectionProvider
,类总体结构:
final class PooledConnectionProvider implements ConnectionProvider {
final ConcurrentMap<PoolKey, InstrumentedPool<PooledConnection>> channelPools =
PlatformDependent.newConcurrentHashMap();
private final Map<PoolKey, ConnectionPoolMetrics> poolMetrics = new WeakHashMap<>();
final String name;
final Map<SocketAddress, PoolFactory> poolFactoryPerRemoteHost = new HashMap<>();
final PoolFactory defaultPoolFactory;
PooledConnectionProvider(Builder builder){
this.name = builder.name;
this.defaultPoolFactory = new PoolFactory(builder);
for(Map.Entry<SocketAddress, ConnectionPoolSpec<?>> entry : builder.confPerRemoteHost.entrySet()) {
poolFactoryPerRemoteHost.put(entry.getKey(), new PoolFactory(entry.getValue()));
}
}
@Override
public void disposeWhen(@NonNull SocketAddress address)
private boolean compareAddresses(SocketAddress origin, SocketAddress target)
@Override
public Mono<Connection> acquire(Bootstrap b) //获取连接
@Override
public Mono<Void> disposeLater() //释放
@Override
public boolean isDisposed()
static void disposableAcquire(DisposableAcquire disposableAcquire)
static final AttributeKey<ConnectionObserver> OWNER =
AttributeKey.valueOf("connectionOwner");
//几个内部类
final static class PooledConnectionAllocator
final static class PendingConnectionObserver implements ConnectionObserver
final static class PooledConnection implements Connection, ConnectionObserver
final static class DisposableAcquire implements ConnectionObserver, Runnable, CoreSubscriber<PooledRef<PooledConnection>>, Disposable
final static class PoolKey
final static class PoolFactory
}
从netstat来看,没常驻连接,设置了idleTimeout以后到时间就关闭掉,限制最多500个连接。
参考
基于 Netty 如何实现高性能的 HTTP Client 的连接池 - 云+社区 - 腾讯云 (tencent.com) 唯品会架构师
为什么多 TCP 连接分块下载比单连接下载快? - 知乎 (zhihu.com)
Reactor Netty Reference Guide (projectreactor.io)
Spring5的WebClient使用详解 - HelloWorld开发者社区
Java编程方法论-Reactor-Netty与Spring WebFlux解读 整体简介与导读 - 知乎 (zhihu.com)