Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADDED: option to fetch deleted details in stream info #990

Merged
merged 1 commit into from Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions js.go
Expand Up @@ -230,6 +230,8 @@ type jsOpts struct {
shouldTrace bool
// purgeOpts contains optional stream purge options
purgeOpts *StreamPurgeRequest
// streamInfoOpts contains optional stream info options
streamInfoOpts *StreamInfoRequest
}

const (
Expand Down Expand Up @@ -301,6 +303,11 @@ func (s *StreamPurgeRequest) configureJSContext(js *jsOpts) error {
return nil
}

func (s *StreamInfoRequest) configureJSContext(js *jsOpts) error {
js.streamInfoOpts = s
return nil
}

// APIPrefix changes the default prefix used for the JetStream API.
func APIPrefix(pre string) JSOpt {
return jsOptFn(func(js *jsOpts) error {
Expand Down
34 changes: 24 additions & 10 deletions jsm.go
Expand Up @@ -634,7 +634,13 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return resp.StreamInfo, nil
}

type streamInfoResponse = streamCreateResponse
type (
// StreamInfoRequest contains additional option to return details about messages deleted from a stream
StreamInfoRequest struct {
DeletedDetails bool `json:"deleted_details,omitempty"`
}
streamInfoResponse = streamCreateResponse
)

func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if err := checkStreamName(stream); err != nil {
Expand All @@ -647,9 +653,15 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if cancel != nil {
defer cancel()
}
var req []byte
if o.streamInfoOpts != nil {
if req, err = json.Marshal(o.streamInfoOpts); err != nil {
return nil, err
}
}
siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

csSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))
r, err := js.apiRequestWithContext(o.ctx, csSubj, nil)
r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -686,13 +698,15 @@ type StreamSourceInfo struct {

// StreamState is information about the given stream.
type StreamState struct {
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Msgs uint64 `json:"messages"`
Bytes uint64 `json:"bytes"`
FirstSeq uint64 `json:"first_seq"`
FirstTime time.Time `json:"first_ts"`
LastSeq uint64 `json:"last_seq"`
LastTime time.Time `json:"last_ts"`
Consumers int `json:"consumer_count"`
Deleted []uint64 `json:"deleted"`
NumDeleted int `json:"num_deleted"`
}

// ClusterInfo shows information about the underlying set of servers
Expand Down
106 changes: 105 additions & 1 deletion test/js_test.go
Expand Up @@ -1702,6 +1702,110 @@ func TestPurgeStream(t *testing.T) {
}
}

func TestStreamInfo(t *testing.T) {
testData := []string{"one", "two", "three", "four"}

tests := []struct {
name string
stream string
req *nats.StreamInfoRequest
withError error
expectedDeletedDetails []uint64
}{
{
name: "empty request body",
stream: "foo",
},
{
name: "with deleted details",
stream: "foo",
req: &nats.StreamInfoRequest{
DeletedDetails: true,
},
expectedDeletedDetails: []uint64{2, 4},
},
{
name: "with deleted details set to false",
stream: "foo",
req: &nats.StreamInfoRequest{
DeletedDetails: false,
},
},
{
name: "empty stream name",
stream: "",
withError: nats.ErrStreamNameRequired,
},
{
name: "invalid stream name",
stream: "bad.stream.name",
withError: nats.ErrInvalidStreamName,
},
{
name: "stream not found",
stream: "bar",
withError: nats.ErrStreamNotFound,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for _, msg := range testData {
if _, err := js.Publish("foo.A", []byte(msg)); err != nil {
t.Fatalf("Unexpected error during publish: %v", err)
}
}
if err := js.DeleteMsg("foo", 2); err != nil {
t.Fatalf("Unexpected error while deleting message from stream: %v", err)
}
if err := js.DeleteMsg("foo", 4); err != nil {
t.Fatalf("Unexpected error while deleting message from stream: %v", err)
}

var streamInfo *nats.StreamInfo
if test.req != nil {
streamInfo, err = js.StreamInfo(test.stream, test.req)
} else {
streamInfo, err = js.StreamInfo(test.stream)
}
if test.withError != nil {
if err == nil {
t.Fatal("Expected error, got nil")
}
if !errors.Is(err, test.withError) {
t.Fatalf("Expected error: '%s'; got '%s'", test.withError, err)
}
return
}
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if streamInfo.Config.Name != "foo" {
t.Fatalf("Invalid stream name in StreamInfo response: want: 'foo'; got: '%s'", streamInfo.Config.Name)
}
if streamInfo.State.NumDeleted != 2 {
t.Fatalf("Invalid value for num_deleted in state: want: 2; got: %d", streamInfo.State.NumDeleted)
}
if !reflect.DeepEqual(test.expectedDeletedDetails, streamInfo.State.Deleted) {
t.Fatalf("Invalid value for deleted msgs in state: want: %v; got: %v", test.expectedDeletedDetails, streamInfo.State.Deleted)
}
})
}
}

func TestJetStreamManagement_GetMsg(t *testing.T) {
t.Run("1-node", func(t *testing.T) {
withJSServer(t, testJetStreamManagement_GetMsg)
Expand Down Expand Up @@ -2361,7 +2465,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
Name: sourceName,
Storage: nats.FileStorage,
Sources: []*nats.StreamSource{
&nats.StreamSource{
{
Name: publishSubj,
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
Expand Down