Skip to content

Commit

Permalink
Panic fixes (#4214)
Browse files Browse the repository at this point in the history
 - [ ] Link to issue, e.g. `Resolves #NNN`
 - [ ] Documentation added (if applicable)
 - [ ] Tests added
- [ ] Branch rebased on top of current main (`git pull --rebase origin
main`)
- [ ] Changes squashed to a single commit (described
[here](http://gitready.com/advanced/2009/02/10/squashing-commits-with-rebase.html))
 - [x] Build is green in Travis CI
- [x] You have certified that the contribution is your original work and
that you license the work to the project under the [Apache 2
license](https://github.com/nats-io/nats-server/blob/main/LICENSE)

Resolves panics in the code.

### Changes proposed in this pull request:

 - This PR fixes some of the panics in the code
  • Loading branch information
derekcollison committed Jun 5, 2023
2 parents 64e3bf8 + 5141b87 commit df5df3c
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 15 deletions.
7 changes: 4 additions & 3 deletions server/accounts.go
Expand Up @@ -1429,7 +1429,7 @@ func (a *Account) sendTrackingLatency(si *serviceImport, responder *client) bool
}
sl.RequestStart = time.Unix(0, si.ts-int64(reqRTT)).UTC()
sl.ServiceLatency = serviceRTT - respRTT
sl.TotalLatency = sl.Requestor.RTT + serviceRTT
sl.TotalLatency = reqRTT + serviceRTT
if respRTT > 0 {
sl.SystemLatency = time.Since(ts)
sl.TotalLatency += sl.SystemLatency
Expand Down Expand Up @@ -3784,10 +3784,11 @@ func (ur *URLAccResolver) Fetch(name string) (string, error) {
return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), err)
} else if resp == nil {
return _EMPTY_, fmt.Errorf("could not fetch <%q>: no response", redactURLString(url))
} else if resp.StatusCode != http.StatusOK {
return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), resp.Status)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return _EMPTY_, fmt.Errorf("could not fetch <%q>: %v", redactURLString(url), resp.Status)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return _EMPTY_, err
Expand Down
4 changes: 3 additions & 1 deletion server/client.go
Expand Up @@ -1989,7 +1989,9 @@ func (c *client) authViolation() {
ErrAuthentication.Error(),
c.opts.Username)
} else {
c.Errorf(ErrAuthentication.Error())
if c.srv != nil {
c.Errorf(ErrAuthentication.Error())
}
}
if c.isMqtt() {
c.mqttEnqueueConnAck(mqttConnAckRCNotAuthorized, false)
Expand Down
3 changes: 3 additions & 0 deletions server/ipqueue.go
Expand Up @@ -104,6 +104,9 @@ func (q *ipQueue[T]) push(e T) int {
// emptied the queue. So the caller should never assume that pop() will
// return a slice of 1 or more, it could return `nil`.
func (q *ipQueue[T]) pop() []T {
if q == nil {
return nil
}
var elts []T
q.Lock()
if q.pos == 0 {
Expand Down
16 changes: 12 additions & 4 deletions server/jetstream_cluster.go
Expand Up @@ -3107,7 +3107,9 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
accStreams = make(map[string]*streamAssignment)
} else if osa := accStreams[stream]; osa != nil && osa != sa {
// Copy over private existing state from former SA.
sa.Group.node = osa.Group.node
if sa.Group != nil {
sa.Group.node = osa.Group.node
}
sa.consumers = osa.consumers
sa.responded = osa.responded
sa.err = osa.err
Expand Down Expand Up @@ -3198,7 +3200,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
}

// Copy over private existing state from former SA.
sa.Group.node = osa.Group.node
if sa.Group != nil {
sa.Group.node = osa.Group.node
}
sa.consumers = osa.consumers
sa.err = osa.err

Expand All @@ -3216,7 +3220,9 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
sa.responded = false
} else {
// Make sure to clean up any old node in case this stream moves back here.
sa.Group.node = nil
if sa.Group != nil {
sa.Group.node = nil
}
}
js.mu.Unlock()

Expand Down Expand Up @@ -3787,7 +3793,9 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
} else if oca := sa.consumers[ca.Name]; oca != nil {
wasExisting = true
// Copy over private existing state from former SA.
ca.Group.node = oca.Group.node
if ca.Group != nil {
ca.Group.node = oca.Group.node
}
ca.responded = oca.responded
ca.err = oca.err
}
Expand Down
2 changes: 1 addition & 1 deletion server/leafnode.go
Expand Up @@ -1422,7 +1422,7 @@ func (s *Server) addLeafNodeConnection(c *client, srvName, clusterName string, c
}
// If we have a specified JetStream domain we will want to add a mapping to
// allow access cross domain for each non-system account.
if opts.JetStreamDomain != _EMPTY_ && acc != sysAcc && opts.JetStream {
if opts.JetStreamDomain != _EMPTY_ && opts.JetStream && acc != nil && acc != sysAcc {
for src, dest := range generateJSMappingTable(opts.JetStreamDomain) {
if err := acc.AddMapping(src, dest); err != nil {
c.Debugf("Error adding JetStream domain mapping: %s", err.Error())
Expand Down
8 changes: 4 additions & 4 deletions server/route.go
Expand Up @@ -1949,12 +1949,12 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error
c.closeConnection(WrongGateway)
return ErrWrongGateway
}
var perms *RoutePermissions
//TODO this check indicates srv may be nil. see srv usage below
if srv != nil {
perms = srv.getOpts().Cluster.Permissions

if srv == nil {
return ErrServerNotRunning
}

perms := srv.getOpts().Cluster.Permissions
clusterName := srv.ClusterName()

// If we have a cluster name set, make sure it matches ours.
Expand Down
5 changes: 4 additions & 1 deletion server/server.go
Expand Up @@ -3560,7 +3560,10 @@ func (s *Server) lameDuckMode() {
numClients := int64(len(s.clients))
batch := 1
// Sleep interval between each client connection close.
si := dur / numClients
var si int64
if numClients != 0 {
si = dur / numClients
}
if si < 1 {
// Should not happen (except in test with very small LD duration), but
// if there are too many clients, batch the number of close and
Expand Down
4 changes: 3 additions & 1 deletion server/stream.go
Expand Up @@ -1788,7 +1788,9 @@ func gatherSourceMirrorSubjects(subjects []string, cfg *StreamConfig, acc *Accou

// Return the subjects for a stream source.
func (a *Account) streamSourceSubjects(ss *StreamSource, seen map[string]bool) (subjects []string, hasExt bool) {
if ss != nil && ss.External != nil {
if ss == nil {
return nil, false
} else if ss.External != nil {
return nil, true
}

Expand Down

0 comments on commit df5df3c

Please sign in to comment.