[IMPROVED] Performance and Stability under heavy IO loads. #3922

Merged
merged 21 commits on Feb 28, 2023
Changes from all commits
Commits (21)
4391629
Make minimum snapshot time for all assets 10s.
derekcollison Feb 27, 2023
2711460
Prevent benign spin between competing leaders with same index but dif…
derekcollison Feb 27, 2023
3cebd26
Optimize for high IO workloads. When we know optional metadata will a…
derekcollison Feb 27, 2023
13167f4
Optimize some locking for when under heavy loads.
derekcollison Feb 27, 2023
2642a8c
Optimize locking for when under heavy loads.
derekcollison Feb 27, 2023
9721309
Do not allow meta snapshot processing during recovery to override.
derekcollison Feb 28, 2023
6078706
Fixup test for new parameters
derekcollison Feb 28, 2023
576d317
Sometimes do force meta snapshot
derekcollison Feb 28, 2023
fa8afba
Only warn on write errors if not closed in case they linger under pre…
derekcollison Feb 28, 2023
aad8aa6
Do not need lock to grab js here
derekcollison Feb 28, 2023
bee149b
Only need server's rlock here.
derekcollison Feb 28, 2023
b19fe50
Do not block routes/gws on internal stream and consumer info requests
derekcollison Feb 28, 2023
adbb50f
Fixed dios capacity to 4 due to testing under heavy load.
derekcollison Feb 28, 2023
d85bec2
Do not block in place on warning, and only warn if consumer not closed
derekcollison Feb 28, 2023
3807441
Always process inbound messages in separate execution context.
derekcollison Feb 28, 2023
6bda358
Fix tests that made assumptions about single server processing.
derekcollison Feb 28, 2023
24cb570
Do not lock on stream name for consumer write state error
derekcollison Feb 28, 2023
68cd312
Be more conservative on defaultMaxTotalCatchupOutBytes, default to 64M
derekcollison Feb 28, 2023
bab10c1
Revert closeAndKeepIndex behavior
derekcollison Feb 28, 2023
724160e
Fix flapping tests
derekcollison Feb 28, 2023
1956fa3
Signal a metasnapshot for consumer deletes as well
derekcollison Feb 28, 2023
62 changes: 35 additions & 27 deletions server/consumer.go
@@ -1138,8 +1138,9 @@ func (o *consumer) setLeader(isLeader bool) {
}
}

// This is coming on the wire so do not block here.
func (o *consumer) handleClusterConsumerInfoRequest(sub *subscription, c *client, _ *Account, subject, reply string, msg []byte) {
o.infoWithSnapAndReply(false, reply)
go o.infoWithSnapAndReply(false, reply)
}

// Lock should be held.
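The change above matters because handleClusterConsumerInfoRequest is invoked directly from the route/gateway read loop, so gathering consumer info inline (which takes locks and can block) would stall all traffic on that connection. A minimal sketch of the pattern, using hypothetical types rather than the server's own:

```go
// Sketch only: a callback invoked from a connection's read loop hands slow
// work to a goroutine so the reader is never blocked behind locks or IO.
package sketch

type request struct {
	reply string
}

type server struct {
	// slowInfo gathers state under locks and sends the reply; it may block.
	slowInfo func(reply string)
}

// onWireRequest runs inline in the read loop, so it must return quickly.
func (s *server) onWireRequest(req request) {
	// Fire-and-forget: the goroutine owns the slow path and the reply.
	go s.slowInfo(req.reply)
}
```

The trade-off is that replies may be produced slightly later and without ordering guarantees, which is acceptable for informational requests.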
@@ -1378,7 +1379,7 @@ func (o *consumer) deleteNotActive() {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
ticker := time.NewTicker(time.Second)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
@@ -2115,12 +2116,9 @@ func (o *consumer) checkRedelivered(slseq uint64) {
}
}
if shouldUpdateState {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.name(), name, err)
o.mu.Lock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check redelivered: %v", acc, mset.cfg.Name, name, err)
}
}
}
@@ -3041,9 +3039,19 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
return nil, 0, errMaxAckPending
}

store := o.mset.store
filter, filterWC := o.cfg.FilterSubject, o.filterWC

// Grab next message applicable to us.
// We will unlock here in case lots of contention, e.g. WQ.
o.mu.Unlock()
pmsg := getJSPubMsgFromPool()
sm, sseq, err := o.mset.store.LoadNextMsg(o.cfg.FilterSubject, o.filterWC, seq, &pmsg.StoreMsg)
sm, sseq, err := store.LoadNextMsg(filter, filterWC, seq, &pmsg.StoreMsg)
if sm == nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
}
o.mu.Lock()

if sseq >= o.sseq {
o.sseq = sseq + 1
@@ -3052,11 +3060,6 @@
}
}

if sm == nil {
pmsg.returnToPool()
return nil, 0, err
}

return pmsg, dc, err
}
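getNextMsg now snapshots the store reference and filter while holding the lock, releases it for the LoadNextMsg call (which can be slow when many consumers contend on a work queue), and re-acquires it to reconcile sequence state. A rough sketch of that unlock-around-the-slow-call pattern, with simplified types standing in for the consumer and store:

```go
// Sketch only, with assumed types: release the consumer lock around a
// potentially slow store read, then re-acquire and reconcile state.
package sketch

import "sync"

type store interface {
	LoadNext(seq uint64) (msg []byte, sseq uint64, err error)
}

type consumer struct {
	mu    sync.Mutex
	sseq  uint64
	store store
}

func (c *consumer) nextMsg() ([]byte, error) {
	c.mu.Lock()
	// Snapshot what the slow call needs while locked.
	seq, st := c.sseq, c.store
	c.mu.Unlock()

	// Do the (possibly contended) read without holding our lock.
	msg, sseq, err := st.LoadNext(seq)

	c.mu.Lock()
	defer c.mu.Unlock()
	// Another goroutine may have advanced state while we were unlocked.
	if sseq >= c.sseq {
		c.sseq = sseq + 1
	}
	return msg, err
}
```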

@@ -3779,14 +3782,22 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {

// Checks the pending messages.
func (o *consumer) checkPending() {
o.mu.Lock()
defer o.mu.Unlock()

o.mu.RLock()
mset := o.mset
// On stop, mset and timer will be nil.
if mset == nil || o.ptmr == nil {
o.mu.RUnlock()
return
}
o.mu.RUnlock()

var shouldUpdateState bool
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq

o.mu.Lock()
defer o.mu.Unlock()

now := time.Now().UnixNano()
ttl := int64(o.cfg.AckWait)
@@ -3797,11 +3808,6 @@
next = int64(o.cfg.BackOff[l-1])
}

var shouldUpdateState bool
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq

// Since we can update timestamps, we have to review all pending.
// We will now bail if we see an ack pending in bound to us via o.awl.
var expired []uint64
@@ -3868,7 +3874,12 @@
}

if len(o.pending) > 0 {
o.ptmr.Reset(o.ackWait(time.Duration(next)))
delay := time.Duration(next)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(o.ackWait(delay))
}
} else {
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
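Because checkPending can now run concurrently with paths that stop and clear the timer, Reset can no longer assume o.ptmr is non-nil; the hunk recreates the timer with time.AfterFunc in that case. A small sketch of the defensive reschedule, with assumed field names:

```go
// Sketch only: reschedule a periodic check when the timer may have been
// stopped and nilled out concurrently, rather than calling Reset on nil.
package sketch

import (
	"sync"
	"time"
)

type worker struct {
	mu     sync.Mutex
	tmr    *time.Timer
	closed bool
}

func (w *worker) check() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.closed {
		return
	}
	// ... inspect pending work here ...

	const delay = time.Second
	if w.tmr == nil {
		// Timer was cleared elsewhere; recreate it instead of Reset.
		w.tmr = time.AfterFunc(delay, w.check)
	} else {
		w.tmr.Reset(delay)
	}
}
```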
@@ -3878,12 +3889,9 @@

// Update our state if needed.
if shouldUpdateState {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil {
if err := o.writeStoreStateUnlocked(); err != nil && o.srv != nil && o.mset != nil && !o.closed {
s, acc, mset, name := o.srv, o.acc, o.mset, o.name
// Can not hold lock while gather information about account and stream below.
o.mu.Unlock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.name(), name, err)
o.mu.Lock()
s.Warnf("Consumer '%s > %s > %s' error on write store state from check pending: %v", acc, mset.cfg.Name, name, err)
}
}
}
76 changes: 48 additions & 28 deletions server/filestore.go
@@ -270,9 +270,9 @@ const (
// Check for bad record length value due to corrupt data.
rlBadThresh = 32 * 1024 * 1024
// Time threshold to write index info.
wiThresh = int64(2 * time.Second)
wiThresh = int64(30 * time.Second)
// Time threshold to write index info for non FIFO cases
winfThresh = int64(500 * time.Millisecond)
winfThresh = int64(2 * time.Second)
)

func newFileStore(fcfg FileStoreConfig, cfg StreamConfig) (*fileStore, error) {
@@ -1192,7 +1192,7 @@ func (fs *fileStore) recoverMsgs() error {
if mb.msgs == 0 && mb.rbytes == 0 {
if mb == fs.lmb {
mb.first.seq, mb.first.ts = mb.last.seq+1, 0
mb.closeAndKeepIndex()
mb.closeAndKeepIndex(false)
} else {
emptyBlks = append(emptyBlks, mb)
}
@@ -1263,7 +1263,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb == fs.lmb {
// Do this part by hand since not deleting one by one.
mb.first.seq, mb.first.ts = mb.last.seq+1, 0
mb.closeAndKeepIndex()
mb.closeAndKeepIndex(false)
// Clear any global subject state.
fs.psim = make(map[string]*psi)
return false
@@ -2282,12 +2282,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
if fseq == 0 {
fseq, _ = fs.firstSeqForSubj(subj)
}
if ok, _ := fs.removeMsg(fseq, false, false); ok {
if ok, _ := fs.removeMsgViaLimits(fseq); ok {
// Make sure we are below the limit.
if psmc--; psmc >= mmp {
for info, ok := fs.psim[subj]; ok && info.total > mmp; info, ok = fs.psim[subj] {
if seq, _ := fs.firstSeqForSubj(subj); seq > 0 {
if ok, _ := fs.removeMsg(seq, false, false); !ok {
if ok, _ := fs.removeMsgViaLimits(seq); !ok {
break
}
} else {
@@ -2539,7 +2539,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
m, _, err := mb.firstMatching(subj, false, seq, &sm)
if err == nil {
seq = m.seq + 1
if removed, _ := fs.removeMsg(m.seq, false, false); removed {
if removed, _ := fs.removeMsgViaLimits(m.seq); removed {
total--
blks[mb] = struct{}{}
}
@@ -2560,17 +2560,24 @@

// Lock should be held.
func (fs *fileStore) deleteFirstMsg() (bool, error) {
return fs.removeMsg(fs.state.FirstSeq, false, false)
return fs.removeMsgViaLimits(fs.state.FirstSeq)
}

// If we remove via limits, which can always be recovered on a restart, we
// do not force the system to update the index file.
// Lock should be held.
func (fs *fileStore) removeMsgViaLimits(seq uint64) (bool, error) {
return fs.removeMsg(seq, false, true, false)
}

// RemoveMsg will remove the message from this store.
// Will return the number of bytes removed.
func (fs *fileStore) RemoveMsg(seq uint64) (bool, error) {
return fs.removeMsg(seq, false, true)
return fs.removeMsg(seq, false, false, true)
}

func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
return fs.removeMsg(seq, true, true)
return fs.removeMsg(seq, true, false, true)
}

// Convenience function to remove per subject tracking at the filestore level.
@@ -2590,7 +2597,7 @@ func (fs *fileStore) removePerSubject(subj string) {
}

// Remove a message, optionally rewriting the mb file.
func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (bool, error) {
if seq == 0 {
return false, ErrStoreMsgNotFound
}
@@ -2695,7 +2702,9 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
fifo := seq == mb.first.seq
isLastBlock := mb == fs.lmb
isEmpty := mb.msgs == 0
shouldWriteIndex := !isEmpty
// If we are removing the message via limits we do not need to write the index file here.
// With viaLimits, these messages will be properly cleaned up on a restart regardless.
shouldWriteIndex := !isEmpty && !viaLimits

if fifo {
mb.selectNextFirst()
@@ -2724,10 +2733,12 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error

var firstSeqNeedsUpdate bool

// Decide how we want to clean this up. If last block we will hold into index.
// Decide how we want to clean this up. If last block and the only block left we will hold into index.
if isEmpty {
if isLastBlock {
mb.closeAndKeepIndex()
mb.closeAndKeepIndex(viaLimits)
// We do not need to writeIndex since just did above.
shouldWriteIndex = false
} else {
fs.removeMsgBlock(mb)
}
@@ -3441,7 +3452,9 @@ func (fs *fileStore) expireMsgs() {
fs.mu.RUnlock()

for sm, _ = fs.msgForSeq(0, &smv); sm != nil && sm.ts <= minAge; sm, _ = fs.msgForSeq(0, &smv) {
fs.removeMsg(sm.seq, false, true)
fs.mu.Lock()
fs.removeMsgViaLimits(sm.seq)
fs.mu.Unlock()
// Recalculate in case we are expiring a bunch.
minAge = time.Now().UnixNano() - maxAge
}
@@ -3788,7 +3801,7 @@ func (fs *fileStore) syncBlocks() {
mb.ifd.Truncate(mb.liwsz)
mb.ifd.Sync()
}
// See if we can close FDs do to being idle.
// See if we can close FDs due to being idle.
if mb.ifd != nil || mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
@@ -4241,6 +4254,7 @@ var (
errNoMsgBlk = errors.New("no message block")
errMsgBlkTooBig = errors.New("message block size exceeded int capacity")
errUnknownCipher = errors.New("unknown cipher")
errDIOStalled = errors.New("IO is stalled")
)

// Used for marking messages that have had their checksums checked.
@@ -4755,7 +4769,9 @@ func (mb *msgBlock) writeIndexInfoLocked() error {
}

// Check if this will be a short write, and if so truncate before writing here.
if int64(len(buf)) < mb.liwsz {
// We only really need to truncate if we are encrypted or we have dmap entries.
// If no dmap entries readIndexInfo does the right thing in the presence of extra data left over.
if int64(len(buf)) < mb.liwsz && (mb.aek != nil || len(mb.dmap) > 0) {
if err := mb.ifd.Truncate(0); err != nil {
mb.werr = err
return err
@@ -4770,7 +4786,6 @@
} else {
mb.werr = err
}

return err
}

@@ -5525,7 +5540,7 @@ func (fs *fileStore) removeMsgBlock(mb *msgBlock) {

// When we have an empty block but want to keep the index for timestamp info etc.
// Lock should be held.
func (mb *msgBlock) closeAndKeepIndex() {
func (mb *msgBlock) closeAndKeepIndex(viaLimits bool) {
// We will leave a 0 length blk marker.
if mb.mfd != nil {
mb.mfd.Truncate(0)
@@ -5900,13 +5915,18 @@ func (mb *msgBlock) writePerSubjectInfo() error {
b.Write(mb.lchk[:])

// Gate this for when we have a large number of blocks expiring at the same time.
<-dios
err := os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms)
dios <- struct{}{}

// Clear write flag if no error.
if err == nil {
mb.fssNeedsWrite = false
// Since we have the lock we would rather fail here than block.
// This is an optional structure that can be rebuilt on restart.
var err error
select {
case <-dios:
if err = os.WriteFile(mb.sfn, b.Bytes(), defaultFilePerms); err == nil {
// Clear write flag if no error.
mb.fssNeedsWrite = false
}
dios <- struct{}{}
default:
err = errDIOStalled
}

return err
@@ -6796,8 +6816,8 @@ var dios chan struct{}
// Used to setup our simplistic counting semaphore using buffered channels.
// golang.org's semaphore seemed a bit heavy.
func init() {
// Based on Go max threads of 10k, limit ourselves to a max of 1k blocking IO calls.
const nIO = 1024
// Limit ourselves to a max of 4 blocking IO calls.
const nIO = 4
dios = make(chan struct{}, nIO)
// Fill it up to start.
for i := 0; i < nIO; i++ {
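dios is the buffered-channel counting semaphore that gates blocking file IO; this PR drops its capacity from 1024 to 4 and, for optional writes such as the per-subject info above, acquires it with a select/default so the caller fails fast with errDIOStalled instead of blocking while holding a lock. A condensed sketch of the same mechanism (the names here are illustrative, not the server's):

```go
// Sketch only: a counting semaphore built from a buffered channel that starts
// full. A receive acquires a slot, a send releases it, and select/default
// gives a non-blocking "try acquire" for work that can be skipped.
package sketch

import "errors"

var errStalled = errors.New("IO is stalled")

const nIO = 4 // at most nIO blocking IO calls in flight

var sem = func() chan struct{} {
	c := make(chan struct{}, nIO)
	for i := 0; i < nIO; i++ {
		c <- struct{}{} // fill it up to start
	}
	return c
}()

// doBlockingIO waits for a slot before running the work.
func doBlockingIO(work func() error) error {
	<-sem
	defer func() { sem <- struct{}{} }()
	return work()
}

// tryBlockingIO skips the work instead of waiting when all slots are busy,
// useful for optional writes that can be rebuilt on restart.
func tryBlockingIO(work func() error) error {
	select {
	case <-sem:
		defer func() { sem <- struct{}{} }()
		return work()
	default:
		return errStalled
	}
}
```

A buffered channel works as a semaphore because receives block once the channel is empty, and each send returns exactly one slot.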
8 changes: 4 additions & 4 deletions server/jetstream.go
@@ -784,9 +784,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {
// JetStreamEnabled reports if jetstream is enabled for this server.
func (s *Server) JetStreamEnabled() bool {
var js *jetStream
s.mu.Lock()
s.mu.RLock()
js = s.js
s.mu.Unlock()
s.mu.RUnlock()
return js.isEnabled()
}

@@ -853,9 +853,9 @@ func (s *Server) signalPullConsumers() {

// Shutdown jetstream for this server.
func (s *Server) shutdownJetStream() {
s.mu.Lock()
s.mu.RLock()
js := s.js
s.mu.Unlock()
s.mu.RUnlock()

if js == nil {
return
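The jetstream.go changes are all of the same shape: paths that only read s.js were taking the exclusive server lock, so they switch to the read lock to avoid serializing readers under load. A minimal sketch of the pattern, with placeholder fields:

```go
// Sketch only: use the RWMutex read lock for read-only access so many
// readers can proceed concurrently; reserve Lock for mutations.
package sketch

import "sync"

type jetStream struct{ enabled bool }

type Server struct {
	mu sync.RWMutex
	js *jetStream
}

func (s *Server) jsEnabled() bool {
	s.mu.RLock()
	js := s.js
	s.mu.RUnlock()
	return js != nil && js.enabled
}
```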