前提条件
这里我是用zookeeper作为注册中心,使用的dubbo版本:2.7.1
消费端启动时序图
源码解析
- org.apache.dubbo.config.ReferenceConfig#get
时序图步骤1
检查并更新配置,初始化接口代理类
public synchronized T get() {
//检查并更新配置
checkAndUpdateSubConfigs();
//是否在调用get()之前调用了destroy(),即当前ReferenceConfig已经销毁
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
//如果接口代理类还没有创建,进行初始化
if (ref == null) {
init();
}
//返回初始化好的接口代理类
return ref;
}
- org.apache.dubbo.config.ReferenceConfig#init
时序图步骤2
加载各种配置到map,基于map中的配置构建proxy
private void init() {
//initialized与 ServiceConfig#exported作用类似,只不过exported是为了避免重复发布,而initialized是为了避免重复初始化
if (initialized) {
return;
}
initialized = true;
//检查本地存根类是否存在且合法
//如果local(即将弃用)或stub设置true或default,则会验证interfaceClass+Stub/Local类是否存在,
//如果指定了特定的类作为stub/local的值则会验证该类是否存在
//会校验存根类是否实现了interfaceClass且是否含有interfaceClass作为参数的构造器
checkStubAndLocal(interfaceClass);
//检查mock参数是否合法
checkMock(interfaceClass);
//保存ApplicationConfig、ModuleConfig、ConsumerConfig、ReferenceConfig,MethodConfig,runtimeParameters中的配置参数
Map<String, String> map = new HashMap<String, String>();
//设置side=consumer
map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE);
//保存dubbo:dubboRPC协议版本号,release:Dubbo版本,timestamp:当前时间戳,pid:进程id
appendRuntimeParameters(map);
if (!isGeneric()) {
//获取revision ,先从MANIFEST.MF中读取版本信息,如果不存在版本信息,则从jar文件名中获取,如果也没有则使用用户指定version
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
//保存revision
map.put("revision", revision);
}
//获取interfaceClass的方法
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {//如果没有方法methods:*
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
} else {//保存 methods:方法拼接的字符串
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
//加载各种配置中的参数到map中
map.put(Constants.INTERFACE_KEY, interfaceName);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, consumer, Constants.DEFAULT_KEY);
appendParameters(map, this);
Map<String, Object> attributes = null;
if (CollectionUtils.isNotEmpty(methods)) {
attributes = new HashMap<String, Object>();
for (MethodConfig methodConfig : methods) {
appendParameters(map, methodConfig, methodConfig.getName());
//方法重试次数对应的key
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
//retryValue设置为false表示不重试
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
//设置异步配置
attributes.put(methodConfig.getName(), convertMethodConfig2AyncInfo(methodConfig));
}
}
//获取服务消费端的ip地址
String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {//如果为空则获取本地ip
hostToRegistry = NetUtils.getLocalHost();
}
//保存注册到注册表中的ip地址
map.put(Constants.REGISTER_IP_KEY, hostToRegistry);
//创建代理对象
ref = createProxy(map);
//构建serviceKey
String serviceKey = URL.buildKey(interfaceName, group, version);
//初始化ConsumerModel
//保存到consumedServices中
ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes));
}
- org.apache.dubbo.config.ReferenceConfig#createProxy
时序图步骤3
创建服务代理类
private T createProxy(Map<String, String> map) {
//是否使用同一jvm中的服务提供者(本地调用)
//1.如果设置了injvm,则按照用户设置
//2.如果没有设置injvm,但url不为空,表示直连,也当做远程调用
//3. 否则,根据scope参数判断,如果scope=local表示本地调用,scope=remote,远程调用
//4.如果scope 没有设置,使用map构建的serviceKey在exporterMap中查找如果存在对应的本地服务则表示使用本地调用,否则表示远程调用
if (shouldJvmRefer(map)) {
//构建injvm协议URL
URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
//这里使用的Protocol SPI拓展实现类:InjvmProtocol,获取InjvmInvoker
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {//如果是远程调用
//如果是直连
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
//如果想配置多个直连URL,可以用分号隔开,在这里会根据分号分割
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
//设置interfaceName作为url的path
url = url.setPath(interfaceName);
}
//如果url是registry协议,表示用户想使用自己指定的注册中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
//将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
//合并url#parameters和map,移除服务提供端的设置如:线程名,核心线程数量等;移除方法异步的设置;使用url设置的group,release,dubbo,version,methods,timestamp参数值;最终将合并后的参数作为url参数
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 根据服务注册中新信息装配URL对象
//检查注册中心配置是否存在,将其转化成RegistryConfig
checkRegistry();
//加载注册表配置并将其转换为registryURL,优先顺序为:系统属性> dubbo注册表配置
List<URL> us = loadRegistries(false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
//从系统属性中加载监视器配置,转为monitorUrl
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
//以"monitor"作为key存储到map中
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
//将map转换成查询字符串,并作为 refer 参数的值添加到 registryURL 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {//表示只有一个RegistryURL
//使用对应的Protocol SPI拓展点实现得到invoker
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
//如果有多个RegistryURL,对urls遍历调用RegistryProtocol#refer,得到所有invoker
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
//使用最后一个注册中心url作为registryURL
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // 如果存在注册中心url
// use RegistryAwareCluster only when register's cluster is available
//使用RegistryAwareCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
//将所有invoker包裹得到RegistryAwareClusterInvoker
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
//如果是直连方式没有注册中心url,使用用户指定的cluster参数值对应的集群策略包裹所有invoker,默认FailoverCluster->FailoverClusterInvoker
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
//是否需要启动时检查服务提供者是否可用
if (shouldCheck() && !invoker.isAvailable()) {
// make it possible for consumer to retry later if provider is temporarily unavailable
initialized = false;
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
URL consumerURL = new URL(Constants.CONSUMER_PROTOCOL, map.remove(Constants.REGISTER_IP_KEY), 0, map.get(Constants.INTERFACE_KEY), map);
metadataReportService.publishConsumer(consumerURL);
}
//通过ProxyFactory创建服务代理类
return (T) proxyFactory.getProxy(invoker);
}
- org.apache.dubbo.config.ReferenceConfig#shouldJvmRefer
判断是否引用本地服务:
- 如果设置了injvm,则按照用户设置
- 如果没有设置injvm,但url不为空,表示直连,也当做远程调用
- 否则,根据scope参数判断,如果scope=local表示本地调用,scope=remote,远程调用
- 如果scope 没有设置,使用map构建的serviceKey在exporterMap中查找如果存在对应的本地服务则表示使用本地调用,否则表示远程调用
protected boolean shouldJvmRefer(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
boolean isJvmRefer;
if (isInjvm() == null) {
// if a url is specified, don't do local reference
if (url != null && url.length() > 0) {// 如果没有设置injvm,但url不为空,表示直连,也当做远程调用
isJvmRefer = false;
} else {
// by default, reference local service if there is
isJvmRefer = InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl);
}
} else { //如果设置了injvm,则按照用户设置
isJvmRefer = isInjvm();
}
return isJvmRefer;
}
public boolean isInjvmRefer(URL url) {
String scope = url.getParameter(Constants.SCOPE_KEY);
// Since injvm protocol is configured explicitly, we don't need to set any extra flag, use normal refer process.
if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter(Constants.LOCAL_PROTOCOL, false))) {//scope=local或者injvm=true表示本地调用
// if it's declared as local reference
// 'scope=local' is equivalent to 'injvm=true', injvm will be deprecated in the future release
return true;
} else if (Constants.SCOPE_REMOTE.equals(scope)) {//scope=remote表示远程调用
// it's declared as remote reference
return false;
} else if (url.getParameter(Constants.GENERIC_KEY, false)) {//如果是泛型调用,默认远程调用
// generic invocation is not local reference
return false;
} else if (getExporter(exporterMap, url) != null) {//否则,从AbstractProtocol#exporterMap中找有没有相同serviceKey的Exporter
// by default, go through local reference if there's the service exposed locally
return true;
} else {
return false;
}
}
//根据URL 中的参数获取对应的本地服务
//之前在dubbo服务提供端本地服务暴露中提过,服务提供端暴露本地服务时
//以key:serviceKey,value:InjvmExporter作为键值对缓存到AbstractProtocol#exporterMap
static Exporter<?> getExporter(Map<String, Exporter<?>> map, URL key) {
Exporter<?> result = null;
if (!key.getServiceKey().contains("*")) {//通过serviceKey能找到对应的本地服务
result = map.get(key.getServiceKey());
} else {
if (CollectionUtils.isNotEmptyMap(map)) {//serviceKey=*,则通过比较interface,group,version是否匹配,获取本地服务
for (Exporter<?> exporter : map.values()) {
if (UrlUtils.isServiceKeyMatch(key, exporter.getInvoker().getUrl())) {
result = exporter;
break;
}
}
}
}
if (result == null) {
return null;
} else if (ProtocolUtils.isGeneric(//如果找到的export是泛型服务,则丢弃
result.getInvoker().getUrl().getParameter(Constants.GENERIC_KEY))) {
return null;
} else {
return result;
}
}
- Protocol$Adaptive#refer
时序图步骤4,5,6,7
通过Protocol的适配器类,找到url#protocol参数指定的SPI实现类,并使用wrapper类(ProtocolFilterWrapper,ProtocolListenterWrapper,QosProtocolWrapper)对Protocol实现类进行增强.
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
- org.apache.dubbo.registry.integration.RegistryProtocol#refer
时序图步骤8
如果url#protocol=registry,使用RegistryProtocol,主要作用根据registryURL获取注册中心实例。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//将registryURL协议由registry替换成parameters中registry对应的值,然后移除registry这个键值对
url = URLBuilder.from(url)
.setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
.removeParameter(REGISTRY_KEY)
.build();
//根据url#protocol找到对应的的spi实现,获取注册中心实例,先从缓存中获取,如果没有则创建注册中心实例
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {//如果需要引用的服务类型是RegistryService,则获取注册服务的invoker
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
//将之前存储在registryURL#parameters中key为refer的查询参数转换成map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0) {
//如果多个group用逗号隔开,使用MergeableCluster作为集群策略
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
// 获取invoker
return doRefer(getMergeableCluster(), registry, type, url);
}
}
//如果只有一个 group
// 获取invoker
return doRefer(cluster, registry, type, url);
}
- org.apache.dubbo.registry.integration.RegistryProtocol#doRefer
时序图步骤9
- 将consumerURL注册到注册中心
- 构建RegistryDirectory,构建路由规则链,监听providers,routers.configurators节点数据,当节点数据发生变化动态更新invoker列表
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//创建RegistryDirectory,基于注册中心动态感知服务提供者的变化
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 这里获取的其实就是registryURL#parameters中key为refer的参数值转换的map
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
//初步生成subscribeUrl,使用consumer协议
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
//如果服务接口不等于'*'并且registry=true默认为true,注册消费端url(consumerURL)
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
//在subscribeUrl基础上添加category=consumers,check=false,产生consumerURL
directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
//将consumerURL注册到注册中心,由于category=consumers,如果使用zookeeper作为注册中心,会注册到/group/path/consumers目录下
registry.register(directory.getRegisteredConsumerUrl());
}
//构建路由规则链
directory.buildRouterChain(subscribeUrl);
//订阅providers,routers.configurators节点数据,并动态刷新invoker列表
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
//RegistryDirectory中可能包含多个可用invoker,集群策略会将Directory伪装成一个Invoker返回
Invoker invoker = cluster.join(directory);
////将invoker注册到本地注册表中
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
- org.apache.dubbo.registry.support.FailbackRegistry#register
如果使用zookeeper作为注册中心,会调用FailbackRegistry#register注册ConsumerURL
@Override
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
- org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
将ConsumerURL注册到zookeeper上,其实就是创建一个临时节点
public void doRegister(URL url) {
try {
// 创建/dubbo/path/consumers/URL.encode(consumerURL)节点
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
- org.apache.dubbo.registry.integration.RegistryDirectory#buildRouterChain
时序图步骤10
构建路由规则链
public void buildRouterChain(URL url) {
this.setRouterChain(RouterChain.buildChain(url));
}
加载所有使用了@Activate注解的RouterFactory的SPI实现,构建对应的Router,利用加载的Router列表创建RouterChain
public static <T> RouterChain<T> buildChain(URL url) {
return new RouterChain<>(url);
}
private RouterChain(URL url) {
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());
initWithRouters(routers);
}
public void initWithRouters(List<Router> builtinRouters) {
this.builtinRouters = builtinRouters;
this.routers = new CopyOnWriteArrayList<>(builtinRouters);
this.sort();
}
8.org.apache.dubbo.registry.integration.RegistryDirectory#subscribe
时序图步骤11
保存consumerURL,设置监听,订阅节点数据
public void subscribe(URL url) {
//设置消费者URL
setConsumerUrl(url);
//监听app-name.configurators节点
consumerConfigurationListener.addNotifyListener(this);
//监听service-name.configurators节点
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
//订阅
registry.subscribe(url, this);
}
- org.apache.dubbo.registry.support.FailbackRegistry#subscribe
时序图步骤12
保存listener,并删除相同订阅请求的重试任务,发送订阅请求到注册中心,如果注册失败,记录注册请求,并创建重试任务
public void subscribe(URL url, NotifyListener listener) {
//将listener保存到AbstractRegistry#subscribed中
super.subscribe(url, listener);
//删除失败的订阅重试任务(FailedSubscribedTask)
removeFailedSubscribed(url, listener);
try {
// 发送订阅请求到注册中心
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// 如果订阅失败,记录失败的订阅,并创建重试任务
addFailedSubscribed(url, listener);
}
}
- org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doSubscribe
时序图步骤13
监听consumerURL指定的节点,如果被监听节点的子节点有变化,会通知数据变化
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
//订阅所有服务,例如监控中心的订阅
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
//获取根节点
String root = toRootPath();
//获得url对应的监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {//如果监听器集合为空,创建监听器集合
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {//如果不存在子节点监听器,创建节点监听器
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
//如果根节点数据变化,遍历根节点下所有子节点
for (String child : currentChilds) {
child = URL.decode(child);//对providerURL 解码
if (!anyServices.contains(child)) {//如果anyServices中不存在该服务,则添加表示是后来注册的新服务
anyServices.add(child);
//订阅新节点
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
//保证根节点的存在,创建根节点
zkClient.create(root, false);
//到根节点上,监听根节点,返回子节点路径
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
//对子节点遍历,如果出现 新的服务,添加到anyServices中,并订阅新节点
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {//如果是某个服务的订阅
List<URL> urls = new ArrayList<>();
//获取订阅的服务的providers,routers.configurators路径
//例如/dubbo/com.books.dubbo.demo.api.testService/providers,/dubbo/com.books.dubbo.demo.api.testService/consumers
for (String path : toCategoriesPath(url)) {
//获得url对应的监听器集合
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {//如果监听器集合为空,创建监听器集合
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
//获取listener对应的子节点监听器
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {//如果不存在子节点监听器,创建子节点监听器
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
//创建节点
zkClient.create(path, false);
//将节点监听器注册到path对应的节点上,获取节点下的所有子节点路径
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
//将匹配的children保留,如果没有匹配的子节点,会产生一个emptyURL
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//通知数据变化
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
- org.apache.dubbo.registry.support.FailbackRegistry#notify
时序图步骤14
校验参数,通知数据变化,如果通知失败,创建失败通知重试任务(FailedNotifiedTask)
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
//如果通知失败,创建失败通知重试任务
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
- org.apache.dubbo.registry.support.FailbackRegistry#doNotify
时序图步骤15
将urls按照服务的类目分类,按照类目通知数据变化
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
super.notify(url, listener, urls);
}
- org.apache.dubbo.registry.support.AbstractRegistry#notify(URL , NotifyListener, java.util.List<URL>)
时序图步骤16
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
//将urls按照服务的类目分类
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//将consumerURL与分类好的urls做映射 notified: consumerURL->category :categoryList
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
//通知类目下的数据变化
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//通知监听器数据变化
listener.notify(categoryList);
//更新缓存文件
saveProperties(url);
}
}
- org.apache.dubbo.registry.integration.RegistryDirectory#notify
时序图步骤17
- 获取url中的configurators信息,configurators是外部化配置信息,包含服务者动态配置 URL 元数据信息。
- 获取url中的routers信息, routers是路由配置信息,包含消费者路由策略 URL 元数据信息。
- 获取url中的providers信息,providers是服务提供者注册信息,包含服务者 URL 元数据信息,对overrideDirectoryUrl配置信息覆盖,刷新invoker列表
public synchronized void notify(List<URL> urls) {
//按照configurators,routers,providers分类
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(url -> {
if (UrlUtils.isConfigurator(url)) {
return CONFIGURATORS_CATEGORY;
} else if (UrlUtils.isRoute(url)) {
return ROUTERS_CATEGORY;
} else if (UrlUtils.isProvider(url)) {
return PROVIDERS_CATEGORY;
}
return "";
}));
//获取configuratorURL
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
//将configuratorURL转换成Configurator
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
//获取routerURL
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
//将routerURL转换成Router,和builtinRouters共同组成路由规则链RouterChain
toRouters(routerURLs).ifPresent(this::addRouters);
// 获取providerURL
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
//对overrideDirectoryUrl配置信息覆盖,刷新invoker列表
refreshOverrideAndInvoker(providerURLs);
}
- org.apache.dubbo.registry.integration.RegistryDirectory#refreshOverrideAndInvoker
时序图步骤18
对RegistryDirectory#overrideDirectoryUrl 信息覆盖 ,刷新invoker列表
private void refreshOverrideAndInvoker(List<URL> urls) {
//对RegistryDirectory#overrideDirectoryUrl 信息覆盖
overrideDirectoryUrl();
//刷新invoker列表
refreshInvoker(urls);
}
- org.apache.dubbo.registry.integration.RegistryDirectory#overrideDirectoryUrl
时序图步骤19
对RegistryDirectory#overrideDirectoryUrl 信息覆盖
private void overrideDirectoryUrl() {
// merge override parameters
this.overrideDirectoryUrl = directoryUrl;
//使用我们在时序图步骤17中获取的外部化配置,覆盖overrideDirectoryUrl ,比如将动态注册的mock策略添加到overrideDirectoryUrl 中
List<Configurator> localConfigurators = this.configurators; // local reference
doOverrideUrl(localConfigurators);
//使用消费端配置,覆盖overrideDirectoryUrl
List<Configurator> localAppDynamicConfigurators = consumerConfigurationListener.getConfigurators(); // local reference
doOverrideUrl(localAppDynamicConfigurators);
if (serviceConfigurationListener != null) {
//使用服务端配置,覆盖overrideDirectoryUrl
List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators(); // local reference
doOverrideUrl(localDynamicConfigurators);
}
}
- org.apache.dubbo.registry.integration.RegistryDirectory#refreshInvoker
时序图步骤20
刷新invoker列表
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
//如果地址只有一个并且协议参数值为empty,清空并销毁所有invoker
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; //表示没有可用的服务提供者
this.invokers = Collections.emptyList();
routerChain.setInvokers(this.invokers);
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // 表示有服务提供者可用
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
//如果invokerUrls为空但cachedInvokerUrls 不为空,使用cachedInvokerUrls
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
//缓存invokerUrls
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
//将providerUrl转化成Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
// pre-route and build cache, notice that route cache should build on original Invoker list.
// toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
//将刷新后的invoker列表设置到routerChain中
routerChain.setInvokers(newInvokers);
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
//缓存刷新后的invoker列表
this.urlInvokerMap = newUrlInvokerMap;
try {
//销毁过时的(无用的)invokers
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
- org.apache.dubbo.registry.integration.RegistryDirectory#toInvokers
时序图步骤21
将providerUrl转换成invoker
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
// //如果在消费端配置了协议,则只匹配相同协议的providerUrl
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
//是否存在支持providerUrl.getProtocol()的Protocol的SPI实现
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
//合并url参数 合并顺序:override > -D >Consumer > Provider
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
//是否是重复的providerUrl
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// 先从本地缓存中获取
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { //如果没有对应的缓存,创建invoker
try {
//url是否可用
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {//如果可用,创建InvokerDelegate包裹Invoker
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);//放入缓存
}
} else {
newUrlInvokerMap.put(key, invoker);//放入缓存
}
}
keys.clear();
return newUrlInvokerMap;
}
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#refer
时序图步骤22
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
//如果指定了optimizer参数,加载optimizer对应的SerializationOptimizer实例,获取序列化类列表,注册到序列化注册表中
optimizeSerialization(url);
// 创建invoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
//缓存
invokers.add(invoker);
return invoker;
}
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getClients
时序图步骤23
获取客户端ExchangeClient的实例
这里会发现默认情况下,同一台机器的多个服务是共享连接的,而不是每一个服务都有自己单独的连接
private ExchangeClient[] getClients(URL url) {
//不同服务是否共享连接
boolean useShareConnect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// 如果没有配置connections ,则默认连接是共享的,否则每个服务单独有自己的连接
if (connections == 0) {
useShareConnect = true;
/**
* xml配置优先于属性配置
*/
//默认共享连接数:1
String shareConnectionsStr = url.getParameter(Constants.SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(Constants.SHARE_CONNECTIONS_KEY,
Constants.DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
//获取共享客户端
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {//否则每个服务单独有自己的连接
clients[i] = initClient(url);
}
}
return clients;
}
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#getSharedClient
时序图步骤24
获取共享客户端
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
//获取服务提供者地址
String key = url.getAddress();
//先从缓存获取
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
//检查缓存中的客户端是否都可用,如果都可用直接复用
if (checkClientCanUse(clients)) {
//如果可用增加ReferenceCountExchangeClient的引用次数
batchClientRefIncr(clients);
return clients;
}
//为服务提供者创建对象,作为锁
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
clients = referenceClientMap.get(key);
// dubbo check
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
// connectNum 必须大于等于1
connectNum = Math.max(connectNum, 1);
//如果缓存中不存在需要创建客户端
if (CollectionUtils.isEmpty(clients)) {
//创建客户端
clients = buildReferenceCountExchangeClientList(url, connectNum);
//缓存
referenceClientMap.put(key, clients);
} else {
//如果缓存中存在
for (int i = 0; i < clients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
// 如果存在一个客户端不可用,创建一个新的客户端替换他
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
clients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
referenceCountExchangeClient.incrementAndGetCount();
}
}
/**
* I understand that the purpose of the remove operation here is to avoid the expired url key
* always occupying this memory space.
*/
locks.remove(key);
return clients;
}
}
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClientList
时序图步骤25
创建指定数量的ReferenceCountExchangeClient
private List<ReferenceCountExchangeClient> buildReferenceCountExchangeClientList(URL url, int connectNum) {
List<ReferenceCountExchangeClient> clients = new CopyOnWriteArrayList<>();
for (int i = 0; i < connectNum; i++) {
//创建ReferenceCountExchangeClient
clients.add(buildReferenceCountExchangeClient(url));
}
return clients;
}
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#buildReferenceCountExchangeClient
时序图步骤26
创建ReferenceCountExchangeClient
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
//初始化客户端
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#initClient
时序图步骤27
设置编解码类型,心跳间隔,校验是否Transporter类型是否合法,如果设置了惰性连接则在使用时才会初始化连接,否则及时连接
private ExchangeClient initClient(URL url) {
// 客户端类型,默认NettyClient
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
//设置编解码类型,默认使用DubboCodec
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 设置心跳间隔默认60*1000ms
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 是否存在名称为str的Transporter SPI实现
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// 是否惰性连接,默认false
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
//LazyConnectExchangeClient 只有使用时才会初始化连接
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//及时连接
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
- org.apache.dubbo.remoting.exchange.Exchangers#connect(URL,ExchangeHandler)
时序图步骤28
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
//如果没有设置编解码参数,默认使用 ExchangeCodec
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//默认使用HeaderExchanger
return getExchanger(url).connect(url, handler);
}
public static Exchanger getExchanger(URL url) {
//默认使用HeaderExchanger
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
- org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#connect
时序图步骤29
创建HeaderExchangeClient
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
- org.apache.dubbo.remoting.Transporters#connect(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
时序图步骤30
获取NettyClient
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {//如果只有一个处理器,则直接使用
handler = handlers[0];
} else {//如果有多个,使用ChannelHandlerDispatcher对多个处理器做组装
handler = new ChannelHandlerDispatcher(handlers);
}
//默认使用 netty4.NettyTransporter
return getTransporter().connect(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
- org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect
时序图步骤31
创建NettyClient
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
- org.apache.dubbo.remoting.transport.netty4.NettyClient#NettyClient
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
- org.apache.dubbo.remoting.transport.AbstractClient#wrapChannelHandler
设置线程前缀,设置线程池策略使用CachedThreadPool,构建处理器责任链
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
// 设置线程前缀:DubboClientHandler
url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
//设置线程池策略使用CachedThreadPool
url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
//构建处理器责任链,其实可以发现在这一步时,消费端的处理器责任链和服务提供端是一样的
//MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol#requestHandler
return ChannelHandlers.wrap(handler, url);
}
- org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers
处理器责任链构建
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
- org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
创建bootstrap,建立连接
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
try {
doOpen();
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
try {
// connect.
connect();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress() + " connect to the server " + getRemoteAddress());
}
} catch (RemotingException t) {
if (url.getParameter(Constants.CHECK_KEY, true)) {
close();
throw t;
} else {
logger.warn("Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + " (check == false, ignore and retry later!), cause: " + t.getMessage(), t);
}
} catch (Throwable t) {
close();
throw new RemotingException(url.toInetSocketAddress(), null,
"Failed to start " + getClass().getSimpleName() + " " + NetUtils.getLocalAddress()
+ " connect to the server " + getRemoteAddress() + ", cause: " + t.getMessage(), t);
}
executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
ExtensionLoader.getExtensionLoader(DataStore.class)
.getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort()));
}
- org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen
构建netty客户端处理器nettyClientHandler(NettyClientHandler ->NettyClient->MultiMessageHandler->HeartbeatHandler...) ,创建bootstrap
protected void doOpen() throws Throwable {
final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
bootstrap = new Bootstrap();
bootstrap.group(nioEventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
//.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
.channel(NioSocketChannel.class);
if (getConnectTimeout() < 3000) {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
} else {
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
}
bootstrap.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
}
});
}
- org.apache.dubbo.remoting.transport.netty4.NettyClient#doConnect
建立连接
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.channel();
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.cause() != null) {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + future.cause().getMessage(), future.cause());
} else {
throw new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
}
} finally {
if (!isConnected()) {
//future.cancel(true);
}
}
}
- org.apache.dubbo.rpc.cluster.support.FailoverCluster#join
时序图步骤38 39
用集群容错策略包装invoker,默认集群容错策略:FailoverCluster(失败重试)
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
- org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
时序图步骤37 40
MockClusterWrapper会对Cluster SPI实现增强,提供服务降级能力
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}
- org.apache.dubbo.rpc.proxy.AbstractProxyFactory#getProxy(org.apache.dubbo.rpc.Invoker<T>)
时序图步骤46
获取代理类
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
return getProxy(invoker, false);
}
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Class<?>[] interfaces = null;
String config = invoker.getUrl().getParameter(Constants.INTERFACES);
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
// TODO can we load successfully for a different classloader?.
interfaces[i + 2] = ReflectUtils.forName(types[i]);
}
}
}
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
if (!GenericService.class.isAssignableFrom(invoker.getInterface()) && generic) {
int len = interfaces.length;
Class<?>[] temp = interfaces;
interfaces = new Class<?>[len + 1];
System.arraycopy(temp, 0, interfaces, 0, len);
interfaces[len] = com.alibaba.dubbo.rpc.service.GenericService.class;
}
//调用子类实现
return getProxy(invoker, interfaces);
}
- org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getProxy
默认使用JavassistProxyFactory,使用InvokerInvocationHandler包装MockClusterInvoker
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
总结
dubbo服务消费端的启动流程比较复杂,总结一下:
- 加载ApplicationConfig、ModuleConfig、ConsumerConfig、ReferenceConfig,MethodConfig,runtimeParameters中的配置参数
- 获取invoker,这里分为2种情况
- 本地服务调用
- 远程服务调用:
- 直连方式
- 指定了注册中心
- 如果是本地服务调用,直接从缓存中获取invoker
- 如果直连方式调用,且没有指定注册中心,直接获取集群容错策略包装的invoker
- 如果指定了注册中心
- 创建RegistryDirectory
- 将consumerURL注册到注册中心,
- 构建路由规则链
-监听调用服务的providers,configurators,routers节点的数据变化 ,随着注册中心数据的变化动态刷新invoker列表 - 将providers节点下的所有匹配的providerURL转换成Invoker(每个invoker包装着一个client)
- 将invoker列表存到RegistryDirectory,最终得到集群容错策略包装的Invoker
- 基于invoker生成代理类
所以每次通过代理类调用,实际是通过invoker中的client发起远程调用