From 18583149303c2c1f74dff32edefa0763580c0594 Mon Sep 17 00:00:00 2001 From: Alex J Lennon Date: Tue, 25 Apr 2023 16:20:27 +0100 Subject: [PATCH 1/3] TinyGsmClientBG96: Add Quectel MQTT stack support Signed-off-by: Alex J Lennon --- src/TinyGsmClientBG96.h | 21 +++ src/TinyGsmMQTT.tpp | 358 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 379 insertions(+) create mode 100644 src/TinyGsmMQTT.tpp diff --git a/src/TinyGsmClientBG96.h b/src/TinyGsmClientBG96.h index e6964040..7e8a6060 100644 --- a/src/TinyGsmClientBG96.h +++ b/src/TinyGsmClientBG96.h @@ -25,6 +25,7 @@ #include "TinyGsmTemperature.tpp" #include "TinyGsmTime.tpp" #include "TinyGsmNTP.tpp" +#include "TinyGsmMQTT.tpp" #define GSM_NL "\r\n" static const char GSM_OK[] TINY_GSM_PROGMEM = "OK" GSM_NL; @@ -49,6 +50,7 @@ class TinyGsmBG96 : public TinyGsmModem, public TinyGsmTCP, public TinyGsmCalling, public TinyGsmSMS, + public TinyGsmMQTT, public TinyGsmTime, public TinyGsmNTP, public TinyGsmGPS, @@ -59,6 +61,7 @@ class TinyGsmBG96 : public TinyGsmModem, friend class TinyGsmTCP; friend class TinyGsmCalling; friend class TinyGsmSMS; + friend class TinyGsmMQTT; friend class TinyGsmTime; friend class TinyGsmNTP; friend class TinyGsmGPS; @@ -677,6 +680,24 @@ class TinyGsmBG96 : public TinyGsmModem, streamSkipUntil('\n'); } data = ""; + } else if (data.endsWith(GF(GSM_NL "+QMTRECV:"))) { + + // +QMTRECV: ,,,[,]) + int clientIndex = streamGetIntBefore(','); + int msgIdRsp = streamGetIntBefore(','); + String topic = stream.readStringUntil(','); + String payload = stream.readStringUntil('\n'); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("topic: "), topic); + DBG(GF("payload: "), payload); + */ + + // TODO: We're dealing with payloads as strings. Might want to reconfigure MQTT stack to return lengths? + if(_mqttReceiveCB) + _mqttReceiveCB[clientIndex](clientIndex, msgIdRsp, topic.c_str(), payload.c_str(), strlen(payload.c_str())); } } } while (millis() - startMillis < timeout_ms); diff --git a/src/TinyGsmMQTT.tpp b/src/TinyGsmMQTT.tpp new file mode 100644 index 00000000..89080c29 --- /dev/null +++ b/src/TinyGsmMQTT.tpp @@ -0,0 +1,358 @@ +/* TODO: + + - MUX isn't working + - MUX isn't implemented correcty + - Quectel modem specific implementation needs to move into modem class + +*/ + +/** + * @file TinyGsmMQTT.tpp + * @author Alex J Lennon + * @license LGPL-3.0 + * @copyright Copyright (c) 2023 Alex J Lennon + * @date Apr 2023 + */ + +#ifndef SRC_TINYGSMMQTT_H_ +#define SRC_TINYGSMMQTT_H_ + +#include "TinyGsmCommon.h" + +#define TINY_GSM_MODEM_HAS_MQTT + +#define MQTT_DEFAULT_MUX 0 +#define MQTT_MUX 5 +#define MQTT_DEFAULT_PORT 1883 + +template +class TinyGsmMQTT { + public: + /* + * Messaging functions + */ + + bool mqttConnected() + { + return mqttConnectedImpl(MQTT_DEFAULT_MUX); + } + + bool mqttConnected(int mux) + { + return mqttConnectedImpl(mux); + } + + int mqttOpen(const char* hostName, int port) + { + return thisModem().mqttOpenImpl(MQTT_DEFAULT_MUX, hostName, port); + } + + int mqttOpen(int mux, const char* hostName, int port) + { + return thisModem().mqttOpenImpl(mux, hostName, port); + } + + int mqttClose() + { + return thisModem().mqttCloseImpl(MQTT_DEFAULT_MUX); + } + + int mqttClose(int mux) + { + return thisModem().mqttCloseImpl(mux); + } + + int mqttConnect(const char* clientId, const char* userName, const char* password) + { + return thisModem().mqttConnectImpl(MQTT_DEFAULT_MUX, clientId, userName, password); + } + + int mqttConnect(int mux, const char* clientId, const char* userName, const char* password) + { + return thisModem().mqttConnectImpl(mux, clientId, userName, password); + } + + int mqttDisconnect() + { + return thisModem().mqttDisconnectImpl(MQTT_DEFAULT_MUX); + } + + int mqttDisconnect(int mux) + { + return thisModem().mqttDisconnectImpl(mux); + } + + int mqttSubscribe(int msgId, const char* topic, int qos) + { + return thisModem().mqttSubscribeImpl(MQTT_DEFAULT_MUX, msgId, topic, qos); + } + + int mqttSubscribe(int mux, int msgId, const char* topic, int qos) + { + return thisModem().mqttSubscribeImpl(mux, msgId, topic, qos); + } + + int mqttUnsubscribe(int msgId, const char *topic) + { + return thisModem().mqttUnsubscribeImpl(MQTT_DEFAULT_MUX, msgId, topic); + } + + int mqttUnsubscribe(int mux, int msgId, const char *topic) + { + return thisModem().mqttUnsubscribeImpl(mux, msgId, topic); + } + + int mqttPublish(int msgId, int qos, bool retain, const char* topic, const char* payload, int payloadLen) + { + return thisModem().mqttPublishImpl(MQTT_DEFAULT_MUX, msgId, qos, retain, topic, payload, payloadLen); + } + + int mqttPublish(int mux, int msgId, int qos, bool retain, const char* topic, const char* payload, int payloadLen) + { + return thisModem().mqttPublishImpl(mux, msgId, qos, retain, topic, payload, payloadLen); + } + + int mqttLoop(int mux) + { + return thisModem().mqttLoopImpl(mux); + } + + int mqttLoop() + { + return thisModem().mqttLoopImpl(MQTT_DEFAULT_MUX); + } + + void mqttSetReceiveCB(void (*cb)(int mux, int msgId, const char *topic, const char *payload, int len) ) + { + _mqttReceiveCB[MQTT_DEFAULT_MUX] = cb; + } + + void mqttSetReceiveCB(int mux, void (*cb)(int mux, int msgId, const char *topic, const char *payload, int len) ) + { + _mqttReceiveCB[mux] = cb; + } + + /* + * CRTP Helper + */ + protected: + inline const modemType& thisModem() const { + return static_cast(*this); + } + inline modemType& thisModem() { + return static_cast(*this); + } + + protected: + + void (*(_mqttReceiveCB[MQTT_MUX]))(int mux, int msgId, const char *topic, const char *payload, int len); + bool mqtt_connected[MQTT_MUX]; + bool mqtt_got_data[MQTT_MUX]; + + bool mqttConnectedImpl(int mux) + { + return mqtt_connected[mux]; + } + + + int mqttOpenImpl(int id, const char* hostName, int port) + { + thisModem().sendAT(GF("+QMTOPEN="),id, GF(",\""), hostName, GF("\","), port); + + // Wait for OK + if (thisModem().waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTOPEN + if (thisModem().waitResponse(15000L, GF("+QMTOPEN:")) != 1) { + return -1; + } + + int clientIndex = thisModem().streamGetIntBefore(','); + int result = thisModem().streamGetIntLength(1); + + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("result: "), result); + + thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttCloseImpl(int mux) + { + mqtt_connected[mux] = 0; + + thisModem().sendAT(GF("+QMTCLOSE="),mux); + + // Wait for OK + if (thisModem().waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTCLOSE + if (thisModem().waitResponse(300L, GF("+QMTCLOSE:")) != 1) { + return -1; + } + + int clientIndex = thisModem().streamGetIntBefore(','); + int state = thisModem().streamGetIntLength(1); + + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("state: "), state); + + thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not + + return state; + } + + int mqttConnectImpl(int mux, const char* clientId, const char* userName, const char* password) + { + thisModem().sendAT(GF("+QMTCONN="),mux, ",\"", clientId, "\",\"", userName, "\",\"", password, "\""); + if (thisModem().waitResponse(5000L, GF("+QMTCONN:")) != 1) { + return -1; + } + + int clientIndex = thisModem().streamGetIntBefore(','); + int state = thisModem().streamGetIntLength(1); + + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("state: "), state); + + thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not + + mqtt_connected[mux] = (state == 0) ? true : false; + + return state; + } + + int mqttDisconnectImpl(int mux) + { + mqtt_connected[mux] = false; + + thisModem().sendAT(GF("+QMTDISC="),mux); + + // Wait for OK + if (thisModem().waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTDISCs + if (thisModem().waitResponse(300L, GF("+QMTDISC:")) != 1) { + return -1; + } + + int clientIndex = thisModem().streamGetIntBefore(','); + int result = thisModem().streamGetIntLength(1); + + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("result: "), result); + + thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttSubscribeImpl(int id, int msgId, const char* topic, int qos) + { + thisModem().sendAT(GF("+QMTSUB="),id, ",", msgId, ",\"", topic, "\",", qos); + + // Wait for OK + if (thisModem().waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTSUB + if (thisModem().waitResponse(15000L, GF("+QMTSUB:")) != 1) { + return -1; + } + + int clientIndex = thisModem().streamGetIntBefore(','); + int msgIdRsp = thisModem().streamGetIntBefore(','); + int result = thisModem().streamGetIntLength(1); + // Ignore optional value + + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + + thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttUnsubscribeImpl(int id, int msgId, const char *topic) + { + thisModem().sendAT(GF("+QMTUNS="),id, msgId, topic); + + // Wait for OK + if (thisModem().waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTUNS + if (thisModem().waitResponse(15000L, GF("+QMTUNS:")) != 1) { + return -1; + } + + int clientIndex = thisModem().streamGetIntBefore(','); + int msgIdRsp = thisModem().streamGetIntBefore(','); + int result = thisModem().streamGetIntLength(1); + + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + + thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttPublishImpl(int mux, int msgId, int qos, bool retain, const char* topic, const char *payload, int payloadLen) + { + thisModem().sendAT(GF("+QMTPUB="),mux, ",", msgId, ",", qos, ",", retain, ",\"", topic, "\",", payloadLen); + if (thisModem().waitResponse(GF(">")) != 1) { return -1; } + + // TODO: This isn't going to work for payloads with 0's in them ? + + thisModem().stream.print(payload); // Actually send the message + thisModem().stream.write(static_cast(0x1A)); // Terminate the message + thisModem().stream.flush(); + + // Wait for OK + if (thisModem().waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTPUB + if (thisModem().waitResponse(60000L, GF("+QMTPUB:")) != 1) { + return -1; + } + + int clientIndex = thisModem().streamGetIntBefore(','); + int msgIdRsp = thisModem().streamGetIntBefore(','); + int result = thisModem().streamGetIntLength(1); + + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + + thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttLoopImpl(int mux) + { + // Response is handled in modem implementation + thisModem().sendAT(GF("+QMTRECV="), mux); + + // TODO: +QMTRECV always seems to return an ERROR not OK ? + thisModem().waitResponse(); + + return 0; + } +}; + +#endif // SRC_TINYGSMSMS_H_ From 80cf381b452d60140f4918f59214f16cb207f849 Mon Sep 17 00:00:00 2001 From: Alex J Lennon Date: Tue, 25 Apr 2023 16:50:56 +0100 Subject: [PATCH 2/3] Move MQTT implementation specifics into BG96 code Signed-off-by: Alex J Lennon --- src/TinyGsmClientBG96.h | 219 ++++++++++++++++++++++++++++++++++++++++ src/TinyGsmMQTT.tpp | 200 ------------------------------------ 2 files changed, 219 insertions(+), 200 deletions(-) diff --git a/src/TinyGsmClientBG96.h b/src/TinyGsmClientBG96.h index 7e8a6060..9ef5e719 100644 --- a/src/TinyGsmClientBG96.h +++ b/src/TinyGsmClientBG96.h @@ -609,6 +609,225 @@ class TinyGsmBG96 : public TinyGsmModem, return 2 == res; } + /* + * MQTT related functions + */ + protected: + + int mqttOpenImpl(int id, const char* hostName, int port) + { + sendAT(GF("+QMTOPEN="),id, GF(",\""), hostName, GF("\","), port); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTOPEN + if (waitResponse(15000L, GF("+QMTOPEN:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int result = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttCloseImpl(int mux) + { + mqtt_connected[mux] = 0; + + sendAT(GF("+QMTCLOSE="),mux); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTCLOSE + if (waitResponse(300L, GF("+QMTCLOSE:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int state = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("state: "), state); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return state; + } + + int mqttConnectImpl(int mux, const char* clientId, const char* userName, const char* password) + { + sendAT(GF("+QMTCONN="),mux, ",\"", clientId, "\",\"", userName, "\",\"", password, "\""); + if (waitResponse(5000L, GF("+QMTCONN:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int state = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("state: "), state); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + mqtt_connected[mux] = (state == 0) ? true : false; + + return state; + } + + int mqttDisconnectImpl(int mux) + { + mqtt_connected[mux] = false; + + sendAT(GF("+QMTDISC="),mux); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTDISCs + if (waitResponse(300L, GF("+QMTDISC:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int result = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttSubscribeImpl(int id, int msgId, const char* topic, int qos) + { + sendAT(GF("+QMTSUB="),id, ",", msgId, ",\"", topic, "\",", qos); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTSUB + if (waitResponse(15000L, GF("+QMTSUB:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int msgIdRsp = streamGetIntBefore(','); + int result = streamGetIntLength(1); + // Ignore optional value + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttUnsubscribeImpl(int id, int msgId, const char *topic) + { + sendAT(GF("+QMTUNS="),id, msgId, topic); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTUNS + if (waitResponse(15000L, GF("+QMTUNS:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int msgIdRsp = streamGetIntBefore(','); + int result = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttPublishImpl(int mux, int msgId, int qos, bool retain, const char* topic, const char *payload, int payloadLen) + { + sendAT(GF("+QMTPUB="),mux, ",", msgId, ",", qos, ",", retain, ",\"", topic, "\",", payloadLen); + if (waitResponse(GF(">")) != 1) { return -1; } + + // TODO: This isn't going to work for payloads with 0's in them ? + + stream.print(payload); // Actually send the message + stream.write(static_cast(0x1A)); // Terminate the message + stream.flush(); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTPUB + if (waitResponse(60000L, GF("+QMTPUB:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int msgIdRsp = streamGetIntBefore(','); + int result = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttLoopImpl(int mux) + { + // Response is handled in modem implementation + sendAT(GF("+QMTRECV="), mux); + + // TODO: +QMTRECV always seems to return an ERROR not OK ? + // NB. Modem Info: Quectel BG600L-M3 Revision: BG600LM3LAR02A04 + waitResponse(); + + return 0; + } + /* * Utilities */ diff --git a/src/TinyGsmMQTT.tpp b/src/TinyGsmMQTT.tpp index 89080c29..eac50c77 100644 --- a/src/TinyGsmMQTT.tpp +++ b/src/TinyGsmMQTT.tpp @@ -153,206 +153,6 @@ class TinyGsmMQTT { { return mqtt_connected[mux]; } - - - int mqttOpenImpl(int id, const char* hostName, int port) - { - thisModem().sendAT(GF("+QMTOPEN="),id, GF(",\""), hostName, GF("\","), port); - - // Wait for OK - if (thisModem().waitResponse(300L) != 1) { - return -1; - } - - // Wait for +QMTOPEN - if (thisModem().waitResponse(15000L, GF("+QMTOPEN:")) != 1) { - return -1; - } - - int clientIndex = thisModem().streamGetIntBefore(','); - int result = thisModem().streamGetIntLength(1); - - DBG(GF("clientIndex: "), clientIndex); - DBG(GF("result: "), result); - - thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not - - return result; - } - - int mqttCloseImpl(int mux) - { - mqtt_connected[mux] = 0; - - thisModem().sendAT(GF("+QMTCLOSE="),mux); - - // Wait for OK - if (thisModem().waitResponse(300L) != 1) { - return -1; - } - - // Wait for +QMTCLOSE - if (thisModem().waitResponse(300L, GF("+QMTCLOSE:")) != 1) { - return -1; - } - - int clientIndex = thisModem().streamGetIntBefore(','); - int state = thisModem().streamGetIntLength(1); - - DBG(GF("clientIndex: "), clientIndex); - DBG(GF("state: "), state); - - thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not - - return state; - } - - int mqttConnectImpl(int mux, const char* clientId, const char* userName, const char* password) - { - thisModem().sendAT(GF("+QMTCONN="),mux, ",\"", clientId, "\",\"", userName, "\",\"", password, "\""); - if (thisModem().waitResponse(5000L, GF("+QMTCONN:")) != 1) { - return -1; - } - - int clientIndex = thisModem().streamGetIntBefore(','); - int state = thisModem().streamGetIntLength(1); - - DBG(GF("clientIndex: "), clientIndex); - DBG(GF("state: "), state); - - thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not - - mqtt_connected[mux] = (state == 0) ? true : false; - - return state; - } - - int mqttDisconnectImpl(int mux) - { - mqtt_connected[mux] = false; - - thisModem().sendAT(GF("+QMTDISC="),mux); - - // Wait for OK - if (thisModem().waitResponse(300L) != 1) { - return -1; - } - - // Wait for +QMTDISCs - if (thisModem().waitResponse(300L, GF("+QMTDISC:")) != 1) { - return -1; - } - - int clientIndex = thisModem().streamGetIntBefore(','); - int result = thisModem().streamGetIntLength(1); - - DBG(GF("clientIndex: "), clientIndex); - DBG(GF("result: "), result); - - thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not - - return result; - } - - int mqttSubscribeImpl(int id, int msgId, const char* topic, int qos) - { - thisModem().sendAT(GF("+QMTSUB="),id, ",", msgId, ",\"", topic, "\",", qos); - - // Wait for OK - if (thisModem().waitResponse(300L) != 1) { - return -1; - } - - // Wait for +QMTSUB - if (thisModem().waitResponse(15000L, GF("+QMTSUB:")) != 1) { - return -1; - } - - int clientIndex = thisModem().streamGetIntBefore(','); - int msgIdRsp = thisModem().streamGetIntBefore(','); - int result = thisModem().streamGetIntLength(1); - // Ignore optional value - - DBG(GF("clientIndex: "), clientIndex); - DBG(GF("msgId: "), msgIdRsp); - DBG(GF("result: "), result); - - thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not - - return result; - } - - int mqttUnsubscribeImpl(int id, int msgId, const char *topic) - { - thisModem().sendAT(GF("+QMTUNS="),id, msgId, topic); - - // Wait for OK - if (thisModem().waitResponse(300L) != 1) { - return -1; - } - - // Wait for +QMTUNS - if (thisModem().waitResponse(15000L, GF("+QMTUNS:")) != 1) { - return -1; - } - - int clientIndex = thisModem().streamGetIntBefore(','); - int msgIdRsp = thisModem().streamGetIntBefore(','); - int result = thisModem().streamGetIntLength(1); - - DBG(GF("clientIndex: "), clientIndex); - DBG(GF("msgId: "), msgIdRsp); - DBG(GF("result: "), result); - - thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not - - return result; - } - - int mqttPublishImpl(int mux, int msgId, int qos, bool retain, const char* topic, const char *payload, int payloadLen) - { - thisModem().sendAT(GF("+QMTPUB="),mux, ",", msgId, ",", qos, ",", retain, ",\"", topic, "\",", payloadLen); - if (thisModem().waitResponse(GF(">")) != 1) { return -1; } - - // TODO: This isn't going to work for payloads with 0's in them ? - - thisModem().stream.print(payload); // Actually send the message - thisModem().stream.write(static_cast(0x1A)); // Terminate the message - thisModem().stream.flush(); - - // Wait for OK - if (thisModem().waitResponse(300L) != 1) { - return -1; - } - - // Wait for +QMTPUB - if (thisModem().waitResponse(60000L, GF("+QMTPUB:")) != 1) { - return -1; - } - - int clientIndex = thisModem().streamGetIntBefore(','); - int msgIdRsp = thisModem().streamGetIntBefore(','); - int result = thisModem().streamGetIntLength(1); - - DBG(GF("clientIndex: "), clientIndex); - DBG(GF("msgId: "), msgIdRsp); - DBG(GF("result: "), result); - - thisModem().streamSkipUntil('\n'); // The error code of the operation. If it is not - - return result; - } - - int mqttLoopImpl(int mux) - { - // Response is handled in modem implementation - thisModem().sendAT(GF("+QMTRECV="), mux); - - // TODO: +QMTRECV always seems to return an ERROR not OK ? - thisModem().waitResponse(); - - return 0; - } }; #endif // SRC_TINYGSMSMS_H_ From 29e92a490505e603574080d116075de066c8a92a Mon Sep 17 00:00:00 2001 From: Alex J Lennon Date: Tue, 25 Apr 2023 17:07:09 +0100 Subject: [PATCH 3/3] Add support for setting MQTT keep alive Signed-off-by: Alex J Lennon --- src/TinyGsmClientBG96.h | 9 +++++++++ src/TinyGsmMQTT.tpp | 10 ++++++++++ 2 files changed, 19 insertions(+) diff --git a/src/TinyGsmClientBG96.h b/src/TinyGsmClientBG96.h index 9ef5e719..989f6e81 100644 --- a/src/TinyGsmClientBG96.h +++ b/src/TinyGsmClientBG96.h @@ -828,6 +828,15 @@ class TinyGsmBG96 : public TinyGsmModem, return 0; } + int mqttSetKeepAliveImpl(int mux, int keep_alive_secs) + { + // Response is handled in modem implementation + sendAT(GF("+QMTCFG=\"keepalive\","), mux, ",", keep_alive_secs); + + // Wait for OK + return waitResponse() == 1 ? 0 : -1; + } + /* * Utilities */ diff --git a/src/TinyGsmMQTT.tpp b/src/TinyGsmMQTT.tpp index eac50c77..6cb8f7bd 100644 --- a/src/TinyGsmMQTT.tpp +++ b/src/TinyGsmMQTT.tpp @@ -122,6 +122,16 @@ class TinyGsmMQTT { return thisModem().mqttLoopImpl(MQTT_DEFAULT_MUX); } + int mqttSetKeepAlive(int keep_alive_secs) + { + return thisModem().mqttSetKeepAliveImpl(MQTT_DEFAULT_MUX, keep_alive_secs); + } + + int mqttSetKeepAlive(int mux, int keep_alive_secs) + { + return thisModem().mqttSetKeepAliveImpl(mux, keep_alive_secs); + } + void mqttSetReceiveCB(void (*cb)(int mux, int msgId, const char *topic, const char *payload, int len) ) { _mqttReceiveCB[MQTT_DEFAULT_MUX] = cb;