Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Avoid stale KV reads on server restart for replicated KV stores. #4171

Merged
merged 1 commit into from May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 11 additions & 19 deletions server/jetstream_cluster.go
Expand Up @@ -2284,27 +2284,19 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Here we are checking if we are not the leader but we have been asked to allow
// direct access. We now allow non-leaders to participate in the queue group.
if !isLeader && mset != nil {
mset.mu.Lock()
// Check direct gets first.
if mset.cfg.AllowDirect {
if mset.directSub == nil && mset.isCurrent() {
mset.subscribeToDirect()
} else {
startDirectAccessMonitoring()
}
}
// Now check for mirror directs as well.
if mset.cfg.MirrorDirect {
if mset.mirror != nil && mset.mirror.dsub == nil && mset.isCurrent() {
mset.subscribeToMirrorDirect()
} else {
startDirectAccessMonitoring()
}
}
mset.mu.Unlock()
startDirectAccessMonitoring()
}

case <-datc:
if mset == nil || isRecovering {
return
}
// If we are leader we can stop, we know this is setup now.
if isLeader {
stopDirectMonitoring()
return
}

mset.mu.Lock()
ad, md, current := mset.cfg.AllowDirect, mset.cfg.MirrorDirect, mset.isCurrent()
if !current {
Expand All @@ -2328,7 +2320,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
mset.subscribeToMirrorDirect()
}
mset.mu.Unlock()
// Stop monitoring.
// Stop direct monitoring.
stopDirectMonitoring()

case <-t.C:
Expand Down
76 changes: 76 additions & 0 deletions server/jetstream_cluster_3_test.go
Expand Up @@ -4046,3 +4046,79 @@ func TestJetStreamClusterStreamScaleUpNoGroupCluster(t *testing.T) {
})
require_NoError(t, err)
}

// https://github.com/nats-io/nats-server/issues/4162
func TestJetStreamClusterStaleDirectGetOnRestart(t *testing.T) {
c := createJetStreamClusterExplicit(t, "NATS", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "TEST",
Replicas: 3,
})
require_NoError(t, err)

_, err = kv.PutString("foo", "bar")
require_NoError(t, err)

// Close client in case we were connected to server below.
// We will recreate.
nc.Close()

// Shutdown a non-leader.
s := c.randomNonStreamLeader(globalAccountName, "KV_TEST")
s.Shutdown()

nc, js = jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err = js.KeyValue("TEST")
require_NoError(t, err)

_, err = kv.PutString("foo", "baz")
require_NoError(t, err)

errCh := make(chan error, 100)
done := make(chan struct{})

go func() {
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

kv, err := js.KeyValue("TEST")
if err != nil {
errCh <- err
return
}

for {
select {
case <-done:
return
default:
entry, err := kv.Get("foo")
if err != nil {
errCh <- err
return
}
if v := string(entry.Value()); v != "baz" {
errCh <- fmt.Errorf("Got wrong value: %q", v)
}
}
}
}()

// Restart
c.restartServer(s)
// Wait for a bit to make sure as this server participates in direct gets
// it does not server stale reads.
time.Sleep(2 * time.Second)
close(done)

if len(errCh) > 0 {
t.Fatalf("Expected no errors but got %v", <-errCh)
}
}