Skip to content

Commit

Permalink
Merge pull request #20441 from markdroth/lb_policy_std
Browse files Browse the repository at this point in the history
Use std::function<> for recv_trailing_metadata callback in LB policy API.
  • Loading branch information
markdroth committed Oct 2, 2019
2 parents 7e23bbb + 206c11f commit c8c755a
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 53 deletions.
15 changes: 5 additions & 10 deletions src/core/ext/filters/client_channel/client_channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -762,11 +762,9 @@ class CallData {
LbCallState lb_call_state_;
const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
void (*lb_recv_trailing_metadata_ready_)(
void* user_data, grpc_error* error,
LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
LoadBalancingPolicy::CallState* call_state) = nullptr;
void* lb_recv_trailing_metadata_ready_user_data_ = nullptr;
std::function<void(grpc_error*, LoadBalancingPolicy::MetadataInterface*,
LoadBalancingPolicy::CallState*)>
lb_recv_trailing_metadata_ready_;
grpc_closure pick_closure_;

// For intercepting recv_trailing_metadata_ready for the LB policy.
Expand Down Expand Up @@ -2262,9 +2260,8 @@ void CallData::RecvTrailingMetadataReadyForLoadBalancingPolicy(
CallData* calld = static_cast<CallData*>(arg);
// Invoke callback to LB policy.
Metadata trailing_metadata(calld, calld->recv_trailing_metadata_);
calld->lb_recv_trailing_metadata_ready_(
calld->lb_recv_trailing_metadata_ready_user_data_, error,
&trailing_metadata, &calld->lb_call_state_);
calld->lb_recv_trailing_metadata_ready_(error, &trailing_metadata,
&calld->lb_call_state_);
// Chain to original callback.
GRPC_CLOSURE_RUN(calld->original_recv_trailing_metadata_ready_,
GRPC_ERROR_REF(error));
Expand Down Expand Up @@ -3963,8 +3960,6 @@ bool CallData::PickSubchannelLocked(grpc_call_element* elem,
GPR_ASSERT(connected_subchannel_ != nullptr);
}
lb_recv_trailing_metadata_ready_ = result.recv_trailing_metadata_ready;
lb_recv_trailing_metadata_ready_user_data_ =
result.recv_trailing_metadata_ready_user_data;
*error = result.error;
return true;
}
Expand Down
22 changes: 8 additions & 14 deletions src/core/ext/filters/client_channel/lb_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,20 +189,14 @@ class LoadBalancingPolicy : public InternallyRefCounted<LoadBalancingPolicy> {

/// Used only if type is PICK_COMPLETE.
/// Callback set by LB policy to be notified of trailing metadata.
/// The user_data argument will be set to the
/// recv_trailing_metadata_ready_user_data field.
/// recv_trailing_metadata will be set to the metadata, which may be
/// modified by the callback. The callback does not take ownership,
/// however, so any data that needs to be used after returning must
/// be copied.
/// call_state can be used to obtain backend metric data.
// TODO(roth): Replace grpc_error with something better before we allow
// people outside of gRPC team to use this API.
void (*recv_trailing_metadata_ready)(
void* user_data, grpc_error* error,
MetadataInterface* recv_trailing_metadata,
CallState* call_state) = nullptr;
void* recv_trailing_metadata_ready_user_data = nullptr;
/// If set by LB policy, the client channel will invoke the callback
/// when trailing metadata is returned.
/// The metadata may be modified by the callback. However, the callback
/// does not take ownership, so any data that needs to be used after
/// returning must be copied.
/// The call state can be used to obtain backend metric data.
std::function<void(grpc_error*, MetadataInterface*, CallState*)>
recv_trailing_metadata_ready;
};

/// A subchannel picker is the object used to pick the subchannel to
Expand Down
30 changes: 10 additions & 20 deletions src/core/ext/filters/client_channel/lb_policy/xds/xds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,6 @@ class XdsLb : public LoadBalancingPolicy {
PickResult Pick(PickArgs args);

private:
static void RecordCallCompletion(
void* arg, grpc_error* error,
LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
LoadBalancingPolicy::CallState* call_state);

UniquePtr<SubchannelPicker> picker_;
RefCountedPtr<XdsClientStats::LocalityStats> locality_stats_;
};
Expand Down Expand Up @@ -728,25 +723,20 @@ LoadBalancingPolicy::PickResult XdsLb::PickerWrapper::Pick(
// Record a call started.
locality_stats_->AddCallStarted();
// Intercept the recv_trailing_metadata op to record call completion.
result.recv_trailing_metadata_ready = RecordCallCompletion;
result.recv_trailing_metadata_ready_user_data =
XdsClientStats::LocalityStats* locality_stats =
locality_stats_->Ref(DEBUG_LOCATION, "LocalityStats+call").release();
result.recv_trailing_metadata_ready =
// Note: This callback does not run in either the control plane
// combiner or in the data plane mutex.
[locality_stats](grpc_error* error, MetadataInterface* metadata,
CallState* call_state) {
const bool call_failed = error != GRPC_ERROR_NONE;
locality_stats->AddCallFinished(call_failed);
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
};
return result;
}

// Note that the following callback does not run in either the control plane
// combiner or the data plane combiner.
void XdsLb::PickerWrapper::RecordCallCompletion(
void* arg, grpc_error* error,
LoadBalancingPolicy::MetadataInterface* recv_trailing_metadata,
LoadBalancingPolicy::CallState* call_state) {
XdsClientStats::LocalityStats* locality_stats =
static_cast<XdsClientStats::LocalityStats*>(arg);
const bool call_failed = error != GRPC_ERROR_NONE;
locality_stats->AddCallFinished(call_failed);
locality_stats->Unref(DEBUG_LOCATION, "LocalityStats+call");
}

//
// XdsLb::Picker
//
Expand Down
19 changes: 10 additions & 9 deletions test/core/util/test_lb_policies.cc
Original file line number Diff line number Diff line change
Expand Up @@ -177,22 +177,23 @@ class InterceptRecvTrailingMetadataLoadBalancingPolicy
InterceptRecvTrailingMetadataCallback cb,
void* user_data)
: cb_(cb), user_data_(user_data) {
result->recv_trailing_metadata_ready = &RecordRecvTrailingMetadata;
result->recv_trailing_metadata_ready_user_data = this;
result->recv_trailing_metadata_ready = [this](grpc_error* error,
MetadataInterface* metadata,
CallState* call_state) {
RecordRecvTrailingMetadata(error, metadata, call_state);
};
}

private:
static void RecordRecvTrailingMetadata(
void* arg, grpc_error* error, MetadataInterface* recv_trailing_metadata,
CallState* call_state) {
TrailingMetadataHandler* self =
static_cast<TrailingMetadataHandler*>(arg);
void RecordRecvTrailingMetadata(grpc_error* error,
MetadataInterface* recv_trailing_metadata,
CallState* call_state) {
GPR_ASSERT(recv_trailing_metadata != nullptr);
gpr_log(GPR_INFO, "trailing metadata:");
InterceptRecvTrailingMetadataLoadBalancingPolicy::LogMetadata(
recv_trailing_metadata);
self->cb_(self->user_data_, call_state->GetBackendMetricData());
self->~TrailingMetadataHandler();
cb_(user_data_, call_state->GetBackendMetricData());
this->~TrailingMetadataHandler();
}

InterceptRecvTrailingMetadataCallback cb_;
Expand Down

0 comments on commit c8c755a

Please sign in to comment.