Skip to content

Commit

Permalink
refactor: migrate and and options
Browse files Browse the repository at this point in the history
This PR does more settings consolidation, and updates/adds new
WriterOption options to propagate settings.

In particular, this PR:
* moves the AppendRows call options into streamSettings
* adds a multiplex flag and option to streamSettings
* adds a call function option into streamSettings

This PR also updates managed stream to use the new option(s) as
appropriate, but most of this is unused here and is in preparation
for a larger cutover of functionality related to the new connection
abstractions.

Towards: googleapis#7103
  • Loading branch information
shollyman committed Feb 7, 2023
1 parent c71b090 commit 76dbae4
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 16 deletions.
6 changes: 1 addition & 5 deletions bigquery/storage/managedwriter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/google/uuid"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
"google.golang.org/grpc"
)

// DetectProjectID is a sentinel value that instructs NewClient to detect the
Expand Down Expand Up @@ -112,10 +111,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
c: c,
ctx: ctx,
cancel: cancel,
callOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
open: createOpenF(ctx, streamFunc),
open: createOpenF(ctx, streamFunc),
}

// apply writer options
Expand Down
25 changes: 20 additions & 5 deletions bigquery/storage/managedwriter/managed_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
"github.com/googleapis/gax-go/v2"
"go.opencensus.io/tag"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -85,10 +86,9 @@ type ManagedStream struct {
retry *statelessRetryer

// aspects of the stream client
ctx context.Context // retained context for the stream
cancel context.CancelFunc
callOptions []gax.CallOption // options passed when opening an append client
open func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection
ctx context.Context // retained context for the stream
cancel context.CancelFunc
open func(opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) // how we get a new connection

mu sync.Mutex
arc *storagepb.BigQueryWrite_AppendRowsClient // current stream connection
Expand Down Expand Up @@ -129,6 +129,14 @@ type streamSettings struct {

// retains reference to the target table when resolving settings
destinationTable string

appendCallOptions []gax.CallOption

// enable multiplex?
multiplex bool

// retain a copy of the stream client func.
streamFunc streamClientFunc
}

func defaultStreamSettings() *streamSettings {
Expand All @@ -137,6 +145,9 @@ func defaultStreamSettings() *streamSettings {
MaxInflightRequests: 1000,
MaxInflightBytes: 0,
TraceID: buildTraceID(""),
appendCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(10 * 1024 * 1024)),
},
}
}

Expand Down Expand Up @@ -233,7 +244,11 @@ func (ms *ManagedStream) openWithRetry() (storagepb.BigQueryWrite_AppendRowsClie
r := &unaryRetryer{}
for {
recordStat(ms.ctx, AppendClientOpenCount, 1)
arc, err := ms.open(ms.callOptions...)
var opts []gax.CallOption
if ms.streamSettings != nil {
opts = ms.streamSettings.appendCallOptions
}
arc, err := ms.open(opts...)
bo, shouldRetry := r.Retry(err)
if err != nil && shouldRetry {
recordStat(ms.ctx, AppendClientOpenRetryCount, 1)
Expand Down
6 changes: 4 additions & 2 deletions bigquery/storage/managedwriter/managed_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,10 @@ func TestOpenCallOptionPropagation(t *testing.T) {

ms := &ManagedStream{
ctx: ctx,
callOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)),
streamSettings: &streamSettings{
appendCallOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallRecvMsgSize(99)),
},
},
open: createOpenF(ctx, func(ctx context.Context, opts ...gax.CallOption) (storage.BigQueryWrite_AppendRowsClient, error) {
if len(opts) == 0 {
Expand Down
8 changes: 7 additions & 1 deletion bigquery/storage/managedwriter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func WithDataOrigin(dataOrigin string) WriterOption {
// it opens the underlying append stream.
func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
return func(ms *ManagedStream) {
ms.callOptions = append(ms.callOptions, o)
ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions, o)
}
}

Expand All @@ -111,6 +111,12 @@ func EnableWriteRetries(enable bool) WriterOption {
}
}

func enableMultiplex(enable bool) WriterOption {
return func(ms *ManagedStream) {
ms.streamSettings.multiplex = enable
}
}

// AppendOption are options that can be passed when appending data with a managed stream instance.
type AppendOption func(*pendingWrite)

Expand Down
18 changes: 15 additions & 3 deletions bigquery/storage/managedwriter/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ func TestWriterOptions(t *testing.T) {
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
callOptions: []gax.CallOption{
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)),
},
}
ms.streamSettings.appendCallOptions = append(ms.streamSettings.appendCallOptions,
gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(1)))
return ms
}(),
},
Expand All @@ -134,13 +133,25 @@ func TestWriterOptions(t *testing.T) {
return ms
}(),
},
{
desc: "enableMultiplex",
options: []WriterOption{enableMultiplex(true)},
want: func() *ManagedStream {
ms := &ManagedStream{
streamSettings: defaultStreamSettings(),
}
ms.streamSettings.multiplex = true
return ms
}(),
},
{
desc: "multiple",
options: []WriterOption{
WithType(PendingStream),
WithMaxInflightBytes(5),
WithTraceID("traceid"),
EnableWriteRetries(true),
enableMultiplex(true),
},
want: func() *ManagedStream {
ms := &ManagedStream{
Expand All @@ -149,6 +160,7 @@ func TestWriterOptions(t *testing.T) {
ms.streamSettings.MaxInflightBytes = 5
ms.streamSettings.streamType = PendingStream
ms.streamSettings.TraceID = fmt.Sprintf("go-managedwriter:%s traceid", internal.Version)
ms.streamSettings.multiplex = true
ms.retry = newStatelessRetryer()
return ms
}(),
Expand Down

0 comments on commit 76dbae4

Please sign in to comment.