Skip to content

Commit

Permalink
Merge pull request #3922 from nats-io/stress-perf
Browse files Browse the repository at this point in the history
[IMPROVED] Performance and Stability under heavy IO loads.
  • Loading branch information
derekcollison committed Feb 28, 2023
2 parents daadbc0 + 1956fa3 commit d920ca6
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 222 deletions.
62 changes: 35 additions & 27 deletions server/consumer.go
Expand Up @@ -1138,8 +1138,9 @@ func (o *consumer) setLeader(isLeader bool) {
}
}

// This is coming on the wire so do not block here.
func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
o.infoWithSnapAndReply(false, reply)
go o.infoWithSnapAndReply(false, reply)
}

// Lock should be held.
Expand Down Expand Up @@ -1378,7 +1379,7 @@ func (o *consumer) deleteNotActive() {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
Expand Down Expand Up @@ -2115,12 +2116,9 @@ func (o *consumer) checkRedelivered(slseq uint64) {
}
}
if shouldUpdateState {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
o.mu.Lock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err)
}
}
}
Expand Down Expand Up @@ -3041,9 +3039,19 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
return nil, 0, errMaxAckPending
}

store := o.mset.store
filter, filterWC := o.cfg.FilterSubject, o.filterWC

// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
o.mu.Unlock()
pmsg := getJSPubMsgFromPool()
sm, sseq, err := o.mset.store.LoadNextMsg(o.cfg.FilterSubject, o.filterWC, seq, &pmsg.StoreMsg)
sm, sseq, err := store.LoadNextMsg(filter, filterWC, seq, &pmsg.StoreMsg)
if sm == nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
}
o.mu.Lock()

if sseq >= o.sseq {
o.sseq = sseq + 1
Expand All @@ -3052,11 +3060,6 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
}
}

if sm == nil {
pmsg.returnToPool()
return nil, 0, err
}

return pmsg, dc, err
}

Expand Down Expand Up @@ -3779,14 +3782,22 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {

// Checks the pending messages.
func (o *consumer) checkPending() {
o.mu.Lock()
defer o.mu.Unlock()

o.mu.RLock()
mset := o.mset
// On stop, mset and timer will be nil.
if mset == nil || o.ptmr == nil {
o.mu.RUnlock()
return
}
o.mu.RUnlock()

var shouldUpdateState bool
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq

o.mu.Lock()
defer o.mu.Unlock()

now := time.Now().UnixNano()
ttl := int64(o.cfg.AckWait)
Expand All @@ -3797,11 +3808,6 @@ func (o *consumer) checkPending() {
next = int64(o.cfg.BackOff[l-1])
}

var shouldUpdateState bool
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq

// Since we can update timestamps, we have to review all pending.
// We will now bail if we see an ack pending in bound to us via o.awl.
var expired []uint64
Expand Down Expand Up @@ -3868,7 +3874,12 @@ func (o *consumer) checkPending() {
}

if len(o.pending) > 0 {
o.ptmr.Reset(o.ackWait(time.Duration(next)))
delay := time.Duration(next)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(o.ackWait(delay))
}
} else {
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
Expand All @@ -3878,12 +3889,9 @@ func (o *consumer) checkPending() {

// Update our state if needed.
if shouldUpdateState {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
o.mu.Lock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err)
}
}
}
Expand Down
76 changes: 48 additions & 28 deletions server/filestore.go
Expand Up @@ -270,9 +270,9 @@ const (
// Check for bad record length value due to corrupt data.
rlBadThresh = 32 * 1024 * 1024
// Time threshold to write index info.
wiThresh = int64(2 * time.Second)
wiThresh = int64(30 * time.Second)
// Time threshold to write index info for non FIFO cases
winfThresh = int64(500 * time.Millisecond)
winfThresh = int64(2 * time.Second)
)

func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
Expand Down Expand Up @@ -1192,7 +1192,7 @@ func (fs *fileStore) recoverMsgs() error {
if mb.msgs == 0 && mb.rbytes == 0 {
if mb == fs.lmb {
mb.first.seq, mb.first.ts = mb.last.seq+1, 0
mb.closeAndKeepIndex()
mb.closeAndKeepIndex(false)
} else {
emptyBlks = append(emptyBlks, mb)
}
Expand Down Expand Up @@ -1263,7 +1263,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb == fs.lmb {
// Do this part by hand since not deleting one by one.
mb.first.seq, mb.first.ts = mb.last.seq+1, 0
mb.closeAndKeepIndex()
mb.closeAndKeepIndex(false)
// Clear any global subject state.
fs.psim = make(map[string]*psi)
return false
Expand Down Expand Up @@ -2282,12 +2282,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
if fseq == 0 {
fseq, _ = fs.firstSeqForSubj(subj)
}
if ok, _ := fs.removeMsg(fseq, false, false); ok {
if ok, _ := fs.removeMsgViaLimits(fseq); ok {
// Make sure we are below the limit.
if psmc--; psmc >= mmp {
for info, ok := fs.psim[subj]; ok && info.total > mmp; info, ok = fs.psim[subj] {
if seq, _ := fs.firstSeqForSubj(subj); seq > 0 {
if ok, _ := fs.removeMsg(seq, false, false); !ok {
if ok, _ := fs.removeMsgViaLimits(seq); !ok {
break
}
} else {
Expand Down Expand Up @@ -2539,7 +2539,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
m, _, err := mb.firstMatching(subj, false, seq, &sm)
if err == nil {
seq = m.seq + 1
if removed, _ := fs.removeMsg(m.seq, false, false); removed {
if removed, _ := fs.removeMsgViaLimits(m.seq); removed {
total--
blks[mb] = struct{}{}
}
Expand All @@ -2560,17 +2560,24 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {

// Lock should be held.
func (fs *fileStore) deleteFirstMsg() (bool, error) {
return fs.removeMsg(fs.state.FirstSeq, false, false)
return fs.removeMsgViaLimits(fs.state.FirstSeq)
}

// If we remove via limits that can always be recovered on a restart we
// do not force the system to update the index file.
// Lock should be held.
func (fs *fileStore) removeMsgViaLimits(seq uint64) (bool, error) {
return fs.removeMsg(seq, false, true, false)
}

// RemoveMsg will remove the message from this store.
// Will return the number of bytes removed.
func (fs *fileStore) RemoveMsg(seq uint64) (bool, error) {
return fs.removeMsg(seq, false, true)
return fs.removeMsg(seq, false, false, true)
}

func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
return fs.removeMsg(seq, true, true)
return fs.removeMsg(seq, true, false, true)
}

// Convenience function to remove per subject tracking at the filestore level.
Expand All @@ -2590,7 +2597,7 @@ func (fs *fileStore) removePerSubject(subj string) {
}

// Remove a message, optionally rewriting the mb file.
func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (bool, error) {
if seq == 0 {
return false, ErrStoreMsgNotFound
}
Expand Down Expand Up @@ -2695,7 +2702,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
fifo := seq == mb.first.seq
isLastBlock := mb == fs.lmb
isEmpty := mb.msgs == 0
shouldWriteIndex := !isEmpty
// If we are removing the message via limits we do not need to write the index file here.
// If viaLimits this means on a restart we will properly clean up these messages regardless.
shouldWriteIndex := !isEmpty && !viaLimits

if fifo {
mb.selectNextFirst()
Expand Down Expand Up @@ -2724,10 +2733,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error

var firstSeqNeedsUpdate bool

// Decide how we want to clean this up. If last block we will hold into index.
// Decide how we want to clean this up. If last block and the only block left we will hold onto the index.
if isEmpty {
if isLastBlock {
mb.closeAndKeepIndex()
mb.closeAndKeepIndex(viaLimits)
// We do not need to writeIndex since we just did above.
shouldWriteIndex = false
} else {
fs.removeMsgBlock(mb)
}
Expand Down Expand Up @@ -3441,7 +3452,9 @@ func (fs *fileStore) expireMsgs() {
fs.mu.RUnlock()

for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
fs.removeMsg(sm.seq, false, true)
fs.mu.Lock()
fs.removeMsgViaLimits(sm.seq)
fs.mu.Unlock()
// Recalculate in case we are expiring a bunch.
minAge = time.Now().UnixNano() - maxAge
}
Expand Down Expand Up @@ -3788,7 +3801,7 @@ func (fs *fileStore) syncBlocks() {
mb.ifd.Truncate(mb.liwsz)
mb.ifd.Sync()
}
// See if we can close FDs do to being idle.
// See if we can close FDs due to being idle.
if mb.ifd != nil || mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
Expand Down Expand Up @@ -4241,6 +4254,7 @@ var (
errNoMsgBlk = errors.New("no message block")
errMsgBlkTooBig = errors.New("message block size exceeded int capacity")
errUnknownCipher = errors.New("unknown cipher")
errDIOStalled = errors.New("IO is stalled")
)

// Used for marking messages that have had their checksums checked.
Expand Down Expand Up @@ -4755,7 +4769,9 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
}

// Check if this will be a short write, and if so truncate before writing here.
if int64(len(buf)) < mb.liwsz {
// We only really need to truncate if we are encrypted or we have dmap entries.
// If no dmap entries readIndexInfo does the right thing in the presence of extra data left over.
if int64(len(buf)) < mb.liwsz && (mb.aek != nil || len(mb.dmap) > 0) {
if err := mb.ifd.Truncate(0); err != nil {
mb.werr = err
return err
Expand All @@ -4770,7 +4786,6 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
} else {
mb.werr = err
}

return err
}

Expand Down Expand Up @@ -5525,7 +5540,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {

// When we have an empty block but want to keep the index for timestamp info etc.
// Lock should be held.
func (mb *msgBlock) closeAndKeepIndex() {
func (mb *msgBlock) closeAndKeepIndex(viaLimits bool) {
// We will leave a 0 length blk marker.
if mb.mfd != nil {
mb.mfd.Truncate(0)
Expand Down Expand Up @@ -5900,13 +5915,18 @@ func (mb *msgBlock) writePerSubjectInfo() error {
b.Write(mb.lchk[:])

// Gate this for when we have a large number of blocks expiring at the same time.
<-dios
err := os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms)
dios <- struct{}{}

// Clear write flag if no error.
if err == nil {
mb.fssNeedsWrite = false
// Since we have the lock we would rather fail here than block.
// This is an optional structure that can be rebuilt on restart.
var err error
select {
case <-dios:
if err = os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms); err == nil {
// Clear write flag if no error.
mb.fssNeedsWrite = false
}
dios <- struct{}{}
default:
err = errDIOStalled
}

return err
Expand Down Expand Up @@ -6796,8 +6816,8 @@ var dios chan struct{}
// Used to setup our simplistic counting semaphore using buffered channels.
// golang.org's semaphore seemed a bit heavy.
func init() {
// Based on Go max threads of 10k, limit ourselves to a max of 1k blocking IO calls.
const nIO = 1024
// Limit ourselves to a max of 4 blocking IO calls.
const nIO = 4
dios = make(chan struct{}, nIO)
// Fill it up to start.
for i := 0; i < nIO; i++ {
Expand Down
8 changes: 4 additions & 4 deletions server/jetstream.go
Expand Up @@ -784,9 +784,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {
// JetStreamEnabled reports if jetstream is enabled for this server.
func (s *Server) JetStreamEnabled() bool {
var js *jetStream
s.mu.Lock()
s.mu.RLock()
js = s.js
s.mu.Unlock()
s.mu.RUnlock()
return js.isEnabled()
}

Expand Down Expand Up @@ -853,9 +853,9 @@ func (s *Server) signalPullConsumers() {

// Shutdown jetstream for this server.
func (s *Server) shutdownJetStream() {
s.mu.Lock()
s.mu.RLock()
js := s.js
s.mu.Unlock()
s.mu.RUnlock()

if js == nil {
return
Expand Down

0 comments on commit d920ca6

Please sign in to comment.