Skip to content

Commit

Permalink
Merge pull request #20406 from markdroth/client_channel_connectivity_api
Browse files Browse the repository at this point in the history
Use more normal API for client channel connectivity watches from internal code
  • Loading branch information
markdroth committed Oct 2, 2019
2 parents ad1ece8 + 490be92 commit 7e23bbb
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 62 deletions.
100 changes: 100 additions & 0 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,17 @@ class ChannelData {
return static_cast<int>(external_watchers_.size());
}

void AddConnectivityWatcher(
grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
void RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface* watcher);

private:
class SubchannelWrapper;
class ClientChannelControlHelper;
class ConnectivityWatcherAdder;
class ConnectivityWatcherRemover;

// Represents a pending connectivity callback from an external caller
// via grpc_client_channel_watch_connectivity_state().
Expand Down Expand Up @@ -1202,6 +1210,72 @@ void ChannelData::ExternalConnectivityWatcher::RemoveWatcherLocked(
self->chand_->state_tracker_.RemoveWatcher(self);
}

//
// ChannelData::ConnectivityWatcherAdder
//

class ChannelData::ConnectivityWatcherAdder {
public:
ConnectivityWatcherAdder(
ChannelData* chand, grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher)
: chand_(chand),
initial_state_(initial_state),
watcher_(std::move(watcher)) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherAdder");
GRPC_CLOSURE_INIT(&closure_, &ConnectivityWatcherAdder::AddWatcherLocked,
this, grpc_combiner_scheduler(chand_->combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}

private:
static void AddWatcherLocked(void* arg, grpc_error* error) {
ConnectivityWatcherAdder* self =
static_cast<ConnectivityWatcherAdder*>(arg);
self->chand_->state_tracker_.AddWatcher(self->initial_state_,
std::move(self->watcher_));
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
"ConnectivityWatcherAdder");
Delete(self);
}

ChannelData* chand_;
grpc_connectivity_state initial_state_;
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher_;
grpc_closure closure_;
};

//
// ChannelData::ConnectivityWatcherRemover
//

class ChannelData::ConnectivityWatcherRemover {
public:
ConnectivityWatcherRemover(ChannelData* chand,
AsyncConnectivityStateWatcherInterface* watcher)
: chand_(chand), watcher_(watcher) {
GRPC_CHANNEL_STACK_REF(chand_->owning_stack_, "ConnectivityWatcherRemover");
GRPC_CLOSURE_INIT(&closure_,
&ConnectivityWatcherRemover::RemoveWatcherLocked, this,
grpc_combiner_scheduler(chand_->combiner_));
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}

private:
static void RemoveWatcherLocked(void* arg, grpc_error* error) {
ConnectivityWatcherRemover* self =
static_cast<ConnectivityWatcherRemover*>(arg);
self->chand_->state_tracker_.RemoveWatcher(self->watcher_);
GRPC_CHANNEL_STACK_UNREF(self->chand_->owning_stack_,
"ConnectivityWatcherRemover");
Delete(self);
}

ChannelData* chand_;
AsyncConnectivityStateWatcherInterface* watcher_;
grpc_closure closure_;
};

//
// ChannelData::ClientChannelControlHelper
//
Expand Down Expand Up @@ -1886,6 +1960,17 @@ grpc_connectivity_state ChannelData::CheckConnectivityState(
return out;
}

void ChannelData::AddConnectivityWatcher(
grpc_connectivity_state initial_state,
OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher) {
New<ConnectivityWatcherAdder>(this, initial_state, std::move(watcher));
}

void ChannelData::RemoveConnectivityWatcher(
AsyncConnectivityStateWatcherInterface* watcher) {
New<ConnectivityWatcherRemover>(this, watcher);
}

//
// CallData implementation
//
Expand Down Expand Up @@ -3937,6 +4022,21 @@ void grpc_client_channel_watch_connectivity_state(
watcher_timer_init);
}

void grpc_client_channel_start_connectivity_watch(
grpc_channel_element* elem, grpc_connectivity_state initial_state,
grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
watcher) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->AddConnectivityWatcher(initial_state, std::move(watcher));
}

void grpc_client_channel_stop_connectivity_watch(
grpc_channel_element* elem,
grpc_core::AsyncConnectivityStateWatcherInterface* watcher) {
auto* chand = static_cast<ChannelData*>(elem->channel_data);
chand->RemoveConnectivityWatcher(watcher);
}

grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem) {
auto* calld = static_cast<CallData*>(elem->call_data);
Expand Down
30 changes: 24 additions & 6 deletions src/core/ext/filters/client_channel/client_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,35 @@ grpc_connectivity_state grpc_client_channel_check_connectivity_state(
int grpc_client_channel_num_external_connectivity_watchers(
grpc_channel_element* elem);

// TODO(roth): This function is used both when handling external
// connectivity watchers and for LB policies like grpclb and xds that
// contain nested channels. In the latter case, we ideally want
// something closer to the normal connectivity state tracker API.
// When we have time, consider refactoring this somehow to allow each
// use-case to be handled more cleanly.
// Starts a one-time connectivity state watch. When the channel's state
// becomes different from *state, sets *state to the new state and
// schedules on_complete. The watcher_timer_init callback is invoked as
// soon as the watch is actually started (i.e., after hopping into the
// client channel combiner). I/O will be serviced via pollent.
//
// This is intended to be used when starting a watch from outside of C-core
// via grpc_channel_watch_connectivity_state(). It should not be used
// by other callers.
void grpc_client_channel_watch_connectivity_state(
grpc_channel_element* elem, grpc_polling_entity pollent,
grpc_connectivity_state* state, grpc_closure* on_complete,
grpc_closure* watcher_timer_init);

// Starts and stops a connectivity watch. The watcher will be initially
// notified as soon as the state changes from initial_state and then on
// every subsequent state change until either the watch is stopped or
// it is notified that the state has changed to SHUTDOWN.
//
// This is intended to be used when starting watches from code inside of
// C-core (e.g., for a nested control plane channel for things like xds).
void grpc_client_channel_start_connectivity_watch(
grpc_channel_element* elem, grpc_connectivity_state initial_state,
grpc_core::OrphanablePtr<grpc_core::AsyncConnectivityStateWatcherInterface>
watcher);
void grpc_client_channel_stop_connectivity_watch(
grpc_channel_element* elem,
grpc_core::AsyncConnectivityStateWatcherInterface* watcher);

/* Debug helper: pull the subchannel call from a call stack element */
grpc_core::RefCountedPtr<grpc_core::SubchannelCall>
grpc_client_channel_get_subchannel_call(grpc_call_element* elem);
Expand Down
94 changes: 42 additions & 52 deletions src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,17 +355,17 @@ class XdsLb : public LoadBalancingPolicy {

void StartConnectivityWatchLocked();
void CancelConnectivityWatchLocked();
static void OnConnectivityChangedLocked(void* arg, grpc_error* error);

private:
class StateWatcher;

// The owning LB policy.
RefCountedPtr<XdsLb> xdslb_policy_;

// The channel and its status.
grpc_channel* channel_;
bool shutting_down_ = false;
grpc_connectivity_state connectivity_ = GRPC_CHANNEL_IDLE;
grpc_closure on_connectivity_changed_;
StateWatcher* watcher_ = nullptr;

// The retryable XDS calls to the LB server.
OrphanablePtr<RetryableLbCall<EdsCallState>> eds_calld_;
Expand Down Expand Up @@ -862,6 +862,39 @@ void XdsLb::FallbackHelper::AddTraceEvent(TraceSeverity severity,
parent_->channel_control_helper()->AddTraceEvent(severity, message);
}

//
// XdsLb::LbChannelState::StateWatcher
//

class XdsLb::LbChannelState::StateWatcher
: public AsyncConnectivityStateWatcherInterface {
public:
explicit StateWatcher(RefCountedPtr<LbChannelState> parent)
: AsyncConnectivityStateWatcherInterface(
grpc_combiner_scheduler(parent->xdslb_policy_->combiner())),
parent_(std::move(parent)) {}

private:
void OnConnectivityStateChange(grpc_connectivity_state new_state) override {
if (!parent_->shutting_down_ &&
parent_->xdslb_policy_->fallback_at_startup_checks_pending_ &&
new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
parent_->xdslb_policy_.get());
parent_->xdslb_policy_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&parent_->xdslb_policy_->lb_fallback_timer_);
parent_->xdslb_policy_->UpdateFallbackPolicyLocked();
parent_->CancelConnectivityWatchLocked();
}
}

RefCountedPtr<LbChannelState> parent_;
};

//
// XdsLb::LbChannelState
//
Expand All @@ -871,8 +904,6 @@ XdsLb::LbChannelState::LbChannelState(RefCountedPtr<XdsLb> xdslb_policy,
const grpc_channel_args& args)
: InternallyRefCounted<LbChannelState>(&grpc_lb_xds_trace),
xdslb_policy_(std::move(xdslb_policy)) {
GRPC_CLOSURE_INIT(&on_connectivity_changed_, OnConnectivityChangedLocked,
this, grpc_combiner_scheduler(xdslb_policy_->combiner()));
channel_ = CreateXdsBalancerChannel(balancer_name, args);
GPR_ASSERT(channel_ != nullptr);
eds_calld_.reset(New<RetryableLbCall<EdsCallState>>(
Expand Down Expand Up @@ -900,56 +931,17 @@ void XdsLb::LbChannelState::StartConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
// Ref held by callback.
Ref(DEBUG_LOCATION, "LbChannelState+start_watch").release();
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
xdslb_policy_->interested_parties()),
&connectivity_, &on_connectivity_changed_, nullptr);
auto watcher = MakeOrphanable<StateWatcher>(Ref());
watcher_ = watcher.get();
grpc_client_channel_start_connectivity_watch(
client_channel_elem, GRPC_CHANNEL_IDLE, std::move(watcher));
}

void XdsLb::LbChannelState::CancelConnectivityWatchLocked() {
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(grpc_channel_get_channel_stack(channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
xdslb_policy_->interested_parties()),
nullptr, &on_connectivity_changed_, nullptr);
}

void XdsLb::LbChannelState::OnConnectivityChangedLocked(void* arg,
grpc_error* error) {
LbChannelState* self = static_cast<LbChannelState*>(arg);
if (!self->shutting_down_ &&
self->xdslb_policy_->fallback_at_startup_checks_pending_) {
if (self->connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
// Not in TRANSIENT_FAILURE. Renew connectivity watch.
grpc_channel_element* client_channel_elem =
grpc_channel_stack_last_element(
grpc_channel_get_channel_stack(self->channel_));
GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
grpc_client_channel_watch_connectivity_state(
client_channel_elem,
grpc_polling_entity_create_from_pollset_set(
self->xdslb_policy_->interested_parties()),
&self->connectivity_, &self->on_connectivity_changed_, nullptr);
return; // Early out so we don't drop the ref below.
}
// In TRANSIENT_FAILURE. Cancel the fallback timer and go into
// fallback mode immediately.
gpr_log(GPR_INFO,
"[xdslb %p] Balancer channel in state TRANSIENT_FAILURE; "
"entering fallback mode",
self);
self->xdslb_policy_->fallback_at_startup_checks_pending_ = false;
grpc_timer_cancel(&self->xdslb_policy_->lb_fallback_timer_);
self->xdslb_policy_->UpdateFallbackPolicyLocked();
}
// Done watching connectivity state, so drop ref.
self->Unref(DEBUG_LOCATION, "LbChannelState+watch_done");
grpc_client_channel_stop_connectivity_watch(client_channel_elem, watcher_);
}

//
Expand Down Expand Up @@ -1843,9 +1835,7 @@ void XdsLb::ShutdownLocked() {
gpr_log(GPR_INFO, "[xdslb %p] shutting down", this);
}
shutting_down_ = true;
if (fallback_at_startup_checks_pending_) {
grpc_timer_cancel(&lb_fallback_timer_);
}
MaybeCancelFallbackAtStartupChecks();
priority_list_.ShutdownLocked();
if (fallback_policy_ != nullptr) {
grpc_pollset_set_del_pollset_set(fallback_policy_->interested_parties(),
Expand Down
7 changes: 3 additions & 4 deletions src/core/lib/transport/connectivity_state.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,9 @@ const char* ConnectivityStateName(grpc_connectivity_state state) {
class AsyncConnectivityStateWatcherInterface::Notifier {
public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
grpc_connectivity_state state)
grpc_connectivity_state state, grpc_closure_scheduler* scheduler)
: watcher_(std::move(watcher)), state_(state) {
GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
grpc_schedule_on_exec_ctx);
GRPC_CLOSURE_INIT(&closure_, SendNotification, this, scheduler);
GRPC_CLOSURE_SCHED(&closure_, GRPC_ERROR_NONE);
}

Expand All @@ -82,7 +81,7 @@ class AsyncConnectivityStateWatcherInterface::Notifier {

void AsyncConnectivityStateWatcherInterface::Notify(
grpc_connectivity_state state) {
New<Notifier>(Ref(), state); // Deletes itself when done.
New<Notifier>(Ref(), state, scheduler_); // Deletes itself when done.
}

//
Expand Down
8 changes: 8 additions & 0 deletions src/core/lib/transport/connectivity_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "src/core/lib/gprpp/map.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/exec_ctx.h"

namespace grpc_core {

Expand Down Expand Up @@ -67,8 +68,15 @@ class AsyncConnectivityStateWatcherInterface
protected:
class Notifier;

explicit AsyncConnectivityStateWatcherInterface(
grpc_closure_scheduler* scheduler = grpc_schedule_on_exec_ctx)
: scheduler_(scheduler) {}

// Invoked asynchronously when Notify() is called.
virtual void OnConnectivityStateChange(grpc_connectivity_state new_state) = 0;

private:
grpc_closure_scheduler* scheduler_;
};

// Tracks connectivity state. Maintains a list of watchers that are
Expand Down

0 comments on commit 7e23bbb

Please sign in to comment.