Skip to content

Commit

Permalink
Add subjects details to stream info
Browse files Browse the repository at this point in the history
  • Loading branch information
n1ghtmare committed Mar 2, 2023
1 parent 3f857b8 commit ca0cc47
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 3 deletions.
36 changes: 33 additions & 3 deletions async-nats/src/jetstream/stream.rs
Expand Up @@ -13,7 +13,6 @@
//
//! Manage operations on a [Stream], create/delete/update [Consumer][crate::jetstream::consumer::Consumer].

#[cfg(feature = "server-2.10")]
use std::collections::HashMap;
use std::{
fmt::Debug,
Expand Down Expand Up @@ -66,9 +65,37 @@ impl Stream {
/// # }
/// ```
pub async fn info(&mut self) -> Result<&Info, Error> {
self.make_info_request(&json!({})).await
}

/// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
/// [Stream] and returns it. This method returns the filtered subjects and the corresponding number of
/// messages for each.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let mut stream = jetstream
/// .get_stream("events").await?;
///
/// let info = stream.info_with_subjects("events.*").await?;
/// # Ok(())
/// # }
/// ```
pub async fn info_with_subjects(&mut self, filter: &str) -> Result<&Info, Error> {
self.make_info_request(&json!({ "subjects_filter": filter }))
.await
}

async fn make_info_request(&mut self, value: &serde_json::Value) -> Result<&Info, Error> {
let subject = format!("STREAM.INFO.{}", self.info.config.name);

match self.context.request(subject, &json!({})).await? {
match self.context.request(subject, value).await? {
Response::Ok::<Info>(info) => {
self.info = info;
Ok(&self.info)
Expand Down Expand Up @@ -1039,7 +1066,7 @@ pub struct DeleteStatus {
}

/// information about the given stream.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct State {
/// The number of messages contained in this stream
pub messages: u64,
Expand All @@ -1059,6 +1086,9 @@ pub struct State {
pub last_timestamp: time::OffsetDateTime,
/// The number of consumers configured to consume this stream
pub consumer_count: usize,
/// The number of messages for each subject specified in this steam
#[serde(skip_serializing_if = "Option::is_none")]
pub subjects: Option<HashMap<String, u64>>,
}

/// A raw stream message in the representation it is stored.
Expand Down
64 changes: 64 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Expand Up @@ -954,6 +954,70 @@ mod jetstream {
);
}

#[tokio::test]
async fn info_with_subjects() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let mut stream = context
.create_stream(stream::Config {
name: "foo".to_string(),
subjects: vec!["foo.*".to_string()],
..Default::default()
})
.await
.unwrap();

let payload = b"data";

context
.publish("foo.A".to_string(), payload.as_ref().into())
.await
.unwrap();
context
.publish("foo.A".to_string(), payload.as_ref().into())
.await
.unwrap();
context
.publish("foo.A".to_string(), payload.as_ref().into())
.await
.unwrap();

context
.publish("foo.B".to_string(), payload.as_ref().into())
.await
.unwrap();
context
.publish("foo.B".to_string(), payload.as_ref().into())
.await
.unwrap();

context
.publish("foo.C".to_string(), payload.as_ref().into())
.await
.unwrap();

let info = stream.info_with_subjects("foo.*").await.unwrap();
let subjects = info.state.subjects.as_ref().unwrap();

assert_eq!(subjects.keys().len(), 3);

assert!(subjects.contains_key("foo.A"));
assert!(subjects.contains_key("foo.B"));
assert!(subjects.contains_key("foo.C"));

assert_eq!(*subjects.get("foo.A").unwrap(), 3);
assert_eq!(*subjects.get("foo.B").unwrap(), 2);
assert_eq!(*subjects.get("foo.C").unwrap(), 1);

let info = stream.info_with_subjects("foo.A").await.unwrap();
let subjects = info.state.subjects.as_ref().unwrap();

assert_eq!(subjects.keys().len(), 1);
assert_eq!(*subjects.get("foo.A").unwrap(), 3);
}

#[tokio::test]
async fn create_consumer() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down

0 comments on commit ca0cc47

Please sign in to comment.