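Dubbo Provider Startup Sequence Diagram
Let's start with a sequence diagram that gives an overview of how a Dubbo service provider starts up.
Source Code Walkthrough
A Dubbo service is published through the ServiceConfig API. Below we follow the sequence diagram step by step. Because the whole flow is fairly long, only the core logic is analyzed here.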
- org.apache.dubbo.config.ServiceConfig#export
public synchronized void export() {
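// Check and update the configuration
checkAndUpdateSubConfigs();
// Whether the service should be exported at all; defaults to true
if (!shouldExport()) {
return;
}
// Whether the export should be delayed
if (shouldDelay()) {
delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
} else {
// Export the service
doExport();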
}
}
- org.apache.dubbo.config.ServiceConfig#doExport
protected synchronized void doExport() {
// Already unexported? unexported becomes true once unexport() has been called
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
// Already exported, or export in progress
if (exported) {
return;
}
// Mark exported=true first so the service cannot be exported twice
exported = true;
// If path is empty, fall back to interfaceName
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
// Export the service
doExportUrls();
}
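The exported field is declared volatile to guarantee visibility: every thread reads the current value from main memory rather than a stale copy in its own working memory. Together with the synchronized modifier on doExport, this prevents the same service from being exported twice.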
private transient volatile boolean exported;
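The guard described above can be sketched in isolation. This is a minimal illustration, not Dubbo code: the class ExportGuardSketch and the exportCalls counter are hypothetical names introduced only to make the effect observable.

```java
// Hypothetical stand-in for ServiceConfig; only the two guard flags are modelled.
class ExportGuardSketch {
    private volatile boolean exported;
    private volatile boolean unexported;
    private int exportCalls; // how many times the real export work ran (illustration only)

    synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("service has already been unexported");
        }
        if (exported) {
            return; // already exported, or export in progress
        }
        exported = true; // set the flag first, then do the work
        exportCalls++;   // placeholder for doExportUrls()
    }

    synchronized void unexport() {
        unexported = true;
    }

    // Lock-free read; this is where volatile visibility matters.
    boolean isExported() {
        return exported;
    }

    synchronized int exportCalls() {
        return exportCalls;
    }

    public static void main(String[] args) {
        ExportGuardSketch guard = new ExportGuardSketch();
        guard.doExport();
        guard.doExport(); // second call returns early
        System.out.println(guard.exportCalls()); // prints 1
    }
}
```

The synchronized keyword serializes concurrent callers, while volatile lets code that reads the flag without taking the lock still see the latest value.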
- org.apache.dubbo.config.ServiceConfig#doExportUrls
private void doExportUrls() {
// Load the RegistryConfig entries in registries (plus system properties) and assemble the list of registryURLs
List<URL> registryURLs = loadRegistries(true);
// Iterate over the protocol configurations (ProtocolConfig)
for (ProtocolConfig protocolConfig : protocols) {
// Build pathKey: group/contextPath/path:version
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
// Build the ProviderModel
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
// Cache providerModel in providedServices: pathKey => providerModel
ApplicationModel.initProviderModel(pathKey, providerModel);
// For each ProtocolConfig, assemble the providerURL and register it with every registry
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
- registryURL
This URL uses the registry protocol. 127.0.0.1:2181 is the address of the registry, the service interface is RegistryService, and the registry in use is ZooKeeper.
- org.apache.dubbo.config.AbstractInterfaceConfig#loadRegistries
Loads the registry configurations and converts them into registryURLs. Precedence: system properties > Dubbo registry configuration.
protected List<URL> loadRegistries(boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
if (CollectionUtils.isNotEmpty(registries)) {
// Iterate over registries and convert each registry configuration into a registryURL
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
address = Constants.ANYHOST_VALUE;
}
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
// Load ApplicationConfig -> RegistryConfig -> system properties; priority goes from low to high (later entries override earlier ones)
appendParameters(map, application);
appendParameters(map, config);
map.put(Constants.PATH_KEY, RegistryService.class.getName());
appendRuntimeParameters(map);
if (!map.containsKey(Constants.PROTOCOL_KEY)) {
map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL);
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
// Build the registryURL
url = URLBuilder.from(url)
// Store the actual registry protocol in the registryURL under key="registry"
.addParameter(Constants.REGISTRY_KEY, url.getProtocol())
.setProtocol(Constants.REGISTRY_PROTOCOL)
.build();
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
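The protocol swap performed in the loop above, and the reverse swap that RegistryProtocol#getRegistryUrl performs later, can be sketched with plain strings. This is a simplified illustration under stated assumptions: RegistryUrlSketch and its two methods are hypothetical helpers, URLs are treated as raw strings instead of Dubbo URL objects, and the "dubbo" fallback is assumed to match the DEFAULT_DIRECTORY constant used in the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical string-based model of the registry:// <-> zookeeper:// conversion.
class RegistryUrlSketch {

    // zookeeper://host:port/path  ->  registry://host:port/path?registry=zookeeper
    static String toRegistryUrl(String address) {
        int idx = address.indexOf("://");
        String protocol = address.substring(0, idx);
        String rest = address.substring(idx + 3);
        String separator = rest.contains("?") ? "&" : "?";
        return "registry://" + rest + separator + "registry=" + protocol;
    }

    // registry://host:port/path?registry=zookeeper  ->  zookeeper://host:port/path
    static String toConcreteUrl(String registryUrl) {
        String prefix = "registry://";
        if (!registryUrl.startsWith(prefix)) {
            return registryUrl;
        }
        String rest = registryUrl.substring(prefix.length());
        int q = rest.indexOf('?');
        String location = q < 0 ? rest : rest.substring(0, q);
        String query = q < 0 ? "" : rest.substring(q + 1);
        String protocol = "dubbo"; // assumed fallback when no registry parameter is present
        List<String> kept = new ArrayList<>();
        for (String pair : query.split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            if (pair.startsWith("registry=")) {
                protocol = pair.substring("registry=".length());
            } else {
                kept.add(pair);
            }
        }
        String suffix = kept.isEmpty() ? "" : "?" + String.join("&", kept);
        return protocol + "://" + location + suffix;
    }

    public static void main(String[] args) {
        String registryUrl = toRegistryUrl("zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService");
        System.out.println(registryUrl);
        System.out.println(toConcreteUrl(registryUrl));
    }
}
```

Keeping the real registry type as a parameter is what lets the adaptive Protocol pick RegistryProtocol first and still find the ZooKeeper registry afterwards.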
- org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
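Assembles the providerUrl (the service provider URL) from the configuration and some additional information. The URL is the carrier of Dubbo configuration: it is what lets configuration travel between Dubbo's modules.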
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//------------------------ start: load the various configurations into map as providerUrl parameters -----------------------
// Get the protocol name
String name = protocolConfig.getName();
// Default to the dubbo protocol if none is set
if (StringUtils.isEmpty(name)) {
name = Constants.DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
// Set side=provider
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
// Load the runtime parameters into map
appendRuntimeParameters(map);
// Load the parameters of ApplicationConfig, ModuleConfig, ProviderConfig, ProtocolConfig and ServiceConfig into map
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// methods is not empty
if (CollectionUtils.isNotEmpty(methods)) {
// Iterate over the MethodConfig entries in methods and load their parameters into map
for (MethodConfig method : methods) {
// Load the parameters configured on the method; key = methodName + "." + parameterName
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
// If arguments is not empty, load the ArgumentConfig parameters into map
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// Generic export? If so, set generic and methods=*; otherwise collect all methods of the service interface
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// Not a generic export: key "methods", value = all method names of the service interface
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// Is a token configured?
if (!ConfigUtils.isEmpty(token)) {
// If token is "true" or "default", use a random UUID as the token
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {// Otherwise use the token supplied by the user
map.put(Constants.TOKEN_KEY, token);
}
}
// export service
// Host the service is exported on
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
// Port the service is exported on
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// ----------------------- end: all configuration has now been loaded into map ------------------------------
// Assemble name, host, port, path and map into the providerUrl
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// Let the Configurator instance adjust the providerUrl
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
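//------------------------- choose the export scope ---------------------------------
// none: do not export
// local: export the local service only
// remote: export the remote service only
// null: export both the local and the remote service
// Read the export scope
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
// scope "none" means the service is not exported at all; any other value exports it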
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// If scope is not "remote", export the local service
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// If scope is not "local", export the remote service
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// If registries are configured, register the service with each of them
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
// If the registryURL carries a dynamic setting, copy it to the providerUrl when absent
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// Load the monitor configuration from system properties and build monitorUrl
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// Add monitorUrl to the providerUrl
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// On the provider side, users may specify which ProxyFactory extension generates the Invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
// If the providerUrl carries a proxy setting, copy it to the registryURL
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// ProxyFactory$Adaptive picks the extension implementation named by the proxy parameter of the registryURL; the default is JavassistProxyFactory
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// Protocol$Adaptive picks the Protocol extension named by wrapperInvoker#url. This is a remote export and the protocol is registry, so RegistryProtocol is chosen to export wrapperInvoker as a remote service
Exporter<?> exporter = protocol.export(wrapperInvoker);
// Cache the exported remote service (exporter)
exporters.add(exporter);
}
} else {
// No registry configured: export directly (direct-connection mode)
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
// Store the metadata
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
}
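The scope branching in doExportUrlsFor1Protocol boils down to two independent negative checks. The following sketch restates that decision as a pure function; ExportScopeSketch and targets are hypothetical names, and the string results only label which export branch would run.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the scope checks without touching Dubbo classes.
class ExportScopeSketch {

    // Returns which exports would run for a given scope value (null means "not configured").
    static List<String> targets(String scope) {
        List<String> result = new ArrayList<>();
        if ("none".equalsIgnoreCase(scope)) {
            return result; // nothing is exported
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            result.add("local"); // exportLocal(url)
        }
        if (!"local".equalsIgnoreCase(scope)) {
            result.add("remote"); // registry or direct export
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(null));     // [local, remote]
        System.out.println(targets("local"));  // [local]
        System.out.println(targets("remote")); // [remote]
        System.out.println(targets("none"));   // []
    }
}
```

Because both checks are phrased as "not the other scope", an unset scope falls through both and exports locally and remotely.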
- providerUrl
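This URL is a service provider URL using the dubbo protocol (DubboProtocol). The address is the IP of the current server, the port is the port the service is exported on (configurable through dubbo:protocol), and the service interface is the interface published through dubbo:service.
- Exporting the local service
(1). org.apache.dubbo.config.ServiceConfig#exportLocal
Exports the service locally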
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
// Build a URL that uses the injvm protocol
URL local = URLBuilder.from(url)
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
// Export the Invoker through InjvmProtocol, which yields an InjvmExporter
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
// Cache it in exporters
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
(2). org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol#export
Builds an InjvmExporter
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
(3). org.apache.dubbo.rpc.protocol.injvm.InjvmExporter#InjvmExporter
The InjvmExporter constructor, which records the InjvmExporter in AbstractProtocol#exporterMap
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
// The constructor stores the exporter itself in AbstractProtocol#exporterMap as serviceKey => this
exporterMap.put(key, this);
}
- org.apache.dubbo.rpc.ProxyFactory#getInvoker
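Main purpose: Dubbo SPI generates an adaptive ProxyFactory, which selects the configured proxy factory to wrap the service implementation in an Invoker. When Invoker#invoke is called, what actually runs is the invokeMethod function of the Wrapper class generated from the service implementation class.
If the user does not specify proxy, JavassistProxyFactory is used by default. Let's look at JavassistProxyFactory#getInvoker.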
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// Turn the service implementation class into a Wrapper class
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// Wrap the service implementation in an Invoker that delegates to the Wrapper
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
- org.apache.dubbo.common.bytecode.Wrapper#getWrapper
Wraps the service implementation class and generates a Wrapper instance.
Here is what the generated invokeMethod method looks like:
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
com.books.dubbo.demo.api.GreetingService w;
try {
w = ((com.books.dubbo.demo.api.GreetingService) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("testGeneric".equals($2) && $3.length == 1) {
return ($w) w.testGeneric((com.books.dubbo.demo.api.PoJo) $4[0]);
}
if ("sayHello".equals($2) && $3.length == 1) {
return ($w) w.sayHello((java.lang.String) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.books.dubbo.demo.api.GreetingService.");
}
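It is easy to see that invokeMethod simply calls the method of the service implementation (proxy) that matches methodName. The call is a direct invocation in generated code, which avoids reflection.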
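To make the idea concrete, here is a hand-written equivalent of such a generated dispatcher. It is only a sketch: GreetingService, GreetingServiceImpl and WrapperDispatchSketch are hypothetical stand-ins, the real Wrapper is produced at runtime by Javassist, and an unchecked IllegalArgumentException replaces Dubbo's own NoSuchMethodException to keep the example short.

```java
// Hypothetical hand-written version of what Wrapper generates as bytecode.
class WrapperDispatchSketch {

    interface GreetingService {
        String sayHello(String name);
    }

    static class GreetingServiceImpl implements GreetingService {
        @Override
        public String sayHello(String name) {
            return "Hello " + name;
        }
    }

    // Dispatches by method name with a plain virtual call; no java.lang.reflect involved.
    static Object invokeMethod(Object target, String methodName, Class<?>[] types, Object[] args) {
        GreetingService service = (GreetingService) target;
        if ("sayHello".equals(methodName) && types.length == 1) {
            return service.sayHello((String) args[0]);
        }
        throw new IllegalArgumentException("Not found method \"" + methodName + "\"");
    }

    public static void main(String[] args) {
        Object result = invokeMethod(new GreetingServiceImpl(), "sayHello",
                new Class<?>[]{String.class}, new Object[]{"dubbo"});
        System.out.println(result); // prints Hello dubbo
    }
}
```

The string comparison plus cast is the whole cost of a call, which is why this approach is cheaper than looking up and invoking a java.lang.reflect.Method each time.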
- org.apache.dubbo.rpc.Protocol#export
Sequence diagram step 8
Exports the service for remote invocation
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
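Protocol$Adaptive selects the extension implementation named by the protocol of Invoker#url and uses it to export the Invoker as a remote service. Because wrapperInvoker#url is the registryURL and its protocol is registry, the Protocol extension chosen here is RegistryProtocol.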
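The selection logic of the adaptive class can be reduced to a lookup by protocol name. The sketch below assumes a plain map in place of ExtensionLoader and string URLs in place of Dubbo's URL type; AdaptiveProtocolSketch and its nested Protocol interface are hypothetical names for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of Protocol$Adaptive: choose the implementation from the URL's protocol.
class AdaptiveProtocolSketch {

    interface Protocol {
        String export(String url);
    }

    // Stand-in for ExtensionLoader's name => implementation table.
    static final Map<String, Protocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url);
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url);
        EXTENSIONS.put("injvm", url -> "InjvmProtocol exported " + url);
    }

    static String protocolOf(String url) {
        int idx = url.indexOf("://");
        return idx < 0 ? null : url.substring(0, idx);
    }

    static String export(String url) {
        String protocol = protocolOf(url);
        String extName = protocol == null ? "dubbo" : protocol; // same default as the generated code
        Protocol extension = EXTENSIONS.get(extName);
        if (extension == null) {
            throw new IllegalStateException("No such extension: " + extName);
        }
        return extension.export(url);
    }

    public static void main(String[] args) {
        System.out.println(export("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"));
        System.out.println(export("dubbo://192.168.0.1:20880/com.example.GreetingService"));
    }
}
```

The same Invoker-handling code can therefore reach RegistryProtocol, DubboProtocol or InjvmProtocol depending only on the URL it carries.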
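- Protocol wrappers enhance the extension implementation
Sequence diagram step 9
Only ProtocolFilterWrapper is shown here. ProtocolFilterWrapper wraps the Invoker in a chain of Filters (chain of responsibility) before handing it to the underlying extension implementation.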
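A filter chain of this kind is usually built back to front, so that the first filter ends up outermost. The following is a minimal sketch of that construction, assuming simplified Invoker and Filter interfaces that pass a string instead of an Invocation; FilterChainSketch, buildInvokerChain and tagging are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of wrapping an Invoker in a chain of Filters.
class FilterChainSketch {

    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Walk the filters in reverse so filters.get(0) becomes the outermost link.
    static Invoker buildInvokerChain(Invoker invoker, List<Filter> filters) {
        Invoker last = invoker;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // A filter that appends its name to the request so the call order becomes visible.
    static Filter tagging(String name) {
        return (next, request) -> next.invoke(request + ">" + name);
    }

    public static void main(String[] args) {
        Invoker service = request -> request + ">service";
        Invoker chain = buildInvokerChain(service, Arrays.asList(tagging("echo"), tagging("token")));
        System.out.println(chain.invoke("call")); // prints call>echo>token>service
    }
}
```

The caller only ever sees an Invoker, so filters can be added or removed without changing the code that exports or invokes the service.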
- org.apache.dubbo.registry.integration.RegistryProtocol#export
Sequence diagram step 10
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// Replace the protocol of originInvoker#url: with ZooKeeper configured as the registry, zookeeper becomes the protocol, i.e. registry:// turns into something like zookeeper://
URL registryUrl = getRegistryUrl(originInvoker);
// Extract the providerURL stored in originInvoker#url
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
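/**
* Build overrideSubscribeUrl, which is used for listening.
* It is derived from providerUrl: the protocol becomes provider:// and the parameters category=configurators&check=false are added.
* overrideSubscribeUrl is used to listen for dynamic configuration, i.e. to watch the service's dynamic configuration directory (configurators).
* Each overrideSubscribeUrl maps to one OverrideListener, which receives the change events for that overrideSubscribeUrl.
*
*/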
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
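// Export the invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// Get the Registry instance based on the Invoker's URL
final Registry registry = getRegistry(originInvoker);
// Build the URL that will be registered with the registry, filtering out some parameters
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// Record originInvoker in the local provider table (ProviderConsumerRegTable)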
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
// Whether the providerURL should be registered with the registry
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// Register the providerURL with the registry
register(registryUrl, registeredProviderUrl);
// Mark the providerInvokerWrapper that was just added to the local table as registered
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
// Subscribe to the override (configurators) directory
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
- org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
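Sequence diagram step 11
At this point the protocol of invokerDelegete#url is dubbo, so the Protocol extension in use is DubboProtocol. DubboProtocol is likewise wrapped and enhanced by the corresponding wrapper classes.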
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
});
}
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
Sequence diagram step 12
Converts the Invoker into an Exporter and starts the server that listens for requests
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// Get the providerURL
URL url = invoker.getUrl();
// export service.
// Build serviceKey: group/path:version:port; serviceKey uniquely identifies the service
String key = serviceKey(url);
// Build the DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// Cache the DubboExporter in exporterMap; the provider looks it up there when it handles a request
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// Start the server-side listener
openServer(url);
optimizeSerialization(url);
return exporter;
}
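The key used for exporterMap can be illustrated with a small helper. This sketch assumes the layout group/path:version:port, with an empty group omitted and an empty or "0.0.0" version omitted, which is how the key is built as far as can be told from Dubbo 2.7; ServiceKeySketch is a hypothetical class and not the library's own utility.

```java
// Hypothetical re-implementation of the service key layout, for illustration only.
class ServiceKeySketch {

    static String serviceKey(int port, String serviceName, String version, String group) {
        StringBuilder buf = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            buf.append(group).append('/');
        }
        buf.append(serviceName);
        // Assumption: an unset version or the placeholder 0.0.0 is left out of the key.
        if (version != null && !version.isEmpty() && !"0.0.0".equals(version)) {
            buf.append(':').append(version);
        }
        buf.append(':').append(port);
        return buf.toString();
    }

    public static void main(String[] args) {
        System.out.println(serviceKey(20880, "com.example.GreetingService", "1.0.0", "g1"));
        System.out.println(serviceKey(20880, "com.example.GreetingService", null, null));
    }
}
```

Including the port in the key is what allows the same interface to be exported on several ports of one JVM without the exporters overwriting each other.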
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
Sequence diagram step 12
Starts the server listener
private void openServer(URL url) {
// Address of the provider machine, ip:port
String key = url.getAddress();
// Only the server (provider) side starts a listener
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
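// Look for a cached ExchangeServer first
ExchangeServer server = serverMap.get(key);
if (server == null) {// double check
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
// Nothing cached: create the server and cache it in serverMap with url.getAddress() as the key
// ip:port is unique per machine, so when several services start on the same address only the first one creates the listener; later ones find the cached server and reset it with their own url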
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
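The caching pattern in openServer is double-checked locking over a map keyed by ip:port. Below is a minimal sketch of the same control flow, assuming a trivial Server class in place of ExchangeServer; ServerCacheSketch, created and resets are hypothetical names added to make the behavior observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for DubboProtocol's server cache.
class ServerCacheSketch {

    static class Server {
        final AtomicInteger resets = new AtomicInteger();

        void reset() {
            resets.incrementAndGet();
        }
    }

    private final Map<String, Server> serverMap = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger();

    void openServer(String address) {
        Server server = serverMap.get(address); // first check, without the lock
        if (server == null) {
            synchronized (this) {
                server = serverMap.get(address); // second check, under the lock
                if (server == null) {
                    created.incrementAndGet();
                    serverMap.put(address, new Server());
                }
            }
        } else {
            server.reset(); // an existing server is reconfigured, not recreated
        }
    }

    Server serverAt(String address) {
        return serverMap.get(address);
    }

    public static void main(String[] args) {
        ServerCacheSketch protocol = new ServerCacheSketch();
        protocol.openServer("10.0.0.1:20880");
        protocol.openServer("10.0.0.1:20880");
        System.out.println(protocol.created.get()); // prints 1
    }
}
```

Exporting ten services on one port therefore binds the port once and calls reset nine times.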
- org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
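Sequence diagram step 16
Sets default parameters, verifies that the server and client parameters of the providerURL have a matching Transporter SPI implementation, creates the Server and starts listening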
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// Send a readonly event when the server shuts down
.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// Add the heartbeat parameter to providerUrl if absent: the heartbeat interval, 60 * 1000 ms (60 s) by default
.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT))
// Set the codec parameter on providerUrl (the encoding/decoding protocol); the dubbo protocol uses the dubbo codec
.addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
.build();
// Read the server parameter of providerUrl; the default is netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// Verify that a Transporter SPI extension named str exists; throw an exception otherwise
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// Create the ExchangeServer with org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler as the request handler
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// Read the client parameter of providerUrl
str = url.getParameter(Constants.CLIENT_KEY);
// Verify that a Transporter SPI extension named str exists; throw an exception otherwise
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
- org.apache.dubbo.remoting.exchange.Exchangers#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.exchange.ExchangeHandler)
Sequence diagram step 17
Validates the arguments and creates the ExchangeServer
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// If the providerURL has no codec parameter, fall back to the exchange codec
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
Getting the Exchanger SPI implementation
Sequence diagram steps 19 and 20
public static Exchanger getExchanger(URL url) {
// Read the exchanger parameter of the providerURL; the default value is header
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
// Get the Exchanger SPI extension implementation; HeaderExchanger by default
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
- org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
Sequence diagram step 21
Create the request handler -> create the listening server -> wrap it in a HeaderExchangeServer
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
- org.apache.dubbo.remoting.Transporters#bind(org.apache.dubbo.common.URL, org.apache.dubbo.remoting.ChannelHandler...)
Sequence diagram step 22
Validates the arguments and creates the Server
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
// With a single request handler, use that handler directly to create the Server
if (handlers.length == 1) {
handler = handlers[0];
} else {// Otherwise wrap them in a ChannelHandlerDispatcher and use it as the Server's request handler; ChannelHandlerDispatcher is essentially the composite pattern applied to ChannelHandler
handler = new ChannelHandlerDispatcher(handlers);
}
// Create the Server from the providerURL and the handler
return getTransporter().bind(url, handler);
}
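The handler selection in Transporters#bind is a small instance of the composite pattern: one handler is used as is, several are hidden behind a dispatcher that implements the same interface. The sketch below assumes a one-method ChannelHandler interface; HandlerDispatcherSketch, Dispatcher and select are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, reduced model of ChannelHandler and ChannelHandlerDispatcher.
class HandlerDispatcherSketch {

    interface ChannelHandler {
        void received(String message);
    }

    // Composite: looks like a single handler, forwards to all of its children in order.
    static class Dispatcher implements ChannelHandler {
        private final List<ChannelHandler> handlers;

        Dispatcher(ChannelHandler... handlers) {
            this.handlers = new ArrayList<>(Arrays.asList(handlers));
        }

        @Override
        public void received(String message) {
            for (ChannelHandler handler : handlers) {
                handler.received(message);
            }
        }
    }

    // Mirrors the choice made before the Server is created.
    static ChannelHandler select(ChannelHandler... handlers) {
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        return handlers.length == 1 ? handlers[0] : new Dispatcher(handlers);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ChannelHandler combined = select(m -> log.add("a:" + m), m -> log.add("b:" + m));
        combined.received("ping");
        System.out.println(log); // prints [a:ping, b:ping]
    }
}
```

The Server only ever needs to know about one ChannelHandler, regardless of how many were passed in.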
Gets the adaptive Transporter instance. In 2.7, netty4 is the default Transporter SPI implementation; 2.6 used netty3.
Sequence diagram steps 23, 24 and 25
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
- org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind
Sequence diagram step 26
Creates the NettyServer
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
- org.apache.dubbo.remoting.transport.netty4.NettyServer#NettyServer
Sequence diagram step 27
ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME) sets the threadname parameter of the providerURL.
The handler is wrapped and enhanced further, then the NettyServer is created.
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
- org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
// Wrap and enhance the handler
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
// Select the threading model; the default threading model (Dispatcher) is AllDispatcher
/**
* Handler chain: MultiMessageHandler->HeartbeatHandler->AllChannelHandler (dispatches every request to the business thread pool)->DecodeHandler->HeaderExchangeHandler->
* org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler
*/
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
- org.apache.dubbo.remoting.transport.AbstractServer#AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// Read bindIp and port from the providerURL
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = Constants.ANYHOST_VALUE;
}
// Assemble bindAddress
bindAddress = new InetSocketAddress(bindIp, bindPort);
// Maximum number of accepted connections
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
// Idle timeout
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
// Call the subclass implementation to open the server
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
- org.apache.dubbo.remoting.transport.AbstractEndpoint#AbstractEndpoint
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
this.codec = getChannelCodec(url);
this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
- org.apache.dubbo.remoting.transport.AbstractPeer#AbstractPeer
public AbstractPeer(URL url, ChannelHandler handler) {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
this.url = url;
this.handler = handler;
}
- org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen
Starts the NettyServer. At this point the provider-side Server is up.
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
// NettyServer holds the handler built earlier and is itself a ChannelHandler, so the NettyServer is passed to NettyServerHandler
// Handler chain: NettyServerHandler -> NettyServer -> MultiMessageHandler -> ...
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
// Codec adapter
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// Bind the listening address
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
Once the server is listening, the next step is to register the service with the registry.
- org.apache.dubbo.registry.integration.RegistryProtocol#getRegistry
Sequence diagram step 33
Gets the registry instance based on the Invoker's URL
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
// Get the registry instance for registryUrl
return registryFactory.getRegistry(registryUrl);
}
// Convert the protocol of the registryURL into the protocol of the registry actually in use
private URL getRegistryUrl(Invoker<?> originInvoker) {
// Get the registryURL
URL registryUrl = originInvoker.getUrl();
// If the protocol of the registryURL is registry, replace it with the registry protocol stored earlier in the registryURL under key="registry".
// ZooKeeper is the registry here, so: registry:// ---> zookeeper://
if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
}
return registryUrl;
}
- org.apache.dubbo.registry.support.AbstractRegistryFactory#getRegistry
Gets the registry instance for the given registryURL
public Registry getRegistry(URL url) {
// Starting from the registryURL: set the path, add the interface parameter, remove the export and refer parameters
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY)
.build();
// Key that uniquely identifies the registry
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
// Try the cache first
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// Not cached: call the subclass implementation to create the matching registry instance
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
// Cache it locally
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
- org.apache.dubbo.registry.integration.RegistryProtocol#register
Sequence diagram step 34
Registers registeredProviderUrl with the registry identified by registryUrl
public void register(URL registryUrl, URL registeredProviderUrl) {
// Fetch the registry instance that was created earlier straight from the cache
Registry registry = registryFactory.getRegistry(registryUrl);
// Register registeredProviderUrl with the registry
registry.register(registeredProviderUrl);
}
- org.apache.dubbo.registry.support.FailbackRegistry#register
public void register(URL url) {
super.register(url);
// Remove registeredProviderUrl from failedRegistered
removeFailedRegistered(url);
// Remove registeredProviderUrl from failedUnregistered
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
// Call the subclass implementation to register registeredProviderUrl
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
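The failure handling above follows a "fail back" strategy: with check enabled the error surfaces immediately, otherwise the URL is remembered and retried later. The sketch below isolates that decision. It is an illustration under assumptions: FailbackSketch and Backend are hypothetical, retry() is a manual trigger standing in for the periodic retry task, and the check flag collapses the three conditions of the original into one boolean.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, reduced model of FailbackRegistry#register.
class FailbackSketch {

    interface Backend {
        void doRegister(String url); // may throw a RuntimeException when the registry is unreachable
    }

    private final Backend backend;
    private final boolean check;
    final Set<String> failedRegistered = ConcurrentHashMap.newKeySet();

    FailbackSketch(Backend backend, boolean check) {
        this.backend = backend;
        this.check = check;
    }

    void register(String url) {
        failedRegistered.remove(url); // a fresh attempt supersedes an earlier failure
        try {
            backend.doRegister(url);
        } catch (RuntimeException e) {
            if (check) {
                throw new IllegalStateException("Failed to register " + url, e);
            }
            failedRegistered.add(url); // remember it for a later retry
        }
    }

    // Manual stand-in for the scheduled retry; removes every URL that now succeeds.
    void retry() {
        for (String url : failedRegistered) {
            try {
                backend.doRegister(url);
                failedRegistered.remove(url);
            } catch (RuntimeException ignored) {
                // still failing, keep it for the next round
            }
        }
    }

    public static void main(String[] args) {
        boolean[] down = {true};
        FailbackSketch registry = new FailbackSketch(url -> {
            if (down[0]) throw new RuntimeException("registry down");
        }, false);
        registry.register("dubbo://10.0.0.1:20880/demo");
        down[0] = false;
        registry.retry();
        System.out.println(registry.failedRegistered.isEmpty()); // prints true
    }
}
```

With check disabled a provider can finish starting while the registry is temporarily unavailable and catch up once it returns.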
- org.apache.dubbo.registry.zookeeper.ZookeeperRegistry#doRegister
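When ZooKeeper is the registry, registering a service simply means creating a node in ZooKeeper. The second argument, taken from the dynamic parameter (true by default), controls whether the node is ephemeral.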
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
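The node path itself is not shown in the article. As a rough sketch, and assuming the commonly seen layout /dubbo/{interface}/providers/{URL-encoded provider URL}, it could be assembled like this; ZkPathSketch and its parameters are hypothetical, and the root "/dubbo" and category "providers" are assumptions rather than values taken from the code above.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper that assembles a registry node path; not the real toUrlPath.
class ZkPathSketch {

    static String toUrlPath(String root, String serviceInterface, String category, String fullUrl) {
        try {
            // The provider URL is encoded so that it forms a single path segment.
            return root + "/" + serviceInterface + "/" + category + "/" + URLEncoder.encode(fullUrl, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available on the JVM
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlPath("/dubbo", "com.example.GreetingService", "providers",
                "dubbo://192.168.0.1:20880/com.example.GreetingService?version=1.0.0"));
    }
}
```

Encoding the whole provider URL into one segment lets a consumer list the children of the providers node and decode each child back into a complete URL.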
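Summary
That completes the analysis of the Dubbo provider startup flow. To summarize:
- ApplicationConfig, ModuleConfig, RegistryConfig, ServiceConfig, ProviderConfig, MethodConfig and the runtime parameters are assembled into the registryURL and the providerURL. The URL is the carrier of Dubbo configuration: it is what lets configuration travel between Dubbo's modules.
- The service implementation class is turned into a Wrapper and then wrapped in a proxy object, the Invoker, which reduces reflective calls.
- An Exporter is built according to the protocol in the providerURL. The default dubbo protocol yields a DubboExporter.
- The listening Server is started according to the parameters in the providerURL. The default is netty4.NettyServer.
- The registry instance is obtained from the registryURL, and registeredProviderUrl is registered with it.
- The override listener is subscribed. On the provider side this watches the configurators directory.
This article is long, so if anything here is misunderstood, corrections are very welcome.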