Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigquery/storage/managedwriter): fix option propagation #7669

Merged
merged 4 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 4 additions & 18 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
}

// No existing pool available, create one for the location and add to shared pools.
pool, err := c.createPool(ctx, loc, nil, streamFunc)
pool, err := c.createPool(ctx, loc, streamFunc)
if err != nil {
return nil, err
}
Expand All @@ -227,7 +227,7 @@ func (c *Client) resolvePool(ctx context.Context, settings *streamSettings, stre
}

// createPool builds a connectionPool.
func (c *Client) createPool(ctx context.Context, location string, settings *streamSettings, streamFunc streamClientFunc) (*connectionPool, error) {
func (c *Client) createPool(ctx context.Context, location string, streamFunc streamClientFunc) (*connectionPool, error) {
cCtx, cancel := context.WithCancel(ctx)

if c.cfg == nil {
Expand All @@ -238,29 +238,15 @@ func (c *Client) createPool(ctx context.Context, location string, settings *stre
// add location header to the retained pool context.
cCtx = metadata.AppendToOutgoingContext(ctx, "x-goog-request-params", fmt.Sprintf("write_location=%s", location))
}
fcRequests := c.cfg.defaultInflightRequests
fcBytes := c.cfg.defaultInflightBytes
arOpts := c.cfg.defaultAppendRowsCallOptions
if settings != nil {
if settings.MaxInflightRequests > 0 {
fcRequests = settings.MaxInflightRequests
}
if settings.MaxInflightBytes > 0 {
fcBytes = settings.MaxInflightBytes
}
for _, o := range settings.appendCallOptions {
arOpts = append(arOpts, o)
}
}

pool := &connectionPool{
id: newUUID(poolIDPrefix),
location: location,
ctx: cCtx,
cancel: cancel,
open: createOpenF(ctx, streamFunc),
callOptions: arOpts,
baseFlowController: newFlowController(fcRequests, fcBytes),
callOptions: c.cfg.defaultAppendRowsCallOptions,
baseFlowController: newFlowController(c.cfg.defaultInflightRequests, c.cfg.defaultInflightBytes),
}
router := newSharedRouter(c.cfg.useMultiplex, c.cfg.maxMultiplexPoolSize)
if err := pool.activateRouter(router); err != nil {
Expand Down
63 changes: 35 additions & 28 deletions bigquery/storage/managedwriter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCreatePool_Location(t *testing.T) {
c := &Client{
cfg: &writerClientConfig{},
}
pool, err := c.createPool(context.Background(), "foo", nil, nil)
pool, err := c.createPool(context.Background(), "foo", nil)
if err != nil {
t.Fatalf("createPool: %v", err)
}
Expand Down Expand Up @@ -86,18 +86,14 @@ func TestCreatePool_Location(t *testing.T) {
// of global configuration and per-writer configuration.
func TestCreatePool(t *testing.T) {
testCases := []struct {
desc string
cfg *writerClientConfig
settings *streamSettings
wantMaxBytes int
wantMaxRequests int
wantCallOptions int
wantErr bool
desc string
cfg *writerClientConfig
settings *streamSettings
wantMaxBytes int
wantMaxRequests int
wantCallOptions int
wantPoolCallOptions int
}{
{
desc: "no config",
wantErr: true,
},
{
desc: "cfg, no settings",
cfg: &writerClientConfig{
Expand Down Expand Up @@ -130,9 +126,9 @@ func TestCreatePool(t *testing.T) {
MaxInflightRequests: 99,
MaxInflightBytes: 1024,
},
wantMaxBytes: 1024,
wantMaxRequests: 99,
wantCallOptions: 1,
wantMaxBytes: 1024,
wantMaxRequests: 99,
wantPoolCallOptions: 1,
},
{
desc: "merge defaults and settings",
Expand All @@ -145,36 +141,47 @@ func TestCreatePool(t *testing.T) {
MaxInflightBytes: 1024,
appendCallOptions: []gax.CallOption{gax.WithPath("foo")},
},
wantMaxBytes: 1024,
wantMaxRequests: 123,
wantCallOptions: 2,
wantMaxBytes: 1024,
wantMaxRequests: 123,
wantCallOptions: 1,
wantPoolCallOptions: 1,
},
}

for _, tc := range testCases {
c := &Client{
cfg: tc.cfg,
}
got, err := c.createPool(context.Background(), "", tc.settings, nil)
pool, err := c.createPool(context.Background(), "", nil)
if err != nil {
if !tc.wantErr {
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
}
t.Errorf("case %q: createPool errored unexpectedly: %v", tc.desc, err)
continue
}
if err == nil && tc.wantErr {
t.Errorf("case %q: expected createPool to error but it did not", tc.desc)
continue
writer := &ManagedStream{
id: "foo",
streamSettings: tc.settings,
}
if err = pool.addWriter(writer); err != nil {
t.Errorf("case %q: addWriter: %v", tc.desc, err)
}
pw := newPendingWrite(context.Background(), writer, nil, nil, "", "")
gotConn, err := pool.selectConn(pw)
if err != nil {
t.Errorf("case %q: selectConn: %v", tc.desc, err)
}

// too many go-cmp overrides needed to quickly diff here, look at the interesting fields explicitly.
if gotVal := got.baseFlowController.maxInsertBytes; gotVal != tc.wantMaxBytes {
if gotVal := gotConn.fc.maxInsertBytes; gotVal != tc.wantMaxBytes {
t.Errorf("case %q: flowController maxInsertBytes mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxBytes)
}
if gotVal := got.baseFlowController.maxInsertCount; gotVal != tc.wantMaxRequests {
if gotVal := gotConn.fc.maxInsertCount; gotVal != tc.wantMaxRequests {
t.Errorf("case %q: flowController maxInsertCount mismatch, got %d want %d", tc.desc, gotVal, tc.wantMaxRequests)
}
if gotVal := len(got.callOptions); gotVal != tc.wantCallOptions {
if gotVal := len(gotConn.callOptions); gotVal != tc.wantCallOptions {
t.Errorf("case %q: calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantCallOptions)
}
if gotVal := len(pool.callOptions); gotVal != tc.wantPoolCallOptions {
t.Errorf("case %q: POOL calloption count mismatch, got %d want %d", tc.desc, gotVal, tc.wantPoolCallOptions)
}
}
}
48 changes: 38 additions & 10 deletions bigquery/storage/managedwriter/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ type connectionPool struct {
// connection. Opening the connection is a stateless operation.
open func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

// We specify one set of calloptions for the pool.
// All connections in the pool open with the same call options.
// We specify default calloptions for the pool.
// Explicit connections may have their own calloptions as well.
callOptions []gax.CallOption

router poolRouter // poolManager makes the decisions about connections and routing.
Expand Down Expand Up @@ -119,6 +119,16 @@ func (pool *connectionPool) removeWriter(writer *ManagedStream) error {
return detachErr
}

func (cp *connectionPool) mergeCallOptions(co *connection) []gax.CallOption {
if co == nil {
return cp.callOptions
}
var mergedOpts []gax.CallOption
mergedOpts = append(mergedOpts, cp.callOptions...)
mergedOpts = append(mergedOpts, co.callOptions...)
return mergedOpts
}

// openWithRetry establishes a new bidi stream and channel pair. It is used by connection objects
// when (re)opening the network connection to the backend.
//
Expand All @@ -127,7 +137,7 @@ func (cp *connectionPool) openWithRetry(co *connection) (storagepb.BigQueryWrite
r := &unaryRetryer{}
for {
recordStat(cp.ctx, AppendClientOpenCount, 1)
arc, err := cp.open(cp.callOptions...)
arc, err := cp.open(cp.mergeCallOptions(co)...)
if err != nil {
bo, shouldRetry := r.Retry(err)
if shouldRetry {
Expand Down Expand Up @@ -172,9 +182,10 @@ type connection struct {
id string
pool *connectionPool // each connection retains a reference to its owning pool.

fc *flowController // each connection has it's own flow controller.
ctx context.Context // retained context for maintaining the connection, derived from the owning pool.
cancel context.CancelFunc
fc *flowController // each connection has it's own flow controller.
callOptions []gax.CallOption // custom calloptions for this connection.
ctx context.Context // retained context for maintaining the connection, derived from the owning pool.
cancel context.CancelFunc

retry *statelessRetryer
optimizer sendOptimizer
Expand All @@ -197,16 +208,32 @@ const (
verboseConnectionMode connectionMode = "VERBOSE"
)

func newConnection(pool *connectionPool, mode connectionMode) *connection {
func newConnection(pool *connectionPool, mode connectionMode, settings *streamSettings) *connection {
if pool == nil {
return nil
}
// create and retain a cancellable context.
connCtx, cancel := context.WithCancel(pool.ctx)
fc := newFlowController(0, 0)
if pool != nil {
fc = copyFlowController(pool.baseFlowController)

// Resolve local overrides for flow control and call options
fcRequests := 0
fcBytes := 0
var opts []gax.CallOption

if pool.baseFlowController != nil {
fcRequests = pool.baseFlowController.maxInsertCount
fcBytes = pool.baseFlowController.maxInsertBytes
}
if settings != nil {
if settings.MaxInflightRequests > 0 {
fcRequests = settings.MaxInflightRequests
}
if settings.MaxInflightBytes > 0 {
fcBytes = settings.MaxInflightBytes
}
opts = settings.appendCallOptions
}
fc := newFlowController(fcRequests, fcBytes)
countLimit, byteLimit := computeLoadThresholds(fc)

return &connection{
Expand All @@ -218,6 +245,7 @@ func newConnection(pool *connectionPool, mode connectionMode) *connection {
optimizer: optimizer(mode),
loadBytesThreshold: byteLimit,
loadCountThreshold: countLimit,
callOptions: opts,
}
}

Expand Down
2 changes: 1 addition & 1 deletion bigquery/storage/managedwriter/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestConnectionPool_OpenCallOptionPropagation(t *testing.T) {
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)),
},
}
conn := newConnection(pool, "")
conn := newConnection(pool, "", nil)
pool.openWithRetry(conn)
}

Expand Down
8 changes: 4 additions & 4 deletions bigquery/storage/managedwriter/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error {
defer rtr.mu.Unlock()
rtr.writers[writer.id] = struct{}{}
if rtr.conn == nil {
rtr.conn = newConnection(rtr.pool, rtr.mode)
rtr.conn = newConnection(rtr.pool, rtr.mode, nil)
}
return nil
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (sr *sharedRouter) writerAttach(writer *ManagedStream) error {
if pair := sr.exclusiveConns[writer.id]; pair != nil {
return fmt.Errorf("writer %q already attached", writer.id)
}
sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode)
sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode, writer.streamSettings)
return nil
}

Expand Down Expand Up @@ -242,9 +242,9 @@ func (sr *sharedRouter) orderAndGrowMultiConns() {
return sr.multiConns[i].curLoad() < sr.multiConns[j].curLoad()
})
if len(sr.multiConns) == 0 {
sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode)}
sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}
} else if sr.multiConns[0].isLoaded() && len(sr.multiConns) < sr.maxConns {
sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode)}, sr.multiConns...)
sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}, sr.multiConns...)
}
}

Expand Down