Skip to content

Commit

Permalink
transport: add a draining state check before creating streams (#6142)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Mar 21, 2023
1 parent a2ca46c commit 7651e62
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.activeStreams == nil { // Can be niled from Close().
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
Expand Down
28 changes: 17 additions & 11 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,9 @@ func (s) TestLargeMessageWithDelayRead(t *testing.T) {
}
}

// TestGracefulClose ensures that GracefulClose allows in-flight streams to
// proceed until they complete naturally, while not allowing creation of new
// streams during this window.
func (s) TestGracefulClose(t *testing.T) {
server, ct, cancel := setUp(t, 0, math.MaxUint32, pingpong)
defer cancel()
Expand All @@ -817,6 +820,9 @@ func (s) TestGracefulClose(t *testing.T) {
}()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
defer cancel()

// Create a stream that will exist for this whole test and confirm basic
// functionality.
s, err := ct.NewStream(ctx, &CallHdr{})
if err != nil {
t.Fatalf("NewStream(_, _) = _, %v, want _, <nil>", err)
Expand All @@ -837,31 +843,31 @@ func (s) TestGracefulClose(t *testing.T) {
if _, err := s.Read(recvMsg); err != nil {
t.Fatalf("Error while reading: %v", err)
}

// Gracefully close the transport, which should not affect the existing
// stream.
ct.GracefulClose()

var wg sync.WaitGroup
// Expect the failure for all the follow-up streams because ct has been closed gracefully.
// Expect errors creating new streams because the client transport has been
// gracefully closed.
for i := 0; i < 200; i++ {
wg.Add(1)
go func() {
defer wg.Done()
str, err := ct.NewStream(ctx, &CallHdr{})
if err != nil && err.(*NewStreamError).Err == ErrConnClosing {
_, err := ct.NewStream(ctx, &CallHdr{})
if err != nil && err.(*NewStreamError).Err == ErrConnClosing && err.(*NewStreamError).AllowTransparentRetry {
return
} else if err != nil {
t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing)
return
}
ct.Write(str, nil, nil, &Options{Last: true})
if _, err := str.Read(make([]byte, 8)); err != errStreamDrain && err != ErrConnClosing {
t.Errorf("_.Read(_) = _, %v, want _, %v or %v", err, errStreamDrain, ErrConnClosing)
}
t.Errorf("_.NewStream(_, _) = _, %v, want _, %v", err, ErrConnClosing)
}()
}

// Confirm the existing stream still functions as expected.
ct.Write(s, nil, nil, &Options{Last: true})
if _, err := s.Read(incomingHeader); err != io.EOF {
t.Fatalf("Client expected EOF from the server. Got: %v", err)
}
// The stream which was created before graceful close can still proceed.
wg.Wait()
}

Expand Down

0 comments on commit 7651e62

Please sign in to comment.