New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ringhash: allow setting request hash key explicitly #7170
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -94,6 +94,21 @@ func (s) TestParseConfig(t *testing.T) { | |
want: nil, | ||
wantErr: true, | ||
}, | ||
{ | ||
name: "request metadata key set", | ||
js: `{"request_metadata_key": "x-foo"}`, | ||
want: &LBConfig{ | ||
MinRingSize: defaultMinSize, | ||
MaxRingSize: defaultMaxSize, | ||
RequestMetadataKey: "x-foo", | ||
}, | ||
}, | ||
{ | ||
name: "invalid request metadata keys", | ||
js: `{"request_metadata_key": "!invalid"}`, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's add a case rejecting key referencing a binary header please |
||
want: nil, | ||
wantErr: true, | ||
}, | ||
} | ||
for _, tt := range tests { | ||
t.Run(tt.name, func(t *testing.T) { | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -19,43 +19,52 @@ | |||||||||||||||||||||||||||
package ringhash | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||
"errors" | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
"google.golang.org/grpc/balancer" | ||||||||||||||||||||||||||||
"google.golang.org/grpc/codes" | ||||||||||||||||||||||||||||
"google.golang.org/grpc/connectivity" | ||||||||||||||||||||||||||||
"google.golang.org/grpc/internal/grpclog" | ||||||||||||||||||||||||||||
"google.golang.org/grpc/internal/grpcrand" | ||||||||||||||||||||||||||||
"google.golang.org/grpc/status" | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
var ( | ||||||||||||||||||||||||||||
errNoSubConnReady = errors.New("no connection is Ready") | ||||||||||||||||||||||||||||
errSingleSubConnInTransientFailure = errors.New("the only SubConn is in Transient Failure") | ||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
type picker struct { | ||||||||||||||||||||||||||||
ring *ring | ||||||||||||||||||||||||||||
logger *grpclog.PrefixLogger | ||||||||||||||||||||||||||||
subConnStates map[*subConn]connectivity.State | ||||||||||||||||||||||||||||
ring *ring | ||||||||||||||||||||||||||||
logger *grpclog.PrefixLogger | ||||||||||||||||||||||||||||
subConnStates map[*subConn]connectivity.State | ||||||||||||||||||||||||||||
requestHashKey string | ||||||||||||||||||||||||||||
randuint64 func() uint64 // overridable for testing | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker { | ||||||||||||||||||||||||||||
func newPicker(ring *ring, requestHashKey string, logger *grpclog.PrefixLogger) *picker { | ||||||||||||||||||||||||||||
states := make(map[*subConn]connectivity.State) | ||||||||||||||||||||||||||||
for _, e := range ring.items { | ||||||||||||||||||||||||||||
states[e.sc] = e.sc.effectiveState() | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
return &picker{ring: ring, logger: logger, subConnStates: states} | ||||||||||||||||||||||||||||
return &picker{ring: ring, logger: logger, subConnStates: states, requestHashKey: requestHashKey, randuint64: grpcrand.Uint64} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// handleRICSResult is the return type of handleRICS. It's needed to wrap the | ||||||||||||||||||||||||||||
// returned error from Pick() in a struct. With this, if the return values are | ||||||||||||||||||||||||||||
// `balancer.PickResult, error, bool`, linter complains because error is not the | ||||||||||||||||||||||||||||
// last return value. | ||||||||||||||||||||||||||||
// returned error from Pick() in a struct and whether we triggered a connection | ||||||||||||||||||||||||||||
// attempt. Without this, the return values would be `balancer.PickResult, bool, error, bool`, | ||||||||||||||||||||||||||||
// and linter would complain because error is not the last return value. | ||||||||||||||||||||||||||||
Comment on lines
53
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. line wrap please :)
Suggested change
|
||||||||||||||||||||||||||||
type handleRICSResult struct { | ||||||||||||||||||||||||||||
pr balancer.PickResult | ||||||||||||||||||||||||||||
err error | ||||||||||||||||||||||||||||
pr balancer.PickResult | ||||||||||||||||||||||||||||
triggeredConnect bool | ||||||||||||||||||||||||||||
err error | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// handleRICS generates pick result if the entry is in Ready, Idle, Connecting | ||||||||||||||||||||||||||||
// or Shutdown. TransientFailure will be handled specifically after this | ||||||||||||||||||||||||||||
// function returns. | ||||||||||||||||||||||||||||
// | ||||||||||||||||||||||||||||
// The first return value indicates if the state is in Ready, Idle, Connecting | ||||||||||||||||||||||||||||
// The second return value indicates if the state is in Ready, Idle, Connecting | ||||||||||||||||||||||||||||
// or Shutdown. If it's true, the PickResult and error should be returned from | ||||||||||||||||||||||||||||
// Pick() as is. | ||||||||||||||||||||||||||||
func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { | ||||||||||||||||||||||||||||
|
@@ -65,7 +74,7 @@ func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { | |||||||||||||||||||||||||||
case connectivity.Idle: | ||||||||||||||||||||||||||||
// Trigger Connect() and queue the pick. | ||||||||||||||||||||||||||||
e.sc.queueConnect() | ||||||||||||||||||||||||||||
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true | ||||||||||||||||||||||||||||
return handleRICSResult{triggeredConnect: true, err: balancer.ErrNoSubConnAvailable}, true | ||||||||||||||||||||||||||||
case connectivity.Connecting: | ||||||||||||||||||||||||||||
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true | ||||||||||||||||||||||||||||
case connectivity.TransientFailure: | ||||||||||||||||||||||||||||
|
@@ -84,23 +93,45 @@ func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { | |||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { | ||||||||||||||||||||||||||||
e := p.ring.pick(getRequestHash(info.Ctx)) | ||||||||||||||||||||||||||||
h, usingRandomHash := getRequestHash(info.Ctx, p.requestHashKey) | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer the name There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also if |
||||||||||||||||||||||||||||
if usingRandomHash { | ||||||||||||||||||||||||||||
h = p.randuint64() | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
e := p.ring.pick(h) | ||||||||||||||||||||||||||||
if hr, ok := p.handleRICS(e); ok { | ||||||||||||||||||||||||||||
if usingRandomHash && hr.triggeredConnect { | ||||||||||||||||||||||||||||
// "If the use of this random hash triggers a connection attempt | ||||||||||||||||||||||||||||
// (...), then before queuing the pick, the picker will scan forward | ||||||||||||||||||||||||||||
// searching for a subchannel in `READY` state. If it finds a | ||||||||||||||||||||||||||||
// subchannel in `READY` state, the picker returns it." - A76 | ||||||||||||||||||||||||||||
if p := p.nextReadySubConn(e); p != nil { | ||||||||||||||||||||||||||||
return balancer.PickResult{SubConn: p}, nil | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
return hr.pr, hr.err | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
// ok was false, the entry is in transient failure. | ||||||||||||||||||||||||||||
return p.handleTransientFailure(e) | ||||||||||||||||||||||||||||
return p.handleTransientFailure(e, usingRandomHash) | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, error) { | ||||||||||||||||||||||||||||
func (p *picker) handleTransientFailure(e *ringEntry, usingRandomHash bool) (balancer.PickResult, error) { | ||||||||||||||||||||||||||||
// Queue a connect on the first picked SubConn. | ||||||||||||||||||||||||||||
e.sc.queueConnect() | ||||||||||||||||||||||||||||
if usingRandomHash { | ||||||||||||||||||||||||||||
// "If the use of this random hash triggers a connection attempt | ||||||||||||||||||||||||||||
// (...), then before queuing the pick, the picker will scan forward | ||||||||||||||||||||||||||||
// searching for a subchannel in `READY` state. If it finds a | ||||||||||||||||||||||||||||
// subchannel in `READY` state, the picker returns it." - A76 | ||||||||||||||||||||||||||||
if p := p.nextReadySubConn(e); p != nil { | ||||||||||||||||||||||||||||
return balancer.PickResult{SubConn: p}, nil | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// Find next entry in the ring, skipping duplicate SubConns. | ||||||||||||||||||||||||||||
e2 := nextSkippingDuplicates(p.ring, e) | ||||||||||||||||||||||||||||
if e2 == nil { | ||||||||||||||||||||||||||||
// There's no next entry available, fail the pick. | ||||||||||||||||||||||||||||
return balancer.PickResult{}, fmt.Errorf("the only SubConn is in Transient Failure") | ||||||||||||||||||||||||||||
return balancer.PickResult{}, errSingleSubConnInTransientFailure | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// For the second SubConn, also check Ready/Idle/Connecting as if it's the | ||||||||||||||||||||||||||||
|
@@ -145,7 +176,19 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro | |||||||||||||||||||||||||||
ee.sc.queueConnect() | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
return balancer.PickResult{}, fmt.Errorf("no connection is Ready") | ||||||||||||||||||||||||||||
return balancer.PickResult{}, errNoSubConnReady | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// nextReadySubConn returns the first entry after e that has its | ||||||||||||||||||||||||||||
// subconn in READY state. If no such entry is found, a PickResult with a | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems incomplete. Did you mean?
Suggested change
|
||||||||||||||||||||||||||||
func (p *picker) nextReadySubConn(e *ringEntry) balancer.SubConn { | ||||||||||||||||||||||||||||
for i := range p.ring.items { | ||||||||||||||||||||||||||||
e := p.ring.items[(e.idx+i)%len(p.ring.items)] | ||||||||||||||||||||||||||||
if e.sc.state == connectivity.Ready { | ||||||||||||||||||||||||||||
return e.sc.sc | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||
Comment on lines
+185
to
+191
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about we follow a similar pattern and do something like that:
Suggested change
|
||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
// nextSkippingDuplicates finds the next entry in the ring, with a different | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Per A76:
Something's off here, ValidatePair currently does not err for keys suffixed with
-bin
. Should we add that additional validation here?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
optional: Also, for readability could we maybe split
ValidatePair
tomd.ValidateKey
andmd.ValidateValue
methods and call justValidateKey
from here so that we dont pass in a""
here?