Skip to content

Commit

Permalink
fix(bigquery/storage/managedwriter): address possible resource leak (#…
Browse files Browse the repository at this point in the history
…6775)

This PR ensures getStream() is responsible for closing the existing connection and pending write channel if present.

Fixes: #6766
  • Loading branch information
shollyman committed Sep 29, 2022
1 parent e15f112 commit 979440b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 8 deletions.
13 changes: 5 additions & 8 deletions bigquery/storage/managedwriter/managed_stream.go
Expand Up @@ -205,16 +205,13 @@ func (ms *ManagedStream) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient
if arc != ms.arc && !forceReconnect {
return ms.arc, ms.pending, nil
}
if arc != ms.arc && forceReconnect && ms.arc != nil {
// In this case, we're forcing a close on the existing stream.
// This is due to either needing to reconnect to satisfy the needs of
// the current request (e.g. to signal a schema change), or because
// a previous request on the stream yielded a transient error and we
// want to reconnect before issuing a subsequent request.
//
// TODO: clean this up once internal issue 205756033 is resolved.
// We need to (re)open a connection. Cleanup previous connection and channel if they are present.
if ms.arc != nil {
(*ms.arc).CloseSend()
}
if ms.pending != nil {
close(ms.pending)
}

ms.arc = new(storagepb.BigQueryWrite_AppendRowsClient)
*ms.arc, ms.pending, ms.err = ms.openWithRetry()
Expand Down
46 changes: 46 additions & 0 deletions bigquery/storage/managedwriter/managed_stream_test.go
Expand Up @@ -471,6 +471,52 @@ func TestManagedStream_LeakingGoroutines(t *testing.T) {
}
}

// Ensures we don't lose track of channels/connections during reconnects.
// https://github.com/googleapis/google-cloud-go/issues/6766
func TestManagedStream_LeakingReconnect(t *testing.T) {

ctx := context.Background()

ms := &ManagedStream{
ctx: ctx,
streamSettings: defaultStreamSettings(),
fc: newFlowController(10, 0),
open: openTestArc(&testAppendRowsClient{},
func(req *storagepb.AppendRowsRequest) error {
// Append always reports EOF on send.
return io.EOF
}, nil),
}
ms.schemaDescriptor = &descriptorpb.DescriptorProto{
Name: proto.String("testDescriptor"),
}

var chans []chan *pendingWrite

for i := 0; i < 10; i++ {
_, ch, err := ms.getStream(nil, true)
if err != nil {
t.Fatalf("failed openWithRetry(%d): %v", i, err)
}
chans = append(chans, ch)
}
var closedCount int
for _, ch := range chans {
select {
case _, ok := <-ch:
if !ok {
closedCount = closedCount + 1
}
case <-time.After(time.Second):
// we blocked, likely indicative that the channel is open.
continue
}
}
if wantClosed := len(chans) - 1; wantClosed != closedCount {
t.Errorf("closed count mismatch, got %d want %d", closedCount, wantClosed)
}
}

// Ensures we're propagating call options as expected.
// Background: https://github.com/googleapis/google-cloud-go/issues/6487
func TestOpenCallOptionPropagation(t *testing.T) {
Expand Down

0 comments on commit 979440b

Please sign in to comment.