Skip to content

Commit

Permalink
router: always set upstream host of connection pool first
Browse files Browse the repository at this point in the history
Signed-off-by: wbpcode <wbphub@live.com>
  • Loading branch information
wbpcode committed Apr 26, 2024
1 parent ff264b6 commit c0a47d0
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 26 deletions.
50 changes: 32 additions & 18 deletions source/common/router/upstream_request.cc
Expand Up @@ -82,7 +82,8 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
std::unique_ptr<GenericConnPool>&& conn_pool,
bool can_send_early_data, bool can_use_http3,
bool enable_half_close)
: parent_(parent), conn_pool_(std::move(conn_pool)),
: parent_(parent), conn_pool_(std::move(conn_pool)), upstream_host_(conn_pool_->host()),
upstream_info_(std::make_shared<StreamInfo::UpstreamInfoImpl>()),
stream_info_(parent_.callbacks()->dispatcher().timeSource(), nullptr,
StreamInfo::FilterState::LifeSpan::FilterChain),
start_time_(parent_.callbacks()->dispatcher().timeSource().monotonicTime()),
Expand Down Expand Up @@ -110,14 +111,14 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
}
}

// The router checks that the connection pool is non-null before creating the upstream request.
auto upstream_host = conn_pool_->host();
// The router checks that the connection pool is non-null and valid before creating an
// UpstreamRequest.
ASSERT(conn_pool_->valid() && upstream_host_ != nullptr, "Invalid connection pool");
upstream_info_->upstream_host_ = upstream_host_;

Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders());
Tracing::UpstreamContext upstream_context(upstream_host.get(), // host_
&upstream_host->cluster(), // cluster_
Tracing::ServiceType::Unknown, // service_type_
false // async_client_span_
);
Tracing::UpstreamContext upstream_context(upstream_host_.get(), &upstream_host_->cluster(),
Tracing::ServiceType::Unknown, false);

if (span_ != nullptr) {
span_->injectContext(trace_context, upstream_context);
Expand Down Expand Up @@ -372,10 +373,24 @@ void UpstreamRequest::maybeEndDecode(bool end_stream) {

void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
bool pool_success) {
StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo();
upstream_info.setUpstreamHost(host);
upstream_host_ = host;
parent_.onUpstreamHostSelected(host, pool_success);

// Quick return if the host is nullptr. In this case the connection pool calling must be
// failed.
if (host == nullptr) {
ASSERT(!pool_success);
return;
}

// In most cases, the host description from host() of connection pool should be the same as
// the host description from the connection pool callback. Exception is the logical host
// which's addresses may be changed at runtime and the host description from the connection
// pool callback will be different from the host description from host() of connection pool.
if (host != upstream_host_) {
upstream_host_ = std::move(host);
upstream_info_->upstream_host_ = upstream_host_;
}

parent_.onUpstreamHostSelected(upstream_host_, pool_success);
}

void UpstreamRequest::acceptHeadersFromRouter(bool end_stream) {
Expand Down Expand Up @@ -574,7 +589,7 @@ void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
stream_info_.upstreamInfo()->setUpstreamTransportFailureReason(transport_failure_reason);

// Mimic an upstream reset.
onUpstreamHostSelected(host, false);
onUpstreamHostSelected(std::move(host), false); // Move host to the upstream_host_ if necessary.
onResetStream(reset_reason, transport_failure_reason);
}

Expand All @@ -595,9 +610,8 @@ void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
upstream_->enableHalfClose();
}

host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);

onUpstreamHostSelected(host, true);
onUpstreamHostSelected(std::move(host), true); // Move host to the upstream_host_ if necessary.
upstream_host_->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);

if (protocol) {
stream_info_.protocol(protocol.value());
Expand Down Expand Up @@ -668,8 +682,8 @@ void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
}

const auto* route_entry = route().routeEntry();
if (route_entry->autoHostRewrite() && !host->hostname().empty()) {
Http::Utility::updateAuthority(*parent_.downstreamHeaders(), host->hostname(),
if (route_entry->autoHostRewrite() && !upstream_host_->hostname().empty()) {
Http::Utility::updateAuthority(*parent_.downstreamHeaders(), upstream_host_->hostname(),
route_entry->appendXfh());
}

Expand Down
1 change: 1 addition & 0 deletions source/common/router/upstream_request.h
Expand Up @@ -195,6 +195,7 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
std::unique_ptr<GenericUpstream> upstream_;
absl::optional<Http::StreamResetReason> deferred_reset_reason_;
Upstream::HostDescriptionConstSharedPtr upstream_host_;
std::shared_ptr<StreamInfo::UpstreamInfoImpl> upstream_info_;
DownstreamWatermarkManager downstream_watermark_manager_{*this};
Tracing::SpanPtr span_;
StreamInfo::StreamInfoImpl stream_info_;
Expand Down
6 changes: 3 additions & 3 deletions test/common/grpc/grpc_client_integration_test_harness.h
Expand Up @@ -334,7 +334,7 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
EXPECT_CALL(*mock_host_, createConnection_(_, _)).WillRepeatedly(Return(connection_data));
EXPECT_CALL(*mock_host_, cluster())
.WillRepeatedly(ReturnRef(*cm_.thread_local_cluster_.cluster_.info_));
EXPECT_CALL(*mock_host_description_, locality()).WillRepeatedly(ReturnRef(host_locality_));
EXPECT_CALL(*mock_host_, locality()).WillRepeatedly(ReturnRef(host_locality_));
http_conn_pool_ = Http::Http2::allocateConnPool(*dispatcher_, api_->randomGenerator(),
host_ptr_, Upstream::ResourcePriority::Default,
nullptr, nullptr, state_);
Expand Down Expand Up @@ -570,11 +570,11 @@ class GrpcSslClientIntegrationTest : public GrpcClientIntegrationTest {
auto cfg = std::make_unique<Extensions::TransportSockets::Tls::ClientContextConfigImpl>(
tls_context, factory_context_);

mock_host_description_->socket_factory_ =
mock_host_->socket_factory_ =
std::make_unique<Extensions::TransportSockets::Tls::ClientSslSocketFactory>(
std::move(cfg), context_manager_, *stats_store_.rootScope());
async_client_transport_socket_ =
mock_host_description_->socket_factory_->createTransportSocket(nullptr, nullptr);
mock_host_->socket_factory_->createTransportSocket(nullptr, nullptr);
FakeUpstreamConfig config(test_time_.timeSystem());
config.upstream_protocol_ = Http::CodecType::HTTP2;
fake_upstream_ =
Expand Down
9 changes: 5 additions & 4 deletions test/common/router/router_test.cc
Expand Up @@ -3530,7 +3530,8 @@ TEST_F(RouterTest, RetryUpstreamConnectionFailure) {
HttpTestUtility::addDefaultHeaders(headers);
router_->decodeHeaders(headers, true);

EXPECT_CALL(*router_->retry_state_, onHostAttempted(_)).Times(0);
EXPECT_CALL(*router_->retry_state_,
onHostAttempted(testing::Eq(cm_.thread_local_cluster_.conn_pool_.host_)));

router_->retry_state_->expectResetRetry();

Expand Down Expand Up @@ -3567,7 +3568,7 @@ TEST_F(RouterTest, RetryUpstreamConnectionFailure) {
EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_.host_->outlier_detector_,
putHttpResponseCode(200));
response_decoder->decodeHeaders(std::move(response_headers), true);
EXPECT_TRUE(verifyHostUpstreamStats(1, 0));
EXPECT_TRUE(verifyHostUpstreamStats(1, 1));
}

TEST_F(RouterTest, DontResetStartedResponseOnUpstreamPerTryTimeout) {
Expand Down Expand Up @@ -3900,7 +3901,7 @@ TEST_F(RouterTest, RetryTimeoutDuringRetryDelayWithUpstreamRequestNoHost) {
EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(&response_headers), false));
EXPECT_CALL(callbacks_, encodeData(_, true));
response_timeout_->invokeCallback();
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
EXPECT_TRUE(verifyHostUpstreamStats(0, 2));
// Timeout fired so no retry was done.
EXPECT_EQ(1U,
callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value());
Expand Down Expand Up @@ -3951,7 +3952,7 @@ TEST_F(RouterTest, RetryTimeoutDuringRetryDelayWithUpstreamRequestNoHostAltRespo
Http::TestResponseHeaderMapImpl response_headers{{":status", "204"}};
EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(&response_headers), true));
response_timeout_->invokeCallback();
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
EXPECT_TRUE(verifyHostUpstreamStats(0, 2));
// no retry was done.
EXPECT_EQ(1U,
callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value());
Expand Down
2 changes: 1 addition & 1 deletion test/integration/load_stats_integration_test.cc
Expand Up @@ -669,7 +669,7 @@ TEST_P(LoadStatsIntegrationTest, Dropped) {
EXPECT_EQ("503", response_->headers().getStatusValue());
cleanupUpstreamAndDownstream();

ASSERT_TRUE(waitForLoadStatsRequest({}, 1));
ASSERT_TRUE(waitForLoadStatsRequest({localityStats("winter", 0, 1, 0, 0)}, 1));

EXPECT_EQ(1, test_server_->counter("load_reporter.requests")->value());
EXPECT_LE(2, test_server_->counter("load_reporter.responses")->value());
Expand Down

0 comments on commit c0a47d0

Please sign in to comment.