[FIXES/IMPROVED] Raft and filestore logic #4045

Merged 6 commits on Apr 13, 2023
server/filestore.go (17 changes: 11 additions & 6 deletions)
@@ -1311,6 +1311,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
}

var smv StoreMsg
var needNextFirst bool

// Walk messages and remove if expired.
mb.ensurePerSubjectInfoLoaded()
@@ -1325,21 +1326,21 @@
mb.dmap = nil
}
}
// Keep this update just in case since we are removing dmap entries.
mb.first.seq = seq
// Keep this updated just in case since we are removing dmap entries.
mb.first.seq, needNextFirst = seq, true
continue
}
// Break on other errors.
if err != nil || sm == nil {
// Keep this update just in case since we could have removed dmap entries.
mb.first.seq = seq
mb.first.seq, needNextFirst = seq, true
break
}

// No error and sm != nil from here onward.

// Check for done.
if minAge < sm.ts {
mb.first.seq, needNextFirst = sm.seq, false
mb.first.seq = sm.seq
mb.first.ts = sm.ts
nts = sm.ts
@@ -1348,6 +1349,7 @@

// Delete the message here.
if mb.msgs > 0 {
mb.first.seq, needNextFirst = seq, true
sz := fileStoreMsgSize(sm.subj, sm.hdr, sm.msg)
mb.bytes -= sz
bytes += sz
@@ -1359,7 +1361,10 @@
mb.removeSeqPerSubject(sm.subj, seq, nil)
fs.removePerSubject(sm.subj)
}

// Make sure we have a proper next first sequence.
if needNextFirst {
mb.selectNextFirst()
}
// Check if empty after processing, could happen if tail of messages are all deleted.
needWriteIndex := true
if mb.msgs == 0 {
@@ -2365,7 +2370,7 @@ func (fs *fileStore) StoreMsg(subj string, hdr, msg []byte) (uint64, int64, erro

// skipMsg will update this message block for a skipped message.
// If we do not have any messages, just update the metadata, otherwise
// we will place and empty record marking the sequence as used. The
// we will place an empty record marking the sequence as used. The
// sequence will be marked erased.
// fs lock should be held.
func (mb *msgBlock) skipMsg(seq uint64, now time.Time) {
server/jetstream_cluster.go (4 changes: 2 additions & 2 deletions)
@@ -7843,9 +7843,9 @@ func (mset *stream) processClusterStreamInfoRequest(reply string) {
return
}

// If we are not the leader let someone else possible respond first.
// If we are not the leader let someone else possibly respond first.
if !isLeader {
time.Sleep(200 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
}

si := &StreamInfo{
server/raft.go (12 changes: 7 additions & 5 deletions)
@@ -347,16 +347,16 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error
if cfg == nil {
return nil, errNilCfg
}
s.mu.Lock()
s.mu.RLock()
if s.sys == nil {
s.mu.Unlock()
s.mu.RUnlock()
return nil, ErrNoSysAccount
}
sq := s.sys.sq
sacc := s.sys.account
hash := s.sys.shash
pub := s.info.ID
s.mu.Unlock()
s.mu.RUnlock()

ps, err := readPeerState(cfg.Store)
if err != nil {
@@ -2403,6 +2403,8 @@ func (n *raft) sendSnapshotToFollower(subject string) (uint64, error) {
if err != nil {
// We need to stepdown here when this happens.
n.stepdown.push(noLeader)
// We need to reset our state here as well.
n.resetWAL()
return 0, err
}
// Go ahead and send the snapshot and peerstate here as first append entry to the catchup follower.
@@ -2880,7 +2882,7 @@ func (n *raft) truncateWAL(term, index uint64) {
if err == ErrInvalidSequence {
n.debug("Resetting WAL")
n.wal.Truncate(0)
index, n.pterm, n.pindex = 0, 0, 0
index, n.term, n.pterm, n.pindex = 0, 0, 0, 0
} else {
n.warn("Error truncating WAL: %v", err)
n.setWriteErrLocked(err)
@@ -2889,7 +2891,7 @@
}

// Set after we know we have truncated properly.
n.pterm, n.pindex = term, index
n.term, n.pterm, n.pindex = term, term, index
}

// Reset our WAL.
server/raft_helpers_test.go (276 changes: 276 additions & 0 deletions)
@@ -0,0 +1,276 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Do not exclude this file with the !skip_js_tests since those helpers
// are also used by MQTT.

package server

import (
"encoding/binary"
"fmt"
"math/rand"
"sync"
"testing"
"time"
)

type stateMachine interface {
server() *Server
node() RaftNode
// This will call forward as needed so can be called on any node.
propose(data []byte)
// When entries have been committed and can be applied.
applyEntry(ce *CommittedEntry)
// When a leader change happens.
leaderChange(isLeader bool)
// Stop the raft group.
stop()
// Restart
restart()
}

// Factory function needed for constructor.
type smFactory func(s *Server, cfg *RaftConfig, node RaftNode) stateMachine

type smGroup []stateMachine

// Leader of the group.
func (sg smGroup) leader() stateMachine {
for _, sm := range sg {
if sm.node().Leader() {
return sm
}
}
return nil
}

// Wait on a leader to be elected.
func (sg smGroup) waitOnLeader() {
expires := time.Now().Add(10 * time.Second)
for time.Now().Before(expires) {
for _, sm := range sg {
if sm.node().Leader() {
return
}
}
time.Sleep(100 * time.Millisecond)
}
}

// Pick a random member.
func (sg smGroup) randomMember() stateMachine {
return sg[rand.Intn(len(sg))]
}

// Return a non-leader
func (sg smGroup) nonLeader() stateMachine {
for _, sm := range sg {
if !sm.node().Leader() {
return sm
}
}
return nil
}

// Create a raft group and place on numMembers servers at random.
func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
c.t.Helper()
if numMembers > len(c.servers) {
c.t.Fatalf("Members > Peers: %d vs %d", numMembers, len(c.servers))
}
servers := append([]*Server{}, c.servers...)
rand.Shuffle(len(servers), func(i, j int) { servers[i], servers[j] = servers[j], servers[i] })
return c.createRaftGroupWithPeers(name, servers[:numMembers], smf)
}

func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory) smGroup {
c.t.Helper()

var sg smGroup
var peers []string

for _, s := range servers {
// generate peer names.
s.mu.RLock()
peers = append(peers, s.sys.shash)
s.mu.RUnlock()
}

for _, s := range servers {
fs, err := newFileStore(
FileStoreConfig{StoreDir: c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
StreamConfig{Name: name, Storage: FileStorage},
)
require_NoError(c.t, err)
cfg := &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs}
s.bootstrapRaftNode(cfg, peers, true)
n, err := s.startRaftNode(globalAccountName, cfg)
require_NoError(c.t, err)
sm := smf(s, cfg, n)
sg = append(sg, sm)
go smLoop(sm)
}
return sg
}

// Driver program for the state machine.
// Should be run in its own go routine.
func smLoop(sm stateMachine) {
s, n := sm.server(), sm.node()
qch, lch, aq := n.QuitC(), n.LeadChangeC(), n.ApplyQ()

for {
select {
case <-s.quitCh:
return
case <-qch:
return
case <-aq.ch:
ces := aq.pop()
for _, ce := range ces {
sm.applyEntry(ce)
}
aq.recycle(&ces)

case isLeader := <-lch:
sm.leaderChange(isLeader)
}
}
}

// Simple implementation of a replicated state.
// The adder state just sums up int64 values.
type stateAdder struct {
sync.Mutex
s *Server
n RaftNode
cfg *RaftConfig
sum int64
}

// Simple getters for server and the raft node.
func (a *stateAdder) server() *Server {
a.Lock()
defer a.Unlock()
return a.s
}
func (a *stateAdder) node() RaftNode {
a.Lock()
defer a.Unlock()
return a.n
}

func (a *stateAdder) propose(data []byte) {
a.Lock()
defer a.Unlock()
a.n.ForwardProposal(data)
}

func (a *stateAdder) applyEntry(ce *CommittedEntry) {
a.Lock()
defer a.Unlock()
if ce == nil {
// This means initial state is done/replayed.
return
}
for _, e := range ce.Entries {
if e.Type == EntryNormal {
delta, _ := binary.Varint(e.Data)
a.sum += delta
} else if e.Type == EntrySnapshot {
a.sum, _ = binary.Varint(e.Data)
}
}
// Update applied.
a.n.Applied(ce.Index)
}

func (a *stateAdder) leaderChange(isLeader bool) {}

// Adder specific to change the total.
func (a *stateAdder) proposeDelta(delta int64) {
data := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(data, int64(delta))
a.propose(data[:n])
}

// Stop the group.
func (a *stateAdder) stop() {
a.Lock()
defer a.Unlock()
a.n.Stop()
}

// Restart the group
func (a *stateAdder) restart() {
a.Lock()
defer a.Unlock()

if a.n.State() != Closed {
return
}

// The filestore is stopped as well, so need to extract the parts to recreate it.
rn := a.n.(*raft)
fs := rn.wal.(*fileStore)

var err error
a.cfg.Log, err = newFileStore(fs.fcfg, fs.cfg.StreamConfig)
if err != nil {
panic(err)
}
a.n, err = a.s.startRaftNode(globalAccountName, a.cfg)
if err != nil {
panic(err)
}
// Finally restart the driver.
go smLoop(a)
}

// Total for the adder state machine.
func (a *stateAdder) total() int64 {
a.Lock()
defer a.Unlock()
return a.sum
}

// Install a snapshot.
func (a *stateAdder) snapshot(t *testing.T) {
a.Lock()
defer a.Unlock()
data := make([]byte, binary.MaxVarintLen64)
n := binary.PutVarint(data, a.sum)
snap := data[:n]
require_NoError(t, a.n.InstallSnapshot(snap))
}

// Helper to wait for a certain state.
func (rg smGroup) waitOnTotal(t *testing.T, expected int64) {
t.Helper()
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
for _, sm := range rg {
asm := sm.(*stateAdder)
if total := asm.total(); total != expected {
return fmt.Errorf("Adder on %v has wrong total: %d vs %d",
asm.server(), total, expected)
}
}
return nil
})
}

// Factory function.
func newStateAdder(s *Server, cfg *RaftConfig, n RaftNode) stateMachine {
return &stateAdder{s: s, n: n, cfg: cfg}
}