Skip to content

Commit

Permalink
Use new Consumer API and add Consumer name
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Sep 15, 2022
1 parent e26585a commit a145dd3
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 9 deletions.
6 changes: 5 additions & 1 deletion async-nats/src/jetstream/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub trait IntoConsumerConfig {
}

#[allow(dead_code)]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Consumer<T: IntoConsumerConfig> {
pub(crate) context: Context,
pub(crate) config: T,
Expand Down Expand Up @@ -241,6 +241,10 @@ pub struct Config {
/// to recover.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
/// A name of the consumer. Can be specified for both durable and ephemeral
/// consumers.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// A short description of the purpose of this consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
Expand Down
6 changes: 6 additions & 0 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,10 @@ pub struct Config {
/// to recover.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
/// A name of the consumer. Can be specified for both durable and ephemeral
/// consumers.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// A short description of the purpose of this consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
Expand Down Expand Up @@ -1430,6 +1434,7 @@ impl IntoConsumerConfig for Config {
fn into_consumer_config(self) -> consumer::Config {
jetstream::consumer::Config {
deliver_subject: None,
name: self.name,
durable_name: self.durable_name,
description: self.description,
deliver_group: None,
Expand Down Expand Up @@ -1463,6 +1468,7 @@ impl FromConsumer for Config {
}
Ok(Config {
durable_name: config.durable_name,
name: config.name,
description: config.description,
deliver_policy: config.deliver_policy,
ack_policy: config.ack_policy,
Expand Down
13 changes: 12 additions & 1 deletion async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ pub struct Config {
/// The delivery subject used by the push consumer.
#[serde(default)]
pub deliver_subject: String,

/// Setting `durable_name` to `Some(...)` will cause this consumer
/// to be "durable". This may be a good choice for workloads that
/// benefit from the `JetStream` server or cluster remembering the
Expand All @@ -143,6 +142,10 @@ pub struct Config {
/// to recover.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
/// A name of the consumer. Can be specified for both durable and ephemeral
/// consumers.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// A short description of the purpose of this consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
Expand Down Expand Up @@ -205,6 +208,7 @@ impl FromConsumer for Config {
Ok(Config {
deliver_subject: config.deliver_subject.unwrap(),
durable_name: config.durable_name,
name: config.name,
description: config.description,
deliver_group: config.deliver_group,
deliver_policy: config.deliver_policy,
Expand All @@ -230,6 +234,7 @@ impl IntoConsumerConfig for Config {
jetstream::consumer::Config {
deliver_subject: Some(self.deliver_subject),
durable_name: self.durable_name,
name: self.name,
description: self.description,
deliver_group: self.deliver_group,
deliver_policy: self.deliver_policy,
Expand Down Expand Up @@ -269,6 +274,10 @@ pub struct OrderedConfig {
/// The delivery subject used by the push consumer.
#[serde(default)]
pub deliver_subject: String,
/// A name of the consumer. Can be specified for both durable and ephemeral
/// consumers.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// A short description of the purpose of this consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
Expand Down Expand Up @@ -308,6 +317,7 @@ impl FromConsumer for OrderedConfig {
)));
}
Ok(OrderedConfig {
name: config.name,
deliver_subject: config.deliver_subject.unwrap(),
description: config.description,
filter_subject: config.filter_subject,
Expand All @@ -327,6 +337,7 @@ impl IntoConsumerConfig for OrderedConfig {
jetstream::consumer::Config {
deliver_subject: Some(self.deliver_subject),
durable_name: None,
name: self.name,
description: self.description,
deliver_group: None,
deliver_policy: self.deliver_policy,
Expand Down
3 changes: 3 additions & 0 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use serde_json::{self, json};
use std::borrow::Borrow;
use std::io::{self, ErrorKind};
use std::time::Duration;
use tracing::debug;

use super::kv::{Store, MAX_HISTORY};
use super::stream::{self, Config, DeleteStatus, Info, Stream};
Expand Down Expand Up @@ -561,6 +562,8 @@ impl Context {
{
let request = serde_json::to_vec(&payload).map(Bytes::from)?;

debug!("JetStream request sent: {:?}", request);

let message = self
.client
.request(format!("{}.{}", self.prefix, subject), request)
Expand Down
40 changes: 33 additions & 7 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,14 +578,40 @@ impl Stream {
config: C,
) -> Result<Consumer<C>, Error> {
let config = config.into_consumer_config();
let subject = if let Some(ref durable_name) = config.durable_name {
format!(
"CONSUMER.DURABLE.CREATE.{}.{}",
self.info.config.name, durable_name
)
} else {
format!("CONSUMER.CREATE.{}", self.info.config.name)

let subject = {
if self.context.client.is_server_compatible(2, 9, 0) {
let filter = if config.filter_subject.is_empty() {
"".to_string()
} else {
format!(".{}", config.filter_subject)
};
config
.name
.as_ref()
.or(config.durable_name.as_ref())
.map(|name| {
format!(
"CONSUMER.CREATE.{}.{}{}",
self.info.config.name, name, filter
)
})
.unwrap_or_else(|| format!("CONSUMER.CREATE.{}", self.info.config.name))
} else if config.name.is_some() {
return Err(Box::new(std::io::Error::new(
ErrorKind::Other,
"can't use consumer name with server below version 2.9",
)));
} else if let Some(ref durable_name) = config.durable_name {
format!(
"CONSUMER.DURABLE.CREATE.{}.{}",
self.info.config.name, durable_name
)
} else {
format!("CONSUMER.CREATE.{}", self.info.config.name)
}
};
println!("SUBJECT: {:?}", subject);

match self
.context
Expand Down
25 changes: 25 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,31 @@ mod jetstream {
})
.await
.unwrap();

let consumer = context
.get_or_create_stream("events")
.await
.unwrap()
.create_consumer(consumer::pull::Config {
name: Some("name".to_string()),
..Default::default()
})
.await
.unwrap();

assert_eq!("name".to_string(), consumer.cached_info().name);

context
.get_or_create_stream("events")
.await
.unwrap()
.create_consumer(consumer::pull::Config {
durable_name: Some("namex".to_string()),
name: Some("namey".to_string()),
..Default::default()
})
.await
.unwrap_err();
}
#[tokio::test]
async fn delete_consumer() {
Expand Down

0 comments on commit a145dd3

Please sign in to comment.