dimanche 2 décembre 2018

Zmq concepts and application

Introduction

Zmq is a messaging library, unlike traditional messaging applications (like ActiveMq, RabbitMq, ..., etc); it offers a toolbox to create a messaging system (that's why it is a library).

Diving into MQTT protocol

Mqtt is a messaging protocol, it relies on binary format to exchange messages with low overhead. It is heavily used in embedded systems and IoT.

MQTT is based on publish-subscribe paradigm, it is completely asynchronous (unlike client-server model). There are three different actors that build this protocol :
  • Publisher : the part that produces information and organizes them into topics.
  • Broker : relays data generated by Publishers to registered subscribers. Publisher's (or Publishers) topics are known by the broker.
  • Subscriber : data consumer end-point. After registering to a topic (or multiple topics), it can receive messages for processing or visualization.
At a high level, We can compare MQTT to MVC model :
  1. Publishers : is the Model because it contains the data.
  2. Brokers : are the Controllers which maps data to the Vue.
  3. Subscribers : are the Vue which displays data.

Zmq working modes

Zmq can be used in different ways from which one can state :
  • REQ-REP : request-reply, in other words client server paradigm (synchronous operations).
  • PUB-SUB : Publish-Subscribe, based on MQTT protocol (asynchronous operations).
  • PUSH-PULL : or pipeline mode which distribute tasks from a PUSH node to multiple PULL nodes (concurrent processing).

Zmq PUB-SUB mode

We will focus on Publisher-Subscriber paradigm using Zmq. However, keep in mind that Zmq does not implement a broker (We are going to see how to create one).

Brokerless mode (default mode)

Zmq works in brokerless mode (zqm means zero broker and zero cost) and does not offer a central broker as classical MQTT applications but does offer ways to create it.

Setting Zmq in this mode is similar to client and server architecture as the image shown below :

Connecting multiple publishers and subscribers using Zmq forms a Star Topology that works well for small networks (but does not scale). It means also that Subscribers must know the Publishers (IP addresses and binding ports) to which they want to register.

Let's have a working example to illustrate zmq brokerless mode as shown in the image below :

The publisher binds to port 5580 and sends messages on topic "/test". on the other hand, the subscriber connects to publisher (subscriber must be aware of IP address and binding port of publisher) on topic "/test".

Remark : Either publisher or subscriber can start first (Zmq pauses the subscriber until the publisher can emit messages). In practice, try to start subscriber before publisher in order to get all messages.

We can have a look at the source codes written in PYTHON :
  • Publisher code:
    import zmq
    import time
    
    portPublisher = "5580"
    
    # create an zmq context
    context = zmq.Context()
    # create a publisher socket
    socket = context.socket(zmq.PUB)
    # Bind the socket at a predefined port  
    socket.bind("tcp://*:%s" % portPublisher)
    
    while True:
        # Set the topic on which to publish
        topicFilter = "/test"
        # create some data to send
        msgToPublish = "Hello world Zmq!"
    
        # print topic and data
        print("%s %s" % (topicFilter, msgToPublish))
        # send topic and message separated by comma 
        # (in order to parse them easily on subscriber side)    
        socket.send_string("%s,%s" % (topicFilter, msgToPublish))
        # slow down sending rate
        time.sleep(1)
    
  • Subscriber code :
    import sys
    import zmq
    
    portPublisher = "5580"
    
    # Create an zmq context
    context = zmq.Context()
    # create a subscriber socket 
    socket = context.socket(zmq.SUB)
    
    print("Waiting to connect to remote publisher...")
    
    # subscriber must connect to publisher
    socket.connect("tcp://localhost:%s" % portPublisher)
    
    
    # Choose a topic 
    topicFilter = "/test"
    socket.setsockopt_string(zmq.SUBSCRIBE, topicFilter)
    
    while True:
        # get received data
        dataReceived = socket.recv_string()
        # parse received data using comma separator
        topic, msgContent = dataReceived.split(",")
        # display topic name and message content 
        print("Topic : " + topic + " ==> Message content : " + msgContent)
    
We can run the above programs and see the result:

This mode works fine with few pub-sub devices but cannot scale efficiently, publishers binding (IP addresses and ports) must be known by subscribers in the system. For better performance and maintainability, We can use an intermediary device called broker (or sometimes proxy).

Building a Broker with zmq

Zmq broker can be built quiet easily, a broker (sometimes called proxy) will act as an intermediary device with two interfaces as shown :

We should point out few recommandation :
  • The XSUB interface must listen to the empty topic "" in order to get all traffic from all publishers.
  • The publisher must connect to the broker (unlike in broker mode where it was binding).
  • With/without broker Subscribers always connects
As a working exercise, one can build a broker quickly as follow :
  1. Proxy-Broker :
    import zmq
    
    # Creating zmq context
    context = zmq.Context()
    
    # Creating subX interface 
    subX = context.socket(zmq.SUB)
    subX.bind("tcp://*:5580")
    
    subX.setsockopt_string(zmq.SUBSCRIBE, "")
    
    # Creating the pubX interface
    pubX = context.socket(zmq.PUB)
    pubX.bind("tcp://*:5581")
    
    print("Proxy is active!")
    # connect subX and pubX (creating the proxy)
    zmq.device(zmq.FORWARDER, subX, pubX)
    
    
    # close and free resources
    subX.close()
    pubX.close()
    context.term()
    
  2. Publisher :
    import zmq
    import time
    
    portSubscriberX = "5580"
    
    # create an zmq context
    context = zmq.Context()
    # create a publisher socket
    socket = context.socket(zmq.PUB)
    # COnnect the socket to SUBX (broker interface)  
    socket.connect("tcp://localhost:%s" % portSubscriberX)
    
    while True:
        # Set the topic on which to publish
        topicFilter = "/test"
        # create some data to send
        msgToPublish = "Hello world Zmq!"
    
        # print topic and data
        print("%s %s" % (topicFilter, msgToPublish))
        # send topic and message separated by comma 
        # (in order to parse them easily on subscriber side)    
        socket.send_string("%s,%s" % (topicFilter, msgToPublish))
        # slow down sending rate
        time.sleep(1)
    
  3. Subscriber :
    import sys
    import zmq
    
    portPublisherX = "5581"
    
    # Create an zmq context
    context = zmq.Context()
    # create a subscriber socket 
    socket = context.socket(zmq.SUB)
    
    print("Waiting to connect to remote publisherX...")
    
    # subscriber must connect to publisher
    socket.connect("tcp://localhost:%s" % portPublisherX)
    
    
    # Choose a topic 
    topicFilter = "/test"
    socket.setsockopt_string(zmq.SUBSCRIBE, portPublisherX)
    
    while True:
        # get received data
        dataReceived = socket.recv_string()
        # parse received data using comma separator
        topic, msgContent = dataReceived.split(",")
        # display topic name and message content 
        print("Topic : " + topic + " ==> Message content : " + msgContent)
    

Test the above programs, then try to use multiple publishers and subscribers (it's a good exercise)

For further understanding of Zmq, an excellent reference is available at : https://zodml.org/sites/default/files/ZeroMQ.pdf

Aucun commentaire:

Enregistrer un commentaire