Skip to content

Commit b2ccc2f

Browse files
committedOct 22, 2023
kgo: reintroduce random broker iteration
Random iteration was removed with 1e5c11d We can reintroduce random iteration easily enough, while still keeping the behavior of try-a-seed-occasionally. Closes #579.
1 parent 6a961da commit b2ccc2f

File tree

2 files changed

+38
-22
lines changed

2 files changed

+38
-22
lines changed
 

‎pkg/kgo/broker.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"io"
1010
"math"
11+
"math/rand"
1112
"net"
1213
"os"
1314
"strconv"
@@ -948,7 +949,9 @@ func (cxn *brokerCxn) doSasl(authenticate bool) error {
948949
if latencyMillis > minPessimismMillis {
949950
minPessimismMillis = latencyMillis
950951
}
951-
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*cxn.b.cl.rng()) // 95 to 98% of lifetime (pessimism 2% to 5%)
952+
var random float64
953+
cxn.b.cl.rng(func(r *rand.Rand) { random = r.Float64() })
954+
maxPessimismMillis := float64(lifetimeMillis) * (0.05 - 0.03*random) // 95 to 98% of lifetime (pessimism 2% to 5%)
952955

953956
// Our minimum lifetime is always 1s (or latency, if larger).
954957
// When our max pessimism becomes more than min pessimism,

‎pkg/kgo/client.go

+34-21
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ type Client struct {
3939
ctx context.Context
4040
ctxCancel func()
4141

42-
rng func() float64
42+
rng func(func(*rand.Rand))
4343

4444
brokersMu sync.RWMutex
4545
brokers []*broker // ordered by broker ID
4646
seeds atomic.Value // []*broker, seed brokers, also ordered by ID
47-
anyBrokerIdx int32
47+
anyBrokerOrd []int32 // shuffled brokers, for random ordering
4848
anySeedIdx int32
4949
stopBrokers bool // set to true on close to stop updateBrokers
5050

@@ -462,13 +462,13 @@ func NewClient(opts ...Opt) (*Client, error) {
462462
ctx: ctx,
463463
ctxCancel: cancel,
464464

465-
rng: func() func() float64 {
465+
rng: func() func(func(*rand.Rand)) {
466466
var mu sync.Mutex
467467
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
468-
return func() float64 {
468+
return func(fn func(*rand.Rand)) {
469469
mu.Lock()
470470
defer mu.Unlock()
471-
return rng.Float64()
471+
fn(rng)
472472
}
473473
}(),
474474

@@ -733,33 +733,45 @@ func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) {
733733
}
734734
}
735735

736+
func (cl *Client) reinitAnyBrokerOrd() {
737+
cl.anyBrokerOrd = append(cl.anyBrokerOrd[:0], make([]int32, len(cl.brokers))...)
738+
for i := range cl.anyBrokerOrd {
739+
cl.anyBrokerOrd[i] = int32(i)
740+
}
741+
cl.rng(func(r *rand.Rand) {
742+
r.Shuffle(len(cl.anyBrokerOrd), func(i, j int) {
743+
cl.anyBrokerOrd[i], cl.anyBrokerOrd[j] = cl.anyBrokerOrd[j], cl.anyBrokerOrd[i]
744+
})
745+
})
746+
}
747+
736748
// broker returns a random broker from all brokers ever known.
737749
func (cl *Client) broker() *broker {
738-
cl.brokersMu.Lock() // full lock needed for anyBrokerIdx below
750+
cl.brokersMu.Lock()
739751
defer cl.brokersMu.Unlock()
740752

741753
// Every time we loop through all discovered brokers, we issue one
742754
// request to the next seed. This ensures that if all discovered
743755
// brokers are down, we will *eventually* loop through seeds and
744756
// hopefully have a reachable seed.
745757
var b *broker
746-
if len(cl.brokers) > 0 && int(cl.anyBrokerIdx) < len(cl.brokers) {
747-
cl.anyBrokerIdx %= int32(len(cl.brokers))
748-
b = cl.brokers[cl.anyBrokerIdx]
749-
cl.anyBrokerIdx++
750-
} else {
751-
seeds := cl.loadSeeds()
752-
cl.anySeedIdx %= int32(len(seeds))
753-
b = seeds[cl.anySeedIdx]
754-
cl.anySeedIdx++
755758

756-
// If we have brokers, we ranged past discovered brokers.
757-
// We now reset the anyBrokerIdx to begin ranging through
758-
// discovered brokers again.
759-
if len(cl.brokers) > 0 {
760-
cl.anyBrokerIdx = 0
761-
}
759+
if len(cl.anyBrokerOrd) > 0 {
760+
b = cl.brokers[cl.anyBrokerOrd[0]]
761+
cl.anyBrokerOrd = cl.anyBrokerOrd[1:]
762+
return b
762763
}
764+
765+
seeds := cl.loadSeeds()
766+
cl.anySeedIdx %= int32(len(seeds))
767+
b = seeds[cl.anySeedIdx]
768+
cl.anySeedIdx++
769+
770+
// If we have brokers, we ranged past discovered brokers.
771+
// We now reset the anyBrokerOrd to begin ranging through
772+
// discovered brokers again. If there are still no brokers,
773+
// this reinit will do nothing and we will keep looping seeds.
774+
cl.reinitAnyBrokerOrd()
763775
return b
764776
}
765777

@@ -946,6 +958,7 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
946958
}
947959

948960
cl.brokers = newBrokers
961+
cl.reinitAnyBrokerOrd()
949962
}
950963

951964
// CloseAllowingRebalance allows rebalances, leaves any group, and closes all

0 commit comments

Comments
 (0)
Please sign in to comment.