Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: implement support for "unix" resolver scheme #3890

Merged
merged 18 commits into from Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion balancer/rls/internal/config.go
Expand Up @@ -201,7 +201,7 @@ func (*rlsBB) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig,
if lookupService == "" {
return nil, fmt.Errorf("rls: empty lookup_service in service config {%+v}", string(c))
}
parsedTarget := grpcutil.ParseTarget(lookupService)
parsedTarget := grpcutil.ParseTarget(lookupService, false)
if parsedTarget.Scheme == "" {
parsedTarget.Scheme = resolver.GetDefaultScheme()
}
Expand Down
17 changes: 3 additions & 14 deletions clientconn.go
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"math"
"net"
"reflect"
"strings"
"sync"
Expand All @@ -48,6 +47,7 @@ import (
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
)

const (
Expand Down Expand Up @@ -191,16 +191,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams

if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
network, addr := parseDialTarget(addr)
return (&net.Dialer{}).DialContext(ctx, network, addr)
}
if cc.dopts.withProxy {
cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
}
}

if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
Expand Down Expand Up @@ -244,8 +234,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}

// Determine the resolver to use.
cc.parsedTarget = grpcutil.ParseTarget(cc.target)
unixScheme := strings.HasPrefix(cc.target, "unix:")
cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
channelz.Infof(logger, cc.channelzID, "parsed scheme: %q", cc.parsedTarget.Scheme)
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
if resolverBuilder == nil {
Expand All @@ -268,7 +257,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.authority != "" {
cc.authority = cc.dopts.authority
} else if unixScheme {
} else if strings.HasPrefix(cc.target, "unix:") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to come from the resolver (with resolver.Address - is this the same as ServerName? @menghanl?). It can be done in a follow-up change but this is not what we want to be doing, ideally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually don't do that.

The :authority header doesn't come from resolver.Address.ServerName:

headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})

But the ClientHandshake does:

conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)

All the authority thing needs a cleanup..

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is what I thought and what I was worried about. The resolver needs a way to set the authority for addresses. I think ServerName is the right way to do that -- WDYT? Do we want a different handshaker name from authority header value?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they should be the same. For security reason?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed; I think we can consider it a bug that we are not using the address's ServerName for the authority header. But this can be done in a follow-up PR.

cc.authority = "localhost"
} else {
// Use endpoint from "scheme://authority/endpoint" as the default
Expand Down
5 changes: 2 additions & 3 deletions dialoptions.go
Expand Up @@ -71,7 +71,6 @@ type dialOptions struct {
// we need to be able to configure this in tests.
resolveNowBackoff func(int) time.Duration
resolvers []resolver.Builder
withProxy bool
}

// DialOption configures how we set up the connection.
Expand Down Expand Up @@ -325,7 +324,7 @@ func WithInsecure() DialOption {
// later release.
func WithNoProxy() DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.withProxy = false
o.copts.UseProxy = false
})
}

Expand Down Expand Up @@ -595,9 +594,9 @@ func defaultDialOptions() dialOptions {
copts: transport.ConnectOptions{
WriteBufferSize: defaultWriteBufSize,
ReadBufferSize: defaultReadBufSize,
UseProxy: false,
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
},
resolveNowBackoff: internalbackoff.DefaultExponential.Backoff,
withProxy: true,
}
}

Expand Down
13 changes: 12 additions & 1 deletion internal/grpcutil/target.go
Expand Up @@ -41,15 +41,26 @@ func split2(s, sep string) (string, string, bool) {
//
// If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
// target}.
func ParseTarget(target string) (ret resolver.Target) {
func ParseTarget(target string, hasDialer bool) (ret resolver.Target) {
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
var ok bool
ret.Scheme, ret.Endpoint, ok = split2(target, "://")
if !ok {
if strings.HasPrefix(target, "unix:") && !hasDialer {
// Handle the "unix:[path]" case, because grpcutil.ParseTarget()
// only handles the "unix://[/absolute/path]" case. Only handle if
// the dialer is nil, to avoid a behavior change with custom dialers.
return resolver.Target{Scheme: "unix", Endpoint: target[len("unix:"):]}
}
return resolver.Target{Endpoint: target}
}
ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
if !ok {
return resolver.Target{Endpoint: target}
}
if ret.Scheme == "unix" {
// Add the "/" back in the unix case, so the unix resolver receives the
// actual endpoint.
ret.Endpoint = "/" + ret.Endpoint
}
return ret
}
4 changes: 2 additions & 2 deletions internal/grpcutil/target_test.go
Expand Up @@ -32,7 +32,7 @@ func TestParseTarget(t *testing.T) {
{Scheme: "passthrough", Authority: "", Endpoint: "/unix/socket/address"},
} {
str := test.Scheme + "://" + test.Authority + "/" + test.Endpoint
got := ParseTarget(str)
got := ParseTarget(str, false)
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
if got != test {
t.Errorf("ParseTarget(%q) = %+v, want %+v", str, got, test)
}
Expand Down Expand Up @@ -71,7 +71,7 @@ func TestParseTargetString(t *testing.T) {
{targetStr: "a//b", want: resolver.Target{Scheme: "", Authority: "", Endpoint: "a//b"}},
{targetStr: "a://b", want: resolver.Target{Scheme: "", Authority: "", Endpoint: "a://b"}},
} {
got := ParseTarget(test.targetStr)
got := ParseTarget(test.targetStr, false)
if got != test.want {
t.Errorf("ParseTarget(%q) = %+v, want %+v", test.targetStr, got, test.want)
}
Expand Down
50 changes: 50 additions & 0 deletions internal/resolver/unix/unix.go
@@ -0,0 +1,50 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package unix implements a resolver for unix targets.
package unix

import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/resolver"
)

const scheme = "unix"

type builder struct{}

func (*builder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
cc.UpdateState(resolver.State{Addresses: []resolver.Address{networktype.Set(resolver.Address{Addr: target.Endpoint, Attributes: attributes.New()}, "unix")}})
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
return &unixResolver{}, nil
}

func (*builder) Scheme() string {
return scheme
}

type unixResolver struct {
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
}

func (*unixResolver) ResolveNow(resolver.ResolveNowOptions) {}

func (*unixResolver) Close() {}

func init() {
resolver.Register(&builder{})
}
16 changes: 12 additions & 4 deletions internal/transport/http2_client.go
Expand Up @@ -33,6 +33,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/transport/networktype"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -137,11 +138,18 @@ type http2Client struct {
connectionID uint64
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
return fn(ctx, addr.Addr)
}
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
networkTypeStr := "tcp"
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
if networkType := networktype.Get(addr); networkType != nil {
networkTypeStr = networkType.T
}
if networkTypeStr == "tcp" && useProxy {
proxyDial(ctx, addr.Addr, grpcUA)
}
return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr)
}

func isTemporary(err error) bool {
Expand Down Expand Up @@ -172,7 +180,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}()

conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
Expand Down
47 changes: 47 additions & 0 deletions internal/transport/networktype/networktype.go
@@ -0,0 +1,47 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package networktype declares the network type to be used in the default
// dailer. Attribute of a resolver.Address.
package networktype

import (
"google.golang.org/grpc/resolver"
)

// keyType is the key to use for storing State in Attributes.
type keyType string

const key = keyType("grpc.internal.transport.networktype")

// NetworkType contains the network type information.
type NetworkType struct {
T string
}
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved

// Set returns a copy of the provided address with attributes containing t.
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
func Set(address resolver.Address, networkType string) resolver.Address {
address.Attributes = address.Attributes.WithValues(key, &NetworkType{networkType})
return address
}

// Get returns the network type in the resolver.Address, or nil if not present.
func Get(address resolver.Address) *NetworkType {
t, _ := address.Attributes.Value(key).(*NetworkType)
return t
}
48 changes: 24 additions & 24 deletions proxy.go → internal/transport/proxy.go
Expand Up @@ -16,7 +16,7 @@
*
*/

package grpc
package transport

import (
"bufio"
Expand All @@ -33,6 +33,8 @@ import (

const proxyAuthHeaderKey = "Proxy-Authorization"

var proxyDialer = (&net.Dialer{}).DialContext

var (
// errDisabled indicates that proxy is disabled for the address.
errDisabled = errors.New("proxy is disabled for the address")
Expand Down Expand Up @@ -76,7 +78,7 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}

func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL) (_ net.Conn, err error) {
func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
Expand Down Expand Up @@ -115,32 +117,30 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
return &bufConn{Conn: conn, r: r}, nil
}

// newProxyDialer returns a dialer that connects to proxy first if necessary.
// The returned dialer checks if a proxy is necessary, dial to the proxy with the
// provided dialer, does HTTP CONNECT handshake and returns the connection.
func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (conn net.Conn, err error) {
var newAddr string
proxyURL, err := mapAddress(ctx, addr)
if err != nil {
if err != errDisabled {
return nil, err
}
newAddr = addr
} else {
newAddr = proxyURL.Host
// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy
// is necessary, dial to the proxy with using proxyDialer, does HTTP CONNECT
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
// handshake, and returns the connection.
func proxyDial(ctx context.Context, addr string, grpcUA string) (conn net.Conn, err error) {
var newAddr string
proxyURL, err := mapAddress(ctx, addr)
if err != nil {
if err != errDisabled {
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
return nil, err
}
newAddr = addr
} else {
newAddr = proxyURL.Host
}

conn, err = dialer(ctx, newAddr)
if err != nil {
return
}
if proxyURL != nil {
// proxy is disabled if proxyURL is nil.
conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL)
}
conn, err = proxyDialer(ctx, "tcp", newAddr)
if err != nil {
return
}
if proxyURL != nil {
// proxy is disabled if proxyURL is nil.
conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
}
return
}

func sendHTTPRequest(ctx context.Context, req *http.Request, conn net.Conn) error {
Expand Down