diff --git a/async-nats/src/jetstream/stream.rs b/async-nats/src/jetstream/stream.rs index e52f13e91..896a7e029 100644 --- a/async-nats/src/jetstream/stream.rs +++ b/async-nats/src/jetstream/stream.rs @@ -13,7 +13,6 @@ // //! Manage operations on a [Stream], create/delete/update [Consumer][crate::jetstream::consumer::Consumer]. -#[cfg(feature = "server-2.10")] use std::collections::HashMap; use std::{ fmt::Debug, @@ -66,9 +65,37 @@ impl Stream { /// # } /// ``` pub async fn info(&mut self) -> Result<&Info, Error> { + self.make_info_request(&json!({})).await + } + + /// Retrieves `info` about [Stream] from the server, updates the cached `info` inside + /// [Stream] and returns it. This method returns the filtered subjects and the corresponding number of + /// messages for each. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// let client = async_nats::connect("localhost:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// + /// let mut stream = jetstream + /// .get_stream("events").await?; + /// + /// let info = stream.info_with_subjects("events.*").await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn info_with_subjects(&mut self, filter: &str) -> Result<&Info, Error> { + self.make_info_request(&json!({ "subjects_filter": filter })) + .await + } + + async fn make_info_request(&mut self, value: &serde_json::Value) -> Result<&Info, Error> { let subject = format!("STREAM.INFO.{}", self.info.config.name); - match self.context.request(subject, &json!({})).await? { + match self.context.request(subject, value).await? { Response::Ok::(info) => { self.info = info; Ok(&self.info) @@ -1039,7 +1066,7 @@ pub struct DeleteStatus { } /// information about the given stream. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct State { /// The number of messages contained in this stream pub messages: u64, @@ -1059,6 +1086,9 @@ pub struct State { pub last_timestamp: time::OffsetDateTime, /// The number of consumers configured to consume this stream pub consumer_count: usize, + /// The number of messages for each subject specified in this steam + #[serde(skip_serializing_if = "Option::is_none")] + pub subjects: Option>, } /// A raw stream message in the representation it is stored. diff --git a/async-nats/tests/jetstream_tests.rs b/async-nats/tests/jetstream_tests.rs index f34efd810..f225deba1 100644 --- a/async-nats/tests/jetstream_tests.rs +++ b/async-nats/tests/jetstream_tests.rs @@ -954,6 +954,70 @@ mod jetstream { ); } + #[tokio::test] + async fn info_with_subjects() { + let server = nats_server::run_server("tests/configs/jetstream.conf"); + let client = async_nats::connect(server.client_url()).await.unwrap(); + let context = async_nats::jetstream::new(client); + + let mut stream = context + .create_stream(stream::Config { + name: "foo".to_string(), + subjects: vec!["foo.*".to_string()], + ..Default::default() + }) + .await + .unwrap(); + + let payload = b"data"; + + context + .publish("foo.A".to_string(), payload.as_ref().into()) + .await + .unwrap(); + context + .publish("foo.A".to_string(), payload.as_ref().into()) + .await + .unwrap(); + context + .publish("foo.A".to_string(), payload.as_ref().into()) + .await + .unwrap(); + + context + .publish("foo.B".to_string(), payload.as_ref().into()) + .await + .unwrap(); + context + .publish("foo.B".to_string(), payload.as_ref().into()) + .await + .unwrap(); + + context + .publish("foo.C".to_string(), payload.as_ref().into()) + .await + .unwrap(); + + let info = stream.info_with_subjects("foo.*").await.unwrap(); + let subjects = info.state.subjects.as_ref().unwrap(); + + assert_eq!(subjects.keys().len(), 3); + + assert!(subjects.contains_key("foo.A")); + assert!(subjects.contains_key("foo.B")); + assert!(subjects.contains_key("foo.C")); + + assert_eq!(*subjects.get("foo.A").unwrap(), 3); + assert_eq!(*subjects.get("foo.B").unwrap(), 2); + assert_eq!(*subjects.get("foo.C").unwrap(), 1); + + let info = stream.info_with_subjects("foo.A").await.unwrap(); + let subjects = info.state.subjects.as_ref().unwrap(); + + assert_eq!(subjects.keys().len(), 1); + assert_eq!(*subjects.get("foo.A").unwrap(), 3); + } + #[tokio::test] async fn create_consumer() { let server = nats_server::run_server("tests/configs/jetstream.conf");