Skip to content

Commit

Permalink
fix multiple flushes + lazy creation of futures
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Apr 4, 2024
1 parent 6c3ff1b commit 26344a8
Show file tree
Hide file tree
Showing 31 changed files with 212 additions and 203 deletions.
60 changes: 55 additions & 5 deletions src/fdb5/api/FDB.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,66 @@ eckit::DataHandle* FDB::read(const std::vector<eckit::URI>& uris, bool sorted) {
}
return result.dataHandle();
}


// Returns a data handle over the fields produced by `it`.
//  - seekable == true : the iterator is handed to a single FieldHandle; `sorted` is not consulted on this path.
//  - seekable == false: the data handle of every element's location is added to a HandleGatherer(sorted).
//    When the "fdbDeduplicate" resource / $FDB_DEDUPLICATE_FIELDS is true, the elements are first passed
//    through a HyperCubePayloaded<ListElement> configured with a ListElementDeduplicator.
// The seekable result is allocated with `new`; presumably the caller owns the returned pointer — TODO confirm.
// NOTE(review): this listing is a diff rendered without +/- markers. The first signature line and the
// unconditional `return new FieldHandle(it);` look like the pre-change lines (the FDB.h hunk also lists
// both signatures) — confirm against the repository before treating this as compilable code.
eckit::DataHandle* FDB::read(ListIterator& it, bool sorted) {
eckit::DataHandle* FDB::read(bool seekable, ListIterator& it, bool sorted) {

return new FieldHandle(it);
if (seekable) {
return new FieldHandle(it);
}

// NOTE(review): the timer is started but never read in the visible body.
eckit::Timer timer;
timer.start();

HandleGatherer result(sorted);
ListElement el;

// function-local static: the resource is evaluated the first time control reaches this line and reused afterwards
static bool dedup = eckit::Resource<bool>("fdbDeduplicate;$FDB_DEDUPLICATE_FIELDS", false);
if (dedup) {
// an iterator that yields nothing leaves `result` without any handle
if (it.next(el)) {
// build the request representing the tensor-product of all retrieved fields
metkit::mars::MarsRequest cubeRequest = el.combinedKey().request();
std::vector<ListElement> elements{el};

while (it.next(el)) {
cubeRequest.merge(el.combinedKey().request());
elements.push_back(el);
}

// checking all retrieved fields against the hypercube, to remove duplicates
// NOTE(review): this local object shadows the static bool `dedup` declared above.
ListElementDeduplicator dedup;
metkit::hypercube::HyperCubePayloaded<ListElement> cube(cubeRequest, dedup);
// NOTE(review): `auto el` copies every element and shadows the outer `el`.
for(auto el: elements) {
cube.add(el.combinedKey().request(), el);
}

// vacant entries are only reported through a warning; the read carries on
// (presumably "vacant" means cube entries for which no field was added — TODO confirm)
if (cube.countVacant() > 0) {
std::stringstream ss;
ss << "No matching data for requests:" << std::endl;
for (auto req: cube.vacantRequests()) {
ss << " " << req << std::endl;
}
eckit::Log::warning() << ss.str() << std::endl;
}

// only indices for which cube.find() yields an element contribute a data handle
for (size_t i=0; i< cube.size(); i++) {
ListElement element;
if (cube.find(i, element)) {
result.add(element.location().dataHandle());
}
}
}
}
else {
// no deduplication: one data handle per listed element, in iteration order
while (it.next(el)) {
result.add(el.location().dataHandle());
}
}
return result.dataHandle();
}

// Retrieves the data matching `request`: the request is turned into a ListIterator via inspect()
// and the iterator is passed to read() together with sorted(request).
// `seekable` is forwarded unchanged to read(); the header hunk declares it with a default of false.
// NOTE(review): diff listing without +/- markers — the first signature and the two-argument
// `read(it, ...)` call look like the pre-change lines; confirm against the repository.
eckit::DataHandle* FDB::retrieve(const metkit::mars::MarsRequest& request) {
eckit::DataHandle* FDB::retrieve(const metkit::mars::MarsRequest& request, bool seekable) {
ListIterator it = inspect(request);
return read(it, sorted(request));
return read(seekable, it, sorted(request));
}

ListIterator FDB::inspect(const metkit::mars::MarsRequest& request) {
Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/api/FDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ class FDB {

eckit::DataHandle* read(const std::vector<eckit::URI>& uris, bool sorted = false);

eckit::DataHandle* read(ListIterator& it, bool sorted = false);
eckit::DataHandle* read(bool seekable, ListIterator& it, bool sorted = false);

eckit::DataHandle* retrieve(const metkit::mars::MarsRequest& request);
eckit::DataHandle* retrieve(const metkit::mars::MarsRequest& request, bool seekable = false);

ListIterator inspect(const metkit::mars::MarsRequest& request);

Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/api/fdb_c.cc
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ int fdb_retrieve(fdb_handle_t* fdb, fdb_request_t* req, fdb_datareader_t* dr) {
ASSERT(fdb);
ASSERT(req);
ASSERT(dr);
dr->set(fdb->retrieve(req->request()));
dr->set(fdb->retrieve(req->request(), true));
});
}
int fdb_flush(fdb_handle_t* fdb) {
Expand Down
11 changes: 2 additions & 9 deletions src/fdb5/database/ArchiveVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
#include "fdb5/database/Catalogue.h"
#include "fdb5/database/Store.h"

// namespace {
// void CatalogueCallback(fdb5::CatalogueWriter* catalogue, const fdb5::InspectionKey &key, std::unique_ptr<fdb5::FieldLocation> fieldLocation) {
// catalogue->archive(key, std::move(fieldLocation));
// }
// }
namespace fdb5 {

ArchiveVisitor::ArchiveVisitor(Archiver &owner, const Key &dataKey, const void *data, size_t size) :
Expand All @@ -31,12 +26,10 @@ ArchiveVisitor::ArchiveVisitor(Archiver &owner, const Key &dataKey, const void *

// Archives the visitor's datum (data_, size_) once the full key has been resolved.
// Steps visible here: checkMissingKeys(full), fetch the catalogue's current index key, then call
// store()->archive() with a callback bound to CatalogueWriter::archive.
// Always returns true.
// NOTE(review): diff listing without +/- markers — the lines using current() appear to be the
// pre-change versions of the lines using catalogue(); confirm against the repository.
bool ArchiveVisitor::selectDatum(const InspectionKey &key, const Key &full) {

// eckit::Log::info() << "selectDatum " << key << ", " << full << " " << size_ << std::endl;
checkMissingKeys(full);
const Key idxKey = current()->currentIndexKey();
const Key idxKey = catalogue()->currentIndexKey();

// store()->archive(idxKey, data_, size_, std::bind(&CatalogueCallback, current(), key, std::placeholders::_1));
// std::bind stores a copy of `key` and the raw CatalogueWriter pointer; the remaining argument is
// supplied by whoever invokes the callback (presumably the resulting field location — TODO confirm).
store()->archive(idxKey, data_, size_, std::bind(&CatalogueWriter::archive, current(), key, std::placeholders::_1));
store()->archive(idxKey, data_, size_, std::bind(&CatalogueWriter::archive, catalogue(), key, std::placeholders::_1));

return true;
}
Expand Down
18 changes: 6 additions & 12 deletions src/fdb5/database/Archiver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ namespace fdb5 {

// Constructs an Archiver that keeps a copy of `dbConfig`; no database is selected yet
// (the current-database pointer starts out null).
// NOTE(review): diff listing without +/- markers — the catalogue_/store_ initialisers appear to be
// the pre-change lines replaced by the single db_ initialiser; confirm against the repository.
Archiver::Archiver(const Config& dbConfig) :
dbConfig_(dbConfig),
catalogue_(nullptr),
store_(nullptr) {}
db_(nullptr) {}

Archiver::~Archiver() {
flush(); // certify that all sessions are flushed before closing them
Expand Down Expand Up @@ -56,8 +55,8 @@ void Archiver::archive(const Key &key, BaseArchiveVisitor& visitor) {

// Flushes every database held in databases_.
// In the combined form the store's flush() is evaluated first, because it is the argument of the
// catalogue's flush(); its return value is handed to the catalogue.
// NOTE(review): diff listing without +/- markers — the two separate flush() calls appear to be the
// pre-change lines replaced by the combined call; confirm against the repository.
void Archiver::flush() {
for (auto i = databases_.begin(); i != databases_.end(); ++i) {
i->second.store_->flush(); // flush the store
i->second.catalogue_->flush(); // flush the catalogue
// flush the store, pass the number of flushed fields to the catalogue
i->second.catalogue_->flush(i->second.store_->flush());
}
}

Expand All @@ -66,8 +65,7 @@ void Archiver::selectDatabase(const Key &dbKey) {
auto i = databases_.find(dbKey);

if (i != databases_.end() ) {
catalogue_ = i->second.catalogue_.get();
store_ = i->second.store_.get();
db_ = &(i->second);
i->second.time_ = ::time(0);
return;
}
Expand All @@ -86,8 +84,7 @@ void Archiver::selectDatabase(const Key &dbKey) {
}
}
if (found) {
databases_[oldK].store_->flush();
databases_[oldK].catalogue_->flush();
databases_[oldK].catalogue_->flush(databases_[oldK].store_->flush());

eckit::Log::info() << "Closing database " << *databases_[oldK].catalogue_ << std::endl;
databases_.erase(oldK);
Expand All @@ -105,10 +102,7 @@ void Archiver::selectDatabase(const Key &dbKey) {
}

std::unique_ptr<Store> str = cat->buildStore();
catalogue_ = cat.get();
store_ = str.get();

databases_[dbKey] = Database{::time(0), std::move(cat), std::move(str)};
db_ = &(databases_[dbKey] = Database{::time(0), std::move(cat), std::move(str)});
}

void Archiver::print(std::ostream &out) const {
Expand Down
3 changes: 1 addition & 2 deletions src/fdb5/database/Archiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ class Archiver : public eckit::NonCopyable {

std::vector<Key> prev_;

CatalogueWriter* catalogue_;
Store* store_;
Database* db_;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
19 changes: 10 additions & 9 deletions src/fdb5/database/BaseArchiveVisitor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,14 @@ BaseArchiveVisitor::BaseArchiveVisitor(Archiver &owner, const Key &dataKey) :
// Visitor hook for the database level: asks the owning Archiver to select the database for `dbKey`,
// then deselects any index on that database's catalogue. The second key argument is ignored.
// Always returns true.
// NOTE(review): diff listing without +/- markers — the lines touching owner_.catalogue_ appear to be
// the pre-change lines replaced by the catalogue() call; confirm against the repository.
bool BaseArchiveVisitor::selectDatabase(const Key &dbKey, const Key&) {
LOG_DEBUG_LIB(LibFdb5) << "selectDatabase " << dbKey << std::endl;
owner_.selectDatabase(dbKey);
ASSERT(owner_.catalogue_);
owner_.catalogue_->deselectIndex();
catalogue()->deselectIndex();

return true;
}

// Visitor hook for the index level: forwards `idxKey` to the current catalogue's selectIndex()
// and returns its result. The second key argument is ignored.
// NOTE(review): diff listing without +/- markers — the ASSERT and the owner_.catalogue_ call appear
// to be the pre-change lines replaced by the catalogue() call; confirm against the repository.
bool BaseArchiveVisitor::selectIndex(const Key &idxKey, const Key&) {
// eckit::Log::info() << "selectIndex " << key << std::endl;
ASSERT(owner_.catalogue_);
return owner_.catalogue_->selectIndex(idxKey);
return catalogue()->selectIndex(idxKey);
}

void BaseArchiveVisitor::checkMissingKeys(const Key &full) {
Expand All @@ -47,16 +45,19 @@ void BaseArchiveVisitor::checkMissingKeys(const Key &full) {
}

// Returns the schema reported by the currently selected catalogue.
// NOTE(review): diff listing without +/- markers — the current()-based lines appear to be the
// pre-change lines replaced by the catalogue() call; confirm against the repository.
const Schema& BaseArchiveVisitor::databaseSchema() const {
ASSERT(current());
return current()->schema();
return catalogue()->schema();
}

// Accessor for the catalogue writer of the database currently selected on the owning Archiver.
// The catalogue() form asserts that a database is selected and that it has a catalogue, then
// returns a non-owning pointer obtained with .get().
// NOTE(review): diff listing without +/- markers — current() and its one-line body appear to be the
// pre-change definition that catalogue() replaces; confirm against the repository.
CatalogueWriter* BaseArchiveVisitor::current() const {
return owner_.catalogue_;
CatalogueWriter* BaseArchiveVisitor::catalogue() const {
ASSERT(owner_.db_);
ASSERT(owner_.db_->catalogue_);
return owner_.db_->catalogue_.get();
}

// Accessor for the store of the database currently selected on the owning Archiver.
// Asserts that a database is selected and that it has a store, then returns a non-owning pointer
// obtained with .get().
// NOTE(review): diff listing without +/- markers — `return owner_.store_;` appears to be the
// pre-change body; confirm against the repository.
Store* BaseArchiveVisitor::store() const {
return owner_.store_;
ASSERT(owner_.db_);
ASSERT(owner_.db_->store_);
return owner_.db_->store_.get();
}

//----------------------------------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/BaseArchiveVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class BaseArchiveVisitor : public WriteVisitor {

virtual const Schema& databaseSchema() const;

fdb5::CatalogueWriter* current() const;
fdb5::CatalogueWriter* catalogue() const;
fdb5::Store* store() const;

protected: // members
Expand Down
2 changes: 1 addition & 1 deletion src/fdb5/database/Catalogue.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class Catalogue {

virtual std::string type() const = 0;
virtual bool open() = 0;
virtual void flush() = 0;
virtual void flush(size_t archivedFields) = 0;
virtual void clean() = 0;
virtual void close() = 0;

Expand Down
4 changes: 2 additions & 2 deletions src/fdb5/database/Store.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Store {

virtual std::string type() const = 0;
virtual bool open() = 0;
virtual void flush() = 0;
virtual size_t flush() = 0;
virtual void close() = 0;

// virtual std::string owner() const = 0;
Expand All @@ -58,7 +58,7 @@ class Store {
virtual void remove(const Key& key) const { NOTIMP; }

virtual eckit::URI uri() const = 0;

};


Expand Down
13 changes: 3 additions & 10 deletions src/fdb5/io/FieldHandle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,6 @@ namespace fdb5 {

//----------------------------------------------------------------------------------------------------------------------

/// metkit::hypercube::Deduplicator implementation for ListElement payloads.
/// An element already held is replaced only when its timestamp() compares
/// less than the timestamp() of the candidate.
class ListElementDeduplicator : public metkit::hypercube::Deduplicator<ListElement> {
public:
    /// @param existing     element currently held
    /// @param replacement  candidate for the same slot
    /// @return true when existing.timestamp() < replacement.timestamp()
    bool toReplace(const ListElement& existing, const ListElement& replacement) const override {
        const auto& heldStamp      = existing.timestamp();
        const auto& candidateStamp = replacement.timestamp();
        return heldStamp < candidateStamp;
    }
};

//----------------------------------------------------------------------------------------------------------------------

FieldHandle::FieldHandle(ListIterator& it) :
datahandles_({}), totalSize_(0), currentIdx_(0), current_(nullptr), currentMemoryHandle_(false), buffer_(nullptr), sorted_(false), seekable_(true) {
ListElement el;
Expand Down Expand Up @@ -179,6 +170,8 @@ long FieldHandle::read1(char* buffer, long length) {
}

long FieldHandle::read(void* buffer, long length) {
long requested = length;

char* p = static_cast<char*>(buffer);
long n = 0;
long total = 0;
Expand All @@ -189,7 +182,7 @@ long FieldHandle::read(void* buffer, long length) {
p += n;
}

LOG_DEBUG_LIB(LibFdb5) << "FieldHandle::read " << (total > 0 ? total : n) << std::endl;
LOG_DEBUG_LIB(LibFdb5) << "FieldHandle::read - requested: " << requested << " read: " << (total > 0 ? total : n) << std::endl;

return total > 0 ? total : n;
}
Expand Down
9 changes: 9 additions & 0 deletions src/fdb5/io/FieldHandle.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ namespace fdb5 {

//----------------------------------------------------------------------------------------------------------------------

/// metkit::hypercube::Deduplicator implementation for ListElement payloads.
/// An element already held is replaced only when its timestamp() compares
/// less than the timestamp() of the candidate.
class ListElementDeduplicator : public metkit::hypercube::Deduplicator<ListElement> {
public:
    /// @param existing     element currently held
    /// @param replacement  candidate for the same slot
    /// @return true when existing.timestamp() < replacement.timestamp()
    bool toReplace(const ListElement& existing, const ListElement& replacement) const override {
        const auto& heldStamp      = existing.timestamp();
        const auto& candidateStamp = replacement.timestamp();
        return heldStamp < candidateStamp;
    }
};

//----------------------------------------------------------------------------------------------------------------------

class FieldHandle : public eckit::DataHandle {
public:

Expand Down
18 changes: 10 additions & 8 deletions src/fdb5/rados/RadosStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ namespace fdb5 {
//----------------------------------------------------------------------------------------------------------------------

// RadosStore constructors. Each one sets directory_ to "mars:" followed by either the key's
// valuesToString() or the directory name of the URI path; the archivedFields_ variants also start
// the archived-field counter at 0. `config` is not used in the visible initialisers.
// NOTE(review): diff listing without +/- markers — in each constructor the initialiser line without
// archivedFields_ appears to be the pre-change line; confirm against the repository.
RadosStore::RadosStore(const Key& key, const Config& config) :
Store(), directory_("mars:"+key.valuesToString()) {}
Store(), directory_("mars:"+key.valuesToString()), archivedFields_(0) {}

// NOTE(review): unlike the other two constructors, this definition is not qualified with
// `RadosStore::` — looks like a defect in the listed file; confirm.
// The body only invokes NOTIMP (presumably raises a not-implemented error — TODO confirm).
RadosStore(const Key& key, const Config& config, const eckit::net::Endpoint& controlEndpoint) :
Store(), directory_("mars:"+key.valuesToString()) {
Store(), directory_("mars:"+key.valuesToString()), archivedFields_(0) {
NOTIMP;
}

RadosStore::RadosStore(const eckit::URI& uri, const Config& config) :
Store(), directory_("mars:"+uri.path().dirName()) {}
Store(), directory_("mars:"+uri.path().dirName()), archivedFields_(0) {}

eckit::URI RadosStore::uri() const {
return URI("rados", directory_);
Expand All @@ -54,7 +54,7 @@ eckit::DataHandle* RadosStore::retrieve(Field& field, Key& remapKey) const {
}

std::unique_ptr<FieldLocation> RadosStore::archive(const uint32_t, const Key& key, const void *data, eckit::Length length) {
dirty_ = true;
archivedFields_++;

eckit::PathName dataPath = getDataPath(key);
eckit::URI dataUri("rados", dataPath);
Expand All @@ -70,16 +70,18 @@ std::unique_ptr<FieldLocation> RadosStore::archive(const uint32_t, const Key& ke
return std::unique_ptr<TocFieldLocation>(new RadosFieldLocation(dataUri, position, length));
}

// Flushes the store's data handles if anything was archived since the previous flush.
// size_t form: returns 0 without calling flushDataHandles() when archivedFields_ is 0; otherwise
// calls flushDataHandles(), resets archivedFields_ to 0 and returns the value it held before the reset.
// NOTE(review): diff listing without +/- markers — the `void` signature and the dirty_ lines appear
// to be the pre-change lines (the old early-return block is shown without its closing brace);
// confirm against the repository.
void RadosStore::flush() {
if (!dirty_) {
return;
size_t RadosStore::flush() {
if (archivedFields_ == 0) {
return 0;
}

// ensure consistent state before writing Toc entry

flushDataHandles();

dirty_ = false;
// capture the counter before resetting it so the caller receives the number just flushed
size_t out = archivedFields_;
archivedFields_ = 0;
return out;
}

void RadosStore::close() {
Expand Down
6 changes: 3 additions & 3 deletions src/fdb5/rados/RadosStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RadosStore : public Store {
eckit::URI uri() const override;

bool open() override { return true; }
void flush() override;
size_t flush() override;
void close() override;

void checkUID() const override { /* nothing to do */ }
Expand Down Expand Up @@ -78,8 +78,8 @@ class RadosStore : public Store {

PathStore dataPaths_;
eckit::PathName directory_;

mutable bool dirty_;
size_t archivedFields_;
};

//----------------------------------------------------------------------------------------------------------------------
Expand Down
3 changes: 0 additions & 3 deletions src/fdb5/remote/FdbServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ void FDBForker::run() {

eckit::Log::info() << "FDB forked pid " << ::getpid() << std::endl;

// ServerConnection handler(socket_, config_);
// handler.handle();

if (config_.getString("type", "local") == "catalogue" || (::getenv("FDB_IS_CAT") && ::getenv("FDB_IS_CAT")[0] == '1')) {
eckit::Log::info() << "FDB using Catalogue Handler" << std::endl;
CatalogueHandler handler(socket_, config_);
Expand Down
Loading

0 comments on commit 26344a8

Please sign in to comment.