一、Reactor 模式
reactor
模式是一种事件驱动的应用层 I/O 处理模式,基于分而治之和事件驱动的思想,致力于构建一个高性能的可伸缩的 I/O 处理模式。维基百科对 Reactor pattern 的解释:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers
大致意思是说,reactor
设计模式是一种事件处理模式,用于同时有一个或多个请求发送到事件处理器(service handler),这个事件处理器会采用多路分离(demultiplexes )的方式,同步的将这些请求分发到请求处理器(request handlers)。
不难看出,上边介绍的 reactor
模式是一种抽象;从实现角度说,reactor
模式有许多变种,不同编程语言中的实现也有差异。就 java 而言,大师 Doug Lea 在其【Scalable IO in Java】中就讲述了几个reactor
模式的演进,如单线程版本
、多线程版
,阅读此文后,笔者对大师所讲reactor
模式演进的理解与网络中一些描述稍有差异。
在reactor
单线程版中,只有一个reactor
线程,线程中通过 select
(I/O 多路复用接口) 监听所有 I/O 事件,收到 I/O 事件后通过 dispatch
进行分发给 Handlers
处理,此版本容易实现,也容易理解,但性能不高。为了适配多处理器,充分利用多核并行处理的优势,实现高性能的网络服务,可以采用分治策略,关键环节采用多线程模式,于是就出现了reactor
多线程版本,而多线程的应用体现为worder
线程和reactor
线程,多线程应该被池化管理,这样才容易被调整和控制。线程池中的线程数会比客户端的数量少很多,实际数量可以根据程序本身是 CPU 密集型还是 I/O 密集型操作来进行合理的分配。
-
多个 worder 线程(池化管理)
- 属于网络 I/O 操作与业务处理的拆分,因为
reactors
监听到 I/O 事件后应该快速分发给handlers
来处理程序;但如果handler
中的非 I/O 操作慢了就会减慢reactor
中的 I/O 事件响应速度,所以把非 I/O 操作从reactors
的 I/O 线程转移到其他线程中,即由worker
线程来分担非 I/O 逻辑的操作处理。
- 属于网络 I/O 操作与业务处理的拆分,因为
-
多个 reactor 线程(池化管理)
- 属于网络建连操作与网络 I/O 读写操作的拆分,因为由一个
reactor
在一个线程中完成所有 I/O 操作也会遇到性能瓶颈,可采取拆分并增加reactor
策略,将 I/O 负载分配给多个reactor
(每个reactor
都有自己的线程、选择器和调度循环)以达到负载平衡。这看起来挺不错,但谁来执行分配以达到负载均衡呢?或许是因为这个问题,将reactor
拆分为两类角色,mainReactor
负责接收连接,之后采用一定的负载均衡策略将新连接分配给其他subReactor
来处理 I/O 读写,这样的拆分自然流畅。
- 属于网络建连操作与网络 I/O 读写操作的拆分,因为由一个
如此就演进出如上图中的主从reactor
多线程模型。请注意,结合【Scalable IO in Java】原文中的用词和描述看,上图中的mainReactor
和subReactor
可以有多个并做池化管理,所有也有一些文章中会看到如主ReactorGroup
、mainReactorGroup
、从ReactorGroup
、subReactorGroup
等这类名词用 Group 后缀来强调 Reactor 是池化管理。 或许是不好布局,也或许是为了凸显主从reactor
角色的协作关系,上图中都只展示了一个,另外服务端应用通常只暴露一个服务端口时,只需用一个 mainReactor
来监听端口上的连接事件并处理。
二、Netty 主从 reactor
多线程模型
Netty
中reactor
所对应的实现类是NioEventLoop
,其核心逻辑如下:
- 不同类型的 channel 向 Selector 注册所感兴趣的事件
- 扫描是否有感兴趣的事件发生
- 事件发生后做相应的处理
客户端和服务端分别会有不同类型的channel
,客户端创建SocketChannel
向服务端发起连接请求,服务端创建ServerSocketChannel
监听客户端连接,建连后创建SocketChannel
与客户端的SocketChannel
互相收发数据,这些channel
分工不同,向 Selector 注册所感兴趣的事件情况也不同:
客户端/服务端 | channel | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
---|---|---|---|---|---|
客户端 | SocketChannel | YES | |||
服务端 | ServerSocketChannel | YES | |||
服务端 | SocketChannel | YES | YES |
Netty
中 Nio 方式实现几种 reactor
模型如下:
mainReactor
对应 Netty
中配置的 bossGroup
线程组(下图中的主ReactorGroup
),主要负责接受客户端连接的建立。每 bind
一个端口就用掉一个bossGroup
中的线程。
subReactor
对应 Netty
中配置的 workerGroup
线程组(下图中的 reactorGroup
),bossGroup
线程组接受完客户端的连接后,将 channel
转交给 workerGroup
线程组,在 workerGroup
线程组内选择一个线程,执行 I/O 读写的处理,workerGroup
线程组默认是 2 * CPU 核数个线程。
主从 reactor
模式的核心流程:
如果只监听一个端口,那么只需一个主
reactor
干活儿,所以通常看到boosGroup
只配置一个线程。主reactor
运行在独立的线程中 ,该线程中只负责与客户端的连接请求从
reactor
在服务器端可以不止一个, 通常运行多个从reactor
, 每个从reactor
也运行在一个独立的线程中 ,负责与客户端的读写操作主
reactor
检测到客户端的链接后,创建NioSocketChannel
,按照一定的算法循环选取(负载均衡)一个从reactor
,并把刚创建的NioSocketChannel
注册到这个从reactor
中,这样建连和读写事件互不影响。一个
reactor
中可被注册多个NioSocketChannel
,这个reactor
监听所有的被分配的NioSocketChannel
的读写事件 , 如果监听到客户端的数据发送事件 , 将对应的业务逻辑转发给NioSocketChannel
中的pipeline
里的handler
链进行处理handler
最好只负责响应 I/O 事件,不处理具体的与客户端交互的业务逻辑 , 这样不会长时间阻塞 , 其read
方法读取客户端数据后 , 将消息数据交给业务线程池去处理相关业务逻辑业务线程池完成相关业务逻辑的处理后,将结果返回,通过
NioSocketChannel
的的pipeline
里的handler
链将结果消息写回给客户端当
buffer
不满足将结果消息写回给客户端时的条件时,注册写事件,等待可写时再写
三、Seata Server 端 的 reactor 模式应用
Seata Server 采用了 主从 reactor
多线程模型,对应这个模型的话是有四个线程池,其中自定义业务线程池是两个。
功能 | 线程池对象 | 备注 |
---|---|---|
接收客户端连接 | NettyServerBootstrap#eventLoopGroupBoss |
|
处理 IO 事件 | NettyServerBootstrap#eventLoopGroupWorker |
部分 RPC 消息在这里处理 |
处理客户端的 request 消息 | AbstractNettyRemoting#messageExecutor |
客户端主动发给的消息 |
处理客户端的 response 消息 | NettyRemotingServer#branchResultMessageExecutor |
服务端主动发给客户端消息,客户端处理后给服务端响应 |
3.1、NettyServerBootstrap#eventLoopGroupBoss
笔者的环境未启用 epoll
,关键信息如下:
- 线程数:1,只监听一个端口
- 线程名前缀:“NettyBoss”
this.eventLoopGroupBoss = new NioEventLoopGroup(
//CONFIG.getInt("transport.threadFactory.bossThreadSize", 1);
nettyServerConfig.getBossThreadSize(),
new NamedThreadFactory(
// CONFIG.getConfig("transport.threadFactory.bossThreadPrefix", "NettyBoss");
nettyServerConfig.getBossThreadPrefix(),
//CONFIG.getConfig("transport.threadFactory.bossThreadSize", 1);
nettyServerConfig.getBossThreadSize())
);
3.2、NettyServerBootstrap#eventLoopGroupWorker
笔者的环境未启用 epoll
,关键信息如下:
- 线程数:默认值是 cpu 核数 * 2
- 线程名前缀:“NettyServerNIOWorker”
this.eventLoopGroupWorker = new NioEventLoopGroup(
// System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//默认值cpu核数*2
nettyServerConfig.getServerWorkerThreads(),
new NamedThreadFactory(
// CONFIG.getConfig("transport.threadFactory.workerThreadPrefix",
// enableEpoll() ? EPOLL_WORKER_THREAD_PREFIX : DEFAULT_NIO_WORKER_THREAD_PREFIX);
// 默认值 NettyServerNIOWorker ,没有启用 epoll
nettyServerConfig.getWorkerThreadPrefix(),
//System.getProperty("transport.serverWorkerThreads", String.valueOf(WORKER_THREAD_SIZE)));//默认值 cpu核数*2
nettyServerConfig.getServerWorkerThreads())
);
3.3、AbstractNettyRemoting#messageExecutor
此线程池处理客户端的 request 消息,关键参数信息如下:
- 线程数:50 ~ 500
- keepAlive:500 秒
- 线程名字前缀: "ServerHandlerThread"
- 队列长度: 500
- 拒绝策略:CallerRunsPolicy(),饱和的情况下,调用者来执行该任务,即 Netty 的 I/O 线程
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(
//Integer.parseInt(System.getProperty("transport.minServerPoolSize", "50"));
NettyServerConfig.getMinServerPoolSize(),
//Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
NettyServerConfig.getMaxServerPoolSize(),
//Integer.parseInt(System.getProperty("transport.keepAliveTime", "500"));
NettyServerConfig.getKeepAliveTime(),
TimeUnit.SECONDS,
//Integer.parseInt(System.getProperty("transport.maxTaskQueueSize", "500"));
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory(
"ServerHandlerThread",
//Integer.parseInt(System.getProperty("transport.maxServerPoolSize", "500"));
NettyServerConfig.getMaxServerPoolSize()),
//饱和的情况下,调用者来执行该任务,即Netty的IO线程
new ThreadPoolExecutor.CallerRunsPolicy()
);
3.4、NettyRemotingServer#branchResultMessageExecutor
此线程池处理客户端的 response 消息,关键参数信息如下:
- 线程数:cpu 核数2 ~ cpu 核数2
- keepAlive:500 秒
- 线程名字前缀: "BranchResultHandlerThread"
- 队列长度: 20000
- 拒绝策略:
CallerRunsPolicy()
,饱和的情况下,调用者来执行该任务,即 Netty 的 IO 线程
private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(
//System.getProperty("transport.minBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2
NettyServerConfig.getMinBranchResultPoolSize(),
//System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2
NettyServerConfig.getMaxBranchResultPoolSize(),
// System.getProperty("transport.keepAliveTime", "500"),默认值500
NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(
//System.getProperty("transport.maxTaskQueueSize", "20000"),默认值 20000
NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory(
// 分支响应消息的处理线程的名字前缀 BranchResultHandlerThread
"BranchResultHandlerThread",
// System.getProperty("transport.maxBranchResultPoolSize", String.valueOf(WORKER_THREAD_SIZE))),默认值 cpu核数*2
NettyServerConfig.getMaxBranchResultPoolSize()
),
//饱和的情况下,调用者来执行该任务,即Netty的IO线程
new ThreadPoolExecutor.CallerRunsPolicy()
);
3.5、业务线程池如何处理消息
3.5.1、登记消息处理器
Seata 消息处理的核心逻辑是:定义好什么类型的消息,使用哪个消息处理器,这个消息处理器的消息处理逻辑在哪个线程池中执行。这个映射关系通过AbstractNettyRemoting#processorTable
来存储。
/**
* 可以接收什么类型的消息,以及使用哪个消息处理器和线程池来处理消息
* HashMap<消息类型, Pair<消息处理器, 线程池>>
* processor type {@link MessageType}
*/
protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
各模块 Netty 组件启动前,通过AbstractNettyRemotingServer#registerProcessor
方法登记到这个结构中。
public void registerProcessor(int messageType, RemotingProcessor processor, ExecutorService executor) {
Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(messageType, pair);
}
拿 Seata Server 来说,如在ServerBootStrap
启动前,通过NettyRemotingServer#registerProcessor
注册好消息处理器。不同消息对应的处理器的线程池也不同,也有一些消息没有指定业务线程池(没必要),情况如下:
private void registerProcessor() {
// 1\. registry on request message processor
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
ShutdownHook.getInstance().addDisposable(onRequestProcessor);
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2\. registry on response message processor
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, branchResultMessageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, branchResultMessageExecutor);
// 3\. registry rm message processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4\. registry tm message processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5\. registry heartbeat message processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
3.5.2、处理消息
当 Seata Server 收到客户端发送的 RPC 消息后,会进入AbstractNettyRemotingServer.ServerHandler#channelRead
中,在这里对消息类型简单判断后,委托给processMessage
处理。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof RpcMessage)) {
return;
}
// 收到消息后,委托给 processMessage 处理
processMessage(ctx, (RpcMessage) msg);
}
processMessage
中通过消息类型找到消息处理器进行业务层处理:
- 如果消息处理器有指定的业务线程池,在指定的业务线程池中处理消息
- 若消息处理器没有指定的业务线程池,则在 I/O 线程中直接处理。
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
...
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
// 通过消息类型找到消息处理器
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair != null) {
// 如果消息处理器有指定的业务线程池
if (pair.getSecond() != null) {
try {
// 在指定的业务线程池中处理消息
pair.getSecond().execute(() -> {
...
pair.getFirst().process(ctx, rpcMessage);
...
});
} catch (RejectedExecutionException e) {
...
}
} else {
try {
//若消息处理器没有指定的业务线程池,则在I/O现成中直接处理。
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
...
}
}
} else {
...
}
} else {
...
}
}
四、Seata client 端的 reactor 模式应用
Seata client 端也采用了 reactor
多线程模型,在初始化的时候有RmNettyRemotingClient
和TmNettyRemotingClient
两个对象,分别会创建各自的 Bootstrap
,RM 和 TM 各有自己的 I/O 线程池和业务线程池。
功能 | 线程池对象 | 备注 |
---|---|---|
处理 IO 事件 | NettyClientBootstrap#eventLoopGroupWorker |
|
处理业务消息 | AbstractNettyRemoting#messageExecutor |
源码里还有个NettyClientBootstrap#defaultEventExecutorGroup
,没看出来哪里有用。TmNettyRemotingClient#getInstance()
中构建了 TM 的业务线程池,赋值给NettyClientBootstrap#messageExecutor
,同样RmNettyRemotingClient#getInstance()
中构建了 RM 的业务线程池
4.1、NettyClientBootstrap#eventLoopGroupWorker
客户端此线程池关键信息如下:
- 线程数:1
- 线程名字前缀:
- TM:"NettyClientSelector_TMROLE"
- RM:"NettyClientSelector_RMROLE"
// 单I/O线程
this.eventLoopGroupWorker = new NioEventLoopGroup(
//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
selectorThreadSizeThreadSize,
new NamedThreadFactory(
// CONFIG.getConfig("transport.threadFactory.clientSelectorThreadPrefix", "NettyClientSelector");
// 再拼上角色后默认值为:"NettyClientSelector_TMROLE"
getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()),
//CONFIG.getInt("transport.threadFactory.clientSelectorThreadSize", 1)
selectorThreadSizeThreadSize)
);
4.2、AbstractNettyRemoting#messageExecutor
TmNettyRemotingClient#getInstance()
和RmNettyRemotingClient#getInstance()
创建各自的线程池,配置并不相同。
1)TmNettyRemotingClient#getInstance()
中所创建线程池的关键信息如下:
- 线程数:默认值是 cpu 核数 _ 2 ~ cpu 核数 _ 2
- keepAlive:Integer.MAX_VALUE 秒
- 线程名字前缀:rpcDispatch_TMROLE
- 队列长度: 2000
- 拒绝策略:
runsOldestTaskPolicy()
,饱和的情况下,添加新任务并由投递任务的线程运行最早的任务。
public static TmNettyRemotingClient getInstance() {
if (instance == null) {
synchronized (TmNettyRemotingClient.class) {
if (instance == null) {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
// 自定义TM业务线程池
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2
nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2
KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//2000
new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),// TM的线程名是:rpcDispatch_TMROLE
nettyClientConfig.getClientWorkerThreads()),// 默认是cpu核数 * 2
RejectedPolicies.runsOldestTaskPolicy());//添加新任务并由主线程运行最早的任务。
instance = new TmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
2)RmNettyRemotingClient#getInstance()
中所创建线程池的关键信息如下:
- 线程数:默认是 cpu 核数 _ 2 ~ cpu 核数 _ 2
- keepAlive:Integer.MAX_VALUE 秒
- 线程名字前缀:rpcDispatch_RMROLE
- 队列长度: 20000
- 拒绝策略:
CallerRunsPolicy()
,饱和的情况下,调用者来执行该任务,即 Netty 的 IO 线程。
public static RmNettyRemotingClient getInstance() {
if (instance == null) {
synchronized (RmNettyRemotingClient.class) {
if (instance == null) {
NettyClientConfig nettyClientConfig = new NettyClientConfig();
// 自定义RM业务线程池
final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2
nettyClientConfig.getClientWorkerThreads(), // 默认是cpu核数 * 2
KEEP_ALIVE_TIME, TimeUnit.SECONDS,//Integer.MAX_VALUE;
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),//20000
new NamedThreadFactory(
nettyClientConfig.getRmDispatchThreadPrefix(),// RM的线程名是:rpcDispatch_RMROLE,
nettyClientConfig.getClientWorkerThreads()),// 默认是cpu核数 * 2
new ThreadPoolExecutor.CallerRunsPolicy());////饱和的情况下,调用者来执行该任务,即Netty的IO线程
instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
}
}
}
return instance;
}
4.3、消息处理
TmNettyRemotingClient
和RmNettyRemotingClient
在init()
方法中会调用registerProcessor()
方法注册各自的 RPC 消息处理器。收到 RPC 消息后就由这些处理器+对应的线程池做后续处理,消息的相关业务属性在后续的事务流程中介绍。
五、支撑特殊能力的业务线程池
1)AbstractNettyRemotingClient#mergeSendExecutorService
用于批量发送请求,多个消息合并,减少通信次数。实现逻辑比较清晰,当允许发送批量消息时,消息首先分桶保存到 basketMap,在一个周期性的无线循环中,把 basketMap 中的消息队列取出来,把每个队列的消息都放到 mergeMessage 中,最后把 mergeMessage 发送出去。
- 线程数:1
- 线程名前缀:”rpcMergeMessageSend“
-
AbstractNettyRemotingClient
中功能相关的属性介绍:-
Object mergeLock
:发送请求的锁对象。 -
Map<Integer, MergeMessage> mergeMsgMap
:当发送消息的类型是 MergeMessage,那么就将消息保存到 mergeMsgMap。 -
ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap
:当允许发送批量消息时,消息首先分桶保存到 basketMap,然后通过定时任务将保存 basketMap 的消息发送出去。basketMap 的是服务器的地址,value 是保存的发送个服务器的消息。按照地址分桶是将要发给同一个服务器的多个消息合并到一个MergedWarpMessage
后发送。
-
- 有配置开关,默认值如下:
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
对应的关键代码逻辑如下:
- 在
AbstractNettyRemotingClient#sendSyncRequest
中,同步发送时将消息缓存起来,默认配置看只有 RM 开启了消息合并发送,另外同步发送超时设定,默认 TM 30 秒,RM 15 秒。按照 IP 地址分桶,同一个目标实例的消息才可以合并发送
public Object sendSyncRequest(Object msg) throws TimeoutException {
String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
// 同步发送超时设定,默认 TM 30秒,RM 15秒
long timeoutMillis = this.getRpcRequestTimeout();
RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
// send batch message
// put message into basketMap, @see MergedSendRunnable
// 默认只有RM开启了消息合并发送,TM 并未开启批发送
if (this.isEnableClientBatchSendRequest()) {
// send batch message is sync request, needs to create messageFuture and put it in futures.
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
futures.put(rpcMessage.getId(), messageFuture);
// put message into basketMap
// 按照目标地址分桶,同一个TC实例的消息才可以合并发送
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
return null;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
// 通知合并发送线程 有消息要发送,醒来干活儿
if (!isSending) {
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}
try {
// 阻塞等待消息的响应。
return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception exx) {
LOGGER.error("wait response error:{},ip:{},request:{}",
exx.getMessage(), serverAddress, rpcMessage.getBody());
if (exx instanceof TimeoutException) {
throw (TimeoutException) exx;
} else {
throw new RuntimeException(exx);
}
}
} else {
// 不合并发送的话,就获取指定IP的channel,并立即发送。
Channel channel = clientChannelManager.acquireChannel(serverAddress);
return super.sendSync(channel, rpcMessage, timeoutMillis);
}
}
- 在
AbstractNettyRemotingClient#init
中构建线程池mergeSendExecutorService
,在这个线程池中执行消息的批处理(消息合并、消息发送)。
public void init() {
...
// 通过线程池有1个线程,执行消息合并发送
if (this.isEnableClientBatchSendRequest()) {
mergeSendExecutorService = new ThreadPoolExecutor(
MAX_MERGE_SEND_THREAD,//1
MAX_MERGE_SEND_THREAD,//1
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(
//TM : rpcMergeMessageSend_TMROLE
//RM : rpcMergeMessageSend_RMROLE
//SERVER : rpcMergeMessageSend_SERVERROLE
getThreadPrefix(),
MAX_MERGE_SEND_THREAD)//1
);
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}
- 批处理任务
MergedSendRunnable
中,实现了消息合并和消息发送
private class MergedSendRunnable implements Runnable {
@Override
public void run() {
while (true) {
//mergeLock 用于生产-消费的协作
synchronized (mergeLock) {
try {
// MAX_MERGE_SEND_MILLS = 1,还有线程休眠的效果
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
isSending = true;
// 发送消息,消息是按照IP地址分组
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}
MergedWarpMessage mergeMessage = new MergedWarpMessage();
//如果basket队列不为空,将其中的消息全取出来,添加到mergeMessage中
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
// debug 打印本次发送的消息个数和每个消息的Id,以及此时在futures中做超时管控的所有消息的Id,
// 两个消息Id比对,可知道消息积压情况9666
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// 获取指定地址的channel对象,异步发送消息
// 发送批量消息是同步的请求,但是这里不需要得到返回的值,在消息保存到basketMap之前,已经创建了messageFuture了,
// 返回值将会从ClientOnResponseProcessor中得到
sendChannel = clientChannelManager.acquireChannel(address);
// 因为原始消息的发送已经加入过超时管控,所以批量发送环节不再需要加入额外的超时控制
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
destroyChannel(address, sendChannel);
}
// fast fail
// 发生异常,快速将保存在mergeMessage的消息清理掉
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
});
isSending = false;
}
}
2)AbstractNettyRemoting#timerExecutor
Netty 的 I/O 操作异步的,RPC 消息的发送操作会对应一个 Future
对象,在 Seata 中这个 Futrue
对象被封装为 MessageFuture
,需同步发送的消息,其对应的 MessageFuture
被放入 map 缓存起来,当收到消息的 response 后,将消息从 map 中移除。AbstractNettyRemoting#timerExecutor
里的这个线程定时巡检 map 中的消息,若超时未收到 response 则认定为发送超时。
- 线程数:1
- 线程名前缀:”timeoutChecker“
- scheduleAtFixedRate :延迟 3 秒,频率 3 秒
-
AbstractNettyRemoting
中的功能相关的属性介绍:-
ScheduledExecutorService timerExecutor
:执行定时任务,消息发送以后,到了过期时间还没有返回,则会对消息进行清理。 -
ConcurrentHashMap<Integer, MessageFuture> futures
:保存着不同消息,timerExecutor 会清理 futures 中过期的消息。
-
对应的关键代码逻辑如下:
- 构建定时任务的线程池
AbstractNettyRemoting#timerExecutor
,只用 1 个线程
/**
* 定时器,用于巡检消息的发送是否超时
*/
protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("timeoutChecker", 1, true));
复制代码
- 通过
AbstractNettyRemoting#sendSync
同步发送消息,构建MessageFuture
并放入futures
这个 map 中,发送过程配置监听器 用于处理channel
异常,指定失败原因并从futures
中移除,还要销毁channel
。
protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
...
// 构建 MessageFuture
MessageFuture messageFuture = new MessageFuture();
messageFuture.setRequestMessage(rpcMessage);
messageFuture.setTimeout(timeoutMillis);
// 放入 futures 这个map中
futures.put(rpcMessage.getId(), messageFuture);
//检查通道是否可以写
channelWritableCheck(channel, rpcMessage.getBody());
String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
//在请求发送之前执行钩子
doBeforeRpcHooks(remoteAddr, rpcMessage);
// 发送请求,并配置监听器 用于处理channel异常
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
// 这里响应不成功,基本是channel不正常了
if (!future.isSuccess()) {
//移除消息
MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
if (messageFuture1 != null) {
messageFuture1.setResultMessage(future.cause());
}
//响应不成功,则销毁channel
destroyChannel(future.channel());
}
});
...
//获取响应结果
Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
//响应之后执行钩子
doAfterRpcHooks(remoteAddr, rpcMessage, result);
...
}
- 正常收到 response 后,给
MessageFuture
对象赋值,从futures
中移除,如ClientOnResponseProcessor#process
中的实现
@Override
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
...
// 从futures中移除
MessageFuture messageFuture = futures.remove(rpcMessage.getId());
if (messageFuture != null) {
// 赋值结果
messageFuture.setResultMessage(rpcMessage.getBody());
}
}
- 在
AbstractNettyRemoting#init
中开启定时任务,巡检出futures
这个 map 中的超时对象后从futures
中移除,不再检查,并指定结果为TimeoutException
public void init() {
// 检测消息同步发送(sendSync(xxx))是否超时,
// 定时任务默认是延迟3秒,间隔3秒
timerExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
MessageFuture future = entry.getValue();
if (future.isTimeout()) {
// 如果过期了则将发送结果设置为TimeoutException
// 从futures中移除,不再检查
futures.remove(entry.getKey());
RpcMessage rpcMessage = future.getRequestMessage();
future.setResultMessage(new TimeoutException(String
.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
}
}
}
nowMills = System.currentTimeMillis();
}
}, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
}
还有线程池跟服务注册发现和建连相关,会后边篇章再介绍。