Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
[#3] current number of publishers|subscribers added to publish_subscr…
Browse files Browse the repository at this point in the history
…ibe port factory and removed from ports

Signed-off-by: Christian Eltzschig <[email protected]>
  • Loading branch information
elfenpiff committed Oct 31, 2023
1 parent 8ef93d1 commit 7ed31b3
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 96 deletions.
10 changes: 0 additions & 10 deletions elkodon/src/port/details/publisher_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,6 @@ impl<'global_config, Service: service::Details<'global_config>>
Ok(())
}

pub fn number_of_publishers(&self) -> usize {
self.connections
.iter()
.filter(|&connection| match unsafe { &*connection.get() } {
None => false,
Some(c) => c.receiver.is_connected(),
})
.count()
}

pub(crate) fn remove(&self, index: usize) {
*self.get_mut(index) = None;
}
Expand Down
15 changes: 1 addition & 14 deletions elkodon/src/port/details/subscriber_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::cell::UnsafeCell;
use elkodon_bb_log::fail;
use elkodon_cal::named_concept::NamedConceptBuilder;
use elkodon_cal::zero_copy_connection::{
ZeroCopyConnection, ZeroCopyConnectionBuilder, ZeroCopyCreationError, ZeroCopyPortDetails,
ZeroCopyConnection, ZeroCopyConnectionBuilder, ZeroCopyCreationError,
};

use crate::service::connection_config;
Expand Down Expand Up @@ -96,19 +96,6 @@ impl<'global_config, Service: service::Details<'global_config>>
}
}

pub(crate) fn number_of_subscribers(&self) -> usize {
self.connections
.iter()
.filter(|&connection| {
let connection = unsafe { &*connection.get() };
match connection {
None => false,
Some(c) => c.sender.is_connected(),
}
})
.count()
}

pub(crate) fn len(&self) -> usize {
self.connections.len()
}
Expand Down
4 changes: 0 additions & 4 deletions elkodon/src/port/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,6 @@ impl<'a, 'global_config: 'a, Service: service::Details<'global_config>, MessageT
Ok(())
}

pub fn number_of_subscribers(&self) -> usize {
self.subscriber_connections.number_of_subscribers()
}

pub fn send<'publisher>(
&'publisher self,
sample: SampleMut<'a, 'publisher, 'global_config, Service, Header, MessageType>,
Expand Down
4 changes: 0 additions & 4 deletions elkodon/src/port/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,4 @@ impl<'a, 'global_config: 'a, Service: service::Details<'global_config>, MessageT

Ok(())
}

pub fn number_of_publishers(&self) -> usize {
self.publisher_connections.number_of_publishers()
}
}
8 changes: 8 additions & 0 deletions elkodon/src/service/dynamic_config/publish_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ impl DynamicConfig {
+ Container::<UniquePublisherId>::memory_size(config.number_of_publishers)
}

pub fn current_number_of_publishers(&self) -> usize {
self.publishers.len()
}

pub fn current_number_of_subscribers(&self) -> usize {
self.subscribers.len()
}

pub fn number_of_supported_publishers(&self) -> usize {
self.publishers.capacity()
}
Expand Down
20 changes: 20 additions & 0 deletions elkodon/src/service/port_factory/publish_subscribe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{fmt::Debug, marker::PhantomData};

use elkodon_cal::dynamic_storage::DynamicStorage;

use crate::service;
use crate::service::service_name::ServiceName;

Expand Down Expand Up @@ -40,6 +42,24 @@ impl<'global_config, Service: service::Details<'global_config>, MessageType: Deb
self.service.state().static_config.service_name()
}

pub fn current_number_of_publishers(&self) -> usize {
self.service
.state()
.dynamic_storage
.get()
.publish_subscribe()
.current_number_of_publishers()
}

pub fn current_number_of_subscribers(&self) -> usize {
self.service
.state()
.dynamic_storage
.get()
.publish_subscribe()
.current_number_of_subscribers()
}

pub fn max_supported_publishers(&self) -> usize {
self.service
.state()
Expand Down
17 changes: 0 additions & 17 deletions elkodon/tests/publisher_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,6 @@ mod publisher {
service
}

#[test]
fn number_of_subscriber_works<Sut: Service>() {
let service_name = generate_name();
let service = Sut::new(&service_name)
.publish_subscribe()
.create::<u64>()
.unwrap();

let sut = service.publisher().create().unwrap();
assert_that!(sut.number_of_subscribers(), eq 0);

let _subscriber = service.subscriber().create();
assert_that!(sut.number_of_subscribers(), eq 0);
assert_that!(sut.update_connections(), is_ok);
assert_that!(sut.number_of_subscribers(), eq 1);
}

#[test]
fn publisher_can_borrow_multiple_sample_at_once<Sut: Service>() {
let service_name = generate_name();
Expand Down
78 changes: 78 additions & 0 deletions elkodon/tests/service_publish_subscribe_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,84 @@ mod service_publish_subscribe {
assert_that!(sut2.subscriber_buffer_size(), eq 13);
}

#[test]
fn number_of_publishers_works<Sut: Service>() {
let service_name = generate_name();
const MAX_PUBLISHERS: usize = 2;

let sut = Sut::new(&service_name)
.publish_subscribe()
.max_publishers(MAX_PUBLISHERS)
.create::<u64>()
.unwrap();

let sut2 = Sut::new(&service_name)
.publish_subscribe()
.open::<u64>()
.unwrap();

let mut publishers = vec![];

for i in 0..MAX_PUBLISHERS / 2 {
publishers.push(sut.publisher().create().unwrap());
assert_that!(sut.current_number_of_publishers(), eq 2 * i + 1);
assert_that!(sut2.current_number_of_publishers(), eq 2 * i + 1);
assert_that!(sut.current_number_of_subscribers(), eq 0);
assert_that!(sut2.current_number_of_subscribers(), eq 0);

publishers.push(sut2.publisher().create().unwrap());
assert_that!(sut.current_number_of_publishers(), eq 2 * i + 2);
assert_that!(sut2.current_number_of_publishers(), eq 2 * i + 2);
assert_that!(sut.current_number_of_subscribers(), eq 0);
assert_that!(sut2.current_number_of_subscribers(), eq 0);
}

for i in 0..MAX_PUBLISHERS {
publishers.pop();
assert_that!(sut.current_number_of_publishers(), eq MAX_PUBLISHERS - i - 1);
assert_that!(sut2.current_number_of_publishers(), eq MAX_PUBLISHERS - i - 1);
}
}

#[test]
fn number_of_subscribers_works<Sut: Service>() {
let service_name = generate_name();
const MAX_SUBSCRIBERS: usize = 8;

let sut = Sut::new(&service_name)
.publish_subscribe()
.max_subscribers(MAX_SUBSCRIBERS)
.create::<u64>()
.unwrap();

let sut2 = Sut::new(&service_name)
.publish_subscribe()
.open::<u64>()
.unwrap();

let mut subscribers = vec![];

for i in 0..MAX_SUBSCRIBERS / 2 {
subscribers.push(sut.subscriber().create().unwrap());
assert_that!(sut.current_number_of_subscribers(), eq 2 * i + 1);
assert_that!(sut2.current_number_of_subscribers(), eq 2 * i + 1);
assert_that!(sut.current_number_of_publishers(), eq 0);
assert_that!(sut2.current_number_of_publishers(), eq 0);

subscribers.push(sut2.subscriber().create().unwrap());
assert_that!(sut.current_number_of_subscribers(), eq 2 * i + 2);
assert_that!(sut2.current_number_of_subscribers(), eq 2 * i + 2);
assert_that!(sut.current_number_of_publishers(), eq 0);
assert_that!(sut2.current_number_of_publishers(), eq 0);
}

for i in 0..MAX_SUBSCRIBERS {
subscribers.pop();
assert_that!(sut.current_number_of_subscribers(), eq MAX_SUBSCRIBERS - i - 1);
assert_that!(sut2.current_number_of_subscribers(), eq MAX_SUBSCRIBERS - i - 1);
}
}

#[test]
fn simple_communication_works_subscriber_created_first<Sut: Service>() {
let service_name = generate_name();
Expand Down
44 changes: 0 additions & 44 deletions elkodon/tests/subscriber_tests.rs

This file was deleted.

10 changes: 10 additions & 0 deletions elkodon_bb/lock_free/src/mpmc/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,16 @@ impl<T: Copy + Debug> Container<T> {
self.capacity
}

/// Returns the current len of the container
pub fn len(&self) -> usize {
self.index_set.borrowed_indices()
}

/// Returns true if the container is empty, otherwise false
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Adds a new element to the [`Container`]. If there is no more space available it returns
/// [`None`], otherwise [`Some`] containing the [`UniqueIndex`] to the underlying element.
/// If the [`UniqueIndex`] goes out of scope the added element is removed.
Expand Down
22 changes: 19 additions & 3 deletions elkodon_bb/lock_free/src/mpmc/unique_index_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ use elkodon_bb_log::{fail, fatal_panic};
use std::alloc::Layout;
use std::cell::UnsafeCell;
use std::fmt::Debug;
use std::sync::atomic::{fence, AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{fence, AtomicBool, AtomicU64, AtomicUsize, Ordering};
use tiny_fn::tiny_fn;

tiny_fn! {
Expand Down Expand Up @@ -186,6 +186,7 @@ impl Drop for UniqueIndex<'_> {
pub struct UniqueIndexSet {
data_ptr: RelocatablePointer<UnsafeCell<u32>>,
capacity: u32,
borrowed_indices: AtomicUsize,
pub(crate) head: AtomicU64,
is_memory_initialized: AtomicBool,
}
Expand All @@ -198,6 +199,7 @@ impl RelocatableContainer for UniqueIndexSet {
Self {
data_ptr: RelocatablePointer::new_uninit(),
capacity: capacity as u32,
borrowed_indices: AtomicUsize::new(0),
head: AtomicU64::new(0),
is_memory_initialized: AtomicBool::new(false),
}
Expand Down Expand Up @@ -229,6 +231,7 @@ impl RelocatableContainer for UniqueIndexSet {
Self {
data_ptr: RelocatablePointer::new(distance_to_data),
capacity: capacity as u32,
borrowed_indices: AtomicUsize::new(0),
head: AtomicU64::new(0),
is_memory_initialized: AtomicBool::new(true),
}
Expand Down Expand Up @@ -293,6 +296,11 @@ impl UniqueIndexSet {
self.capacity
}

/// Returns the current len.
pub fn borrowed_indices(&self) -> usize {
self.borrowed_indices.load(Ordering::Relaxed)
}

/// Acquires a raw ([`u32`]) index from the [`UniqueIndexSet`]. Returns [`None`] when no more
/// indices are available. The index **must** be returned manually with
/// [`UniqueIndexSet::release_raw_index()`].
Expand Down Expand Up @@ -354,7 +362,7 @@ impl UniqueIndexSet {
*self.get_next_free_index(index) = self.capacity + 1;

fence(Ordering::Acquire);

self.borrowed_indices.fetch_add(1, Ordering::Relaxed);
Some(index)
}

Expand Down Expand Up @@ -387,7 +395,10 @@ impl UniqueIndexSet {
.head
.compare_exchange(old, new, Ordering::AcqRel, Ordering::Acquire)
{
Ok(_) => return,
Ok(_) => {
self.borrowed_indices.fetch_sub(1, Ordering::Relaxed);
return;
}
Err(v) => {
old = v;
Self::extract_head_and_aba(v)
Expand Down Expand Up @@ -528,4 +539,9 @@ impl<const CAPACITY: usize> FixedSizeUniqueIndexSet<CAPACITY> {
pub unsafe fn release_raw_index(&self, index: u32) {
self.state.release_raw_index(index)
}

/// Returns the current len.
pub fn borrowed_indices(&self) -> usize {
self.state.borrowed_indices()
}
}
18 changes: 18 additions & 0 deletions elkodon_bb/lock_free/tests/mpmc_unique_index_set_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ fn mpmc_unique_index_set_acquire_and_release_works() {
}
}

#[test]
fn mpmc_unique_index_set_borrowed_indices_works() {
let sut = FixedSizeUniqueIndexSet::<CAPACITY>::new();
let mut ids = vec![];

for i in 0..CAPACITY {
let e = sut.acquire();
assert_that!(e, is_some);
ids.push(e.unwrap());
assert_that!(sut.borrowed_indices(), eq i + 1);
}

for i in 0..CAPACITY {
ids.pop();
assert_that!(sut.borrowed_indices(), eq CAPACITY - i - 1);
}
}

#[test]
fn mpmc_unique_index_set_acquire_and_release_works_with_uninitialized_memory() {
init_stack!(
Expand Down

0 comments on commit 7ed31b3

Please sign in to comment.