From 1a85091a43800f2b48307bfca71e3150babc1ade Mon Sep 17 00:00:00 2001 From: Or Ouziel Date: Tue, 7 May 2024 15:25:03 +0300 Subject: [PATCH] add rate limit to asset inventory (#2055) (cherry picked from commit b8ffed9f603e54e4cc4f50fc46bd1442da03b484) --- go.mod | 4 +- internal/flavors/benchmark/gcp.go | 1 + .../gcplib/inventory/grpc_rate_limiter.go | 76 ++++++++ .../inventory/grpc_rate_limiter_test.go | 62 ++++++ .../providers/gcplib/inventory/map_cache.go | 40 ++++ .../gcplib/inventory/map_cache_test.go | 63 +++++++ .../providers/gcplib/inventory/provider.go | 178 ++++++++---------- .../gcplib/inventory/provider_test.go | 12 +- 8 files changed, 331 insertions(+), 105 deletions(-) create mode 100644 internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go create mode 100644 internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go create mode 100644 internal/resources/providers/gcplib/inventory/map_cache.go create mode 100644 internal/resources/providers/gcplib/inventory/map_cache_test.go diff --git a/go.mod b/go.mod index 4848884c3e..b973cc5328 100644 --- a/go.mod +++ b/go.mod @@ -474,13 +474,13 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.5.0 // indirect + golang.org/x/time v0.5.0 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240228224816-df926f6c8641 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect - google.golang.org/grpc v1.62.1 // indirect + google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/internal/flavors/benchmark/gcp.go b/internal/flavors/benchmark/gcp.go index ec2e0f36c3..78b3ccc07d 100644 --- a/internal/flavors/benchmark/gcp.go +++ b/internal/flavors/benchmark/gcp.go @@ -49,6 +49,7 @@ func (g *GCP) NewBenchmark(ctx context.Context, log *logp.Logger, cfg *config.Co return builder.New( builder.WithBenchmarkDataProvider(bdp), + builder.WithManagerTimeout(cfg.Period), ).Build(ctx, log, cfg, resourceCh, reg) } diff --git a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go new file mode 100644 index 0000000000..e37be631bb --- /dev/null +++ b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inventory + +import ( + "context" + "time" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/googleapis/gax-go/v2" + "golang.org/x/time/rate" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +// a gax.CallOption that defines a retry strategy which retries the request on ResourceExhausted error. +var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer { + return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{ + Initial: 1 * time.Second, + Max: 1 * time.Minute, + Multiplier: 1.2, + }) +}) + +type AssetsInventoryRateLimiter struct { + methods map[string]*rate.Limiter + log *logp.Logger +} + +// a map of asset inventory client methods and their quotas. +// see https://cloud.google.com/asset-inventory/docs/quota +var methods = map[string]*rate.Limiter{ + // using per-project quota suffices for both single-account and organization-account, because it's more restrictive. + "/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1), +} + +func NewAssetsInventoryRateLimiter(log *logp.Logger) *AssetsInventoryRateLimiter { + return &AssetsInventoryRateLimiter{ + log: log, + methods: methods, + } +} + +// Limits the rate of the method calls defined in the methods map. +func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string, req any) { + limiter := rl.methods[method] + if limiter != nil { + err := limiter.Wait(ctx) + if err != nil { + rl.log.Errorf("Failed to wait for project quota on method: %s, request: %v, error: %v", method, req, err) + } + } +} + +// Returns a grpc.DialOption that intercepts the unary RPCs and limits the rate of the method calls. +func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption { + return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + rl.Wait(ctx, method, req) + return invoker(ctx, method, req, reply, cc, opts...) + }) +} diff --git a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go new file mode 100644 index 0000000000..2130b69264 --- /dev/null +++ b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go @@ -0,0 +1,62 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inventory + +import ( + "context" + "testing" + "time" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/stretchr/testify/suite" + "golang.org/x/time/rate" +) + +type RateLimiterTestSuite struct { + suite.Suite + logger *logp.Logger + rateLimiter *AssetsInventoryRateLimiter +} + +func TestInventoryRateLimiterTestSuite(t *testing.T) { + suite.Run(t, new(RateLimiterTestSuite)) +} + +func (s *RateLimiterTestSuite) SetupTest() { + s.logger = logp.NewLogger("test") + s.rateLimiter = NewAssetsInventoryRateLimiter(s.logger) +} + +func (s *RateLimiterTestSuite) TestRateLimiterWait() { + ctx := context.Background() + duration := time.Millisecond + s.rateLimiter.methods = map[string]*rate.Limiter{ + "someMethod": rate.NewLimiter(rate.Every(duration/1), 1), // 1 request per duration + } + + totalRequests := 5 + startTime := time.Now() + for i := 0; i < totalRequests; i++ { + s.rateLimiter.Wait(ctx, "someMethod", nil) + } + endTime := time.Now() + + actualDuration := endTime.Sub(startTime) + minDuration := duration * time.Duration((totalRequests - 1)) // 1st request is instant, 2nd and above wait 1duration each + s.GreaterOrEqual(actualDuration, minDuration) +} diff --git a/internal/resources/providers/gcplib/inventory/map_cache.go b/internal/resources/providers/gcplib/inventory/map_cache.go new file mode 100644 index 0000000000..a726187966 --- /dev/null +++ b/internal/resources/providers/gcplib/inventory/map_cache.go @@ -0,0 +1,40 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inventory + +import ( + "sync" +) + +type MapCache[T any] struct { + results sync.Map +} + +func (c *MapCache[T]) Get(fn func() T, key string) T { + if value, ok := c.results.Load(key); ok { + return value.(T) + } + + value := fn() + c.results.Store(key, value) + return value +} + +func NewMapCache[T any]() *MapCache[T] { + return &MapCache[T]{} +} diff --git a/internal/resources/providers/gcplib/inventory/map_cache_test.go b/internal/resources/providers/gcplib/inventory/map_cache_test.go new file mode 100644 index 0000000000..b9630d3985 --- /dev/null +++ b/internal/resources/providers/gcplib/inventory/map_cache_test.go @@ -0,0 +1,63 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package inventory + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type MockFunction struct { + mock.Mock +} + +func (m *MockFunction) GetSomeValue() int { + m.Called() + return 0 +} + +func TestMapCacheGet(t *testing.T) { + cache := NewMapCache[int]() + + // Test getting existing value from cache + cache.results.Store("key1", 42) + mockFunction := new(MockFunction) + result := cache.Get(mockFunction.GetSomeValue, "key1") + mockFunction.AssertNotCalled(t, "GetSomeValue") + assert.Equal(t, 42, result) + + // Test getting non-existing value from cache + mockFunction.On("GetSomeValue").Return(mockFunction.GetSomeValue()) + result = cache.Get(mockFunction.GetSomeValue, "key2") + mockFunction.AssertNumberOfCalls(t, "GetSomeValue", 2) // 1 by Return(), 2nd by cache.Get() + assert.Equal(t, 0, result) + + // Test concurrent accesses + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + defer wg.Done() + cache.Get(func() int { return 1 }, "concurrent_key") + }() + } + wg.Wait() +} diff --git a/internal/resources/providers/gcplib/inventory/provider.go b/internal/resources/providers/gcplib/inventory/provider.go index 071f2668be..ecd7f875e6 100644 --- a/internal/resources/providers/gcplib/inventory/provider.go +++ b/internal/resources/providers/gcplib/inventory/provider.go @@ -38,11 +38,11 @@ import ( ) type Provider struct { - log *logp.Logger - config auth.GcpFactoryConfig - inventory *AssetsInventoryWrapper - crm *ResourceManagerWrapper - crmCache map[string]*fetching.CloudAccountMetadata + log *logp.Logger + config auth.GcpFactoryConfig + inventory *AssetsInventoryWrapper + crm *ResourceManagerWrapper + cloudAccountMetadataCache *MapCache[*fetching.CloudAccountMetadata] } type AssetsInventoryWrapper struct { @@ -86,13 +86,6 @@ type ExtendedGcpAsset struct { type ProviderInitializer struct{} -type GcpAssetIDs struct { - orgId string - projectId string - parentProject string - parentOrg string -} - type dnsPolicyFields struct { networks []string enableLogging bool @@ -130,8 +123,9 @@ type ProviderInitializerAPI interface { } func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) { + limiter := NewAssetsInventoryRateLimiter(log) // initialize GCP assets inventory client - client, err := asset.NewClient(ctx, gcpConfig.ClientOpts...) + client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...) if err != nil { return nil, err } @@ -139,7 +133,7 @@ func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpCon assetsInventoryWrapper := &AssetsInventoryWrapper{ Close: client.Close, ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator { - return client.ListAssets(ctx, req, opts...) + return client.ListAssets(ctx, req, append(opts, RetryOnResourceExhausted)...) }, } @@ -150,60 +144,62 @@ func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpCon if err != nil { return nil, err } + + displayNamesCache := NewMapCache[string]() // wrap the resource manager client for mocking crmServiceWrapper := &ResourceManagerWrapper{ getProjectDisplayName: func(ctx context.Context, parent string) string { - prj, err := crmService.Projects.Get(parent).Context(ctx).Do() - if err != nil { - log.Errorf("error fetching GCP Project: %s, error: %s", parent, err) - return "" - } - return prj.DisplayName + return displayNamesCache.Get(func() string { + prj, err := crmService.Projects.Get(parent).Context(ctx).Do() + if err != nil { + log.Errorf("error fetching GCP Project: %s, error: %s", parent, err) + return "" + } + return prj.DisplayName + }, parent) }, getOrganizationDisplayName: func(ctx context.Context, parent string) string { - org, err := crmService.Organizations.Get(parent).Context(ctx).Do() - if err != nil { - log.Errorf("error fetching GCP Org: %s, error: %s", parent, err) - return "" - } - return org.DisplayName + return displayNamesCache.Get(func() string { + org, err := crmService.Organizations.Get(parent).Context(ctx).Do() + if err != nil { + log.Errorf("error fetching GCP Org: %s, error: %s", parent, err) + return "" + } + return org.DisplayName + }, parent) }, } return &Provider{ - config: gcpConfig, - log: log, - inventory: assetsInventoryWrapper, - crm: crmServiceWrapper, - crmCache: make(map[string]*fetching.CloudAccountMetadata), + config: gcpConfig, + log: log, + inventory: assetsInventoryWrapper, + crm: crmServiceWrapper, + cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](), }, nil } func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []string) ([]*ExtendedGcpAsset, error) { - p.log.Infof("Listing GCP asset types: %v in %v", assetTypes, p.config.Parent) - wg := sync.WaitGroup{} var resourceAssets []*assetpb.Asset var policyAssets []*assetpb.Asset wg.Add(1) go func() { - request := &assetpb.ListAssetsRequest{ + resourceAssets = p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: assetTypes, ContentType: assetpb.ContentType_RESOURCE, - } - resourceAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request)) + }) wg.Done() }() wg.Add(1) go func() { - request := &assetpb.ListAssetsRequest{ + policyAssets = p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: assetTypes, ContentType: assetpb.ContentType_IAM_POLICY, - } - policyAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request)) + }) wg.Done() }() @@ -212,7 +208,7 @@ func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []str var assets []*assetpb.Asset assets = append(append(assets, resourceAssets...), policyAssets...) mergedAssets := mergeAssetContentType(assets) - extendedAssets := extendWithECS(ctx, p.crm, p.crmCache, mergedAssets) + extendedAssets := p.extendWithCloudMetadata(ctx, mergedAssets) // Enrich network assets with dns policy p.enrichNetworkAssets(ctx, extendedAssets) @@ -308,18 +304,16 @@ func (p *Provider) enrichNetworkAssets(ctx context.Context, assets []*ExtendedGc p.log.Infof("no %s assets were listed", ComputeNetworkAssetType) return } - - dnsPolicyAssets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ + dnsPolicyAssets := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: []string{DnsPolicyAssetType}, ContentType: assetpb.ContentType_RESOURCE, - })) + }) if len(dnsPolicyAssets) == 0 { p.log.Infof("no %s assets were listed, return original assets", DnsPolicyAssetType) return } - dnsPolicies := decodeDnsPolicies(dnsPolicyAssets) p.log.Infof("attempting to enrich %d %s assets with dns policy", len(assets), ComputeNetworkAssetType) @@ -402,19 +396,21 @@ func getAssetsByProject[T any](assets []*ExtendedGcpAsset, log *logp.Logger, f T return enrichedAssets } -func getAllAssets(log *logp.Logger, it Iterator) []*assetpb.Asset { +func (p *Provider) getAllAssets(ctx context.Context, request *assetpb.ListAssetsRequest) []*assetpb.Asset { + p.log.Infof("Listing asset types: %v of type %v for %v", request.AssetTypes, request.ContentType, request.Parent) results := make([]*assetpb.Asset, 0) + it := p.inventory.ListAssets(ctx, request) for { response, err := it.Next() if err == iterator.Done { break } if err != nil { - log.Errorf("Error fetching GCP Asset: %s", err) - return nil + p.log.Errorf("Error fetching GCP Asset: %s", err) + return results } - log.Debugf("Fetched GCP Asset: %+v", response.Name) + p.log.Debugf("Fetched GCP Asset: %+v", response.Name) results = append(results, response) } return results @@ -444,92 +440,80 @@ func mergeAssetContentType(assets []*assetpb.Asset) []*assetpb.Asset { } // extends the assets with the project and organization display name -func extendWithECS(ctx context.Context, crm *ResourceManagerWrapper, cache map[string]*fetching.CloudAccountMetadata, assets []*assetpb.Asset) []*ExtendedGcpAsset { +func (p *Provider) extendWithCloudMetadata(ctx context.Context, assets []*assetpb.Asset) []*ExtendedGcpAsset { extendedAssets := make([]*ExtendedGcpAsset, 0, len(assets)) for _, asset := range assets { - keys := getAssetIds(asset) - cacheKey := fmt.Sprintf("%s/%s", keys.parentProject, keys.parentOrg) - if cloudAccount, ok := cache[cacheKey]; ok { - extendedAssets = append(extendedAssets, &ExtendedGcpAsset{ - Asset: asset, - CloudAccount: cloudAccount, - }) - continue - } - cache[cacheKey] = getCloudAccountMetadata(ctx, crm, keys) + orgId := getOrganizationId(asset.Ancestors) + projectId := getProjectId(asset.Ancestors) + cacheKey := fmt.Sprintf("%s/%s", projectId, orgId) + cloudAccount := p.cloudAccountMetadataCache.Get(func() *fetching.CloudAccountMetadata { + return p.getCloudAccountMetadata(ctx, projectId, orgId) + }, cacheKey) extendedAssets = append(extendedAssets, &ExtendedGcpAsset{ Asset: asset, - CloudAccount: cache[cacheKey], + CloudAccount: cloudAccount, }) } return extendedAssets } func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context) ([]*ProjectPoliciesAsset, error) { - projects := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ + projects := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ ContentType: assetpb.ContentType_IAM_POLICY, Parent: p.config.Parent, AssetTypes: []string{CrmProjectAssetType}, - })) - + }) + p.log.Infof("Listed %d GCP projects", len(projects)) + ancestorsPoliciesCache := NewMapCache[[]*ExtendedGcpAsset]() return lo.Map(projects, func(project *assetpb.Asset, _ int) *ProjectPoliciesAsset { - projectAsset := extendWithECS(ctx, p.crm, p.crmCache, []*assetpb.Asset{project})[0] + projectAsset := p.extendWithCloudMetadata(ctx, []*assetpb.Asset{project})[0] // Skip first ancestor it as we already got it - policiesAssets := append([]*ExtendedGcpAsset{projectAsset}, getAncestorsAssets(ctx, p, project.Ancestors[1:])...) + policiesAssets := append([]*ExtendedGcpAsset{projectAsset}, getAncestorsAssets(ctx, ancestorsPoliciesCache, p, project.Ancestors[1:])...) return &ProjectPoliciesAsset{CloudAccount: projectAsset.CloudAccount, Policies: policiesAssets} }), nil } -func getAncestorsAssets(ctx context.Context, p *Provider, ancestors []string) []*ExtendedGcpAsset { +func getAncestorsAssets(ctx context.Context, ancestorsPoliciesCache *MapCache[[]*ExtendedGcpAsset], p *Provider, ancestors []string) []*ExtendedGcpAsset { return lo.Flatten(lo.Map(ancestors, func(parent string, _ int) []*ExtendedGcpAsset { - var assetType string - if strings.HasPrefix(parent, "folders") { - assetType = CrmFolderAssetType - } - if strings.HasPrefix(parent, "organizations") { - assetType = CrmOrgAssetType - } - assets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ - ContentType: assetpb.ContentType_IAM_POLICY, - Parent: parent, - AssetTypes: []string{assetType}, - })) - return extendWithECS(ctx, p.crm, p.crmCache, assets) + return ancestorsPoliciesCache.Get(func() []*ExtendedGcpAsset { + var assetType string + if strings.HasPrefix(parent, "folders") { + assetType = CrmFolderAssetType + } + if strings.HasPrefix(parent, "organizations") { + assetType = CrmOrgAssetType + } + return p.extendWithCloudMetadata(ctx, p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ + ContentType: assetpb.ContentType_IAM_POLICY, + Parent: parent, + AssetTypes: []string{assetType}, + })) + }, parent) })) } -func getAssetIds(asset *assetpb.Asset) GcpAssetIDs { - orgId := getOrganizationId(asset.Ancestors) - projectId := getProjectId(asset.Ancestors) - parentProject := fmt.Sprintf("projects/%s", projectId) - parentOrg := fmt.Sprintf("organizations/%s", orgId) - return GcpAssetIDs{ - orgId: orgId, - projectId: projectId, - parentProject: parentProject, - parentOrg: parentOrg, - } -} - -func getCloudAccountMetadata(ctx context.Context, crm *ResourceManagerWrapper, keys GcpAssetIDs) *fetching.CloudAccountMetadata { +func (p *Provider) getCloudAccountMetadata(ctx context.Context, projectId string, orgId string) *fetching.CloudAccountMetadata { var orgName string var projectName string wg := sync.WaitGroup{} wg.Add(1) go func() { - orgName = crm.getOrganizationDisplayName(ctx, keys.parentOrg) + orgName = p.crm.getOrganizationDisplayName(ctx, fmt.Sprintf("organizations/%s", orgId)) wg.Done() }() wg.Add(1) go func() { - projectName = crm.getProjectDisplayName(ctx, keys.parentProject) + // some assets are not associated with a project + if projectId != "" { + projectName = p.crm.getProjectDisplayName(ctx, fmt.Sprintf("projects/%s", projectId)) + } wg.Done() }() wg.Wait() return &fetching.CloudAccountMetadata{ - AccountId: keys.projectId, + AccountId: projectId, AccountName: projectName, - OrganisationId: keys.orgId, + OrganisationId: orgId, OrganizationName: orgName, } } diff --git a/internal/resources/providers/gcplib/inventory/provider_test.go b/internal/resources/providers/gcplib/inventory/provider_test.go index e4b2cb9734..9675acfefe 100644 --- a/internal/resources/providers/gcplib/inventory/provider_test.go +++ b/internal/resources/providers/gcplib/inventory/provider_test.go @@ -89,7 +89,7 @@ func (s *ProviderTestSuite) TestListAllAssetTypesByName() { return "OrganizationName" }, }, - crmCache: make(map[string]*fetching.CloudAccountMetadata), + cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](), } s.mockedIterator.On("Next").Return(&assetpb.Asset{Name: "AssetName1", Resource: &assetpb.Resource{}, Ancestors: []string{"projects/1", "organizations/1"}}, nil).Once() @@ -138,7 +138,7 @@ func (s *ProviderTestSuite) TestListMonitoringAssets() { return "OrganizationName1" }, }, - crmCache: make(map[string]*fetching.CloudAccountMetadata), + cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](), } expected := []*MonitoringAsset{ @@ -218,7 +218,7 @@ func (s *ProviderTestSuite) TestEnrichNetworkAssets() { return "OrganizationName" }, }, - crmCache: make(map[string]*fetching.CloudAccountMetadata), + cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](), } assets := []*ExtendedGcpAsset{ @@ -332,7 +332,7 @@ func (s *ProviderTestSuite) TestListServiceUsageAssets() { return "OrganizationName1" }, }, - crmCache: make(map[string]*fetching.CloudAccountMetadata), + cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](), } // asset's resource @@ -421,7 +421,7 @@ func (s *ProviderTestSuite) TestListLoggingAssets() { return "OrganizationName1" }, }, - crmCache: make(map[string]*fetching.CloudAccountMetadata), + cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](), } // asset's resource @@ -460,7 +460,7 @@ func (s *ProviderTestSuite) TestListProjectsAncestorsPolicies() { return "OrganizationName" }, }, - crmCache: make(map[string]*fetching.CloudAccountMetadata), + cloudAccountMetadataCache: NewMapCache[*fetching.CloudAccountMetadata](), } s.mockedIterator.On("Next").Return(&assetpb.Asset{Name: "AssetName1", IamPolicy: &iampb.Policy{}, Ancestors: []string{"projects/1", "organizations/1"}}, nil).Once()