Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Possibly de-race sysRequest #4017

Merged
merged 1 commit into from Apr 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
62 changes: 31 additions & 31 deletions server/jetstream_cluster.go
Expand Up @@ -5573,19 +5573,20 @@ var (

// blocking utility call to perform requests on the system account
// returns (synchronized) v or error
func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{}) (interface{}, error) {
func sysRequest[T any](s *Server, subjFormat string, args ...interface{}) (*T, error) {
isubj := fmt.Sprintf(subjFormat, args...)

s.mu.Lock()
inbox := s.newRespInbox()
results := make(chan interface{}, 1)
// Store our handler.
s.sys.replies[inbox] = func(sub *subscription, _ *client, _ *Account, subject, _ string, msg []byte) {
if err := json.Unmarshal(msg, v); err != nil {
results := make(chan *T, 1)
s.sys.replies[inbox] = func(_ *subscription, _ *client, _ *Account, _, _ string, msg []byte) {
var v T
if err := json.Unmarshal(msg, &v); err != nil {
s.Warnf("Error unmarshalling response for request '%s':%v", isubj, err)
return
}
select {
case results <- v:
case results <- &v:
default:
s.Warnf("Failed placing request response on internal channel")
}
Expand All @@ -5594,27 +5595,22 @@ func (s *Server) sysRequest(v interface{}, subjFormat string, args ...interface{

s.sendInternalMsgLocked(isubj, inbox, nil, nil)

const timeout = 2 * time.Second
notActive := time.NewTimer(timeout)
defer notActive.Stop()

var err error
var data interface{}
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
}()

select {
case <-s.quitCh:
err = errReqSrvExit
case <-notActive.C:
err = errReqTimeout
case data = <-results:
return nil, errReqSrvExit
case <-time.After(2 * time.Second):
return nil, errReqTimeout
case data := <-results:
return data, nil
}
// Clean up here.
s.mu.Lock()
if s.sys != nil && s.sys.replies != nil {
delete(s.sys.replies, inbox)
}
s.mu.Unlock()
return data, err
}

func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, cfg *StreamConfig, peerSet []string) {
Expand Down Expand Up @@ -5708,9 +5704,9 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
} else {
// Need to release js lock.
js.mu.Unlock()
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
if si, err := sysRequest[StreamInfo](s, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
msg = fmt.Sprintf("error retrieving info: %s", err.Error())
} else if si := si.(*StreamInfo); si != nil {
} else if si != nil {
currentCount := 0
if si.Cluster.Leader != _EMPTY_ {
currentCount++
Expand Down Expand Up @@ -5765,10 +5761,12 @@ func (s *Server) jsClusteredStreamUpdateRequest(ci *ClientInfo, acc *Account, su
if !s.allPeersOffline(rg) {
// Need to release js lock.
js.mu.Unlock()
if si, err := s.sysRequest(&StreamInfo{}, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
if si, err := sysRequest[StreamInfo](s, clusterStreamInfoT, ci.serviceAccount(), cfg.Name); err != nil {
s.Warnf("Did not receive stream info results for '%s > %s' due to: %s", acc, cfg.Name, err)
} else if cl := si.(*StreamInfo).Cluster; cl != nil && cl.Leader != _EMPTY_ {
curLeader = getHash(cl.Leader)
} else if si != nil {
if cl := si.Cluster; cl != nil && cl.Leader != _EMPTY_ {
curLeader = getHash(cl.Leader)
}
}
// Re-acquire here.
js.mu.Lock()
Expand Down Expand Up @@ -6709,10 +6707,12 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
if !s.allPeersOffline(ca.Group) {
// Need to release js lock.
js.mu.Unlock()
if ci, err := s.sysRequest(&ConsumerInfo{}, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
if ci, err := sysRequest[ConsumerInfo](s, clusterConsumerInfoT, ci.serviceAccount(), sa.Config.Name, cfg.Durable); err != nil {
s.Warnf("Did not receive consumer info results for '%s > %s > %s' due to: %s", acc, sa.Config.Name, cfg.Durable, err)
} else if cl := ci.(*ConsumerInfo).Cluster; cl != nil {
curLeader = getHash(cl.Leader)
} else if ci != nil {
if cl := ci.Cluster; cl != nil {
curLeader = getHash(cl.Leader)
}
}
// Re-acquire here.
js.mu.Lock()
Expand Down