Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): flowcontroller byte reporti…
Browse files Browse the repository at this point in the history
…ng (#7581)

This PR allows the flowcontroller to report bytes in flight for flow controllers with a bounded byte definition.  The primary connection load signals for a connection are the inserts/bytes in flight as reported by the flow controller, and this makes the bytes in flight a signal we can use.

Important note: an unbounded flow controller will not report any bytes in flight.  This avoids introducing odd situations due to size normalization where bytes tracked and the actual capacity of the semaphore could get out of sync.

Towards: https://togithub.com/googleapis/google-cloud-go/issues/7103
  • Loading branch information
shollyman committed Mar 17, 2023
1 parent 4497130 commit 34e35d1
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
18 changes: 13 additions & 5 deletions bigquery/storage/managedwriter/flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type flowController struct {
// Semaphores for governing pending inserts.
semInsertCount, semInsertBytes *semaphore.Weighted

countRemaining int64 // Atomic.
countTracked int64 // Atomic.
bytesTracked int64 // Atomic. Only tracked if bytes are bounded.
}

func newFlowController(maxInserts, maxInsertBytes int) *flowController {
Expand Down Expand Up @@ -80,7 +81,8 @@ func (fc *flowController) acquire(ctx context.Context, sizeBytes int) error {
return err
}
}
atomic.AddInt64(&fc.countRemaining, 1)
atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes))
atomic.AddInt64(&fc.countTracked, 1)
return nil
}

Expand All @@ -103,12 +105,14 @@ func (fc *flowController) tryAcquire(sizeBytes int) bool {
return false
}
}
atomic.AddInt64(&fc.countRemaining, 1)
atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes))
atomic.AddInt64(&fc.countTracked, 1)
return true
}

func (fc *flowController) release(sizeBytes int) {
atomic.AddInt64(&fc.countRemaining, -1)
atomic.AddInt64(&fc.countTracked, -1)
atomic.AddInt64(&fc.bytesTracked, (0 - fc.bound(sizeBytes)))
if fc.semInsertCount != nil {
fc.semInsertCount.Release(1)
}
Expand All @@ -126,5 +130,9 @@ func (fc *flowController) bound(sizeBytes int) int64 {
}

func (fc *flowController) count() int {
return int(atomic.LoadInt64(&fc.countRemaining))
return int(atomic.LoadInt64(&fc.countTracked))
}

func (fc *flowController) bytes() int {
return int(atomic.LoadInt64(&fc.bytesTracked))
}
27 changes: 19 additions & 8 deletions bigquery/storage/managedwriter/flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -125,9 +124,6 @@ func TestFlowControllerSaturation(t *testing.T) {
},
} {
fc := newFlowController(maxCount, maxSize)
// Atomically track flow controller state.
// The flowController itself tracks count.
var curSize int64
success := errors.New("")
// Time out if wantSize or wantCount is never reached.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand Down Expand Up @@ -155,15 +151,15 @@ func TestFlowControllerSaturation(t *testing.T) {
if c == test.wantCount {
hitCount = true
}
s := atomic.AddInt64(&curSize, int64(test.acquireSize))
s := int64(fc.bytes())
if s > test.wantSize {
return fmt.Errorf("size %d exceeds want %d", s, test.wantSize)
}
if s == test.wantSize {
hitSize = true
}
time.Sleep(5 * time.Millisecond) // Let other goroutines make progress.
if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
if fc.bytes() < 0 {
return errors.New("negative size")
}
fc.release(test.acquireSize)
Expand Down Expand Up @@ -212,11 +208,20 @@ func TestFlowControllerUnboundedCount(t *testing.T) {
if !fc.tryAcquire(4) {
t.Error("got false, wanted true")
}
wantBytes := int64(8)
if gotB := int64(fc.bytes()); gotB != wantBytes {
t.Fatalf("got bytes %d, want %d", gotB, wantBytes)
}

// Fail to tryAcquire 3 bytes.
if fc.tryAcquire(3) {
t.Error("got true, wanted false")
}

if gotB := int64(fc.bytes()); gotB != wantBytes {
t.Fatalf("got bytes %d, want %d", gotB, wantBytes)
}

}

func TestFlowControllerUnboundedCount2(t *testing.T) {
Expand All @@ -227,14 +232,20 @@ func TestFlowControllerUnboundedCount2(t *testing.T) {
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}
wantBytes := int64(0)
if gotB := int64(fc.bytes()); gotB != wantBytes {
t.Fatalf("got bytes %d, want %d", gotB, wantBytes)
}
fc.release(1)
fc.release(1)
fc.release(1)
wantCount := int64(-2)
c := int64(fc.count())
if c != wantCount {
if c := int64(fc.count()); c != wantCount {
t.Fatalf("got count %d, want %d", c, wantCount)
}
if gotB := int64(fc.bytes()); gotB != wantBytes {
t.Fatalf("got bytes %d, want %d", gotB, wantBytes)
}
}

func TestFlowControllerUnboundedBytes(t *testing.T) {
Expand Down

0 comments on commit 34e35d1

Please sign in to comment.