Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): flowcontroller byte reporting
Browse files Browse the repository at this point in the history
This PR allows the flowcontroller to report bytes in flight for flow
controllers with a bounded byte definition.  The primary connection
load signals for a connection are the inserts/bytes in flight as
reported by the flow controller, and this makes the bytes in flight a
signal we can use.

Important note: an unbounded flow controller will not report any bytes
in flight.  This avoids introducing odd situations due to size
normalization where bytes tracked and the actual capacity of the
semaphore could get out of sync.

Towards: googleapis#7103
  • Loading branch information
shollyman committed Mar 17, 2023
1 parent 7d085b4 commit 6f1735f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
18 changes: 13 additions & 5 deletions bigquery/storage/managedwriter/flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type flowController struct {
// Semaphores for governing pending inserts.
semInsertCount, semInsertBytes *semaphore.Weighted

countRemaining int64 // Atomic.
countTracked int64 // Atomic.
bytesTracked int64 // Atomic. Only tracked if bytes are bounded.
}

func newFlowController(maxInserts, maxInsertBytes int) *flowController {
Expand Down Expand Up @@ -80,7 +81,8 @@ func (fc *flowController) acquire(ctx context.Context, sizeBytes int) error {
return err
}
}
atomic.AddInt64(&fc.countRemaining, 1)
atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes))
atomic.AddInt64(&fc.countTracked, 1)
return nil
}

Expand All @@ -103,12 +105,14 @@ func (fc *flowController) tryAcquire(sizeBytes int) bool {
return false
}
}
atomic.AddInt64(&fc.countRemaining, 1)
atomic.AddInt64(&fc.bytesTracked, fc.bound(sizeBytes))
atomic.AddInt64(&fc.countTracked, 1)
return true
}

func (fc *flowController) release(sizeBytes int) {
atomic.AddInt64(&fc.countRemaining, -1)
atomic.AddInt64(&fc.countTracked, -1)
atomic.AddInt64(&fc.bytesTracked, (0 - fc.bound(sizeBytes)))
if fc.semInsertCount != nil {
fc.semInsertCount.Release(1)
}
Expand All @@ -126,5 +130,9 @@ func (fc *flowController) bound(sizeBytes int) int64 {
}

func (fc *flowController) count() int {
return int(atomic.LoadInt64(&fc.countRemaining))
return int(atomic.LoadInt64(&fc.countTracked))
}

func (fc *flowController) bytes() int {
return int(atomic.LoadInt64(&fc.bytesTracked))
}
27 changes: 19 additions & 8 deletions bigquery/storage/managedwriter/flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"errors"
"fmt"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -125,9 +124,6 @@ func TestFlowControllerSaturation(t *testing.T) {
},
} {
fc := newFlowController(maxCount, maxSize)
// Atomically track flow controller state.
// The flowController itself tracks count.
var curSize int64
success := errors.New("")
// Time out if wantSize or wantCount is never reached.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand Down Expand Up @@ -155,15 +151,15 @@ func TestFlowControllerSaturation(t *testing.T) {
if c == test.wantCount {
hitCount = true
}
s := atomic.AddInt64(&curSize, int64(test.acquireSize))
s := int64(fc.bytes())
if s > test.wantSize {
return fmt.Errorf("size %d exceeds want %d", s, test.wantSize)
}
if s == test.wantSize {
hitSize = true
}
time.Sleep(5 * time.Millisecond) // Let other goroutines make progress.
if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
if fc.bytes() < 0 {
return errors.New("negative size")
}
fc.release(test.acquireSize)
Expand Down Expand Up @@ -212,11 +208,20 @@ func TestFlowControllerUnboundedCount(t *testing.T) {
if !fc.tryAcquire(4) {
t.Error("got false, wanted true")
}
wantBytes := int64(8)
if gotB := int64(fc.bytes()); gotB != wantBytes {
t.Fatalf("got bytes %d, want %d", gotB, wantBytes)
}

// Fail to tryAcquire 3 bytes.
if fc.tryAcquire(3) {
t.Error("got true, wanted false")
}

if gotB := int64(fc.bytes()); gotB != wantBytes {
t.Fatalf("got bytes %d, want %d", gotB, wantBytes)
}

}

func TestFlowControllerUnboundedCount2(t *testing.T) {
Expand All @@ -227,14 +232,20 @@ func TestFlowControllerUnboundedCount2(t *testing.T) {
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}
wantBytes := int64(0)
if gotB := int64(fc.bytes()); gotB != wantBytes {
t.Fatalf("got bytes %d, want %d", gotB, wantBytes)
}
fc.release(1)
fc.release(1)
fc.release(1)
wantCount := int64(-2)
c := int64(fc.count())
if c != wantCount {
if c := int64(fc.count()); c != wantCount {
t.Fatalf("got count %d, want %d", c, wantCount)
}
if gotB := int64(fc.bytes()); gotB != wantBytes {
t.Fatalf("got bytes %d, want %d", gotB, wantBytes)
}
}

func TestFlowControllerUnboundedBytes(t *testing.T) {
Expand Down

0 comments on commit 6f1735f

Please sign in to comment.