
feat(bigquery/storage/managedwriter): support default value controls #8686

Merged: 14 commits, Oct 24, 2023
33 changes: 11 additions & 22 deletions bigquery/storage/managedwriter/appendresult.go
@@ -166,7 +166,7 @@ type pendingWrite struct {
// likely outcome when processing requests and it allows us to be efficient on send.
// We retain the additional information to build the complete request in the related fields.
req *storagepb.AppendRowsRequest
descVersion *descriptorVersion // schema at time of creation
reqTmpl *versionedTemplate // request template at time of creation
traceID string
writeStreamID string

@@ -188,21 +188,21 @@ type pendingWrite struct {
// to the pending results for later consumption. The provided context is
// embedded in the pending write, as the write may be retried and we want
// to respect the original context for expiry/cancellation etc.
func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, curDescVersion *descriptorVersion, writeStreamID, traceID string) *pendingWrite {
func newPendingWrite(ctx context.Context, src *ManagedStream, req *storagepb.AppendRowsRequest, reqTmpl *versionedTemplate, writeStreamID, traceID string) *pendingWrite {
pw := &pendingWrite{
writer: src,
result: newAppendResult(),
reqCtx: ctx,

req: req,
descVersion: curDescVersion,
req: req, // minimal req, typically just row data
reqTmpl: reqTmpl, // remainder of templated request
writeStreamID: writeStreamID,
traceID: traceID,
}
// Compute the approx size for flow control purposes.
pw.reqSize = proto.Size(pw.req) + len(writeStreamID) + len(traceID)
if pw.descVersion != nil {
pw.reqSize += proto.Size(pw.descVersion.descriptorProto)
if pw.reqTmpl != nil {
pw.reqSize += proto.Size(pw.reqTmpl.tmpl)
}
return pw
}
@@ -221,33 +221,22 @@ func (pw *pendingWrite) markDone(resp *storagepb.AppendRowsResponse, err error)
close(pw.result.ready)
// Cleanup references remaining on the write explicitly.
pw.req = nil
pw.descVersion = nil
pw.reqTmpl = nil
pw.writer = nil
pw.reqCtx = nil
}

func (pw *pendingWrite) constructFullRequest(addTrace bool) *storagepb.AppendRowsRequest {
req := &storagepb.AppendRowsRequest{}
if pw.reqTmpl != nil {
req = proto.Clone(pw.reqTmpl.tmpl).(*storagepb.AppendRowsRequest)
}
if pw.req != nil {
req = proto.Clone(pw.req).(*storagepb.AppendRowsRequest)
proto.Merge(req, pw.req)
}
if addTrace {
req.TraceId = buildTraceID(&streamSettings{TraceID: pw.traceID})
}
req.WriteStream = pw.writeStreamID
if pw.descVersion != nil {
ps := &storagepb.ProtoSchema{
ProtoDescriptor: pw.descVersion.descriptorProto,
}
if pr := req.GetProtoRows(); pr != nil {
pr.WriterSchema = ps
} else {
req.Rows = &storagepb.AppendRowsRequest_ProtoRows{
ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
WriterSchema: ps,
},
}
}
}
return req
}
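The change above replaces schema-only tracking with a request template: the full AppendRowsRequest is now rebuilt by cloning the retained template and merging the minimal per-append request into it. As a minimal standalone sketch of that clone-then-merge pattern (the template contents here are illustrative; only the storagepb types and the proto.Clone/proto.Merge calls mirror the code above):

package main

import (
	"fmt"

	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/descriptorpb"
)

func main() {
	// Template: the slow-moving parts of the request (writer schema, default MVI, ...).
	tmpl := &storagepb.AppendRowsRequest{
		DefaultMissingValueInterpretation: storagepb.AppendRowsRequest_DEFAULT_VALUE,
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: &descriptorpb.DescriptorProto{Name: proto.String("foo")},
				},
			},
		},
	}
	// Minimal per-append request: just the serialized row data.
	minimal := &storagepb.AppendRowsRequest{
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				Rows: &storagepb.ProtoRows{SerializedRows: [][]byte{[]byte("rowbytes")}},
			},
		},
	}
	// Clone the template, then overlay the minimal request. Because both set the
	// same ProtoRows variant, proto.Merge merges the inner messages, keeping the
	// template's WriterSchema while adding the serialized rows.
	full := proto.Clone(tmpl).(*storagepb.AppendRowsRequest)
	proto.Merge(full, minimal)
	fmt.Println(full.GetProtoRows().GetWriterSchema() != nil)            // true
	fmt.Println(len(full.GetProtoRows().GetRows().GetSerializedRows())) // 1
}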
15 changes: 8 additions & 7 deletions bigquery/storage/managedwriter/appendresult_test.go
@@ -132,7 +132,8 @@ func TestPendingWrite(t *testing.T) {
func TestPendingWrite_ConstructFullRequest(t *testing.T) {

testDP := &descriptorpb.DescriptorProto{Name: proto.String("foo")}
testDV := newDescriptorVersion(testDP)
testTmpl := newVersionedTemplate().revise(reviseProtoSchema(testDP))

testEmptyTraceID := buildTraceID(&streamSettings{})

for _, tc := range []struct {
@@ -144,7 +145,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
{
desc: "nil request",
pw: &pendingWrite{
descVersion: testDV,
reqTmpl: testTmpl,
},
want: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
@@ -159,8 +160,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
{
desc: "empty req w/trace",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
descVersion: testDV,
req: &storagepb.AppendRowsRequest{},
reqTmpl: testTmpl,
},
addTrace: true,
want: &storagepb.AppendRowsRequest{
@@ -177,8 +178,8 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
{
desc: "basic req",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
descVersion: testDV,
req: &storagepb.AppendRowsRequest{},
reqTmpl: testTmpl,
},
want: &storagepb.AppendRowsRequest{
Rows: &storagepb.AppendRowsRequest_ProtoRows{
@@ -194,7 +195,7 @@ func TestPendingWrite_ConstructFullRequest(t *testing.T) {
desc: "everything w/trace",
pw: &pendingWrite{
req: &storagepb.AppendRowsRequest{},
descVersion: testDV,
reqTmpl: testTmpl,
traceID: "foo",
writeStreamID: "streamid",
},
1 change: 1 addition & 0 deletions bigquery/storage/managedwriter/client.go
@@ -151,6 +151,7 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
id: newUUID(writerIDPrefix),
c: c,
streamSettings: defaultStreamSettings(),
curTemplate: newVersionedTemplate(),
}
// apply writer options.
for _, opt := range opts {
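The writer now starts from an empty request template instead of a descriptor version. The PR's versionedTemplate type itself is not shown in this diff; the following is a hypothetical reconstruction inferred from the uses that are visible here (reqTmpl.tmpl, curTemplate.versionTime, Compatible, revise), so the field names and the Compatible comparison are assumptions rather than the PR's actual definition:

package managedwriter

import (
	"time"

	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"google.golang.org/protobuf/proto"
)

// versionedTemplate (hypothetical sketch): the templated portion of an
// AppendRowsRequest plus a timestamp used to compare template freshness.
type versionedTemplate struct {
	versionTime time.Time
	tmpl        *storagepb.AppendRowsRequest
}

func newVersionedTemplate() *versionedTemplate {
	return &versionedTemplate{
		versionTime: time.Now(),
		tmpl:        &storagepb.AppendRowsRequest{},
	}
}

// Compatible reports whether two templates describe the same request shape.
// Comparing the serialized templates is one possible implementation.
func (vt *versionedTemplate) Compatible(other *versionedTemplate) bool {
	if vt == nil || other == nil {
		return vt == other
	}
	return proto.Equal(vt.tmpl, other.tmpl)
}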
18 changes: 16 additions & 2 deletions bigquery/storage/managedwriter/connection.go
@@ -376,8 +376,22 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
// Additionally, we check multiplex status as schema changes for explicit streams
// require reconnect, whereas multiplex does not.
forceReconnect := false
if pw.writer != nil && pw.descVersion != nil && pw.descVersion.isNewer(pw.writer.curDescVersion) {
pw.writer.curDescVersion = pw.descVersion
promoted := false
if pw.writer != nil && pw.reqTmpl != nil {
if !pw.reqTmpl.Compatible(pw.writer.curTemplate) {
if pw.writer.curTemplate == nil {
// promote because there's no current template
pw.writer.curTemplate = pw.reqTmpl
promoted = true
} else {
if pw.writer.curTemplate.versionTime.Before(pw.reqTmpl.versionTime) {
pw.writer.curTemplate = pw.reqTmpl
promoted = true
}
}
}
}
if promoted {
if co.optimizer == nil {
forceReconnect = true
} else {
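The block above decides whether a pending write's template should be promoted to become the writer's current template: an incompatible template wins only if there is no current template, or if it carries a newer versionTime. Distilled into a predicate (a sketch using the hypothetical versionedTemplate above; the real code also updates the writer and handles reconnects):

// shouldPromote mirrors the promotion decision in lockingAppend above.
func shouldPromote(cur, incoming *versionedTemplate) bool {
	if incoming == nil || incoming.Compatible(cur) {
		return false // nothing to do when templates are compatible
	}
	if cur == nil {
		return true // no current template, adopt the incoming one
	}
	return cur.versionTime.Before(incoming.versionTime) // only promote newer templates
}

Whether a promotion then forces a reconnect depends on multiplexing: without an optimizer (explicit streams) the connection reconnects, while a multiplexed connection lets the optimizer swap the schema without tearing down the stream.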
95 changes: 94 additions & 1 deletion bigquery/storage/managedwriter/integration_test.go
@@ -259,7 +259,9 @@ func TestIntegration_ManagedWriter(t *testing.T) {
t.Run("TestLargeInsertWithRetry", func(t *testing.T) {
testLargeInsertWithRetry(ctx, t, mwClient, bqClient, dataset)
})

t.Run("DefaultValueHandling", func(t *testing.T) {
testDefaultValueHandling(ctx, t, mwClient, bqClient, dataset)
})
})
}

@@ -1262,6 +1264,97 @@ func testSchemaEvolution(ctx context.Context, t *testing.T, mwClient *Client, bq
)
}

func testDefaultValueHandling(ctx context.Context, t *testing.T, mwClient *Client, bqClient *bigquery.Client, dataset *bigquery.Dataset, opts ...WriterOption) {
testTable := dataset.Table(tableIDs.New())
if err := testTable.Create(ctx, &bigquery.TableMetadata{Schema: testdata.DefaultValueSchema}); err != nil {
t.Fatalf("failed to create test table %s: %v", testTable.FullyQualifiedName(), err)
}

m := &testdata.DefaultValuesPartialSchema{
// We only populate the Id field; the remaining columns rely on default/missing value handling.
Id: proto.String("someval"),
}
var data []byte
var err error
if data, err = proto.Marshal(m); err != nil {
t.Fatalf("failed to marshal test row data")
}
descriptorProto := protodesc.ToDescriptorProto(m.ProtoReflect().Descriptor())

// set up a new stream.
opts = append(opts, WithDestinationTable(TableParentFromParts(testTable.ProjectID, testTable.DatasetID, testTable.TableID)))
opts = append(opts, WithSchemaDescriptor(descriptorProto))
ms, err := mwClient.NewManagedStream(ctx, opts...)
if err != nil {
t.Fatalf("NewManagedStream: %v", err)
}
validateTableConstraints(ctx, t, bqClient, testTable, "before send",
withExactRowCount(0))

var result *AppendResult

// Send one row, verify default values were set as expected.

result, err = ms.AppendRows(ctx, [][]byte{data})
if err != nil {
t.Errorf("append failed: %v", err)
}
// Wait for the result to indicate ready, then validate.
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error on append: %v", err)
}

validateTableConstraints(ctx, t, bqClient, testTable, "after first row",
withExactRowCount(1),
withNonNullCount("id", 1),
withNullCount("strcol_withdef", 1),
withNullCount("intcol_withdef", 1),
withNullCount("otherstr_withdef", 0)) // not part of partial schema

// Change the default MVI to DEFAULT_VALUE.
// We expect unpopulated fields in the partial schema to now get their column default values rather than nulls.
// Fields outside the partial schema continue to get default values.
result, err = ms.AppendRows(ctx, [][]byte{data}, UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE))
if err != nil {
t.Errorf("append failed: %v", err)
}
// Wait for the result to indicate ready, then validate.
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error on append: %v", err)
}

validateTableConstraints(ctx, t, bqClient, testTable, "after second row (default mvi is DEFAULT_VALUE)",
withExactRowCount(2),
withNullCount("strcol_withdef", 1), // doesn't increment, as it gets default value
withNullCount("intcol_withdef", 1)) // doesn't increment, as it gets default value

// Set a per-column MVI so strcol_withdef is interpreted as NULL rather than its default value.
result, err = ms.AppendRows(ctx, [][]byte{data},
UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
"strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE,
}))
if err != nil {
t.Errorf("append failed: %v", err)
}
// Wait for the result to indicate ready, then validate.
_, err = result.GetResult(ctx)
if err != nil {
t.Errorf("error on append: %v", err)
}

validateTableConstraints(ctx, t, bqClient, testTable, "after third row (explicit column mvi)",
withExactRowCount(3),
withNullCount("strcol_withdef", 2), // increments as it's null for this column
withNullCount("intcol_withdef", 1), // doesn't increment, still default value
withNonNullCount("otherstr_withdef", 3), // not part of descriptor, always gets default value
withNullCount("otherstr", 3), // not part of descriptor, always gets null
withNullCount("strcol", 3), // no default value defined, always gets null
withNullCount("intcol", 3), // no default value defined, always gets null
)
}
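Condensed into a caller-side sketch, the controls this test exercises look roughly like the following; the option, client, and enum names are the ones used in the test above, while the table path, descriptor, and row payloads are placeholders:

package main

import (
	"context"

	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/bigquery/storage/managedwriter"
	"google.golang.org/protobuf/types/descriptorpb"
)

// appendWithDefaultControls shows the two append-time knobs: a stream-wide
// default missing value interpretation and per-column overrides.
func appendWithDefaultControls(ctx context.Context, c *managedwriter.Client, dp *descriptorpb.DescriptorProto, rows [][]byte) error {
	ms, err := c.NewManagedStream(ctx,
		managedwriter.WithDestinationTable("projects/p/datasets/d/tables/t"),
		managedwriter.WithSchemaDescriptor(dp), // partial schema: omitted columns rely on table defaults or NULLs
	)
	if err != nil {
		return err
	}
	// Stream-wide default: unpopulated columns take their configured default values.
	res, err := ms.AppendRows(ctx, rows,
		managedwriter.UpdateDefaultMissingValueInterpretation(storagepb.AppendRowsRequest_DEFAULT_VALUE))
	if err != nil {
		return err
	}
	if _, err := res.GetResult(ctx); err != nil {
		return err
	}
	// Per-column override: strcol_withdef is written as NULL, other columns keep the default behavior.
	res, err = ms.AppendRows(ctx, rows,
		managedwriter.UpdateMissingValueInterpretations(map[string]storagepb.AppendRowsRequest_MissingValueInterpretation{
			"strcol_withdef": storagepb.AppendRowsRequest_NULL_VALUE,
		}))
	if err != nil {
		return err
	}
	_, err = res.GetResult(ctx)
	return err
}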

func TestIntegration_DetectProjectID(t *testing.T) {
ctx := context.Background()
testCreds := testutil.Credentials(ctx)
17 changes: 12 additions & 5 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -78,9 +78,9 @@ type ManagedStream struct {

streamSettings *streamSettings
// retains the current descriptor for the stream.
curDescVersion *descriptorVersion
c *Client
retry *statelessRetryer
curTemplate *versionedTemplate
c *Client
retry *statelessRetryer

// writer state
mu sync.Mutex
@@ -298,13 +298,20 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
return nil, err
}
// Ensure we build the request and pending write with a consistent schema version.
curSchemaVersion := ms.curDescVersion
curTemplate := ms.curTemplate
req := ms.buildRequest(data)
pw := newPendingWrite(ctx, ms, req, curSchemaVersion, ms.streamSettings.streamID, ms.streamSettings.TraceID)
pw := newPendingWrite(ctx, ms, req, curTemplate, ms.streamSettings.streamID, ms.streamSettings.TraceID)
// apply AppendOption opts
for _, opt := range opts {
opt(pw)
}
// Post-request fixup after options are applied.
if pw.reqTmpl != nil {
if pw.reqTmpl.tmpl != nil {
// MVIs must be set on each request, but _default_ MVIs persist across the stream lifetime. Sigh.
pw.req.MissingValueInterpretations = pw.reqTmpl.tmpl.GetMissingValueInterpretations()
}
}

// Call the underlying append. The stream has its own retained context and will surface expiry on
// its own, but we also need to respect any deadline for the provided context.
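The fixup above exists because per-request MissingValueInterpretations must travel on every AppendRowsRequest, while the default interpretation lives in the template for the life of the stream. The AppendOption functions applied in the opt(pw) loop are not shown in this diff; as a hypothetical, in-package sketch of how such an option could revise the pending write's template (the revise helper and its signature are assumptions, see the sketch after the managed_stream_test.go hunks below):

// updateDefaultMVISketch is an illustrative append option, not the PR's
// implementation. It bumps the pending write's template to a new version so
// connections can detect and promote the change (see connection.go above).
func updateDefaultMVISketch(mvi storagepb.AppendRowsRequest_MissingValueInterpretation) func(pw *pendingWrite) {
	return func(pw *pendingWrite) {
		if pw.reqTmpl == nil {
			pw.reqTmpl = newVersionedTemplate()
		}
		pw.reqTmpl = pw.reqTmpl.revise(func(req *storagepb.AppendRowsRequest) {
			req.DefaultMissingValueInterpretation = mvi
		})
	}
}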
18 changes: 9 additions & 9 deletions bigquery/storage/managedwriter/managed_stream_test.go
@@ -110,7 +110,7 @@ func TestManagedStream_RequestOptimization(t *testing.T) {
}
ms.streamSettings.streamID = "FOO"
ms.streamSettings.TraceID = "TRACE"
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))

fakeData := [][]byte{
[]byte("foo"),
@@ -191,7 +191,7 @@ func TestManagedStream_FlowControllerFailure(t *testing.T) {
router.conn.fc = newFlowController(1, 0)
router.conn.fc.acquire(ctx, 0)

ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))

fakeData := [][]byte{
[]byte("foo"),
@@ -236,7 +236,7 @@ func TestManagedStream_AppendWithDeadline(t *testing.T) {
t.Errorf("addWriter: %v", err)
}
conn := router.conn
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))

fakeData := [][]byte{
[]byte("foo"),
@@ -293,7 +293,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) {
ctx: ctx,
streamSettings: defaultStreamSettings(),
}
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter: %v", err)
}
@@ -316,7 +316,7 @@ func TestManagedStream_ContextExpiry(t *testing.T) {
cancel()

// First, append with an invalid context.
pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curDescVersion, "", "")
pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curTemplate, "", "")
err := ms.appendWithRetry(pw)
if err != context.Canceled {
t.Errorf("expected cancelled context error, got: %v", err)
@@ -457,7 +457,7 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) {
ctx: ctx,
streamSettings: defaultStreamSettings(),
}
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter: %v", err)
}
@@ -509,7 +509,7 @@ func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) {
retry: newStatelessRetryer(),
}
ms.retry.maxAttempts = 4
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter: %v", err)
}
@@ -575,7 +575,7 @@ func TestManagedWriter_CancellationDuringRetry(t *testing.T) {
streamSettings: defaultStreamSettings(),
retry: newStatelessRetryer(),
}
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter: %v", err)
}
@@ -624,7 +624,7 @@ func TestManagedStream_Closure(t *testing.T) {
streamSettings: defaultStreamSettings(),
}
ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
ms.curDescVersion = newDescriptorVersion(&descriptorpb.DescriptorProto{})
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
if err := pool.addWriter(ms); err != nil {
t.Errorf("addWriter A: %v", err)
}
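The tests above now seed each writer with newVersionedTemplate().revise(reviseProtoSchema(...)). Those helpers are not part of this diff either; continuing the hypothetical versionedTemplate sketch from the client.go section (same package and imports, plus google.golang.org/protobuf/types/descriptorpb), they might look like this, with names and signatures guessed from the call sites:

// templateRevisionF mutates the templated request; revise applies revisions to a
// copy of the template and stamps a new versionTime for staleness comparisons.
type templateRevisionF func(req *storagepb.AppendRowsRequest)

func reviseProtoSchema(dp *descriptorpb.DescriptorProto) templateRevisionF {
	return func(req *storagepb.AppendRowsRequest) {
		req.Rows = &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				WriterSchema: &storagepb.ProtoSchema{ProtoDescriptor: dp},
			},
		}
	}
}

func (vt *versionedTemplate) revise(revisions ...templateRevisionF) *versionedTemplate {
	out := &versionedTemplate{
		versionTime: time.Now(),
		tmpl:        proto.Clone(vt.tmpl).(*storagepb.AppendRowsRequest),
	}
	for _, r := range revisions {
		r(out.tmpl)
	}
	return out
}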