From 49710701929b16f2ab21171c930591018fbc56e5 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Mon, 17 Oct 2022 14:41:23 -0700 Subject: [PATCH] review comments --- balancer/balancer.go | 4 ++-- balancer_conn_wrappers.go | 16 +++++++--------- orca/producer.go | 27 +++++++++++++++------------ 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/balancer/balancer.go b/balancer/balancer.go index 74646455b0c..392b21fb2d8 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -381,11 +381,11 @@ var ErrBadResolverState = errors.New("bad resolver state") // SubConn to create producers when needed. type ProducerBuilder interface { // Build creates a Producer. The first parameter is always a - // grpc.InvokerStreamer (a type to allow creating RPCs/streams on the + // grpc.ClientConnInterface (a type to allow creating RPCs/streams on the // associated SubConn), but is declared as interface{} to avoid a // dependency cycle. Should also return a close function that will be // called when all references to the Producer have been given up. - Build(grpcInvokerStreamer interface{}) (p Producer, close func()) + Build(grpcClientConnInterface interface{}) (p Producer, close func()) } // A Producer is a type shared among potentially many consumers. It is diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 0d5ccc2623e..91b795c6cd8 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -308,7 +308,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) return nil, err } - acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*producerData)} + acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)} acbw.ac.mu.Lock() ac.acbw = acbw acbw.ac.mu.Unlock() @@ -364,7 +364,7 @@ func (ccb *ccBalancerWrapper) Target() string { type acBalancerWrapper struct { mu sync.Mutex ac *addrConn - producers map[balancer.ProducerBuilder]*producerData + producers map[balancer.ProducerBuilder]*refCountedProducer } func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) { @@ -444,12 +444,10 @@ func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args i return cs.RecvMsg(reply) } -// producerData stores a producer, a ref counting mechanism, and a close -// function to be called when the producer no longer has any references. -type producerData struct { +type refCountedProducer struct { producer balancer.Producer - refs int - close func() + refs int // number of current refs to the producer + close func() // underlying producer's close function } func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) { @@ -461,14 +459,14 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) ( if pData == nil { // Not found; create a new one and add it to the producers map. p, close := pb.Build(acbw) - pData = &producerData{producer: p, close: close} + pData = &refCountedProducer{producer: p, close: close} acbw.producers[pb] = pData } // Account for this new reference. pData.refs++ // Return a cleanup function wrapped in a sync.Once to remove this - // reference and delete the producerData from the map if the total + // reference and delete the refCountedProducer from the map if the total // reference count goes to zero. unref := func() { acbw.mu.Lock() diff --git a/orca/producer.go b/orca/producer.go index af44e6d7546..6c724845c54 100644 --- a/orca/producer.go +++ b/orca/producer.go @@ -63,16 +63,17 @@ type OOBListener interface { // OOBListenerOptions contains options to control how an OOBListener is called. type OOBListenerOptions struct { - // How often to request the server to provide a load report. May be - // provided less frequently if the server requires a longer interval, or - // may be provided more frequently if another subscriber requests a shorter - // interval. + // ReportInterval specifies how often to request the server to provide a + // load report. May be provided less frequently if the server requires a + // longer interval, or may be provided more frequently if another + // subscriber requests a shorter interval. ReportInterval time.Duration } // RegisterOOBListener registers an out-of-band load report listener on sc. // Any OOBListener may only be registered once per subchannel at a time. The -// returned stop function must be called when no longer needed. +// returned stop function must be called when no longer needed. Do not +// register a single OOBListener more than once per SubConn. func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOptions) (stop func()) { pr, close := sc.GetOrBuildProducer(producerBuilderSingleton) p := pr.(*producer) @@ -95,9 +96,11 @@ func RegisterOOBListener(sc balancer.SubConn, l OOBListener, opts OOBListenerOpt type producer struct { client v3orcaservicegrpc.OpenRcaServiceClient - ctx context.Context - cancel func() - closed *grpcsync.Event // fired when closure completes + closed *grpcsync.Event // fired when closure completes + // backoff is called between stream attempts to determine how long to delay + // to avoid overloading a server experiencing problems. The attempt count + // is incremented when stream errors occur and is reset when the stream + // reports a result. backoff func(int) time.Duration mu sync.Mutex @@ -130,9 +133,9 @@ func (p *producer) unregisterListener(l OOBListener, interval time.Duration) { func (p *producer) minInterval() time.Duration { p.mu.Lock() defer p.mu.Unlock() - min := time.Duration(-1) - for t := range p.intervals { - if min == time.Duration(-1) || t < min { + var min time.Duration + for i, t := range p.intervals { + if t < min || i == 0 { min = t } } @@ -197,7 +200,7 @@ func (p *producer) runStream(ctx context.Context) (resetBackoff bool, err error) ReportInterval: durationpb.New(interval), }) if err != nil { - return resetBackoff, err + return false, err } for {