Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new Consumer API and add Consumer name #637

Merged
merged 3 commits into from
Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion async-nats/src/jetstream/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub trait IntoConsumerConfig {
}

#[allow(dead_code)]
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Consumer<T: IntoConsumerConfig> {
pub(crate) context: Context,
pub(crate) config: T,
Expand Down Expand Up @@ -241,6 +241,10 @@ pub struct Config {
/// to recover.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
/// A name of the consumer. Can be specified for both durable and ephemeral
/// consumers.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// A short description of the purpose of this consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
Expand Down
6 changes: 6 additions & 0 deletions async-nats/src/jetstream/consumer/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,10 @@ pub struct Config {
/// to recover.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
/// A name of the consumer. Can be specified for both durable and ephemeral
/// consumers.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// A short description of the purpose of this consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
Expand Down Expand Up @@ -1430,6 +1434,7 @@ impl IntoConsumerConfig for Config {
fn into_consumer_config(self) -> consumer::Config {
jetstream::consumer::Config {
deliver_subject: None,
name: self.name,
durable_name: self.durable_name,
description: self.description,
deliver_group: None,
Expand Down Expand Up @@ -1463,6 +1468,7 @@ impl FromConsumer for Config {
}
Ok(Config {
durable_name: config.durable_name,
name: config.name,
description: config.description,
deliver_policy: config.deliver_policy,
ack_policy: config.ack_policy,
Expand Down
13 changes: 12 additions & 1 deletion async-nats/src/jetstream/consumer/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ pub struct Config {
/// The delivery subject used by the push consumer.
#[serde(default)]
pub deliver_subject: String,

/// Setting `durable_name` to `Some(...)` will cause this consumer
/// to be "durable". This may be a good choice for workloads that
/// benefit from the `JetStream` server or cluster remembering the
Expand All @@ -143,6 +142,10 @@ pub struct Config {
/// to recover.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub durable_name: Option<String>,
/// A name of the consumer. Can be specified for both durable and ephemeral
/// consumers.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// A short description of the purpose of this consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
Expand Down Expand Up @@ -205,6 +208,7 @@ impl FromConsumer for Config {
Ok(Config {
deliver_subject: config.deliver_subject.unwrap(),
durable_name: config.durable_name,
name: config.name,
description: config.description,
deliver_group: config.deliver_group,
deliver_policy: config.deliver_policy,
Expand All @@ -230,6 +234,7 @@ impl IntoConsumerConfig for Config {
jetstream::consumer::Config {
deliver_subject: Some(self.deliver_subject),
durable_name: self.durable_name,
name: self.name,
description: self.description,
deliver_group: self.deliver_group,
deliver_policy: self.deliver_policy,
Expand Down Expand Up @@ -269,6 +274,10 @@ pub struct OrderedConfig {
/// The delivery subject used by the push consumer.
#[serde(default)]
pub deliver_subject: String,
/// A name of the consumer. Can be specified for both durable and ephemeral
/// consumers.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// A short description of the purpose of this consumer.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
Expand Down Expand Up @@ -308,6 +317,7 @@ impl FromConsumer for OrderedConfig {
)));
}
Ok(OrderedConfig {
name: config.name,
deliver_subject: config.deliver_subject.unwrap(),
description: config.description,
filter_subject: config.filter_subject,
Expand All @@ -327,6 +337,7 @@ impl IntoConsumerConfig for OrderedConfig {
jetstream::consumer::Config {
deliver_subject: Some(self.deliver_subject),
durable_name: None,
name: self.name,
description: self.description,
deliver_group: None,
deliver_policy: self.deliver_policy,
Expand Down
3 changes: 3 additions & 0 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use serde_json::{self, json};
use std::borrow::Borrow;
use std::io::{self, ErrorKind};
use std::time::Duration;
use tracing::debug;

use super::kv::{Store, MAX_HISTORY};
use super::stream::{self, Config, DeleteStatus, Info, Stream};
Expand Down Expand Up @@ -561,6 +562,8 @@ impl Context {
{
let request = serde_json::to_vec(&payload).map(Bytes::from)?;

debug!("JetStream request sent: {:?}", request);

let message = self
.client
.request(format!("{}.{}", self.prefix, subject), request)
Expand Down
40 changes: 33 additions & 7 deletions async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,14 +318,40 @@ impl Stream {
config: C,
) -> Result<Consumer<C>, Error> {
let config = config.into_consumer_config();
let subject = if let Some(ref durable_name) = config.durable_name {
format!(
"CONSUMER.DURABLE.CREATE.{}.{}",
self.info.config.name, durable_name
)
} else {
format!("CONSUMER.CREATE.{}", self.info.config.name)

let subject = {
if self.context.client.is_server_compatible(2, 9, 0) {
let filter = if config.filter_subject.is_empty() {
"".to_string()
} else {
format!(".{}", config.filter_subject)
};
config
.name
.as_ref()
.or(config.durable_name.as_ref())
.map(|name| {
format!(
"CONSUMER.CREATE.{}.{}{}",
self.info.config.name, name, filter
)
})
.unwrap_or_else(|| format!("CONSUMER.CREATE.{}", self.info.config.name))
} else if config.name.is_some() {
return Err(Box::new(std::io::Error::new(
ErrorKind::Other,
"can't use consumer name with server below version 2.9",
)));
} else if let Some(ref durable_name) = config.durable_name {
format!(
"CONSUMER.DURABLE.CREATE.{}.{}",
self.info.config.name, durable_name
)
} else {
format!("CONSUMER.CREATE.{}", self.info.config.name)
}
};
println!("SUBJECT: {:?}", subject);

match self
.context
Expand Down
25 changes: 25 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,31 @@ mod jetstream {
})
.await
.unwrap();

let consumer = context
.get_or_create_stream("events")
.await
.unwrap()
.create_consumer(consumer::pull::Config {
name: Some("name".to_string()),
..Default::default()
})
.await
.unwrap();

assert_eq!("name".to_string(), consumer.cached_info().name);

context
.get_or_create_stream("events")
.await
.unwrap()
.create_consumer(consumer::pull::Config {
durable_name: Some("namex".to_string()),
name: Some("namey".to_string()),
..Default::default()
})
.await
.unwrap_err();
}
#[tokio::test]
async fn delete_consumer() {
Expand Down