diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index faf97b1db8..a0af9b25f7 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -75,12 +75,10 @@ use crate::{ mod metrics; mod node_map; mod relay_actor; -mod timer; mod udp_conn; pub use node_map::Source; -pub(super) use self::timer::Timer; pub use self::{ metrics::Metrics, node_map::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo}, diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index d116be6695..3a2f76f610 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -12,6 +12,7 @@ use iroh_relay::protos::stun; use netwatch::ip::is_unicast_link_local; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; +use tokio_util::task::AbortOnDropHandle; use tracing::{debug, event, info, instrument, trace, warn, Level}; use super::{ @@ -24,7 +25,7 @@ use super::{ use crate::endpoint::PathSelection; use crate::{ disco::{self, SendAddr}, - magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, Timer, HEARTBEAT_INTERVAL}, + magicsock::{ActorMessage, MagicsockMetrics, QuicMappedAddr, HEARTBEAT_INTERVAL}, watchable::{Watchable, Watcher}, }; @@ -526,19 +527,20 @@ impl NodeState { } let id = self.id; - let timer = Timer::after(PING_TIMEOUT_DURATION, async move { + let _expiry_task = AbortOnDropHandle::new(tokio::spawn(async move { + tokio::time::sleep(PING_TIMEOUT_DURATION).await; sender .send(ActorMessage::EndpointPingExpired(id, tx_id)) .await .ok(); - }); + })); self.sent_pings.insert( tx_id, SentPing { to, at: now, purpose, - timer, + _expiry_task, }, ); } @@ -887,8 +889,6 @@ impl NodeState { None } Some(sp) => { - sp.timer.abort(); - let mut node_map_insert = None; let now = Instant::now(); @@ -1230,7 +1230,7 @@ pub(super) struct SentPing { pub(super) at: Instant, #[allow(dead_code)] pub(super) purpose: DiscoPingPurpose, - pub(super) timer: Timer, + pub(super) _expiry_task: AbortOnDropHandle<()>, } /// The reason why a discovery ping message was sent. diff --git a/iroh/src/magicsock/timer.rs b/iroh/src/magicsock/timer.rs deleted file mode 100644 index 0ffeee4896..0000000000 --- a/iroh/src/magicsock/timer.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::{future::Future, time::Duration}; - -use tokio::{task::JoinHandle, time}; - -/// A timer that works similar to golangs `Timer`. -#[derive(Debug)] -pub struct Timer { - t: JoinHandle<()>, -} - -impl Timer { - /// Will trigger the execution of `f` after time `d` once. - pub fn after(d: Duration, f: F) -> Self - where - F: Future + Send + Sync + 'static, - { - let t = tokio::task::spawn(async move { - time::sleep(d).await; - f.await - }); - - Timer { t } - } - - /// Abort the timer. - pub fn abort(self) { - self.t.abort(); - } -} - -impl Future for Timer { - type Output = (); - - fn poll( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - std::pin::Pin::new(&mut self.t).poll(cx).map(|_| ()) - } -} - -#[cfg(test)] -mod tests { - use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }; - - use super::*; - - #[tokio::test(flavor = "current_thread", start_paused = true)] - async fn test_timer_success() { - let val = Arc::new(AtomicBool::new(false)); - - assert!(!val.load(Ordering::Relaxed)); - - let moved_val = val.clone(); - let timer = Timer::after(Duration::from_millis(10), async move { - moved_val.store(true, Ordering::Relaxed); - }); - - assert!(!val.load(Ordering::Relaxed)); - - timer.await; - assert!(val.load(Ordering::Relaxed)); - } - - #[tokio::test(flavor = "current_thread", start_paused = true)] - async fn test_timer_abort() { - let val = Arc::new(AtomicBool::new(false)); - - assert!(!val.load(Ordering::Relaxed)); - - let moved_val = val.clone(); - let timer = Timer::after(Duration::from_millis(10), async move { - moved_val.store(true, Ordering::Relaxed); - }); - - assert!(!val.load(Ordering::Relaxed)); - timer.abort(); - assert!(!val.load(Ordering::Relaxed)); - } - - #[tokio::test(flavor = "current_thread", start_paused = true)] - async fn test_timer_abort_late() { - let val = Arc::new(AtomicBool::new(false)); - - assert!(!val.load(Ordering::Relaxed)); - - let moved_val = val.clone(); - let timer = Timer::after(Duration::from_millis(50), async move { - moved_val.store(true, Ordering::Relaxed); - }); - - assert!(!val.load(Ordering::Relaxed)); - time::sleep(Duration::from_millis(75)).await; - - timer.abort(); - assert!(val.load(Ordering::Relaxed)); - } -}