Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CloseIdleConnections of wrapped Transport RoundTrippers #104844

Merged
merged 2 commits into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 18 additions & 3 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -933,9 +934,23 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nod
}

kubeClientConfigOverrides(s, clientConfig)
closeAllConns, err := updateDialer(clientConfig)
if err != nil {
return nil, nil, err
// Kubelet needs to be able to recover from stale http connections.
// HTTP2 has a mechanism to detect broken connections by sending periodical pings.
// HTTP1 only can have one persistent connection, and it will close all Idle connections
// once the Kubelet heartbeat fails. However, since there are many edge cases that we can't
// control, users can still opt-in to the previous behavior for closing the connections by
// setting the environment variable DISABLE_HTTP2.
var closeAllConns func()
if s := os.Getenv("DISABLE_HTTP2"); len(s) > 0 {
aojea marked this conversation as resolved.
Show resolved Hide resolved
klog.InfoS("HTTP2 has been explicitly disabled, updating Kubelet client Dialer to forcefully close active connections on heartbeat failures")
closeAllConns, err = updateDialer(clientConfig)
if err != nil {
return nil, nil, err
}
} else {
closeAllConns = func() {
utilnet.CloseIdleConnectionsFor(clientConfig.Transport)
}
}
return clientConfig, closeAllConns, nil
}
Expand Down
23 changes: 23 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/util/net/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,29 @@ func DialerFor(transport http.RoundTripper) (DialFunc, error) {
}
}

// CloseIdleConnectionsFor close idles connections for the Transport.
// If the Transport is wrapped it iterates over the wrapped round trippers
// until it finds one that implements the CloseIdleConnections method.
// If the Transport does not have a CloseIdleConnections method
// then this function does nothing.
func CloseIdleConnectionsFor(transport http.RoundTripper) {
if transport == nil {
return
}
type closeIdler interface {
CloseIdleConnections()
}

switch transport := transport.(type) {
case closeIdler:
transport.CloseIdleConnections()
case RoundTripperWrapper:
CloseIdleConnectionsFor(transport.WrappedRoundTripper())
default:
aojea marked this conversation as resolved.
Show resolved Hide resolved
klog.Warningf("unknown transport type: %T", transport)
}
}

type TLSClientConfigHolder interface {
TLSClientConfig() *tls.Config
}
Expand Down
184 changes: 184 additions & 0 deletions staging/src/k8s.io/client-go/rest/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilnet "k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
)

type tcpLB struct {
Expand Down Expand Up @@ -164,6 +165,189 @@ func TestReconnectBrokenTCP(t *testing.T) {
}
}

// 1. connect to https server with http1.1 using a TCP proxy
// 2. the connection has keepalive enabled so it will be reused
// 3. break the TCP connection stopping the proxy
// 4. close the idle connection to force creating a new connection
// 5. count that there are 2 connection to the server (we didn't reuse the original connection)
func TestReconnectBrokenTCP_HTTP1(t *testing.T) {
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "Hello, %s", r.Proto)
}))
ts.EnableHTTP2 = false
ts.StartTLS()
defer ts.Close()

u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("failed to parse URL from %q: %v", ts.URL, err)
}
lb := newLB(t, u.Host)
defer lb.ln.Close()
stopCh := make(chan struct{})
go lb.serve(stopCh)
transport, ok := ts.Client().Transport.(*http.Transport)
if !ok {
t.Fatal("failed to assert *http.Transport")
}
config := &Config{
Host: "https://" + lb.ln.Addr().String(),
Transport: utilnet.SetTransportDefaults(transport),
// large timeout, otherwise the broken connection will be cleaned by it
Timeout: wait.ForeverTestTimeout,
// These fields are required to create a REST client.
ContentConfig: ContentConfig{
GroupVersion: &schema.GroupVersion{},
NegotiatedSerializer: &serializer.CodecFactory{},
},
}
config.TLSClientConfig.NextProtos = []string{"http/1.1"}
client, err := RESTClientFor(config)
if err != nil {
t.Fatalf("failed to create REST client: %v", err)
}

data, err := client.Get().AbsPath("/").DoRaw(context.TODO())
aojea marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("unexpected err: %s: %v", data, err)
}
if string(data) != "Hello, HTTP/1.1" {
t.Fatalf("unexpected response: %s", data)
}

// Deliberately let the LB stop proxying traffic for the current
// connection. This mimics a broken TCP connection that's not properly
// closed.
close(stopCh)

stopCh = make(chan struct{})
go lb.serve(stopCh)
// Close the idle connections
aojea marked this conversation as resolved.
Show resolved Hide resolved
utilnet.CloseIdleConnectionsFor(client.Client.Transport)

// If the client didn't close the idle connections, the broken connection
// would still be in the connection pool, the following request would
// then reuse the broken connection instead of creating a new one, and
// thus would fail.
data, err = client.Get().AbsPath("/").DoRaw(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if string(data) != "Hello, HTTP/1.1" {
t.Fatalf("unexpected response: %s", data)
}
dials := atomic.LoadInt32(&lb.dials)
if dials != 2 {
t.Fatalf("expected %d dials, got %d", 2, dials)
}
}

// 1. connect to https server with http1.1 using a TCP proxy making the connection to timeout
// 2. the connection has keepalive enabled so it will be reused
// 3. close the in-flight connection to force creating a new connection
// 4. count that there are 2 connection on the LB but only one succeeds
func TestReconnectBrokenTCPInFlight_HTTP1(t *testing.T) {
done := make(chan struct{})
defer close(done)
received := make(chan struct{})

ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/hang" {
conn, _, _ := w.(http.Hijacker).Hijack()
close(received)
<-done
conn.Close()
}
fmt.Fprintf(w, "Hello, %s", r.Proto)
}))
ts.EnableHTTP2 = false
ts.StartTLS()
defer ts.Close()

u, err := url.Parse(ts.URL)
if err != nil {
t.Fatalf("failed to parse URL from %q: %v", ts.URL, err)
}

lb := newLB(t, u.Host)
defer lb.ln.Close()
stopCh := make(chan struct{})
go lb.serve(stopCh)

transport, ok := ts.Client().Transport.(*http.Transport)
if !ok {
t.Fatal("failed to assert *http.Transport")
}
config := &Config{
Host: "https://" + lb.ln.Addr().String(),
Transport: utilnet.SetTransportDefaults(transport),
// Use something extraordinary large to not hit the timeout
Timeout: wait.ForeverTestTimeout,
// These fields are required to create a REST client.
ContentConfig: ContentConfig{
GroupVersion: &schema.GroupVersion{},
NegotiatedSerializer: &serializer.CodecFactory{},
},
}
config.TLSClientConfig.NextProtos = []string{"http/1.1"}

client, err := RESTClientFor(config)
if err != nil {
t.Fatalf("failed to create REST client: %v", err)
}

// The request will connect, hang and eventually time out
// but we can use a context to close once the test is done
// we are only interested in have an inflight connection
ctx, cancel := context.WithCancel(context.Background())
reqErrCh := make(chan error, 1)
defer close(reqErrCh)
go func() {
_, err = client.Get().AbsPath("/hang").DoRaw(ctx)
reqErrCh <- err
}()

// wait until it connect to the server
select {
case <-received:
case <-time.After(wait.ForeverTestTimeout):
t.Fatal("Test timed out waiting for first request to fail")
}

// Deliberately let the LB stop proxying traffic for the current
// connection. This mimics a broken TCP connection that's not properly
// closed.
close(stopCh)

stopCh = make(chan struct{})
go lb.serve(stopCh)

// New request will fail if tries to reuse the connection
data, err := client.Get().AbsPath("/").DoRaw(context.Background())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if string(data) != "Hello, HTTP/1.1" {
t.Fatalf("unexpected response: %s", data)
}
dials := atomic.LoadInt32(&lb.dials)
if dials != 2 {
t.Fatalf("expected %d dials, got %d", 2, dials)
}

// cancel the in-flight connection
cancel()
select {
case <-reqErrCh:
if err == nil {
t.Fatal("Connection succeeded but was expected to timeout")
}
case <-time.After(10 * time.Second):
aojea marked this conversation as resolved.
Show resolved Hide resolved
t.Fatal("Test timed out waiting for the request to fail")
}

}

func TestRestClientTimeout(t *testing.T) {
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
time.Sleep(2 * time.Second)
Expand Down