From 0e50a3cb9904716c52ba91e6980d341d8f4a486c Mon Sep 17 00:00:00 2001 From: Israel Sother Date: Fri, 22 Mar 2024 23:40:50 +0000 Subject: [PATCH 1/2] feat(udp-socket): receive packets on separeted thread --- .../DataStreamUDP/udp_server.cpp | 134 ++++++++++++------ .../DataStreamUDP/udp_server.h | 36 ++++- 2 files changed, 120 insertions(+), 50 deletions(-) diff --git a/plotjuggler_plugins/DataStreamUDP/udp_server.cpp b/plotjuggler_plugins/DataStreamUDP/udp_server.cpp index 181b873d3..e32ed16ba 100644 --- a/plotjuggler_plugins/DataStreamUDP/udp_server.cpp +++ b/plotjuggler_plugins/DataStreamUDP/udp_server.cpp @@ -29,6 +29,12 @@ THE SOFTWARE. #include #include #include +#include +#include +#include +#include +#include +#include #include "ui_udp_server.h" @@ -146,79 +152,116 @@ bool UDP_Server::start(QStringList*) QHostAddress address(address_str); + // this->startAsync(address,port,address_str); + workerThread = new WorkerThread(this); + workerThread->udpServer = this; + workerThread->port = port; + workerThread->address_str = address_str; + workerThread->start(); + + while (!_threadStarted) + { + std::this_thread::sleep_until(std::chrono::high_resolution_clock::now() + + std::chrono::milliseconds(10)); + } + + return _running; +} + +void UDP_Server::shutdown() +{ + if (_running) + { + _running = false; + while (!_threadFinished) + { + std::this_thread::sleep_until(std::chrono::high_resolution_clock::now() + + std::chrono::milliseconds(10)); + } + workerThread->deleteLater(); + _threadStarted = false; + } +} + +int UDP_Server::normal_socket(char* address_str, int port) +{ bool success = true; - success &= !address.isNull(); - _udp_socket = new QUdpSocket(); + // UDP socket setup + int sockfd; + struct sockaddr_in servaddr; - if (!address.isMulticast()) + // Create socket + if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) { - success &= _udp_socket->bind(address, port); + success = false; + perror("socket creation failed"); } - else + + memset(&servaddr, 0, sizeof(servaddr)); + + // Server address setup + servaddr.sin_family = AF_INET; + // network interface + if (inet_aton(address_str, &servaddr.sin_addr) == 0) { - success &= _udp_socket->bind( - address, port, QAbstractSocket::ShareAddress | QAbstractSocket::ReuseAddressHint); + success = false; + perror("Invalid address"); + } + servaddr.sin_port = htons(port); // Port 9870 (adjust as needed) - // Add multicast group membership to all interfaces which support multicast. - for (const auto& interface : QNetworkInterface::allInterfaces()) - { - QNetworkInterface::InterfaceFlags iflags = interface.flags(); - if (interface.isValid() && !iflags.testFlag(QNetworkInterface::IsLoopBack) && - iflags.testFlag(QNetworkInterface::CanMulticast) && - iflags.testFlag(QNetworkInterface::IsRunning)) - { - success &= _udp_socket->joinMulticastGroup(address, interface); - } - } + // Bind socket with server address + if (bind(sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr)) < 0) + { + success = false; + perror("bind failed"); } + int n; + // Receive data from client _running = true; - connect(_udp_socket, &QUdpSocket::readyRead, this, &UDP_Server::processMessage); - if (success) { qDebug() << tr("UDP listening on (%1, %2)").arg(address_str).arg(port); } else { + qDebug() << tr("Couldn't bind to UDP (%1, %2)").arg(address_str).arg(port); QMessageBox::warning(nullptr, tr("UDP Server"), tr("Couldn't bind to UDP (%1, %2)").arg(address_str).arg(port), QMessageBox::Ok); + close(sockfd); shutdown(); + return -1; } - return _running; -} - -void UDP_Server::shutdown() -{ - if (_running && _udp_socket) - { - _udp_socket->deleteLater(); - _running = false; - } -} - -void UDP_Server::processMessage() -{ - while (_udp_socket->hasPendingDatagrams()) + _threadStarted = true; + while (_running) { - QNetworkDatagram datagram = _udp_socket->receiveDatagram(); + std::vector data(2048); + n = recvfrom(sockfd, data.data(), 2048, 0, NULL, 0); + if (n < 0) + { + perror("recvfrom failed"); + close(sockfd); + shutdown(); + return -1; + } using namespace std::chrono; auto ts = high_resolution_clock::now().time_since_epoch(); double timestamp = 1e-6 * double(duration_cast(ts).count()); - - QByteArray m = datagram.data(); - MessageRef msg(reinterpret_cast(m.data()), m.count()); - + MessageRef msg(reinterpret_cast(data.data()), n); + // printf("%s\n", data.data()); try { std::lock_guard lock(mutex()); // important use the mutex to protect any access to the data _parser->parseMessage(msg, timestamp); + + // notify the GUI + emit dataReceived(); } catch (std::exception& err) { @@ -230,10 +273,11 @@ void UDP_Server::processMessage() shutdown(); // notify the GUI emit closed(); - return; + return -1; } } - // notify the GUI - emit dataReceived(); - return; -} + qDebug() << tr("Closing UDP thread"); + close(sockfd); + _threadFinished = true; + return 1; +} \ No newline at end of file diff --git a/plotjuggler_plugins/DataStreamUDP/udp_server.h b/plotjuggler_plugins/DataStreamUDP/udp_server.h index 0a3046d1d..4f16c986e 100644 --- a/plotjuggler_plugins/DataStreamUDP/udp_server.h +++ b/plotjuggler_plugins/DataStreamUDP/udp_server.h @@ -22,8 +22,12 @@ THE SOFTWARE. #include "PlotJuggler/datastreamer_base.h" #include "PlotJuggler/messageparser_base.h" +#include + using namespace PJ; + +class WorkerThread; class UDP_Server : public PJ::DataStreamer { Q_OBJECT @@ -54,12 +58,34 @@ class UDP_Server : public PJ::DataStreamer return false; } -private: - bool _running; +public: QUdpSocket* _udp_socket; - PJ::MessageParserPtr _parser; -private slots: + int normal_socket(char* address_str, int port); - void processMessage(); +private: + bool _running,_threadStarted = false,_threadFinished = false; + WorkerThread* workerThread; + PJ::MessageParserPtr _parser; }; + +class WorkerThread : public QThread +{ + Q_OBJECT +public: + WorkerThread(QObject* parent = nullptr) : QThread(parent) + { + } + + UDP_Server* udpServer; + int port; + QString address_str; + +protected: + void run() override + { + QByteArray byteArray = address_str.toUtf8(); // Convert QString to QByteArray using UTF-8 encoding + char* charPtr = byteArray.data(); // Get pointer to the underlying data + udpServer->normal_socket(charPtr, port); + } +}; \ No newline at end of file From 7af07a2e61151dc6b193a7c3c564f98fdcf8c1ef Mon Sep 17 00:00:00 2001 From: Israel Sother Date: Sat, 23 Mar 2024 00:22:32 +0000 Subject: [PATCH 2/2] fix(udp-socket): remove old includes --- plotjuggler_plugins/DataStreamUDP/udp_server.cpp | 7 ------- 1 file changed, 7 deletions(-) diff --git a/plotjuggler_plugins/DataStreamUDP/udp_server.cpp b/plotjuggler_plugins/DataStreamUDP/udp_server.cpp index e32ed16ba..61b27330b 100644 --- a/plotjuggler_plugins/DataStreamUDP/udp_server.cpp +++ b/plotjuggler_plugins/DataStreamUDP/udp_server.cpp @@ -23,16 +23,9 @@ THE SOFTWARE. #include #include #include -#include #include #include #include -#include -#include -#include -#include -#include -#include #include #include