Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kv parity issues 94, 95, 96 #599

Merged
merged 22 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4f07e7f
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
e5d977e
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
4ebf19d
kv updated delivery policy and pending! instead of getLastMessage for…
scottf Feb 16, 2022
915bab8
shoring up test coverage
scottf Feb 16, 2022
eae97ca
shoring up test coverage
scottf Feb 16, 2022
2ad90d2
Merge branch 'main' into kv-updates-issue-94
scottf Feb 16, 2022
d54f8ab
shoring up test coverage
scottf Feb 16, 2022
eee3db0
shoring up test coverage
scottf Feb 16, 2022
e180db1
issue 96 purge deletes
scottf Feb 17, 2022
0dd4ad8
issue 96 purge deletes fix test
scottf Feb 17, 2022
349843f
issue 95 use of prefix as specified in ADR
scottf Feb 18, 2022
ef4fdbe
issue 95 use of prefix as specified in ADR
scottf Feb 18, 2022
8c12d93
updated licence
scottf Feb 23, 2022
780b964
Merge branch 'main' into kv-updates-issue-94
scottf Feb 23, 2022
38cb5af
Merge branch 'main' into kv-updates-issue-94
scottf Feb 28, 2022
b352a52
doh missing last line in files
scottf Feb 28, 2022
29cdbab
Merge remote-tracking branch 'origin/kv-updates-issue-94' into kv-upd…
scottf Feb 28, 2022
4c23814
Merge branch 'main' into kv-updates-issue-94
scottf Mar 1, 2022
0089a72
Merge branch 'main' into kv-updates-issue-94
scottf Mar 1, 2022
6163bf9
Merge branch 'main' into kv-updates-issue-94
scottf Mar 3, 2022
b2b1a59
Merge branch 'main' into kv-updates-issue-94
scottf Mar 7, 2022
51a81dd
addressed comments
scottf Mar 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public KeyValueConfiguration build() {
scBuilder.name(toStreamName(name))
.subjects(toStreamSubject(name))
.allowRollup(true)
.discardPolicy(DiscardPolicy.New)
.denyDelete(true);
return new KeyValueConfiguration(scBuilder.build());
}
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/io/nats/client/impl/NatsKeyValueManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
package io.nats.client.impl;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.KeyValueManagement;
import io.nats.client.KeyValueOptions;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.support.Validator;

import java.io.IOException;
Expand All @@ -28,7 +28,7 @@
import static io.nats.client.support.NatsKeyValueUtil.*;

public class NatsKeyValueManagement implements KeyValueManagement {
private final JetStreamManagement jsm;
private final NatsJetStreamManagement jsm;

NatsKeyValueManagement(NatsConnection connection, KeyValueOptions kvo) throws IOException {
jsm = new NatsJetStreamManagement(connection, kvo == null ? null : kvo.getJetStreamOptions());
Expand All @@ -39,7 +39,11 @@ public class NatsKeyValueManagement implements KeyValueManagement {
*/
@Override
public KeyValueStatus create(KeyValueConfiguration config) throws IOException, JetStreamApiException {
return new KeyValueStatus(jsm.addStream(config.getBackingConfig()));
StreamConfiguration sc = config.getBackingConfig();
if ( jsm.conn.getServerInfo().isOlderThanVersion("2.7.2") ) {
sc = StreamConfiguration.builder(sc).discardPolicy(null).build(); // null discard policy will use default
scottf marked this conversation as resolved.
Show resolved Hide resolved
}
return new KeyValueStatus(jsm.addStream(sc));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
import io.nats.client.api.*;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class NatsKeyValueWatchSubscription implements AutoCloseable {
private static final Object dispatcherLock = new Object();
private static NatsDispatcher dispatcher;

private final JetStreamSubscription sub;
private final AtomicBoolean endOfDataSent;

public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValueWatcher watcher, KeyValueWatchOption... watchOptions) throws IOException, JetStreamApiException {
String keySubject = kv.defaultKeySubject(keyPattern);
Expand All @@ -44,18 +42,14 @@ public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValu
}
}

WatchMessageHandler handler = new WatchMessageHandler(watcher, !ignoreDeletes);
if (deliverPolicy == DeliverPolicy.New) {
watcher.endOfData();
endOfDataSent = new AtomicBoolean(true);
handler.sendEndOfData();
}
else {
KeyValueEntry kveCheckPending = kv.getLastMessage(keyPattern);
if (kveCheckPending == null) {
watcher.endOfData();
endOfDataSent = new AtomicBoolean(true);
}
else {
endOfDataSent = new AtomicBoolean(false);
handler.sendEndOfData();
}
}

Expand All @@ -71,19 +65,40 @@ public NatsKeyValueWatchSubscription(NatsKeyValue kv, String keyPattern, KeyValu
.build())
.build();

final boolean includeDeletes = !ignoreDeletes;
MessageHandler handler = m -> {
sub = kv.js.subscribe(keySubject, getDispatcher(kv.js), handler, false, pso);
if (!handler.endOfDataSent) {
long pending = sub.getConsumerInfo().getNumPending() + sub.getConsumerInfo().getDelivered().getConsumerSequence();
if (pending == 0) {
handler.sendEndOfData();
}
}
}

static class WatchMessageHandler implements MessageHandler {
private final KeyValueWatcher watcher;
private final boolean includeDeletes;
boolean endOfDataSent;

public WatchMessageHandler(KeyValueWatcher watcher, boolean includeDeletes) {
this.watcher = watcher;
this.includeDeletes = includeDeletes;
}

@Override
public void onMessage(Message m) throws InterruptedException {
KeyValueEntry kve = new KeyValueEntry(m);
if (includeDeletes || kve.getOperation() == KeyValueOperation.PUT) {
watcher.watch(kve);
}
if (!endOfDataSent.get() && kve.getDelta() == 0) {
watcher.endOfData();
endOfDataSent.set(true);
if (!endOfDataSent && kve.getDelta() == 0) {
sendEndOfData();
}
};
}

sub = kv.js.subscribe(keySubject, getDispatcher(kv.js), handler, false, pso);
private void sendEndOfData() {
endOfDataSent = true;
watcher.endOfData();
}
}

private static Dispatcher getDispatcher(JetStream js) {
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/io/nats/client/api/ServerInfoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public void testValidInfoString() {
String encoded = Base64.getUrlEncoder().withoutPadding().encodeToString(nonce);
byte[] ascii = encoded.getBytes(StandardCharsets.US_ASCII);

String json = ResourceUtils.dataAsString("ServerInfoJson.txt").replace("<encoded>", encoded);
String json = ResourceUtils.dataAsString("ServerInfoJson.txt");

ServerInfo info = new ServerInfo(json);
assertEquals("serverId", info.getServerId());
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/io/nats/client/impl/KeyValueTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,8 @@ else if (expected instanceof String) {
static final String BUCKET_CREATED_BY_USER_A = "bucketA";
static final String BUCKET_CREATED_BY_USER_I = "bucketI";

@Test
// TODO Implement https://github.com/nats-io/nats-architecture-and-design/issues/95
// @Test
public void testWithAccount() throws Exception {

try (NatsTestServer ts = new NatsTestServer("src/test/resources/kv_account.conf", false)) {
Expand Down