Skip to content

Commit

Permalink
Possibly de-race sysRequest (#4017)
Browse files Browse the repository at this point in the history
This may fix a race condition in `sysRequest` where multiple inbox
responses could try to mutate the same input object, so instead we'll
let it instantiate its own, although it isn't clear to me yet why that
would happen in the first place.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison committed Apr 4, 2023
2 parents d5a525b + 03a5a4d commit 208f462
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions server/jetstream_cluster.go
Expand Up @@ -5573,19 +5573,20 @@ var (

// blocking utility call to perform requests on the system account
// returns (synchronized) v or error
func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{}) (interface{}, error) {
func sysRequest[T any](s *Server, subjFormat string, args ...interface{}) (*T, error) {
isubj := fmt.Sprintf(subjFormat, args...)

s.mu.Lock()
inbox := s.newRespInbox()
results := make(chan interface{}, 1)
// Store our handler.
s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
if err := json.Unmarshal(msg, v); err != nil {
results := make(chan *T, 1)
s.sys.replies[inbox] = func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) {
var v T
if err := json.Unmarshal(msg, &v); err != nil {
s.Warnf("Error unmarshalling response for request '%s':%v", isubj, err)
return
}
select {
case results <- v:
case results <- &v:
default:
s.Warnf("Failed placing request response on internal channel")
}
Expand All @@ -5594,27 +5595,22 @@ func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{

s.sendInternalMsgLocked(isubj, inbox, nil, nil)

const timeout = 2 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()

var err error
var data interface{}
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
}()

select {
case <-s.quitCh:
err = errReqSrvExit
case <-notActive.C:
err = errReqTimeout
case data = <-results:
return nil, errReqSrvExit
case <-time.After(2 * time.Second):
return nil, errReqTimeout
case data := <-results:
return data, nil
}
// Clean up here.
s.mu.Lock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
s.mu.Unlock()
return data, err
}

func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
Expand Down Expand Up @@ -5708,9 +5704,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
} else {
// Need to release js lock.
js.mu.Unlock()
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
if si, err := sysRequest[StreamInfo](s, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
msg = fmt.Sprintf("error retrieving info: %s", err.Error())
} else if si := si.(*StreamInfo); si != nil {
} else if si != nil {
currentCount := 0
if si.Cluster.Leader != _EMPTY_ {
currentCount++
Expand Down Expand Up @@ -5765,10 +5761,12 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
if !s.allPeersOffline(rg) {
// Need to release js lock.
js.mu.Unlock()
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
if si, err := sysRequest[StreamInfo](s, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
} else if cl := si.(*StreamInfo).Cluster; cl != nil && cl.Leader != _EMPTY_ {
curLeader = getHash(cl.Leader)
} else if si != nil {
if cl := si.Cluster; cl != nil && cl.Leader != _EMPTY_ {
curLeader = getHash(cl.Leader)
}
}
// Re-acquire here.
js.mu.Lock()
Expand Down Expand Up @@ -6709,10 +6707,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
if !s.allPeersOffline(ca.Group) {
// Need to release js lock.
js.mu.Unlock()
if ci, err := s.sysRequest(&ConsumerInfo{}, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, sa.Config.Name, cfg.Durable, err)
} else if cl := ci.(*ConsumerInfo).Cluster; cl != nil {
curLeader = getHash(cl.Leader)
} else if ci != nil {
if cl := ci.Cluster; cl != nil {
curLeader = getHash(cl.Leader)
}
}
// Re-acquire here.
js.mu.Lock()
Expand Down

0 comments on commit 208f462

Please sign in to comment.