Skip to content

Commit

Permalink
Merge pull request #1118 from nats-io/stream-alternates
Browse files Browse the repository at this point in the history
Support stream alternates
  • Loading branch information
derekcollison committed Nov 2, 2022
2 parents e227e17 + 6bd7837 commit fbeee79
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
20 changes: 14 additions & 6 deletions jsm.go
Expand Up @@ -867,12 +867,20 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {

// StreamInfo shows config and current state for this stream.
type StreamInfo struct {
Config StreamConfig `json:"config"`
Created time.Time `json:"created"`
State StreamState `json:"state"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
Config StreamConfig `json:"config"`
Created time.Time `json:"created"`
State StreamState `json:"state"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
Alternates []*StreamAlternate `json:"alternates,omitempty"`
}

// StreamAlternate is an alternate stream represented by a mirror.
type StreamAlternate struct {
Name string `json:"name"`
Domain string `json:"domain,omitempty"`
Cluster string `json:"cluster"`
}

// StreamSourceInfo shows information about an upstream stream source.
Expand Down
6 changes: 3 additions & 3 deletions kv.go
Expand Up @@ -1013,7 +1013,7 @@ func (js *js) KeyValueStoreNames() <-chan string {
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "KV_") {
if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) {
continue
}
ch <- info.Config.Name
Expand All @@ -1033,10 +1033,10 @@ func (js *js) KeyValueStores() <-chan KeyValueStatus {
defer close(ch)
for l.Next() {
for _, info := range l.Page() {
if !strings.HasPrefix(info.Config.Name, "KV_") {
if !strings.HasPrefix(info.Config.Name, kvBucketNamePre) {
continue
}
ch <- &KeyValueBucketStatus{nfo: info, bucket: strings.TrimPrefix(info.Config.Name, "KV_")}
ch <- &KeyValueBucketStatus{nfo: info, bucket: strings.TrimPrefix(info.Config.Name, kvBucketNamePre)}
}
}
}()
Expand Down
29 changes: 29 additions & 0 deletions test/js_test.go
Expand Up @@ -8288,3 +8288,32 @@ func TestJetStreamCreateStreamDiscardPolicy(t *testing.T) {
})
}
}

func TestJetStreamStreamInfoAlternates(t *testing.T) {
withJSCluster(t, "R3S", 3, func(t *testing.T, nodes ...*jsServer) {
nc, js := jsClient(t, nodes[0].Server)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
expectOk(t, err)

// Create a mirror as well.
_, err = js.AddStream(&nats.StreamConfig{
Name: "MIRROR",
Mirror: &nats.StreamSource{
Name: "TEST",
},
})
expectOk(t, err)

si, err := js.StreamInfo("TEST")
expectOk(t, err)

if len(si.Alternates) != 2 {
t.Fatalf("Expected 2 alternates, got %d", len(si.Alternates))
}
})
}

0 comments on commit fbeee79

Please sign in to comment.