Skip to content

Commit

Permalink
Record the stream and consumer info timestamps
Browse files Browse the repository at this point in the history
This records the server time when info for streams and
consumers are created so that tools such as the nats cli
can calculate time deltas for last ack, last delivered and
so forth in the context of the server clock.

This will help aleviate problems with client devices experiencing
clock jitter that can show up in user interfaces as negative
seconds since last ack etc

Signed-off-by: R.I.Pienaar <rip@devco.net>
  • Loading branch information
ripienaar committed May 5, 2023
1 parent 69fb3db commit 8dd7474
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 46 deletions.
3 changes: 3 additions & 0 deletions server/consumer.go
Expand Up @@ -55,6 +55,8 @@ type ConsumerInfo struct {
NumPending uint64 `json:"num_pending"`
Cluster *ClusterInfo `json:"cluster,omitempty"`
PushBound bool `json:"push_bound,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"time_stamp"`
}

type ConsumerConfig struct {
Expand Down Expand Up @@ -2394,6 +2396,7 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
NumRedelivered: len(o.rdc),
NumPending: o.checkNumPending(),
PushBound: o.isPushMode() && o.active,
TimeStamp: time.Now().UTC(),
}

// If we are replicated and we are not the leader we need to pull certain data from our store.
Expand Down
50 changes: 30 additions & 20 deletions server/jetstream_api.go
Expand Up @@ -1373,9 +1373,10 @@ func (s *Server) jsStreamCreateRequest(sub *subscription, c *client, _ *Account,
return
}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
}
resp.DidCreate = true
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
Expand Down Expand Up @@ -1461,12 +1462,13 @@ func (s *Server) jsStreamUpdateRequest(sub *subscription, c *client, _ *Account,
}

resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
Expand Down Expand Up @@ -1686,12 +1688,13 @@ func (s *Server) jsStreamListRequest(sub *subscription, c *client, _ *Account, s
for _, mset := range msets[offset:] {
config := mset.config()
resp.Streams = append(resp.Streams, &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Domain: s.getOpts().JetStreamDomain,
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
})
if len(resp.Streams) >= JSApiListLimit {
break
Expand Down Expand Up @@ -1846,6 +1849,7 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Alternates: js.streamAlternates(ci, config.Name),
TimeStamp: time.Now().UTC(),
}
if clusterWideConsCount > 0 {
resp.StreamInfo.State.Consumers = clusterWideConsCount
Expand Down Expand Up @@ -3455,7 +3459,12 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
s.Warnf("Restore failed for %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
} else {
resp.StreamInfo = &StreamInfo{Created: mset.createdTime(), State: mset.state(), Config: mset.config()}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
TimeStamp: time.Now().UTC(),
}
s.Noticef("Completed restore of %s for stream '%s > %s' in %v",
friendlyBytes(int64(total)), streamName, acc.Name, end.Sub(start))
}
Expand Down Expand Up @@ -4222,10 +4231,11 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
// our config and defaults for state and no cluster info.
if isMember {
resp.ConsumerInfo = &ConsumerInfo{
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
Stream: ca.Stream,
Name: ca.Name,
Created: ca.Created,
Config: ca.Config,
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
Expand Down
66 changes: 40 additions & 26 deletions server/jetstream_cluster.go
Expand Up @@ -2932,12 +2932,13 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
} else {
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
resp.DidCreate = true
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
Expand Down Expand Up @@ -3339,12 +3340,13 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
// Send our response.
var resp = JSApiStreamUpdateResponse{ApiResponse: ApiResponse{Type: JSApiStreamUpdateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Mirror: mset.mirrorInfo(),
Sources: mset.sourcesInfo(),
TimeStamp: time.Now().UTC(),
}

s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
Expand Down Expand Up @@ -3398,12 +3400,13 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
if !recovering {
var resp = JSApiStreamCreateResponse{ApiResponse: ApiResponse{Type: JSApiStreamCreateResponseType}}
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: mset.config(),
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
}
Expand Down Expand Up @@ -6264,7 +6267,12 @@ func (s *Server) jsClusteredStreamListRequest(acc *Account, ci *ClientInfo, filt
for _, sa := range streams {
if s.allPeersOffline(sa.Group) {
// Place offline onto our results by hand here.
si := &StreamInfo{Config: *sa.Config, Created: sa.Created, Cluster: js.offlineClusterInfo(sa.Group)}
si := &StreamInfo{
Config: *sa.Config,
Created: sa.Created,
Cluster: js.offlineClusterInfo(sa.Group),
TimeStamp: time.Now().UTC(),
}
resp.Streams = append(resp.Streams, si)
missingNames = append(missingNames, sa.Config.Name)
} else {
Expand Down Expand Up @@ -6410,7 +6418,12 @@ func (s *Server) jsClusteredConsumerListRequest(acc *Account, ci *ClientInfo, of
for _, ca := range consumers {
if s.allPeersOffline(ca.Group) {
// Place offline onto our results by hand here.
ci := &ConsumerInfo{Config: ca.Config, Created: ca.Created, Cluster: js.offlineClusterInfo(ca.Group)}
ci := &ConsumerInfo{
Config: ca.Config,
Created: ca.Created,
Cluster: js.offlineClusterInfo(ca.Group),
TimeStamp: time.Now().UTC(),
}
resp.Consumers = append(resp.Consumers, ci)
missingNames = append(missingNames, ca.Name)
} else {
Expand Down Expand Up @@ -7993,12 +8006,13 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
}

si := &StreamInfo{
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
Created: mset.createdTime(),
State: mset.state(),
Config: config,
Cluster: js.clusterInfo(mset.raftGroup()),
Sources: mset.sourcesInfo(),
Mirror: mset.mirrorInfo(),
TimeStamp: time.Now().UTC(),
}

// Check for out of band catchups.
Expand Down
2 changes: 2 additions & 0 deletions server/stream.go
Expand Up @@ -136,6 +136,8 @@ type StreamInfo struct {
Mirror *StreamSourceInfo `json:"mirror,omitempty"`
Sources []*StreamSourceInfo `json:"sources,omitempty"`
Alternates []StreamAlternate `json:"alternates,omitempty"`
// TimeStamp indicates when the info was gathered
TimeStamp time.Time `json:"time_stamp"`
}

type StreamAlternate struct {
Expand Down

0 comments on commit 8dd7474

Please sign in to comment.