- 首先,我们都知道 dubbo 调用,consumer 可以配置接口超时时间,provider 也可以配置超时时间,但是最终会以 consumer 为准,这里还有一个 MethodConfig 的概念。
- 具体的优先级体现为 conusmer.MethodConfig.timeout > provider.MethodConfig.timeout > conusmer.ServiceBean.timeout
源码
- provider.xml
<bean id="innerStockServiceExport" class="com.alibaba.dubbo.config.spring.ServiceBean">
<property name="interface" value="com.yit.stock.api.inner.InnerStockService"/>
<property name="ref" ref="innerStockServiceImpl"/>
<property name="application" ref="dubboApplicationConfig"/>
<property name="registry" ref="dubboRegistryConfig"/>
<property name="protocol" ref="dubboProtocolConfig"/>
<property name="version" value="${dubbo.reference.version}"/>
<property name="timeout" value="${dubbo.export.timeout}"/>
<property name="retries" value="0"/>
<property name="methods">
<list>
<bean class="com.alibaba.dubbo.config.MethodConfig">
<property name="name" value="initVirtualStock"/>
<property name="timeout" value="4000"/>
</bean>
<bean class="com.alibaba.dubbo.config.MethodConfig">
<property name="name" value="batchFreezeStock"/>
<property name="timeout" value="6000"/>
</bean>
<bean class="com.alibaba.dubbo.config.MethodConfig">
<property name="name" value="batchCancelStock"/>
<property name="timeout" value="6000"/>
</bean>
<bean class="com.alibaba.dubbo.config.MethodConfig">
<property name="name" value="batchConsumeStock"/>
<property name="timeout" value="6000"/>
</bean>
<bean class="com.alibaba.dubbo.config.MethodConfig">
<property name="name" value="cancelForConsumedOrder"/>
<property name="timeout" value="30000"/>
</bean>
</list>
</property>
</bean>
- provider.xml
<bean id="innerStockService" class="com.alibaba.dubbo.config.spring.ReferenceBean">
<property name="interface" value="com.yit.stock.api.inner.InnerStockService"/>
<property name="application" ref="dubboApplicationConfig"/>
<property name="registry" ref="dubboRegistryConfig"/>
<property name="timeout" value="3000"/>
<property name="check" value="false"/>
<property name="version" value="${dubbo.reference.version}"/>
<property name="methods">
<list>
<bean class="com.alibaba.dubbo.config.MethodConfig">
<property name="name" value="batchFreezeStock"/>
<property name="timeout" value="16000"/>
</bean>
<bean class="com.alibaba.dubbo.config.MethodConfig">
<property name="name" value="splitFreezeStock"/>
<property name="timeout" value="6000"/>
</bean>
<bean class="com.alibaba.dubbo.config.MethodConfig">
<property name="name" value="finishStockIn"/>
<property name="timeout" value="16000"/>
</bean>
</list>
</property>
</bean>
加载本地的配置信息构建 URL
com.alibaba.dubbo.config.ReferenceConfig#init
private void init() {
if (initialized) {
return;
}
initialized = true;
..... 省略一部分
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
String prifix = StringUtils.getServiceKey(map);
// 这里拿到本地 consumer.xml 中 MethodConfig 的信息初始化 key: method.timeout, value: 毫秒数到 map
if (methods != null && methods.size() > 0) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
appendAttributes(attributes, method, prifix + "." + method.getName());
checkAndConvertImplicitConfig(method, map, attributes);
}
}
//attributes通过系统context进行存储.
StaticContext.getSystemContext().putAll(attributes);
ref = createProxy(map);
}
-
我们可以看到此时的map只有comsumer.xml 的配置信息
与远程配置进行 merge
其实我们关注的就是 RegistryDirectory.mergeUrl 中的 ClusterUtils.mergeUrl(providerUrl, queryMap);
providerUrl :远程的接口配置
queryMap : 本地配置URL 中的 map
ClusterUtils.mergeUrl 中的实现就是依赖了 map.put() 的实现,先put provider,再put local。最终达到了以本地MethodConfig 为准的目的
com.alibaba.dubbo.registry.integration.RegistryDirectory#mergeUrl
/**
* 合并url参数 顺序为override > -D >Consumer > Provider
* @param providerUrl
* @param overrides
* @return
*/
private URL mergeUrl(URL providerUrl){
// 其实我们关注的就是这一部分
providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // 合并消费端参数
List<Configurator> localConfigurators = this.configurators; // local reference
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
providerUrl = configurator.configure(providerUrl);
}
}
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // 不检查连接是否成功,总是创建Invoker!
... 省略一部分
return providerUrl;
}
public static URL mergeUrl(URL remoteUrl, Map<String, String> localMap) {
Map<String, String> map = new HashMap<String, String>();
Map<String, String> remoteMap = remoteUrl.getParameters();
if (remoteMap != null && remoteMap.size() > 0) {
// 先put remote
map.putAll(remoteMap);
//线程池配置不使用提供者的
map.remove(Constants.THREAD_NAME_KEY);
map.remove(Constants.DEFAULT_KEY_PREFIX + Constants.THREAD_NAME_KEY);
... 省略一部分
}
if (localMap != null && localMap.size() > 0) {
// 再put local
map.putAll(localMap);
}
... 省略一部分
}
return remoteUrl.clearParameters().addParameters(map);
}
-
我们可以看到 merge 后的map 已经多出了一些provider.xml 的配置信息
获取 timeout 的地方
- 从URL 中的 parameters 中根据 method.timeout 或者 timeout 为key 取出对应的数值
// 重点就是这句话
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
public int getMethodParameter(String method, String key, int defaultValue) {
String methodKey = method + "." + key;
Number n = getNumbers().get(methodKey);
if (n != null) {
return n.intValue();
}
String value = getMethodParameter(method, key);
if (value == null || value.length() == 0) {
return defaultValue;
}
int i = Integer.parseInt(value);
getNumbers().put(methodKey, i);
return i;
}