Skip to content

Commit

Permalink
Kv parity issues 94, 95, 96 (#599)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Mar 8, 2022
1 parent 897fe83 commit bf95780
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 168 deletions.
18 changes: 13 additions & 5 deletions src/main/java/io/nats/client/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
// limitations under the License.
package io.nats.client;

import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.KeyValueWatchOption;
import io.nats.client.api.KeyValueWatcher;
import io.nats.client.api.*;
import io.nats.client.impl.NatsKeyValueWatchSubscription;

import java.io.IOException;
Expand Down Expand Up @@ -196,7 +193,8 @@ public interface KeyValue {
List<KeyValueEntry> history(String key) throws IOException, JetStreamApiException, InterruptedException;

/**
* Remove history from all keys that currently are deleted or purged.
* Remove history from all keys that currently are deleted or purged
* with using a default KeyValuePurgeOptions
* THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
Expand All @@ -205,6 +203,16 @@ public interface KeyValue {
*/
void purgeDeletes() throws IOException, JetStreamApiException, InterruptedException;

/**
* Remove history from all keys that currently are deleted or purged, considering options.
* THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
* @throws InterruptedException if the thread is interrupted
*/
void purgeDeletes(KeyValuePurgeOptions options) throws IOException, JetStreamApiException, InterruptedException;

/**
* Get the KeyValueStatus object
* THIS IS A BETA FEATURE AND SUBJECT TO CHANGE
Expand Down
76 changes: 44 additions & 32 deletions src/main/java/io/nats/client/KeyValueOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,18 @@

import java.time.Duration;

import static io.nats.client.support.Validator.ensureEndsWithDot;
import static io.nats.client.support.Validator.validatePrefixOrDomain;

/**
* The KeyValueOptions class specifies the general options for KeyValueO.
* Options are created using the {@link KeyValueOptions.Builder Builder}.
*/
public class KeyValueOptions {

public static final Duration DEFAULT_TIMEOUT = Options.DEFAULT_CONNECTION_TIMEOUT;
public static final KeyValueOptions DEFAULT_JS_OPTIONS = new Builder().build();

private final String featurePrefix;
private final JetStreamOptions jso;

private KeyValueOptions(String featurePrefix, JetStreamOptions jso) {
this.featurePrefix = featurePrefix;
private KeyValueOptions(JetStreamOptions jso) {
this.jso = jso;
}

/**
* Gets the feature [subject] prefix.
* @return the prefix.
*/
public String getFeaturePrefix() {
return featurePrefix;
}

/**
* Gets the JetStream options for a KeyValue store
* @return the options
Expand Down Expand Up @@ -74,35 +58,63 @@ public static Builder builder(KeyValueOptions jso) {
*/
public static class Builder {

private String featurePrefix;
private JetStreamOptions jso;
private JetStreamOptions.Builder jsoBuilder;

public Builder() {}
public Builder() {
this(null);
}

public Builder(KeyValueOptions kvo) {
if (kvo != null) {
this.featurePrefix = kvo.featurePrefix;
this.jso = kvo.jso;
if (kvo == null) {
jsoBuilder = new JetStreamOptions.Builder();
}
else {
jsoBuilder = new JetStreamOptions.Builder(kvo.jso);
}
}

/**
* Sets the prefix for subject in features such as KeyValue.
* @param featurePrefix the feature prefix
* Sets the JetStreamOptions.
* @param jso the JetStreamOptions
* @return the builder.
*/
public Builder featurePrefix(String featurePrefix) {
this.featurePrefix = ensureEndsWithDot(validatePrefixOrDomain(featurePrefix, "Feature Prefix", false));
public Builder jetStreamOptions(JetStreamOptions jso) {
jsoBuilder = new JetStreamOptions.Builder(jso);
return this;
}

/**
* Sets the JetStreamOptions.
* @param jso the JetStreamOptions
* Sets the request timeout for JetStream API calls.
* @param requestTimeout the duration to wait for responses.
* @return the builder
*/
public Builder jsRequestTimeout(Duration requestTimeout) {
jsoBuilder.requestTimeout(requestTimeout);
return this;
}

/**
* Sets the prefix for JetStream subjects. A prefix can be used in conjunction with
* user permissions to restrict access to certain JetStream instances. This must
* match the prefix used in the server.
* @param prefix the JetStream prefix
* @return the builder.
*/
public Builder jetStreamOptions(JetStreamOptions jso) {
this.jso = jso;
public Builder jsPrefix(String prefix) {
jsoBuilder.prefix(prefix);
return this;
}

/**
* Sets the domain for JetStream subjects, creating a standard prefix from that domain
* in the form $JS.(domain).API.
* A domain can be used in conjunction with user permissions to restrict access to certain JetStream instances.
* This must match the domain used in the server.
* @param domain the JetStream domain
* @return the builder.
*/
public Builder jsDomain(String domain) {
jsoBuilder.domain(domain);
return this;
}

Expand All @@ -111,7 +123,7 @@ public Builder jetStreamOptions(JetStreamOptions jso) {
* @return JetStream options
*/
public KeyValueOptions build() {
return new KeyValueOptions(featurePrefix, jso);
return new KeyValueOptions(jsoBuilder.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public KeyValueConfiguration build() {
scBuilder.name(toStreamName(name))
.subjects(toStreamSubject(name))
.allowRollup(true)
.discardPolicy(DiscardPolicy.New)
.denyDelete(true);
return new KeyValueConfiguration(scBuilder.build());
}
Expand Down
15 changes: 2 additions & 13 deletions src/main/java/io/nats/client/api/KeyValueEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.Arrays;

import static io.nats.client.support.NatsJetStreamConstants.MSG_SIZE_HDR;
import static io.nats.client.support.NatsKeyValueUtil.BucketAndKey;
Expand Down Expand Up @@ -131,24 +130,14 @@ public boolean equals(Object o) {

KeyValueEntry that = (KeyValueEntry) o;

if (dataLen != that.dataLen) return false;
if (revision != that.revision) return false;
if (delta != that.delta) return false;
if (!bucketAndKey.equals(that.bucketAndKey)) return false;
if (!Arrays.equals(value, that.value)) return false;
if (!created.equals(that.created)) return false;
return op == that.op;
return bucketAndKey.equals(that.bucketAndKey)
&& revision == that.revision;
}

@Override
public int hashCode() {
int result = bucketAndKey.hashCode();
result = 31 * result + Arrays.hashCode(value);
result = 31 * result + (int) (dataLen ^ (dataLen >>> 32));
result = 31 * result + created.hashCode();
result = 31 * result + (int) (revision ^ (revision >>> 32));
result = 31 * result + (int) (delta ^ (delta >>> 32));
result = 31 * result + op.hashCode();
return result;
}
}
105 changes: 105 additions & 0 deletions src/main/java/io/nats/client/api/KeyValuePurgeOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client.api;

import java.time.Duration;

public class KeyValuePurgeOptions {

/**
* The default time in millis that is used for as the threshold to keep markers.
*/
public static long DEFAULT_THRESHOLD_MILLIS = Duration.ofMinutes(30).toMillis();

private final long deleteMarkersThresholdMillis;

private KeyValuePurgeOptions(Builder b) {
this.deleteMarkersThresholdMillis = b.deleteMarkersThresholdMillis;
}

/**
* The value of the delete marker threshold, in milliseconds.
* @return the threshold
*/
public long getDeleteMarkersThresholdMillis() {
return deleteMarkersThresholdMillis;
}

/**
* Creates a builder for the Key Value Purge Options.
* @return a key value purge options builder
*/
public static Builder builder() {
return new Builder();
}

/**
* KeyValuePurgeOptions is created using a Builder. The builder supports chaining and will
* create a default set of options if no methods are calls.
*
* <p>{@code new KeyValuePurgeOptions.Builder().build()} will create a new KeyValuePurgeOptions.
*
*/
public static class Builder {
private long deleteMarkersThresholdMillis = DEFAULT_THRESHOLD_MILLIS;

/**
* Set the delete marker threshold.
* Null or duration of 0 will assume the default threshold {@link #DEFAULT_THRESHOLD_MILLIS}
* Duration less than zero will assume no threshold and will not keep any markers.
* @param deleteMarkersThreshold the threshold duration or null
* @return The builder
*/
public Builder deleteMarkersThreshold(Duration deleteMarkersThreshold) {
this.deleteMarkersThresholdMillis = deleteMarkersThreshold == null
? DEFAULT_THRESHOLD_MILLIS : deleteMarkersThreshold.toMillis();
return this;
}

/**
* Set the delete marker threshold.
* 0 will assume the default threshold {@link #DEFAULT_THRESHOLD_MILLIS}
* Less than zero will assume no threshold and will not keep any markers.
* @param deleteMarkersThresholdMillis the threshold millis
* @return The builder
*/
public Builder deleteMarkersThreshold(long deleteMarkersThresholdMillis) {
this.deleteMarkersThresholdMillis = deleteMarkersThresholdMillis;
return this;
}

/**
* Set the delete marker threshold to -1 so as to not keep any markers
* @return The builder
*/
public Builder deleteMarkersNoThreshold() {
this.deleteMarkersThresholdMillis = -1;
return this;
}

/**
* Build the Key Value Purge Options
* @return the options
*/
public KeyValuePurgeOptions build() {
if (deleteMarkersThresholdMillis < 0) {
deleteMarkersThresholdMillis = -1;
}
else if (deleteMarkersThresholdMillis == 0) {
deleteMarkersThresholdMillis = DEFAULT_THRESHOLD_MILLIS;
}
return new KeyValuePurgeOptions(this);
}
}
}

0 comments on commit bf95780

Please sign in to comment.