Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: TinyGsmClientBG96: Add Quectel MQTT stack support #726

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 249 additions & 0 deletions src/TinyGsmClientBG96.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "TinyGsmTemperature.tpp"
#include "TinyGsmTime.tpp"
#include "TinyGsmNTP.tpp"
#include "TinyGsmMQTT.tpp"

#define GSM_NL "\r\n"
static const char GSM_OK[] TINY_GSM_PROGMEM = "OK" GSM_NL;
Expand All @@ -49,6 +50,7 @@ class TinyGsmBG96 : public TinyGsmModem<TinyGsmBG96>,
public TinyGsmTCP<TinyGsmBG96, TINY_GSM_MUX_COUNT>,
public TinyGsmCalling<TinyGsmBG96>,
public TinyGsmSMS<TinyGsmBG96>,
public TinyGsmMQTT<TinyGsmBG96>,
public TinyGsmTime<TinyGsmBG96>,
public TinyGsmNTP<TinyGsmBG96>,
public TinyGsmGPS<TinyGsmBG96>,
Expand All @@ -59,6 +61,7 @@ class TinyGsmBG96 : public TinyGsmModem<TinyGsmBG96>,
friend class TinyGsmTCP<TinyGsmBG96, TINY_GSM_MUX_COUNT>;
friend class TinyGsmCalling<TinyGsmBG96>;
friend class TinyGsmSMS<TinyGsmBG96>;
friend class TinyGsmMQTT<TinyGsmBG96>;
friend class TinyGsmTime<TinyGsmBG96>;
friend class TinyGsmNTP<TinyGsmBG96>;
friend class TinyGsmGPS<TinyGsmBG96>;
Expand Down Expand Up @@ -606,6 +609,234 @@ class TinyGsmBG96 : public TinyGsmModem<TinyGsmBG96>,
return 2 == res;
}

/*
* MQTT related functions
*/
protected:

int mqttOpenImpl(int id, const char* hostName, int port)
{
sendAT(GF("+QMTOPEN="),id, GF(",\""), hostName, GF("\","), port);

// Wait for OK
if (waitResponse(300L) != 1) {
return -1;
}

// Wait for +QMTOPEN
if (waitResponse(15000L, GF("+QMTOPEN:")) != 1) {
return -1;
}

int clientIndex = streamGetIntBefore(',');
int result = streamGetIntLength(1);

/*
DBG(GF("clientIndex: "), clientIndex);
DBG(GF("result: "), result);
*/

streamSkipUntil('\n'); // The error code of the operation. If it is not

return result;
}

int mqttCloseImpl(int mux)
{
mqtt_connected[mux] = 0;

sendAT(GF("+QMTCLOSE="),mux);

// Wait for OK
if (waitResponse(300L) != 1) {
return -1;
}

// Wait for +QMTCLOSE
if (waitResponse(300L, GF("+QMTCLOSE:")) != 1) {
return -1;
}

int clientIndex = streamGetIntBefore(',');
int state = streamGetIntLength(1);

/*
DBG(GF("clientIndex: "), clientIndex);
DBG(GF("state: "), state);
*/

streamSkipUntil('\n'); // The error code of the operation. If it is not

return state;
}

int mqttConnectImpl(int mux, const char* clientId, const char* userName, const char* password)
{
sendAT(GF("+QMTCONN="),mux, ",\"", clientId, "\",\"", userName, "\",\"", password, "\"");
if (waitResponse(5000L, GF("+QMTCONN:")) != 1) {
return -1;
}

int clientIndex = streamGetIntBefore(',');
int state = streamGetIntLength(1);

/*
DBG(GF("clientIndex: "), clientIndex);
DBG(GF("state: "), state);
*/

streamSkipUntil('\n'); // The error code of the operation. If it is not

mqtt_connected[mux] = (state == 0) ? true : false;

return state;
}

int mqttDisconnectImpl(int mux)
{
mqtt_connected[mux] = false;

sendAT(GF("+QMTDISC="),mux);

// Wait for OK
if (waitResponse(300L) != 1) {
return -1;
}

// Wait for +QMTDISCs
if (waitResponse(300L, GF("+QMTDISC:")) != 1) {
return -1;
}

int clientIndex = streamGetIntBefore(',');
int result = streamGetIntLength(1);

/*
DBG(GF("clientIndex: "), clientIndex);
DBG(GF("result: "), result);
*/

streamSkipUntil('\n'); // The error code of the operation. If it is not

return result;
}

int mqttSubscribeImpl(int id, int msgId, const char* topic, int qos)
{
sendAT(GF("+QMTSUB="),id, ",", msgId, ",\"", topic, "\",", qos);

// Wait for OK
if (waitResponse(300L) != 1) {
return -1;
}

// Wait for +QMTSUB
if (waitResponse(15000L, GF("+QMTSUB:")) != 1) {
return -1;
}

int clientIndex = streamGetIntBefore(',');
int msgIdRsp = streamGetIntBefore(',');
int result = streamGetIntLength(1);
// Ignore optional value

/*
DBG(GF("clientIndex: "), clientIndex);
DBG(GF("msgId: "), msgIdRsp);
DBG(GF("result: "), result);
*/

streamSkipUntil('\n'); // The error code of the operation. If it is not

return result;
}

int mqttUnsubscribeImpl(int id, int msgId, const char *topic)
{
sendAT(GF("+QMTUNS="),id, msgId, topic);

// Wait for OK
if (waitResponse(300L) != 1) {
return -1;
}

// Wait for +QMTUNS
if (waitResponse(15000L, GF("+QMTUNS:")) != 1) {
return -1;
}

int clientIndex = streamGetIntBefore(',');
int msgIdRsp = streamGetIntBefore(',');
int result = streamGetIntLength(1);

/*
DBG(GF("clientIndex: "), clientIndex);
DBG(GF("msgId: "), msgIdRsp);
DBG(GF("result: "), result);
*/

streamSkipUntil('\n'); // The error code of the operation. If it is not

return result;
}

int mqttPublishImpl(int mux, int msgId, int qos, bool retain, const char* topic, const char *payload, int payloadLen)
{
sendAT(GF("+QMTPUB="),mux, ",", msgId, ",", qos, ",", retain, ",\"", topic, "\",", payloadLen);
if (waitResponse(GF(">")) != 1) { return -1; }

// TODO: This isn't going to work for payloads with 0's in them ?

stream.print(payload); // Actually send the message
stream.write(static_cast<char>(0x1A)); // Terminate the message
stream.flush();

// Wait for OK
if (waitResponse(300L) != 1) {
return -1;
}

// Wait for +QMTPUB
if (waitResponse(60000L, GF("+QMTPUB:")) != 1) {
return -1;
}

int clientIndex = streamGetIntBefore(',');
int msgIdRsp = streamGetIntBefore(',');
int result = streamGetIntLength(1);

/*
DBG(GF("clientIndex: "), clientIndex);
DBG(GF("msgId: "), msgIdRsp);
DBG(GF("result: "), result);
*/

streamSkipUntil('\n'); // The error code of the operation. If it is not

return result;
}

int mqttLoopImpl(int mux)
{
// Response is handled in modem implementation
sendAT(GF("+QMTRECV="), mux);

// TODO: +QMTRECV always seems to return an ERROR not OK ?
// NB. Modem Info: Quectel BG600L-M3 Revision: BG600LM3LAR02A04
waitResponse();

return 0;
}

int mqttSetKeepAliveImpl(int mux, int keep_alive_secs)
{
// Response is handled in modem implementation
sendAT(GF("+QMTCFG=\"keepalive\","), mux, ",", keep_alive_secs);

// Wait for OK
return waitResponse() == 1 ? 0 : -1;
}

/*
* Utilities
*/
Expand Down Expand Up @@ -677,6 +908,24 @@ class TinyGsmBG96 : public TinyGsmModem<TinyGsmBG96>,
streamSkipUntil('\n');
}
data = "";
} else if (data.endsWith(GF(GSM_NL "+QMTRECV:"))) {

// +QMTRECV: <client_idx>,<msgID>,<topic>,[<payload_len>,]<payload>)
int clientIndex = streamGetIntBefore(',');
int msgIdRsp = streamGetIntBefore(',');
String topic = stream.readStringUntil(',');
String payload = stream.readStringUntil('\n');

/*
DBG(GF("clientIndex: "), clientIndex);
DBG(GF("msgId: "), msgIdRsp);
DBG(GF("topic: "), topic);
DBG(GF("payload: "), payload);
*/

// TODO: We're dealing with payloads as strings. Might want to reconfigure MQTT stack to return lengths?
if(_mqttReceiveCB)
_mqttReceiveCB[clientIndex](clientIndex, msgIdRsp, topic.c_str(), payload.c_str(), strlen(payload.c_str()));
}
}
} while (millis() - startMillis < timeout_ms);
Expand Down
Loading