-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
rpc: make blocking queries for non-existent items more efficient #12110
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
```release-note:improvement | ||
rpc: improve blocking queries for items that do not exist, by continuing to block until they exist (or the timeout). | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -160,18 +160,13 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er | |
} | ||
|
||
if ent == nil { | ||
// Must provide non-zero index to prevent blocking | ||
// Index 1 is impossible anyways (due to Raft internals) | ||
if index == 0 { | ||
reply.Index = 1 | ||
} else { | ||
reply.Index = index | ||
} | ||
reply.Index = index | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Presumably this part of the change was intentional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ya, this is now handled generally in |
||
reply.Entries = nil | ||
} else { | ||
reply.Index = ent.ModifyIndex | ||
reply.Entries = structs.DirEntries{ent} | ||
return errNotFound | ||
} | ||
|
||
reply.Index = ent.ModifyIndex | ||
reply.Entries = structs.DirEntries{ent} | ||
return nil | ||
}) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -983,6 +983,9 @@ func (s *Server) blockingQuery( | |
var ws memdb.WatchSet | ||
err := query(ws, s.fsm.State()) | ||
s.setQueryMeta(responseMeta, opts.GetToken()) | ||
if errors.Is(err, errNotFound) { | ||
return nil | ||
} | ||
return err | ||
} | ||
|
||
|
@@ -995,6 +998,8 @@ func (s *Server) blockingQuery( | |
// decrement the count when the function returns. | ||
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0)) | ||
|
||
var notFound bool | ||
|
||
for { | ||
if opts.GetRequireConsistent() { | ||
if err := s.consistentRead(); err != nil { | ||
|
@@ -1014,7 +1019,15 @@ func (s *Server) blockingQuery( | |
|
||
err := query(ws, state) | ||
s.setQueryMeta(responseMeta, opts.GetToken()) | ||
if err != nil { | ||
switch { | ||
case errors.Is(err, errNotFound): | ||
if notFound { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question! I believe what you are calling out is that on the first iteration of this loop we still rely on the index not having changed. If a write happens between the time the last blocking query returned and when the next blocking query request happens (which I think should generally only be a few milliseconds), then this optimization does not work. We continue to have the old behaviour of returning unnecessarily. Generally that should be ok because it only limits the optimization, should not impact correctness, and there is a very small window where a write must happen to cause this problem. There is a way to work around this limitation (could be a follow up), but it introduces some risk, so I thought it would be better to avoid. I haven't documented that anywhere yet (just some discussions in old slack threads), but I can do that here. The work around for this limitation would be to add a new
To answer the other part of this question, this is essential to the optimization. I think this is explained briefly in 9abb49da51ad87312a5f34364d8e6a3c0e35c328, but I'm happy to add more detail since this is the main reason for the change. As documented in #12343, there are two things to consider here: the "last modified index" set on We can't fix the So that leaves the index comparison ( What we need is a comparison we can use instead of If we think about this comparison more abstractly, I think we'll see that the question this is asking is "has the result of the query changed from the last result sent to the client". There are plenty of ways that question could be answered, using the modify index is a very cost effective one, so that's generally what we use. However as we've noticed, using that modify index doesn't work for cases where an item does not exist, because we don't have a stable modify index to compare against. In this case we use "not found" to indicate if the result has changed. Since we don't know the state from the client perspective ( In the line below this, when we see "state has not changed, the result is still not found", we update the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I opened #12363 for that potential future optimization. |
||
// query result has not changed | ||
minQueryIndex = responseMeta.GetIndex() | ||
} | ||
|
||
notFound = true | ||
case err != nil: | ||
return err | ||
} | ||
|
||
|
@@ -1037,6 +1050,8 @@ func (s *Server) blockingQuery( | |
} | ||
} | ||
|
||
var errNotFound = fmt.Errorf("no data found for query") | ||
|
||
// setQueryMeta is used to populate the QueryMeta data for an RPC call | ||
// | ||
// Note: This method must be called *after* filtering query results with ACLs. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the change that needs to be applied to all the other RPC endpoints to take advantage of this enhancement. Currently it is only implemented on this one
ConfigEntry.Get
endpoint.