初探guava cache实现

王二北原创,转载请标明出处:来自[王二北]

1、简单介绍guava cache

guava cache是Google guava中提供的一款轻量级缓存组件,最近项目中用到了guava cache做本地缓存,之所以选择guava cache,原因是guava cache够轻量、够简单、key过期和内存管理机制也较为完善,可扩展性强。
并且相比于redis等分布式缓存,guava cahce有以下优点:
1、减少了网络调用的开销
2、减少了分布式缓存中客户端和服务端的序列化和反序列化。

下面是guava cache的一个简单例子:

 LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                // 当cache中元素个数超过100时,则使用LRU进行内存回收
                .maximumSize(100)
                // 设置cache基于比重大小来清理,这个和maximumSize不可同时使用!!!
                // .maximumWeight(1000)
                // 如果使用基于比重的清理策略,最好自己实现weigher,来指定某些k/v占的比重
//                .weigher(new Weigher<String, String>() {
//                    @Override
//                    public int weigh(String key, String value) {
//                        return 1;
//                    }
//                })
                // 缓存项在给定时间内没有被进行写操作,则回收这个数据项占用内存
                .expireAfterWrite(5, TimeUnit.SECONDS)
                // 缓存项在给定时间内没有被访问(读/写操作),则回收这个数据项占用内存
                .expireAfterAccess(10,TimeUnit.SECONDS) 缓存项在给的时间内没有进行写操作(创建/更新值),则刷新缓存, 调用reload()去重新加载数据
                .refreshAfterWrite(15, TimeUnit.SECONDS)
                // 删除监听器,当缓存被删除时,会触发onRemoval()方法
                // RemovalNotification是删除通知,其中包含移除原因[RemovalCause]、键和值。
                .removalListener(new RemovalListener<String, String>() {
                    @Override
                    public void onRemoval(RemovalNotification<String, String> notification) {
                        System.out.println("onRemoval execute: key="+notification.getKey()+",value="+notification.getValue()+" was deleted,cause="+notification.getCause().name());
                    }
                })

                /**
                 * recordStats用来开启Guava Cache的统计功能,用于统计缓存命中率、命中次数等值。
                 * 统计打开后,使用Cache.stats()方法会返回CacheStats对象以提供如下统计信息:
                 * hitRate():缓存命中率;
                 * averageLoadPenalty():加载新值的平均时间,单位为纳秒;
                 * evictionCount():缓存项被回收的总数,不包括显式清除。
                 */
                .recordStats()
                .build(
                        // CacheLoader用于处理load, reload等逻辑
                        new CacheLoader<String, String>() {
                            public String load(String key)  {
                                System.out.println("load .......");
                                 return key + new Date().toString();
                            }

                            //重载CacheLoader.reload(K, V)可以扩展刷新时(调用Cache#refresh()方法时)的行为,
                            // 这个方法允许在获取新值时返回旧的值。
                            @Override
                            public ListenableFuture<String> reload(String key, String oldValue) throws Exception {
                                System.out.println("reload cache:  key="+key+",oldValue="+oldValue);
                                return super.reload(key, oldValue);
                            }
                        });

2、对guava cache的疑惑

虽然guava cache很“小”,很轻量,但是在使用的过程中,我一直对其中的一些机制感到疑惑:

(1)guava cache有几个重要的参数:expireAfterAccess、expireAfterWrite、refreshAfterWrite,第一个和第二个参数从字面上理解,分别表示一个key对应的value,多久没有访问就会过期和多久没有进行写操作就会过期,第三个参数表示写操作多久后进行刷新。 那么这三个参数是如何管理cache中数据的有效性的呢,实际应用中我该如何设置呢?

(2)guava cache官方文档介绍说,guava cache本身并不会开启一个线程去维护缓存, 而是在访问key时顺便检查一下key的有效期,然后该过期的过期,该刷新缓存的刷缓存,那么在多线程访问的情况下,需要注意哪些问题呢?如果要实现主动过期,该如何实现呢?

要回答第一个问题,最好从本质上,也就是从源码层面来解答这些问题。

注:如果你感觉源码比较枯燥,可以直接跳过2.1小节,从2.2小节看起。

2.1、从源码中探究问题的答案

首先看LoadindgCache,它是一个接口(又继承了Cache接口),接口中定义了get/getUnchecked/refresh等跟缓存存取相关的方法。

@GwtCompatible
public interface LoadingCache<K, V> extends Cache<K, V>, Function<K, V>{
    //xxxx
}

LoadingCache实现类的结构如下:
[图片上传失败...(image-3237d0-1571749293092)]

那么我们会用到哪个类呢?回头查看上面的例子,在调用构建器CacheBuilder#build方法时,返回LoadingCache对象,查看一下build方法源码:

public <K1 extends K, V1 extends V> LoadingCache<K1, V1> build(
      CacheLoader<? super K1, V1> loader) {
    checkWeightWithWeigher();
    return new LocalCache.LocalLoadingCache<>(this, loader);
  }

可以看到,cache使用的是LocalLoadingCache这个实现类。

我们平常用的最多的,也就是cache.get()方法,那么就从LocalLoadingCache入手,查看一下其对应的get(xxx)方法:

   // 其get方法调用的是LocalCache的getOrLoad方法
    @Override
    public V get(K key) throws ExecutionException {
      return localCache.getOrLoad(key);
    }
    
    // LocalCache中getOrLoad内调用了get
    V getOrLoad(K key) throws ExecutionException {
        return get(key, defaultLoader);
    }
  
    // get方法内部的实现,看着是不是有点熟悉,有点像concurrentHashMap
    V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
        // 注意,key不可为空
        int hash = hash(checkNotNull(key));
        return segmentFor(hash).get(key, hash, loader);
    }

当看到上面的LocalCache中的V get(K key, CacheLoader<? super K, V> loader)方法时,突然感觉有点眼熟,这个又是hash,又是segment的,怎么感觉和ConcurrentHashMap很像,接着往下看,首先取hash值的方法实现和hashMap等中取hash的方式大同小异,这里不是重点,接着看segmentFor(hash)

 // 通过hash得到Segment数组下标,通过下标得到segment数组的元素
 final Segment<K, V>[] segments;
 Segment<K, V> segmentFor(int hash) {
    // TODO(fry): Lazily create segments?
    return segments[(hash >>> segmentShift) & segmentMask];
  }

Segment是LocalCache的内部类,在创建Sement时,会初始化一个AtomicReferenceArray实现的可并发读写的数组,数组元素是ReferenceEntry:
[图片上传失败...(image-7bc62a-1571749293092)]

// 初始化线程安全的数组,数组元素是ReferenceEntry(也就是链表中的头结点)
 void initTable(AtomicReferenceArray<ReferenceEntry<K, V>> newTable) {
      this.threshold = newTable.length() * 3 / 4; // 0.75
      if (!map.customWeigher() && this.threshold == maxSegmentWeight) {
        // prevent spurious expansion before eviction
        this.threshold++;
      }
      this.table = newTable;
    }
AtomicReferenceArray<ReferenceEntry<K, V>> newEntryArray(int size) {
      return new AtomicReferenceArray<>(size);
}

ReferenceEntry是一个接口,有基于强引用和弱引用的实现,这里不再展开讲,后面再写一篇去讲,下面是ReferenceEntry的定义的方法,可以看出其也是链表的形式:
[图片上传失败...(image-a5b567-1571749293092)]

由此可见LoadingCache的内部数据结构和ConcurrentHashMap是类似的。
[图片上传失败...(image-893cc4-1571749293092)]

接着分析get方法,前面讲到get的内部实现是:

 // get方法内部的实现
    V get(K key, CacheLoader<? super K, V> loader) throws ExecutionException {
        // 注意,key不可为空
        int hash = hash(checkNotNull(key));
        // 通过hash定位到segment数组的某个Segment元素,然后调用其get方法
        return segmentFor(hash).get(key, hash, loader);
    }
    
    // LocalCache内部类Segment的get方法实现
    V get(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      checkNotNull(key);
      checkNotNull(loader);
      try {
        if (count != 0) { // read-volatile
          // 内部也是通过找Entry链数组定位到某个Entry节点
          ReferenceEntry<K, V> e = getEntry(key, hash);
          if (e != null) {
            long now = map.ticker.read();
            // 根据key获取值,这个方法中也会对值进行一些失效数据的清理
            // 另外需要注意的是,这个方法中不会去load数据。
            // 如果过期的值在这一步被清理了,则下面回去load
            V value = getLiveValue(e, now);
            if (value != null) {
              // 更新key的AccessTime
              recordRead(e, now);
              // 记录key命中
              statsCounter.recordHits(1);
              // 这个方法中会判断当前key是否到了需要刷新(reload)的时间,并进行刷新
              // 具体参考后面的scheduleRefresh()源码
              return scheduleRefresh(e, key, hash, value, now, loader);
            }
            // 当值不存在时(有可能是前面过期时清理了),并且有其他线程正在加载这个数据时,则去同步等待load数据
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()) {
              return waitForLoadingValue(e, key, valueReference);
            }
          }
        }

        // 如果前面值不存在,或者值已过期被清理(且前面isLoading没有成立),则会执行到这一步
        return lockedGetOrLoad(key, hash, loader);
      } catch (ExecutionException ee) {
        Throwable cause = ee.getCause();
        if (cause instanceof Error) {
          throw new ExecutionError((Error) cause);
        } else if (cause instanceof RuntimeException) {
          throw new UncheckedExecutionException(cause);
        }
        throw ee;
      } finally {
        postReadCleanup();
      }
    }

这里主要关注上面代码中getLiveValue()、scheduleRefresh()、lockedGetOrLoad()和 lockedGetOrLoad()方法:

先看getLiveValue()方法的源码:

 /**
 * 这个方法会触发一些失效数据的清理,但不会去load数据
 *  (1)清理《整个segment中》所有非强引用类型中,被gc回收的key或value对应的entry
 *  (2)清理《整个segment中》所有过期失效的数据。
 */
 V getLiveValue(ReferenceEntry<K, V> entry, long now) {
      // 下面两步是对那些不是强引用类型的key或value被垃圾回收后,去清理链中对应的entry。这两步体现了guava cache并不会主动去清理失效的数据,而是在访问数据时才被动去清理。
      //( 关于key或value的源码在下面,这里不做深入探讨,如果需要可以自行查看。)
      //比如key是一个弱引用,gc后,key对象被回收了,这个key对于的entry就会被放到一个keyReferenceQueue中,每次get操作时,如果当前key是null,则就触发一次遍历这个keyReferenceQueue,去清理entry链中对于的entry节点。
      if (entry.getKey() == null) {
        tryDrainReferenceQueues();
        return null;
      }
      // value也一样,也是遍历对应的queue
      V value = entry.getValueReference().get();
      if (value == null) {
        tryDrainReferenceQueues();
        return null;
      }
      // 首先调用map.isExpired判断当前entry是否过期,如果已经过期,则触发过期清理,具体的源码在下面
      if (map.isExpired(entry, now)) {
        tryExpireEntries(now);
        return null;
      }
      return value;
    }

   // 下面是对应key/value 不是强引用类型时的处理 逻辑
    /**
     * The key reference queue contains entries whose keys have been garbage collected, and which
     * need to be cleaned up internally.
     */
    final @Nullable ReferenceQueue<K> keyReferenceQueue;

    /**
     * The value reference queue contains value references whose values have been garbage collected,
     * and which need to be cleaned up internally.
     */
    final @Nullable ReferenceQueue<V> valueReferenceQueue;
    
/** Cleanup collected entries when the lock is available. */
    void tryDrainReferenceQueues() {
      if (tryLock()) {
        try {
          drainReferenceQueues();
        } finally {
          unlock();
        }
      }
    }
    // 判断key或value是否是非强引用类型
     void drainReferenceQueues() {
      if (map.usesKeyReferences()) {
        drainKeyReferenceQueue();
      }
      if (map.usesValueReferences()) {
        drainValueReferenceQueue();
      }
    }
    
  boolean usesKeyReferences() {
    return keyStrength != Strength.STRONG;
  }

  boolean usesValueReferences() {
    return valueStrength != Strength.STRONG;
  }

下面是guava cache中关于过期数据的处理过程:

// 下面是guava cache中判断一个entry是否过期的处理
  boolean isExpired(ReferenceEntry<K, V> entry, long now) {
    checkNotNull(entry);
    // 首先判断accessTime是否设置,如果设置则判断当前是否已达到accessTiMe过期条件
    if (expiresAfterAccess() && (now - entry.getAccessTime() >= expireAfterAccessNanos)) {
      return true;
    }
    // 然后判断afterWrite是否已设置,如果设置则判断当前是否已达到writeTime过期条件
    if (expiresAfterWrite() && (now - entry.getWriteTime() >= expireAfterWriteNanos)) {
      return true;
    }
    return false;
  }
    

   // 尝试清理过期的数据,这一步会加锁
    void tryExpireEntries(long now) {
      if (tryLock()) {
        try {
          expireEntries(now);
        } finally {
          unlock();
          // don't call postWriteCleanup as we're in a read
        }
      }
    }
    
    // 注意每次触发清理时,会清理<整个segment中>所有已经过期的数据,注意是segment中所有的,不是只当前key。
    void expireEntries(long now) {
      drainRecencyQueue();
      // 清理时,会遍历两个queue,去判断过期的entry,然后清理掉(这样的好处比遍历整个map去判断过期再清理效率要高很多,也算是空间换时间的一种手段)。
      // 这两个queue,一个是writequeue,它记录了最近被write过的entry(是按write时间排序的)
      // 另一个是accessQueue,它记录了最近被访问过的entry(是按访问时间排序)
      ReferenceEntry<K, V> e;
      while ((e = writeQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
          throw new AssertionError();
        }
      }
      while ((e = accessQueue.peek()) != null && map.isExpired(e, now)) {
        if (!removeEntry(e, e.getHash(), RemovalCause.EXPIRED)) {
          throw new AssertionError();
        }
      }
    }

下面是判断是否需要刷新的逻辑

/*
* 如果当前key,已经到了需要刷新的时间,则调用reload进行刷新。
* 并且,同一时刻,只会有一个线程去刷新。其他线程判断到当前key正在被别的线程刷新,则会返回旧的值。
**/
V scheduleRefresh(
        ReferenceEntry<K, V> entry,
        K key,
        int hash,
        V oldValue,
        long now,
        CacheLoader<? super K, V> loader) {
      // 如果设置了refreshAfterWrite,并且刷新时间点已到,则进行刷新操作,也就是reload
      // 并且,如果有其他线程正在进行load操作(isloading为true),则直接返回旧的值。
      // 这里提前剧透一下,同一时刻只会有一个线程去reload数据,其他线程直接返回旧的值。这一点需要和后面要讲的load区分,load是线程阻塞的
      // 那么这个isloading是怎么为true的呢,接着看下面的refresh
      if (map.refreshes()
          && (now - entry.getWriteTime() > map.refreshNanos)
          && !entry.getValueReference().isLoading()) {
          // refresh方法,刷新开始
        V newValue = refresh(key, hash, loader, true);
        if (newValue != null) {
          return newValue;
        }
      }
      // 如果已经有其他线程在加载当前key对应的新数据,当前线程直接返回旧的数据
      return oldValue;
    }
    
    // 下面是实现key对应数据刷新的方法实现
     V refresh(K key, int hash, CacheLoader<? super K, V> loader, boolean checkTime) {
      // 这一步insertLoadingValueReference()方法就是isloading的关键,具体实现参考下方
      final LoadingValueReference<K, V> loadingValueReference =
          insertLoadingValueReference(key, hash, checkTime);
      if (loadingValueReference == null) {
        return null;
      }
      // 异步的去加载数据(guava内部是使用当前线程同步调用,如果要实现真正的异步,需要重写reload方法!!!!)
      ListenableFuture<V> result = loadAsync(key, hash, loadingValueReference, loader);
      // 如果不能立即返回(异步任务还没有执行完),则会返回null,外层判断异步得到的值为null,则会返回旧的值
      if (result.isDone()) {
        try {
          return Uninterruptibles.getUninterruptibly(result);
        } catch (Throwable t) {
          // don't let refresh exceptions propagate; error was already logged
        }
      }
      return null;
    }
    
    // 下面方法主要实现了两个功能
    // 1、保证同一时刻只能有一个线程对同一个key进行刷新
    // 2、标记当前key正在loading
    LoadingValueReference<K, V> insertLoadingValueReference(
        final K key, final int hash, boolean checkTime) {
      ReferenceEntry<K, V> e = null;
      // 首先加锁, 注意,此处不是tryLock,而是lock,并且锁是同一把锁,也就说,获得不到锁时,则会阻塞。如果有其他线程,在reload别的key,也会被阻塞。
      (该锁并不是以key为单位的,而是以segment为单位的,因此如果当前segment中有其他线程在刷新其他key,会导致当前线程阻塞)
      lock();
      try {
        long now = map.ticker.read();
        // 这一步也会执行无效数据的清理
        preWriteCleanup(now);
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);

        // 首先通过key找到对应的entry
        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            // entry存在,则判断当前entry是否正在被其他线程执行加载数据
            或者当前entry并未过期,则直接返回null
            ValueReference<K, V> valueReference = e.getValueReference();
            if (valueReference.isLoading()
                || (checkTime && (now - e.getWriteTime() < map.refreshNanos))) {
              return null;
            }
            
            // 如果没有其他线程正在操作,则创建一个新的LoadingValueReference给这个entry,新LoadingValueReference中的isLoading()方法就只有一行代码,return true.(看下方的源码截图)
            ++modCount;
            LoadingValueReference<K, V> loadingValueReference =
                new LoadingValueReference<>(valueReference);
            e.setValueReference(loadingValueReference);
            return loadingValueReference;
          }
        }
        // 有可能在进行refresh操作之前,key对应entry已经被清理掉,因此这里会重新创建一个entry,后续会接着加载数据。
        ++modCount;
        LoadingValueReference<K, V> loadingValueReference = new LoadingValueReference<>();
        e = newEntry(key, hash, first);
        e.setValueReference(loadingValueReference);
        table.set(index, e);
        return loadingValueReference;
      } finally {
        unlock();
        postWriteCleanup();
      }
    }
    
    // 下面代码的意思是异步(需重写reload实现真正的异步)的去加载数据,返回一个future,表示异步处理
    ListenableFuture<V> loadAsync(
        final K key,
        final int hash,
        final LoadingValueReference<K, V> loadingValueReference,
        CacheLoader<? super K, V> loader) {
      final ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
      loadingFuture.addListener(
          new Runnable() {
            @Override
            public void run() {
              try {
                getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
              } catch (Throwable t) {
                logger.log(Level.WARNING, "Exception thrown during refresh", t);
                loadingValueReference.setException(t);
              }
            }
          },
          directExecutor());
      return loadingFuture;
    }
    // 异步加载的处理过程(默认仍是同步,需重写reload重写实现异步)
    public ListenableFuture<V> loadFuture(K key, CacheLoader<? super K, V> loader) {
      try {
        stopwatch.start();
        V previousValue = oldValue.get();
        // 如果旧数据为空,则调用load()方法加载数据
        if (previousValue == null) {
          V newValue = loader.load(key);
          return set(newValue) ? futureValue : Futures.immediateFuture(newValue);
        }
        // 否则,调用reload()加载数据,reload内部默认同步调用load()
        ListenableFuture<V> newValue = loader.reload(key, previousValue);
        if (newValue == null) {
          return Futures.immediateFuture(null);
        }
        // To avoid a race, make sure the refreshed value is set into loadingValueReference
        // *before* returning newValue from the cache query.
        return transform(
            newValue,
            new com.google.common.base.Function<V, V>() {
              @Override
              public V apply(V newValue) {
                LoadingValueReference.this.set(newValue);
                return newValue;
              }
            },
            directExecutor());
      } catch (Throwable t) {
        ListenableFuture<V> result = setException(t) ? futureValue : fullyFailedFuture(t);
        if (t instanceof InterruptedException) {
          Thread.currentThread().interrupt();
        }
        return result;
      }
    }
    
  // reload的默认实现就是同步调用load(),因此guava cache官方推荐重新override 这个方法,自己实现多线程异步处理(比如从线程池中取出一个线程去处理)
  // It is recommended that it be overridden with an asynchronous implementation when using CacheBuilder#refreshAfterWrite
  public ListenableFuture<V> reload(K key, V oldValue) throws Exception {
    checkNotNull(key);
    checkNotNull(oldValue);
    return Futures.immediateFuture(load(key));
  }

[图片上传失败...(image-cd7f6-1571749293092)]

最后看一下lockedGetOrLoad实现

 // 这是最后一步,如果前面判断到值不存在,或者值已过期被清理,则会执行这一步
 // 这一步总的来说会执行下面操作
 // 1、再次执行清理操作,清理过期失效、非强引用被gc回收等数据
 // 2、通过key再找一次对应的value,如果value此时已经存在,并且没有过期,则直接返回,如果value不存在或者已经过期,则将entry销毁原因加入到统计队列 中。这样做的目的其实就是一个双重校验,防止其他线程并发存入了数据
 // 3、如果经过2找不到value,则同步调用load操作去加载数据(loadSync)
 // 4、如果经过2判断到value正在被其他线程loading,则当前线程同步等待loading结束,返回结果
 V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader) throws ExecutionException {
      ReferenceEntry<K, V> e;
      ValueReference<K, V> valueReference = null;
      LoadingValueReference<K, V> loadingValueReference = null;
      boolean createNewEntry = true;
      // 第一步加锁,这个锁也是segment级别的,如果同一个key或同一个segment中对应并发较大,则此处竞争就会比较大
      lock();
      try {
        // re-read ticker once inside the lock
        long now = map.ticker.read();
        // 再尝试清理下面三类数据:
        // 1、过期失效的数据 2、非强引用gc被回收的entry
        // 3、也会去清理recencyQueue中的数据,每access一次某个值,这个值就会被放入recencyQueue队列,这个队列的作用是用来帮助更新accessQueue这个队列,每个值被读取一次就会放入这个队列的尾部
        preWriteCleanup(now);

        int newCount = this.count - 1;
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);
        // 这里为了防止中间有其他线程并发操作了当前key,需要再次根据key找到对应的entry链表,进行遍历查找,判断对应的value
        for (e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            valueReference = e.getValueReference();
            // 如果找到相等的key,并且key的value正在加载中(其他线程并发操作的)
            // 则将createNewEntry设置为false,表示当前线程没有创建新的entry
            if (valueReference.isLoading()) {
              createNewEntry = false;
            } else {
             
              V value = valueReference.get();
              // 如果value为null,则表示value是因为非强引用遇到gc回收导致的null值情况,需要加入到统计队列中
              if (value == null) {
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.COLLECTED);
              //如果找到了value,当前值已过期,则也加入到统计通知队列中
              } else if (map.isExpired(e, now)) {
                // This is a duplicate check, as preWriteCleanup already purged expired
                // entries, but let's accomodate an incorrect expiration queue.
                enqueueNotification(
                    entryKey, hash, value, valueReference.getWeight(), RemovalCause.EXPIRED);
                // 如果找到了value,并且value没有过期,则直接返回
              } else {
                recordLockedRead(e, now);
                statsCounter.recordHits(1);
                // we were concurrent with loading; don't consider refresh
                return value;
              }

              // immediately reuse invalid entries
              writeQueue.remove(e);
              accessQueue.remove(e);
              this.count = newCount; // write-volatile
            }
            break;
          }
        }

        // 如果需要重新创建entry(前面判断得到的value为空,或者value已过期),则创建新ValueReference,并设置到链表中,需要注意的是,这个只是设置valueReference,并不是正在的去加载值,同步加载值的操作是在下面loadSync
        if (createNewEntry) {
          loadingValueReference = new LoadingValueReference<>();

          if (e == null) {
            e = newEntry(key, hash, first);
            e.setValueReference(loadingValueReference);
            table.set(index, e);
          } else {
            e.setValueReference(loadingValueReference);
          }
        }
      } finally {
        unlock();
        postWriteCleanup();
      }
     // 前面设置了ValueReference后,这一步进行同步的加载获取值
      if (createNewEntry) {
        try {
          // Synchronizes on the entry to allow failing fast when a recursive load is
          // detected. This may be circumvented when an entry is copied, but will fail fast most
          // of the time.
          // 同步获取结果值,会对entry加锁,当多个线程同时操作同一个key时,会有同步阻塞等待。
          synchronized (e) {
            return loadSync(key, hash, loadingValueReference, loader);
          }
        } finally {
          statsCounter.recordMisses(1);
        }
      } else {
        // 前面有一步判断key的状态是否为loading,如果是,到这一步就是去同步等待loading结束,返回结果
        return waitForLoadingValue(e, key, valueReference);
      }
    }
    
    /**
    * 同步加载数据
    **/
     V loadSync(
        K key,
        int hash,
        LoadingValueReference<K, V> loadingValueReference,
        CacheLoader<? super K, V> loader)
        throws ExecutionException {
        // 同步加载操作,封装为一个loadingFutrue返回,异步加载loadAsync也是调用loadFutrue这个方法,loadFutrue()方法总的来说就是将加载操作封装为一个future返回,没什么可说的,这里不再分析其源码
      ListenableFuture<V> loadingFuture = loadingValueReference.loadFuture(key, loader);
      // 这一步是得到value,并进行相关的缓存容量管理等操作
      return getAndRecordStats(key, hash, loadingValueReference, loadingFuture);
    }
    
    /**
    * 等待数据加载完成,并存储到cache中
    **/
    V getAndRecordStats(
        K key,
        int hash,
        LoadingValueReference<K, V> loadingValueReference,
        ListenableFuture<V> newValue)
        throws ExecutionException {
      V value = null;
      try {
        // 这一步就是调用future的get,等待数据加载返回,根据名字可以知道这个方法忽略了线程interrupt,知道数据加载结束
        value = getUninterruptibly(newValue);
        if (value == null) {
          throw new InvalidCacheLoadException("CacheLoader returned null for key " + key + ".");
        }
        // 记录数据加载成功相关信息, 做统计使用
        statsCounter.recordLoadSuccess(loadingValueReference.elapsedNanos());
        // 
        storeLoadedValue(key, hash, loadingValueReference, value);
        return value;
      } finally {
        if (value == null) {
          statsCounter.recordLoadException(loadingValueReference.elapsedNanos());
          // 删除value正在loading的状态
          removeLoadingValue(key, hash, loadingValueReference);
        }
      }
    }
    
    /**
    * 将新加载的结果存储到cache中
    * 这一步也会对缓存容量进行管理操作(基于size或者weight的LRU清理)
    */
    boolean storeLoadedValue(
        K key, int hash, LoadingValueReference<K, V> oldValueReference, V newValue) {
      lock();
      try {
        long now = map.ticker.read();
        // 这一步也是数据清理三件套
        preWriteCleanup(now);
        int newCount = this.count + 1;
        // 扩容操作
        if (newCount > this.threshold) { // ensure capacity
          expand();
          newCount = this.count + 1;
        }
        AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
        int index = hash & (table.length() - 1);
        ReferenceEntry<K, V> first = table.get(index);
        // 遍历得到对应的entry
        for (ReferenceEntry<K, V> e = first; e != null; e = e.getNext()) {
          K entryKey = e.getKey();
          if (e.getHash() == hash
              && entryKey != null
              && map.keyEquivalence.equivalent(key, entryKey)) {
            ValueReference<K, V> valueReference = e.getValueReference();
            V entryValue = valueReference.get();
            // replace the old LoadingValueReference if it's live, otherwise
            // perform a putIfAbsent
            // 旧值如果存在,进行替换
            if (oldValueReference == valueReference
                || (entryValue == null && valueReference != UNSET)) {
              ++modCount;
              if (oldValueReference.isActive()) {
                RemovalCause cause =
                    (entryValue == null) ? RemovalCause.COLLECTED : RemovalCause.REPLACED;
                enqueueNotification(key, hash, entryValue, oldValueReference.getWeight(), cause);
                newCount--;
              }
              setValue(e, key, newValue, now);
              this.count = newCount; // write-volatile
              evictEntries(e);
              return true;
            }

            // the loaded value was already clobbere
            enqueueNotification(key, hash, newValue, 0, RemovalCause.REPLACED);
            return false;
          }
        }

        ++modCount;
        ReferenceEntry<K, V> newEntry = newEntry(key, hash, first);
        // 设置新值
        setValue(newEntry, key, newValue, now);
        table.set(index, newEntry);
        this.count = newCount; // write-volatile
        // 基于weight比重清理策略进行清理操作
        evictEntries(newEntry);
        return true;
      } finally {
        unlock();
        postWriteCleanup();
      }
    }
    
    @GuardedBy("this")
    void evictEntries(ReferenceEntry<K, V> newest) {
      if (!map.evictsBySize()) {
        return;
      }

      drainRecencyQueue();

      // 如果当前新加入的值的weight大于weight阈值,则直接删除当前值所在entry
      // 注意:此处的maxSegmentWeight并不一定是weight,也可能是size!!!
     // guava cache在初始化时,会判断当前是基于size还是基于weight来设置maxSegmentWeight,后面会讲
      if (newest.getValueReference().getWeight() > maxSegmentWeight) {
        if (!removeEntry(newest, newest.getHash(), RemovalCause.SIZE)) {
          throw new AssertionError();
        }
      }

     // 如果当前总的weight大于weight阈值,则循环滴遍历accessQueue(队列头部),删除队列中最前的weight>0的元素,直到总的weight <= weight阈值
     // 注意:此处的maxSegmentWeight并不一定是weight,也可能是size!!!
      while (totalWeight > maxSegmentWeight) {
        ReferenceEntry<K, V> e = getNextEvictable();
        if (!removeEntry(e, e.getHash(), RemovalCause.SIZE)) {
          throw new AssertionError();
        }
      }
    }
    
    前面在说到,maxSegmentWeight有可能是weight也有可能是size,这是因为guava 在初始化时,会判断当前是设置了size还是weight,如果是size则就按size计算得的weight阈值,否则就根据weight得到
    // 下面的操作是判断设置了size还是weight
    long getMaximumWeight() {
        if (expireAfterWriteNanos == 0 || expireAfterAccessNanos == 0) {
          return 0;
        }
        // 如果设置了size,就返回size,否则返回weight
        return (weigher == null) ? maximumSize : maximumWeight;
    }
    // 当前maxWeight,是根据getMaximumWeight()得到的,因此maxWeight有可能是size也可能是weight的阈值
    maxWeight = builder.getMaximumWeight();

    // 如果设置了size阈值或者weight阈值(evictsBySize()方法判断)
    // 一个cache中有多个segment,因此就是将一个size阈值或weight阈值平分到各个segment上
    if (evictsBySize()) {
      // Ensure sum of segment max weights = overall max weights
      long maxSegmentWeight = maxWeight / segmentCount + 1;
      long remainder = maxWeight % segmentCount;
      for (int i = 0; i < this.segments.length; ++i) {
        if (i == remainder) {
          maxSegmentWeight--;
        }
        this.segments[i] =
            createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
      }
    } else {
      for (int i = 0; i < this.segments.length; ++i) {
        this.segments[i] =
            createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
      }
    }
    

2.2、源码分析总结

guava cache总结
    内部数据结构
        类似于ConcurrentHashMap,Segement数组 + Entry链表数组
    如何管理数据
        1、被动清理。只有当访问数据(比如get操作)时,guavacache才会去清理数据.
        2、清理两方面的数据
            非强引用回收
            (1)当key或value为非强引用类型(弱引用或软引用)的对象被GC回收后,其对应的entry会被清理(清理当前segment中<所有(reference queues队列中所有的)>非强引用被回收的数据)。
            有效期判断
            (2)当数据失效时。 当前数据失效时,会清理整个segment中所有失效的数据(不是只清理当前失效的这个key)。
            清理时,线程会尝试获得锁,只有获得锁的线程才会去清理,其他线程得不到锁,则直接返回.
            (3)基于size或weight的LRU清理
            guava cache是基于LRU算法实现的数据清理,guava cache中有一个accessQueue队列和一个recencyQueue队列,当数据被get读取时,数据节点被放入recencyQueue队列的尾部,当数据被加载出来(包括写操作)时,如果对应的key之前已经存在,则会根据recencyQueue更新accessQueue中元素的顺序(数据顺序按时间排序),然后将新值放入accessQueue尾部,如果key不存在,则直接加入到accessQueue尾部。
           当cache中超过阈值需要清理时,则从accessQueue的头部开始清理,这样就实现了LRU
        3、失效判断
            设置了expiresAfterAccess,并且超过expiresAfterAccess时间没有访问数据(读、写),则数据失效。
            设置了expiresAfterWrite,并且超过expiresAfterWrite时间没有更新数据(写),则数据失效。
            数据失效时,清理数据,并去load数据
        4、数据刷新
            设置了refreshAfterWrite,并且超过refreshAfterWrite时间没有更新数据,则调用reload刷新数据(guava内部默认是同步调用,如果要实现异步,需要重写reload操作)
        5、清理刷新数据流程
            1、首先,访问数据时,如果能通过key找到对应的entry,如果entry对象中对应的key或value为null,则表示是由于gc回收(回收非强引用)导致的,此时会触发对cache中这类数据的主动清理
            2、接着判断通过key得到的entry是否超过expiresAfterAccess,如果是则过期,触发主动清理过期数据的操作
            3、然后判断entry是否超过expiresAfterWrite,如果是则过期,触发清理过程
            4、如果经过上面的操作,数据被清理(返回null),则最后调用load()加载数据。
            5、如果经过123,数据不为空,则判断refreshAfterWrite,如果满足,则调用reload()刷新数据

    并发处理
        guava cache内部没有实现多线程回收数据,而是在访问数据时主动去清理(一般以segment为单位清理),目的是为了减小多线程带来的开销
        load()时, 只会有一个线程去执行load(),其他线程会被阻塞,直到数据加载成功。
        reload()时,只会有一个线程去执行reload(),其他线程会返回oldValue(),guava内部默认是同步调用reload。
        因此guava cache推荐重写reload()方法,其默认实现是同步调用load(),需要自己实现多线程处理(比如在reload中搞一个线程池)
    三个时间参数
        cache的时间参数一般用于控制数据占用空间和数据的实时性
        expiresAfterAccess用于管理数据空间,清理不常用的数据
        expiresAfterWrite和refreshAfterWrite则用于管理数据的时效性。
        不同的是当expireAfterWrite过期时,会重新同步的load()数据,而refreshAfterWrite过期时,会reload()数据,reload可以实现异步加载。
        因此在数据时效性上expireAfterWrite和refreshAfterWrite要比expireAfterAccess更灵敏 , expireAfterAccess可设置的大一些 expireAfterWrite不可设置的太小,否则会造成业务线程同步去拉取数据的频率执行,如果要实现全异步刷新数据,refreshAfterWrite要设置的比前两个值都小,并且实现reload的异步加载。

[图片上传失败...(image-3e576d-1571749293092)]

通过上面对guava cache源码的浅读,解决了我的第一个问题和第二个问题前半部分。
那么如何实现主动清理数据呢?
可以使用HashedWheelTimer来搞一下,写的有点累了,这里就不介绍HashedWheel了,可以自己百度一下,实现很简单,也很使用,尤其是在任务量比较大的场景:
[图片上传失败...(image-c400ef-1571749293092)],另外也可以选择caffeine,这缓存组件性能要远超guava cache,后续再写一篇文章搞一下。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • 原文 使用Guava cache构建本地缓存 - sameLuo的个人空间 - OSCHINA Guava Cac...
    OoLukeoO阅读 6,604评论 0 3
  • 在上篇文章01 初识缓存-了解缓存中简单了介绍了下缓存的历程以及几种常见的技术进行简单介绍,本着学习的目的本节针对...
    花神子阅读 736评论 0 4
  • Google Guava Cache是一种非常优秀本地缓存解决方案,提供了基于容量,时间和引用的缓存回收方式。基于...
    时之令阅读 4,307评论 0 1
  • 钰教主/文 转眼来京半年了,还记得刚出北京南站的那天,入春的暖阳带着些许冬日的冷色,吹在身上的风也是软软的,仿若入...
    河对岸的窗阅读 971评论 17 16
  • 瓶开秋夜酒香浓,一片丹衷尽在胸。 原地与君逢二度,当时感我意千重。 只缘心境缜而栗,难得仪形温且恭。 此醉再添几分...
    雪窗_武立之阅读 319评论 0 3