diff --git a/src/fdb5/api/FDB.cc b/src/fdb5/api/FDB.cc
index 99f709ba2..3fc20f99f 100644
--- a/src/fdb5/api/FDB.cc
+++ b/src/fdb5/api/FDB.cc
@@ -156,16 +156,66 @@ eckit::DataHandle* FDB::read(const std::vector<eckit::URI>& uris, bool sorted) {
     }
     return result.dataHandle();
 }
-
-eckit::DataHandle* FDB::read(ListIterator& it, bool sorted) {
+eckit::DataHandle* FDB::read(bool seekable, ListIterator& it, bool sorted) {

-    return new FieldHandle(it);
+    if (seekable) {
+        return new FieldHandle(it);
+    }
+
+    eckit::Timer timer;
+    timer.start();
+
+    HandleGatherer result(sorted);
+    ListElement el;
+
+    static bool dedup = eckit::Resource<bool>("fdbDeduplicate;$FDB_DEDUPLICATE_FIELDS", false);
+    if (dedup) {
+        if (it.next(el)) {
+            // build the request representing the tensor-product of all retrieved fields
+            metkit::mars::MarsRequest cubeRequest = el.combinedKey().request();
+            std::vector<ListElement> elements{el};
+
+            while (it.next(el)) {
+                cubeRequest.merge(el.combinedKey().request());
+                elements.push_back(el);
+            }
+
+            // checking all retrieved fields against the hypercube, to remove duplicates
+            ListElementDeduplicator dedup;
+            metkit::hypercube::HyperCubePayloaded<ListElement> cube(cubeRequest, dedup);
+            for(auto el: elements) {
+                cube.add(el.combinedKey().request(), el);
+            }
+
+            if (cube.countVacant() > 0) {
+                std::stringstream ss;
+                ss << "No matching data for requests:" << std::endl;
+                for (auto req: cube.vacantRequests()) {
+                    ss << "  " << req << std::endl;
+                }
+                eckit::Log::warning() << ss.str() << std::endl;
+            }
+
+            for (size_t i=0; i< cube.size(); i++) {
+                ListElement element;
+                if (cube.find(i, element)) {
+                    result.add(element.location().dataHandle());
+                }
+            }
+        }
+    }
+    else {
+        while (it.next(el)) {
+            result.add(el.location().dataHandle());
+        }
+    }
+    return result.dataHandle();
 }

-eckit::DataHandle* FDB::retrieve(const metkit::mars::MarsRequest& request) {
+eckit::DataHandle* FDB::retrieve(const metkit::mars::MarsRequest& request, bool seekable) {
     ListIterator it = inspect(request);
-    return read(it, sorted(request));
+    return read(seekable, it, sorted(request));
 }

 ListIterator FDB::inspect(const metkit::mars::MarsRequest& request) {
diff --git a/src/fdb5/api/FDB.h b/src/fdb5/api/FDB.h
index cb9879363..5d938cd46 100644
--- a/src/fdb5/api/FDB.h
+++ b/src/fdb5/api/FDB.h
@@ -85,9 +85,9 @@ class FDB {

     eckit::DataHandle* read(const std::vector<eckit::URI>& uris, bool sorted = false);

-    eckit::DataHandle* read(ListIterator& it, bool sorted = false);
+    eckit::DataHandle* read(bool seekable, ListIterator& it, bool sorted = false);

-    eckit::DataHandle* retrieve(const metkit::mars::MarsRequest& request);
+    eckit::DataHandle* retrieve(const metkit::mars::MarsRequest& request, bool seekable = false);

     ListIterator inspect(const metkit::mars::MarsRequest& request);
diff --git a/src/fdb5/api/fdb_c.cc b/src/fdb5/api/fdb_c.cc
index 9c2fa3363..e64f5fef8 100644
--- a/src/fdb5/api/fdb_c.cc
+++ b/src/fdb5/api/fdb_c.cc
@@ -349,7 +349,7 @@ int fdb_retrieve(fdb_handle_t* fdb, fdb_request_t* req, fdb_datareader_t* dr) {
         ASSERT(fdb);
         ASSERT(req);
         ASSERT(dr);
-        dr->set(fdb->retrieve(req->request()));
+        dr->set(fdb->retrieve(req->request(), true));
     });
 }
 int fdb_flush(fdb_handle_t* fdb) {
diff --git a/src/fdb5/database/ArchiveVisitor.cc b/src/fdb5/database/ArchiveVisitor.cc
index 77d4ba5ef..254a7453e 100644
--- a/src/fdb5/database/ArchiveVisitor.cc
+++ b/src/fdb5/database/ArchiveVisitor.cc
@@ -15,11 +15,6 @@
 #include "fdb5/database/Catalogue.h"
 #include "fdb5/database/Store.h"

-// namespace {
-// void CatalogueCallback(fdb5::CatalogueWriter* catalogue, const fdb5::InspectionKey &key, std::unique_ptr<fdb5::FieldLocation> fieldLocation) {
-//     catalogue->archive(key, std::move(fieldLocation));
-// }
-// }
 namespace fdb5 {

 ArchiveVisitor::ArchiveVisitor(Archiver &owner, const Key &dataKey, const void *data, size_t size) :
@@ -31,12 +26,10 @@ ArchiveVisitor::ArchiveVisitor(Archiver &owner, const Key &dataKey, const void *

 bool ArchiveVisitor::selectDatum(const InspectionKey &key, const Key &full) {

-    // eckit::Log::info() << "selectDatum " << key << ", " << full << " " << size_ << std::endl;
     checkMissingKeys(full);

-    const Key idxKey = current()->currentIndexKey();
+    const Key idxKey = catalogue()->currentIndexKey();

-    // store()->archive(idxKey, data_, size_, std::bind(&CatalogueCallback, current(), key, std::placeholders::_1));
-    store()->archive(idxKey, data_, size_, std::bind(&CatalogueWriter::archive, current(), key, std::placeholders::_1));
+    store()->archive(idxKey, data_, size_, std::bind(&CatalogueWriter::archive, catalogue(), key, std::placeholders::_1));

     return true;
 }
diff --git a/src/fdb5/database/Archiver.cc b/src/fdb5/database/Archiver.cc
index 98e988c81..ec5a19a80 100644
--- a/src/fdb5/database/Archiver.cc
+++ b/src/fdb5/database/Archiver.cc
@@ -26,8 +26,7 @@ namespace fdb5 {

 Archiver::Archiver(const Config& dbConfig) :
     dbConfig_(dbConfig),
-    catalogue_(nullptr),
-    store_(nullptr) {}
+    db_(nullptr) {}

 Archiver::~Archiver() {
     flush(); // certify that all sessions are flushed before closing them
@@ -56,8 +55,8 @@ void Archiver::archive(const Key &key, BaseArchiveVisitor& visitor) {

 void Archiver::flush() {
     for (auto i = databases_.begin(); i != databases_.end(); ++i) {
-        i->second.store_->flush();     // flush the store
-        i->second.catalogue_->flush(); // flush the catalogue
+        // flush the store, pass the number of flushed fields to the catalogue
+        i->second.catalogue_->flush(i->second.store_->flush());
     }
 }

@@ -66,8 +65,7 @@ void Archiver::selectDatabase(const Key &dbKey) {
     auto i = databases_.find(dbKey);

     if (i != databases_.end() ) {
-        catalogue_ = i->second.catalogue_.get();
-        store_ = i->second.store_.get();
+        db_ = &(i->second);
         i->second.time_ = ::time(0);
         return;
     }
@@ -86,8 +84,7 @@ void Archiver::selectDatabase(const Key &dbKey) {
             }
         }
         if (found) {
-            databases_[oldK].store_->flush();
-            databases_[oldK].catalogue_->flush();
+            databases_[oldK].catalogue_->flush(databases_[oldK].store_->flush());

             eckit::Log::info() << "Closing database " << *databases_[oldK].catalogue_ << std::endl;
             databases_.erase(oldK);
@@ -105,10 +102,7 @@ void Archiver::selectDatabase(const Key &dbKey) {
     }

     std::unique_ptr<Store> str = cat->buildStore();

-    catalogue_ = cat.get();
-    store_ = str.get();
-
-    databases_[dbKey] = Database{::time(0), std::move(cat), std::move(str)};
+    db_ = &(databases_[dbKey] = Database{::time(0), std::move(cat), std::move(str)});
 }

 void Archiver::print(std::ostream &out) const {
diff --git a/src/fdb5/database/Archiver.h b/src/fdb5/database/Archiver.h
index 0beab4267..2bbf637a5 100644
--- a/src/fdb5/database/Archiver.h
+++ b/src/fdb5/database/Archiver.h
@@ -80,8 +80,7 @@ class Archiver : public eckit::NonCopyable {

     std::vector prev_;

-    CatalogueWriter* catalogue_;
-    Store* store_;
+    Database* db_;
 };

 //----------------------------------------------------------------------------------------------------------------------
diff --git a/src/fdb5/database/BaseArchiveVisitor.cc b/src/fdb5/database/BaseArchiveVisitor.cc
index 9338d35c1..d5748d0ac 100644
--- a/src/fdb5/database/BaseArchiveVisitor.cc
+++ b/src/fdb5/database/BaseArchiveVisitor.cc
@@ -28,16 +28,14 @@ BaseArchiveVisitor::BaseArchiveVisitor(Archiver &owner, const Key &dataKey) :

 bool BaseArchiveVisitor::selectDatabase(const Key &dbKey, const Key&) {
     LOG_DEBUG_LIB(LibFdb5) << "selectDatabase " << dbKey << std::endl;
     owner_.selectDatabase(dbKey);
-    ASSERT(owner_.catalogue_);
-    owner_.catalogue_->deselectIndex();
+    catalogue()->deselectIndex();
     return true;
 }

 bool BaseArchiveVisitor::selectIndex(const Key &idxKey, const Key&) {
     // eckit::Log::info() << "selectIndex " << key << std::endl;
-    ASSERT(owner_.catalogue_);
-    return owner_.catalogue_->selectIndex(idxKey);
+    return catalogue()->selectIndex(idxKey);
 }

 void BaseArchiveVisitor::checkMissingKeys(const Key &full) {
@@ -47,16 +45,19 @@ void BaseArchiveVisitor::checkMissingKeys(const Key &full) {
 }

 const Schema& BaseArchiveVisitor::databaseSchema() const {
-    ASSERT(current());
-    return current()->schema();
+    return catalogue()->schema();
 }

-CatalogueWriter* BaseArchiveVisitor::current() const {
-    return owner_.catalogue_;
+CatalogueWriter* BaseArchiveVisitor::catalogue() const {
+    ASSERT(owner_.db_);
+    ASSERT(owner_.db_->catalogue_);
+    return owner_.db_->catalogue_.get();
 }

 Store* BaseArchiveVisitor::store() const {
-    return owner_.store_;
+    ASSERT(owner_.db_);
+    ASSERT(owner_.db_->store_);
+    return owner_.db_->store_.get();
 }

 //----------------------------------------------------------------------------------------------------------------------
diff --git a/src/fdb5/database/BaseArchiveVisitor.h b/src/fdb5/database/BaseArchiveVisitor.h
index c3d7dd071..8d8e55f1f 100644
--- a/src/fdb5/database/BaseArchiveVisitor.h
+++ b/src/fdb5/database/BaseArchiveVisitor.h
@@ -45,7 +45,7 @@ class BaseArchiveVisitor : public WriteVisitor {

     virtual const Schema& databaseSchema() const;

-    fdb5::CatalogueWriter* current() const;
+    fdb5::CatalogueWriter* catalogue() const;

     fdb5::Store* store() const;

 protected: // members
diff --git a/src/fdb5/database/Catalogue.h b/src/fdb5/database/Catalogue.h
index d8f050f74..17e90f8c8 100644
--- a/src/fdb5/database/Catalogue.h
+++ b/src/fdb5/database/Catalogue.h
@@ -88,7 +88,7 @@ class Catalogue {
     virtual std::string type() const = 0;

     virtual bool open() = 0;
-    virtual void flush() = 0;
+    virtual void flush(size_t archivedFields) = 0;
     virtual void clean() = 0;
     virtual void close() = 0;
diff --git a/src/fdb5/database/Store.h b/src/fdb5/database/Store.h
index ecf3e5695..f39a88018 100644
--- a/src/fdb5/database/Store.h
+++ b/src/fdb5/database/Store.h
@@ -46,7 +46,7 @@ class Store {
     virtual std::string type() const = 0;

     virtual bool open() = 0;
-    virtual void flush() = 0;
+    virtual size_t flush() = 0;
     virtual void close() = 0;

     // virtual std::string owner() const = 0;
@@ -58,7 +58,7 @@ class Store {
     virtual void remove(const Key& key) const { NOTIMP; }

     virtual eckit::URI uri() const = 0;
-    
+
 };
diff --git a/src/fdb5/io/FieldHandle.cc b/src/fdb5/io/FieldHandle.cc
index 625623128..f89ddefb4 100644
--- a/src/fdb5/io/FieldHandle.cc
+++ b/src/fdb5/io/FieldHandle.cc
@@ -26,15 +26,6 @@ namespace fdb5 {

 //----------------------------------------------------------------------------------------------------------------------

-class ListElementDeduplicator : public metkit::hypercube::Deduplicator<ListElement> {
-public:
-    bool toReplace(const ListElement& existing, const ListElement& replacement) const override {
-        return existing.timestamp() < replacement.timestamp();
-    }
-};
-
-//----------------------------------------------------------------------------------------------------------------------
-
 FieldHandle::FieldHandle(ListIterator& it) :
     datahandles_({}), totalSize_(0), currentIdx_(0), current_(nullptr), currentMemoryHandle_(false), buffer_(nullptr), sorted_(false), seekable_(true) {

     ListElement el;
@@ -179,6 +170,8 @@ long FieldHandle::read1(char* buffer, long length) {
 }

 long FieldHandle::read(void* buffer, long length) {
+    long requested = length;
+
     char* p = static_cast<char*>(buffer);

     long n = 0;
     long total = 0;
@@ -189,7 +182,7 @@
         p += n;
     }

-    LOG_DEBUG_LIB(LibFdb5) << "FieldHandle::read " << (total > 0 ? total : n) << std::endl;
+    LOG_DEBUG_LIB(LibFdb5) << "FieldHandle::read - requested: " << requested << " read: " << (total > 0 ? total : n) << std::endl;

     return total > 0 ? total : n;
 }
diff --git a/src/fdb5/io/FieldHandle.h b/src/fdb5/io/FieldHandle.h
index eb64ff47a..679279283 100644
--- a/src/fdb5/io/FieldHandle.h
+++ b/src/fdb5/io/FieldHandle.h
@@ -22,6 +22,15 @@ namespace fdb5 {

 //----------------------------------------------------------------------------------------------------------------------

+class ListElementDeduplicator : public metkit::hypercube::Deduplicator<ListElement> {
+public:
+    bool toReplace(const ListElement& existing, const ListElement& replacement) const override {
+        return existing.timestamp() < replacement.timestamp();
+    }
+};
+
+//----------------------------------------------------------------------------------------------------------------------
+
 class FieldHandle : public eckit::DataHandle {

 public:
diff --git a/src/fdb5/rados/RadosStore.cc b/src/fdb5/rados/RadosStore.cc
index 8f9cbadcb..cdc2ad274 100644
--- a/src/fdb5/rados/RadosStore.cc
+++ b/src/fdb5/rados/RadosStore.cc
@@ -29,15 +29,15 @@ namespace fdb5 {
 //----------------------------------------------------------------------------------------------------------------------

 RadosStore::RadosStore(const Key& key, const Config& config) :
-    Store(), directory_("mars:"+key.valuesToString()) {}
+    Store(), directory_("mars:"+key.valuesToString()), archivedFields_(0) {}

 RadosStore(const Key& key, const Config& config, const eckit::net::Endpoint& controlEndpoint) :
-    Store(), directory_("mars:"+key.valuesToString()) {
+    Store(), directory_("mars:"+key.valuesToString()), archivedFields_(0) {
     NOTIMP;
 }

 RadosStore::RadosStore(const eckit::URI& uri, const Config& config) :
-    Store(), directory_("mars:"+uri.path().dirName()) {}
+    Store(), directory_("mars:"+uri.path().dirName()), archivedFields_(0) {}

 eckit::URI RadosStore::uri() const {
     return URI("rados", directory_);
@@ -54,7 +54,7 @@ eckit::DataHandle* RadosStore::retrieve(Field& field, Key& remapKey) const {
 }

 std::unique_ptr<FieldLocation> RadosStore::archive(const uint32_t, const Key& key, const void *data, eckit::Length length) {
-    dirty_ = true;
+    archivedFields_++;

     eckit::PathName dataPath = getDataPath(key);
     eckit::URI dataUri("rados", dataPath);
@@ -70,16 +70,18 @@ std::unique_ptr<FieldLocation> RadosStore::archive(const uint32_t, const Key& ke
     return std::unique_ptr<FieldLocation>(new RadosFieldLocation(dataUri, position, length));
 }

-void RadosStore::flush() {
-    if (!dirty_) {
-        return;
+size_t RadosStore::flush() {
+    if (archivedFields_ == 0) {
+        return 0;
     }

     // ensure consistent state before writing Toc entry
     flushDataHandles();

-    dirty_ = false;
+    size_t out = archivedFields_;
+    archivedFields_ = 0;
+
+    return out;
 }

 void RadosStore::close() {
diff --git a/src/fdb5/rados/RadosStore.h b/src/fdb5/rados/RadosStore.h
index 4b03c31de..7bf8eba9e 100644
--- a/src/fdb5/rados/RadosStore.h
+++ b/src/fdb5/rados/RadosStore.h
@@ -39,7 +39,7 @@ class RadosStore : public Store {
     eckit::URI uri() const override;

     bool open() override { return true; }
-    void flush() override;
+    size_t flush() override;
     void close() override;

     void checkUID() const override { /* nothing to do */ }
@@ -78,8 +78,8 @@ class RadosStore : public Store {
     PathStore dataPaths_;

     eckit::PathName directory_;
-
-    mutable bool dirty_;
+
+    size_t archivedFields_;
 };

 //----------------------------------------------------------------------------------------------------------------------
diff --git a/src/fdb5/remote/FdbServer.cc b/src/fdb5/remote/FdbServer.cc
index c7f053634..e9e9d53be 100644
--- a/src/fdb5/remote/FdbServer.cc
+++ b/src/fdb5/remote/FdbServer.cc
@@ -47,9 +47,6 @@ void FDBForker::run() {

     eckit::Log::info() << "FDB forked pid " << ::getpid() << std::endl;

-    // ServerConnection handler(socket_, config_);
-    // handler.handle();
-
     if (config_.getString("type", "local") == "catalogue" || (::getenv("FDB_IS_CAT") && ::getenv("FDB_IS_CAT")[0] == '1')) {
         eckit::Log::info() << "FDB using Catalogue Handler" << std::endl;
         CatalogueHandler handler(socket_, config_);
diff --git a/src/fdb5/remote/client/RemoteCatalogue.cc b/src/fdb5/remote/client/RemoteCatalogue.cc
index 984c8861c..8c2596a53 100644
--- a/src/fdb5/remote/client/RemoteCatalogue.cc
+++ b/src/fdb5/remote/client/RemoteCatalogue.cc
@@ -60,9 +60,6 @@ void RemoteCatalogue::sendArchiveData(uint32_t id, const Key& key, std::unique_p

 void RemoteCatalogue::archive(const InspectionKey& key, std::unique_ptr<FieldLocation> fieldLocation) {

-    // eckit::Timer timer;
-    // timer.start();
-
     ASSERT(!key.empty());
     ASSERT(fieldLocation);
@@ -85,9 +82,6 @@ void RemoteCatalogue::archive(const InspectionKey& key, std::unique_ptr<FieldLocation> fieldLocation) {
     payloads.push_back(std::pair{buffer, stream.position()});
     dataWrite(Message::Blob, id, payloads);

-    // timer.stop();
-
-    // archivalStats_.addArchive(0, timer, 0);
 }

 bool RemoteCatalogue::selectIndex(const Key& idxKey) {
@@ -110,13 +104,12 @@ const Schema& RemoteCatalogue::schema() const {
     return *schema_;
 }

-void RemoteCatalogue::flush() {
+void RemoteCatalogue::flush(size_t archivedFields) {

-    // Timer timer;
+    std::lock_guard lock(archiveMutex_);

-    // timer.start();
+    ASSERT(archivedFields == numLocations_);

-    std::lock_guard lock(archiveMutex_);
     // Flush only does anything if there is an ongoing archive();
     if (numLocations_ > 0) {
@@ -132,9 +125,6 @@

         numLocations_ = 0;
     }
-
-    // timer.stop();
-    // internalStats_.addFlush(timer);
 }

 void RemoteCatalogue::clean() {NOTIMP;}
@@ -189,17 +179,6 @@ void RemoteCatalogue::handleException(std::exception_ptr e) {
     NOTIMP;
 }

-// Catalogue Reader
-// DbStats RemoteCatalogue::stats() const {
-//     NOTIMP;
-// }
-// bool RemoteCatalogue::axis(const std::string& keyword, eckit::StringSet& s) const {
-//     NOTIMP;
-// }
-// bool RemoteCatalogue::retrieve(const InspectionKey& key, Field& field) const{
-//     NOTIMP;
-// }
-
 void RemoteCatalogue::overlayDB(const Catalogue& otherCatalogue, const std::set<std::string>& variableKeys, bool unmount) {NOTIMP;}
 void RemoteCatalogue::index(const InspectionKey& key, const eckit::URI& uri, eckit::Offset offset, eckit::Length length) {NOTIMP;}
 void RemoteCatalogue::reconsolidate(){NOTIMP;}
@@ -214,7 +193,10 @@ void RemoteCatalogue::control(const ControlAction& action, const ControlIdentifi
 std::vector<Index> RemoteCatalogue::indexes(bool sorted) const {NOTIMP;}
 void RemoteCatalogue::maskIndexEntry(const Index& index) const {NOTIMP;}
 void RemoteCatalogue::allMasked(std::set<std::pair<eckit::URI, eckit::Offset>>& metadata, std::set<eckit::URI>& data) const {NOTIMP;}
-void RemoteCatalogue::print( std::ostream &out ) const {NOTIMP;}
+void RemoteCatalogue::print( std::ostream &out ) const {
+    out << "RemoteCatalogue(endpoint=" << controlEndpoint() << ",clientID=" << clientId() << ")";
+}
+
 std::string RemoteCatalogue::type() const {
     return "remote";
diff --git a/src/fdb5/remote/client/RemoteCatalogue.h b/src/fdb5/remote/client/RemoteCatalogue.h
index d4d15c043..08c573b61 100644
--- a/src/fdb5/remote/client/RemoteCatalogue.h
+++ b/src/fdb5/remote/client/RemoteCatalogue.h
@@ -53,7 +53,7 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C
     void print( std::ostream &out ) const override;

     std::string type() const override;
     bool open() override;
-    void flush() override;
+    void flush(size_t archivedFields) override;
     void clean() override;
     void close() override;
     bool exists() const override;
@@ -87,9 +87,6 @@ class RemoteCatalogue : public CatalogueReader, public CatalogueWriter, public C

     std::mutex archiveMutex_;
     size_t numLocations_;
-
-    // // not owning
-    // RemoteCatalogueArchiver* archiver_;
 };

 //----------------------------------------------------------------------------------------------------------------------
diff --git a/src/fdb5/remote/client/RemoteStore.cc b/src/fdb5/remote/client/RemoteStore.cc
index ac7cb8b03..6215bcfe6 100644
--- a/src/fdb5/remote/client/RemoteStore.cc
+++ b/src/fdb5/remote/client/RemoteStore.cc
@@ -87,7 +87,6 @@ class FDBRemoteDataHandle : public DataHandle {
         if (currentBuffer_.size() != 0) return bufferRead(pos, sz);

         // If we are in the DataHandle, then there MUST be data to read
-
         RemoteStore::StoredMessage msg = std::make_pair(remote::Message{}, eckit::Buffer{0});
         ASSERT(queue_.pop(msg) != -1);
@@ -183,14 +182,14 @@ RemoteStore::RemoteStore(const Key& dbKey, const Config& config) :
     Client(storeEndpoints(config)),
     dbKey_(dbKey), config_(config),
     retrieveMessageQueue_(eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200)),
-    dirty_(false), flushRequested_(false) {}
+    fieldsArchived_(0), locationsReceived_(0) {}

 // this is used only in retrieval, with an URI already referring to an accessible Store
 RemoteStore::RemoteStore(const eckit::URI& uri, const Config& config) :
     Client(eckit::net::Endpoint(uri.hostport()), uri.hostport()),
     dbKey_(Key()), config_(config),
     retrieveMessageQueue_(eckit::Resource<size_t>("fdbRemoteRetrieveQueueLength;$FDB_REMOTE_RETRIEVE_QUEUE_LENGTH", 200)),
-    dirty_(false), flushRequested_(false) {
+    fieldsArchived_(0), locationsReceived_(0) {

     // no need to set the local_ flag on the read path
     ASSERT(uri.scheme() == "fdb");
@@ -201,7 +200,7 @@ RemoteStore::~RemoteStore() {
     // an error. n.b. if we don't do something, we will block in the destructor
     // of std::future.
     if (archivalCompleted_.valid() || !locations_.empty()) {
-        Log::error() << "Attempting to destruct RemoteStore with active archival" << std::endl;
+        Log::error() << "Attempting to destruct RemoteStore with active archival - location to receive: " << locations_.size() << " archivalCompleted_.valid() " << archivalCompleted_.valid() << std::endl;
         eckit::Main::instance().terminate();
     }
 }
@@ -220,9 +219,6 @@ eckit::DataHandle* RemoteStore::retrieve(Field& field) const {

 void RemoteStore::archive(const Key& key, const void *data, eckit::Length length, std::function<void(std::unique_ptr<FieldLocation> fieldLocation)> catalogue_archive) {

-    eckit::Timer timer;
-    timer.start();
-
     ASSERT(!key.empty());
     ASSERT(data);
     ASSERT(length != 0);
@@ -230,16 +226,13 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length
     uint32_t id = connection_.generateRequestID();
     {
         std::lock_guard lock(archiveMutex_);
-        if (!dirty_) { // if this is the first archival request, notify the server
-            ASSERT(archivalStats_.numArchive() == 0);
-            ASSERT(!archivalCompleted_.valid());
+        if (fieldsArchived_ == 0) { // if this is the first archival request, notify the server
             ASSERT(locations_.size() == 0);
-            archivalCompleted_ = fieldLocationsReceived_.get_future();
             controlWriteCheckResponse(Message::Store, id, true);
-            dirty_=true;
         }
     }
+    fieldsArchived_++;

     {
         std::lock_guard lock(locationMutex_);
@@ -256,60 +249,57 @@ void RemoteStore::archive(const Key& key, const void *data, eckit::Length length
     payloads.push_back(std::pair{data, length});

     dataWrite(Message::Blob, id, payloads);
-
-    timer.stop();
-
-    archivalStats_.addArchive(length, timer, 1);
 }

 bool RemoteStore::open() {
     return true;
 }

-FDBStats RemoteStore::archivalCompleted() {
+size_t RemoteStore::flush() {

-    if (flushRequested_ && (archivalStats_.numArchive() == archivalStats_.numLocation()) && locations_.empty()) {
-        fieldLocationsReceived_.set_value(archivalStats_);
+    // Flush only does anything if there is an ongoing archive();
+    if (fieldsArchived_ == 0) {
+        return 0;
     }

-    FDBStats stats = archivalCompleted_.get();
-
-    ASSERT(locations_.empty());
-    archivalStats_ = FDBStats{};
-    return stats;
-}
-
-void RemoteStore::flush() {
-
-    Timer timer;
-
-    timer.start();
+    LOG_DEBUG_LIB(LibFdb5) << " RemoteStore::flush - fieldsArchived_ " << fieldsArchived_ << " locationsReceived_ " << locationsReceived_ << std::endl;

-    flushRequested_ = true;
-
-    // Flush only does anything if there is an ongoing archive();
-    std::lock_guard lock(archiveMutex_);
-    if (archivalCompleted_.valid()) {
+    size_t locations;
+    bool wait = true;
+    {
+        std::lock_guard lock(locationMutex_);
+        if ((fieldsArchived_ > locationsReceived_) || !locations_.empty()) {
+            promiseArchivalCompleted_ = std::promise<size_t>{};
+            archivalCompleted_ = promiseArchivalCompleted_.get_future();
+        } else {
+            wait = false;
+        }
+    }
+    if (wait) {
         // wait for archival completion (received all fieldLocations)
-        FDBStats stats = archivalCompleted();
-
-        if (stats.numArchive() > 0) {
-            Buffer sendBuf(1024);
-            MemoryStream s(sendBuf);
-            s << stats.numArchive();
+        archivalCompleted_.wait();
+        locations = archivalCompleted_.get();
+    } else {
+        locations = locationsReceived_;
+    }

-            LOG_DEBUG_LIB(LibFdb5) << " RemoteStore::flush - flushing " << stats.numArchive() << " fields" << std::endl;
-            // The flush call is blocking
-            uint32_t id = generateRequestID();
-            controlWriteCheckResponse(Message::Flush, id, false, sendBuf, s.position());
-        }
-        dirty_ = false;
+    ASSERT(locations_.empty());
+    if (locations > 0) {
+        Buffer sendBuf(1024);
+        MemoryStream s(sendBuf);
+        s << locations;
+
+        LOG_DEBUG_LIB(LibFdb5) << " RemoteStore::flush - flushing " << locations << " fields" << std::endl;
+        // The flush call is blocking
+        uint32_t id = generateRequestID();
+        controlWriteCheckResponse(Message::Flush, id, false, sendBuf, s.position());
     }

-    timer.stop();
-    flushRequested_ = false;
-    internalStats_.addFlush(timer);
+    fieldsArchived_ = 0;
+    locationsReceived_ = 0;
+
+    return locations;
 }

 void RemoteStore::close() {
@@ -382,10 +372,10 @@ bool RemoteStore::handle(Message message, bool control, uint32_t requestID, ecki
             }
             locations_.erase(it);
-            archivalStats_.addLocation();
+            locationsReceived_++;

-            if (flushRequested_ && (archivalStats_.numArchive() == archivalStats_.numLocation()) && locations_.empty()) {
-                fieldLocationsReceived_.set_value(archivalStats_);
+            if (archivalCompleted_.valid() && (fieldsArchived_ == locationsReceived_) && locations_.empty()) {
+                promiseArchivalCompleted_.set_value(locationsReceived_);
             }
             return true;
         }
diff --git a/src/fdb5/remote/client/RemoteStore.h b/src/fdb5/remote/client/RemoteStore.h
index e2be1661f..ec6f5e4c6 100644
--- a/src/fdb5/remote/client/RemoteStore.h
+++ b/src/fdb5/remote/client/RemoteStore.h
@@ -47,7 +47,7 @@ class RemoteStore : public Store, public Client {
     eckit::URI uri() const override;

     bool open() override;
-    void flush() override;
+    size_t flush() override;
     void close() override;

     void checkUID() const override { }
@@ -77,10 +77,6 @@ class RemoteStore : public Store, public Client {

 private: // methods

-    FDBStats archivalCompleted();
-
-    void flush(FDBStats& stats);
-
     // handlers for incoming messages - to be defined in the client class
     bool handle(Message message, bool control, uint32_t requestID) override;
     bool handle(Message message, bool control, uint32_t requestID, eckit::Buffer&& payload) override;
@@ -102,13 +98,11 @@ class RemoteStore : public Store, public Client {

     std::mutex locationMutex_;
     std::map<uint32_t, std::function<void(std::unique_ptr<FieldLocation> fieldLocation)>> locations_;

-    FDBStats internalStats_;
-    FDBStats archivalStats_;
-    std::promise<FDBStats> fieldLocationsReceived_;
-    std::future<FDBStats> archivalCompleted_;
+    size_t fieldsArchived_;
+    size_t locationsReceived_;
+    std::promise<size_t> promiseArchivalCompleted_;
+    std::future<size_t> archivalCompleted_;

     std::mutex archiveMutex_;
-    bool dirty_;
-    bool flushRequested_;
 };

 //----------------------------------------------------------------------------------------------------------------------
diff --git a/src/fdb5/remote/server/CatalogueHandler.cc b/src/fdb5/remote/server/CatalogueHandler.cc
index 51d00b9c8..b56effb2e 100644
--- a/src/fdb5/remote/server/CatalogueHandler.cc
+++ b/src/fdb5/remote/server/CatalogueHandler.cc
@@ -365,11 +365,14 @@ void CatalogueHandler::flush(uint32_t clientID, uint32_t requestID, eckit::Buffe
     ASSERT(it != catalogues_.end());

     it->second.locationsExpected = numArchived;
+    it->second.archivalCompleted = it->second.fieldLocationsReceived.get_future();
+
     if (it->second.locationsArchived < numArchived) {
-        it->second.archivalCompleted.get();
+        it->second.archivalCompleted.wait();
+        it->second.fieldLocationsReceived = std::promise<size_t>{};
     }

-    it->second.catalogue->flush();
+    it->second.catalogue->flush(numArchived);

     Log::info() << "Flush complete" << std::endl;
     Log::status() << "Flush complete" << std::endl;
@@ -403,7 +406,7 @@ void CatalogueHandler::archiveBlob(const uint32_t clientID, const uint32_t reque
         it->second.catalogue->selectIndex(idxKey);
         it->second.catalogue->archive(key, std::move(location));
         it->second.locationsArchived++;
-        if (it->second.locationsExpected > 0 && it->second.locationsExpected == it->second.locationsArchived) {
+        if (it->second.archivalCompleted.valid() && it->second.locationsExpected == it->second.locationsArchived) {
             it->second.fieldLocationsReceived.set_value(it->second.locationsExpected);
         }
     }
diff --git a/src/fdb5/remote/server/CatalogueHandler.h b/src/fdb5/remote/server/CatalogueHandler.h
index 90826144a..095711828 100644
--- a/src/fdb5/remote/server/CatalogueHandler.h
+++ b/src/fdb5/remote/server/CatalogueHandler.h
@@ -20,19 +20,17 @@ namespace fdb5::remote {

 struct CatalogueArchiver {
     CatalogueArchiver(bool dataConnection, const Key& dbKey, const Config& config) :
         controlConnection(true), dataConnection(dataConnection),
-        catalogue(CatalogueWriterFactory::instance().build(dbKey, config)), locationsExpected(-1), locationsArchived(0) {
-        archivalCompleted = fieldLocationsReceived.get_future();
-    }
+        catalogue(CatalogueWriterFactory::instance().build(dbKey, config)), locationsExpected(0), locationsArchived(0) {}

     bool controlConnection;
     bool dataConnection;

     std::unique_ptr<CatalogueWriter> catalogue;
-    int32_t locationsExpected;
-    int32_t locationsArchived;
+    size_t locationsExpected;
+    size_t locationsArchived;

-    std::promise<int32_t> fieldLocationsReceived;
-    std::future<int32_t> archivalCompleted;
+    std::promise<size_t> fieldLocationsReceived;
+    std::future<size_t> archivalCompleted;
 };

 //----------------------------------------------------------------------------------------------------------------------
diff --git a/src/fdb5/toc/AdoptVisitor.cc b/src/fdb5/toc/AdoptVisitor.cc
index 254272ea8..3988d3b9a 100644
--- a/src/fdb5/toc/AdoptVisitor.cc
+++ b/src/fdb5/toc/AdoptVisitor.cc
@@ -35,11 +35,11 @@ bool AdoptVisitor::selectDatum(const InspectionKey &key, const Key &full) {
     // Log::info() << "selectDatum " << key << ", " << full << " " << length_ << std::endl;
     checkMissingKeys(full);

-    CatalogueWriter* catalogue = current();
-    ASSERT(catalogue);
+    CatalogueWriter* cat = catalogue();
+    ASSERT(cat);

-    if (catalogue->type() == TocEngine::typeName()) {
-        catalogue->index(key, eckit::URI("file", path_), offset_, length_);
+    if (cat->type() == TocEngine::typeName()) {
+        cat->index(key, eckit::URI("file", path_), offset_, length_);
         return true;
     }
     return false;
diff --git a/src/fdb5/toc/TocCatalogueReader.h b/src/fdb5/toc/TocCatalogueReader.h
index d0eef481b..6068a1839 100644
--- a/src/fdb5/toc/TocCatalogueReader.h
+++ b/src/fdb5/toc/TocCatalogueReader.h
@@ -43,7 +43,7 @@ class TocCatalogueReader : public TocCatalogue, public CatalogueReader {
     void deselectIndex() override;

     bool open() override;
-    void flush() override {}
+    void flush(size_t archivedFields) override {}
     void clean() override {}
     void close() override;
diff --git a/src/fdb5/toc/TocCatalogueWriter.cc b/src/fdb5/toc/TocCatalogueWriter.cc
index a58c091cb..cbc600bdc 100644
--- a/src/fdb5/toc/TocCatalogueWriter.cc
+++ b/src/fdb5/toc/TocCatalogueWriter.cc
@@ -33,7 +33,8 @@ namespace fdb5 {

 TocCatalogueWriter::TocCatalogueWriter(const Key &dbKey, const fdb5::Config& config) :
     TocCatalogue(dbKey, config),
-    umask_(config.umask()) {
+    umask_(config.umask()),
+    archivedLocations_(0) {
     writeInitRecord(dbKey);
     TocCatalogue::loadSchema();
     TocCatalogue::checkUID();
@@ -41,7 +42,8 @@ TocCatalogueWriter::TocCatalogueWriter(const Key &dbKey, const fdb5::Config& con

 TocCatalogueWriter::TocCatalogueWriter(const eckit::URI &uri, const fdb5::Config& config) :
     TocCatalogue(uri.path(), ControlIdentifiers{}, config),
-    umask_(config.umask()) {
+    umask_(config.umask()),
+    archivedLocations_(0) {
     writeInitRecord(TocCatalogue::key());
     TocCatalogue::loadSchema();
     TocCatalogue::checkUID();
@@ -110,7 +112,7 @@ void TocCatalogueWriter::clean() {

     LOG_DEBUG_LIB(LibFdb5) << "Closing path " << directory_ << std::endl;

-    flush(); // closes the TOC entries & indexes but not data files
+    flush(archivedLocations_); // closes the TOC entries & indexes but not data files

     compactSubTocIndexes();

@@ -123,7 +125,7 @@ void TocCatalogueWriter::close() {
 }

 void TocCatalogueWriter::index(const InspectionKey &key, const eckit::URI &uri, eckit::Offset offset, eckit::Length length) {
-    dirty_ = true;
+    archivedLocations_++;

     if (current_.null()) {
         ASSERT(!currentIndexKey_.empty());
@@ -301,7 +303,7 @@ bool TocCatalogueWriter::enabled(const ControlIdentifier& controlIdentifier) con
 }

 void TocCatalogueWriter::archive(const InspectionKey& key, std::unique_ptr<FieldLocation> fieldLocation) {
-    dirty_ = true;
+    archivedLocations_++;

     if (current_.null()) {
         ASSERT(!currentIndexKey_.empty());
@@ -316,14 +318,16 @@ void TocCatalogueWriter::archive(const InspectionKey& key, std::unique_ptr<FieldLocation> fieldLocation) {
diff --git a/src/fdb5/toc/TocCommon.cc b/src/fdb5/toc/TocCommon.cc
--- a/src/fdb5/toc/TocCommon.cc
+++ b/src/fdb5/toc/TocCommon.cc
     dbUID_(static_cast<uid_t>(-1)),
-    userUID_(::getuid()),
-    dirty_(false) {}
+    userUID_(::getuid()) {}

 void TocCommon::checkUID() const {
     static bool fdbOnlyCreatorCanWrite = eckit::Resource<bool>("fdbOnlyCreatorCanWrite", true);
diff --git a/src/fdb5/toc/TocCommon.h b/src/fdb5/toc/TocCommon.h
index d6caf21b4..6d967207a 100644
--- a/src/fdb5/toc/TocCommon.h
+++ b/src/fdb5/toc/TocCommon.h
@@ -50,8 +50,6 @@ class TocCommon {

     mutable uid_t dbUID_;
     uid_t userUID_;
-
-    mutable bool dirty_;
 };

 }
diff --git a/src/fdb5/toc/TocHandler.cc b/src/fdb5/toc/TocHandler.cc
index 71d90c866..b5cdd040e 100644
--- a/src/fdb5/toc/TocHandler.cc
+++ b/src/fdb5/toc/TocHandler.cc
@@ -131,7 +131,8 @@ TocHandler::TocHandler(const eckit::PathName& directory, const Config& config) :
     cachedToc_(nullptr),
     count_(0),
     enumeratedMaskedEntries_(false),
-    writeMode_(false)
+    writeMode_(false),
+    dirty_(false)
 {
     // An override to enable using sub tocs without configurations being passed in, for ease
     // of debugging
@@ -154,7 +155,8 @@ TocHandler::TocHandler(const eckit::PathName& path, const Key& parentKey) :
     cachedToc_(nullptr),
     count_(0),
     enumeratedMaskedEntries_(false),
-    writeMode_(false)
+    writeMode_(false),
+    dirty_(false)
 {
     /// Are we remapping a mounted DB?
     if (exists()) {
diff --git a/src/fdb5/toc/TocHandler.h b/src/fdb5/toc/TocHandler.h
index d55d13789..7b5c2281c 100644
--- a/src/fdb5/toc/TocHandler.h
+++ b/src/fdb5/toc/TocHandler.h
@@ -262,6 +262,8 @@ class TocHandler : public TocCommon, private eckit::NonCopyable {

     mutable bool enumeratedMaskedEntries_;
     mutable bool writeMode_;
+
+    mutable bool dirty_;
 };
diff --git a/src/fdb5/toc/TocStore.cc b/src/fdb5/toc/TocStore.cc
index 973aab34a..1cbcd164f 100644
--- a/src/fdb5/toc/TocStore.cc
+++ b/src/fdb5/toc/TocStore.cc
@@ -35,10 +35,10 @@ namespace fdb5 {
 //----------------------------------------------------------------------------------------------------------------------

 TocStore::TocStore(const Key& key, const Config& config) :
-    Store(), TocCommon(StoreRootManager(config).directory(key).directory_) {}
+    Store(), TocCommon(StoreRootManager(config).directory(key).directory_), archivedFields_(0) {}

 TocStore::TocStore(const eckit::URI& uri, const Config& config) :
-    Store(), TocCommon(uri.path().dirName()) {}
+    Store(), TocCommon(uri.path().dirName()), archivedFields_(0) {}

 eckit::URI TocStore::uri() const {
     return URI("file", directory_);
@@ -53,8 +53,7 @@ eckit::DataHandle* TocStore::retrieve(Field& field) const {
 }

 std::unique_ptr<FieldLocation> TocStore::archive(const Key& key, const void *data, eckit::Length length) {
-
-    dirty_ = true;
+    archivedFields_++;

     eckit::PathName dataPath = getDataPath(key);

@@ -69,16 +68,18 @@ std::unique_ptr<FieldLocation> TocStore::archive(const Key& key, const void *dat
     return std::unique_ptr<FieldLocation>(new TocFieldLocation(dataPath, position, length, Key()));
 }

-void TocStore::flush() {
-    if (!dirty_) {
-        return;
+size_t TocStore::flush() {
+    if (archivedFields_ == 0) {
+        return 0;
     }

     // ensure consistent state before writing Toc entry
-    flushDataHandles();

-    dirty_ = false;
+    size_t out = archivedFields_;
+    archivedFields_ = 0;
+
+    return out;
 }

 void TocStore::close() {
diff --git a/src/fdb5/toc/TocStore.h b/src/fdb5/toc/TocStore.h
index 30d7be60f..2c972a6a7 100644
--- a/src/fdb5/toc/TocStore.h
+++ b/src/fdb5/toc/TocStore.h
@@ -42,7 +42,7 @@ class TocStore : public Store, public TocCommon {
     eckit::URI uri() const override;

     bool open() override { return true; }
-    void flush() override;
+    size_t flush() override;
     void close() override;

     void checkUID() const override { TocCommon::checkUID(); }
@@ -84,7 +84,7 @@ class TocStore : public Store, public TocCommon {
     HandleStore handles_;    ///< stores the DataHandles being used by the Session

     mutable PathStore dataPaths_;
-    
+    size_t archivedFields_;
 };

 //----------------------------------------------------------------------------------------------------------------------
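
Usage note (not part of the patch): the read-side change threads a seekable flag from fdb_retrieve through FDB::retrieve and FDB::read, returning a FieldHandle when seeking is required and falling back to the gathered, optionally de-duplicated handles otherwise. The sketch below shows how a caller might drive the new signature; it assumes only the FDB::retrieve(request, seekable) declaration from this diff plus the standard eckit::DataHandle interface, and the readAll helper is a made-up name for illustration.

// Sketch only: exercising the new seekable flag of FDB::retrieve.
// readAll is a hypothetical helper; the MarsRequest is assumed to be built by the caller.
#include <memory>
#include <vector>

#include "eckit/io/DataHandle.h"
#include "fdb5/api/FDB.h"
#include "metkit/mars/MarsRequest.h"

std::vector<char> readAll(fdb5::FDB& fdb, const metkit::mars::MarsRequest& readRequest, bool seekable) {
    // seekable == true yields a FieldHandle (the path used by fdb_retrieve in the C API);
    // seekable == false keeps the previous HandleGatherer behaviour.
    std::unique_ptr<eckit::DataHandle> dh(fdb.retrieve(readRequest, seekable));

    std::vector<char> data(static_cast<size_t>(dh->openForRead()));
    dh->read(data.data(), static_cast<long>(data.size()));
    dh->close();
    return data;
}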
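The write-side change replaces the dirty_ flags with field counts: Store::flush() now returns how many fields were archived since the last flush, and Catalogue::flush(size_t) receives that count so the catalogue can check it against the locations it has archived (see Archiver::flush and RemoteCatalogue::flush above). A minimal, self-contained sketch of that contract, using hypothetical Toy* classes rather than the real fdb5 Store/Catalogue hierarchy:

// Sketch only: mirrors the size_t flush() / flush(size_t) contract introduced above,
// with hypothetical Toy* classes instead of the real fdb5 interfaces.
#include <cassert>
#include <cstddef>
#include <iostream>

class ToyStore {
public:
    void archive() { ++archivedFields_; }   // counts archived fields, like TocStore::archive
    std::size_t flush() {                   // returns the count and resets it
        std::size_t out = archivedFields_;
        archivedFields_ = 0;
        return out;
    }
private:
    std::size_t archivedFields_ = 0;
};

class ToyCatalogue {
public:
    void archive() { ++numLocations_; }     // one location indexed per archived field
    void flush(std::size_t archivedFields) {
        // the catalogue verifies the store's count against its own, as RemoteCatalogue::flush does
        assert(archivedFields == numLocations_);
        std::cout << "flushing " << numLocations_ << " locations" << std::endl;
        numLocations_ = 0;
    }
private:
    std::size_t numLocations_ = 0;
};

int main() {
    ToyStore store;
    ToyCatalogue catalogue;
    for (int i = 0; i < 3; ++i) {
        store.archive();
        catalogue.archive();
    }
    // same ordering as Archiver::flush: flush the store first, hand its count to the catalogue
    catalogue.flush(store.flush());
    return 0;
}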