Skip to content

Commit

Permalink
[CSM] Fix CSM Observability for trailers-only response (#36413)
Browse files Browse the repository at this point in the history
Before this change, on a trailers-only response, Metadata Exchange needed by CSM would just be dropped, and hence CSM labels would not be seen.

Changes -
* OTel call attempt tracer populates labels from trailers if it's a trailers-only response.
* HTTP2 layer propagates the Metadata Exchange field from headers to trailers for trailers-only responses.
* Add a test to make sure that retries continue to work when Metadata Exchange is enabled and a trailers-only response is sent. (This verifies that a trailers-only response remains a trailers-only response.)

Closes #36413

COPYBARA_INTEGRATE_REVIEW=#36413 from yashykt:MetadataExchangeInTrailers e7d2026
PiperOrigin-RevId: 630144618
  • Loading branch information
yashykt authored and Copybara-Service committed May 2, 2024
1 parent ffe73d2 commit 108ee94
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 60 deletions.
47 changes: 30 additions & 17 deletions src/core/ext/transport/chttp2/transport/writing.cc
Expand Up @@ -561,14 +561,6 @@ class StreamWriteContext {
grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
&s_->stats.outgoing, t_->outbuf.c_slice_buffer());
} else {
if (send_status_.has_value()) {
s_->send_trailing_metadata->Set(grpc_core::HttpStatusMetadata(),
*send_status_);
}
if (send_content_type_.has_value()) {
s_->send_trailing_metadata->Set(grpc_core::ContentTypeMetadata(),
*send_content_type_);
}
t_->hpack_compressor.EncodeHeaders(
grpc_core::HPackCompressor::EncodeHeaderOptions{
s_->id, true, t_->settings.peer().allow_true_binary_metadata(),
Expand All @@ -588,15 +580,39 @@ class StreamWriteContext {
bool stream_became_writable() { return stream_became_writable_; }

private:
class TrailersOnlyMetadataEncoder {
public:
explicit TrailersOnlyMetadataEncoder(grpc_metadata_batch* trailing_md)
: trailing_md_(trailing_md) {}

template <typename Which, typename Value>
void Encode(Which which, Value value) {
if (Which::kTransferOnTrailersOnly) {
trailing_md_->Set(which, value);
}
}

template <typename Which>
void Encode(Which which, const grpc_core::Slice& value) {
if (Which::kTransferOnTrailersOnly) {
trailing_md_->Set(which, value.Ref());
}
}

// Non-grpc metadata should not be transferred.
void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {}

private:
grpc_metadata_batch* trailing_md_;
};

void ConvertInitialMetadataToTrailingMetadata() {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
// When sending Trailers-Only, we need to move the :status and
// content-type headers to the trailers.
send_status_ =
s_->send_initial_metadata->get(grpc_core::HttpStatusMetadata());
send_content_type_ =
s_->send_initial_metadata->get(grpc_core::ContentTypeMetadata());
// When sending Trailers-Only, we need to move metadata from headers to
// trailers.
TrailersOnlyMetadataEncoder encoder(s_->send_trailing_metadata);
s_->send_initial_metadata->Encode(&encoder);
}

void SentLastFrame() {
Expand Down Expand Up @@ -629,9 +645,6 @@ class StreamWriteContext {
grpc_chttp2_transport* const t_;
grpc_chttp2_stream* const s_;
bool stream_became_writable_ = false;
absl::optional<uint32_t> send_status_;
absl::optional<grpc_core::ContentTypeMetadata::ValueType> send_content_type_ =
{};
};
} // namespace

Expand Down
29 changes: 29 additions & 0 deletions src/core/lib/transport/metadata_batch.h
Expand Up @@ -77,6 +77,7 @@ size_t EncodedSizeOfKey(Key, const typename Key::ValueType& value) {
// should not need to.
struct GrpcTimeoutMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using ValueType = Timestamp;
using MementoType = Duration;
using CompressionTraits = TimeoutCompressor;
Expand All @@ -93,6 +94,7 @@ struct GrpcTimeoutMetadata {
// TE metadata trait.
struct TeMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
// HTTP2 says that TE can either be empty or "trailers".
// Empty means this trait is not included, "trailers" means kTrailers, and
// kInvalid is used to remember an invalid value.
Expand Down Expand Up @@ -122,6 +124,7 @@ inline size_t EncodedSizeOfKey(TeMetadata, TeMetadata::ValueType x) {
// content-type metadata trait.
struct ContentTypeMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = true;
// gRPC says that content-type can be application/grpc[;something]
// Core has only ever verified the prefix.
// IF we want to start verifying more, we can expand this type.
Expand Down Expand Up @@ -150,6 +153,7 @@ struct ContentTypeMetadata {
// scheme metadata trait.
struct HttpSchemeMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
enum ValueType : uint8_t {
kHttp,
kHttps,
Expand Down Expand Up @@ -179,6 +183,7 @@ size_t EncodedSizeOfKey(HttpSchemeMetadata, HttpSchemeMetadata::ValueType x);
// method metadata trait.
struct HttpMethodMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
enum ValueType : uint8_t {
kPost,
kGet,
Expand Down Expand Up @@ -227,6 +232,7 @@ struct CompressionAlgorithmBasedMetadata {
// grpc-encoding metadata trait.
struct GrpcEncodingMetadata : public CompressionAlgorithmBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits =
SmallIntegralValuesCompressor<GRPC_COMPRESS_ALGORITHMS_COUNT>;
static absl::string_view key() { return "grpc-encoding"; }
Expand All @@ -235,13 +241,15 @@ struct GrpcEncodingMetadata : public CompressionAlgorithmBasedMetadata {
// grpc-internal-encoding-request metadata trait.
struct GrpcInternalEncodingRequest : public CompressionAlgorithmBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "grpc-internal-encoding-request"; }
};

// grpc-accept-encoding metadata trait.
struct GrpcAcceptEncodingMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
static absl::string_view key() { return "grpc-accept-encoding"; }
using ValueType = CompressionAlgorithmSet;
using MementoType = ValueType;
Expand All @@ -260,69 +268,79 @@ struct GrpcAcceptEncodingMetadata {
// user-agent metadata trait.
struct UserAgentMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = StableValueCompressor;
static absl::string_view key() { return "user-agent"; }
};

// grpc-message metadata trait.
struct GrpcMessageMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "grpc-message"; }
};

// host metadata trait.
struct HostMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "host"; }
};

// endpoint-load-metrics-bin metadata trait.
struct EndpointLoadMetricsBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "endpoint-load-metrics-bin"; }
};

// grpc-server-stats-bin metadata trait.
struct GrpcServerStatsBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "grpc-server-stats-bin"; }
};

// grpc-trace-bin metadata trait.
struct GrpcTraceBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = FrequentKeyWithNoValueCompressionCompressor;
static absl::string_view key() { return "grpc-trace-bin"; }
};

// grpc-tags-bin metadata trait.
struct GrpcTagsBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = FrequentKeyWithNoValueCompressionCompressor;
static absl::string_view key() { return "grpc-tags-bin"; }
};

// XEnvoyPeerMetadata
struct XEnvoyPeerMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = true;
using CompressionTraits = StableValueCompressor;
static absl::string_view key() { return "x-envoy-peer-metadata"; }
};

// :authority metadata trait.
struct HttpAuthorityMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = SmallSetOfValuesCompressor;
static absl::string_view key() { return ":authority"; }
};

// :path metadata trait.
struct HttpPathMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = SmallSetOfValuesCompressor;
static absl::string_view key() { return ":path"; }
};
Expand Down Expand Up @@ -357,6 +375,7 @@ struct SimpleIntBasedMetadata : public SimpleIntBasedMetadataBase<Int> {
struct GrpcStatusMetadata
: public SimpleIntBasedMetadata<grpc_status_code, GRPC_STATUS_UNKNOWN> {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = SmallIntegralValuesCompressor<16>;
static absl::string_view key() { return "grpc-status"; }
};
Expand All @@ -365,13 +384,15 @@ struct GrpcStatusMetadata
struct GrpcPreviousRpcAttemptsMetadata
: public SimpleIntBasedMetadata<uint32_t, 0> {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "grpc-previous-rpc-attempts"; }
};

// grpc-retry-pushback-ms metadata trait.
struct GrpcRetryPushbackMsMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
static absl::string_view key() { return "grpc-retry-pushback-ms"; }
using ValueType = Duration;
using MementoType = Duration;
Expand All @@ -389,6 +410,7 @@ struct GrpcRetryPushbackMsMetadata {
// TODO(ctiller): consider moving to uint16_t
struct HttpStatusMetadata : public SimpleIntBasedMetadata<uint32_t, 0> {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = true;
using CompressionTraits = HttpStatusCompressor;
static absl::string_view key() { return ":status"; }
};
Expand All @@ -399,6 +421,7 @@ class GrpcLbClientStats;

struct GrpcLbClientStatsMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
static absl::string_view key() { return "grpclb_client_stats"; }
using ValueType = GrpcLbClientStats*;
using MementoType = ValueType;
Expand All @@ -423,13 +446,15 @@ inline size_t EncodedSizeOfKey(GrpcLbClientStatsMetadata,
// lb-token metadata
struct LbTokenMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "lb-token"; }
};

// lb-cost-bin metadata
struct LbCostBinMetadata {
static constexpr bool kRepeatable = true;
static constexpr bool kTransferOnTrailersOnly = false;
static absl::string_view key() { return "lb-cost-bin"; }
struct ValueType {
double cost;
Expand All @@ -451,6 +476,7 @@ struct LbCostBinMetadata {
struct GrpcStreamNetworkState {
static absl::string_view DebugKey() { return "GrpcStreamNetworkState"; }
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
enum ValueType : uint8_t {
kNotSentOnWire,
kNotSeenByServer,
Expand Down Expand Up @@ -1149,6 +1175,9 @@ MetadataValueAsSlice(typename Which::ValueType value) {
// struct GrpcXyzMetadata {
// // Can this metadata field be repeated?
// static constexpr bool kRepeatable = ...;
// // Should this metadata be transferred from server headers to trailers on
// // Trailers-Only response?
// static constexpr bool kTransferOnTrailersOnly = ...;
// // The type that's stored on MetadataBatch
// using ValueType = ...;
// // The type that's stored in compression/decompression tables
Expand Down
38 changes: 26 additions & 12 deletions src/cpp/ext/otel/otel_client_call_tracer.cc
Expand Up @@ -88,17 +88,13 @@ OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(

void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
parent_->scope_config_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
size_t /*index*/) {
auto* labels_injector = plugin_option.labels_injector();
if (labels_injector != nullptr) {
injected_labels_from_plugin_options_.push_back(
labels_injector->GetLabels(recv_initial_metadata));
}
return true;
},
parent_->otel_plugin_);
if (recv_initial_metadata != nullptr &&
recv_initial_metadata->get(grpc_core::GrpcTrailersOnly())
.value_or(false)) {
is_trailers_only_ = true;
return;
}
PopulateLabelInjectors(recv_initial_metadata);
}

void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
Expand Down Expand Up @@ -143,8 +139,11 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::

void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) {
if (is_trailers_only_) {
PopulateLabelInjectors(recv_trailing_metadata);
}
std::array<std::pair<absl::string_view, absl::string_view>, 3>
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
Expand Down Expand Up @@ -214,6 +213,21 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::SetOptionalLabel(
optional_labels_[static_cast<size_t>(key)] = std::move(value);
}

void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
PopulateLabelInjectors(grpc_metadata_batch* metadata) {
parent_->scope_config_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
size_t /*index*/) {
auto* labels_injector = plugin_option.labels_injector();
if (labels_injector != nullptr) {
injected_labels_from_plugin_options_.push_back(
labels_injector->GetLabels(metadata));
}
return true;
},
parent_->otel_plugin_);
}

//
// OpenTelemetryPlugin::ClientCallTracer
//
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/ext/otel/otel_client_call_tracer.h
Expand Up @@ -95,6 +95,8 @@ class OpenTelemetryPlugin::ClientCallTracer
grpc_core::RefCountedStringValue value) override;

private:
void PopulateLabelInjectors(grpc_metadata_batch* metadata);

const ClientCallTracer* parent_;
const bool arena_allocated_;
// Start time (for measuring latency).
Expand All @@ -106,6 +108,7 @@ class OpenTelemetryPlugin::ClientCallTracer
optional_labels_;
std::vector<std::unique_ptr<LabelsIterable>>
injected_labels_from_plugin_options_;
bool is_trailers_only_ = false;
};

ClientCallTracer(
Expand Down

0 comments on commit 108ee94

Please sign in to comment.