Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

generic proxy: complete basic support of timeout and retry #33836

Merged
merged 3 commits into from Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,7 +23,7 @@ option (xds.annotations.v3.file_status).work_in_progress = true;
// [#protodoc-title: Generic Proxy Route Action Configuration]

// Configuration for the route match action.
// [#next-free-field: 7]
// [#next-free-field: 8]
message RouteAction {
// The name of the route action. This should be unique across all route actions.
string name = 5;
Expand Down Expand Up @@ -53,6 +53,13 @@ message RouteAction {
// spans between the point at which the entire downstream request (i.e. end-of-stream) has been
// processed and when the upstream response has been completely processed. A value of 0 will
// disable the route's timeout.
// [#not-implemented-hide:]
google.protobuf.Duration timeout = 6;

// Specifies the retry policy for the route. If not specified, then no retries will be performed.
//
// .. note::
// Only the simplest retry policy is supported: the generic proxy uses only the ``num_retries``
// field. The default value of ``num_retries`` is 1, which means that the request will be tried
// once and no additional retries will be performed.
config.core.v3.RetryPolicy retry_policy = 7;
}
18 changes: 18 additions & 0 deletions contrib/generic_proxy/filters/network/source/interface/route.h
Expand Up @@ -26,6 +26,18 @@ using RouteSpecificFilterConfigConstSharedPtr = std::shared_ptr<const RouteSpeci
*/
using RouteTypedMetadataFactory = Envoy::Router::HttpRouteTypedMetadataFactory;

/**
 * The simplest retry implementation. It only carries a single counter taken from the
 * ``num_retries`` field of the route configuration.
 */
class RetryPolicy {
public:
  /**
   * @param num_retries the configured ``num_retries`` value for the route.
   *
   * The constructor is explicit so that a plain integer is never silently converted into a
   * policy object (for example when passed to a function taking ``const RetryPolicy&``).
   */
  explicit RetryPolicy(uint32_t num_retries) : num_retries_(num_retries) {}

  /**
   * @return uint32_t the value this policy was constructed with.
   */
  uint32_t numRetries() const { return num_retries_; }

private:
  const uint32_t num_retries_{};
};

class RouteEntry {
public:
virtual ~RouteEntry() = default;
Expand Down Expand Up @@ -64,7 +76,13 @@ class RouteEntry {
* @return route timeout for this route.
*/
virtual const std::chrono::milliseconds timeout() const PURE;

/**
* @return const RetryPolicy& the retry policy for this route.
*/
virtual const RetryPolicy& retryPolicy() const PURE;
};

using RouteEntryConstSharedPtr = std::shared_ptr<const RouteEntry>;

class RouteMatcher : public Rds::Config {
Expand Down
3 changes: 2 additions & 1 deletion contrib/generic_proxy/filters/network/source/route.cc
Expand Up @@ -48,7 +48,8 @@ RouteEntryImpl::RouteEntryImpl(const ProtoRouteAction& route_action,
Envoy::Server::Configuration::ServerFactoryContext& context)
: name_(route_action.name()), cluster_name_(route_action.cluster()),
metadata_(route_action.metadata()), typed_metadata_(metadata_),
timeout_(PROTOBUF_GET_MS_OR_DEFAULT(route_action, timeout, DEFAULT_ROUTE_TIMEOUT_MS)) {
timeout_(PROTOBUF_GET_MS_OR_DEFAULT(route_action, timeout, DEFAULT_ROUTE_TIMEOUT_MS)),
retry_policy_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(route_action.retry_policy(), num_retries, 1)) {

for (const auto& proto_filter_config : route_action.per_filter_config()) {
auto route_config =
Expand Down
3 changes: 3 additions & 0 deletions contrib/generic_proxy/filters/network/source/route.h
Expand Up @@ -30,6 +30,7 @@ using ProtoRouteAction =
using ProtoRouteConfiguration =
envoy::extensions::filters::network::generic_proxy::v3::RouteConfiguration;
using ProtoVirtualHost = envoy::extensions::filters::network::generic_proxy::v3::VirtualHost;
using ProtoRetryPolicy = envoy::config::core::v3::RetryPolicy;

class RouteEntryImpl : public RouteEntry {
public:
Expand All @@ -48,6 +49,7 @@ class RouteEntryImpl : public RouteEntry {
const Envoy::Config::TypedMetadata& typedMetadata() const override { return typed_metadata_; };

const std::chrono::milliseconds timeout() const override { return timeout_; };
const RetryPolicy& retryPolicy() const override { return retry_policy_; }

RouteSpecificFilterConfigConstSharedPtr
createRouteSpecificFilterConfig(const std::string& name, const ProtobufWkt::Any& typed_config,
Expand All @@ -64,6 +66,7 @@ class RouteEntryImpl : public RouteEntry {
const Envoy::Config::TypedMetadataImpl<RouteTypedMetadataFactory> typed_metadata_;

const std::chrono::milliseconds timeout_;
const RetryPolicy retry_policy_;

absl::flat_hash_map<std::string, RouteSpecificFilterConfigConstSharedPtr> per_filter_configs_;
};
Expand Down
103 changes: 70 additions & 33 deletions contrib/generic_proxy/filters/network/source/router/router.cc
Expand Up @@ -612,79 +612,118 @@ void UpstreamRequest::encodeBufferToUpstream(Buffer::Instance& buffer) {
}

// Handles the head of the upstream response and forwards it to the downstream callbacks.
void RouterFilter::onResponseStart(ResponsePtr response) {
  // The completion flag must only be set inside onFilterComplete(). Assigning it here first
  // would make onFilterComplete() return early and skip resetting the pending upstream requests
  // and disabling the timeout timer.
  if (response->frameFlags().endStream()) {
    onFilterComplete();
  }
  callbacks_->onResponseStart(std::move(response));
}

// Handles a follow-up frame of the upstream response and forwards it to the downstream
// callbacks.
void RouterFilter::onResponseFrame(StreamFramePtr frame) {
  ASSERT(!filter_complete_, "response frame received after response complete");
  // Let onFilterComplete() own the completion flag. Setting it here beforehand would trip the
  // once-only guard in onFilterComplete() and leave upstream requests and the timer alive.
  if (frame->frameFlags().endStream()) {
    onFilterComplete();
  }
  callbacks_->onResponseFrame(std::move(frame));
}

// Completes the stream without an upstream response: runs the filter cleanup first and then
// notifies the downstream callbacks.
void RouterFilter::completeDirectly() {
  // Do not set filter_complete_ here. onFilterComplete() sets it itself and would skip its
  // cleanup if the flag were already true.
  onFilterComplete();
  callbacks_->completeDirectly();
}

// Called when an upstream request is reset. Either starts another upstream attempt or fails
// the downstream stream with the given reason.
void RouterFilter::onUpstreamRequestReset(UpstreamRequest&, StreamResetReason reason,
                                          absl::string_view reason_detail) {
  // Once the filter is completed (request canceled, reset or finished) upstream resets are of no
  // interest anymore.
  if (filter_complete_) {
    return;
  }

  // Retry is not possible: propagate the reset to the downstream.
  if (!couldRetry(reason)) {
    resetStream(reason, reason_detail);
    return;
  }

  // Retry if the upstream request was reset and retrying is still allowed.
  kickOffNewUpstreamRequest();
}

void RouterFilter::cleanUpstreamRequests() {
// Set filter_complete_ to true then the resetStream() of RouterFilter will not be called
// on the onUpstreamRequestReset() of RouterFilter.
// Marks the filter as complete and releases everything that could still produce events: the
// pending upstream requests and the timeout timer.
void RouterFilter::onFilterComplete() {
  // Only the first call does any work; later calls are no-ops.
  if (filter_complete_) {
    return;
  }
  filter_complete_ = true;

  // Reset pending upstream requests, always taking the last one in the list.
  // NOTE(review): presumably resetStream() removes the request from upstream_requests_,
  // otherwise this loop would never terminate - confirm in UpstreamRequest.
  while (!upstream_requests_.empty()) {
    upstream_requests_.back()->resetStream(StreamResetReason::LocalReset, {});
  }

  // Disable and drop the timer so that no timeout event fires after completion.
  if (timeout_timer_ != nullptr) {
    timeout_timer_->disableTimer();
    timeout_timer_.reset();
  }
}

void RouterFilter::onDestroy() {
if (filter_complete_) {
// Records the end of the downstream request stream and, if the route has a positive timeout,
// arms the timeout timer. Does nothing while the request is still in progress.
void RouterFilter::mayRequestStreamEnd(bool stream_end_stream) {
  if (!stream_end_stream) {
    return;
  }

  // No upstream cleanup here: the upstream requests must stay alive after the request has been
  // fully received, they are only reset when the filter completes.
  request_stream_end_ = stream_end_stream;

  // A timeout of 0 (or less) leaves the timer unset, i.e. no route timeout is enforced.
  const auto timeout = route_entry_->timeout();
  if (timeout.count() > 0) {
    timeout_timer_ = callbacks_->dispatcher().createTimer([this] { onTimeout(); });
    timeout_timer_->enableTimer(timeout);
  }
}

// Timer callback: completes the filter and answers the downstream with a deadline-exceeded
// local reply flagged as an upstream request timeout.
void RouterFilter::onTimeout() {
  const auto response_flag = StreamInfo::CoreResponseFlag::UpstreamRequestTimeout;
  completeAndSendLocalReply(Status(StatusCode::kDeadlineExceeded, "timeout"), {}, response_flag);
}

// Filter teardown simply runs the completion path.
void RouterFilter::onDestroy() {
  onFilterComplete();
}

// Fails the downstream stream with an "unavailable" local reply derived from the reset reason.
void RouterFilter::resetStream(StreamResetReason reason, absl::string_view reason_detail) {
  // Ensure this method is called only once strictly and never called after
  // onFilterComplete().
  if (filter_complete_) {
    return;
  }

  // The completion flag is deliberately not set here. completeAndSendLocalReply() calls
  // onFilterComplete(), which sets the flag and would skip its cleanup if it were already true.
  const auto [view, flag] = resetReasonToViewAndFlag(reason);
  completeAndSendLocalReply(Status(StatusCode::kUnavailable, view), reason_detail, flag);
}

// Completes the filter and sends exactly one local reply to the downstream.
//
// @param status the status to send as the local reply.
// @param details the detail string passed along with the local reply.
// @param flag optional response flag to record on the stream info before replying.
void RouterFilter::completeAndSendLocalReply(absl::Status status, absl::string_view details,
                                             absl::optional<StreamInfo::CoreResponseFlag> flag) {
  if (flag.has_value()) {
    callbacks_->streamInfo().setResponseFlag(flag.value());
  }
  // Run the cleanup before handing the reply to the callbacks.
  onFilterComplete();
  callbacks_->sendLocalReply(std::move(status), details);
}

void RouterFilter::kickOffNewUpstreamRequest() {
num_retries_++;

const auto& cluster_name = route_entry_->clusterName();

auto thread_local_cluster = cluster_manager_.getThreadLocalCluster(cluster_name);
if (thread_local_cluster == nullptr) {
filter_complete_ = true;
callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoClusterFound);
callbacks_->sendLocalReply(Status(StatusCode::kNotFound, "cluster_not_found"));
completeAndSendLocalReply(Status(StatusCode::kNotFound, "cluster_not_found"), {});
return;
}

cluster_ = thread_local_cluster->info();
callbacks_->streamInfo().setUpstreamClusterInfo(cluster_);

if (cluster_->maintenanceMode()) {
filter_complete_ = true;
// No response flag for maintenance mode for now.
callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, "cluster_maintain_mode"));
completeAndSendLocalReply(Status(StatusCode::kUnavailable, "cluster_maintain_mode"), {});
return;
}

Expand All @@ -704,9 +743,8 @@ void RouterFilter::kickOffNewUpstreamRequest() {
// The upstream connection is not bound yet and create a new bound upstream connection.
auto pool_data = thread_local_cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);
if (!pool_data.has_value()) {
filter_complete_ = true;
callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, "no_healthy_upstream"));
completeAndSendLocalReply(Status(StatusCode::kUnavailable, "no_healthy_upstream"), {},
StreamInfo::CoreResponseFlag::NoHealthyUpstream);
return;
}
auto new_bound_upstream = std::make_shared<BoundGenericUpstream>(
Expand All @@ -724,9 +762,8 @@ void RouterFilter::kickOffNewUpstreamRequest() {
// Upstream connection binding is disabled and create a new upstream connection.
auto pool_data = thread_local_cluster->tcpConnPool(Upstream::ResourcePriority::Default, this);
if (!pool_data.has_value()) {
filter_complete_ = true;
callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
callbacks_->sendLocalReply(Status(StatusCode::kUnavailable, "no_healthy_upstream"));
completeAndSendLocalReply(Status(StatusCode::kUnavailable, "no_healthy_upstream"), {},
StreamInfo::CoreResponseFlag::NoHealthyUpstream);
return;
}
generic_upstream = std::make_shared<OwnedGenericUpstream>(callbacks_->codecFactory(),
Expand All @@ -740,7 +777,8 @@ void RouterFilter::kickOffNewUpstreamRequest() {
}

void RouterFilter::onStreamFrame(StreamFramePtr frame) {
request_stream_end_ = frame->frameFlags().endStream();
mayRequestStreamEnd(frame->frameFlags().endStream());

request_stream_frames_.emplace_back(std::move(frame));

if (upstream_requests_.empty()) {
Expand All @@ -754,18 +792,17 @@ FilterStatus RouterFilter::onStreamDecoded(StreamRequest& request) {
ENVOY_LOG(debug, "Try route request to the upstream based on the route entry");

setRouteEntry(callbacks_->routeEntry());
request_stream_end_ = request.frameFlags().endStream();
request_stream_ = &request;

if (route_entry_ != nullptr) {
kickOffNewUpstreamRequest();
if (route_entry_ == nullptr) {
ENVOY_LOG(debug, "No route for current request and send local reply");
completeAndSendLocalReply(Status(StatusCode::kNotFound, "route_not_found"), {},
StreamInfo::CoreResponseFlag::NoRouteFound);
return FilterStatus::StopIteration;
}

ENVOY_LOG(debug, "No route for current request and send local reply");
filter_complete_ = true;
callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoRouteFound);
callbacks_->sendLocalReply(Status(StatusCode::kNotFound, "route_not_found"));
mayRequestStreamEnd(request.frameFlags().endStream());
kickOffNewUpstreamRequest();
return FilterStatus::StopIteration;
}

Expand Down
38 changes: 30 additions & 8 deletions contrib/generic_proxy/filters/network/source/router/router.h
Expand Up @@ -287,8 +287,12 @@ class RouterFilter : public DecoderFilter,

void onUpstreamRequestReset(UpstreamRequest& upstream_request, StreamResetReason reason,
absl::string_view reason_detail);
void onTimeout();

// Stores the route entry and caches the retry limit from its retry policy. Without a route
// entry the limit falls back to 1. couldRetry() compares the number of started upstream
// attempts against this limit.
void setRouteEntry(const RouteEntry* route_entry) {
  route_entry_ = route_entry;
  max_retries_ = route_entry_ != nullptr ? route_entry_->retryPolicy().numRetries() : 1;
}

std::list<UpstreamRequestPtr>& upstreamRequestsForTest() { return upstream_requests_; }

Expand All @@ -305,17 +309,34 @@ class RouterFilter : public DecoderFilter,

void kickOffNewUpstreamRequest();
void resetStream(StreamResetReason reason, absl::string_view reason_detail);
void completeAndSendLocalReply(absl::Status status, absl::string_view details,
absl::optional<StreamInfo::CoreResponseFlag> flag = {});

// Clean up all the upstream requests, statuses and timers. All further events will be
// ignored.
void onFilterComplete();

// Check if all request frames are ready and fire the timeout timer if necessary.
void mayRequestStreamEnd(bool stream_end_stream);

// Decides whether another upstream attempt may be started.
//
// @param reason the reset reason of the failed upstream request, if there was a reset.
// @return true if a retry is allowed.
bool couldRetry(absl::optional<StreamResetReason> reason) {
  // A reset on an upstream connection that is bound to the downstream connection is never
  // retried, because the downstream connection will be closed in that case.
  const bool reset_on_bound_upstream = reason.has_value() && config_->bindUpstreamConnection();
  return !reset_on_bound_upstream && num_retries_ < max_retries_;
}

// Clean up all the upstream requests.
void cleanUpstreamRequests();

// Set filter_complete_ to true before any local or upstream response. Because the
// response processing may complete and destroy the L7 filter chain directly and cause the
// onDestroy() of RouterFilter to be called. The filter_complete_ will be used to block
// unnecessary cleanUpstreamRequests() in the onDestroy() of RouterFilter.
// Set this flag if the downstream request is cancelled, reset or completed, or the upstream
// response is completely received to tell the filter to ignore all further events.
bool filter_complete_{};

const RouteEntry* route_entry_{};
uint32_t num_retries_{0};
uint32_t max_retries_{1};

Upstream::ClusterInfoConstSharedPtr cluster_;
Request* request_stream_{};
std::list<StreamFramePtr> request_stream_frames_;
Expand All @@ -324,6 +345,7 @@ class RouterFilter : public DecoderFilter,
Envoy::Router::MetadataMatchCriteriaConstPtr metadata_match_;

std::list<UpstreamRequestPtr> upstream_requests_;
Envoy::Event::TimerPtr timeout_timer_;

DecoderFilterCallback* callbacks_{};

Expand Down