Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): add custom client options (#…
Browse files Browse the repository at this point in the history
…7490)

* refactor(bigquery/storage/managedwriter): add custom client options

This PR revisits the expected behavior for config knobs in the client.
Previously, all configuration was done when instantiating a writer (aka
a ManagedStream).  There are some chicken-and-egg problems related to
multiplex settings, as connection options are decoupled from individual
writers.

This PR adds the following unexported custom client options (but does
not yet use them for anything):

* enableMultiplex
* defaultInflightRequests
* defaultInflightBytes
* defaultAppendRowsCallOption

This PR also removes the still-unexported enableMultiplex from
the set of defined WriterOption options which can be passed when
instantiating individual writes.

Additionally, this refactor includes a correctness fix for the traceID option that was causing the traceID to duplicate the initial token.

Towards: #7103
  • Loading branch information
shollyman committed Feb 27, 2023
1 parent f55bfe0 commit f425a60
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 35 deletions.
100 changes: 93 additions & 7 deletions bigquery/storage/managedwriter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,102 @@ package managedwriter

import (
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// encapsulates custom client-level config settings.
type writerClientConfig struct {
useMultiplex bool
defaultInflightRequests int
defaultInflightBytes int
defaultAppendRowsCallOptions []gax.CallOption
}

func newWriterClientConfig(opts ...option.ClientOption) *writerClientConfig {
conf := &writerClientConfig{}
for _, opt := range opts {
if wOpt, ok := opt.(writerClientOption); ok {
wOpt.ApplyWriterOpt(conf)
}
}
return conf
}

type writerClientOption interface {
option.ClientOption
ApplyWriterOpt(*writerClientConfig)
}

// enableMultiplex enables multiplex behavior in the client.
//
// TODO: export this as part of the multiplex feature launch.
func enableMultiplex(enable bool) option.ClientOption {
return &enableMultiplexSetting{useMultiplex: enable}
}

type enableMultiplexSetting struct {
internaloption.EmbeddableAdapter
useMultiplex bool
}

func (s *enableMultiplexSetting) ApplyWriterOpt(c *writerClientConfig) {
c.useMultiplex = s.useMultiplex
}

// defaultMaxInflightRequests sets the default flow controller limit for requests for
// all AppendRows connections created by this client.
//
// TODO: export this as part of the multiplex feature launch.
func defaultMaxInflightRequests(n int) option.ClientOption {
return &defaultInflightRequestsSetting{maxRequests: n}
}

type defaultInflightRequestsSetting struct {
internaloption.EmbeddableAdapter
maxRequests int
}

func (s *defaultInflightRequestsSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultInflightRequests = s.maxRequests
}

// defaultMaxInflightBytes sets the default flow controller limit for bytes for
// all AppendRows connections created by this client.
//
// TODO: export this as part of the multiplex feature launch.
func defaultMaxInflightBytes(n int) option.ClientOption {
return &defaultInflightBytesSetting{maxBytes: n}
}

type defaultInflightBytesSetting struct {
internaloption.EmbeddableAdapter
maxBytes int
}

func (s *defaultInflightBytesSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultInflightBytes = s.maxBytes
}

// defaultAppendRowsCallOptions sets a gax.CallOption passed when opening
// the AppendRows bidi connection.
//
// TODO: export this as part of the multiplex feature launch.
func defaultAppendRowsCallOption(o gax.CallOption) option.ClientOption {
return &defaultAppendRowsCallOptionSetting{opt: o}
}

type defaultAppendRowsCallOptionSetting struct {
internaloption.EmbeddableAdapter
opt gax.CallOption
}

func (s *defaultAppendRowsCallOptionSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultAppendRowsCallOptions = append(c.defaultAppendRowsCallOptions, s.opt)
}

// WriterOption are variadic options used to configure a ManagedStream instance.
type WriterOption func(*ManagedStream)

Expand Down Expand Up @@ -70,7 +162,7 @@ func WithMaxInflightBytes(n int) WriterOption {
// This is generally for diagnostic purposes only.
func WithTraceID(traceID string) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.TraceID = buildTraceID(traceID)
ms.streamSettings.TraceID = traceID
}
}

Expand Down Expand Up @@ -111,12 +203,6 @@ func EnableWriteRetries(enable bool) WriterOption {
}
}

func enableMultiplex(enable bool) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.multiplex = enable
}
}

// AppendOption are options that can be passed when appending data with a managed stream instance.
type AppendOption func(*pendingWrite)

Expand Down
106 changes: 78 additions & 28 deletions bigquery/storage/managedwriter/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,91 @@
package managedwriter

import (
"fmt"
"sync"
"testing"

"cloud.google.com/go/bigquery/internal"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc"
)

func TestCustomClientOptions(t *testing.T) {
testCases := []struct {
desc string
options []option.ClientOption
want *writerClientConfig
}{
{
desc: "no options",
want: &writerClientConfig{},
},
{
desc: "multiplex",
options: []option.ClientOption{
enableMultiplex(true),
},
want: &writerClientConfig{
useMultiplex: true,
},
},
{
desc: "default requests",
options: []option.ClientOption{
defaultMaxInflightRequests(42),
},
want: &writerClientConfig{
defaultInflightRequests: 42,
},
},
{
desc: "default bytes",
options: []option.ClientOption{
defaultMaxInflightBytes(123),
},
want: &writerClientConfig{
defaultInflightBytes: 123,
},
},
{
desc: "default call options",
options: []option.ClientOption{
defaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
},
want: &writerClientConfig{
defaultAppendRowsCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
},
},
{
desc: "multiple options",
options: []option.ClientOption{
enableMultiplex(true),
defaultMaxInflightRequests(99),
defaultMaxInflightBytes(12345),
defaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
},
want: &writerClientConfig{
useMultiplex: true,
defaultInflightRequests: 99,
defaultInflightBytes: 12345,
defaultAppendRowsCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
},
},
}
for _, tc := range testCases {
gotCfg := newWriterClientConfig(tc.options...)

if diff := cmp.Diff(gotCfg, tc.want, cmp.AllowUnexported(writerClientConfig{})); diff != "" {
t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
}
}
}

func TestWriterOptions(t *testing.T) {

testCases := []struct {
Expand Down Expand Up @@ -73,18 +147,7 @@ func TestWriterOptions(t *testing.T) {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s foo", internal.Version)
return ms
}(),
},
{
desc: "WithoutTraceID",
options: []WriterOption{},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s", internal.Version)
ms.streamSettings.TraceID = "foo"
return ms
}(),
},
Expand Down Expand Up @@ -133,34 +196,21 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "enableMultiplex",
options: []WriterOption{enableMultiplex(true)},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.multiplex = true
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
WithType(PendingStream),
WithMaxInflightBytes(5),
WithTraceID("traceid"),
EnableWriteRetries(true),
enableMultiplex(true),
},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.MaxInflightBytes = 5
ms.streamSettings.streamType = PendingStream
ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s traceid", internal.Version)
ms.streamSettings.multiplex = true
ms.streamSettings.TraceID = "traceid"
ms.retry = newStatelessRetryer()
return ms
}(),
Expand Down

0 comments on commit f425a60

Please sign in to comment.