What is yqueue?
yqueue是ZMQ底层实现的一个高效的队列数据结构。它的主要特点有两个:
- 动态增长的时候,它能够尽可能少的进行内存分配和内存释放。
- 允许生产者线程和消费者线程不加锁的进行生产和消费。
注意:用户必须保证不在空的yqueue上进行出队(pop)操作,并且在未进行同步的情况下,生产者和消费者不应该同时访问同一个元素。
yqueue API
ZMQ实现的yqueue提供了如下几个接口:
template <typename T, int N> class yqueue_t {
public:
// 获取队头元素的引用
inline T &front();
// 获取队尾元素的引用
inline T &back();
// 向队列插入元素
inline void push();
// 撤销最近一次得插入操作
inline void unpush();
// 将队头出列
inline void pop();
}
ZMQ的队列操作和C++的queue“很不一样”,在使用该数据结构时要非常小心。
- yqueue的push并不接收参数,而只是将队列扩容,真正要插入元素需要在调用push方法之后,调用
yqueue.back() = val
。 - yqueue的pop并不真正把数据删除,而只是将队头“指针”向后移动。并且该操作不返回队头的元素。想得到队头的元素需要在调用pop方法之前,调用
T val = yqueue.front()
。 - yqueue的unpush是独有的,与pop类似,在调用unpush之前,要先调用
T garbage = yqueue.front()
,同时,调用者还要保证以下两个条件:- 队列不空
- 释放要撤销的元素占用的资源
Implementation
yqueue实质上是对如下结构体的封装:
private:
struct chunk_t {
T values [N];
chunk_t *prev;
chunk_t *next;
};
- yqueue的内存动态分配都是以chunk_t为单位,其内部是包含N个元素的数组(ZMQ使用的N值是256,通过这样的设置,能够削弱约99.6%的由内存分配带来的影响)。
yqueue所有队列的操作都是通过下面的几个指针来完成的:
private:
chunk_t *begin_chunk;
int begin_pos;
chunk_t *back_chunk;
int back_pos;
chunk_t *end_chunk;
int end_pos;
- chunk_t *指针标志了对应的chunk_t块,而int值表明了块内偏移。利用这两个参数可以定位到yqueue内部的任意元素。
- 使用者应该保证三个指针分别被各自使用的线程独占,或互斥的占有。对应到API也就是说,pop、push和unpush操作应保证使用时被某线程独占;而生产者和消费者之间可以是两个无需同步的线程。
yqueue的生产者和消费者不用加锁是通过一个原子指针实现的:
private:
template <typename T> class atomic_ptr_t {
public:
// 非线程安全,在初始化时用来为指定初值
inline void set(T *ptr);
// 原子“交换”操作,将值设置为val并返回之前的值
inline T *xchg(T *val);
// 原子的“对比 & 交换”操作,若旧值与cmp相等,边赋值为val;
// 否则旧值不变。最后返回旧值。
inline T *cas(T *cmp, T*val);
private:
volatile T *ptr;
};
atomic_ptr_t<chunk_t> spare_chunk;
- atomic_ptr_t实质上是对各种平台的互斥操作进行了封装,保证了指针在任意时刻只能又一个线程持有,从而保证了基于该指针进行的所有操作的原子性。
yqueue之所以能极大程度的减少内存分配,并不仅仅是选取恰当的N值,还得益于底层实现采用的“内存重用”技术:
inline void push() {
// 将back()依赖的指针定位到yqueue尾部,以便push()调用后执行back()操作
back_chunk = end_chunk;
back_pos = end_pos;
// 当前的块仍有空间,不需要扩容
if (++end_pos != N)
return;
// 获取原子指针的值,并将原子指针的值设为NULL
// 如果有pop()操作造成的废弃的块,yqueue不会马上释放
// 而是用原子指针保存,以便内存重用
chunk_t *sc = spare_chunk.xchg(NULL);
if (sc) {
// 若此时有可重用的内存,则sc非空
end_chunk->next = sc;
sc->prev = end_chunk;
} else {
// sc == NULL表示没有可重用内存,此时需要malloc动态分配
end_chunk->next = (chunk_t*)malloc(sizeof(chunk_t));
end_chunk->next->prev = end_chunk;
}
// 将队尾移到扩容后的块上
end_chunk = end_chunk->next;
end_pos = 0;
}
inline void pop() {
if (++begin_pos == N) {
// 当前块已全部出队(废弃),用临时指针o指向
chunk_t *o = begin_chunk;
// 将队头指向下一块的开始,剔除旧块,并复位偏移量
begin_chunk = begin_chunk->next;
begin_chunk->prev = NULL;
begin_pos = 0;
// 根据“最近经常使用”内存分页算法,将O设为重用块效率最高
// 也就是说,如果马上扩容时,不需要进行内存页面替换操作
// geek,利用操作系统内存管理,追求性能到极致
chunk_t *cs = spare_chunk.xchg(o);
// 别忘了释放旧的、不用的块,这回是真的没用了
free(cs);
}
}
- 通过选取合适的N值,以及pop和push操作巧妙的利用原子指针进行内存重用,是的yqueue极大程度的减少了内存分配和销毁的开销。
Tips
- 高效往往是利用巧妙的数据结构,结合操作系统经过精心设计的结晶。