Skip to content

Commit

Permalink
Issues 10, 91 StreamInfo request changes to retrieve subjects, delete…
Browse files Browse the repository at this point in the history
…d details (#563)
  • Loading branch information
scottf committed Mar 18, 2022
1 parent 7a8072e commit 2ee4f9f
Show file tree
Hide file tree
Showing 12 changed files with 254 additions and 7 deletions.
5 changes: 5 additions & 0 deletions src/NATS.Client/JetStream/ApiConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ internal static class ApiConstants
internal const string Created = "created";
internal const string Current = "current";
internal const string Data = "data";
internal const string Deleted = "deleted";
internal const string DeletedDetails = "deleted_details";
internal const string Deliver = "deliver";
internal const string DeliverGroup = "deliver_group";
internal const string DeliverPolicy = "deliver_policy";
Expand Down Expand Up @@ -95,9 +97,11 @@ internal static class ApiConstants
internal const string NoAck = "no_ack";
internal const string Nonce = "nonce";
internal const string NumAckPending = "num_ack_pending";
internal const string NumDeleted = "num_deleted";
internal const string NumPending = "num_pending";
internal const string NumRedelivered = "num_redelivered";
internal const string NumReplicas = "num_replicas";
internal const string NumSubjects = "num_subjects";
internal const string NumWaiting = "num_waiting";
internal const string Offline = "offline";
internal const string Offset = "offset";
Expand Down Expand Up @@ -128,6 +132,7 @@ internal static class ApiConstants
internal const string Streams = "streams";
internal const string Subject = "subject";
internal const string Subjects = "subjects";
internal const string SubjectsFilter = "subjects_filter";
internal const string Success = "success";
internal const string Tags = "tags";
internal const string TemplateOwner = "template_owner";
Expand Down
13 changes: 12 additions & 1 deletion src/NATS.Client/JetStream/IJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,25 @@ public interface IJetStreamManagement
/// <param name="streamName">The name of the stream.</param>
/// <returns>true if the delete succeeded. Usually throws a NATSJetStreamException otherwise</returns>
bool DeleteStream(string streamName);

/// <summary>
/// Get information about a stream.
/// Does not retrieve any optional data.
/// See the overloaded version that accepts StreamInfoOptions
/// </summary>
/// <param name="streamName">The name of the stream.</param>
/// <returns>Stream information</returns>
StreamInfo GetStreamInfo(string streamName);

/// <summary>
/// Get information about a stream, and include optional information
/// as defined in the StreamInfoOptions.
/// </summary>
/// <param name="streamName">The name of the stream.</param>
/// <param name="options">the stream info options. If null, request will not return any optional data.</param>
/// <returns>Stream information</returns>
StreamInfo GetStreamInfo(string streamName, StreamInfoOptions options);

/// <summary>
/// Purges all messages in a stream.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client/JetStream/JetStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ private string LookupStreamBySubject(string subject)

private string LookupStreamSubject(string stream)
{
StreamInfo si = GetStreamInfoInternal(stream);
StreamInfo si = GetStreamInfoInternal(stream, null);
return si.Config.Subjects.Count == 1 ? si.Config.Subjects[0] : null;
}

Expand Down
5 changes: 3 additions & 2 deletions src/NATS.Client/JetStream/JetStreamBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ internal ConsumerInfo AddOrUpdateConsumerInternal(string streamName, ConsumerCon
return new ConsumerInfo(m, true);
}

public StreamInfo GetStreamInfoInternal(string streamName)
public StreamInfo GetStreamInfoInternal(string streamName, StreamInfoOptions options)
{
byte[] payload = options == null ? null : options.Serialize();
string subj = string.Format(JetStreamConstants.JsapiStreamInfo, streamName);
Msg m = RequestResponseRequired(subj, null, Timeout);
Msg m = RequestResponseRequired(subj, payload, Timeout);
return new StreamInfo(m, true);
}

Expand Down
8 changes: 7 additions & 1 deletion src/NATS.Client/JetStream/JetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,13 @@ public bool DeleteStream(string streamName)
public StreamInfo GetStreamInfo(string streamName)
{
Validator.ValidateStreamName(streamName, true);
return GetStreamInfoInternal(streamName);
return GetStreamInfoInternal(streamName, null);
}

public StreamInfo GetStreamInfo(string streamName, StreamInfoOptions options)
{
Validator.ValidateStreamName(streamName, true);
return GetStreamInfoInternal(streamName, options);
}

public PurgeResponse PurgeStream(string streamName)
Expand Down
77 changes: 77 additions & 0 deletions src/NATS.Client/JetStream/StreamInfoOptions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{
public sealed class StreamInfoOptions : JsonSerializable
{
public string SubjectsFilter { get; }
public bool DeletedDetails { get; }

internal StreamInfoOptions(string subjectsFilter, bool deletedDetails)
{
SubjectsFilter = subjectsFilter;
DeletedDetails = deletedDetails;
}

internal override JSONNode ToJsonNode()
{
return new JSONObject
{
[ApiConstants.SubjectsFilter] = SubjectsFilter,
[ApiConstants.DeletedDetails] = DeletedDetails
};
}

/// <summary>
/// Gets the stream info options builder.
/// </summary>
/// <returns>
/// The builder
/// </returns>
public static StreamInfoOptionsBuilder Builder()
{
return new StreamInfoOptionsBuilder();
}

public sealed class StreamInfoOptionsBuilder
{
public string _subjectsFilter;
public bool _deletedDetails;

public StreamInfoOptionsBuilder WithFilterSubjects(string subjectsFilter)
{
_subjectsFilter = subjectsFilter;
return this;
}

public StreamInfoOptionsBuilder WithAllSubjects() {
_subjectsFilter = ">";
return this;
}

public StreamInfoOptionsBuilder WithDeletedDetails()
{
_deletedDetails = true;
return this;
}

public StreamInfoOptions Build()
{
return new StreamInfoOptions(_subjectsFilter, _deletedDetails);
}
}
}
}
18 changes: 18 additions & 0 deletions src/NATS.Client/JetStream/StreamState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using NATS.Client.Internals;
using NATS.Client.Internals.SimpleJSON;

Expand All @@ -24,8 +25,12 @@ public sealed class StreamState
public ulong FirstSeq { get; }
public ulong LastSeq { get; }
public long ConsumerCount { get; }
public long SubjectCount { get; }
public long DeletedCount { get; }
public DateTime FirstTime { get; }
public DateTime LastTime { get; }
public IList<Subject> Subjects { get; }
public IList<ulong> Deleted { get; }

internal static StreamState OptionalInstance(JSONNode streamState)
{
Expand All @@ -39,8 +44,19 @@ private StreamState(JSONNode streamState)
FirstSeq = streamState[ApiConstants.FirstSeq].AsUlong;
LastSeq = streamState[ApiConstants.LastSeq].AsUlong;
ConsumerCount = streamState[ApiConstants.ConsumerCount].AsLong;
SubjectCount = streamState[ApiConstants.NumSubjects].AsLong;
DeletedCount = streamState[ApiConstants.NumDeleted].AsLong;
FirstTime = JsonUtils.AsDate(streamState[ApiConstants.FirstTs]);
LastTime = JsonUtils.AsDate(streamState[ApiConstants.LastTs]);
Subjects = Subject.OptionalListOf(streamState[ApiConstants.Subjects]);

Deleted = new List<ulong>();
JSONNode.Enumerator e =
streamState[ApiConstants.Deleted].AsArray.GetEnumerator();
while (e.MoveNext())
{
Deleted.Add(e.Current.Value.AsUlong);
}
}

public override string ToString()
Expand All @@ -51,6 +67,8 @@ public override string ToString()
", FirstSeq=" + FirstSeq +
", LastSeq=" + LastSeq +
", ConsumerCount=" + ConsumerCount +
", SubjectCount=" + SubjectCount +
", DeletedCount=" + DeletedCount +
", FirstTime=" + FirstTime +
", LastTime=" + LastTime +
'}';
Expand Down
48 changes: 48 additions & 0 deletions src/NATS.Client/JetStream/Subject.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using NATS.Client.Internals.SimpleJSON;

namespace NATS.Client.JetStream
{
public sealed class Subject
{
public string Name { get; }
public long Count { get; }

internal static IList<Subject> OptionalListOf(JSONNode subjectsNode)
{
if (subjectsNode == null)
{
return null;
}

List<Subject> list = new List<Subject>();
JSONNode.Enumerator e = subjectsNode.GetEnumerator();
while (e.MoveNext())
{
KeyValuePair<String, JSONNode> pair = e.Current;
list.Add(new Subject(pair.Key, pair.Value.AsLong));
}
return list.Count == 0 ? null : list;
}

public Subject(string name, long count)
{
Name = name;
Count = count;
}
}
}
4 changes: 4 additions & 0 deletions src/Tests/IntegrationTests/JetStreamTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public static void CreateMemoryStream(IJetStreamManagement jsm, string streamNam
return js.Publish(new Msg(SUBJECT, DataBytes()));
}

public static PublishAck JsPublish(IJetStream js, string subject, string data) {
return js.Publish(new Msg(subject, Encoding.ASCII.GetBytes(data)));
}

public static IList<Msg> ReadMessagesAck(ISyncSubscription sub)
{
IList<Msg> messages = new List<Msg>();
Expand Down
52 changes: 51 additions & 1 deletion src/Tests/IntegrationTests/TestJetStreamManagement.cs
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,60 @@ public void TestGetStreamInfo()
{
IJetStreamManagement jsm = c.CreateJetStreamManagementContext();
Assert.Throws<NATSJetStreamException>(() => jsm.GetStreamInfo(STREAM));
CreateDefaultTestStream(c);
String[] subjects = new String[6];
for (int x = 0; x < 5; x++) {
subjects[x] = Subject(x);
}
subjects[5] = "foo.>";
CreateMemoryStream(jsm, STREAM, subjects);
IList<PublishAck> packs = new List<PublishAck>();
IJetStream js = c.CreateJetStreamContext();
for (int x = 0; x < 5; x++) {
JsPublish(js, Subject(x), x + 1);
PublishAck pa = JsPublish(js, Subject(x), Data(x + 2));
packs.Add(pa);
jsm.DeleteMessage(STREAM, pa.Seq);
}
JsPublish(js, "foo.bar", 6);
StreamInfo si = jsm.GetStreamInfo(STREAM);
Assert.Equal(STREAM, si.Config.Name);
Assert.Equal(6, si.State.SubjectCount);
Assert.Null(si.State.Subjects);
Assert.Equal(5, si.State.DeletedCount);
Assert.Empty(si.State.Deleted);
si = jsm.GetStreamInfo(STREAM,
StreamInfoOptions.Builder()
.WithAllSubjects()
.WithDeletedDetails()
.Build());
Assert.Equal(STREAM, si.Config.Name);
Assert.Equal(6, si.State.SubjectCount);
IList<Subject> list = si.State.Subjects;
Assert.NotNull(list);
Assert.Equal(6, list.Count);
Assert.Equal(5, si.State.DeletedCount);
Assert.Equal(5, si.State.Deleted.Count);
Dictionary<string, Subject> map = new Dictionary<string, Subject>();
foreach (Subject su in list) {
map[su.Name] = su;
}
for (int x = 0; x < 5; x++)
{
Subject subj = map[Subject(x)];
Assert.NotNull(subj);
Assert.Equal(x + 1, subj.Count);
}
Subject s = map["foo.bar"];
Assert.NotNull(s);
Assert.Equal(6, s.Count);
foreach (PublishAck pa in packs) {
Assert.True(si.State.Deleted.Contains(pa.Seq));
}
});
}

Expand Down
8 changes: 7 additions & 1 deletion src/Tests/UnitTests/Data/StreamInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@
"first_ts": "0001-01-01T00:00:00Z",
"last_seq": 14,
"last_ts": "2021-01-01T00:00:00Z",
"consumer_count": 15
"consumer_count": 15,
"num_subjects": 3,
"subjects": {
"sub0": 1,
"sub1": 2,
"x.foo": 3
}
},
"cluster": {
"name": "clustername",
Expand Down
21 changes: 21 additions & 0 deletions src/Tests/UnitTests/JetStream/TestStreamInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Collections.Generic;
using NATS.Client.JetStream;
using Xunit;

Expand Down Expand Up @@ -52,6 +53,26 @@ public void JsonIsReadProperly()
Assert.Equal(13ul, si.State.FirstSeq);
Assert.Equal(14ul, si.State.LastSeq);
Assert.Equal(15, si.State.ConsumerCount);
Assert.Equal(3, si.State.SubjectCount);
Assert.Equal(3, si.State.Subjects.Count);

Dictionary<string, Subject> map = new Dictionary<string, Subject>();
foreach (Subject su in si.State.Subjects) {
map[su.Name] = su;
}

Subject s = map["sub0"];
Assert.NotNull(s);
Assert.Equal(1, s.Count);

s = map["sub1"];
Assert.NotNull(s);
Assert.Equal(2, s.Count);

s = map["x.foo"];
Assert.NotNull(s);
Assert.Equal(3, s.Count);

Assert.Equal(AsDateTime("0001-01-01T00:00:00Z"), si.State.FirstTime);
Assert.Equal(AsDateTime("2021-01-01T00:00:00Z"), si.State.LastTime);

Expand Down

0 comments on commit 2ee4f9f

Please sign in to comment.