From 0fdf44159477ad7771f44ab405728751806f650a Mon Sep 17 00:00:00 2001 From: Or Ouziel Date: Thu, 28 Mar 2024 08:59:52 +0200 Subject: [PATCH] w --- .../gcplib/inventory/grpc_rate_limiter.go | 11 ++++++----- .../gcplib/inventory/grpc_rate_limiter_test.go | 16 ++++++++-------- .../providers/gcplib/inventory/provider.go | 8 +++++--- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go index 75301c7300..6f727ac7b7 100644 --- a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go +++ b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go @@ -31,21 +31,21 @@ import ( var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer { return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{ Initial: 1 * time.Second, - Max: 10 * time.Second, + Max: 1 * time.Minute, Multiplier: 1.2, }) }) type AssetsInventoryRateLimiter struct { - // methods is a map of method name to rate limiter based on the methods's per-project quota. - // we do this because when requests are made on the org level (parent: org/123), we can't tell the project id - // so we fetch by the more restrictive per-project quota, making sure even at the org level we don't exceed the limit methods map[string]*rate.Limiter log *logp.Logger } // https://cloud.google.com/asset-inventory/docs/quota var methods = map[string]*rate.Limiter{ + // In both 'single-account' and 'organization-account' cases, we always need to pace by project because of the consumer project (quota_project_id) + // Which is the one effectively consuming the quota + // the organization quota would be relevant if we manually send multiple requests with diff quota_project_id, which we don't do "/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1), } @@ -61,9 +61,10 @@ func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string) { if limiter != nil { err := limiter.Wait(ctx) if err != nil { - rl.log.Errorf("Failed to wait for %s, error: %v", method, err) + rl.log.Errorf("Failed to wait for project quota on method %s, error: %w", method, err) } } + } func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption { diff --git a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go index 4a329bf306..e473cb50c7 100644 --- a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go +++ b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go @@ -45,22 +45,22 @@ func (s *RateLimiterTestSuite) SetupTest() { func (s *RateLimiterTestSuite) TestRateLimiterWait() { ctx := context.Background() + duration := time.Millisecond s.rateLimiter.methods = map[string]*rate.Limiter{ - "foo": rate.NewLimiter(rate.Every(time.Second/1), 1), // 1 request per second + "someMethod": rate.NewLimiter(rate.Every(duration/1), 1), // 1 request per second } totalRequests := 5 startTime := time.Now() for i := 0; i < totalRequests; i++ { - s.rateLimiter.Wait(ctx, "foo") + s.rateLimiter.Wait(ctx, "someMethod") } endTime := time.Now() - duration := endTime.Sub(startTime) - - // expected duration is (totalRequests-1) seconds - // 1st request goes instantly, 2nd and above wait 1 second each - expectedDuration := time.Second * time.Duration((totalRequests - 1)) - s.Assert().True(duration >= expectedDuration, fmt.Sprintf("expected %v, actual %v", expectedDuration, duration)) + actualDuration := endTime.Sub(startTime) + // expected duration is (totalRequests-1) duration + // 1st request goes instantly, 2nd and above wait 1duration each + expectedDuration := duration * time.Duration((totalRequests - 1)) + s.Assert().True(actualDuration >= expectedDuration, fmt.Sprintf("expected %v to be greater or equal than %v", actualDuration, expectedDuration)) } diff --git a/internal/resources/providers/gcplib/inventory/provider.go b/internal/resources/providers/gcplib/inventory/provider.go index 5511cd6df5..582cfd3cde 100644 --- a/internal/resources/providers/gcplib/inventory/provider.go +++ b/internal/resources/providers/gcplib/inventory/provider.go @@ -181,8 +181,6 @@ func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpCon } func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []string) ([]*ExtendedGcpAsset, error) { - p.log.Infof("Listing GCP asset types: %v in %v", assetTypes, p.config.Parent) - wg := sync.WaitGroup{} var resourceAssets []*assetpb.Asset var policyAssets []*assetpb.Asset @@ -194,6 +192,7 @@ func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []str AssetTypes: assetTypes, ContentType: assetpb.ContentType_RESOURCE, } + p.log.Infof("Listing GCP resources for asset types: %v in %v", assetTypes, p.config.Parent) resourceAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request)) wg.Done() }() @@ -204,6 +203,7 @@ func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []str AssetTypes: assetTypes, ContentType: assetpb.ContentType_IAM_POLICY, } + p.log.Infof("Listing GCP policies for asset types: %v in %v", assetTypes, p.config.Parent) policyAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request)) wg.Done() }() @@ -312,7 +312,7 @@ func (p *Provider) enrichNetworkAssets(ctx context.Context, assets []*ExtendedGc p.log.Infof("no %s assets were listed", ComputeNetworkAssetType) return } - + p.log.Infof("Listing GCP dns policies for %v", p.config.Parent) dnsPolicyAssets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: []string{DnsPolicyAssetType}, @@ -470,6 +470,7 @@ func extendWithECS(ctx context.Context, crm *ResourceManagerWrapper, cache map[s } func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context) ([]*ProjectPoliciesAsset, error) { + p.log.Infof("Listing GCP project policies for %v", p.config.Parent) projects := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ ContentType: assetpb.ContentType_IAM_POLICY, Parent: p.config.Parent, @@ -493,6 +494,7 @@ func getAncestorsAssets(ctx context.Context, p *Provider, ancestors []string) [] if strings.HasPrefix(parent, "organizations") { assetType = CrmOrgAssetType } + p.log.Infof("Listing GCP ancestor policies for %v", parent) assets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ ContentType: assetpb.ContentType_IAM_POLICY, Parent: parent,