高性能百万QPS队列Disruptor解析

1. Disruptor是什么?

1.1 简介

    Disruptor它是一个开源的并发框架,并获得2011年Oracle Duke’s 程序框架创新奖,它的设计初衷是解决高并发场景下队列锁的问题。它最早是由一家英国的外汇公司LMAX(一种新型零售金融交易平台)开发与开源的,能够在无锁的情况下实现对队列的高并发操作,这个系统是建立在JVM平台上,核心是一个业务逻辑处理器,官方号称它能够在单线程里每秒处理6百万订单,业务逻辑处理器完全是运行在内存中,使用事件源驱动方式。业务逻辑处理器的核心是Disruptor。

    需要特别说明的是,这里所说的队列是系统内部的内存队列,而非Kafka、RocketMQ这样的分布式队列。

推荐学习网站:

https://ifeve.com/disruptor-getting-started/

https://www.jianshu.com/p/78160f213862

1.2队列

1.2 .1 特性

    先进先出(FIFO),先进入队列的元素先出队列(可以理解为我们生活中的食堂排队情况,先排队,先吃饭),生产者(Producer)往队列里面发布事件,消费者(Consumer)获得消息通知并消费,如果队列里面没有事件的时候,消费者堵塞,直到生产者发布了新的事件,消费者再继续消费。

1.2.2 Java中的阻塞队列

ArrayBlockingQueue:基于数组结构组成的有界阻塞队列,使用时必须指定容量大小,内部使用加锁机制保证多线程运行的安全性。

LinkedBlockingQueue:基于链表实现的可选界限的双端阻塞队列,可在构造函数中指定容量,不指定意味着容量为Integer.MAX_VALUE,内部使用加锁机制保证多线程运行的安全性。

ConcurrentLinkedQueue:基于链表实现的线程安全无界非阻塞队列,内部使用CAS操作实现无锁机制保证多线程运行的安全性。

LinkedTransferQueue:基于链表实现的无界阻塞队列,内部使用CAS操作实现无锁机制保证多线程运行的安全性。

总结:通过不加锁的方式实现的队列都是无界的,无界意味着队列长度不可控,若生产者速度过快,则会引起内存溢出。所以在稳定性要求高的系统中,只能选择有界队列,而为了减少Java的垃圾回收对系统性能的影响,会尽量选择数组格式的数据结构,这样筛选下来,只有ArrayBlockingQueue符合了。

1.2.3 Disruptor队列

    有界,无锁,多生产者或消费者时才用到CAS操作机制。官方也对disruptor和ArrayBlockingQueue的性能在不同的应用场景下做了对比,目测性能有5~10倍左右的提升。

    内部核心RingBuffer数据结构图如下:

2. 为何如此快?

2.1 环形数组结构

1. 数组查询效率高,时间复杂度O(1),而链表查询的时间复杂度为O(n)

2. 用数组实现, 解决了链表节点分散, 不利于cache预读问题

3. 可以预分配用于存储事件内容的内存空间,并且解决了节点每次需要分配和释放, 需要大量的GC问题,此外,不像链表那样,需要为每一个添加到其上面的对象创造节点对象—对应的,当删除节点时,需要执行相应的内存清理操作。

4.环形数组的元素,采用覆盖的方式,避免了JVM的GC。

2.2 无锁

    Disruptor根本就不用锁。只在需要确保操作是线程安全的(特别是,在多生产者的环境下,更新下一个可用的序列号)地方,我们使用CAS(Compare And Swap/Set)操作,使用CAS,严格意义上说仍然是使用锁,因为CAS本质上类似是一种乐观锁, 只不过是CPU级别指令, 不涉及到操作系统, 所以效率很高(AtomicLong实现Sequence)。

    而单线程的使用普通long,没有锁也没有CAS。这意味着单线程版本会非常快,因为它只有一个生产者,不会产生序号上的冲突。

2.3 数组元素定位

    求余操作本身也是一种高耗费的操作, 所以Ringbuffer的size设成2的n次方, 可以利用位操作来高效实现求余。要找到数组中当前序号指向的元素,可以通过mod操作,正常通过sequence % arr.length,优化后可以通过sequence & (arr.length-1)来实现数组下标的获取。比如一共有8槽,seq当前为10,10%8取模后为2,代表应该在数组下标为4的位置,而位操作10&(8-1)=2,HashMap就是用这个方式来定位数组元素的,这种方式比取模的速度更快。

2.4 解决伪共享

2.4.1 缓存行

Cpu cache简单示意图:


    CPU是机器的心脏,最终由它来执行所有运算和程序。主内存(RAM)是数据(包括代码行)存放的地方。CPU和主内存之间有好几层缓存,因为直接访问主内存也是非常慢的。其中L1,L2,L3等级缓存都是由缓存行组成的, 通常是64字节, 一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量. 缓存行是缓存更新的基本单位, 就算你只读一个变量, 系统也会预读其余7个, 并cache这一行, 并且这行中的任一变量发生改变, 都需要重新加载整行, 而非仅仅重新加载一个变量。

2.4.2 伪共享及解决方案-神奇的填充

    缓存行的这种免费加载同时也引入了一个弊端,当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享,整个缓存行需要从主内存重新读取,从而影响了并发效率,这叫作“伪共享”(译注:可以理解为错误的共享)

    你会看到Disruptor消除这个问题,至少对于缓存行大小是64字节或更少的处理器架构来说是这样的(译注:有可能处理器的缓存行是128字节,那么使用64字节填充还是会存在伪共享问题),通过增加补全来确保ring buffer的序列号Sequence不会和其他东西同时存在于一个缓存行中。

public longp1, p2, p3, p4, p5, p6, p7; // cache line padding

private volatile long cursor = INITIAL_CURSOR_VALUE;

public long p9, p10, p11, p12,p13, p14, p15; // cache line padding

3. Disruptor开发示例

从一个简单的事件(消息)生产和消费为例,核心类如下:

UUIDEvent.java        定义事件或消息的数据

public class UUIDEvent {

      private String uuid;

      public String getUuid() {

                return uuid;

      }

        public void setUuid(String uuid){

            this.uuid = uuid;

      }

}

UUIDEventFactory.java  定义事件工厂类实例化事件数据对象

public class UUIDEventFactory implements EventFactory<UUIDEvent> {

    /**    * 通过事件工厂类实例化事件数据对象    *    * @return    */

      @Override

    public UUIDEvent newInstance() {

              return new UUIDEvent();

      }

}

Consumer.java          定义事件消费者

public class Consumer implements EventHandler<UUIDEvent> {

      private String consumerId;

      private static final AtomicInteger ai = new AtomicInteger(0);

        public Consumer() {

              this.consumerId = "消费者" + ai.getAndIncrement();

        }

  @Override

  public void onEvent(UUIDEvent longEvent, long l, boolean b) throws Exception {

          try {

                  Thread.sleep(500);

          } catch (InterruptedException e) {

                  e.printStackTrace();

            }

          System.out.println("msg:" + longEvent.getUuid());

      }

}

Producer.java          定义事件生产者

public class Producer {

      private final RingBuffer<UUIDEvent> ringBuffer;

      public Producer(RingBuffer<UUIDEvent> ringBuffer) {

              this.ringBuffer = ringBuffer;

        }

    public void onData(String uuid) {

          //可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽        long    sequence = ringBuffer.next();

          try {

                  //用上面的序列号取出一个事件用于填充或覆盖

                  UUIDEvent event = ringBuffer.get(sequence);

                  event.setUuid(uuid);

              } finally {

                      //发布事件,发布后会提交给消费者进行消费

                      ringBuffer.publish(sequence);

                }

        }

}

UUIDEventMain.java      定义运行启动类

public class UUIDEventMain {

      public static void main(String[] args) throws InterruptedException {

              ThreadFactory threadFactory = Executors.defaultThreadFactory();

              UUIDEventFactory factory = new UUIDEventFactory();

              int bufferSize = 1024 * 1024;

        Disruptor<UUIDEvent> disruptor = new Disruptor<>(factory, bufferSize, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy());        disruptor.handleEventsWith(new Consumer());

        disruptor        disruptor.start();

              System.out.println("disruptor启动");

        Producer producer = new Producer(disruptor.getRingBuffer());

                for (int i = 0; i < 32; i++) {            //生产者发布事件                            producer.onData(UUID.randomUUID().toString());

              }

        disruptor        disruptor.shutdown();

              System.out.println("disruptor关闭");

      }

}

4. Disruptor原理分析

4.1 Ringbuffer

    RingBuffer是一个环形队列,内部是一个数组和序列号组成,RingBuffer是存储消息的地方,通过一个名为cursor的Sequence对象指示队列的头,协调多个生产者向RingBuffer中添加消息,并用于在消费者端判断RingBuffer是否为空。巧妙的是,表示队列尾的Sequence并没有在RingBuffer中,而是由消费者维护。这样的好处是多个消费者处理消息的方式更加灵活,可以在一个RingBuffer上实现消息的单播,多播,流水线以及它们的组合。其缺点是在生产者端判断RingBuffer是否已满是需要跟踪更多的信息,为此,在RingBuffer中维护了一个名为gatingSequences的Sequence数组来跟踪相关Seqence。

4.2 Producer/Consumer

    Producer即生产者,调用Disruptor 发布事件。有两种实现策略,对应的实现类为SingleProducerSequencerMultiProducerSequencer【都实现了Sequencer类,之所以叫Sequencer是因为他们都是通过Sequence来实现数据读写】 ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。具体使用哪个根据自己的场景来定,多线程的策略使用了AtomicLong(Java提供的CAS操作),而单线程的使用long,没有锁也没有CAS。这意味着单线程版本会非常快,因为它只有一个生产者,不会产生序号上的冲突。

    Consumer和EventProcessor是一个概念,新的版本中由EventProcessor概念替代了Consumer。

总结:Producer生产event数据存入RingBuffer中,然后发布给消费者,EventHandler或WorkHandler作为消费者消费event并进行逻辑处理。消费消息的进度通过Sequence来控制。

4.3 Sequence

    Sequence是Disruptor最核心的组件,上面已经提到过了。生产者对RingBuffer的互斥访问,生产者与消费者之间的协调以及消费者之间的协调,都是通过Sequence实现。几乎每一个重要的组件都包含Sequence。那么Sequence是什么呢?首先Sequence是一个递增的序号,说白了就是计数器;其次,由于需要在线程间共享,所以Sequence是引用传递,并且是线程安全的;再次,Sequence支持CAS操作;最后,为了提高效率,Sequence通过padding来避免伪共享。

总结:Sequence是一个做了缓存行填充优化的原子序列。

4.4 SequenceBarrier

    用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。 Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。SequenceBarrier用来在消费者之间以及消费者和RingBuffer之间建立依赖关系。在Disruptor中,依赖关系实际上指的是Sequence的大小关系,消费者A依赖于消费者B指的是消费者A的Sequence一定要小于等于消费者B的Sequence,这种大小关系决定了处理某个消息的先后顺序。因为所有消费者都依赖于RingBuffer,所以消费者的Sequence一定小于等于RingBuffer中名为cursor的Sequence,即消息一定是先被生产者放到Ringbuffer中,然后才能被消费者处理。

    SequenceBarrier在初始化的时候会收集需要依赖的组件的Sequence,RingBuffer的cursor会被自动的加入其中。需要依赖其他消费者和/或RingBuffer的消费者在消费下一个消息时,会先等待在SequenceBarrier上,直到所有被依赖的消费者和RingBuffer的Sequence大于等于这个消费者的Sequence。当被依赖的消费者或RingBuffer的Sequence有变化时,会通知SequenceBarrier唤醒等待在它上面的消费者。

4.5 WaitStrategy

    当消费者等待在SequenceBarrier上时,有许多可选的等待策略,不同的等待策略在延迟和CPU资源的占用上有所不同,可以视应用场景选择:

BusySpinWaitStrategy:自旋等待,类似Linux Kernel使用的自旋锁。低延迟但同时对CPU资源的占用也多。

BlockingWaitStrategy:使用锁和条件变量。CPU资源的占用少,延迟大。

SleepingWaitStrategy:在多次循环尝试不成功后,选择让出CPU,等待下次调度,多次调度后仍不成功,尝试前睡眠一个纳秒级别的时间再尝试。这种策略平衡了延迟和CPU资源占用,但延迟不均匀。

YieldingWaitStrategy:在多次循环尝试不成功后,选择让出CPU,等待下次调。平衡了延迟和CPU资源占用,但延迟也比较均匀。

PhasedBackoffWaitStrategy:上面多种策略的综合,CPU资源的占用少,延迟大。


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,457评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,837评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,696评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,183评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,057评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,105评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,520评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,211评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,482评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,574评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,353评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,897评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,489评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,683评论 2 335