ZeroMQ
ZeroMQ (also written ØMQ, 0MQ, or ZMQ) is a minimal message-oriented middleware (MOM).
zeromq - Distributed Computing Made Simple - http://zeromq.org/
Installation
python-zmq - Python bindings for 0MQ library
Ubuntu/Debian/Raspberry Pi:
apt-get install python-zmq
or, to build the bindings from PyPI:
apt-get install python-dev
pip install pyzmq
Redhat: [1]
# redhat 5
wget http://download.opensuse.org/repositories/home:/fengshuo:/zeromq/CentOS_CentOS-5/home:fengshuo:zeromq.repo -O /etc/yum.repos.d/zeromq.repo
# redhat 6
wget http://download.opensuse.org/repositories/home:/fengshuo:/zeromq/CentOS_CentOS-6/home:fengshuo:zeromq.repo -O /etc/yum.repos.d/zeromq.repo
yum install zeromq
yum install zeromq-devel  # needed for pyzmq
pip install pyzmq
Manual Installation
Note: I couldn't get it to work.
Dependencies: libtool, autoconf, automake, e2fsprogs
Library install:
mkdir -p ~/.src ; cd ~/.src
wget http://download.zeromq.org/zeromq-4.0.3.tar.gz
tar -zvxf zeromq-4.0.3.tar.gz
cd zeromq-4.0.3
./configure
make
sudo make install
sudo ldconfig
--
Python binding install: [2]
easy_install pyzmq # or pip install pyzmq
---
Alternate path
# ...
tar -zvxf zeromq-4.0.3.tar.gz
cd zeromq-4.0.3
./configure --prefix=/opt/zeromq
make clean
make
sudo make install
sudo /sbin/ldconfig -v -n /opt/zeromq/lib/  # creates generic links
sudo PKG_CONFIG_PATH=/opt/zeromq/lib/pkgconfig/ pip install --upgrade pyzmq
Python Guide
ØMQ - The Guide - http://zguide.zeromq.org/py:all
Examples
git clone --depth=1 https://github.com/imatix/zguide.git
eBooks
ZeroMQ by Pieter Hintjens
ZeroMQ By Pieter Hintjens - O'Reilly Media - http://shop.oreilly.com/product/0636920026136.do
"Dive into ZeroMQ, the smart socket library that gives you fast, easy, message-based concurrency for your applications. With this quick-paced guide, you'll learn hands-on how to use this scalable, lightweight, and highly flexible networking tool for exchanging messages among clusters, the cloud, and other multi-system environments.
ZeroMQ maintainer Pieter Hintjens takes you on a tour of real-world applications, using extended examples in C to help you work with ZeroMQ's API, sockets, and patterns. Learn how to use specific ZeroMQ programming techniques, build multithreaded applications, and create your own messaging architectures. You'll discover how ZeroMQ works with several programming languages and most operating systems - with little or no cost."
ZeroMQ - Free Download eBook - pdf http://filepi.com/i/qo5Uv2G
ZeroMQ by Faruk Akgul
"ØMQ (also spelled ZeroMQ, 0MQ, or ZMQ) is a high-performance asynchronous messaging library aimed at use in scalable distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, a ØMQ system can run without a dedicated message broker. The library is designed to have a familiar socket-style API.
ZeroMQ teaches you to use ZeroMQ through examples in C programming language. You will learn how to use fundamental patterns of message / queuing with a step-by-step tutorial approach and how to apply them. Then, you'll learn how to use high level APIs and to work with multiple sockets and multithreaded programs through many examples."
Download:
- ZeroMQ - Free Download eBook - pdf - http://it-ebooks.info/book/2780/
- ZeroMQ pdf | EBook Free Download - http://ebooknew.net/2013/09/zeromq-pdf.html
- http://dl.e-book-free.com/2013/07/zeromq.pdf
Patterns
The built-in core ØMQ patterns are:
- Request-reply, which connects a set of clients to a set of services. This is a remote procedure call and task distribution pattern.
- Pub-sub, which connects a set of publishers to a set of subscribers. This is a data distribution pattern.
- Pipeline, which connects nodes in a fan-out/fan-in pattern that can have multiple steps and loops. This is a parallel task distribution and collection pattern.
- Exclusive pair, which connects two sockets exclusively. This is a pattern for connecting two threads in a process, not to be confused with "normal" pairs of sockets.
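A minimal sketch of the socket types that pair up for each pattern above, assuming Python 3 and pyzmq. The `PATTERNS` table and the `round_trip` helper are illustrative names, not part of pyzmq; everything runs in one process over `inproc`, and the receiving side binds only to keep the helper short.

```python
import zmq

# (sending socket type, receiving socket type) for each core pattern
PATTERNS = {
    "request-reply": (zmq.REQ, zmq.REP),
    "pub-sub": (zmq.PUB, zmq.SUB),
    "pipeline": (zmq.PUSH, zmq.PULL),
    "exclusive-pair": (zmq.PAIR, zmq.PAIR),
}


def round_trip(ctx, name, payload=b"ping"):
    """Send one message through the named pattern and return what arrives."""
    sender_type, receiver_type = PATTERNS[name]
    endpoint = "inproc://" + name
    receiver = ctx.socket(receiver_type)
    receiver.bind(endpoint)
    sender = ctx.socket(sender_type)
    sender.connect(endpoint)
    try:
        if receiver_type == zmq.SUB:
            receiver.setsockopt(zmq.SUBSCRIBE, b"")  # accept every message
            # PUB silently drops messages until the subscription has
            # reached it, so keep publishing until one gets through.
            while True:
                sender.send(payload)
                if receiver.poll(100, zmq.POLLIN):
                    break
        else:
            sender.send(payload)
        return receiver.recv()
    finally:
        sender.close(linger=0)
        receiver.close(linger=0)


if __name__ == "__main__":
    ctx = zmq.Context()
    for name in PATTERNS:
        print(name, round_trip(ctx, name))
    ctx.term()
```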
Request-Reply
REQuest REPly
This is the request-reply pattern, probably the simplest way to use ØMQ. It maps to RPC and the classic client/server model.
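Warning: REQ and REP run in strict lockstep, and both send and receive block. A REQ socket cannot send a second request until it has received the reply to the first, and a REP socket cannot receive a new request until it has replied to the current one (and vice versa). Each request must be met with exactly one reply.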
"The REQ-REP socket pair is in lockstep. The client issues zmq_send() and then zmq_recv(), in a loop (or once if that's all it needs). Doing any other sequence (e.g., sending two messages in a row) will result in a return code of -1 from the send or recv call. Similarly, the service issues zmq_recv() and then zmq_send() in that order, as often as it needs to."
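The lockstep rule can be seen directly from Python. In pyzmq a failing call raises `zmq.ZMQError` instead of returning -1, and sending twice in a row on a REQ socket fails with `errno` set to `zmq.EFSM`. A sketch assuming Python 3 and pyzmq, using an arbitrary `inproc://lockstep` endpoint:

```python
import zmq

ctx = zmq.Context()
rep = ctx.socket(zmq.REP)
rep.bind("inproc://lockstep")
req = ctx.socket(zmq.REQ)
req.connect("inproc://lockstep")

req.send(b"first")          # fine: REQ now waits for a reply
try:
    req.send(b"second")     # breaks the send/recv/send/recv order
    second_send_error = None
except zmq.ZMQError as err:
    second_send_error = err.errno
print("second send rejected:", second_send_error == zmq.EFSM)

# Completing the cycle puts both sockets back in a valid state
rep.send(rep.recv())
reply = req.recv()
print("reply:", reply)

req.close()
rep.close()
ctx.term()
```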
zmq_hello_server.py :
import zmq
context = zmq.Context()
print("starting server...")
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
print("server started.")
while True:
    print("waiting for data...")
    message = socket.recv()
    print("recv:", message)
    # Echo the request back; REP must reply before it can receive again
    print("send:", message)
    socket.send(message)
zmq_hello_client.py :
import time
import zmq
context = zmq.Context()
print("connecting to server...")
socket = context.socket(zmq.REQ)
# connect() returns at once; the TCP connection is made in the background
socket.connect("tcp://localhost:5555")
print("connected to server.")
i = 1
while True:
    i = i + 1
    print("send:", i)
    socket.send_string(str(i))
    message = socket.recv_string()
    print("message:", message)
    time.sleep(1)
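As written, the client blocks forever in `recv()` if the server is down. One common remedy, sketched here assuming Python 3 and pyzmq, is a receive timeout via the `RCVTIMEO` socket option; on timeout pyzmq raises `zmq.Again`. Because a REQ socket that never got its reply cannot send again, the hypothetical `request_with_timeout` helper uses a fresh socket for each request.

```python
import zmq


def request_with_timeout(ctx, endpoint, payload, timeout_ms=1000):
    """Send one request; return the reply, or None if none arrives in time."""
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.RCVTIMEO, timeout_ms)  # recv() raises zmq.Again after this
    sock.setsockopt(zmq.LINGER, 0)             # don't keep unsent data around on close
    sock.connect(endpoint)
    try:
        sock.send(payload)
        return sock.recv()
    except zmq.Again:
        return None
    finally:
        # A REQ socket that timed out is stuck waiting for its reply,
        # so it is discarded rather than reused.
        sock.close()
```

Retrying a few times with a new socket before giving up gives roughly the "Lazy Pirate" client described in the guide's chapter on reliability.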
Publish-Subscribe
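One-to-many, one-way communication.
You get an error if you try to send on a SUB socket or receive on a PUB socket.
The PUB-SUB socket pair is asynchronous. Messages queue up if you don't receive them fast enough, but only up to the high-water mark (1000 messages by default since ZeroMQ 3.x); beyond that, a PUB socket drops messages for the slow subscriber.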
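Both directions of the one-way rule show up in pyzmq as `zmq.ZMQError` with `errno` equal to `zmq.ENOTSUP`. A short sketch, assuming Python 3 and pyzmq; the `inproc://one-way` endpoint and the `errno_of` helper are illustrative:

```python
import zmq

ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind("inproc://one-way")
sub = ctx.socket(zmq.SUB)
sub.connect("inproc://one-way")


def errno_of(call):
    """Run call() and return the errno of the ZMQError it raises, or None."""
    try:
        call()
    except zmq.ZMQError as err:
        return err.errno
    return None


recv_on_pub = errno_of(pub.recv)                  # PUB can only send
send_on_sub = errno_of(lambda: sub.send(b"hi"))   # SUB can only receive
print(recv_on_pub == zmq.ENOTSUP, send_on_sub == zmq.ENOTSUP)

sub.close()
pub.close()
ctx.term()
```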
Subscribe - "Note that when you use a SUB socket you must set a subscription using zmq_setsockopt() and SUBSCRIBE, as in this code. If you don't set any subscription, you won't get any messages. It's a common mistake for beginners. The subscriber can set many subscriptions, which are added together. That is, if an update matches ANY subscription, the subscriber receives it. The subscriber can also cancel specific subscriptions. A subscription is often, but not necessarily a printable string."
Subscribe Filter - "The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter. An empty option_value of length zero shall subscribe to all incoming messages. A non-empty option_value shall subscribe to all messages beginning with the specified prefix. Multiple filters may be attached to a single ZMQ_SUB socket, in which case a message shall be accepted if it matches at least one filter."
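The filter rule quoted above is a plain prefix match, OR-ed across all subscriptions. The function below is a hypothetical pure-Python model of that rule (not how libzmq implements it internally), handy for predicting what a SUB socket will accept:

```python
def accepted(subscriptions, message):
    """Model of SUB filtering: accept if any subscription is a prefix."""
    # A new SUB socket has no subscriptions, so it accepts nothing;
    # an empty subscription b"" is a prefix of every message.
    return any(message.startswith(prefix) for prefix in subscriptions)


print(accepted([], b"10001 72 40"))                       # False
print(accepted([b"10001"], b"10001 72 40"))               # True
print(accepted([b"10001", b"NASDAQ"], b"NASDAQ:ACME 42")) # True
print(accepted([b""], b"anything"))                       # True
print(accepted([b"10001"], b"20001 55 30"))               # False
```

Because matching is by prefix, a filter such as `b"1000"` would also accept updates for zipcode 10001 in the weather example; ending the filter with the separator (`b"1000 "`) avoids that.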
Binding - "In theory with ØMQ sockets, it does not matter which end connects and which end binds. However, in practice there are undocumented differences that I'll come to later. For now, bind the PUB and connect the SUB, unless your network design makes that impossible."
Slow joiner - "Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends."
Synchronize - "In Chapter 2 - Sockets and Patterns we'll explain how to synchronize a publisher and subscribers so that you don't start to publish data until the subscribers really are connected and ready. The alternative to synchronization is to simply assume that the published data stream is infinite and has no start and no end. One also assumes that the subscriber doesn't care what transpired before it started up."
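When the publisher knows how many subscriptions to expect, one way to avoid the slow-joiner loss is to wait until those subscriptions have arrived. The sketch below swaps in an XPUB socket for that purpose; it is not the synchronization method the guide describes in Chapter 2. XPUB publishes like PUB but also hands each new subscription to the application as a message (a `\x01` byte followed by the prefix). It assumes Python 3 and pyzmq; `publish_when_subscribed` and the endpoint are hypothetical names. Identical subscriptions are reported only once, so the prefixes must be distinct.

```python
import zmq


def publish_when_subscribed(prefixes, messages, endpoint="inproc://sync-demo"):
    """Publish only after every prefix is subscribed; return what the SUB got."""
    ctx = zmq.Context()
    pub = ctx.socket(zmq.XPUB)   # like PUB, but subscriptions are readable
    pub.bind(endpoint)           # bind the publisher, connect the subscriber
    sub = ctx.socket(zmq.SUB)
    sub.connect(endpoint)
    for prefix in prefixes:
        sub.setsockopt(zmq.SUBSCRIBE, prefix)

    # Block until each subscription has reached the publisher.
    for _ in prefixes:
        event = pub.recv()       # b"\x01" + prefix for a new subscription
        assert event[0] == 1

    for message in messages:
        pub.send(message)

    # Collect whatever passed the filters; stop after 200 ms of silence.
    received = []
    while sub.poll(200, zmq.POLLIN):
        received.append(sub.recv())

    sub.close(linger=0)
    pub.close(linger=0)
    ctx.term()
    return received
```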
zmq_pub_server.py
#
# Weather update server
# Binds PUB socket to tcp://*:5556
# Publishes random weather updates
#
import zmq
from random import randrange
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
while True:
    zipcode = randrange(1, 100000)
    temperature = randrange(1, 215) - 80
    relhumidity = randrange(1, 50) + 10
    socket.send_string("%d %d %d" % (zipcode, temperature, relhumidity))
zmq_pub_client.py:
#
# Weather update client
# Connects SUB socket to tcp://localhost:5556
# Collects weather updates and finds avg temp in zipcode
#
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
print("Collecting updates from weather server…")
socket.connect("tcp://localhost:5556")
# Subscribe to zipcode, default is NYC, 10001
zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10001"
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
# Process 5 updates
num_updates = 5
total_temp = 0
for update_nbr in range(num_updates):
    string = socket.recv_string()
    zipcode, temperature, relhumidity = string.split()
    total_temp += int(temperature)
# Divide by the number of updates (the loop variable ends at num_updates - 1)
print("Average temperature for zipcode '%s' was %dF" % (
    zip_filter, total_temp / num_updates))
---
This shows a filter that collects data from two different remote publishers and sends it to local subscribers: [3]
import zmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://192.168.55.112:5556")
subscriber.connect("tcp://192.168.55.201:7721")
subscriber.setsockopt(zmq.SUBSCRIBE, b"NASDAQ")
publisher = context.socket(zmq.PUB)
publisher.bind("ipc://nasdaq-feed")
while True:
    message = subscriber.recv()
    publisher.send(message)
--
# Subscribe on everything (an empty prefix matches all messages)
frontend.setsockopt(zmq.SUBSCRIBE, b"")
Parallel Pipeline
Divide and conquer: task distribution
PUSH (send) and PULL (recv). A PUSH blocks when no PULL peer can accept the message; a PULL blocks until a message arrives.
Tasks are distributed round-robin among the connected workers.
Our supercomputing application is a fairly typical parallel processing model. We have:
- A ventilator that produces tasks that can be done in parallel
- A set of workers that process tasks
- A sink that collects results back from the worker processes
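The ventilator/worker/sink arrangement can be sketched in a single process, with threads standing in for separate worker processes and `inproc` standing in for TCP. Assumptions: Python 3 and pyzmq; the squaring "work", the endpoint names, and the rule that a worker quits after `idle_ms` milliseconds without a task are illustrative choices, not part of the guide's example.

```python
import threading
import zmq


def run_pipeline(tasks, num_workers=3, idle_ms=500):
    """Square each task with a ventilator -> workers -> sink pipeline."""
    ctx = zmq.Context()
    ventilator = ctx.socket(zmq.PUSH)   # fans tasks out to workers
    ventilator.bind("inproc://tasks")
    sink = ctx.socket(zmq.PULL)         # fans results back in
    sink.bind("inproc://results")
    ready = threading.Barrier(num_workers + 1)

    def worker():
        receiver = ctx.socket(zmq.PULL)
        receiver.connect("inproc://tasks")
        sender = ctx.socket(zmq.PUSH)
        sender.connect("inproc://results")
        ready.wait()  # every worker is connected before any task is sent
        # Exit once no task has arrived for idle_ms milliseconds.
        while receiver.poll(idle_ms, zmq.POLLIN):
            n = int(receiver.recv())
            sender.send_string(str(n * n))
        receiver.close()
        sender.close()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    ready.wait()
    for n in tasks:
        ventilator.send_string(str(n))
    results = [int(sink.recv()) for _ in tasks]
    for t in threads:
        t.join()
    ventilator.close()
    sink.close()
    ctx.term()
    return sorted(results)
```

PUSH round-robins only across the workers that are connected when it sends, which is why the sketch waits on a barrier; without it, the first worker to connect could be handed most of the tasks.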
Server (note: this pair actually uses PUB/SUB; it publishes an increasing counter as fast as it can):
import zmq
context = zmq.Context()
print("starting server...")
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")
print("server started.")
i = 1
while True:
    try:
        print(i)
        socket.send_string(str(i))
        i = i + 1
    except zmq.error.ZMQError:
        pass
Client (reads one message per second and reports every gap in the sequence, i.e. messages that never reached this slow subscriber):
import time
import zmq
context = zmq.Context()
print("connecting to server...")
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE, b"")
print("connected to server.")
last = 0
while True:
    try:
        message = socket.recv_string()
        print("message:", message)
        current = int(message)
        # A gap in the sequence means messages were dropped
        if current != last + 1:
            print("err count", current, last)
        last = current
    except zmq.error.ZMQError as err:
        pass
    time.sleep(1)
code
version
$ python3 -c 'import zmq; print(zmq.zmq_version())'
4.0.4
non blocking receive
while True:
    # Drain every message that is already waiting, without blocking
    while True:
        try:
            msg = receiver.recv(zmq.DONTWAIT)
        except zmq.Again:
            break
        # do work ...
    # No activity, so sleep for 1 msec
    time.sleep(0.001)
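The inner loop above can be packaged as a helper that collects every message already queued without ever blocking. A sketch assuming Python 3 and pyzmq (`drain` is a hypothetical name); pyzmq raises `zmq.Again`, a subclass of `zmq.ZMQError`, when a `DONTWAIT` receive finds nothing to read:

```python
import zmq


def drain(socket):
    """Return all messages currently waiting on socket, without blocking."""
    messages = []
    while True:
        try:
            messages.append(socket.recv(zmq.DONTWAIT))
        except zmq.Again:
            return messages


if __name__ == "__main__":
    ctx = zmq.Context()
    a = ctx.socket(zmq.PAIR)
    a.bind("inproc://drain-demo")
    b = ctx.socket(zmq.PAIR)
    b.connect("inproc://drain-demo")
    for n in range(3):
        a.send_string(str(n))
    b.poll(1000, zmq.POLLIN)   # wait until something has arrived
    print(drain(b))
    a.close()
    b.close()
    ctx.term()
```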
polling
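# Initialize poll set
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)
while True:
    socks = dict(poller.poll())
    #if receiver in socks and socks[receiver] == zmq.POLLIN:
    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv()
        # do work ...
    # Every registered socket needs a branch, or poll() keeps returning at once
    if socks.get(subscriber) == zmq.POLLIN:
        message = subscriber.recv()
        # do work ...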
Poll Timeout:
poller.poll(5000)  # timeout in milliseconds; returns an empty list if nothing arrives within 5 seconds
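Simpler check, if only one socket is registered (give poll() a timeout, otherwise it blocks until there is an event and the check is always true):
evts = poller.poll(1000)
if evts:
    message = receiver.recv()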
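A self-contained version of the polling loop, assuming Python 3 and pyzmq. Two PAIR socket pairs over `inproc` stand in for `receiver` and `subscriber`; the hypothetical `poll_once` helper returns the set of readable sockets and shows that a timed-out `poll()` yields nothing:

```python
import zmq


def poll_once(poller, timeout_ms):
    """Return the set of sockets that become readable within timeout_ms."""
    events = dict(poller.poll(timeout_ms))
    return {sock for sock, mask in events.items() if mask & zmq.POLLIN}


ctx = zmq.Context()
pairs = []
for name in ("receiver", "subscriber"):
    inside = ctx.socket(zmq.PAIR)
    inside.bind("inproc://" + name)
    outside = ctx.socket(zmq.PAIR)
    outside.connect("inproc://" + name)
    pairs.append((inside, outside))
(receiver, feed_a), (subscriber, feed_b) = pairs

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

nothing = poll_once(poller, 100)   # no data yet, so this times out
feed_b.send(b"update")
ready = poll_once(poller, 1000)    # only subscriber has input
print(len(nothing), ready == {subscriber})
subscriber.recv()

for inside, outside in pairs:
    inside.close(linger=0)
    outside.close(linger=0)
ctx.term()
```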
Tutorials
Nicholas Piël » ZeroMQ an introduction - http://nichol.as/zeromq-an-introduction
- "ZeroMQ is a messaging library, which allows you to design a complex communication system without much effort. It has been wrestling with how to effectively describe itself in the recent years. In the beginning it was introduced as ‘messaging middleware’ later they moved to ‘TCP on steroids’ and right now it is a ‘new layer on the networking stack’."
Message Queue Comparisons
Message Queue Evaluation Notes - Second Life Wiki - http://wiki.secondlife.com/wiki/Message_Queue_Evaluation_Notes
Failure Philosophy
"ØMQ's error handling philosophy is a mix of fail-fast and resilience. Processes, we believe, should be as vulnerable as possible to internal errors, and as robust as possible against external attacks and errors. To give an analogy, a living cell will self-destruct if it detects a single internal error, yet it will resist attack from the outside by all means possible.
Assertions, which pepper the ØMQ code, are absolutely vital to robust code; they just have to be on the right side of the cellular wall. And there should be such a wall. If it is unclear whether a fault is internal or external, that is a design flaw to be fixed." [4]
Sender Receiver with Kill
# Adds pub-sub flow to receive and respond to kill signal
import sys
import time
import zmq
context = zmq.Context()
# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")
# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")
# Socket for control input
controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, b"")
# Process messages from receiver and controller
poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
# Process messages from both sockets
while True:
    socks = dict(poller.poll())
    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv()
        # Process task
        workload = int(message)  # Workload in msecs
        # Do the work
        time.sleep(workload / 1000.0)
        # Send results to sink
        sender.send(message)
        # Simple progress indicator for the viewer
        sys.stdout.write(".")
        sys.stdout.flush()
    # Any waiting controller command acts as 'KILL'
    if socks.get(controller) == zmq.POLLIN:
        break
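A single-process sketch of the same flow, assuming Python 3 and pyzmq: the worker runs in a thread, `inproc` endpoints replace ports 5557 to 5559, the "work" is doubling a number, and the function names are hypothetical. Because a PUB socket drops messages sent before the subscription arrives (the slow joiner problem), the controller repeats its kill message until the worker thread has exited.

```python
import threading
import zmq


def kill_aware_worker(ctx):
    receiver = ctx.socket(zmq.PULL)     # tasks in
    receiver.connect("inproc://tasks")
    sender = ctx.socket(zmq.PUSH)       # results out
    sender.connect("inproc://results")
    controller = ctx.socket(zmq.SUB)    # control input
    controller.connect("inproc://control")
    controller.setsockopt(zmq.SUBSCRIBE, b"")
    poller = zmq.Poller()
    poller.register(receiver, zmq.POLLIN)
    poller.register(controller, zmq.POLLIN)
    while True:
        socks = dict(poller.poll())
        if socks.get(receiver) == zmq.POLLIN:
            workload = int(receiver.recv())
            sender.send_string(str(workload * 2))
        # Any waiting controller message acts as 'KILL'
        if socks.get(controller) == zmq.POLLIN:
            break
    for sock in (receiver, sender, controller):
        sock.close(linger=0)


def run_with_kill(tasks):
    ctx = zmq.Context()
    ventilator = ctx.socket(zmq.PUSH)
    ventilator.bind("inproc://tasks")
    sink = ctx.socket(zmq.PULL)
    sink.bind("inproc://results")
    control = ctx.socket(zmq.PUB)
    control.bind("inproc://control")
    thread = threading.Thread(target=kill_aware_worker, args=(ctx,))
    thread.start()
    for n in tasks:
        ventilator.send_string(str(n))
    results = [int(sink.recv()) for _ in tasks]
    # Repeat the kill signal until the worker has seen it and exited.
    while thread.is_alive():
        control.send(b"KILL")
        thread.join(0.1)
    for sock in (ventilator, sink, control):
        sock.close(linger=0)
    ctx.term()
    return results
```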
Handle Ctrl-C
#
# Shows how to handle Ctrl-C
#
import signal
import zmq
interrupted = False
def signal_handler(signum, frame):
    global interrupted
    interrupted = True
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5558")
# SIGINT will normally raise a KeyboardInterrupt, just like any other Python call
try:
    socket.recv()
except KeyboardInterrupt:
    print("W: interrupt received, proceeding…")
# or you can use a custom handler
counter = 0
signal.signal(signal.SIGINT, signal_handler)
while True:
    try:
        message = socket.recv(zmq.DONTWAIT)
    except zmq.ZMQError:
        pass
    counter += 1
    if interrupted:
        print("W: interrupt received, killing server…")
        break
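In the C API, if your code is blocked in a blocking call (sending a message, receiving a message, or polling) when a signal arrives, the call returns -1 with errno set to EINTR. In Python, SIGINT instead surfaces as a KeyboardInterrupt, as the first part of the example shows.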
Multithreading with ØMQ
Multithreading (MT) Programming
If there's one lesson we've learned from 30+ years of concurrent programming, it is: just don't share state. It's like two drunkards trying to share a beer. It doesn't matter if they're good buddies. Sooner or later, they're going to get into a fight. And the more drunkards you add to the table, the more they fight each other over the beer. The tragic majority of MT applications look like drunken bar fights.
You should follow some rules to write happy multithreaded code with ØMQ:
- Isolate data privately within its thread and never share data in multiple threads. The only exceptions to this are ØMQ contexts, which are threadsafe.
- Stay away from the classic concurrency mechanisms such as mutexes, critical sections, semaphores, etc. These are an anti-pattern in ØMQ applications.
- Create one ØMQ context at the start of your process, and pass that to all threads that you want to connect via inproc sockets.
- Use attached threads to create structure within your application, and connect these to their parent threads using PAIR sockets over inproc. The pattern is: bind parent socket, then create child thread which connects its socket.
- Use detached threads to simulate independent tasks, with their own contexts. Connect these over tcp. Later you can move these to stand-alone processes without changing the code significantly.
- All interaction between threads happens as ØMQ messages, which you can define more or less formally.
- Don't share ØMQ sockets between threads. ØMQ sockets are not threadsafe. Technically it's possible to migrate a socket from one thread to another, but it demands skill. The only place where it's remotely sane to share sockets between threads is in language bindings that need to do magic like garbage collection on sockets.
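A minimal sketch of the parent/child rule above (bind the parent socket first, then start the child thread, which connects its own socket), assuming Python 3 and pyzmq. Only the context object is shared between threads; each socket is created and used in a single thread. The endpoint name, the `READY` message, and the function names are illustrative:

```python
import threading
import zmq


def child(ctx):
    # The child creates its own socket and connects to the parent's endpoint
    pipe = ctx.socket(zmq.PAIR)
    pipe.connect("inproc://child-pipe")
    pipe.send(b"READY")
    pipe.close()


def start_child_and_wait():
    ctx = zmq.Context()
    pipe = ctx.socket(zmq.PAIR)
    pipe.bind("inproc://child-pipe")   # bind before starting the child
    thread = threading.Thread(target=child, args=(ctx,))
    thread.start()
    message = pipe.recv()              # block until the child reports in
    thread.join()
    pipe.close()
    ctx.term()
    return message
```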
Threading
The MT version of the Hello World service basically collapses the broker and workers into a single process:
"""
Multithreaded Hello World server
Author: Guillaume Aubert (gaubert) <guillaume(dot)aubert(at)gmail(dot)com>
"""
import time
import threading
import zmq
def worker_routine(worker_url, context):
    """Worker routine"""
    # Socket to talk to dispatcher
    socket = context.socket(zmq.REP)
    socket.connect(worker_url)
    while True:
        string = socket.recv()
        print("Received request: [%s]\n" % (string))
        # do some 'work'
        time.sleep(1)
        # send reply back to client
        socket.send(b"World")
def main():
    """Server routine"""
    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"
    # Prepare our context and sockets
    context = zmq.Context(1)
    # Socket to talk to clients
    clients = context.socket(zmq.ROUTER)
    clients.bind(url_client)
    # Socket to talk to workers
    workers = context.socket(zmq.DEALER)
    workers.bind(url_worker)
    # Launch pool of worker threads
    for i in range(5):
        thread = threading.Thread(target=worker_routine, args=(url_worker, context, ))
        thread.start()
    # zmq.proxy() replaces the older zmq.device(zmq.QUEUE, ...) call
    zmq.proxy(clients, workers)
    # We never get here but clean up anyhow
    clients.close()
    workers.close()
    context.term()
if __name__ == "__main__":
    main()
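The server above never reaches its cleanup code. A sketch of one way to stop such a server cleanly, assuming Python 3 and pyzmq: calling `ctx.term()` from the main thread makes blocking calls in the other threads raise `zmq.ContextTerminated`, and each thread then closes its own socket. The function names, the `inproc://` endpoints (standing in for `tcp://*:5555`), and the Event used to wait for the broker's bind are illustrative assumptions.

```python
import threading
import zmq


def hello_worker(ctx, url):
    sock = ctx.socket(zmq.REP)
    sock.connect(url)
    try:
        while True:
            sock.recv()
            sock.send(b"World")
    except zmq.ContextTerminated:
        pass                              # ctx.term() was called: shut down
    finally:
        sock.close(linger=0)


def hello_broker(ctx, client_url, worker_url, bound):
    clients = ctx.socket(zmq.ROUTER)
    clients.bind(client_url)
    workers = ctx.socket(zmq.DEALER)
    workers.bind(worker_url)
    bound.set()                           # endpoints exist; safe to connect
    try:
        zmq.proxy(clients, workers)       # shuttles messages until ctx.term()
    except zmq.ContextTerminated:
        pass
    finally:
        clients.close(linger=0)
        workers.close(linger=0)


def hello_round_trips(requests, num_workers=3):
    ctx = zmq.Context()
    bound = threading.Event()
    broker = threading.Thread(
        target=hello_broker,
        args=(ctx, "inproc://clients", "inproc://workers", bound))
    broker.start()
    bound.wait()
    threads = [broker]
    for _ in range(num_workers):
        t = threading.Thread(target=hello_worker, args=(ctx, "inproc://workers"))
        t.start()
        threads.append(t)

    client = ctx.socket(zmq.REQ)
    client.connect("inproc://clients")
    replies = []
    for _ in range(requests):
        client.send(b"Hello")
        replies.append(client.recv())
    client.close()

    ctx.term()    # wakes every blocked thread with ContextTerminated
    for t in threads:
        t.join()
    return replies
```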
keywords
ZeroMQ 0MQ ZMQ MOM Message-oriented middleware