Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kv): Fixes issues with using the kv store when a JS domain is set #383

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion nats/src/jetstream/mod.rs
Expand Up @@ -140,6 +140,7 @@ use crate::{
#[derive(Clone)]
pub struct JetStreamOptions {
pub(crate) api_prefix: String,
pub(crate) has_domain: bool,
}

impl Debug for JetStreamOptions {
Expand All @@ -154,6 +155,7 @@ impl Default for JetStreamOptions {
fn default() -> JetStreamOptions {
JetStreamOptions {
api_prefix: "$JS.API.".to_string(),
has_domain: false,
}
}
}
Expand Down Expand Up @@ -195,10 +197,11 @@ impl JetStreamOptions {
/// let options = nats::JetStreamOptions::new()
/// .domain("some_domain");
/// ```
pub fn domain(self, domain: &str) -> Self {
pub fn domain(mut self, domain: &str) -> Self {
if domain.is_empty() {
self.api_prefix("".to_string())
} else {
self.has_domain = true;
self.api_prefix(format!("$JS.{}.API", domain))
}
}
Expand Down
22 changes: 19 additions & 3 deletions nats/src/kv.rs
Expand Up @@ -153,13 +153,15 @@ impl JetStream {
));
}

let prefix = format!("$KV.{}.", bucket);

Ok(Store {
name: bucket.to_string(),
stream_name,
prefix,
prefix: format!("$KV.{}.", bucket),
context: self.clone(),
domain_prefix: self
.options
.has_domain
.then(|| self.options.api_prefix.clone()),
})
}

Expand Down Expand Up @@ -245,6 +247,10 @@ impl JetStream {
stream_name: stream_info.config.name,
prefix: format!("$KV.{}.", config.bucket),
context: self.clone(),
domain_prefix: self
.options
.has_domain
.then(|| self.options.api_prefix.clone()),
})
}

Expand Down Expand Up @@ -314,6 +320,7 @@ pub struct Store {
stream_name: String,
prefix: String,
context: JetStream,
domain_prefix: Option<String>,
}

impl Store {
Expand Down Expand Up @@ -452,6 +459,9 @@ impl Store {
}

let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);

Expand Down Expand Up @@ -526,6 +536,9 @@ impl Store {
}

let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);

Expand Down Expand Up @@ -570,6 +583,9 @@ impl Store {
}

let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);

Expand Down
16 changes: 16 additions & 0 deletions nats/tests/configs/jetstream-domain-leaf.conf
@@ -0,0 +1,16 @@
port: 5223
monitor_port: 8223
leafnodes {
remotes = [
{
urls: [
"nats://127.0.0.1:7422",
]
}
]
}
jetstream {
domain = leaf,
max_mem_store: 8MiB,
max_file_store: 10GiB
}
10 changes: 10 additions & 0 deletions nats/tests/configs/jetstream-domain.conf
@@ -0,0 +1,10 @@
port: 5222
monitor_port: 8222
leafnodes {
port: 7422
}
jetstream {
domain = foobar,
max_mem_store: 8MiB,
max_file_store: 10GiB
}
35 changes: 35 additions & 0 deletions nats/tests/kv.rs
Expand Up @@ -339,3 +339,38 @@ fn key_value_keys() {
assert!(keys.iter().any(|s| s == "baz"));
assert_eq!(keys.len(), 1);
}

#[test]
// Sanity check that everything works if you are using a JS domain across leaf nodes. This does not
// attempt to exercise all logic, just the happy path, as the other logic is exercised in the other
// tests
fn key_value_domain() {
let _server = util::run_server("tests/configs/jetstream-domain.conf");
let leaf_server = util::run_server("tests/configs/jetstream-domain-leaf.conf");
let client = nats::connect(&leaf_server.client_url()).unwrap();
let opts = nats::jetstream::JetStreamOptions::new().domain("foobar");
let context = nats::jetstream::JetStream::new(client, opts);

let kv = context
.create_key_value(&Config {
bucket: "KVS".to_string(),
history: 2,
..Default::default()
})
.unwrap();

let revision = kv.put("foo", b"bar").expect("should be able put a value");
let val = kv.get("foo").expect("should be able to get key").unwrap();
assert_eq!(val, b"bar", "should have received the correct value");

kv.update("foo", b"baz", revision)
.expect("should be able to update");
let val = kv.get("foo").expect("should be able to get key").unwrap();
assert_eq!(
val, b"baz",
"should have received the correct value after update"
);

// Try a delete too for good measure
kv.delete("bar").expect("should be able to delete");
}