Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router: set upstream host from the conn pool #33790

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 31 additions & 22 deletions source/common/router/upstream_request.cc
Expand Up @@ -82,7 +82,8 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
std::unique_ptr<GenericConnPool>&& conn_pool,
bool can_send_early_data, bool can_use_http3,
bool enable_half_close)
: parent_(parent), conn_pool_(std::move(conn_pool)),
: parent_(parent), conn_pool_(std::move(conn_pool)), upstream_host_(conn_pool_->host()),
upstream_info_(std::make_shared<StreamInfo::UpstreamInfoImpl>()),
stream_info_(parent_.callbacks()->dispatcher().timeSource(), nullptr,
StreamInfo::FilterState::LifeSpan::FilterChain),
start_time_(parent_.callbacks()->dispatcher().timeSource().monotonicTime()),
Expand Down Expand Up @@ -110,14 +111,14 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
}
}

// The router checks that the connection pool is non-null before creating the upstream request.
auto upstream_host = conn_pool_->host();
// The router checks that the connection pool is non-null and valid before creating an
// UpstreamRequest.
ASSERT(upstream_host_ != nullptr, "Invalid connection pool");
upstream_info_->upstream_host_ = upstream_host_;

Tracing::HttpTraceContext trace_context(*parent_.downstreamHeaders());
Tracing::UpstreamContext upstream_context(upstream_host.get(), // host_
&upstream_host->cluster(), // cluster_
Tracing::ServiceType::Unknown, // service_type_
false // async_client_span_
);
Tracing::UpstreamContext upstream_context(upstream_host_.get(), &upstream_host_->cluster(),
Tracing::ServiceType::Unknown, false);

if (span_ != nullptr) {
span_->injectContext(trace_context, upstream_context);
Expand All @@ -128,9 +129,9 @@ UpstreamRequest::UpstreamRequest(RouterFilterInterface& parent,
parent_.callbacks()->activeSpan().injectContext(trace_context, upstream_context);
}

stream_info_.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
stream_info_.setUpstreamInfo(upstream_info_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why create this early but not set it early? Worth code comments I think

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR actually just set the upstream host early. The upstream info is still be created at the constructor (we just moved it more the constructor function to the constructor initialize list)?

stream_info_.route_ = parent_.callbacks()->route();
parent_.callbacks()->streamInfo().setUpstreamInfo(stream_info_.upstreamInfo());
parent_.callbacks()->streamInfo().setUpstreamInfo(upstream_info_);

stream_info_.healthCheck(parent_.callbacks()->streamInfo().healthCheck());
stream_info_.setIsShadow(parent_.callbacks()->streamInfo().isShadow());
Expand Down Expand Up @@ -370,12 +371,21 @@ void UpstreamRequest::maybeEndDecode(bool end_stream) {
}
}

void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
bool pool_success) {
StreamInfo::UpstreamInfo& upstream_info = *streamInfo().upstreamInfo();
upstream_info.setUpstreamHost(host);
upstream_host_ = host;
parent_.onUpstreamHostSelected(host, pool_success);
void UpstreamRequest::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr real_host,
bool success) {
// In most cases, the host description from host() of connection pool should be the same as
// the host description from the connection pool callback. Exception is the logical host
// which's addresses may be changed at runtime and the host description from the connection
// pool callback will be different from the host description from host() of connection pool.
if (real_host != nullptr && real_host != upstream_host_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're latching host early but it may change, can this be a problem for folks latched to the old version? Can you comment on why you want to latch early?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment on why you want to latch early?

It is because the problem in our actually production scenario.
Considering a scenario like this, if the request is timeout before any pool callbacks are called, then the upstream host will never be set and we can even cannot know which host is selected for this request from the access log.

if we're latching host early but it may change, can this be a problem for folks latched to the old version?

The real host may have different address with the early one. All other fields or content are same. If some one get the host after the upstream request is created and before the pool callbacks is called, the host may have different addresses with the final one.
The upstream host in the upstream info mainly is used for logging, tracing, etc. So, I think the early one at least is better than null.

But I am open to this issue. It's would be great for me if we can figure out a better way to resolve above problem.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hah, may be can check if the upstream info has a valid upstream host when the upstream request is reset (remote or local), and if the upstream host is nullptr, we can get one from connection pool and set it to upstream info. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or is there a way of not snagging logical host? or subscribing to updates so it's always kept up to date, or actually grabbing the outer abstraction instead of a handle which can change? I think I'm OK with it changing if it must, but if we go that route we should probably also comment where the member is defined.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the logical host, when the connection is ready, the host will create a RealHostDescrition that wrap the logical host and the actual address that used by the connection.

This is a way to ensure the finally HostDescrition will always has an accurate address when logic host is used. Other types host needn't this because we assume it have a only one address.

However, with the support of the multiple-addresses of host, the correct way to get the accurate address of upstream host is the Network::ConnectionInfoProvider parameter of pool callbacks, for all types host.

So, I doubt the value of the RealHostDescrition now. Maybe we could remove the RealHostDesription directly and then the host from pool and the host from pool callbacks would be same. (And this would simplify the logic/complexity here, I taken some time to read the code, then found the difference between these hosts)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may need @mattklein123 - I'm not terribly familiar with the subtleties of realhostdescripton and local hosts

upstream_host_ = std::move(real_host);
upstream_info_->upstream_host_ = upstream_host_;
}

// Upstream host never be nullptr. The value may from the host() of connection pool or the
// valid host description from the connection pool callback.
ASSERT(upstream_host_ != nullptr);
parent_.onUpstreamHostSelected(upstream_host_, success);
}

void UpstreamRequest::acceptHeadersFromRouter(bool end_stream) {
Expand Down Expand Up @@ -574,7 +584,7 @@ void UpstreamRequest::onPoolFailure(ConnectionPool::PoolFailureReason reason,
stream_info_.upstreamInfo()->setUpstreamTransportFailureReason(transport_failure_reason);

// Mimic an upstream reset.
onUpstreamHostSelected(host, false);
onUpstreamHostSelected(std::move(host), false); // Move host to the upstream_host_ if necessary.
onResetStream(reset_reason, transport_failure_reason);
}

Expand All @@ -595,9 +605,8 @@ void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
upstream_->enableHalfClose();
}

host->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);

onUpstreamHostSelected(host, true);
onUpstreamHostSelected(std::move(host), true); // Move host to the upstream_host_ if necessary.
upstream_host_->outlierDetector().putResult(Upstream::Outlier::Result::LocalOriginConnectSuccess);

if (protocol) {
stream_info_.protocol(protocol.value());
Expand Down Expand Up @@ -668,8 +677,8 @@ void UpstreamRequest::onPoolReady(std::unique_ptr<GenericUpstream>&& upstream,
}

const auto* route_entry = route().routeEntry();
if (route_entry->autoHostRewrite() && !host->hostname().empty()) {
Http::Utility::updateAuthority(*parent_.downstreamHeaders(), host->hostname(),
if (route_entry->autoHostRewrite() && !upstream_host_->hostname().empty()) {
Http::Utility::updateAuthority(*parent_.downstreamHeaders(), upstream_host_->hostname(),
route_entry->appendXfh());
}

Expand Down
3 changes: 2 additions & 1 deletion source/common/router/upstream_request.h
Expand Up @@ -85,7 +85,7 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
virtual void resetStream();
void setupPerTryTimeout();
void maybeEndDecode(bool end_stream);
void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host, bool pool_success);
void onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr real_host, bool success);

// Http::StreamDecoder
void decodeData(Buffer::Instance& data, bool end_stream) override;
Expand Down Expand Up @@ -195,6 +195,7 @@ class UpstreamRequest : public Logger::Loggable<Logger::Id::router>,
std::unique_ptr<GenericUpstream> upstream_;
absl::optional<Http::StreamResetReason> deferred_reset_reason_;
Upstream::HostDescriptionConstSharedPtr upstream_host_;
std::shared_ptr<StreamInfo::UpstreamInfoImpl> upstream_info_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we're going to eagerly create this, I think we can nix the local upsteam_host_ to not have it stored in many places.

DownstreamWatermarkManager downstream_watermark_manager_{*this};
Tracing::SpanPtr span_;
StreamInfo::StreamInfoImpl stream_info_;
Expand Down
6 changes: 3 additions & 3 deletions test/common/grpc/grpc_client_integration_test_harness.h
Expand Up @@ -334,7 +334,7 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest {
EXPECT_CALL(*mock_host_, createConnection_(_, _)).WillRepeatedly(Return(connection_data));
EXPECT_CALL(*mock_host_, cluster())
.WillRepeatedly(ReturnRef(*cm_.thread_local_cluster_.cluster_.info_));
EXPECT_CALL(*mock_host_description_, locality()).WillRepeatedly(ReturnRef(host_locality_));
EXPECT_CALL(*mock_host_, locality()).WillRepeatedly(ReturnRef(host_locality_));
http_conn_pool_ = Http::Http2::allocateConnPool(*dispatcher_, api_->randomGenerator(),
host_ptr_, Upstream::ResourcePriority::Default,
nullptr, nullptr, state_);
Expand Down Expand Up @@ -570,11 +570,11 @@ class GrpcSslClientIntegrationTest : public GrpcClientIntegrationTest {
auto cfg = std::make_unique<Extensions::TransportSockets::Tls::ClientContextConfigImpl>(
tls_context, factory_context_);

mock_host_description_->socket_factory_ =
mock_host_->socket_factory_ =
std::make_unique<Extensions::TransportSockets::Tls::ClientSslSocketFactory>(
std::move(cfg), context_manager_, *stats_store_.rootScope());
async_client_transport_socket_ =
mock_host_description_->socket_factory_->createTransportSocket(nullptr, nullptr);
mock_host_->socket_factory_->createTransportSocket(nullptr, nullptr);
FakeUpstreamConfig config(test_time_.timeSystem());
config.upstream_protocol_ = Http::CodecType::HTTP2;
fake_upstream_ =
Expand Down
9 changes: 5 additions & 4 deletions test/common/router/router_test.cc
Expand Up @@ -3530,7 +3530,8 @@ TEST_F(RouterTest, RetryUpstreamConnectionFailure) {
HttpTestUtility::addDefaultHeaders(headers);
router_->decodeHeaders(headers, true);

EXPECT_CALL(*router_->retry_state_, onHostAttempted(_)).Times(0);
EXPECT_CALL(*router_->retry_state_,
onHostAttempted(testing::Eq(cm_.thread_local_cluster_.conn_pool_.host_)));

router_->retry_state_->expectResetRetry();

Expand Down Expand Up @@ -3567,7 +3568,7 @@ TEST_F(RouterTest, RetryUpstreamConnectionFailure) {
EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_.host_->outlier_detector_,
putHttpResponseCode(200));
response_decoder->decodeHeaders(std::move(response_headers), true);
EXPECT_TRUE(verifyHostUpstreamStats(1, 0));
EXPECT_TRUE(verifyHostUpstreamStats(1, 1));
}

TEST_F(RouterTest, DontResetStartedResponseOnUpstreamPerTryTimeout) {
Expand Down Expand Up @@ -3900,7 +3901,7 @@ TEST_F(RouterTest, RetryTimeoutDuringRetryDelayWithUpstreamRequestNoHost) {
EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(&response_headers), false));
EXPECT_CALL(callbacks_, encodeData(_, true));
response_timeout_->invokeCallback();
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
EXPECT_TRUE(verifyHostUpstreamStats(0, 2));
// Timeout fired so no retry was done.
EXPECT_EQ(1U,
callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value());
Expand Down Expand Up @@ -3951,7 +3952,7 @@ TEST_F(RouterTest, RetryTimeoutDuringRetryDelayWithUpstreamRequestNoHostAltRespo
Http::TestResponseHeaderMapImpl response_headers{{":status", "204"}};
EXPECT_CALL(callbacks_, encodeHeaders_(HeaderMapEqualRef(&response_headers), true));
response_timeout_->invokeCallback();
EXPECT_TRUE(verifyHostUpstreamStats(0, 1));
EXPECT_TRUE(verifyHostUpstreamStats(0, 2));
// no retry was done.
EXPECT_EQ(1U,
callbacks_.route_->route_entry_.virtual_cluster_.stats().upstream_rq_total_.value());
Expand Down
2 changes: 1 addition & 1 deletion test/integration/load_stats_integration_test.cc
Expand Up @@ -669,7 +669,7 @@ TEST_P(LoadStatsIntegrationTest, Dropped) {
EXPECT_EQ("503", response_->headers().getStatusValue());
cleanupUpstreamAndDownstream();

ASSERT_TRUE(waitForLoadStatsRequest({}, 1));
ASSERT_TRUE(waitForLoadStatsRequest({localityStats("winter", 0, 1, 0, 0)}, 1));

EXPECT_EQ(1, test_server_->counter("load_reporter.requests")->value());
EXPECT_LE(2, test_server_->counter("load_reporter.responses")->value());
Expand Down