1 Dubbo注册中心概述
Dubbo的注册中心承担着Dubbo服务的注册与发现的功能。
Dubbo支持的注册中心主要包括:
ZookeeperRegistry
MulticastRegistry
RedisRegistry
DubboRegistry
其中dubbo官方推荐用Zookeeper作为注册中心,下面介绍ZookeeperRegistry
。
2 Dubbo服务注册
2.1 Dubbo服务注册组件
Dubbo在Registry层实现服务的注册于发现,主要包括如下几个类:
ZookeeperRegistry:负责与zookeeper进行交互;
RegistryProtocol:从注册中心获取可用服务,或者将服务注册到zookeeper,然后提供服务或者提供调用代理;
RegistryDirectory:维护着所有可用的远程
Invoker
或者本地的Invoker
,这个类实现了NotifyListner
;NotifyListener :负责
RegistryDirectory
和ZookeeperRegistry
的通信;FailbackRegistry:继承自
Registry
,实现了失败重试机制;
2.2 Zookeeper的服务注册模型
流程说明:
-
服务提供者启动时
- 向
/dubbo/com.foo.BarService/providers
目录下写入自己的URL地址。
- 向
-
服务消费者启动时
订阅
/dubbo/com.foo.BarService/providers
目录下的提供者URL地址。并向
/dubbo/com.foo.BarService/consumers
目录下写入自己的URL地址。
-
监控中心启动时
- 订阅
/dubbo/com.foo.BarService
目录下的所有提供者和消费者URL地址。
- 订阅
3 服务发布
RegistryProtocol
是对需要暴露服务到注册中心的一层封装,通过RegistryProtocol
实现将暴露的服务信息注册到注册中心。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 暴露服务,然后包装暴露者
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 获取注册中心地址
// 通过服务的registry配置,
//<dubbo:registry id="asRegistry" client="zkclient" protocol="zookeeper" register="true" address="zookeeper:2181" file="D:/dubbo/user/dubbo.registry"/>
//<dubbo:protocol name="dubbo" port="20880" dispatcher="direct" threadpool="fixed" threads="2"/>
//<dubbo:service connections="10" interface="com.wuhulala.dubbo.user.service.UserService" ref="demoService" registry="asRegistry"/>
URL registryUrl = getRegistryUrl(originInvoker);
//根据注册URL 获取注册中心
final Registry registry = getRegistry(originInvoker);
// 处理需要注册到注册中心的地址
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//判断是否需要注册,为什么不放到最上面
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
// 注册逻辑
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 省略 ...
}
public void register(URL registryUrl, URL registedProviderUrl) {
// registry在初始化的时候被注入一个自适应的扩展点ExtensionLoader.getExtensionLoader(type).getAdaptiveExtension()
//所以发布的时候也是自适应寻找对应扩展点进行注册到注册中心
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
3 ZookeeperRegistry
ZookeeperRegistry
是通过ZookeeperRegistryFactory
创建。
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
//从缓存中
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
说明:
上面是AbstractRegistryFactory#getRegistry
方法根据URL获取对应注册中心,真正的创建在子类的createRegistry
方法中实现。
3.1 ZookeeperRegistryFactory
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
注意:
ZookeeperTransporter
才是真正与Zookeeper通信的对象。默认的Zookeeper客户端是curator。
3.2 ZookeeperRegistry的创建
/**
* 默认zookeeper的跟节点 dubbo
*/
private final static String DEFAULT_ROOT = "dubbo";
/**
* 根节点
*/
private final String root;
/**
* Service 接口全名集合。
* 该属性适可用于监控中心,订阅整个Service层。因为Service层是动态的,可以有不断有新的Service服务发布(注意,不是服务实例)。
* 在 #doSubscribe(url, notifyListener) 方法中,
*/
private final Set<String> anyServices = new ConcurrentHashSet<>();
/**
* 监听器集合
*/
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();
/**
* zookeeper客户端
*/
private final ZookeeperClient zkClient;
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
//获取根节点,不设置则使用/dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
//获取zk客户端调用 ZookeeperTransporter#connect(url) 方法,
// 基于 Dubbo SPIAdaptive 机制,根据url参数,加载对应的 ZookeeperTransporter 实现类,创建对应的 ZookeeperClient 实现类的对应。
zkClient = zookeeperTransporter.connect(url);
//设置监听器
zkClient.addStateListener((state) -> {
//重连,重新获取最新的服务提供方地址信息
if (state == StateListener.RECONNECTED) {
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
" Since ephemeral ZNode will not get deleted for a connection lose, " +
"there's no need to re-register url of this instance.");
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) {
//重新创建session时,将本服务进行recover恢复,重新发起注册和订阅
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
try {
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
logger.warn("Url of this instance will be deleted from registry soon. " +
"Dubbo client will try to re-register once a new session is created.");
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
}
});
}
ZookeeperRegistry
在创建时便和Zookeeper创建了连接。
4 服务注册
Zookeeper的服务注册就是创建Zookeeper的节点。
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
根据url对象构建一个URL的Path
,然后在Zookeeper上创建一个节点,节点是否是持久化的根据dynamic
参数决定,true
表示创建一个持久化节点,当注册方下线时,持久化节点仍然在Zookeeper上。false
表示创建临时节点,当注册方下线时,会删除节点,同时通知监听节点的消费方。
4.1 节点路径
Zookeeper中的服务节点路径如Group/ServiceInterface/category/URL
:
Group:表示节点的根路径,默认是dubbo;
ServiceInterface:接口的全限定类名;
category:接口的类别,providers、consumers、configurators、routers等这些;
URL:URL的编码;
4.2 服务下线
服务下线就是把注册的节点进行删除。
public void doUnregister(URL url) {
try {
zkClient.delete(toUrlPath(url));
} catch (Throwable e) {
throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
5 服务订阅
订阅有两种方式pull、push ,pull是客户端定时轮询注册中心拉取配置,push是注册中心主动推送数据给客户端。dubbo是两者结合的方式进行配置的拉取,当第一次启动的时候通过客户端pull拉取所有数据方式,在订阅的节点上进行注册watcher,然后客户端与注册中心保持长连接,而后 每个节点有任何数据变化,注册中心就会更新watcher情况进行回调通知到客户端,客户端就接收到这个通知了。
服务在进行注册的时候,服务端会订阅configurators用来监听动态配置的变更。
在消费者启动的时候,消费者会监听providers、routers、configurators来监听服务提供者、路由规则、配置变更的通知。
5.1 doSubscribe
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
//处理所有service的订阅,
//客户端第一次连接上注册中心的时候,会把这个service设置成* 也就是 ANY_VALUE,进行获取全量数据
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
//获取url的监听器
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
//没有listener则创建一
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
//获取ChildListener
ChildListener zkListener = listeners.get(listener);
//第一次连接时为空,新建一个childListener
if (zkListener == null) {
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
//遍历所有子节点
for (String child : currentChilds) {
child = URL.decode(child);
//有新增服务,则发起这个服务的订阅
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
});
zkListener = listeners.get(listener);
}
//创建持久化节点,订阅持久化节点的子节点
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
//获取到全量service,对每一个service进行订阅
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {//客户端非第一次连接上注册中心,对于具体的服务进行订阅
List<URL> urls = new ArrayList<>();
//根据URL类别,获取一组需要订阅的路径
//类别:providers、routers、consumers、configurators
for (String path : toCategoriesPath(url)) {
//获取url对应的监听器,没有则创建
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
listeners = zkListeners.get(url);
}
//获取ChildListener,没有则创建
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
//子节点数据发生变更时则调用notify方法进行通知这个listener
listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkListener = listeners.get(listener);
}
//创建type的持久化节点,且对于这个节点进行订阅
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener,urls为所有自节点的url
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
5.2 doUnsubscribe
public void doUnsubscribe(URL url, NotifyListener listener) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners != null) {
ChildListener zkListener = listeners.get(listener);
if (zkListener != null) {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
zkClient.removeChildListener(root, zkListener);
} else {
for (String path : toCategoriesPath(url)) {
zkClient.removeChildListener(path, zkListener);
}
}
}
}
}
取消订阅就是删除监听器。
5.3 fetchLatestAddresses
当与zookeeper重新连接时,需要刷新本机生产者列表,这个方法就是干这个的,它把的已订阅的列表都加入订阅失败的容器。
//刷新生产者列表
private void fetchLatestAddresses() {
// subscribe
Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>(getSubscribed());
if (!recoverSubscribed.isEmpty()) {
if (logger.isInfoEnabled()) {
logger.info("Fetching the latest urls of " + recoverSubscribed.keySet());
}
for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
URL url = entry.getKey();
//将当前订阅的列表全部推送至订阅失败的容器
for (NotifyListener listener : entry.getValue()) {
addFailedSubscribed(url, listener);
}
}
}
}
注意:
addFailedSubscribed
方法是FailbackRegistry
将订阅失败的连接放入失败容器,FailbackRegistry
有定时器会定时的处理这些失败的订阅。
6 服务监听
通知事件是调用的AbstractRegistry#notify
方法完成。
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//......
//将url分类放置
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
//获取url在notified的全部url数据,若notified无则创建一个且放入
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
//对于所有url进行通知
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
//当我们的注册表由于网络抖动而出现订阅失败时,我们至少可以返回现有的缓存URL。
saveProperties(url);
}
}
流程说明:
- 对需要通知的url进行分类;
- 分类调用notify方法进行通知,这个也就是
org.apache.dubbo.registry.NotifyListener#notify
这个方法,也就是在服务订阅doSubscribe
传入的参数NotifyListener
; - 将url进行保存缓存;
6.1 NotifyListener通知
public interface NotifyListener {
/**
* 当收到服务变更通知时触发。
* <p>
* 通知需处理契约:<br>
* 1\. 总是以服务接口和数据类型为维度全量通知,即不会通知一个服务的同类型的部分数据,用户不需要对比上一次通知结果。<br>
* 2\. 订阅时的第一次通知,必须是一个服务的所有类型数据的全量通知。<br>
* 3\. 中途变更时,允许不同类型的数据分开通知,比如:providers, consumers, routers, overrides,允许只通知其中一种类型,但该类型的数据必须是全量的,不是增量的。<br>
* 4\. 如果一种类型的数据为空,需通知一个empty协议并带category参数的标识性URL数据。<br>
* 5\. 通知者(即注册中心实现)需保证通知的顺序,比如:单线程推送,队列串行化,带版本对比。<br>
*
* @param urls 已注册信息列表,总不为空,含义同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
*/
void notify(List<URL> urls);
}
RegistryDirectory#notify
就是更新本地缓存。
6.2 saveProperties
private void saveProperties(URL url) {
if (file == null) {
return;
}
try {
StringBuilder buf = new StringBuilder();
/**
* notifies是 被通知的 URL 集合
*
* key1:消费者的 URL ,例如消费者的 URL ,和 {@link #subscribed} 的键一致
* key2:分类,例如:providers、consumers、routes、configurators。【实际无 consumers ,因为消费者不会去订阅另外的消费者的列表】
* 在 {@link Constants} 中,以 "_CATEGORY" 结尾
*/
Map<String, List<URL>> categoryNotified = notified.get(url);
if (categoryNotified != null) {
for (List<URL> us : categoryNotified.values()) {
for (URL u : us) {
if (buf.length() > 0) {
buf.append(URL_SEPARATOR);
}
buf.append(u.toFullString());
}
}
}
properties.setProperty(url.getServiceKey(), buf.toString());
long version = lastCacheChanged.incrementAndGet();
//同步写入文件还是异步写入文件
if (syncSaveFile) {
doSaveProperties(version);
} else {
registryCacheExecutor.execute(new SaveProperties(version));
}
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
注意:
file在AbstractRegistry
对象创建的时候初始化。
6.3 doSaveProperties
public void doSaveProperties(long version) {
if (version < lastCacheChanged.get()) {
return;
}
if (file == null) {
return;
}
// Save
try {
File lockfile = new File(file.getAbsolutePath() + ".lock");
if (!lockfile.exists()) {
lockfile.createNewFile();
}
try (RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
FileChannel channel = raf.getChannel()) {
FileLock lock = channel.tryLock();
if (lock == null) {
throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
}
// Save
try {
if (!file.exists()) {
file.createNewFile();
}
try (FileOutputStream outputFile = new FileOutputStream(file)) {
properties.store(outputFile, "Dubbo Registry Cache");
}
} finally {
lock.release();
}
}
} catch (Throwable e) {
savePropertiesRetryTimes.incrementAndGet();
if (savePropertiesRetryTimes.get() >= MAX_RETRY_TIMES_SAVE_PROPERTIES) {
logger.warn("Failed to save registry cache file after retrying " + MAX_RETRY_TIMES_SAVE_PROPERTIES + " times, cause: " + e.getMessage(), e);
savePropertiesRetryTimes.set(0);
return;
}
if (version < lastCacheChanged.get()) {
savePropertiesRetryTimes.set(0);
return;
} else {
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
}
logger.warn("Failed to save registry cache file, will retry, cause: " + e.getMessage(), e);
}
}
这使用了文件锁的方式保证只有一个在读写文件。
消费者从注册中心获取注册信息后会做本地缓存。内存中也有一份,保存在Properties
对象里,磁盘上也持久化一份,通过file对象进行引用。