Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record the stream and consumer info timestamps #4133

Merged
merged 1 commit into from Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions server/consumer.go
Expand Up @@ -55,6 +55,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}

type ConsumerConfig struct {
Expand Down Expand Up @@ -2394,6 +2396,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumRedelivered: len(o.rdc),
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
Expand Down
50 changes: 30 additions & 20 deletions server/jetstream_api.go
Expand Up @@ -1373,9 +1373,10 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
}
resp.DidCreate = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
Expand Down Expand Up @@ -1461,12 +1462,13 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}

resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
Expand Down Expand Up @@ -1686,12 +1688,13 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
for _, mset := range msets[offset:] {
config := mset.config()
resp.Streams = append(resp.Streams, &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
})
if len(resp.Streams) >= JSApiListLimit {
break
Expand Down Expand Up @@ -1846,6 +1849,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Alternates: js.streamAlternates(ci, config.Name),
TimeStamp: time.Now().UTC(),
}
if clusterWideConsCount > 0 {
resp.StreamInfo.State.Consumers = clusterWideConsCount
Expand Down Expand Up @@ -3455,7 +3459,12 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
} else {
resp.StreamInfo = &StreamInfo{Created: mset.createdTime(), State: mset.state(), Config: mset.config()}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
}
s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
}
Expand Down Expand Up @@ -4222,10 +4231,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
// our config and defaults for state and no cluster info.
if isMember {
resp.ConsumerInfo = &ConsumerInfo{
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
Expand Down
66 changes: 40 additions & 26 deletions server/jetstream_cluster.go
Expand Up @@ -2932,12 +2932,13 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
resp.DidCreate = true
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
Expand Down Expand Up @@ -3339,12 +3340,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
// Send our response.
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
}

s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
Expand Down Expand Up @@ -3398,12 +3400,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if !recovering {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
Expand Down Expand Up @@ -6264,7 +6267,12 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
for _, sa := range streams {
if s.allPeersOffline(sa.Group) {
// Place offline onto our results by hand here.
si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)}
si := &StreamInfo{
Config: *sa.Config,
Created: sa.Created,
Cluster: js.offlineClusterInfo(sa.Group),
TimeStamp: time.Now().UTC(),
}
resp.Streams = append(resp.Streams, si)
missingNames = append(missingNames, sa.Config.Name)
} else {
Expand Down Expand Up @@ -6410,7 +6418,12 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
for _, ca := range consumers {
if s.allPeersOffline(ca.Group) {
// Place offline onto our results by hand here.
ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)}
ci := &ConsumerInfo{
Config: ca.Config,
Created: ca.Created,
Cluster: js.offlineClusterInfo(ca.Group),
TimeStamp: time.Now().UTC(),
}
resp.Consumers = append(resp.Consumers, ci)
missingNames = append(missingNames, ca.Name)
} else {
Expand Down Expand Up @@ -7993,12 +8006,13 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
}

si := &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}

// Check for out of band catchups.
Expand Down
2 changes: 2 additions & 0 deletions server/stream.go
Expand Up @@ -136,6 +136,8 @@ type StreamInfo struct {
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
Alternates []StreamAlternate `json:"alternates,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}

type StreamAlternate struct {
Expand Down