Skip to content

Commit

Permalink
order transactions announcements in the order they are received
Browse files Browse the repository at this point in the history
  • Loading branch information
charithabandi committed Mar 3, 2025
1 parent e5f20eb commit 8f7880e
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 54 deletions.
2 changes: 1 addition & 1 deletion node/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *types.Tx, sync u
if ce.txAnnouncer != nil {
// We can't use parent context 'cause it's canceled in the caller, which
// could be the RPC request. handler. This shouldn't be CE's problem...
go ce.txAnnouncer(context.Background(), tx.Transaction)
go ce.txAnnouncer(context.Background(), tx.Transaction, txHash)
}

// If sync is set to 1, wait for the transaction to be committed in a block.
Expand Down
2 changes: 1 addition & 1 deletion node/consensus/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ type ProposalBroadcaster func(ctx context.Context, blk *ktypes.Block)
type BlkAnnouncer func(ctx context.Context, blk *ktypes.Block, ci *types.CommitInfo)

// TxAnnouncer broadcasts the new transaction to the network
type TxAnnouncer func(ctx context.Context, tx *ktypes.Transaction)
type TxAnnouncer func(ctx context.Context, tx *ktypes.Transaction, txID types.Hash)

// AckBroadcaster gossips the ack/nack messages to the network
// type AckBroadcaster func(ack bool, height int64, blkID types.Hash, appHash *types.Hash, Signature []byte) error
Expand Down
2 changes: 1 addition & 1 deletion node/consensus/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func mockVoteBroadcaster(msg *types.AckRes) error {

func mockBlkAnnouncer(_ context.Context, blk *ktypes.Block, ci *types.CommitInfo) {}

func mockTxAnnouncer(ctx context.Context, tx *ktypes.Transaction) {}
func mockTxAnnouncer(ctx context.Context, tx *ktypes.Transaction, txID types.Hash) {}

func mockResetStateBroadcaster(_ int64, _ []ktypes.Hash) error {
return nil
Expand Down
27 changes: 18 additions & 9 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,20 @@ type Node struct {

blkPropHandling chan struct{}

txQueue chan orderedTxn // enforces ordering in the tx broadcasts to the network.

wg sync.WaitGroup
log log.Logger
}

type orderedTxn struct {
txID types.Hash
rawtx []byte
from peer.ID
}

const txQueueSize = 1000 // rare to have a really long queue of unbroadcasted txns

// NewNode creates a new node. The config struct is for required configuration,
// and the functional options for optional settings, like dependency overrides.
func NewNode(cfg *Config, opts ...Option) (*Node, error) {
Expand All @@ -174,11 +184,9 @@ func NewNode(cfg *Config, opts ...Option) (*Node, error) {
ss: cfg.Snapshotter,
bp: cfg.BlockProc,

ackChan: make(chan AckRes, 1),
resetMsg: make(chan ConsensusReset, 1),
// discReq: make(chan types.DiscoveryRequest, 1),
// discResp: make(chan types.DiscoveryResponse, 1),

ackChan: make(chan AckRes, 1),
resetMsg: make(chan ConsensusReset, 1),
txQueue: make(chan orderedTxn, txQueueSize),
blkPropHandling: make(chan struct{}, 1),

P2PService: *cfg.P2PService,
Expand Down Expand Up @@ -315,6 +323,8 @@ func (n *Node) Start(ctx context.Context) error {
return err
}

n.startOrderedTxQueueAnns(ctx)

/*
if err := n.startDiscoveryRequestGossip(ctx, ps); err != nil {
cancel()
Expand Down Expand Up @@ -343,15 +353,14 @@ func (n *Node) Start(ctx context.Context) error {
ProposalBroadcaster: func(ctx context.Context, blk *ktypes.Block) {
n.announceBlkProp(ctx, blk, n.host.ID())
},
TxAnnouncer: func(ctx context.Context, tx *ktypes.Transaction) {
n.announceTx(ctx, tx, n.host.ID())
TxAnnouncer: func(ctx context.Context, tx *ktypes.Transaction, txID types.Hash) {
n.announceTx(ctx, tx, txID, n.host.ID())
},
BlkAnnouncer: n.announceBlk,
AckBroadcaster: n.sendACK,
BlkRequester: n.getBlkHeight,
RstStateBroadcaster: n.sendReset,
// DiscoveryReqBroadcaster: n.sendDiscoveryRequest,
TxBroadcaster: n.BroadcastTx,
TxBroadcaster: n.BroadcastTx,
}

whitelistFns := consensus.WhitelistFns{
Expand Down
80 changes: 38 additions & 42 deletions node/nogossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,17 @@ func (n *Node) txAnnStreamHandler(s network.Stream) {
}

// re-announce
go n.announceRawTx(context.Background(), txHash, rawTx, s.Conn().RemotePeer())
n.queueTxn(txHash, rawTx, s.Conn().RemotePeer())
}

func (n *Node) queueTxn(txID types.Hash, rawTx []byte, from peer.ID) {
tx := orderedTxn{txID: txID, rawtx: rawTx, from: from}

select {
case n.txQueue <- tx:
default:
n.log.Warnf("tx queue full, dropping tx %v", txID)
}
}

func (n *Node) announceRawTx(ctx context.Context, txHash types.Hash, rawTx []byte, from peer.ID) {
Expand All @@ -103,11 +113,8 @@ func (n *Node) announceRawTx(ctx context.Context, txHash types.Hash, rawTx []byt
}
}

func (n *Node) announceTx(ctx context.Context, tx *ktypes.Transaction, from peer.ID) {
rawTx := tx.Bytes()
txHash := tx.Hash()

n.announceRawTx(ctx, txHash, rawTx, from)
func (n *Node) announceTx(_ context.Context, _ *ktypes.Transaction, txID types.Hash, from peer.ID) {
n.queueTxn(txID, nil, from)
}

// advertiseTxToPeer sends a lightweight advertisement to a connected peer.
Expand Down Expand Up @@ -161,30 +168,6 @@ func (n *Node) advertiseTxToPeer(ctx context.Context, peerID peer.ID, txHash typ
return nil
}

/*func randomTx(size int, signer auth.Signer) ([]byte, error) {
payload := &ktypes.KVPayload{
Key: randBytes(32),
Value: randBytes(size),
}
tx, err := ktypes.CreateTransaction(payload, "test-chain", 1)
if err != nil {
return nil, err
}
if err := tx.Sign(signer); err != nil {
return nil, err
}
return tx.MarshalBinary()
}
func randBytes(n int) []byte {
b := make([]byte, n)
rand.Read(b)
return b
}*/

// startTxAnns handles periodic reannouncement. It can also be modified to
// regularly create dummy transactions.
func (n *Node) startTxAnns(ctx context.Context, reannouncePeriod time.Duration) {
Expand Down Expand Up @@ -229,17 +212,30 @@ func (n *Node) startTxAnns(ctx context.Context, reannouncePeriod time.Duration)
}()
}

/*func secp256k1Signer() *auth.EthPersonalSigner {
privKey, _, err := crypto.GenerateSecp256k1Key(nil)
if err != nil {
return nil
}
// startOrderedTxQueueAnns ensures that transaction announcements are broadcasted
// in the order they are received, maintaining FIFO order for nonce consistency.
func (n *Node) startOrderedTxQueueAnns(ctx context.Context) {
n.wg.Add(1)
go func() {
defer n.wg.Done()

privKeyBytes := privKey.Bytes()
k, err := crypto.UnmarshalSecp256k1PrivateKey(privKeyBytes)
if err != nil {
return nil
}
for {
select {
case <-ctx.Done():
return
case txn := <-n.txQueue:
rawTx := txn.rawtx
if txn.rawtx == nil {
// fetch the raw tx from the mempool
tx := n.mp.Get(txn.txID)
if tx == nil {
continue // tx was removed from mempool
}
rawTx = tx.Bytes()
}

return &auth.EthPersonalSigner{Key: *k}
}*/
n.announceRawTx(context.Background(), txn.txID, rawTx, txn.from)
}
}
}()
}

0 comments on commit 8f7880e

Please sign in to comment.