Skip to content

Commit

Permalink
Merge pull request #13359 from serathius/authority
Browse files Browse the repository at this point in the history
Fix http2 authority header in single endpoint scenario
  • Loading branch information
ptabor committed Sep 29, 2021
2 parents 3df2727 + d52d7fc commit f3cfe0f
Show file tree
Hide file tree
Showing 38 changed files with 853 additions and 212 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.5.1) and

- Fix [self-signed-cert-validity parameter cannot be specified in the config file](https://github.com/etcd-io/etcd/pull/13237).

### etcd client

- [Fix etcd client sends invalid :authority header](https://github.com/etcd-io/etcd/issues/13192)

### package clientv3

- Endpoints self identify now as `etcd-endpoints://{id}/{authority}` where authority is based on first endpoint passed, for example `etcd-endpoints://0xc0009d8540/localhost:2079`

<hr>

Expand Down
18 changes: 15 additions & 3 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,16 +297,28 @@ func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.
dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
defer cancel() // TODO: Is this right for cases where grpc.WithBlock() is not set on the dial options?
}

initialEndpoints := strings.Join(c.Endpoints(), ";")
target := fmt.Sprintf("%s://%p/#initially=[%s]", resolver.Schema, c, initialEndpoints)
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.endpoints[0]))
conn, err := grpc.DialContext(dctx, target, opts...)
if err != nil {
return nil, err
}
return conn, nil
}

func authority(endpoint string) string {
spl := strings.SplitN(endpoint, "://", 2)
if len(spl) < 2 {
if strings.HasPrefix(endpoint, "unix:") {
return endpoint[len("unix:"):]
}
if strings.HasPrefix(endpoint, "unixs:") {
return endpoint[len("unixs:"):]
}
return endpoint
}
return spl[1]
}

func (c *Client) credentialsForEndpoint(ep string) grpccredentials.TransportCredentials {
r := endpoint.RequiresCredentials(ep)
switch r {
Expand Down
69 changes: 69 additions & 0 deletions pkg/grpc_testing/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2021 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc_testing

import (
"context"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

type GrpcRecorder struct {
mux sync.RWMutex
requests []RequestInfo
}

type RequestInfo struct {
FullMethod string
Authority string
}

func (ri *GrpcRecorder) UnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ri.record(toRequestInfo(ctx, info))
resp, err := handler(ctx, req)
return resp, err
}
}

func (ri *GrpcRecorder) RecordedRequests() []RequestInfo {
ri.mux.RLock()
defer ri.mux.RUnlock()
reqs := make([]RequestInfo, len(ri.requests))
copy(reqs, ri.requests)
return reqs
}

func toRequestInfo(ctx context.Context, info *grpc.UnaryServerInfo) RequestInfo {
req := RequestInfo{
FullMethod: info.FullMethod,
}
md, ok := metadata.FromIncomingContext(ctx)
if ok {
as := md.Get(":authority")
if len(as) != 0 {
req.Authority = as[0]
}
}
return req
}

func (ri *GrpcRecorder) record(r RequestInfo) {
ri.mux.Lock()
defer ri.mux.Unlock()
ri.requests = append(ri.requests, r)
}
2 changes: 1 addition & 1 deletion server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ func (e *Etcd) servePeers() (err error) {

for _, p := range e.Peers {
u := p.Listener.Addr().String()
gs := v3rpc.Server(e.Server, peerTLScfg)
gs := v3rpc.Server(e.Server, peerTLScfg, nil)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Expand Down
4 changes: 2 additions & 2 deletions server/embed/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (sctx *serveCtx) serve(
}()

if sctx.insecure {
gs = v3rpc.Server(s, nil, gopts...)
gs = v3rpc.Server(s, nil, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down Expand Up @@ -148,7 +148,7 @@ func (sctx *serveCtx) serve(
if tlsErr != nil {
return tlsErr
}
gs = v3rpc.Server(s, tlscfg, gopts...)
gs = v3rpc.Server(s, tlscfg, nil, gopts...)
v3electionpb.RegisterElectionServer(gs, servElection)
v3lockpb.RegisterLockServer(gs, servLock)
if sctx.serviceRegister != nil {
Expand Down
6 changes: 4 additions & 2 deletions server/etcdserver/api/v3rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,21 @@ const (
maxSendBytes = math.MaxInt32
)

func Server(s *etcdserver.EtcdServer, tls *tls.Config, gopts ...grpc.ServerOption) *grpc.Server {
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
}

chainUnaryInterceptors := []grpc.UnaryServerInterceptor{
newLogUnaryInterceptor(s),
newUnaryInterceptor(s),
grpc_prometheus.UnaryServerInterceptor,
}
if interceptor != nil {
chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor)
}

chainStreamInterceptors := []grpc.StreamServerInterceptor{
newStreamInterceptor(s),
Expand Down
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerIntercepto
startTime := time.Now()
resp, err := handler(ctx, req)
lg := s.Logger()
if lg != nil { // acquire stats if debug level is enabled or request is expensive
if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive
defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp)
}
return resp, err
Expand Down
4 changes: 4 additions & 0 deletions tests/e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
return p.etcdProc.WithStopSignal(sig)
}

func (p *proxyEtcdProcess) Logs() logsExpect {
return p.etcdProc.Logs()
}

type proxyProc struct {
lg *zap.Logger
execPath string
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type etcdProcessClusterConfig struct {
execPath string
dataDirPath string
keepDataDir bool
envVars map[string]string

clusterSize int

Expand Down Expand Up @@ -318,6 +319,7 @@ func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs(tb testing.TB) []*
lg: lg,
execPath: cfg.execPath,
args: args,
envVars: cfg.envVars,
tlsArgs: cfg.tlsArgs(),
dataDirPath: dataDirPath,
keepDataDir: cfg.keepDataDir,
Expand Down

0 comments on commit f3cfe0f

Please sign in to comment.