Skip to content

Commit

Permalink
Record the stream and consumer info timestamps (#4133)
Browse files Browse the repository at this point in the history
This records the server time when info for streams and consumers are
created so that tools such as the nats cli can calculate time deltas for
last ack, last delivered and so forth in the context of the server
clock.

This will help aleviate problems with client devices experiencing clock
jitter that can show up in user interfaces as negative seconds since
last ack etc
  • Loading branch information
ripienaar committed Jun 2, 2023
2 parents c3234dc + fb1d86d commit c24547e
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 46 deletions.
3 changes: 3 additions & 0 deletions server/consumer.go
Expand Up @@ -55,6 +55,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}

type ConsumerConfig struct {
Expand Down Expand Up @@ -2401,6 +2403,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumRedelivered: len(o.rdc),
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
Expand Down
50 changes: 30 additions & 20 deletions server/jetstream_api.go
Expand Up @@ -1373,9 +1373,10 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
}
resp.DidCreate = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
Expand Down Expand Up @@ -1461,12 +1462,13 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}

resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
Expand Down Expand Up @@ -1686,12 +1688,13 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
for _, mset := range msets[offset:] {
config := mset.config()
resp.Streams = append(resp.Streams, &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
})
if len(resp.Streams) >= JSApiListLimit {
break
Expand Down Expand Up @@ -1846,6 +1849,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Alternates: js.streamAlternates(ci, config.Name),
TimeStamp: time.Now().UTC(),
}
if clusterWideConsCount > 0 {
resp.StreamInfo.State.Consumers = clusterWideConsCount
Expand Down Expand Up @@ -3455,7 +3459,12 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
} else {
resp.StreamInfo = &StreamInfo{Created: mset.createdTime(), State: mset.state(), Config: mset.config()}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
}
s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
}
Expand Down Expand Up @@ -4222,10 +4231,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
// our config and defaults for state and no cluster info.
if isMember {
resp.ConsumerInfo = &ConsumerInfo{
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
Expand Down
66 changes: 40 additions & 26 deletions server/jetstream_cluster.go
Expand Up @@ -2971,12 +2971,13 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
resp.DidCreate = true
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
Expand Down Expand Up @@ -3378,12 +3379,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
// Send our response.
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
}

s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
Expand Down Expand Up @@ -3437,12 +3439,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if !recovering {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
Expand Down Expand Up @@ -6318,7 +6321,12 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
for _, sa := range streams {
if s.allPeersOffline(sa.Group) {
// Place offline onto our results by hand here.
si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)}
si := &StreamInfo{
Config: *sa.Config,
Created: sa.Created,
Cluster: js.offlineClusterInfo(sa.Group),
TimeStamp: time.Now().UTC(),
}
resp.Streams = append(resp.Streams, si)
missingNames = append(missingNames, sa.Config.Name)
} else {
Expand Down Expand Up @@ -6464,7 +6472,12 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
for _, ca := range consumers {
if s.allPeersOffline(ca.Group) {
// Place offline onto our results by hand here.
ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)}
ci := &ConsumerInfo{
Config: ca.Config,
Created: ca.Created,
Cluster: js.offlineClusterInfo(ca.Group),
TimeStamp: time.Now().UTC(),
}
resp.Consumers = append(resp.Consumers, ci)
missingNames = append(missingNames, ca.Name)
} else {
Expand Down Expand Up @@ -8058,12 +8071,13 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
}

si := &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}

// Check for out of band catchups.
Expand Down
2 changes: 2 additions & 0 deletions server/stream.go
Expand Up @@ -136,6 +136,8 @@ type StreamInfo struct {
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
Alternates []StreamAlternate `json:"alternates,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"ts"`
}

type StreamAlternate struct {
Expand Down

0 comments on commit c24547e

Please sign in to comment.