Skip to content

Commit

Permalink
[FIXED] Avoid stale KV reads on server restart for replicated KV stor…
Browse files Browse the repository at this point in the history
…es. (#4171)

Make sure we wait until we believe we are caught up before enabling
direct gets on followers.

Signed-off-by: Derek Collison <derek@nats.io>

Resolves #4162
  • Loading branch information
derekcollison committed May 16, 2023
2 parents 4feb7b9 + b0340ce commit 87f17fc
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 19 deletions.
30 changes: 11 additions & 19 deletions server/jetstream_cluster.go
Expand Up @@ -2284,27 +2284,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Here we are checking if we are not the leader but we have been asked to allow
// direct access. We now allow non-leaders to participate in the queue group.
if !isLeader && mset != nil {
mset.mu.Lock()
// Check direct gets first.
if mset.cfg.AllowDirect {
if mset.directSub == nil && mset.isCurrent() {
mset.subscribeToDirect()
} else {
startDirectAccessMonitoring()
}
}
// Now check for mirror directs as well.
if mset.cfg.MirrorDirect {
if mset.mirror != nil && mset.mirror.dsub == nil && mset.isCurrent() {
mset.subscribeToMirrorDirect()
} else {
startDirectAccessMonitoring()
}
}
mset.mu.Unlock()
startDirectAccessMonitoring()
}

case <-datc:
if mset == nil || isRecovering {
return
}
// If we are leader we can stop, we know this is setup now.
if isLeader {
stopDirectMonitoring()
return
}

mset.mu.Lock()
ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent()
if !current {
Expand All @@ -2328,7 +2320,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
mset.subscribeToMirrorDirect()
}
mset.mu.Unlock()
// Stop monitoring.
// Stop direct monitoring.
stopDirectMonitoring()

case <-t.C:
Expand Down
76 changes: 76 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4046,3 +4046,79 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
})
require_NoError(t, err)
}

// TestJetStreamClusterStaleDirectGetOnRestart checks that a restarted follower
// of a replicated KV bucket does not serve stale values.
//
// The test stores a key, shuts down a non-leader, overwrites the key while
// that server is down, and then reads the key in a loop from a background
// goroutine while the server is restarted. Every read must return the latest
// value ("baz").
//
// https://github.com/nats-io/nats-server/issues/4162
func TestJetStreamClusterStaleDirectGetOnRestart(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "NATS", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
		Bucket:   "TEST",
		Replicas: 3,
	})
	require_NoError(t, err)

	_, err = kv.PutString("foo", "bar")
	require_NoError(t, err)

	// Close client in case we were connected to server below.
	// We will recreate.
	nc.Close()

	// Shutdown a non-leader.
	s := c.randomNonStreamLeader(globalAccountName, "KV_TEST")
	s.Shutdown()

	nc, js = jsClientConnect(t, c.randomServer())
	defer nc.Close()

	kv, err = js.KeyValue("TEST")
	require_NoError(t, err)

	_, err = kv.PutString("foo", "baz")
	require_NoError(t, err)

	// The reader reports at most one error and then exits, so a buffer of one
	// guarantees its send never blocks.
	errCh := make(chan error, 1)
	// done tells the reader to stop; finished is closed once it has exited.
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		// Registered first so it runs last, after the connection is closed.
		defer close(finished)

		// NOTE(review): jsClientConnect receives t; if it can call t.Fatal on
		// failure, that is not allowed from a non-test goroutine — confirm.
		nc, js := jsClientConnect(t, c.randomServer())
		defer nc.Close()

		kv, err := js.KeyValue("TEST")
		if err != nil {
			errCh <- err
			return
		}

		for {
			select {
			case <-done:
				return
			default:
				entry, err := kv.Get("foo")
				if err != nil {
					errCh <- err
					return
				}
				if v := string(entry.Value()); v != "baz" {
					// One stale read is enough to fail the test; stop here
					// instead of spinning and flooding the channel.
					errCh <- fmt.Errorf("Got wrong value: %q", v)
					return
				}
			}
		}
	}()

	// Restart
	c.restartServer(s)
	// Wait for a bit to make sure as this server participates in direct gets
	// it does not serve stale reads.
	time.Sleep(2 * time.Second)
	close(done)

	// Wait for the reader to exit so that an in-flight Get is accounted for
	// and the goroutine does not outlive the test or the cluster shutdown.
	select {
	case <-finished:
	case <-time.After(10 * time.Second):
		t.Fatalf("Timed out waiting for reader goroutine to finish")
	}

	select {
	case err := <-errCh:
		t.Fatalf("Expected no errors but got %v", err)
	default:
	}
}

0 comments on commit 87f17fc

Please sign in to comment.