Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: make blocking queries for non-existent items more efficient #12110

Merged
merged 3 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12110.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
rpc: improve blocking queries for items that do not exist, by continuing to block until they exist (or the timeout).
```
21 changes: 16 additions & 5 deletions agent/consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke

reply.Index, reply.Token = index, token
reply.SourceDatacenter = args.Datacenter
if token == nil {
return errNotFound
}
return nil
})
}
Expand Down Expand Up @@ -1045,6 +1048,9 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo
}

reply.Index, reply.Policy = index, policy
if policy == nil {
return errNotFound
}
return nil
})
}
Expand Down Expand Up @@ -1428,6 +1434,9 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe
}

reply.Index, reply.Role = index, role
if role == nil {
return errNotFound
}
return nil
})
}
Expand Down Expand Up @@ -1795,12 +1804,14 @@ func (a *ACL) BindingRuleRead(args *structs.ACLBindingRuleGetRequest, reply *str
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, rule, err := state.ACLBindingRuleGetByID(ws, args.BindingRuleID, &args.EnterpriseMeta)

if err != nil {
return err
}

reply.Index, reply.BindingRule = index, rule
if rule == nil {
return errNotFound
}
return nil
})
}
Expand Down Expand Up @@ -2052,16 +2063,16 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, method, err := state.ACLAuthMethodGetByName(ws, args.AuthMethodName, &args.EnterpriseMeta)

if err != nil {
return err
}

if method != nil {
_ = a.enterpriseAuthMethodTypeValidation(method.Type)
reply.Index, reply.AuthMethod = index, method
if method == nil {
return errNotFound
}

reply.Index, reply.AuthMethod = index, method
_ = a.enterpriseAuthMethodTypeValidation(method.Type)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE

reply.Index = index
if entry == nil {
return nil
return errNotFound
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the change that needs to be applied to all the other RPC endpoints to take advantage of this enhancement. Currently it is only implemented on this one ConfigEntry.Get endpoint.

}

reply.Entry = entry
Expand Down
67 changes: 67 additions & 0 deletions agent/consul/config_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consul

import (
"context"
"fmt"
"os"
"sort"
Expand All @@ -9,6 +10,7 @@ import (

msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -302,6 +304,71 @@ func TestConfigEntry_Get(t *testing.T) {
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
}

// TestConfigEntry_Get_BlockOnNonExistent checks that a blocking
// ConfigEntry.Get for a config entry that does not exist is not woken up by
// every unrelated config entry write. One goroutine repeatedly issues the
// blocking query for "does-not-exist" while a second goroutine writes 200
// unrelated entries; the number of query returns must stay at 2 or 3 rather
// than tracking the number of writes.
func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

_, s1 := testServerWithConfig(t)
codec := rpcClient(t, s1)
store := s1.fsm.State()

// Seed the store with one unrelated entry at index 1 so the queried name
// "does-not-exist" is missing from a non-empty table.
entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "alpha",
}
require.NoError(t, store.EnsureConfigEntry(1, entry))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// count is the number of times the blocking query returned. It is written
// only by the reader goroutine below and read after g.Wait() returns.
var count int

g, ctx := errgroup.WithContext(ctx)
// Reader: issue the blocking query in a loop until the context is cancelled.
g.Go(func() error {
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: "does-not-exist",
}
args.QueryOptions.MaxQueryTime = time.Second

for ctx.Err() == nil {
var out structs.ConfigEntryResponse

err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out.Entry)
count++
// Feed the returned index back in so the next call blocks on it. The
// first call is made with a zero MinQueryIndex.
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})

// Writer: 200 writes of unrelated entries, 5ms apart (about one second in
// total), at indexes 2..201. None of them creates "does-not-exist".
g.Go(func() error {
for i := uint64(0); i < 200; i++ {
time.Sleep(5 * time.Millisecond)
entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("other%d", i),
}
if err := store.EnsureConfigEntry(i+2, entry); err != nil {
return err
}
}
// Cancelling the parent context also cancels the errgroup context that
// the reader loop checks, so the reader stops after its current call.
cancel()
return nil
})

require.NoError(t, g.Wait())
// The test is a bit racy because of the timing of the two goroutines, so
// we relax the check for the count to be within a small range.
if count < 2 || count > 3 {
t.Fatalf("expected count to be 2 or 3, got %d", count)
}
}

func TestConfigEntry_Get_ACLDeny(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
1 change: 1 addition & 0 deletions agent/consul/coordinate_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde
})
}
reply.Index, reply.Coordinates = index, coords

return nil
})
}
2 changes: 1 addition & 1 deletion agent/consul/federation_state_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs

reply.Index = index
if fedState == nil {
return nil
return errNotFound
}

reply.State = fedState
Expand Down
15 changes: 5 additions & 10 deletions agent/consul/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,18 +160,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
}

if ent == nil {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
reply.Index = 1
} else {
reply.Index = index
}
reply.Index = index
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably this part of the change was intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya, this is now handled generally in Server.blockingQuery by calling Server.setQueryMeta, and setting the value to 1 if it is still 0. I guess this logic in kvs_endpoint predated the general fix in blockingQuery.

reply.Entries = nil
} else {
reply.Index = ent.ModifyIndex
reply.Entries = structs.DirEntries{ent}
return errNotFound
}

reply.Index = ent.ModifyIndex
reply.Entries = structs.DirEntries{ent}
return nil
})
}
Expand Down
17 changes: 16 additions & 1 deletion agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,9 @@ func (s *Server) blockingQuery(
var ws memdb.WatchSet
err := query(ws, s.fsm.State())
s.setQueryMeta(responseMeta, opts.GetToken())
if errors.Is(err, errNotFound) {
return nil
}
return err
}

Expand All @@ -995,6 +998,8 @@ func (s *Server) blockingQuery(
// decrement the count when the function returns.
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))

var notFound bool

for {
if opts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
Expand All @@ -1014,7 +1019,15 @@ func (s *Server) blockingQuery(

err := query(ws, state)
s.setQueryMeta(responseMeta, opts.GetToken())
if err != nil {
switch {
case errors.Is(err, errNotFound):
if notFound {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need var notFound bool? It seems like it moots the whole point, because the first errNotFound skips the conditional here, and proceeds exactly like the prior scenario, never executing line 978.

Copy link
Contributor Author

@dnephin dnephin Feb 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question!

I believe what you are calling out is that on the first iteration of this loop we still rely on the index not having changed. If a write happens between the time the last blocking query returned and when the next blocking query request happens (which I think should generally only be a few milliseconds), then this optimization does not work. We continue to have the old behaviour of returning unnecessarily. Generally that should be ok because it only limits the optimization, should not impact correctness, and there is a very small window where a write must happen to cause this problem.

There is a way to work around this limitation (could be a follow-up), but it introduces some risk, so I thought it would be better to avoid it. I haven't documented that anywhere yet (just some discussions in old Slack threads), but I can do that here.

The workaround for this limitation would be to add a new X-Consul-Index-Not-Found query parameter (as a sibling to X-Consul-Index). This parameter would be used to set the initial value of var notFound bool. That way when a new query comes in, we'll already know if we're in a "not found" state, and only the creation of an item (or a timeout) will cause blockingQuery to return. The risk here is that if the user incorrectly sets this header, then they could miss a delete that happened, but I haven't thought enough about this case to know if it's actually a problem for correctness. Maybe we should do this as a follow-up.

Why do we need var notFound bool?

To answer the other part of this question, this is essential to the optimization. I think this is explained briefly in 9abb49da51ad87312a5f34364d8e6a3c0e35c328, but I'm happy to add more detail since this is the main reason for the change. As documented in #12343, there are two things to consider here: the "last modified index" set on responseMeta.Index, and the WatchSet blocking.

We can't fix the WatchSet blocking without turning every read operation into a write that modifies the memdb index (I'm pretty sure this is not an approach we want to investigate). Maybe there are other ways of making WatchSet block correctly for not-found items? I have not found any.

So that leaves the index comparison (responseMeta.Index > query.MinIndex). Generally we know this comparison will return true in these "not found" cases, because responseMeta.Index will be the max index of the table, and it changes on every write.

What we need is a comparison we can use instead of responseMeta.Index > query.MinIndex.

If we think about this comparison more abstractly, I think we'll see that the question this is asking is "has the result of the query changed from the last result sent to the client". There are plenty of ways that question could be answered; using the modify index is a very cost-effective one, so that's generally what we use.

However as we've noticed, using that modify index doesn't work for cases where an item does not exist, because we don't have a stable modify index to compare against. In this case we use "not found" to indicate if the result has changed.

Since we don't know the state from the client perspective (X-Consul-Index-Not-Found would give us that), we have to perform the query once. If the result of the query is the same "last modified index", then we know at that index the query returned no results (which tells us the client's last view was "not found"). Until the query returns some result, the state has not changed, so we can continue to block until that time.

In the line below this, when we see "state has not changed, the result is still not found", we update the minQueryIndex to the latest responseMeta.Index, so that the comparison later keeps us in the for loop.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #12363 for that potential future optimization.

// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}

notFound = true
case err != nil:
return err
}

Expand All @@ -1037,6 +1050,8 @@ func (s *Server) blockingQuery(
}
}

// errNotFound is a sentinel error that a query function passed to
// blockingQuery returns to report that the requested item does not exist.
// blockingQuery detects it with errors.Is: for a non-blocking query it is
// translated into a nil error, and for a blocking query it is used to track
// the "not found" state instead of being returned immediately.
var errNotFound = errors.New("no data found for query")

// setQueryMeta is used to populate the QueryMeta data for an RPC call
//
// Note: This method must be called *after* filtering query results with ACLs.
Expand Down
93 changes: 89 additions & 4 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,9 @@ func (m *MockSink) Close() error {
return nil
}

func TestRPC_blockingQuery(t *testing.T) {
func TestServer_blockingQuery(t *testing.T) {
t.Parallel()
dir, s := testServer(t)
defer os.RemoveAll(dir)
defer s.Shutdown()
_, s := testServerWithConfig(t)

// Perform a non-blocking query. Note that it's significant that the meta has
// a zero index in response - the implied opts.MinQueryIndex is also zero but
Expand Down Expand Up @@ -391,6 +389,93 @@ func TestRPC_blockingQuery(t *testing.T) {
require.NoError(t, err)
require.True(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls")
})

t.Run("non-blocking query for item that does not exist", func(t *testing.T) {
opts := structs.QueryOptions{}
meta := structs.QueryMeta{}
calls := 0
fn := func(_ memdb.WatchSet, _ *state.Store) error {
calls++
return errNotFound
}

err := s.blockingQuery(&opts, &meta, fn)
require.NoError(t, err)
require.Equal(t, 1, calls)
})

t.Run("blocking query for item that does not exist", func(t *testing.T) {
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
meta := structs.QueryMeta{}
calls := 0
fn := func(ws memdb.WatchSet, _ *state.Store) error {
calls++
if calls == 1 {
meta.Index = 3

ch := make(chan struct{})
close(ch)
ws.Add(ch)
return errNotFound
}
meta.Index = 5
return errNotFound
}

err := s.blockingQuery(&opts, &meta, fn)
require.NoError(t, err)
require.Equal(t, 2, calls)
})

t.Run("blocking query for item that existed and is removed", func(t *testing.T) {
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
meta := structs.QueryMeta{}
calls := 0
fn := func(ws memdb.WatchSet, _ *state.Store) error {
calls++
if calls == 1 {
meta.Index = 3

ch := make(chan struct{})
close(ch)
ws.Add(ch)
return nil
}
meta.Index = 5
return errNotFound
}

start := time.Now()
err := s.blockingQuery(&opts, &meta, fn)
require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out")
require.NoError(t, err)
require.Equal(t, 2, calls)
})

t.Run("blocking query for non-existent item that is created", func(t *testing.T) {
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
meta := structs.QueryMeta{}
calls := 0
fn := func(ws memdb.WatchSet, _ *state.Store) error {
calls++
if calls == 1 {
meta.Index = 3

ch := make(chan struct{})
close(ch)
ws.Add(ch)
return errNotFound
}
meta.Index = 5
return nil
}

start := time.Now()
err := s.blockingQuery(&opts, &meta, fn)
require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out")
require.NoError(t, err)
require.Equal(t, 2, calls)
})
}

func TestRPC_ReadyForConsistentReads(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions agent/consul/session_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
reply.Sessions = structs.Sessions{session}
} else {
reply.Sessions = nil
return errNotFound
}
s.srv.filterACLWithAuthorizer(authz, reply)
return nil
Expand Down