Skip to content

Commit

Permalink
Add fees to UnsignedOriginatorEnvelope
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Feb 25, 2025
1 parent 1ec1c15 commit 9a2fafe
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 33 deletions.
2 changes: 1 addition & 1 deletion dev/gen-protos
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/usr/bin/env bash

rm -rf pkg/proto/**/*.pb.go pkg/proto/**/*.pb.gw.go pkg/proto/**/*.swagger.json
if ! go tool -modfile=tools/go.mod buf generate https://github.com/xmtp/proto.git#subdir=proto,branch=main; then
if ! go tool -modfile=tools/go.mod buf generate https://github.com/xmtp/proto.git#subdir=proto,branch=nm/add-fees; then
echo "Failed to generate protobuf definitions"
exit 1
fi
55 changes: 47 additions & 8 deletions pkg/api/message/publishWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import (
"sync/atomic"
"time"

"github.com/xmtp/xmtpd/pkg/constants"
"github.com/xmtp/xmtpd/pkg/currency"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/envelopes"
"github.com/xmtp/xmtpd/pkg/fees"
"github.com/xmtp/xmtpd/pkg/registrant"
"go.uber.org/zap"
)
Expand All @@ -22,13 +25,15 @@ type publishWorker struct {
store *sql.DB
subscription db.DBSubscription[queries.StagedOriginatorEnvelope, int64]
lastProcessed atomic.Int64
feeCalculator fees.IFeeCalculator
}

func startPublishWorker(
ctx context.Context,
log *zap.Logger,
reg *registrant.Registrant,
store *sql.DB,
feeCalculator fees.IFeeCalculator,
) (*publishWorker, error) {
log = log.Named("publishWorker")
q := queries.New(store)
Expand Down Expand Up @@ -62,13 +67,14 @@ func startPublishWorker(
}

worker := &publishWorker{
ctx: ctx,
log: log,
notifier: notifier,
subscription: *subscription,
listener: listener,
registrant: reg,
store: store,
ctx: ctx,
log: log,
notifier: notifier,
subscription: *subscription,
listener: listener,
registrant: reg,
store: store,
feeCalculator: feeCalculator,
}
go worker.start()

Expand Down Expand Up @@ -102,7 +108,13 @@ func (p *publishWorker) start() {

func (p *publishWorker) publishStagedEnvelope(stagedEnv queries.StagedOriginatorEnvelope) bool {
logger := p.log.With(zap.Int64("sequenceID", stagedEnv.ID))
originatorEnv, err := p.registrant.SignStagedEnvelope(stagedEnv)
baseFee, congestionFee, err := p.calculateFees(&stagedEnv)
if err != nil {
logger.Error("Failed to calculate fees", zap.Error(err))
return false
}

originatorEnv, err := p.registrant.SignStagedEnvelope(stagedEnv, baseFee, congestionFee)
if err != nil {
logger.Error(
"Failed to sign staged envelope",
Expand Down Expand Up @@ -144,6 +156,7 @@ func (p *publishWorker) publishStagedEnvelope(stagedEnv queries.StagedOriginator
Topic: stagedEnv.Topic,
OriginatorEnvelope: originatorBytes,
PayerID: db.NullInt32(payerId),
GatewayTime: stagedEnv.OriginatorTime,
},
)
if p.ctx.Err() != nil {
Expand Down Expand Up @@ -171,3 +184,29 @@ func (p *publishWorker) publishStagedEnvelope(stagedEnv queries.StagedOriginator

return true
}

func (p *publishWorker) calculateFees(
stagedEnv *queries.StagedOriginatorEnvelope,
) (currency.PicoDollar, currency.PicoDollar, error) {
baseFee, err := p.feeCalculator.CalculateBaseFee(
stagedEnv.OriginatorTime,
int64(len(stagedEnv.PayerEnvelope)),
constants.DEFAULT_STORAGE_DURATION_DAYS,
)
if err != nil {
return 0, 0, err
}

// TODO:nm: Set this to the actual congestion fee
// For now we are setting congestion to 0
congestionFee, err := p.feeCalculator.CalculateCongestionFee(
stagedEnv.OriginatorTime,
0,
)

if err != nil {
return 0, 0, err
}

return baseFee, congestionFee, nil
}
47 changes: 47 additions & 0 deletions pkg/api/message/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtpd/pkg/currency"
"github.com/xmtp/xmtpd/pkg/db/queries"
envelopeUtils "github.com/xmtp/xmtpd/pkg/envelopes"
"github.com/xmtp/xmtpd/pkg/mlsvalidate"
apiv1 "github.com/xmtp/xmtpd/pkg/proto/mls/api/v1"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
Expand Down Expand Up @@ -294,3 +296,48 @@ func TestPublishEnvelopeOriginatorUnknown(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "DependsOn has not been seen by this node")
}

func TestPublishEnvelopeFees(t *testing.T) {
api, db, _, cleanup := apiTestUtils.NewTestReplicationAPIClient(t)
defer cleanup()

payerEnvelope := envelopeTestUtils.CreatePayerEnvelope(
t,
envelopeTestUtils.DefaultClientEnvelopeNodeId,
)

resp, err := api.PublishPayerEnvelopes(
context.Background(),
&message_api.PublishPayerEnvelopesRequest{
PayerEnvelopes: []*envelopes.PayerEnvelope{payerEnvelope},
},
)
require.NoError(t, err)
require.NotNil(t, resp)

returnedEnv, err := envelopeUtils.NewOriginatorEnvelope(resp.GetOriginatorEnvelopes()[0])
require.NoError(t, err)
// BaseFee will always be > 0
require.Greater(t, returnedEnv.UnsignedOriginatorEnvelope.BaseFee(), currency.PicoDollar(0))
// CongestionFee will be 0 for now.
// TODO:nm: Set this to the actual congestion fee
require.Equal(t, returnedEnv.UnsignedOriginatorEnvelope.CongestionFee(), currency.PicoDollar(0))

envs, err := queries.New(db).
SelectGatewayEnvelopes(context.Background(), queries.SelectGatewayEnvelopesParams{})
require.NoError(t, err)
require.Equal(t, len(envs), 1)

originatorEnv, err := envelopeUtils.NewOriginatorEnvelopeFromBytes(envs[0].OriginatorEnvelope)
require.NoError(t, err)
require.Equal(
t,
originatorEnv.UnsignedOriginatorEnvelope.BaseFee(),
returnedEnv.UnsignedOriginatorEnvelope.BaseFee(),
)
require.Equal(
t,
originatorEnv.UnsignedOriginatorEnvelope.CongestionFee(),
returnedEnv.UnsignedOriginatorEnvelope.CongestionFee(),
)
}
17 changes: 14 additions & 3 deletions pkg/api/message/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"database/sql"
"fmt"
"github.com/xmtp/xmtpd/pkg/api/metadata"
"time"

"github.com/xmtp/xmtpd/pkg/api/metadata"
"github.com/xmtp/xmtpd/pkg/fees"

"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
"github.com/xmtp/xmtpd/pkg/envelopes"
Expand Down Expand Up @@ -42,6 +44,7 @@ type Service struct {
subscribeWorker *subscribeWorker
validationService mlsvalidate.MLSValidationService
cu metadata.CursorUpdater
feeCalculator fees.IFeeCalculator
}

func NewReplicationApiService(
Expand All @@ -51,8 +54,10 @@ func NewReplicationApiService(
store *sql.DB,
validationService mlsvalidate.MLSValidationService,
updater metadata.CursorUpdater,
rateFetcher fees.IRatesFetcher,
) (*Service, error) {
publishWorker, err := startPublishWorker(ctx, log, registrant, store)
feeCalculator := fees.NewFeeCalculator(rateFetcher)
publishWorker, err := startPublishWorker(ctx, log, registrant, store, feeCalculator)
if err != nil {
return nil, err
}
Expand All @@ -70,6 +75,7 @@ func NewReplicationApiService(
subscribeWorker: subscribeWorker,
validationService: validationService,
cu: updater,
feeCalculator: feeCalculator,
}, nil
}

Expand Down Expand Up @@ -349,7 +355,12 @@ func (s *Service) PublishPayerEnvelopes(
}
s.publishWorker.notifyStagedPublish()

originatorEnv, err := s.registrant.SignStagedEnvelope(stagedEnv)
baseFee, congestionFee, err := s.publishWorker.calculateFees(&stagedEnv)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not calculate fees: %v", err)
}

originatorEnv, err := s.registrant.SignStagedEnvelope(stagedEnv, baseFee, congestionFee)
if err != nil {
return nil, status.Errorf(codes.Internal, "could not sign envelope: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const (
ORIGINATOR_DOMAIN_SEPARATION_LABEL = "originator|"
NODE_AUTHORIZATION_HEADER_NAME = "node-authorization"
MAX_BLOCKCHAIN_ORIGINATOR_ID = 100
DEFAULT_STORAGE_DURATION_DAYS = 60
)

type VerifiedNodeRequestCtxKey struct{}
4 changes: 2 additions & 2 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ WHERE
singleton_id = 1;

-- name: InsertGatewayEnvelope :execrows
INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope, payer_id)
VALUES (@originator_node_id, @originator_sequence_id, @topic, @originator_envelope, @payer_id)
INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope, payer_id, gateway_time)
VALUES (@originator_node_id, @originator_sequence_id, @topic, @originator_envelope, @payer_id, COALESCE(@gateway_time, NOW()))
ON CONFLICT
DO NOTHING;

Expand Down
6 changes: 4 additions & 2 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions pkg/envelopes/unsignedOriginator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package envelopes
import (
"errors"

"github.com/xmtp/xmtpd/pkg/currency"
envelopesProto "github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/topic"
"github.com/xmtp/xmtpd/pkg/utils"
Expand Down Expand Up @@ -45,6 +46,16 @@ func (u *UnsignedOriginatorEnvelope) OriginatorNs() int64 {
return u.proto.OriginatorNs
}

func (u *UnsignedOriginatorEnvelope) BaseFee() currency.PicoDollar {
// Skip nil check because it is in the constructor
return currency.PicoDollar(u.proto.BaseFeePicodollars)
}

func (u *UnsignedOriginatorEnvelope) CongestionFee() currency.PicoDollar {
// Skip nil check because it is in the constructor
return currency.PicoDollar(u.proto.CongestionFeePicodollars)
}

func NewUnsignedOriginatorEnvelopeFromBytes(bytes []byte) (*UnsignedOriginatorEnvelope, error) {
message, err := utils.UnmarshalUnsignedEnvelope(bytes)
if err != nil {
Expand Down
41 changes: 32 additions & 9 deletions pkg/proto/xmtpv4/envelopes/envelopes.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9a2fafe

Please sign in to comment.