Skip to content

Commit

Permalink
w
Browse files Browse the repository at this point in the history
  • Loading branch information
orouz committed Mar 31, 2024
1 parent 0307158 commit 0fdf441
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ import (
var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
Initial: 1 * time.Second,
Max: 10 * time.Second,
Max: 1 * time.Minute,
Multiplier: 1.2,
})
})

type AssetsInventoryRateLimiter struct {
// methods is a map of method name to rate limiter based on the methods's per-project quota.
// we do this because when requests are made on the org level (parent: org/123), we can't tell the project id
// so we fetch by the more restrictive per-project quota, making sure even at the org level we don't exceed the limit
methods map[string]*rate.Limiter
log *logp.Logger
}

// https://cloud.google.com/asset-inventory/docs/quota
var methods = map[string]*rate.Limiter{
// In both 'single-account' and 'organization-account' cases, we always need to pace by project because of the consumer project (quota_project_id)
// Which is the one effectively consuming the quota
// the organization quota would be relevant if we manually send multiple requests with diff quota_project_id, which we don't do
"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1),
}

Expand All @@ -61,9 +61,10 @@ func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string) {
if limiter != nil {
err := limiter.Wait(ctx)
if err != nil {
rl.log.Errorf("Failed to wait for %s, error: %v", method, err)
rl.log.Errorf("Failed to wait for project quota on method %s, error: %w", method, err)
}
}

}

func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,22 @@ func (s *RateLimiterTestSuite) SetupTest() {

func (s *RateLimiterTestSuite) TestRateLimiterWait() {
ctx := context.Background()
duration := time.Millisecond
s.rateLimiter.methods = map[string]*rate.Limiter{
"foo": rate.NewLimiter(rate.Every(time.Second/1), 1), // 1 request per second
"someMethod": rate.NewLimiter(rate.Every(duration/1), 1), // 1 request per second
}

totalRequests := 5
startTime := time.Now()

for i := 0; i < totalRequests; i++ {
s.rateLimiter.Wait(ctx, "foo")
s.rateLimiter.Wait(ctx, "someMethod")
}

endTime := time.Now()
duration := endTime.Sub(startTime)

// expected duration is (totalRequests-1) seconds
// 1st request goes instantly, 2nd and above wait 1 second each
expectedDuration := time.Second * time.Duration((totalRequests - 1))
s.Assert().True(duration >= expectedDuration, fmt.Sprintf("expected %v, actual %v", expectedDuration, duration))
actualDuration := endTime.Sub(startTime)
// expected duration is (totalRequests-1) duration
// 1st request goes instantly, 2nd and above wait 1duration each
expectedDuration := duration * time.Duration((totalRequests - 1))
s.Assert().True(actualDuration >= expectedDuration, fmt.Sprintf("expected %v to be greater or equal than %v", actualDuration, expectedDuration))
}
8 changes: 5 additions & 3 deletions internal/resources/providers/gcplib/inventory/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpCon
}

func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []string) ([]*ExtendedGcpAsset, error) {
p.log.Infof("Listing GCP asset types: %v in %v", assetTypes, p.config.Parent)

wg := sync.WaitGroup{}
var resourceAssets []*assetpb.Asset
var policyAssets []*assetpb.Asset
Expand All @@ -194,6 +192,7 @@ func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []str
AssetTypes: assetTypes,
ContentType: assetpb.ContentType_RESOURCE,
}
p.log.Infof("Listing GCP resources for asset types: %v in %v", assetTypes, p.config.Parent)
resourceAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request))
wg.Done()
}()
Expand All @@ -204,6 +203,7 @@ func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []str
AssetTypes: assetTypes,
ContentType: assetpb.ContentType_IAM_POLICY,
}
p.log.Infof("Listing GCP policies for asset types: %v in %v", assetTypes, p.config.Parent)
policyAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request))
wg.Done()
}()
Expand Down Expand Up @@ -312,7 +312,7 @@ func (p *Provider) enrichNetworkAssets(ctx context.Context, assets []*ExtendedGc
p.log.Infof("no %s assets were listed", ComputeNetworkAssetType)
return
}

p.log.Infof("Listing GCP dns policies for %v", p.config.Parent)
dnsPolicyAssets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{
Parent: p.config.Parent,
AssetTypes: []string{DnsPolicyAssetType},
Expand Down Expand Up @@ -470,6 +470,7 @@ func extendWithECS(ctx context.Context, crm *ResourceManagerWrapper, cache map[s
}

func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context) ([]*ProjectPoliciesAsset, error) {
p.log.Infof("Listing GCP project policies for %v", p.config.Parent)
projects := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{
ContentType: assetpb.ContentType_IAM_POLICY,
Parent: p.config.Parent,
Expand All @@ -493,6 +494,7 @@ func getAncestorsAssets(ctx context.Context, p *Provider, ancestors []string) []
if strings.HasPrefix(parent, "organizations") {
assetType = CrmOrgAssetType
}
p.log.Infof("Listing GCP ancestor policies for %v", parent)
assets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{
ContentType: assetpb.ContentType_IAM_POLICY,
Parent: parent,
Expand Down

0 comments on commit 0fdf441

Please sign in to comment.