Tuesday, December 25, 2018

ZMQ and Google Protocol Buffers V3

Introduction

ZMQ is a high-level abstraction library over traditional sockets that makes it easy to send data in binary format (typically serialized with Google Protocol Buffers). Previous articles on this blog already covered both ZMQ and Google Protocol Buffers V2.

Google Protocol Buffers V3

Google Protocol Buffers V3 (also called protobuf) is just a step ahead of its predecessor (it's recommended to read the Google Protocol Buffers V2 article before going further). As already said in the previous article, the Protocol Buffers lifecycle goes through 3 steps :
  1. Describe the layout of data to transmit in .proto file.
  2. Compile .proto file using protoc (protobuf compiler) to generate corresponding classes.
  3. Use generated classes in your code.
We are going to review the first two steps, as they have changed slightly from the previous version of protobuf.

Protobuf3 layout description

Data must be described using the protobuf syntax, which looks like regular C structures. An example :
syntax = "proto3";

package com_company_atom;

message AtomStructure{
     string atom_name = 1;
     int32  atom_nb_protons = 2;
     int32  atom_nb_neutrons = 3;
     int32  atom_nb_electrons = 4;
     bool   atom_is_radioactive = 5;
}
Let's discuss the structure of a .proto file :

Protobuf syntax version declaration

This must be the first line of the proto file (don't insert comments or empty lines before the syntax declaration). Version 3 is the latest version of protobuf.

Remark : Every proto file must start with a syntax version declaration, otherwise protoc assumes version 2 by default (syntax = "proto2").

Package name (optional)

Although not mandatory, the package name is translated into a namespace (to avoid naming conflicts in your code). One should always include a package name.

Message description

Data content is described using the message keyword, and fields use conventional data types (string, bool, int32, int64, float, double, etc.).

Some conventions

Naming conventions
  • Message names : should start every word with a capital letter (CamelCase).
  • Field names : should be in lower case, with words separated by "_" (snake_case).
The following schematic summarizes the above two properties :
Field identifiers

Every field must be given a unique ID starting from 1 (in our example, atom_name has an ID of 1).

One might ask: why do we need an ID if field names are already unique? The reader should keep in mind that protobuf does not serialize field names (field names, being strings, would require more bytes). Protocol Buffers serializes only the field type + field ID.

Remark : When the field ID is less than 16, only one byte is required to serialize the field type + field ID.
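
To see that field names are not part of the serialized payload, here is a minimal Python sketch. It assumes the AtomStructure message above was saved as atomStructure.proto and compiled with protoc --python_out=. atomStructure.proto (these file names are assumptions made for illustration) :
# Minimal sketch : only compact (field ID, wire type) tags and values are serialized,
# never the field names themselves.
import atomStructure_pb2  # assumed to be generated by protoc from atomStructure.proto

atom = atomStructure_pb2.AtomStructure()
atom.atom_name = "Hydrogen"
atom.atom_nb_protons = 1
atom.atom_nb_electrons = 1

payload = atom.SerializeToString()
print(len(payload))  # only a handful of bytes
print(payload)       # no "atom_nb_protons" string appears in the payload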

Protoc V3

Protoc V3 is not available in the distribution's package repositories at the time of writing (only protoc V2 can be found).

Installing protoc-V3

One can easily download and compile the protoc sources as follows:
  • Getting required dependencies :
    $ sudo apt-get install autoconf automake libtool curl make g++ unzip
    
  • Download protobuf-all-[VERSION].tar.gz (this compiler can generate protobuf classes for various languages like Python and C++) from https://github.com/protocolbuffers/protobuf/releases.
  • Compile sources as shown :
    $ cd protobuf
    $ ./configure
    $ make
    $ sudo make install
    $ sudo ldconfig # refresh shared library cache
    

Compiling proto buffer files

As we have already mentioned, protoc can compile proto files into classes for multiple programming languages. The general compilation syntax is:
$ protoc --[LANGUAGE]_out=[OUTPUT_GENERATED_CLASS_DIRECTORY] [PATH_PROTO_FILE]
An example is shown below. Some remarks :
  • C++ : protoc generates two files : fileName.pb.h (to include in your code) and fileName.pb.cc (to compile and link with your code).
  • Python : protoc generates one file fileName_pb2.py (to be imported in your code).

Classes generated by protoc contain at least setters and getters for every field name.

Working with protobuf

Let's have a practical example in both Python and C++ and see how we can serialize our data using Google Protocol Buffers.
  1. Writing a proto file :
    syntax = "proto3";
    
    package com_company_pet;
    
    message PetIdentity{
        string pet_name = 1;
        int32  pet_age = 2;
        bool   pet_gender = 3;
    }
    
    
    // petIdentity.proto
    
  2. Generating protobuf classes :
    $ protoc --cpp_out=. petIdentity.proto
    $ protoc --python_out=. petIdentity.proto
    
  3. Using protobuf classes in our code:
    • Python :
      # main.py
      # import petIdentity_pb2 module
      import petIdentity_pb2
      import sys
      
      
      print("-------- Serializing data -------")
      # Create an instance of PetIdentity
      petIdentity = petIdentity_pb2.PetIdentity()
      
      
      # Fill PetIdentity instance
      petIdentity.pet_name = "Oscar"
      petIdentity.pet_age = 2
      petIdentity.pet_gender = True
      
      # Serialize PetIdentity instance using protobuf
      petIdentitySerialized = petIdentity.SerializeToString()
      
      # display serialized data
      print("Serialized Data : " + petIdentitySerialized) 
      
      print("") # add empty line
      
      print("-------- Deserializing data -------")
      # Create an instance of PetIdentity for deserialization
      petIdentityDeserialized = petIdentity_pb2.PetIdentity()
      
      # Deserialize Serialized data
      petIdentityDeserialized.ParseFromString(petIdentitySerialized)
      
      # Display deserialized data
      print("Cat-Name : " + petIdentityDeserialized.pet_name + " <===> Cat-age : " + str(petIdentityDeserialized.pet_age) + " <===> Cat-gender : " + ("male" if petIdentityDeserialized.pet_gender  else "female"))
      
      The above code yields the following output :

      Remark : In practice, the serialized data (in this example, it's petIdentitySerialized) is what we need to send through the network.

    • C++ :
      /* 
         --------------- main.cpp ----------
         ----- Google Protocol Buffer ------
         --------- Serializer and ----------
         ------- Deserializer Example ------
      */
      
      #include <iostream>
      #include <string>
      #include "petIdentity.pb.h"
      using namespace std;
      
      
      int main(){
          GOOGLE_PROTOBUF_VERIFY_VERSION; // recommended by Google to make sure that the correct protobuf library version is loaded
          
          /* -------------------------------
             ---- Protobuf serialization --- 
             ------------ process ----------
             -------------------------------
          */
          com_company_pet::PetIdentity petIdentity; // Create an instance of PetIdentity
      
          petIdentity.set_pet_name("Oscar"); // Set pet name to Oscar
          petIdentity.set_pet_age(2); // Set pet age to 2 years
          petIdentity.set_pet_gender(false); // Set gender to female
      
          string petIdentitySerialized;
      
          petIdentity.SerializeToString(&petIdentitySerialized);    
      
          cout << "Serialized protobuf data : " << petIdentitySerialized << endl;
       
      
      /* 
         ---------------------------------------------
         ------ Protobuf deserialization process -----
         ---------------------------------------------
      */
      
          com_company_pet::PetIdentity petIdentityDeserialized;
          
          petIdentityDeserialized.ParseFromString(petIdentitySerialized);
          cout << "\nDeserializing the data" << endl;
          cout << "Cat-Name : " << petIdentityDeserialized.pet_name() << " <===> Cat-age : " << petIdentityDeserialized.pet_age() << " <===> Cat-gender : " << (petIdentityDeserialized.pet_gender()?"male":"female") << endl; 
      
      
      
      
          google::protobuf::ShutdownProtobufLibrary(); // free all resources
          return 0;    
      }
      
      Executing the above code generates the following output :

      Remark : In practice, the serialized data (in this example, it's petIdentitySerialized) is what we need to send through the network.

Sending protobuf data with ZMQ

As one might expect, protobuf data is meant to be sent over the network. We could use traditional UNIX sockets; however, they can quickly become a bottleneck.

ZMQ is easier, more reliable, and less cumbersome to use. A previous post already discussed ZMQ. Google Protocol Buffers is cross-platform and can be used across multiple languages.
  1. Creating a proto file :
    syntax = "proto3";
    
    package com_company_caesar;
    
    message CaesarCipher {
        string caesar_cipher_text = 1; // Carries caesar cipher
        int32 shift_key = 2; // Shift key (it is equal to 3)
    }
    
  2. Heterogeneous Publisher and Subscriber
    • Python Publisher :
      import caesarCipher_pb2
      import zmq
      import time
      
      def encryptCaesarCipher(plainText, shiftKey):
          cipherText = ""    
          for character in plainText:
              # shift every character in the message by shiftKey positions
              cipherText += chr(ord(character) + shiftKey) 
          return cipherText
      
      def serializeToProtobuf(msg, caesarCipherProto, shiftKey):
          # fill caesarCipherProto
          caesarCipherProto.caesar_cipher_text = encryptCaesarCipher(msg, shiftKey)
          caesarCipherProto.shift_key = shiftKey
          # return serialized protobuf caesarCipherProto
          return caesarCipherProto.SerializeToString()
      
      # messages to send
      messagesPlainText = ["hello world!", "programming is awesome", "computer science"]
      caesarCipherProto = caesarCipher_pb2.CaesarCipher()
      
      
      portPublisher = "5580"
      # create an zmq context
      context = zmq.Context()
      # create a publisher socket
      socket = context.socket(zmq.PUB)
      # Bind the socket at a predefined port  
      socket.bind("tcp://*:%s" % portPublisher)
      
      
      while True:
          for msg in messagesPlainText:
              # serialize caesarCipherProto into protobuf format
              dataSerialized = serializeToProtobuf(msg, caesarCipherProto, 3)
              print("Plain Text : " + msg + " <===> Caesar cipher : " + caesarCipherProto.caesar_cipher_text)
              print("Protobuf message to send : " + str(dataSerialized)) # display caesarCipherProto data
              time.sleep(1)
              socket.send(b""+dataSerialized) # send binary serialized data
              print("---------------------------------")
              print("---------------------------------")
              print("---------------------------------")
      
    • C++ Subscriber :
      #include <iostream>
      #include <zmq.hpp>
      #include <string>
      #include "caesarCipher.pb.h"
      
      using namespace std;
      
      void DecryptCipherDisplay(std::string cipherText, int cipherKey);
      
      int main(){
          GOOGLE_PROTOBUF_VERIFY_VERSION;
          /* -------------------------- */
          /* Create a subscriber socket */
          /* -------------------------- */
          zmq::context_t context(1);
          zmq::socket_t subSocket(context, ZMQ_SUB);
          // Connect to the Python publisher's binding port
          subSocket.connect("tcp://localhost:5580"); 
        
          cout << "------ Subscriber running ------\n" << endl;
          // Listen for all topics
          subSocket.setsockopt(ZMQ_SUBSCRIBE, "" , strlen(""));
          // Instantiate a CaesarCipher to be filled with received data
          com_company_caesar::CaesarCipher caesarCipher; 
          while(true) {
              
              zmq::message_t zmqMessageReceived; // used to hold zmq received data
              subSocket.recv(&zmqMessageReceived); // Blocks until data reception
              // Map zmq data holder to string
              std::string messageReceived(static_cast<char*>(zmqMessageReceived.data()), zmqMessageReceived.size());
              // Deserialize protobuf data and store them into caesarCipher
              caesarCipher.ParseFromString(messageReceived);
               
              // Decrypt the Caesar cipher and display the received string
              DecryptCipherDisplay(caesarCipher.caesar_cipher_text(), caesarCipher.shift_key());
                 
              cout << "-------------------------------------" << endl;
          }        
      
          google::protobuf::ShutdownProtobufLibrary();
      
          return 0;
      }
      
      
      void DecryptCipherDisplay(std::string cipherText, int cipherKey){
          string::iterator it;
          string PlainTextRecovered;
          for (it = cipherText.begin(); it < cipherText.end(); it++) 
              PlainTextRecovered += static_cast<char>(*it - cipherKey); // reverse caesar cipher
          cout <<  "Reversing caesar cipher : "<< PlainTextRecovered << endl;
      }
      
  3. Testing the communication :

Saturday, December 22, 2018

Google Protocol Buffers V2

Introduction

Data exchange formats are standards used to transmit data across different applications running on a variety of platforms over the internet. For a long time, XML and JSON were the leaders. However, those formats lack some constructs that developers tend to use, like ENUMs and methods (OOP compliance). In addition, human-readable formats (like JSON) are slower to parse and quite large in size compared to binary formats; these are the reasons that led Google to release Protocol Buffers.

Google Protocol Buffer (Protobuf)

Google Protocol Buffers (also known as protobuf) was developed by Google as a platform-neutral data exchange format. Its goal is to represent complex structures and transmit them in a more efficient, reliable and safer way across different programming languages like Python and C++. Protobuf is seen as a better and more extensible format compared with XML or JSON :

  • It has support for ENUMs.
  • Methods are generated to get/set and serialize/deserialize structures.
  • It is faster (by a factor of 20 to 100) and smaller in size (by a factor of 3 to 10) than traditional data exchange formats (like XML).

Generating the Google Protocol Buffers format

Only three steps are required to serialize your data into a protobuf-compliant format :
  • Write your data layout in a .proto file (the example in the picture above shows a class representing car information).
  • Use the Google Protocol Buffers compiler (called protoc) to generate classes which can get/set and serialize/deserialize the data defined in the .proto file.
  • Include the generated classes in your code and have fun.

Note : protoc generates classes with the same names as those defined in the .proto file (in our example, the class name is CarInfo).

General Syntax Overview

A proto file is not much different from a standard C structure (with only a few additional specifications). Let's have an example :
syntax = "proto2"; // mandatory field (proto3 - protobuf version 3 exists also)

package com.company.test; // The package is an optional field 

message EmployeeInformation{
  required string employee_name = 1;
  required int32  employee_id = 2;
  optional bool gender = 3;
  repeated float last_three_month_salary = 4;
}
We will analyse this message step by step (see the schematic below, which serves as an introduction).

Data type

As one can see, familiar data types (string, bool, int32, float, etc.) are used for the member fields (employee_name, employee_id, etc.).
Needless to say, other data types are available : more can be found here https://developers.google.com/protocol-buffers/docs/proto#simple

Field modifiers

To understand these modifiers better, we can compare them to some regular expression concepts :
  • required (like {1} in a regex) : a mandatory field that cannot be empty (otherwise protobuf throws an error when serializing or deserializing).
  • optional (like ? or {0,1}) : the field can be left empty and not set by the user.
  • repeated (like * or {0,}) : can have zero or more values (used to create arrays).

Field identifiers

Every field must be associated with a unique identifier. In fact, the identifier (and not the field name) is what protobuf serializes and deserializes. Both the field type and the identifier are encoded; however, the number of bytes required to represent them (field type + identifier) depends on the identifier's value. When it is less than 16, only one byte is required (so reserve low identifiers for the most frequently transmitted fields).

Default values

Default values can be assigned to fields declared as "optional". If the user did not set an optional field, the default value is used. In case a default value was not provided, protobuf implicitly assigns one as follows :
  • numeric types (int, float, double, etc.) : 0
  • string : empty string
  • ENUM : first value in the ENUM
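
As a quick illustration of these implicit defaults, here is a minimal Python sketch. It assumes the EmployeeInformation message above was saved as employeeInformation.proto and compiled with protoc --python_out=. employeeInformation.proto (the file names are assumptions made for illustration) :
# Minimal sketch : reading unset fields returns their implicit default values.
import employeeInformation_pb2  # assumed to be generated by protoc

emp = employeeInformation_pb2.EmployeeInformation()

print(emp.gender)                        # False : default for bool
print(emp.employee_name)                 # ""    : empty string default
print(len(emp.last_three_month_salary))  # 0     : repeated field starts empty

# HasField tells whether an optional/required field was explicitly set
print(emp.HasField("gender"))  # False
emp.gender = True
print(emp.HasField("gender"))  # True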

Nested messages

Multiple messages can be nested like C structures as shown in the example below :
message AirCraft{
    required string aircraft_model = 1;
    required double aircraft_max_speed = 2;
    
    message TransmissionAndDiscovery{
        required string transmission_frequency_technology_version = 1;
        required double transmission_frequency = 2;
        required double transmission_frequency_range = 3;
        
    }

    required bool aircraft_stealthy = 3;
}

Remark : Field identifiers restart from 1 inside a nested message, which is why transmission_frequency_technology_version = 1 and not 3.
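
In the generated code, a nested message becomes an inner class of the enclosing message. Here is a minimal Python sketch (it assumes the AirCraft message above was saved as airCraft.proto and compiled with protoc --python_out=. airCraft.proto; the file names are assumptions made for illustration) :
# Minimal sketch : nested message classes are attributes of the enclosing class.
import airCraft_pb2  # assumed to be generated by protoc

# The nested type is instantiated through the outer class
transmission = airCraft_pb2.AirCraft.TransmissionAndDiscovery()
transmission.transmission_frequency_technology_version = "v2"
transmission.transmission_frequency = 1090.0
transmission.transmission_frequency_range = 250.0

aircraft = airCraft_pb2.AirCraft()
aircraft.aircraft_model = "A320"
aircraft.aircraft_max_speed = 871.0
aircraft.aircraft_stealthy = False

Note that the AirCraft message above only defines the nested type; to actually embed a TransmissionAndDiscovery inside an AirCraft instance, a field of that type would also have to be declared in AirCraft.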

Miscellaneous concepts

Syntax version

Two different versions of the protocol are used in the wild (version 2 is more dominant). We must include the version number (typically syntax = "proto2" or syntax = "proto3").

Package name

The package name is translated by protoc (the Protocol Buffers compiler) into a namespace in the generated classes in order to avoid naming conflicts (this field is optional but highly recommended).

Naming conventions

  • Message name : capitalize the first letter of every word (CamelCase), for instance :
    • message teslaCarfeatures ==> Wrong
    • message TeslaCarFeatures ==> Correct
  • Field name : use lower-case notation, with words separated by underscores
    • required int32 chargingSpeed ==> Wrong
    • required int32 charging_speed ==> Correct

Practical Protobuf

As we have already said, Google Protocol Buffers can be used with various programming languages (C++, Go, Dart, and more). We will demonstrate its usage with two widely used languages: C++ and Python.

Protobuf in PYTHON

In this section, we are going to use Google Protocol Buffers to serialize the data required to identify a given person and save it to a file. Another process will then parse the file and read the data back for display.
  1. Describing the data using protocol buffer format :
    syntax = "proto2";
    
    package com.company.humainIdentity;
    
    message HumainIdentityDescription{
        required string humain_first_name = 1;
        required string humain_last_name = 2;
        required int32 humain_age = 3; 
        required bool humain_gender = 4;
        optional string humain_profession = 5 [default="No Profession"];
    }
    
    Save the file as humainIdentity.proto (or any other name like example.proto).
  2. Generating classes using protoc (the Protocol Buffers compiler) :
    protoc --python_out=. humainIdentity.proto
    
    The compiler protoc produces a file called humainIdentity_pb2.py (or nameOfFile_pb2.py if one used another name).
  3. Use generated classes in Python code
    • serializer.py :
      import humainIdentity_pb2 # import generated class
      import sys
      
      
      humainId = humainIdentity_pb2.HumainIdentityDescription() # Create a HumainIdentityDescription instance
      humainId.humain_first_name = "Jugurtha" # Set first name
      humainId.humain_last_name = "BELKALEM" # Set last name
      humainId.humain_age = 26 # Set age
      humainId.humain_gender = True # Set gender
      humainId.humain_profession = "Embedded System Engineer" # Set profession (this is optional, so it can be left empty)
      
      
      humainIdSerialized = humainId.SerializeToString() # Serialize data using SerializeToString method
      
      print(humainIdSerialized) # display serialized data
      
      fileOutputStream = open("humain_id", "wb")
      fileOutputStream.write(humainIdSerialized) # save serialized data to a file
      fileOutputStream.close()
      
    • deserializer.py
      import humainIdentity_pb2 # import generated class (required to deserialize)
      import sys
      
      
      def getGender(humainGender):
          if humainGender:
              return "Male"
          else:
              return "Female"
      
      
      def parseDeserializedData(hId):
          print("Full Name : " + hId.humain_first_name + " " + hId.humain_last_name)
          print("Gender : " + getGender(hId.humain_gender))
          print("Age : " + str(hId.humain_age))
          print("Profession : "+ hId.humain_profession)
      
      humainId = humainIdentity_pb2.HumainIdentityDescription() # Create object Instance 
      
      fileInputStream = open("humain_id", "rb")
      humainIdSerialized = fileInputStream.read() # Read data from file
      humainId.ParseFromString(humainIdSerialized) # deserialize data read from file using ParseFromString method
      fileInputStream.close()
      
      parseDeserializedData(humainId) # display deserialized data
      
Executing the above programs yields the following :

Protobuf in C++

In this example, we are going to gather the information necessary to compute the BMI (Body Mass Index) of an individual, save it to a file, and read it back to calculate the BMI.
  • proto buffer file specification :
    syntax = "proto2";
    
    package com.company.bodyMassIndex;
    
    
    message BodyMassIndex{
        required float height = 1; 
        required float weight = 2;
    }
    
  • Generate C++ classes :
    protoc --cpp_out=. bodyMassIndex.proto
    
    In case of C++, a header (bodyMassIndex.pb.h) and class implementation (bodyMassIndex.pb.cc) are generated.
  • Use generated classes in your code :
    • serializer.cpp :
      #include <iostream>
      #include <fstream>
      #include <string>
      #include "bodyMassIndex.pb.h"
      using namespace std;
      
      
      int main(){
          GOOGLE_PROTOBUF_VERIFY_VERSION; // recommended by Google to make sure that the correct protobuf library version is loaded
      
          com_company_bodyMassIndex::BodyMassIndex bmi; // Create an instance of BodyMassIndex
      
          bmi.set_height(1.75); // Set the height
          bmi.set_weight(60); // Set the weight
      
          // Create an output file stream (in order to save the BodyMassIndex instance)
          fstream outFileStream("bmi", ios::out | ios::trunc | ios::binary);
      
          if (outFileStream) {
            bmi.SerializeToOstream(&outFileStream); // Serialize bmi and save it
          } else
              cout << "file error"  << endl;
          google::protobuf::ShutdownProtobufLibrary(); // free all resources
          return 0;    
      }
      
    • deserializer.cpp :
      #include <iostream>
      #include <fstream>
      #include <string>
      #include "bodyMassIndex.pb.h"
      using namespace std;
      
      
      int main(){
          GOOGLE_PROTOBUF_VERIFY_VERSION;
      
          com_company_bodyMassIndex::BodyMassIndex bmi;
          
          // Create an input stream (to read serialized data from a file)
          fstream inFileStream("bmi", ios::in | ios::binary);
      
          if (inFileStream) {
            bmi.ParseFromIstream(&inFileStream); // Deserialize read data and load them into bmi
          } else
              cout << "file error"  << endl;
      
          cout << "Height : " << bmi.height() << " ==> Weight : " << bmi.weight() << endl; // display deserialized data
      
          cout << "BMI = " << bmi.weight() / (bmi.height() * bmi.height()) << endl; // bmi = (weight / (height * height))
          google::protobuf::ShutdownProtobufLibrary();
          return 0;    
      }
      
The above programs produce the following output :

Sunday, December 2, 2018

Zmq concepts and application

Introduction

Zmq is a messaging library; unlike traditional messaging applications (like ActiveMQ, RabbitMQ, etc.), it offers a toolbox for building a messaging system (that's why it is a library).

Diving into MQTT protocol

MQTT is a messaging protocol; it relies on a binary format to exchange messages with low overhead. It is heavily used in embedded systems and IoT.

MQTT is based on the publish-subscribe paradigm and is completely asynchronous (unlike the client-server model). Three different actors make up this protocol :
  • Publisher : the part that produces information and organizes it into topics.
  • Broker : relays data generated by publishers to registered subscribers. The publishers' topics are known by the broker.
  • Subscriber : the data-consumer endpoint. After registering to a topic (or multiple topics), it can receive messages for processing or visualization.
At a high level, we can compare MQTT to the MVC model :
  1. Publishers : are the Model because they hold the data.
  2. Brokers : are the Controllers which map data to the View.
  3. Subscribers : are the View which displays the data.

Zmq working modes

Zmq can be used in several modes, among which :
  • REQ-REP : request-reply, in other words the client-server paradigm (synchronous operations); see the minimal sketch after this list.
  • PUB-SUB : publish-subscribe, following the MQTT-style model described above (asynchronous operations).
  • PUSH-PULL : or pipeline mode, which distributes tasks from a PUSH node to multiple PULL nodes (concurrent processing).
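
Since the rest of this post focuses on PUB-SUB, here is a minimal REQ-REP sketch in Python using pyzmq (the port number 5590, the file names and the message strings are arbitrary choices made for illustration) :
# reqRepServer.py : minimal reply (server) side
import zmq

context = zmq.Context()
server = context.socket(zmq.REP)
server.bind("tcp://*:5590")

while True:
    request = server.recv_string()  # blocks until a request arrives
    print("Request received : " + request)
    server.send_string("reply to : " + request)

# reqRepClient.py : minimal request (client) side
import zmq

context = zmq.Context()
client = context.socket(zmq.REQ)
client.connect("tcp://localhost:5590")

client.send_string("hello server")
print(client.recv_string())  # blocks until the reply arrives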

Zmq PUB-SUB mode

We will focus on the publisher-subscriber paradigm using Zmq. However, keep in mind that Zmq does not ship with a broker (we are going to see how to create one).

Brokerless mode (default mode)

Zmq works in brokerless mode by default (the "zero" in zmq stands for zero broker and zero cost) and does not provide a central broker like classical MQTT applications do, but it offers the building blocks to create one.

Setting up Zmq in this mode is similar to a client-server architecture, as shown in the image below :

Connecting multiple publishers and subscribers using Zmq forms a star topology that works well for small networks (but does not scale). It also means that subscribers must know the publishers (IP addresses and binding ports) to which they want to register.

Let's have a working example to illustrate zmq brokerless mode as shown in the image below :

The publisher binds to port 5580 and sends messages on the topic "/test". On the other hand, the subscriber connects to the publisher (the subscriber must know the publisher's IP address and binding port) and subscribes to the topic "/test".

Remark : Either the publisher or the subscriber can start first (Zmq makes the subscriber wait until the publisher starts emitting messages). In practice, try to start the subscriber before the publisher in order to get all the messages.

We can have a look at the source code, written in Python :
  • Publisher code:
    import zmq
    import time
    
    portPublisher = "5580"
    
    # create an zmq context
    context = zmq.Context()
    # create a publisher socket
    socket = context.socket(zmq.PUB)
    # Bind the socket at a predefined port  
    socket.bind("tcp://*:%s" % portPublisher)
    
    while True:
        # Set the topic on which to publish
        topicFilter = "/test"
        # create some data to send
        msgToPublish = "Hello world Zmq!"
    
        # print topic and data
        print("%s %s" % (topicFilter, msgToPublish))
        # send topic and message separated by comma 
        # (in order to parse them easily on subscriber side)    
        socket.send_string("%s,%s" % (topicFilter, msgToPublish))
        # slow down sending rate
        time.sleep(1)
    
  • Subscriber code :
    import sys
    import zmq
    
    portPublisher = "5580"
    
    # Create an zmq context
    context = zmq.Context()
    # create a subscriber socket 
    socket = context.socket(zmq.SUB)
    
    print("Waiting to connect to remote publisher...")
    
    # subscriber must connect to publisher
    socket.connect("tcp://localhost:%s" % portPublisher)
    
    
    # Choose a topic 
    topicFilter = "/test"
    socket.setsockopt_string(zmq.SUBSCRIBE, topicFilter)
    
    while True:
        # get received data
        dataReceived = socket.recv_string()
        # parse received data using comma separator
        topic, msgContent = dataReceived.split(",")
        # display topic name and message content 
        print("Topic : " + topic + " ==> Message content : " + msgContent)
    
We can run the above programs and see the result:

This mode works fine with a few pub-sub devices but cannot scale efficiently: the publishers' bindings (IP addresses and ports) must be known by every subscriber in the system. For better performance and maintainability, we can use an intermediary device called a broker (or sometimes a proxy).

Building a Broker with zmq

A Zmq broker can be built quite easily: the broker (sometimes called a proxy) acts as an intermediary device with two interfaces, as shown :

We should point out a few recommendations :
  • The broker's subscriber-side interface (subX) must subscribe to the empty topic "" in order to receive all traffic from all publishers.
  • The publisher must connect to the broker (unlike in brokerless mode, where it was binding).
  • With or without a broker, subscribers always connect.
As a working exercise, one can build a broker quickly as follows :
  1. Proxy-Broker :
    import zmq
    
    # Creating zmq context
    context = zmq.Context()
    
    # Creating subX interface 
    subX = context.socket(zmq.SUB)
    subX.bind("tcp://*:5580")
    
    subX.setsockopt_string(zmq.SUBSCRIBE, "")
    
    # Creating the pubX interface
    pubX = context.socket(zmq.PUB)
    pubX.bind("tcp://*:5581")
    
    print("Proxy is active!")
    # connect subX and pubX (creating the proxy)
    zmq.device(zmq.FORWARDER, subX, pubX)
    
    
    # close and free resources
    subX.close()
    pubX.close()
    context.term()
    
  2. Publisher :
    import zmq
    import time
    
    portSubscriberX = "5580"
    
    # create an zmq context
    context = zmq.Context()
    # create a publisher socket
    socket = context.socket(zmq.PUB)
    # Connect the socket to subX (the broker's subscriber-side interface)
    socket.connect("tcp://localhost:%s" % portSubscriberX)
    
    while True:
        # Set the topic on which to publish
        topicFilter = "/test"
        # create some data to send
        msgToPublish = "Hello world Zmq!"
    
        # print topic and data
        print("%s %s" % (topicFilter, msgToPublish))
        # send topic and message separated by comma 
        # (in order to parse them easily on subscriber side)    
        socket.send_string("%s,%s" % (topicFilter, msgToPublish))
        # slow down sending rate
        time.sleep(1)
    
  3. Subscriber :
    import sys
    import zmq
    
    portPublisherX = "5581"
    
    # Create an zmq context
    context = zmq.Context()
    # create a subscriber socket 
    socket = context.socket(zmq.SUB)
    
    print("Waiting to connect to remote publisherX...")
    
    # subscriber must connect to publisher
    socket.connect("tcp://localhost:%s" % portPublisherX)
    
    
    # Choose a topic 
    topicFilter = "/test"
    socket.setsockopt_string(zmq.SUBSCRIBE, topicFilter)
    
    while True:
        # get received data
        dataReceived = socket.recv_string()
        # parse received data using comma separator
        topic, msgContent = dataReceived.split(",")
        # display topic name and message content 
        print("Topic : " + topic + " ==> Message content : " + msgContent)
    

Test the above programs, then try to use multiple publishers and subscribers (it's a good exercise).

For further understanding of Zmq, an excellent reference is available at : https://zodml.org/sites/default/files/ZeroMQ.pdf