RAK7289CV2: Resend uplinks after MQTT broker recovers from outage

Hi guys, recently I’ve been testing the RAK7289CV2 built-in network server, forwarding the uplinks to an external MQTT broker. I’m facing an issue: uplink messages received during a broker outage are not resent after the MQTT broker recovers.

In the integration interface parameters, I’ve set clean session to false, QoS 1, and a fixed client ID. My self-hosted MQTT broker (Mosquitto) is configured with persistence. Unfortunately, the uplinks were not resent once the MQTT connection was re-established.

May I know if there is anything further I can try to achieve this? Thank you.

Hello,
Please clarify if you are using an external MQTT Broker, or if you are using the WisGateOS2 MQTT Broker and only subscribing to the MQTT Topics of the Built-In NS Integration Interface?
Also, it is advisable to use QoS2 - Exactly Once.

Hi,

Thanks for replying,

Yes, I’m using an external MQTT broker (a self-hosted Mosquitto broker), and I configured this broker’s info in the WisGate integration interface parameters section (fixed client ID and clean session disabled) to enable publishing the uplinks to this broker.

Also, I followed your advice and changed to QoS 2, but the result is the same. Below is the testing flow:

  1. Stop the external MQTT broker
  2. Start the MQTT broker after 10 minutes
  3. Messages uplinked during the outage are not sent to the subscriber after the broker restarts.

For additional info, my MQTT broker is also configured with persistence.

Is there anything I’m still missing?

Hi,

MQTT brokers such as Mosquitto do buffer to some extent as part of the QoS feature. But when the broker itself goes down, your gateway keeps trying to reconnect for those 10 minutes and cannot send uplinks. The same applies to the subscribers. During this period, uplink messages are lost because there is no broker to connect to. I hope this mechanism is clear.

Otherwise, if the connection to the MQTT broker itself is unstable and that matters for your use case, you can consider implementing a higher-level protocol between the end device and the end users (subscribers) that exchanges ACK messages and buffers data on the device. Alternatively, connecting directly to the gateway’s built-in MQTT broker is another option.
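
To illustrate the ACK-plus-buffering idea mentioned above, here is a minimal sketch of the bookkeeping only: every message gets a sequence number and stays buffered until the receiver acknowledges it. The class and method names (`PendingStore`, `add`, `ack`, `unacked`) are hypothetical and not part of any RAK or LoRaWAN API, and a real end device would implement this in its own firmware language. Python is used here just to show the logic.

```python
class PendingStore:
    """Keeps each message until the receiver acknowledges its sequence number."""

    def __init__(self):
        self._next_seq = 0
        self._pending = {}  # seq -> payload; dicts preserve insertion order

    def add(self, payload):
        """Buffer a payload and return the sequence number to send with it."""
        seq = self._next_seq
        self._next_seq += 1
        self._pending[seq] = payload
        return seq

    def ack(self, seq):
        """Drop a message once it is acknowledged; return True if it was pending."""
        if seq in self._pending:
            del self._pending[seq]
            return True
        return False

    def unacked(self):
        """Return (seq, payload) pairs that still need to be retransmitted."""
        return list(self._pending.items())


store = PendingStore()
first = store.add(b"temp=21.5")
second = store.add(b"temp=21.7")
store.ack(first)          # the subscriber confirmed the first message
print(store.unacked())    # [(1, b'temp=21.7')]
```

Anything still returned by `unacked()` after a timeout would be sent again, so an outage anywhere along the path only delays delivery instead of losing data.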

Hi,

Thanks for the info and suggestion.

The purpose of self-hosting the external MQTT broker is to centralize the broker, make maintenance easier, and improve security, especially since the subscribers are only allowed to access the private network.

Let’s say I use the gateway’s built-in MQTT broker directly. Is it possible to have the built-in broker (with persistence) forward to the external broker, so that it can persist messages while the external broker is offline and continue forwarding when the broker is back in service?

Hello,

To use the gateway’s built-in MQTT broker, another device (e.g., a Raspberry Pi) in the network would subscribe to the gateway’s internal broker and publish the LoRaWAN packets to an external broker. If the external broker goes down, the intermediate subscriber would buffer the LoRaWAN packets internally and then send the buffered packets when the external broker becomes available again.

The disadvantage is the need for this intermediate subscriber device, which would likely be more costly than implementing a higher-level protocol that handles ACK messages.

The most important thing is to keep the server robustly running the external MQTT broker.

Hi,

Got it, but is it recommended to SSH into the gateway and configure the built-in MQTT broker to bridge topics to the external broker?
That way the Raspberry Pi could be excluded, but I’m not sure whether this is a practical approach. Are there any further concerns with this?

Hi,

If by SSH you mean registering and running a service via SSH, that can be done using Python. WisGateOS 2 has a built-in Python executable and the paho.mqtt library, so some custom jobs can run on it.

A variety of experiments can yield more practical solutions. Registering a Python program as a service is one such experiment. Regarding concerns, all custom files except those on the SD card will be deleted with each WisGateOS 2 firmware update.

Hi,

I’m using PowerShell (ssh [email protected]) to access the gateway’s config under /etc, and I think there is a config folder related to the Mosquitto config. I’m not sure whether this is the correct way to do it, or whether it will impact any function of the gateway.

Can you show me more about the Python library that you mentioned? I would like to explore this further.

Thanks a lot.

Hi,

The built-in MQTT broker on WisGateOS 2 is fully controlled by the gateway itself, so I would not recommend manipulating it directly.

Here is skeleton code for buffering using Python and the Paho.MQTT library. The library documentation is also worth consulting: paho-mqtt on PyPI.

import queue
import json
import paho.mqtt.client as mqtt

class Program:
    """
    example
    {
        "builtin_broker": "localhost",
        "external_broker": "external.broker.address"
    }
    """
    config_file_path = "./program-config.json"

    _config: dict = None
    _builtin_client: mqtt.Client = None
    _external_client: mqtt.Client = None

    # message buffer for external broker
    _buffer: queue.Queue = None
    _external_connected: bool = False

    def __init__(self):
        self._load_config()
        self._buffer = queue.Queue()
        self._builtin_client = mqtt.Client()
        self._builtin_client.on_connect = self._builtin_on_connect
        self._builtin_client.on_message = self._builtin_on_message
        self._builtin_client.on_disconnect = self._builtin_on_disconnect
        self._external_client = mqtt.Client()
        self._external_client.on_connect = self._external_on_connect
        self._external_client.on_disconnect = self._external_on_disconnect

    def _load_config(self):
        with open(self.config_file_path, "r") as f:
            self._config = json.load(f)

    def start(self):
        self._builtin_client.connect_async(self._config["builtin_broker"], 1883, 60)
        self._external_client.connect_async(self._config["external_broker"], 1883, 60)
        self._builtin_client.loop_start()
        while True:
            try:
                self._external_client.loop_forever()
            except Exception as e:
                print(f"Exception occurred: {e}")

    # when connected to the builtin broker, subscribe to the uplink topic
    def _builtin_on_connect(self, builtin_client: mqtt.Client, userdata, flags, rc):
        print("Connected with result code " + str(rc))
        builtin_client.subscribe("application/+/device/+/rx")

    # bridge the builtin broker to the external broker
    def _builtin_on_message(self, builtin_client, userdata, msg):
        print(f"builtin message: {msg.topic}")
        if self._external_connected:
            self._external_client.publish(msg.topic, msg.payload)
        else:
            print("External broker not connected, buffering message")
            self._buffer.put((msg.topic, msg.payload))

    def _builtin_on_disconnect(self, builtin_client, userdata, rc):
        print("Disconnected with result code " + str(rc))
        self._builtin_client.connect_async(self._config["builtin_broker"], 1883, 60)

    def _external_on_connect(self, external_client, userdata, flags, rc):
        print("Connected with result code " + str(rc))
        if self._buffer.qsize() > 0:
            print(f"Flushing {self._buffer.qsize()} buffered messages to external broker")
            while not self._buffer.empty():
                topic, payload = self._buffer.get()
                self._external_client.publish(topic, payload)
        self._external_connected = True

    def _external_on_disconnect(self, external_client, userdata, rc):
        print("Disconnected with result code " + str(rc))
        self._external_connected = False
        # schedule a reconnect to the external broker (not the builtin one)
        self._external_client.connect_async(self._config["external_broker"], 1883, 60)

program = Program()
program.start()
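
One thing to keep in mind with the skeleton above: `queue.Queue()` is unbounded, so a long outage could keep growing the buffer in the gateway's RAM. A minimal sketch of a bounded alternative follows, assuming that dropping the oldest messages is acceptable for your application. `BoundedBuffer` and `max_messages` are hypothetical names used only for illustration.

```python
from collections import deque


class BoundedBuffer:
    """In-memory buffer that keeps only the newest max_messages entries."""

    def __init__(self, max_messages):
        # a deque with maxlen silently discards the oldest item when full
        self._items = deque(maxlen=max_messages)
        self.dropped = 0  # how many messages were discarded so far

    def put(self, topic, payload):
        if len(self._items) == self._items.maxlen:
            self.dropped += 1
        self._items.append((topic, payload))

    def drain(self):
        """Yield buffered (topic, payload) pairs, oldest first, emptying the buffer."""
        while self._items:
            yield self._items.popleft()
```

In the skeleton, `self._buffer.put((topic, payload))` would become `self._buffer.put(topic, payload)`, and the flush loop in `_external_on_connect` would iterate over `self._buffer.drain()`.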

Hi,

Thanks for sharing. I would like some advice on how and where I can execute this Python program. Is this program set up on the WisGate Edge Pro V2 portal?

Hi @ccy ,

This program can run on WisGateOS 2 as a service. To register it, copy the source into a new file named, for example, “buffering.py”. Then replace config_file_path = "./program-config.json" with config_file_path = "/mnt/mmcblk0p1/broker-config.json". Next, pull the SD card from the gateway and insert it into your host PC. Copy buffering.py to the SD card. Also, create a new file named “broker-config.json” on the SD card with the content below.

{
    "builtin_broker": "localhost",
    "external_broker": "<external MQTT broker ip address>"
}

And then insert your SD card into the gateway. SSH into the gateway with ssh root@<gateway ip address>. On the gateway shell, copy the buffering.py file to /usr/bin with cp /mnt/mmcblk0p1/buffering.py /usr/bin/.

To run this program, there are two ways.

  1. Background execution

    Simply run python /usr/bin/buffering.py&. This runs the program in the background, but you need to run the command again after every boot.

  2. Running as a service

    Create a file named “/etc/init.d/buffering” with the content below.

#!/bin/sh /etc/rc.common

START=95
STOP=10
USE_PROCD=1

SERVICE_NAME="buffering"
PYTHON_BIN="/usr/bin/python"
SCRIPT="/usr/bin/buffering.py"

start_service() {
    procd_open_instance
    procd_set_param command "$PYTHON_BIN" "$SCRIPT"
    procd_set_param respawn 3600 5 0
    procd_close_instance
}

stop_service() {
    :
}

Then run the shell commands below.

/etc/init.d/buffering enable # Execute the buffering program at boot
/etc/init.d/buffering start # Run immediately

Then try checking the uplink messages with a client app like MQTT Explorer.

Hi,

OK, let me try this further.

When running this program with the default Mosquitto config of the built-in MQTT broker, how can it persist the messages if the network server shuts down completely before the buffered messages have been sent to the external broker? By default the messages are only retained in memory.

Hi @ccy,

It depends on your experimentation and measurements. If you want, you can just modify the source code I provided to save the buffered messages to a file.
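
As one possible way to do that, here is a minimal sketch of a file-backed buffer that appends each message to a JSON Lines file and reads them back after a restart. The class name `FileBuffer` and the file name are hypothetical. A path on the SD card, such as /mnt/mmcblk0p1/buffer.jsonl, is an assumption based on the mount point mentioned earlier in this thread. Payloads are base64-encoded because MQTT payloads are raw bytes.

```python
import base64
import json
import os


class FileBuffer:
    """Append-only, file-backed message buffer (one JSON object per line)."""

    def __init__(self, path):
        self.path = path

    def put(self, topic, payload: bytes):
        record = {"topic": topic,
                  "payload": base64.b64encode(payload).decode("ascii")}
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")
            f.flush()
            os.fsync(f.fileno())  # ask the OS to write the data to the card

    def drain(self):
        """Return all buffered (topic, payload) pairs and delete the file."""
        if not os.path.exists(self.path):
            return []
        messages = []
        with open(self.path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue  # skip a line cut short by a power loss
                messages.append((record["topic"],
                                 base64.b64decode(record["payload"])))
        os.remove(self.path)
        return messages
```

This sketch deletes the file as soon as it has been read. A more careful version would remove entries only after the external broker has confirmed the publish. Frequent writes also wear the SD card, so it may be worth measuring before relying on it.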

It sounds like your gateway isn’t queuing uplinks during the MQTT outage even with clean session off and QoS 1 enabled; some RAK firmware versions simply don’t buffer messages. You can try updating the firmware or switching to a bridge mode that forces local caching. For sharing configs and logs between devices while testing, any file-sharing platform will do.