Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: implement support for "unix" resolver scheme #3890

Merged
merged 18 commits into from Oct 16, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 3 additions & 10 deletions clientconn.go
Expand Up @@ -23,7 +23,6 @@ import (
"errors"
"fmt"
"math"
"net"
"reflect"
"strings"
"sync"
Expand All @@ -48,6 +47,7 @@ import (
_ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
_ "google.golang.org/grpc/internal/resolver/dns" // To register dns resolver.
_ "google.golang.org/grpc/internal/resolver/passthrough" // To register passthrough resolver.
_ "google.golang.org/grpc/internal/resolver/unix" // To register unix resolver.
)

const (
Expand Down Expand Up @@ -191,21 +191,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
cc.mkp = cc.dopts.copts.KeepaliveParams

if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
network, addr := parseDialTarget(addr)
return (&net.Dialer{}).DialContext(ctx, network, addr)
}
if cc.dopts.withProxy {
cc.dopts.copts.Dialer = newProxyDialer(cc.dopts.copts.Dialer)
}
}
cc.dopts.copts.UseProxy = cc.dopts.withProxy
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved

if cc.dopts.copts.UserAgent != "" {
cc.dopts.copts.UserAgent += " " + grpcUA
} else {
cc.dopts.copts.UserAgent = grpcUA
}
cc.dopts.copts.GrpcUA = grpcUA

if cc.dopts.timeout > 0 {
var cancel context.CancelFunc
Expand Down
59 changes: 59 additions & 0 deletions internal/resolver/unix/unix.go
@@ -0,0 +1,59 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package unix implements a resolver for unix targets.
package unix

import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)

const scheme = "unix"

type unixBuilder struct{}
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved

func (*unixBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
r := &unixResolver{
target: target,
cc: cc,
}
r.start()
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
return r, nil
}

func (*unixBuilder) Scheme() string {
return scheme
}

type unixResolver struct {
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
target resolver.Target
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
cc resolver.ClientConn
}

func (r *unixResolver) start() {
r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "/" + r.target.Endpoint, Attributes: attributes.New("network_type", "unix")}}})
}

func (*unixResolver) ResolveNow(o resolver.ResolveNowOptions) {}
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved

func (*unixResolver) Close() {}

func init() {
resolver.Register(&unixBuilder{})
}
22 changes: 17 additions & 5 deletions internal/transport/http2_client.go
Expand Up @@ -137,11 +137,23 @@ type http2Client struct {
connectionID uint64
}

func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
if fn != nil {
return fn(ctx, addr)
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
if fn == nil {
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
fn = func(fCtx context.Context, fAddr string) (net.Conn, error) {
if networkType := addr.Attributes.Value("network_type"); networkType != nil {
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
if networkTypeStr, ok := networkType.(string); ok {
return (&net.Dialer{}).DialContext(ctx, networkTypeStr, addr.Addr)
}
return nil, fmt.Errorf("network_type %v not of type string", networkType)
}
network, fAddr := parseDialTarget(fAddr)
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
return (&net.Dialer{}).DialContext(ctx, network, fAddr)
}
}
if useProxy {
fn = newProxyDialer(fn, grpcUA)
}
return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
return fn(ctx, addr.Addr)
}

func isTemporary(err error) bool {
Expand Down Expand Up @@ -172,7 +184,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}()

conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.GrpcUA)
if err != nil {
if opts.FailOnNonTempDialError {
return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
Expand Down
35 changes: 35 additions & 0 deletions internal/transport/http_util.go
Expand Up @@ -27,6 +27,7 @@ import (
"math"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -598,3 +599,37 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, maxHeaderList
f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
return f
}

// parseDialTarget returns the network and address to pass to dialer
func parseDialTarget(target string) (net string, addr string) {
net = "tcp"

m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")

// handle unix:addr which will fail with url.Parse
if m1 >= 0 && m2 < 0 {
if n := target[0:m1]; n == "unix" {
net = n
addr = target[m1+1:]
return net, addr
}
}
if m2 >= 0 {
t, err := url.Parse(target)
if err != nil {
return net, target
}
scheme := t.Scheme
addr = t.Path
if scheme == "unix" {
net = scheme
if addr == "" {
addr = t.Host
}
return net, addr
}
}

return net, target
}
21 changes: 21 additions & 0 deletions internal/transport/http_util_test.go
Expand Up @@ -250,3 +250,24 @@ func (s) TestDecodeHeaderH2ErrCode(t *testing.T) {
})
}
}

func (s) TestParseDialTarget(t *testing.T) {
for _, test := range []struct {
target, wantNet, wantAddr string
}{
{"unix:etcd:0", "unix", "etcd:0"},
{"unix:///tmp/unix-3", "unix", "/tmp/unix-3"},
{"unix://domain", "unix", "domain"},
{"unix://etcd:0", "unix", "etcd:0"},
{"unix:///etcd:0", "unix", "/etcd:0"},
{"passthrough://unix://domain", "tcp", "passthrough://unix://domain"},
{"https://google.com:443", "tcp", "https://google.com:443"},
{"dns:///google.com", "tcp", "dns:///google.com"},
{"/unix/socket/address", "tcp", "/unix/socket/address"},
} {
gotNet, gotAddr := parseDialTarget(test.target)
if gotNet != test.wantNet || gotAddr != test.wantAddr {
t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr)
}
}
}
8 changes: 4 additions & 4 deletions proxy.go → internal/transport/proxy.go
Expand Up @@ -16,7 +16,7 @@
*
*/

package grpc
package transport

import (
"bufio"
Expand Down Expand Up @@ -76,7 +76,7 @@ func basicAuth(username, password string) string {
return base64.StdEncoding.EncodeToString([]byte(auth))
}

func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL) (_ net.Conn, err error) {
func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr string, proxyURL *url.URL, grpcUA string) (_ net.Conn, err error) {
defer func() {
if err != nil {
conn.Close()
Expand Down Expand Up @@ -118,7 +118,7 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
// newProxyDialer returns a dialer that connects to proxy first if necessary.
// The returned dialer checks if a proxy is necessary, dial to the proxy with the
// provided dialer, does HTTP CONNECT handshake and returns the connection.
func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func(context.Context, string) (net.Conn, error) {
func newProxyDialer(dialer func(context.Context, string) (net.Conn, error), grpcUA string) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (conn net.Conn, err error) {
var newAddr string
proxyURL, err := mapAddress(ctx, addr)
Expand All @@ -137,7 +137,7 @@ func newProxyDialer(dialer func(context.Context, string) (net.Conn, error)) func
}
if proxyURL != nil {
// proxy is disabled if proxyURL is nil.
conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL)
conn, err = doHTTPConnectHandshake(ctx, conn, addr, proxyURL, grpcUA)
}
return
}
Expand Down
10 changes: 2 additions & 8 deletions proxy_test.go → internal/transport/proxy_test.go
Expand Up @@ -18,7 +18,7 @@
*
*/

package grpc
package transport

import (
"bufio"
Expand Down Expand Up @@ -143,7 +143,7 @@ func testHTTPConnect(t *testing.T, proxyURLModify func(*url.URL) *url.URL, proxy
return net.DialTimeout("tcp", addr, time.Until(deadline))
}
return net.Dial("tcp", addr)
})
}, "test")
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
c, err := dialer(ctx, blis.Addr().String())
Expand Down Expand Up @@ -173,9 +173,6 @@ func (s) TestHTTPConnect(t *testing.T) {
if req.Method != http.MethodConnect {
return fmt.Errorf("unexpected Method %q, want %q", req.Method, http.MethodConnect)
}
if req.UserAgent() != grpcUA {
return fmt.Errorf("unexpect user agent %q, want %q", req.UserAgent(), grpcUA)
}
return nil
},
)
Expand All @@ -195,9 +192,6 @@ func (s) TestHTTPConnectBasicAuth(t *testing.T) {
if req.Method != http.MethodConnect {
return fmt.Errorf("unexpected Method %q, want %q", req.Method, http.MethodConnect)
}
if req.UserAgent() != grpcUA {
return fmt.Errorf("unexpect user agent %q, want %q", req.UserAgent(), grpcUA)
}
wantProxyAuthStr := "Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+password))
if got := req.Header.Get(proxyAuthHeaderKey); got != wantProxyAuthStr {
gotDecoded, _ := base64.StdEncoding.DecodeString(got)
Expand Down
4 changes: 4 additions & 0 deletions internal/transport/transport.go
Expand Up @@ -569,6 +569,10 @@ type ConnectOptions struct {
ChannelzParentID int64
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
UseProxy bool
// GrpcUA
GrpcUA string
GarrettGutierrez1 marked this conversation as resolved.
Show resolved Hide resolved
}

// NewClientTransport establishes the transport with the required ConnectOptions
Expand Down
3 changes: 0 additions & 3 deletions resolver_conn_wrapper_test.go
Expand Up @@ -45,9 +45,6 @@ func (s) TestDialParseTargetUnknownScheme(t *testing.T) {
}{
{"/unix/socket/address", "/unix/socket/address"},

// Special test for "unix:///".
{"unix:///unix/socket/address", "unix:///unix/socket/address"},

// For known scheme.
{"passthrough://a.server.com/google.com", "google.com"},
} {
Expand Down
35 changes: 0 additions & 35 deletions rpc_util.go
Expand Up @@ -27,7 +27,6 @@ import (
"io"
"io/ioutil"
"math"
"net/url"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -817,40 +816,6 @@ func setCallInfoCodec(c *callInfo) error {
return nil
}

// parseDialTarget returns the network and address to pass to dialer
func parseDialTarget(target string) (net string, addr string) {
net = "tcp"

m1 := strings.Index(target, ":")
m2 := strings.Index(target, ":/")

// handle unix:addr which will fail with url.Parse
if m1 >= 0 && m2 < 0 {
if n := target[0:m1]; n == "unix" {
net = n
addr = target[m1+1:]
return net, addr
}
}
if m2 >= 0 {
t, err := url.Parse(target)
if err != nil {
return net, target
}
scheme := t.Scheme
addr = t.Path
if scheme == "unix" {
net = scheme
if addr == "" {
addr = t.Host
}
return net, addr
}
}

return net, target
}

// channelzData is used to store channelz related data for ClientConn, addrConn and Server.
// These fields cannot be embedded in the original structs (e.g. ClientConn), since to do atomic
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
Expand Down
21 changes: 0 additions & 21 deletions rpc_util_test.go
Expand Up @@ -191,27 +191,6 @@ func (s) TestToRPCErr(t *testing.T) {
}
}

func (s) TestParseDialTarget(t *testing.T) {
for _, test := range []struct {
target, wantNet, wantAddr string
}{
{"unix:etcd:0", "unix", "etcd:0"},
{"unix:///tmp/unix-3", "unix", "/tmp/unix-3"},
{"unix://domain", "unix", "domain"},
{"unix://etcd:0", "unix", "etcd:0"},
{"unix:///etcd:0", "unix", "/etcd:0"},
{"passthrough://unix://domain", "tcp", "passthrough://unix://domain"},
{"https://google.com:443", "tcp", "https://google.com:443"},
{"dns:///google.com", "tcp", "dns:///google.com"},
{"/unix/socket/address", "tcp", "/unix/socket/address"},
} {
gotNet, gotAddr := parseDialTarget(test.target)
if gotNet != test.wantNet || gotAddr != test.wantAddr {
t.Errorf("parseDialTarget(%q) = %s, %s want %s, %s", test.target, gotNet, gotAddr, test.wantNet, test.wantAddr)
}
}
}

// bmEncode benchmarks encoding a Protocol Buffer message containing mSize
// bytes.
func bmEncode(b *testing.B, mSize int) {
Expand Down