spring cloud gateway 二次开发之 ServerListUpdater 服务列表更新

年后不久换了部门,一直在改Bug和优化。。。终于有了点时间,把之前漏下没记录的点慢慢补上

gateway使用ribbon作为服务调用的负载均衡中间件,根据配置的 IRule 对拉取到的服务列表进行负载

而这些真正提供服务的实例是有动态上下线的情况存在的,为了保证轮询到的服务实例能正常访问,ribbon中有一个接口

ServerListUpdater 会定期对服务列表进行更新

在使用 Eureka 作为注册中心的时候,ServerListUpdater有两个实现类:

  • PollingServerListUpdater :定时从注册中心拉取服务列表,如果没有配置,默认为30秒

  • EurekaNotificationServerListUpdater :注册中心中的服务有变动时,通知客户端,EurekaNotificationServerListUpdater是通过添加了一个监听器,当收到注册中心的通知后,做出相应的动作

PollingServerListUpdater 也是 默认的 ServerListUpdater 配置

分别看一下它们的代码实现

PollingServerListUpdater

// 从注册中心拉取服务列表的间隔
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;


 @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {

            // 这里封装了一个进行服务拉取的任务
            final Runnable wrapperRunnable = new Runnable() {
                @Override
                public void run() {
                    if (!isActive.get()) {
                        if (scheduledFuture != null) {
                            scheduledFuture.cancel(true);
                        }
                        return;
                    }
                    try {
                        //这里是进行服务列表更新的动作
                        updateAction.doUpdate();
                        lastUpdated = System.currentTimeMillis();
                    } catch (Exception e) {
                        logger.warn("Failed one update cycle", e);
                    }
                }
            };
           
            // 使用 scheduled 进行定时拉取
            scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                    wrapperRunnable,
                    initialDelayMs,
                    refreshIntervalMs,//这个就是刚才配置的拉取间隔
                    TimeUnit.MILLISECONDS
            );
        } else {
            logger.info("Already active, no-op");
        }
    }


EurekaNotificationServerListUpdater

 @Override
    public synchronized void start(final UpdateAction updateAction) {
        if (isActive.compareAndSet(false, true)) {
            this.updateListener = new EurekaEventListener() {

               // 监听 Eureka发布的事件,然后拉取最新的列表
                @Override
                public void onEvent(EurekaEvent event) {
                    if (event instanceof CacheRefreshedEvent) {
                        if (!updateQueued.compareAndSet(false, true)) {  // if an update is already queued
                            logger.info("an update action is already queued, returning as no-op");
                            return;
                        }

                        if (!refreshExecutor.isShutdown()) {
                            try {
                                refreshExecutor.submit(new Runnable() {
                                    @Override
                                    public void run() {
                                        try {
                                            //这里是进行服务列表更新的动作
                                            updateAction.doUpdate();
                                            lastUpdated.set(System.currentTimeMillis());
                                        } catch (Exception e) {
                                            logger.warn("Failed to update serverList", e);
                                        } finally {
                                            updateQueued.set(false);
                                        }
                                    }
                                });  // fire and forget
                            } catch (Exception e) {
                                logger.warn("Error submitting update task to executor, skipping one round of updates", e);
                                updateQueued.set(false);  // if submit fails, need to reset updateQueued to false
                            }
                        }
                        else {
                            logger.debug("stopping EurekaNotificationServerListUpdater, as refreshExecutor has been shut down");
                            stop();
                        }
                    }
                }
            };
        
      ...以下代码省略

        } else {
            logger.info("Update listener already registered, no-op");
        }
    }

可以看到,这两个 ServerListUpdater 实现类在更新服务列表的时候,都做了同一个动作updateAction.doUpdate()
进入这个方法,发现它是一个定义在ServerListUpdater 中的一个接口

public interface ServerListUpdater {

    /**
     * an interface for the updateAction that actually executes a server list update
     */
    public interface UpdateAction {
        void doUpdate();
    }
 ...以下代码省略

}

而它的方法实现是在 负载均衡器 DynamicServerListLoadBalancer 中定义的

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

   ......代码省略......

    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            // UpdateAction的方法实现,调用了另一个方法
            updateListOfServers();
        }
    };

    
    
    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            //真正进行列表更新的地方
            servers = serverListImpl.getUpdatedListOfServers();
            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);

            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                        getIdentifier(), servers);
            }
        }
        updateAllServerList(servers);
    }

......代码省略......

}

可以看到,这个 serverListImpl 就是在负载均衡器的ServerList属性,使用这个接口的getUpdatedListOfServers方法进行列表更新,因为我的项目里使用的是自己的注册中心,没有用Eureka,所以也写了一个实现类,参照了使用Eureka的情况下的默认类 DiscoveryEnabledNIWSServerList,让我们看看这个类的代码

 @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        //其实在更新的时候,调用的也是从注册中心拉取列表的方法
        return obtainServersViaDiscovery();
    }
    
   //从Eureka拉取服务
    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        ......代码省略......

        return serverList;
    }

由此我们知道,无论配置哪个 ServerListUpdater,在更新服务列表的时候,都是调用ServerList接口进行一次服务拉取,然后更新本地的列表,只是触发的时机不同:

  • PollingServerListUpdater 30秒拉取一次(时间可以修改)
  • EurekaNotificationServerListUpdater 当服务更新时,通知客户端

那么这两种方法分别有什么弊端呢?

先说说PollingServerListUpdater,如果在拉取的间隔中,有服务下线了,极端情况下,原来所有的实例都不可用,换成了新的一批实例,要等到下一次拉取的时间点才会更新,这样会造成最久30秒的时间服务不可用。 比如:原来5个实例 A、B、C、D、E变成了 F、G、H、I、J,但是由于并没有到更新的时间点,ribbon保存的还是老的服务实例,而这时它们都已下线,无法提供服务。

那如果我们换成,一旦服务变更(这里是上下线都会通知)就通知客户端的 EurekaNotificationServerListUpdater 会怎么样呢,这样好像可以实时的替换成最新的可用实例,保证服务不会打到失效的实例上。可是这会有另一个问题,也是在极端情况下,如果这次通知由于网络问题,没有通知到客户端,那么这次变动过后,如果一直没有服务变更,客户端就再也不会进行服务的拉取,这个时候造成不可用的时间就难以预估了。比如:在9:00的时候,A、B、C、D、E服务全部下线,F、G、H、I、J上线,Eureka通知客户端,但网络抖动,客户端没有收到,或者客户端收到了,拉取时候失败,并没有更新本地的列表,这样只有等到下次收到通知时才会去拉取,假设接下来服务很稳定,12:00的时候才有一次更新,这样就有3个小时的服务不可用。

那么有没有什么办法既可以拥有2个实现类的优势,又摒弃它们的弊端呢?有的!
小孩子才做选择,我们大人是全都要!那就是自己写一个ServerListUpdater的实现类,然后:

  • 1 启动一个定时任务,定时从注册中心拉取,而这个拉取间隔也可以根据自己的预估进行修改。
  • 2 注册一个监听器,可以收到注册中心的服务变更通知
  • 3 在配置中,把默认的 ServerListUpdater改成自己写的 ServerListUpdater,这样ribbonConfig类在看到有ServerListUpdater的实现类的情况下,就不会加载 Eureka的 ServerListUpdater了。

实现类的代码太长,就不贴了,逻辑很清晰,可以参考PollingServerListUpdater 和 EurekaNotificationServerListUpdater,把他们的代码照着写就行

接下来就是配置,在配置类中添加上自己的实现类

/**
     * 服务更新通知机制
     * @param notificationService
     * @param clientConfig
     * @return
     */
    @Bean
    public ServerListUpdater ribbonServerListUpdater(NotificationServiceImpl notificationService, IClientConfig clientConfig) {
        return new PollingNotificationServerListUpdater(notificationService, clientConfig);
    }

这样就实现了轮询和通知并存的形式。
打完收工!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容