diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index e54fe5c8151..e34b64a906d 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -552,7 +552,7 @@ impl Sender { /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); - new_receiver(shared) + new_receiver(shared, None) } /// Returns the number of active receivers @@ -642,7 +642,8 @@ impl Sender { } } -fn new_receiver(shared: Arc>) -> Receiver { +/// Create a new `Receiver` which reads starting from the tail if `next_pos` is not specified. +fn new_receiver(shared: Arc>, next_pos: Option) -> Receiver { let mut tail = shared.tail.lock(); if tail.rx_cnt == MAX_RECEIVERS { @@ -651,10 +652,9 @@ fn new_receiver(shared: Arc>) -> Receiver { tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); - let next = tail.pos; + let next = next_pos.unwrap_or(tail.pos); drop(tail); - Receiver { shared, next } } @@ -1022,6 +1022,13 @@ impl Drop for Receiver { } } +impl Clone for Receiver { + fn clone(&self) -> Self { + let shared = self.shared.clone(); + new_receiver(shared, Some(self.next)) + } +} + impl<'a, T> Recv<'a, T> { fn new(receiver: &'a mut Receiver) -> Recv<'a, T> { Recv {