diff --git a/eth/backend.go b/eth/backend.go index ff5926a187..380498c526 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -21,12 +21,13 @@ import ( "context" "errors" "fmt" - "github.com/ethereum/go-ethereum/core/txpool/bundlepool" "math/big" "runtime" "sync" "time" + "github.com/ethereum/go-ethereum/core/txpool/bundlepool" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -324,6 +325,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { // Permit the downloader to use the trie cache allowance during fast sync cacheLimit := cacheConfig.TrieCleanLimit + cacheConfig.TrieDirtyLimit + cacheConfig.SnapshotLimit if eth.handler, err = newHandler(&handlerConfig{ + DirectNodes: stack.Config().P2P.DirectNodes, Database: chainDb, Chain: eth.blockchain, TxPool: eth.txPool, diff --git a/eth/handler.go b/eth/handler.go index db8f0ed5cd..bcb56d5b1e 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/triedb/pathdb" ) @@ -88,6 +89,7 @@ type txPool interface { // handlerConfig is the collection of initialization parameters to create a full // node network handler. type handlerConfig struct { + DirectNodes []*enode.Node Database ethdb.Database // Database for direct sync insertions Chain *core.BlockChain // Blockchain to serve data from TxPool txPool // Transaction pool to propagate from @@ -137,6 +139,8 @@ type handler struct { handlerStartCh chan struct{} handlerDoneCh chan struct{} + + directNodes map[string]struct{} } // newHandler returns a handler for all Ethereum chain management protocol. @@ -159,7 +163,12 @@ func newHandler(config *handlerConfig) (*handler, error) { quitSync: make(chan struct{}), handlerDoneCh: make(chan struct{}), handlerStartCh: make(chan struct{}), + directNodes: make(map[string]struct{}), + } + for _, node := range config.DirectNodes { + h.directNodes[node.ID().String()] = struct{}{} } + if config.Sync == downloader.FullSync { // The database seems empty as the current block is the genesis. Yet the snap // block is ahead, so snap sync was enabled for this node at a certain point. @@ -620,6 +629,29 @@ func (h *handler) BroadcastBlock(block *types.Block, propagate bool) { } } +func (h *handler) peersForBroadcasting(numDirect int, peers []*ethPeer) (direct []*ethPeer, announce []*ethPeer) { + // Split the peers into direct-peers and announce-peers + // we send the tx directly to direct-peers + // we announce the tx to announce-peers + direct = make([]*ethPeer, 0, numDirect) + announce = make([]*ethPeer, 0, len(peers)-numDirect) + for _, peer := range peers { + if _, ok := h.directNodes[peer.ID()]; ok { + direct = append(direct, peer) + } else { + announce = append(announce, peer) + } + } + + // if directly-peers are not enough, move some announce-peers into directly pool + for len(direct) < numDirect && len(announce) > 0 { + // shift one peer to trusted + direct = append(direct, announce[0]) + announce = announce[1:] + } + return direct, announce +} + // BroadcastTransactions will propagate a batch of transactions // - To a square root of all peers for non-blob transactions // - And, separately, as announcements to all peers which are not known to @@ -642,6 +674,7 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { peers := h.peers.peersWithoutTransaction(tx.Hash()) var numDirect int + var direct, announce []*ethPeer = nil, peers switch { case tx.Type() == types.BlobTxType: blobTxs++ @@ -649,14 +682,16 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { largeTxs++ default: numDirect = int(math.Sqrt(float64(len(peers)))) + direct, announce = h.peersForBroadcasting(numDirect, peers) } + // Send the tx unconditionally to a subset of our peers - for _, peer := range peers[:numDirect] { + for _, peer := range direct { txset[peer] = append(txset[peer], tx.Hash()) log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash()) } // For the remaining peers, send announcement only - for _, peer := range peers[numDirect:] { + for _, peer := range announce { annos[peer] = append(annos[peer], tx.Hash()) log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash()) } diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 02fb0fdb7d..c3cecd0518 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -2000,9 +2000,9 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c if tx.To() == nil { addr := crypto.CreateAddress(from, tx.Nonce()) - log.Info("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value()) + log.Debug("Submitted contract creation", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "contract", addr.Hex(), "value", tx.Value()) } else { - log.Info("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value()) + log.Debug("Submitted transaction", "hash", tx.Hash().Hex(), "from", from, "nonce", tx.Nonce(), "recipient", tx.To(), "value", tx.Value()) } return tx.Hash(), nil } diff --git a/p2p/server.go b/p2p/server.go index b398c51eb4..af036a2510 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -116,6 +116,9 @@ type Config struct { // maintained and re-connected on disconnects. StaticNodes []*enode.Node + // Direct nodes are static nodes who will always reveived transactions body rather than just the hashes. + DirectNodes []*enode.Node + // Trusted nodes are used as pre-configured connections which are always // allowed to connect, even above the peer limit. TrustedNodes []*enode.Node