ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ZMQ Socket API

2021-05-25 19:34:22  阅读:311  来源: 互联网

标签:ZMQ Socket ctx API MQ 接字 zmq socket


文章目录

1. zmq_ctx_new(3)

Synopsis

void *zmq_ctx_new ();

Description

zmq_ctx_new() 函数创建了一个新的 ØMQ context。

thread safety

一个 ØMQ context 是线程安全的,可能在多个应用线程间共享,对于调用方而言不需要额外的锁。

Return value

如果成功,zmq_ctx_new()函数应当返回一个指向新创建的context的不透明handle。否则它将返回NULL,并设置errno的值。

Errors

这个函数未定义任何错误。

2. zmq_socket(3)

Synopsis

void *zmq_socket (void *context, int type);

Description

zmq_socket()函数使用指定的 context 将创建一个 ØMQ socket 并返回一个指向新创建socket的不透明 handle。类型参数指定套接字类型,它决定了在socket之上的通讯语义。
新创建的套接字的初始化状态是未绑定的,它与任何终端无关。为了建立一个消息流,一个socket必须首先使用zmq_connet()与至少一个终端建立连接,或者必须使用zmq_bind()创建一个终端来接收到来的连接。

较传统sockets的关键不同点

通常来说,传统的套接字提供面向连接的可靠字节流(SOCK_STREAM)或无连接的不可开数据报的同步接口。相较而言,ØMQ sockets 提供一个异步消息队列的抽象,依据使用的套接字类型有额外的队列语义。传统的套接字传输字节流或离散的数据报,而 ØMQ 套接字传输离散的消息。

ØMQ是异步的,这意味着物理连接的建立和离开,重新连接和有效交付的时间对用户来说是透明的,并且由ØMQ自己组织。此外,在peer无法接收消息的情况下,消息可能会被排队。

传统套接字仅允许严格意义上的one-to-one(two peers),many-to-one(many clinets, one server),或者在一些情况下的 one-to-many(multicast) 关系。除了ZMQ_PAIR,ØMQ 套接字可能使用zmq_connect()连接多个终端,同时,使用zmq_bind()接收多个终端到来的连接绑定到终端,因此允许many-to-many关系。

线程安全

ØMQ 有线程安全的套接字类型和非线程安全的套接字类型。应用不能在多线程中使用非线程安全的套接字类型,除非将套接字从一条线程迁移到另一条线程,并使用full fence内存屏障。

下述是线程安全的套接字:* ZMQ_CLIENT * ZMQ_SERVER * ZMQ_DISH * ZMQ_RADIO * ZMQ_SCATTER * ZMQ_GATHER

套接字类型

下节介绍ØMQ定义的套接字类型,按照通用的消息模式分类–由相关的套接字类型构建。

发布订阅模式
发布-订阅模式用于one-to-many的数据分发,以扇形发散的方式,从单个的发布者分发到所有的订阅者处。

ZMQ_PUB
ZMQ_PUB类型的套接字被发布者用于发布数据。消息以扇形发散的方式发送到所有的连接peers。zmq_recv(3)函数没有实现这个套接字的对应功能。

当 ZMQ_PUB 因为到达订阅者的高水位而进入静音状态。然后直到静音状态结束,任何将被发送到订阅者的消息都将被丢弃。

ZMQ_PUB特性
兼容的peer套接字ZMQ_SUB, ZMQ_XSUB
方向单向
发送/接收模式仅发送
进入路由策略N/A
外出路由策略Fan out
静音状态行为丢弃

ZMQ_SUB
ZMQ_SUB套接字类型,被一个订阅者用于订阅发布者分发的数据。起初ZMQ_SUB套接字没有订阅任何消息,使用 zmq_setsocketopt(3) 的 ZMQ_SUBSCRIBE 选项来指定要订阅哪些消息。对于ZMQ_SUB类型的套接字来说,zmq_send() 函数没有实现。

ZMQ_SUB 的特性总结
兼容的peer套接字ZMQ_PUB, ZMQ_XPUB
方向单向
发送/接收模式仅接收
进入路由策略Fair-queued
外出路由策略N/A

Example

使用 zmq_stream 创建一个简单的http server

void *ctx = zmq_ctx_new ();
assert (ctx);
/* Create ZMQ_STREAM socket */
void *socket = zmq_socket (ctx, ZMQ_STREAM);
assert (socket);
int rc = zmq_bind (socket, "tcp://*:8080");
assert (rc == 0);
/* Data structure to hold the ZMQ_STREAM routing id */
uint8_t routing_id [256];
size_t routing_id_size = 256;
/* Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t raw_size = 256;
while (1) {
 	/* Get HTTP request; routing id frame and then request */
 	routing_id_size = zmq_recv (socket, routing_id, 256, 0);
 	assert (routing_id_size > 0);
	do {
		raw_size = zmq_recv (socket, raw, 256, 0);
		assert (raw_size >= 0);
	} while (raw_size == 256);
	/* Prepares the response */
	char http_response [] =
	"HTTP/1.0 200 OK\r\n"
	"Content-Type: text/plain\r\n"
	"\r\n"
	"Hello, World!";
	/* Sends the routing id frame followed by the response */
	zmq_send (socket, routing_id, routing_id_size, ZMQ_SNDMORE);
	zmq_send (socket, http_response, strlen (http_response), 0);
	/* Closes the connection by sending the routing id frame followed by a zero response */
	zmq_send (socket, routing_id, routing_id_size, ZMQ_SNDMORE);
	zmq_send (socket, 0, 0, 0);
}
zmq_close (socket); 
zmq_ctx_destroy (ctx);

3. zmq_close(3)

Synopsis

int zmq_close (void *socket);

Description

zmq_close()将会销毁socket参数引用的socket。任何从网络上接收到的还未被zmq_recv()接收的消息将会被丢弃。关于已经调用zmq_send()发送但是还没有被发送到网络上的消息的处理行为,取决于指定套接字的ZMQ_LINGER套接字选项的值

对每个socket,zmq_close()应当被显式的调用一次。如果它始终未被调用,zmq_ctx_term()将会被永远阻塞。如果对相同的socket调用多次,或者参数socket并没有指向一个socket,行为是未定义的。

ZMQ_LINGER 的默认设置不丢弃未发送的消息;这个行为可能会造成应用阻塞在`zmq_ctx_term()`的调用上。
更多细节查看`zmq_setsockopt(3)` 和 `zmq_ctx_term(3)`。
这个API将异步完成,所有当它返回后,不是所有的东西都被解分配了。

Return value

成功返回0,否则返回-1,并设置errno变量。

Errors

ENOTSOCK

  • 参数socket为NULL

4. zmq_bind(3)

Synopsis

int zmq_bind (void *socket, const char *endpoint);

Description

zmq_bind() 函数绑定socket到本地终端endpoint,然后在终端上接受到来的连接。

终端是一个字符串,格式为transport://address,transport指定底层使用的协议。address指定要绑定的指定传输方式的地址。
ØMQ提供下列传输方式:
tcp

  • 使用TCP的单播传输,见zmq_tcp(7)

ipc

  • 本地进程间通信通讯方式,见zmq_ipc(7)

inproc

  • 本地进程内(线程间)通讯方式,见zmq_inproc(7)

pgm, epgm

  • 使用PGM的可靠多播传输,见zmq_pgm(7)

vmci

  • 虚拟机器通讯接口(VMCI),见zmq_vmci(7)

除了 ZMQ_PAIR 外,每个套接字类型都支持one-to-many和many-to-one语义。明确的语义取决于套接字类型并被定义在zmq_socket(3)中。

ipc,tcp和vmci传输方式支持 wildcard address。

对于 `zmq_bind()` 和 `zmq_connect()`,地址语法可能稍有不同,特别是tcp,pgm和epgm传输方式。
在`zmq_bind()`后,socket进入静音状态(`mute state`),除非或直到有至少一个进入或外出的连接建立,
此时socket进入准备状态(`ready state`)。在静音状态中,依据socket类型,socket阻塞或者丢弃消息。
相较而言,在libzmq::zmq_connect[3]之后,socket进入准备状态(ready state)

Return value

zmq_bind()函数成功返回0。否则返回-1,并设置errno。

Errors

EINVAL

  • 提供的终端无效

EPROTONOSUPPORT

  • 请求运输方式不支持

ENOCOMPATPROTO

  • 请求的传输协议不兼容套接字类型

EADDRINUSE

  • 请求地址已经在使用中了

EADDRNOTAVAIL

  • 请求地址不是本地的

ENODEV

  • 请求地址指定不存在的接口

ETERM

  • 与指定的socket相关的ØMQ context被终止了

ENOTSOCK

  • 提供的套接字是非法的

EMTHREAD

  • 没有可用的I/O线程来完成任务

Example

绑定发布者套接字到一个in-process和tcp的transport。

/* Create a ZMQ_PUB socket */
void *socket = zmq_socket (context, ZMQ_PUB);
assert (socket);
/* Bind it to a in-process transport with the address 'my_publisher' */
int rc = zmq_bind (socket, "inproc://my_publisher");
assert (rc == 0);
/* Bind it to a TCP transport on port 5555 of the 'eth0' interface */
rc = zmq_bind (socket, "tcp://eth0:5555"); 
assert (rc == 0);

5. zmq_connect(3)

Synopsis

int zmq_connect (void *socket, const char *endpoint);

Description

zmq_connect()将套接字连接到一个终端上,然后在终端上接受到来的连接
终端字符串格式如下:transport://addresstransport指定底层使用的协议,address指定要连接的传输指定的地址。

ØMQ提供下列传输方式:
tcp

  • 使用TCP的单播传输,见zmq_tcp(7)

ipc

  • 本地进程间通信通讯方式,见zmq_ipc(7)

inproc

  • 本地进程内(线程间)通讯方式,见zmq_inproc(7)

pgm, epgm

  • 使用PGM的可靠多播传输,见zmq_pgm(7)

vmci

  • 虚拟机器通讯接口(VMCI),见zmq_vmci(7)

除了 ZMQ_PAIR 外,每个套接字类型都支持one-to-many和many-to-one语义。明确的语义取决于套接字类型并被定义在zmq_socket(3)中。

对大部分的传输和套接字类型而言,连接并非立即执行,而是依据 ØMQ 的需要执行。因此一次成功的
`zmq_connect` 函数的调用并不意味着连接已经或能够成功建立。因此,对大部分传输和套接字类型而言,
服务器执行绑定,客户端执行连接的顺序如何并不重要。`ZMQ_PAIR`套接字类型是个例外,因为它们不
会自动重连终端。
`zmq_connect()`之后,除了`ZMQ_ROUTER`类型外,其他套接字类型的套接字进入正常的就绪状态
(*ready* state)。仅当对peer的握手完成后,`ZMQ_ROUTE`套接字对一指定的peer进入它的正常就绪状态,
握手可能发生在任意时间。
对一些套接字类型,向相同的终端进行多重连接没有任何意义。对于这些套接字类型,任何尝试连接一个已经建立
连接的终端会被静默的忽略(比如,返回0)。这一行为应用于`ZMQ_DEALER`,`ZMQ_SUB`,`ZMQ_PUB`和`ZMQ_REQ`

Return value

zmq_bind()函数成功返回0。否则返回-1,并设置errno。

Errors

EINVAL

  • 提供的终端无效

EPROTONOSUPPORT

  • 请求运输方式不支持

ENOCOMPATPROTO

  • 请求的传输协议不兼容套接字类型

ETERM

  • 与指定的socket相关的ØMQ context被终止了

ENOTSOCK

  • 提供的套接字是非法的

EMTHREAD

  • 没有可用的I/O线程来完成任务

Example

连接订阅套接字到一个in-process和一个tcp传输。

/* Create a ZMQ_SUB socket */
void *socket = zmq_socket (context, ZMQ_SUB);
assert (socket);
/* Connect it to an in-process transport with the address 'my_publisher' */
int rc = zmq_connect (socket, "inproc://my_publisher");
assert (rc == 0);
/* Connect it to the host server001, port 5555 using a TCP transport */
rc = zmq_connect (socket, "tcp://server001:5555"); 
assert (rc == 0);

标签:ZMQ,Socket,ctx,API,MQ,接字,zmq,socket
来源: https://blog.csdn.net/qq_41278986/article/details/117220852

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有