dubbo是一个分布式服务框架,能避免单点故障和支持服务的横向扩容。一个服务通常会部署多个实例,同时一个服务能注册到多个注册中心。如何从多个服务 Provider 组成的集群中挑选出一个进行调用,就涉及到一个负载均衡的策略。
1、dubbo负载均衡实现说明
dubbo服务调用流程图:
从以上调用流程图可知,dubbo的负载均衡主要在客户端实现,并通过封装Cluster、Directory、LoadBalance相关接口实现。
1.1、Cluster、Directory、Router、LoadBalance关系
各组件关系说明:
- 这里的Invoker是Provider的一个可调用Service的抽象,Invoker封装了Provider地址及Service接口信息。
- Directory代表多个Invoker,可以把它看成List,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。
- Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。
- Router负责从多个Invoker中按路由规则选出子集,比如读写分离,应用隔离等。
- LoadBalance负责从多个Invoker中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选。
1.2、客户端负载均衡源码分析
1.2.1、ReferenceConfig中负载均衡的封装
客户端在进行代理处理时,在如下地方对负载均衡相关进行封装:
包路径:dubbo-config->dubbo-config-api
类名:ReferenceConfig
方法名:createProxy()
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = cluster.join(new StaticDirectory(invokers));
}
}
处理流程:
- 若只有一个注册中心,则直接调用RegistryProtocol.refer()进行服务引用处理,将其封装成invoker,RegistryProtocol.refer()内部对负载均衡进行了封装;
- 若为多个注册中心,先分别对各注册中心进行服务引用处理,然后再应用Cluster.join()及StaticDirectory将多个注册中心的invoker再封装成一个invoker来达到对外透明;
1.2.2、RegistryProtocol中负载均衡的封装
RegistryProtocol中对单个注册中心进行了负载均衡的封装:
包路径:dubbo-registry->dubbo-registry-api
类名:RegistryProtocol
方法名:doRefer()
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
主要处理流程:
- 实例化RegistryDirectory类型的Directory,RegistryDirectory有如下功能,订阅感兴趣的服务提供者信息,当提供者或路由变更时,动态变更本地的提供者列表及路由过滤链;并根据Router、LoadBalance动态选择调用的服务提供者;
- 将消费端注册到注册中心;
- 构建Directory中的路由过滤链;
- 在Directory中对注册中心中对应的提供者相关的配置、提供者、路由规则进行订阅;
- 用Cluster对Directory进行封装;
1.2.3、RegistryDirectory中负载均衡处理的封装
RegistryDirectory主要封装了订阅、信息变更通知处理、获取服务提供者信息等;
包路径:dubbo-registry->dubbo-registry-api
类名:RegistryDirectory
(1)、订阅感兴趣的信息
源码如下:
public void subscribe(URL url) {
setConsumerUrl(url);
consumerConfigurationListener.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
本处主要对消费端的订阅进行了处理,消费端向注册中心订阅三个信息:配置信息、服务提供者、路由信息;当这三个信息有任何变更,本地就会接到通知,并进行处理;
(2)、配置信息、服务提供者、路由信息变更通知处理
源码如下:
public synchronized void notify(List<URL> urls) {
List<URL> categoryUrls = urls.stream()
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.toList());
/**
* TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?
*/
this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, UrlUtils::isConfigurator))
.orElse(configurators);
toRouters(classifyUrls(categoryUrls, UrlUtils::isRoute)).ifPresent(this::addRouters);
// providers
refreshOverrideAndInvoker(classifyUrls(categoryUrls, UrlUtils::isProvider));
}
当有信息变更时,本方法就会被调用,会根据变更的配置或路由信息或服务提供者进行相应处理;若路由信息变更,则重新构建路由过滤链;若服务提供者变更,则重构刷新本地缓存的服务提供者列表;
(3)、获取服务提供者列表
源码如下:
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
", please check status of providers(disabled, not registered or in blacklist).");
}
if (multiGroup) {
return this.invokers == null ? Collections.emptyList() : this.invokers;
}
List<Invoker<T>> invokers = null;
try {
// Get invokers from cache, only runtime routers will be executed.
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
// FIXME Is there any need of failing back to Constants.ANY_VALUE or the first available method invokers when invokers is null?
/*Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
invokers = localMethodInvokerMap.get(methodName);
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}*/
return invokers == null ? Collections.emptyList() : invokers;
}
当负载均衡获取可用的服务提供者列表时会调用此方法,此方法主要根据注册中心提供的服务提供者列表,并利用路由规则对提供者列表进行过滤。
1.2.4、FailoverCluster中负载均衡处理的封装
包路径:dubbo-cluster
dubbo中默认的Cluster实现为FailoverCluster,其主要是通过join()方法将Directory进行封装的,而实际的处理是通过FailoverClusterInvoker实现的,客户端调用服务时就是通过此invoker.invoke()进行实际调用处理的;
源码如下:
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
FailoverClusterInvoker.invoke()实现:
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
主要处理流程:
- 获取rpc上下文信息,将这些信息添加到调用附加参数信息中;
- 调用list(),实际调用的是Directory.list()获取可用的服务提供者列表,此列表是通过Directory中的路由器过滤后的列表;
- 调用initLoadBalance(),获取配置的LoadBalance,若为配置,则使用默认的LoadBalance实现RandomLoadBalance;
- 调用doInvode()进行实际的服务提供者选取及服务调用等;
FailoverClusterInvoker.doInvoke()实现:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
主要处理流程为:调用select()选取服务提供者并调用;select()中主要调用LoadBalance.selct()进行选择;
2、Directory实现分析
Directory代表多个Invoker,可以把它看成List,但与List不同的是,它的值可能是动态变化的,比如注册中心推送变更。Cluster将Directory中的多个Invoker伪装成一个Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个。
Directory接口类继承图:
RegistryDirectory:
RegistryDirectory实现了NotifyListener接口,因此他本身也是一个监听器,可以在服务变更时接受通知,消费方要调用远程服务,会向注册中心订阅这个服务的所有的服务提供方,订阅的时候会调用notify方法,进行invoker实例的重新生成,也就是服务的重新引用。在服务提供方有变动时,也会调用notify方法,有关notify方法在Dubbo中订阅和通知解析那篇文章中已经解释,不做重复。subscribe方法也不做重复解释。
StaticDirectory:
静态目录服务,当有多个注册中心时会使用此实现。
3、Cluster实现分析
Dubbo中的Cluster可以将多个服务提供方伪装成一个提供方,具体也就是将Directory中的多个Invoker伪装成一个Invoker,在伪装的过程中包含了容错的处理,负载均衡的处理和路由的处理。
Cluster主要实现的类继承图:
AbstractClusterInvoker主要实现类继承图:
集群的容错模式:
failover(默认):
- 失败自动切换,当出现失败,重试其他服务器。
- 通常用于读操作,但重试会带来更长延迟。
- 可通过retries=x来设置重试次数(不含第一次)。
failfast:
- 快速失败,只发起一次调用,失败理解报错。
- 通常用于非幂等性的写操作,比如新增记录。
failsafe:
- 失败安全,出现异常时,直接忽略;
- 通常用于写入审计日志等操作。
failback:
- 失败自动回复,后台记录失败请求,定时重发。
- 通常用于消息通知操作。
forking:
- 并行调用多个服务器,只要一个成功即返回。
- 通常用于实时性要求较高的读操作,但需要浪费更多服务资源。
- 可通过forks=x来设置最大并行数。
broadcast:
- 广播调用所有提供者,逐个调用,任意一台报错则报错。
- 通常用于通知所有提供者更新缓存或日志等本地资源信息。
4、LoadBalance实现分析
random:
- 随机,按权重设置随机概率。
- 在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
roundrobin:
- 轮询,按公约后的权重设置轮询比率。
- 存在慢的提供者累计请求问题,比如:第二台机器很慢,但没挂,当请求调用到第二台是就卡在那,久而久之,所有请求都卡在掉第二台上。
leastactive:
- 最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
- 使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
consistenthash:
- 一致性Hash,相同参数的请求总是发到同一提供者。
- 当某一台提供者挂掉时,原本发往该提供者的请求,基于虚拟节点,平摊到其他提供者,不会引起剧烈变动。
- 算法参见:https://en.wikipedia.org/wiki/Consistent_hashiing
- 缺省只对第一个参考Hash,如果要修改,请配置<dubbo:parameter key="hash.arguments" value="0,1" />
- 缺省用160份虚拟节点,如果要修改,请配置<dubbo:parameter key="hash.nodes" value="320" />
5、Router实现分析
dubbo的路由干的事,就是一个请求过来,dubbo依据配置的路由规则,计算出哪些提供者可以提供这次的请求服务。所以,它的优先级是在集群容错策略和负载均衡策略之前的。即先有路由规则遴选出符合条件的服务提供者然后,再在这些服务提供者之中应用负载均衡,集群容错策略。
Router接口继承图:
ScriptRouter:
脚本路由规则 支持 JDK 脚本引擎的所有脚本,比如:javascript, jruby, groovy 等,通过 type=javascript 参数设置脚本类型,缺省为 javascript。
ConditionRouter:
条件路由主要就是根据dubbo管理控制台配置的路由规则来过滤相关的invoker,当我们对路由规则点击启用的时候,就会触发RegistryDirectory类的notify方法,其会重构本地路由调用链,而当从Directory中获取服务提供者的list时,会利用此路由规则将提供者列表进行过滤;