Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issues 10, 91 StreamInfo request changes to retrieve subjects, deleted details #563

Merged
merged 7 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ internal static class ApiConstants
internal const string Created = "created";
internal const string Current = "current";
internal const string Data = "data";
internal const string Deleted = "deleted";
internal const string DeletedDetails = "deleted_details";
internal const string Deliver = "deliver";
internal const string DeliverGroup = "deliver_group";
internal const string DeliverPolicy = "deliver_policy";
Expand Down Expand Up @@ -95,9 +97,11 @@ internal static class ApiConstants
internal const string NoAck = "no_ack";
internal const string Nonce = "nonce";
internal const string NumAckPending = "num_ack_pending";
internal const string NumDeleted = "num_deleted";
internal const string NumPending = "num_pending";
internal const string NumRedelivered = "num_redelivered";
internal const string NumReplicas = "num_replicas";
internal const string NumSubjects = "num_subjects";
internal const string NumWaiting = "num_waiting";
internal const string Offline = "offline";
internal const string Offset = "offset";
Expand Down Expand Up @@ -128,6 +132,7 @@ internal static class ApiConstants
internal const string Streams = "streams";
internal const string Subject = "subject";
internal const string Subjects = "subjects";
internal const string SubjectsFilter = "subjects_filter";
internal const string Success = "success";
internal const string Tags = "tags";
internal const string TemplateOwner = "template_owner";
Expand Down
13 changes: 12 additions & 1 deletion src/NATS.Client/JetStream/IJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,25 @@ public interface IJetStreamManagement
/// <param name="streamName">The name of the stream.</param>
/// <returns>true if the delete succeeded. Usually throws a NATSJetStreamException otherwise</returns>
bool DeleteStream(string streamName);

/// <summary>
/// Get information about a stream.
/// Does not retrieve any optional data.
/// See the overloaded version that accepts StreamInfoOptions
/// </summary>
/// <param name="streamName">The name of the stream.</param>
/// <returns>Stream information</returns>
StreamInfo GetStreamInfo(string streamName);

/// <summary>
/// Get information about a stream, and include optional information
/// as defined in the StreamInfoOptions.
/// </summary>
/// <param name="streamName">The name of the stream.</param>
/// <param name="options">the stream info options. If null, request will not return any optional data.</param>
/// <returns>Stream information</returns>
StreamInfo GetStreamInfo(string streamName, StreamInfoOptions options);

/// <summary>
/// Purges all messages in a stream.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ private string LookupStreamBySubject(string subject)

private string LookupStreamSubject(string stream)
{
StreamInfo si = GetStreamInfoInternal(stream);
StreamInfo si = GetStreamInfoInternal(stream, null);
return si.Config.Subjects.Count == 1 ? si.Config.Subjects[0] : null;
}

Expand Down
5 changes: 3 additions & 2 deletions src/NATS.Client/JetStream/JetStreamBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerCon
return new ConsumerInfo(m, true);
}

public StreamInfo GetStreamInfoInternal(string streamName)
public StreamInfo GetStreamInfoInternal(string streamName, StreamInfoOptions options)
{
byte[] payload = options == null ? null : options.Serialize();
string subj = string.Format(JetStreamConstants.JsapiStreamInfo, streamName);
Msg m = RequestResponseRequired(subj, null, Timeout);
Msg m = RequestResponseRequired(subj, payload, Timeout);
return new StreamInfo(m, true);
}

Expand Down
8 changes: 7 additions & 1 deletion src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ public bool DeleteStream(string streamName)
public StreamInfo GetStreamInfo(string streamName)
{
Validator.ValidateStreamName(streamName, true);
return GetStreamInfoInternal(streamName);
return GetStreamInfoInternal(streamName, null);
}

public StreamInfo GetStreamInfo(string streamName, StreamInfoOptions options)
{
Validator.ValidateStreamName(streamName, true);
return GetStreamInfoInternal(streamName, options);
}

public PurgeResponse PurgeStream(string streamName)
Expand Down
77 changes: 77 additions & 0 deletions src/NATS.Client/JetStream/StreamInfoOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{
public sealed class StreamInfoOptions : JsonSerializable
{
public string SubjectsFilter { get; }
public bool DeletedDetails { get; }

internal StreamInfoOptions(string subjectsFilter, bool deletedDetails)
{
SubjectsFilter = subjectsFilter;
DeletedDetails = deletedDetails;
}

internal override JSONNode ToJsonNode()
{
return new JSONObject
{
[ApiConstants.SubjectsFilter] = SubjectsFilter,
[ApiConstants.DeletedDetails] = DeletedDetails
};
}

/// <summary>
/// Gets the stream info options builder.
/// </summary>
/// <returns>
/// The builder
/// </returns>
public static StreamInfoOptionsBuilder Builder()
{
return new StreamInfoOptionsBuilder();
}

public sealed class StreamInfoOptionsBuilder
{
public string _subjectsFilter;
public bool _deletedDetails;

public StreamInfoOptionsBuilder WithFilterSubjects(string subjectsFilter)
{
_subjectsFilter = subjectsFilter;
return this;
}

public StreamInfoOptionsBuilder WithAllSubjects() {
_subjectsFilter = ">";
return this;
}

public StreamInfoOptionsBuilder WithDeletedDetails()
{
_deletedDetails = true;
return this;
}

public StreamInfoOptions Build()
{
return new StreamInfoOptions(_subjectsFilter, _deletedDetails);
}
}
}
}
18 changes: 18 additions & 0 deletions src/NATS.Client/JetStream/StreamState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;

Expand All @@ -24,8 +25,12 @@ public sealed class StreamState
public ulong FirstSeq { get; }
public ulong LastSeq { get; }
public long ConsumerCount { get; }
public long SubjectCount { get; }
public long DeletedCount { get; }
public DateTime FirstTime { get; }
public DateTime LastTime { get; }
public IList<Subject> Subjects { get; }
public IList<ulong> Deleted { get; }

internal static StreamState OptionalInstance(JSONNode streamState)
{
Expand All @@ -39,8 +44,19 @@ private StreamState(JSONNode streamState)
FirstSeq = streamState[ApiConstants.FirstSeq].AsUlong;
LastSeq = streamState[ApiConstants.LastSeq].AsUlong;
ConsumerCount = streamState[ApiConstants.ConsumerCount].AsLong;
SubjectCount = streamState[ApiConstants.NumSubjects].AsLong;
DeletedCount = streamState[ApiConstants.NumDeleted].AsLong;
FirstTime = JsonUtils.AsDate(streamState[ApiConstants.FirstTs]);
LastTime = JsonUtils.AsDate(streamState[ApiConstants.LastTs]);
Subjects = Subject.OptionalListOf(streamState[ApiConstants.Subjects]);

Deleted = new List<ulong>();
JSONNode.Enumerator e =
streamState[ApiConstants.Deleted].AsArray.GetEnumerator();
while (e.MoveNext())
{
Deleted.Add(e.Current.Value.AsUlong);
}
}

public override string ToString()
Expand All @@ -51,6 +67,8 @@ public override string ToString()
", FirstSeq=" + FirstSeq +
", LastSeq=" + LastSeq +
", ConsumerCount=" + ConsumerCount +
", SubjectCount=" + SubjectCount +
", DeletedCount=" + DeletedCount +
", FirstTime=" + FirstTime +
", LastTime=" + LastTime +
'}';
Expand Down
48 changes: 48 additions & 0 deletions src/NATS.Client/JetStream/Subject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{
public sealed class Subject
{
public string Name { get; }
public long Count { get; }

internal static IList<Subject> OptionalListOf(JSONNode subjectsNode)
{
if (subjectsNode == null)
{
return null;
}

List<Subject> list = new List<Subject>();
JSONNode.Enumerator e = subjectsNode.GetEnumerator();
while (e.MoveNext())
{
KeyValuePair<String, JSONNode> pair = e.Current;
list.Add(new Subject(pair.Key, pair.Value.AsLong));
}
return list.Count == 0 ? null : list;
}

public Subject(string name, long count)
{
Name = name;
Count = count;
}
}
}
4 changes: 4 additions & 0 deletions src/Tests/IntegrationTests/JetStreamTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public static void CreateMemoryStream(IJetStreamManagement jsm, string streamNam
return js.Publish(new Msg(SUBJECT, DataBytes()));
}

public static PublishAck JsPublish(IJetStream js, string subject, string data) {
return js.Publish(new Msg(subject, Encoding.ASCII.GetBytes(data)));
}

public static IList<Msg> ReadMessagesAck(ISyncSubscription sub)
{
IList<Msg> messages = new List<Msg>();
Expand Down
52 changes: 51 additions & 1 deletion src/Tests/IntegrationTests/TestJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,60 @@ public void TestGetStreamInfo()
{
IJetStreamManagement jsm = c.CreateJetStreamManagementContext();
Assert.Throws<NATSJetStreamException>(() => jsm.GetStreamInfo(STREAM));
CreateDefaultTestStream(c);

String[] subjects = new String[6];
for (int x = 0; x < 5; x++) {
subjects[x] = Subject(x);
}
subjects[5] = "foo.>";
CreateMemoryStream(jsm, STREAM, subjects);

IList<PublishAck> packs = new List<PublishAck>();
IJetStream js = c.CreateJetStreamContext();
for (int x = 0; x < 5; x++) {
JsPublish(js, Subject(x), x + 1);
PublishAck pa = JsPublish(js, Subject(x), Data(x + 2));
packs.Add(pa);
jsm.DeleteMessage(STREAM, pa.Seq);
}
JsPublish(js, "foo.bar", 6);

StreamInfo si = jsm.GetStreamInfo(STREAM);
Assert.Equal(STREAM, si.Config.Name);
Assert.Equal(6, si.State.SubjectCount);
Assert.Null(si.State.Subjects);
Assert.Equal(5, si.State.DeletedCount);
Assert.Empty(si.State.Deleted);

si = jsm.GetStreamInfo(STREAM,
StreamInfoOptions.Builder()
.WithAllSubjects()
.WithDeletedDetails()
.Build());
Assert.Equal(STREAM, si.Config.Name);
Assert.Equal(6, si.State.SubjectCount);
IList<Subject> list = si.State.Subjects;
Assert.NotNull(list);
Assert.Equal(6, list.Count);
Assert.Equal(5, si.State.DeletedCount);
Assert.Equal(5, si.State.Deleted.Count);
Dictionary<string, Subject> map = new Dictionary<string, Subject>();
foreach (Subject su in list) {
map[su.Name] = su;
}
for (int x = 0; x < 5; x++)
{
Subject subj = map[Subject(x)];
Assert.NotNull(subj);
Assert.Equal(x + 1, subj.Count);
}
Subject s = map["foo.bar"];
Assert.NotNull(s);
Assert.Equal(6, s.Count);

foreach (PublishAck pa in packs) {
Assert.True(si.State.Deleted.Contains(pa.Seq));
}
});
}

Expand Down
8 changes: 7 additions & 1 deletion src/Tests/UnitTests/Data/StreamInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@
"first_ts": "0001-01-01T00:00:00Z",
"last_seq": 14,
"last_ts": "2021-01-01T00:00:00Z",
"consumer_count": 15
"consumer_count": 15,
"num_subjects": 3,
"subjects": {
"sub0": 1,
"sub1": 2,
"x.foo": 3
}
},
"cluster": {
"name": "clustername",
Expand Down
21 changes: 21 additions & 0 deletions src/Tests/UnitTests/JetStream/TestStreamInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Generic;
using NATS.Client.JetStream;
using Xunit;

Expand Down Expand Up @@ -52,6 +53,26 @@ public void JsonIsReadProperly()
Assert.Equal(13ul, si.State.FirstSeq);
Assert.Equal(14ul, si.State.LastSeq);
Assert.Equal(15, si.State.ConsumerCount);
Assert.Equal(3, si.State.SubjectCount);
Assert.Equal(3, si.State.Subjects.Count);

Dictionary<string, Subject> map = new Dictionary<string, Subject>();
foreach (Subject su in si.State.Subjects) {
map[su.Name] = su;
}

Subject s = map["sub0"];
Assert.NotNull(s);
Assert.Equal(1, s.Count);

s = map["sub1"];
Assert.NotNull(s);
Assert.Equal(2, s.Count);

s = map["x.foo"];
Assert.NotNull(s);
Assert.Equal(3, s.Count);

Assert.Equal(AsDateTime("0001-01-01T00:00:00Z"), si.State.FirstTime);
Assert.Equal(AsDateTime("2021-01-01T00:00:00Z"), si.State.LastTime);

Expand Down