Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CSM] Fix CSM Observability for trailers-only response #36413

Closed
wants to merge 9 commits into from
47 changes: 30 additions & 17 deletions src/core/ext/transport/chttp2/transport/writing.cc
Expand Up @@ -561,14 +561,6 @@ class StreamWriteContext {
grpc_chttp2_encode_data(s_->id, &s_->flow_controlled_buffer, 0, true,
&s_->stats.outgoing, t_->outbuf.c_slice_buffer());
} else {
if (send_status_.has_value()) {
s_->send_trailing_metadata->Set(grpc_core::HttpStatusMetadata(),
*send_status_);
}
if (send_content_type_.has_value()) {
s_->send_trailing_metadata->Set(grpc_core::ContentTypeMetadata(),
*send_content_type_);
}
t_->hpack_compressor.EncodeHeaders(
grpc_core::HPackCompressor::EncodeHeaderOptions{
s_->id, true, t_->settings.peer().allow_true_binary_metadata(),
Expand All @@ -588,15 +580,39 @@ class StreamWriteContext {
bool stream_became_writable() { return stream_became_writable_; }

private:
class TrailersOnlyMetadataEncoder {
public:
explicit TrailersOnlyMetadataEncoder(grpc_metadata_batch* trailing_md)
: trailing_md_(trailing_md) {}

template <typename Which, typename Value>
void Encode(Which which, Value value) {
if (Which::kTransferOnTrailersOnly) {
trailing_md_->Set(which, value);
}
}

template <typename Which>
void Encode(Which which, const grpc_core::Slice& value) {
if (Which::kTransferOnTrailersOnly) {
trailing_md_->Set(which, value.Ref());
}
}

// Non-grpc metadata should not be transferred.
void Encode(const grpc_core::Slice&, const grpc_core::Slice&) {}

private:
grpc_metadata_batch* trailing_md_;
};

void ConvertInitialMetadataToTrailingMetadata() {
GRPC_CHTTP2_IF_TRACING(
gpr_log(GPR_INFO, "not sending initial_metadata (Trailers-Only)"));
// When sending Trailers-Only, we need to move the :status and
// content-type headers to the trailers.
send_status_ =
s_->send_initial_metadata->get(grpc_core::HttpStatusMetadata());
send_content_type_ =
s_->send_initial_metadata->get(grpc_core::ContentTypeMetadata());
// When sending Trailers-Only, we need to move metadata from headers to
// trailers.
TrailersOnlyMetadataEncoder encoder(s_->send_trailing_metadata);
s_->send_initial_metadata->Encode(&encoder);
}

void SentLastFrame() {
Expand Down Expand Up @@ -629,9 +645,6 @@ class StreamWriteContext {
grpc_chttp2_transport* const t_;
grpc_chttp2_stream* const s_;
bool stream_became_writable_ = false;
absl::optional<uint32_t> send_status_;
absl::optional<grpc_core::ContentTypeMetadata::ValueType> send_content_type_ =
{};
};
} // namespace

Expand Down
29 changes: 29 additions & 0 deletions src/core/lib/transport/metadata_batch.h
Expand Up @@ -76,6 +76,7 @@ size_t EncodedSizeOfKey(Key, const typename Key::ValueType& value) {
// should not need to.
struct GrpcTimeoutMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using ValueType = Timestamp;
using MementoType = Duration;
using CompressionTraits = TimeoutCompressor;
Expand All @@ -92,6 +93,7 @@ struct GrpcTimeoutMetadata {
// TE metadata trait.
struct TeMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
// HTTP2 says that TE can either be empty or "trailers".
// Empty means this trait is not included, "trailers" means kTrailers, and
// kInvalid is used to remember an invalid value.
Expand Down Expand Up @@ -121,6 +123,7 @@ inline size_t EncodedSizeOfKey(TeMetadata, TeMetadata::ValueType x) {
// content-type metadata trait.
struct ContentTypeMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = true;
// gRPC says that content-type can be application/grpc[;something]
// Core has only ever verified the prefix.
// IF we want to start verifying more, we can expand this type.
Expand Down Expand Up @@ -149,6 +152,7 @@ struct ContentTypeMetadata {
// scheme metadata trait.
struct HttpSchemeMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
enum ValueType : uint8_t {
kHttp,
kHttps,
Expand Down Expand Up @@ -178,6 +182,7 @@ size_t EncodedSizeOfKey(HttpSchemeMetadata, HttpSchemeMetadata::ValueType x);
// method metadata trait.
struct HttpMethodMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
enum ValueType : uint8_t {
kPost,
kGet,
Expand Down Expand Up @@ -226,6 +231,7 @@ struct CompressionAlgorithmBasedMetadata {
// grpc-encoding metadata trait.
struct GrpcEncodingMetadata : public CompressionAlgorithmBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits =
SmallIntegralValuesCompressor<GRPC_COMPRESS_ALGORITHMS_COUNT>;
static absl::string_view key() { return "grpc-encoding"; }
Expand All @@ -234,13 +240,15 @@ struct GrpcEncodingMetadata : public CompressionAlgorithmBasedMetadata {
// grpc-internal-encoding-request metadata trait.
struct GrpcInternalEncodingRequest : public CompressionAlgorithmBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "grpc-internal-encoding-request"; }
};

// grpc-accept-encoding metadata trait.
struct GrpcAcceptEncodingMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
static absl::string_view key() { return "grpc-accept-encoding"; }
using ValueType = CompressionAlgorithmSet;
using MementoType = ValueType;
Expand All @@ -259,69 +267,79 @@ struct GrpcAcceptEncodingMetadata {
// user-agent metadata trait.
struct UserAgentMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = StableValueCompressor;
static absl::string_view key() { return "user-agent"; }
};

// grpc-message metadata trait.
struct GrpcMessageMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "grpc-message"; }
};

// host metadata trait.
struct HostMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "host"; }
};

// endpoint-load-metrics-bin metadata trait.
struct EndpointLoadMetricsBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "endpoint-load-metrics-bin"; }
};

// grpc-server-stats-bin metadata trait.
struct GrpcServerStatsBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "grpc-server-stats-bin"; }
};

// grpc-trace-bin metadata trait.
struct GrpcTraceBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = FrequentKeyWithNoValueCompressionCompressor;
static absl::string_view key() { return "grpc-trace-bin"; }
};

// grpc-tags-bin metadata trait.
struct GrpcTagsBinMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = FrequentKeyWithNoValueCompressionCompressor;
static absl::string_view key() { return "grpc-tags-bin"; }
};

// XEnvoyPeerMetadata
struct XEnvoyPeerMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = true;
using CompressionTraits = StableValueCompressor;
static absl::string_view key() { return "x-envoy-peer-metadata"; }
};

// :authority metadata trait.
struct HttpAuthorityMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = SmallSetOfValuesCompressor;
static absl::string_view key() { return ":authority"; }
};

// :path metadata trait.
struct HttpPathMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = SmallSetOfValuesCompressor;
static absl::string_view key() { return ":path"; }
};
Expand Down Expand Up @@ -356,6 +374,7 @@ struct SimpleIntBasedMetadata : public SimpleIntBasedMetadataBase<Int> {
struct GrpcStatusMetadata
: public SimpleIntBasedMetadata<grpc_status_code, GRPC_STATUS_UNKNOWN> {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = SmallIntegralValuesCompressor<16>;
static absl::string_view key() { return "grpc-status"; }
};
Expand All @@ -364,13 +383,15 @@ struct GrpcStatusMetadata
struct GrpcPreviousRpcAttemptsMetadata
: public SimpleIntBasedMetadata<uint32_t, 0> {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "grpc-previous-rpc-attempts"; }
};

// grpc-retry-pushback-ms metadata trait.
struct GrpcRetryPushbackMsMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
static absl::string_view key() { return "grpc-retry-pushback-ms"; }
using ValueType = Duration;
using MementoType = Duration;
Expand All @@ -388,6 +409,7 @@ struct GrpcRetryPushbackMsMetadata {
// TODO(ctiller): consider moving to uint16_t
struct HttpStatusMetadata : public SimpleIntBasedMetadata<uint32_t, 0> {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = true;
using CompressionTraits = HttpStatusCompressor;
static absl::string_view key() { return ":status"; }
};
Expand All @@ -398,6 +420,7 @@ class GrpcLbClientStats;

struct GrpcLbClientStatsMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
static absl::string_view key() { return "grpclb_client_stats"; }
using ValueType = GrpcLbClientStats*;
using MementoType = ValueType;
Expand All @@ -422,13 +445,15 @@ inline size_t EncodedSizeOfKey(GrpcLbClientStatsMetadata,
// lb-token metadata
struct LbTokenMetadata : public SimpleSliceBasedMetadata {
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
using CompressionTraits = NoCompressionCompressor;
static absl::string_view key() { return "lb-token"; }
};

// lb-cost-bin metadata
struct LbCostBinMetadata {
static constexpr bool kRepeatable = true;
static constexpr bool kTransferOnTrailersOnly = false;
static absl::string_view key() { return "lb-cost-bin"; }
struct ValueType {
double cost;
Expand All @@ -450,6 +475,7 @@ struct LbCostBinMetadata {
struct GrpcStreamNetworkState {
static absl::string_view DebugKey() { return "GrpcStreamNetworkState"; }
static constexpr bool kRepeatable = false;
static constexpr bool kTransferOnTrailersOnly = false;
enum ValueType : uint8_t {
kNotSentOnWire,
kNotSeenByServer,
Expand Down Expand Up @@ -1148,6 +1174,9 @@ MetadataValueAsSlice(typename Which::ValueType value) {
// struct GrpcXyzMetadata {
// // Can this metadata field be repeated?
// static constexpr bool kRepeatable = ...;
// // Should this metadata be transferred from server headers to trailers on
// // Trailers-Only response?
// static constexpr bool kTransferOnTrailersOnly = ...;
// // The type that's stored on MetadataBatch
// using ValueType = ...;
// // The type that's stored in compression/decompression tables
Expand Down
38 changes: 26 additions & 12 deletions src/cpp/ext/otel/otel_client_call_tracer.cc
Expand Up @@ -88,17 +88,13 @@ OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::CallAttemptTracer(

void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedInitialMetadata(grpc_metadata_batch* recv_initial_metadata) {
parent_->scope_config_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
size_t /*index*/) {
auto* labels_injector = plugin_option.labels_injector();
if (labels_injector != nullptr) {
injected_labels_from_plugin_options_.push_back(
labels_injector->GetLabels(recv_initial_metadata));
}
return true;
},
parent_->otel_plugin_);
if (recv_initial_metadata != nullptr &&
recv_initial_metadata->get(grpc_core::GrpcTrailersOnly())
.value_or(false)) {
is_trailers_only_ = true;
return;
}
PopulateLabelInjectors(recv_initial_metadata);
}

void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
Expand Down Expand Up @@ -143,8 +139,11 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::

void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
RecordReceivedTrailingMetadata(
absl::Status status, grpc_metadata_batch* /*recv_trailing_metadata*/,
absl::Status status, grpc_metadata_batch* recv_trailing_metadata,
const grpc_transport_stream_stats* transport_stream_stats) {
if (is_trailers_only_) {
PopulateLabelInjectors(recv_trailing_metadata);
}
std::array<std::pair<absl::string_view, absl::string_view>, 3>
additional_labels = {
{{OpenTelemetryMethodKey(), parent_->MethodForStats()},
Expand Down Expand Up @@ -214,6 +213,21 @@ void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::SetOptionalLabel(
optional_labels_[static_cast<size_t>(key)] = std::move(value);
}

void OpenTelemetryPlugin::ClientCallTracer::CallAttemptTracer::
PopulateLabelInjectors(grpc_metadata_batch* metadata) {
parent_->scope_config_->active_plugin_options_view().ForEach(
[&](const InternalOpenTelemetryPluginOption& plugin_option,
size_t /*index*/) {
auto* labels_injector = plugin_option.labels_injector();
if (labels_injector != nullptr) {
injected_labels_from_plugin_options_.push_back(
labels_injector->GetLabels(metadata));
}
return true;
},
parent_->otel_plugin_);
}

//
// OpenTelemetryPlugin::ClientCallTracer
//
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/ext/otel/otel_client_call_tracer.h
Expand Up @@ -95,6 +95,8 @@ class OpenTelemetryPlugin::ClientCallTracer
grpc_core::RefCountedStringValue value) override;

private:
void PopulateLabelInjectors(grpc_metadata_batch* metadata);

const ClientCallTracer* parent_;
const bool arena_allocated_;
// Start time (for measuring latency).
Expand All @@ -106,6 +108,7 @@ class OpenTelemetryPlugin::ClientCallTracer
optional_labels_;
std::vector<std::unique_ptr<LabelsIterable>>
injected_labels_from_plugin_options_;
bool is_trailers_only_ = false;
};

ClientCallTracer(
Expand Down