Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/api/v3/handler_pools_add_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func poolsAddAccount(backend backend.Backend) http.HandlerFunc {
return
}

// Forbid adding accounts to dynamic pools (query-based)
if p, err := backend.PoolsGet(ctx, id); err == nil && p != nil && p.Query != nil {
api.BadRequest(w, ErrValidation, models.ErrValidation)
return
}

span.SetAttributes(attribute.String("accountID", accountID(r)))
accountID, err := models.AccountIDFromString(accountID(r))
if err != nil {
Expand Down
39 changes: 28 additions & 11 deletions internal/api/v3/handler_pools_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (

type CreatePoolRequest struct {
Name string `json:"name" validate:"required"`
AccountIDs []string `json:"accountIDs" validate:"min=1,dive,accountID"`
AccountIDs []string `json:"accountIDs" validate:"omitempty,min=1,dive,accountID"`
Query *string `json:"query"`
}

func poolsCreate(backend backend.Backend, validator *validation.Validator) http.HandlerFunc {
Expand All @@ -42,24 +43,37 @@ func poolsCreate(backend backend.Backend, validator *validation.Validator) http.
return
}

if CreatePoolRequest.Query == nil && len(CreatePoolRequest.AccountIDs) == 0 {
api.BadRequest(w, ErrValidation, fmt.Errorf("either accountIDs or query must be provided"))
return
}
if CreatePoolRequest.Query != nil && len(CreatePoolRequest.AccountIDs) > 0 {
api.BadRequest(w, ErrValidation, fmt.Errorf("accountIDs and query are mutually exclusive"))
return
}

pool := models.Pool{
ID: uuid.New(),
Name: CreatePoolRequest.Name,
CreatedAt: time.Now().UTC(),
}

accounts := make([]models.AccountID, len(CreatePoolRequest.AccountIDs))
for i, accountID := range CreatePoolRequest.AccountIDs {
aID, err := models.AccountIDFromString(accountID)
if err != nil {
otel.RecordError(span, err)
api.BadRequest(w, ErrValidation, err)
return
}
if CreatePoolRequest.Query != nil {
pool.Query = CreatePoolRequest.Query
} else {
accounts := make([]models.AccountID, len(CreatePoolRequest.AccountIDs))
for i, accountID := range CreatePoolRequest.AccountIDs {
aID, err := models.AccountIDFromString(accountID)
if err != nil {
otel.RecordError(span, err)
api.BadRequest(w, ErrValidation, err)
return
}

accounts[i] = aID
accounts[i] = aID
}
pool.PoolAccounts = accounts
}
pool.PoolAccounts = accounts

err = backend.PoolsCreate(ctx, pool)
if err != nil {
Expand All @@ -77,4 +91,7 @@ func populateSpanFromCreatePoolRequest(span trace.Span, req CreatePoolRequest) {
for i, acc := range req.AccountIDs {
span.SetAttributes(attribute.String(fmt.Sprintf("accountIDs[%d]", i), acc))
}
if req.Query != nil {
span.SetAttributes(attribute.String("query", *req.Query))
}
}
6 changes: 6 additions & 0 deletions internal/api/v3/handler_pools_remove_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func poolsRemoveAccount(backend backend.Backend) http.HandlerFunc {
return
}

// Forbid removing accounts from dynamic pools (query-based)
if p, err := backend.PoolsGet(ctx, id); err == nil && p != nil && p.Query != nil {
api.BadRequest(w, ErrValidation, models.ErrValidation)
return
}

span.SetAttributes(attribute.String("accountID", accountID(r)))
accountID, err := models.AccountIDFromString(accountID(r))
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions internal/models/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Pool struct {
CreatedAt time.Time `json:"createdAt"`

PoolAccounts []AccountID `json:"poolAccounts"`
Query *string `json:"query,omitempty"`
}

func (p Pool) MarshalJSON() ([]byte, error) {
Expand All @@ -21,6 +22,7 @@ func (p Pool) MarshalJSON() ([]byte, error) {
Name string `json:"name"`
CreatedAt time.Time `json:"createdAt"`
PoolAccounts []string `json:"poolAccounts"`
Query *string `json:"query,omitempty"`
}

aux.ID = p.ID.String()
Expand All @@ -31,6 +33,7 @@ func (p Pool) MarshalJSON() ([]byte, error) {
for i := range p.PoolAccounts {
aux.PoolAccounts[i] = p.PoolAccounts[i].String()
}
aux.Query = p.Query

return json.Marshal(aux)
}
Expand All @@ -41,6 +44,7 @@ func (p *Pool) UnmarshalJSON(data []byte) error {
Name string `json:"name"`
CreatedAt time.Time `json:"createdAt"`
PoolAccounts []string `json:"poolAccounts"`
Query *string `json:"query"`
}

if err := json.Unmarshal(data, &aux); err != nil {
Expand All @@ -65,6 +69,7 @@ func (p *Pool) UnmarshalJSON(data []byte) error {
p.Name = aux.Name
p.CreatedAt = aux.CreatedAt
p.PoolAccounts = poolAccounts
p.Query = aux.Query

return nil
}
Expand All @@ -77,9 +82,11 @@ func (p *Pool) IdempotencyKey() string {
var ik = struct {
ID string
RelatedAccounts []string
Query *string
}{
ID: p.ID.String(),
RelatedAccounts: relatedAccounts,
Query: p.Query,
}
return IdempotencyKey(ik)
}
6 changes: 6 additions & 0 deletions internal/storage/migrations/23-pools-query.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ALTER TABLE pools
ADD COLUMN IF NOT EXISTS query jsonb;

-- Optional index to speed up querying dynamic pools by query content if needed later
-- CREATE INDEX IF NOT EXISTS pools_query_gin ON pools USING GIN (query);

13 changes: 13 additions & 0 deletions internal/storage/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var psuConnectionPaymentsAccounts string

//go:embed 22-rename-bank-bridges-open-banking.sql
var renameBankBridgesOpenBanking string
//go:embed 23-pools-query.sql
var poolsQuery string

func registerMigrations(logger logging.Logger, migrator *migrations.Migrator, encryptionKey string) {
migrator.RegisterMigrations(
Expand Down Expand Up @@ -322,6 +324,17 @@ func registerMigrations(logger logging.Logger, migrator *migrations.Migrator, en
})
},
},
migrations.Migration{
Name: "pools query column",
Up: func(ctx context.Context, db bun.IDB) error {
return db.RunInTx(ctx, &sql.TxOptions{}, func(ctx context.Context, tx bun.Tx) error {
logger.Info("running pools query column migration...")
_, err := tx.ExecContext(ctx, poolsQuery)
logger.WithField("error", err).Info("finished running pools query column migration")
return err
})
},
},
)
}

Expand Down
43 changes: 43 additions & 0 deletions internal/storage/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"context"
"fmt"
"encoding/json"

"github.com/formancehq/go-libs/v3/bun/bunpaginate"
"github.com/formancehq/go-libs/v3/pointer"
Expand All @@ -21,6 +22,8 @@ type pool struct {
Name string `bun:"name,type:text,notnull"`
CreatedAt time.Time `bun:"created_at,type:timestamp without time zone,notnull"`

Query json.RawMessage `bun:"query,type:jsonb,nullzero"`

PoolAccounts []*poolAccounts `bun:"rel:has-many,join:id=pool_id"`
}

Expand Down Expand Up @@ -86,6 +89,36 @@ func (s *store) PoolsGet(ctx context.Context, id uuid.UUID) (*models.Pool, error
return nil, e("get pool: %w", err)
}

// If dynamic pool (query set), resolve matching accounts at read time
if len(pool.Query) > 0 {
qb, err := query.ParseJSON(string(pool.Query))
if err != nil {
return nil, e("parse pool query: %w", err)
}
where, args, err := s.accountsQueryContext(qb)
if err != nil {
return nil, e("build accounts query for pool: %w", err)
}

var accs []account
q := s.db.NewSelect().Model(&accs)
if where != "" {
q = q.Where(where, args...)
}
if err := q.Scan(ctx); err != nil {
return nil, e("scan dynamic pool accounts: %w", err)
}
pool.PoolAccounts = make([]*poolAccounts, 0, len(accs))
for i := range accs {
acc := accs[i]
pool.PoolAccounts = append(pool.PoolAccounts, &poolAccounts{
PoolID: pool.ID,
AccountID: acc.ID,
ConnectorID: acc.ConnectorID,
})
}
}

return pointer.For(toPoolModel(pool)), nil
}

Expand Down Expand Up @@ -272,6 +305,9 @@ func fromPoolModel(from models.Pool) (pool, []poolAccounts) {
Name: from.Name,
CreatedAt: time.New(from.CreatedAt),
}
if from.Query != nil {
p.Query = json.RawMessage(*from.Query)
}

var accounts []poolAccounts
for i := range from.PoolAccounts {
Expand All @@ -291,10 +327,17 @@ func toPoolModel(from pool) models.Pool {
accounts = append(accounts, from.PoolAccounts[i].AccountID)
}

var queryString *string
if len(from.Query) > 0 {
qs := string(from.Query)
queryString = &qs
}

return models.Pool{
ID: from.ID,
Name: from.Name,
CreatedAt: from.CreatedAt.Time,
PoolAccounts: accounts,
Query: queryString,
}
}
7 changes: 6 additions & 1 deletion openapi/v3/v3-schemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1553,14 +1553,16 @@ components:
type: object
required:
- name
- accountIDs
properties:
name:
type: string
accountIDs:
type: array
items:
type: string
query:
type: string
description: JSON query builder to dynamically include accounts. Mutually exclusive with accountIDs.

V3CreatePoolResponse:
type: object
Expand Down Expand Up @@ -1637,6 +1639,9 @@ components:
type: array
items:
$ref: '#/components/schemas/V3AccountID'
query:
type: string
description: JSON query defining dynamic membership for the pool.

V3PoolBalances:
type: array
Expand Down
Loading