Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generic proxy: complete basic support of timeout and retry #33836

Merged
merged 3 commits into from Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,7 +23,7 @@ option (xds.annotations.v3.file_status).work_in_progress = true;
// [#protodoc-title: Generic Proxy Route Action Configuration]

// Configuration for the route match action.
// [#next-free-field: 7]
// [#next-free-field: 8]
message RouteAction {
// The name of the route action. This should be unique across all route actions.
string name = 5;
Expand Down Expand Up @@ -53,6 +53,13 @@ message RouteAction {
// spans between the point at which the entire downstream request (i.e. end-of-stream) has been
// processed and when the upstream response has been completely processed. A value of 0 will
// disable the route's timeout.
// [#not-implemented-hide:]
google.protobuf.Duration timeout = 6;

// Specifies the retry policy for the route. If not specified, then no retries will be performed.
//
// .. note::
// Only simplest retry policy is supported and only ``num_retries`` field is used for generic
// proxy. The default value for ``num_retries`` is 1 means that the request will be tried once
// and no additional retries will be performed.
config.core.v3.RetryPolicy retry_policy = 7;
}
Expand Up @@ -31,8 +31,9 @@ class ServerCodecCallbacks {

/**
* If request decoding failure then this method will be called.
* @param reason the reason of decoding failure.
*/
virtual void onDecodingFailure() PURE;
virtual void onDecodingFailure(absl::string_view reason = {}) PURE;

/**
* Write specified data to the downstream connection. This is could be used to write
Expand Down Expand Up @@ -71,8 +72,9 @@ class ClientCodecCallbacks {

/**
* If response decoding failure then this method will be called.
* @param reason the reason of decoding failure.
*/
virtual void onDecodingFailure() PURE;
virtual void onDecodingFailure(absl::string_view reason = {}) PURE;

/**
* Write specified data to the upstream connection. This is could be used to write
Expand Down Expand Up @@ -114,6 +116,12 @@ class EncodingCallbacks {
*/
virtual void onEncodingSuccess(Buffer::Instance& buffer, bool end_stream) PURE;

/**
* If encoding failure then this method will be called.
* @param reason the reason of encoding failure.
*/
virtual void onEncodingFailure(absl::string_view reason = {}) PURE;

/**
* The route that the request is matched to. This is optional when encoding the response
* (by server codec) because the request may not be matched to any route and the
Expand Down
18 changes: 18 additions & 0 deletions contrib/generic_proxy/filters/network/source/interface/route.h
Expand Up @@ -26,6 +26,18 @@ using RouteSpecificFilterConfigConstSharedPtr = std::shared_ptr<const RouteSpeci
*/
using RouteTypedMetadataFactory = Envoy::Router::HttpRouteTypedMetadataFactory;

/**
* The simplest retry implementation. It only contains the number of retries.
*/
class RetryPolicy {
public:
RetryPolicy(uint32_t num_retries) : num_retries_(num_retries) {}
uint32_t numRetries() const { return num_retries_; }

private:
const uint32_t num_retries_{};
};

class RouteEntry {
public:
virtual ~RouteEntry() = default;
Expand Down Expand Up @@ -64,7 +76,13 @@ class RouteEntry {
* @return route timeout for this route.
*/
virtual const std::chrono::milliseconds timeout() const PURE;

/**
* @return const RetryPolicy& the retry policy for this route.
*/
virtual const RetryPolicy& retryPolicy() const PURE;
};

using RouteEntryConstSharedPtr = std::shared_ptr<const RouteEntry>;

class RouteMatcher : public Rds::Config {
Expand Down
16 changes: 4 additions & 12 deletions contrib/generic_proxy/filters/network/source/interface/stream.h
Expand Up @@ -285,18 +285,10 @@ using ResponseSharedPtr = std::shared_ptr<Response>;

template <class T> class StreamFramePtrHelper {
public:
StreamFramePtrHelper(StreamFramePtr frame) {
auto frame_ptr = frame.release();

auto typed_frame_ptr = dynamic_cast<T*>(frame_ptr);

if (typed_frame_ptr == nullptr) {
// If the frame is not the expected type, wrap it
// in the original StreamFramePtr.
frame_ = StreamFramePtr{frame_ptr};
} else {
// If the frame is the expected type, wrap it
// in the typed frame unique pointer.
StreamFramePtrHelper(StreamFramePtr frame) : frame_(std::move(frame)) {
if (auto typed_frame_ptr = dynamic_cast<T*>(frame_.get()); typed_frame_ptr != nullptr) {
// If the frame is the expected type, wrap it in the typed frame unique pointer.
frame_.release();
typed_frame_ = std::unique_ptr<T>{typed_frame_ptr};
}
}
Expand Down
9 changes: 7 additions & 2 deletions contrib/generic_proxy/filters/network/source/proxy.cc
Expand Up @@ -314,6 +314,11 @@ void ActiveStream::onEncodingSuccess(Buffer::Instance& buffer, bool end_stream)
parent_.deferredStream(*this);
}

void ActiveStream::onEncodingFailure(absl::string_view reason) {
ENVOY_LOG(error, "Generic proxy: response encoding failure: {}", reason);
resetStream(DownstreamStreamResetReason::ProtocolError);
}

void ActiveStream::initializeFilterChain(FilterChainFactory& factory) {
factory.createFilterChain(*this);
// Reverse the encoder filter chain so that the first encoder filter is the last filter in the
Expand Down Expand Up @@ -390,9 +395,9 @@ void Filter::onDecodingSuccess(StreamFramePtr request) {
onDecodingFailure();
}

void Filter::onDecodingFailure() {
void Filter::onDecodingFailure(absl::string_view reason) {
ENVOY_LOG(error, "generic proxy: request decoding failure: {}", reason);
stats_helper_.onRequestDecodingError();

resetDownstreamAllStreams(DownstreamStreamResetReason::ProtocolError);
closeDownstreamConnection();
}
Expand Down
3 changes: 2 additions & 1 deletion contrib/generic_proxy/filters/network/source/proxy.h
Expand Up @@ -263,6 +263,7 @@ class ActiveStream : public FilterChainManager,

// ResponseEncoderCallback
void onEncodingSuccess(Buffer::Instance& buffer, bool end_stream) override;
void onEncodingFailure(absl::string_view reason = {}) override;
OptRef<const RouteEntry> routeEntry() const override {
return makeOptRefFromPtr<const RouteEntry>(cached_route_entry_.get());
}
Expand Down Expand Up @@ -372,7 +373,7 @@ class Filter : public Envoy::Network::ReadFilter,

// RequestDecoderCallback
void onDecodingSuccess(StreamFramePtr request) override;
void onDecodingFailure() override;
void onDecodingFailure(absl::string_view reason = {}) override;
void writeToConnection(Buffer::Instance& buffer) override;
OptRef<Network::Connection> connection() override;

Expand Down
3 changes: 2 additions & 1 deletion contrib/generic_proxy/filters/network/source/route.cc
Expand Up @@ -48,7 +48,8 @@ RouteEntryImpl::RouteEntryImpl(const ProtoRouteAction& route_action,
Envoy::Server::Configuration::ServerFactoryContext& context)
: name_(route_action.name()), cluster_name_(route_action.cluster()),
metadata_(route_action.metadata()), typed_metadata_(metadata_),
timeout_(PROTOBUF_GET_MS_OR_DEFAULT(route_action, timeout, DEFAULT_ROUTE_TIMEOUT_MS)) {
timeout_(PROTOBUF_GET_MS_OR_DEFAULT(route_action, timeout, DEFAULT_ROUTE_TIMEOUT_MS)),
retry_policy_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(route_action.retry_policy(), num_retries, 1)) {

for (const auto& proto_filter_config : route_action.per_filter_config()) {
auto route_config =
Expand Down
3 changes: 3 additions & 0 deletions contrib/generic_proxy/filters/network/source/route.h
Expand Up @@ -30,6 +30,7 @@ using ProtoRouteAction =
using ProtoRouteConfiguration =
envoy::extensions::filters::network::generic_proxy::v3::RouteConfiguration;
using ProtoVirtualHost = envoy::extensions::filters::network::generic_proxy::v3::VirtualHost;
using ProtoRetryPolicy = envoy::config::core::v3::RetryPolicy;

class RouteEntryImpl : public RouteEntry {
public:
Expand All @@ -48,6 +49,7 @@ class RouteEntryImpl : public RouteEntry {
const Envoy::Config::TypedMetadata& typedMetadata() const override { return typed_metadata_; };

const std::chrono::milliseconds timeout() const override { return timeout_; };
const RetryPolicy& retryPolicy() const override { return retry_policy_; }

RouteSpecificFilterConfigConstSharedPtr
createRouteSpecificFilterConfig(const std::string& name, const ProtobufWkt::Any& typed_config,
Expand All @@ -64,6 +66,7 @@ class RouteEntryImpl : public RouteEntry {
const Envoy::Config::TypedMetadataImpl<RouteTypedMetadataFactory> typed_metadata_;

const std::chrono::milliseconds timeout_;
const RetryPolicy retry_policy_;

absl::flat_hash_map<std::string, RouteSpecificFilterConfigConstSharedPtr> per_filter_configs_;
};
Expand Down