@@ -5,7 +5,7 @@ use eyre::Error;
55use init4_bin_base:: deps:: tracing:: { Instrument , debug, debug_span, trace} ;
66use reqwest:: { Client , Url } ;
77use serde:: { Deserialize , Serialize } ;
8- use serde_json :: from_slice ;
8+ use std :: time :: Duration ;
99use tokio:: { sync:: mpsc, task:: JoinHandle , time} ;
1010
1111/// Models a response from the transaction pool.
@@ -40,12 +40,22 @@ impl TxPoller {
4040 Self { config : config. clone ( ) , client : Client :: new ( ) , poll_interval_ms }
4141 }
4242
43+ /// Returns the poll duration as a [`Duration`].
44+ const fn poll_duration ( & self ) -> Duration {
45+ Duration :: from_millis ( self . poll_interval_ms )
46+ }
47+
4348 /// Polls the transaction cache for transactions.
4449 pub async fn check_tx_cache ( & mut self ) -> Result < Vec < TxEnvelope > , Error > {
4550 let url: Url = Url :: parse ( & self . config . tx_pool_url ) ?. join ( "transactions" ) ?;
46- let result = self . client . get ( url) . send ( ) . await ?;
47- let response: TxPoolResponse = from_slice ( result. text ( ) . await ?. as_bytes ( ) ) ?;
48- Ok ( response. transactions )
51+ self . client
52+ . get ( url)
53+ . send ( )
54+ . await ?
55+ . json ( )
56+ . await
57+ . map ( |resp : TxPoolResponse | resp. transactions )
58+ . map_err ( Into :: into)
4959 }
5060
5161 async fn task_future ( mut self , outbound : mpsc:: UnboundedSender < TxEnvelope > ) {
@@ -64,25 +74,23 @@ impl TxPoller {
6474 // exit the span after the check.
6575 drop ( _guard) ;
6676
67- match self . check_tx_cache ( ) . instrument ( span. clone ( ) ) . await {
68- Ok ( transactions) => {
69- let _guard = span. entered ( ) ;
70- debug ! ( count = ?transactions. len( ) , "found transactions" ) ;
71- for tx in transactions. into_iter ( ) {
72- if outbound. send ( tx) . is_err ( ) {
73- // If there are no receivers, we can shut down
74- trace ! ( "No receivers left, shutting down" ) ;
75- break ;
76- }
77+ if let Ok ( transactions) =
78+ self . check_tx_cache ( ) . instrument ( span. clone ( ) ) . await . inspect_err ( |err| {
79+ debug ! ( %err, "Error fetching transactions" ) ;
80+ } )
81+ {
82+ let _guard = span. entered ( ) ;
83+ debug ! ( count = ?transactions. len( ) , "found transactions" ) ;
84+ for tx in transactions. into_iter ( ) {
85+ if outbound. send ( tx) . is_err ( ) {
86+ // If there are no receivers, we can shut down
87+ trace ! ( "No receivers left, shutting down" ) ;
88+ break ;
7789 }
7890 }
79- // If fetching was an error, we log and continue. We expect
80- // these to be transient network issues.
81- Err ( e) => {
82- debug ! ( error = %e, "Error fetching transactions" ) ;
83- }
8491 }
85- time:: sleep ( time:: Duration :: from_millis ( self . poll_interval_ms ) ) . await ;
92+
93+ time:: sleep ( self . poll_duration ( ) ) . await ;
8694 }
8795 }
8896
0 commit comments