Skip to content

Commit

Permalink
Merge pull request #12110 from hashicorp/dnephin/blocking-queries-not…
Browse files Browse the repository at this point in the history
…-found

rpc: make blocking queries for non-existent items more efficient
  • Loading branch information
dnephin committed Feb 17, 2022
2 parents 6e6cd92 + 8a6e75a commit 85ecbaf
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 22 deletions.
3 changes: 3 additions & 0 deletions .changelog/12110.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
rpc: improve blocking queries for items that do not exist, by continuing to block until they exist (or the timeout).
```
21 changes: 16 additions & 5 deletions agent/consul/acl_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,9 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke

reply.Index, reply.Token = index, token
reply.SourceDatacenter = args.Datacenter
if token == nil {
return errNotFound
}
return nil
})
}
Expand Down Expand Up @@ -1045,6 +1048,9 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo
}

reply.Index, reply.Policy = index, policy
if policy == nil {
return errNotFound
}
return nil
})
}
Expand Down Expand Up @@ -1428,6 +1434,9 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe
}

reply.Index, reply.Role = index, role
if role == nil {
return errNotFound
}
return nil
})
}
Expand Down Expand Up @@ -1795,12 +1804,14 @@ func (a *ACL) BindingRuleRead(args *structs.ACLBindingRuleGetRequest, reply *str
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, rule, err := state.ACLBindingRuleGetByID(ws, args.BindingRuleID, &args.EnterpriseMeta)

if err != nil {
return err
}

reply.Index, reply.BindingRule = index, rule
if rule == nil {
return errNotFound
}
return nil
})
}
Expand Down Expand Up @@ -2052,16 +2063,16 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc
return a.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, method, err := state.ACLAuthMethodGetByName(ws, args.AuthMethodName, &args.EnterpriseMeta)

if err != nil {
return err
}

if method != nil {
_ = a.enterpriseAuthMethodTypeValidation(method.Type)
reply.Index, reply.AuthMethod = index, method
if method == nil {
return errNotFound
}

reply.Index, reply.AuthMethod = index, method
_ = a.enterpriseAuthMethodTypeValidation(method.Type)
return nil
})
}
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE

reply.Index = index
if entry == nil {
return nil
return errNotFound
}

reply.Entry = entry
Expand Down
67 changes: 67 additions & 0 deletions agent/consul/config_endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package consul

import (
"context"
"fmt"
"os"
"sort"
Expand All @@ -9,6 +10,7 @@ import (

msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
Expand Down Expand Up @@ -302,6 +304,71 @@ func TestConfigEntry_Get(t *testing.T) {
require.Equal(t, structs.ServiceDefaults, serviceConf.Kind)
}

func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

_, s1 := testServerWithConfig(t)
codec := rpcClient(t, s1)
store := s1.fsm.State()

entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "alpha",
}
require.NoError(t, store.EnsureConfigEntry(1, entry))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var count int

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: "does-not-exist",
}
args.QueryOptions.MaxQueryTime = time.Second

for ctx.Err() == nil {
var out structs.ConfigEntryResponse

err := msgpackrpc.CallWithCodec(codec, "ConfigEntry.Get", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out.Entry)
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})

g.Go(func() error {
for i := uint64(0); i < 200; i++ {
time.Sleep(5 * time.Millisecond)
entry := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("other%d", i),
}
if err := store.EnsureConfigEntry(i+2, entry); err != nil {
return err
}
}
cancel()
return nil
})

require.NoError(t, g.Wait())
// The test is a bit racy because of the timing of the two goroutines, so
// we relax the check for the count to be within a small range.
if count < 2 || count > 3 {
t.Fatalf("expected count to be 2 or 3, got %d", count)
}
}

func TestConfigEntry_Get_ACLDeny(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
1 change: 1 addition & 0 deletions agent/consul/coordinate_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde
})
}
reply.Index, reply.Coordinates = index, coords

return nil
})
}
2 changes: 1 addition & 1 deletion agent/consul/federation_state_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs

reply.Index = index
if fedState == nil {
return nil
return errNotFound
}

reply.State = fedState
Expand Down
15 changes: 5 additions & 10 deletions agent/consul/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,18 +160,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er
}

if ent == nil {
// Must provide non-zero index to prevent blocking
// Index 1 is impossible anyways (due to Raft internals)
if index == 0 {
reply.Index = 1
} else {
reply.Index = index
}
reply.Index = index
reply.Entries = nil
} else {
reply.Index = ent.ModifyIndex
reply.Entries = structs.DirEntries{ent}
return errNotFound
}

reply.Index = ent.ModifyIndex
reply.Entries = structs.DirEntries{ent}
return nil
})
}
Expand Down
17 changes: 16 additions & 1 deletion agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,9 @@ func (s *Server) blockingQuery(
var ws memdb.WatchSet
err := query(ws, s.fsm.State())
s.setQueryMeta(responseMeta, opts.GetToken())
if errors.Is(err, errNotFound) {
return nil
}
return err
}

Expand All @@ -995,6 +998,8 @@ func (s *Server) blockingQuery(
// decrement the count when the function returns.
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))

var notFound bool

for {
if opts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
Expand All @@ -1014,7 +1019,15 @@ func (s *Server) blockingQuery(

err := query(ws, state)
s.setQueryMeta(responseMeta, opts.GetToken())
if err != nil {
switch {
case errors.Is(err, errNotFound):
if notFound {
// query result has not changed
minQueryIndex = responseMeta.GetIndex()
}

notFound = true
case err != nil:
return err
}

Expand All @@ -1037,6 +1050,8 @@ func (s *Server) blockingQuery(
}
}

var errNotFound = fmt.Errorf("no data found for query")

// setQueryMeta is used to populate the QueryMeta data for an RPC call
//
// Note: This method must be called *after* filtering query results with ACLs.
Expand Down
93 changes: 89 additions & 4 deletions agent/consul/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,9 @@ func (m *MockSink) Close() error {
return nil
}

func TestRPC_blockingQuery(t *testing.T) {
func TestServer_blockingQuery(t *testing.T) {
t.Parallel()
dir, s := testServer(t)
defer os.RemoveAll(dir)
defer s.Shutdown()
_, s := testServerWithConfig(t)

// Perform a non-blocking query. Note that it's significant that the meta has
// a zero index in response - the implied opts.MinQueryIndex is also zero but
Expand Down Expand Up @@ -391,6 +389,93 @@ func TestRPC_blockingQuery(t *testing.T) {
require.NoError(t, err)
require.True(t, meta.ResultsFilteredByACLs, "ResultsFilteredByACLs should be honored for authenticated calls")
})

t.Run("non-blocking query for item that does not exist", func(t *testing.T) {
opts := structs.QueryOptions{}
meta := structs.QueryMeta{}
calls := 0
fn := func(_ memdb.WatchSet, _ *state.Store) error {
calls++
return errNotFound
}

err := s.blockingQuery(&opts, &meta, fn)
require.NoError(t, err)
require.Equal(t, 1, calls)
})

t.Run("blocking query for item that does not exist", func(t *testing.T) {
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
meta := structs.QueryMeta{}
calls := 0
fn := func(ws memdb.WatchSet, _ *state.Store) error {
calls++
if calls == 1 {
meta.Index = 3

ch := make(chan struct{})
close(ch)
ws.Add(ch)
return errNotFound
}
meta.Index = 5
return errNotFound
}

err := s.blockingQuery(&opts, &meta, fn)
require.NoError(t, err)
require.Equal(t, 2, calls)
})

t.Run("blocking query for item that existed and is removed", func(t *testing.T) {
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
meta := structs.QueryMeta{}
calls := 0
fn := func(ws memdb.WatchSet, _ *state.Store) error {
calls++
if calls == 1 {
meta.Index = 3

ch := make(chan struct{})
close(ch)
ws.Add(ch)
return nil
}
meta.Index = 5
return errNotFound
}

start := time.Now()
err := s.blockingQuery(&opts, &meta, fn)
require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out")
require.NoError(t, err)
require.Equal(t, 2, calls)
})

t.Run("blocking query for non-existent item that is created", func(t *testing.T) {
opts := structs.QueryOptions{MinQueryIndex: 3, MaxQueryTime: 100 * time.Millisecond}
meta := structs.QueryMeta{}
calls := 0
fn := func(ws memdb.WatchSet, _ *state.Store) error {
calls++
if calls == 1 {
meta.Index = 3

ch := make(chan struct{})
close(ch)
ws.Add(ch)
return errNotFound
}
meta.Index = 5
return nil
}

start := time.Now()
err := s.blockingQuery(&opts, &meta, fn)
require.True(t, time.Since(start) < opts.MaxQueryTime, "query timed out")
require.NoError(t, err)
require.Equal(t, 2, calls)
})
}

func TestRPC_ReadyForConsistentReads(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions agent/consul/session_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest,
reply.Sessions = structs.Sessions{session}
} else {
reply.Sessions = nil
return errNotFound
}
s.srv.filterACLWithAuthorizer(authz, reply)
return nil
Expand Down

0 comments on commit 85ecbaf

Please sign in to comment.