diff --git a/src/TinyGsmClientBG96.h b/src/TinyGsmClientBG96.h index e6964040..989f6e81 100644 --- a/src/TinyGsmClientBG96.h +++ b/src/TinyGsmClientBG96.h @@ -25,6 +25,7 @@ #include "TinyGsmTemperature.tpp" #include "TinyGsmTime.tpp" #include "TinyGsmNTP.tpp" +#include "TinyGsmMQTT.tpp" #define GSM_NL "\r\n" static const char GSM_OK[] TINY_GSM_PROGMEM = "OK" GSM_NL; @@ -49,6 +50,7 @@ class TinyGsmBG96 : public TinyGsmModem, public TinyGsmTCP, public TinyGsmCalling, public TinyGsmSMS, + public TinyGsmMQTT, public TinyGsmTime, public TinyGsmNTP, public TinyGsmGPS, @@ -59,6 +61,7 @@ class TinyGsmBG96 : public TinyGsmModem, friend class TinyGsmTCP; friend class TinyGsmCalling; friend class TinyGsmSMS; + friend class TinyGsmMQTT; friend class TinyGsmTime; friend class TinyGsmNTP; friend class TinyGsmGPS; @@ -606,6 +609,234 @@ class TinyGsmBG96 : public TinyGsmModem, return 2 == res; } + /* + * MQTT related functions + */ + protected: + + int mqttOpenImpl(int id, const char* hostName, int port) + { + sendAT(GF("+QMTOPEN="),id, GF(",\""), hostName, GF("\","), port); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTOPEN + if (waitResponse(15000L, GF("+QMTOPEN:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int result = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttCloseImpl(int mux) + { + mqtt_connected[mux] = 0; + + sendAT(GF("+QMTCLOSE="),mux); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTCLOSE + if (waitResponse(300L, GF("+QMTCLOSE:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int state = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("state: "), state); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return state; + } + + int mqttConnectImpl(int mux, const char* clientId, const char* userName, const char* password) + { + sendAT(GF("+QMTCONN="),mux, ",\"", clientId, "\",\"", userName, "\",\"", password, "\""); + if (waitResponse(5000L, GF("+QMTCONN:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int state = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("state: "), state); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + mqtt_connected[mux] = (state == 0) ? true : false; + + return state; + } + + int mqttDisconnectImpl(int mux) + { + mqtt_connected[mux] = false; + + sendAT(GF("+QMTDISC="),mux); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTDISCs + if (waitResponse(300L, GF("+QMTDISC:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int result = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttSubscribeImpl(int id, int msgId, const char* topic, int qos) + { + sendAT(GF("+QMTSUB="),id, ",", msgId, ",\"", topic, "\",", qos); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTSUB + if (waitResponse(15000L, GF("+QMTSUB:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int msgIdRsp = streamGetIntBefore(','); + int result = streamGetIntLength(1); + // Ignore optional value + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttUnsubscribeImpl(int id, int msgId, const char *topic) + { + sendAT(GF("+QMTUNS="),id, msgId, topic); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTUNS + if (waitResponse(15000L, GF("+QMTUNS:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int msgIdRsp = streamGetIntBefore(','); + int result = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttPublishImpl(int mux, int msgId, int qos, bool retain, const char* topic, const char *payload, int payloadLen) + { + sendAT(GF("+QMTPUB="),mux, ",", msgId, ",", qos, ",", retain, ",\"", topic, "\",", payloadLen); + if (waitResponse(GF(">")) != 1) { return -1; } + + // TODO: This isn't going to work for payloads with 0's in them ? + + stream.print(payload); // Actually send the message + stream.write(static_cast(0x1A)); // Terminate the message + stream.flush(); + + // Wait for OK + if (waitResponse(300L) != 1) { + return -1; + } + + // Wait for +QMTPUB + if (waitResponse(60000L, GF("+QMTPUB:")) != 1) { + return -1; + } + + int clientIndex = streamGetIntBefore(','); + int msgIdRsp = streamGetIntBefore(','); + int result = streamGetIntLength(1); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("result: "), result); + */ + + streamSkipUntil('\n'); // The error code of the operation. If it is not + + return result; + } + + int mqttLoopImpl(int mux) + { + // Response is handled in modem implementation + sendAT(GF("+QMTRECV="), mux); + + // TODO: +QMTRECV always seems to return an ERROR not OK ? + // NB. Modem Info: Quectel BG600L-M3 Revision: BG600LM3LAR02A04 + waitResponse(); + + return 0; + } + + int mqttSetKeepAliveImpl(int mux, int keep_alive_secs) + { + // Response is handled in modem implementation + sendAT(GF("+QMTCFG=\"keepalive\","), mux, ",", keep_alive_secs); + + // Wait for OK + return waitResponse() == 1 ? 0 : -1; + } + /* * Utilities */ @@ -677,6 +908,24 @@ class TinyGsmBG96 : public TinyGsmModem, streamSkipUntil('\n'); } data = ""; + } else if (data.endsWith(GF(GSM_NL "+QMTRECV:"))) { + + // +QMTRECV: ,,,[,]) + int clientIndex = streamGetIntBefore(','); + int msgIdRsp = streamGetIntBefore(','); + String topic = stream.readStringUntil(','); + String payload = stream.readStringUntil('\n'); + + /* + DBG(GF("clientIndex: "), clientIndex); + DBG(GF("msgId: "), msgIdRsp); + DBG(GF("topic: "), topic); + DBG(GF("payload: "), payload); + */ + + // TODO: We're dealing with payloads as strings. Might want to reconfigure MQTT stack to return lengths? + if(_mqttReceiveCB) + _mqttReceiveCB[clientIndex](clientIndex, msgIdRsp, topic.c_str(), payload.c_str(), strlen(payload.c_str())); } } } while (millis() - startMillis < timeout_ms); diff --git a/src/TinyGsmMQTT.tpp b/src/TinyGsmMQTT.tpp new file mode 100644 index 00000000..6cb8f7bd --- /dev/null +++ b/src/TinyGsmMQTT.tpp @@ -0,0 +1,168 @@ +/* TODO: + + - MUX isn't working + - MUX isn't implemented correcty + - Quectel modem specific implementation needs to move into modem class + +*/ + +/** + * @file TinyGsmMQTT.tpp + * @author Alex J Lennon + * @license LGPL-3.0 + * @copyright Copyright (c) 2023 Alex J Lennon + * @date Apr 2023 + */ + +#ifndef SRC_TINYGSMMQTT_H_ +#define SRC_TINYGSMMQTT_H_ + +#include "TinyGsmCommon.h" + +#define TINY_GSM_MODEM_HAS_MQTT + +#define MQTT_DEFAULT_MUX 0 +#define MQTT_MUX 5 +#define MQTT_DEFAULT_PORT 1883 + +template +class TinyGsmMQTT { + public: + /* + * Messaging functions + */ + + bool mqttConnected() + { + return mqttConnectedImpl(MQTT_DEFAULT_MUX); + } + + bool mqttConnected(int mux) + { + return mqttConnectedImpl(mux); + } + + int mqttOpen(const char* hostName, int port) + { + return thisModem().mqttOpenImpl(MQTT_DEFAULT_MUX, hostName, port); + } + + int mqttOpen(int mux, const char* hostName, int port) + { + return thisModem().mqttOpenImpl(mux, hostName, port); + } + + int mqttClose() + { + return thisModem().mqttCloseImpl(MQTT_DEFAULT_MUX); + } + + int mqttClose(int mux) + { + return thisModem().mqttCloseImpl(mux); + } + + int mqttConnect(const char* clientId, const char* userName, const char* password) + { + return thisModem().mqttConnectImpl(MQTT_DEFAULT_MUX, clientId, userName, password); + } + + int mqttConnect(int mux, const char* clientId, const char* userName, const char* password) + { + return thisModem().mqttConnectImpl(mux, clientId, userName, password); + } + + int mqttDisconnect() + { + return thisModem().mqttDisconnectImpl(MQTT_DEFAULT_MUX); + } + + int mqttDisconnect(int mux) + { + return thisModem().mqttDisconnectImpl(mux); + } + + int mqttSubscribe(int msgId, const char* topic, int qos) + { + return thisModem().mqttSubscribeImpl(MQTT_DEFAULT_MUX, msgId, topic, qos); + } + + int mqttSubscribe(int mux, int msgId, const char* topic, int qos) + { + return thisModem().mqttSubscribeImpl(mux, msgId, topic, qos); + } + + int mqttUnsubscribe(int msgId, const char *topic) + { + return thisModem().mqttUnsubscribeImpl(MQTT_DEFAULT_MUX, msgId, topic); + } + + int mqttUnsubscribe(int mux, int msgId, const char *topic) + { + return thisModem().mqttUnsubscribeImpl(mux, msgId, topic); + } + + int mqttPublish(int msgId, int qos, bool retain, const char* topic, const char* payload, int payloadLen) + { + return thisModem().mqttPublishImpl(MQTT_DEFAULT_MUX, msgId, qos, retain, topic, payload, payloadLen); + } + + int mqttPublish(int mux, int msgId, int qos, bool retain, const char* topic, const char* payload, int payloadLen) + { + return thisModem().mqttPublishImpl(mux, msgId, qos, retain, topic, payload, payloadLen); + } + + int mqttLoop(int mux) + { + return thisModem().mqttLoopImpl(mux); + } + + int mqttLoop() + { + return thisModem().mqttLoopImpl(MQTT_DEFAULT_MUX); + } + + int mqttSetKeepAlive(int keep_alive_secs) + { + return thisModem().mqttSetKeepAliveImpl(MQTT_DEFAULT_MUX, keep_alive_secs); + } + + int mqttSetKeepAlive(int mux, int keep_alive_secs) + { + return thisModem().mqttSetKeepAliveImpl(mux, keep_alive_secs); + } + + void mqttSetReceiveCB(void (*cb)(int mux, int msgId, const char *topic, const char *payload, int len) ) + { + _mqttReceiveCB[MQTT_DEFAULT_MUX] = cb; + } + + void mqttSetReceiveCB(int mux, void (*cb)(int mux, int msgId, const char *topic, const char *payload, int len) ) + { + _mqttReceiveCB[mux] = cb; + } + + /* + * CRTP Helper + */ + protected: + inline const modemType& thisModem() const { + return static_cast(*this); + } + inline modemType& thisModem() { + return static_cast(*this); + } + + protected: + + void (*(_mqttReceiveCB[MQTT_MUX]))(int mux, int msgId, const char *topic, const char *payload, int len); + bool mqtt_connected[MQTT_MUX]; + bool mqtt_got_data[MQTT_MUX]; + + bool mqttConnectedImpl(int mux) + { + return mqtt_connected[mux]; + } +}; + +#endif // SRC_TINYGSMSMS_H_