Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[transport] Centralize ref-counting between transports #36460

Closed
wants to merge 14 commits into from
2 changes: 1 addition & 1 deletion src/core/ext/transport/chaotic_good/client_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class ChaoticGoodClientTransport final : public ClientTransport {
grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override {
AbortWithError();
delete this;
Unref();
}

void StartCall(CallHandler call_handler) override;
Expand Down
2 changes: 1 addition & 1 deletion src/core/ext/transport/chaotic_good/server_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class ChaoticGoodServerTransport final : public ServerTransport {
void SetPollsetSet(grpc_stream*, grpc_pollset_set*) override {}
void PerformOp(grpc_transport_op*) override;
grpc_endpoint* GetEndpoint() override { return nullptr; }
void Orphan() override { delete this; }
void Orphan() override { Unref(); }

void SetAcceptor(Acceptor* acceptor) override;
void AbortWithError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ static bool g_default_server_keepalive_permit_without_calls = false;

#define MAX_CLIENT_STREAM_ID 0x7fffffffu
grpc_core::TraceFlag grpc_keepalive_trace(false, "http_keepalive");
grpc_core::DebugOnlyTraceFlag grpc_trace_chttp2_refcount(false,
"chttp2_refcount");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this from the list in doc/environment_variables.md.


// forward declarations of various callbacks that we'll build closures around
static void write_action_begin_locked(
Expand Down Expand Up @@ -594,12 +592,7 @@ static void init_keepalive_pings_if_enabled_locked(
grpc_chttp2_transport::grpc_chttp2_transport(
const grpc_core::ChannelArgs& channel_args, grpc_endpoint* ep,
bool is_client)
: grpc_core::RefCounted<grpc_chttp2_transport,
grpc_core::NonPolymorphicRefCount>(
GRPC_TRACE_FLAG_ENABLED(grpc_trace_chttp2_refcount)
? "chttp2_refcount"
: nullptr),
ep(ep),
: ep(ep),
peer_string(
grpc_core::Slice::FromCopiedString(grpc_endpoint_get_peer(ep))),
memory_owner(channel_args.GetObject<grpc_core::ResourceQuota>()
Expand Down
12 changes: 7 additions & 5 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,19 @@ typedef enum {
GRPC_CHTTP2_KEEPALIVE_STATE_DISABLED,
} grpc_chttp2_keepalive_state;

struct grpc_chttp2_transport final
: public grpc_core::FilterStackTransport,
public grpc_core::RefCounted<grpc_chttp2_transport,
grpc_core::NonPolymorphicRefCount>,
public grpc_core::KeepsGrpcInitialized {
struct grpc_chttp2_transport final : public grpc_core::FilterStackTransport,
public grpc_core::KeepsGrpcInitialized {
grpc_chttp2_transport(const grpc_core::ChannelArgs& channel_args,
grpc_endpoint* ep, bool is_client);
~grpc_chttp2_transport() override;

void Orphan() override;

grpc_core::RefCountedPtr<grpc_chttp2_transport> Ref() {
return grpc_core::FilterStackTransport::RefAsSubclass<
grpc_chttp2_transport>();
}

size_t SizeOfStream() const override;
bool HackyDisableStreamOpBatchCoalescingInConnectedChannel() const override;
void PerformStreamOp(grpc_stream* gs,
Expand Down
28 changes: 18 additions & 10 deletions src/core/ext/transport/inproc/inproc_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
namespace grpc_core {

namespace {
class InprocServerTransport final : public RefCounted<InprocServerTransport>,
public ServerTransport {
class InprocClientTransport;

class InprocServerTransport final : public ServerTransport {
public:
void SetAcceptor(Acceptor* acceptor) override {
acceptor_ = acceptor;
Expand Down Expand Up @@ -95,6 +96,8 @@ class InprocServerTransport final : public RefCounted<InprocServerTransport>,
return acceptor_->CreateCall(std::move(md), acceptor_->CreateArena());
}

OrphanablePtr<InprocClientTransport> MakeClientTransport();

private:
enum class ConnectionState : uint8_t { kInitial, kReady, kDisconnected };

Expand All @@ -109,6 +112,10 @@ class InprocServerTransport final : public RefCounted<InprocServerTransport>,

class InprocClientTransport final : public ClientTransport {
public:
explicit InprocClientTransport(
RefCountedPtr<InprocServerTransport> server_transport)
: server_transport_(std::move(server_transport)) {}

void StartCall(CallHandler call_handler) override {
call_handler.SpawnGuarded(
"pull_initial_metadata",
Expand All @@ -125,10 +132,6 @@ class InprocClientTransport final : public ClientTransport {

void Orphan() override { delete this; }

OrphanablePtr<Transport> GetServerTransport() {
return OrphanablePtr<Transport>(server_transport_->Ref().release());
}

FilterStackTransport* filter_stack_transport() override { return nullptr; }
ClientTransport* client_transport() override { return this; }
ServerTransport* server_transport() override { return nullptr; }
Expand All @@ -144,8 +147,7 @@ class InprocClientTransport final : public ClientTransport {
absl::UnavailableError("Client transport closed"));
}

RefCountedPtr<InprocServerTransport> server_transport_ =
MakeRefCounted<InprocServerTransport>();
const RefCountedPtr<InprocServerTransport> server_transport_;
};

bool UsePromiseBasedTransport() {
Expand All @@ -155,6 +157,12 @@ bool UsePromiseBasedTransport() {
return true;
}

OrphanablePtr<InprocClientTransport>
InprocServerTransport::MakeClientTransport() {
return MakeOrphanable<InprocClientTransport>(
RefAsSubclass<InprocServerTransport>());
}

OrphanablePtr<Channel> MakeLameChannel(absl::string_view why,
absl::Status error) {
gpr_log(GPR_ERROR, "%s: %s", std::string(why).c_str(),
Expand Down Expand Up @@ -196,8 +204,8 @@ OrphanablePtr<Channel> MakeInprocChannel(Server* server,

std::pair<OrphanablePtr<Transport>, OrphanablePtr<Transport>>
MakeInProcessTransportPair() {
auto client_transport = MakeOrphanable<InprocClientTransport>();
auto server_transport = client_transport->GetServerTransport();
auto server_transport = MakeOrphanable<InprocServerTransport>();
auto client_transport = server_transport->MakeClientTransport();
return std::make_pair(std::move(client_transport),
std::move(server_transport));
}
Expand Down
2 changes: 1 addition & 1 deletion src/core/lib/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ class FilterStackTransport;
class ClientTransport;
class ServerTransport;

class Transport : public Orphanable {
class Transport : public InternallyRefCounted<Transport> {
public:
struct RawPointerChannelArgTag {};
static absl::string_view ChannelArgName() { return GRPC_ARG_TRANSPORT; }
Expand Down