Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ringhash: allow setting request hash key explicitly #7170

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions xds/internal/balancer/ringhash/config.go
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"

"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/serviceconfig"
)

Expand All @@ -32,6 +33,8 @@ type LBConfig struct {

MinRingSize uint64 `json:"minRingSize,omitempty"`
MaxRingSize uint64 `json:"maxRingSize,omitempty"`

RequestMetadataKey string `json:"request_metadata_key,omitempty"`
}

const (
Expand Down Expand Up @@ -66,5 +69,10 @@ func parseConfig(c json.RawMessage) (*LBConfig, error) {
if cfg.MaxRingSize > envconfig.RingHashCap {
cfg.MaxRingSize = envconfig.RingHashCap
}
if cfg.RequestMetadataKey != "" {
if err := metadata.ValidatePair(cfg.RequestMetadataKey, ""); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per A76:

If the request_hash_header refers to a binary header (suffixed with -bin), the configuration is also rejected.

Something's off here, ValidatePair currently does not err for keys suffixed with -bin. Should we add that additional validation here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: Also, for readability could we maybe split ValidatePair to md.ValidateKey and md.ValidateValue methods and call just ValidateKey from here so that we dont pass in a "" here?

return nil, fmt.Errorf("invalid request_metadata_key %q: %w", cfg.RequestMetadataKey, err)
}
}
return &cfg, nil
}
15 changes: 15 additions & 0 deletions xds/internal/balancer/ringhash/config_test.go
Expand Up @@ -94,6 +94,21 @@ func (s) TestParseConfig(t *testing.T) {
want: nil,
wantErr: true,
},
{
name: "request metadata key set",
js: `{"request_metadata_key": "x-foo"}`,
want: &LBConfig{
MinRingSize: defaultMinSize,
MaxRingSize: defaultMaxSize,
RequestMetadataKey: "x-foo",
},
},
{
name: "invalid request metadata keys",
js: `{"request_metadata_key": "!invalid"}`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a case rejecting key referencing a binary header please

want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
79 changes: 61 additions & 18 deletions xds/internal/balancer/ringhash/picker.go
Expand Up @@ -19,43 +19,52 @@
package ringhash

import (
"fmt"
"errors"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/status"
)

var (
errNoSubConnReady = errors.New("no connection is Ready")
errSingleSubConnInTransientFailure = errors.New("the only SubConn is in Transient Failure")
)

type picker struct {
ring *ring
logger *grpclog.PrefixLogger
subConnStates map[*subConn]connectivity.State
ring *ring
logger *grpclog.PrefixLogger
subConnStates map[*subConn]connectivity.State
requestHashKey string
randuint64 func() uint64 // overridable for testing
}

func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker {
func newPicker(ring *ring, requestHashKey string, logger *grpclog.PrefixLogger) *picker {
states := make(map[*subConn]connectivity.State)
for _, e := range ring.items {
states[e.sc] = e.sc.effectiveState()
}
return &picker{ring: ring, logger: logger, subConnStates: states}
return &picker{ring: ring, logger: logger, subConnStates: states, requestHashKey: requestHashKey, randuint64: grpcrand.Uint64}
}

// handleRICSResult is the return type of handleRICS. It's needed to wrap the
// returned error from Pick() in a struct. With this, if the return values are
// `balancer.PickResult, error, bool`, linter complains because error is not the
// last return value.
// returned error from Pick() in a struct and whether we triggered a connection
// attempt. Without this, the return values would be `balancer.PickResult, bool, error, bool`,
// and linter would complain because error is not the last return value.
Comment on lines 53 to +56
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line wrap please :)

Suggested change
// handleRICSResult is the return type of handleRICS. It's needed to wrap the
// returned error from Pick() in a struct. With this, if the return values are
// `balancer.PickResult, error, bool`, linter complains because error is not the
// last return value.
// returned error from Pick() in a struct and whether we triggered a connection
// attempt. Without this, the return values would be `balancer.PickResult, bool, error, bool`,
// and linter would complain because error is not the last return value.
// handleRICSResult is the return type of handleRICS. It's needed to wrap the
// returned error from Pick() in a struct and whether we triggered a connection
// attempt. Without this, the return values would be `balancer.PickResult, bool,
// error, bool`, and linter would complain because error is not the last return
// value.

type handleRICSResult struct {
pr balancer.PickResult
err error
pr balancer.PickResult
triggeredConnect bool
err error
}

// handleRICS generates pick result if the entry is in Ready, Idle, Connecting
// or Shutdown. TransientFailure will be handled specifically after this
// function returns.
//
// The first return value indicates if the state is in Ready, Idle, Connecting
// The second return value indicates if the state is in Ready, Idle, Connecting
// or Shutdown. If it's true, the PickResult and error should be returned from
// Pick() as is.
func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
Expand All @@ -65,7 +74,7 @@ func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
case connectivity.Idle:
// Trigger Connect() and queue the pick.
e.sc.queueConnect()
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
return handleRICSResult{triggeredConnect: true, err: balancer.ErrNoSubConnAvailable}, true
case connectivity.Connecting:
return handleRICSResult{err: balancer.ErrNoSubConnAvailable}, true
case connectivity.TransientFailure:
Expand All @@ -84,23 +93,45 @@ func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
}

func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
e := p.ring.pick(getRequestHash(info.Ctx))
h, usingRandomHash := getRequestHash(info.Ctx, p.requestHashKey)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer the name usesRandomHash

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also if getRequestHash is made a method with the pointer receiver, then you could actually merge the logic to assign h = p.randunit64 inside getRequestHash? wdyt?

if usingRandomHash {
h = p.randuint64()
}
e := p.ring.pick(h)
if hr, ok := p.handleRICS(e); ok {
if usingRandomHash && hr.triggeredConnect {
// "If the use of this random hash triggers a connection attempt
// (...), then before queuing the pick, the picker will scan forward
// searching for a subchannel in `READY` state. If it finds a
// subchannel in `READY` state, the picker returns it." - A76
if p := p.nextReadySubConn(e); p != nil {
return balancer.PickResult{SubConn: p}, nil
}
}
return hr.pr, hr.err
}
// ok was false, the entry is in transient failure.
return p.handleTransientFailure(e)
return p.handleTransientFailure(e, usingRandomHash)
}

func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, error) {
func (p *picker) handleTransientFailure(e *ringEntry, usingRandomHash bool) (balancer.PickResult, error) {
// Queue a connect on the first picked SubConn.
e.sc.queueConnect()
if usingRandomHash {
// "If the use of this random hash triggers a connection attempt
// (...), then before queuing the pick, the picker will scan forward
// searching for a subchannel in `READY` state. If it finds a
// subchannel in `READY` state, the picker returns it." - A76
if p := p.nextReadySubConn(e); p != nil {
return balancer.PickResult{SubConn: p}, nil
}
}

// Find next entry in the ring, skipping duplicate SubConns.
e2 := nextSkippingDuplicates(p.ring, e)
if e2 == nil {
// There's no next entry available, fail the pick.
return balancer.PickResult{}, fmt.Errorf("the only SubConn is in Transient Failure")
return balancer.PickResult{}, errSingleSubConnInTransientFailure
}

// For the second SubConn, also check Ready/Idle/Connecting as if it's the
Expand Down Expand Up @@ -145,7 +176,19 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro
ee.sc.queueConnect()
}
}
return balancer.PickResult{}, fmt.Errorf("no connection is Ready")
return balancer.PickResult{}, errNoSubConnReady
}

// nextReadySubConn returns the first entry after e that has its
// subconn in READY state. If no such entry is found, a PickResult with a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems incomplete. Did you mean?

Suggested change
// subconn in READY state. If no such entry is found, a PickResult with a
// subconn in READY state. If no such entry is found, returns nil.

func (p *picker) nextReadySubConn(e *ringEntry) balancer.SubConn {
for i := range p.ring.items {
e := p.ring.items[(e.idx+i)%len(p.ring.items)]
if e.sc.state == connectivity.Ready {
return e.sc.sc
}
}
return nil
Comment on lines +185 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we follow a similar pattern and do something like that:

Suggested change
for i := range p.ring.items {
e := p.ring.items[(e.idx+i)%len(p.ring.items)]
if e.sc.state == connectivity.Ready {
return e.sc.sc
}
}
return nil
for next := p.ring.next(e); next != e; next = ring.next(next) {
if next.sc.state == connectivity.Ready {
return next.sc.sc
}
}
return nil

}

// nextSkippingDuplicates finds the next entry in the ring, with a different
Expand Down
105 changes: 96 additions & 9 deletions xds/internal/balancer/ringhash/picker_test.go
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/grpclog"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/metadata"
)

var testSubConns []*testutils.TestSubConn
Expand Down Expand Up @@ -57,7 +58,7 @@ func newTestRing(cStats []connectivity.State) *ring {
return &ring{items: items}
}

func (s) TestPickerPickFirstTwo(t *testing.T) {
func (s) TestXdsPickerPickFirstTwo(t *testing.T) {
tests := []struct {
name string
ring *ring
Expand Down Expand Up @@ -104,12 +105,98 @@ func (s) TestPickerPickFirstTwo(t *testing.T) {
wantErr: balancer.ErrNoSubConnAvailable,
wantSCToConnect: testSubConns[1],
},
{
name: "single channel in TransientFailure",
ring: newTestRing([]connectivity.State{connectivity.TransientFailure}),
hash: 5,
wantErr: errSingleSubConnInTransientFailure,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := newPicker(tt.ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
p := newPicker(tt.ring, "", igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
got, err := p.Pick(balancer.PickInfo{
Ctx: SetXDSRequestHash(context.Background(), tt.hash),
})
if err != tt.wantErr {
t.Errorf("Pick() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got.SubConn != tt.wantSC {
t.Errorf("Pick() got = %v, want picked SubConn: %v", got, tt.wantSC)
}
if sc := tt.wantSCToConnect; sc != nil {
select {
case <-sc.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestShortTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc)
}
}
})
}
}

// TestPickerWithRequestHashKey tests that if an explicit request hash key is
// set, it will be used to pick a SubConn.
func (s) TestPickerWithRequestHashKey(t *testing.T) {
tests := []struct {
name string
values []string
ring *ring
wantSC balancer.SubConn
wantSCToConnect balancer.SubConn
wantErr error
}{
{
name: "hash key is not set, random picked an idle subconn, pick first ready SubConn",
ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready, connectivity.Ready}),
wantSC: testSubConns[3],
wantSCToConnect: testSubConns[0],
},
{
name: "hash key is not set, random picks an idle subconn, no subconn ready",
ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Shutdown}),
wantErr: balancer.ErrNoSubConnAvailable,
wantSCToConnect: testSubConns[0],
},
{
name: "hash key is not set, random picks a subconn in transient failure, pick first subchannel ready",
ring: newTestRing([]connectivity.State{connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready, connectivity.Ready}),
wantSC: testSubConns[3],
},
{
name: "hash key is not set, random picked a subconn in transient failure, no subchannel ready",
ring: newTestRing([]connectivity.State{connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Connecting}),
wantErr: errNoSubConnReady,
},
{
name: "hash key is set, return ready subconn",
values: []string{"test-value"}, // this hashes to the end of the test ring => ring entry 0 expected.
ring: newTestRing([]connectivity.State{connectivity.Ready, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready}),
wantSC: testSubConns[0],
},
{
name: "hash key is set, connect and queue the pick",
values: []string{"test-value"}, // this hashes to the end of the test ring => ring entry 0 expected.
ring: newTestRing([]connectivity.State{connectivity.Idle, connectivity.TransientFailure, connectivity.Connecting, connectivity.Ready}),
wantErr: balancer.ErrNoSubConnAvailable,
wantSCToConnect: testSubConns[0],
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requestHashKey := "test-key"
ring := tt.ring
p := newPicker(ring, requestHashKey, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
p.randuint64 = func() uint64 {
// always return the first entry on the ring.
return 5
}

md := metadata.New(nil)
md.Set("test-key", tt.values...)
got, err := p.Pick(balancer.PickInfo{
Ctx: SetRequestHash(context.Background(), tt.hash),
Ctx: metadata.NewOutgoingContext(context.Background(), md),
})
if err != tt.wantErr {
t.Errorf("Pick() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -137,8 +224,8 @@ func (s) TestPickerPickTriggerTFConnect(t *testing.T) {
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure,
connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure,
})
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
_, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
p := newPicker(ring, "", igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
_, err := p.Pick(balancer.PickInfo{Ctx: SetXDSRequestHash(context.Background(), 5)})
if err == nil {
t.Fatalf("Pick() error = %v, want non-nil", err)
}
Expand Down Expand Up @@ -167,8 +254,8 @@ func (s) TestPickerPickTriggerTFReturnReady(t *testing.T) {
ring := newTestRing([]connectivity.State{
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Ready,
})
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
pr, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
p := newPicker(ring, "", igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
pr, err := p.Pick(balancer.PickInfo{Ctx: SetXDSRequestHash(context.Background(), 5)})
if err != nil {
t.Fatalf("Pick() error = %v, want nil", err)
}
Expand All @@ -193,8 +280,8 @@ func (s) TestPickerPickTriggerTFWithIdle(t *testing.T) {
ring := newTestRing([]connectivity.State{
connectivity.TransientFailure, connectivity.TransientFailure, connectivity.Idle, connectivity.TransientFailure, connectivity.TransientFailure,
})
p := newPicker(ring, igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
_, err := p.Pick(balancer.PickInfo{Ctx: SetRequestHash(context.Background(), 5)})
p := newPicker(ring, "", igrpclog.NewPrefixLogger(grpclog.Component("xds"), "rh_test"))
_, err := p.Pick(balancer.PickInfo{Ctx: SetXDSRequestHash(context.Background(), 5)})
if err == balancer.ErrNoSubConnAvailable {
t.Fatalf("Pick() error = %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/ringhash/ringhash.go
Expand Up @@ -434,7 +434,7 @@ func (b *ringhashBalancer) regeneratePicker() {
b.picker = base.NewErrPicker(b.mergeErrors())
return
}
b.picker = newPicker(b.ring, b.logger)
b.picker = newPicker(b.ring, b.config.RequestMetadataKey, b.logger)
}

func (b *ringhashBalancer) Close() {
Expand Down