Dubbo与spring整合、SPI拓展机制、服务暴露、服务引用、容错机制、预热。
Dubbo架构图(取自dubbo官网):
0. 加载、运行服务提供者。
1. 向注册中心注册自己提供的服务。
2. 向注册中心订阅自己所需的服务。
3. 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
4. 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
5. 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
一、Dubbo融合Spring
Dubbo采用Spring的配置方式,透明化的接入应用,对应用没有任何API侵入,只需用Spring加载Dubbo的配置即可。
dubbo-config-api模块定义了dubbo中核心的配置,本文后续会对服务提供者ServiceConfig(暴露服务)、服务消费者ReferenceConfig(引用服务)进行主要分析。
dubbo-config-spring模块中,采用Spring中的Schema扩展将dubbo标签解析为相应的Bean:
spring.handlers指定了具体的dubbo标签处理者:DubboNamespaceHandler
http\://dubbo.apache.org/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
其中DubboBeanDefinitionParser则是具体的将配置属性、继承属性注入到相应Bean中,解析相关的代码就不细说了。
二、Dubbo的SPI拓展机制
public class ServiceConfig<T> extends AbstractServiceConfig {
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
//...
在分析Dubbo源码过程中,发现很多使用ExtensionLoader来获取对象的地方,没搞明白这种加载方式可不利于继续阅读源码,就先让我们从这个SPI加载机制来入手吧。
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
ExtensionLoader.getExtensionLoader(Protocol.class)获取到的是Protocol接口类的一个Loader。Loader下面的getAdaptiveExtension()方法则是获取适配的类对象。Dubbo使用这样的方式来获取对象是为了更好的拓展性。
具体来看看是怎么获取到适配对象的吧(部分代码省略):
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
//省略 双重判空锁
instance = createAdaptiveExtension();//1、创建适配对象,往下看
cachedAdaptiveInstance.set(instance);
//省略
}
return (T) instance;
}
private T createAdaptiveExtension() {
//省略try catch块 2、继续往下看getAdaptiveExtensionClass方法
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
}
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();//3、获取所有的拓展Class对象及默认实现类,下面会做简要介绍
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}// 4、跳到createAdaptiveExtensionClass里创建适配类
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
getExtensionClasses方法会对META-INF/dubbo/internal/、META-INF/dubbo/、META-INF/services/ 三个文件夹下的文件进行扫描
比如当前Protocol加载的就是com.alibaba.dubbo.rpc.Protocol文件,文件中是Protocol接口的各个实现类的全路径。
其中有一种特殊的实现如ProtocolFilterWrapper,该类的构造参数是Protocol ,在Dubbo中如果构造参数只有一个且是该类实现的接口类型,则认为该类为Wapper包装类。Protocol就存在ProtocolFilterWrapper 和 ProtocolListenerWrapper 两个包装类。Dubbo中自定义的Filter就是通过ProtocolFilterWrapper对Invoker进行包装的。Dubbo中服务暴露的监听事件是通过ProtocolListenerWrapper包装实现的。
final SPI defaultAnnotation = type.getAnnotation(SPI.class);//获取SPI注解
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
//省略校验
//获取默认实现类
if (names.length == 1) cachedDefaultName = names[0];
}
}
// 获取全部实现类
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
Protocol默认实现类就是接口上SPI注解的值“dubbo”在com.alibaba.dubbo.rpc.Protocol文件中对应的实现类。
获取到所有的实现类了后,接下来接着看怎么创建适配类
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();//5、组装适配类的代码
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);//6、将组装code编译成class
}
通过createAdaptiveExtensionClassCode方法组装的code实际上就是一个代理类,通过行参中的URL对象(或者行参中能获取到URL对象)来选择具体调用哪一个实现类的该方法。以Protocol为例来看看组装的具体代码:
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
//省略 getDefaultPort、destroy 、refer方法 都是代理 不一一做分析,下面拿export方法开刀
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
//dubbo中拓展点实现方法都包含URL对象
//Dubbo中统一的URL模型:
// 1、有的配置信息都转换成URL参数
// 2、所有的元信息传输都采用URL
// 3、所有接口都能获取到URL
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
//获取URL对象,Dubbo中通过URL来控制对象调用的切换
com.alibaba.dubbo.common.URL url = arg0.getUrl();
//获取当前URL中指定的Protocol实现
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
//获取当前URL中指定的Protocol实现的对象,如果有Wapper对象,则返回用wapper包装后的对象
com.alibaba.dubbo.rpc.Protocol extension =
(com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);//代理到获取到对象的此方法
}
}
三、服务的提供者ServiceConfig
服务暴露触发点
Dubbo是通过Spring配置来植入应用的,ServiceBean是SreviceConfig的实现,在ServiceBean中监听了容器刷新事件onApplicationEvent(ContextRefreshedEvent event)。
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
//(支持ApplicationListener|非延迟暴露) | 已经暴露 | 不暴露
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
//1、调用ServiceConfig export方法导出
export();
}
}
服务暴露
public synchronized void export() {
//省略继承provider属性代码
if (delay != null && delay > 0) {//配置了延迟暴露
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport(); //2、继续往里走
}
}
doExport方法中大部分代码在进行校验,最后调用了doExportUrls方法进行导出,直接进入到doExportUrls中:
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);//加载注册中心信息
for (ProtocolConfig protocolConfig : protocols) { //dubbo支持多协议暴露
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
接下来进入到doExportUrlsFor1Protocol中,大部分构建URL的代码被省略掉。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//省略… 获取协议版本、发布版本、时间戳 等信息来构造URL
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//暴露在本地
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//远程暴露
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
//向service配置的注册中心逐个暴露
for (URL registryURL : registryURLs) {
//省略 加载监控、URL中添加监控地址信息、日志
//代理层工厂创建Invoker,默认使用javassist代理
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//再将Invoker包装一层,并持有ServiceConfig对象
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//按照指定协议获取暴露服务对象,protocol导出服务调用链下方用UML序列图展示
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
//通过代理工厂生成代理对象,默认javassist代理 (也可指定为JDK代理)
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
代理对象
通过javassist代理工厂包装出来的类如下:
public class Wrapper20 extends Wrapper2 implements DC {
//省略代理类方法,属性相关的代码
//代理方法
public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
DemoServiceImpl var5; //代理对象 var1传入
try {
var5 = (DemoServiceImpl)var1;
} catch (Throwable var8) {
throw new IllegalArgumentException(var8);
}
try { //相关方法,只做一层代理
if ("funcOne".equals(var2) && var3.length == 0) {
return var5.funcOne();
}
if ("funcTwo".equals(var2) && var3.length == 1) {
return var5.funcTwo((Integer)var4[0]);
}
} catch (Throwable var9) {
throw new InvocationTargetException(var9);
}
throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.xx.xx.samples.loader.service.impl.UserServiceImpl.");
}
}
服务暴露时序图:
四、服务的消费方ReferenceConfig
初始化入口
Dubbo对于ReferenceConfig接入Spring提供了ReferenceBean的实现类,实现FactoryBean接口通过getObject方法来获取远程调度代理对象。
@Override
public Object getObject() throws Exception {
return get();//调用父类ReferenceConfig的get方法获取
}
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
init();//初始化
}
return ref;
}
private void init() {
//构建参数,忽略...
ref = createProxy(map);//创建代理对象
//忽略
}
创建代理对象
private T createProxy(Map<String, String> map) {
if (isJvmRefer) {
//忽略,本地引用
} else {
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
//有设置url参数,直接使用提供方IP引用,忽略
} else { // assemble URL from register center's configuration
List<URL> us = loadRegistries(false);//组装注册的URL
//URL加参数、判空,忽略
}
}
invoker = refprotocol.refer(interfaceClass, urls.get(0));//服务引用
//默认通过javassist创建代理对象
//如果配置了stub,invoker还要经过StubProxyFactoryWrapper进行包装一层
return (T) proxyFactory.getProxy(invoker);
}
createProxy生成的代理类:
public class proxy0 implements DC, EchoService, DemoService {
public static Method[] methods;
private InvocationHandler handler;
public String sayHello(String var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[0], var2);
return (String)var3;
}
public Object $echo(Object var1) {
Object[] var2 = new Object[]{var1};
Object var3 = this.handler.invoke(this, methods[1], var2);
return (Object)var3;
}
public proxy0() {
}
public proxy0(InvocationHandler var1) {
this.handler = var1;
}
}
在了解SPI拓展机制后这里Stub的创建看起来就很容易了。proxyFactory.getProxy(invoker)
创建出代理对象。
这里是使用SPI机制获取的proxyFactory对象,首先找到META-INF\dubbo\internal\com.alibaba.dubbo.rpc.ProxyFactory
文件里的内容:
stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory
在stub、jdk、javassist中stub比较特殊,StubProxyFactoryWrapper构造函数的参数是ProxyFactory 按照之前讲的SPI机制,该类为一个包装类。这里就很轻松的看出来Stub对象是通过StubProxyFactoryWrapper这个包装类包装出来的了。
服务引用时序图(默认使用FailoverCluster):
RegistryProtocol将自己注册到Consumer节点,再订阅相关节点(含provider),回调实现在RegistryDirectory的notify方法:
public synchronized void notify(List<URL> urls) {
//省略configurators、routers
// 刷新Provider列表、将provider的url列表转换为Invoker map
refreshInvoker(invokerUrls);
}
集群容错机制:
当消费方发起调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。
FailoverCluster支持失败自动切换,当出现失败,重试其它服务器。FailoverCluster的超时失败重试是通过FailoverClusterInvoker来实现的。
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;//默认失败重试2次
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {//重试循环体
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
//负载算法,选择invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try { //try块调用invoker
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
//error日志
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // 业务异常不做重试,直接抛出
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le);
}
}
异常种类:
UNKNOWN_EXCEPTION = 0;
NETWORK_EXCEPTION = 1;
TIMEOUT_EXCEPTION = 2;
BIZ_EXCEPTION = 3;
FORBIDDEN_EXCEPTION = 4;
SERIALIZATION_EXCEPTION = 5;
重试异常抛出点:com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
//忽略
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
//忽略Oneway和Async 方式
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
} catch (TimeoutException e) {//超时异常
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
负载均衡策略:
Dubbo中支持“一致性hash”、“轮询”、“最小活跃数”、“随机”四种均衡策略,其中“随机”是dubbo默认的负载均衡策略。
random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
下面选择RandomLoadBalance(随机)的doselect方法来看看怎么取Invoker的:
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
//sameWeight标志所有的invoker权重是否都相同,都相同直接使用随机数
boolean sameWeight = true; // Every invoker has the same weight?
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // Sum
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
//存在权重不一致的情况
if (totalWeight > 0 && !sameWeight) {
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {//遍历权重值,看随机数落在哪一个invoker上
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 权重一致,直接使用随机数
return invokers.get(random.nextInt(length));
}
上面代码中权重不一致的情况,除了配置的时候weight属性不同以外,还有一个原因,那就是dubbo中存在预热的机制,在新注册到注册中心的provider权重会在10分钟(默认)内从0%均速增长到100%。下面来看看这段预热的代码:
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
//获取provider注册的时间戳
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);//获取总预热时长
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);//计算权重
}
}
}
return weight;
}
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
// 根据uptime、warmup 时间比值来释放权重
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}