Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

async_client: add support for filter state #33772

Merged
merged 1 commit into from Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions envoy/http/async_client.h
Expand Up @@ -9,6 +9,7 @@
#include "envoy/http/filter.h"
#include "envoy/http/header_map.h"
#include "envoy/http/message.h"
#include "envoy/stream_info/filter_state.h"
#include "envoy/stream_info/stream_info.h"
#include "envoy/tracing/tracer.h"

Expand Down Expand Up @@ -266,6 +267,12 @@ class AsyncClient {
return *this;
}

// Set FilterState on async stream allowing upstream filters to access it.
StreamOptions& setFilterState(Envoy::StreamInfo::FilterStateSharedPtr fs) {
filter_state = fs;
return *this;
}

// Set buffer restriction and accounting for the stream.
StreamOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) {
account_ = account;
Expand Down Expand Up @@ -330,6 +337,7 @@ class AsyncClient {
ParentContext parent_context;

envoy::config::core::v3::Metadata metadata;
Envoy::StreamInfo::FilterStateSharedPtr filter_state;

// Buffer memory account for tracking bytes.
Buffer::BufferMemoryAccountSharedPtr account_{nullptr};
Expand Down Expand Up @@ -377,6 +385,10 @@ class AsyncClient {
StreamOptions::setMetadata(m);
return *this;
}
RequestOptions& setFilterState(Envoy::StreamInfo::FilterStateSharedPtr fs) {
StreamOptions::setFilterState(fs);
return *this;
}
RequestOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) {
StreamOptions::setRetryPolicy(p);
return *this;
Expand Down
6 changes: 5 additions & 1 deletion source/common/http/async_client_impl.cc
Expand Up @@ -10,6 +10,7 @@
#include "source/common/http/null_route_impl.h"
#include "source/common/http/utility.h"
#include "source/common/protobuf/message_validator_impl.h"
#include "source/common/stream_info/filter_state_impl.h"
#include "source/common/tracing/http_tracer_impl.h"
#include "source/common/upstream/retry_factory.h"

Expand Down Expand Up @@ -98,7 +99,10 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCal
router_(options.filter_config_ ? *options.filter_config_ : parent.config_,
parent.config_.async_stats_),
stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr,
StreamInfo::FilterState::LifeSpan::FilterChain),
options.filter_state != nullptr
? options.filter_state
: std::make_shared<StreamInfo::FilterStateImpl>(
StreamInfo::FilterState::LifeSpan::FilterChain)),
tracing_config_(Tracing::EgressConfig::get()),
retry_policy_(createRetryPolicy(parent, options, parent_.factory_context_)),
route_(std::make_shared<NullRouteImpl>(
Expand Down
24 changes: 12 additions & 12 deletions source/common/stream_info/stream_info_impl.h
Expand Up @@ -133,6 +133,18 @@ struct StreamInfoImpl : public StreamInfo {
std::move(parent_filter_state), parent_life_span),
life_span)) {}

StreamInfoImpl(
absl::optional<Http::Protocol> protocol, TimeSource& time_source,
const Network::ConnectionInfoProviderSharedPtr& downstream_connection_info_provider,
FilterStateSharedPtr filter_state)
: time_source_(time_source), start_time_(time_source.systemTime()),
start_time_monotonic_(time_source.monotonicTime()), protocol_(protocol),
filter_state_(std::move(filter_state)),
downstream_connection_info_provider_(downstream_connection_info_provider != nullptr
? downstream_connection_info_provider
: emptyDownstreamAddressProvider()),
trace_reason_(Tracing::Reason::NotTraceable) {}

SystemTime startTime() const override { return start_time_; }

MonotonicTime startTimeMonotonic() const override { return start_time_monotonic_; }
Expand Down Expand Up @@ -460,18 +472,6 @@ struct StreamInfoImpl : public StreamInfo {
std::make_shared<Network::ConnectionInfoSetterImpl>(nullptr, nullptr));
}

StreamInfoImpl(
absl::optional<Http::Protocol> protocol, TimeSource& time_source,
const Network::ConnectionInfoProviderSharedPtr& downstream_connection_info_provider,
FilterStateSharedPtr filter_state)
: time_source_(time_source), start_time_(time_source.systemTime()),
start_time_monotonic_(time_source.monotonicTime()), protocol_(protocol),
filter_state_(std::move(filter_state)),
downstream_connection_info_provider_(downstream_connection_info_provider != nullptr
? downstream_connection_info_provider
: emptyDownstreamAddressProvider()),
trace_reason_(Tracing::Reason::NotTraceable) {}

std::shared_ptr<UpstreamInfo> upstream_info_;
uint64_t bytes_received_{};
uint64_t bytes_retransmitted_{};
Expand Down
60 changes: 60 additions & 0 deletions test/common/http/async_client_impl_test.cc
Expand Up @@ -5,6 +5,7 @@

#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/route/v3/route_components.pb.h"
#include "envoy/stream_info/filter_state.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/http/async_client_impl.h"
Expand All @@ -13,6 +14,7 @@
#include "source/common/http/utility.h"
#include "source/common/router/context_impl.h"
#include "source/common/router/upstream_codec_filter.h"
#include "source/common/stream_info/filter_state_impl.h"

#include "test/common/http/common.h"
#include "test/mocks/buffer/mocks.h"
Expand Down Expand Up @@ -701,6 +703,64 @@ TEST_F(AsyncClientImplTest, WithMetadata) {
response_decoder_->decodeData(data, true);
}

class TestStateObject : public StreamInfo::FilterState::Object {
public:
TestStateObject(std::string value) : value_(value) {}

const std::string& value() const { return value_; }

private:
std::string value_;
};

TEST_F(AsyncClientImplTest, WithFilterState) {
message_->body().add("test-body");
Buffer::Instance& data = message_->body();

EXPECT_CALL(cm_.thread_local_cluster_.conn_pool_, newStream(_, _, _))
.WillOnce(Invoke(
[&](ResponseDecoder& decoder, ConnectionPool::Callbacks& callbacks,
const ConnectionPool::Instance::StreamOptions&) -> ConnectionPool::Cancellable* {
callbacks.onPoolReady(stream_encoder_, cm_.thread_local_cluster_.conn_pool_.host_,
stream_info_, {});
response_decoder_ = &decoder;
return nullptr;
}));

EXPECT_CALL(cm_.thread_local_cluster_, httpConnPool(_, _, _))
.WillOnce(Invoke([&](Upstream::ResourcePriority, absl::optional<Http::Protocol>,
Upstream::LoadBalancerContext* context) {
const StreamInfo::FilterState& filter_state = context->requestStreamInfo()->filterState();
const TestStateObject* state = filter_state.getDataReadOnly<TestStateObject>("test-filter");
EXPECT_NE(state, nullptr);
EXPECT_EQ(state->value(), "stored-test-state");
return Upstream::HttpPoolData([]() {}, &cm_.thread_local_cluster_.conn_pool_);
}));

TestRequestHeaderMapImpl copy(message_->headers());
copy.addCopy("x-envoy-internal", "true");
copy.addCopy("x-forwarded-for", "127.0.0.1");
copy.addCopy(":scheme", "http");

EXPECT_CALL(stream_encoder_, encodeHeaders(HeaderMapEqualRef(&copy), false));

AsyncClient::RequestOptions options;
auto state_object = std::make_shared<TestStateObject>("stored-test-state");
auto filter_state =
std::make_shared<StreamInfo::FilterStateImpl>(StreamInfo::FilterState::LifeSpan::FilterChain);
filter_state->setData("test-filter", state_object, StreamInfo::FilterState::StateType::Mutable);
options.setFilterState(filter_state);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the idea that a client must explicitly set the filter state in-code? Or would it be something automatic for sidecalls like ext_authz?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is that the authors of individual filters would need to explicitly set the FilterState on an AsyncClient call if they want it in a similar way to how the DynamicMetadata is currently handled. I think this makes sense since whether or not it makes sense to apply the state/metadata from the main request to a side request may vary depending on the use case.


auto* request = client_.send(std::move(message_), callbacks_, options);
EXPECT_NE(request, nullptr);

expectSuccess(request, 200);

ResponseHeaderMapPtr response_headers(new TestResponseHeaderMapImpl{{":status", "200"}});
response_decoder_->decodeHeaders(std::move(response_headers), false);
response_decoder_->decodeData(data, true);
}

TEST_F(AsyncClientImplTest, Retry) {
ON_CALL(factory_context_.runtime_loader_.snapshot_, featureEnabled("upstream.use_retry", 100))
.WillByDefault(Return(true));
Expand Down