eventlet.green.zmq
– ØMQ support¶
pyzmq
[1] is a python binding to the C++ ØMQ [2] library written in Cython [3].
eventlet.green.zmq
is greenthread aware version of pyzmq.
The zmq
module wraps the Socket
and Context
found in pyzmq
to be non blocking.
- class eventlet.green.zmq.Context(io_threads: int = 1)¶
- class eventlet.green.zmq.Context(io_threads: Context)
- class eventlet.green.zmq.Context(*, shadow: Context | int)
Bases:
Context
Subclass of
zmq.Context
- class eventlet.green.zmq.Socket(context, socket_type)¶
Bases:
Socket
Green version of :class:
zmq.core.socket.Socket
.- The following three methods are always overridden:
send
recv
getsockopt
To ensure that the
zmq.NOBLOCK
flag is set and that sending or receiving is deferred to the hub (using :func:eventlet.hubs.trampoline
) if azmq.EAGAIN
(retry) error is raised.- For some socket types, the following methods are also overridden:
send_multipart
recv_multipart
- recv(flags=0, copy=True, track=False)¶
Receive a message.
With flags=NOBLOCK, this raises
ZMQError
if no messages have arrived; otherwise, this waits until a message arrives. SeePoller
for more general non-blocking I/O.Parameters¶
- flagsint
0 or NOBLOCK.
- copybool
Should the message be received in a copying or non-copying manner? If False a Frame object is returned, if True a string copy of message is returned.
- trackbool
Should the message be tracked for notification that ZMQ has finished with it? (ignored if copy=True)
Returns¶
- msgbytes or Frame
The received message frame. If copy is False, then it will be a Frame, otherwise it will be bytes.
Raises¶
- ZMQError
for any of the reasons zmq_msg_recv might fail (including if NOBLOCK is set and no new messages have arrived).
- send(msg, flags=0, copy=True, track=False)¶
Send a single zmq message frame on this socket.
This queues the message to be sent by the IO thread at a later time.
With flags=NOBLOCK, this raises
ZMQError
if the queue is full; otherwise, this waits until space is available. SeePoller
for more general non-blocking I/O.Parameters¶
- databytes, Frame, memoryview
The content of the message. This can be any object that provides the Python buffer API (i.e. memoryview(data) can be called).
- flagsint
0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
- copybool
Should the message be sent in a copying or non-copying manner.
- trackbool
Should the message be tracked for notification that ZMQ has finished with it? (ignored if copy=True)
- routing_idint
For use with SERVER sockets
- groupstr
For use with RADIO sockets
Returns¶
- Noneif copy or not track
None if message was sent, raises an exception otherwise.
- MessageTrackerif track and not copy
a MessageTracker object, whose done property will be False until the send is completed.
Raises¶
- TypeError
If a unicode object is passed
- ValueError
If track=True, but an untracked Frame is passed.
- ZMQError
If the send does not succeed for any reason (including if NOBLOCK is set and the outgoing queue is full).
Changed in version 17.0: DRAFT support for routing_id and group arguments.
- bind(addr)¶
Bind the socket to an address.
This causes the socket to listen on a network port. Sockets on the other side of this connection will use
Socket.connect(addr)
to connect to this socket.Returns a context manager which will call unbind on exit.
Added in version 20.0: Can be used as a context manager.
Added in version 26.0: binding to port 0 can be used as a context manager for binding to a random port. The URL can be retrieved as socket.last_endpoint.
Parameters¶
- addrstr
The address string. This has the form ‘protocol://interface:port’, for example ‘tcp://127.0.0.1:5555’. Protocols supported include tcp, udp, pgm, epgm, inproc and ipc. If the address is unicode, it is encoded to utf-8 first.
- bind_to_random_port(addr: str, min_port: int = 49152, max_port: int = 65536, max_tries: int = 100) int ¶
Bind this socket to a random port in a range.
If the port range is unspecified, the system will choose the port.
Parameters¶
- addrstr
The address string without the port to pass to
Socket.bind()
.- min_portint, optional
The minimum port in the range of ports to try (inclusive).
- max_portint, optional
The maximum port in the range of ports to try (exclusive).
- max_triesint, optional
The maximum number of bind attempts to make.
Returns¶
- portint
The port the socket was bound to.
Raises¶
- ZMQBindError
if max_tries reached before successful bind
- close(linger=None)¶
Close the socket.
If linger is specified, LINGER sockopt will be set prior to closing.
Note: closing a zmq Socket may not close the underlying sockets if there are undelivered messages. Only after all messages are delivered or discarded by reaching the socket’s LINGER timeout (default: forever) will the underlying sockets be closed.
This can be called to close the socket by hand. If this is not called, the socket will automatically be closed when it is garbage collected, in which case you may see a ResourceWarning about the unclosed socket.
- closed¶
Whether the socket is closed
- connect(addr)¶
Connect to a remote 0MQ socket.
Returns a context manager which will call disconnect on exit.
Added in version 20.0: Can be used as a context manager.
Parameters¶
- addrstr
The address string. This has the form ‘protocol://interface:port’, for example ‘tcp://127.0.0.1:5555’. Protocols supported are tcp, udp, pgm, inproc and ipc. If the address is unicode, it is encoded to utf-8 first.
- disable_monitor() None ¶
Shutdown the PAIR socket (created using get_monitor_socket) that is serving socket events.
Added in version 14.4.
- disconnect(addr)¶
Disconnect from a remote 0MQ socket (undoes a call to connect).
Added in version libzmq-3.2.
Added in version 13.0.
Parameters¶
- addrstr
The address string. This has the form ‘protocol://interface:port’, for example ‘tcp://127.0.0.1:5555’. Protocols supported are tcp, udp, pgm, inproc and ipc. If the address is unicode, it is encoded to utf-8 first.
- fileno() int ¶
Return edge-triggered file descriptor for this socket.
This is a read-only edge-triggered file descriptor for both read and write events on this socket. It is important that all available events be consumed when an event is detected, otherwise the read event will not trigger again.
Added in version 17.0.
- get(option: C.int)¶
Get the value of a socket option.
See the 0MQ API documentation for details on specific options.
Parameters¶
- optionint
The option to get. Available values will depend on your version of libzmq. Examples include:
zmq.IDENTITY, HWM, LINGER, FD, EVENTS
Returns¶
- optvalint or bytes
The value of the option as a bytestring or int.
- get_hwm() int ¶
Get the High Water Mark.
On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
- get_monitor_socket(events: int | None = None, addr: str | None = None) _SocketType ¶
Return a connected PAIR socket ready to receive the event notifications.
Added in version libzmq-4.0.
Added in version 14.0.
Parameters¶
- eventsint
default: zmq.EVENT_ALL The bitmask defining which events are wanted.
- addrstr
The optional endpoint for the monitoring sockets.
Returns¶
- socketzmq.Socket
The PAIR socket, connected and ready to receive messages.
- get_string(option: int, encoding='utf-8') str ¶
Get the value of a socket option.
See the 0MQ documentation for details on specific options.
Parameters¶
- optionint
The option to retrieve.
Returns¶
- optvalstr
The value of the option as a unicode string.
- getsockopt(option)¶
Get the value of a socket option.
See the 0MQ API documentation for details on specific options.
Parameters¶
- optionint
The option to get. Available values will depend on your version of libzmq. Examples include:
zmq.IDENTITY, HWM, LINGER, FD, EVENTS
Returns¶
- optvalint or bytes
The value of the option as a bytestring or int.
- getsockopt_string(option: int, encoding='utf-8') str ¶
Get the value of a socket option.
See the 0MQ documentation for details on specific options.
Parameters¶
- optionint
The option to retrieve.
Returns¶
- optvalstr
The value of the option as a unicode string.
- getsockopt_unicode(option: int, encoding='utf-8') str ¶
Get the value of a socket option.
See the 0MQ documentation for details on specific options.
Parameters¶
- optionint
The option to retrieve.
Returns¶
- optvalstr
The value of the option as a unicode string.
- property hwm: int¶
Property for High Water Mark.
Setting hwm sets both SNDHWM and RCVHWM as appropriate. It gets SNDHWM if available, otherwise RCVHWM.
- join(group)¶
Join a RADIO-DISH group
Only for DISH sockets.
libzmq and pyzmq must have been built with ZMQ_BUILD_DRAFT_API
Added in version 17.
- leave(group)¶
Leave a RADIO-DISH group
Only for DISH sockets.
libzmq and pyzmq must have been built with ZMQ_BUILD_DRAFT_API
Added in version 17.
- monitor(addr, events: C.int = 65535)¶
Start publishing socket events on inproc. See libzmq docs for zmq_monitor for details.
While this function is available from libzmq 3.2, pyzmq cannot parse monitor messages from libzmq prior to 4.0.
Parameters¶
- addrstr
The inproc url used for monitoring. Passing None as the addr will cause an existing socket monitor to be deregistered.
- eventsint
default: zmq.EVENT_ALL The zmq event bitmask for which events will be sent to the monitor.
- poll(timeout: int | None = None, flags: int = <PollEvent.POLLIN: 1>) int ¶
Poll the socket for events.
See
Poller
to wait for multiple sockets at once.Parameters¶
- timeoutint
The timeout (in milliseconds) to wait for an event. If unspecified (or specified None), will wait forever for an event.
- flagsint
default: POLLIN. POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.
Returns¶
- event_maskint
The poll event mask (POLLIN, POLLOUT), 0 if the timeout was reached without an event.
- recv(flags=0, copy=True, track=False)¶
Receive a message.
With flags=NOBLOCK, this raises
ZMQError
if no messages have arrived; otherwise, this waits until a message arrives. SeePoller
for more general non-blocking I/O.Parameters¶
- flagsint
0 or NOBLOCK.
- copybool
Should the message be received in a copying or non-copying manner? If False a Frame object is returned, if True a string copy of message is returned.
- trackbool
Should the message be tracked for notification that ZMQ has finished with it? (ignored if copy=True)
Returns¶
- msgbytes or Frame
The received message frame. If copy is False, then it will be a Frame, otherwise it will be bytes.
Raises¶
- ZMQError
for any of the reasons zmq_msg_recv might fail (including if NOBLOCK is set and no new messages have arrived).
- recv_json(flags=0, **kwargs)¶
Receive a Python object as a message using json to serialize.
Keyword arguments are passed on to json.loads
Parameters¶
- flagsint
Any valid flags for
Socket.recv()
.
Returns¶
- objPython object
The Python object that arrives as a message.
Raises¶
- ZMQError
for any of the reasons
recv()
might fail
- recv_multipart(flags=0, copy=True, track=False)¶
Receive a multipart message as a list of bytes or Frame objects
Parameters¶
- flagsint, optional
Any valid flags for
Socket.recv()
.- copybool, optional
Should the message frame(s) be received in a copying or non-copying manner? If False a Frame object is returned for each part, if True a copy of the bytes is made for each frame.
- trackbool, optional
Should the message frame(s) be tracked for notification that ZMQ has finished with it? (ignored if copy=True)
Returns¶
- msg_partslist
A list of frames in the multipart message; either Frames or bytes, depending on copy.
Raises¶
- ZMQError
for any of the reasons
recv()
might fail
- recv_pyobj(flags=0)¶
Receive a Python object as a message using pickle to serialize.
Parameters¶
- flagsint
Any valid flags for
Socket.recv()
.
Returns¶
- objPython object
The Python object that arrives as a message.
Raises¶
- ZMQError
for any of the reasons
recv()
might fail
- recv_serialized(deserialize, flags=0, copy=True)¶
Receive a message with a custom deserialization function.
Added in version 17.
Parameters¶
- deserializecallable
The deserialization function to use. deserialize will be called with one argument: the list of frames returned by recv_multipart() and can return any object.
- flagsint, optional
Any valid flags for
Socket.recv()
.- copybool, optional
Whether to recv bytes or Frame objects.
Returns¶
- objobject
The object returned by the deserialization function.
Raises¶
- ZMQError
for any of the reasons
recv()
might fail
- recv_string(flags=0, encoding='utf-8')¶
Receive a unicode string, as sent by send_string.
Parameters¶
- flagsint
Any valid flags for
Socket.recv()
.- encodingstr
The encoding to be used
Returns¶
- sstr
The Python unicode string that arrives as encoded bytes.
Raises¶
- ZMQError
for any of the reasons
Socket.recv()
might fail
- recv_unicode(flags: int = 0, encoding: str = 'utf-8') str ¶
Receive a unicode string, as sent by send_string.
Parameters¶
- flagsint
Any valid flags for
Socket.recv()
.- encodingstr
The encoding to be used
Returns¶
- sstr
The Python unicode string that arrives as encoded bytes.
Raises¶
- ZMQError
for any of the reasons
Socket.recv()
might fail
- send(msg, flags=0, copy=True, track=False)¶
Send a single zmq message frame on this socket.
This queues the message to be sent by the IO thread at a later time.
With flags=NOBLOCK, this raises
ZMQError
if the queue is full; otherwise, this waits until space is available. SeePoller
for more general non-blocking I/O.Parameters¶
- databytes, Frame, memoryview
The content of the message. This can be any object that provides the Python buffer API (i.e. memoryview(data) can be called).
- flagsint
0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
- copybool
Should the message be sent in a copying or non-copying manner.
- trackbool
Should the message be tracked for notification that ZMQ has finished with it? (ignored if copy=True)
- routing_idint
For use with SERVER sockets
- groupstr
For use with RADIO sockets
Returns¶
- Noneif copy or not track
None if message was sent, raises an exception otherwise.
- MessageTrackerif track and not copy
a MessageTracker object, whose done property will be False until the send is completed.
Raises¶
- TypeError
If a unicode object is passed
- ValueError
If track=True, but an untracked Frame is passed.
- ZMQError
If the send does not succeed for any reason (including if NOBLOCK is set and the outgoing queue is full).
Changed in version 17.0: DRAFT support for routing_id and group arguments.
- send_json(obj, flags=0, **kwargs)¶
Send a Python object as a message using json to serialize.
Keyword arguments are passed on to json.dumps
Parameters¶
- objPython object
The Python object to send
- flagsint
Any valid flags for
Socket.send()
- send_multipart(msg_parts, flags=0, copy=True, track=False)¶
Send a sequence of buffers as a multipart message.
The zmq.SNDMORE flag is added to all msg parts before the last.
Parameters¶
- msg_partsiterable
A sequence of objects to send as a multipart message. Each element can be any sendable object (Frame, bytes, buffer-providers)
- flagsint, optional
Any valid flags for
Socket.send()
. SNDMORE is added automatically for frames before the last.- copybool, optional
Should the frame(s) be sent in a copying or non-copying manner. If copy=False, frames smaller than self.copy_threshold bytes will be copied anyway.
- trackbool, optional
Should the frame(s) be tracked for notification that ZMQ has finished with it (ignored if copy=True).
Returns¶
None : if copy or not track MessageTracker : if track and not copy
a MessageTracker object, whose done property will be False until the last send is completed.
- send_pyobj(obj, flags=0, protocol=2)¶
Send a Python object as a message using pickle to serialize.
Parameters¶
- objPython object
The Python object to send.
- flagsint
Any valid flags for
Socket.send()
.- protocolint
The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
- send_serialized(msg, serialize, flags=0, copy=True, **kwargs)¶
Send a message with a custom serialization function.
Added in version 17.
Parameters¶
msg : The message to be sent. Can be any object serializable by serialize. serialize : callable
The serialization function to use. serialize(msg) should return an iterable of sendable message frames (e.g. bytes objects), which will be passed to send_multipart.
- flagsint, optional
Any valid flags for
Socket.send()
.- copybool, optional
Whether to copy the frames.
- send_string(u, flags=0, copy=True, encoding='utf-8')¶
Send a Python unicode string as a message with an encoding.
0MQ communicates with raw bytes, so you must encode/decode text (str) around 0MQ.
Parameters¶
- ustr
The unicode string to send.
- flagsint, optional
Any valid flags for
Socket.send()
.- encodingstr
The encoding to be used
- send_unicode(u: str, flags: int = 0, copy: bool = True, encoding: str = 'utf-8', **kwargs) Frame | None ¶
Send a Python unicode string as a message with an encoding.
0MQ communicates with raw bytes, so you must encode/decode text (str) around 0MQ.
Parameters¶
- ustr
The unicode string to send.
- flagsint, optional
Any valid flags for
Socket.send()
.- encodingstr
The encoding to be used
- set(option: C.int, optval)¶
Set socket options.
See the 0MQ API documentation for details on specific options.
Parameters¶
- optionint
The option to set. Available values will depend on your version of libzmq. Examples include:
zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD
- optvalint or bytes
The value of the option to set.
Notes¶
Warning
All options other than zmq.SUBSCRIBE, zmq.UNSUBSCRIBE and zmq.LINGER only take effect for subsequent socket bind/connects.
- set_hwm(value: int) None ¶
Set the High Water Mark.
On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
Warning
New values only take effect for subsequent socket bind/connects.
- set_string(option: int, optval: str, encoding='utf-8') None ¶
Set socket options with a unicode object.
This is simply a wrapper for setsockopt to protect from encoding ambiguity.
See the 0MQ documentation for details on specific options.
Parameters¶
- optionint
The name of the option to set. Can be any of: SUBSCRIBE, UNSUBSCRIBE, IDENTITY
- optvalstr
The value of the option to set.
- encodingstr
The encoding to be used, default is utf8
- setsockopt(option: C.int, optval)¶
Set socket options.
See the 0MQ API documentation for details on specific options.
Parameters¶
- optionint
The option to set. Available values will depend on your version of libzmq. Examples include:
zmq.SUBSCRIBE, UNSUBSCRIBE, IDENTITY, HWM, LINGER, FD
- optvalint or bytes
The value of the option to set.
Notes¶
Warning
All options other than zmq.SUBSCRIBE, zmq.UNSUBSCRIBE and zmq.LINGER only take effect for subsequent socket bind/connects.
- setsockopt_string(option: int, optval: str, encoding='utf-8') None ¶
Set socket options with a unicode object.
This is simply a wrapper for setsockopt to protect from encoding ambiguity.
See the 0MQ documentation for details on specific options.
Parameters¶
- optionint
The name of the option to set. Can be any of: SUBSCRIBE, UNSUBSCRIBE, IDENTITY
- optvalstr
The value of the option to set.
- encodingstr
The encoding to be used, default is utf8
- setsockopt_unicode(option: int, optval: str, encoding='utf-8') None ¶
Set socket options with a unicode object.
This is simply a wrapper for setsockopt to protect from encoding ambiguity.
See the 0MQ documentation for details on specific options.
Parameters¶
- optionint
The name of the option to set. Can be any of: SUBSCRIBE, UNSUBSCRIBE, IDENTITY
- optvalstr
The value of the option to set.
- encodingstr
The encoding to be used, default is utf8
- classmethod shadow(address: int | Socket) _SocketType ¶
Shadow an existing libzmq socket
address is a zmq.Socket or an integer (or FFI pointer) representing the address of the libzmq socket.
Added in version 14.1.
Added in version 25: Support for shadowing zmq.Socket objects, instead of just integer addresses.
- subscribe(topic: str | bytes) None ¶
Subscribe to a topic
Only for SUB sockets.
Added in version 15.3.
- unbind(addr)¶
Unbind from an address (undoes a call to bind).
Added in version libzmq-3.2.
Added in version 13.0.
Parameters¶
- addrstr
The address string. This has the form ‘protocol://interface:port’, for example ‘tcp://127.0.0.1:5555’. Protocols supported are tcp, udp, pgm, inproc and ipc. If the address is unicode, it is encoded to utf-8 first.
- underlying¶
The address of the underlying libzmq socket