package testForFun.demo20181210.demo03;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import javax.swing.plaf.synth.SynthOptionPaneUI;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created with IntelliJ IDEA
* User:Ryannn
* Date:2018/12/13
* Time:19:09
* Test: Disruptor
*/
public class Test01 {
static class TaskEvent{
private String taskType;
private Integer count;
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
}
public static void main(String[] args) throws InterruptedException {
ThreadFactory threadFactory = new ThreadFactory() {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
@Override
public Thread newThread(Runnable r) {
AtomicInteger integer = new AtomicInteger();
int num = integer.incrementAndGet();
System.out.println("生成线程:"+num);
return new Thread(r,"simple Thread"+String.valueOf(num));
}
};
//2,事件工场,初始化事件的时候使用
EventFactory<TaskEvent> factory = new EventFactory<TaskEvent>() {
@Override
public TaskEvent newInstance() {
return new TaskEvent();
}
};
//3.1,处理PGN事件的handler,
EventHandler<TaskEvent> handler1 = new EventHandler<TaskEvent>() {
@Override
public void onEvent(TaskEvent taskEvent, long l, boolean b) throws Exception {
if(taskEvent.getTaskType().equals("PGN")){
System.out.println("【hander1】处理【PGN事件】"+taskEvent.getCount());
}
}
};
//3.2,处理PSR事件的hangler
EventHandler<TaskEvent> handler2 = new EventHandler<TaskEvent>() {
@Override
public void onEvent(TaskEvent taskEvent, long l, boolean b) throws Exception {
if(taskEvent.getTaskType().equals("PSR")){
System.out.println("【handler2】处理【PSR事件】"+taskEvent.getCount());
}
}
};
//4,设置策略
BlockingWaitStrategy strategy = new BlockingWaitStrategy();
//5,指定ringBuffer的大小
int size=1024;
//6,组装Disruptor
Disruptor<TaskEvent> disruptor = new Disruptor<>(factory, size, threadFactory, ProducerType.SINGLE, strategy);
//7,设置处理的hangdler
disruptor.handleEventsWith(handler1);
disruptor.handleEventsWith(handler2);
//8,启动
disruptor.start();
//9,测试demo
RingBuffer<TaskEvent> ringBuffer = disruptor.getRingBuffer();
//for (int i = 0; i < 3; i++) {
long sequence = ringBuffer.next();
try {
TaskEvent taskEvent = ringBuffer.get(sequence);
// taskEvent.setTaskType("PGN");
taskEvent.setTaskType("PSR");
// taskEvent.setTaskType("LSR");
taskEvent.setCount(2);
}finally {
ringBuffer.publish(sequence);
}
Thread.sleep(10);
}
}
Disruptor-->Demo02
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 本文是笔者在研究Disruptor过程中翻译的Disruptor1.0论文精选,中间穿插了一些感想和说明,均以“译...
- 译文转载自:http://ifeve.com/disruptor-cacheline-padding/ 英文原文地...
- 本篇文章是后续解读Disruptor源码的导读,适合对Disruptor还不了解的同学。如果有兴趣,还可以看下我之...
- 前言 秒杀架构持续优化中,基于自身认知不足之处在所难免,也请大家指正,共同进步。文章标题来自码友的建议,希望可以把...