Java并发-Disruptor


  • Disruptor 通过以下设计来解决队列速度慢的问题
    • 环形数组结构:为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好(空间局部性原理)
    • 元素位置定位:数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完
    • 无锁设计:每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据
    • 利用缓存行填充解决了伪共享的问题
    • 实现了基于事件驱动的生产者消费者模型(观察者模式)
  • RingBuffer 数据结构:一个可自定义大小的环形数组,还有一个序列号(sequence),用以指向下一个可用的元素
    • Disruptor要求设置数组长度为2的n次幂。在知道索引(index)下标的情况下,存与取数组上的元素时间复杂度只有O(1),而这个index我们可以通过序列号与数组的长度取模来计算得出,index=sequence % entries.length。也可以用位运算来计算效率更高,此时array.length必须是2的幂次方,index=sequece&(entries.length-1)
    • 当需要覆盖数据时,会执行一个策略,Disruptor给提供多种策略
      • BlockingWaitStrategy策略,常见且默认的等待策略。使用ReentrantLock+Condition实现阻塞,最节省cpu,但高并发场景下性能最差。适合CPU资源紧缺,吞吐量和延迟并不重要的场景
      • SleepingWaitStrategy策略,会在循环中不断等待数据。先进行自旋等待如果不成功,则使用Thread.yield()让出CPU,并最终使用LockSupport.parkNanos(1L)进行线程休眠,以确保不占用太多的CPU资源。因此这个策略会产生比较高的平均延时。典型的应用场景就是异步日志
      • YieldingWaitStrategy策略,这个策略用于低延时的场合。消费者线程会不断循环监控缓冲区变化,在循环内部使用Thread.yield()让出CPU给别的线程执行时间。如果需要一个高性能的系统,并且对延时比较有严格的要求,可以考虑这种策略
      • BusySpinWaitStrategy策略: 采用死循环,消费者线程会尽最大努力监控缓冲区的变化。对延时非常苛刻的场景使用,cpu核数必须大于消费者线程数量。推荐在线程绑定到固定的CPU的场景下使用
  • 一个生产者单线程写数据的流程
    1. 申请写入m个元素
    2. 若是有m个元素可以写入,则返回最大的序列号。这里主要判断是否会覆盖未读的元素
    3. 若是返回的正确,则生产者开始写入元素
  • 多个生产者写数据的流程
    1. 申请写入m个元素
    2. 若是有m个元素可以写入,则返回最大的序列号。每个生产者会被分配一段独享的空间
    3. 生产者写入元素,写入元素的同时设置available Buffer里面相应的位置,以标记自己哪些位置是已经写入成功的
    • 问题
      • 如何防止多个线程重复写同一个元素:每个线程通过CAS获取不同的一段数组空间进行操作
      • 如何防止读取的时候,读到还未写的元素:引入了一个与Ring Buffer大小相同的buffer:available Buffer。当某个位置写入成功的时候,便把availble Buffer相应的位置置位,标记为写入成功。读取的时候,会遍历available Buffer,来判断元素是否已经就绪
  • 生产者多线程写入的情况下读数据
    1. 申请读取到序号n
    2. 若writer cursor >= n,这时仍然无法确定连续可读的最大下标。从reader cursor开始读取available Buffer,一直查到第一个不可用的元素,然后返回最大连续可读元素的位置
    3. 消费者读取元素


  • Disruptor核心概念
    • RingBuffer(环形缓冲区):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口
    • Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用
    • Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享
    • Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
    • SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑
    • WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略
    • Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义
    • EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口
    • EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence


Disruptor的使用

<dependency>
	<groupId>com.lmax</groupId>
	<artifactId>disruptor</artifactId>
</dependency>
  1. 创建 Event(消息载体/事件)和 EventFactory(事件工厂) implements EventFactory
  2. 创建消息(事件)生产者 RingBuffer#publish
  3. 创建消费者 implements EventHandler
    • disruptor#handleEventsWithWorkerPool 多消费者下一个消息只会被一个消费者消费,要实现 WorkHandler
    • 顺序消费:disruptor.handleEventsWith().then().then()

文章作者: 钱不寒
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 钱不寒 !
  目录