Skip to content

Commit

Permalink
ecds: support for upstream network filter (envoyproxy#29240)
Browse files Browse the repository at this point in the history
Support of upstream network filter with ECDS. Resolves envoyproxy#14696.

Risk Level: low
Testing: Integration tests
Docs Changes: ECDS docs

Signed-off-by: ohadvano <[email protected]>
  • Loading branch information
ohadvano authored Aug 30, 2023
1 parent d8d92ce commit 41adbd5
Show file tree
Hide file tree
Showing 11 changed files with 828 additions and 10 deletions.
9 changes: 9 additions & 0 deletions api/envoy/config/cluster/v3/filter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ syntax = "proto3";

package envoy.config.cluster.v3;

import "envoy/config/core/v3/config_source.proto";

import "google/protobuf/any.proto";

import "udpa/annotations/status.proto";
Expand All @@ -27,5 +29,12 @@ message Filter {
// instantiated. See the supported filters for further documentation.
// Note that Envoy's :ref:`downstream network
// filters <config_network_filters>` are not valid upstream filters.
// Only one of typed_config or config_discovery can be used.
google.protobuf.Any typed_config = 2;

// Configuration source specifier for an extension configuration discovery
// service. In case of a failure and without the default configuration, the
// listener closes the connections.
// Only one of typed_config or config_discovery can be used.
core.v3.ExtensionConfigSource config_discovery = 3;
}
5 changes: 3 additions & 2 deletions api/envoy/service/extension/v3/config_discovery.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#protodoc-title: Extension config discovery service (ECDS)]

// A service that supports dynamic configuration updates for a specific filter.
// Currently, ECDS is supported for downstream network filters, HTTP filters and Listener filters.
// Currently, ECDS is supported for network filters, HTTP filters and Listener filters.
// Please check :ref:`Extension Config Discovery Service (ECDS) API <config_overview_extension_discovery>`.
// The overall extension config discovery service works as follows:
//
// 1. A filter (:ref:`Network <envoy_v3_api_field_config.listener.v3.Filter.config_discovery>`,
// 1. A filter (:ref:`Downstream Network <envoy_v3_api_field_config.listener.v3.Filter.config_discovery>`,
// :ref:`Upstream Network <envoy_v3_api_field_config.cluster.v3.Filter.config_discovery>`,
// :ref:`Listener <envoy_v3_api_field_config.listener.v3.ListenerFilter.config_discovery>`
// or :ref:`HTTP <envoy_v3_api_field_extensions.filters.network.http_connection_manager.v3.HttpFilter.config_discovery>`)
// contains a :ref:`config_discovery <envoy_v3_api_msg_config.core.v3.ExtensionConfigSource>` configuration. This configuration
Expand Down
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ new_features:
- area: redis
change: |
added support for time command (returns a local response).
- area: extension_discovery_service
change: |
added ECDS support for :ref:` upstream network filters<envoy_v3_api_field_config.cluster.v3.Filter.config_discovery>`.
- area: redis
change: |
added support for lmove command.
Expand Down
1 change: 1 addition & 0 deletions docs/root/configuration/overview/extension.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ are supported for listener filters, downstream network filters, and HTTP filters

- For TCP listener filters, the value of *<stat_prefix>* is *tcp_listener_filter*.
- For downstream network filters, the value of *<stat_prefix>* is *network_filter*.
- For upstream network filters, the value of *<stat_prefix>* is *upstream_network_filter*.
- For downstream HTTP filters, the value of *<stat_prefix>* is *http_filter*.
- For upstream HTTP filters, the value of *<stat_prefix>* is *upstream_http_filter*.

Expand Down
33 changes: 31 additions & 2 deletions source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,8 @@ ClusterInfoImpl::ClusterInfoImpl(
http_filter_config_provider_manager_(
Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
server_context)),
network_filter_config_provider_manager_(
createSingletonUpstreamNetworkFilterConfigProviderManager(server_context)),
factory_context_(
std::make_unique<FactoryContextImpl>(*stats_scope_, runtime, factory_context)),
upstream_context_(server_context, init_manager, *stats_scope_),
Expand Down Expand Up @@ -1225,7 +1227,23 @@ ClusterInfoImpl::ClusterInfoImpl(
filter_factories_.reserve(filters.size());
for (ssize_t i = 0; i < filters.size(); i++) {
const auto& proto_config = filters[i];
const bool is_terminal = i == filters.size() - 1;
ENVOY_LOG(debug, " upstream filter #{}:", i);

if (proto_config.has_config_discovery()) {
if (proto_config.has_typed_config()) {
throw EnvoyException("Only one of typed_config or config_discovery can be used");
}

ENVOY_LOG(debug, " dynamic filter name: {}", proto_config.name());
filter_factories_.push_back(
network_filter_config_provider_manager_->createDynamicFilterConfigProvider(
proto_config.config_discovery(), proto_config.name(), server_context,
upstream_context_, factory_context.clusterManager(), is_terminal, "network",
nullptr));
continue;
}

ENVOY_LOG(debug, " name: {}", proto_config.name());
auto& factory = Config::Utility::getAndCheckFactory<
Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>(proto_config);
Expand All @@ -1234,8 +1252,9 @@ ClusterInfoImpl::ClusterInfoImpl(
factory_context.messageValidationVisitor(), *message);
Network::FilterFactoryCb callback =
factory.createFilterFactoryFromProto(*message, upstream_context_);
filter_factories_.push_back(network_config_provider_manager_.createStaticFilterConfigProvider(
callback, proto_config.name()));
filter_factories_.push_back(
network_filter_config_provider_manager_->createStaticFilterConfigProvider(
callback, proto_config.name()));
}

if (http_protocol_options_) {
Expand Down Expand Up @@ -1778,6 +1797,16 @@ std::pair<absl::optional<double>, absl::optional<uint32_t>> ClusterInfoImpl::get
return std::make_pair(budget_percent, min_retry_concurrency);
}

SINGLETON_MANAGER_REGISTRATION(upstream_network_filter_config_provider_manager);

std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager(
Server::Configuration::ServerFactoryContext& context) {
return context.singletonManager().getTyped<UpstreamNetworkFilterConfigProviderManager>(
SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager),
[] { return std::make_shared<Filter::UpstreamNetworkFilterConfigProviderManagerImpl>(); });
}

ResourceManagerImplPtr
ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config,
Runtime::Loader& runtime, const std::string& cluster_name,
Expand Down
21 changes: 15 additions & 6 deletions source/common/upstream/upstream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@
namespace Envoy {
namespace Upstream {

using UpstreamNetworkFilterConfigProviderManager =
Filter::FilterConfigProviderManager<Network::FilterFactoryCb,
Server::Configuration::UpstreamFactoryContext>;

/**
* An implementation of UpstreamLocalAddressSelector.
*/
Expand Down Expand Up @@ -1050,6 +1054,10 @@ class ClusterInfoImpl : public ClusterInfo,
getRetryBudgetParams(const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds);

private:
std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
createSingletonUpstreamNetworkFilterConfigProviderManager(
Server::Configuration::ServerFactoryContext& context);

struct ResourceManagers {
ResourceManagers(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
const std::string& cluster_name, Stats::Scope& stats_scope,
Expand Down Expand Up @@ -1112,15 +1120,16 @@ class ClusterInfoImpl : public ClusterInfo,
const std::shared_ptr<const envoy::config::cluster::v3::Cluster::CommonLbConfig>
common_lb_config_;
std::unique_ptr<const envoy::config::cluster::v3::Cluster::CustomClusterType> cluster_type_;
// TODO(ohadvano): http_filter_config_provider_manager_ should be maintained in the ClusterManager
// object as a singleton. This is currently not possible due to circular dependency (filter config
// provider manager depends on the ClusterManager object).
// The circular dependency can be resolved when the following issue is resolved:
// https://github.com/envoyproxy/envoy/issues/26653.
// TODO(ohadvano): http_filter_config_provider_manager_ and
// network_filter_config_provider_manager_ should be maintained in the ClusterManager object as a
// singleton. This is currently not possible due to circular dependency (filter config provider
// manager depends on the ClusterManager object). The circular dependency can be resolved when the
// following issue is resolved: https://github.com/envoyproxy/envoy/issues/26653.
std::shared_ptr<Http::UpstreamFilterConfigProviderManager> http_filter_config_provider_manager_;
std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
network_filter_config_provider_manager_;
const std::unique_ptr<Server::Configuration::CommonFactoryContext> factory_context_;
Filter::NetworkFilterFactoriesList filter_factories_;
Filter::UpstreamNetworkFilterConfigProviderManagerImpl network_config_provider_manager_;
Http::FilterChainUtility::FilterFactoriesList http_filter_factories_;
mutable Http::Http1::CodecStats::AtomicPtr http1_codec_stats_;
mutable Http::Http2::CodecStats::AtomicPtr http2_codec_stats_;
Expand Down
20 changes: 20 additions & 0 deletions test/common/upstream/upstream_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4863,6 +4863,26 @@ TEST_F(ClusterInfoImplTest, Http2AutoWithNonAlpnMatcher) {
".*which has a non-ALPN transport socket matcher:.*");
}

TEST_F(ClusterInfoImplTest, UpstreamFilterTypedAndDynamicConfigThrows) {
const std::string yaml = R"EOF(
name: name
connect_timeout: 0.25s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
protocol_selection: USE_DOWNSTREAM_PROTOCOL
filters:
- name: foo
config_discovery:
type_urls:
- type.googleapis.com/google.protobuf.Struct
typed_config:
"@type": type.googleapis.com/google.protobuf.Struct
)EOF";

EXPECT_THROW_WITH_MESSAGE(makeCluster(yaml), EnvoyException,
"Only one of typed_config or config_discovery can be used");
}

// Validate empty singleton for HostsPerLocalityImpl.
TEST(HostsPerLocalityImpl, Empty) {
EXPECT_FALSE(HostsPerLocalityImpl::empty()->hasLocalLocality());
Expand Down
16 changes: 16 additions & 0 deletions test/integration/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,22 @@ envoy_cc_test(
],
)

envoy_cc_test(
name = "upstream_network_filter_integration_test",
size = "large",
srcs = ["upstream_network_filter_integration_test.cc"],
deps = [
":http_integration_lib",
"//source/extensions/filters/network/tcp_proxy:config",
"//test/common/grpc:grpc_client_integration_lib",
"//test/integration/filters:test_network_filter_lib",
"//test/test_common:utility_lib",
"@envoy_api//envoy/extensions/filters/network/tcp_proxy/v3:pkg_cc_proto",
"@envoy_api//envoy/service/discovery/v3:pkg_cc_proto",
"@envoy_api//envoy/service/extension/v3:pkg_cc_proto",
],
)

envoy_cc_test(
name = "buffer_accounting_integration_test",
size = "large",
Expand Down
45 changes: 45 additions & 0 deletions test/integration/filters/test_network_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,50 @@ static Registry::RegisterFactory<TestDrainerNetworkFilterConfigFactory,
Server::Configuration::NamedNetworkFilterConfigFactory>
drainer_register_;

class TestDrainerUpstreamNetworkFilter : public Network::WriteFilter {
public:
TestDrainerUpstreamNetworkFilter(
const test::integration::filters::TestDrainerUpstreamNetworkFilterConfig& config)
: bytes_to_drain_(config.bytes_to_drain()) {}

Network::FilterStatus onWrite(Buffer::Instance& buffer, bool) override {
buffer.drain(bytes_to_drain_);
return Network::FilterStatus::Continue;
}

void initializeWriteFilterCallbacks(Network::WriteFilterCallbacks& callbacks) override {
write_callbacks_ = &callbacks;
}

private:
Envoy::Network::WriteFilterCallbacks* write_callbacks_{};
int bytes_to_drain_;
};

class TestDrainerUpstreamNetworkFilterConfigFactory
: public Server::Configuration::NamedUpstreamNetworkFilterConfigFactory {
public:
std::string name() const override { return "envoy.test.test_drainer_upstream_network_filter"; }

Network::FilterFactoryCb
createFilterFactoryFromProto(const Protobuf::Message& proto_config,
Server::Configuration::UpstreamFactoryContext& context) override {
const auto& config = MessageUtil::downcastAndValidate<
const test::integration::filters::TestDrainerUpstreamNetworkFilterConfig&>(
proto_config, context.getServerFactoryContext().messageValidationVisitor());
return [config](Network::FilterManager& filter_manager) -> void {
filter_manager.addWriteFilter(std::make_shared<TestDrainerUpstreamNetworkFilter>(config));
};
}

ProtobufTypes::MessagePtr createEmptyConfigProto() override {
return std::make_unique<test::integration::filters::TestDrainerUpstreamNetworkFilterConfig>();
}
};

static Registry::RegisterFactory<TestDrainerUpstreamNetworkFilterConfigFactory,
Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>
upstream_drainer_register_;

} // namespace
} // namespace Envoy
4 changes: 4 additions & 0 deletions test/integration/filters/test_network_filter.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ message TestDrainerNetworkFilterConfig {
bool is_terminal_filter = 1;
uint32 bytes_to_drain = 2 [(validate.rules).uint32 = {gte: 2}];
}

message TestDrainerUpstreamNetworkFilterConfig {
uint32 bytes_to_drain = 1 [(validate.rules).uint32 = {gte: 2}];
}
Loading

0 comments on commit 41adbd5

Please sign in to comment.