Skip to content

Commit

Permalink
Fix issues with using the kv store when a JS domain is set
Browse files Browse the repository at this point in the history
This is essentially the same fix as nats-io/nats.go#910
in the go client, complete with test to make sure things work
  • Loading branch information
thomastaylor312 authored and Jarema committed Apr 25, 2022
1 parent 4a60fc6 commit cbfc976
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 4 deletions.
5 changes: 4 additions & 1 deletion nats/src/jetstream/mod.rs
Expand Up @@ -140,6 +140,7 @@ use crate::{
#[derive(Clone)]
pub struct JetStreamOptions {
pub(crate) api_prefix: String,
pub(crate) has_domain: bool,
}

impl Debug for JetStreamOptions {
Expand All @@ -154,6 +155,7 @@ impl Default for JetStreamOptions {
fn default() -> JetStreamOptions {
JetStreamOptions {
api_prefix: "$JS.API.".to_string(),
has_domain: false,
}
}
}
Expand Down Expand Up @@ -195,10 +197,11 @@ impl JetStreamOptions {
/// let options = nats::JetStreamOptions::new()
/// .domain("some_domain");
/// ```
pub fn domain(self, domain: &str) -> Self {
pub fn domain(mut self, domain: &str) -> Self {
if domain.is_empty() {
self.api_prefix("".to_string())
} else {
self.has_domain = true;
self.api_prefix(format!("$JS.{}.API", domain))
}
}
Expand Down
22 changes: 19 additions & 3 deletions nats/src/kv.rs
Expand Up @@ -153,13 +153,15 @@ impl JetStream {
));
}

let prefix = format!("$KV.{}.", bucket);

Ok(Store {
name: bucket.to_string(),
stream_name,
prefix,
prefix: format!("$KV.{}.", bucket),
context: self.clone(),
domain_prefix: self
.options
.has_domain
.then(|| self.options.api_prefix.clone()),
})
}

Expand Down Expand Up @@ -245,6 +247,10 @@ impl JetStream {
stream_name: stream_info.config.name,
prefix: format!("$KV.{}.", config.bucket),
context: self.clone(),
domain_prefix: self
.options
.has_domain
.then(|| self.options.api_prefix.clone()),
})
}

Expand Down Expand Up @@ -314,6 +320,7 @@ pub struct Store {
stream_name: String,
prefix: String,
context: JetStream,
domain_prefix: Option<String>,
}

impl Store {
Expand Down Expand Up @@ -452,6 +459,9 @@ impl Store {
}

let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);

Expand Down Expand Up @@ -526,6 +536,9 @@ impl Store {
}

let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);

Expand Down Expand Up @@ -570,6 +583,9 @@ impl Store {
}

let mut subject = String::new();
if let Some(api_prefix) = self.domain_prefix.as_ref() {
subject.push_str(api_prefix);
}
subject.push_str(&self.prefix);
subject.push_str(key);

Expand Down
16 changes: 16 additions & 0 deletions nats/tests/configs/jetstream-domain-leaf.conf
@@ -0,0 +1,16 @@
port: 5223
monitor_port: 8223
leafnodes {
remotes = [
{
urls: [
"nats://127.0.0.1:7422",
]
}
]
}
jetstream {
domain = leaf,
max_mem_store: 8MiB,
max_file_store: 10GiB
}
10 changes: 10 additions & 0 deletions nats/tests/configs/jetstream-domain.conf
@@ -0,0 +1,10 @@
port: 5222
monitor_port: 8222
leafnodes {
port: 7422
}
jetstream {
domain = foobar,
max_mem_store: 8MiB,
max_file_store: 10GiB
}
35 changes: 35 additions & 0 deletions nats/tests/kv.rs
Expand Up @@ -339,3 +339,38 @@ fn key_value_keys() {
assert!(keys.iter().any(|s| s == "baz"));
assert_eq!(keys.len(), 1);
}

#[test]
// Sanity check that everything works if you are using a JS domain across leaf nodes. This does not
// attempt to exercise all logic, just the happy path, as the other logic is exercised in the other
// tests
fn key_value_domain() {
let _server = util::run_server("tests/configs/jetstream-domain.conf");
let leaf_server = util::run_server("tests/configs/jetstream-domain-leaf.conf");
let client = nats::connect(&leaf_server.client_url()).unwrap();
let opts = nats::jetstream::JetStreamOptions::new().domain("foobar");
let context = nats::jetstream::JetStream::new(client, opts);

let kv = context
.create_key_value(&Config {
bucket: "KVS".to_string(),
history: 2,
..Default::default()
})
.unwrap();

let revision = kv.put("foo", b"bar").expect("should be able put a value");
let val = kv.get("foo").expect("should be able to get key").unwrap();
assert_eq!(val, b"bar", "should have received the correct value");

kv.update("foo", b"baz", revision)
.expect("should be able to update");
let val = kv.get("foo").expect("should be able to get key").unwrap();
assert_eq!(
val, b"baz",
"should have received the correct value after update"
);

// Try a delete too for good measure
kv.delete("bar").expect("should be able to delete");
}

0 comments on commit cbfc976

Please sign in to comment.