背景
最近刚完成一个task。有client&server端,server端发布服务,client调用服务,操作有put, get。有如下几点要求
- server端是动态部署,可以有1台也可以由n台;
- 需要按照业务key在client端进行路由,先put,在get,都需要在一台server上;
- server端上下线,client能够支持。
解决方式
- client端写路由分发;
- client端写zk监听zk的dubbo上下线,实现热更新;
- 通过ReferenceConfig实现路由后,能够获取到去哪个server存取数据。
使用完ReferenceConfig后,了解了一番源码。
部分代码
注册服务的引用
protected void addDubboRefBean(DubboRefBean bean) {
Object o=null;
ReferenceConfig reference = new ReferenceConfig(); // 此实例很重,封装了与注册中心的连接以及与提供者的连接,请自行缓存,否则可能造成内存和连接泄漏
reference.setApplication(application);
reference.setRegistry(registry); // 多个注册中心可以用setRegistries()
reference.setInterface(clazz);
//reference.setVersion(version);
reference.setGroup(group);
reference.setCheck(false);
reference.setUrl(bean.getUrl()+"/"+clazz.getName());
reference.setTimeout(100000);
reference.setConnections(200);
o=reference.get();
dubborefMap.put(bean.getCode(), (T) o);
}
zk监听到变化调用的程序
private void update(String path, boolean isMove) {
remoteState = RemoteListenerState.EXECUTE;
logger.info("node change text: " + path);
List<String> list = CommonUtils.analyzeZkAddress(path);
if(list == null || list.size() <= 0) {
return;
}
for(String unit: list) {
if(isMove) {
dubborefMap.remove(unit);
cacheLoop: for(DubboRefBean drb: cacheDubboRefBean) {
if(drb.getCode().equals(unit)) {
cacheDubboRefBean.remove(drb);
break cacheLoop;
}
}
}else {
DubboRefBean bean = new DubboRefBean(unit, unit);
addDubboRefBean(bean);
cacheDubboRefBean.add(bean);
}
}
remoteState = RemoteListenerState.FINISH;
}
ReferenceConfig源码分析
有dubbo配置如下
<dubbo:application name="consumer-of-helloService" />
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" />
<dubbo:reference id="helloService" interface="com.demo.dubbo.service.HelloService" />
HelloService接口内容如下
public interface HelloService {
public String hello(String msg);
}
服务引用过程
收集配置参数
methods=hello,
timestamp=1443695417847,
dubbo=2.5.3
application=consumer-of-helloService
side=consumer
pid=7748
interface=com.demo.dubbo.service.HelloService
从注册中心引用服务,创建Invoker对象
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
invoker = refprotocol.refer(interfaceClass, url);
使用ProxyFactory,创建接口代理对象
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
proxyFactory.getProxy(invoker);
Invoker、Protocol、ProxyFactory
ReferenceConfig中createProxy的代码
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
final boolean isJvmRefer;
if (isInjvm() == null) {
if (url != null && url.length() > 0) { //指定URL的情况下,不做本地引用
isJvmRefer = false;
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
//默认情况下如果本地有服务暴露,则引用本地服务.
isJvmRefer = true;
} else {
isJvmRefer = false;
}
} else {
isJvmRefer = isInjvm().booleanValue();
}
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
if (url != null && url.length() > 0) { // 用户指定URL,指定的URL可能是对点对直连地址,也可能是注册中心URL
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // 通过注册中心配置拼装URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // 用了最后一个registry url
}
}
if (registryURL != null) { // 有 注册中心协议的URL
// 对有注册中心的Cluster 只用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // 不是 注册中心的URL
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && ! invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
Invoker
Invoker:将通过远程通信将Invocation信息传递给服务器端,服务器端接收到该Invocation信息后,找到对应的本地Invoker,然后通过反射执行相应的方法,将方法的返回值再通过远程通信将结果传递给客户端。
分为三种类型:
- 本地执行类的Invoker
- 远程通信执行类的Invoker
- 多个类型2的Invoker聚合成的集群版的Invoker
Protocol
Protocol:根据url中指定的协议(没有指定的话使用默认的dubbo协议)对外公布这个HelloService服务,当客户端根据协议调用这个服务时,将客户端传递过来的Invocation参数交给服务器端的Invoker来执行。所以Protocol加入了远程通信协议的这一块,根据客户端的请求来获取参数Invocation invocation。
public interface Protocol {
/**
* 获取缺省端口,当用户没有配置端口时使用。
*
* @return 缺省端口
*/
int getDefaultPort();
/**
* 暴露远程服务:<br>
* 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
* 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
* 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
*
* @param <T> 服务的类型
* @param invoker 服务的执行体
* @return exporter 暴露服务的引用,用于取消暴露
* @throws RpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* 引用远程服务:<br>
* 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
* 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
* 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
*
* @param <T> 服务的类型
* @param type 服务的类型
* @param url 远程服务的URL地址
* @return invoker 服务的本地代理
* @throws RpcException 当连接服务提供方失败时抛出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* 释放协议:<br>
* 1. 取消该协议所有已经暴露和引用的服务。<br>
* 2. 释放协议所占用的所有资源,比如连接和端口。<br>
* 3. 协议在释放后,依然能暴露和引用新的服务。<br>
*/
void destroy();
}
ReferenceConfig中服务引用的过程
invoker = refprotocol.refer(interfaceClass, url);
- 注册中心/ 找到注册服务
Registry registry = registryFactory.getRegistry(url);
- 注册中心/ 注册
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false)));
- 创建一个RegistryDirectory,从注册中心中订阅自己引用的服务,将订阅到的url在RegistryDirectory内部转换成Invoker
- 然后使用Cluster cluster对象将上述多个Invoker对象(此时还没有真正创建出来,异步订阅,订阅成功之后,回调时才会创建出Invoker)聚合成一个集群版的Invoker对象。
Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
//失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟。
cluster.join(directory)
ProxyFactory
ProxyFactory:主要负责将服务如HelloServiceImpl统一进行包装成一个Invoker,这些Invoker通过反射来执行具体的HelloServiceImpl对象的方法。而对于client端,则是将上述创建的集群版Invoker创建出代理对象。
最终执行:
return method.invoke(invoker, args);