九、soul源码学习-http长轮训数据同步机制详解

上一节讲了数据持久化后,发送事件后,Spring监听到事件后,做了什么事,并看到现有四种数据同步机制。这节具体加一下http长轮训

org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener http长轮训数据监听器

先看下构造器:在构造器中,构造了一个1024长度的阻塞队列,以及一个ScheduledThreadPoolExecutor,并初始化HttpSyncProperties,

/**
     * Blocked client.
     */
    private final BlockingQueue<LongPollingClient> clients;

    private final ScheduledExecutorService scheduler;

    private final HttpSyncProperties httpSyncProperties;

    /**
     * Instantiates a new Http long polling data changed listener.
     * @param httpSyncProperties the HttpSyncProperties
     */
    public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
        this.clients = new ArrayBlockingQueue<>(1024);
        this.scheduler = new ScheduledThreadPoolExecutor(1,
                SoulThreadFactory.create("long-polling", true));
        this.httpSyncProperties = httpSyncProperties;
    }

HttpSyncProperties主要是http同步的配置

@Getter
@Setter
@ConfigurationProperties(prefix = "soul.sync.http")
public class HttpSyncProperties {

    /**
     * Whether enabled http sync strategy, default: true.
     */
    private boolean enabled = true;

    /**
     * Periodically refresh the config data interval from the database, default: 5 minutes.
     */
    private Duration refreshInterval = Duration.ofMinutes(5);

}

主要定义了http同步开关以及刷新周期。 类初始化之后,更新各种数据缓存,然后执行了一个定时任务,每次调用refreshLocalCache刷新本地缓存

@Override
public final void afterPropertiesSet() {
  updateAppAuthCache();
  updatePluginCache();
  updateRuleCache();
  updateSelectorCache();
  updateMetaDataCache();
  afterInitialize();
}

@Override
protected void afterInitialize() {
  long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
  // Periodically check the data for changes and update the cache
  scheduler.scheduleWithFixedDelay(() -> {
    log.info("http sync strategy refresh config start.");
    try {
      this.refreshLocalCache();
      log.info("http sync strategy refresh config success.");
    } catch (Exception e) {
      log.error("http sync strategy refresh config error!", e);
    }
  }, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
  log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
private void refreshLocalCache() {
  this.updateAppAuthCache();
  this.updatePluginCache();
  this.updateRuleCache();
  this.updateSelectorCache();
  this.updateMetaDataCache();
}

这些方法要做的事情都很类似,就是从数据库拿到对应ConfigGroup的所有配置,并更新本地缓存,比如看updateAppAuthCache,就是将当前数据库的配置更新到本地缓存,至于具体为什么要更新到本地缓存,我们后面分晓。

//org.dromara.soul.admin.listener.AbstractDataChangedListener
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
/**
     * Update app auth cache.
     */
protected void updateAppAuthCache() {
  this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll());
}

protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
  String json = GsonUtils.getInstance().toJson(data);
  ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
  ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
  log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}

之前我们看到,当数据事件变化监听器分发者,监听到事件后,会调用各个监听器的对应方法:

//org.dromara.soul.admin.listener.DataChangedEventDispatcher
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
  for (DataChangedListener listener : listeners) {
    switch (event.getGroupKey()) {
      case APP_AUTH:
        listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
        break;
      case PLUGIN:
        listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
        break;
      case RULE:
        listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
        break;
      case SELECTOR:
        listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
        break;
      case META_DATA:
        listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
        break;
      default:
        throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
    }
  }
}

那么在长轮训机制下,主要做了如下事情,还拿AppAuth看下

@Override
public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
  if (CollectionUtils.isEmpty(changed)) {
    return;
  }
  this.updateAppAuthCache();
  this.afterAppAuthChanged(changed, eventType);
}

接收到变更数据后,会先更新下对应的内存缓存,然后再做数据变更。

//org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#afterAppAuthChanged
@Override
protected void afterAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
  scheduler.execute(new DataChangeTask(ConfigGroupEnum.APP_AUTH));
}

这里看到,通过线程池执行一个数据变化任务

        /**
     * When a group's data changes, the thread is created to notify the client asynchronously.
     */
    class DataChangeTask implements Runnable {

        /**
         * The Group where the data has changed.
         */
        private final ConfigGroupEnum groupKey;

        /**
         * The Change time.
         */
        private final long changeTime = System.currentTimeMillis();

        /**
         * Instantiates a new Data change task.
         *
         * @param groupKey the group key
         */
        DataChangeTask(final ConfigGroupEnum groupKey) {
            this.groupKey = groupKey;
        }

        @Override
        public void run() {
          //循环所有的LongPollingClient,并调用了sendResponse
            for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
                LongPollingClient client = iter.next();
                iter.remove();
                client.sendResponse(Collections.singletonList(groupKey));
                log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
            }
        }
    }

我们先来看下LongPollingClient是个什么东东,它主要有以下几个属性,一个异步的上下文,ip,超时时间和异步结果Future。LongPollingClient本身实现了一个Runnable接口

class LongPollingClient implements Runnable {           
                /**
         * The Async context.
         */
        private final AsyncContext asyncContext;

        /**
         * The Ip.
         */
        private final String ip;

        /**
         * The Timeout time.
         */
        private final long timeoutTime;

        /**
         * The Async timeout future.
         */
        private Future<?> asyncTimeoutFuture;

我们再来看下run方法:这方法较难看懂。

@Override
public void run() {
  //通过org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener的ScheduledExecutorService scheduler延迟执行一个一次性的动作,延迟时间是timeoutTime毫秒,当延迟动作开始执行时,将当前的LongPollingClient对象从clients中移除
  this.asyncTimeoutFuture = scheduler.schedule(() -> {
    clients.remove(LongPollingClient.this);
    //1.1
    List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
    //1.2
    sendResponse(changedGroups);
  }, timeoutTime, TimeUnit.MILLISECONDS);
  //将当前对象加入到clients中
  clients.add(this);
}

这里LongPollingClient.this之前没有见到过,主要是当我们在一个类的内部类中,如果需要访问外部类的方法或者成员域的时候,如果直接使用 this.成员域(与 内部类.this.成员域 没有分别) 调用的显然是内部类的域 , 如果我们想要访问外部类的域的时候,就要必须使用 外部类.this.成员域

package com.test;
public class TestA 
{    
    public void tn()
    {          
        System.out.println("外部类tn");         
    }  
    Thread thread = new Thread(){     
          public void tn(){System.out.println("inner tn");}        
          public void run(){           
                 System.out.println("内部类run");        
                 TestA.this.tn();//调用外部类的tn方法。          
                 this.tn();//调用内部类的tn方法           
             }    
     };          
     public static void main(String aaa[])
     {new TestA().thread.start();}
}

1.1 compareChangedGroup具体做了什么,先不要关注HttpServletRequest是从哪来的,这里也看出了我们本地Cache的作用是什么

private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
  List<ConfigGroupEnum> changedGroup = new ArrayList<>(4);
  for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
    // 针对每一个group获取的对应的参数
    String[] params = StringUtils.split(request.getParameter(group.name()), ',');
    if (params == null || params.length != 2) {
      throw new SoulException("group param invalid:" + request.getParameter(group.name()));
    }
    //参数第一位时client端的Md5值, 第二位时client端的修改时间戳
    String clientMd5 = params[0];
    long clientModifyTime = NumberUtils.toLong(params[1]);
    //获取本地缓存的配置
    ConfigDataCache serverCache = CACHE.get(group.name());
    // 检查是否需要更新服务器的缓存配置
    //1.1.1
    if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
      changedGroup.add(group);
    }
  }
  return changedGroup;
}

1.1.1 checkCacheDelayAndUpdate

private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {

  // 如果md5相等,说明配置相同,不需要更新
  if (StringUtils.equals(clientMd5, serverCache.getMd5())) {
    return false;
  }

  // 如果md5值不等,说明服务器的配置和客户端的缓存不一致
  long lastModifyTime = serverCache.getLastModifyTime();
  //在比对下服务器配置是否比客户端的更新
  if (lastModifyTime >= clientModifyTime) {
    // 如果更新,说明客户端的配置是旧的,需要更新
    return true;
  }

  // 如果服务端的缓存配置,比客户端的配置还要老,那么说明,服务端的缓存配置需要更新了
  // 这里soul考虑到并发问题,如果多个client都来soul拉取最新配置,而当前的soul-admin配置因为都会走到这里,那么如果我们不加锁的话,会导致,同时走到后面的refreshLocalCache,而refreshLocalCache我们前面看到是需要查询数据库并更新到本地缓存的,那么会导致大量的sql查询给数据库带来压力,所以这里加了一个锁,并设置了超时时间
  boolean locked = false;
  try {
    locked = LOCK.tryLock(5, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return true;
  }
  if (locked) {
    try {
      //这里在拿到锁以后,先去本地缓存再拿一遍最新的缓存配置,与刚才获取到的配置做下对比,如果发现不相等,说明之前获取到锁之前已经有数据更新到缓存,
      ConfigDataCache latest = CACHE.get(serverCache.getGroup());
      if (latest != serverCache) {
        // 在判断当前的最新配置和客户端配置的Md5是否一致.
        return !StringUtils.equals(clientMd5, latest.getMd5());
      }
      // 更新缓存数据
      this.refreshLocalCache();
      //拿到最新的配置
      latest = CACHE.get(serverCache.getGroup());
      //比对
      return !StringUtils.equals(clientMd5, latest.getMd5());
    } finally {
      LOCK.unlock();
    }
  }
  // 没有获取到锁,默认当成需要更新处理
  return true;

}

上面的代码,看出了soul设计的代码的精妙之处

接着上面代码,1.1之后,会调用sendResponse(changedGroups);

void sendResponse(final List<ConfigGroupEnum> changedGroups) {
  // 这里逻辑场景就是上面我们刚开始跟过来的DataChangeTask执行的run里面,对所有client的主动触发的场景,这里是想取消掉client的run执行时候的延迟动作,防止重复运行,具体原因还需要在往后看
  if (null != asyncTimeoutFuture) {
    asyncTimeoutFuture.cancel(false);
  }
  //生成response,aysncContext完成
  generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
  asyncContext.complete();
}

通过上面的源码分析。我们现在主要有几个疑惑点:

  1. AsyncContext到底是干嘛的?
  2. 为什么是直接生成的Response返回?
  3. Client是什么时候添加到org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#clients里面的

带着这几个问题,我们在看下一篇文章

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容