From 0dc491add8fc07687ec2600b9b1e7931ba3ca568 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Mon, 21 Sep 2020 00:14:14 -0700 Subject: [PATCH 01/13] Implemented unix resolver. --- clientconn.go | 13 +---- internal/resolver/unix/unix.go | 58 +++++++++++++++++++ internal/transport/http2_client.go | 22 +++++-- internal/transport/http_util.go | 35 +++++++++++ internal/transport/http_util_test.go | 21 +++++++ proxy.go => internal/transport/proxy.go | 8 +-- .../transport/proxy_test.go | 10 +--- internal/transport/transport.go | 4 ++ resolver_conn_wrapper_test.go | 3 - rpc_util.go | 35 ----------- rpc_util_test.go | 21 ------- 11 files changed, 144 insertions(+), 86 deletions(-) create mode 100644 internal/resolver/unix/unix.go rename proxy.go => internal/transport/proxy.go (94%) rename proxy_test.go => internal/transport/proxy_test.go (95%) diff --git a/clientconn.go b/clientconn.go index ae5ce4947e2..a2b7423918b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -23,7 +23,6 @@ import ( "errors" "fmt" "math" - "net" "reflect" "strings" "sync" @@ -48,6 +47,7 @@ import ( _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. _ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver. _ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver. + _ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver. ) const ( @@ -191,21 +191,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } cc.mkp = cc.dopts.copts.KeepaliveParams - if cc.dopts.copts.Dialer == nil { - cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { - network, addr := parseDialTarget(addr) - return (&net.Dialer{}).DialContext(ctx, network, addr) - } - if cc.dopts.withProxy { - cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer) - } - } + cc.dopts.copts.UseProxy = cc.dopts.withProxy if cc.dopts.copts.UserAgent != "" { cc.dopts.copts.UserAgent += " " + grpcUA } else { cc.dopts.copts.UserAgent = grpcUA } + cc.dopts.copts.GrpcUA = grpcUA if cc.dopts.timeout > 0 { var cancel context.CancelFunc diff --git a/internal/resolver/unix/unix.go b/internal/resolver/unix/unix.go new file mode 100644 index 00000000000..e8b779e42ec --- /dev/null +++ b/internal/resolver/unix/unix.go @@ -0,0 +1,58 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package unix + +import ( + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/resolver" +) + +const scheme = "unix" + +type unixBuilder struct{} + +func (*unixBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + r := &unixResolver{ + target: target, + cc: cc, + } + r.start() + return r, nil +} + +func (*unixBuilder) Scheme() string { + return scheme +} + +type unixResolver struct { + target resolver.Target + cc resolver.ClientConn +} + +func (r *unixResolver) start() { + r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "/" + r.target.Endpoint, Attributes: attributes.New("network_type", "unix")}}}) +} + +func (*unixResolver) ResolveNow(o resolver.ResolveNowOptions) {} + +func (*unixResolver) Close() {} + +func init() { + resolver.Register(&unixBuilder{}) +} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index e7f2321131e..f504c9f5b68 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -137,11 +137,23 @@ type http2Client struct { connectionID uint64 } -func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { - if fn != nil { - return fn(ctx, addr) +func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) { + if fn == nil { + fn = func(fCtx context.Context, fAddr string) (net.Conn, error) { + if networkType := addr.Attributes.Value("network_type"); networkType != nil { + if networkTypeStr, ok := networkType.(string); ok { + return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr) + } + return nil, fmt.Errorf("network_type %v not of type string", networkType) + } + network, fAddr := parseDialTarget(fAddr) + return (&net.Dialer{}).DialContext(ctx, network, fAddr) + } + } + if useProxy { + fn = newProxyDialer(fn, grpcUA) } - return (&net.Dialer{}).DialContext(ctx, "tcp", addr) + return fn(ctx, addr.Addr) } func isTemporary(err error) bool { @@ -172,7 +184,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts } }() - conn, err := dial(connectCtx, opts.Dialer, addr.Addr) + conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.GrpcUA) if err != nil { if opts.FailOnNonTempDialError { return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index 5e1e7a65da2..f9ad1982ab5 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -27,6 +27,7 @@ import ( "math" "net" "net/http" + "net/url" "strconv" "strings" "time" @@ -605,3 +606,37 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil) return f } + +// parseDialTarget returns the network and address to pass to dialer +func parseDialTarget(target string) (net string, addr string) { + net = "tcp" + + m1 := strings.Index(target, ":") + m2 := strings.Index(target, ":/") + + // handle unix:addr which will fail with url.Parse + if m1 >= 0 && m2 < 0 { + if n := target[0:m1]; n == "unix" { + net = n + addr = target[m1+1:] + return net, addr + } + } + if m2 >= 0 { + t, err := url.Parse(target) + if err != nil { + return net, target + } + scheme := t.Scheme + addr = t.Path + if scheme == "unix" { + net = scheme + if addr == "" { + addr = t.Host + } + return net, addr + } + } + + return net, target +} diff --git a/internal/transport/http_util_test.go b/internal/transport/http_util_test.go index a3616f7389f..14d0fae6e11 100644 --- a/internal/transport/http_util_test.go +++ b/internal/transport/http_util_test.go @@ -185,3 +185,24 @@ func (s) TestDecodeMetadataHeader(t *testing.T) { } } } + +func (s) TestParseDialTarget(t *testing.T) { + for _, test := range []struct { + target, wantNet, wantAddr string + }{ + {"unix:etcd:0", "unix", "etcd:0"}, + {"unix:///tmp/unix-3", "unix", "/tmp/unix-3"}, + {"unix://domain", "unix", "domain"}, + {"unix://etcd:0", "unix", "etcd:0"}, + {"unix:///etcd:0", "unix", "/etcd:0"}, + {"passthrough://unix://domain", "tcp", "passthrough://unix://domain"}, + {"https://google.com:443", "tcp", "https://google.com:443"}, + {"dns:///google.com", "tcp", "dns:///google.com"}, + {"/unix/socket/address", "tcp", "/unix/socket/address"}, + } { + gotNet, gotAddr := parseDialTarget(test.target) + if gotNet != test.wantNet || gotAddr != test.wantAddr { + t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr) + } + } +} diff --git a/proxy.go b/internal/transport/proxy.go similarity index 94% rename from proxy.go rename to internal/transport/proxy.go index f8f69bfb70f..5821c8dd360 100644 --- a/proxy.go +++ b/internal/transport/proxy.go @@ -16,7 +16,7 @@ * */ -package grpc +package transport import ( "bufio" @@ -76,7 +76,7 @@ func basicAuth(username, password string) string { return base64.StdEncoding.EncodeToString([]byte(auth)) } -func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL) (_ net.Conn, err error) { +func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) { defer func() { if err != nil { conn.Close() @@ -118,7 +118,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri // newProxyDialer returns a dialer that connects to proxy first if necessary. // The returned dialer checks if a proxy is necessary, dial to the proxy with the // provided dialer, does HTTP CONNECT handshake and returns the connection. -func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) { +func newProxyDialer(dialer func(context.Context, string) (net.Conn, error), grpcUA string) func(context.Context, string) (net.Conn, error) { return func(ctx context.Context, addr string) (conn net.Conn, err error) { var newAddr string proxyURL, err := mapAddress(ctx, addr) @@ -137,7 +137,7 @@ func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func } if proxyURL != nil { // proxy is disabled if proxyURL is nil. - conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL) + conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA) } return } diff --git a/proxy_test.go b/internal/transport/proxy_test.go similarity index 95% rename from proxy_test.go rename to internal/transport/proxy_test.go index c9604be6235..ef6e70db622 100644 --- a/proxy_test.go +++ b/internal/transport/proxy_test.go @@ -18,7 +18,7 @@ * */ -package grpc +package transport import ( "bufio" @@ -143,7 +143,7 @@ func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxy return net.DialTimeout("tcp", addr, time.Until(deadline)) } return net.Dial("tcp", addr) - }) + }, "test") ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() c, err := dialer(ctx, blis.Addr().String()) @@ -173,9 +173,6 @@ func (s) TestHTTPConnect(t *testing.T) { if req.Method != http.MethodConnect { return fmt.Errorf("unexpected Method %q, want %q", req.Method, http.MethodConnect) } - if req.UserAgent() != grpcUA { - return fmt.Errorf("unexpect user agent %q, want %q", req.UserAgent(), grpcUA) - } return nil }, ) @@ -195,9 +192,6 @@ func (s) TestHTTPConnectBasicAuth(t *testing.T) { if req.Method != http.MethodConnect { return fmt.Errorf("unexpected Method %q, want %q", req.Method, http.MethodConnect) } - if req.UserAgent() != grpcUA { - return fmt.Errorf("unexpect user agent %q, want %q", req.UserAgent(), grpcUA) - } wantProxyAuthStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+password)) if got := req.Header.Get(proxyAuthHeaderKey); got != wantProxyAuthStr { gotDecoded, _ := base64.StdEncoding.DecodeString(got) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index b74030a9687..31514bfd79c 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -569,6 +569,10 @@ type ConnectOptions struct { ChannelzParentID int64 // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. MaxHeaderListSize *uint32 + // UseProxy specifies if a proxy should be used. + UseProxy bool + // GrpcUA + GrpcUA string } // NewClientTransport establishes the transport with the required ConnectOptions diff --git a/resolver_conn_wrapper_test.go b/resolver_conn_wrapper_test.go index e125976a535..f13a408937b 100644 --- a/resolver_conn_wrapper_test.go +++ b/resolver_conn_wrapper_test.go @@ -45,9 +45,6 @@ func (s) TestDialParseTargetUnknownScheme(t *testing.T) { }{ {"/unix/socket/address", "/unix/socket/address"}, - // Special test for "unix:///". - {"unix:///unix/socket/address", "unix:///unix/socket/address"}, - // For known scheme. {"passthrough://a.server.com/google.com", "google.com"}, } { diff --git a/rpc_util.go b/rpc_util.go index d4870ba4a99..11c30e1d86f 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -27,7 +27,6 @@ import ( "io" "io/ioutil" "math" - "net/url" "strings" "sync" "time" @@ -817,40 +816,6 @@ func setCallInfoCodec(c *callInfo) error { return nil } -// parseDialTarget returns the network and address to pass to dialer -func parseDialTarget(target string) (net string, addr string) { - net = "tcp" - - m1 := strings.Index(target, ":") - m2 := strings.Index(target, ":/") - - // handle unix:addr which will fail with url.Parse - if m1 >= 0 && m2 < 0 { - if n := target[0:m1]; n == "unix" { - net = n - addr = target[m1+1:] - return net, addr - } - } - if m2 >= 0 { - t, err := url.Parse(target) - if err != nil { - return net, target - } - scheme := t.Scheme - addr = t.Path - if scheme == "unix" { - net = scheme - if addr == "" { - addr = t.Host - } - return net, addr - } - } - - return net, target -} - // channelzData is used to store channelz related data for ClientConn, addrConn and Server. // These fields cannot be embedded in the original structs (e.g. ClientConn), since to do atomic // operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment. diff --git a/rpc_util_test.go b/rpc_util_test.go index 2449c23815e..90912d52a22 100644 --- a/rpc_util_test.go +++ b/rpc_util_test.go @@ -191,27 +191,6 @@ func (s) TestToRPCErr(t *testing.T) { } } -func (s) TestParseDialTarget(t *testing.T) { - for _, test := range []struct { - target, wantNet, wantAddr string - }{ - {"unix:etcd:0", "unix", "etcd:0"}, - {"unix:///tmp/unix-3", "unix", "/tmp/unix-3"}, - {"unix://domain", "unix", "domain"}, - {"unix://etcd:0", "unix", "etcd:0"}, - {"unix:///etcd:0", "unix", "/etcd:0"}, - {"passthrough://unix://domain", "tcp", "passthrough://unix://domain"}, - {"https://google.com:443", "tcp", "https://google.com:443"}, - {"dns:///google.com", "tcp", "dns:///google.com"}, - {"/unix/socket/address", "tcp", "/unix/socket/address"}, - } { - gotNet, gotAddr := parseDialTarget(test.target) - if gotNet != test.wantNet || gotAddr != test.wantAddr { - t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr) - } - } -} - // bmEncode benchmarks encoding a Protocol Buffer message containing mSize // bytes. func bmEncode(b *testing.B, mSize int) { From 89b46efd19150889b608180711be1b33e46fe566 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Mon, 21 Sep 2020 09:19:58 -0700 Subject: [PATCH 02/13] unix package comment --- internal/resolver/unix/unix.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/resolver/unix/unix.go b/internal/resolver/unix/unix.go index e8b779e42ec..1b559533a7f 100644 --- a/internal/resolver/unix/unix.go +++ b/internal/resolver/unix/unix.go @@ -16,6 +16,7 @@ * */ +// Package unix implements a resolver for unix targets. package unix import ( From bb5b61f56e9f298a0e4d34e19cf2a97126d7457d Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Tue, 22 Sep 2020 08:50:10 -0700 Subject: [PATCH 03/13] GrpcUA -> GRPCUA --- clientconn.go | 2 +- internal/resolver/unix/unix.go | 14 ++------------ internal/transport/http2_client.go | 20 ++++++++++++-------- internal/transport/transport.go | 4 ++-- 4 files changed, 17 insertions(+), 23 deletions(-) diff --git a/clientconn.go b/clientconn.go index a2b7423918b..8512a261bac 100644 --- a/clientconn.go +++ b/clientconn.go @@ -198,7 +198,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } else { cc.dopts.copts.UserAgent = grpcUA } - cc.dopts.copts.GrpcUA = grpcUA + cc.dopts.copts.GRPCUA = grpcUA if cc.dopts.timeout > 0 { var cancel context.CancelFunc diff --git a/internal/resolver/unix/unix.go b/internal/resolver/unix/unix.go index 1b559533a7f..c7caea33c14 100644 --- a/internal/resolver/unix/unix.go +++ b/internal/resolver/unix/unix.go @@ -29,12 +29,8 @@ const scheme = "unix" type unixBuilder struct{} func (*unixBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - r := &unixResolver{ - target: target, - cc: cc, - } - r.start() - return r, nil + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "/" + target.Endpoint, Attributes: attributes.New("network_type", "unix")}}}) + return &unixResolver{}, nil } func (*unixBuilder) Scheme() string { @@ -42,12 +38,6 @@ func (*unixBuilder) Scheme() string { } type unixResolver struct { - target resolver.Target - cc resolver.ClientConn -} - -func (r *unixResolver) start() { - r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "/" + r.target.Endpoint, Attributes: attributes.New("network_type", "unix")}}}) } func (*unixResolver) ResolveNow(o resolver.ResolveNowOptions) {} diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 194001dc8ec..b5ba745905c 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -139,15 +139,19 @@ type http2Client struct { func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) { if fn == nil { - fn = func(fCtx context.Context, fAddr string) (net.Conn, error) { - if networkType := addr.Attributes.Value("network_type"); networkType != nil { - if networkTypeStr, ok := networkType.(string); ok { - return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr) - } + if networkType := addr.Attributes.Value("network_type"); networkType != nil { + networkTypeStr, ok := networkType.(string) + if !ok { return nil, fmt.Errorf("network_type %v not of type string", networkType) } - network, fAddr := parseDialTarget(fAddr) - return (&net.Dialer{}).DialContext(ctx, network, fAddr) + fn = func(fCtx context.Context, fAddr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr) + } + } else { + fn = func(fCtx context.Context, fAddr string) (net.Conn, error) { + network, fAddr := parseDialTarget(fAddr) + return (&net.Dialer{}).DialContext(ctx, network, fAddr) + } } } if useProxy { @@ -184,7 +188,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts } }() - conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.GrpcUA) + conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.GRPCUA) if err != nil { if opts.FailOnNonTempDialError { return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 31514bfd79c..811d439e143 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -571,8 +571,8 @@ type ConnectOptions struct { MaxHeaderListSize *uint32 // UseProxy specifies if a proxy should be used. UseProxy bool - // GrpcUA - GrpcUA string + // GRPCUA is the grpc user agent string + GRPCUA string } // NewClientTransport establishes the transport with the required ConnectOptions From 001f93a8e9f76a90b2f4e7e9c0bc8309ebed7569 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Mon, 28 Sep 2020 08:41:02 -0700 Subject: [PATCH 04/13] Using unix resolver for all unix cases --- clientconn.go | 6 +++ internal/grpcutil/target.go | 5 +++ internal/resolver/unix/unix.go | 2 +- internal/transport/http2_client.go | 3 +- internal/transport/http_util.go | 35 ----------------- internal/transport/http_util_test.go | 21 ---------- test/authority_test.go | 58 +++++++++++++++++++++++++--- 7 files changed, 66 insertions(+), 64 deletions(-) diff --git a/clientconn.go b/clientconn.go index 8512a261bac..6e45d6b5afd 100644 --- a/clientconn.go +++ b/clientconn.go @@ -238,6 +238,12 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * // Determine the resolver to use. cc.parsedTarget = grpcutil.ParseTarget(cc.target) + if cc.parsedTarget.Scheme == "" && strings.HasPrefix(cc.target, "unix:") && cc.dopts.copts.Dialer == nil { + // Handle the "unix:[path]" case, because grpcutil.ParseTarget() only handles the "unix://[/absolute/path]" case. + // Only handle if the dialer is nil, to avoid a behavior change with custom dialers. + cc.parsedTarget.Scheme = "unix" + cc.parsedTarget.Endpoint = cc.target[len("unix:"):] + } unixScheme := strings.HasPrefix(cc.target, "unix:") channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme) resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme) diff --git a/internal/grpcutil/target.go b/internal/grpcutil/target.go index 80b33cdaf90..b8a4d725c03 100644 --- a/internal/grpcutil/target.go +++ b/internal/grpcutil/target.go @@ -51,5 +51,10 @@ func ParseTarget(target string) (ret resolver.Target) { if !ok { return resolver.Target{Endpoint: target} } + if ret.Scheme == "unix" { + // Add the "/" back in the unix case, so the unix resolver receives the + // actual endpoint. + ret.Endpoint = "/" + ret.Endpoint + } return ret } diff --git a/internal/resolver/unix/unix.go b/internal/resolver/unix/unix.go index c7caea33c14..e96e0c68177 100644 --- a/internal/resolver/unix/unix.go +++ b/internal/resolver/unix/unix.go @@ -29,7 +29,7 @@ const scheme = "unix" type unixBuilder struct{} func (*unixBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "/" + target.Endpoint, Attributes: attributes.New("network_type", "unix")}}}) + cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: target.Endpoint, Attributes: attributes.New("network_type", "unix")}}}) return &unixResolver{}, nil } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index b5ba745905c..b0491c6f964 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -149,8 +149,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error } } else { fn = func(fCtx context.Context, fAddr string) (net.Conn, error) { - network, fAddr := parseDialTarget(fAddr) - return (&net.Dialer{}).DialContext(ctx, network, fAddr) + return (&net.Dialer{}).DialContext(ctx, "tcp", fAddr) } } } diff --git a/internal/transport/http_util.go b/internal/transport/http_util.go index d2cff0106a1..4d15afbf73f 100644 --- a/internal/transport/http_util.go +++ b/internal/transport/http_util.go @@ -27,7 +27,6 @@ import ( "math" "net" "net/http" - "net/url" "strconv" "strings" "time" @@ -599,37 +598,3 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil) return f } - -// parseDialTarget returns the network and address to pass to dialer -func parseDialTarget(target string) (net string, addr string) { - net = "tcp" - - m1 := strings.Index(target, ":") - m2 := strings.Index(target, ":/") - - // handle unix:addr which will fail with url.Parse - if m1 >= 0 && m2 < 0 { - if n := target[0:m1]; n == "unix" { - net = n - addr = target[m1+1:] - return net, addr - } - } - if m2 >= 0 { - t, err := url.Parse(target) - if err != nil { - return net, target - } - scheme := t.Scheme - addr = t.Path - if scheme == "unix" { - net = scheme - if addr == "" { - addr = t.Host - } - return net, addr - } - } - - return net, target -} diff --git a/internal/transport/http_util_test.go b/internal/transport/http_util_test.go index fd533fe1048..85a083f6c8a 100644 --- a/internal/transport/http_util_test.go +++ b/internal/transport/http_util_test.go @@ -250,24 +250,3 @@ func (s) TestDecodeHeaderH2ErrCode(t *testing.T) { }) } } - -func (s) TestParseDialTarget(t *testing.T) { - for _, test := range []struct { - target, wantNet, wantAddr string - }{ - {"unix:etcd:0", "unix", "etcd:0"}, - {"unix:///tmp/unix-3", "unix", "/tmp/unix-3"}, - {"unix://domain", "unix", "domain"}, - {"unix://etcd:0", "unix", "etcd:0"}, - {"unix:///etcd:0", "unix", "/etcd:0"}, - {"passthrough://unix://domain", "tcp", "passthrough://unix://domain"}, - {"https://google.com:443", "tcp", "https://google.com:443"}, - {"dns:///google.com", "tcp", "dns:///google.com"}, - {"/unix/socket/address", "tcp", "/unix/socket/address"}, - } { - gotNet, gotAddr := parseDialTarget(test.target) - if gotNet != test.wantNet || gotAddr != test.wantAddr { - t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr) - } - } -} diff --git a/test/authority_test.go b/test/authority_test.go index 6cd5d82eec1..34ec775f619 100644 --- a/test/authority_test.go +++ b/test/authority_test.go @@ -21,17 +21,19 @@ package test import ( "context" "fmt" + "net" "os" "testing" "time" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" testpb "google.golang.org/grpc/test/grpc_testing" ) -func runUnixTest(t *testing.T, address, target, expectedAuthority string) { +func runUnixTest(t *testing.T, address, target, expectedAuthority string, dialer func(context.Context, string) (net.Conn, error)) { if err := os.RemoveAll(address); err != nil { t.Fatalf("Error removing socket file %v: %v\n", address, err) } @@ -57,9 +59,16 @@ func runUnixTest(t *testing.T, address, target, expectedAuthority string) { address: address, target: target, } - if err := us.Start(nil); err != nil { - t.Fatalf("Error starting endpoint server: %v", err) - return + if dialer == nil { + if err := us.Start(nil); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + return + } + } else { + if err := us.Start(nil, grpc.WithContextDialer(dialer)); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + return + } } defer us.Stop() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -70,6 +79,8 @@ func runUnixTest(t *testing.T, address, target, expectedAuthority string) { } } +// TestUnix does end to end tests with the various supported unix target +// formats, ensuring that the authority is set to localhost in every case. func (s) TestUnix(t *testing.T) { tests := []struct { name string @@ -98,7 +109,44 @@ func (s) TestUnix(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - runUnixTest(t, test.address, test.target, test.authority) + runUnixTest(t, test.address, test.target, test.authority, nil) + }) + } +} + +// TestUnixCustomDialer does end to end tests with various supported unix target +// formats, ensuring that the target sent to the dialer does NOT have the +// "unix:" prefix stripped. +func (s) TestUnixCustomDialer(t *testing.T) { + tests := []struct { + name string + address string + target string + authority string + }{ + { + name: "Unix1", + address: "sock.sock", + target: "unix:sock.sock", + authority: "localhost", + }, + { + name: "Unix2", + address: "/tmp/sock.sock", + target: "unix:/tmp/sock.sock", + authority: "localhost", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dialer := func(ctx context.Context, address string) (net.Conn, error) { + if address != test.target { + return nil, fmt.Errorf("expected target %v in custom dialer, instead got %v", test.target, address) + } + address = address[len("unix:"):] + return (&net.Dialer{}).DialContext(ctx, "unix", address) + } + runUnixTest(t, test.address, test.target, test.authority, dialer) }) } } From b867c175c20b6e1ef334e8759808dbac76b1350b Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Fri, 9 Oct 2020 01:54:13 -0700 Subject: [PATCH 05/13] Fixed testing --- balancer/rls/internal/config.go | 2 +- clientconn.go | 14 ++----------- dialoptions.go | 5 ++--- internal/grpcutil/target.go | 8 +++++++- internal/grpcutil/target_test.go | 4 ++-- internal/resolver/unix/unix.go | 10 +++++----- internal/transport/http2_client.go | 32 ++++++++++++++---------------- internal/transport/transport.go | 2 -- test/authority_test.go | 17 +++++++--------- 9 files changed, 41 insertions(+), 53 deletions(-) diff --git a/balancer/rls/internal/config.go b/balancer/rls/internal/config.go index 305c09106ba..a3deb8906c9 100644 --- a/balancer/rls/internal/config.go +++ b/balancer/rls/internal/config.go @@ -201,7 +201,7 @@ func (*rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, if lookupService == "" { return nil, fmt.Errorf("rls: empty lookup_service in service config {%+v}", string(c)) } - parsedTarget := grpcutil.ParseTarget(lookupService) + parsedTarget := grpcutil.ParseTarget(lookupService, false) if parsedTarget.Scheme == "" { parsedTarget.Scheme = resolver.GetDefaultScheme() } diff --git a/clientconn.go b/clientconn.go index 6e45d6b5afd..c9b9055f810 100644 --- a/clientconn.go +++ b/clientconn.go @@ -191,14 +191,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } cc.mkp = cc.dopts.copts.KeepaliveParams - cc.dopts.copts.UseProxy = cc.dopts.withProxy - if cc.dopts.copts.UserAgent != "" { cc.dopts.copts.UserAgent += " " + grpcUA } else { cc.dopts.copts.UserAgent = grpcUA } - cc.dopts.copts.GRPCUA = grpcUA if cc.dopts.timeout > 0 { var cancel context.CancelFunc @@ -237,14 +234,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } // Determine the resolver to use. - cc.parsedTarget = grpcutil.ParseTarget(cc.target) - if cc.parsedTarget.Scheme == "" && strings.HasPrefix(cc.target, "unix:") && cc.dopts.copts.Dialer == nil { - // Handle the "unix:[path]" case, because grpcutil.ParseTarget() only handles the "unix://[/absolute/path]" case. - // Only handle if the dialer is nil, to avoid a behavior change with custom dialers. - cc.parsedTarget.Scheme = "unix" - cc.parsedTarget.Endpoint = cc.target[len("unix:"):] - } - unixScheme := strings.HasPrefix(cc.target, "unix:") + cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil) channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme) resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme) if resolverBuilder == nil { @@ -267,7 +257,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.authority = creds.Info().ServerName } else if cc.dopts.insecure && cc.dopts.authority != "" { cc.authority = cc.dopts.authority - } else if unixScheme { + } else if strings.HasPrefix(cc.target, "unix:") { cc.authority = "localhost" } else { // Use endpoint from "scheme://authority/endpoint" as the default diff --git a/dialoptions.go b/dialoptions.go index decb4c5ee89..365a34ff231 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -71,7 +71,6 @@ type dialOptions struct { // we need to be able to configure this in tests. resolveNowBackoff func(int) time.Duration resolvers []resolver.Builder - withProxy bool } // DialOption configures how we set up the connection. @@ -313,7 +312,7 @@ func WithInsecure() DialOption { // This API is EXPERIMENTAL. func WithNoProxy() DialOption { return newFuncDialOption(func(o *dialOptions) { - o.withProxy = false + o.copts.UseProxy = false }) } @@ -565,9 +564,9 @@ func defaultDialOptions() dialOptions { copts: transport.ConnectOptions{ WriteBufferSize: defaultWriteBufSize, ReadBufferSize: defaultReadBufSize, + UseProxy: false, }, resolveNowBackoff: internalbackoff.DefaultExponential.Backoff, - withProxy: true, } } diff --git a/internal/grpcutil/target.go b/internal/grpcutil/target.go index b8a4d725c03..19304abcb18 100644 --- a/internal/grpcutil/target.go +++ b/internal/grpcutil/target.go @@ -41,10 +41,16 @@ func split2(s, sep string) (string, string, bool) { // // If target is not a valid scheme://authority/endpoint, it returns {Endpoint: // target}. -func ParseTarget(target string) (ret resolver.Target) { +func ParseTarget(target string, hasDialer bool) (ret resolver.Target) { var ok bool ret.Scheme, ret.Endpoint, ok = split2(target, "://") if !ok { + if strings.HasPrefix(target, "unix:") && !hasDialer { + // Handle the "unix:[path]" case, because grpcutil.ParseTarget() + // only handles the "unix://[/absolute/path]" case. Only handle if + // the dialer is nil, to avoid a behavior change with custom dialers. + return resolver.Target{Scheme: "unix", Endpoint: target[len("unix:"):]} + } return resolver.Target{Endpoint: target} } ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/") diff --git a/internal/grpcutil/target_test.go b/internal/grpcutil/target_test.go index 92bdb63d213..3f8072acdac 100644 --- a/internal/grpcutil/target_test.go +++ b/internal/grpcutil/target_test.go @@ -32,7 +32,7 @@ func TestParseTarget(t *testing.T) { {Scheme: "passthrough", Authority: "", Endpoint: "/unix/socket/address"}, } { str := test.Scheme + "://" + test.Authority + "/" + test.Endpoint - got := ParseTarget(str) + got := ParseTarget(str, false) if got != test { t.Errorf("ParseTarget(%q) = %+v, want %+v", str, got, test) } @@ -71,7 +71,7 @@ func TestParseTargetString(t *testing.T) { {targetStr: "a//b", want: resolver.Target{Scheme: "", Authority: "", Endpoint: "a//b"}}, {targetStr: "a://b", want: resolver.Target{Scheme: "", Authority: "", Endpoint: "a://b"}}, } { - got := ParseTarget(test.targetStr) + got := ParseTarget(test.targetStr, false) if got != test.want { t.Errorf("ParseTarget(%q) = %+v, want %+v", test.targetStr, got, test.want) } diff --git a/internal/resolver/unix/unix.go b/internal/resolver/unix/unix.go index e96e0c68177..5e28179b74c 100644 --- a/internal/resolver/unix/unix.go +++ b/internal/resolver/unix/unix.go @@ -26,24 +26,24 @@ import ( const scheme = "unix" -type unixBuilder struct{} +type builder struct{} -func (*unixBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { +func (*builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: target.Endpoint, Attributes: attributes.New("network_type", "unix")}}}) return &unixResolver{}, nil } -func (*unixBuilder) Scheme() string { +func (*builder) Scheme() string { return scheme } type unixResolver struct { } -func (*unixResolver) ResolveNow(o resolver.ResolveNowOptions) {} +func (*unixResolver) ResolveNow(resolver.ResolveNowOptions) {} func (*unixResolver) Close() {} func init() { - resolver.Register(&unixBuilder{}) + resolver.Register(&builder{}) } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index b0491c6f964..7d7c841e234 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -138,25 +138,23 @@ type http2Client struct { } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) { - if fn == nil { - if networkType := addr.Attributes.Value("network_type"); networkType != nil { - networkTypeStr, ok := networkType.(string) - if !ok { - return nil, fmt.Errorf("network_type %v not of type string", networkType) - } - fn = func(fCtx context.Context, fAddr string) (net.Conn, error) { - return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr) - } - } else { - fn = func(fCtx context.Context, fAddr string) (net.Conn, error) { - return (&net.Dialer{}).DialContext(ctx, "tcp", fAddr) - } + if fn != nil { + return fn(ctx, addr.Addr) + } + networkTypeStr := "tcp" + if networkType := addr.Attributes.Value("network_type"); networkType != nil { + ok := false + networkTypeStr, ok = networkType.(string) + if !ok { + return nil, fmt.Errorf("network_type %v not of type string", networkType) } } - if useProxy { - fn = newProxyDialer(fn, grpcUA) + if networkTypeStr == "tcp" && useProxy { + return newProxyDialer(func(fCtx context.Context, fAddr string) (net.Conn, error) { + return (&net.Dialer{}).DialContext(fCtx, "tcp", fAddr) + }, grpcUA)(ctx, addr.Addr) } - return fn(ctx, addr.Addr) + return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr) } func isTemporary(err error) bool { @@ -187,7 +185,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts } }() - conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.GRPCUA) + conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent) if err != nil { if opts.FailOnNonTempDialError { return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 811d439e143..9c8f79cb4b2 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -571,8 +571,6 @@ type ConnectOptions struct { MaxHeaderListSize *uint32 // UseProxy specifies if a proxy should be used. UseProxy bool - // GRPCUA is the grpc user agent string - GRPCUA string } // NewClientTransport establishes the transport with the required ConnectOptions diff --git a/test/authority_test.go b/test/authority_test.go index 34ec775f619..c013d26226d 100644 --- a/test/authority_test.go +++ b/test/authority_test.go @@ -59,16 +59,13 @@ func runUnixTest(t *testing.T, address, target, expectedAuthority string, dialer address: address, target: target, } - if dialer == nil { - if err := us.Start(nil); err != nil { - t.Fatalf("Error starting endpoint server: %v", err) - return - } - } else { - if err := us.Start(nil, grpc.WithContextDialer(dialer)); err != nil { - t.Fatalf("Error starting endpoint server: %v", err) - return - } + opts := []grpc.DialOption{} + if dialer != nil { + opts = append(opts, grpc.WithContextDialer(dialer)) + } + if err := us.Start(nil, opts...); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + return } defer us.Stop() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) From e72a2263383e375aadfdb150e3a45e39bbaafe29 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Fri, 9 Oct 2020 02:53:26 -0700 Subject: [PATCH 06/13] NetworkType package implemented. --- internal/resolver/unix/unix.go | 3 +- internal/transport/http2_client.go | 9 ++-- internal/transport/networktype/networktype.go | 47 +++++++++++++++++++ 3 files changed, 52 insertions(+), 7 deletions(-) create mode 100644 internal/transport/networktype/networktype.go diff --git a/internal/resolver/unix/unix.go b/internal/resolver/unix/unix.go index 5e28179b74c..3c59c533f10 100644 --- a/internal/resolver/unix/unix.go +++ b/internal/resolver/unix/unix.go @@ -21,6 +21,7 @@ package unix import ( "google.golang.org/grpc/attributes" + "google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/resolver" ) @@ -29,7 +30,7 @@ const scheme = "unix" type builder struct{} func (*builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { - cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: target.Endpoint, Attributes: attributes.New("network_type", "unix")}}}) + cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(resolver.Address{Addr: target.Endpoint, Attributes: attributes.New()}, "unix")}}) return &unixResolver{}, nil } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 7d7c841e234..7ec3dfab634 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -33,6 +33,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "google.golang.org/grpc/internal/grpcutil" + "google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -142,12 +143,8 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error return fn(ctx, addr.Addr) } networkTypeStr := "tcp" - if networkType := addr.Attributes.Value("network_type"); networkType != nil { - ok := false - networkTypeStr, ok = networkType.(string) - if !ok { - return nil, fmt.Errorf("network_type %v not of type string", networkType) - } + if networkType := networktype.Get(addr); networkType != nil { + networkTypeStr = networkType.T } if networkTypeStr == "tcp" && useProxy { return newProxyDialer(func(fCtx context.Context, fAddr string) (net.Conn, error) { diff --git a/internal/transport/networktype/networktype.go b/internal/transport/networktype/networktype.go new file mode 100644 index 00000000000..98bfdf6f3ec --- /dev/null +++ b/internal/transport/networktype/networktype.go @@ -0,0 +1,47 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package networktype declares the network type to be used in the default +// dailer. Attribute of a resolver.Address. +package networktype + +import ( + "google.golang.org/grpc/resolver" +) + +// keyType is the key to use for storing State in Attributes. +type keyType string + +const key = keyType("grpc.internal.transport.networktype") + +// NetworkType contains the network type information. +type NetworkType struct { + T string +} + +// Set returns a copy of the provided address with attributes containing t. +func Set(address resolver.Address, networkType string) resolver.Address { + address.Attributes = address.Attributes.WithValues(key, &NetworkType{networkType}) + return address +} + +// Get returns the network type in the resolver.Address, or nil if not present. +func Get(address resolver.Address) *NetworkType { + t, _ := address.Attributes.Value(key).(*NetworkType) + return t +} From 257880f941cb2e3e796c6c70fa7c8b46ab2ccdd6 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Fri, 9 Oct 2020 12:20:30 -0700 Subject: [PATCH 07/13] newProxyDialer -> proxyDial --- internal/transport/http2_client.go | 4 +-- internal/transport/proxy.go | 44 +++++++++++++++--------------- internal/transport/proxy_test.go | 8 ++++-- 3 files changed, 28 insertions(+), 28 deletions(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index f00de53fbb7..bf8f027eabe 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -147,9 +147,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error networkTypeStr = networkType.T } if networkTypeStr == "tcp" && useProxy { - return newProxyDialer(func(fCtx context.Context, fAddr string) (net.Conn, error) { - return (&net.Dialer{}).DialContext(fCtx, "tcp", fAddr) - }, grpcUA)(ctx, addr.Addr) + proxyDial(ctx, addr.Addr, grpcUA) } return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr) } diff --git a/internal/transport/proxy.go b/internal/transport/proxy.go index 5821c8dd360..2faed439eed 100644 --- a/internal/transport/proxy.go +++ b/internal/transport/proxy.go @@ -33,6 +33,8 @@ import ( const proxyAuthHeaderKey = "Proxy-Authorization" +var proxyDialer = (&net.Dialer{}).DialContext + var ( // errDisabled indicates that proxy is disabled for the address. errDisabled = errors.New("proxy is disabled for the address") @@ -115,32 +117,30 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri return &bufConn{Conn: conn, r: r}, nil } -// newProxyDialer returns a dialer that connects to proxy first if necessary. -// The returned dialer checks if a proxy is necessary, dial to the proxy with the -// provided dialer, does HTTP CONNECT handshake and returns the connection. -func newProxyDialer(dialer func(context.Context, string) (net.Conn, error), grpcUA string) func(context.Context, string) (net.Conn, error) { - return func(ctx context.Context, addr string) (conn net.Conn, err error) { - var newAddr string - proxyURL, err := mapAddress(ctx, addr) - if err != nil { - if err != errDisabled { - return nil, err - } - newAddr = addr - } else { - newAddr = proxyURL.Host +// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy +// is necessary, dial to the proxy with using proxyDialer, does HTTP CONNECT +// handshake, and returns the connection. +func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) { + var newAddr string + proxyURL, err := mapAddress(ctx, addr) + if err != nil { + if err != errDisabled { + return nil, err } + newAddr = addr + } else { + newAddr = proxyURL.Host + } - conn, err = dialer(ctx, newAddr) - if err != nil { - return - } - if proxyURL != nil { - // proxy is disabled if proxyURL is nil. - conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA) - } + conn, err = proxyDialer(ctx, "tcp", newAddr) + if err != nil { return } + if proxyURL != nil { + // proxy is disabled if proxyURL is nil. + conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA) + } + return } func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error { diff --git a/internal/transport/proxy_test.go b/internal/transport/proxy_test.go index ef6e70db622..3acf49b66b0 100644 --- a/internal/transport/proxy_test.go +++ b/internal/transport/proxy_test.go @@ -138,15 +138,17 @@ func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxy defer overwrite(hpfe)() // Dial to proxy server. - dialer := newProxyDialer(func(ctx context.Context, addr string) (net.Conn, error) { + oldProxyDialer := proxyDialer + proxyDialer = func(ctx context.Context, string, addr string) (net.Conn, error) { if deadline, ok := ctx.Deadline(); ok { return net.DialTimeout("tcp", addr, time.Until(deadline)) } return net.Dial("tcp", addr) - }, "test") + } + defer func() { proxyDialer = oldProxyDialer }() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - c, err := dialer(ctx, blis.Addr().String()) + c, err := proxyDial(ctx, blis.Addr().String(), "test") if err != nil { t.Fatalf("http connect Dial failed: %v", err) } From 84bc233255c624bf6c4ee20ac1d8531f0448fb7e Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Fri, 9 Oct 2020 15:04:47 -0700 Subject: [PATCH 08/13] Reworking proxy testing. --- dialoptions.go | 2 +- internal/grpcutil/target.go | 6 ++--- internal/grpcutil/target_test.go | 26 ++++++++++++++++--- internal/resolver/unix/unix.go | 3 +-- internal/transport/http2_client.go | 10 +++---- internal/transport/networktype/networktype.go | 21 +++++++-------- internal/transport/proxy.go | 20 ++++---------- internal/transport/proxy_test.go | 8 ------ test/authority_test.go | 10 +++---- 9 files changed, 52 insertions(+), 54 deletions(-) diff --git a/dialoptions.go b/dialoptions.go index 82c8752be12..e7f86e6d7c8 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -594,7 +594,7 @@ func defaultDialOptions() dialOptions { copts: transport.ConnectOptions{ WriteBufferSize: defaultWriteBufSize, ReadBufferSize: defaultReadBufSize, - UseProxy: false, + UseProxy: true, }, resolveNowBackoff: internalbackoff.DefaultExponential.Backoff, } diff --git a/internal/grpcutil/target.go b/internal/grpcutil/target.go index 19304abcb18..d9c3b994f61 100644 --- a/internal/grpcutil/target.go +++ b/internal/grpcutil/target.go @@ -46,9 +46,9 @@ func ParseTarget(target string, hasDialer bool) (ret resolver.Target) { ret.Scheme, ret.Endpoint, ok = split2(target, "://") if !ok { if strings.HasPrefix(target, "unix:") && !hasDialer { - // Handle the "unix:[path]" case, because grpcutil.ParseTarget() - // only handles the "unix://[/absolute/path]" case. Only handle if - // the dialer is nil, to avoid a behavior change with custom dialers. + // Handle the "unix:[path]" case, because splitting on :// only + // handles the "unix://[/absolute/path]" case. Only handle if the + // dialer is nil, to avoid a behavior change with custom dialers. return resolver.Target{Scheme: "unix", Endpoint: target[len("unix:"):]} } return resolver.Target{Endpoint: target} diff --git a/internal/grpcutil/target_test.go b/internal/grpcutil/target_test.go index 3f8072acdac..8782d43a175 100644 --- a/internal/grpcutil/target_test.go +++ b/internal/grpcutil/target_test.go @@ -34,15 +34,20 @@ func TestParseTarget(t *testing.T) { str := test.Scheme + "://" + test.Authority + "/" + test.Endpoint got := ParseTarget(str, false) if got != test { - t.Errorf("ParseTarget(%q) = %+v, want %+v", str, got, test) + t.Errorf("ParseTarget(%q, false) = %+v, want %+v", str, got, test) + } + got = ParseTarget(str, true) + if got != test { + t.Errorf("ParseTarget(%q, true) = %+v, want %+v", str, got, test) } } } func TestParseTargetString(t *testing.T) { for _, test := range []struct { - targetStr string - want resolver.Target + targetStr string + want resolver.Target + wantWithDialer resolver.Target }{ {targetStr: "", want: resolver.Target{Scheme: "", Authority: "", Endpoint: ""}}, {targetStr: ":///", want: resolver.Target{Scheme: "", Authority: "", Endpoint: ""}}, @@ -70,10 +75,23 @@ func TestParseTargetString(t *testing.T) { {targetStr: "a:/b", want: resolver.Target{Scheme: "", Authority: "", Endpoint: "a:/b"}}, {targetStr: "a//b", want: resolver.Target{Scheme: "", Authority: "", Endpoint: "a//b"}}, {targetStr: "a://b", want: resolver.Target{Scheme: "", Authority: "", Endpoint: "a://b"}}, + + // Unix cases without custom dialer. + // unix:[local_path] and unix:[/absolute] have different behaviors with + // a custom dialer, to prevent behavior changes with custom dialers. + {targetStr: "unix:domain", want: resolver.Target{Scheme: "unix", Authority: "", Endpoint: "domain"}, wantWithDialer: resolver.Target{Scheme: "", Authority: "", Endpoint: "unix:domain"}}, + {targetStr: "unix:/domain", want: resolver.Target{Scheme: "unix", Authority: "", Endpoint: "/domain"}, wantWithDialer: resolver.Target{Scheme: "", Authority: "", Endpoint: "unix:/domain"}}, } { got := ParseTarget(test.targetStr, false) if got != test.want { - t.Errorf("ParseTarget(%q) = %+v, want %+v", test.targetStr, got, test.want) + t.Errorf("ParseTarget(%q, false) = %+v, want %+v", test.targetStr, got, test.want) + } + if test.wantWithDialer == (resolver.Target{}) { + test.wantWithDialer = test.want + } + got = ParseTarget(test.targetStr, true) + if got != test.wantWithDialer { + t.Errorf("ParseTarget(%q, true) = %+v, want %+v", test.targetStr, got, test.wantWithDialer) } } } diff --git a/internal/resolver/unix/unix.go b/internal/resolver/unix/unix.go index 3c59c533f10..64436b88ae1 100644 --- a/internal/resolver/unix/unix.go +++ b/internal/resolver/unix/unix.go @@ -20,7 +20,6 @@ package unix import ( - "google.golang.org/grpc/attributes" "google.golang.org/grpc/internal/transport/networktype" "google.golang.org/grpc/resolver" ) @@ -30,7 +29,7 @@ const scheme = "unix" type builder struct{} func (*builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { - cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(resolver.Address{Addr: target.Endpoint, Attributes: attributes.New()}, "unix")}}) + cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(resolver.Address{Addr: target.Endpoint}, "unix")}}) return &unixResolver{}, nil } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index bf8f027eabe..02e50431c40 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -142,14 +142,14 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error if fn != nil { return fn(ctx, addr.Addr) } - networkTypeStr := "tcp" - if networkType := networktype.Get(addr); networkType != nil { - networkTypeStr = networkType.T + networkType := "tcp" + if n, ok := networktype.Get(addr); ok { + networkType = n } - if networkTypeStr == "tcp" && useProxy { + if networkType == "tcp" && useProxy { proxyDial(ctx, addr.Addr, grpcUA) } - return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr) + return (&net.Dialer{}).DialContext(ctx, networkType, addr.Addr) } func isTemporary(err error) bool { diff --git a/internal/transport/networktype/networktype.go b/internal/transport/networktype/networktype.go index 98bfdf6f3ec..96967428b51 100644 --- a/internal/transport/networktype/networktype.go +++ b/internal/transport/networktype/networktype.go @@ -29,19 +29,18 @@ type keyType string const key = keyType("grpc.internal.transport.networktype") -// NetworkType contains the network type information. -type NetworkType struct { - T string -} - -// Set returns a copy of the provided address with attributes containing t. +// Set returns a copy of the provided address with attributes containing networkType. func Set(address resolver.Address, networkType string) resolver.Address { - address.Attributes = address.Attributes.WithValues(key, &NetworkType{networkType}) + address.Attributes = address.Attributes.WithValues(key, networkType) return address } -// Get returns the network type in the resolver.Address, or nil if not present. -func Get(address resolver.Address) *NetworkType { - t, _ := address.Attributes.Value(key).(*NetworkType) - return t +// Get returns the network type in the resolver.Address and true, or "", false +// if not present. +func Get(address resolver.Address) (string, bool) { + v := address.Attributes.Value(key) + if v == nil { + return "", false + } + return v.(string), true } diff --git a/internal/transport/proxy.go b/internal/transport/proxy.go index 2faed439eed..d8dc7159ef5 100644 --- a/internal/transport/proxy.go +++ b/internal/transport/proxy.go @@ -22,7 +22,6 @@ import ( "bufio" "context" "encoding/base64" - "errors" "fmt" "io" "net" @@ -33,11 +32,7 @@ import ( const proxyAuthHeaderKey = "Proxy-Authorization" -var proxyDialer = (&net.Dialer{}).DialContext - var ( - // errDisabled indicates that proxy is disabled for the address. - errDisabled = errors.New("proxy is disabled for the address") // The following variable will be overwritten in the tests. httpProxyFromEnvironment = http.ProxyFromEnvironment ) @@ -53,9 +48,6 @@ func mapAddress(ctx context.Context, address string) (*url.URL, error) { if err != nil { return nil, err } - if url == nil { - return nil, errDisabled - } return url, nil } @@ -121,18 +113,16 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri // is necessary, dial to the proxy with using proxyDialer, does HTTP CONNECT // handshake, and returns the connection. func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) { - var newAddr string + newAddr := addr proxyURL, err := mapAddress(ctx, addr) if err != nil { - if err != errDisabled { - return nil, err - } - newAddr = addr - } else { + return nil, err + } + if proxyURL != nil { newAddr = proxyURL.Host } - conn, err = proxyDialer(ctx, "tcp", newAddr) + conn, err = (&net.Dialer{}).DialContext(ctx, "tcp", newAddr) if err != nil { return } diff --git a/internal/transport/proxy_test.go b/internal/transport/proxy_test.go index 3acf49b66b0..628b1fddc49 100644 --- a/internal/transport/proxy_test.go +++ b/internal/transport/proxy_test.go @@ -138,14 +138,6 @@ func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxy defer overwrite(hpfe)() // Dial to proxy server. - oldProxyDialer := proxyDialer - proxyDialer = func(ctx context.Context, string, addr string) (net.Conn, error) { - if deadline, ok := ctx.Deadline(); ok { - return net.DialTimeout("tcp", addr, time.Until(deadline)) - } - return net.Dial("tcp", addr) - } - defer func() { proxyDialer = oldProxyDialer }() ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() c, err := proxyDial(ctx, blis.Addr().String(), "test") diff --git a/test/authority_test.go b/test/authority_test.go index c013d26226d..11c3c4c1af8 100644 --- a/test/authority_test.go +++ b/test/authority_test.go @@ -86,19 +86,19 @@ func (s) TestUnix(t *testing.T) { authority string }{ { - name: "Unix1", + name: "UnixRelative", address: "sock.sock", target: "unix:sock.sock", authority: "localhost", }, { - name: "Unix2", + name: "UnixAbsolute", address: "/tmp/sock.sock", target: "unix:/tmp/sock.sock", authority: "localhost", }, { - name: "Unix3", + name: "UnixAbsoluteAlternate", address: "/tmp/sock.sock", target: "unix:///tmp/sock.sock", authority: "localhost", @@ -122,13 +122,13 @@ func (s) TestUnixCustomDialer(t *testing.T) { authority string }{ { - name: "Unix1", + name: "UnixRelative", address: "sock.sock", target: "unix:sock.sock", authority: "localhost", }, { - name: "Unix2", + name: "UnixAbsolute", address: "/tmp/sock.sock", target: "unix:/tmp/sock.sock", authority: "localhost", From 2a0040fe70ea73c2d7c99fb314006d2313c8a554 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Fri, 9 Oct 2020 15:20:52 -0700 Subject: [PATCH 09/13] test commit --- test/authority_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/authority_test.go b/test/authority_test.go index 11c3c4c1af8..de1d53cb589 100644 --- a/test/authority_test.go +++ b/test/authority_test.go @@ -86,7 +86,7 @@ func (s) TestUnix(t *testing.T) { authority string }{ { - name: "UnixRelative", + name: "UnixRelative1", address: "sock.sock", target: "unix:sock.sock", authority: "localhost", From 8fe0f4191433818d95103099b196ed2d73e23170 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Fri, 9 Oct 2020 15:21:36 -0700 Subject: [PATCH 10/13] Updating commit --- test/authority_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/authority_test.go b/test/authority_test.go index de1d53cb589..11c3c4c1af8 100644 --- a/test/authority_test.go +++ b/test/authority_test.go @@ -86,7 +86,7 @@ func (s) TestUnix(t *testing.T) { authority string }{ { - name: "UnixRelative1", + name: "UnixRelative", address: "sock.sock", target: "unix:sock.sock", authority: "localhost", From 1aa312336bad1a513e2df691ef78d60c71b05d18 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Mon, 12 Oct 2020 09:54:46 -0700 Subject: [PATCH 11/13] Bugfix --- internal/transport/http2_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 02e50431c40..6a4776bb153 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -147,7 +147,7 @@ func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error networkType = n } if networkType == "tcp" && useProxy { - proxyDial(ctx, addr.Addr, grpcUA) + return proxyDial(ctx, addr.Addr, grpcUA) } return (&net.Dialer{}).DialContext(ctx, networkType, addr.Addr) } From e10f5035ab4b57d2e8d79d02dceac60462b7180c Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Thu, 15 Oct 2020 13:35:43 -0700 Subject: [PATCH 12/13] unixResolver -> nopResolver --- internal/resolver/unix/unix.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/resolver/unix/unix.go b/internal/resolver/unix/unix.go index 64436b88ae1..d046e50613d 100644 --- a/internal/resolver/unix/unix.go +++ b/internal/resolver/unix/unix.go @@ -30,19 +30,19 @@ type builder struct{} func (*builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) { cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(resolver.Address{Addr: target.Endpoint}, "unix")}}) - return &unixResolver{}, nil + return &nopResolver{}, nil } func (*builder) Scheme() string { return scheme } -type unixResolver struct { +type nopResolver struct { } -func (*unixResolver) ResolveNow(resolver.ResolveNowOptions) {} +func (*nopResolver) ResolveNow(resolver.ResolveNowOptions) {} -func (*unixResolver) Close() {} +func (*nopResolver) Close() {} func init() { resolver.Register(&builder{}) From 5ddbc72708d363159d09783b9bba8400ecd1b58f Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Fri, 16 Oct 2020 11:14:45 -0700 Subject: [PATCH 13/13] Improved comments --- internal/grpcutil/target.go | 8 +++++--- internal/grpcutil/target_test.go | 9 +++++---- internal/transport/proxy.go | 4 ++-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/internal/grpcutil/target.go b/internal/grpcutil/target.go index d9c3b994f61..3e1b22f5a8c 100644 --- a/internal/grpcutil/target.go +++ b/internal/grpcutil/target.go @@ -37,15 +37,17 @@ func split2(s, sep string) (string, string, bool) { } // ParseTarget splits target into a resolver.Target struct containing scheme, -// authority and endpoint. +// authority and endpoint. skipUnixColonParsing indicates that the parse should +// not parse "unix:[path]" cases. This should be true in cases where a custom +// dialer is present, to prevent a behavior change. // // If target is not a valid scheme://authority/endpoint, it returns {Endpoint: // target}. -func ParseTarget(target string, hasDialer bool) (ret resolver.Target) { +func ParseTarget(target string, skipUnixColonParsing bool) (ret resolver.Target) { var ok bool ret.Scheme, ret.Endpoint, ok = split2(target, "://") if !ok { - if strings.HasPrefix(target, "unix:") && !hasDialer { + if strings.HasPrefix(target, "unix:") && !skipUnixColonParsing { // Handle the "unix:[path]" case, because splitting on :// only // handles the "unix://[/absolute/path]" case. Only handle if the // dialer is nil, to avoid a behavior change with custom dialers. diff --git a/internal/grpcutil/target_test.go b/internal/grpcutil/target_test.go index 8782d43a175..562390bfe38 100644 --- a/internal/grpcutil/target_test.go +++ b/internal/grpcutil/target_test.go @@ -86,12 +86,13 @@ func TestParseTargetString(t *testing.T) { if got != test.want { t.Errorf("ParseTarget(%q, false) = %+v, want %+v", test.targetStr, got, test.want) } - if test.wantWithDialer == (resolver.Target{}) { - test.wantWithDialer = test.want + wantWithDialer := test.wantWithDialer + if wantWithDialer == (resolver.Target{}) { + wantWithDialer = test.want } got = ParseTarget(test.targetStr, true) - if got != test.wantWithDialer { - t.Errorf("ParseTarget(%q, true) = %+v, want %+v", test.targetStr, got, test.wantWithDialer) + if got != wantWithDialer { + t.Errorf("ParseTarget(%q, true) = %+v, want %+v", test.targetStr, got, wantWithDialer) } } } diff --git a/internal/transport/proxy.go b/internal/transport/proxy.go index d8dc7159ef5..a662bf39a6c 100644 --- a/internal/transport/proxy.go +++ b/internal/transport/proxy.go @@ -110,8 +110,8 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri } // proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy -// is necessary, dial to the proxy with using proxyDialer, does HTTP CONNECT -// handshake, and returns the connection. +// is necessary, dials, does the HTTP CONNECT handshake, and returns the +// connection. func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) { newAddr := addr proxyURL, err := mapAddress(ctx, addr)