Skip to content

Commit

Permalink
Fix MQTT plugin (#1006, #1008) (#1009)
Browse files Browse the repository at this point in the history
* mqtt_client: fix bug where program flow gets stuck when calling mosquitto_connect_bind_v5.

The connection procedure starts with a fresh mosquitto instance each time. The instance will be destroyed on disconnect.
All mosquitto return codes are now checked during client configuration.

* mqtt_client: add debug messages and improve QMessageBox messages.

* mqtt_client: add mosquitto thread, fix #1006.

Threaded mode of libmosquitto may not be supported on windows (libmosquitto < 2.1). At some point libmosquitto has removed threaded support (mosquitto_loop_start doesn't work and returns MOSQ_ERR_NOT_SUPPORTED). Threaded support will be added again in the future, see eclipse-mosquitto/mosquitto#2707. This commit introduces a thread dedicated to mosquitto to solve the problem.

* mqtt_client: fix typo.

---------

Co-authored-by: Valentin Platzgummer <[email protected]>
  • Loading branch information
Valle125 and Valentin Platzgummer authored Nov 10, 2024
1 parent d4240e5 commit 78fb7a1
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 19 deletions.
162 changes: 143 additions & 19 deletions plotjuggler_plugins/DataStreamMQTT/mqtt_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,19 @@
#include <QDebug>
#include <QMessageBox>
#include <QString>
#include <QtGlobal>
#include <QLoggingCategory>

#ifdef WIN32
#include <windows.h>
#include <strsafe.h>
#endif

#define MQTT_DEBUG 0
#define debug() qCDebug(category)

static const QLoggingCategory category("MQTTClient");

void connect_callback(struct mosquitto* mosq, void* context, int result, int,
const mosquitto_property*)
{
Expand Down Expand Up @@ -47,16 +54,39 @@ void message_callback(struct mosquitto* mosq, void* context,
self->onMessageReceived(message);
}

void log_callback(struct mosquitto* mosq, void* context, int log_level, const char* msg)
{
const std::pair<int, const char*> log_level_map[] = {
{ MOSQ_LOG_INFO, "MOSQ_LOG_INFO" },
{ MOSQ_LOG_NOTICE, "MOSQ_LOG_NOTICE" },
{ MOSQ_LOG_WARNING, "MOSQ_LOG_WARNING" },
{ MOSQ_LOG_ERR, "MOSQ_LOG_ERR " },
{ MOSQ_LOG_DEBUG, "MOSQ_LOG_DEBUG" },
{ MOSQ_LOG_SUBSCRIBE, "MOSQ_LOG_SUBSCRIBE" },
{ MOSQ_LOG_UNSUBSCRIBE, "MOSQ_LOG_UNSUBSCRIBE" },
{ MOSQ_LOG_WEBSOCKETS, "MOSQ_LOG_WEBSOCKETS" },
};

const auto it =
std::find_if(std::begin(log_level_map), std::end(log_level_map),
[log_level](const auto& pair) { return log_level == pair.first; });
if (it == std::end(log_level_map))
return;

debug() << it->second << ": " << msg;
}

//----------------------------

MQTTClient::MQTTClient()
{
mosquitto_lib_init();
_mosq = mosquitto_new(nullptr, true, this);

mosquitto_connect_v5_callback_set(_mosq, connect_callback);
mosquitto_disconnect_callback_set(_mosq, disconnect_callback);
mosquitto_message_v5_callback_set(_mosq, message_callback);
#if MQTT_DEBUG
int major = 0, minor = 0, revision = 0;
mosquitto_lib_version(&major, &minor, &revision);
debug() << "mosquitto version: " << major << "." << minor << "." << revision;
#endif // MQTT_DEBUG
}

MQTTClient::~MQTTClient()
Expand All @@ -75,13 +105,53 @@ bool MQTTClient::connect(const MosquittoConfig& config)
disconnect();
}

mosquitto_int_option(_mosq, MOSQ_OPT_PROTOCOL_VERSION, config.protocol_version);
// Start with a fresh mosquitto instance.
Q_ASSERT(_mosq == nullptr);
_mosq = mosquitto_new(nullptr, true, this);

bool success = configureMosquitto(config);
if (!success)
{
mosquitto_destroy(_mosq);
_mosq = nullptr;
return false;
}

_connected = true;
_config = config;
return true;
}

bool MQTTClient::configureMosquitto(const MosquittoConfig& config)
{
mosquitto_connect_v5_callback_set(_mosq, connect_callback);
mosquitto_disconnect_callback_set(_mosq, disconnect_callback);
mosquitto_message_v5_callback_set(_mosq, message_callback);
#if MQTT_DEBUG
mosquitto_log_callback_set(_mosq, log_callback);
#endif

int rc =
mosquitto_int_option(_mosq, MOSQ_OPT_PROTOCOL_VERSION, config.protocol_version);
if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client", QString("MQTT initialization failed."),
QMessageBox::Ok);
debug() << "MQTT initialization failed:" << mosquitto_strerror(rc);
return false;
}

if ((!config.username.empty() || !config.password.empty()))
{
if (mosquitto_username_pw_set(_mosq, config.username.c_str(),
config.password.c_str()))
rc = mosquitto_username_pw_set(_mosq, config.username.c_str(),
config.password.c_str());
if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client",
QString("MQTT initialization failed. Double check username "
"and password."),
QMessageBox::Ok);
debug() << "MQTT username or password error:" << mosquitto_strerror(rc);
return false;
}
}
Expand All @@ -91,16 +161,31 @@ bool MQTTClient::connect(const MosquittoConfig& config)
const char* cafile = config.cafile.c_str();
const char* certfile = config.certfile.empty() ? nullptr : config.certfile.c_str();
const char* keyfile = config.keyfile.empty() ? nullptr : config.keyfile.c_str();

mosquitto_tls_set(_mosq, cafile, nullptr, certfile, keyfile, nullptr);
rc = mosquitto_tls_set(_mosq, cafile, nullptr, certfile, keyfile, nullptr);
if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client",
QString("MQTT initialization failed. Double check "
"certificates."),
QMessageBox::Ok);
debug() << "MQTT certificate error:" << mosquitto_strerror(rc);
return false;
}
}

mosquitto_max_inflight_messages_set(_mosq, config.max_inflight);
rc = mosquitto_max_inflight_messages_set(_mosq, config.max_inflight);
if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client", QString("MQTT initialization failed."),
QMessageBox::Ok);
debug() << "MQTT setting max inflight messages failed:" << mosquitto_strerror(rc);
return false;
}

const mosquitto_property* properties = nullptr; // todo

int rc = mosquitto_connect_bind_v5(_mosq, config.host.c_str(), config.port,
config.keepalive, nullptr, properties);
rc = mosquitto_connect_bind_v5(_mosq, config.host.c_str(), config.port,
config.keepalive, nullptr, properties);
// TODO bind
if (rc > 0)
{
Expand All @@ -121,13 +206,38 @@ bool MQTTClient::connect(const MosquittoConfig& config)
QString("Unable to connect (%1)").arg(mosquitto_strerror(rc)),
QMessageBox::Ok);
}
_connected = false;
debug() << "MQTT connect failed:" << mosquitto_strerror(rc);
return false;
}

_connected = true;
_config = config;
mosquitto_loop_start(_mosq);
rc = mosquitto_loop_start(_mosq);
if (rc == MOSQ_ERR_NOT_SUPPORTED)
{
// Threaded mode may not be supported on windows (libmosquitto < 2.1).
// See https://github.com/eclipse/mosquitto/issues/2707
Q_ASSERT(_thread == nullptr);
_thread = new std::thread([this]() {
int rc = mosquitto_loop_forever(this->_mosq, -1, 1);
if (rc != MOSQ_ERR_SUCCESS)
{
debug() << "MQTT loop forever failed:" << mosquitto_strerror(rc);
}
});
if (_thread == nullptr)
{
QMessageBox::warning(nullptr, "MQTT Client", QString("Failed to start MQTT client"),
QMessageBox::Ok);
debug() << "MQTT start failed: could not allocate memory.";
return false;
}
}
else if (rc != MOSQ_ERR_SUCCESS)
{
QMessageBox::warning(nullptr, "MQTT Client", QString("Failed to start MQTT client"),
QMessageBox::Ok);
debug() << "MQTT start loop failed:" << mosquitto_strerror(rc);
return false;
}
return true;
}

Expand All @@ -137,6 +247,14 @@ void MQTTClient::disconnect()
{
mosquitto_disconnect(_mosq);
mosquitto_loop_stop(_mosq, true);
if (_thread != nullptr)
{
_thread->join();
delete _thread;
_thread = nullptr;
}
mosquitto_destroy(_mosq);
_mosq = nullptr;
}
_connected = false;
_topics_set.clear();
Expand Down Expand Up @@ -181,10 +299,16 @@ std::unordered_set<std::string> MQTTClient::getTopicList()

void MQTTClient::subscribe(const std::string& topic, int qos)
{
mosquitto_subscribe(_mosq, nullptr, topic.c_str(), qos);
if (_connected)
{
mosquitto_subscribe(_mosq, nullptr, topic.c_str(), qos);
}
}

void MQTTClient::unsubscribe(const std::string& topic)
{
mosquitto_unsubscribe(_mosq, nullptr, topic.c_str());
}
if (_connected)
{
mosquitto_unsubscribe(_mosq, nullptr, topic.c_str());
}
}
4 changes: 4 additions & 0 deletions plotjuggler_plugins/DataStreamMQTT/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "mosquitto_config.h"
#include <string>
#include <functional>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
Expand Down Expand Up @@ -46,11 +47,14 @@ class MQTTClient : public QObject
void disconnected();

private:
bool configureMosquitto(const MosquittoConfig& config);

mosquitto* _mosq = nullptr;
std::unordered_map<std::string, TopicCallback> _message_callbacks;
std::unordered_set<std::string> _topics_set;
std::mutex _mutex;
MosquittoConfig _config;
std::thread* _thread;
};

#endif // MQTT_CLIENT_H

0 comments on commit 78fb7a1

Please sign in to comment.