Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Panic fixes #4214

Merged
merged 4 commits into from Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions server/accounts.go
Expand Up @@ -1429,7 +1429,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
}
sl.RequestStart = time.Unix(0, si.ts-int64(reqRTT)).UTC()
sl.ServiceLatency = serviceRTT - respRTT
sl.TotalLatency = sl.Requestor.RTT + serviceRTT
sl.TotalLatency = reqRTT + serviceRTT
if respRTT > 0 {
sl.SystemLatency = time.Since(ts)
sl.TotalLatency += sl.SystemLatency
Expand Down Expand Up @@ -3784,10 +3784,11 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) {
return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), err)
} else if resp == nil {
return _EMPTY_, fmt.Errorf("could not fetch <%q>: no response", redactURLString(url))
} else if resp.StatusCode != http.StatusOK {
return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), resp.Status)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), resp.Status)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return _EMPTY_, err
Expand Down
4 changes: 3 additions & 1 deletion server/client.go
Expand Up @@ -1989,7 +1989,9 @@ func (c *client) authViolation() {
ErrAuthentication.Error(),
c.opts.Username)
} else {
c.Errorf(ErrAuthentication.Error())
if c.srv != nil {
c.Errorf(ErrAuthentication.Error())
}
}
if c.isMqtt() {
c.mqttEnqueueConnAck(mqttConnAckRCNotAuthorized, false)
Expand Down
3 changes: 3 additions & 0 deletions server/ipqueue.go
Expand Up @@ -104,6 +104,9 @@ func (q *ipQueue[T]) push(e T) int {
// emptied the queue. So the caller should never assume that pop() will
// return a slice of 1 or more, it could return `nil`.
func (q *ipQueue[T]) pop() []T {
if q == nil {
return nil
}
var elts []T
q.Lock()
if q.pos == 0 {
Expand Down
16 changes: 12 additions & 4 deletions server/jetstream_cluster.go
Expand Up @@ -3107,7 +3107,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
accStreams = make(map[string]*streamAssignment)
} else if osa := accStreams[stream]; osa != nil && osa != sa {
// Copy over private existing state from former SA.
sa.Group.node = osa.Group.node
if sa.Group != nil {
sa.Group.node = osa.Group.node
}
sa.consumers = osa.consumers
sa.responded = osa.responded
sa.err = osa.err
Expand Down Expand Up @@ -3198,7 +3200,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
}

// Copy over private existing state from former SA.
sa.Group.node = osa.Group.node
if sa.Group != nil {
sa.Group.node = osa.Group.node
}
sa.consumers = osa.consumers
sa.err = osa.err

Expand All @@ -3216,7 +3220,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
sa.responded = false
} else {
// Make sure to clean up any old node in case this stream moves back here.
sa.Group.node = nil
if sa.Group != nil {
sa.Group.node = nil
}
}
js.mu.Unlock()

Expand Down Expand Up @@ -3784,7 +3790,9 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
} else if oca := sa.consumers[ca.Name]; oca != nil {
wasExisting = true
// Copy over private existing state from former SA.
ca.Group.node = oca.Group.node
if ca.Group != nil {
ca.Group.node = oca.Group.node
}
ca.responded = oca.responded
ca.err = oca.err
}
Expand Down
2 changes: 1 addition & 1 deletion server/leafnode.go
Expand Up @@ -1422,7 +1422,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
}
// If we have a specified JetStream domain we will want to add a mapping to
// allow access cross domain for each non-system account.
if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && opts.JetStream {
if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
if err := acc.AddMapping(src, dest); err != nil {
c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
Expand Down
8 changes: 4 additions & 4 deletions server/route.go
Expand Up @@ -1949,12 +1949,12 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error
c.closeConnection(WrongGateway)
return ErrWrongGateway
}
var perms *RoutePermissions
//TODO this check indicates srv may be nil. see srv usage below
if srv != nil {
perms = srv.getOpts().Cluster.Permissions

if srv == nil {
return ErrServerNotRunning
}

perms := srv.getOpts().Cluster.Permissions
clusterName := srv.ClusterName()

// If we have a cluster name set, make sure it matches ours.
Expand Down
5 changes: 4 additions & 1 deletion server/server.go
Expand Up @@ -3560,7 +3560,10 @@ func (s *Server) lameDuckMode() {
numClients := int64(len(s.clients))
batch := 1
// Sleep interval between each client connection close.
si := dur / numClients
var si int64
if numClients != 0 {
si = dur / numClients
}
if si < 1 {
// Should not happen (except in test with very small LD duration), but
// if there are too many clients, batch the number of close and
Expand Down
4 changes: 3 additions & 1 deletion server/stream.go
Expand Up @@ -1788,7 +1788,9 @@ func gatherSourceMirrorSubjects(subjects []string, cfg *StreamConfig, acc *Accou

// Return the subjects for a stream source.
func (a *Account) streamSourceSubjects(ss *StreamSource, seen map[string]bool) (subjects []string, hasExt bool) {
if ss != nil && ss.External != nil {
if ss == nil {
return nil, false
} else if ss.External != nil {
return nil, true
}

Expand Down