1. Loading the Eureka Client auto-configuration
When you add the Eureka Client dependency:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
The build then downloads spring-cloud-netflix-eureka-client-2.1.0.RELEASE.jar (the exact version depends on your Spring Cloud release). The META-INF/spring.factories file in this jar registers the EurekaClientAutoConfiguration class. Its contents are:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
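Spring Boot reads this file through its SpringFactoriesLoader. As a rough illustration of how the EnableAutoConfiguration entry becomes a list of class names, the plain-Java sketch below parses the same properties-format text with java.util.Properties (which joins lines ending in a backslash) and splits the value on commas. It is a simplified stand-in, not Spring's implementation; the class name `SpringFactoriesSketch` is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: a simplified model of reading one key from a spring.factories file.
final class SpringFactoriesSketch {

    private SpringFactoriesSketch() {
    }

    /** Returns the class names listed (comma-separated) under the given key. */
    static List<String> factoryNames(String factoriesText, String key) {
        Properties props = new Properties();
        try {
            // Properties.load joins a line ending in a backslash with the following line.
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> names = new ArrayList<>();
        String value = props.getProperty(key);
        if (value == null) {
            return names;
        }
        for (String name : value.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }
}
```

Applied to the file above, the EnableAutoConfiguration key yields five class names, EurekaClientAutoConfiguration among them; Spring Boot then evaluates each class's conditions before loading its beans.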
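So when Spring Boot starts, auto-configuration loads the EurekaClientAutoConfiguration class. This class defines three core beans related to Eureka Client: EurekaDiscoveryClient, EurekaClientConfigBean, and the DiscoveryClient from the com.netflix.discovery package (created as a CloudEurekaClient). The relevant source: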
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
    EurekaClientConfigBean client = new EurekaClientConfigBean();
    if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
        client.setRegisterWithEureka(false);
    }
    return client;
}
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance,
                                 @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
    ApplicationInfoManager appManager;
    if (AopUtils.isAopProxy(manager)) {
        appManager = ProxyUtils.getTargetObject(manager);
    } else {
        appManager = manager;
    }
    CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager, config, this.optionalArgs,
            this.context);
    cloudEurekaClient.registerHealthCheck(healthCheckHandler);
    return cloudEurekaClient;
}
@Bean
public DiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) {
    return new EurekaDiscoveryClient(client, clientConfig);
}
CloudEurekaClient extends DiscoveryClient, and DiscoveryClient implements the EurekaClient interface.
2. EurekaClientConfig
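EurekaClientConfig is an interface whose implementation here is EurekaClientConfigBean, bound to the eureka.client.* properties. It holds the settings that control how the client communicates with Eureka Server: whether to register and whether to fetch the registry, the registry fetch and instance-info replication intervals, the sizes of the heartbeat and cache-refresh thread pools, timeouts, and so on. It also has a Map property, serviceUrl, that holds the Eureka Server addresses keyed by zone. The instance's own information that is registered with Eureka Server, such as hostname, port, instance ID, application name, and the lease renewal interval and expiration duration, is kept in a separate bean, EurekaInstanceConfigBean (eureka.instance.*).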
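Because serviceUrl is keyed by zone, the client has to turn it into a list of server URLs. The sketch below is modeled on the lookup that EurekaClientConfigBean performs in getEurekaServerServiceUrls(zone) as of the 2.x line: take the zone's entry, fall back to the defaultZone key, split on commas, and make every URL end with a slash. The class and method names are hypothetical, and details may differ between versions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of zone-based Eureka Server URL resolution.
final class ServiceUrlSketch {
    static final String DEFAULT_ZONE = "defaultZone";

    private ServiceUrlSketch() {
    }

    static List<String> serverUrls(Map<String, String> serviceUrl, String zone) {
        String urls = serviceUrl.get(zone);
        if (urls == null || urls.isEmpty()) {
            // No entry for this zone: fall back to the defaultZone entry.
            urls = serviceUrl.get(DEFAULT_ZONE);
        }
        List<String> result = new ArrayList<>();
        if (urls == null) {
            return result;
        }
        for (String url : urls.split(",")) {
            String trimmed = url.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            // Ensure every URL ends with '/'.
            result.add(trimmed.endsWith("/") ? trimmed : trimmed + "/");
        }
        return result;
    }
}
```

For example, with eureka.client.service-url.defaultZone set to two comma-separated peers and no entry for the client's own zone, the sketch returns both peer URLs, each with a trailing slash.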
3. EurekaDiscoveryClient
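This class is Spring Cloud's DiscoveryClient implementation for Eureka. It exposes the service information that the client has fetched from Eureka Server, through methods such as getServices() and getInstances(serviceId), and it depends on EurekaClient (in practice the com.netflix.discovery DiscoveryClient) and EurekaClientConfig. Code written against Spring Cloud's generic DiscoveryClient abstraction obtains the service list through this class; Ribbon's Eureka integration, by contrast, reads the list from EurekaClient directly through DiscoveryEnabledNIWSServerList.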
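EurekaDiscoveryClient is essentially an adapter: in Spring Cloud Netflix 2.x its getInstances(serviceId) calls eurekaClient.getInstancesByVipAddress(serviceId, false) and wraps each InstanceInfo in a generic ServiceInstance. The sketch below shows only that shape, using hypothetical stripped-down types (`Instance`, `EurekaClientLike`, `ServiceInstanceView`, `DiscoveryClientAdapter`) instead of the real interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, stripped-down stand-in for com.netflix.appinfo.InstanceInfo.
final class Instance {
    final String appName;
    final String host;
    final int port;

    Instance(String appName, String host, int port) {
        this.appName = appName;
        this.host = host;
        this.port = port;
    }
}

// Hypothetical stand-in for the one EurekaClient method the adapter needs.
interface EurekaClientLike {
    List<Instance> getInstancesByVipAddress(String vipAddress, boolean secure);
}

// Hypothetical generic view of an instance, playing the role of Spring's ServiceInstance.
final class ServiceInstanceView {
    final String serviceId;
    final String uri;

    ServiceInstanceView(String serviceId, String uri) {
        this.serviceId = serviceId;
        this.uri = uri;
    }
}

// Adapter: answers generic discovery queries by delegating to the Eureka client.
final class DiscoveryClientAdapter {
    private final EurekaClientLike eurekaClient;

    DiscoveryClientAdapter(EurekaClientLike eurekaClient) {
        this.eurekaClient = eurekaClient;
    }

    List<ServiceInstanceView> getInstances(String serviceId) {
        List<ServiceInstanceView> views = new ArrayList<>();
        // The service ID is used as the (non-secure) VIP address.
        for (Instance info : eurekaClient.getInstancesByVipAddress(serviceId, false)) {
            views.add(new ServiceInstanceView(serviceId, "http://" + info.host + ":" + info.port));
        }
        return views;
    }
}
```

Keeping the adapter thin means callers depend only on the generic abstraction, while all caching and refreshing stays inside the Eureka client.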
4. DiscoveryClient
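The DiscoveryClient class in the com.netflix.discovery package is the core of Eureka Client: registration with Eureka Server, service discovery, and lease renewal are all implemented here. Its constructor creates the scheduler and the thread pools used for lease renewal (heartbeatExecutor) and registry refresh (cacheRefreshExecutor), then sends an HTTP request to Eureka Server to fetch the service list and caches it in its localRegionApps field. Part of the source: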
private final ScheduledExecutorService scheduler;
private final ThreadPoolExecutor heartbeatExecutor;
private final ThreadPoolExecutor cacheRefreshExecutor;
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                Provider<BackupRegistry> backupRegistryProvider) {
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();
    // omitted (among other things, clientConfig and instanceInfo are assigned here)
    try {
        // Create the task-scheduling thread pool
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true).build());
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true).build()
        );  // use direct handoff
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true).build()
        );  // use direct handoff
        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);
        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }
    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }
    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register()) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }
    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    initScheduledTasks();
}
// Fetch the service list from Eureka Server and cache it locally
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            // Full fetch of the registry
            getAndStoreFullRegistry();
        } else {
            // Incremental (delta) fetch of the registry
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}
/**
 * Initializes the scheduled tasks
 */
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
        // omitted: registers a StatusChangeListener whose notify() calls
        // instanceInfoReplicator.onDemandUpdate()
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
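Both periodic jobs are wrapped in TimedSupervisorTask. Judging from the Eureka source, it submits the real work to the dedicated executor, waits at most one interval for it, and then reschedules itself: after a timeout the delay doubles, capped at interval * expBackOffBound, and after a success it resets to the interval. The sketch below models that supervision loop with java.util.concurrent only; `SupervisorSketch` and its methods are hypothetical names, and the real class also records metrics.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a self-rescheduling task with timeout-driven exponential back-off.
final class SupervisorSketch implements Runnable {
    private final ScheduledExecutorService scheduler;
    private final ExecutorService executor;
    private final Runnable task;
    private final long timeoutMillis;
    private final long maxDelayMillis;
    private final AtomicLong delay;

    SupervisorSketch(ScheduledExecutorService scheduler, ExecutorService executor, Runnable task,
                     long intervalMillis, int expBackOffBound) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.task = task;
        this.timeoutMillis = intervalMillis;
        this.maxDelayMillis = intervalMillis * expBackOffBound;
        this.delay = new AtomicLong(intervalMillis);
    }

    /** Delay to use after a timeout: double the current delay, capped at the maximum. */
    static long backOff(long currentDelay, long maxDelay) {
        return Math.min(maxDelay, currentDelay * 2);
    }

    long currentDelayMillis() {
        return delay.get();
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            future.get(timeoutMillis, TimeUnit.MILLISECONDS); // wait at most one interval
            delay.set(timeoutMillis);                         // success: back to the base interval
        } catch (TimeoutException e) {
            long current = delay.get();
            delay.compareAndSet(current, backOff(current, maxDelayMillis));
        } catch (RejectedExecutionException e) {
            // Executor saturated (e.g. direct handoff with all threads busy): retry next round.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // The task itself failed: keep the current delay and retry next round.
        } finally {
            if (future != null) {
                future.cancel(true); // stop a task that overran its time budget
            }
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}
```

Rescheduling from inside run(), instead of using scheduleAtFixedRate, is what lets each round choose its own delay; a slow Eureka Server therefore sees fewer requests until it recovers.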
The core methods of this class are:
- getAndStoreFullRegistry()
- register()
- renew()
- unregister()
- shutdown()
- refreshRegistry()
- refreshInstanceInfo()
- getAndUpdateDelta()
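5. Service registration - InstanceInfoReplicator
InstanceInfoReplicator is a scheduled task that refreshes the local InstanceInfo every 30s by default and, if the InstanceInfo has changed, calls DiscoveryClient's register() to sync it to Eureka Server. On each run it first calls DiscoveryClient's refreshInstanceInfo() to refresh the instance's own information (hostname, lease expiration duration, lease renewal interval, and instance status). If any of these has changed, the InstanceInfo is marked dirty, and InstanceInfoReplicator then calls DiscoveryClient's register() to register the instance. The relevant source: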
public void run() {
    try {
        // Refresh the local InstanceInfo
        discoveryClient.refreshInstanceInfo();
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // The InstanceInfo changed: register with Eureka Server
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
void refreshInstanceInfo() {
    // The next two lines detect changes to the hostname/IP (data center info) and to the
    // lease renewal interval and expiration duration; the status is refreshed below.
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    applicationInfoManager.refreshLeaseInfoIfRequired();
    InstanceStatus status;
    try {
        status = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception e) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", e);
        status = InstanceStatus.DOWN;
    }
    if (null != status) {
        applicationInfoManager.setInstanceStatus(status);
    }
}
boolean register() throws Throwable {
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
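As the source shows, registration itself is simple: the client sends its InstanceInfo to Eureka Server in a REST call. Two other code paths are also involved in registration:
- If a lease renewal call returns 404, Eureka Client registers again; see section 6 below.
- The first registration does not rely on this scheduled task. It happens when EurekaAutoServiceRegistration's start() method runs: start() calls EurekaServiceRegistry's register(), which obtains the ApplicationInfoManager through EurekaRegistration's getApplicationInfoManager() and calls its setInstanceStatus(). That call publishes a StatusChangeEvent; the listener that receives it calls InstanceInfoReplicator's onDemandUpdate(), which runs InstanceInfoReplicator's run() and performs the first registration. As a result, the first registration is not subject to the 40s delay (the default initialInstanceInfoReplicationIntervalSeconds) that some articles online describe.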
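The dirty flag carries a timestamp so that clearing it after a registration cannot erase a change made while that registration was in flight. A minimal sketch of the idea, modeled on InstanceInfo's setIsDirtyWithTime(), isDirtyWithTime(), and unsetIsDirty(timestamp); the class name `DirtyFlagSketch` and the injectable clock are assumptions added so the behavior can be tested.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch modeled on InstanceInfo's timestamped dirty flag.
final class DirtyFlagSketch {
    private final LongSupplier clock;
    private boolean dirty;
    private long lastDirtyTimestamp;

    DirtyFlagSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Marks the instance as changed and returns the time of this change. */
    synchronized long setDirtyWithTime() {
        dirty = true;
        lastDirtyTimestamp = clock.getAsLong();
        return lastDirtyTimestamp;
    }

    /** Returns the time of the latest pending change, or null if nothing is pending. */
    synchronized Long dirtyWithTime() {
        return dirty ? Long.valueOf(lastDirtyTimestamp) : null;
    }

    /** Clears the flag only if no newer change happened after {@code timestamp}. */
    synchronized void unsetDirty(long timestamp) {
        if (lastDirtyTimestamp <= timestamp) {
            dirty = false;
        }
    }
}
```

If a change is recorded at t=100, a second change at t=200 arrives during registration, and the registration for t=100 then succeeds, unsetDirty(100) leaves the flag set, so the next replicator run registers the newer data.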
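6. Lease renewal - HeartbeatThread
HeartbeatThread sends a lease renewal to Eureka Server every 30s by default, by calling DiscoveryClient's renew() to send a heartbeat. Note that if Eureka Server answers a renewal with 404, Eureka Client registers again right away. Concretely, Eureka Client first marks the InstanceInfo as dirty (setIsDirtyWithTime()) and then sends a registration request to Eureka Server. If registration succeeds, the dirty flag is cleared; if it fails, the flag is left set, so the InstanceInfoReplicator scheduled task will try to register again on its next run. This is how a Eureka Client that Eureka Server has evicted registers itself again automatically once it recovers. Eureka Server removes an instance from the registry if it receives no renewal from it for 90s (because of a small bug in the Eureka Server implementation, the effective timeout is at least 180s). The relevant source: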
private class HeartbeatThread implements Runnable {
    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            // Renewal returned 404: mark the InstanceInfo dirty before re-registering
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                // Clear the dirty flag only if re-registration succeeded
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
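The "at least 180s" figure follows from how Eureka Server's Lease class stores the renewal time: renew() sets lastUpdateTimestamp to now + duration, while isExpired() checks now > lastUpdateTimestamp + duration, so after a renewal the duration is counted twice (the eureka-core source itself documents this as a known minor bug). The sketch below reproduces only that arithmetic with an explicit clock; it omits the additionalLeaseMs compensation and the eviction timer (60s by default), either of which can delay removal further. The class name is hypothetical.

```java
// Hypothetical sketch of the server-side lease arithmetic (times in milliseconds).
final class LeaseSketch {
    private final long durationMs;   // leaseExpirationDurationInSeconds, 90s by default
    private long lastUpdateTimestamp;

    LeaseSketch(long durationMs, long registeredAt) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = registeredAt;   // registration does not add the duration
    }

    void renew(long now) {
        lastUpdateTimestamp = now + durationMs;    // the quirk: the duration is added here...
    }

    boolean isExpired(long now) {
        return now > lastUpdateTimestamp + durationMs; // ...and added again here
    }
}
```

With a 90s duration and a last renewal at t=0, the lease is still valid at t=180000 ms and only counts as expired from t=180001 ms on.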
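7. Service discovery - CacheRefreshThread
The scheduler runs CacheRefreshThread every 30s by default. Each run calls DiscoveryClient's refreshRegistry() to fetch from Eureka Server the registry changes made in the last 3 minutes and update the local cache. In other words, this is an incremental fetch that covers added, modified, and deleted service instances (InstanceInfo). Two points are worth noting:
- When Eureka Server responds with a status other than OK, or returns no delta (null), the client performs a full fetch.
- When the appsHashCode returned by Eureka Server differs from the locally computed appsHashCode, the client performs a full fetch.
appsHashCode is a string built by joining each instance status name and the number of instances in that status with underscores. This check keeps a Eureka Client from silently missing registry changes when a network problem interrupts its communication with Eureka Server, since by default Eureka Server keeps only the last 3 minutes of changes.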
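The reconcile hash code fits in a few lines. Modeled on Applications.getReconcileHashCode(), the sketch below counts instances per status in a TreeMap (so statuses appear in alphabetical order) and appends STATUS_count_ for each entry; the class name and the flat list-of-statuses input are simplifications.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the appsHashCode format.
final class AppsHashCodeSketch {

    private AppsHashCodeSketch() {
    }

    static String reconcileHashCode(List<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by status name
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            sb.append(e.getKey()).append('_').append(e.getValue()).append('_');
        }
        return sb.toString();
    }
}
```

For three UP instances and one DOWN instance the sketch returns DOWN_1_UP_3_. Because only per-status counts are compared, the check is cheap but coarse: replacing one UP instance with another UP instance leaves the code unchanged.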
The relevant source:
/**
 * The task that fetches the registry information at specified intervals.
 *
 */
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
        // omitted
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    // delta stays null unless Eureka Server answers OK; a null delta triggers a full fetch below
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }
    if (delta == null) {
        // The delta fetch returned nothing: fall back to fetching the full registry
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                updateDelta(delta);
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // If the appsHashCode does not match, fetch the full registry to reconcile
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}
private void updateDelta(Applications delta) {
    int deltaCount = 0;
    for (Application app : delta.getRegisteredApplications()) {
        for (InstanceInfo instance : app.getInstances()) {
            // omitted: declares `applications`, which is the local registry or, for an
            // instance from a remote region, that region's registry
            ++deltaCount;
            if (ActionType.ADDED.equals(instance.getActionType())) {
                // Handle an added instance
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                // Handle a modified instance
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.DELETED.equals(instance.getActionType())) {
                // Handle a deleted instance
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    existingApp.removeInstance(instance);
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    for (Applications applications : remoteRegionVsApps.values()) {
        applications.setVersion(delta.getVersion());
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}
private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {
    RECONCILE_HASH_CODES_MISMATCH.increment();
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    // Fetch the full registry
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    Applications serverApps = httpResponse.getEntity();
    if (serverApps == null) {
        logger.warn("Cannot fetch full registry from the server; reconciliation failure");
        return;
    }
    if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // Replace the locally cached registry
        localRegionApps.set(this.filterAndShuffle(serverApps));
        getApplications().setVersion(delta.getVersion());
    } else {
        logger.warn("Not setting the applications map as another thread has advanced the update generation");
    }
}
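updateDelta() boils down to applying each changed instance to the cached registry according to its ActionType. The sketch below models the registry as a map from application name to a map of instance ID to status, using hypothetical types (`Action`, `DeltaEntry`, `DeltaRegistrySketch`). It mirrors the ADDED/MODIFIED/DELETED branches, including dropping an application once its last instance is deleted, but leaves out regions, versions, and shuffling.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for ActionType.
enum Action { ADDED, MODIFIED, DELETED }

// Hypothetical stand-in for one changed InstanceInfo in a delta.
final class DeltaEntry {
    final Action action;
    final String appName;
    final String instanceId;
    final String status;

    DeltaEntry(Action action, String appName, String instanceId, String status) {
        this.action = action;
        this.appName = appName;
        this.instanceId = instanceId;
        this.status = status;
    }
}

// Hypothetical sketch of applying a registry delta to a local cache.
final class DeltaRegistrySketch {
    // appName -> (instanceId -> status)
    final Map<String, Map<String, String>> apps = new TreeMap<>();

    void applyDelta(List<DeltaEntry> delta) {
        for (DeltaEntry d : delta) {
            switch (d.action) {
                case ADDED:
                case MODIFIED:
                    // Create the application if needed, then add or replace the instance.
                    apps.computeIfAbsent(d.appName, k -> new LinkedHashMap<>()).put(d.instanceId, d.status);
                    break;
                case DELETED:
                    Map<String, String> existing = apps.get(d.appName);
                    if (existing != null) {
                        existing.remove(d.instanceId);
                        if (existing.isEmpty()) {
                            apps.remove(d.appName); // last instance gone: drop the application
                        }
                    }
                    break;
                default:
                    throw new IllegalStateException("unknown action " + d.action);
            }
        }
    }
}
```

Applying ADDED order-1 (UP), ADDED order-2 (UP), and MODIFIED order-2 (DOWN) leaves ORDER with order-1 UP and order-2 DOWN; deleting both instances afterwards removes ORDER entirely.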
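8. Service deregistration
Before the DiscoveryClient bean is destroyed, its shutdown() method is called, and shutdown() calls unregister() to deregister the instance. The relevant source: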
/**
 * Shuts down Eureka Client. Also sends a deregistration request to the
 * eureka server.
 */
@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        cancelScheduledTasks();
        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();
        logger.info("Completed shut down of DiscoveryClient");
    }
}
/**
 * Deregisters this instance from Eureka Server
 */
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}
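shutdown() guards its whole body with isShutdown.compareAndSet(false, true), so cancelling the tasks and sending the deregistration request happen at most once, however many times shutdown() is called. A minimal sketch of that guard, with a hypothetical `Deregistrar` callback standing in for unregister() and two booleans standing in for shouldRegisterWithEureka() and shouldUnregisterOnShutdown():

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of an idempotent shutdown guard.
final class ShutdownSketch {
    interface Deregistrar {
        void unregister();
    }

    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final Deregistrar deregistrar;
    private final boolean registerWithEureka;
    private final boolean unregisterOnShutdown;

    ShutdownSketch(Deregistrar deregistrar, boolean registerWithEureka, boolean unregisterOnShutdown) {
        this.deregistrar = deregistrar;
        this.registerWithEureka = registerWithEureka;
        this.unregisterOnShutdown = unregisterOnShutdown;
    }

    synchronized void shutdown() {
        // Only the first caller flips false -> true and performs the cleanup.
        if (isShutdown.compareAndSet(false, true)) {
            // (the real client also cancels scheduled tasks and marks the instance DOWN here)
            if (registerWithEureka && unregisterOnShutdown) {
                deregistrar.unregister();
            }
        }
    }
}
```

Calling shutdown() twice on the same instance triggers exactly one unregister(); with unregisterOnShutdown set to false, none is sent.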
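9. Summary
- On startup, Eureka Client first registers its own instance information.
- Every 30s by default, Eureka Client checks whether its instance information has changed and, if it has, registers again.
- Every 30s by default, Eureka Client sends a lease renewal to Eureka Server. A 404 response means the instance has been evicted for some reason, so the client registers again.
- Every 30s by default, Eureka Client fetches the incremental changes to the registry; if the incremental fetch fails or the appsHashCode check does not match, it performs a full fetch.
- Before the DiscoveryClient bean is destroyed, the instance is deregistered.
The related class diagram is shown below: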