讲过了集群容错的原理分析,介些来我们看看这些服务是如何暴露出来供消费者使用的。
这一节我们主要看看是什么时候开始了服务暴露,在这个过程中发生了什么。
我们从ServiceBean
讲起,先看一下继承结构:
可见它实现了许多的spring的接口,其中有一个
ApplicationListener
,熟悉spring的话可以知道这是一个事件监听方法,当spring发出事件的时候执行这个方法。在ServiceBean
中,监听的是ContextRefreshedEvent
事件。
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
可以看到这里有个export
方法,要执行此方法需要三个条件:
-
isDelay()
: 这个要注意是立即导出的意思,不是需要延迟,通过实现过程就可以发现。
private boolean isDelay() {
Integer delay = getDelay();
ProviderConfig provider = getProvider();
if (delay == null && provider != null) {
delay = provider.getDelay();
} // 获取到provider的 delay配置
return supportedApplicationListener && (delay == null || delay == -1);
}
看(delay == null || delay == -1)
就知道只有delay无效的时候才会返回true,所以这个方法好像写反了。
-
!isExported()
: 返回ServiceConfig
中exported
,通过查询赋值过程可以发现只有在doExport
中赋值true
。也就是说在服务暴露后的一个标记。 -
!isUnexported()
:返回ServiceConfig
中unexported
,同上可知是在服务销毁时候赋值的标记。
综上可见要执行export
方法的过程是 是否有延迟导出 && 是否已导出 && 是不是已被取消导出 来决定的。
继续往下走进入export
:
public synchronized void export() {
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
if (delay != null && delay > 0) {
delayExportExecutor.schedule(new Runnable() {
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
处理了是否需要延迟暴露,需要的话放入线程池中稍后执行doExport
;否则立即执行。
doExport
:
protected synchronized void doExport() {
// 执行过取消暴露的话报错
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
// 已经暴露过就不继续了
if (exported) {
return;
}
exported = true;
// 检查服务名称合法性
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
// 检测provider是否为空,空的话新建并且根据系统变量进行初始化。
checkDefault();
// 从其他配置中获取配置,设置核心配置类对象
if (provider != null) {
if (application == null) {
application = provider.getApplication();
}
if (module == null) {
module = provider.getModule();
}
if (registries == null) {
registries = provider.getRegistries();
}
if (monitor == null) {
monitor = provider.getMonitor();
}
if (protocols == null) {
protocols = provider.getProtocols();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
// 设置泛化服务类型
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
// 获取对应的服务类信息
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
// 对 interfaceClass,以及 <dubbo:method> 标签中的必要字段进行检查
checkInterfaceAndMethods(interfaceClass, methods);
// 对 ref 合法性进行检测
checkRef();
generic = Boolean.FALSE.toString();
}
if (local != null) {
if ("true".equals(local)) {
local = interfaceName + "Local";
}
Class<?> localClass;
try {
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if (stub != null) {
if ("true".equals(stub)) {
stub = interfaceName + "Stub";
}
Class<?> stubClass;
try {
stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if (!interfaceClass.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
// 检查各模块配置
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
// 导出服务
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
总结一下在doExport
中做的事情:
- 检测当前服务状态,在未暴露,未取消暴露的情况下执行。
- 检测服务名称的合法性
- 建立Provider,初始化
- 从当前配置中获取核心配置并赋值。
- 检查是否是泛化调用,非泛华调用的进行标签配置与服务类型检查。
- 本地存根的配置与检查。
- 对配置类进行最后的检查。
- 执行暴露
先不管最后两行代码,留个问题,先来看看doExportUrls
中做了什么事情。
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
先获取了注册中心的url集合,再遍历protocols进行暴露服务。
loadRegistries
:
protected List<URL> loadRegistries(boolean provider) {
checkRegistry();
List<URL> registryList = new ArrayList<URL>();
if (registries != null && registries.size() > 0) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (address == null || address.length() == 0) {
address = Constants.ANYHOST_VALUE;
}
String sysaddress = System.getProperty("dubbo.registry.address");
if (sysaddress != null && sysaddress.length() > 0) {
address = sysaddress;
}
if (address != null && address.length() > 0
&& !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
map.put("dubbo", Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
if (!map.containsKey("protocol")) {
if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {
map.put("protocol", "remote");
} else {
map.put("protocol", "dubbo");
}
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
1.先从配置,环境变量获取注册中心的有效地址。
2.获取到地址后,获取应用与注册中心的配置信息,再设置
-
path
注册中心服务路径, -
dubbo
dubbo版本, -
timestamp
启动时间戳, -
pid
应用进程id, -
protocol
协议信息 remote/dubbo
3.通过UrlUtils.parseURLs
组装成Url,这里的address可以是多个,对应多个url,用|
或;
分割。
4.设置注册中心的参数,协议等信息,检查有效性并加入结果中。
现在拿到了所有的注册中心与协议信息,可以对协议与注册中心的信息进行组装了。
doExportUrlsFor1Protocol
:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// ......
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// export service
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
这里主要是进行一些配置参数的组装,对泛化与非泛化服务调用的参数处理等,最后组装成com.alibaba.dubbo.common.URL
来持有服务的信息。太过于细节的地方现在先不看,上面省略了methods配置的处理,下面简单来看一下做了什么事。
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
这里要对应文档看一下功能,处理的是这个标签:
<dubbo:reference interface="com.xxx.XxxService">
<dubbo:method name="findXxx" timeout="3000" retries="2">
<dubbo:argument index="0" callback="true" />
</dubbo:method>
</dubbo:reference>
对每一个method
标签处理,对每一个参数进行设置,对retry
与retries
进行处理,统一用retries
表示,不重试的retries = 0。
然后是一个很深的嵌套循环,其实不难看懂,两层for循环for (ArgumentConfig argument : arguments) {
和for (int i = 0; i < methods.length; i++) {
主要是为了把标签的method与服务类反射获得的方法进行匹配,来执行对callback的处理。
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
匹配到了方法中的参数,并且类型相同的化,就把callback的配置放入map中。
如果没有设置参数的index,那就把所有的参数都配置上callback的配置。
继续看
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
加载对应协议的ConfiguratorFactory,设置参数
下面终于到了暴露服务的核心了
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { // @1
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // @2
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
首先获取到scope
参数,如果是none
就什么也不做,接下来看到两个if代码块
@1本地服务暴露,@2远程服务暴露
这个配置的作用就是指定暴露的位置,默认暴露本地与远程,不是远程就是本地,不是本地就是远程。
@1中的exportLocal
单独用一节来讲。
@2中有注册中心时,为每一个注册中心加载监听参数,最后生成invoker并导出。没有注册中心时直接导出。
在本地暴露或是远程暴露时,都能看到时先用proxyFactory
创建一个invoker,再使用protocol.export
进行导出,那我们先看一看invoker时怎么创建的。
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
ProxyFactory
的默认实现为JavassistProxyFactory
,简单地看一下里面是什么样的:
/**
* JavaassistRpcProxyFactory
*/
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
在getInvoker
方法中,先用Wrapper
为对应的目标类创建一个wrapper,再返回一个匿名的Invoker,重写了doInvoke
方法,将调用交给了wrapper.invokeMethod处理。
这里先简单介绍下Wrapper.getWrapper
方法,就是先从缓存中获取目标类的wrapper对象,没有的话,通过字符拼接的方式组装方法,通过Dubbo的ClassGenerator
来生成一个目标类的Wrapper对象。细节方面在后面放一节来讲,需要一下javassist知识。
Invoker的创建过程大概知道了,下面看protocol.export
方法,通过SPI可以发现对应多种实现,在调试过程中,不能获取到protocol的具体引用,是一个这个东西:
咱也不知道这是干啥的,这个需要了解dubbo spi的机制。关于spi的知识会在后面介绍,这里需要知道在调用InjvmProtocol或者其他的Protocol的时候,需要先经过
QosProtocolWrapper
,ProtocolFilterWrapper
,ProtocolListenerWrapper
的export
方法。先看一下本地暴露
exportLocal
:
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
首先是判断url中的protocol
参数是否设置了injvm
,设置过的话表示已经暴露过,跳过。未设置的话进入暴露流程。
为本地暴露组装local的URL,加上protocol = injvm, host = 127.0.0.1, port = 0参数。
根据ref创建一个本地暴露用的invoker,通过protocol.export
暴露,并加入到缓存中。
暴露的过程在InjvmProtocol
中:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
在InjvmProtocol
维护了一个InjvmExporter
的缓存,暴露工作实际是就是创建一个InjvmExporter
对象。本地暴露就是这么简单,继续分析远程暴露。
远程注册中心暴露是通过RegistryProtocol
执行的,比本地复杂的多:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker @1
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker); // @2
//registry provider
final Registry registry = getRegistry(originInvoker); // @3
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); // @4
if (register) { // @5
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // @6
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); //@7
}
@1通过doLocalExport
对服务进行导出。
@2获取注册中心的URL。
@3获取注册中心,并拿到provider的URL,在ProviderConsumerRegTable注册表中进行provider的注册。
@5向注册中心注册服务,并设置provider的注册标记。
@6创建override的监听,目前先不看是做什么的。
@7返回一个DestroyableExporter
除了@7其他的都比较复杂,我们来一个一个分析下去。
先来看一下第一步的doLocalExport
做了什么
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
使用双重检查锁来获得缓存中的exporter。如果不存在,通过protocol.export
创建。之前讲过在这个方法前会有三个装饰器wrapper先执行,假设我们使用的是Dubbo的Protocol,那么来看一下DubboProtocol#export
方法:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
开头是创建DubboExporter
的过程,中间是关于本地存根相关的处理,这里不做分析。看一下最后两个方法openServer
和optimizeSerialization
是分析的重点。
openServer
:
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
key是url中的进程地址与端口号,每一个key维护一个ExchangeServer
,没有时进行创建createServer
,有的话进行重置server.reset(url)
。
下面分两个分支,先讲一下创建createServer
的过程:
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @1
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @2
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); // @3
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY); // @4
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
在前二行@1中设置了两个参数:
-
channel.readonly.sent
表示server关闭时是否发送只读事件 -
heartbeat
心跳时间默认1分钟。
@2中获取到server的类型,并检查是不是所支持的netty
,mina
,grizzly
。
@3设置编码解码器参数并创建server。
@4检查client
参数是不是支持的。
主要步骤是检测server参数,创建server,检测client参数。
继续来看Exchangers.bind
:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
看看getExchanger
做的事情:
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
获取到了HeaderExchanger
实例,看看bind方法:
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
包含了4个逻辑,3个是构造方法,一个是Transporters.bind
,我们主要关心下bind
方法:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
用默认的NettyTransporter
,bind方法
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
直接创建了一个NettyServer
,继续看它的构造函数:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
super:
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
首先获取到ip,port等信息赋默认值,然后执行doOpen
方法。doOpen
是一个模板方法,由子类实现。继续看看NettyServer
的doOpen
:
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
创建boss与worker的线程池,创建Netty的ServerBootstrap
,设置 PipelineFactory,绑定ip与端口。
以上就是一个server的创建过程,后面的都是netty相关的源码了。
总结一下这个server的创建过程,dubbo将netty的创建给封装在NettyServer
中,通过spi获取到相关的Transpoter,获取到创建server的相关参数来创建server。
第二个分支server.reset
,具体的调用路径不贴了,主要看下AbstractServer
的reset
方法:
@Override
public void reset(URL url) {
if (url == null) {
return;
}
try {
if (url.hasParameter(Constants.ACCEPTS_KEY)) {
int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
if (a > 0) {
this.accepts = a;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
if (t > 0) {
this.idleTimeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.THREADS_KEY)
&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(Constants.THREADS_KEY, 0);
int max = threadPoolExecutor.getMaximumPoolSize();
int core = threadPoolExecutor.getCorePoolSize();
if (threads > 0 && (threads != max || threads != core)) {
if (threads < core) {
threadPoolExecutor.setCorePoolSize(threads);
if (core == max) {
threadPoolExecutor.setMaximumPoolSize(threads);
}
} else {
threadPoolExecutor.setMaximumPoolSize(threads);
if (core == max) {
threadPoolExecutor.setCorePoolSize(threads);
}
}
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
super.setUrl(getUrl().addParameters(url.getParameters()));
}
设置一些参数 最大可连接的客户端数量accepts
,空闲超时时间idleTimeout
,线程数threads
。
在设置threads
的时候,会把threads
的值与当前线程池比较。
- 如果
threads
比核心线程数小,那么减小核心线程数。 - 如果
threads
比核心线程数大,那么增大最大线程数。
这里这么设置我想是为了不影响原来正在执行的线程吧,同时节省资源,留个疑问???
最后一行是对url的一个重置,把当前的持有的url与暴露出来的url参数进行合并,放入AbstractPeer
里面的url。
protected void setUrl(URL url) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
this.url = url;
}
AbstractPeer.url
是由volatile修饰的,通过调试发现,这个url的基本信息path
,host
等都是不变的,变的只有parameters
。可见这个url在这里的作用与具体的服务无关,只是刷新参数信息,可能是与server相关的一些参数调整。当然参数调整也不是任何都会调整,回过来看AbstractServer#reset
刚才讲了三个参数的赋值,都是有条件的accepts
与idleTimeout
只能是单调递增的调整。这就是要把server的这些参数调整为暴露服务中的最大值以保证配置生效。
讲完了createServer
和server.reset
了,整个openServer
的过程大概清楚了。
然后调用optimizeSerialization(URL url)方法,将指定的序列化类加载到缓存,通常配置kryo, fst序列化,不配置,默认用hessian2序列化。
到此为止服务的远程暴露过程就结束了。回到RegistryProtocol
,知道了doLoaclExport
后来看看是如何注册的:
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
首先看一下注册表的结构:
public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {
ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);
String serviceUniqueName = providerUrl.getServiceKey();
Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
if (invokers == null) {
providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>());
invokers = providerInvokers.get(serviceUniqueName);
}
invokers.add(wrapperInvoker);
}
通过ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> 分别维护了provider与consumer的服务映射,key为服务的group/serviceName:version组装,映射了不同的invoker。invoker由ProviderInvokerWrapper
装饰,封装了invoker,注册中心url,invoker的url,provider的url。同时由isReg
来标记是否注册成功。
接下来看注册的过程,主要分析下register
方法。
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
看看AbstractRegistryFactory#getRegistry
:
@Override
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}