Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add subjects details to stream info #859

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 33 additions & 3 deletions async-nats/src/jetstream/stream.rs
Expand Up @@ -13,7 +13,6 @@
//
//! Manage operations on a [Stream], create/delete/update [Consumer][crate::jetstream::consumer::Consumer].

#[cfg(feature = "server-2.10")]
use std::collections::HashMap;
use std::{
fmt::Debug,
Expand Down Expand Up @@ -66,9 +65,37 @@ impl Stream {
/// # }
/// ```
pub async fn info(&mut self) -> Result<&Info, Error> {
self.make_info_request(&json!({})).await
}

/// Retrieves `info` about [Stream] from the server, updates the cached `info` inside
/// [Stream] and returns it. This method returns the filtered subjects and the corresponding number of
/// messages for each.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = async_nats::connect("localhost:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
///
/// let mut stream = jetstream
/// .get_stream("events").await?;
///
/// let info = stream.info_with_subjects("events.*").await?;
/// # Ok(())
/// # }
/// ```
pub async fn info_with_subjects(&mut self, filter: &str) -> Result<&Info, Error> {
self.make_info_request(&json!({ "subjects_filter": filter }))
.await
}

async fn make_info_request(&mut self, value: &serde_json::Value) -> Result<&Info, Error> {
let subject = format!("STREAM.INFO.{}", self.info.config.name);

match self.context.request(subject, &json!({})).await? {
match self.context.request(subject, value).await? {
Response::Ok::<Info>(info) => {
self.info = info;
Ok(&self.info)
Expand Down Expand Up @@ -1039,7 +1066,7 @@ pub struct DeleteStatus {
}

/// information about the given stream.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct State {
/// The number of messages contained in this stream
pub messages: u64,
Expand All @@ -1059,6 +1086,9 @@ pub struct State {
pub last_timestamp: time::OffsetDateTime,
/// The number of consumers configured to consume this stream
pub consumer_count: usize,
/// The number of messages for each subject specified in this steam
#[serde(skip_serializing_if = "Option::is_none")]
pub subjects: Option<HashMap<String, u64>>,
}

/// A raw stream message in the representation it is stored.
Expand Down
64 changes: 64 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Expand Up @@ -954,6 +954,70 @@ mod jetstream {
);
}

#[tokio::test]
async fn info_with_subjects() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let context = async_nats::jetstream::new(client);

let mut stream = context
.create_stream(stream::Config {
name: "foo".to_string(),
subjects: vec!["foo.*".to_string()],
..Default::default()
})
.await
.unwrap();

let payload = b"data";

context
.publish("foo.A".to_string(), payload.as_ref().into())
.await
.unwrap();
context
.publish("foo.A".to_string(), payload.as_ref().into())
.await
.unwrap();
context
.publish("foo.A".to_string(), payload.as_ref().into())
.await
.unwrap();

context
.publish("foo.B".to_string(), payload.as_ref().into())
.await
.unwrap();
context
.publish("foo.B".to_string(), payload.as_ref().into())
.await
.unwrap();

context
.publish("foo.C".to_string(), payload.as_ref().into())
.await
.unwrap();

let info = stream.info_with_subjects("foo.*").await.unwrap();
let subjects = info.state.subjects.as_ref().unwrap();

assert_eq!(subjects.keys().len(), 3);

assert!(subjects.contains_key("foo.A"));
assert!(subjects.contains_key("foo.B"));
assert!(subjects.contains_key("foo.C"));

assert_eq!(*subjects.get("foo.A").unwrap(), 3);
assert_eq!(*subjects.get("foo.B").unwrap(), 2);
assert_eq!(*subjects.get("foo.C").unwrap(), 1);

let info = stream.info_with_subjects("foo.A").await.unwrap();
let subjects = info.state.subjects.as_ref().unwrap();

assert_eq!(subjects.keys().len(), 1);
assert_eq!(*subjects.get("foo.A").unwrap(), 3);
}

#[tokio::test]
async fn create_consumer() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
Expand Down