From 7ed31b3d266be7cebe13fedffddac6455f974409 Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Mon, 30 Oct 2023 22:55:17 +0100 Subject: [PATCH] [#3] current number of publishers|subscribers added to publish_subscribe port factory and removed from ports Signed-off-by: Christian Eltzschig --- .../src/port/details/publisher_connections.rs | 10 --- .../port/details/subscriber_connections.rs | 15 +--- elkodon/src/port/publisher.rs | 4 - elkodon/src/port/subscriber.rs | 4 - .../dynamic_config/publish_subscribe.rs | 8 ++ .../service/port_factory/publish_subscribe.rs | 20 +++++ elkodon/tests/publisher_tests.rs | 17 ---- .../tests/service_publish_subscribe_tests.rs | 78 +++++++++++++++++++ elkodon/tests/subscriber_tests.rs | 44 ----------- elkodon_bb/lock_free/src/mpmc/container.rs | 10 +++ .../lock_free/src/mpmc/unique_index_set.rs | 22 +++++- .../tests/mpmc_unique_index_set_tests.rs | 18 +++++ 12 files changed, 154 insertions(+), 96 deletions(-) delete mode 100644 elkodon/tests/subscriber_tests.rs diff --git a/elkodon/src/port/details/publisher_connections.rs b/elkodon/src/port/details/publisher_connections.rs index f8a55f5..004306e 100644 --- a/elkodon/src/port/details/publisher_connections.rs +++ b/elkodon/src/port/details/publisher_connections.rs @@ -122,16 +122,6 @@ impl<'global_config, Service: service::Details<'global_config>> Ok(()) } - pub fn number_of_publishers(&self) -> usize { - self.connections - .iter() - .filter(|&connection| match unsafe { &*connection.get() } { - None => false, - Some(c) => c.receiver.is_connected(), - }) - .count() - } - pub(crate) fn remove(&self, index: usize) { *self.get_mut(index) = None; } diff --git a/elkodon/src/port/details/subscriber_connections.rs b/elkodon/src/port/details/subscriber_connections.rs index f010796..b072584 100644 --- a/elkodon/src/port/details/subscriber_connections.rs +++ b/elkodon/src/port/details/subscriber_connections.rs @@ -3,7 +3,7 @@ use std::cell::UnsafeCell; use elkodon_bb_log::fail; use elkodon_cal::named_concept::NamedConceptBuilder; use elkodon_cal::zero_copy_connection::{ - ZeroCopyConnection, ZeroCopyConnectionBuilder, ZeroCopyCreationError, ZeroCopyPortDetails, + ZeroCopyConnection, ZeroCopyConnectionBuilder, ZeroCopyCreationError, }; use crate::service::connection_config; @@ -96,19 +96,6 @@ impl<'global_config, Service: service::Details<'global_config>> } } - pub(crate) fn number_of_subscribers(&self) -> usize { - self.connections - .iter() - .filter(|&connection| { - let connection = unsafe { &*connection.get() }; - match connection { - None => false, - Some(c) => c.sender.is_connected(), - } - }) - .count() - } - pub(crate) fn len(&self) -> usize { self.connections.len() } diff --git a/elkodon/src/port/publisher.rs b/elkodon/src/port/publisher.rs index 62bc6c4..a12d494 100644 --- a/elkodon/src/port/publisher.rs +++ b/elkodon/src/port/publisher.rs @@ -417,10 +417,6 @@ impl<'a, 'global_config: 'a, Service: service::Details<'global_config>, MessageT Ok(()) } - pub fn number_of_subscribers(&self) -> usize { - self.subscriber_connections.number_of_subscribers() - } - pub fn send<'publisher>( &'publisher self, sample: SampleMut<'a, 'publisher, 'global_config, Service, Header, MessageType>, diff --git a/elkodon/src/port/subscriber.rs b/elkodon/src/port/subscriber.rs index 93a385f..6ec7213 100644 --- a/elkodon/src/port/subscriber.rs +++ b/elkodon/src/port/subscriber.rs @@ -249,8 +249,4 @@ impl<'a, 'global_config: 'a, Service: service::Details<'global_config>, MessageT Ok(()) } - - pub fn number_of_publishers(&self) -> usize { - self.publisher_connections.number_of_publishers() - } } diff --git a/elkodon/src/service/dynamic_config/publish_subscribe.rs b/elkodon/src/service/dynamic_config/publish_subscribe.rs index da5fd13..972bd39 100644 --- a/elkodon/src/service/dynamic_config/publish_subscribe.rs +++ b/elkodon/src/service/dynamic_config/publish_subscribe.rs @@ -39,6 +39,14 @@ impl DynamicConfig { + Container::::memory_size(config.number_of_publishers) } + pub fn current_number_of_publishers(&self) -> usize { + self.publishers.len() + } + + pub fn current_number_of_subscribers(&self) -> usize { + self.subscribers.len() + } + pub fn number_of_supported_publishers(&self) -> usize { self.publishers.capacity() } diff --git a/elkodon/src/service/port_factory/publish_subscribe.rs b/elkodon/src/service/port_factory/publish_subscribe.rs index 3674fc7..c277bed 100644 --- a/elkodon/src/service/port_factory/publish_subscribe.rs +++ b/elkodon/src/service/port_factory/publish_subscribe.rs @@ -1,5 +1,7 @@ use std::{fmt::Debug, marker::PhantomData}; +use elkodon_cal::dynamic_storage::DynamicStorage; + use crate::service; use crate::service::service_name::ServiceName; @@ -40,6 +42,24 @@ impl<'global_config, Service: service::Details<'global_config>, MessageType: Deb self.service.state().static_config.service_name() } + pub fn current_number_of_publishers(&self) -> usize { + self.service + .state() + .dynamic_storage + .get() + .publish_subscribe() + .current_number_of_publishers() + } + + pub fn current_number_of_subscribers(&self) -> usize { + self.service + .state() + .dynamic_storage + .get() + .publish_subscribe() + .current_number_of_subscribers() + } + pub fn max_supported_publishers(&self) -> usize { self.service .state() diff --git a/elkodon/tests/publisher_tests.rs b/elkodon/tests/publisher_tests.rs index 1986559..a7b8a5f 100644 --- a/elkodon/tests/publisher_tests.rs +++ b/elkodon/tests/publisher_tests.rs @@ -26,23 +26,6 @@ mod publisher { service } - #[test] - fn number_of_subscriber_works() { - let service_name = generate_name(); - let service = Sut::new(&service_name) - .publish_subscribe() - .create::() - .unwrap(); - - let sut = service.publisher().create().unwrap(); - assert_that!(sut.number_of_subscribers(), eq 0); - - let _subscriber = service.subscriber().create(); - assert_that!(sut.number_of_subscribers(), eq 0); - assert_that!(sut.update_connections(), is_ok); - assert_that!(sut.number_of_subscribers(), eq 1); - } - #[test] fn publisher_can_borrow_multiple_sample_at_once() { let service_name = generate_name(); diff --git a/elkodon/tests/service_publish_subscribe_tests.rs b/elkodon/tests/service_publish_subscribe_tests.rs index 5da8c5d..bd89317 100644 --- a/elkodon/tests/service_publish_subscribe_tests.rs +++ b/elkodon/tests/service_publish_subscribe_tests.rs @@ -357,6 +357,84 @@ mod service_publish_subscribe { assert_that!(sut2.subscriber_buffer_size(), eq 13); } + #[test] + fn number_of_publishers_works() { + let service_name = generate_name(); + const MAX_PUBLISHERS: usize = 2; + + let sut = Sut::new(&service_name) + .publish_subscribe() + .max_publishers(MAX_PUBLISHERS) + .create::() + .unwrap(); + + let sut2 = Sut::new(&service_name) + .publish_subscribe() + .open::() + .unwrap(); + + let mut publishers = vec![]; + + for i in 0..MAX_PUBLISHERS / 2 { + publishers.push(sut.publisher().create().unwrap()); + assert_that!(sut.current_number_of_publishers(), eq 2 * i + 1); + assert_that!(sut2.current_number_of_publishers(), eq 2 * i + 1); + assert_that!(sut.current_number_of_subscribers(), eq 0); + assert_that!(sut2.current_number_of_subscribers(), eq 0); + + publishers.push(sut2.publisher().create().unwrap()); + assert_that!(sut.current_number_of_publishers(), eq 2 * i + 2); + assert_that!(sut2.current_number_of_publishers(), eq 2 * i + 2); + assert_that!(sut.current_number_of_subscribers(), eq 0); + assert_that!(sut2.current_number_of_subscribers(), eq 0); + } + + for i in 0..MAX_PUBLISHERS { + publishers.pop(); + assert_that!(sut.current_number_of_publishers(), eq MAX_PUBLISHERS - i - 1); + assert_that!(sut2.current_number_of_publishers(), eq MAX_PUBLISHERS - i - 1); + } + } + + #[test] + fn number_of_subscribers_works() { + let service_name = generate_name(); + const MAX_SUBSCRIBERS: usize = 8; + + let sut = Sut::new(&service_name) + .publish_subscribe() + .max_subscribers(MAX_SUBSCRIBERS) + .create::() + .unwrap(); + + let sut2 = Sut::new(&service_name) + .publish_subscribe() + .open::() + .unwrap(); + + let mut subscribers = vec![]; + + for i in 0..MAX_SUBSCRIBERS / 2 { + subscribers.push(sut.subscriber().create().unwrap()); + assert_that!(sut.current_number_of_subscribers(), eq 2 * i + 1); + assert_that!(sut2.current_number_of_subscribers(), eq 2 * i + 1); + assert_that!(sut.current_number_of_publishers(), eq 0); + assert_that!(sut2.current_number_of_publishers(), eq 0); + + subscribers.push(sut2.subscriber().create().unwrap()); + assert_that!(sut.current_number_of_subscribers(), eq 2 * i + 2); + assert_that!(sut2.current_number_of_subscribers(), eq 2 * i + 2); + assert_that!(sut.current_number_of_publishers(), eq 0); + assert_that!(sut2.current_number_of_publishers(), eq 0); + } + + for i in 0..MAX_SUBSCRIBERS { + subscribers.pop(); + assert_that!(sut.current_number_of_subscribers(), eq MAX_SUBSCRIBERS - i - 1); + assert_that!(sut2.current_number_of_subscribers(), eq MAX_SUBSCRIBERS - i - 1); + } + } + #[test] fn simple_communication_works_subscriber_created_first() { let service_name = generate_name(); diff --git a/elkodon/tests/subscriber_tests.rs b/elkodon/tests/subscriber_tests.rs deleted file mode 100644 index 8c3aef0..0000000 --- a/elkodon/tests/subscriber_tests.rs +++ /dev/null @@ -1,44 +0,0 @@ -#[generic_tests::define] -mod subscriber { - use elkodon::service::{service_name::ServiceName, Service}; - use elkodon_bb_container::semantic_string::*; - use elkodon_bb_posix::unique_system_id::UniqueSystemId; - use elkodon_bb_testing::assert_that; - - fn generate_name() -> ServiceName { - let mut service = ServiceName::new(b"service_tests_").unwrap(); - service - .push_bytes( - UniqueSystemId::new() - .unwrap() - .value() - .to_string() - .as_bytes(), - ) - .unwrap(); - service - } - - #[test] - fn number_of_publishers_works() { - let service_name = generate_name(); - let service = Sut::new(&service_name) - .publish_subscribe() - .create::() - .unwrap(); - - let sut = service.subscriber().create().unwrap(); - assert_that!(sut.number_of_publishers(), eq 0); - - let _publisher = service.publisher().create(); - assert_that!(sut.number_of_publishers(), eq 0); - assert_that!(sut.update_connections(), is_ok); - assert_that!(sut.number_of_publishers(), eq 1); - } - - #[instantiate_tests()] - mod zero_copy {} - - #[instantiate_tests()] - mod process_local {} -} diff --git a/elkodon_bb/lock_free/src/mpmc/container.rs b/elkodon_bb/lock_free/src/mpmc/container.rs index eb09e6e..1e94e2f 100644 --- a/elkodon_bb/lock_free/src/mpmc/container.rs +++ b/elkodon_bb/lock_free/src/mpmc/container.rs @@ -220,6 +220,16 @@ impl Container { self.capacity } + /// Returns the current len of the container + pub fn len(&self) -> usize { + self.index_set.borrowed_indices() + } + + /// Returns true if the container is empty, otherwise false + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Adds a new element to the [`Container`]. If there is no more space available it returns /// [`None`], otherwise [`Some`] containing the [`UniqueIndex`] to the underlying element. /// If the [`UniqueIndex`] goes out of scope the added element is removed. diff --git a/elkodon_bb/lock_free/src/mpmc/unique_index_set.rs b/elkodon_bb/lock_free/src/mpmc/unique_index_set.rs index 6e5ce1e..336f14c 100644 --- a/elkodon_bb/lock_free/src/mpmc/unique_index_set.rs +++ b/elkodon_bb/lock_free/src/mpmc/unique_index_set.rs @@ -78,7 +78,7 @@ use elkodon_bb_log::{fail, fatal_panic}; use std::alloc::Layout; use std::cell::UnsafeCell; use std::fmt::Debug; -use std::sync::atomic::{fence, AtomicBool, AtomicU64, Ordering}; +use std::sync::atomic::{fence, AtomicBool, AtomicU64, AtomicUsize, Ordering}; use tiny_fn::tiny_fn; tiny_fn! { @@ -186,6 +186,7 @@ impl Drop for UniqueIndex<'_> { pub struct UniqueIndexSet { data_ptr: RelocatablePointer>, capacity: u32, + borrowed_indices: AtomicUsize, pub(crate) head: AtomicU64, is_memory_initialized: AtomicBool, } @@ -198,6 +199,7 @@ impl RelocatableContainer for UniqueIndexSet { Self { data_ptr: RelocatablePointer::new_uninit(), capacity: capacity as u32, + borrowed_indices: AtomicUsize::new(0), head: AtomicU64::new(0), is_memory_initialized: AtomicBool::new(false), } @@ -229,6 +231,7 @@ impl RelocatableContainer for UniqueIndexSet { Self { data_ptr: RelocatablePointer::new(distance_to_data), capacity: capacity as u32, + borrowed_indices: AtomicUsize::new(0), head: AtomicU64::new(0), is_memory_initialized: AtomicBool::new(true), } @@ -293,6 +296,11 @@ impl UniqueIndexSet { self.capacity } + /// Returns the current len. + pub fn borrowed_indices(&self) -> usize { + self.borrowed_indices.load(Ordering::Relaxed) + } + /// Acquires a raw ([`u32`]) index from the [`UniqueIndexSet`]. Returns [`None`] when no more /// indices are available. The index **must** be returned manually with /// [`UniqueIndexSet::release_raw_index()`]. @@ -354,7 +362,7 @@ impl UniqueIndexSet { *self.get_next_free_index(index) = self.capacity + 1; fence(Ordering::Acquire); - + self.borrowed_indices.fetch_add(1, Ordering::Relaxed); Some(index) } @@ -387,7 +395,10 @@ impl UniqueIndexSet { .head .compare_exchange(old, new, Ordering::AcqRel, Ordering::Acquire) { - Ok(_) => return, + Ok(_) => { + self.borrowed_indices.fetch_sub(1, Ordering::Relaxed); + return; + } Err(v) => { old = v; Self::extract_head_and_aba(v) @@ -528,4 +539,9 @@ impl FixedSizeUniqueIndexSet { pub unsafe fn release_raw_index(&self, index: u32) { self.state.release_raw_index(index) } + + /// Returns the current len. + pub fn borrowed_indices(&self) -> usize { + self.state.borrowed_indices() + } } diff --git a/elkodon_bb/lock_free/tests/mpmc_unique_index_set_tests.rs b/elkodon_bb/lock_free/tests/mpmc_unique_index_set_tests.rs index ea517a0..6ebe1b7 100644 --- a/elkodon_bb/lock_free/tests/mpmc_unique_index_set_tests.rs +++ b/elkodon_bb/lock_free/tests/mpmc_unique_index_set_tests.rs @@ -69,6 +69,24 @@ fn mpmc_unique_index_set_acquire_and_release_works() { } } +#[test] +fn mpmc_unique_index_set_borrowed_indices_works() { + let sut = FixedSizeUniqueIndexSet::::new(); + let mut ids = vec![]; + + for i in 0..CAPACITY { + let e = sut.acquire(); + assert_that!(e, is_some); + ids.push(e.unwrap()); + assert_that!(sut.borrowed_indices(), eq i + 1); + } + + for i in 0..CAPACITY { + ids.pop(); + assert_that!(sut.borrowed_indices(), eq CAPACITY - i - 1); + } +} + #[test] fn mpmc_unique_index_set_acquire_and_release_works_with_uninitialized_memory() { init_stack!(