Skip to content

Commit

Permalink
rpc: install custom proto codec
Browse files Browse the repository at this point in the history
This change enables future upgrades of google.golang.org/grpc.

Recent changes to google.golang.org/grpc have changed the default
proto codec such that it is incompatible with gogoproto:

grpc/grpc-go#3958

While `proto.Marshal` does do a check for the Marshal/Unmarshal it
appears to do this through reflection methods that will panic on
fields using gogoproto.nullable.

To avoid this, we install a proto codec that does an interface check
before dispatching to proto.Marshal/proto.Unmarshal.

Note that I haven't replicated the protobuffer caching that previously
existed in google.golang.org/grpc. The referenced change claims
removing the caching may be a small performance win; however, they
also removed the typecheck that we need to do, so it is unclear.

Fixes cockroachdb#60531

Release note: None
  • Loading branch information
stevendanna committed Mar 23, 2021
1 parent 3f9f18e commit 5ba160f
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 4 deletions.
7 changes: 6 additions & 1 deletion pkg/rpc/BUILD.bazel
Expand Up @@ -9,6 +9,7 @@ go_library(
"auth_tenant.go",
"breaker.go",
"clock_offset.go",
"codec.go",
"connection_class.go",
"context.go",
"context_testutils.go",
Expand Down Expand Up @@ -49,6 +50,8 @@ go_library(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_facebookgo_clock//:clock",
"@com_github_gogo_protobuf//proto",
"@com_github_golang_protobuf//proto:go_default_library",
"@com_github_golang_snappy//:snappy",
"@com_github_montanaflynn_stats//:stats",
"@com_github_vividcortex_ewma//:ewma",
Expand All @@ -57,7 +60,6 @@ go_library(
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//encoding",
"@org_golang_google_grpc//encoding/proto",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//peer",
Expand All @@ -73,6 +75,7 @@ go_test(
srcs = [
"auth_test.go",
"clock_offset_test.go",
"codec_test.go",
"context_test.go",
"heartbeat_test.go",
"main_test.go",
Expand Down Expand Up @@ -105,9 +108,11 @@ go_test(
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_raft_v3//raftpb",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//health/grpc_health_v1",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//peer",
Expand Down
48 changes: 48 additions & 0 deletions pkg/rpc/codec.go
@@ -0,0 +1,48 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rpc

import (
"github.com/gogo/protobuf/proto"
// Used instead of gogo/protobuf/proto for the fallback case
// to match the behavior of the upstream codec in
// google.golang.org/grpc/encoding/proto that we are
// replacing:
//
// https://github.com/grpc/grpc-go/blob/7b167fd6eca1ab8f05ec14085d63197cacd41438/encoding/proto/proto.go
//
gproto "github.com/golang/protobuf/proto"
"google.golang.org/grpc/encoding"
)

const name = "proto"

type codec struct{}

var _ encoding.Codec = codec{}

func (codec) Marshal(v interface{}) ([]byte, error) {
if pm, ok := v.(proto.Marshaler); ok {
return pm.Marshal()
}
return gproto.Marshal(v.(gproto.Message))
}

func (codec) Unmarshal(data []byte, v interface{}) error {
if pm, ok := v.(proto.Unmarshaler); ok {
return pm.Unmarshal(data)
}
return gproto.Unmarshal(data, v.(gproto.Message))
}

func (codec) Name() string {
return name
}
98 changes: 98 additions & 0 deletions pkg/rpc/codec_test.go
@@ -0,0 +1,98 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rpc

import (
"reflect"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/v3/raftpb"
"google.golang.org/grpc/health/grpc_health_v1"
)

func TestCodecMarshalUnmarshal(t *testing.T) {
defer leaktest.AfterTest(t)()

testCodec := codec{}
for _, test := range []struct {
name string
filledMsgBuilder func() interface{}
emptyMsgBuilder func() interface{}
}{
{"rpc.PingRequest",
func() interface{} { return &PingRequest{Ping: "pong"} },
func() interface{} { return &PingRequest{} }},
{"raftpb.Message",
func() interface{} {
return &raftpb.Message{
To: 531,
From: 550,
}
},
func() interface{} { return &raftpb.Message{} }},
{"grpc_health_v1.HealthCheckRequest",
func() interface{} {
return &grpc_health_v1.HealthCheckRequest{
Service: "wombats",
}
},
func() interface{} { return &grpc_health_v1.HealthCheckRequest{} }},
{"roachpb.GetRequest",
func() interface{} {
return &roachpb.GetRequest{
RequestHeader: roachpb.RequestHeader{
Key: roachpb.Key("turtle"),
},
}
},
func() interface{} { return &roachpb.GetRequest{} }},
} {
t.Run(test.name, func(t *testing.T) {
input := test.filledMsgBuilder()
marshaled, err := testCodec.Marshal(input)
require.NoError(t, err, "marshal failed")
output := test.emptyMsgBuilder()
err = testCodec.Unmarshal(marshaled, output)
require.NoError(t, err, "unmarshal failed")
// reflect.DeepEqual/require.Equal can fail
// because of XXX_sizecache fields
//
// google's proto Equal doesn't understand all
// gogoproto generated types and panics.
//
// gogoproto's proto Equal fails because of
// https://github.com/gogo/protobuf/issues/13
//
// Here, we zero any top-level fields that
// start with XXX_ and then use require.Equal
// (which uses require.DeepEqual). I doubt
// this would work for the general case, but
// it works for the protobufs tested here.
zeroXXXFields(input)
zeroXXXFields(output)
require.Equal(t, input, output)
})
}
}

func zeroXXXFields(v interface{}) {
val := reflect.Indirect(reflect.ValueOf(v))
typ := val.Type()
for i := 0; i < val.NumField(); i++ {
if strings.HasPrefix(typ.Field(i).Name, "XXX_") {
val.Field(i).Set(reflect.Zero(val.Field(i).Type()))
}
}
}
4 changes: 1 addition & 3 deletions pkg/rpc/context.go
Expand Up @@ -46,7 +46,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
encodingproto "google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -794,8 +793,7 @@ func (c growStackCodec) Unmarshal(data []byte, v interface{}) error {
// Install the growStackCodec over the default proto codec in order to grow the
// stack for BatchRequest RPCs prior to unmarshaling.
func init() {
protoCodec := encoding.GetCodec(encodingproto.Name)
encoding.RegisterCodec(growStackCodec{Codec: protoCodec})
encoding.RegisterCodec(growStackCodec{Codec: codec{}})
}

// onlyOnceDialer implements the grpc.WithDialer interface but only
Expand Down
7 changes: 7 additions & 0 deletions pkg/testutils/lint/lint_test.go
Expand Up @@ -998,6 +998,8 @@ func TestLint(t *testing.T) {
":!sql/*.pb.go",
":!util/protoutil/marshal.go",
":!util/protoutil/marshaler.go",
":!rpc/codec.go",
":!rpc/codec_test.go",
":!settings/settings_test.go",
":!sql/types/types_jsonpb.go",
":!sql/schemachanger/scbuild/builder_test.go",
Expand Down Expand Up @@ -1042,6 +1044,8 @@ func TestLint(t *testing.T) {
":!util/protoutil/marshaler.go",
":!util/encoding/encoding.go",
":!util/hlc/timestamp.go",
":!rpc/codec.go",
":!rpc/codec_test.go",
":!sql/types/types_jsonpb.go",
)
if err != nil {
Expand Down Expand Up @@ -1481,6 +1485,7 @@ func TestLint(t *testing.T) {
stream.GrepNot(`cockroach/pkg/util/log: github\.com/pkg/errors$`),
stream.GrepNot(`cockroach/pkg/(base|release|security|util/(log|randutil|stop)): log$`),
stream.GrepNot(`cockroach/pkg/(server/serverpb|ts/tspb): github\.com/golang/protobuf/proto$`),
stream.GrepNot(`cockroachdb/cockroach/pkg/rpc: github\.com/golang/protobuf/proto$`),
stream.GrepNot(`cockroachdb/cockroach/pkg/sql/lex/allkeywords: log$`),
stream.GrepNot(`cockroachdb/cockroach/pkg/util/timeutil/gen: log$`),
stream.GrepNot(`cockroachdb/cockroach/pkg/roachpb/gen: log$`),
Expand Down Expand Up @@ -1649,6 +1654,8 @@ func TestLint(t *testing.T) {
stream.GrepNot(`pkg/.*.go:.* func .*\.Cause is unused`),
// Using deprecated WireLength call.
stream.GrepNot(`pkg/rpc/stats_handler.go:.*v.WireLength is deprecated: This field is never set.*`),
// rpc/codec.go imports the same proto package that grpc-go imports (as of crdb@dd87d1145 and grpc-go@7b167fd6).
stream.GrepNot(`pkg/rpc/codec.go:.*package github.com/golang/protobuf/proto is deprecated: Use the "google.golang.org/protobuf/proto" package instead.`),
), func(s string) {
t.Errorf("\n%s", s)
}); err != nil {
Expand Down

0 comments on commit 5ba160f

Please sign in to comment.