From 6f40deb50258e207d9f36d699585db6cf28177f8 Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Mon, 24 Feb 2025 19:41:33 -0500 Subject: [PATCH] Track unsettled usage --- pkg/db/queries.sql | 19 ++++ pkg/db/queries/models.go | 7 ++ pkg/db/queries/queries.sql.go | 51 +++++++++ pkg/db/tx.go | 36 +++++++ pkg/db/unsettledUsage_test.go | 100 ++++++++++++++++++ pkg/migrations/00007_unsettled-usage.down.sql | 4 + pkg/migrations/00007_unsettled-usage.up.sql | 9 ++ pkg/testutils/random.go | 4 + pkg/utils/time.go | 13 +++ 9 files changed, 243 insertions(+) create mode 100644 pkg/db/unsettledUsage_test.go create mode 100644 pkg/migrations/00007_unsettled-usage.down.sql create mode 100644 pkg/migrations/00007_unsettled-usage.up.sql create mode 100644 pkg/utils/time.go diff --git a/pkg/db/queries.sql b/pkg/db/queries.sql index 030aacd5..f7880640 100644 --- a/pkg/db/queries.sql +++ b/pkg/db/queries.sql @@ -180,3 +180,22 @@ ON CONFLICT (address) RETURNING id; +-- name: IncrementUnsettledUsage :exec +INSERT INTO unsettled_usage(payer_id, originator_id, minutes_since_epoch, spend_picodollars) + VALUES (@payer_id, @originator_id, @minutes_since_epoch, @spend_picodollars) +ON CONFLICT (payer_id, originator_id, minutes_since_epoch) + DO UPDATE SET + spend_picodollars = unsettled_usage.spend_picodollars + @spend_picodollars; + +-- name: GetPayerUnsettledUsage :one +SELECT + SUM(spend_picodollars) AS total_spend_picodollars +FROM + unsettled_usage +WHERE + payer_id = @payer_id + AND (@minutes_since_epoch_gt::BIGINT = 0 + OR minutes_since_epoch > @minutes_since_epoch_gt::BIGINT) + AND (@minutes_since_epoch_lt::BIGINT = 0 + OR minutes_since_epoch < @minutes_since_epoch_lt::BIGINT); + diff --git a/pkg/db/queries/models.go b/pkg/db/queries/models.go index bfb2c9fc..22ff23e3 100644 --- a/pkg/db/queries/models.go +++ b/pkg/db/queries/models.go @@ -56,3 +56,10 @@ type StagedOriginatorEnvelope struct { Topic []byte PayerEnvelope []byte } + +type UnsettledUsage struct { + PayerID int32 + OriginatorID int32 + MinutesSinceEpoch int32 + SpendPicodollars int64 +} diff --git a/pkg/db/queries/queries.sql.go b/pkg/db/queries/queries.sql.go index 6554d65d..de1a44a5 100644 --- a/pkg/db/queries/queries.sql.go +++ b/pkg/db/queries/queries.sql.go @@ -220,6 +220,57 @@ func (q *Queries) GetLatestSequenceId(ctx context.Context, originatorNodeID int3 return originator_sequence_id, err } +const getPayerUnsettledUsage = `-- name: GetPayerUnsettledUsage :one +SELECT + SUM(spend_picodollars) AS total_spend_picodollars +FROM + unsettled_usage +WHERE + payer_id = $1 + AND ($2::BIGINT = 0 + OR minutes_since_epoch > $2::BIGINT) + AND ($3::BIGINT = 0 + OR minutes_since_epoch < $3::BIGINT) +` + +type GetPayerUnsettledUsageParams struct { + PayerID int32 + MinutesSinceEpochGt int64 + MinutesSinceEpochLt int64 +} + +func (q *Queries) GetPayerUnsettledUsage(ctx context.Context, arg GetPayerUnsettledUsageParams) (int64, error) { + row := q.db.QueryRowContext(ctx, getPayerUnsettledUsage, arg.PayerID, arg.MinutesSinceEpochGt, arg.MinutesSinceEpochLt) + var total_spend_picodollars int64 + err := row.Scan(&total_spend_picodollars) + return total_spend_picodollars, err +} + +const incrementUnsettledUsage = `-- name: IncrementUnsettledUsage :exec +INSERT INTO unsettled_usage(payer_id, originator_id, minutes_since_epoch, spend_picodollars) + VALUES ($1, $2, $3, $4) +ON CONFLICT (payer_id, originator_id, minutes_since_epoch) + DO UPDATE SET + spend_picodollars = unsettled_usage.spend_picodollars + $4 +` + +type IncrementUnsettledUsageParams struct { + PayerID int32 + OriginatorID int32 + MinutesSinceEpoch int32 + SpendPicodollars int64 +} + +func (q *Queries) IncrementUnsettledUsage(ctx context.Context, arg IncrementUnsettledUsageParams) error { + _, err := q.db.ExecContext(ctx, incrementUnsettledUsage, + arg.PayerID, + arg.OriginatorID, + arg.MinutesSinceEpoch, + arg.SpendPicodollars, + ) + return err +} + const insertAddressLog = `-- name: InsertAddressLog :execrows INSERT INTO address_log(address, inbox_id, association_sequence_id, revocation_sequence_id) VALUES ($1, decode($2, 'hex'), $3, NULL) diff --git a/pkg/db/tx.go b/pkg/db/tx.go index 830b6784..12029d83 100644 --- a/pkg/db/tx.go +++ b/pkg/db/tx.go @@ -34,3 +34,39 @@ func RunInTx( done = true return tx.Commit() } + +func RunInTxWithResult[T any]( + ctx context.Context, + db *sql.DB, + opts *sql.TxOptions, + fn func(ctx context.Context, txQueries *queries.Queries) (T, error), +) (T, error) { + querier := queries.New(db) + tx, err := db.BeginTx(ctx, opts) + if err != nil { + var zero T + return zero, err + } + + var done bool + + defer func() { + if !done { + _ = tx.Rollback() + } + }() + + result, err := fn(ctx, querier.WithTx(tx)) + if err != nil { + var zero T + return zero, err + } + + done = true + if err := tx.Commit(); err != nil { + var zero T + return zero, err + } + + return result, nil +} diff --git a/pkg/db/unsettledUsage_test.go b/pkg/db/unsettledUsage_test.go new file mode 100644 index 00000000..7a29f636 --- /dev/null +++ b/pkg/db/unsettledUsage_test.go @@ -0,0 +1,100 @@ +package db + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/xmtp/xmtpd/pkg/db/queries" + "github.com/xmtp/xmtpd/pkg/testutils" + "github.com/xmtp/xmtpd/pkg/utils" +) + +func TestIncrementUnsettledUsage(t *testing.T) { + ctx := context.Background() + db, _, cleanup := testutils.NewDB(t, ctx) + defer cleanup() + + querier := queries.New(db) + payerId := testutils.RandomInt32() + originatorId := testutils.RandomInt32() + minutesSinceEpoch := utils.MinutesSinceEpochNow() + + require.NoError(t, querier.IncrementUnsettledUsage(ctx, queries.IncrementUnsettledUsageParams{ + PayerID: payerId, + OriginatorID: originatorId, + MinutesSinceEpoch: minutesSinceEpoch, + SpendPicodollars: 100, + })) + + unsettledUsage, err := querier.GetPayerUnsettledUsage( + ctx, + queries.GetPayerUnsettledUsageParams{ + PayerID: payerId, + }, + ) + require.NoError(t, err) + require.Equal(t, unsettledUsage, int64(100)) + + require.NoError(t, querier.IncrementUnsettledUsage(ctx, queries.IncrementUnsettledUsageParams{ + PayerID: payerId, + OriginatorID: originatorId, + MinutesSinceEpoch: minutesSinceEpoch, + SpendPicodollars: 100, + })) + + unsettledUsage, err = querier.GetPayerUnsettledUsage( + ctx, + queries.GetPayerUnsettledUsageParams{ + PayerID: payerId, + }, + ) + require.NoError(t, err) + require.Equal(t, unsettledUsage, int64(200)) +} + +func TestGetUnsettledUsage(t *testing.T) { + ctx := context.Background() + db, _, cleanup := testutils.NewDB(t, ctx) + defer cleanup() + + querier := queries.New(db) + payerId := testutils.RandomInt32() + originatorId := testutils.RandomInt32() + + addUsage := func(minutesSinceEpoch int32, spendPicodollars int64) { + require.NoError( + t, + querier.IncrementUnsettledUsage(ctx, queries.IncrementUnsettledUsageParams{ + PayerID: payerId, + OriginatorID: originatorId, + MinutesSinceEpoch: minutesSinceEpoch, + SpendPicodollars: spendPicodollars, + }), + ) + } + + addUsage(1, 100) + addUsage(2, 200) + addUsage(3, 300) + + unsettledUsage, err := querier.GetPayerUnsettledUsage( + ctx, + queries.GetPayerUnsettledUsageParams{ + PayerID: payerId, + MinutesSinceEpochGt: 2, + }, + ) + require.NoError(t, err) + require.Equal(t, unsettledUsage, int64(300)) + + unsettledUsage, err = querier.GetPayerUnsettledUsage( + ctx, + queries.GetPayerUnsettledUsageParams{ + PayerID: payerId, + MinutesSinceEpochGt: 1, + }, + ) + require.NoError(t, err) + require.Equal(t, unsettledUsage, int64(500)) +} diff --git a/pkg/migrations/00007_unsettled-usage.down.sql b/pkg/migrations/00007_unsettled-usage.down.sql new file mode 100644 index 00000000..d9815448 --- /dev/null +++ b/pkg/migrations/00007_unsettled-usage.down.sql @@ -0,0 +1,4 @@ +DROP TABLE IF EXISTS unsettled_usage; + +DROP INDEX IF EXISTS idx_unsettled_usage_payer_id; + diff --git a/pkg/migrations/00007_unsettled-usage.up.sql b/pkg/migrations/00007_unsettled-usage.up.sql new file mode 100644 index 00000000..b30c682f --- /dev/null +++ b/pkg/migrations/00007_unsettled-usage.up.sql @@ -0,0 +1,9 @@ +CREATE TABLE unsettled_usage( + payer_id INTEGER NOT NULL, + originator_id INTEGER NOT NULL, + minutes_since_epoch INTEGER NOT NULL, + spend_picodollars BIGINT NOT NULL, + PRIMARY KEY (payer_id, originator_id, minutes_since_epoch) +); + +CREATE INDEX idx_unsettled_usage_payer_id ON unsettled_usage(payer_id); \ No newline at end of file diff --git a/pkg/testutils/random.go b/pkg/testutils/random.go index e12dbbb8..cd48746c 100644 --- a/pkg/testutils/random.go +++ b/pkg/testutils/random.go @@ -67,3 +67,7 @@ func RandomBlockHash() common.Hash { bytes := RandomBytes(32) return common.BytesToHash(bytes) } + +func RandomInt32() int32 { + return rand.Int31() +} diff --git a/pkg/utils/time.go b/pkg/utils/time.go new file mode 100644 index 00000000..688c7221 --- /dev/null +++ b/pkg/utils/time.go @@ -0,0 +1,13 @@ +package utils + +import "time" + +func MinutesSinceEpoch(timestamp time.Time) int32 { + durationSinceEpoch := timestamp.Sub(time.Unix(0, 0)) + + return int32(durationSinceEpoch.Minutes()) +} + +func MinutesSinceEpochNow() int32 { + return MinutesSinceEpoch(time.Now()) +}