Skip to content

Commit 9d1f0ae

Browse files
committed
postgres/v2: Indexer implementation
Signed-off-by: Hank Donnay <[email protected]>
1 parent f6fc435 commit 9d1f0ae

File tree

50 files changed

+3350
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3350
-0
lines changed

datastore/postgres/v2/indexer_v1.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,66 @@
11
package postgres
22

3+
import (
4+
"context"
5+
"fmt"
6+
"runtime"
7+
8+
"github.com/jackc/pgx/v5"
9+
"github.com/jackc/pgx/v5/pgxpool"
10+
"github.com/jackc/pgx/v5/stdlib"
11+
"github.com/remind101/migrate"
12+
13+
"github.com/quay/claircore/datastore/postgres/migrations"
14+
"github.com/quay/claircore/indexer"
15+
)
16+
17+
// NewIndexerV1 returns a configured [IndexerV1].
18+
//
19+
// The passed [pgxpool.Config] will have its tracing and lifecycle hooks
20+
// overwritten.
21+
//
22+
// Values that can be used as IndexerOptions:
23+
// - [WithMigrations]
24+
func NewIndexerV1(ctx context.Context, cfg *pgxpool.Config, opt ...IndexerOption) (*IndexerV1, error) {
25+
const prefix = `indexer`
26+
var idxCfg indexerConfig
27+
for _, o := range opt {
28+
idxCfg = o.indexerConfig(idxCfg)
29+
}
30+
31+
if idxCfg.Migrations {
32+
cfg := cfg.ConnConfig.Copy()
33+
cfg.DefaultQueryExecMode = pgx.QueryExecModeExec
34+
err := func() error {
35+
db := stdlib.OpenDB(*cfg)
36+
defer db.Close()
37+
migrator := migrate.NewPostgresMigrator(db)
38+
migrator.Table = migrations.IndexerMigrationTable
39+
err := migrator.Exec(migrate.Up, migrations.IndexerMigrations...)
40+
if err != nil {
41+
return fmt.Errorf("failed to perform migrations: %w", err)
42+
}
43+
return nil
44+
}()
45+
if err != nil {
46+
return nil, err
47+
}
48+
}
49+
50+
var s IndexerV1
51+
var err error
52+
if err = s.init(ctx, cfg, prefix); err != nil {
53+
return nil, err
54+
}
55+
56+
_, file, line, _ := runtime.Caller(1)
57+
runtime.SetFinalizer(&s, func(s *IndexerV1) {
58+
panic(fmt.Sprintf("%s:%d: IndexerV1 not closed", file, line))
59+
})
60+
61+
return &s, nil
62+
}
63+
364
// IndexerOption is an option for configuring an indexer datastore.
465
type IndexerOption interface {
566
indexerConfig(indexerConfig) indexerConfig
@@ -9,3 +70,33 @@ type IndexerOption interface {
970
type indexerConfig struct {
1071
// Migrations, when true, makes NewIndexerV1 apply the indexer migrations
// before it initializes the store.
Migrations bool
1172
}
73+
74+
// Static assertion for the [indexer.Store] interface.
75+
var _ indexer.Store = (*IndexerV1)(nil)
76+
77+
// IndexerV1 implements [indexer.Store] backed by a PostgreSQL database.
//
// Values are constructed with NewIndexerV1 and must be released with Close:
// the constructor installs a finalizer that panics if the value is finalized
// without Close having been called.
78+
type IndexerV1 struct {
79+
// storeCommon is the only field of the struct; Close delegates to its
// Close method.
storeCommon
80+
}
81+
82+
// Close implements [indexer.Store].
83+
func (s *IndexerV1) Close(_ context.Context) error {
84+
runtime.SetFinalizer(s, nil)
85+
return s.storeCommon.Close()
86+
}
87+
88+
// RegisterScanners is a bad name.
89+
func (s *IndexerV1) RegisterScanners(ctx context.Context, vs indexer.VersionedScanners) (err error) {
90+
ctx, done := s.method(ctx, &err)
91+
defer done()
92+
rvs := rotateVersionedScanners(vs)
93+
94+
err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `register`, func(ctx context.Context, c *pgxpool.Conn, query string) (err error) {
95+
_, err = c.Exec(ctx, query, rvs.Name, rvs.Version, rvs.Kind)
96+
return err
97+
}))
98+
if err != nil {
99+
return err
100+
}
101+
return nil
102+
}
Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package postgres
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"runtime/pprof"
8+
"strconv"
9+
10+
"github.com/jackc/pgx/v5"
11+
"github.com/quay/zlog"
12+
"go.opentelemetry.io/otel/attribute"
13+
"go.opentelemetry.io/otel/codes"
14+
"go.opentelemetry.io/otel/trace"
15+
16+
"github.com/quay/claircore"
17+
)
18+
19+
var (
	// ErrNotIndexed indicates the vulnerability being queried has a dist or repo not
	// indexed into the database.
	//
	// It is a sentinel value: compare against it with errors.Is rather than
	// by inspecting the message text.
	ErrNotIndexed = errors.New("vulnerability containers data not indexed by any scanners")
)
24+
25+
// AffectedManifests finds the manifests digests which are affected by the provided vulnerability.
26+
//
27+
// An exhaustive search for all indexed packages of the same name as the vulnerability is performed.
28+
//
29+
// The list of packages is filtered down to only the affected set.
30+
//
31+
// The manifest index is then queried to resolve a list of manifest hashes containing the affected
32+
// artifacts.
33+
func (s *IndexerV1) AffectedManifests(ctx context.Context, v claircore.Vulnerability, vulnFunc claircore.CheckVulnernableFunc) (_ []claircore.Digest, err error) {
34+
ctx, done := s.method(ctx, &err)
35+
defer done()
36+
ctx = zlog.ContextWithValues(ctx, "vulnerability", v.Name)
37+
38+
out := []claircore.Digest{}
39+
err = pgx.BeginTxFunc(ctx, s.pool, pgx.TxOptions{AccessMode: pgx.ReadOnly},
40+
s.tx(ctx, `AffectedManifests`, func(ctx context.Context, tx pgx.Tx) (err error) {
41+
var pr claircore.IndexRecord
42+
span := trace.SpanFromContext(ctx)
43+
44+
err = pgx.BeginFunc(ctx, tx, s.tx(ctx, `protoRecord`, s.protoRecordCall(&pr, v)))
45+
switch {
46+
case err == nil:
47+
case errors.Is(err, ErrNotIndexed):
48+
// This is a common case: the system knows of a vulnerability but
49+
// doesn't know of any manifests it could apply to.
50+
zlog.Debug(ctx).Msg("not indexed")
51+
trace.SpanFromContext(ctx).SetStatus(codes.Ok, "not indexed")
52+
return nil
53+
default:
54+
return err
55+
}
56+
57+
// Collect all packages which may be affected by the vulnerability
58+
// in question.
59+
pkgsToFilter := []claircore.Package{}
60+
61+
err = pgx.BeginFunc(ctx, tx,
62+
s.call(ctx, `selectPackages`, func(ctx context.Context, tx pgx.Tx, query string) error {
63+
rows, err := tx.Query(ctx, query, v.Package.Name)
64+
if err != nil {
65+
return fmt.Errorf("vulnerability %q: %w", v.ID, err)
66+
}
67+
defer rows.Close()
68+
69+
for rows.Next() {
70+
var pkg claircore.Package
71+
var id int64
72+
var nKind *string
73+
err := rows.Scan(
74+
&id,
75+
&pkg.Name,
76+
&pkg.Version,
77+
&pkg.Kind,
78+
&nKind,
79+
&pkg.NormalizedVersion,
80+
&pkg.Module,
81+
&pkg.Arch,
82+
)
83+
if err != nil {
84+
return fmt.Errorf("unmarshal error: %w", err)
85+
}
86+
idStr := strconv.FormatInt(id, 10)
87+
pkg.ID = idStr
88+
if nKind != nil {
89+
pkg.NormalizedVersion.Kind = *nKind
90+
}
91+
pkgsToFilter = append(pkgsToFilter, pkg)
92+
}
93+
trace.SpanFromContext(ctx).
94+
AddEvent("loaded packages", trace.WithAttributes(attribute.Int("count", len(pkgsToFilter))))
95+
zlog.Debug(ctx).Int("count", len(pkgsToFilter)).Msg("packages to filter")
96+
if err := rows.Err(); err != nil {
97+
return fmt.Errorf("error reading response: %w", err)
98+
}
99+
return nil
100+
}))
101+
if err != nil {
102+
return fmt.Errorf("unable to select packages: %w", err)
103+
}
104+
105+
// for each package discovered create an index record
106+
// and determine if any in-tree matcher finds the record vulnerable
107+
var filteredRecords []claircore.IndexRecord
108+
for i := range pkgsToFilter {
109+
pkg := &pkgsToFilter[i]
110+
pr.Package = pkg
111+
var match bool
112+
var err error
113+
pprof.Do(ctx, pprof.Labels("hook", "CheckVulnFunc"), func(ctx context.Context) {
114+
match, err = vulnFunc(ctx, &pr, &v)
115+
})
116+
if err != nil {
117+
return fmt.Errorf("error in check vulnerable hook: %w", err)
118+
}
119+
if match {
120+
filteredRecords = append(filteredRecords, claircore.IndexRecord{
121+
Package: pkg,
122+
Distribution: pr.Distribution,
123+
Repository: pr.Repository,
124+
})
125+
}
126+
}
127+
span.AddEvent("filtered packages", trace.WithAttributes(attribute.Int("count", len(filteredRecords))))
128+
zlog.Debug(ctx).Int("count", len(filteredRecords)).Msg("vulnerable index records")
129+
// Query the manifest index for manifests containing the vulnerable
130+
// IndexRecords and create a set containing each unique manifest.
131+
set := map[string]struct{}{}
132+
selectAffected := func(id string, dist, repo *uint64) callFunc {
133+
return func(ctx context.Context, tx pgx.Tx, query string) error {
134+
rows, err := tx.Query(ctx, query, id, dist, repo)
135+
if err != nil {
136+
return err
137+
}
138+
defer rows.Close()
139+
for rows.Next() {
140+
var hash string
141+
if err := rows.Scan(&hash); err != nil {
142+
return err
143+
}
144+
if _, ok := set[hash]; ok {
145+
continue
146+
}
147+
set[hash] = struct{}{}
148+
i := len(out)
149+
out = append(out, claircore.Digest{})
150+
if err := out[i].UnmarshalText([]byte(hash)); err != nil {
151+
return err
152+
}
153+
}
154+
return rows.Err()
155+
}
156+
}
157+
158+
for _, record := range filteredRecords {
159+
v, err := toValues(record)
160+
if err != nil {
161+
return fmt.Errorf("failed to get sql values for query: %w", err)
162+
}
163+
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `selectAffected`, selectAffected(record.Package.ID, v[2], v[3])))
164+
switch {
165+
case errors.Is(err, nil):
166+
default:
167+
return fmt.Errorf("error selecting affected: %w", err)
168+
}
169+
}
170+
171+
span.AddEvent("affected manifests", trace.WithAttributes(attribute.Int("count", len(out))))
172+
zlog.Debug(ctx).Int("count", len(out)).Msg("affected manifests")
173+
return nil
174+
}))
175+
if err != nil {
176+
return nil, err
177+
}
178+
return out, nil
179+
}
180+
181+
func (s *IndexerV1) protoRecordCall(out *claircore.IndexRecord, v claircore.Vulnerability) txFunc {
182+
return func(ctx context.Context, tx pgx.Tx) error {
183+
// fill dist into prototype index record if exists
184+
if (v.Dist != nil) && (v.Dist.Name != "") {
185+
const name = `selectDist`
186+
var did int64
187+
err := pgx.BeginFunc(ctx, tx, s.call(ctx, name, protoRecordSelectDist(&did, v.Dist)))
188+
switch {
189+
case errors.Is(err, nil):
190+
id := strconv.FormatInt(did, 10)
191+
out.Distribution = &claircore.Distribution{
192+
ID: id,
193+
Arch: v.Dist.Arch,
194+
CPE: v.Dist.CPE,
195+
DID: v.Dist.DID,
196+
Name: v.Dist.Name,
197+
PrettyName: v.Dist.PrettyName,
198+
Version: v.Dist.Version,
199+
VersionCodeName: v.Dist.VersionCodeName,
200+
VersionID: v.Dist.VersionID,
201+
}
202+
zlog.Debug(ctx).Str("id", id).Msg("discovered distribution id")
203+
case errors.Is(err, pgx.ErrNoRows):
204+
// OK
205+
default:
206+
return fmt.Errorf("failed to scan dist: %w", err)
207+
}
208+
} else {
209+
zlog.Debug(ctx).Msg("no distribution")
210+
}
211+
212+
// fill repo into prototype index record if exists
213+
if (v.Repo != nil) && (v.Repo.Name != "") {
214+
const name = `selectRepo`
215+
var rid int64
216+
err := pgx.BeginFunc(ctx, tx, s.call(ctx, name, protoRecordSelectRepo(&rid, v.Repo)))
217+
switch {
218+
case errors.Is(err, nil):
219+
id := strconv.FormatInt(rid, 10)
220+
out.Repository = &claircore.Repository{
221+
ID: id,
222+
Key: v.Repo.Key,
223+
Name: v.Repo.Name,
224+
URI: v.Repo.URI,
225+
}
226+
zlog.Debug(ctx).Str("id", id).Msg("discovered repo id")
227+
case errors.Is(err, pgx.ErrNoRows):
228+
// OK
229+
default:
230+
return fmt.Errorf("failed to scan repo: %w", err)
231+
}
232+
} else {
233+
zlog.Debug(ctx).Msg("no repository")
234+
}
235+
236+
// we need at least a repo or distribution to continue
237+
if (out.Distribution == nil) && (out.Repository == nil) {
238+
return ErrNotIndexed
239+
}
240+
return nil
241+
}
242+
}
243+
244+
func protoRecordSelectDist(out *int64, d *claircore.Distribution) callFunc {
245+
return func(ctx context.Context, tx pgx.Tx, query string) error {
246+
return tx.QueryRow(ctx, query,
247+
d.Arch,
248+
d.CPE,
249+
d.DID,
250+
d.Name,
251+
d.PrettyName,
252+
d.Version,
253+
d.VersionCodeName,
254+
d.VersionID,
255+
).Scan(out)
256+
}
257+
}
258+
259+
func protoRecordSelectRepo(out *int64, r *claircore.Repository) callFunc {
260+
return func(ctx context.Context, tx pgx.Tx, query string) error {
261+
return tx.QueryRow(ctx, query,
262+
r.Name,
263+
r.Key,
264+
r.URI,
265+
).Scan(out)
266+
}
267+
}

0 commit comments

Comments
 (0)