Skip to content

Commit

Permalink
[LB policies] fix handling of UpdateLocked() result (#36463)
Browse files Browse the repository at this point in the history
This fixes some TODOs added in #30809 for cases where LB policies lazily create child policies.  Credit to @ejona86 for pointing out that simply calling `RequestReresolution()` in this case will ultimately result in the exponential backoff behavior we want.

This also adds some missing plumbing in code added as part of the dualstack work (in the endpoint_list library and in ring_hash) to propagate non-OK statuses from `UpdateLocked()`.  When I first made the dualstack changes, I didn't bother with this plumbing, because there are no cases today where these code-paths will actually see a non-OK status (`EndpointAddresses` won't allow creating an endpoint with 0 addresses, and that's the only case where pick_first will return a non-OK status), and I wasn't sure if we would stick with the approach of returning status from `UpdateLocked()` due to the aforementioned lazy creation case.  However, now that we have a good solution for the lazy creation case, I've added the necessary plumbing, just so that we don't have a bug if in the future pick_first winds up returning non-OK status in some other case.

I have not bothered to fix the propagation in the grpclb policy, since that looked like it would be slightly more work than it's really worth at this point.

Closes #36463

COPYBARA_INTEGRATE_REVIEW=#36463 from markdroth:lb_reresolve_for_lazy_child_creation 49043b2
PiperOrigin-RevId: 629755047
  • Loading branch information
markdroth authored and Copybara-Service committed May 1, 2024
1 parent 834b511 commit 76c9376
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 37 deletions.
6 changes: 2 additions & 4 deletions src/core/load_balancing/endpoint_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class EndpointList::Endpoint::Helper final
// EndpointList::Endpoint
//

void EndpointList::Endpoint::Init(
absl::Status EndpointList::Endpoint::Init(
const EndpointAddresses& addresses, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer) {
ChannelArgs child_args =
Expand Down Expand Up @@ -123,9 +123,7 @@ void EndpointList::Endpoint::Init(
update_args.addresses = std::make_shared<SingleEndpointIterator>(addresses);
update_args.args = child_args;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
// we need to propagate that back to the resolver somehow.
(void)child_policy_->UpdateLocked(std::move(update_args));
return child_policy_->UpdateLocked(std::move(update_args));
}

void EndpointList::Endpoint::Orphan() {
Expand Down
21 changes: 14 additions & 7 deletions src/core/load_balancing/endpoint_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class MyEndpointList : public EndpointList {
public:
MyEndpointList(RefCountedPtr<MyLbPolicy> lb_policy,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
const ChannelArgs& args,
std::vector<std::string>* errors)
: EndpointList(std::move(lb_policy),
GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer)
? "MyEndpointList"
Expand All @@ -63,18 +64,23 @@ class MyEndpointList : public EndpointList {
const EndpointAddresses& addresses, const ChannelArgs& args) {
return MakeOrphanable<MyEndpoint>(
std::move(endpoint_list), addresses, args,
policy<MyLbPolicy>()->work_serializer());
policy<MyLbPolicy>()->work_serializer(), errors);
});
}
private:
class MyEndpoint : public Endpoint {
public:
MyEndpoint(RefCountedPtr<MyEndpointList> endpoint_list,
const EndpointAddresses& address, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer)
const EndpointAddresses& addresses, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer,
std::vector<std::string>* errors)
: Endpoint(std::move(endpoint_list)) {
Init(addresses, args, std::move(work_serializer));
absl::Status status = Init(addresses, args, std::move(work_serializer));
if (!status.ok()) {
errors->emplace_back(absl::StrCat(
"endpoint ", addresses.ToString(), ": ", status.ToString()));
}
}
private:
Expand Down Expand Up @@ -120,8 +126,9 @@ class EndpointList : public InternallyRefCounted<EndpointList> {
explicit Endpoint(RefCountedPtr<EndpointList> endpoint_list)
: endpoint_list_(std::move(endpoint_list)) {}

void Init(const EndpointAddresses& addresses, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer);
absl::Status Init(const EndpointAddresses& addresses,
const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer);

// Templated for convenience, to provide a short-hand for
// down-casting in the caller.
Expand Down
11 changes: 8 additions & 3 deletions src/core/load_balancing/priority/priority.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,11 +415,16 @@ void PriorityLb::ChoosePriorityLocked() {
child_name);
auto child_config = config_->children().find(child_name);
GPR_DEBUG_ASSERT(child_config != config_->children().end());
// TODO(roth): If the child reports a non-OK status with the
// update, we need to propagate that back to the resolver somehow.
(void)child->UpdateLocked(
// If the child policy returns a non-OK status, request re-resolution.
// Note that this will initially cause fixed backoff delay in the
// resolver instead of exponential delay. However, once the
// resolver returns the initial re-resolution, we will be able to
// return non-OK from UpdateLocked(), which will trigger
// exponential backoff instead.
absl::Status status = child->UpdateLocked(
child_config->second.config,
child_config->second.ignore_reresolution_requests);
if (!status.ok()) channel_control_helper()->RequestReresolution();
} else {
// The child already exists. Reactivate if needed.
child->MaybeReactivateLocked();
Expand Down
37 changes: 27 additions & 10 deletions src/core/load_balancing/ring_hash/ring_hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class RingHash final : public LoadBalancingPolicy {

size_t index() const { return index_; }

void UpdateLocked(size_t index);
absl::Status UpdateLocked(size_t index);

grpc_connectivity_state connectivity_state() const {
return connectivity_state_;
Expand All @@ -196,7 +196,7 @@ class RingHash final : public LoadBalancingPolicy {
class Helper;

void CreateChildPolicy();
void UpdateChildPolicyLocked();
absl::Status UpdateChildPolicyLocked();

// Called when the child policy reports a connectivity state update.
void OnStateUpdate(grpc_connectivity_state new_state,
Expand Down Expand Up @@ -498,9 +498,10 @@ void RingHash::RingHashEndpoint::Orphan() {
Unref();
}

void RingHash::RingHashEndpoint::UpdateLocked(size_t index) {
absl::Status RingHash::RingHashEndpoint::UpdateLocked(size_t index) {
index_ = index;
if (child_policy_ != nullptr) UpdateChildPolicyLocked();
if (child_policy_ == nullptr) return absl::OkStatus();
return UpdateChildPolicyLocked();
}

void RingHash::RingHashEndpoint::ResetBackoffLocked() {
Expand Down Expand Up @@ -541,10 +542,19 @@ void RingHash::RingHashEndpoint::CreateChildPolicy() {
// this policy, which in turn is tied to the application's call.
grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(),
ring_hash_->interested_parties());
UpdateChildPolicyLocked();
// If the child policy returns a non-OK status, request re-resolution.
// Note that this will initially cause fixed backoff delay in the
// resolver instead of exponential delay. However, once the
// resolver returns the initial re-resolution, we will be able to
// return non-OK from UpdateLocked(), which will trigger
// exponential backoff instead.
absl::Status status = UpdateChildPolicyLocked();
if (!status.ok()) {
ring_hash_->channel_control_helper()->RequestReresolution();
}
}

void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() {
absl::Status RingHash::RingHashEndpoint::UpdateChildPolicyLocked() {
// Construct pick_first config.
auto config =
CoreConfiguration::Get().lb_policy_registry().ParseLoadBalancingConfig(
Expand All @@ -557,9 +567,7 @@ void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() {
std::make_shared<SingleEndpointIterator>(ring_hash_->endpoints_[index_]);
update_args.args = ring_hash_->args_;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
// we need to propagate that back to the resolver somehow.
(void)child_policy_->UpdateLocked(std::move(update_args));
return child_policy_->UpdateLocked(std::move(update_args));
}

void RingHash::RingHashEndpoint::OnStateUpdate(
Expand Down Expand Up @@ -667,13 +675,18 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
this, static_cast<RingHashLbConfig*>(args.config.get()));
// Update endpoint map.
std::map<EndpointAddressSet, OrphanablePtr<RingHashEndpoint>> endpoint_map;
std::vector<std::string> errors;
for (size_t i = 0; i < endpoints_.size(); ++i) {
const EndpointAddresses& addresses = endpoints_[i];
const EndpointAddressSet address_set(addresses.addresses());
// If present in old map, retain it; otherwise, create a new one.
auto it = endpoint_map_.find(address_set);
if (it != endpoint_map_.end()) {
it->second->UpdateLocked(i);
absl::Status status = it->second->UpdateLocked(i);
if (!status.ok()) {
errors.emplace_back(absl::StrCat("endpoint ", address_set.ToString(),
": ", status.ToString()));
}
endpoint_map.emplace(address_set, std::move(it->second));
} else {
endpoint_map.emplace(address_set, MakeOrphanable<RingHashEndpoint>(
Expand All @@ -695,6 +708,10 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
// Return a new picker.
UpdateAggregatedConnectivityStateLocked(/*entered_transient_failure=*/false,
absl::OkStatus());
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}

Expand Down
13 changes: 10 additions & 3 deletions src/core/load_balancing/rls/rls.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1860,9 +1860,16 @@ void RlsLb::RlsRequest::OnRlsCallCompleteLocked(grpc_error_handle error) {
// Now that we've released the lock, finish the update on any newly
// created child policies.
for (ChildPolicyWrapper* child : child_policies_to_finish_update) {
// TODO(roth): If the child reports an error with the update, we
// need to propagate that back to the resolver somehow.
(void)child->MaybeFinishUpdate();
// If the child policy returns a non-OK status, request re-resolution.
// Note that this will initially cause fixed backoff delay in the
// resolver instead of exponential delay. However, once the
// resolver returns the initial re-resolution, we will be able to
// return non-OK from UpdateLocked(), which will trigger
// exponential backoff instead.
absl::Status status = child->MaybeFinishUpdate();
if (!status.ok()) {
lb_policy_->channel_control_helper()->RequestReresolution();
}
}
}

Expand Down
21 changes: 16 additions & 5 deletions src/core/load_balancing/round_robin/round_robin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ class RoundRobin final : public LoadBalancingPolicy {
public:
RoundRobinEndpointList(RefCountedPtr<RoundRobin> round_robin,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
const ChannelArgs& args,
std::vector<std::string>* errors)
: EndpointList(std::move(round_robin),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
? "RoundRobinEndpointList"
Expand All @@ -82,7 +83,7 @@ class RoundRobin final : public LoadBalancingPolicy {
const EndpointAddresses& addresses, const ChannelArgs& args) {
return MakeOrphanable<RoundRobinEndpoint>(
std::move(endpoint_list), addresses, args,
policy<RoundRobin>()->work_serializer());
policy<RoundRobin>()->work_serializer(), errors);
});
}

Expand All @@ -92,9 +93,14 @@ class RoundRobin final : public LoadBalancingPolicy {
RoundRobinEndpoint(RefCountedPtr<EndpointList> endpoint_list,
const EndpointAddresses& addresses,
const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer)
std::shared_ptr<WorkSerializer> work_serializer,
std::vector<std::string>* errors)
: Endpoint(std::move(endpoint_list)) {
Init(addresses, args, std::move(work_serializer));
absl::Status status = Init(addresses, args, std::move(work_serializer));
if (!status.ok()) {
errors->emplace_back(absl::StrCat("endpoint ", addresses.ToString(),
": ", status.ToString()));
}
}

private:
Expand Down Expand Up @@ -255,9 +261,10 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RR %p] replacing previous pending child list %p", this,
latest_pending_endpoint_list_.get());
}
std::vector<std::string> errors;
latest_pending_endpoint_list_ = MakeOrphanable<RoundRobinEndpointList>(
RefAsSubclass<RoundRobin>(DEBUG_LOCATION, "RoundRobinEndpointList"),
addresses, args.args);
addresses, args.args, &errors);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
if (latest_pending_endpoint_list_->size() == 0) {
Expand All @@ -281,6 +288,10 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
if (endpoint_list_ == nullptr) {
endpoint_list_ = std::move(latest_pending_endpoint_list_);
}
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,16 @@ class WeightedRoundRobin final : public LoadBalancingPolicy {
public:
WrrEndpoint(RefCountedPtr<EndpointList> endpoint_list,
const EndpointAddresses& addresses, const ChannelArgs& args,
std::shared_ptr<WorkSerializer> work_serializer)
std::shared_ptr<WorkSerializer> work_serializer,
std::vector<std::string>* errors)
: Endpoint(std::move(endpoint_list)),
weight_(policy<WeightedRoundRobin>()->GetOrCreateWeight(
addresses.addresses())) {
Init(addresses, args, std::move(work_serializer));
absl::Status status = Init(addresses, args, std::move(work_serializer));
if (!status.ok()) {
errors->emplace_back(absl::StrCat("endpoint ", addresses.ToString(),
": ", status.ToString()));
}
}

RefCountedPtr<EndpointWeight> weight() const { return weight_; }
Expand Down Expand Up @@ -261,7 +266,7 @@ class WeightedRoundRobin final : public LoadBalancingPolicy {

WrrEndpointList(RefCountedPtr<WeightedRoundRobin> wrr,
EndpointAddressesIterator* endpoints,
const ChannelArgs& args)
const ChannelArgs& args, std::vector<std::string>* errors)
: EndpointList(std::move(wrr),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
? "WrrEndpointList"
Expand All @@ -271,7 +276,7 @@ class WeightedRoundRobin final : public LoadBalancingPolicy {
const EndpointAddresses& addresses, const ChannelArgs& args) {
return MakeOrphanable<WrrEndpoint>(
std::move(endpoint_list), addresses, args,
policy<WeightedRoundRobin>()->work_serializer());
policy<WeightedRoundRobin>()->work_serializer(), errors);
});
}

Expand Down Expand Up @@ -767,8 +772,9 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[WRR %p] replacing previous pending endpoint list %p",
this, latest_pending_endpoint_list_.get());
}
std::vector<std::string> errors;
latest_pending_endpoint_list_ = MakeOrphanable<WrrEndpointList>(
RefAsSubclass<WeightedRoundRobin>(), addresses.get(), args.args);
RefAsSubclass<WeightedRoundRobin>(), addresses.get(), args.args, &errors);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
if (latest_pending_endpoint_list_->size() == 0) {
Expand All @@ -792,6 +798,10 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
if (endpoint_list_.get() == nullptr) {
endpoint_list_ = std::move(latest_pending_endpoint_list_);
}
if (!errors.empty()) {
return absl::UnavailableError(absl::StrCat(
"errors from children: [", absl::StrJoin(errors, "; "), "]"));
}
return absl::OkStatus();
}

Expand Down

0 comments on commit 76c9376

Please sign in to comment.