Skip to content

Commit

Permalink
add rate limit to asset inventory (#2055)
Browse files Browse the repository at this point in the history
(cherry picked from commit b8ffed9)
  • Loading branch information
orouz authored and mergify[bot] committed May 7, 2024
1 parent 72baecd commit 1a85091
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 105 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,13 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/time v0.5.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240228224816-df926f6c8641 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/grpc v1.62.1 // indirect
google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.33.0
gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions internal/flavors/benchmark/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func (g *GCP) NewBenchmark(ctx context.Context, log *logp.Logger, cfg *config.Co

return builder.New(
builder.WithBenchmarkDataProvider(bdp),
builder.WithManagerTimeout(cfg.Period),
).Build(ctx, log, cfg, resourceCh, reg)
}

Expand Down
76 changes: 76 additions & 0 deletions internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"context"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/googleapis/gax-go/v2"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

// a gax.CallOption that defines a retry strategy which retries the request on ResourceExhausted error.
var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
Initial: 1 * time.Second,
Max: 1 * time.Minute,
Multiplier: 1.2,
})
})

type AssetsInventoryRateLimiter struct {
methods map[string]*rate.Limiter
log *logp.Logger
}

// a map of asset inventory client methods and their quotas.
// see https://cloud.google.com/asset-inventory/docs/quota
var methods = map[string]*rate.Limiter{
// using per-project quota suffices for both single-account and organization-account, because it's more restrictive.
"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1),
}

func NewAssetsInventoryRateLimiter(log *logp.Logger) *AssetsInventoryRateLimiter {
return &AssetsInventoryRateLimiter{
log: log,
methods: methods,
}
}

// Limits the rate of the method calls defined in the methods map.
func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string, req any) {
limiter := rl.methods[method]
if limiter != nil {
err := limiter.Wait(ctx)
if err != nil {
rl.log.Errorf("Failed to wait for project quota on method: %s, request: %v, error: %v", method, req, err)
}
}
}

// Returns a grpc.DialOption that intercepts the unary RPCs and limits the rate of the method calls.
func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
rl.Wait(ctx, method, req)
return invoker(ctx, method, req, reply, cc, opts...)
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"context"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/stretchr/testify/suite"
"golang.org/x/time/rate"
)

type RateLimiterTestSuite struct {
suite.Suite
logger *logp.Logger
rateLimiter *AssetsInventoryRateLimiter
}

func TestInventoryRateLimiterTestSuite(t *testing.T) {
suite.Run(t, new(RateLimiterTestSuite))
}

func (s *RateLimiterTestSuite) SetupTest() {
s.logger = logp.NewLogger("test")
s.rateLimiter = NewAssetsInventoryRateLimiter(s.logger)
}

func (s *RateLimiterTestSuite) TestRateLimiterWait() {
ctx := context.Background()
duration := time.Millisecond
s.rateLimiter.methods = map[string]*rate.Limiter{
"someMethod": rate.NewLimiter(rate.Every(duration/1), 1), // 1 request per duration
}

totalRequests := 5
startTime := time.Now()
for i := 0; i < totalRequests; i++ {
s.rateLimiter.Wait(ctx, "someMethod", nil)
}
endTime := time.Now()

actualDuration := endTime.Sub(startTime)
minDuration := duration * time.Duration((totalRequests - 1)) // 1st request is instant, 2nd and above wait 1duration each
s.GreaterOrEqual(actualDuration, minDuration)
}
40 changes: 40 additions & 0 deletions internal/resources/providers/gcplib/inventory/map_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"sync"
)

type MapCache[T any] struct {
results sync.Map
}

func (c *MapCache[T]) Get(fn func() T, key string) T {
if value, ok := c.results.Load(key); ok {
return value.(T)
}

value := fn()
c.results.Store(key, value)
return value
}

func NewMapCache[T any]() *MapCache[T] {
return &MapCache[T]{}
}
63 changes: 63 additions & 0 deletions internal/resources/providers/gcplib/inventory/map_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type MockFunction struct {
mock.Mock
}

func (m *MockFunction) GetSomeValue() int {
m.Called()
return 0
}

func TestMapCacheGet(t *testing.T) {
cache := NewMapCache[int]()

// Test getting existing value from cache
cache.results.Store("key1", 42)
mockFunction := new(MockFunction)
result := cache.Get(mockFunction.GetSomeValue, "key1")
mockFunction.AssertNotCalled(t, "GetSomeValue")
assert.Equal(t, 42, result)

// Test getting non-existing value from cache
mockFunction.On("GetSomeValue").Return(mockFunction.GetSomeValue())
result = cache.Get(mockFunction.GetSomeValue, "key2")
mockFunction.AssertNumberOfCalls(t, "GetSomeValue", 2) // 1 by Return(), 2nd by cache.Get()
assert.Equal(t, 0, result)

// Test concurrent accesses
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cache.Get(func() int { return 1 }, "concurrent_key")
}()
}
wg.Wait()
}
Loading

0 comments on commit 1a85091

Please sign in to comment.