Skip to content

Commit f6fc435

Browse files
committed
postgres/v2: Matcher implementation
Signed-off-by: Hank Donnay <[email protected]>
1 parent 98d131e commit f6fc435

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+3120
-2
lines changed

datastore/postgres/v2/matcher_v1.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,121 @@
11
package postgres
22

3+
import (
4+
"context"
5+
"fmt"
6+
"sync/atomic"
7+
8+
"github.com/google/uuid"
9+
"github.com/jackc/pgx/v5"
10+
"github.com/jackc/pgx/v5/pgxpool"
11+
"github.com/jackc/pgx/v5/stdlib"
12+
"github.com/quay/zlog"
13+
"github.com/remind101/migrate"
14+
"go.opentelemetry.io/otel/attribute"
15+
"go.opentelemetry.io/otel/trace"
16+
17+
"github.com/quay/claircore/datastore"
18+
"github.com/quay/claircore/datastore/postgres/migrations"
19+
)
20+
21+
func NewMatcher(ctx context.Context, cfg *pgxpool.Config, opt ...MatcherOption) (*MatcherV1, error) {
22+
const prefix = `matcher`
23+
var mCfg matcherConfig
24+
for _, o := range opt {
25+
mCfg = o.matcherConfig(mCfg)
26+
}
27+
28+
if mCfg.Migrations {
29+
cfg := cfg.ConnConfig.Copy()
30+
cfg.DefaultQueryExecMode = pgx.QueryExecModeExec
31+
err := func() error {
32+
db := stdlib.OpenDB(*cfg)
33+
defer db.Close()
34+
migrator := migrate.NewPostgresMigrator(db)
35+
migrator.Table = migrations.MatcherMigrationTable
36+
err := migrator.Exec(migrate.Up, migrations.MatcherMigrations...)
37+
if err != nil {
38+
return fmt.Errorf("failed to perform migrations: %w", err)
39+
}
40+
return nil
41+
}()
42+
if err != nil {
43+
return nil, err
44+
}
45+
}
46+
var s MatcherV1
47+
if err := s.init(ctx, cfg, prefix); err != nil {
48+
return nil, err
49+
50+
}
51+
return &s, nil
52+
}
53+
354
type (
	// MatcherOption adjusts the configuration used by NewMatcher.
	//
	// An option receives the configuration accumulated so far and returns
	// the configuration to use from then on. The single method is
	// unexported, so implementations can only come from this package.
	MatcherOption interface {
		matcherConfig(matcherConfig) matcherConfig
	}

	// matcherConfig holds the settings that a MatcherOption can change.
	matcherConfig struct {
		// Migrations reports whether NewMatcher should run the matcher
		// migrations before initializing the store.
		Migrations bool
	}
)
61+
62+
// MatcherV1 is the matcher store for this postgres package. It is asserted
// below to satisfy [datastore.MatcherV1].
63+
type MatcherV1 struct {
64+
storeCommon
65+
// initialized is used as an atomic bool (zero is false, nonzero is true)
// for tracking initialization. It must only be read and written through
// the sync/atomic functions.
66+
initialized uint32
67+
}
68+
69+
// Compile-time check that *MatcherV1 implements datastore.MatcherV1.
var _ datastore.MatcherV1 = (*MatcherV1)(nil)
70+
71+
// DeleteUpdateOperations implements [datastore.MatcherV1Updater].
72+
func (s *MatcherV1) DeleteUpdateOperations(ctx context.Context, id ...uuid.UUID) (int64, error) {
73+
const query = `DELETE FROM update_operation WHERE ref = ANY($1::uuid[]);`
74+
ctx = zlog.ContextWithValues(ctx, "component", "internal/vulnstore/postgres/deleteUpdateOperations")
75+
if len(id) == 0 {
76+
return 0, nil
77+
}
78+
79+
// Pgx seems unwilling to do the []uuid.UUID → uuid[] conversion, so we're
80+
// forced to make some garbage here.
81+
refStr := make([]string, len(id))
82+
for i := range id {
83+
refStr[i] = id[i].String()
84+
}
85+
tag, err := s.pool.Exec(ctx, query, refStr)
86+
if err != nil {
87+
return 0, fmt.Errorf("failed to delete: %w", err)
88+
}
89+
return tag.RowsAffected(), nil
90+
}
91+
92+
// Initialized implements [datastore.MatcherV1].
93+
func (s *MatcherV1) Initialized(ctx context.Context) (ok bool, err error) {
94+
ctx, done := s.method(ctx, &err)
95+
defer done()
96+
span := trace.SpanFromContext(ctx)
97+
ok = atomic.LoadUint32(&s.initialized) != 0
98+
span.AddEvent(`loaded`, trace.WithAttributes(attribute.Bool("value", ok)))
99+
if ok {
100+
return true, nil
101+
}
102+
103+
err = s.pool.AcquireFunc(ctx, s.acquire(ctx, `initialized`, func(ctx context.Context, c *pgxpool.Conn, query string) error {
104+
return c.QueryRow(ctx, query).Scan(&ok)
105+
}))
106+
if err != nil {
107+
return false, err
108+
}
109+
110+
span.AddEvent(`initialized`, trace.WithAttributes(attribute.Bool("value", ok)))
111+
// There were no rows when we looked, so report that. Don't update the bool,
112+
// because it's in the 'false' state or another goroutine has read from the
113+
// database and will want to swap in 'true'.
114+
if !ok {
115+
return false, nil
116+
}
117+
// If this fails, it means a concurrent goroutine already swapped. Any
118+
// subsequent calls will see the 'true' value.
119+
atomic.CompareAndSwapUint32(&s.initialized, 0, 1)
120+
return true, nil
121+
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package postgres
2+
3+
import (
4+
"context"
5+
"crypto/md5"
6+
"errors"
7+
"fmt"
8+
"io"
9+
"sort"
10+
11+
"github.com/google/uuid"
12+
"github.com/jackc/pgx/v5"
13+
"github.com/jackc/pgx/v5/pgxpool"
14+
"github.com/quay/zlog"
15+
16+
"github.com/quay/claircore/libvuln/driver"
17+
)
18+
19+
// UpdateEnrichments creates a new UpdateOperation, inserts the provided
20+
// EnrichmentRecord(s), and ensures enrichments from previous updates are not
21+
// queried by clients.
22+
func (s *MatcherV1) UpdateEnrichments(ctx context.Context, name string, fp driver.Fingerprint, es []driver.EnrichmentRecord) (_ uuid.UUID, err error) {
23+
ctx, done := s.method(ctx, &err)
24+
defer done()
25+
26+
type digestPair struct {
27+
Kind string
28+
Digest []byte
29+
}
30+
hashes := make([]digestPair, len(es))
31+
func() {
32+
_, span := tracer.Start(ctx, `doHashes`)
33+
defer span.End()
34+
for i := range es {
35+
hashes[i].Kind, hashes[i].Digest = hashEnrichment(&es[i])
36+
}
37+
}()
38+
39+
var ref uuid.UUID
40+
err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `UpdateEnrichments`, func(ctx context.Context, tx pgx.Tx) (err error) {
41+
var id uint64
42+
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `create`, func(ctx context.Context, tx pgx.Tx, query string) error {
43+
if err := tx.QueryRow(ctx, query, name, string(fp)).Scan(&id, &ref); err != nil {
44+
return err
45+
}
46+
return nil
47+
}))
48+
if err != nil {
49+
return fmt.Errorf("unable to create enrichment update operation: %w", err)
50+
}
51+
zlog.Debug(ctx).
52+
Str("ref", ref.String()).
53+
Msg("update_operation created")
54+
55+
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `insert`, func(ctx context.Context, tx pgx.Tx, query string) error {
56+
var batch pgx.Batch
57+
for i := range es {
58+
batch.Queue(query, hashes[i].Kind, hashes[i].Digest, name, es[i].Tags, es[i].Enrichment)
59+
}
60+
res := tx.SendBatch(ctx, &batch)
61+
defer res.Close()
62+
for range es {
63+
if _, err := res.Exec(); err != nil {
64+
return err
65+
}
66+
}
67+
return nil
68+
}))
69+
if err != nil {
70+
return fmt.Errorf("unable to insert enrichments: %w", err)
71+
}
72+
73+
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `associate`, func(ctx context.Context, tx pgx.Tx, query string) error {
74+
var batch pgx.Batch
75+
for i := range es {
76+
batch.Queue(query, hashes[i].Kind, hashes[i].Digest, name, id)
77+
}
78+
res := tx.SendBatch(ctx, &batch)
79+
defer res.Close()
80+
for range es {
81+
if _, err := res.Exec(); err != nil {
82+
return err
83+
}
84+
}
85+
return nil
86+
}))
87+
if err != nil {
88+
return fmt.Errorf("unable to associate enrichments: %w", err)
89+
}
90+
91+
return nil
92+
}))
93+
if err != nil {
94+
return uuid.Nil, err
95+
}
96+
97+
zlog.Debug(ctx).
98+
Stringer("ref", ref).
99+
Int("inserted", len(es)).
100+
Msg("update_operation committed")
101+
102+
_ = s.pool.AcquireFunc(ctx, s.acquire(ctx, `refresh`, func(ctx context.Context, c *pgxpool.Conn, query string) error {
103+
if _, err := c.Exec(ctx, query); err != nil {
104+
// TODO(hank) Log?
105+
return fmt.Errorf("unable to refresh update operations view: %w", err)
106+
}
107+
return nil
108+
}))
109+
110+
return ref, nil
111+
}
112+
113+
func hashEnrichment(r *driver.EnrichmentRecord) (k string, d []byte) {
114+
h := md5.New()
115+
sort.Strings(r.Tags)
116+
for _, t := range r.Tags {
117+
io.WriteString(h, t)
118+
h.Write([]byte("\x00"))
119+
}
120+
h.Write(r.Enrichment)
121+
return "md5", h.Sum(nil)
122+
}
123+
124+
func (s *MatcherV1) GetEnrichment(ctx context.Context, name string, tags []string) (res []driver.EnrichmentRecord, err error) {
125+
ctx, done := s.method(ctx, &err)
126+
defer done()
127+
128+
res = make([]driver.EnrichmentRecord, 0, 8) // Guess at capacity.
129+
err = pgx.BeginTxFunc(ctx, s.pool, txRO, s.call(ctx, `get`, func(ctx context.Context, tx pgx.Tx, query string) (err error) {
130+
rows, err := tx.Query(ctx, query, name, tags)
131+
if err != nil {
132+
return err
133+
}
134+
for rows.Next() {
135+
i := len(res)
136+
res = append(res, driver.EnrichmentRecord{})
137+
r := &res[i]
138+
if err := rows.Scan(&r.Tags, &r.Enrichment); err != nil {
139+
return err
140+
}
141+
}
142+
return rows.Err()
143+
}))
144+
switch {
145+
case errors.Is(err, nil):
146+
case errors.Is(err, pgx.ErrNoRows):
147+
return nil, nil
148+
default:
149+
return nil, err
150+
}
151+
return res, nil
152+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package postgres
2+
3+
import (
4+
"context"
5+
6+
"github.com/google/uuid"
7+
"github.com/jackc/pgx/v5"
8+
"github.com/quay/zlog"
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/trace"
11+
)
12+
13+
// GCThrottle sets a limit for the number of deleted update operations (and
14+
// subsequent cascade deletes in the uo_vuln table) that can occur in a GC run.
//
// GC truncates its list of eligible update operations to this length before
// deleting them.
15+
const GCThrottle = 50
16+
17+
// GC performs garbage collection on tables in the Matcher store.
18+
//
19+
// GC is split into two phases, first it will identify any update operations
20+
// which are older then the provided keep value and delete these.
21+
//
22+
// Next it will perform updater-based deletions of any vulnerabilities from the
23+
// vuln table which are not longer referenced by update operations.
24+
//
25+
// The GC is throttled to not overload the database with CASCADE deletes. If a
26+
// full GC is required, run this method until the returned value is 0.
27+
func (s *MatcherV1) GC(ctx context.Context, keep int) (_ int64, err error) {
28+
ctx, done := s.method(ctx, &err)
29+
defer done()
30+
var (
31+
total int64
32+
deleted int64
33+
)
34+
35+
err = pgx.BeginTxFunc(ctx, s.pool, txRW, s.tx(ctx, `GC`, func(ctx context.Context, tx pgx.Tx) error {
36+
var ops []uuid.UUID
37+
span := trace.SpanFromContext(ctx)
38+
39+
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `eligible`, func(ctx context.Context, tx pgx.Tx, query string) (err error) {
40+
rows, err := tx.Query(ctx, query, keep+1, keep)
41+
if err != nil {
42+
return err
43+
}
44+
tmp, err := pgx.CollectRows(rows, pgx.RowTo[[]uuid.UUID])
45+
if err != nil {
46+
return err
47+
}
48+
for _, t := range tmp {
49+
ops = append(ops, t...)
50+
}
51+
return nil
52+
}))
53+
if err != nil {
54+
return err
55+
}
56+
57+
total = int64(len(ops))
58+
switch {
59+
case len(ops) > GCThrottle:
60+
ops = ops[:GCThrottle]
61+
case len(ops) == 0:
62+
return nil
63+
}
64+
span.SetAttributes(attribute.Int64("total", total), attribute.Int("ops", len(ops)))
65+
66+
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `delete_ops`, func(ctx context.Context, tx pgx.Tx, query string) error {
67+
tag, err := s.pool.Exec(ctx, query, ops)
68+
deleted = tag.RowsAffected()
69+
return err
70+
}))
71+
if err != nil {
72+
return err
73+
}
74+
75+
var updaters []string
76+
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `distinct`, func(ctx context.Context, tx pgx.Tx, query string) (err error) {
77+
rows, err := tx.Query(ctx, query)
78+
if err != nil {
79+
return err
80+
}
81+
updaters, err = pgx.CollectRows(rows, pgx.RowTo[string])
82+
return err
83+
}))
84+
if err != nil {
85+
return err
86+
}
87+
88+
for _, u := range updaters {
89+
err = pgx.BeginFunc(ctx, tx, s.call(ctx, `orphaned`, func(ctx context.Context, tx pgx.Tx, query string) (err error) {
90+
ctx = zlog.ContextWithValues(ctx, "updater", u)
91+
trace.SpanFromContext(ctx).SetAttributes(attribute.String("updater", u))
92+
zlog.Debug(ctx).
93+
Msg("clean up start")
94+
zlog.Debug(ctx).Msg("clean up done")
95+
96+
_, err = tx.Exec(ctx, query, u)
97+
return err
98+
}))
99+
if err != nil {
100+
return err
101+
}
102+
}
103+
104+
return nil
105+
}))
106+
if err != nil {
107+
return 0, err
108+
}
109+
110+
return total - deleted, nil
111+
}

0 commit comments

Comments
 (0)