From 1d71738f85eb4cae5bb299c6591edcc88b6a5bd0 Mon Sep 17 00:00:00 2001 From: Taylor Thomas Date: Thu, 21 Apr 2022 18:06:42 -0600 Subject: [PATCH] fix(kv): Fixes issues with using the kv store when a JS domain is set This is essentially the same fix as https://github.com/nats-io/nats.go/pull/910 in the go client, complete with test to make sure things work --- nats/src/jetstream/mod.rs | 5 ++- nats/src/kv.rs | 22 ++++++++++-- nats/tests/configs/jetstream-domain-leaf.conf | 16 +++++++++ nats/tests/configs/jetstream-domain.conf | 10 ++++++ nats/tests/kv.rs | 35 +++++++++++++++++++ 5 files changed, 84 insertions(+), 4 deletions(-) create mode 100644 nats/tests/configs/jetstream-domain-leaf.conf create mode 100644 nats/tests/configs/jetstream-domain.conf diff --git a/nats/src/jetstream/mod.rs b/nats/src/jetstream/mod.rs index 0a9e6a9ad..ac27645db 100644 --- a/nats/src/jetstream/mod.rs +++ b/nats/src/jetstream/mod.rs @@ -140,6 +140,7 @@ use crate::{ #[derive(Clone)] pub struct JetStreamOptions { pub(crate) api_prefix: String, + pub(crate) has_domain: bool, } impl Debug for JetStreamOptions { @@ -154,6 +155,7 @@ impl Default for JetStreamOptions { fn default() -> JetStreamOptions { JetStreamOptions { api_prefix: "$JS.API.".to_string(), + has_domain: false, } } } @@ -195,10 +197,11 @@ impl JetStreamOptions { /// let options = nats::JetStreamOptions::new() /// .domain("some_domain"); /// ``` - pub fn domain(self, domain: &str) -> Self { + pub fn domain(mut self, domain: &str) -> Self { if domain.is_empty() { self.api_prefix("".to_string()) } else { + self.has_domain = true; self.api_prefix(format!("$JS.{}.API", domain)) } } diff --git a/nats/src/kv.rs b/nats/src/kv.rs index 305a32d50..1b1420922 100644 --- a/nats/src/kv.rs +++ b/nats/src/kv.rs @@ -153,13 +153,15 @@ impl JetStream { )); } - let prefix = format!("$KV.{}.", bucket); - Ok(Store { name: bucket.to_string(), stream_name, - prefix, + prefix: format!("$KV.{}.", bucket), context: self.clone(), + domain_prefix: self + .options + .has_domain + .then(|| self.options.api_prefix.clone()), }) } @@ -245,6 +247,10 @@ impl JetStream { stream_name: stream_info.config.name, prefix: format!("$KV.{}.", config.bucket), context: self.clone(), + domain_prefix: self + .options + .has_domain + .then(|| self.options.api_prefix.clone()), }) } @@ -314,6 +320,7 @@ pub struct Store { stream_name: String, prefix: String, context: JetStream, + domain_prefix: Option, } impl Store { @@ -452,6 +459,9 @@ impl Store { } let mut subject = String::new(); + if let Some(api_prefix) = self.domain_prefix.as_ref() { + subject.push_str(api_prefix); + } subject.push_str(&self.prefix); subject.push_str(key); @@ -526,6 +536,9 @@ impl Store { } let mut subject = String::new(); + if let Some(api_prefix) = self.domain_prefix.as_ref() { + subject.push_str(api_prefix); + } subject.push_str(&self.prefix); subject.push_str(key); @@ -570,6 +583,9 @@ impl Store { } let mut subject = String::new(); + if let Some(api_prefix) = self.domain_prefix.as_ref() { + subject.push_str(api_prefix); + } subject.push_str(&self.prefix); subject.push_str(key); diff --git a/nats/tests/configs/jetstream-domain-leaf.conf b/nats/tests/configs/jetstream-domain-leaf.conf new file mode 100644 index 000000000..85bed995a --- /dev/null +++ b/nats/tests/configs/jetstream-domain-leaf.conf @@ -0,0 +1,16 @@ +port: 5223 +monitor_port: 8223 +leafnodes { + remotes = [ + { + urls: [ + "nats://127.0.0.1:7422", + ] + } + ] +} +jetstream { + domain = leaf, + max_mem_store: 8MiB, + max_file_store: 10GiB +} \ No newline at end of file diff --git a/nats/tests/configs/jetstream-domain.conf b/nats/tests/configs/jetstream-domain.conf new file mode 100644 index 000000000..219f4d963 --- /dev/null +++ b/nats/tests/configs/jetstream-domain.conf @@ -0,0 +1,10 @@ +port: 5222 +monitor_port: 8222 +leafnodes { + port: 7422 +} +jetstream { + domain = foobar, + max_mem_store: 8MiB, + max_file_store: 10GiB +} \ No newline at end of file diff --git a/nats/tests/kv.rs b/nats/tests/kv.rs index 108f8cbda..0d6a8041d 100644 --- a/nats/tests/kv.rs +++ b/nats/tests/kv.rs @@ -339,3 +339,38 @@ fn key_value_keys() { assert!(keys.iter().any(|s| s == "baz")); assert_eq!(keys.len(), 1); } + +#[test] +// Sanity check that everything works if you are using a JS domain across leaf nodes. This does not +// attempt to exercise all logic, just the happy path, as the other logic is exercised in the other +// tests +fn key_value_domain() { + let _server = util::run_server("tests/configs/jetstream-domain.conf"); + let leaf_server = util::run_server("tests/configs/jetstream-domain-leaf.conf"); + let client = nats::connect(&leaf_server.client_url()).unwrap(); + let opts = nats::jetstream::JetStreamOptions::new().domain("foobar"); + let context = nats::jetstream::JetStream::new(client, opts); + + let kv = context + .create_key_value(&Config { + bucket: "KVS".to_string(), + history: 2, + ..Default::default() + }) + .unwrap(); + + let revision = kv.put("foo", b"bar").expect("should be able put a value"); + let val = kv.get("foo").expect("should be able to get key").unwrap(); + assert_eq!(val, b"bar", "should have received the correct value"); + + kv.update("foo", b"baz", revision) + .expect("should be able to update"); + let val = kv.get("foo").expect("should be able to get key").unwrap(); + assert_eq!( + val, b"baz", + "should have received the correct value after update" + ); + + // Try a delete too for good measure + kv.delete("bar").expect("should be able to delete"); +}