Skip to content

Commit

Permalink
fixed handler termination (closed archivalQueue)
Browse files Browse the repository at this point in the history
  • Loading branch information
danovaro committed Mar 12, 2024
1 parent c89ada3 commit 4fb2629
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 51 deletions.
19 changes: 0 additions & 19 deletions src/fdb5/remote/FdbServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,12 @@ void FDBForker::run() {
eckit::Log::info() << "FDB using Catalogue Handler" << std::endl;
CatalogueHandler handler(socket_, config_);
handler.handle();
stop();
}
else if (config_.getString("type", "local") == "store" || (::getenv("FDB_IS_STORE") && ::getenv("FDB_IS_STORE")[0] == '1')) {
eckit::Log::info() << "FDB using Store Handler" << std::endl;
StoreHandler handler(socket_, config_);
handler.handle();
stop();
}
// else {
// eckit::Log::info() << "FDB using Remote Handler" << std::endl;
// RemoteHandler handler(socket_, config_);
// handler.handle();
// }
}

//----------------------------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -98,12 +91,8 @@ FDBServerThread::FDBServerThread(net::TCPSocket& socket, const Config& config) :
config_(config) {}

void FDBServerThread::run() {
std::cout << "FDBServerThread::run()" << std::endl;
eckit::Log::info() << "FDB started handler thread" << std::endl;

// ServerConnection handler(socket_, config_);
// handler.handle();

if (config_.getString("type", "local") == "catalogue" || (::getenv("FDB_IS_CAT") && ::getenv("FDB_IS_CAT")[0] == '1')) {
eckit::Log::info() << "FDB using Catalogue Handler" << std::endl;
CatalogueHandler handler(socket_, config_);
Expand All @@ -114,14 +103,6 @@ void FDBServerThread::run() {
StoreHandler handler(socket_, config_);
handler.handle();
}
// else {
// eckit::Log::info() << "FDB using Remote Handler" << std::endl;
// RemoteHandler handler(socket_, config_);
// handler.handle();
// }

// // RemoteHandler handler(socket_, config_);
// // handler.handle();
}

//----------------------------------------------------------------------------------------------------------------------
Expand Down
34 changes: 2 additions & 32 deletions src/fdb5/remote/server/ServerConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,6 @@ ServerConnection::~ServerConnection() {
// We don't want to die before the worker threads are cleaned up
waitForWorkers();

std::this_thread::sleep_for(std::chrono::milliseconds(500));

// And notify the client that we are done.
// eckit::Log::info() << "Sending exit message to client" << std::endl;
// // write(Message::Exit, true, 0, 0);
// write(Message::Exit, false, 0, 0);
eckit::Log::info() << "Done" << std::endl;
}

Expand Down Expand Up @@ -426,32 +420,14 @@ void ServerConnection::listeningThreadLoopData() {
}
}

// // Trigger cleanup of the workers
// auto q = archiveQueues_.find(archiverID);
// ASSERT(q != archiveQueues_.end());
// q->second.close();

// auto w = archiveFuture_.find(archiverID);
// ASSERT(w != archiveFuture_.end());
// // Ensure worker is done
// ASSERT(w->second.valid());
// totalArchived = worker.get(); // n.b. use of async, get() propagates any exceptions.
}
catch (std::exception& e) {
// n.b. more general than eckit::Exception
error(e.what(), hdr.clientID(), hdr.requestID);
// auto q = archiveQueues_.find(archiverID);
// if(q != archiveQueues_.end()) {
// q->second.interrupt(std::current_exception());
// }
throw;
}
catch (...) {
error("Caught unexpected, unknown exception in retrieve worker", hdr.clientID(), hdr.requestID);
// auto q = archiveQueues_.find(archiverID);
// if(q != archiveQueues_.end()) {
// q->second.interrupt(std::current_exception());
// }
throw;
}
}
Expand All @@ -460,9 +436,6 @@ void ServerConnection::handle() {
initialiseConnections();

std::thread listeningThreadData;
// if (!single_) {
// listeningThreadData = std::thread([this] { listeningThreadLoopData(); });
// }

MessageHeader hdr;

Expand Down Expand Up @@ -556,6 +529,8 @@ void ServerConnection::handle() {
if (listeningThreadData.joinable()) {
listeningThreadData.join();
}
ASSERT(archiveQueue_.empty());
archiveQueue_.close();
}

void ServerConnection::handleException(std::exception_ptr e) {
Expand Down Expand Up @@ -600,11 +575,6 @@ void ServerConnection::archiver() {
// Start archive worker thread
archiveFuture_ = std::async(std::launch::async, [this] { return archiveThreadLoop(); });
}

// // Start data reader thread if double connection and we aren't already running it
// if (!single_ && !dataReader_.valid()) {
// dataReader_ = std::async(std::launch::async, [this] { return listeningThreadLoopData(); });
// }
}

void ServerConnection::waitForWorkers() {
Expand Down

0 comments on commit 4fb2629

Please sign in to comment.