ZeroMQ API Reference
void *zmq_ctx_new();
创建一个ZMQ的上下文环境,是ZMQ一切的开始。
- 线程安全,不需要自己加锁和信号量处理。
- Return:成功的话返回一个句柄;否则返回NULL并设置errno。
int zmq_ctx_shutdown(void *context);
关闭ZMQ的上下文环境。
- 基于该上下文环境的socket进行的阻塞操作立即返回ETERM。
- zmq_close()也会立即返回ETERM。
- Return:成功的话返回0;否则返回-1并设置errno。
注意 该函数并没有释放ZMQ分配的资源。因此,对于彻底关闭ZMQ通讯而言,执行该函数是可选的,调用之后仍需调用zmq_ctx_term释放ZMQ分配的所有资源。
int zmq_ctx_term(void *context);
销毁ZMQ的上下文环境,释放ZMQ分配的所有资源,是ZMQ一切的结束。执行zmq_ctx_shutdown的所有操作。
- 在打断所有阻塞的调用后,zmq_ctx_term自己会阻塞直到所有context上的socket执行zmq_close()完毕。
- Return:成功的话返回0;否二返回-1并设置errno。
void *zmq_socket(void *context, int type);
创建一个给定上下文环境下的ZMQ socket,type指定了ZMQ通讯的模式。
- 返回的socket是无状态的,还需要调用zmq_connect连接到端点,或zmq_bind绑定到指定端口/IP
- Return:成功的话返回一个句柄;否则返回NULL并设置errno。
- ZMQ socket不是线程安全的,尽量不要多线程共享一个zmq socket。
注意
- 传统的socket提供的是同步的字节流或数据包通讯接口;而ZMQ socket提供的是异步的消息队列,传递的不是离散的数据包或连续的字节流,而是离散的“消息”。
- 传统的socket提供的是一对一或多对一(多个客户端,一个服务器),特殊情况下会有一对多(多播);而ZMQ socket提供的是多对多模型,客户端可以通过zmq_connect连接多个服务器,服务器端可以通过zmq_bind连接多个客户端。
int zmq_close(void *socket);
关闭指定的socket上的所有连接,未到达的数据会被丢弃。
- 默认情况下,未发送的数据不会被丢弃,这会使zmq_ctx_term函数调用发生堵塞(30秒)。可以通过zmq_setsockopt的ZMQ_LINGER改变策略。
- Return:成功的话返回0;否则返回-1并设置errno。
int zmq_connect(void *socket, const char *endpoint);
(客户端)与指定的端点建立连接,通过endpoint参数指定了通讯协议。
- 除ZMQ_PAIR类型的socket,任何ZMQ socket都支持通过zmq_connect与多个端点建立连接,具体的机制取决于socket模式。
- Return:成功的话返回0;否则返回-1并设置errno。
注意
- 传统的connect函数一般不马上返回(三次握手),一旦超时则socket不再可用,必须关闭;zmq_connect总是立即返回,但是成功调用函数并不表明已经建立连接。因此,绝大多数情况下,服务器调用zmq_bind与客户端调用zmq_connect的顺序无所谓。
- 除了ZMQ_ROUTER模式外,调用zmq_connect后socket进入就绪状态;而调用zmq_bind后socket进入静音状态,根据zmq_socket类型会阻塞或返回并丢弃消息。
int zmq_disconnect(void *socket, const char *endpoint);
(客户端)关闭指定的socket上的指定连接,未到达的数据会被丢弃。
- 默认情况下,与zmq_close相同,未发送的数据不会被丢弃。
- Return:成功的话返回0;否则返回-1并设置errno。
int zmq_bind(void *socket, const char *endpoint);
(服务器)将socket与指定的端点绑定,接收客户端的连接。
- 除ZMQ_PAIR类型的socket,任何ZMQ socket都支持通过zmq_bind与多个端点绑定,接收客户的连接请求。
- Return:成功的话返回0;否则返回-1并设置errno。
注意 调用zmq_bind后的socket进入静音状态,根据zmq_socket类型会阻塞或立即返回并丢弃消息
int zmq_unbind(void *socket, const char *endpoint);
(服务器)将socket与指定的端点解绑,不再接收向该端点提出连接请求的客户端连接。
- 端点包含通配符“*”时,需通过getsockopt的ZMQ_LAST_ENDPOINT获得真实的端点再调用zmq_unbind解绑。例如:
/* Create a ZMQ_SUB socket */
void *socket = zmq_socket(context, ZMQ_SUB);
assert(socket);
/* Bind it to the system-assigned ephemeral port using a TCP transport */
rc = zmq_bind(socket, "tcp://127.0.0.1:*");
assert(rc == 0);
/* Obtain real endpoint */
const size_t buf_size = 32;
char buf[buf_size];
rc = zmq_getsockopt(socket, ZMQ_LAST_ENDPOINT, buf,
(size_t *)&buf_size);
assert (rc == 0);
/* Unbind socket by real endpoint */
rc = zmq_unbind(socket, buf);
assert (rc == 0);
- Return:成功的话返回0;否则返回-1并设置errno。
int zmq_setsockopt(void *socket, int option_name,
const void *option_value,size_t option_len);
为指定的socket设定option_name,值由option_value指定。为了使选项生效,要在建立连接(如zmq_connect)之前调用zmq_setsockopt。
- ZMQ_BACKLOG:设定面向连接的协议中,socket队列的最大长度,默认是100。
- ZMQ_CONNECT_TIMEOUT:为connect()系统调用设置超时时间,默认是0。
- ZMQ_IPV6:设置值为1表示启用IPv6协议,默认是0。
- ZMQ_LINGER:设置调用zmq_disconnect或zmq_close后尚未接收消息的等待时间。该选项将进一步影响zmq_ctx_term的阻塞时间。设置为-1表示不丢弃任何消息,zmq_ctx_term将会一直阻塞直到消息全部接收;设置为0表示丢弃消息并立刻返回;设置为正值x表示,x毫秒后将返回,默认是30000毫秒。
- ZMQ_RCVTIMEO:设置recv操作的超时时间。在没有消息接收的情况下,设置为0时,zmq_recv将立即返回-1,并设置errno为EAGAIN;设置为-1时,zmq_recv会一直阻塞直到有消息到达;设置为x时,zmq_recv将等待x毫秒再返回。
注意 该选项会被zmq_recv的flag参数ZMQ_DONTWAIT屏蔽。
- ZMQ_SNDTIMEO:设置send操作的超时时间,与ZMQ_RCVTIMEO类似。
注意 该选项会被zmq_send的flag参数ZMQ_DONTWAIT屏蔽。
int zmq_getsockopt(void *socket, int option_name,
void *option_value,size_t *option_len);
从指定的socket中提取指定option_name,保存在option_value中。
- ZMQ_THREAD_SAFE:返回一个布尔类型,表示当前的socket是否是线程安全的。
- ZMQ_LAST_ENDPOINT:获得试用通配符的端点的真实值。需要注意的是,若TCP的主机为INADDR_ANY,则返回0.0.0.0。
- ZMQ_RCVMORE:返回一个布尔类型,表示从socket最后接收到的消息是否还有后续消息。该选项由zmq_send的flags设置。
int zmq_recv(void *socket, void *buf,size_t len, int flags);
从指定的socket接收消息,并存储在buf中。
- 如果接收的消息长度大于参数len,消息会被截断。
- 若flags参数为0,则在无消息时zmq_recv会阻塞直到消息到来;若flags为ZMQ_DONTWAIT,则zmq_recv会立即返回EAGAIN。
注意 当socket无数据接收时,若flags参数设置为ZMQ_DONTWAIT则zmq_recv立即返回-1,忽略zmq_setsockopt的ZMQ_RCVTIMEO设置的超时时间。
- Return:成功时返回接收的比特数;否则返回-1并设置errno。
int zmq_send(void *socket, void *buf,size_t len, int flags);
将buf中的消息放入指定socket的发送队列。
- 若flags参数为ZMQ_DONTWAIT,则在无建立的连接情况下,zmq_send立即返回EAGAIN。若flags参数为ZMQ_SNDMORE,则表明消息将分片发送。
- Return:成功时返回消息的比特数;否则返回-1并设置errno。
注意
- 成功调用zmq_send并不能说明消息已经传输到网络,只能保证消息被放入消息队列,而ZMQ对消息队列中的消息负责。
对于多片的消息,ZMQ会“原子”地发送。即要么消息全部送达,要么消息全部未送达。- 当socket无数据发送时,若flags参数设置为ZMQ_DONTWAIT则zmq_send立即返回-1,忽略zmq_setsockopt的ZMQ_SNDTIMEO设置的超时时间。
const char *zmq_strerror(int errnum);
根据给定的errno序号得到相应的出错信息。
- ZMQ定义了额外的errno,因此应用程序应该调用zmq_strerror而不是strerror来获得出错信息。
- Return:返回一个字符串的首地址。
int zmq_poll(zmq_pollitem_t *items, int nitems,long timeout);
提供ZMQ的多路I/O复用机制。
- items参数的结构如下,其中,socket为ZMQ socket,而fd为标准的posix文件描述符,两者填一个。调用zmq_poll之前为items的每个成员指定events:
- ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR, ZMQ_POLLPRI
typedef struct{
void *socket;
int fd;
short events;
short revents;
} zmq_pollitem_t;
- Return:成功时返回revents触发并与events匹配的items成员个数;否则返回-1并设置errno。
注意
- zmq_poll提供了与传统的select和poll类似的“轮询”功能。在单线程下可以同时监控多个套接字的响应状态,使得服务端能够同时处理多个客户端的请求,也可以使客户端同时向多个服务端发送请求。
- 其实,对于ZMQ来说,zmq_poll是不必要的,因为zmq_connect和zmq_bind支持连接/绑定多个端点。完全可以使用它们取代zmq_poll的功能。例如:
// using zmq_poll implements I/O multiplexing
vector<string> zmq_url/* */;
int port_num/* */;
vector<void *> receiver;
vector<void *> context;
zmq_pollitem_t *poll_item;int ret;
poll_item = (zmq_pollitem_t*)malloc(sizeof(zmq_pollitem_t) * port_num);
for (int i = 0; i < port_num; ++i) {
void *aContext, *aReceiver;
aContext = zmq_ctx_new();
aReceiver = zmq_socket(aContext, ZMQ_PULL);
int timeout = 1000; // milliseconds
zmq_setsockopt(aReceiver, ZMQ_RCVTIMEO, &timeout, sizeof(int));
ret = zmq_connect(aReceiver, zmq_url[i].c_str());
assert(0 == ret);
receiver.push_back(aReceiver);
context.push_back(aContext);
poll_item[i].socket = receiver[i];
poll_item[i].fd = 0;
poll_item[i].events = ZMQ_POLLIN;
poll_item[i].revents = 0;
}
ret = zmq_poll(poll_item, port_num, 0);
assert(-1 != ret);
for (int i = 0; i < port_num; ++i) {
if (poll_item[i].revents & ZMQ_POLLIN) {
ret = zmq_recv(receiver[i] ...
...
}
}
// using zmq_connect implements I/O multiplexing
vector<string> zmq_url/* */;
int port_num/* */;
void *context = zmq_ctx_new();
void *receiver = zmq_socket(context, ZMQ_PULL);
zmq_setsockopt(receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int));
int timeout = 1000; // milliseconds
int ret;
for (int i = 0; i < port_num; ++i) {
ret = zmq_connect(receiver, zmq_url[i].c_str());
assert(0 == ret);
}
ret = zmq_recv(receiver[i] ...
...