Skip to content

Commit

Permalink
Ent implementation for use cases #1 and #2 (#24)
Browse files Browse the repository at this point in the history
* Ent - PackageVersion: added index for improving IsDependency ingestion (guacsec#1439)

Signed-off-by: mrizzi <[email protected]>

* Ent: Package,IsDependency concurrent bulk ingestions (guacsec#1440)

Signed-off-by: mrizzi <[email protected]>

* Ent - HasMetadata: fix ingesting same twice (guacsec#1392)

Signed-off-by: mrizzi <[email protected]>

* Ent - Vulnerability endpoints: applied concurrent approach

Signed-off-by: mrizzi <[email protected]>

* Ent implementation for use case #1-#2

Signed-off-by: mrizzi <[email protected]>

---------

Signed-off-by: mrizzi <[email protected]>
  • Loading branch information
mrizzi authored Nov 10, 2023
1 parent d084a42 commit 307a19e
Show file tree
Hide file tree
Showing 41 changed files with 2,638 additions and 181 deletions.
1 change: 1 addition & 0 deletions .github/scripts/excluded_from_copyright
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@
./pkg/assembler/graphql/generated/prelude.generated.go
./pkg/assembler/graphql/generated/root_.generated.go
./pkg/assembler/graphql/generated/schema.generated.go
./pkg/assembler/graphql/generated/search.generated.go
./pkg/assembler/graphql/generated/source.generated.go
./pkg/assembler/graphql/generated/vulnEqual.generated.go
./pkg/assembler/graphql/generated/vulnMetadata.generated.go
Expand Down
33 changes: 33 additions & 0 deletions internal/testing/testdata/testdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,6 +822,17 @@ var (
Collector: "GUAC",
},
},
{
Pkg: baselayoutPack,
PkgMatchFlag: generated.MatchFlags{Pkg: "SPECIFIC_VERSION"},
HasMetadata: &generated.HasMetadataInputSpec{
Key: "topLevelPackage",
Value: "pkg:guac/spdx/gcr.io/google-containers/alpine-latest",
Justification: "spdx top level package reference",
Origin: "GUAC SPDX",
Collector: "GUAC",
},
},
{
Pkg: baselayoutdataPack,
PkgMatchFlag: model.MatchFlags{Pkg: model.PkgMatchTypeSpecificVersion},
Expand All @@ -844,6 +855,17 @@ var (
Collector: "GUAC",
},
},
{
Pkg: baselayoutdataPack,
PkgMatchFlag: generated.MatchFlags{Pkg: "SPECIFIC_VERSION"},
HasMetadata: &generated.HasMetadataInputSpec{
Key: "topLevelPackage",
Value: "pkg:guac/spdx/gcr.io/google-containers/alpine-latest",
Justification: "spdx top level package reference",
Origin: "GUAC SPDX",
Collector: "GUAC",
},
},
{
Pkg: keysPack,
PkgMatchFlag: model.MatchFlags{Pkg: model.PkgMatchTypeSpecificVersion},
Expand Down Expand Up @@ -888,6 +910,17 @@ var (
Collector: "GUAC",
},
},
{
Pkg: keysPack,
PkgMatchFlag: generated.MatchFlags{Pkg: "SPECIFIC_VERSION"},
HasMetadata: &generated.HasMetadataInputSpec{
Key: "topLevelPackage",
Value: "pkg:guac/spdx/gcr.io/google-containers/alpine-latest",
Justification: "spdx top level package reference",
Origin: "GUAC SPDX",
Collector: "GUAC",
},
},
}

SpdxIngestionPredicates = assembler.IngestPredicates{
Expand Down
72 changes: 47 additions & 25 deletions pkg/assembler/backends/ent/backend/certifyVEXStatement.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
stdsql "database/sql"

"entgo.io/ent/dialect/sql"
"github.com/guacsec/guac/internal/testing/ptrfrom"
"github.com/guacsec/guac/pkg/assembler/backends/ent"
"github.com/guacsec/guac/pkg/assembler/backends/ent/certifyvex"
"github.com/guacsec/guac/pkg/assembler/backends/ent/predicate"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/guacsec/guac/pkg/assembler/graphql/model"
"github.com/pkg/errors"
"github.com/vektah/gqlparser/v2/gqlerror"
"golang.org/x/sync/errgroup"
)

func (b *EntBackend) IngestVEXStatement(ctx context.Context, subject model.PackageOrArtifactInput, vulnerability model.VulnerabilityInputSpec, vexStatement model.VexStatementInputSpec) (*model.CertifyVEXStatement, error) {
Expand Down Expand Up @@ -63,12 +65,14 @@ func (b *EntBackend) IngestVEXStatement(ctx context.Context, subject model.Packa
var conflictWhere *sql.Predicate

// manage package or artifact
var subjectID int
if subject.Package != nil {
p, err := getPkgVersion(ctx, client.Client(), *subject.Package)
if err != nil {
return nil, Errorf("%v :: %s", funcName, err)
}
insert.SetPackage(p)
subjectID = p.ID
conflictColumns = append(conflictColumns, certifyvex.FieldPackageID)
conflictWhere = sql.And(
sql.NotNull(certifyvex.FieldPackageID),
Expand All @@ -82,6 +86,7 @@ func (b *EntBackend) IngestVEXStatement(ctx context.Context, subject model.Packa
return nil, Errorf("%v :: %s", funcName, err)
}
insert.SetArtifactID(artID)
subjectID = artID
conflictColumns = append(conflictColumns, certifyvex.FieldArtifactID)
conflictWhere = sql.And(
sql.IsNull(certifyvex.FieldPackageID),
Expand Down Expand Up @@ -112,7 +117,7 @@ func (b *EntBackend) IngestVEXStatement(ctx context.Context, subject model.Packa
return nil, errors.Wrap(err, "upsert certify vex statement node")
}
id, err = client.CertifyVex.Query().
Where(vexStatementInputPredicate(subject, vulnerability, vexStatement)).
Where(vexStatementInputPredicate(subject, subjectID, vulnerability, vulnID, vexStatement)).
WithPackage(func(q *ent.PackageVersionQuery) {
q.WithName(func(q *ent.PackageNameQuery) {
q.WithNamespace(func(q *ent.PackageNamespaceQuery) {
Expand All @@ -139,19 +144,30 @@ func (b *EntBackend) IngestVEXStatement(ctx context.Context, subject model.Packa
}

func (b *EntBackend) IngestVEXStatements(ctx context.Context, subjects model.PackageOrArtifactInputs, vulnerabilities []*model.VulnerabilityInputSpec, vexStatements []*model.VexStatementInputSpec) ([]string, error) {
var ids []string
var ids = make([]string, len(vexStatements))
eg, ctx := errgroup.WithContext(ctx)
for i := range vexStatements {
index := i
var subject model.PackageOrArtifactInput
if len(subjects.Packages) > 0 {
subject = model.PackageOrArtifactInput{Package: subjects.Packages[i]}
subject = model.PackageOrArtifactInput{Package: subjects.Packages[index]}
} else {
subject = model.PackageOrArtifactInput{Artifact: subjects.Artifacts[i]}
subject = model.PackageOrArtifactInput{Artifact: subjects.Artifacts[index]}
}
statement, err := b.IngestVEXStatement(ctx, subject, *vulnerabilities[i], *vexStatements[i])
if err != nil {
return nil, gqlerror.Errorf("IngestVEXStatements failed with element #%v with err: %v", i, err)
}
ids = append(ids, statement.ID)
vuln := *vulnerabilities[index]
vexStatement := *vexStatements[index]
concurrently(eg, func() error {
statement, err := b.IngestVEXStatement(ctx, subject, vuln, vexStatement)
if err == nil {
ids[index] = statement.ID
return err
} else {
return gqlerror.Errorf("IngestVEXStatements failed with element #%v with err: %v", i, err)
}
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
return ids, nil
}
Expand Down Expand Up @@ -202,27 +218,21 @@ func certifyVexPredicate(filter model.CertifyVEXStatementSpec) predicate.Certify
predicates := []predicate.CertifyVex{
optionalPredicate(filter.ID, IDEQ),
optionalPredicate(filter.KnownSince, certifyvex.KnownSinceEQ),
optionalPredicate(filter.Statement, certifyvex.StatementEQ),
optionalPredicate(filter.StatusNotes, certifyvex.StatusNotesEQ),
optionalPredicate(filter.Collector, certifyvex.CollectorEQ),
optionalPredicate(filter.Origin, certifyvex.OriginEQ),
}
if filter.Status != nil {
status := filter.Status.String()
predicates = append(predicates, optionalPredicate(&status, certifyvex.StatusEQ))
}
if filter.VexJustification != nil {
justification := filter.VexJustification.String()
predicates = append(predicates, optionalPredicate(&justification, certifyvex.JustificationEQ))
}

if filter.Subject != nil {
if filter.Subject.Package != nil {
predicates = append(predicates, certifyvex.HasPackageWith(packageVersionQuery(filter.Subject.Package)))
} else if filter.Subject.Artifact != nil {
predicates = append(predicates, certifyvex.HasArtifactWith(artifactQueryPredicates(filter.Subject.Artifact)))
}
if filter.Status != nil {
status := filter.Status.String()
predicates = append(predicates, optionalPredicate(&status, certifyvex.StatusEQ))
}
predicates = append(predicates,
optionalPredicate(filter.Statement, certifyvex.StatementEQ),
optionalPredicate(filter.StatusNotes, certifyvex.StatusNotesEQ),
optionalPredicate(filter.Origin, certifyvex.OriginEQ),
optionalPredicate(filter.Collector, certifyvex.CollectorEQ),
)

if filter.Vulnerability != nil {
if filter.Vulnerability.NoVuln != nil && *filter.Vulnerability.NoVuln {
Expand All @@ -239,23 +249,35 @@ func certifyVexPredicate(filter model.CertifyVEXStatementSpec) predicate.Certify
)
}
}

if filter.Subject != nil {
if filter.Subject.Package != nil {
predicates = append(predicates, certifyvex.HasPackageWith(packageVersionQuery(filter.Subject.Package)))
} else if filter.Subject.Artifact != nil {
predicates = append(predicates, certifyvex.HasArtifactWith(artifactQueryPredicates(filter.Subject.Artifact)))
}
}

return certifyvex.And(predicates...)
}

func vexStatementInputPredicate(subject model.PackageOrArtifactInput, vulnerability model.VulnerabilityInputSpec, vexStatement model.VexStatementInputSpec) predicate.CertifyVex {
func vexStatementInputPredicate(subject model.PackageOrArtifactInput, subjectID int, vulnerability model.VulnerabilityInputSpec, vulnerabilityID int, vexStatement model.VexStatementInputSpec) predicate.CertifyVex {
var sub *model.PackageOrArtifactSpec
if subject.Package != nil {
sub = &model.PackageOrArtifactSpec{
Package: helper.ConvertPkgInputSpecToPkgSpec(subject.Package),
}
sub.Package.ID = ptrfrom.String(nodeID(subjectID))
} else {
sub = &model.PackageOrArtifactSpec{
Artifact: helper.ConvertArtInputSpecToArtSpec(subject.Artifact),
}
sub.Artifact.ID = ptrfrom.String(nodeID(subjectID))
}
return certifyVexPredicate(model.CertifyVEXStatementSpec{
Subject: sub,
Vulnerability: &model.VulnerabilitySpec{
ID: ptrfrom.String(nodeID(vulnerabilityID)),
Type: &vulnerability.Type,
VulnerabilityID: &vulnerability.VulnerabilityID,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ func (s *Suite) TestVEXBulkIngest() {
if err != nil {
return
}
if diff := cmp.Diff(test.ExpVEX, got, ignoreID); diff != "" {
if diff := cmp.Diff(test.ExpVEX, got, IngestPredicatesCmpOpts...); diff != "" {
t.Errorf("Unexpected results. (-want +got):\n%s", diff)
}
})
Expand Down
27 changes: 20 additions & 7 deletions pkg/assembler/backends/ent/backend/certifyVuln.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/guacsec/guac/pkg/assembler/backends/ent/vulnerabilitytype"
"github.com/guacsec/guac/pkg/assembler/graphql/model"
"github.com/vektah/gqlparser/v2/gqlerror"
"golang.org/x/sync/errgroup"
)

func (b *EntBackend) IngestCertifyVuln(ctx context.Context, pkg model.PkgInputSpec, spec model.VulnerabilityInputSpec, certifyVuln model.ScanMetadataInput) (*model.CertifyVuln, error) {
Expand Down Expand Up @@ -90,13 +91,25 @@ func (b *EntBackend) IngestCertifyVuln(ctx context.Context, pkg model.PkgInputSp
}

func (b *EntBackend) IngestCertifyVulns(ctx context.Context, pkgs []*model.PkgInputSpec, vulnerabilities []*model.VulnerabilityInputSpec, certifyVulns []*model.ScanMetadataInput) ([]*model.CertifyVuln, error) {
var modelCertifyVulns []*model.CertifyVuln
for i, certifyVuln := range certifyVulns {
modelCertifyVuln, err := b.IngestCertifyVuln(ctx, *pkgs[i], *vulnerabilities[i], *certifyVuln)
if err != nil {
return nil, gqlerror.Errorf("IngestVulnerability failed with err: %v", err)
}
modelCertifyVulns = append(modelCertifyVulns, modelCertifyVuln)
var modelCertifyVulns = make([]*model.CertifyVuln, len(vulnerabilities))
eg, ctx := errgroup.WithContext(ctx)
for i := range certifyVulns {
index := i
pkg := *pkgs[index]
vuln := *vulnerabilities[index]
certifyVuln := *certifyVulns[index]
concurrently(eg, func() error {
modelCertifyVuln, err := b.IngestCertifyVuln(ctx, pkg, vuln, certifyVuln)
if err == nil {
modelCertifyVulns[index] = modelCertifyVuln
return err
} else {
return gqlerror.Errorf("IngestCertifyVulns failed with err: %v", err)
}
})
}
if err := eg.Wait(); err != nil {
return nil, err
}
return modelCertifyVulns, nil
}
Expand Down
5 changes: 1 addition & 4 deletions pkg/assembler/backends/ent/backend/certifyVuln_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,9 +1023,6 @@ func (s *Suite) TestIngestCertifyVulns() {
},
},
}
ignoreID := cmp.FilterPath(func(p cmp.Path) bool {
return strings.Compare(".ID", p[len(p)-1].String()) == 0
}, cmp.Ignore())
ctx := context.Background()
for _, test := range tests {
s.Run(test.Name, func() {
Expand Down Expand Up @@ -1072,7 +1069,7 @@ func (s *Suite) TestIngestCertifyVulns() {
if err != nil {
return
}
if diff := cmp.Diff(test.ExpVuln, got, ignoreID); diff != "" {
if diff := cmp.Diff(test.ExpVuln, got, IngestPredicatesCmpOpts...); diff != "" {
t.Errorf("Unexpected results. (-want +got):\n%s", diff)
}
})
Expand Down
79 changes: 79 additions & 0 deletions pkg/assembler/backends/ent/backend/concurrently.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
//
// Copyright 2023 The GUAC Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package backend

import (
"context"
"os"
"strconv"

"github.com/guacsec/guac/pkg/logging"
"golang.org/x/sync/errgroup"
)

var concurrent chan struct{}
var concurrentRead chan struct{}

const MaxConcurrentBulkIngestionString string = "MAX_CONCURRENT_BULK_INGESTION"
const defaultMaxConcurrentBulkIngestion int = 50
const MaxConcurrentReadString string = "MAX_CONCURRENT_READ"
const defaultMaxConcurrentRead int = 40

func init() {
logger := logging.FromContext(context.Background())
concurrentSize := defaultMaxConcurrentBulkIngestion
maxConcurrentBulkIngestionEnv, found := os.LookupEnv(MaxConcurrentBulkIngestionString)
if found {
maxConcurrentBulkIngestion, err := strconv.Atoi(maxConcurrentBulkIngestionEnv)
if err != nil {
logger.Warnf("failed to convert %v value %v to integer. Default value %v will be applied", MaxConcurrentBulkIngestionString, maxConcurrentBulkIngestionEnv, defaultMaxConcurrentBulkIngestion)
concurrentSize = defaultMaxConcurrentBulkIngestion
} else {
concurrentSize = maxConcurrentBulkIngestion
}
}
concurrent = make(chan struct{}, concurrentSize)

concurrentReadSize := defaultMaxConcurrentRead
maxConcurrentReadEnv, found := os.LookupEnv(MaxConcurrentReadString)
if found {
maxConcurrentBulkIngestion, err := strconv.Atoi(maxConcurrentReadEnv)
if err != nil {
logger.Warnf("failed to convert %v value %v to integer. Default value applied is %v\n", MaxConcurrentReadString, maxConcurrentReadEnv, concurrentReadSize)
} else {
concurrentReadSize = maxConcurrentBulkIngestion
}
}
concurrentRead = make(chan struct{}, concurrentReadSize)
}

func concurrently(eg *errgroup.Group, fn func() error) {
eg.Go(func() error {
concurrent <- struct{}{}
err := fn()
<-concurrent
return err
})
}

func concurrentlyRead(eg *errgroup.Group, fn func() error) {
eg.Go(func() error {
concurrentRead <- struct{}{}
err := fn()
<-concurrentRead
return err
})
}
Loading

0 comments on commit 307a19e

Please sign in to comment.