Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dfp: allowing the dfp cluster to do dns lookups #38237

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ removed_config_or_runtime:
Removed runtime guard ``envoy.reloadable_features.no_timer_based_rate_limit_token_bucket`` and legacy code paths.

new_features:
- area: dfp
change: |
The DFP cluster will now use the async lookup path to do DNS resolutions for null hosts. This behavioral change
can be temporarily reverted by setting runtime guard ``envoy.reloadable_features.dfp_cluster_resolves_hosts``
to false.
- area: oauth2
change: |
Add the option to specify SameSite cookie attribute values for oauth2 supported cookies.
Expand Down
6 changes: 5 additions & 1 deletion envoy/upstream/load_balancer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ struct HostSelectionResponse {
HostSelectionResponse(HostConstSharedPtr host,
std::unique_ptr<AsyncHostSelectionHandle> cancelable = nullptr)
: host(host), cancelable(std::move(cancelable)) {}
HostSelectionResponse(HostConstSharedPtr host, std::string details)
: host(host), details(details) {}
HostConstSharedPtr host;
std::string details;
std::unique_ptr<AsyncHostSelectionHandle> cancelable;
};

Expand Down Expand Up @@ -151,8 +154,9 @@ class LoadBalancerContext {

/* Called by the load balancer when asynchronous host selection completes
* @param host supplies the upstream host selected
* @param details gives optional details about the resolution success/failure.
*/
virtual void onAsyncHostSelection(HostConstSharedPtr&& host) PURE;
virtual void onAsyncHostSelection(HostConstSharedPtr&& host, std::string details) PURE;
};

/**
Expand Down
58 changes: 38 additions & 20 deletions source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,9 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
}

callbacks_->streamInfo().downstreamTiming().setValue(
"envoy.router.host_selection_start_ms",
callbacks_->dispatcher().timeSource().monotonicTime());
auto host_selection_response = cluster->chooseHost(this);
if (!host_selection_response.cancelable ||
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.async_host_selection")) {
Expand All @@ -668,7 +671,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
// well as handling unsupported asynchronous host selection by treating it
// as host selection failure and calling sendNoHealthyUpstreamResponse.
return continueDecodeHeaders(cluster, headers, end_stream, modify_headers, nullptr,
std::move(host_selection_response.host));
std::move(host_selection_response.host),
std::string(host_selection_response.details));
}

ENVOY_STREAM_LOG(debug, "Doing asynchronous host selection\n", *callbacks_);
Expand All @@ -677,13 +681,14 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
host_selection_cancelable_ = std::move(host_selection_response.cancelable);
// Configure a callback to be called on asynchronous host selection.
on_host_selected_ = ([this, cluster, end_stream,
modify_headers](Upstream::HostConstSharedPtr&& host) -> void {
modify_headers](Upstream::HostConstSharedPtr&& host,
std::string host_selection_details) -> void {
// It should always be safe to call continueDecodeHeaders. In the case the
// stream had a local reply before host selection completed,
// the lookup should be canceled.
bool should_continue_decoding = false;
continueDecodeHeaders(cluster, *downstream_headers_, end_stream, modify_headers,
&should_continue_decoding, std::move(host));
&should_continue_decoding, std::move(host), host_selection_details);
// continueDecodeHeaders can itself send a local reply, in which case should_continue_decoding
// should be false. If this is not the case, we can continue the filter chain due to successful
// asynchronous host selection.
Expand All @@ -699,23 +704,26 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
}

// When asynchronous host selection is complete, call the pre-configured on_host_selected_ function.
void Filter::onAsyncHostSelection(Upstream::HostConstSharedPtr&& host) {
ENVOY_STREAM_LOG(debug, "Completing asynchronous host selection\n", *callbacks_);
void Filter::onAsyncHostSelection(Upstream::HostConstSharedPtr&& host, std::string details) {
ENVOY_STREAM_LOG(debug, "Completing asynchronous host selection [{}]\n", *callbacks_, details);
std::unique_ptr<Upstream::AsyncHostSelectionHandle> local_scope =
std::move(host_selection_cancelable_);
on_host_selected_(std::move(host));
on_host_selected_(std::move(host), details);
}

Http::FilterHeadersStatus Filter::continueDecodeHeaders(
Upstream::ThreadLocalCluster* cluster, Http::RequestHeaderMap& headers, bool end_stream,
std::function<void(Http::ResponseHeaderMap&)> modify_headers, bool* should_continue_decoding,
Upstream::HostConstSharedPtr&& selected_host) {
Upstream::HostConstSharedPtr&& selected_host,
absl::optional<std::string> host_selection_details) {
const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key());
callbacks_->streamInfo().downstreamTiming().setValue(
"envoy.router.host_selection_end_ms", callbacks_->dispatcher().timeSource().monotonicTime());

std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster, selected_host);
if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
sendNoHealthyUpstreamResponse(host_selection_details);
return Http::FilterHeadersStatus::StopIteration;
}
Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();
Expand Down Expand Up @@ -938,12 +946,14 @@ std::unique_ptr<GenericConnPool> Filter::createConnPool(Upstream::ThreadLocalClu
callbacks_->streamInfo().protocol(), this, *message);
}

void Filter::sendNoHealthyUpstreamResponse() {
void Filter::sendNoHealthyUpstreamResponse(absl::optional<std::string> optional_details) {
callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, false);
absl::string_view details = (optional_details.has_value() && !optional_details->empty())
? absl::string_view(*optional_details)
: StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream;
callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_,
absl::nullopt,
StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream);
absl::nullopt, details);
}

Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) {
Expand Down Expand Up @@ -2112,11 +2122,14 @@ void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry
const auto cluster = config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
std::unique_ptr<GenericConnPool> generic_conn_pool;
if (cluster == nullptr) {
sendNoHealthyUpstreamResponse();
sendNoHealthyUpstreamResponse({});
cleanup();
return;
}

callbacks_->streamInfo().downstreamTiming().setValue(
"envoy.router.host_selection_start_ms",
callbacks_->dispatcher().timeSource().monotonicTime());
auto host_selection_response = cluster->chooseHost(this);
if (!host_selection_response.cancelable ||
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.async_host_selection")) {
Expand All @@ -2127,26 +2140,31 @@ void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry
// well as handling unsupported asynchronous host selection (by treating it
// as host selection failure).
continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry,
std::move(host_selection_response.host), *cluster);
std::move(host_selection_response.host), *cluster,
std::string(host_selection_response.details));
}

ENVOY_STREAM_LOG(debug, "Handling asynchronous host selection for retry\n", *callbacks_);
// Again latch the cancel handle, and set up the callback to be called when host
// selection is complete.
host_selection_cancelable_ = std::move(host_selection_response.cancelable);
on_host_selected_ = ([this, can_send_early_data, can_use_http3, is_timeout_retry,
cluster](Upstream::HostConstSharedPtr&& host) -> void {
continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry, std::move(host),
*cluster);
});
on_host_selected_ =
([this, can_send_early_data, can_use_http3, is_timeout_retry,
cluster](Upstream::HostConstSharedPtr&& host, std::string host_selection_details) -> void {
continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry, std::move(host),
*cluster, host_selection_details);
});
}

void Filter::continueDoRetry(bool can_send_early_data, bool can_use_http3,
TimeoutRetry is_timeout_retry, Upstream::HostConstSharedPtr&& host,
Upstream::ThreadLocalCluster& cluster) {
Upstream::ThreadLocalCluster& cluster,
absl::optional<std::string> host_selection_details) {
callbacks_->streamInfo().downstreamTiming().setValue(
"envoy.router.host_selection_end_ms", callbacks_->dispatcher().timeSource().monotonicTime());
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(cluster, host);
if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
sendNoHealthyUpstreamResponse(host_selection_details);
cleanup();
return;
}
Expand Down
12 changes: 7 additions & 5 deletions source/common/router/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
continueDecodeHeaders(Upstream::ThreadLocalCluster* cluster, Http::RequestHeaderMap& headers,
bool end_stream,
std::function<void(Http::ResponseHeaderMap&)> modify_headers,
bool* should_continue_decoding, Upstream::HostConstSharedPtr&& host);
bool* should_continue_decoding, Upstream::HostConstSharedPtr&& host,
absl::optional<std::string> host_selection_detailsi = {});

Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override;
Http::FilterTrailersStatus decodeTrailers(Http::RequestTrailerMap& trailers) override;
Expand Down Expand Up @@ -446,7 +447,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
return callbacks_->upstreamOverrideHost();
}

void onAsyncHostSelection(Upstream::HostConstSharedPtr&& host) override;
void onAsyncHostSelection(Upstream::HostConstSharedPtr&& host, std::string details) override;

/**
* Set a computed cookie to be sent with the downstream headers.
Expand Down Expand Up @@ -569,7 +570,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
// if a "good" response comes back and we return downstream, so there is no point in waiting
// for the remaining upstream requests to return.
void resetOtherUpstreams(UpstreamRequest& upstream_request);
void sendNoHealthyUpstreamResponse();
void sendNoHealthyUpstreamResponse(absl::optional<std::string> details);
bool setupRedirect(const Http::ResponseHeaderMap& headers);
bool convertRequestHeadersForInternalRedirect(Http::RequestHeaderMap& downstream_headers,
const Http::ResponseHeaderMap& upstream_headers,
Expand All @@ -579,7 +580,8 @@ class Filter : Logger::Loggable<Logger::Id::router>,
absl::optional<uint64_t> code);
void doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry);
void continueDoRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry,
Upstream::HostConstSharedPtr&& host, Upstream::ThreadLocalCluster& cluster);
Upstream::HostConstSharedPtr&& host, Upstream::ThreadLocalCluster& cluster,
absl::optional<std::string> host_selection_details);

void runRetryOptionsPredicates(UpstreamRequest& retriable_request);
// Called immediately after a non-5xx header is received from upstream, performs stats accounting
Expand All @@ -603,7 +605,7 @@ class Filter : Logger::Loggable<Logger::Id::router>,
std::unique_ptr<Stats::StatNameDynamicStorage> alt_stat_prefix_;
const VirtualCluster* request_vcluster_{};
RouteStatsContextOptRef route_stats_context_;
std::function<void(Upstream::HostConstSharedPtr&& host)> on_host_selected_;
std::function<void(Upstream::HostConstSharedPtr&& host, std::string details)> on_host_selected_;
std::unique_ptr<Upstream::AsyncHostSelectionHandle> host_selection_cancelable_;
Event::TimerPtr response_timeout_;
TimeoutData timeout_;
Expand Down
1 change: 1 addition & 0 deletions source/common/runtime/runtime_features.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ RUNTIME_GUARD(envoy_reloadable_features_async_host_selection);
RUNTIME_GUARD(envoy_reloadable_features_avoid_dfp_cluster_removal_on_cds_update);
RUNTIME_GUARD(envoy_reloadable_features_boolean_to_string_fix);
RUNTIME_GUARD(envoy_reloadable_features_check_switch_protocol_websocket_handshake);
RUNTIME_GUARD(envoy_reloadable_features_dfp_cluster_resolves_hosts);
RUNTIME_GUARD(envoy_reloadable_features_dfp_fail_on_empty_host_header);
RUNTIME_GUARD(envoy_reloadable_features_disallow_quic_client_udp_mmsg);
RUNTIME_GUARD(envoy_reloadable_features_enable_compression_bomb_protection);
Expand Down
3 changes: 3 additions & 0 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2238,6 +2238,9 @@ HostSelectionResponse ClusterManagerImpl::ThreadLocalClusterManagerImpl::Cluster
if (host_selection.host || host_selection.cancelable) {
return host_selection;
}
cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
ENVOY_LOG(debug, "no healthy host");
return host_selection;
}

cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/load_balancer_context_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class LoadBalancerContextBase : public LoadBalancerContext {

absl::optional<OverrideHost> overrideHostToSelect() const override { return {}; }

void onAsyncHostSelection(HostConstSharedPtr&&) override {}
void onAsyncHostSelection(HostConstSharedPtr&&, std::string) override {}
};

} // namespace Upstream
Expand Down
71 changes: 63 additions & 8 deletions source/extensions/clusters/dynamic_forward_proxy/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "envoy/stream_info/uint32_accessor.h"

#include "source/common/http/utility.h"
#include "source/common/network/filter_state_proxy_info.h"
#include "source/common/network/transport_socket_options_impl.h"
#include "source/common/router/string_accessor_impl.h"
#include "source/common/stream_info/uint32_accessor_impl.h"
Expand Down Expand Up @@ -46,6 +47,15 @@ class DynamicPortObjectFactory : public StreamInfo::FilterState::ObjectFactory {
}
};

// Returns true when `stream_info` is non-null, has a filter state, and that filter state holds
// data under Network::Http11ProxyInfoFilterState::key(); returns false otherwise.
//
// NOTE(review): the ASSERT below requires the skip_dns_lookup_for_proxied_requests runtime
// flag to be enabled; confirm that every caller only reaches this function with the flag on.
bool isProxying(StreamInfo::StreamInfo* stream_info) {
  // Should not hit this call unless the flag is enabled.
  ASSERT(Runtime::runtimeFeatureEnabled(
      "envoy.reloadable_features.skip_dns_lookup_for_proxied_requests"));

  // No stream info means there is nothing to inspect.
  if (stream_info == nullptr) {
    return false;
  }

  // A missing filter state cannot carry proxy info either.
  const auto& filter_state = stream_info->filterState();
  if (!filter_state) {
    return false;
  }

  return filter_state->hasData<Network::Http11ProxyInfoFilterState>(
      Network::Http11ProxyInfoFilterState::key());
}

} // namespace

REGISTER_FACTORY(DynamicHostObjectFactory, StreamInfo::FilterState::ObjectFactory);
Expand Down Expand Up @@ -394,21 +404,66 @@ Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {

if (raw_host.empty()) {
ENVOY_LOG(debug, "host empty");
return {nullptr};
return {nullptr, "empty_host_header"};
}
std::string host = Common::DynamicForwardProxy::DnsHostInfo::normalizeHostForDfp(raw_host, port);
std::string hostname =
Common::DynamicForwardProxy::DnsHostInfo::normalizeHostForDfp(raw_host, port);

if (cluster_.enableSubCluster()) {
return cluster_.chooseHost(host, context);
}
return findHostByName(host);
return cluster_.chooseHost(hostname, context);
}
Upstream::HostConstSharedPtr host = findHostByName(hostname);
bool force_refresh =
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.reresolve_if_no_connections") &&
Runtime::runtimeFeatureEnabled("envoy.reloadable_features.dfp_cluster_resolves_hosts") &&
host && !host->used();
if ((host && !force_refresh) ||
!Runtime::runtimeFeatureEnabled("envoy.reloadable_features.dfp_cluster_resolves_hosts")) {
return {host};
}

// If the host is not found, the DFP cluster can now do asynchronous lookup.
Upstream::ResourceAutoIncDecPtr handle = cluster_.dns_cache_->canCreateDnsRequest();

// Return an immediate failure if there are too many requests already.
if (!handle) {
return {nullptr, "dns_cache_pending_requests_overflow"};
}

// Attempt to load the host from cache. Generally this will result in async
// resolution so create a DFPHostSelectionHandle to handle this.
std::unique_ptr<DFPHostSelectionHandle> cancelable =
std::make_unique<DFPHostSelectionHandle>(context, cluster_, hostname);
bool is_proxying = isProxying(context->requestStreamInfo());
auto result = cluster_.dns_cache_->loadDnsCacheEntryWithForceRefresh(raw_host, port, is_proxying,
force_refresh, *cancelable);
switch (result.status_) {
case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::InCache:
return {nullptr, result.host_info_.has_value() ? result.host_info_.value()->details() : ""};
case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::Loading:
// Here the DFP kicks off an async lookup. The DFPHostSelectionHandle will
// call onLoadDnsCacheComplete and onAsyncHostSelection unless the
// resolution is canceled by the stream.
cancelable->handle_ = std::move(result.handle_);
cancelable->auto_dec_ = std::move(handle);
return Upstream::HostSelectionResponse{nullptr, std::move(cancelable)};
case Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryStatus::Overflow:
// In the case of overflow, return immediate failure.
ENVOY_LOG(debug, "host {} lookup failed due to overflow", hostname);
break; // fall through
}
return {nullptr, "dns_cache_overflow"};
}

// Looks up a host by name by forwarding the request to cluster_.findHostByName() and
// returning its result unchanged.
Upstream::HostConstSharedPtr Cluster::LoadBalancer::findHostByName(const std::string& host) const {
return cluster_.findHostByName(host);
}

Upstream::HostConstSharedPtr Cluster::findHostByName(const std::string& host) const {
{
absl::ReaderMutexLock lock{&cluster_.host_map_lock_};
const auto host_it = cluster_.host_map_.find(host);
if (host_it == cluster_.host_map_.end()) {
absl::ReaderMutexLock lock{&host_map_lock_};
const auto host_it = host_map_.find(host);
if (host_it == host_map_.end()) {
ENVOY_LOG(debug, "host {} not found", host);
return nullptr;
} else {
Expand Down
Loading
Loading