diff --git a/bigquery/storage/managedwriter/managed_stream.go b/bigquery/storage/managedwriter/managed_stream.go index 8ccae000ab5..9989d15db56 100644 --- a/bigquery/storage/managedwriter/managed_stream.go +++ b/bigquery/storage/managedwriter/managed_stream.go @@ -205,16 +205,13 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient if arc != ms.arc && !forceReconnect { return ms.arc, ms.pending, nil } - if arc != ms.arc && forceReconnect && ms.arc != nil { - // In this case, we're forcing a close on the existing stream. - // This is due to either needing to reconnect to satisfy the needs of - // the current request (e.g. to signal a schema change), or because - // a previous request on the stream yielded a transient error and we - // want to reconnect before issuing a subsequent request. - // - // TODO: clean this up once internal issue 205756033 is resolved. + // We need to (re)open a connection. Cleanup previous connection and channel if they are present. + if ms.arc != nil { (*ms.arc).CloseSend() } + if ms.pending != nil { + close(ms.pending) + } ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient) *ms.arc, ms.pending, ms.err = ms.openWithRetry() diff --git a/bigquery/storage/managedwriter/managed_stream_test.go b/bigquery/storage/managedwriter/managed_stream_test.go index fa7fdadae8f..97916f2ab55 100644 --- a/bigquery/storage/managedwriter/managed_stream_test.go +++ b/bigquery/storage/managedwriter/managed_stream_test.go @@ -471,6 +471,52 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) { } } +// Ensures we don't lose track of channels/connections during reconnects. +// https://github.com/googleapis/google-cloud-go/issues/6766 +func TestManagedStream_LeakingReconnect(t *testing.T) { + + ctx := context.Background() + + ms := &ManagedStream{ + ctx: ctx, + streamSettings: defaultStreamSettings(), + fc: newFlowController(10, 0), + open: openTestArc(&testAppendRowsClient{}, + func(req *storagepb.AppendRowsRequest) error { + // Append always reports EOF on send. + return io.EOF + }, nil), + } + ms.schemaDescriptor = &descriptorpb.DescriptorProto{ + Name: proto.String("testDescriptor"), + } + + var chans []chan *pendingWrite + + for i := 0; i < 10; i++ { + _, ch, err := ms.getStream(nil, true) + if err != nil { + t.Fatalf("failed openWithRetry(%d): %v", i, err) + } + chans = append(chans, ch) + } + var closedCount int + for _, ch := range chans { + select { + case _, ok := <-ch: + if !ok { + closedCount = closedCount + 1 + } + case <-time.After(time.Second): + // we blocked, likely indicative that the channel is open. + continue + } + } + if wantClosed := len(chans) - 1; wantClosed != closedCount { + t.Errorf("closed count mismatch, got %d want %d", closedCount, wantClosed) + } +} + // Ensures we're propagating call options as expected. // Background: https://github.com/googleapis/google-cloud-go/issues/6487 func TestOpenCallOptionPropagation(t *testing.T) {