Skip to content

Commit

Permalink
refactor(bigquery/storage/managedwriter): add custom client options
Browse files Browse the repository at this point in the history
This PR revisits the expected behavior for config knobs in the client.
Previously, all configuration was done when instantiating a writer (aka
a ManagedStream).  There are some chicken-and-egg problems related to
multiplex settings, as connection options are decoupled from individual
writers.

This PR adds the following unexported custom client options (but does
not yet use them for anything):

* enableMultiplex
* defaultInflightRequests
* defaultInflightBytes
* defaultAppendRowsCallOption

This PR also removes the still-unexported enableMultiplex from
the set of defined WriterOption options which can be passed when
instantiating individual writes.

Towards: googleapis#7103
  • Loading branch information
shollyman committed Feb 23, 2023
1 parent 14771b1 commit ce6d307
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 19 deletions.
98 changes: 92 additions & 6 deletions bigquery/storage/managedwriter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,102 @@ package managedwriter

import (
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// encapsulates custom client-level config settings.
type writerClientConfig struct {
useMultiplex bool
defaultInflightRequests int
defaultInflightBytes int
defaultAppendRowsCallOptions []gax.CallOption
}

func newWriterClientConfig(opts ...option.ClientOption) *writerClientConfig {
conf := &writerClientConfig{}
for _, opt := range opts {
if wOpt, ok := opt.(writerClientOption); ok {
wOpt.ApplyWriterOpt(conf)
}
}
return conf
}

type writerClientOption interface {
option.ClientOption
ApplyWriterOpt(*writerClientConfig)
}

// enableMultiplex enables multiplex behavior in the client.
//
// TODO: export this as part of the multiplex feature launch.
func enableMultiplex(enable bool) option.ClientOption {
return &enableMultiplexSetting{useMultiplex: enable}
}

type enableMultiplexSetting struct {
internaloption.EmbeddableAdapter
useMultiplex bool
}

func (s *enableMultiplexSetting) ApplyWriterOpt(c *writerClientConfig) {
c.useMultiplex = s.useMultiplex
}

// defaultMaxInflightRequests sets the default flow controller limit for requests for
// all AppendRows connections created by this client.
//
// TODO: export this as part of the multiplex feature launch.
func defaultMaxInflightRequests(n int) option.ClientOption {
return &defaultInflightRequestsSetting{maxRequests: n}
}

type defaultInflightRequestsSetting struct {
internaloption.EmbeddableAdapter
maxRequests int
}

func (s *defaultInflightRequestsSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultInflightRequests = s.maxRequests
}

// defaultMaxInflightBytes sets the default flow controller limit for bytes for
// all AppendRows connections created by this client.
//
// TODO: export this as part of the multiplex feature launch.
func defaultMaxInflightBytes(n int) option.ClientOption {
return &defaultInflightBytesSetting{maxBytes: n}
}

type defaultInflightBytesSetting struct {
internaloption.EmbeddableAdapter
maxBytes int
}

func (s *defaultInflightBytesSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultInflightBytes = s.maxBytes
}

// defaultAppendRowsCallOptions sets a gax.CallOption passed when opening
// the AppendRows bidi connection.
//
// TODO: export this as part of the multiplex feature launch.
func defaultAppendRowsCallOption(o gax.CallOption) option.ClientOption {
return &defaultAppendRowsCallOptionSetting{opt: o}
}

type defaultAppendRowsCallOptionSetting struct {
internaloption.EmbeddableAdapter
opt gax.CallOption
}

func (s *defaultAppendRowsCallOptionSetting) ApplyWriterOpt(c *writerClientConfig) {
c.defaultAppendRowsCallOptions = append(c.defaultAppendRowsCallOptions, s.opt)
}

// WriterOption are variadic options used to configure a ManagedStream instance.
type WriterOption func(*ManagedStream)

Expand Down Expand Up @@ -111,12 +203,6 @@ func EnableWriteRetries(enable bool) WriterOption {
}
}

func enableMultiplex(enable bool) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.multiplex = enable
}
}

// AppendOption are options that can be passed when appending data with a managed stream instance.
type AppendOption func(*pendingWrite)

Expand Down
89 changes: 76 additions & 13 deletions bigquery/storage/managedwriter/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,85 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc"
)

func TestCustomClientOptions(t *testing.T) {
testCases := []struct {
desc string
options []option.ClientOption
want *writerClientConfig
}{
{
desc: "no options",
want: &writerClientConfig{},
},
{
desc: "multiplex",
options: []option.ClientOption{
enableMultiplex(true),
},
want: &writerClientConfig{
useMultiplex: true,
},
},
{
desc: "default requests",
options: []option.ClientOption{
defaultMaxInflightRequests(42),
},
want: &writerClientConfig{
defaultInflightRequests: 42,
},
},
{
desc: "default bytes",
options: []option.ClientOption{
defaultMaxInflightBytes(123),
},
want: &writerClientConfig{
defaultInflightBytes: 123,
},
},
{
desc: "default call options",
options: []option.ClientOption{
defaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
},
want: &writerClientConfig{
defaultAppendRowsCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
},
},
{
desc: "multiple options",
options: []option.ClientOption{
enableMultiplex(true),
defaultMaxInflightRequests(99),
defaultMaxInflightBytes(12345),
defaultAppendRowsCallOption(gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1))),
},
want: &writerClientConfig{
useMultiplex: true,
defaultInflightRequests: 99,
defaultInflightBytes: 12345,
defaultAppendRowsCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
},
},
}
for _, tc := range testCases {
gotCfg := newWriterClientConfig(tc.options...)

if diff := cmp.Diff(gotCfg, tc.want, cmp.AllowUnexported(writerClientConfig{})); diff != "" {
t.Errorf("diff in case (%s):\n%v", tc.desc, diff)
}
}
}

func TestWriterOptions(t *testing.T) {

testCases := []struct {
Expand Down Expand Up @@ -133,25 +209,13 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "enableMultiplex",
options: []WriterOption{enableMultiplex(true)},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.multiplex = true
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
WithType(PendingStream),
WithMaxInflightBytes(5),
WithTraceID("traceid"),
EnableWriteRetries(true),
enableMultiplex(true),
},
want: func() *ManagedStream {
ms := &ManagedStream{
Expand All @@ -160,7 +224,6 @@ func TestWriterOptions(t *testing.T) {
ms.streamSettings.MaxInflightBytes = 5
ms.streamSettings.streamType = PendingStream
ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s traceid", internal.Version)
ms.streamSettings.multiplex = true
ms.retry = newStatelessRetryer()
return ms
}(),
Expand Down

0 comments on commit ce6d307

Please sign in to comment.