不怕难之Spring Cloud系列之Eureka


一、前言

Spring Cloud 是一系列框架的有序集合。它利用 Spring Boot 的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册、配置中心、消息总线、负载均衡、断路器、数据监控等,都可以用 Spring Boot 的开发风格做到一键启动和部署。

二、简单介绍

1. WHY(出现的时机)

CAP 理论

CAP 分别为 consistency(强一致性)、availability(可用性) 和 partition tolerance(分区容错性)。

理论核心:一个分布式系统不可能同时很好的满足一致性、可用性和分区容错性这三个需求。因此,根据 CAP 原理将 NoSQL 数据库分成满足 CA 原则、满足 CP 原则和满足 AP 原则三大类:

CA:单点集群,满足一致性,可用性的系统,通常在可扩展性上不高

CP: 满足一致性,分区容错性的系统,通常性能不是特别高

AP: 满足可用性,分区容错性的系统,通过对一致性要求较低

CAP 理论描述在分布式存储系统中,最多只能满足两个需求。

Zookeeper 保证 CP

当向注册中心查询服务列表时,我们可以容忍注册中心返回的是几分钟前的注册信息,但不能接受服务直接挂掉不可用了。因此,服务注册中心对可用性的要求高于一致性。

但是,zookeeper 会出现一种情况,当 master 节点因为网络故障与其他节点失去联系时,剩余节点会重新进行 leader 选举。问题在于,选举 leader 的时间较长,30 ~ 120 秒,且选举期间整个 zookeeper 集群是不可用的,这期间会导致注册服务瘫痪。在云部署的环境下,因网络问题导致 zookeeper 集群失去 master 节点的概率较大,虽然服务能最终恢复,但是漫长的选举时间导致注册服务长期不可用是不能容忍的。

Eureka 保证 AP

Eureka 在设计上优先保证了可用性。EurekaServer 各个节点都是平等的,几个节点挂掉不会影响正常节点的工作,剩余的节点依然可以提供注册和发现服务。

而 Eureka 客户端在向某个 EurekaServer 注册或发现连接失败时,会自动切换到其他 EurekaServer 节点,只要有一台 EurekaServer 正常运行,就能保证注册服务可用,只不过查询到的信息可能不是最新的。

除此之外,EurekaServer 还有一种自我保护机制,如果在 15 分钟内超过 85% 的节点都没有正常的心跳,

Eureka 可以很好的应对因网络故障导致部分节点失去联系的情况,而不会向 Zookeeper 那样是整个注册服务瘫痪。

2.WHAT(是什么)

Eureka 是 Netflix 的子模块,它是一个基于 REST 的服务,用于定位服务,以实现云端中间层服务发现和故障转移。

服务注册和发现对于微服务架构而言,是非常重要的。有了服务发现和注册,只需要使用服务的标识符就可以访问到服务,而不需要修改服务调用的配置文件。该功能类似于 Dubbo 的注册中心,比如 Zookeeper。

Eureka 采用了 CS 的设计架构。Eureka Server 作为服务注册功能的服务端,它是服务注册中心。而系统中其他微服务则使用 Eureka 的客户端连接到 Eureka Server 并维持心跳连接。

3. HOW(运行原理图)


由图可知,Eureka 的运行原理和 Dubbo 大同小异, Eureka 包含两个组件: Eureka Server 和 Eureka Client。

Eureka Server 提供服务的注册服务。各个服务节点启动后会在 Eureka Server 中注册服务,Eureka Server 中的服务注册表会存储所有可用的服务节点信息。

Eureka Client 是一个 Java 客户端,用于简化 Eureka Server 的交互,客户端同时也具备一个内置的、使用轮询负载算法的负载均衡器。在应用启动后,向 Eureka Server 发送心跳(默认周期 30 秒)。如果 Eureka Server 在多个心跳周期内没有接收到某个节点的心跳,Eureka Server 会从服务注册表中将该服务节点信息移除。


4. 高可用架构图



三、入门使用

1. 添加依赖

<dependencyManagement>

<dependencies>

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-dependencies</artifactId>

<version>Greenwich.RELEASE</version>

<type>pom</type>

<scope>import</scope>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.1.3.RELEASE</version>

<type>pom</type>

<scope>import</scope>

</dependency>

</dependencies>

</dependencyManagement>

<dependencies>

<!-- eureka 服务端 -->

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>

</dependency>

</dependencies>


2. 配置文件

server:

    port:9000

spring:

  application:

    name:EUREKA

eureka:

    instance:

        hostname:localhost# eureka 实例名称

    client:

        register-with-eureka:false# 不向注册中心注册自己

        fetch-registry:false# 是否检索服务

        service-url:

            defaultZone:http://${eureka.instance.hostname}:${server.port}/eureka/# 注册中心访问地址


3.开启注册功能

@EnableEurekaServer

@SpringBootApplication

publicclass EurekaServerApplication {

public static void main(String[] args) {

        SpringApplication.run(EurekaServerApplication.class, args);

    }

}


四. Eureka Client源码赏析

1. 介绍

Eureka Client负责了下面的任务:

- 向Eureka Server注册服务实例

- 向Eureka Server为租约续期

- 当服务关闭期间,向Eureka Server取消租约

- 查询Eureka Server中的服务实例列表

Eureka Client还需要配置一个Eureka Server的URL列表。

2. 入口类DiscoveryClient

2.1 注解源码

@Target(ElementType.TYPE)

@Retention(RetentionPolicy.RUNTIME)

@Documented

@Inherited

@Import(EnableDiscoveryClientImportSelector.class)

public@interfaceEnableDiscoveryClient {

}

2.2 类结构



3.服务注册类DiscoveryClient

private void initScheduledTasks(){

    ...

if(clientConfig.shouldRegisterWithEureka()) {

        ...

// InstanceInfo replicator

instanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),

2);// burstSize

        ...

        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());

}else{

       logger.info("Not registering with Eureka server per configuration");

    }

}

InstanceInfoReplicator类的实例,它会执行一个定时任务,查看该类的run()函数了解该任务做了什么工作

public void run(){

try{

        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();

        if(dirtyTimestamp !=null) {

            discoveryClient.register();

            instanceInfo.unsetIsDirty(dirtyTimestamp);

        }

}catch(Throwable t) {

        logger.warn("There was a problem with the instance info replicator", t);

}finally{

        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);

        scheduledPeriodicRef.set(next);

    }

}

discoveryClient.register();这一行,真正触发调用注册的地方就在这里

boolean register()  throwsThrowable{

    logger.info(PREFIX + appPathIdentifier +": registering service...");

    EurekaHttpResponse<Void> httpResponse;

   try{

        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);

   }catch(Exception e) {

        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);

         throw e;

    }

if(logger.isInfoEnabled()) {

        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());

    }

return  httpResponse.getStatusCode() ==204;

}

注册操作也是通过REST请求的方式进行的。同时,这里我们也能看到发起注册请求的时候,传入了一个com.netflix.appinfo.InstanceInfo对象,该对象就是注册时候客户端给服务端的服务的元数据

默认情况下,Eureka使用Jersey和XStream配合JSON作为Server与Client之间的通讯协议。

4. 服务获取与服务续约

DiscoveryClient的initScheduledTasks函数,不难发现在其中还有两个定时任务,分别是“服务获取”和“服务续约”:

private void initScheduledTasks(){

if(clientConfig.shouldFetchRegistry()) {

// registry cache refresh timer

intregistryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();

intexpBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();

        scheduler.schedule(

new TimedSupervisorTask(

     "cacheRefresh",scheduler,cacheRefreshExecutor, registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,newCacheRefreshThread()

),

                registryFetchIntervalSeconds, TimeUnit.SECONDS);

}

if(clientConfig.shouldRegisterWithEureka()) {

      intrenewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();

      intexpBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();

      logger.info("Starting heartbeat executor: "+"renew interval is: "+ renewalIntervalInSecs);

     // Heartbeat timer

        scheduler.schedule(

new TimedSupervisorTask(

       "heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,newHeartbeatThread()

                ),

                renewalIntervalInSecs, TimeUnit.SECONDS);

         // InstanceInfo replicator

……

}

}

其中“服务续约”的实现较为简单,直接以REST请求的方式进行续约

boolean renew(){

      EurekaHttpResponse<InstanceInfo> httpResponse;

try{

      httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo,null);

       logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());

if(httpResponse.getStatusCode() ==404) {

       REREGISTER_COUNTER.increment();

       logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());

       return  register();

        }

      return httpResponse.getStatusCode() ==200;

}catch(Throwable e) {

    logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);

    returnfalse;

    }

}


五. Eureka Server源码赏析

通过上面的源码分析,可以看到所有的交互都是通过REST的请求来发起的。下面我们来看看服务注册中心对这些请求的处理

1. 接收请求

@POST

@Consumes({"application/json","application/xml"})

public  Response  addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication){

       logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);

      // validate that the instanceinfo contains all the necessary required fields

      ...

    // handle cases where clients may be registering with bad DataCenterInfo with missing data

    DataCenterInfo dataCenterInfo = info.getDataCenterInfo();

   if(dataCenterInfoinstanceofUniqueIdentifier) {

        String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();

   if(isBlank(dataCenterInfoId)) {

booleanexperimental ="true".equalsIgnoreCase(

serverConfig.getExperimental("registration.validation.dataCenterInfoId"));

  if(experimental) {

        String entity ="DataCenterInfo of type "+ dataCenterInfo.getClass()+" must contain a valid id";

     return Response.status(400).entity(entity).build();

}else if(dataCenterInfo instanceof AmazonInfo) {

                AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;

                String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);

               if(effectiveId ==null) {

                    amazonInfo.getMetadata().put( AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());

                }

}else{

     logger.warn("Registering DataCenterInfo of type {} without an appropriate id",

     dataCenterInfo.getClass());

            }

        }

    }

registry.register(info,"true".equals(isReplication));

returnResponse.status(204).build();// 204 to be backwards compatible

}

在注册函数中,先调用publishEvent函数,将该新服务注册的事件传播出去,然后调用com.netflix.eureka.registry.AbstractInstanceRegistry父类中的注册实现,将InstanceInfo中的元数据信息存储在一个ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>对象中,它是一个两层Map结构,第一层的key存储服务名:InstanceInfo中的appName属性,第二层的key存储实例名:InstanceInfo中的instanceId属性。

public void register(InstanceInfo info,int leaseDuration,boolean isReplication){

  if(log.isDebugEnabled()) {

          log.debug("register "+ info.getAppName() +", vip "+ info.getVIPAddress()+", leaseDuration "+ leaseDuration +", isReplication "+ isReplication);

}

this.ctxt.publishEvent(new EurekaInstanceRegisteredEvent(this, info,leaseDuration, isReplication));

super.register(info, leaseDuration, isReplication);

}

super.register(info, leaseDuration, isReplication)是注册方法;

replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication)是同步集群方法。

数据是存贮在concurrenthashmap里:


2. 节点复制

2.1 源码赏析

@Override

public void register(final InstanceInfo info, final boolean isReplication) {

    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;

    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {

        //默认租约90s,如果用户更改了心跳周期等,使用用户自定义的租约

        leaseDuration = info.getLeaseInfo().getDurationInSecs();

    }

    //调用父类的注册,注册到本地双层Map中

    super.register(info, leaseDuration, isReplication);

    //本地注册完成后,向其他节点复制,注意isReplication这个属性

    //用来判断是client发来的注册,还是其他Eureka Server临节点复制过来的注册

    //如果是复制过来的注册信息,那么就不再向其他Eureka Server节点进行传播复制

    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);

private void replicateToPeers(Action action, String appName, String id,

                              InstanceInfo info /* optional */,

                              InstanceStatus newStatus /* optional */, boolean isReplication) {

    Stopwatch tracer = action.getTimer().start();

    try {

        if (isReplication) {

            //记录每分钟收到的复制次数

            numberOfReplicationsLastMin.increment();

        }

        // If it is a replication already, do not replicate again as this will create a poison replication

        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {

            //如果没有Eureka Server临节点,或者是别的Eureka Server复制过来的信息

            //那么就不再向其他临节点进行复制,

            //也就是说既然收到了复制过来的信息,那么其他eureka server节点也会收到

            //所以就没必要再去发送一遍复制了,return。

            return;

        }

        //遍历所有的Eureka Server邻节点,向它们复制register、cancel等信息

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {

            // If the url represents this host, do not replicate to yourself.

            //如果这个节点url是自己的,那么不向自身复制

            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {

                continue;

            }

            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);

        }

    } finally {

        tracer.stop();

    }

2.2 结构图


private void replicateInstanceActionsToPeers(Action action, String appName,

                                            String id, InstanceInfo info, InstanceStatus newStatus,

                                            PeerEurekaNode node) {

    try {

        InstanceInfo infoFromRegistry = null;

        CurrentRequestVersion.set(Version.V2);

        switch (action) {

            case Cancel:

                node.cancel(appName, id);

                break;

            case Heartbeat:

                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);

                infoFromRegistry = getInstanceByAppAndId(appName, id, false);

                node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);

                break;

            case Register:

                node.register(info);

                break;

            case StatusUpdate:

                infoFromRegistry = getInstanceByAppAndId(appName, id, false);

                node.statusUpdate(appName, id, newStatus, infoFromRegistry);

                break;

            case DeleteStatusOverride:

                infoFromRegistry = getInstanceByAppAndId(appName, id, false);

                node.deleteStatusOverride(appName, id, infoFromRegistry);

                break;

        }

    } catch (Throwable t) {

        //由于此方法是循环复制操作,如果发生异常不进行处理,直接抛出,那么就不会向后面的节点复制了

        //比如有10个Eureka Server节点,再向第2个复制的时候抛出异常,后面8个节点就收不到复制信息

        //这个地方,只是做log,并没有抛出异常

        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);

    }

六、总结

1. Eureka作为单纯的服务注册中心来说要比zookeeper更加“专业”,因为注册服务更重要的是可用性,我们可以接受短期内达不到一致性的状况

2. Eureka Client的定时任务调用Eureka Server的Reset接口,而Eureka接收到调用请求后会处理服务的注册以及Eureka Server中的数据同步的问题

3.  服务的注册实际上是将服务信息添加到一个map中,map的key是服务名称,value也是一个map,是提供该服务的所有客户端信息;

     服务的续约实际上是获取map中该服务的客户端信息,然后修改其最新更新时间

     服务的下线实际上是删除该map中该服务信息,然后修改服务状态

4. Eureka高可用实质是多个注册中心中信息的传播

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容