Skip to content

Commit

Permalink
channelz: pass parent pointer instead of parent ID to RegisterSubChan…
Browse files Browse the repository at this point in the history
…nel (#7101)
  • Loading branch information
dfawley committed Apr 8, 2024
1 parent 0f6ef0f commit 92f6dd0
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 31 deletions.
4 changes: 2 additions & 2 deletions channelz/service/service_test.go
Expand Up @@ -334,7 +334,7 @@ func (s) TestGetChannel(t *testing.T) {
},
})

subChan := channelz.RegisterSubChannel(cids[0].ID, refNames[2])
subChan := channelz.RegisterSubChannel(cids[0], refNames[2])
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: "SubChannel Created",
Severity: channelz.CtInfo,
Expand Down Expand Up @@ -432,7 +432,7 @@ func (s) TestGetSubChannel(t *testing.T) {
Desc: "Channel Created",
Severity: channelz.CtInfo,
})
subChan := channelz.RegisterSubChannel(chann.ID, refNames[1])
subChan := channelz.RegisterSubChannel(chann, refNames[1])
defer channelz.RemoveEntry(subChan.ID)
channelz.AddTraceEvent(logger, subChan, 0, &channelz.TraceEvent{
Desc: subchanCreated,
Expand Down
2 changes: 1 addition & 1 deletion clientconn.go
Expand Up @@ -833,7 +833,7 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
addrs: copyAddressesWithoutBalancerAttributes(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz.ID, ""),
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
stateChan: make(chan struct{}),
}
Expand Down
21 changes: 11 additions & 10 deletions internal/channelz/funcs.go
Expand Up @@ -143,20 +143,21 @@ func RegisterChannel(parent *Channel, target string) *Channel {
// Returns a unique channelz identifier assigned to this subChannel.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterSubChannel(pid int64, ref string) *SubChannel {
func RegisterSubChannel(parent *Channel, ref string) *SubChannel {
id := IDGen.genID()
if !IsOn() {
return &SubChannel{ID: id}
}

sc := &SubChannel{
RefName: ref,
ID: id,
sockets: make(map[int64]string),
parent: db.getChannel(pid),
trace: &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},
RefName: ref,
parent: parent,
}
db.addSubChannel(id, sc, pid)

if !IsOn() {
return sc
}

sc.sockets = make(map[int64]string)
sc.trace = &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())}
db.addSubChannel(id, sc, parent.ID)
return sc
}

Expand Down
19 changes: 13 additions & 6 deletions internal/transport/keepalive_test.go
Expand Up @@ -249,6 +249,16 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
}
}

func channelzSubChannel(t *testing.T) *channelz.SubChannel {
ch := channelz.RegisterChannel(nil, "test chan")
sc := channelz.RegisterSubChannel(ch, "test subchan")
t.Cleanup(func() {
channelz.RemoveEntry(sc.ID)
channelz.RemoveEntry(ch.ID)
})
return sc
}

// TestKeepaliveClientClosesUnresponsiveServer creates a server which does not
// respond to keepalive pings, and makes sure that the client closes the
// transport once the keepalive logic kicks in. Here, we set the
Expand All @@ -257,14 +267,13 @@ func (s) TestKeepaliveServerWithResponsiveClient(t *testing.T) {
func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
Timeout: 10 * time.Millisecond,
PermitWithoutStream: true,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))
Expand All @@ -288,13 +297,12 @@ func (s) TestKeepaliveClientClosesUnresponsiveServer(t *testing.T) {
func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Millisecond,
Timeout: 10 * time.Millisecond,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
client, cancel := setUpWithNoPingServer(t, copts, connCh)
defer cancel()
defer client.Close(fmt.Errorf("closed manually by test"))
Expand All @@ -319,13 +327,12 @@ func (s) TestKeepaliveClientOpenWithUnresponsiveServer(t *testing.T) {
func (s) TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
connCh := make(chan net.Conn, 1)
copts := ConnectOptions{
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchan"),
ChannelzParent: channelzSubChannel(t),
KeepaliveParams: keepalive.ClientParameters{
Time: 500 * time.Millisecond,
Timeout: 500 * time.Millisecond,
},
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
// TODO(i/6099): Setup a server which can ping and no-ping based on a flag to
// reduce the flakiness in this test.
client, cancel := setUpWithNoPingServer(t, copts, connCh)
Expand Down
15 changes: 5 additions & 10 deletions internal/transport/transport_test.go
Expand Up @@ -434,8 +434,7 @@ func setUp(t *testing.T, port int, ht hType) (*server, *http2Client, func()) {
func setUpWithOptions(t *testing.T, port int, sc *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
server := setUpServerOnly(t, port, sc, ht)
addr := resolver.Address{Addr: "localhost:" + server.port}
copts.ChannelzParent = channelz.RegisterSubChannel(-1, "test channel")
t.Cleanup(func() { channelz.RemoveEntry(copts.ChannelzParent.ID) })
copts.ChannelzParent = channelzSubChannel(t)

connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
ct, connErr := NewClientTransport(connectCtx, context.Background(), addr, copts, func(GoAwayReason) {})
Expand Down Expand Up @@ -1321,9 +1320,8 @@ func (s) TestClientHonorsConnectContext(t *testing.T) {
connectCtx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
time.AfterFunc(100*time.Millisecond, cancel)

parent := channelz.RegisterSubChannel(-1, "test channel")
parent := channelzSubChannel(t)
copts := ConnectOptions{ChannelzParent: parent}
defer channelz.RemoveEntry(parent.ID)
_, err = NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err == nil {
t.Fatalf("NewClientTransport() returned successfully; wanted error")
Expand Down Expand Up @@ -1414,8 +1412,7 @@ func (s) TestClientWithMisbehavedServer(t *testing.T) {
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
defer cancel()

parent := channelz.RegisterSubChannel(-1, "test channel")
defer channelz.RemoveEntry(parent.ID)
parent := channelzSubChannel(t)
copts := ConnectOptions{ChannelzParent: parent}
ct, err := NewClientTransport(connectCtx, context.Background(), resolver.Address{Addr: lis.Addr().String()}, copts, func(GoAwayReason) {})
if err != nil {
Expand Down Expand Up @@ -2425,9 +2422,8 @@ func (s) TestClientHandshakeInfo(t *testing.T) {

copts := ConnectOptions{
TransportCredentials: creds,
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchannel"),
ChannelzParent: channelzSubChannel(t),
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
Expand Down Expand Up @@ -2467,9 +2463,8 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) {

copts := ConnectOptions{
Dialer: dialer,
ChannelzParent: channelz.RegisterSubChannel(-1, "test subchannel"),
ChannelzParent: channelzSubChannel(t),
}
defer channelz.RemoveEntry(copts.ChannelzParent.ID)
tr, err := NewClientTransport(ctx, context.Background(), addr, copts, func(GoAwayReason) {})
if err != nil {
t.Fatalf("NewClientTransport(): %v", err)
Expand Down
4 changes: 2 additions & 2 deletions test/channelz_test.go
Expand Up @@ -554,8 +554,8 @@ func (s) TestCZRecusivelyDeletionOfEntry(t *testing.T) {
// Socket1 Socket2

topChan := channelz.RegisterChannel(nil, "")
subChan1 := channelz.RegisterSubChannel(topChan.ID, "")
subChan2 := channelz.RegisterSubChannel(topChan.ID, "")
subChan1 := channelz.RegisterSubChannel(topChan, "")
subChan2 := channelz.RegisterSubChannel(topChan, "")
skt1 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})
skt2 := channelz.RegisterSocket(&channelz.Socket{SocketType: channelz.SocketTypeNormal, Parent: subChan1})

Expand Down

0 comments on commit 92f6dd0

Please sign in to comment.