Link.ONE MQTT Disconnects Randomly

Hello everyone,

I am creating a “gateway” which receives messages over LoRa and pushes them to Ubidots. The LoRa and MQTT side work reliably except for the disconnections, which give me a “+QMTSTAT: 0,1” periodically, indicating that the connection is closed by the peer. I am little confused by the terminology; for instance, does that mean the modem is closing the connection or that Ubidots is, or something else entirely? There appears to not be any problem with the cellular network, as checking Hologram during one of the disconnects shows it still live.

As of right now my solution is to just run the connection command again every 6 minutes or so, which always connects back reliably. I’d prefer not to do this forever if possible, and am wondering if anyone here has any insight into this issue. If I should be asking somewhere else please let me know!

I am using the Link.ONE WisTrio. The code I am using is pasted below, with some things omitted for privacy. Nothing is omitted which affects the AT commands used. Anywhere that says “PL()” is a macro for Serial.println.

#define DEBUG_DEVICE
#include "ArduinoJson-v6.21.3-custom.h"
#include <CircularBuffer.h>
#include <unordered_map>

// CONSTANTS
constexpr uint8_t BG77_POWER_KEY = WB_IO1;

// info for mqtt connection
constexpr char *SERVER = "\"169.55.61.243\""; // direct link not working as of 10/25
// constexpr char *SERVER = "\"industrial.api.ubidots.com\"";
constexpr char *PORT = "1883";
constexpr char *USERNAME = "\"API KEY OMITTED\"";
constexpr char *TOPIC = "/v1.6/devices/";

constexpr int TIME_TO_TX =
    3600000; // time for gateway to send info packet to ubidots
constexpr int TIME_TO_RECONN =
    420000; // time for gateway to reconnect to ubidots
// constexpr int TIME_TO_TX =
//     30000; // time for gateway to send info packet to ubidots

// lora stuff
constexpr uint32_t US_FREQ = 915000000;
constexpr uint16_t sf = 12, bw = 1, cr = 0, preamble = 8, txPower = 22;

// GLOBALS
GatewayInfo infoPacket;                         // info aboout gateway
CxInfoLoRa cxinfo;                              // packet for new lora info
Packet newData;                                 // packet for received info
CircularBuffer<Packet, UINT8_MAX> mainDataBuff; // static queue for rx packets
unordered_map<uint16_t, time_t>
    uniqueDevList; // table for amount of devices that have talked to gateway
time_t curr_tx_timer = 0;     // timer for sending gateway stuff
time_t curr_reconn_timer = 0; // timer for reconnecting
bool firstBoot = true;        // to use to do stuff on first boot

// FUNCTIONS
// read response from modem
String BG77_read(time_t timeout_ms) {
  String resp = "";
  time_t timeStamp = millis();
  while ((millis() - timeStamp) < timeout_ms) {
    if (Serial1.available() > 0) {
      resp += (char)Serial1.read();
      delay(1);
    }
  }
  PL(resp);
  return resp;
}

// write commnads to modem.
void BG77_write(const char *command) {
  while (*command) {
    Serial1.write(*command);
    command++;
  }
  Serial1.println();
}

// send packet of info to ubidots
void cell_cmds(const char *jsonString, const char *id) {
  // AT CMDS FOR BG77
  String command = "AT+QMTPUBEX=0,0,0,0,\"";
  command += TOPIC;
  command += id;
  command += "\",\"";
  command += jsonString;
  command += "\"\r";

  BG77_write(command.c_str());
  BG77_read(3000);
}

// generic function to send any form of struct to JSON
template <typename T> void send_to_cell(T infoPacket) {}
template <> void send_to_cell<GatewayInfo>(const GatewayInfo infoPacket) {
  StaticJsonDocument<256> infoJSON;
  infoJSON["eui"] = infoPacket.node_id;
  infoJSON["r"] = 1; // generic heartbeat send
  infoJSON["b"] = double(infoPacket.batteryVoltage / 1000.0);
  infoJSON["s1"] = infoPacket.numDevicesConnected;
  infoJSON["s2"] = infoPacket.waterDetected;
  infoJSON["s3"] = infoPacket.mainsPowerConnected;
  infoJSON["s4"] = UINT16_MAX;
  infoJSON["s5"] = UINT16_MAX;
  infoJSON["dt"] = infoPacket.device_type;

  char jsonString[256];
  serializeJson(infoJSON, jsonString);
  PL(jsonString);

  cell_cmds(jsonString, infoPacket.node_id);
}
template <> void send_to_cell<Packet>(const Packet dataPacket) {
  StaticJsonDocument<300> packetJSON;
  packetJSON["r"] = dataPacket.reason;
  packetJSON["dt"] = dataPacket.device_type;
  packetJSON["eui"] = dataPacket.node_id;
  packetJSON["s1"] = dataPacket.sns_reading_1;
  packetJSON["s2"] = dataPacket.sns_reading_2;
  packetJSON["s3"] = dataPacket.sns_reading_3;
  packetJSON["s4"] = dataPacket.sns_reading_4;
  packetJSON["s5"] = dataPacket.sns_reading_5;
  packetJSON["b"] = double(dataPacket.batt_reading / 1000.0);
  packetJSON["snr"] = dataPacket.snr;
  packetJSON["rssi"] = dataPacket.rssi;

  char jsonString[300];
  serializeJson(packetJSON, jsonString);
  PL(jsonString);

  cell_cmds(jsonString, dataPacket.node_id);
}

// lora rx callback
void recv_cb(rui_lora_p2p_recv_t data) {
  memcpy(&newData, data.Buffer, data.BufferSize);
  newData.rssi = data.Rssi;
  newData.snr = data.Snr;
  mainDataBuff.unshift(newData);
}

// check if cellular is connected
bool checkConnection() {
  // wait until cell is registered
  String command = "AT+CREG?\r";
  while (true) {
    BG77_write(command.c_str());
    String bg_rsp = BG77_read(300);
    {
      int v = bg_rsp.indexOf('0,');
      if (v > 0) {
        if (bg_rsp[v + 1] == '5') {
          PL("Registered: Roaming");
          return true;
        } else if (bg_rsp[v + 1] == '1') {
          PL("Registered: Home");
          return true;
        }
      }
    }
  }
}

// setup modem
void configureCell(const char *id) {
  // turn verbose errors on
  String command = "AT+CMEE=2\r";
  BG77_write(command.c_str());
  BG77_read(300);

  // disable gps
  command = "AT+QGPSEND\r";
  BG77_write(command.c_str());
  BG77_read(300);

  // set full functionality of radio
  command = "AT+CFUN=1,0\r";
  BG77_write(command.c_str());
  BG77_read(1000);

  // check if sim is ready or needs password
  // should return +CPIN: READY
  command = "AT+CPIN?\r";
  BG77_write(command.c_str());
  BG77_read(1000);

  command = "AT+COPS=0\r";
  BG77_write(command.c_str());
  BG77_read(1000);

  // set emtc lte mode
  command = "AT+QCFG=\"iotopmode\",0,1\r";
  BG77_write(command.c_str());
  BG77_read(1000);

  // check signal strength
  command = "AT+CSQ\r";
  BG77_write(command.c_str());
  BG77_read(1000);

  // check sim operator?
  command = "AT+COPS?\r";
  BG77_write(command.c_str());
  BG77_read(1000);

  command = "AT+CGATT=1\r";
  BG77_write(command.c_str());
  BG77_read(1000);

  // check connection to service
  checkConnection();

  // check current network info
  command = "AT+QNWINFO\r";
  BG77_write(command.c_str());
  BG77_read(3000);

  // open network for mqtt client
  command = "AT+QMTOPEN=0,";
  command += SERVER;
  command += ",";
  command += PORT;
  command += "\r";
  BG77_write(command.c_str());
  BG77_read(30000);

  // checking to make sure were good
  command = "AT+QMTOPEN?\r";
  BG77_write(command.c_str());
  BG77_read(30000);

  // connect to client
  command = "AT+QMTCONN=0,\"";
  command += "CMPS";
  command += __RAK_NODE_ID_LOWER;
  command += "\",";
  command += USERNAME;
  command += ",\"\"\r"; // password = ""
  BG77_write(command.c_str());
  BG77_read(30000);

  // check if we connected
  command = "AT+QMTCONN?\r";
  BG77_write(command.c_str());
  BG77_read(10000);
}

// if modem off, wake it up
void wakeupCell() {
  time_t timeout = millis();
  bool moduleSleeps = true;

  Serial1.println("ATI");

  while ((millis() - timeout) < 6000) {
    if (Serial1.available()) {
      String result = Serial1.readString();
      PL("Modem response after start:");
      PL(result);
      moduleSleeps = false;
    }
  }
  if (moduleSleeps) {
    // Module slept, wake it up
    pinMode(BG77_POWER_KEY, OUTPUT);
    digitalWrite(BG77_POWER_KEY, 0);
    delay(1000);
    digitalWrite(BG77_POWER_KEY, 1);
    delay(2000);
    digitalWrite(BG77_POWER_KEY, 0);
    delay(1000);
  }
  PL("BG77 power up!");
}

void setup() {
  CN(115200);
  Serial1.begin(115200);
  delay(1000);

  // setup device info
  infoPacket.device_type = Gateway; // 0 = gateway
  ID_Builder(infoPacket.node_id);
  // TOPIC += infoPacket.node_id;

  wakeupCell();
  configureCell(infoPacket.node_id);

  // setup default settings
  if (api.lorawan.nwm.get() != RAK_LORA_P2P) {
    PL("Setting device into P2P mode and restarting..");
    api.lorawan.nwm.set(RAK_LORA_P2P);
    api.system.reboot();
  }

  PL("Configuring");
  api.lorawan.pfreq.set(US_FREQ);
  api.lorawan.psf.set(sf);
  api.lorawan.pbw.set(bw);
  api.lorawan.pcr.set(cr);
  api.lorawan.ppl.set(preamble);
  api.lorawan.ptp.set(txPower);

  cxinfo.RESERVED = 0x0;
  cxinfo.sf = sf;
  cxinfo.bw = bw;
  cxinfo.cr = cr;
  cxinfo.txPower = txPower;

  // register callbacks
  api.lorawan.registerPRecvCallback(recv_cb);
  api.lorawan.precv(65533);

  // TODO: set pins for water interrupts
  // TODO: setup adc
}

void loop() {
  if (false) // TODO: commissioning mode again
  {          // commissioning mode is active
    api.lorawan.psend(sizeof(cxinfo), (uint8_t *)&cxinfo);
    delay(10000); // wait 10 seconds inbetween sends
  } else {
    while (!mainDataBuff.isEmpty()) {
      // get node from queue and remove
      Packet currData = mainDataBuff.pop();
      const char *lastFourChars =
          &currData.node_id[strnlen(currData.node_id, 17) - 4];
      uniqueDevList[atoi(lastFourChars)] = millis();

      // send info over mqtt to ubidots
      send_to_cell(currData);
    }

    if (millis() - curr_tx_timer > TIME_TO_TX || firstBoot) {
      // check if any devices have stopped communicating
      for (const auto &iterator : uniqueDevList) {
        // if not heard from in x minutes, remove from list
        if (millis() - iterator.second > 18000) // > 3600000) // one hour
        {
          uniqueDevList.erase(iterator.first);
        }
      }

      // get info to send
      firstBoot = false;
      infoPacket.numDevicesConnected = uniqueDevList.size();
      // info.batteryVoltage = []
      // {
      // 	uint16_t voltage_adc = analogRead(BATT_READ);
      // 	return (uint16_t)((3.3 / 4096) * voltage_adc) * 1000;
      // }();
      // info.mainsPowerConnected = !digitalRead(PGOOD_LOW);

      infoPacket.batteryVoltage = uint16_t(3.3F * 1000); //! set as 3.3V for now
      infoPacket.mainsPowerConnected = 1; //! leave as true for now

      // send info over mqtt to ubidots
      send_to_cell(infoPacket);
      infoPacket.waterDetected = 0;

      // reset timer
      curr_tx_timer = millis();
    }

    if (millis() - curr_reconn_timer > TIME_TO_RECONN) {
      String command = "AT+QMTOPEN=0,";
      command += SERVER;
      command += ",";
      command += PORT;
      command += "\r";
      BG77_write(command.c_str());
      BG77_read(10000);

      command = "AT+QMTCONN=0,\"";
      command += "CMPS";
      command += __RAK_NODE_ID_LOWER;
      command += "\",";
      command += USERNAME;
      command += ",\"\"\r"; // password = ""
      BG77_write(command.c_str());
      BG77_read(10000);

      curr_reconn_timer = millis();
    }
  }
}

Thank you all for your time!

Welcome back to forum @dirkt68 !

Interesting project on creating your own custom LoRa network implementation!

With regards to the MQTT issue of BG77, it might be best to share the MQTT AT commands you use directly to Quectel and have them take a look. There could be a setting that needs to be setup to keep the connection active always.

Carl,

Thank you for the quick reply. I have also posted the same thread at Quectel’s forum, and am still waiting to hear back from them. Will update here if a solution is found so everyone can see it!

1 Like

I have the same problem with RAK5860 and no solution until now.

Check Quectel’s BG95&BG77&BG600L_Series_MQTT_Application_Note. Page 6 charts the MQTT sequence. You may download a copy after signing up at quectel’s site. At connection time two timers are started and the modem regularly sends pingreq to hold open the mqtt client connection.

Hello guys

you might want to look into MQTT keepalive parameter. E.g.

AT+QMTCFG="keepalive",0,300

Thanks
Felix