Skip to content

Commit

Permalink
Calculate fees independently in sync
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Feb 26, 2025
1 parent 47365e0 commit 23a6769
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func NewReplicationServer(
s.nodeRegistry,
s.registrant,
writerDB,
fees.NewFeeCalculator(getRatesFetcher()),
)
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion pkg/sync/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"

"github.com/xmtp/xmtpd/pkg/fees"
"github.com/xmtp/xmtpd/pkg/registrant"
"github.com/xmtp/xmtpd/pkg/registry"
"go.uber.org/zap"
Expand All @@ -23,8 +24,9 @@ func NewSyncServer(
nodeRegistry registry.NodeRegistry,
registrant *registrant.Registrant,
store *sql.DB,
feeCalculator fees.IFeeCalculator,
) (*SyncServer, error) {
worker, err := startSyncWorker(ctx, log, nodeRegistry, registrant, store)
worker, err := startSyncWorker(ctx, log, nodeRegistry, registrant, store, feeCalculator)
if err != nil {
return nil, err
}
Expand Down
62 changes: 54 additions & 8 deletions pkg/sync/syncWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import (
"sync"
"time"

"github.com/xmtp/xmtpd/pkg/constants"
"github.com/xmtp/xmtpd/pkg/currency"
"github.com/xmtp/xmtpd/pkg/db"
"github.com/xmtp/xmtpd/pkg/db/queries"
envUtils "github.com/xmtp/xmtpd/pkg/envelopes"
"github.com/xmtp/xmtpd/pkg/fees"
clientInterceptors "github.com/xmtp/xmtpd/pkg/interceptors/client"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/envelopes"
"github.com/xmtp/xmtpd/pkg/proto/xmtpv4/message_api"
Expand All @@ -32,6 +35,7 @@ type syncWorker struct {
subscriptions map[uint32]struct{}
subscriptionsMutex sync.RWMutex
cancel context.CancelFunc
feeCalculator fees.IFeeCalculator
}

type originatorStream struct {
Expand All @@ -54,6 +58,7 @@ func startSyncWorker(
nodeRegistry registry.NodeRegistry,
registrant *registrant.Registrant,
store *sql.DB,
feeCalculator fees.IFeeCalculator,
) (*syncWorker, error) {

ctx, cancel := context.WithCancel(ctx)
Expand All @@ -64,6 +69,7 @@ func startSyncWorker(
nodeRegistry: nodeRegistry,
registrant: registrant,
store: store,
feeCalculator: feeCalculator,
wg: sync.WaitGroup{},
subscriptions: make(map[uint32]struct{}),
cancel: cancel,
Expand Down Expand Up @@ -403,6 +409,7 @@ func (s *syncWorker) validateAndInsertEnvelope(
return
}

// TODO:(nm) Handle fetching envelopes from other nodes
if env.OriginatorNodeID() != stream.nodeID {
s.log.Error("Received envelope from wrong node", zap.Any("nodeID", env.OriginatorNodeID()))
return
Expand All @@ -423,12 +430,34 @@ func (s *syncWorker) validateAndInsertEnvelope(
stream.lastEnvelope = env
}

// Calculate the fees independently to verify the originator's calculation
ourFeeCalculation, err := s.calculateFees(env)
if err != nil {
s.log.Error("Failed to calculate fees", zap.Error(err))
return
}
originatorsFeeCalculation := currency.PicoDollar(
env.UnsignedOriginatorEnvelope.BaseFee(),
) + currency.PicoDollar(
env.UnsignedOriginatorEnvelope.CongestionFee(),
)
if ourFeeCalculation != originatorsFeeCalculation {
s.log.Error(
"Fee calculation mismatch",
zap.Any("ourFee", ourFeeCalculation),
zap.Any("originatorsFee", originatorsFeeCalculation),
)
}

// TODO Validation logic - share code with API service and publish worker
// Signatures, topic type, etc
s.insertEnvelope(env)
s.insertEnvelope(env, ourFeeCalculation)
}

func (s *syncWorker) insertEnvelope(env *envUtils.OriginatorEnvelope) {
func (s *syncWorker) insertEnvelope(
env *envUtils.OriginatorEnvelope,
spendPicodollars currency.PicoDollar,
) {
s.log.Debug("Replication server received envelope", zap.Any("envelope", env))
originatorBytes, err := env.Bytes()
if err != nil {
Expand Down Expand Up @@ -459,12 +488,7 @@ func (s *syncWorker) insertEnvelope(env *envUtils.OriginatorEnvelope) {
PayerID: payerId,
OriginatorID: originatorID,
MinutesSinceEpoch: utils.MinutesSinceEpoch(originatorTime),
// TODO:(nm) Independently calculate fees
SpendPicodollars: int64(
env.UnsignedOriginatorEnvelope.BaseFee(),
) + int64(
env.UnsignedOriginatorEnvelope.CongestionFee(),
),
SpendPicodollars: int64(spendPicodollars),
},
)
if err != nil {
Expand All @@ -477,6 +501,28 @@ func (s *syncWorker) insertEnvelope(env *envUtils.OriginatorEnvelope) {
}
}

func (s *syncWorker) calculateFees(env *envUtils.OriginatorEnvelope) (currency.PicoDollar, error) {
payerEnvelopeLength := len(env.UnsignedOriginatorEnvelope.PayerEnvelopeBytes())
messageTime := utils.NsToDate(env.OriginatorNs())

baseFee, err := s.feeCalculator.CalculateBaseFee(
messageTime,
int64(payerEnvelopeLength),
constants.DEFAULT_STORAGE_DURATION_DAYS,
)
if err != nil {
return 0, err
}

// TODO:(nm) Calculate real rate of congestion
congestionFee, err := s.feeCalculator.CalculateCongestionFee(messageTime, 0)
if err != nil {
return 0, err
}

return baseFee + congestionFee, nil
}

func (s *syncWorker) getPayerID(env *envUtils.OriginatorEnvelope) (int32, error) {
payerAddress, err := env.UnsignedOriginatorEnvelope.PayerEnvelope.RecoverSigner()
if err != nil {
Expand Down

0 comments on commit 23a6769

Please sign in to comment.