Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issues 10, 91 StreamInfo request changes to retrieve subjects, deleted details #563

Merged
merged 7 commits into from
Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ internal static class ApiConstants
internal const string NumPending = "num_pending";
internal const string NumRedelivered = "num_redelivered";
internal const string NumReplicas = "num_replicas";
internal const string NumSubjects = "num_subjects";
internal const string NumWaiting = "num_waiting";
internal const string Offline = "offline";
internal const string Offset = "offset";
Expand Down Expand Up @@ -128,6 +129,7 @@ internal static class ApiConstants
internal const string Streams = "streams";
internal const string Subject = "subject";
internal const string Subjects = "subjects";
internal const string SubjectsFilter = "subjects_filter";
internal const string Success = "success";
internal const string Tags = "tags";
internal const string TemplateOwner = "template_owner";
Expand Down
9 changes: 8 additions & 1 deletion src/NATS.Client/JetStream/IJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,21 @@ public interface IJetStreamManagement
/// <returns>true if the delete succeeded. Usually throws a NATSJetStreamException otherwise</returns>
bool DeleteStream(string streamName);


/// <summary>
/// Get information about a stream.
/// </summary>
/// <param name="streamName">The name of the stream.</param>
/// <returns>Stream information</returns>
StreamInfo GetStreamInfo(string streamName);

/// <summary>
/// Get information about a stream, and include subject info.
/// </summary>
/// <param name="streamName">The name of the stream.</param>
/// <param name="subjectFilter">the subject filter to use. Can be wildcard. Null is equivalent to all subjects, the same as &gt;.</param>
/// <returns>Stream information</returns>
StreamInfo GetStreamInfoWithSubjectInfo(string streamName, string subjectFilter);
scottf marked this conversation as resolved.
Show resolved Hide resolved

/// <summary>
/// Purges all messages in a stream.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ private string LookupStreamBySubject(string subject)

private string LookupStreamSubject(string stream)
{
StreamInfo si = GetStreamInfoInternal(stream);
StreamInfo si = GetStreamInfoInternal(stream, null);
return si.Config.Subjects.Count == 1 ? si.Config.Subjects[0] : null;
}

Expand Down
5 changes: 3 additions & 2 deletions src/NATS.Client/JetStream/JetStreamBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerCon
return new ConsumerInfo(m, true);
}

public StreamInfo GetStreamInfoInternal(string streamName)
public StreamInfo GetStreamInfoInternal(string streamName, string subjectFilter)
{
byte[] payload = subjectFilter == null ? null : StreamInfoRequest.FilterSubjects(subjectFilter);
string subj = string.Format(JetStreamConstants.JsapiStreamInfo, streamName);
Msg m = RequestResponseRequired(subj, null, Timeout);
Msg m = RequestResponseRequired(subj, payload, Timeout);
return new StreamInfo(m, true);
}

Expand Down
11 changes: 10 additions & 1 deletion src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,16 @@ public bool DeleteStream(string streamName)
public StreamInfo GetStreamInfo(string streamName)
{
Validator.ValidateStreamName(streamName, true);
return GetStreamInfoInternal(streamName);
return GetStreamInfoInternal(streamName, null);
}

public StreamInfo GetStreamInfoWithSubjectInfo(string streamName, string subjectFilter)
{
Validator.ValidateStreamName(streamName, true);
if (subjectFilter == null) {
return GetStreamInfoInternal(streamName, ">");
}
return GetStreamInfoInternal(streamName, subjectFilter);
}

public PurgeResponse PurgeStream(string streamName)
Expand Down
30 changes: 30 additions & 0 deletions src/NATS.Client/JetStream/StreamInfoRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{
public sealed class StreamInfoRequest : JsonSerializable
{
public string SubjectsFilter { get; }

internal static byte[] FilterSubjects(string subjectsFilter) {
return new StreamInfoRequest(subjectsFilter).Serialize();
}

internal static byte[] AllSubjects() {
return new StreamInfoRequest(">").Serialize();
}

public StreamInfoRequest(string subjectsFilter)
{
SubjectsFilter = subjectsFilter;
}

internal override JSONNode ToJsonNode()
{
return new JSONObject
{
[ApiConstants.SubjectsFilter] = SubjectsFilter
};
}
}
}
6 changes: 6 additions & 0 deletions src/NATS.Client/JetStream/StreamState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;

Expand All @@ -24,8 +25,10 @@ public sealed class StreamState
public ulong FirstSeq { get; }
public ulong LastSeq { get; }
public long ConsumerCount { get; }
public long SubjectCount { get; }
public DateTime FirstTime { get; }
public DateTime LastTime { get; }
public IList<Subject> Subjects { get; }

internal static StreamState OptionalInstance(JSONNode streamState)
{
Expand All @@ -39,8 +42,10 @@ private StreamState(JSONNode streamState)
FirstSeq = streamState[ApiConstants.FirstSeq].AsUlong;
LastSeq = streamState[ApiConstants.LastSeq].AsUlong;
ConsumerCount = streamState[ApiConstants.ConsumerCount].AsLong;
SubjectCount = streamState[ApiConstants.NumSubjects].AsLong;
FirstTime = JsonUtils.AsDate(streamState[ApiConstants.FirstTs]);
LastTime = JsonUtils.AsDate(streamState[ApiConstants.LastTs]);
Subjects = Subject.OptionalListOf(streamState[ApiConstants.Subjects]);
}

public override string ToString()
Expand All @@ -51,6 +56,7 @@ public override string ToString()
", FirstSeq=" + FirstSeq +
", LastSeq=" + LastSeq +
", ConsumerCount=" + ConsumerCount +
", SubjectCount=" + SubjectCount +
", FirstTime=" + FirstTime +
", LastTime=" + LastTime +
'}';
Expand Down
35 changes: 35 additions & 0 deletions src/NATS.Client/JetStream/Subject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{
public sealed class Subject
{
public string Name { get; }
public long Count { get; }

internal static IList<Subject> OptionalListOf(JSONNode subjectsNode)
{
if (subjectsNode == null)
{
return null;
}

List<Subject> list = new List<Subject>();
JSONNode.Enumerator e = subjectsNode.GetEnumerator();
while (e.MoveNext())
{
KeyValuePair<String, JSONNode> pair = e.Current;
list.Add(new Subject(pair.Key, pair.Value.AsLong));
}
return list.Count == 0 ? null : list;
}

public Subject(string name, long count)
{
Name = name;
Count = count;
}
}
}
36 changes: 35 additions & 1 deletion src/Tests/IntegrationTests/TestJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,44 @@ public void TestGetStreamInfo()
{
IJetStreamManagement jsm = c.CreateJetStreamManagementContext();
Assert.Throws<NATSJetStreamException>(() => jsm.GetStreamInfo(STREAM));
CreateDefaultTestStream(c);

String[] subjects = new String[6];
for (int x = 0; x < 5; x++) {
subjects[x] = Subject(x);
}
subjects[5] = "foo.>";
CreateMemoryStream(jsm, STREAM, subjects);

IJetStream js = c.CreateJetStreamContext();
for (int x = 0; x < 5; x++) {
JsPublish(js, Subject(x), x + 1);
}
JsPublish(js, "foo.bar", 6);

StreamInfo si = jsm.GetStreamInfo(STREAM);
Assert.Equal(STREAM, si.Config.Name);
Assert.Equal(6, si.State.SubjectCount);
Assert.Null(si.State.Subjects);

si = jsm.GetStreamInfoWithSubjectInfo(STREAM, null);
Assert.Equal(STREAM, si.Config.Name);
Assert.Equal(6, si.State.SubjectCount);
IList<Subject> list = si.State.Subjects;
Assert.NotNull(list);
Assert.Equal(6, list.Count);
Dictionary<string, Subject> map = new Dictionary<string, Subject>();
foreach (Subject su in list) {
map[su.Name] = su;
}
for (int x = 0; x < 5; x++)
{
Subject subj = map[Subject(x)];
Assert.NotNull(subj);
Assert.Equal(x + 1, subj.Count);
}
Subject s = map["foo.bar"];
Assert.NotNull(s);
Assert.Equal(6, s.Count);
});
}

Expand Down
8 changes: 7 additions & 1 deletion src/Tests/UnitTests/Data/StreamInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@
"first_ts": "0001-01-01T00:00:00Z",
"last_seq": 14,
"last_ts": "2021-01-01T00:00:00Z",
"consumer_count": 15
"consumer_count": 15,
"num_subjects": 3,
"subjects": {
"sub0": 1,
"sub1": 2,
"x.foo": 3
}
},
"cluster": {
"name": "clustername",
Expand Down
22 changes: 22 additions & 0 deletions src/Tests/UnitTests/JetStream/TestStreamInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Generic;
using NATS.Client.JetStream;
using Xunit;

Expand Down Expand Up @@ -52,6 +53,27 @@ public void JsonIsReadProperly()
Assert.Equal(13ul, si.State.FirstSeq);
Assert.Equal(14ul, si.State.LastSeq);
Assert.Equal(15, si.State.ConsumerCount);

Assert.Equal(3, si.State.SubjectCount);
Assert.Equal(3, si.State.Subjects.Count);

Dictionary<string, Subject> map = new Dictionary<string, Subject>();
foreach (Subject su in si.State.Subjects) {
map[su.Name] = su;
}

Subject s = map["sub0"];
Assert.NotNull(s);
Assert.Equal(1, s.Count);

s = map["sub1"];
Assert.NotNull(s);
Assert.Equal(2, s.Count);

s = map["x.foo"];
Assert.NotNull(s);
Assert.Equal(3, s.Count);

Assert.Equal(AsDateTime("0001-01-01T00:00:00Z"), si.State.FirstTime);
Assert.Equal(AsDateTime("2021-01-01T00:00:00Z"), si.State.LastTime);

Expand Down