From 8620166b9eb5e966fb32d387ad9541f8f2dde439 Mon Sep 17 00:00:00 2001 From: crozzy Date: Mon, 5 Jun 2023 09:50:15 -0700 Subject: [PATCH] indexer: Cache scanner table During the indexing cycle many operations try to read information from the scanner table, this can occasionally lead to resource contention depending on the DB. This reads the table into memory when the indexer store is instanciated and uses when asked for scanner info. Signed-off-by: crozzy --- datastore/postgres/distributionsbylayer.go | 23 ++-------- datastore/postgres/filesbylayer.go | 21 ++------- datastore/postgres/indexer_store.go | 30 ++----------- datastore/postgres/layerscanned.go | 30 +++---------- datastore/postgres/packagesbylayer.go | 23 ++-------- datastore/postgres/registerscanners.go | 52 ++++++++++++++++++++++ indexer/controller/controller.go | 5 +++ 7 files changed, 74 insertions(+), 110 deletions(-) diff --git a/datastore/postgres/distributionsbylayer.go b/datastore/postgres/distributionsbylayer.go index 38869636d..ca5469c4a 100644 --- a/datastore/postgres/distributionsbylayer.go +++ b/datastore/postgres/distributionsbylayer.go @@ -39,13 +39,6 @@ var ( func (s *IndexerStore) DistributionsByLayer(ctx context.Context, hash claircore.Digest, scnrs indexer.VersionedScanners) ([]*claircore.Distribution, error) { const ( - selectScanner = ` - SELECT id - FROM scanner - WHERE name = $1 - AND version = $2 - AND kind = $3; - ` query = ` SELECT dist.id, dist.name, @@ -68,19 +61,9 @@ func (s *IndexerStore) DistributionsByLayer(ctx context.Context, hash claircore. return []*claircore.Distribution{}, nil } - // get scanner ids - scannerIDs := make([]int64, len(scnrs)) - for i, scnr := range scnrs { - ctx, done := context.WithTimeout(ctx, time.Second) - start := time.Now() - err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()). - Scan(&scannerIDs[i]) - done() - if err != nil { - return nil, fmt.Errorf("failed to retrieve distribution ids for scanner %q: %w", scnr, err) - } - distributionByLayerCounter.WithLabelValues("selectScanner").Add(1) - distributionByLayerDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds()) + scannerIDs, err := s.selectScanners(ctx, scnrs) + if err != nil { + return nil, err } ctx, done := context.WithTimeout(ctx, 30*time.Second) diff --git a/datastore/postgres/filesbylayer.go b/datastore/postgres/filesbylayer.go index 1daecfbc5..cfad4bf77 100644 --- a/datastore/postgres/filesbylayer.go +++ b/datastore/postgres/filesbylayer.go @@ -38,13 +38,6 @@ var ( func (s *IndexerStore) FilesByLayer(ctx context.Context, hash claircore.Digest, scnrs indexer.VersionedScanners) ([]claircore.File, error) { const ( - selectScanner = ` - SELECT id - FROM scanner - WHERE name = $1 - AND version = $2 - AND kind = $3; - ` query = ` SELECT file.path, file.kind FROM file_scanartifact @@ -59,17 +52,9 @@ func (s *IndexerStore) FilesByLayer(ctx context.Context, hash claircore.Digest, return []claircore.File{}, nil } - // get scanner ids - scannerIDs := make([]int64, len(scnrs)) - for i, scnr := range scnrs { - start := time.Now() - err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()). - Scan(&scannerIDs[i]) - filesByLayerCounter.WithLabelValues("selectScanner").Add(1) - filesByLayerDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds()) - if err != nil { - return nil, fmt.Errorf("failed to retrieve file ids for scanner %q: %w", scnr, err) - } + scannerIDs, err := s.selectScanners(ctx, scnrs) + if err != nil { + return nil, fmt.Errorf("unable to select scanners: %w", err) } start := time.Now() diff --git a/datastore/postgres/indexer_store.go b/datastore/postgres/indexer_store.go index 534a56912..61c1f1288 100644 --- a/datastore/postgres/indexer_store.go +++ b/datastore/postgres/indexer_store.go @@ -29,8 +29,7 @@ func InitPostgresIndexerStore(_ context.Context, pool *pgxpool.Pool, doMigration } } - store := NewIndexerStore(pool) - return store, nil + return NewIndexerStore(pool), nil } var _ indexer.Store = (*IndexerStore)(nil) @@ -39,7 +38,8 @@ var _ indexer.Store = (*IndexerStore)(nil) // // All the other exported methods live in their own files. type IndexerStore struct { - pool *pgxpool.Pool + pool *pgxpool.Pool + scanners map[string]int64 } func NewIndexerStore(pool *pgxpool.Pool) *IndexerStore { @@ -53,30 +53,6 @@ func (s *IndexerStore) Close(_ context.Context) error { return nil } -const selectScanner = ` -SELECT - id -FROM - scanner -WHERE - name = $1 AND version = $2 AND kind = $3; -` - -func (s *IndexerStore) selectScanners(ctx context.Context, vs indexer.VersionedScanners) ([]int64, error) { - ids := make([]int64, len(vs)) - for i, v := range vs { - ctx, done := context.WithTimeout(ctx, time.Second) - err := s.pool.QueryRow(ctx, selectScanner, v.Name(), v.Version(), v.Kind()). - Scan(&ids[i]) - done() - if err != nil { - return nil, fmt.Errorf("failed to retrieve id for scanner %q: %w", v.Name(), err) - } - } - - return ids, nil -} - func promTimer(h *prometheus.HistogramVec, name string, err *error) func() time.Duration { t := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { h.WithLabelValues(name, success(*err)).Observe(v) diff --git a/datastore/postgres/layerscanned.go b/datastore/postgres/layerscanned.go index 787b75815..b6e4ba8e8 100644 --- a/datastore/postgres/layerscanned.go +++ b/datastore/postgres/layerscanned.go @@ -2,11 +2,8 @@ package postgres import ( "context" - "errors" - "fmt" "time" - "github.com/jackc/pgx/v4" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -40,14 +37,6 @@ func (s *IndexerStore) LayerScanned(ctx context.Context, hash claircore.Digest, // TODO(hank) Could this be written as a single query that reports NULL if // the scanner isn't present? const ( - selectScanner = ` -SELECT - id -FROM - scanner -WHERE - name = $1 AND version = $2 AND kind = $3; -` selectScanned = ` SELECT EXISTS( @@ -64,25 +53,16 @@ SELECT ` ) - ctx, done := context.WithTimeout(ctx, 10*time.Second) - defer done() - start := time.Now() - var scannerID int64 - err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()). - Scan(&scannerID) - switch { - case errors.Is(err, nil): - case errors.Is(err, pgx.ErrNoRows): - return false, fmt.Errorf("scanner %s not found", scnr.Name()) - default: + scannerID, err := s.selectScanner(ctx, scnr) + if err != nil { return false, err } - layerScannedCounter.WithLabelValues("selectScanner").Add(1) - layerScannedDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds()) + ctx, done := context.WithTimeout(ctx, 10*time.Second) + defer done() var ok bool - start = time.Now() + start := time.Now() err = s.pool.QueryRow(ctx, selectScanned, hash.String(), scannerID). Scan(&ok) if err != nil { diff --git a/datastore/postgres/packagesbylayer.go b/datastore/postgres/packagesbylayer.go index b328d3473..82ed6b0cf 100644 --- a/datastore/postgres/packagesbylayer.go +++ b/datastore/postgres/packagesbylayer.go @@ -40,14 +40,6 @@ var ( func (s *IndexerStore) PackagesByLayer(ctx context.Context, hash claircore.Digest, scnrs indexer.VersionedScanners) ([]*claircore.Package, error) { const ( - selectScanner = ` -SELECT - id -FROM - scanner -WHERE - name = $1 AND version = $2 AND kind = $3; -` query = ` SELECT package.id, @@ -85,18 +77,9 @@ WHERE return []*claircore.Package{}, nil } // get scanner ids - scannerIDs := make([]int64, len(scnrs)) - for i, scnr := range scnrs { - ctx, done := context.WithTimeout(ctx, time.Second) - start := time.Now() - err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()). - Scan(&scannerIDs[i]) - done() - if err != nil { - return nil, fmt.Errorf("failed to retrieve scanner ids: %w", err) - } - packagesByLayerCounter.WithLabelValues("selectScanner").Add(1) - packagesByLayerDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds()) + scannerIDs, err := s.selectScanners(ctx, scnrs) + if err != nil { + return nil, err } ctx, done := context.WithTimeout(ctx, 15*time.Second) diff --git a/datastore/postgres/registerscanners.go b/datastore/postgres/registerscanners.go index 7c0638eab..0b9816d73 100644 --- a/datastore/postgres/registerscanners.go +++ b/datastore/postgres/registerscanners.go @@ -7,6 +7,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/quay/claircore/indexer" ) @@ -88,5 +89,56 @@ SELECT registerScannerDuration.WithLabelValues("insert").Observe(time.Since(start).Seconds()) } + return s.populateScanners(ctx) +} + +const selectAllScanner = ` +SELECT + id, name, version, kind +FROM + scanner; +` + +func (s *IndexerStore) populateScanners(ctx context.Context) error { + s.scanners = make(map[string]int64) + rows, err := s.pool.Query(ctx, selectAllScanner) + if err != nil { + return fmt.Errorf("failed to retrieve scanners: %w", err) + } + for rows.Next() { + var id int64 + var name, version, kind string + err := rows.Scan( + &id, + &name, + &version, + &kind, + ) + if err != nil { + return fmt.Errorf("failed to scan scanners: %w", err) + } + s.scanners[name+"_"+version+"_"+kind] = id + } return nil } + +func (s *IndexerStore) selectScanners(ctx context.Context, vs indexer.VersionedScanners) ([]int64, error) { + ids := make([]int64, len(vs)) + for i, v := range vs { + id, ok := s.scanners[v.Name()+"_"+v.Version()+"_"+v.Kind()] + if !ok { + return nil, fmt.Errorf("failed to retrieve id for scanner %q", v.Name()) + } + ids[i] = id + } + + return ids, nil +} + +func (s *IndexerStore) selectScanner(ctx context.Context, v indexer.VersionedScanner) (int64, error) { + id, ok := s.scanners[v.Name()+"_"+v.Version()+"_"+v.Kind()] + if !ok { + return 0, fmt.Errorf("failed to retrieve id for scanner %q", v.Name()) + } + return id, nil +} diff --git a/indexer/controller/controller.go b/indexer/controller/controller.go index 3f843f329..3ba8e2d12 100644 --- a/indexer/controller/controller.go +++ b/indexer/controller/controller.go @@ -90,9 +90,11 @@ func (s *Controller) run(ctx context.Context) (err error) { // In all the other switch arms, we now know that the parent context // is OK. err = ctx.Err() + zlog.Info(ctx).Err(err).Msg("getting to the first arm") continue case errors.Is(err, nil): // OK + zlog.Info(ctx).Err(err).Msg("getting to the second arm") case errors.Is(err, context.DeadlineExceeded): // Either the function's internal deadline or the parent's deadline // was hit. @@ -100,6 +102,7 @@ func (s *Controller) run(ctx context.Context) (err error) { case errors.Is(err, context.Canceled): // The parent context was canceled and the stateFunc noticed. // Continuing the loop should drop execution out of it. + zlog.Info(ctx).Err(err).Msg("getting to the third arm") continue default: s.setState(IndexError) @@ -108,6 +111,7 @@ func (s *Controller) run(ctx context.Context) (err error) { Msg("error during scan") s.report.Success = false s.report.Err = err.Error() + zlog.Info(ctx).Err(err).Msg("getting to the default arm") } if err := s.Store.SetIndexReport(ctx, s.report); !errors.Is(err, nil) { zlog.Info(ctx). @@ -115,6 +119,7 @@ func (s *Controller) run(ctx context.Context) (err error) { Msg("failed persisting index report") } if retry { + zlog.Info(ctx).Err(err).Msg("in the retry loop") t := time.NewTimer(w) select { case <-ctx.Done():