Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kv parity issues 94, 95, 96 #599

Merged
merged 22 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4f07e7f
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
e5d977e
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
4ebf19d
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
915bab8
shoring up test coverage
scottf Feb 16, 2022
eae97ca
shoring up test coverage
scottf Feb 16, 2022
2ad90d2
Merge branch 'main' into kv-updates-issue-94
scottf Feb 16, 2022
d54f8ab
shoring up test coverage
scottf Feb 16, 2022
eee3db0
shoring up test coverage
scottf Feb 16, 2022
e180db1
issue 96 purge deletes
scottf Feb 17, 2022
0dd4ad8
issue 96 purge deletes fix test
scottf Feb 17, 2022
349843f
issue 95 use of prefix as specified in ADR
scottf Feb 18, 2022
ef4fdbe
issue 95 use of prefix as specified in ADR
scottf Feb 18, 2022
8c12d93
updated licence
scottf Feb 23, 2022
780b964
Merge branch 'main' into kv-updates-issue-94
scottf Feb 23, 2022
38cb5af
Merge branch 'main' into kv-updates-issue-94
scottf Feb 28, 2022
b352a52
doh missing last line in files
scottf Feb 28, 2022
29cdbab
Merge remote-tracking branch 'origin/kv-updates-issue-94' into kv-upd…
scottf Feb 28, 2022
4c23814
Merge branch 'main' into kv-updates-issue-94
scottf Mar 1, 2022
0089a72
Merge branch 'main' into kv-updates-issue-94
scottf Mar 1, 2022
6163bf9
Merge branch 'main' into kv-updates-issue-94
scottf Mar 3, 2022
b2b1a59
Merge branch 'main' into kv-updates-issue-94
scottf Mar 7, 2022
51a81dd
addressed comments
scottf Mar 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/main/java/io/nats/client/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
// limitations under the License.
package io.nats.client;

import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.KeyValueWatchOption;
import io.nats.client.api.KeyValueWatcher;
import io.nats.client.api.*;
import io.nats.client.impl.NatsKeyValueWatchSubscription;

import java.io.IOException;
Expand Down Expand Up @@ -196,7 +193,8 @@ public interface KeyValue {
List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException;

/**
* Remove history from all keys that currently are deleted or purged.
* Remove history from all keys that currently are deleted or purged
* with using a default KeyValuePurgeOptions
* THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove the BETA FEATURE comments if PR makes the KV API GA. Alternatively, that could be another PR - up to you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll do it on another PR

* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
Expand All @@ -205,6 +203,16 @@ public interface KeyValue {
*/
void purgeDeletes() throws IOException, JetStreamApiException, InterruptedException;

/**
* Remove history from all keys that currently are deleted or purged, considering options.
* THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
void purgeDeletes(KeyValuePurgeOptions options) throws IOException, JetStreamApiException, InterruptedException;

/**
* Get the KeyValueStatus object
* THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
Expand Down
76 changes: 44 additions & 32 deletions src/main/java/io/nats/client/KeyValueOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,18 @@

import java.time.Duration;

import static io.nats.client.support.Validator.ensureEndsWithDot;
import static io.nats.client.support.Validator.validatePrefixOrDomain;

/**
* The KeyValueOptions class specifies the general options for KeyValueO.
* Options are created using the {@link KeyValueOptions.Builder Builder}.
*/
public class KeyValueOptions {

public static final Duration DEFAULT_TIMEOUT = Options.DEFAULT_CONNECTION_TIMEOUT;
public static final KeyValueOptions DEFAULT_JS_OPTIONS = new Builder().build();

private final String featurePrefix;
private final JetStreamOptions jso;

private KeyValueOptions(String featurePrefix, JetStreamOptions jso) {
this.featurePrefix = featurePrefix;
private KeyValueOptions(JetStreamOptions jso) {
this.jso = jso;
}

/**
* Gets the feature [subject] prefix.
* @return the prefix.
*/
public String getFeaturePrefix() {
return featurePrefix;
}

/**
* Gets the JetStream options for a KeyValue store
* @return the options
Expand Down Expand Up @@ -74,35 +58,63 @@ public static Builder builder(KeyValueOptions jso) {
*/
public static class Builder {

private String featurePrefix;
private JetStreamOptions jso;
private JetStreamOptions.Builder jsoBuilder;

public Builder() {}
public Builder() {
this(null);
}

public Builder(KeyValueOptions kvo) {
if (kvo != null) {
this.featurePrefix = kvo.featurePrefix;
this.jso = kvo.jso;
if (kvo == null) {
jsoBuilder = new JetStreamOptions.Builder();
}
else {
jsoBuilder = new JetStreamOptions.Builder(kvo.jso);
}
}

/**
* Sets the prefix for subject in features such as KeyValue.
* @param featurePrefix the feature prefix
* Sets the JetStreamOptions.
* @param jso the JetStreamOptions
* @return the builder.
*/
public Builder featurePrefix(String featurePrefix) {
this.featurePrefix = ensureEndsWithDot(validatePrefixOrDomain(featurePrefix, "Feature Prefix", false));
public Builder jetStreamOptions(JetStreamOptions jso) {
jsoBuilder = new JetStreamOptions.Builder(jso);
return this;
}

/**
* Sets the JetStreamOptions.
* @param jso the JetStreamOptions
* Sets the request timeout for JetStream API calls.
* @param requestTimeout the duration to wait for responses.
* @return the builder
*/
public Builder jsRequestTimeout(Duration requestTimeout) {
jsoBuilder.requestTimeout(requestTimeout);
return this;
}

/**
* Sets the prefix for JetStream subjects. A prefix can be used in conjunction with
* user permissions to restrict access to certain JetStream instances. This must
* match the prefix used in the server.
* @param prefix the JetStream prefix
* @return the builder.
*/
public Builder jetStreamOptions(JetStreamOptions jso) {
this.jso = jso;
public Builder jsPrefix(String prefix) {
jsoBuilder.prefix(prefix);
return this;
}

/**
* Sets the domain for JetStream subjects, creating a standard prefix from that domain
* in the form $JS.(domain).API.
* A domain can be used in conjunction with user permissions to restrict access to certain JetStream instances.
* This must match the domain used in the server.
* @param domain the JetStream domain
* @return the builder.
*/
public Builder jsDomain(String domain) {
jsoBuilder.domain(domain);
return this;
}

Expand All @@ -111,7 +123,7 @@ public Builder jetStreamOptions(JetStreamOptions jso) {
* @return JetStream options
*/
public KeyValueOptions build() {
return new KeyValueOptions(featurePrefix, jso);
return new KeyValueOptions(jsoBuilder.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public KeyValueConfiguration build() {
scBuilder.name(toStreamName(name))
.subjects(toStreamSubject(name))
.allowRollup(true)
.discardPolicy(DiscardPolicy.New)
.denyDelete(true);
return new KeyValueConfiguration(scBuilder.build());
}
Expand Down
15 changes: 2 additions & 13 deletions src/main/java/io/nats/client/api/KeyValueEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.Arrays;

import static io.nats.client.support.NatsJetStreamConstants.MSG_SIZE_HDR;
import static io.nats.client.support.NatsKeyValueUtil.BucketAndKey;
Expand Down Expand Up @@ -131,24 +130,14 @@ public boolean equals(Object o) {

KeyValueEntry that = (KeyValueEntry) o;

if (dataLen != that.dataLen) return false;
if (revision != that.revision) return false;
if (delta != that.delta) return false;
if (!bucketAndKey.equals(that.bucketAndKey)) return false;
if (!Arrays.equals(value, that.value)) return false;
if (!created.equals(that.created)) return false;
return op == that.op;
return bucketAndKey.equals(that.bucketAndKey)
&& revision == that.revision;
}

@Override
public int hashCode() {
int result = bucketAndKey.hashCode();
result = 31 * result + Arrays.hashCode(value);
result = 31 * result + (int) (dataLen ^ (dataLen >>> 32));
result = 31 * result + created.hashCode();
result = 31 * result + (int) (revision ^ (revision >>> 32));
result = 31 * result + (int) (delta ^ (delta >>> 32));
result = 31 * result + op.hashCode();
return result;
}
}
105 changes: 105 additions & 0 deletions src/main/java/io/nats/client/api/KeyValuePurgeOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import java.time.Duration;

public class KeyValuePurgeOptions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need doc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will do


/**
* The default time in millis that is used for as the threshold to keep markers.
*/
public static long DEFAULT_THRESHOLD_MILLIS = Duration.ofMinutes(30).toMillis();

private final long deleteMarkersThresholdMillis;

private KeyValuePurgeOptions(Builder b) {
this.deleteMarkersThresholdMillis = b.deleteMarkersThresholdMillis;
}

/**
* The value of the delete marker threshold, in milliseconds.
* @return the threshold
*/
public long getDeleteMarkersThresholdMillis() {
return deleteMarkersThresholdMillis;
}

/**
* Creates a builder for the Key Value Purge Options.
* @return a key value purge options builder
*/
public static Builder builder() {
return new Builder();
}

/**
* KeyValuePurgeOptions is created using a Builder. The builder supports chaining and will
* create a default set of options if no methods are calls.
*
* <p>{@code new KeyValuePurgeOptions.Builder().build()} will create a new KeyValuePurgeOptions.
*
*/
public static class Builder {
private long deleteMarkersThresholdMillis = DEFAULT_THRESHOLD_MILLIS;

/**
* Set the delete marker threshold.
* Null or duration of 0 will assume the default threshold {@link #DEFAULT_THRESHOLD_MILLIS}
* Duration less than zero will assume no threshold and will not keep any markers.
* @param deleteMarkersThreshold the threshold duration or null
* @return The builder
*/
public Builder deleteMarkersThreshold(Duration deleteMarkersThreshold) {
this.deleteMarkersThresholdMillis = deleteMarkersThreshold == null
? DEFAULT_THRESHOLD_MILLIS : deleteMarkersThreshold.toMillis();
return this;
}

/**
* Set the delete marker threshold.
* 0 will assume the default threshold {@link #DEFAULT_THRESHOLD_MILLIS}
* Less than zero will assume no threshold and will not keep any markers.
* @param deleteMarkersThresholdMillis the threshold millis
* @return The builder
*/
public Builder deleteMarkersThreshold(long deleteMarkersThresholdMillis) {
this.deleteMarkersThresholdMillis = deleteMarkersThresholdMillis;
return this;
}

/**
* Set the delete marker threshold to -1 so as to not keep any markers
* @return The builder
*/
public Builder deleteMarkersNoThreshold() {
this.deleteMarkersThresholdMillis = -1;
return this;
}

/**
* Build the Key Value Purge Options
* @return the options
*/
public KeyValuePurgeOptions build() {
if (deleteMarkersThresholdMillis < 0) {
deleteMarkersThresholdMillis = -1;
}
else if (deleteMarkersThresholdMillis == 0) {
deleteMarkersThresholdMillis = DEFAULT_THRESHOLD_MILLIS;
}
return new KeyValuePurgeOptions(this);
}
}
}