Skip to content

Commit

Permalink
indexer: Cache scanner table
Browse files Browse the repository at this point in the history
During the indexing cycle many operations try to read information
from the scanner table; this can occasionally lead to resource
contention depending on the DB. This change reads the table into memory when
the indexer store is instantiated and uses that copy when asked for scanner info.

Signed-off-by: crozzy <[email protected]>
  • Loading branch information
crozzy committed Jun 5, 2023
1 parent 07c4530 commit 8620166
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 110 deletions.
23 changes: 3 additions & 20 deletions datastore/postgres/distributionsbylayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,6 @@ var (

func (s *IndexerStore) DistributionsByLayer(ctx context.Context, hash claircore.Digest, scnrs indexer.VersionedScanners) ([]*claircore.Distribution, error) {
const (
selectScanner = `
SELECT id
FROM scanner
WHERE name = $1
AND version = $2
AND kind = $3;
`
query = `
SELECT dist.id,
dist.name,
Expand All @@ -68,19 +61,9 @@ func (s *IndexerStore) DistributionsByLayer(ctx context.Context, hash claircore.
return []*claircore.Distribution{}, nil
}

// get scanner ids
scannerIDs := make([]int64, len(scnrs))
for i, scnr := range scnrs {
ctx, done := context.WithTimeout(ctx, time.Second)
start := time.Now()
err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()).
Scan(&scannerIDs[i])
done()
if err != nil {
return nil, fmt.Errorf("failed to retrieve distribution ids for scanner %q: %w", scnr, err)
}
distributionByLayerCounter.WithLabelValues("selectScanner").Add(1)
distributionByLayerDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds())
scannerIDs, err := s.selectScanners(ctx, scnrs)
if err != nil {
return nil, err
}

ctx, done := context.WithTimeout(ctx, 30*time.Second)
Expand Down
21 changes: 3 additions & 18 deletions datastore/postgres/filesbylayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ var (

func (s *IndexerStore) FilesByLayer(ctx context.Context, hash claircore.Digest, scnrs indexer.VersionedScanners) ([]claircore.File, error) {
const (
selectScanner = `
SELECT id
FROM scanner
WHERE name = $1
AND version = $2
AND kind = $3;
`
query = `
SELECT file.path, file.kind
FROM file_scanartifact
Expand All @@ -59,17 +52,9 @@ func (s *IndexerStore) FilesByLayer(ctx context.Context, hash claircore.Digest,
return []claircore.File{}, nil
}

// get scanner ids
scannerIDs := make([]int64, len(scnrs))
for i, scnr := range scnrs {
start := time.Now()
err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()).
Scan(&scannerIDs[i])
filesByLayerCounter.WithLabelValues("selectScanner").Add(1)
filesByLayerDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds())
if err != nil {
return nil, fmt.Errorf("failed to retrieve file ids for scanner %q: %w", scnr, err)
}
scannerIDs, err := s.selectScanners(ctx, scnrs)
if err != nil {
return nil, fmt.Errorf("unable to select scanners: %w", err)
}

start := time.Now()
Expand Down
30 changes: 3 additions & 27 deletions datastore/postgres/indexer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ func InitPostgresIndexerStore(_ context.Context, pool *pgxpool.Pool, doMigration
}
}

store := NewIndexerStore(pool)
return store, nil
return NewIndexerStore(pool), nil
}

var _ indexer.Store = (*IndexerStore)(nil)
Expand All @@ -39,7 +38,8 @@ var _ indexer.Store = (*IndexerStore)(nil)
//
// All the other exported methods live in their own files.
type IndexerStore struct {
// pool is the connection pool that the store's queries are issued against.
pool *pgxpool.Pool
pool *pgxpool.Pool
// scanners maps a "name_version_kind" key to the scanner's id; it is filled
// by populateScanners and read by selectScanner and selectScanners.
scanners map[string]int64
}

func NewIndexerStore(pool *pgxpool.Pool) *IndexerStore {
Expand All @@ -53,30 +53,6 @@ func (s *IndexerStore) Close(_ context.Context) error {
return nil
}

// selectScanner fetches the id of the scanner row matching the given
// name ($1), version ($2), and kind ($3).
const selectScanner = `
SELECT
id
FROM
scanner
WHERE
name = $1 AND version = $2 AND kind = $3;
`

// selectScanners resolves every scanner in vs to its id by running the
// selectScanner query once per scanner. Each query gets its own one-second
// timeout derived from ctx. The returned ids are in the same order as vs; the
// first failing lookup aborts the whole call.
func (s *IndexerStore) selectScanners(ctx context.Context, vs indexer.VersionedScanners) ([]int64, error) {
	out := make([]int64, 0, len(vs))
	for _, scnr := range vs {
		var id int64
		qctx, cancel := context.WithTimeout(ctx, time.Second)
		row := s.pool.QueryRow(qctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind())
		scanErr := row.Scan(&id)
		// Release the per-query timeout before looking at the result.
		cancel()
		if scanErr != nil {
			return nil, fmt.Errorf("failed to retrieve id for scanner %q: %w", scnr.Name(), scanErr)
		}
		out = append(out, id)
	}
	return out, nil
}

func promTimer(h *prometheus.HistogramVec, name string, err *error) func() time.Duration {
t := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
h.WithLabelValues(name, success(*err)).Observe(v)
Expand Down
30 changes: 5 additions & 25 deletions datastore/postgres/layerscanned.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@ package postgres

import (
"context"
"errors"
"fmt"
"time"

"github.com/jackc/pgx/v4"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -40,14 +37,6 @@ func (s *IndexerStore) LayerScanned(ctx context.Context, hash claircore.Digest,
// TODO(hank) Could this be written as a single query that reports NULL if
// the scanner isn't present?
const (
selectScanner = `
SELECT
id
FROM
scanner
WHERE
name = $1 AND version = $2 AND kind = $3;
`
selectScanned = `
SELECT
EXISTS(
Expand All @@ -64,25 +53,16 @@ SELECT
`
)

ctx, done := context.WithTimeout(ctx, 10*time.Second)
defer done()
start := time.Now()
var scannerID int64
err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()).
Scan(&scannerID)
switch {
case errors.Is(err, nil):
case errors.Is(err, pgx.ErrNoRows):
return false, fmt.Errorf("scanner %s not found", scnr.Name())
default:
scannerID, err := s.selectScanner(ctx, scnr)
if err != nil {
return false, err
}
layerScannedCounter.WithLabelValues("selectScanner").Add(1)
layerScannedDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds())

ctx, done := context.WithTimeout(ctx, 10*time.Second)
defer done()
var ok bool

start = time.Now()
start := time.Now()
err = s.pool.QueryRow(ctx, selectScanned, hash.String(), scannerID).
Scan(&ok)
if err != nil {
Expand Down
23 changes: 3 additions & 20 deletions datastore/postgres/packagesbylayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ var (

func (s *IndexerStore) PackagesByLayer(ctx context.Context, hash claircore.Digest, scnrs indexer.VersionedScanners) ([]*claircore.Package, error) {
const (
selectScanner = `
SELECT
id
FROM
scanner
WHERE
name = $1 AND version = $2 AND kind = $3;
`
query = `
SELECT
package.id,
Expand Down Expand Up @@ -85,18 +77,9 @@ WHERE
return []*claircore.Package{}, nil
}
// get scanner ids
scannerIDs := make([]int64, len(scnrs))
for i, scnr := range scnrs {
ctx, done := context.WithTimeout(ctx, time.Second)
start := time.Now()
err := s.pool.QueryRow(ctx, selectScanner, scnr.Name(), scnr.Version(), scnr.Kind()).
Scan(&scannerIDs[i])
done()
if err != nil {
return nil, fmt.Errorf("failed to retrieve scanner ids: %w", err)
}
packagesByLayerCounter.WithLabelValues("selectScanner").Add(1)
packagesByLayerDuration.WithLabelValues("selectScanner").Observe(time.Since(start).Seconds())
scannerIDs, err := s.selectScanners(ctx, scnrs)
if err != nil {
return nil, err
}

ctx, done := context.WithTimeout(ctx, 15*time.Second)
Expand Down
52 changes: 52 additions & 0 deletions datastore/postgres/registerscanners.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/quay/claircore/indexer"
)

Expand Down Expand Up @@ -88,5 +89,56 @@ SELECT
registerScannerDuration.WithLabelValues("insert").Observe(time.Since(start).Seconds())
}

return s.populateScanners(ctx)
}

// selectAllScanner returns the id, name, version, and kind of every row in
// the scanner table.
const selectAllScanner = `
SELECT
id, name, version, kind
FROM
scanner;
`

// populateScanners reloads the in-memory scanner cache from the scanner table.
//
// Every row is stored under the key "name_version_kind", the same key format
// selectScanner and selectScanners use for lookups. The cache is replaced only
// after the whole table has been read successfully, so a failed reload never
// publishes a partially filled map.
//
// It returns an error if the query fails, if a row cannot be scanned, or if
// row iteration stops early because of an error.
func (s *IndexerStore) populateScanners(ctx context.Context) error {
	rows, err := s.pool.Query(ctx, selectAllScanner)
	if err != nil {
		return fmt.Errorf("failed to retrieve scanners: %w", err)
	}
	// Release the underlying connection on every return path.
	defer rows.Close()

	scanners := make(map[string]int64)
	for rows.Next() {
		var id int64
		var name, version, kind string
		if err := rows.Scan(&id, &name, &version, &kind); err != nil {
			return fmt.Errorf("failed to scan scanners: %w", err)
		}
		scanners[name+"_"+version+"_"+kind] = id
	}
	// Next reports false both when the rows are exhausted and when reading
	// failed; Err distinguishes the two so a truncated read is not cached.
	if err := rows.Err(); err != nil {
		return fmt.Errorf("failed to read scanners: %w", err)
	}

	s.scanners = scanners
	return nil
}

// selectScanners translates each scanner in vs into its id using the
// in-memory scanners map; no query is issued and ctx is not consulted.
// The ids are returned in the same order as vs. If any scanner has no entry
// in the map, an error naming that scanner is returned and no ids are.
func (s *IndexerStore) selectScanners(ctx context.Context, vs indexer.VersionedScanners) ([]int64, error) {
	out := make([]int64, 0, len(vs))
	for _, scnr := range vs {
		key := scnr.Name() + "_" + scnr.Version() + "_" + scnr.Kind()
		id, found := s.scanners[key]
		if !found {
			return nil, fmt.Errorf("failed to retrieve id for scanner %q", scnr.Name())
		}
		out = append(out, id)
	}
	return out, nil
}

// selectScanner looks up the id of a single scanner in the in-memory
// scanners map, keyed by "name_version_kind". ctx is accepted but unused.
// A scanner without an entry yields a zero id and an error.
func (s *IndexerStore) selectScanner(ctx context.Context, v indexer.VersionedScanner) (int64, error) {
	key := v.Name() + "_" + v.Version() + "_" + v.Kind()
	if id, found := s.scanners[key]; found {
		return id, nil
	}
	return 0, fmt.Errorf("failed to retrieve id for scanner %q", v.Name())
}
5 changes: 5 additions & 0 deletions indexer/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,19 @@ func (s *Controller) run(ctx context.Context) (err error) {
// In all the other switch arms, we now know that the parent context
// is OK.
err = ctx.Err()
zlog.Info(ctx).Err(err).Msg("getting to the first arm")
continue
case errors.Is(err, nil):
// OK
zlog.Info(ctx).Err(err).Msg("getting to the second arm")
case errors.Is(err, context.DeadlineExceeded):
// Either the function's internal deadline or the parent's deadline
// was hit.
retry = true
case errors.Is(err, context.Canceled):
// The parent context was canceled and the stateFunc noticed.
// Continuing the loop should drop execution out of it.
zlog.Info(ctx).Err(err).Msg("getting to the third arm")
continue
default:
s.setState(IndexError)
Expand All @@ -108,13 +111,15 @@ func (s *Controller) run(ctx context.Context) (err error) {
Msg("error during scan")
s.report.Success = false
s.report.Err = err.Error()
zlog.Info(ctx).Err(err).Msg("getting to the default arm")
}
if err := s.Store.SetIndexReport(ctx, s.report); !errors.Is(err, nil) {
zlog.Info(ctx).
Err(err).
Msg("failed persisting index report")
}
if retry {
zlog.Info(ctx).Err(err).Msg("in the retry loop")
t := time.NewTimer(w)
select {
case <-ctx.Done():
Expand Down

0 comments on commit 8620166

Please sign in to comment.