Skip to content

Commit

Permalink
Merge pull request #990 from nats-io/stream-info-deleted-details
Browse files Browse the repository at this point in the history
ADDED: option to fetch deleted details in stream info
  • Loading branch information
piotrpio committed Jun 7, 2022
2 parents 5a292d0 + 4a1666c commit fe748b3
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 11 deletions.
7 changes: 7 additions & 0 deletions js.go
Expand Up @@ -230,6 +230,8 @@ type jsOpts struct {
shouldTrace bool
// purgeOpts contains optional stream purge options
purgeOpts *StreamPurgeRequest
// streamInfoOpts contains optional stream info options
streamInfoOpts *StreamInfoRequest
}

const (
Expand Down Expand Up @@ -301,6 +303,11 @@ func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error {
return nil
}

func (s *StreamInfoRequest) configureJSContext(js *jsOpts) error {
js.streamInfoOpts = s
return nil
}

// APIPrefix changes the default prefix used for the JetStream API.
func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *jsOpts) error {
Expand Down
34 changes: 24 additions & 10 deletions jsm.go
Expand Up @@ -634,7 +634,13 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return resp.StreamInfo, nil
}

type streamInfoResponse = streamCreateResponse
type (
// StreamInfoRequest contains additional option to return details about messages deleted from a stream
StreamInfoRequest struct {
DeletedDetails bool `json:"deleted_details,omitempty"`
}
streamInfoResponse = streamCreateResponse
)

func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if err := checkStreamName(stream); err != nil {
Expand All @@ -647,9 +653,15 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if cancel != nil {
defer cancel()
}
var req []byte
if o.streamInfoOpts != nil {
if req, err = json.Marshal(o.streamInfoOpts); err != nil {
return nil, err
}
}
siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
r, err := js.apiRequestWithContext(o.ctx, csSubj, nil)
r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -686,13 +698,15 @@ type StreamSourceInfo struct {

// StreamState is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Deleted []uint64 `json:"deleted"`
NumDeleted int `json:"num_deleted"`
}

// ClusterInfo shows information about the underlying set of servers
Expand Down
106 changes: 105 additions & 1 deletion test/js_test.go
Expand Up @@ -1702,6 +1702,110 @@ func TestPurgeStream(t *testing.T) {
}
}

func TestStreamInfo(t *testing.T) {
testData := []string{"one", "two", "three", "four"}

tests := []struct {
name string
stream string
req *nats.StreamInfoRequest
withError error
expectedDeletedDetails []uint64
}{
{
name: "empty request body",
stream: "foo",
},
{
name: "with deleted details",
stream: "foo",
req: &nats.StreamInfoRequest{
DeletedDetails: true,
},
expectedDeletedDetails: []uint64{2, 4},
},
{
name: "with deleted details set to false",
stream: "foo",
req: &nats.StreamInfoRequest{
DeletedDetails: false,
},
},
{
name: "empty stream name",
stream: "",
withError: nats.ErrStreamNameRequired,
},
{
name: "invalid stream name",
stream: "bad.stream.name",
withError: nats.ErrInvalidStreamName,
},
{
name: "stream not found",
stream: "bar",
withError: nats.ErrStreamNotFound,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for _, msg := range testData {
if _, err := js.Publish("foo.A", []byte(msg)); err != nil {
t.Fatalf("Unexpected error during publish: %v", err)
}
}
if err := js.DeleteMsg("foo", 2); err != nil {
t.Fatalf("Unexpected error while deleting message from stream: %v", err)
}
if err := js.DeleteMsg("foo", 4); err != nil {
t.Fatalf("Unexpected error while deleting message from stream: %v", err)
}

var streamInfo *nats.StreamInfo
if test.req != nil {
streamInfo, err = js.StreamInfo(test.stream, test.req)
} else {
streamInfo, err = js.StreamInfo(test.stream)
}
if test.withError != nil {
if err == nil {
t.Fatal("Expected error, got nil")
}
if !errors.Is(err, test.withError) {
t.Fatalf("Expected error: '%s'; got '%s'", test.withError, err)
}
return
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if streamInfo.Config.Name != "foo" {
t.Fatalf("Invalid stream name in StreamInfo response: want: 'foo'; got: '%s'", streamInfo.Config.Name)
}
if streamInfo.State.NumDeleted != 2 {
t.Fatalf("Invalid value for num_deleted in state: want: 2; got: %d", streamInfo.State.NumDeleted)
}
if !reflect.DeepEqual(test.expectedDeletedDetails, streamInfo.State.Deleted) {
t.Fatalf("Invalid value for deleted msgs in state: want: %v; got: %v", test.expectedDeletedDetails, streamInfo.State.Deleted)
}
})
}
}

func TestJetStreamManagement_GetMsg(t *testing.T) {
t.Run("1-node", func(t *testing.T) {
withJSServer(t, testJetStreamManagement_GetMsg)
Expand Down Expand Up @@ -2361,7 +2465,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
Name: sourceName,
Storage: nats.FileStorage,
Sources: []*nats.StreamSource{
&nats.StreamSource{
{
Name: publishSubj,
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
Expand Down

0 comments on commit fe748b3

Please sign in to comment.