Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Udp-socket-thread #956

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 85 additions & 48 deletions plotjuggler_plugins/DataStreamUDP/udp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ THE SOFTWARE.
#include <QSettings>
#include <QDialog>
#include <mutex>
#include <QWebSocket>
#include <QIntValidator>
#include <QMessageBox>
#include <chrono>
#include <QNetworkDatagram>
#include <QNetworkInterface>
#include <arpa/inet.h>
#include <unistd.h>

#include "ui_udp_server.h"

Expand Down Expand Up @@ -146,79 +145,116 @@ bool UDP_Server::start(QStringList*)

QHostAddress address(address_str);

// this->startAsync(address,port,address_str);
workerThread = new WorkerThread(this);
workerThread->udpServer = this;
workerThread->port = port;
workerThread->address_str = address_str;
workerThread->start();

while (!_threadStarted)
{
std::this_thread::sleep_until(std::chrono::high_resolution_clock::now() +
std::chrono::milliseconds(10));
}

return _running;
}

void UDP_Server::shutdown()
{
if (_running)
{
_running = false;
while (!_threadFinished)
{
std::this_thread::sleep_until(std::chrono::high_resolution_clock::now() +
std::chrono::milliseconds(10));
}
workerThread->deleteLater();
_threadStarted = false;
}
}

int UDP_Server::normal_socket(char* address_str, int port)
{
bool success = true;
success &= !address.isNull();

_udp_socket = new QUdpSocket();
// UDP socket setup
int sockfd;
struct sockaddr_in servaddr;

if (!address.isMulticast())
// Create socket
if ((sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
{
success &= _udp_socket->bind(address, port);
success = false;
perror("socket creation failed");
}
else

memset(&servaddr, 0, sizeof(servaddr));

// Server address setup
servaddr.sin_family = AF_INET;
// network interface
if (inet_aton(address_str, &servaddr.sin_addr) == 0)
{
success &= _udp_socket->bind(
address, port, QAbstractSocket::ShareAddress | QAbstractSocket::ReuseAddressHint);
success = false;
perror("Invalid address");
}
servaddr.sin_port = htons(port); // Port 9870 (adjust as needed)

// Add multicast group membership to all interfaces which support multicast.
for (const auto& interface : QNetworkInterface::allInterfaces())
{
QNetworkInterface::InterfaceFlags iflags = interface.flags();
if (interface.isValid() && !iflags.testFlag(QNetworkInterface::IsLoopBack) &&
iflags.testFlag(QNetworkInterface::CanMulticast) &&
iflags.testFlag(QNetworkInterface::IsRunning))
{
success &= _udp_socket->joinMulticastGroup(address, interface);
}
}
// Bind socket with server address
if (bind(sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)
{
success = false;
perror("bind failed");
}

int n;
// Receive data from client
_running = true;

connect(_udp_socket, &QUdpSocket::readyRead, this, &UDP_Server::processMessage);

if (success)
{
qDebug() << tr("UDP listening on (%1, %2)").arg(address_str).arg(port);
}
else
{
qDebug() << tr("Couldn't bind to UDP (%1, %2)").arg(address_str).arg(port);
QMessageBox::warning(nullptr, tr("UDP Server"),
tr("Couldn't bind to UDP (%1, %2)").arg(address_str).arg(port),
QMessageBox::Ok);
close(sockfd);
shutdown();
return -1;
}

return _running;
}

void UDP_Server::shutdown()
{
if (_running && _udp_socket)
{
_udp_socket->deleteLater();
_running = false;
}
}

void UDP_Server::processMessage()
{
while (_udp_socket->hasPendingDatagrams())
_threadStarted = true;
while (_running)
{
QNetworkDatagram datagram = _udp_socket->receiveDatagram();
std::vector<unsigned char> data(2048);
n = recvfrom(sockfd, data.data(), 2048, 0, NULL, 0);
if (n < 0)
{
perror("recvfrom failed");
close(sockfd);
shutdown();
return -1;
}

using namespace std::chrono;
auto ts = high_resolution_clock::now().time_since_epoch();
double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).count());

QByteArray m = datagram.data();
MessageRef msg(reinterpret_cast<uint8_t*>(m.data()), m.count());

MessageRef msg(reinterpret_cast<uint8_t*>(data.data()), n);
// printf("%s\n", data.data());
try
{
std::lock_guard<std::mutex> lock(mutex());
// important use the mutex to protect any access to the data
_parser->parseMessage(msg, timestamp);

// notify the GUI
emit dataReceived();
}
catch (std::exception& err)
{
Expand All @@ -230,10 +266,11 @@ void UDP_Server::processMessage()
shutdown();
// notify the GUI
emit closed();
return;
return -1;
}
}
// notify the GUI
emit dataReceived();
return;
}
qDebug() << tr("Closing UDP thread");
close(sockfd);
_threadFinished = true;
return 1;
}
36 changes: 31 additions & 5 deletions plotjuggler_plugins/DataStreamUDP/udp_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ THE SOFTWARE.
#include "PlotJuggler/datastreamer_base.h"
#include "PlotJuggler/messageparser_base.h"

#include <QThread>

using namespace PJ;


class WorkerThread;
class UDP_Server : public PJ::DataStreamer
{
Q_OBJECT
Expand Down Expand Up @@ -54,12 +58,34 @@ class UDP_Server : public PJ::DataStreamer
return false;
}

private:
bool _running;
public:
QUdpSocket* _udp_socket;
PJ::MessageParserPtr _parser;

private slots:
int normal_socket(char* address_str, int port);

void processMessage();
private:
bool _running,_threadStarted = false,_threadFinished = false;
WorkerThread* workerThread;
PJ::MessageParserPtr _parser;
};

class WorkerThread : public QThread
{
Q_OBJECT
public:
WorkerThread(QObject* parent = nullptr) : QThread(parent)
{
}

UDP_Server* udpServer;
int port;
QString address_str;

protected:
void run() override
{
QByteArray byteArray = address_str.toUtf8(); // Convert QString to QByteArray using UTF-8 encoding
char* charPtr = byteArray.data(); // Get pointer to the underlying data
udpServer->normal_socket(charPtr, port);
}
};