- SOFABolt 提供了两种协议 RpcProtocol 和 RpcProtocolV2,两种协议都有相应的5个重要属性;Bolt 针对这两种协议分别提供了不同的编解码器。
- ProtocolManager 是一个 Protocol 协议容器
- RpcServer 服务端启动或者 RpcClient 客户端启动的时候,都会分别创建各自的 远程调用执行类(RpcServerRemoting 或者 RpcClientRemoting),此时会执行其父类 RpcRemoting 的静态块,在静态块中实现类 Protocol 协议的实例化 + 添加到 ProtocolManager 容器中
一、使用姿势
--------------------------String addr------------------------------
String addr = "127.0.0.1:8888?_PROTOCOL=2&_VERSION=2";
String res = (String) client.invokeSync(addr, req, 3000);
--------------------------Url url------------------------------
Url url = new Url(ip, port);
url.setProtocol(RpcProtocolV2.PROTOCOL_VERSION_2);
url.setVersion(RpcProtocolV2.PROTOCOL_VERSION_2);
url.setConnNum(1); // must set, default 0
String res = (String) client.invokeSync(url, req, 3000);
上述两种调用方式是最常用的两种。(还有一种直接通过 Connection 对象进行调用),我们以第一种为例(实际上第一种的底层原理最终会调用到第二种,第二种的底层是调用第三种,即 addr -> url -> connection
),推荐尽量使用第一种,因为第一种方式会提供比较多的默认值,例如connNum,在直接使用url方式时需要手动设置,否则connNum为0,则不会创建连接。
客户端发起调用
- 在 addr 上添加参数
_PROTOCOL=2&_VERSION=2
,之后在调用过程中,会将这两个参数解析出来并设置到数据总线 com.alipay.remoting.Url 的 protocol 属性(protocolCode) + version 属性(protocolVersion)- 在创建连接的时候将 Url 中的 protocolCode 和 protocolVersion 设置到 netty channel的 附加属性中
- 根据 netty channel 中的 protocolCode 和 protocolVersion 进行编码
服务端处理请求
- 服务端接收到请求后,首先从发送来的 ByteBuf 数据中解码获取 protocolCode 和 protocolVersion,
- 之后将 protocolCode 和 protocolVersion 设置到客户端 channel 所对应的服务端的 netty channel 的附加属性中,
- 最后根据 netty channel 中的 protocolCode 和 protocolVersion 进行相应的解码操作
二、源码分析
源码分析分为两部分:
第一部分:初始化协议相关组件
第二部分:调用过程中协议的使用
2.1 初始化协议相关组件
2.1.1 两种协议的定义
RpcProtocol 协议定义
请求命令(协议头长度:22 byte)
0 1 2 4 6 8 10 12 14 16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|proto| type| cmdcode |ver2 | requestId |codec| timeout | classLen |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|headerLen | contentLen | ... ... |
+-----------+-----------+-----------+ +
| className + header + content bytes |
+ +
| ... ... |
+-----------------------------------------------------------------------------------------------+
ProtocolCode :这个字段是必须的。因为需要根据 ProtocolCode 来进入不同的核心编解码器。该字段可以在想换协议的时候,方便的进行更换。
RequestType :请求类型,request / response / oneway 三者之一。oneway 之所以需要单独设置,是因为在处理响应时,需要做特殊判断,来控制响应是否回传。
CommandCode :请求命令类型,request / response / heartbeat 三者之一。
CommandVersion :请求命令版本号。该字段用来区分请求命令的不同版本。如果修改 Command 版本,不修改协议,那么就是纯粹代码重构的需求;除此情况,Command 的版本升级,往往会同步做协议的升级。
RequestId :请求 ID,该字段主要用于异步请求时,保留请求存根使用,便于响应回来时触发回调。另外,在日志打印与问题调试时,也需要该字段。
Codec :序列化器。该字段用于保存在做业务的序列化时,使用的是哪种序列化器。通信框架不限定序列化方式,可以方便的扩展。
Timeout :超时字段,客户端发起请求时,所设置的超时时间。
ClassLen :业务请求类名长度
HeaderLen :业务请求头长度
ContentLen :业务请求体长度
ClassName :业务请求类名。需要注意类名传输的时候,务必指定字符集,不要依赖系统的默认字符集。曾经线上的机器,因为运维误操作,默认的字符集被修改,导致字符的传输出现编解码问题。而我们的通信框架指定了默认字符集,因此躲过一劫。
HeaderContent :业务请求头
BodyContent :业务请求体
响应命令(协议头长度:20 byte)
0 1 2 3 4 6 8 10 12 14 16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|proto| type| cmdcode |ver2 | requestId |codec|respstatus | classLen |headerLen |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
| contentLen | ... ... |
+-----------------------+ +
| className + header + content bytes |
+ +
| ... ... |
+-----------------------------------------------------------------------------------------------+
- ResponseStatus :响应码。从字段精简的角度,我们不可能每次响应都带上完整的异常栈给客户端排查问题,因此,我们会定义一些响应码,通过编号进行网络传输,方便客户端定位问题。
RpcProtocolV2 协议定义
请求命令(协议头长度:24 byte)
0 1 2 4 6 8 10 11 12 14 16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
|proto| ver1|type | cmdcode |ver2 | requestId |codec|switch| timeout |
+-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
|classLen |headerLen |contentLen | ... |
+-----------+-----------+-----------+-----------+ +
| className + header + content bytes |
+ +
| ... ... | CRC32(optional) |
+------------------------------------------------------------------------------------------------+
ProtocolVersion :确定了某一种通信协议后,我们还需要考虑协议的微小调整需求,因此需要增加一个 version 的字段,方便在协议上追加新的字段
Switch :协议开关,用于一些协议级别的开关控制,比如 CRC 校验,安全校验等。
CRC32 :CRC校验码,这也是通信场景里必不可少的一部分,而我们金融业务属性的特征,这个显得尤为重要。
响应命令(协议头长度:22 byte)
0 1 2 3 4 6 8 10 11 12 14 16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
|proto| ver1| type| cmdcode |ver2 | requestId |codec|switch|respstatus | classLen |
+-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
|headerLen | contentLen | ... |
+-----------------------------------+ +
| className + header + content bytes |
+ +
| ... ... | CRC32(optional) |
+------------------------------------------------------------------------------------------------+
SOFABolt 针对 RpcProtocol 和 RpcProtocolV2 这两种协议,提供了两组不同的编解码器。
2.1.2 初始化协议组件
----------------------------- RpcRemoting 静态块 --------------------------
static {
RpcProtocolManager.initProtocols();
}
------------------------- RpcProtocolManager.initProtocols --------------------------
public static void initProtocols() {
ProtocolManager.registerProtocol(new RpcProtocol(), RpcProtocol.PROTOCOL_CODE);
ProtocolManager.registerProtocol(new RpcProtocolV2(), RpcProtocolV2.PROTOCOL_CODE);
}
------------------------- ProtocolManager.registerProtocol --------------------------
private static final ConcurrentMap<ProtocolCode, Protocol> protocols = new ConcurrentHashMap<ProtocolCode, Protocol>();
public static Protocol getProtocol(ProtocolCode protocolCode) {
return protocols.get(protocolCode);
}
public static void registerProtocol(Protocol protocol, byte... protocolCodeBytes) {
registerProtocol(protocol, ProtocolCode.fromBytes(protocolCodeBytes));
}
public static void registerProtocol(Protocol protocol, ProtocolCode protocolCode) {
if (null == protocolCode || null == protocol) {
throw new RuntimeException("Protocol: " + protocol + " and protocol code:"
+ protocolCode + " should not be null!");
}
Protocol exists = ProtocolManager.protocols.putIfAbsent(protocolCode, protocol);
if (exists != null) {
throw new RuntimeException("Protocol for code: " + protocolCode + " already exists!");
}
}
------------------------- RpcProtocol.构造器 --------------------------
public static final byte PROTOCOL_CODE = (byte) 1;
private static final int REQUEST_HEADER_LEN = 22;
private static final int RESPONSE_HEADER_LEN = 20;
private CommandEncoder encoder;
private CommandDecoder decoder;
private HeartbeatTrigger heartbeatTrigger;
private CommandHandler commandHandler;
private CommandFactory commandFactory;
public RpcProtocol() {
this.encoder = new RpcCommandEncoder();
this.decoder = new RpcCommandDecoder();
this.commandFactory = new RpcCommandFactory();
this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory);
this.commandHandler = new RpcCommandHandler(this.commandFactory);
}
------------------------- RpcProtocolV2.构造器 --------------------------
public static final byte PROTOCOL_CODE = (byte) 2;
/** version 1, is the same with RpcProtocol */
public static final byte PROTOCOL_VERSION_1 = (byte) 1;
/** version 2, is the protocol version for RpcProtocolV2 */
public static final byte PROTOCOL_VERSION_2 = (byte) 2;
/**
* in contrast to protocol v1,
* one more byte is used as protocol version, and another one is userd as protocol switch
*/
private static final int REQUEST_HEADER_LEN = 22 + 2;
private static final int RESPONSE_HEADER_LEN = 20 + 2;
private CommandEncoder encoder;
private CommandDecoder decoder;
private HeartbeatTrigger heartbeatTrigger;
private CommandHandler commandHandler;
private CommandFactory commandFactory;
public RpcProtocolV2() {
this.encoder = new RpcCommandEncoderV2();
this.decoder = new RpcCommandDecoderV2();
this.commandFactory = new RpcCommandFactory();
this.heartbeatTrigger = new RpcHeartbeatTrigger(this.commandFactory);
this.commandHandler = new RpcCommandHandler(this.commandFactory);
}
代码较为简单,扩展性也很强,我们可以基于 Protocol 接口创建协议实现,之后将该实现注册到 ProtocolManager 中,最后在使用的时候根据传递的 protocolCode 就可以选择相应的协议实现了,在编解码的时候也可以根据 protocolVersion 做一些协议内的字段的细小调整。 - 策略模式
2.2 调用过程中协议的使用
String addr = "127.0.0.1:8888?_PROTOCOL=2&_VERSION=2";
String res = (String) client.invokeSync(addr, req, 3000);
以 String addr 形式为例。
------------------------- RpcRemoting.invokeSync --------------------------
public Object invokeSync(String addr, Object request, InvokeContext invokeContext, int timeoutMillis) {
// 将 addr 解析为 url 数据总线
Url url = this.addressParser.parse(addr);
// 转化为 url 形式的调用(url 形式的调用最终会转为 Connection形式的调用)
return this.invokeSync(url, request, invokeContext, timeoutMillis);
}
------------------------- RpcClientRemoting.invokeSync --------------------------
public Object invokeSync(Url url, Object request, InvokeContext invokeContext, int timeoutMillis {
final Connection conn = getConnectionAndInitInvokeContext(url, invokeContext);
this.connectionManager.check(conn);
return this.invokeSync(conn, request, invokeContext, timeoutMillis);
}
------------------------- RpcAddressParser.parse 轮廓 --------------------------
public static ConcurrentHashMap<String, SoftReference<Url>> parsedUrls = new ConcurrentHashMap<String, SoftReference<Url>>();
public Url parse(String url) {
// 1、从缓存获取 url
Url parsedUrl = this.tryGet(url);
if (null != parsedUrl) {
return parsedUrl;
}
String ip = null;
String port = null;
Properties properties = null;
// 分解获取 ip
ip = "127.0.0.1"
// 分解获取 port
port = 8888
// 分解获取设置属性
properties = new Properties();
properties.put("_PROTOCOL", "2");
properties.put("_VERSION", "2");
// 创建 url
parsedUrl = new Url(url, ip, Integer.parseInt(port), properties);
// 将 url#properties 中的键值对获取并设置到 url 的独立属性中(如果没有键值对,这里会设置默认值)
this.initUrlArgs(parsedUrl);
// 加入缓存
Url.parsedUrls.put(url, new SoftReference<Url>(parsedUrl));
return parsedUrl;
}
public void initUrlArgs(Url url) {
// 从 url#properties 属性中获取 _PROTOCOL
String protocolStr = url.getProperty(RpcConfigs.URL_PROTOCOL);
byte protocol = RpcProtocol.PROTOCOL_CODE;
if (StringUtils.isNotBlank(protocolStr)) {
protocol = Byte.parseByte(protocolStr);
}
url.setProtocol(protocol);
// 从 url#properties 属性中获取 _VERSION
String versionStr = url.getProperty(RpcConfigs.URL_VERSION);
byte version = RpcProtocolV2.PROTOCOL_VERSION_1;
if (StringUtils.isNotBlank(versionStr)) {
version = Byte.parseByte(versionStr);
}
url.setVersion(version);
// 除了 protocolCode 和 protocolVersion 之外,这里还会做 _CONNECTTIMEOUT 连接超时 + 每个addr的 _CONNECTIONNUM 连接数量 + _CONNECTIONWARMUP 是否需要做连接预热 三个配置
}
------------------------- Url 数据总线 --------------------------
/** origin url */
private String originUrl;
/** ip, can be number format or hostname format*/
private String ip;
/** port, should be integer between (0, 65535]*/
private int port;
/** unique key of this url */
private String uniqueKey; // 如果在构造器没有传递,默认为 ip:port
/** URL args: timeout value when do connect */
private int connectTimeout; // 默认为 1000 ms
/** URL args: protocol */
private byte protocol; // 默认为 RpcProtocol.PROTOCOL_CODE = 1
/** URL args: version */
private byte version = RpcProtocolV2.PROTOCOL_VERSION_1;
/** URL agrs: connection number */
private int connNum; // 默认为 1
/** URL agrs: whether need warm up connection */
private boolean connWarmup; // 默认为 false
/** URL agrs: all parsed args of each originUrl */
private Properties properties;
上述介绍了将 String addr 转化为 Url 的代码。接下来,就去创建连接,然后发起调用。
------------------------- AbstractConnectionFactory.createConnection --------------------------
public Connection createConnection(Url url) throws Exception {
// 创建 netty channel
Channel channel = doCreateConnection(url.getIp(), url.getPort(), url.getConnectTimeout());
// 包装 channel + 将 url 的属性赋值给 Connection 的属性 + 为 channel 添加 附属属性
Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()), url.getVersion(), url);
//触发 ConnectionEventType.CONNECT 事件
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
return conn;
}
------------------------- Connection 核心方法 --------------------------
public Connection(Channel channel, ProtocolCode protocolCode, byte version, Url url) {
this(channel, url);
this.protocolCode = protocolCode;
this.version = version;
this.init();
}
public Connection(Channel channel, Url url) {
this(channel);
this.url = url;
this.poolKeys.add(url.getUniqueKey());
}
public Connection(Channel channel) {
this.channel = channel;
// 将当前的 Connection 对象作为附属属性传递给 channel
this.channel.attr(CONNECTION).set(this);
}
private void init() {
this.channel.attr(HEARTBEAT_COUNT).set(new Integer(0));
// 设置 PROTOCOL 附属属性
this.channel.attr(PROTOCOL).set(this.protocolCode);
// 设置 VERSION 附属属性
this.channel.attr(VERSION).set(this.version);
this.channel.attr(HEARTBEAT_SWITCH).set(true);
}
这样就将 url 中的属性值设置给了 Connection 及其 channel 的附属属性中。之后发送请求时,编码器再从 channel 中解析出 protocolCode 和 version,进行相应的编码操作。
------------------------- ProtocolCodeBasedEncoder.encode --------------------------
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
// 从 channel 中获取 protocolCode
Attribute<ProtocolCode> att = ctx.channel().attr(Connection.PROTOCOL);
ProtocolCode protocolCode;
if (att == null || att.get() == null) {
protocolCode = this.defaultProtocolCode;
} else {
protocolCode = att.get();
}
// 从 ProtocolManager 协议容器中根据 protocolCode 获取 Protocol 实例
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
// 编码过程根据 protocolVersion 的不同,会做出相应字段的调整
protocol.getEncoder().encode(ctx, msg, out);
}
当服务端接收到消息后,从 ByteBuf 中进行解码。
------------------------- ProtocolCodeBasedDecoder.decode --------------------------
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
in.markReaderIndex();
// 1. 从 ByteBuf 中解析出 protocolCode
ProtocolCode protocolCode = decodeProtocolCode(in);
if (null != protocolCode) {
// 2. 从 ByteBuf 中解析出 protocolVersion
byte protocolVersion = decodeProtocolVersion(in);
if (ctx.channel().attr(Connection.PROTOCOL).get() == null) { // 第一次请求为 null,后续该 channel 就有值了
// 3. 将解析出来的 protocolCode 和 protocolVersion 设置到客户端 channel 所对应的服务端 channel 上
ctx.channel().attr(Connection.PROTOCOL).set(protocolCode);
if (DEFAULT_ILLEGAL_PROTOCOL_VERSION_LENGTH != protocolVersion) {
ctx.channel().attr(Connection.VERSION).set(protocolVersion);
}
}
// 4. 根据 protocolCode 获取 Protocol,之后进行解码(解码过程根据 protocolVersion 的不同,会做出相应字段的调整)
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
if (null != protocol) {
in.resetReaderIndex();
protocol.getDecoder().decode(ctx, in, out);
}
}
}
protected ProtocolCode decodeProtocolCode(ByteBuf in) {
if (in.readableBytes() >= protocolCodeLength) {
byte[] protocolCodeBytes = new byte[protocolCodeLength];
in.readBytes(protocolCodeBytes);
return ProtocolCode.fromBytes(protocolCodeBytes);
}
return null;
}
protected byte decodeProtocolVersion(ByteBuf in) {
if (in.readableBytes() >= DEFAULT_PROTOCOL_VERSION_LENGTH) {
return in.readByte();
}
return DEFAULT_ILLEGAL_PROTOCOL_VERSION_LENGTH;
}
关于编解码细节,见《编解码分析》;
关于连接细节,见 SOFABolt 源码分析12 - Connection 连接管理设计