Skip to content

Commit

Permalink
Add sources and mirror to stream config
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
Jarema committed Oct 26, 2022
1 parent 8561a18 commit 2eb5459
Show file tree
Hide file tree
Showing 2 changed files with 208 additions and 1 deletion.
49 changes: 48 additions & 1 deletion async-nats/src/jetstream/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{Error, StatusCode};
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use serde_json::json;
use time::serde::rfc3339;
use time::{serde::rfc3339, OffsetDateTime};

use super::{
consumer::{self, Consumer, FromConsumer, IntoConsumerConfig},
Expand Down Expand Up @@ -867,11 +867,22 @@ pub struct Config {
#[serde(default, skip_serializing_if = "is_default")]
pub deny_purge: bool,

/// Optional republish config.
#[serde(default, skip_serializing_if = "is_default")]
pub republish: Option<Republish>,

/// Enables direct get, which would get messages from
/// non-leader.
#[serde(default, skip_serializing_if = "is_default")]
pub allow_direct: bool,

/// Stream mirror configuration.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mirror: Option<Source>,

/// Sources configration.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub sources: Option<Vec<Source>>,
}

impl From<&Config> for Config {
Expand Down Expand Up @@ -1204,3 +1215,39 @@ pub struct PurgeRequest {
#[serde(default, skip_serializing_if = "is_default")]
pub keep: Option<u64>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct Source {
/// Name of the stream source.
pub name: String,
/// Optional source start sequence.
#[serde(default, rename = "opt_start_seq", skip_serializing_if = "is_default")]
pub start_sequence: Option<u64>,
#[serde(
default,
rename = "opt_start_time",
skip_serializing_if = "is_default",
with = "rfc3339::option"
)]
/// Optional source start time.
pub start_time: Option<OffsetDateTime>,
/// Optional additional filter subject.
#[serde(default, skip_serializing_if = "is_default")]
pub filter_subject: Option<String>,
/// Optional config for sourcing streams from another prefix, used for cross-account.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub external: Option<External>,
/// Optional config to set a domain, if source is residing in different one.
#[serde(default, rename = "opt_start", skip_serializing_if = "is_default")]
pub doamin: Option<String>,
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Default)]
pub struct External {
/// Api prefix of external source.
#[serde(rename = "api")]
pub api_prefix: String,
/// Optional configuration of delivery prefix.
#[serde(rename = "deliver", skip_serializing_if = "is_default")]
pub delivery_prefix: Option<String>,
}
160 changes: 160 additions & 0 deletions async-nats/tests/jetstream_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1997,4 +1997,164 @@ mod jetstream {
.await
.expect_err("should get 503 maximum messages per subject exceeded error");
}

#[tokio::test]
async fn mirrors() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "TEST".to_string(),
subjects: vec!["events".to_string()],
..Default::default()
})
.await
.unwrap();

jetstream
.publish("events".to_string(), "skipped".into())
.await
.unwrap();
jetstream
.publish("events".to_string(), "data".into())
.await
.unwrap();

let mut mirror = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "MIRROR".to_string(),
mirror: Some(async_nats::jetstream::stream::Source {
name: stream.cached_info().config.name.clone(),
start_sequence: Some(2),
..Default::default()
}),
..Default::default()
})
.await
.unwrap();

let mirror_info = mirror.info().await.unwrap();
assert_eq!(
mirror_info.config.mirror.as_ref().unwrap().name.as_str(),
"TEST"
);

let mut messages = mirror
.create_consumer(async_nats::jetstream::consumer::pull::Config {
name: Some("consumer".to_string()),
..Default::default()
})
.await
.unwrap()
.messages()
.await
.unwrap();

assert_eq!(
from_utf8(&messages.next().await.unwrap().unwrap().message.payload).unwrap(),
"data".to_string(),
)
}
#[tokio::test]
async fn sources() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();

let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "TEST".to_string(),
subjects: vec!["events".to_string()],
..Default::default()
})
.await
.unwrap();
let stream2 = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "TEST2".to_string(),
subjects: vec!["events2".to_string()],
..Default::default()
})
.await
.unwrap();

jetstream
.publish("events".to_string(), "skipped".into())
.await
.unwrap();
jetstream
.publish("events".to_string(), "data".into())
.await
.unwrap();
jetstream
.publish("events2".to_string(), "data".into())
.await
.unwrap();
jetstream
.publish("events2".to_string(), "data".into())
.await
.unwrap();

let mut source = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "SOURCE".to_string(),
sources: Some(vec![
async_nats::jetstream::stream::Source {
name: stream.cached_info().config.name.clone(),
start_sequence: Some(2),
..Default::default()
},
async_nats::jetstream::stream::Source {
name: stream2.cached_info().config.name.clone(),
start_sequence: Some(1),
..Default::default()
},
]),
..Default::default()
})
.await
.unwrap();

let source_info = source.info().await.unwrap();
assert_eq!(
source_info.config.sources.as_ref().unwrap()[0]
.name
.as_str(),
"TEST"
);
assert_eq!(
source_info.config.sources.as_ref().unwrap()[1]
.name
.as_str(),
"TEST2"
);

let mut messages = source
.create_consumer(async_nats::jetstream::consumer::pull::Config {
name: Some("consumer".to_string()),
..Default::default()
})
.await
.unwrap()
.messages()
.await
.unwrap();

assert_eq!(
from_utf8(&messages.next().await.unwrap().unwrap().message.payload).unwrap(),
"data".to_string(),
);
assert_eq!(
from_utf8(&messages.next().await.unwrap().unwrap().message.payload).unwrap(),
"data".to_string(),
);
assert_eq!(
from_utf8(&messages.next().await.unwrap().unwrap().message.payload).unwrap(),
"data".to_string(),
);
}
}

0 comments on commit 2eb5459

Please sign in to comment.