Skip to content

Commit

Permalink
Merge pull request #68 from ecmwf/feature/remoteFDB-add-catalogue-exists
Browse files Browse the repository at this point in the history
Adds functionality to the client-side exists() methods
  • Loading branch information
danovaro authored Jan 17, 2025
2 parents 1336a19 + 5c18872 commit dc2cda7
Show file tree
Hide file tree
Showing 18 changed files with 619 additions and 349 deletions.
60 changes: 33 additions & 27 deletions src/fdb5/remote/Connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@

#include "fdb5/LibFdb5.h"
#include "fdb5/remote/Connection.h"
#include "fdb5/remote/Messages.h"
#include <cstdint>
#include <mutex>
#include <string_view>

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

Connection::Connection() : single_(false) {}
Connection::~Connection() {}
Connection::Connection() : single_(false) { }

void Connection::teardown() {

Expand All @@ -19,20 +22,20 @@ void Connection::teardown() {
// all done - disconnecting
Connection::write(Message::Exit, false, 0, 0);
} catch(...) {
// if connection is already down, no need to escalate
// if connection is already down, no need to escalate
}
}
try {
// all done - disconnecting
Connection::write(Message::Exit, true, 0, 0);
} catch(...) {
// if connection is already down, no need to escalate
// if connection is already down, no need to escalate
}
}

//----------------------------------------------------------------------------------------------------------------------

void Connection::writeUnsafe(bool control, const void* data, size_t length) {
void Connection::writeUnsafe(const bool control, const void* const data, const size_t length) const {
long written = 0;
if (control || single_) {
written = controlSocket().write(data, length);
Expand All @@ -51,7 +54,7 @@ void Connection::writeUnsafe(bool control, const void* data, size_t length) {
}
}

void Connection::readUnsafe(bool control, void* data, size_t length) {
void Connection::readUnsafe(bool control, void* data, size_t length) const {
long read = 0;
if (control || single_) {
read = controlSocket().read(data, length);
Expand All @@ -70,14 +73,14 @@ void Connection::readUnsafe(bool control, void* data, size_t length) {
}
}

eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {
eckit::Buffer Connection::read(const bool control, MessageHeader& hdr) const {
eckit::FixedString<4> tail;

std::lock_guard<std::mutex> lock((control || single_) ? readControlMutex_ : readDataMutex_);
readUnsafe(control, &hdr, sizeof(hdr));

ASSERT(hdr.marker == StartMarker);
ASSERT(hdr.version == CurrentVersion);
ASSERT(hdr.marker == MessageHeader::StartMarker);
ASSERT(hdr.version == MessageHeader::currentVersion);
ASSERT(single_ || hdr.control() == control);

eckit::Buffer payload{hdr.payloadSize};
Expand All @@ -86,7 +89,7 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {
}
// Ensure we have consumed exactly the correct amount from the socket.
readUnsafe(control, &tail, sizeof(tail));
ASSERT(tail == EndMarker);
ASSERT(tail == MessageHeader::EndMarker);

if (hdr.message == Message::Error) {

Expand All @@ -99,40 +102,43 @@ eckit::Buffer Connection::read(bool control, MessageHeader& hdr) {
return payload;
}

void Connection::write(remote::Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) {
write(msg, control, clientID, requestID, std::vector<std::pair<const void*, uint32_t>>{{data, length}});
}

void Connection::write(remote::Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data) {
void Connection::write(const Message msg,
const bool control,
const uint32_t clientID,
const uint32_t requestID,
const PayloadList payloads) const {

uint32_t payloadLength = 0;
for (auto d: data) {
ASSERT(d.first);
payloadLength += d.second;
for (const auto& payload : payloads) {
ASSERT(payload.data);
payloadLength += payload.length;
}

MessageHeader message{msg, control, clientID, requestID, payloadLength};

LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID() << ",control=" << control << ",requestID=" << requestID << ",data=" << data.size() << ",payload=" << payloadLength << "]" << std::endl;
LOG_DEBUG_LIB(LibFdb5) << "Connection::write [message=" << msg << ",clientID=" << message.clientID()
<< ",control=" << control << ",requestID=" << requestID << ",payloadsSize=" << payloads.size()
<< ",payloadLength=" << payloadLength << "]" << std::endl;

std::lock_guard<std::mutex> lock((control || single_) ? controlMutex_ : dataMutex_);

writeUnsafe(control, &message, sizeof(message));
for (auto d: data) {
writeUnsafe(control, d.first, d.second);
}
writeUnsafe(control, &EndMarker, sizeof(EndMarker));

for (const auto& payload : payloads) { writeUnsafe(control, payload.data, payload.length); }

writeUnsafe(control, &MessageHeader::EndMarker, MessageHeader::markerBytes);
}

void Connection::error(const std::string& msg, uint32_t clientID, uint32_t requestID) {
void Connection::error(std::string_view msg, uint32_t clientID, uint32_t requestID) const {
eckit::Log::error() << "[clientID=" << clientID << ",requestID=" << requestID << "] " << msg << std::endl;
write(Message::Error, false, clientID, requestID, std::vector<std::pair<const void*, uint32_t>>{{msg.c_str(), msg.length()}});
write(Message::Error, false, clientID, requestID, msg.data(), msg.length());
}

eckit::Buffer Connection::readControl(MessageHeader& hdr) {
eckit::Buffer Connection::readControl(MessageHeader& hdr) const {
return read(true, hdr);
}

eckit::Buffer Connection::readData(MessageHeader& hdr) {
eckit::Buffer Connection::readData(MessageHeader& hdr) const {
return read(false, hdr);
}

Expand Down
56 changes: 35 additions & 21 deletions src/fdb5/remote/Connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,19 @@

#pragma once

#include "eckit/serialisation/MemoryStream.h"
#include "fdb5/remote/Messages.h"

#include "eckit/exception/Exceptions.h"
#include "eckit/net/TCPSocket.h"
#include "eckit/os/BackTrace.h"

#include "fdb5/remote/Messages.h"
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <string_view>
#include <vector>

namespace eckit {

Expand All @@ -40,41 +48,47 @@ class TCPException : public eckit::Exception {

class Connection : eckit::NonCopyable {

public: // types
using PayloadList = std::vector<Payload>;

public: // methods
Connection();
virtual ~Connection();

void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length);
void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data = {});
virtual ~Connection() = default;

void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, PayloadList payloads = {}) const;

void error(const std::string& msg, uint32_t clientID, uint32_t requestID);
void write(Message msg, bool control, uint32_t clientID, uint32_t requestID, const void* data, uint32_t length) const {
write(msg, control, clientID, requestID, {{length, data}});
}

eckit::Buffer readControl(MessageHeader& hdr);
eckit::Buffer readData(MessageHeader& hdr);
void error(std::string_view msg, uint32_t clientID, uint32_t requestID) const;

void teardown();
eckit::Buffer readControl(MessageHeader& hdr) const;

private: // methods
eckit::Buffer readData(MessageHeader& hdr) const;

eckit::Buffer read(bool control, MessageHeader& hdr);
void teardown();

void writeUnsafe(bool control, const void* data, size_t length);
void readUnsafe(bool control, void* data, size_t length);
private: // methods
eckit::Buffer read(bool control, MessageHeader& hdr) const;

virtual eckit::net::TCPSocket& controlSocket() = 0;
virtual eckit::net::TCPSocket& dataSocket() = 0;
void writeUnsafe(bool control, const void* data, size_t length) const;

protected: // members
void readUnsafe(bool control, void* data, size_t length) const;

bool single_;
virtual const eckit::net::TCPSocket& controlSocket() const = 0;

private: // members
virtual const eckit::net::TCPSocket& dataSocket() const = 0;

std::mutex controlMutex_;
std::mutex dataMutex_;
std::mutex readControlMutex_;
std::mutex readDataMutex_;
protected: // members
bool single_;

private: // members
mutable std::mutex controlMutex_;
mutable std::mutex dataMutex_;
mutable std::mutex readControlMutex_;
mutable std::mutex readDataMutex_;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
5 changes: 2 additions & 3 deletions src/fdb5/remote/Messages.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@

#include "fdb5/remote/Messages.h"

using namespace eckit;

namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -47,6 +45,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) {
case Message::Move: s << "Move"; break;
case Message::Store: s << "Store"; break;
case Message::Axes: s << "Axes"; break;
case Message::Exists: s << "Exists"; break;

// Responses
case Message::Received: s << "Received"; break;
Expand All @@ -62,7 +61,7 @@ std::ostream& operator<<(std::ostream& s, const Message& m) {

MessageHeader::MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize) :
marker(StartMarker),
version(CurrentVersion),
version(currentVersion),
message(message),
clientID_((clientID<<1) + (control ? 1 : 0)),
requestID(requestID),
Expand Down
60 changes: 33 additions & 27 deletions src/fdb5/remote/Messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@

#pragma once

#include <cstdint>
#include <cmath>
#include <cstddef>
#include <cstdint>

#include "eckit/types/FixedString.h"

Expand All @@ -31,10 +32,12 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------

const static eckit::FixedString<4> StartMarker {"SFDB"};
const static eckit::FixedString<4> EndMarker {"EFDB"};
struct Payload {
Payload(std::size_t length, const void* data) : length {length}, data {data} { }

constexpr uint16_t CurrentVersion = 12;
std::size_t length {0};
const void* data {nullptr};
};

enum class Message : uint16_t {

Expand Down Expand Up @@ -63,6 +66,7 @@ enum class Message : uint16_t {
Move,
Store,
Axes,
Exists,

// Responses
Received = 200,
Expand All @@ -76,21 +80,31 @@ enum class Message : uint16_t {

std::ostream& operator<<(std::ostream& s, const Message& m);

//----------------------------------------------------------------------------------------------------------------------

// Header used for all messages
class MessageHeader {

public: // methods
public: // types
constexpr static uint16_t currentVersion = 12;

constexpr static const auto hashBytes = 16;

constexpr static const auto markerBytes = 4;

using MarkerType = eckit::FixedString<markerBytes>;

using HashType = eckit::FixedString<hashBytes>;

MessageHeader() :
version(CurrentVersion),
message(Message::None),
clientID_(0),
requestID(0),
payloadSize(0) {}
inline static const MarkerType StartMarker {"SFDB"};

inline static const MarkerType EndMarker {"EFDB"};

public: // methods
MessageHeader() = default;

MessageHeader(Message message, bool control, uint32_t clientID, uint32_t requestID, uint32_t payloadSize);

bool control() const {
return ((clientID_ & 0x00000001) == 1);
}
Expand All @@ -99,21 +113,13 @@ class MessageHeader {
}

public:

eckit::FixedString<4> marker; // 4 bytes --> 4

uint16_t version; // 2 bytes --> 6

Message message; // 2 bytes --> 8

uint32_t clientID_; // 4 bytes --> 12

uint32_t requestID; // 4 bytes --> 16

uint32_t payloadSize; // 4 bytes --> 20

eckit::FixedString<16> hash; // 16 bytes --> 36

MarkerType marker; // 4 bytes --> 4
uint16_t version {currentVersion}; // 2 bytes --> 6
Message message {Message::None}; // 2 bytes --> 8
uint32_t clientID_ {0}; // 4 bytes --> 12
uint32_t requestID {0}; // 4 bytes --> 16
uint32_t payloadSize {0}; // 4 bytes --> 20
HashType hash; // 16 bytes --> 36
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit dc2cda7

Please sign in to comment.