Skip to content

Commit

Permalink
FDB remote - fix race condition blocking requests
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed May 1, 2024
1 parent a59cff6 commit ac9d98f
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 16 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.11.118
5.11.119
8 changes: 2 additions & 6 deletions src/fdb5/remote/client/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ void Client::controlWriteCheckResponse(Message msg, uint32_t requestID, bool dat
if (payloadLength) {
data.push_back(std::make_pair(payload, payloadLength));
}
std::future<eckit::Buffer> f = connection_.controlWrite(*this, msg, requestID, dataListener, data);

eckit::Buffer buf = f.get();
eckit::Buffer buf = connection_.controlWrite(*this, msg, requestID, dataListener, data);
ASSERT(buf.size() == 0);
}

Expand All @@ -71,10 +70,7 @@ eckit::Buffer Client::controlWriteReadResponse(Message msg, uint32_t requestID,
if (payloadLength) {
data.push_back(std::make_pair(payload, payloadLength));
}
std::future<eckit::Buffer> f = connection_.controlWrite(*this, msg, requestID, false, data);

eckit::Buffer buf = f.get();
return buf;
return connection_.controlWrite(*this, msg, requestID, false, data);
}

void Client::dataWrite(remote::Message msg, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data) {
Expand Down
8 changes: 6 additions & 2 deletions src/fdb5/remote/client/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,16 @@ eckit::LocalConfiguration ClientConnection::availableFunctionality() const {

// -----------------------------------------------------------------------------------------------------

std::future<eckit::Buffer> ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector<std::pair<const void*, uint32_t>> data) {
eckit::Buffer ClientConnection::controlWrite(Client& client, Message msg, uint32_t requestID, bool dataListener, std::vector<std::pair<const void*, uint32_t>> data) {
auto it = clients_.find(client.clientId());
ASSERT(it != clients_.end());

auto pp = promises_.emplace(requestID, std::promise<eckit::Buffer>{});
std::future<eckit::Buffer> f = pp.first->second.get_future();

Connection::write(msg, true, client.clientId(), requestID, data);

return pp.first->second.get_future();
return f.get();
}

void ClientConnection::dataWrite(DataWriteRequest& r) {
Expand Down Expand Up @@ -419,6 +421,8 @@ void ClientConnection::listeningDataThreadLoop() {

try {

LOG_DEBUG_LIB(LibFdb5) << "ClientConnection::listeningDataThreadLoop started" << std::endl;

MessageHeader hdr;

while (true) {
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/remote/client/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ClientConnection : protected Connection {

virtual ~ClientConnection();

std::future<eckit::Buffer> controlWrite(Client& client, Message msg, uint32_t requestID, bool startDataListener, std::vector<std::pair<const void*, uint32_t>> data={});
eckit::Buffer controlWrite(Client& client, Message msg, uint32_t requestID, bool startDataListener, std::vector<std::pair<const void*, uint32_t>> data={});
void dataWrite(Client& client, Message msg, uint32_t requestID, std::vector<std::pair<const void*, uint32_t>> data={});

void add(Client& client);
Expand Down
11 changes: 5 additions & 6 deletions src/fdb5/toc/TocEngine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <cstring>
#include <list>
#include <ostream>
#include <regex>

#include "eckit/eckit.h"

Expand All @@ -24,7 +25,6 @@
#include "eckit/log/Log.h"
#include "eckit/os/BackTrace.h"
#include "eckit/os/Stat.h"
#include "eckit/utils/Regex.h"
#include "eckit/utils/StringTools.h"

#include "fdb5/LibFdb5.h"
Expand Down Expand Up @@ -148,8 +148,6 @@ std::set<eckit::PathName> TocEngine::databases(const std::set<InspectionKey>& ke
const std::vector<eckit::PathName>& roots,
const Config& config) const {

static bool searchCaseSensitiveDB = eckit::Resource<bool>("fdbSearchCaseSensitiveDB;$FDB_SEARCH_CASESENSITIVE_DB", true);

std::set<eckit::PathName> result;

for (std::vector<eckit::PathName>::const_iterator j = roots.begin(); j != roots.end(); ++j) {
Expand All @@ -166,12 +164,13 @@ std::set<eckit::PathName> TocEngine::databases(const std::set<InspectionKey>& ke
for(std::vector<std::string>::const_iterator dbpath = dbpaths.begin(); dbpath != dbpaths.end(); ++dbpath) {

std::string regex = "^" + *j + "/" + *dbpath + "$";
Regex re(searchCaseSensitiveDB ? eckit::StringTools::lower(regex) : regex);
std::regex reg(regex, std::regex_constants::syntax_option_type::icase | std::regex_constants::syntax_option_type::optimize);

LOG_DEBUG_LIB(LibFdb5) << " -> key i " << *i
<< " dbpath " << *dbpath
<< " pathregex " << re << std::endl;
<< " pathregex " << regex << std::endl;

std::smatch m;
for (std::list<std::string>::const_iterator k = dbs.begin(); k != dbs.end(); ++k) {

LOG_DEBUG_LIB(LibFdb5) << " -> db " << *k << std::endl;
Expand All @@ -180,7 +179,7 @@ std::set<eckit::PathName> TocEngine::databases(const std::set<InspectionKey>& ke
continue;
}

if (re.match(searchCaseSensitiveDB ? eckit::StringTools::lower(*k) : *k)) {
if (std::regex_match(*k, m, reg)) {
result.insert(*k);
}
}
Expand Down

0 comments on commit ac9d98f

Please sign in to comment.