Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orca: create ORCA producer for LB policies to use to receive OOB load reports #5669

Merged
merged 9 commits into from Nov 3, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 46 additions & 36 deletions orca/producer_test.go
Expand Up @@ -62,38 +62,38 @@ type ccWrapper struct {

func (w *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
if len(addrs) != 1 {
panic(fmt.Sprintf("len(addrs) != 1; was: %v", addrs))
panic(fmt.Sprintf("got addrs=%v; want len(addrs) == 1", addrs))
}
sc, err := w.ClientConn.NewSubConn(addrs, opts)
if err != nil {
return sc, err
}
l := getListenerAndOptions(addrs[0])
l := getListenerInfo(addrs[0])
l.listener.cleanup = orca.RegisterOOBListener(sc, l.listener, l.opts)
l.sc = sc
return sc, nil
}

// listenerAndOptions is stored in an address's attributes to allow ORCA
// listenerInfo is stored in an address's attributes to allow ORCA
// listeners to be registered on subconns created for that address.
type listenerAndOptions struct {
type listenerInfo struct {
listener *testOOBListener
opts orca.OOBListenerOptions
sc balancer.SubConn // Set by the LB policy
}

type listenerAndOptionsKey struct{}
type listenerInfoKey struct{}

func setListenerAndOptions(addr resolver.Address, l *listenerAndOptions) resolver.Address {
addr.Attributes = addr.Attributes.WithValue(listenerAndOptionsKey{}, l)
func setListenerInfo(addr resolver.Address, l *listenerInfo) resolver.Address {
addr.Attributes = addr.Attributes.WithValue(listenerInfoKey{}, l)
return addr
}

func getListenerAndOptions(addr resolver.Address) *listenerAndOptions {
return addr.Attributes.Value(listenerAndOptionsKey{}).(*listenerAndOptions)
func getListenerInfo(addr resolver.Address) *listenerInfo {
return addr.Attributes.Value(listenerInfoKey{}).(*listenerInfo)
}

// testOOBListener is a simple listener that pushes reports to a channel.
// testOOBListener is a simple listener that pushes load reports to a channel.
type testOOBListener struct {
cleanup func()
loadReportCh chan *v3orcapb.OrcaLoadReport
Expand Down Expand Up @@ -143,8 +143,9 @@ func (s) TestProducer(t *testing.T) {
oobLis := newTestOOBListener()

lisOpts := orca.OOBListenerOptions{ReportInterval: 50 * time.Millisecond}
lao := &listenerAndOptions{listener: oobLis, opts: lisOpts}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerAndOptions(resolver.Address{Addr: lis.Addr().String()}, lao)}})
lao := &listenerInfo{listener: oobLis, opts: lisOpts}
zasweq marked this conversation as resolved.
Show resolved Hide resolved
addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, lao)
r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
Expand Down Expand Up @@ -203,8 +204,10 @@ testReport:
}
}

// fakeORCAService is a simple implementation of an orca service that allows us
// to verify requests and send responses on demand.
// fakeORCAService is a simple implementation of an ORCA service that pushes
// requests it receives from clients to a channel and sends responses from a
// channel back. This allows tests to verify the client is sending requests
// and processing responses properly.
type fakeORCAService struct {
v3orcaservicegrpc.UnimplementedOpenRcaServiceServer

Expand Down Expand Up @@ -255,10 +258,14 @@ func (s) TestProducerBackoff(t *testing.T) {

// Provide a convenient way to expect backoff calls and return a minimal
// value.
expectedBackoff := -1 // -1 indicates any value is allowed.
const backoffShouldNotBeCalled = 9999 // Use to assert backoff function is not called.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get that explanation, but it's probably just me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I explain in comments why we don't want the backoff function to be called when backoffShouldNotBeCalled is assigned to expectedBackoff below. (It's because the last stream was successful.)

const backoffAllowAny = -1 // Use to ignore any backoff calls.
expectedBackoff := backoffAllowAny
oldBackoff := internal.DefaultBackoffFunc
internal.DefaultBackoffFunc = func(got int) time.Duration {
if expectedBackoff != -1 {
if expectedBackoff == backoffShouldNotBeCalled {
t.Errorf("Unexpected backoff call; parameter = %v", got)
} else if expectedBackoff != backoffAllowAny {
if got != expectedBackoff {
t.Errorf("Unexpected backoff received; got %v want %v", got, expectedBackoff)
}
Expand All @@ -273,7 +280,7 @@ func (s) TestProducerBackoff(t *testing.T) {
t.Fatal(err)
}

// Register our fake ORCA service
// Register our fake ORCA service.
s := grpc.NewServer()
fake := newFakeORCAService()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rename this local var? fakeWhat? I was having trouble later in the test figuring out what this was a fake of.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's only one thing we are faking in this test. fakeORCAService is going to be really verbose and make everything harder to read.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fos?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's far less intuitive to me.

defer fake.close()
Expand All @@ -291,7 +298,7 @@ func (s) TestProducerBackoff(t *testing.T) {
t.Errorf("Unexpected report interval; got %v want %v", got, interval)
}
case <-ctx.Done():
panic("Did not receive client request")
t.Fatalf("Did not receive client request")
}
}

Expand All @@ -300,8 +307,8 @@ func (s) TestProducerBackoff(t *testing.T) {
oobLis := newTestOOBListener()

lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval}
lao := &listenerAndOptions{listener: oobLis, opts: lisOpts}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerAndOptions(resolver.Address{Addr: lis.Addr().String()}, lao)}})
lao := &listenerInfo{listener: oobLis, opts: lisOpts}
zasweq marked this conversation as resolved.
Show resolved Hide resolved
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, lao)}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
Expand All @@ -319,7 +326,7 @@ func (s) TestProducerBackoff(t *testing.T) {
Utilization: map[string]float64{"bob": 555},
}

// Unblock the fake
// Unblock the fake.
awaitRequest(reportInterval)
fake.respCh <- loadReportWant
select {
Expand All @@ -333,8 +340,9 @@ func (s) TestProducerBackoff(t *testing.T) {
t.Fatalf("timed out waiting for load report of: %v", loadReportWant)
}

// The next request should be immediate, since there was a message received.
expectedBackoff = 99 // This will fail if backoff is called with any other value.
// The next request should be immediate, since there was a message
// received.
expectedBackoff = backoffShouldNotBeCalled
fake.respCh <- status.Errorf(codes.Internal, "injected error")
awaitRequest(reportInterval)

Expand All @@ -348,7 +356,9 @@ func (s) TestProducerBackoff(t *testing.T) {
expectedBackoff = 2
fake.respCh <- status.Errorf(codes.Internal, "injected error")
awaitRequest(reportInterval)
expectedBackoff = 99 // This will fail if backoff is called with any other value.
// The next request should be immediate, since there was a message
// received.
expectedBackoff = backoffShouldNotBeCalled

// Send another valid response and wait for it on the client.
fake.respCh <- loadReportWant
Expand Down Expand Up @@ -385,7 +395,7 @@ func (s) TestProducerMultipleListeners(t *testing.T) {
t.Fatal(err)
}

// Register our fake ORCA service
// Register our fake ORCA service.
s := grpc.NewServer()
fake := newFakeORCAService()
defer fake.close()
Expand All @@ -405,16 +415,16 @@ func (s) TestProducerMultipleListeners(t *testing.T) {
t.Errorf("Unexpected report interval; got %v want %v", got, interval)
}
case <-ctx.Done():
panic("Did not receive client request")
t.Fatalf("Did not receive client request")
}
}

// Create our client with an OOB listener in the LB policy it selects.
r := manual.NewBuilderWithScheme("whatever")
oobLis1 := newTestOOBListener()
lisOpts1 := orca.OOBListenerOptions{ReportInterval: reportInterval1}
lao := &listenerAndOptions{listener: oobLis1, opts: lisOpts1}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerAndOptions(resolver.Address{Addr: lis.Addr().String()}, lao)}})
lao := &listenerInfo{listener: oobLis1, opts: lisOpts1}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, lao)}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
Expand Down Expand Up @@ -497,7 +507,7 @@ func (s) TestProducerMultipleListeners(t *testing.T) {
}

// Only 1 listener; expect reportInterval1 to be used and expect the report
// to be sent and distributed correctly.
// to be sent to the listener.
awaitRequest(reportInterval1)
fake.respCh <- loadReportWant
checkReports(1, 0, 0)
Expand All @@ -509,26 +519,26 @@ func (s) TestProducerMultipleListeners(t *testing.T) {
checkReports(2, 1, 0)

// Register listener 3 with a more frequent interval; stream is recreated
// with the lower interval after the next report is received. The first
// report will go to all three listeners.
// with this interval after the next report is received. The first report
// will go to all three listeners.
oobLis3.cleanup = orca.RegisterOOBListener(lao.sc, oobLis3, lisOpts3)
fake.respCh <- loadReportWant
checkReports(3, 2, 1)
awaitRequest(reportInterval3)

// A normal report should go to all three listeners.
// Another report without a change in listeners should go to all three listeners.
fake.respCh <- loadReportWant
zasweq marked this conversation as resolved.
Show resolved Hide resolved
checkReports(4, 3, 2)

// Stop listener 2. This does not affect the interval. The next update
// goes to listeners 1 and 3.
// Stop listener 2. This does not affect the interval as listener 3 is
// still the shortest. The next update goes to listeners 1 and 3.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and still has the shortest interval

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This level of review isn't important. The meaning of this comment is very clear, and it's an internal comment in a test, not in external documentation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't hurt though. But I see what you're saying

oobLis2.Stop()
fake.respCh <- loadReportWant
checkReports(5, 3, 3)

// Stop listener 3. This makes the interval longer, with stream recreation
// delayed until the next report is received. Reports only go to listener
// 1 now.
// delayed until the next report is received. Reports should only go to
// listener 1 now.
oobLis3.Stop()
fake.respCh <- loadReportWant
checkReports(6, 3, 3)
Expand Down