soul网关学习10-配置数据同步1-HttpLongPolling_1

前言

我们知道soul-bootstrap作为网关入口,需要能承载这些流量,同时又能实现网关插件功能(路由、限流、熔断)的动态配置,其配置动态生效的原理大致如下:

  1. 网关soul-boostrap会将配置数据存放到内存HashMap中,当流量请求进来的时候,直接从内存中匹配对应配置,从而实现插件功能的逻辑
  2. 网关soul-admin控制台,提供了可视化的管理界面,能够供开发和运维人员维护功能插件的配置数据,该配置数据是会存储到soul-admin数据库的
  3. 通过soul-boostrapsoul-admin中数据同步机制,将配置数据从soul-admin同步到soul-boostrap
  4. 实现流程如下:


    config-data-sync
  5. 源码模块如下:
    soul-sync-data-center

    接下来我们分析HttpLongPolling的配置同步方式。

HttpLongPolling(Http长轮询)

使用

soul-bootstrap

  • 引入依赖,把数据同步相关的starter引入进来
   <!--soul data sync start use zookeeper-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!--soul data sync start use websocket-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!--soul data sync start use http-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!--soul data sync start use nacos-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
            <version>${project.version}</version>
        </dependency>
  • 配置


    bootstrap.applicaiton.sync-http

soul-admin

  • 配置


    admin.applcaiton.sync-http

使用注意

  1. 启动soul-bootstrap程序时需注意:采用http长轮询同步,需确保soul-admin是正常启动的,否则会导致soul-bootstrap启动失败
  2. 具体采用哪种同步方式,需注意soul-adminsoul-bootstrap两边配置的同步方式要保持一致,否则会同步异常
  3. 如果soul-bootstrap启动输出如下日志,则表明成功使用HttpLongPoling的数据同步方式
2021-01-24 08:27:45.086  INFO 5667 --- [           main] .s.s.b.s.s.d.h.HttpSyncDataConfiguration : you use http long pull sync soul data
2021-01-24 08:27:45.702  INFO 5667 --- [           main] o.d.s.s.data.http.HttpSyncDataService    : request configs: [http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA]
2021-01-24 08:27:46.324  INFO 5667 --- [onPool-worker-1] o.d.s.s.d.h.refresh.AppAuthDataRefresh   : clear all appAuth data cache

源码分析

soul-bootstrap

  • 引入starter依赖。把四种同步类型的starter全部引入也没有关系,因为最终生效还需要对应属性配置有进行配置才可。具体可参考对应同步类型starter中的xxxSyncDataConfiguration
// 配置生效关系
HttpLongPoling -> HttpSyncDataConfiguration -> soul.sync.http.url
Websocket -> WebsocketSyncDataConfiguration -> soul.sync.websocket.url
Zookeeper -> ZookeeperSyncDataConfiguration -> soul.sync.zookeeper.url
Nacos -> NacosSyncDataConfiguration -> soul.sync.nacos.url
  • 启动类入口为org.dromara.soul.sync.data.http.HttpSyncDataService。其关键逻辑:HttpSyncDataService实例化 -> 启动 start ->拉取配置 fetchGroupConfig -> 执行拉取配置doFetchGroupConfig ->更新内存配置updateCache -> 启动长轮询线程池 -> 线程池长轮询任务HttpLongPollingTask -> 长轮询逻辑实现doLongPolling
  • 更详细一些的逻辑就需要看下面的源码注释拉。
  • HttpSyncDataService实例化
    public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
        this.httpConfig = httpConfig;
        // 多个admin配置用,分割
        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
        this.httpClient = createRestTemplate();
        // 实例化过程会调用启动逻辑,启动时admin必须在线,否则会导致bootstrap也启动不起来
        this.start();
    }
  • 启动start
    private void start() {
        // It could be initialized multiple times, so you need to control that.
        if (RUNNING.compareAndSet(false, true)) {
            // fetch all group configs.
            // 初始启动时拉取一次
            this.fetchGroupConfig(ConfigGroupEnum.values());
            // TODO question:  如果admin的配置使用的是负载均衡呢?这个逻辑是不是就会存在问题了?
            // 线程池的大小为admin服务器数目,需要分别使用一个线程去连接每个admin,去做长轮询
            int threadSize = serverList.size();
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    SoulThreadFactory.create("http-long-polling", true));
            // start long polling, each server creates a thread to listen for changes.
            // 开启长轮询,每个server都创建一个线程去监听配置变化
            // 这里与启动过程拉配置是不一样的,启动过程拉取初始配置,则需要从某一台admin机器拉到就行;
            // 监听配置变更则不一样,因为用户操作时,不知道是在哪台节点上面响应的,所有需要全部监听
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } else {
            log.info("soul http long polling was started, executor=[{}]", executor);
        }
    }

  • 拉取配置fetchGroupConfig -> 执行拉取配置doFetchGroupConfig
    private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
        // 从配置的多台服务器遍历拉取配置,如果只要有一台拉取成功,则直接结束遍历
        for (int index = 0; index < this.serverList.size(); index++) {
            String server = serverList.get(index);
            try {
                this.doFetchGroupConfig(server, groups);
                break;
            } catch (SoulException e) {
                // no available server, throw exception.
                // 如果为最后一台的异常,则向上抛出异常,会中断启动程序
                if (index >= serverList.size() - 1) {
                    throw e;
                }
                log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
            }
        }
    }

    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
        // 即使有多个group的配置,也是一次请求拉取到的
        StringBuilder params = new StringBuilder();
        for (ConfigGroupEnum groupKey : groups) {
            params.append("groupKeys").append("=").append(groupKey.name()).append("&");
        }
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
        log.info("request configs: [{}]", url);
        String json = null;
        try {
            // 拉取admin配置
            json = this.httpClient.getForObject(url, String.class);
        } catch (RestClientException e) {
            String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
            log.warn(message);
            throw new SoulException(message, e);
        }
        // update local cache
        // 拉取成功后,则更新本地配置缓存
        boolean updated = this.updateCacheWithJson(json);
        if (updated) {
            log.info("get latest configs: [{}]", json);
            return;
        }
        // not updated. it is likely that the current config server has not been updated yet. wait a moment.
        log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
        // 若从当前配置服务器没有拉到配置,则睡眠30s后再继续执行
        // TODO questtion: 如果每个server都没有拉到配置,然后都会阻塞30s,会导致整个bootstrap启动很慢,是否考虑将启动时拉取配置逻辑异步化?
        ThreadUtils.sleep(TimeUnit.SECONDS, 30);
    }
  • 更新内存配置updateCache:调用数据更新的策略工厂
/**
     * update local cache.
     * @param json the response from config server.
     * @return true: the local cache was updated. false: not updated.
     */
    private boolean updateCacheWithJson(final String json) {
        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
        JsonObject data = jsonObject.getAsJsonObject("data");
        // if the config cache will be updated?
        // factory为数据更新的策略功能
        return factory.executor(data);
    }
  • 更新内存配置updateCache:DataRefreshFactory数据更新策略工厂,工厂会调用不同类型数据的更新操作
    public boolean executor(final JsonObject data) {
        // TODO  question 为什么会用一个奇怪的数组?
        final boolean[] success = {false};
        // 对于不同的数据类型,去进行数据更新;这里使用了并行流,实现并行更新配置
        ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
        return success[0];
    }
  • 更新内存配置updateCache:数据更新的类图


    dataRefresh
  • 启动长轮询线程池。参考 start流程后部分
  • 线程池长轮询HttpLongPollingTask
    public void run() {
            // 需判断bootstrap的状态是否正常,避免出现当bootstap停止了,http长轮询的线程还在跑,这是无意义的
            while (RUNNING.get()) {
                for (int time = 1; time <= retryTimes; time++) {
                    try {
                        // 长轮询失败的情况下,会进行重试,每隔5s重试一次,重试3次
                        doLongPolling(server);
                    } catch (Exception e) {
                        // print warnning log.
                        if (time < retryTimes) {
                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                                    time, retryTimes - time, e.getMessage());
                            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                            continue;
                        }
                        // print error, then suspended for a while.
                        log.error("Long polling failed, try again after 5 minutes!", e);
                        // 3次重试失败后,则先睡眠5min钟,再次去轮询
                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
                    }
                }
            }
            log.warn("Stop http long polling.");
        }
  • 长轮询逻辑doLongPolling
    private void doLongPolling(final String server) {
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            ConfigData<?> cacheConfig = factory.cacheConfigData(group);
            // 将缓存的md5值与最后更新时间传递到admin
            String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
            params.put(group.name(), Lists.newArrayList(value));
        }
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        HttpEntity httpEntity = new HttpEntity(params, headers);
        String listenerUrl = server + "/configs/listener";
        log.debug("request listener configs: [{}]", listenerUrl);
        JsonArray groupJson = null;
        try {
            // 调用admin的配置监听接口,如果admin有配置变更则会返回变更的配置数据类型给到bootstrap
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
            log.debug("listener result: [{}]", json);
            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
        } catch (RestClientException e) {
            String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
            throw new SoulException(message, e);
        }
        if (groupJson != null) {
            // fetch group configuration async.
            // 如果存在结果返回,则自己再去拉取相应配置
            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
            if (ArrayUtils.isNotEmpty(changedGroups)) {
                log.info("Group config changed: {}", Arrays.toString(changedGroups));
                // 拉取有变化的group的配置,这里的group有:APP_AUTH、PLUGIN、RULE、SELECTOR、META_DATA
                this.doFetchGroupConfig(server, changedGroups);
            }
        }
    }

至此,soul-bootstrap端的http长轮询就分析完了,接下来看看soul-admin

To be continued ...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容