Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sources and mirror to stream config #673

Merged
merged 1 commit into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 48 additions & 1 deletion async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{Error, StatusCode};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::json;
use time::serde::rfc3339;
use time::{serde::rfc3339, OffsetDateTime};

use super::{
consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
Expand Down Expand Up @@ -867,11 +867,22 @@ pub struct Config {
#[serde(default, skip_serializing_if = "is_default")]
pub deny_purge: bool,

/// Optional republish config.
#[serde(default, skip_serializing_if = "is_default")]
pub republish: Option<Republish>,

/// Enables direct get, which would get messages from
/// non-leader.
#[serde(default, skip_serializing_if = "is_default")]
pub allow_direct: bool,

/// Stream mirror configuration.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mirror: Option<Source>,

/// Sources configration.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sources: Option<Vec<Source>>,
}

impl From<&Config> for Config {
Expand Down Expand Up @@ -1204,3 +1215,39 @@ pub struct PurgeRequest {
#[serde(default, skip_serializing_if = "is_default")]
pub keep: Option<u64>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
/// Name of the stream source.
pub name: String,
/// Optional source start sequence.
#[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
pub start_sequence: Option<u64>,
#[serde(
default,
rename = "opt_start_time",
skip_serializing_if = "is_default",
with = "rfc3339::option"
)]
/// Optional source start time.
pub start_time: Option<OffsetDateTime>,
/// Optional additional filter subject.
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subject: Option<String>,
/// Optional config for sourcing streams from another prefix, used for cross-account.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub external: Option<External>,
/// Optional config to set a domain, if source is residing in different one.
#[serde(default, rename = "opt_start", skip_serializing_if = "is_default")]
pub doamin: Option<String>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
/// Api prefix of external source.
#[serde(rename = "api")]
pub api_prefix: String,
/// Optional configuration of delivery prefix.
#[serde(rename = "deliver", skip_serializing_if = "is_default")]
pub delivery_prefix: Option<String>,
}
160 changes: 160 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1997,4 +1997,164 @@ mod jetstream {
.await
.expect_err("should get 503 maximum messages per subject exceeded error");
}

#[tokio::test]
async fn mirrors() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "TEST".to_string(),
subjects: vec!["events".to_string()],
..Default::default()
})
.await
.unwrap();

jetstream
.publish("events".to_string(), "skipped".into())
.await
.unwrap();
jetstream
.publish("events".to_string(), "data".into())
.await
.unwrap();

let mut mirror = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "MIRROR".to_string(),
mirror: Some(async_nats::jetstream::stream::Source {
name: stream.cached_info().config.name.clone(),
start_sequence: Some(2),
..Default::default()
}),
..Default::default()
})
.await
.unwrap();

let mirror_info = mirror.info().await.unwrap();
assert_eq!(
mirror_info.config.mirror.as_ref().unwrap().name.as_str(),
"TEST"
);

let mut messages = mirror
.create_consumer(async_nats::jetstream::consumer::pull::Config {
name: Some("consumer".to_string()),
..Default::default()
})
.await
.unwrap()
.messages()
.await
.unwrap();

assert_eq!(
from_utf8(&messages.next().await.unwrap().unwrap().message.payload).unwrap(),
"data".to_string(),
)
}
#[tokio::test]
async fn sources() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "TEST".to_string(),
subjects: vec!["events".to_string()],
..Default::default()
})
.await
.unwrap();
let stream2 = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "TEST2".to_string(),
subjects: vec!["events2".to_string()],
..Default::default()
})
.await
.unwrap();

jetstream
.publish("events".to_string(), "skipped".into())
.await
.unwrap();
jetstream
.publish("events".to_string(), "data".into())
.await
.unwrap();
jetstream
.publish("events2".to_string(), "data".into())
.await
.unwrap();
jetstream
.publish("events2".to_string(), "data".into())
.await
.unwrap();

let mut source = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "SOURCE".to_string(),
sources: Some(vec![
async_nats::jetstream::stream::Source {
name: stream.cached_info().config.name.clone(),
start_sequence: Some(2),
..Default::default()
},
async_nats::jetstream::stream::Source {
name: stream2.cached_info().config.name.clone(),
start_sequence: Some(1),
..Default::default()
},
]),
..Default::default()
})
.await
.unwrap();

let source_info = source.info().await.unwrap();
assert_eq!(
source_info.config.sources.as_ref().unwrap()[0]
.name
.as_str(),
"TEST"
);
assert_eq!(
source_info.config.sources.as_ref().unwrap()[1]
.name
.as_str(),
"TEST2"
);

let mut messages = source
.create_consumer(async_nats::jetstream::consumer::pull::Config {
name: Some("consumer".to_string()),
..Default::default()
})
.await
.unwrap()
.messages()
.await
.unwrap();

assert_eq!(
from_utf8(&messages.next().await.unwrap().unwrap().message.payload).unwrap(),
"data".to_string(),
);
assert_eq!(
from_utf8(&messages.next().await.unwrap().unwrap().message.payload).unwrap(),
"data".to_string(),
);
assert_eq!(
from_utf8(&messages.next().await.unwrap().unwrap().message.payload).unwrap(),
"data".to_string(),
);
}
}