Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kv parity issues 94, 95, 96 #599

Merged
merged 22 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4f07e7f
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
e5d977e
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
4ebf19d
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
915bab8
shoring up test coverage
scottf Feb 16, 2022
eae97ca
shoring up test coverage
scottf Feb 16, 2022
2ad90d2
Merge branch 'main' into kv-updates-issue-94
scottf Feb 16, 2022
d54f8ab
shoring up test coverage
scottf Feb 16, 2022
eee3db0
shoring up test coverage
scottf Feb 16, 2022
e180db1
issue 96 purge deletes
scottf Feb 17, 2022
0dd4ad8
issue 96 purge deletes fix test
scottf Feb 17, 2022
349843f
issue 95 use of prefix as specified in ADR
scottf Feb 18, 2022
ef4fdbe
issue 95 use of prefix as specified in ADR
scottf Feb 18, 2022
8c12d93
updated licence
scottf Feb 23, 2022
780b964
Merge branch 'main' into kv-updates-issue-94
scottf Feb 23, 2022
38cb5af
Merge branch 'main' into kv-updates-issue-94
scottf Feb 28, 2022
b352a52
doh missing last line in files
scottf Feb 28, 2022
29cdbab
Merge remote-tracking branch 'origin/kv-updates-issue-94' into kv-upd…
scottf Feb 28, 2022
4c23814
Merge branch 'main' into kv-updates-issue-94
scottf Mar 1, 2022
0089a72
Merge branch 'main' into kv-updates-issue-94
scottf Mar 1, 2022
6163bf9
Merge branch 'main' into kv-updates-issue-94
scottf Mar 3, 2022
b2b1a59
Merge branch 'main' into kv-updates-issue-94
scottf Mar 7, 2022
51a81dd
addressed comments
scottf Mar 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public KeyValueConfiguration build() {
scBuilder.name(toStreamName(name))
.subjects(toStreamSubject(name))
.allowRollup(true)
.discardPolicy(DiscardPolicy.New)
.denyDelete(true);
return new KeyValueConfiguration(scBuilder.build());
}
Expand Down
15 changes: 2 additions & 13 deletions src/main/java/io/nats/client/api/KeyValueEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.Arrays;

import static io.nats.client.support.NatsJetStreamConstants.MSG_SIZE_HDR;
import static io.nats.client.support.NatsKeyValueUtil.BucketAndKey;
Expand Down Expand Up @@ -131,24 +130,14 @@ public boolean equals(Object o) {

KeyValueEntry that = (KeyValueEntry) o;

if (dataLen != that.dataLen) return false;
if (revision != that.revision) return false;
if (delta != that.delta) return false;
if (!bucketAndKey.equals(that.bucketAndKey)) return false;
if (!Arrays.equals(value, that.value)) return false;
if (!created.equals(that.created)) return false;
return op == that.op;
return bucketAndKey.equals(that.bucketAndKey)
&& revision == that.revision;
}

@Override
public int hashCode() {
int result = bucketAndKey.hashCode();
result = 31 * result + Arrays.hashCode(value);
result = 31 * result + (int) (dataLen ^ (dataLen >>> 32));
result = 31 * result + created.hashCode();
result = 31 * result + (int) (revision ^ (revision >>> 32));
result = 31 * result + (int) (delta ^ (delta >>> 32));
result = 31 * result + op.hashCode();
return result;
}
}
4 changes: 0 additions & 4 deletions src/main/java/io/nats/client/impl/NatsKeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,6 @@ String getStreamName() {
return streamName;
}

String getStreamSubject() {
return streamSubject;
}

/**
* {@inheritDoc}
*/
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/io/nats/client/impl/NatsKeyValueManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package io.nats.client.impl;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.KeyValueManagement;
import io.nats.client.KeyValueOptions;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.support.Validator;

import java.io.IOException;
Expand All @@ -28,7 +28,7 @@
import static io.nats.client.support.NatsKeyValueUtil.*;

public class NatsKeyValueManagement implements KeyValueManagement {
private final JetStreamManagement jsm;
private final NatsJetStreamManagement jsm;

NatsKeyValueManagement(NatsConnection connection, KeyValueOptions kvo) throws IOException {
jsm = new NatsJetStreamManagement(connection, kvo == null ? null : kvo.getJetStreamOptions());
Expand All @@ -39,7 +39,11 @@ public class NatsKeyValueManagement implements KeyValueManagement {
*/
@Override
public KeyValueStatus create(KeyValueConfiguration config) throws IOException, JetStreamApiException {
return new KeyValueStatus(jsm.addStream(config.getBackingConfig()));
StreamConfiguration sc = config.getBackingConfig();
if ( jsm.conn.getServerInfo().isOlderThanVersion("2.7.2") ) {
sc = StreamConfiguration.builder(sc).discardPolicy(null).build(); // null discard policy will use default
scottf marked this conversation as resolved.
Show resolved Hide resolved
}
return new KeyValueStatus(jsm.addStream(sc));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
import io.nats.client.api.*;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class NatsKeyValueWatchSubscription implements AutoCloseable {
private static final Object dispatcherLock = new Object();
private static NatsDispatcher dispatcher;

private final JetStreamSubscription sub;
private final AtomicBoolean endOfDataSent;

public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException {
String keySubject = kv.defaultKeySubject(keyPattern);
Expand All @@ -44,18 +42,14 @@ public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValu
}
}

WatchMessageHandler handler = new WatchMessageHandler(watcher, !ignoreDeletes);
if (deliverPolicy == DeliverPolicy.New) {
watcher.endOfData();
endOfDataSent = new AtomicBoolean(true);
handler.sendEndOfData();
}
else {
KeyValueEntry kveCheckPending = kv.getLastMessage(keyPattern);
if (kveCheckPending == null) {
watcher.endOfData();
endOfDataSent = new AtomicBoolean(true);
}
else {
endOfDataSent = new AtomicBoolean(false);
handler.sendEndOfData();
}
}

Expand All @@ -71,19 +65,40 @@ public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValu
.build())
.build();

final boolean includeDeletes = !ignoreDeletes;
MessageHandler handler = m -> {
sub = kv.js.subscribe(keySubject, getDispatcher(kv.js), handler, false, pso);
if (!handler.endOfDataSent) {
long pending = sub.getConsumerInfo().getNumPending() + sub.getConsumerInfo().getDelivered().getConsumerSequence();
if (pending == 0) {
handler.sendEndOfData();
}
}
}

static class WatchMessageHandler implements MessageHandler {
private final KeyValueWatcher watcher;
private final boolean includeDeletes;
boolean endOfDataSent;

public WatchMessageHandler(KeyValueWatcher watcher, boolean includeDeletes) {
this.watcher = watcher;
this.includeDeletes = includeDeletes;
}

@Override
public void onMessage(Message m) throws InterruptedException {
KeyValueEntry kve = new KeyValueEntry(m);
if (includeDeletes || kve.getOperation() == KeyValueOperation.PUT) {
watcher.watch(kve);
}
if (!endOfDataSent.get() && kve.getDelta() == 0) {
watcher.endOfData();
endOfDataSent.set(true);
if (!endOfDataSent && kve.getDelta() == 0) {
sendEndOfData();
}
};
}

sub = kv.js.subscribe(keySubject, getDispatcher(kv.js), handler, false, pso);
private void sendEndOfData() {
endOfDataSent = true;
watcher.endOfData();
}
}

private static Dispatcher getDispatcher(JetStream js) {
Expand Down
13 changes: 9 additions & 4 deletions src/test/java/io/nats/client/api/ServerInfoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import static org.junit.jupiter.api.Assertions.*;

public class ServerInfoTests {
static byte[] nonce = "abcdefg".getBytes(StandardCharsets.UTF_8);
static String encoded = Base64.getUrlEncoder().withoutPadding().encodeToString(nonce);
static String json = ResourceUtils.dataAsString("ServerInfoJson.txt").replace("<encoded>", encoded);

@Test
public void testValidInfoString() {
byte[] nonce = "abcdefg".getBytes(StandardCharsets.UTF_8);
String encoded = Base64.getUrlEncoder().withoutPadding().encodeToString(nonce);
byte[] ascii = encoded.getBytes(StandardCharsets.US_ASCII);

String json = ResourceUtils.dataAsString("ServerInfoJson.txt").replace("<encoded>", encoded);

ServerInfo info = new ServerInfo(json);
assertEquals("serverId", info.getServerId());
assertEquals("serverName", info.getServerName());
Expand All @@ -53,6 +53,11 @@ public void testValidInfoString() {
assertArrayEquals(ascii, info.getNonce());

assertNotNull(info.toString()); // COVERAGE
}

@Test
public void testServerVersionComparisonsWork() {
ServerInfo info = new ServerInfo(json);

ServerInfo info234 = new ServerInfo(json.replace("1.2.3", "2.3.4"));
ServerInfo info235 = new ServerInfo(json.replace("1.2.3", "2.3.5"));
Expand Down
133 changes: 108 additions & 25 deletions src/test/java/io/nats/client/impl/KeyValueTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;

import static io.nats.client.api.KeyValueWatchOption.*;
import static io.nats.client.support.NatsConstants.DOT;
import static org.junit.jupiter.api.Assertions.*;

public class KeyValueTests extends JetStreamTestBase {
Expand All @@ -50,34 +51,20 @@ public void testWorkflow() throws Exception {

// create the bucket
KeyValueConfiguration kvc = KeyValueConfiguration.builder()
.name(BUCKET)
.maxHistoryPerKey(3)
.storageType(StorageType.Memory)
.build();
.name(BUCKET)
.description(PLAIN)
.maxHistoryPerKey(3)
.storageType(StorageType.Memory)
.build();

KeyValueStatus status = kvm.create(kvc);

kvc = status.getConfiguration();
assertEquals(BUCKET, status.getBucketName());
assertEquals(BUCKET, kvc.getBucketName());
assertEquals(NatsKeyValueUtil.toStreamName(BUCKET), kvc.getBackingConfig().getName());
assertEquals(3, status.getMaxHistoryPerKey());
assertEquals(3, kvc.getMaxHistoryPerKey());
assertEquals(-1, status.getMaxBucketSize());
assertEquals(-1, kvc.getMaxBucketSize());
assertEquals(-1, status.getMaxValueSize());
assertEquals(-1, kvc.getMaxValueSize());
assertEquals(Duration.ZERO, status.getTtl());
assertEquals(Duration.ZERO, kvc.getTtl());
assertEquals(StorageType.Memory, status.getStorageType());
assertEquals(StorageType.Memory, kvc.getStorageType());
assertEquals(1, status.getReplicas());
assertEquals(1, kvc.getReplicas());
assertEquals(0, status.getEntryCount());
assertEquals("JetStream", status.getBackingStore());
assertStatus(status);

// get the kv context for the specific bucket
KeyValue kv = nc.keyValue(BUCKET);
assertEquals(BUCKET, kv.getBucketName());
status = kv.getStatus();
assertStatus(status);

// Put some keys. Each key is put in a subject in the bucket (stream)
// The put returns the sequence number in the bucket (stream)
Expand Down Expand Up @@ -301,12 +288,39 @@ public void testWorkflow() throws Exception {
});
}

private void assertStatus(KeyValueStatus status) {
KeyValueConfiguration kvc;
kvc = status.getConfiguration();
assertEquals(BUCKET, status.getBucketName());
assertEquals(BUCKET, kvc.getBucketName());
assertEquals(PLAIN, status.getDescription());
assertEquals(PLAIN, kvc.getDescription());
assertEquals(NatsKeyValueUtil.toStreamName(BUCKET), kvc.getBackingConfig().getName());
assertEquals(3, status.getMaxHistoryPerKey());
assertEquals(3, kvc.getMaxHistoryPerKey());
assertEquals(-1, status.getMaxBucketSize());
assertEquals(-1, kvc.getMaxBucketSize());
assertEquals(-1, status.getMaxValueSize());
assertEquals(-1, kvc.getMaxValueSize());
assertEquals(Duration.ZERO, status.getTtl());
assertEquals(Duration.ZERO, kvc.getTtl());
assertEquals(StorageType.Memory, status.getStorageType());
assertEquals(StorageType.Memory, kvc.getStorageType());
assertEquals(1, status.getReplicas());
assertEquals(1, kvc.getReplicas());
assertEquals(0, status.getEntryCount());
assertEquals("JetStream", status.getBackingStore());

assertTrue(status.toString().contains(BUCKET));
assertTrue(status.toString().contains(PLAIN));
}

@Test
public void testKeys() throws Exception {
runInJsServer(nc -> {
KeyValueManagement kvm = nc.keyValueManagement();

// create bucket 1
// create bucket
kvm.create(KeyValueConfiguration.builder()
.name(BUCKET)
.storageType(StorageType.Memory)
Expand Down Expand Up @@ -810,6 +824,7 @@ else if (expected instanceof String) {
static final String BUCKET_CREATED_BY_USER_A = "bucketA";
static final String BUCKET_CREATED_BY_USER_I = "bucketI";

// TODO Revisit after https://github.com/nats-io/nats-architecture-and-design/issues/95
@Test
public void testWithAccount() throws Exception {

Expand All @@ -825,11 +840,17 @@ public void testWithAccount() throws Exception {
.jetStreamOptions(JetStreamOptions.builder().prefix("jsFromA").build())
.build();

assertEquals("iBucketA.", jsOpt_UserI_BucketA_WithPrefix.getFeaturePrefix());
assertNotNull(jsOpt_UserI_BucketA_WithPrefix.getJetStreamOptions());

KeyValueOptions jsOpt_UserI_BucketI_WithPrefix = KeyValueOptions.builder()
.featurePrefix("iBucketI")
.jetStreamOptions(JetStreamOptions.builder().prefix("jsFromA").build())
.build();

assertEquals("iBucketI.", jsOpt_UserI_BucketI_WithPrefix.getFeaturePrefix());
assertNotNull(jsOpt_UserI_BucketI_WithPrefix.getJetStreamOptions());

KeyValueManagement kvmUserA = connUserA.keyValueManagement();
KeyValueManagement kvmUserIBcktA = connUserI.keyValueManagement(jsOpt_UserI_BucketA_WithPrefix);
KeyValueManagement kvmUserIBcktI = connUserI.keyValueManagement(jsOpt_UserI_BucketI_WithPrefix);
Expand Down Expand Up @@ -965,4 +986,66 @@ private void assertKveAccountGet(KeyValue kvUserA, KeyValue kvUserI, String key,
assertEquals(data, kveUserA.getValueAsString());
assertEquals(KeyValueOperation.PUT, kveUserA.getOperation());
}
}

@Test
public void testCoverBucketAndKey() {
NatsKeyValueUtil.BucketAndKey bak1 = new NatsKeyValueUtil.BucketAndKey(DOT + BUCKET + DOT + KEY);
NatsKeyValueUtil.BucketAndKey bak2 = new NatsKeyValueUtil.BucketAndKey(DOT + BUCKET + DOT + KEY);
NatsKeyValueUtil.BucketAndKey bak3 = new NatsKeyValueUtil.BucketAndKey(DOT + bucket(1) + DOT + KEY);
NatsKeyValueUtil.BucketAndKey bak4 = new NatsKeyValueUtil.BucketAndKey(DOT + BUCKET + DOT + key(1));

assertEquals(BUCKET, bak1.bucket);
assertEquals(KEY, bak1.key);
assertEquals(bak1, bak1);
assertEquals(bak1, bak2);
assertEquals(bak2, bak1);
assertNotEquals(bak1, bak3);
assertNotEquals(bak1, bak4);
assertNotEquals(bak3, bak1);
assertNotEquals(bak4, bak1);

assertFalse(bak4.equals(null));
assertFalse(bak4.equals(new Object()));
}

@Test
public void testKeyValueEntryEqualsImpl() throws Exception {
runInJsServer(nc -> {
KeyValueManagement kvm = nc.keyValueManagement();

// create bucket 1
kvm.create(KeyValueConfiguration.builder()
.name(bucket(1))
.storageType(StorageType.Memory)
.build());

// create bucket 2
kvm.create(KeyValueConfiguration.builder()
.name(bucket(2))
.storageType(StorageType.Memory)
.build());

KeyValue kv1 = nc.keyValue(bucket(1));
KeyValue kv2 = nc.keyValue(bucket(2));
kv1.put(key(1), "ONE");
kv1.put(key(2), "TWO");
kv2.put(key(1), "ONE");

KeyValueEntry kve1_1 = kv1.get(key(1));
KeyValueEntry kve1_2 = kv1.get(key(2));
KeyValueEntry kve2_1 = kv2.get(key(1));

assertEquals(kve1_1, kve1_1);
assertEquals(kve1_1, kv1.get(key(1)));
assertNotEquals(kve1_1, kve1_2);
assertNotEquals(kve1_1, kve2_1);

kv1.put(key(1), "ONE-PRIME");
assertNotEquals(kve1_1, kv1.get(key(1)));

// coverage
assertFalse(kve1_1.equals(null));
assertFalse(kve1_1.equals(new Object()));
});
}
}