Skip to content

Commit

Permalink
Possibly de-race sysRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed Apr 3, 2023
1 parent 94278e7 commit cad899e
Showing 1 changed file with 31 additions and 35 deletions.
66 changes: 31 additions & 35 deletions server/jetstream_cluster.go
Expand Up @@ -5573,48 +5573,40 @@ var (

// blocking utility call to perform requests on the system account
// returns (synchronized) v or error
func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{}) (interface{}, error) {
func sysRequest[T any](s *Server, subjFormat string, args ...interface{}) (*T, error) {
isubj := fmt.Sprintf(subjFormat, args...)

s.mu.Lock()
inbox := s.newRespInbox()
results := make(chan interface{}, 1)
// Store our handler.
s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
if err := json.Unmarshal(msg, v); err != nil {
results := make(chan *T)
s.sys.replies[inbox] = func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) {
var v T
if err := json.Unmarshal(msg, &v); err != nil {
s.Warnf("Error unmarshalling response for request '%s':%v", isubj, err)
return
}
select {
case results <- v:
default:
s.Warnf("Failed placing request response on internal channel")
}
results <- &v
}
s.mu.Unlock()

s.sendInternalMsgLocked(isubj, inbox, nil, nil)

const timeout = 2 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()

var err error
var data interface{}
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
}()

select {
case <-s.quitCh:
err = errReqSrvExit
case <-notActive.C:
err = errReqTimeout
case data = <-results:
return nil, errReqSrvExit
case <-time.After(2 * time.Second):
return nil, errReqTimeout
case data := <-results:
return data, nil
}
// Clean up here.
s.mu.Lock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
s.mu.Unlock()
return data, err
}

func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
Expand Down Expand Up @@ -5708,9 +5700,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
} else {
// Need to release js lock.
js.mu.Unlock()
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
if si, err := sysRequest[StreamInfo](s, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
msg = fmt.Sprintf("error retrieving info: %s", err.Error())
} else if si := si.(*StreamInfo); si != nil {
} else if si != nil {
currentCount := 0
if si.Cluster.Leader != _EMPTY_ {
currentCount++
Expand Down Expand Up @@ -5765,10 +5757,12 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
if !s.allPeersOffline(rg) {
// Need to release js lock.
js.mu.Unlock()
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
if si, err := sysRequest[StreamInfo](s, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
} else if cl := si.(*StreamInfo).Cluster; cl != nil && cl.Leader != _EMPTY_ {
curLeader = getHash(cl.Leader)
} else if si != nil {
if cl := si.Cluster; cl != nil && cl.Leader != _EMPTY_ {
curLeader = getHash(cl.Leader)
}
}
// Re-acquire here.
js.mu.Lock()
Expand Down Expand Up @@ -6709,10 +6703,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
if !s.allPeersOffline(ca.Group) {
// Need to release js lock.
js.mu.Unlock()
if ci, err := s.sysRequest(&ConsumerInfo{}, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, sa.Config.Name, cfg.Durable, err)
} else if cl := ci.(*ConsumerInfo).Cluster; cl != nil {
curLeader = getHash(cl.Leader)
} else if ci != nil {
if cl := ci.Cluster; cl != nil {
curLeader = getHash(cl.Leader)
}
}
// Re-acquire here.
js.mu.Lock()
Expand Down

0 comments on commit cad899e

Please sign in to comment.