From bdd88ac14631fe241797fcc3b0b0d429b4dd66c6 Mon Sep 17 00:00:00 2001 From: huangchong94 Date: Fri, 17 Dec 2021 22:37:46 +0800 Subject: [PATCH 1/8] wrr: improve randomWRR.Next performance * use binary search when weights are not equal * random pick one when weights are equal --- internal/wrr/random.go | 45 ++++++++++++++++++++++++++-------------- internal/wrr/wrr_test.go | 1 + 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/internal/wrr/random.go b/internal/wrr/random.go index ccf5113e9f3..4df9bd8fb9c 100644 --- a/internal/wrr/random.go +++ b/internal/wrr/random.go @@ -19,6 +19,7 @@ package wrr import ( "fmt" + "sort" "sync" "google.golang.org/grpc/internal/grpcrand" @@ -26,8 +27,13 @@ import ( // weightedItem is a wrapped weighted item that is used to implement weighted random algorithm. type weightedItem struct { - Item interface{} - Weight int64 + Item interface{} + // TODO Delete Weight? This field is not necessary for randomWRR to work. + // But without this field, if we want to know an item's weight in randomWRR.Add , we have to + // calculate it (i.e. weight = items.AccumulatedWeight - previousItem.AccumulatedWeight) + // which is a bit less concise than items.Weight + Weight int64 + AccumulatedWeight int64 } func (w *weightedItem) String() string { @@ -38,7 +44,7 @@ func (w *weightedItem) String() string { type randomWRR struct { mu sync.RWMutex items []*weightedItem - sumOfWeights int64 + equalWeights bool } // NewRandom creates a new WRR with random. @@ -47,31 +53,40 @@ func NewRandom() WRR { } var grpcrandInt63n = grpcrand.Int63n +var grpcrandIntn = grpcrand.Intn func (rw *randomWRR) Next() (item interface{}) { rw.mu.RLock() defer rw.mu.RUnlock() - if rw.sumOfWeights == 0 { + sumOfWeights := rw.items[len(rw.items)-1].AccumulatedWeight + if sumOfWeights == 0 { return nil } - // Random number in [0, sum). - randomWeight := grpcrandInt63n(rw.sumOfWeights) - for _, item := range rw.items { - randomWeight = randomWeight - item.Weight - if randomWeight < 0 { - return item.Item - } + if rw.equalWeights { + return rw.items[grpcrandIntn(len(rw.items))].Item } - - return rw.items[len(rw.items)-1].Item + // Random number in [0, sumOfWeights). + randomWeight := grpcrandInt63n(sumOfWeights) + // Item's accumulated weights are in ascending order, because item's weight >= 0. + // Binary search rw.items to find first item whose AccumulatedWeight > randomWeight + // The return i is guaranteed to be in range [0, len(rw.items)) because randomWeight < last item's AccumulatedWeight + i := sort.Search(len(rw.items), func(i int) bool { return rw.items[i].AccumulatedWeight > randomWeight }) + return rw.items[i].Item } func (rw *randomWRR) Add(item interface{}, weight int64) { rw.mu.Lock() defer rw.mu.Unlock() - rItem := &weightedItem{Item: item, Weight: weight} + accumulatedWeight := weight + equalWeights := true + if len(rw.items) > 0 { + lastItem := rw.items[len(rw.items)-1] + accumulatedWeight = lastItem.AccumulatedWeight + weight + equalWeights = rw.equalWeights && weight == lastItem.Weight + } + rw.equalWeights = equalWeights + rItem := &weightedItem{Item: item, Weight: weight, AccumulatedWeight: accumulatedWeight} rw.items = append(rw.items, rItem) - rw.sumOfWeights += weight } func (rw *randomWRR) String() string { diff --git a/internal/wrr/wrr_test.go b/internal/wrr/wrr_test.go index 4565e34ffb9..159e7091c54 100644 --- a/internal/wrr/wrr_test.go +++ b/internal/wrr/wrr_test.go @@ -115,4 +115,5 @@ func (s) TestEdfWrrNext(t *testing.T) { func init() { r := rand.New(rand.NewSource(0)) grpcrandInt63n = r.Int63n + grpcrandIntn = r.Intn } From f2009a0c7ab73f42c7e4dc4648711874aa8fe628 Mon Sep 17 00:00:00 2001 From: huangchong94 Date: Mon, 20 Dec 2021 10:39:44 +0800 Subject: [PATCH 2/8] wrr: use grpcrand.Int63n only --- internal/wrr/random.go | 3 +-- internal/wrr/wrr_test.go | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/wrr/random.go b/internal/wrr/random.go index 4df9bd8fb9c..3680fb99bef 100644 --- a/internal/wrr/random.go +++ b/internal/wrr/random.go @@ -53,7 +53,6 @@ func NewRandom() WRR { } var grpcrandInt63n = grpcrand.Int63n -var grpcrandIntn = grpcrand.Intn func (rw *randomWRR) Next() (item interface{}) { rw.mu.RLock() @@ -63,7 +62,7 @@ func (rw *randomWRR) Next() (item interface{}) { return nil } if rw.equalWeights { - return rw.items[grpcrandIntn(len(rw.items))].Item + return rw.items[grpcrandInt63n(int64(len(rw.items)))].Item } // Random number in [0, sumOfWeights). randomWeight := grpcrandInt63n(sumOfWeights) diff --git a/internal/wrr/wrr_test.go b/internal/wrr/wrr_test.go index 159e7091c54..4565e34ffb9 100644 --- a/internal/wrr/wrr_test.go +++ b/internal/wrr/wrr_test.go @@ -115,5 +115,4 @@ func (s) TestEdfWrrNext(t *testing.T) { func init() { r := rand.New(rand.NewSource(0)) grpcrandInt63n = r.Int63n - grpcrandIntn = r.Intn } From b52eec9c589511128847861258d2ad69fe081555 Mon Sep 17 00:00:00 2001 From: huangchong94 Date: Thu, 23 Dec 2021 12:30:04 +0800 Subject: [PATCH 3/8] wrr: add BenchmarkRandomWRRNext --- internal/wrr/wrr_test.go | 65 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/internal/wrr/wrr_test.go b/internal/wrr/wrr_test.go index 4565e34ffb9..957724efe89 100644 --- a/internal/wrr/wrr_test.go +++ b/internal/wrr/wrr_test.go @@ -21,6 +21,7 @@ import ( "errors" "math" "math/rand" + "strconv" "testing" "github.com/google/go-cmp/cmp" @@ -112,6 +113,70 @@ func (s) TestEdfWrrNext(t *testing.T) { testWRRNext(t, NewEDF) } +func BenchmarkRandomWRRNext(b *testing.B) { + for _, n := range []int{100, 500, 1000} { + b.Run("equal-weights-"+strconv.Itoa(n)+"-items", func(b *testing.B) { + w := NewRandom().(*randomWRR) + sumOfWeights := n + for i := 0; i < n; i++ { + w.Add(i, 1) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + for i := 0; i < sumOfWeights; i++ { + w.Next() + } + } + }) + } + + var maxWeight int64 = 1024 + for _, n := range []int{100, 500, 1000} { + b.Run("random-weights-"+strconv.Itoa(n)+"-items", func(b *testing.B) { + w := NewRandom() + var sumOfWeights int64 + for i := 0; i < n; i++ { + weight := rand.Int63n(maxWeight + 1) + w.Add(i, weight) + sumOfWeights += weight + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + for i := 0; i < int(sumOfWeights); i++ { + w.Next() + } + } + }) + } + + itemsNum := 200 + heavyWeight := int64(itemsNum) + lightWeight := int64(1) + heavyIndices := []int{0, itemsNum / 2, itemsNum - 1} + for _, heavyIndex := range heavyIndices { + b.Run("skew-weights-heavy-index-"+strconv.Itoa(heavyIndex), func(b *testing.B) { + w := NewRandom() + var sumOfWeights int64 + for i := 0; i < itemsNum; i++ { + var weight int64 + if i == heavyIndex { + weight = heavyWeight + } else { + weight = lightWeight + } + sumOfWeights += weight + w.Add(i, weight) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + for i := 0; i < int(sumOfWeights); i++ { + w.Next() + } + } + }) + } +} + func init() { r := rand.New(rand.NewSource(0)) grpcrandInt63n = r.Int63n From 8edbcca26ece748bc14601da5d83f16ce7c90f1e Mon Sep 17 00:00:00 2001 From: huangchong94 Date: Thu, 6 Jan 2022 11:50:50 +0800 Subject: [PATCH 4/8] remove Weight field in weightedItem, all fields in weightedItem become unexported; add comment for field equalWeights in randomWRR; fix panic when there is no item in randomWRR.Next; add test case for wrr; remove redundant type assertion in BenchmarkRandomWRRNext; --- internal/wrr/random.go | 35 ++++++++++++++++++----------------- internal/wrr/wrr_test.go | 13 ++++++++++++- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/internal/wrr/random.go b/internal/wrr/random.go index 3680fb99bef..5e7477a3d0b 100644 --- a/internal/wrr/random.go +++ b/internal/wrr/random.go @@ -27,13 +27,8 @@ import ( // weightedItem is a wrapped weighted item that is used to implement weighted random algorithm. type weightedItem struct { - Item interface{} - // TODO Delete Weight? This field is not necessary for randomWRR to work. - // But without this field, if we want to know an item's weight in randomWRR.Add , we have to - // calculate it (i.e. weight = items.AccumulatedWeight - previousItem.AccumulatedWeight) - // which is a bit less concise than items.Weight - Weight int64 - AccumulatedWeight int64 + item interface{} + accumulatedWeight int64 } func (w *weightedItem) String() string { @@ -44,6 +39,7 @@ func (w *weightedItem) String() string { type randomWRR struct { mu sync.RWMutex items []*weightedItem + // Are all item's weights equal equalWeights bool } @@ -57,20 +53,21 @@ var grpcrandInt63n = grpcrand.Int63n func (rw *randomWRR) Next() (item interface{}) { rw.mu.RLock() defer rw.mu.RUnlock() - sumOfWeights := rw.items[len(rw.items)-1].AccumulatedWeight - if sumOfWeights == 0 { + var sumOfWeights int64 + if len(rw.items) == 0 { return nil } + sumOfWeights = rw.items[len(rw.items)-1].accumulatedWeight if rw.equalWeights { - return rw.items[grpcrandInt63n(int64(len(rw.items)))].Item + return rw.items[grpcrandInt63n(int64(len(rw.items)))].item } // Random number in [0, sumOfWeights). randomWeight := grpcrandInt63n(sumOfWeights) // Item's accumulated weights are in ascending order, because item's weight >= 0. - // Binary search rw.items to find first item whose AccumulatedWeight > randomWeight - // The return i is guaranteed to be in range [0, len(rw.items)) because randomWeight < last item's AccumulatedWeight - i := sort.Search(len(rw.items), func(i int) bool { return rw.items[i].AccumulatedWeight > randomWeight }) - return rw.items[i].Item + // Binary search rw.items to find first item whose accumulatedWeight > randomWeight + // The return i is guaranteed to be in range [0, len(rw.items)) because randomWeight < last item's accumulatedWeight + i := sort.Search(len(rw.items), func(i int) bool { return rw.items[i].accumulatedWeight > randomWeight }) + return rw.items[i].item } func (rw *randomWRR) Add(item interface{}, weight int64) { @@ -80,11 +77,15 @@ func (rw *randomWRR) Add(item interface{}, weight int64) { equalWeights := true if len(rw.items) > 0 { lastItem := rw.items[len(rw.items)-1] - accumulatedWeight = lastItem.AccumulatedWeight + weight - equalWeights = rw.equalWeights && weight == lastItem.Weight + accumulatedWeight = lastItem.accumulatedWeight + weight + lastItemWeight := lastItem.accumulatedWeight + if len(rw.items) > 1 { + lastItemWeight = lastItem.accumulatedWeight - rw.items[len(rw.items)-2].accumulatedWeight + } + equalWeights = rw.equalWeights && weight == lastItemWeight } rw.equalWeights = equalWeights - rItem := &weightedItem{Item: item, Weight: weight, AccumulatedWeight: accumulatedWeight} + rItem := &weightedItem{item: item, accumulatedWeight: accumulatedWeight} rw.items = append(rw.items, rItem) } diff --git a/internal/wrr/wrr_test.go b/internal/wrr/wrr_test.go index 957724efe89..18dc790742e 100644 --- a/internal/wrr/wrr_test.go +++ b/internal/wrr/wrr_test.go @@ -71,6 +71,10 @@ func testWRRNext(t *testing.T, newWRR func() WRR) { name: "17-23-37", weights: []int64{17, 23, 37}, }, + { + name: "no items", + weights: []int64{}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -81,6 +85,13 @@ func testWRRNext(t *testing.T, newWRR func() WRR) { w.Add(i, weight) sumOfWeights += weight } + if len(tt.weights) == 0 { + if w.Next() != nil { + t.Fatalf("w.Next returns non nil value when there is no item") + } else { + return + } + } results := make(map[int]int) for i := 0; i < iterCount; i++ { @@ -116,7 +127,7 @@ func (s) TestEdfWrrNext(t *testing.T) { func BenchmarkRandomWRRNext(b *testing.B) { for _, n := range []int{100, 500, 1000} { b.Run("equal-weights-"+strconv.Itoa(n)+"-items", func(b *testing.B) { - w := NewRandom().(*randomWRR) + w := NewRandom() sumOfWeights := n for i := 0; i < n; i++ { w.Add(i, 1) From 88b9a1e6fe66bf7f491afee819bef42fc67c98e5 Mon Sep 17 00:00:00 2001 From: huangchong94 Date: Thu, 6 Jan 2022 11:55:17 +0800 Subject: [PATCH 5/8] format code --- internal/wrr/random.go | 6 +++--- internal/wrr/wrr_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/wrr/random.go b/internal/wrr/random.go index 5e7477a3d0b..5e5d6c1c912 100644 --- a/internal/wrr/random.go +++ b/internal/wrr/random.go @@ -27,7 +27,7 @@ import ( // weightedItem is a wrapped weighted item that is used to implement weighted random algorithm. type weightedItem struct { - item interface{} + item interface{} accumulatedWeight int64 } @@ -37,8 +37,8 @@ func (w *weightedItem) String() string { // randomWRR is a struct that contains weighted items implement weighted random algorithm. type randomWRR struct { - mu sync.RWMutex - items []*weightedItem + mu sync.RWMutex + items []*weightedItem // Are all item's weights equal equalWeights bool } diff --git a/internal/wrr/wrr_test.go b/internal/wrr/wrr_test.go index 18dc790742e..a9457501a66 100644 --- a/internal/wrr/wrr_test.go +++ b/internal/wrr/wrr_test.go @@ -72,7 +72,7 @@ func testWRRNext(t *testing.T, newWRR func() WRR) { weights: []int64{17, 23, 37}, }, { - name: "no items", + name: "no items", weights: []int64{}, }, } From 5c32a8d79debb9c45a65bb0cf64baca49d07591b Mon Sep 17 00:00:00 2001 From: huangchong94 Date: Thu, 6 Jan 2022 11:57:03 +0800 Subject: [PATCH 6/8] update randomWRR.Next --- internal/wrr/random.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/wrr/random.go b/internal/wrr/random.go index 5e5d6c1c912..befa3ab3b2f 100644 --- a/internal/wrr/random.go +++ b/internal/wrr/random.go @@ -57,10 +57,10 @@ func (rw *randomWRR) Next() (item interface{}) { if len(rw.items) == 0 { return nil } - sumOfWeights = rw.items[len(rw.items)-1].accumulatedWeight if rw.equalWeights { return rw.items[grpcrandInt63n(int64(len(rw.items)))].item } + sumOfWeights = rw.items[len(rw.items)-1].accumulatedWeight // Random number in [0, sumOfWeights). randomWeight := grpcrandInt63n(sumOfWeights) // Item's accumulated weights are in ascending order, because item's weight >= 0. From 4c8c3c7eaaa6bf47c50722991bc5ac5be5687200 Mon Sep 17 00:00:00 2001 From: huangchong94 Date: Fri, 7 Jan 2022 11:53:21 +0800 Subject: [PATCH 7/8] add weight field back to weightedItem; remove redundant return in testWRRNext --- internal/wrr/random.go | 12 +++++------- internal/wrr/wrr_test.go | 17 ++++++++--------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/internal/wrr/random.go b/internal/wrr/random.go index befa3ab3b2f..486d827e872 100644 --- a/internal/wrr/random.go +++ b/internal/wrr/random.go @@ -28,6 +28,7 @@ import ( // weightedItem is a wrapped weighted item that is used to implement weighted random algorithm. type weightedItem struct { item interface{} + weight int64 accumulatedWeight int64 } @@ -53,13 +54,14 @@ var grpcrandInt63n = grpcrand.Int63n func (rw *randomWRR) Next() (item interface{}) { rw.mu.RLock() defer rw.mu.RUnlock() - var sumOfWeights int64 if len(rw.items) == 0 { return nil } if rw.equalWeights { return rw.items[grpcrandInt63n(int64(len(rw.items)))].item } + + var sumOfWeights int64 sumOfWeights = rw.items[len(rw.items)-1].accumulatedWeight // Random number in [0, sumOfWeights). randomWeight := grpcrandInt63n(sumOfWeights) @@ -78,14 +80,10 @@ func (rw *randomWRR) Add(item interface{}, weight int64) { if len(rw.items) > 0 { lastItem := rw.items[len(rw.items)-1] accumulatedWeight = lastItem.accumulatedWeight + weight - lastItemWeight := lastItem.accumulatedWeight - if len(rw.items) > 1 { - lastItemWeight = lastItem.accumulatedWeight - rw.items[len(rw.items)-2].accumulatedWeight - } - equalWeights = rw.equalWeights && weight == lastItemWeight + equalWeights = rw.equalWeights && weight == lastItem.weight } rw.equalWeights = equalWeights - rItem := &weightedItem{item: item, accumulatedWeight: accumulatedWeight} + rItem := &weightedItem{item: item, weight: weight, accumulatedWeight: accumulatedWeight} rw.items = append(rw.items, rItem) } diff --git a/internal/wrr/wrr_test.go b/internal/wrr/wrr_test.go index a9457501a66..ce4f5e507a2 100644 --- a/internal/wrr/wrr_test.go +++ b/internal/wrr/wrr_test.go @@ -78,20 +78,19 @@ func testWRRNext(t *testing.T, newWRR func() WRR) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var sumOfWeights int64 - w := newWRR() + if len(tt.weights) == 0 { + if next := w.Next(); next != nil { + t.Fatalf("w.Next returns non nil value:%v when there is no item", next) + } + return + } + + var sumOfWeights int64 for i, weight := range tt.weights { w.Add(i, weight) sumOfWeights += weight } - if len(tt.weights) == 0 { - if w.Next() != nil { - t.Fatalf("w.Next returns non nil value when there is no item") - } else { - return - } - } results := make(map[int]int) for i := 0; i < iterCount; i++ { From 144df076aa723194046de39111cbcfe43738b8ce Mon Sep 17 00:00:00 2001 From: huangchong94 Date: Fri, 7 Jan 2022 12:08:40 +0800 Subject: [PATCH 8/8] make vet happy --- internal/wrr/random.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/wrr/random.go b/internal/wrr/random.go index 486d827e872..6d5eb7d4620 100644 --- a/internal/wrr/random.go +++ b/internal/wrr/random.go @@ -61,8 +61,7 @@ func (rw *randomWRR) Next() (item interface{}) { return rw.items[grpcrandInt63n(int64(len(rw.items)))].item } - var sumOfWeights int64 - sumOfWeights = rw.items[len(rw.items)-1].accumulatedWeight + sumOfWeights := rw.items[len(rw.items)-1].accumulatedWeight // Random number in [0, sumOfWeights). randomWeight := grpcrandInt63n(sumOfWeights) // Item's accumulated weights are in ascending order, because item's weight >= 0.