Mastering Secure MQTT with Python

Build an MQTT Publisher and Subscriber in Python

·

10 min read

Mastering Secure MQTT with Python

Generated with Bing DALL-E

Do you need secure communication via MQTT? Yes, then this is the right article for you! We show you how to implement an MQTT Publisher and Subscriber in Python. We will then establish a secure connection via an MQTT broker. If you want to know how to set up an MQTT broker with Docker, check out our article on this topic.

We recommend reading the article on MQTT Broker first. This article introduces some basics.

First, we will introduce the basic functionalities of MQTT. Then, we show you how to create an MQTT Publisher and Subscriber in Python. So be curious! The steps are the following:

  1. Recap

  2. Technical requirements

  3. Create an MQTT Publisher in Python

  4. Create an MQTT Subscriber in Python

  5. Conclusion

  6. Useful links


🎓 Ad | Learn Coding - Ignite Your Tech Future

In a digital age, coding is your gateway to a prosperous future. Boost your job security and break into exciting new careers - no prior degree required. Discover the course: Hello Coding - Anyone Can Learn to Code*.

Most of today’s top coders started just like you! It is a completely self-paced online learning course. All Resources Included: Every tool, every file, yours forever.

Here you’ll discover:

  • Web Dev: Design, build & monetize websites, and discover affiliate marketing.

  • Python: Step into high-paying roles such as data scientist or machine learning engineer.

  • App Dev: Create apps and harness revenue streams like ads or in-app purchases.

👉🏽 Enroll today and take the next step in your Coding career.*


🤔 Recap

MQTT stands for Message Queuing Telemetry Transport. It is a messaging protocol for IoT devices with resource constraints. It is specialized for low bandwidth and high latency environments, making it an ideal protocol for machine-to-machine (M2M) communication.

MQTT works according to the publisher/ subscriber principle with a central broker. This principle means that the sender and receiver have no direct connection. The senders publish the data on a topic, and all recipients who have subscribed to this topic receive the data.

✅ Technical requirements

Prerequisites

You will need the following prerequisites:

  • Installed Python (≥ 3.7)

  • Installed conda and pip

  • Access to a bash (macOS, Linux or Windows)

  • Code editor of your choice (We use VSCode.)

The implementation was tested on macOS Ventura 13.2.1.

Setup

Enter the following in your terminal:

  • Create a conda environment (env): conda create -n mqtt-env python=3.9.12 -> Answer the question Proceed ([y]/n)? with y.

  • Activate the conda env: conda activate mqtt-env

  • Install the necessary libraries: pip install pandas==1.5.3 paho-mqtt==1.6.1

💻 Create an MQTT Publisher in Python

In this section, we implement the MQTT Publisher with Python.

First, we create an empty Python file and call it pub.py. In this file, we start a menu. Insert the following code into the file.

from menu import Menu

menu = Menu()
menu.start_menu()

Then we create the file menu.py. In this file, we implement the menu.

from simulator import Simulator

class Menu:
    def start_menu(self):
        value = True
        while value:
            try:
                print("### Simulator ###")
                print("(1) Simulate data")
                print("(0) Exit")
                print("> ", end="")
                choice = int(input())

                if choice == 0:
                    value = False

                if choice == 1:
                    Simulator.simulate_data("data")

            except ValueError:
                print("Wrong input!!!")

You can start the data simulation under menu item 1. In the next step, we create the simulator. Create a file called simulator.py and insert the following code:

import json
from os import path
import time
import pandas as pd
from mqtt_publisher import Publisher

class Simulator:
    @staticmethod
    def simulate_data(file_name):
        # read data
        data = pd.read_csv(path.join(file_name + ".csv"), parse_dates=["Date"])

        # MQTT publisher
        mqtt_publisher = Publisher()

        while True:
            for rows in range(len(data.iloc[:,1])):
                for cols in range(len(data.iloc[1, 2:])):
                    cols = cols + 2
                    sensor_value = str(data.iloc[rows, cols])
                    key_value = str(data.columns[cols])
                    mqtt_publisher.set_topic(key_value)

                    mqtt_msg = json.dumps({
                        "type": "sample data",
                        "cycle_counter": str(data.iloc[rows, 0]),
                        "timestamp": str(data.iloc[rows, 1]),
                        "value": sensor_value
                    })

                    mqtt_publisher.publish_value(mqtt_msg)
                time.sleep(1)   # the cycle time is 1 second.

First, we import all the necessary libraries. Then we create the class Simulator that contains the method simulate_data(). The method reads sample data from a CSV file. Create a file with the name data.csv and the following content (You can also use your own data.):

"ID","Date","Param1","Param2","Param3"
"0","2023-03-24 20:31:00","3.1","0.7","4.3"
"1","2023-03-24 20:31:01","3.2","0.6","4.3"
"2","2023-03-24 20:31:02","3.2","0.6","4.4"
"3","2023-03-24 20:31:03","3.2","0.6","4.4"
"4","2023-03-24 20:31:04","3.1","0.6","4.4"
"5","2023-03-24 20:31:05","3.1","0.6","4.3"
"6","2023-03-24 20:31:06","3","0.6","4.3"
"7","2023-03-24 20:31:07","3","0.6","4.3"

Next, we create an object of the Publisher class. In the while-loop, we publish each parameter individually to the MQTT broker. The function publish_value() publishes the parameters as a JSON string to the MQTT broker.

In the next step, we look at the MQTT Publisher. Create a Python file with the name mqtt_publisher.py and add the following:

import paho.mqtt.client as mqtt
import mqtt_constants

def on_connect(client, userdata, flags, rc):
    """
    The callback for when the client receives a CONNACK response from the server.
    """
    print("Client: " + str(client))
    print("User data: " + str(userdata))
    print("Flags: " + str(flags))
    print("Connected with result code " + str(rc))

class Publisher:
    """
    This class contains the attributes and functions for MQTT publisher.
    For more information about the paho.mqtt.python library see:
    https://github.com/eclipse/paho.mqtt.python
    """

    username = mqtt_constants.USER_NAME
    password = mqtt_constants.PASSWORD
    ip_broker = mqtt_constants.IP_ADDRESS
    port_broker = mqtt_constants.PORT_BROKER
    keep_alive = mqtt_constants.KEEP_ALIVE
    qos_level = mqtt_constants.QOS_LEVEL
    topic = ""

    client = mqtt.Client()

    def __init__(self):
        print("Broker-IP: " + str(self.ip_broker) + ", Broker-Port: " + str(self.port_broker))
        if mqtt_constants.TLS_ON:
            self.client.tls_set(ca_certs="./certs/ca.crt",
                                certfile="./certs/client.crt",
                                keyfile="./certs/client.key")
        self.client.username_pw_set(self.username, self.password)
        self.client.connect(self.ip_broker, self.port_broker, self.keep_alive)   # mqtt broker
        self.client.on_connect = on_connect
        self.client.loop_start()

    def __del__(self):
        self.client.disconnect()   # disconnect
        self.client.loop_stop()   # stop loop

    def set_topic(self, topic):
        """
        Setter method for the MQTT topic.
        :param topic: This parameter contains the MQTT topic.
        :return: None
        """
        self.topic = topic

    def publish_value(self, value):
        """
        This function publishes the JSON string to MQTT broker.
        :param value: This parameter contains the JSON string to be published.
        :return: None
        """
        print("Published JSON: " + str(value))
        self.client.publish(self.topic, value, self.qos_level)

The on_connect() function informs about the connection status. It is a callback for when the client receives a CONNACK response from the server. In the Publisher class, we set the connection data. The file mqtt_constants.py contains the connection data. You still have to create this file. Then insert the following into the file.

"""
This file contains environment variables for the MQTT broker.
Further information:
No TLS: PORT_BROKER = 1883, TLS_ON = False
TLS: PORT_BROKER = 8883, TLS_ON = True
"""

USER_NAME = "mosquitto"
PASSWORD = "mosquitto"
IP_ADDRESS = "<ip> or <computer-name>"
PORT_BROKER = 8883
QOS_LEVEL = 2
KEEP_ALIVE = 8000
TLS_ON = True

The file contains the connection data, the QoS level and whether encryption should be used or not. If we set the variable TLS_ON to True and the port to 8883, there is an encrypted connection between the Publisher and the MQTT Broker.

In the Publisher class, we then create a Client object. On this client object, we set the connection data in the constructor. Furthermore, we set the certificates for an encrypted connection. To do this, create a folder called certs and place the three files there. We generated the certificate files in a previous article.

Then we define a setter method to set the MQTT topic and a publish method to publish the JSON string.

If you created everything correctly, your MQTT Publisher is ready to use. To start the MQTT Publisher, run the following command in the terminal:

$ python pub.py

For testing, you can use the MQTT Broker from the previous article. The article describes how you can use the client application MQTT Explorer. With the MQTT Explorer, you can test whether your publisher publishes the data correctly on the broker.

💻 Create an MQTT Subscriber in Python

In this section, we implement the MQTT Subscriber with Python. First, we create a new Python file called sub.py. Insert the following content into this file:

from mqtt_subscriber import Subscriber

sub = Subscriber()

We import the class Subscriber and create an object of the class. Next, create the file in which we implement the Subscriber class. We call this file mqtt_subscriber.py. In this file, we add the following:

import paho.mqtt.client as mqtt
import mqtt_constants


def on_connect(client, userdata, flags, rc):
    """
    The callback for when the client receives a CONNACK response from the server.
    """
    relevant_topics = [("Param1", 2), ("Param2", 2), ("Param3", 2)]
    print("Client: " + str(client))
    print("User data: " + str(userdata))
    print("Flags: " + str(flags))
    print("Connected with result code " + str(rc))
    client.subscribe(relevant_topics)


def on_message(client, userdata, msg):
    """
    This function is called when a message is received on a topic.
    """
    print("Client: " + str(client))
    print("User data: " + str(userdata))
    print("Topic: " + msg.topic + " and " + "QoS level: " + str(msg.qos))
    print(str(msg.payload))


class Subscriber:
    """
    This class contains the attributes and functions for MQTT subscriber.
    For more information about the paho.mqtt.python library see:
    https://github.com/eclipse/paho.mqtt.python
    """

    username = mqtt_constants.USER_NAME
    password = mqtt_constants.PASSWORD
    ip_broker = mqtt_constants.IP_ADDRESS
    port_broker = mqtt_constants.PORT_BROKER
    keep_alive = mqtt_constants.KEEP_ALIVE
    qos_level = mqtt_constants.QOS_LEVEL
    topic = ""

    client = mqtt.Client()

    def __init__(self):
        print("Broker-IP: " + str(self.ip_broker) + ", Broker-Port: " + str(self.port_broker))
        if mqtt_constants.TLS_ON:
            self.client.tls_set(ca_certs="./certs/ca.crt",
                                certfile="./certs/client.crt",
                                keyfile="./certs/client.key")
        self.client.username_pw_set(self.username, self.password)
        self.client.connect(self.ip_broker, self.port_broker, self.keep_alive)   # mqtt broker
        self.client.on_message = on_message
        self.client.on_connect = on_connect
        self.client.loop_forever()

    def __del__(self):
        self.client.disconnect()   # disconnect
        self.client.loop_stop()   # stop loop

First, we import the library mqtt and our file mqtt_constants.py. In the file mqtt_constants.py, we save the relevant constants for the Subscriber. Create a file with the name mqtt_constants.py and add the following content:

"""
This file contains environment variables for the MQTT broker.
Further information:
No TLS: PORT_BROKER = 1883, TLS_ON = False
TLS: PORT_BROKER = 8883, TLS_ON = True
"""

USER_NAME = "mosquitto"
PASSWORD = "mosquitto"
IP_ADDRESS = "<ip> or <computer-name>"
PORT_BROKER = 8883
QOS_LEVEL = 2
KEEP_ALIVE = 8000
TLS_ON = True

The file contains the connection data, the QoS level and whether encryption should be used or not. If we set the variable TLS_ON to True and the port to 8883, there is an encrypted connection between the Subscriber and the MQTT Broker.

Now, back to the mqtt_subscriber.py file. In this file, we first define the on_connect() method. In this method, we subscribe to the relevant topics (“Param1”, “Param2”, and “Param3”). The number 2 indicates the QoS level.

Next, we define the on_message() method. In this method, we receive messages from the MQTT Broker. There we can print the message payload (JSON string).

In addition, we define the class Subscriber. In this class, we set the class attributes based on the values in the file mqtt_constants.py. Then we create an object of the class Client. In the constructor, we check whether an encrypted connection should be used or not used. If yes, we load the certificates in the certs folder. Then we set the connection data and the relevant callbacks. After that, we start the subscriber in a loop forever. In the destructor, we terminate the connection to the MQTT broker.

If you created everything correctly, your MQTT Subscriber is ready to use. You can start the Subscriber with the following command (Make sure you are in the Python environment mqtt-env.):

$ python sub.py

You should then see the following output in your terminal:

Output Subscriber (Screenshot by authors)

Output Subscriber (Screenshot by authors)

The output shows the transmitted JSON string, information about the client, the topic and the QoS level. Congratulations, you have implemented a secure MQTT Publisher and Subscriber in Python.

🎬 Conclusion

In this article, we learned how to implement an MQTT Publisher and Subscriber in Python. The Publisher and Subscriber can establish a secure connection to the MQTT broker by securing the connection via TLS. You can use the article as a code base for your next MQTT project. Based on this, you can implement a secure MQTT variant for your project.


👉🏽 Join our free weekly Magic AI newsletter for the latest AI updates!

👉🏽 Elevate your Python and ML skills with our top course recommendations!

Do you want to support us as writers? You can do it by becoming a member of Tinz Twins Hub. Thanks for your support. Don’t forget to follow us on X. Thanks for your support. 🙏🏽🙏🏽

Thanks so much for reading. Have a great day!


🔍 Useful literature

Books

* Disclosure: The links are affiliate links, which means we will receive a commission if you purchase through these links. There are no additional costs for you.

The authors are NOT LIABLE for any damages arising from the software. Use of the software is at your own risk.