Skip to content

Commit

Permalink
router: add more upstream request metadata to tracing spans (envoypro…
Browse files Browse the repository at this point in the history
…xy#8289)

Signed-off-by: Teju Nareddy <nareddyt@google.com>
  • Loading branch information
nareddyt committed Sep 27, 2019
1 parent df67940 commit 2b63e91
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 42 deletions.
6 changes: 3 additions & 3 deletions source/common/http/conn_manager_impl.cc
Expand Up @@ -520,9 +520,9 @@ ConnectionManagerImpl::ActiveStream::~ActiveStream() {
}

if (active_span_) {
Tracing::HttpTracerUtility::finalizeSpan(*active_span_, request_headers_.get(),
response_headers_.get(), response_trailers_.get(),
stream_info_, *this);
Tracing::HttpTracerUtility::finalizeDownstreamSpan(
*active_span_, request_headers_.get(), response_headers_.get(), response_trailers_.get(),
stream_info_, *this);
}
if (state_.successful_upgrade_) {
connection_manager_.stats_.named_.downstream_cx_upgrades_active_.dec();
Expand Down
22 changes: 19 additions & 3 deletions source/common/router/router.cc
Expand Up @@ -1332,17 +1332,22 @@ Filter::UpstreamRequest::UpstreamRequest(Filter& parent, Http::ConnectionPool::I
span_ = parent_.callbacks_->activeSpan().spawnChild(
parent_.callbacks_->tracingConfig(), "router " + parent.cluster_->name() + " egress",
parent.timeSource().systemTime());
span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
if (parent.attempt_count_ != 1) {
// This is a retry request, add this metadata to span.
span_->setTag(Tracing::Tags::get().RetryCount, std::to_string(parent.attempt_count_ - 1));
}
}

stream_info_.healthCheck(parent_.callbacks_->streamInfo().healthCheck());
}

Filter::UpstreamRequest::~UpstreamRequest() {
if (span_ != nullptr) {
// TODO(mattklein123): Add tags based on what happened to this request (retries, reset, etc.).
span_->finishSpan();
Tracing::HttpTracerUtility::finalizeUpstreamSpan(*span_, upstream_headers_.get(),
upstream_trailers_.get(), stream_info_,
Tracing::EgressConfig::get());
}

if (per_try_timeout_ != nullptr) {
// Allows for testing.
per_try_timeout_->disableTimer();
Expand Down Expand Up @@ -1481,6 +1486,12 @@ void Filter::UpstreamRequest::onResetStream(Http::StreamResetReason reason,
absl::string_view transport_failure_reason) {
ScopeTrackerScopeState scope(&parent_.callbacks_->scope(), parent_.callbacks_->dispatcher());

if (span_ != nullptr) {
// Add tags about reset.
span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
span_->setTag(Tracing::Tags::get().ErrorReason, Http::Utility::resetReasonToString(reason));
}

clearRequestEncoder();
awaiting_headers_ = false;
if (!calling_encode_headers_) {
Expand All @@ -1497,6 +1508,11 @@ void Filter::UpstreamRequest::resetStream() {
return;
}

if (span_ != nullptr) {
// Add tags about the cancellation.
span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
}

if (conn_pool_stream_handle_) {
ENVOY_STREAM_LOG(debug, "cancelling pool request", *parent_.callbacks_);
ASSERT(!request_encoder_);
Expand Down
41 changes: 32 additions & 9 deletions source/common/tracing/http_tracer_impl.cc
Expand Up @@ -138,11 +138,11 @@ static void annotateVerbose(Span& span, const StreamInfo::StreamInfo& stream_inf
}
}

void HttpTracerUtility::finalizeSpan(Span& span, const Http::HeaderMap* request_headers,
const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info,
const Config& tracing_config) {
void HttpTracerUtility::finalizeDownstreamSpan(Span& span, const Http::HeaderMap* request_headers,
const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info,
const Config& tracing_config) {
// Pre response data.
if (request_headers) {
if (request_headers->RequestId()) {
Expand Down Expand Up @@ -173,14 +173,38 @@ void HttpTracerUtility::finalizeSpan(Span& span, const Http::HeaderMap* request_
}
}
span.setTag(Tracing::Tags::get().RequestSize, std::to_string(stream_info.bytesReceived()));
span.setTag(Tracing::Tags::get().ResponseSize, std::to_string(stream_info.bytesSent()));

setCommonTags(span, response_headers, response_trailers, stream_info, tracing_config);

span.finishSpan();
}

void HttpTracerUtility::finalizeUpstreamSpan(Span& span, const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info,
const Config& tracing_config) {
span.setTag(Tracing::Tags::get().HttpProtocol,
AccessLog::AccessLogFormatUtils::protocolToString(stream_info.protocol()));

setCommonTags(span, response_headers, response_trailers, stream_info, tracing_config);

span.finishSpan();
}

void HttpTracerUtility::setCommonTags(Span& span, const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info,
const Config& tracing_config) {

span.setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);

if (nullptr != stream_info.upstreamHost()) {
span.setTag(Tracing::Tags::get().UpstreamCluster, stream_info.upstreamHost()->cluster().name());
}

// Post response data.
span.setTag(Tracing::Tags::get().HttpStatusCode, buildResponseCode(stream_info));
span.setTag(Tracing::Tags::get().ResponseSize, std::to_string(stream_info.bytesSent()));
span.setTag(Tracing::Tags::get().ResponseFlags,
StreamInfo::ResponseFlagUtils::toShortString(stream_info));

Expand All @@ -198,8 +222,6 @@ void HttpTracerUtility::finalizeSpan(Span& span, const Http::HeaderMap* request_
if (!stream_info.responseCode() || Http::CodeUtility::is5xx(stream_info.responseCode().value())) {
span.setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
}

span.finishSpan();
}

HttpTracerImpl::HttpTracerImpl(DriverPtr&& driver, const LocalInfo::LocalInfo& local_info)
Expand All @@ -217,8 +239,9 @@ SpanPtr HttpTracerImpl::startSpan(const Config& config, Http::HeaderMap& request

SpanPtr active_span = driver_->startSpan(config, request_headers, span_name,
stream_info.startTime(), tracing_decision);

// Set tags related to the local environment
if (active_span) {
active_span->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
active_span->setTag(Tracing::Tags::get().NodeId, local_info_.nodeName());
active_span->setTag(Tracing::Tags::get().Zone, local_info_.zoneName());
}
Expand Down
30 changes: 24 additions & 6 deletions source/common/tracing/http_tracer_impl.h
Expand Up @@ -40,6 +40,7 @@ class TracingTagValues {

// Non-standard tag names.
const std::string DownstreamCluster = "downstream_cluster";
const std::string ErrorReason = "error.reason";
const std::string GrpcStatusCode = "grpc.status_code";
const std::string GrpcMessage = "grpc.message";
const std::string GuidXClientTraceId = "guid:x-client-trace-id";
Expand All @@ -49,6 +50,7 @@ class TracingTagValues {
const std::string RequestSize = "request_size";
const std::string ResponseFlags = "response_flags";
const std::string ResponseSize = "response_size";
const std::string RetryCount = "retry.count";
const std::string Status = "status";
const std::string UpstreamCluster = "upstream_cluster";
const std::string UserAgent = "user_agent";
Expand Down Expand Up @@ -98,13 +100,29 @@ class HttpTracerUtility {
const Http::HeaderMap& request_headers);

/**
* 1) Fill in span tags based on the response headers.
* 2) Finish active span.
* Adds information obtained from the downstream request headers as tags to the active span.
* Then finishes the span.
*/
static void finalizeSpan(Span& span, const Http::HeaderMap* request_headers,
const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info, const Config& tracing_config);
static void finalizeDownstreamSpan(Span& span, const Http::HeaderMap* request_headers,
const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info,
const Config& tracing_config);

/**
* Adds information obtained from the upstream request headers as tags to the active span.
* Then finishes the span.
*/
static void finalizeUpstreamSpan(Span& span, const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info,
const Config& tracing_config);

private:
static void setCommonTags(Span& span, const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info,
const Config& tracing_config);

static const std::string IngressOperation;
static const std::string EgressOperation;
Expand Down
136 changes: 132 additions & 4 deletions test/common/router/router_test.cc
Expand Up @@ -4400,6 +4400,7 @@ class RouterTestChildSpan : public RouterTestBase {
};

// Make sure child spans start/inject/finish with a normal flow.
// An upstream request succeeds and a single span is created.
TEST_F(RouterTestChildSpan, BasicFlow) {
EXPECT_CALL(callbacks_.route_->route_entry_, timeout())
.WillOnce(Return(std::chrono::milliseconds(0)));
Expand All @@ -4422,16 +4423,22 @@ TEST_F(RouterTestChildSpan, BasicFlow) {
EXPECT_CALL(callbacks_.active_span_, spawnChild_(_, "router fake_cluster egress", _))
.WillOnce(Return(child_span));
EXPECT_CALL(callbacks_, tracingConfig());
EXPECT_CALL(*child_span,
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
router_.decodeHeaders(headers, true);

Http::HeaderMapPtr response_headers(new Http::TestHeaderMapImpl{{":status", "200"}});
EXPECT_CALL(*child_span,
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().HttpProtocol), Eq("HTTP/1.0")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("fake_cluster")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().HttpStatusCode), Eq("200")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().ResponseFlags), Eq("-")));
EXPECT_CALL(*child_span, finishSpan());
response_decoder->decodeHeaders(std::move(response_headers), true);
}

// Make sure child spans start/inject/finish with a reset flow.
// The upstream responds back to envoy before the reset, so the span has fields that represent a
// response and reset.
TEST_F(RouterTestChildSpan, ResetFlow) {
EXPECT_CALL(callbacks_.route_->route_entry_, timeout())
.WillOnce(Return(std::chrono::milliseconds(0)));
Expand All @@ -4454,17 +4461,138 @@ TEST_F(RouterTestChildSpan, ResetFlow) {
EXPECT_CALL(callbacks_.active_span_, spawnChild_(_, "router fake_cluster egress", _))
.WillOnce(Return(child_span));
EXPECT_CALL(callbacks_, tracingConfig());
EXPECT_CALL(*child_span,
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
router_.decodeHeaders(headers, true);

// Upstream responds back to envoy.
Http::HeaderMapPtr response_headers(new Http::TestHeaderMapImpl{{":status", "200"}});
response_decoder->decodeHeaders(std::move(response_headers), false);

// The reset occurs after the upstream response, so the span has a valid status code but also an
// error.
EXPECT_CALL(*child_span,
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().HttpProtocol), Eq("HTTP/1.0")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("fake_cluster")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().HttpStatusCode), Eq("200")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().ResponseFlags), Eq("UR")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().Error), Eq(Tracing::Tags::get().True)));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().ErrorReason), Eq("remote reset")));
EXPECT_CALL(*child_span, finishSpan());
encoder.stream_.resetStream(Http::StreamResetReason::RemoteReset);
}

// Make sure child spans start/inject/finish with a cancellation flow.
// An upstream request is created but is then cancelled before. The resulting span has the
// cancellation fields.
TEST_F(RouterTestChildSpan, CancelFlow) {
EXPECT_CALL(callbacks_.route_->route_entry_, timeout())
.WillOnce(Return(std::chrono::milliseconds(0)));
EXPECT_CALL(callbacks_.dispatcher_, createTimer_(_)).Times(0);

NiceMock<Http::MockStreamEncoder> encoder;
Tracing::MockSpan* child_span{new Tracing::MockSpan()};
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder&, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
EXPECT_CALL(*child_span, injectContext(_));
callbacks.onPoolReady(encoder, cm_.conn_pool_.host_, upstream_stream_info_);
return nullptr;
}));

Http::TestHeaderMapImpl headers;
HttpTestUtility::addDefaultHeaders(headers);
EXPECT_CALL(callbacks_.active_span_, spawnChild_(_, "router fake_cluster egress", _))
.WillOnce(Return(child_span));
EXPECT_CALL(callbacks_, tracingConfig());
router_.decodeHeaders(headers, true);

// Destroy the router, causing the upstream request to be cancelled.
// Response code on span is 0 because the upstream never sent a response.
EXPECT_CALL(*child_span,
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().HttpProtocol), Eq("HTTP/1.0")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("fake_cluster")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().HttpStatusCode), Eq("0")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().ResponseFlags), Eq("-")));
EXPECT_CALL(*child_span, setTag(Eq(Tracing::Tags::get().Error), Eq(Tracing::Tags::get().True)));
EXPECT_CALL(*child_span,
setTag(Eq(Tracing::Tags::get().Canceled), Eq(Tracing::Tags::get().True)));
EXPECT_CALL(*child_span, finishSpan());
router_.onDestroy();
}

// Make sure child spans start/inject/finish with retry flow.
// The first request will fail because of an upstream reset, so the span will be annotated with the
// reset reason. The second request will succeed, so the span will be annotated with 200 OK.
TEST_F(RouterTestChildSpan, ResetRetryFlow) {
NiceMock<Http::MockStreamEncoder> encoder1;
Http::StreamDecoder* response_decoder = nullptr;
Tracing::MockSpan* child_span_1{new Tracing::MockSpan()};
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
EXPECT_CALL(*child_span_1, injectContext(_));
callbacks.onPoolReady(encoder1, cm_.conn_pool_.host_, upstream_stream_info_);
return nullptr;
}));
expectResponseTimerCreate();

// Upstream responds back to envoy simulating an upstream reset.
Http::TestHeaderMapImpl headers{{"x-envoy-retry-on", "5xx"}, {"x-envoy-internal", "true"}};
HttpTestUtility::addDefaultHeaders(headers);
EXPECT_CALL(callbacks_.active_span_, spawnChild_(_, "router fake_cluster egress", _))
.WillOnce(Return(child_span_1));
EXPECT_CALL(callbacks_, tracingConfig());
router_.decodeHeaders(headers, true);

// The span should be annotated with the reset-related fields.
EXPECT_CALL(*child_span_1,
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
EXPECT_CALL(*child_span_1, setTag(Eq(Tracing::Tags::get().HttpProtocol), Eq("HTTP/1.0")));
EXPECT_CALL(*child_span_1, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("fake_cluster")));
EXPECT_CALL(*child_span_1, setTag(Eq(Tracing::Tags::get().HttpStatusCode), Eq("0")));
EXPECT_CALL(*child_span_1, setTag(Eq(Tracing::Tags::get().ResponseFlags), Eq("UR")));
EXPECT_CALL(*child_span_1, setTag(Eq(Tracing::Tags::get().Error), Eq(Tracing::Tags::get().True)))
.Times(2);
EXPECT_CALL(*child_span_1, setTag(Eq(Tracing::Tags::get().ErrorReason), Eq("remote reset")));
EXPECT_CALL(*child_span_1, finishSpan());

router_.retry_state_->expectResetRetry();
encoder1.stream_.resetStream(Http::StreamResetReason::RemoteReset);

// We expect this reset to kick off a new request.
NiceMock<Http::MockStreamEncoder> encoder2;
Tracing::MockSpan* child_span_2{new Tracing::MockSpan()};
EXPECT_CALL(cm_.conn_pool_, newStream(_, _))
.WillOnce(Invoke([&](Http::StreamDecoder& decoder, Http::ConnectionPool::Callbacks& callbacks)
-> Http::ConnectionPool::Cancellable* {
response_decoder = &decoder;
EXPECT_CALL(*child_span_2, injectContext(_));
EXPECT_CALL(*router_.retry_state_, onHostAttempted(_));
callbacks.onPoolReady(encoder2, cm_.conn_pool_.host_, upstream_stream_info_);
return nullptr;
}));

EXPECT_CALL(callbacks_.active_span_, spawnChild_(_, "router fake_cluster egress", _))
.WillOnce(Return(child_span_2));
EXPECT_CALL(callbacks_, tracingConfig());
EXPECT_CALL(*child_span_2, setTag(Eq(Tracing::Tags::get().RetryCount), Eq("1")));

router_.retry_state_->callback_();

// Upstream responds back with a normal response. Span should be annotated as usual.
Http::HeaderMapPtr response_headers(new Http::TestHeaderMapImpl{{":status", "200"}});
EXPECT_CALL(*child_span_2,
setTag(Eq(Tracing::Tags::get().Component), Eq(Tracing::Tags::get().Proxy)));
EXPECT_CALL(*child_span_2, setTag(Eq(Tracing::Tags::get().HttpProtocol), Eq("HTTP/1.0")));
EXPECT_CALL(*child_span_2, setTag(Eq(Tracing::Tags::get().UpstreamCluster), Eq("fake_cluster")));
EXPECT_CALL(*child_span_2, setTag(Eq(Tracing::Tags::get().HttpStatusCode), Eq("200")));
EXPECT_CALL(*child_span_2, setTag(Eq(Tracing::Tags::get().ResponseFlags), Eq("-")));
EXPECT_CALL(*child_span_2, finishSpan());
response_decoder->decodeHeaders(std::move(response_headers), true);
}

Protobuf::RepeatedPtrField<std::string> protobufStrList(const std::vector<std::string>& v) {
Protobuf::RepeatedPtrField<std::string> res;
for (auto& field : v) {
Expand Down

0 comments on commit 2b63e91

Please sign in to comment.