@@ -39,12 +39,12 @@ type Client struct {
39
39
ctx context.Context
40
40
ctxCancel func ()
41
41
42
- rng func () float64
42
+ rng func (func ( * rand. Rand ))
43
43
44
44
brokersMu sync.RWMutex
45
45
brokers []* broker // ordered by broker ID
46
46
seeds atomic.Value // []*broker, seed brokers, also ordered by ID
47
- anyBrokerIdx int32
47
+ anyBrokerOrd [] int32 // shuffled brokers, for random ordering
48
48
anySeedIdx int32
49
49
stopBrokers bool // set to true on close to stop updateBrokers
50
50
@@ -462,13 +462,13 @@ func NewClient(opts ...Opt) (*Client, error) {
462
462
ctx : ctx ,
463
463
ctxCancel : cancel ,
464
464
465
- rng : func () func () float64 {
465
+ rng : func () func (func ( * rand. Rand )) {
466
466
var mu sync.Mutex
467
467
rng := rand .New (rand .NewSource (time .Now ().UnixNano ()))
468
- return func () float64 {
468
+ return func (fn func ( * rand. Rand )) {
469
469
mu .Lock ()
470
470
defer mu .Unlock ()
471
- return rng . Float64 ( )
471
+ fn ( rng )
472
472
}
473
473
}(),
474
474
@@ -733,33 +733,45 @@ func (c *connTimeouter) timeouts(req kmsg.Request) (r, w time.Duration) {
733
733
}
734
734
}
735
735
736
+ func (cl * Client ) reinitAnyBrokerOrd () {
737
+ cl .anyBrokerOrd = append (cl .anyBrokerOrd [:0 ], make ([]int32 , len (cl .brokers ))... )
738
+ for i := range cl .anyBrokerOrd {
739
+ cl .anyBrokerOrd [i ] = int32 (i )
740
+ }
741
+ cl .rng (func (r * rand.Rand ) {
742
+ r .Shuffle (len (cl .anyBrokerOrd ), func (i , j int ) {
743
+ cl .anyBrokerOrd [i ], cl .anyBrokerOrd [j ] = cl .anyBrokerOrd [j ], cl .anyBrokerOrd [i ]
744
+ })
745
+ })
746
+ }
747
+
736
748
// broker returns a random broker from all brokers ever known.
737
749
func (cl * Client ) broker () * broker {
738
- cl .brokersMu .Lock () // full lock needed for anyBrokerIdx below
750
+ cl .brokersMu .Lock ()
739
751
defer cl .brokersMu .Unlock ()
740
752
741
753
// Every time we loop through all discovered brokers, we issue one
742
754
// request to the next seed. This ensures that if all discovered
743
755
// brokers are down, we will *eventually* loop through seeds and
744
756
// hopefully have a reachable seed.
745
757
var b * broker
746
- if len (cl .brokers ) > 0 && int (cl .anyBrokerIdx ) < len (cl .brokers ) {
747
- cl .anyBrokerIdx %= int32 (len (cl .brokers ))
748
- b = cl .brokers [cl .anyBrokerIdx ]
749
- cl .anyBrokerIdx ++
750
- } else {
751
- seeds := cl .loadSeeds ()
752
- cl .anySeedIdx %= int32 (len (seeds ))
753
- b = seeds [cl .anySeedIdx ]
754
- cl .anySeedIdx ++
755
758
756
- // If we have brokers, we ranged past discovered brokers.
757
- // We now reset the anyBrokerIdx to begin ranging through
758
- // discovered brokers again.
759
- if len (cl .brokers ) > 0 {
760
- cl .anyBrokerIdx = 0
761
- }
759
+ if len (cl .anyBrokerOrd ) > 0 {
760
+ b = cl .brokers [cl .anyBrokerOrd [0 ]]
761
+ cl .anyBrokerOrd = cl .anyBrokerOrd [1 :]
762
+ return b
762
763
}
764
+
765
+ seeds := cl .loadSeeds ()
766
+ cl .anySeedIdx %= int32 (len (seeds ))
767
+ b = seeds [cl .anySeedIdx ]
768
+ cl .anySeedIdx ++
769
+
770
+ // If we have brokers, we ranged past discovered brokers.
771
+ // We now reset the anyBrokerOrd to begin ranging through
772
+ // discovered brokers again. If there are still no brokers,
773
+ // this reinit will do nothing and we will keep looping seeds.
774
+ cl .reinitAnyBrokerOrd ()
763
775
return b
764
776
}
765
777
@@ -946,6 +958,7 @@ func (cl *Client) updateBrokers(brokers []kmsg.MetadataResponseBroker) {
946
958
}
947
959
948
960
cl .brokers = newBrokers
961
+ cl .reinitAnyBrokerOrd ()
949
962
}
950
963
951
964
// CloseAllowingRebalance allows rebalances, leaves any group, and closes all
0 commit comments