Skip to content

Commit

Permalink
Add unsubscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Apr 22, 2022
1 parent 106f6d7 commit 5fe49f0
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 12 deletions.
94 changes: 82 additions & 12 deletions async-nats/src/lib.rs
Expand Up @@ -228,7 +228,7 @@ pub enum ClientOp {
subject: String,
},
Unsubscribe {
sid: u64,
id: u64,
},
Ping,
Pong,
Expand Down Expand Up @@ -464,10 +464,10 @@ impl Connection {
self.stream.flush().await?;
}

ClientOp::Unsubscribe { sid } => {
ClientOp::Unsubscribe { id } => {
self.stream.write_all(b"UNSUB ").await?;
self.stream
.write_all(format!("{}\r\n", sid).as_bytes())
.write_all(format!("{}\r\n", id).as_bytes())
.await?;
}
ClientOp::Ping => {
Expand Down Expand Up @@ -500,29 +500,43 @@ struct Subscription {
#[derive(Debug)]
struct SubscriptionContext {
next_sid: u64,
next_uid: u64,
subscription_map: HashMap<u64, Subscription>,
uid_map: HashMap<u64, u64>,
}

impl SubscriptionContext {
pub fn new() -> SubscriptionContext {
fn new() -> SubscriptionContext {
SubscriptionContext {
next_sid: 1,
next_uid: 1,
subscription_map: HashMap::new(),
uid_map: HashMap::new(),
}
}

pub fn get(&mut self, sid: u64) -> Option<&Subscription> {
fn get(&mut self, sid: u64) -> Option<&Subscription> {
self.subscription_map.get(&sid)
}

pub fn insert(&mut self, subscription: Subscription) -> u64 {
fn insert(&mut self, subscription: Subscription) -> u64 {
let sid = self.next_sid;
let uid = self.next_uid;
self.next_sid += 1;
self.next_uid += 1;

self.subscription_map.insert(sid, subscription);
self.uid_map.insert(uid, sid);

sid
}
fn remove(&mut self, sid: u64) -> bool {
self.subscription_map.remove(&sid).is_some()
}

fn get_sid(&self, uid: u64) -> Option<u64> {
self.uid_map.get(&uid).copied()
}
}

/// A connector which facilitates communication from channels to a single shared connection.
Expand Down Expand Up @@ -554,6 +568,26 @@ impl Connector {
maybe_op = receiver.recv().fuse() => {
match maybe_op {
Some(op) => {
// until we have separeted commands and op, let's just intercept
// Unsubscibe and replace Subscription uid with sid
if let ClientOp::Unsubscribe{id} = op {
let mut context = self.subscription_context.lock().await;
let sid = {
let sid = context.get_sid(id);
match sid {
Some(sid) => sid,
None => continue
}
};

context.remove(sid);

if let Err(err) = self.connection.write_op(ClientOp::Unsubscribe { id: sid }).await {
println!("Send failed with {:?}", err);
}
continue

}
if let Err(err) = self.connection.write_op(op).await {
println!("Send failed with {:?}", err);
}
Expand Down Expand Up @@ -581,7 +615,14 @@ impl Connector {
payload,
};

subscription.sender.send(message).await.unwrap();
// if the channel for subscription was dropped, remove the
// subscription from the map and unsubscribe.
if subscription.sender.send(message).await.is_err() {
context.remove(sid);
self.connection.write_op(ClientOp::Unsubscribe { id: sid }).await?;
self.connection.stream.flush().await?;
}

}
}

Expand Down Expand Up @@ -696,7 +737,7 @@ impl Client {
.await
.unwrap();

Ok(Subscriber::new(sid, receiver))
Ok(Subscriber::new(sid, self.sender.clone(), receiver))
}

pub async fn flush(&mut self) -> Result<(), Error> {
Expand Down Expand Up @@ -831,23 +872,52 @@ pub struct Message {
/// # }
/// ```
pub struct Subscriber {
_sid: u64,
uid: u64,
receiver: mpsc::Receiver<Message>,
sender: mpsc::Sender<ClientOp>,
}

impl Subscriber {
fn new(sid: u64, receiver: mpsc::Receiver<Message>) -> Subscriber {
fn new(
uid: u64,
sender: mpsc::Sender<ClientOp>,
receiver: mpsc::Receiver<Message>,
) -> Subscriber {
Subscriber {
_sid: sid,
uid,
sender,
receiver,
}
}

/// Unsubscribes from subscription, draining all remaining messages.
///
/// # Examples
/// ```
/// # #[tokio::main]
/// # async fn unsubscribe() -> Result<(), Box<dyn std::error::Error>> {
/// let mut client = async_nats::connect("demo.nats.io").await?;
///
/// let mut subscriber = client.subscribe("foo".into()).await?;
///
/// subscriber.unsubscribe();
/// # Ok(())
/// # }
pub fn unsubscribe(self) {
drop(self)
}
}

impl Drop for Subscriber {
fn drop(&mut self) {
// Can we get away with just closing, and then handling that on the sender side?
self.receiver.close();
tokio::spawn({
let sender = self.sender.clone();
let id = self.uid;
async move {
sender.send(ClientOp::Unsubscribe { id }).await.ok();
}
});
}
}

Expand Down
19 changes: 19 additions & 0 deletions async-nats/tests/client_tests.rs
Expand Up @@ -126,4 +126,23 @@ mod client {
.unwrap();
assert_eq!(resp.unwrap().payload, Bytes::from("reply"));
}

#[tokio::test]
async fn unsubscribe() {
let server = nats_server::run_basic_server();
let mut client = async_nats::connect(server.client_url()).await.unwrap();

let mut sub = client.subscribe("test".into()).await.unwrap();

client.publish("test".into(), "data".into()).await.unwrap();
client.flush().await.unwrap();

assert!(sub.next().await.is_some());
sub.unsubscribe();
// check if we can still send messages after unsubscribe.
let mut sub2 = client.subscribe("test2".into()).await.unwrap();
client.publish("test2".into(), "data".into()).await.unwrap();
client.flush().await.unwrap();
assert!(sub2.next().await.is_some());
}
}

0 comments on commit 5fe49f0

Please sign in to comment.