|
| 1 | +//! Best bid websocket connector |
| 2 | +//! |
| 3 | +//! This module uses websocket connection to get best bid value from the relays. |
| 4 | +//! The best bid for the current block-slot is stored in the `BestBidCell` and can be read at any time. |
| 5 | +use std::ops::DerefMut; |
| 6 | +use std::sync::Arc; |
| 7 | +use std::time::Duration; |
| 8 | + |
| 9 | +use crate::reconnect::{run_async_loop_with_reconnect, RunCommand}; |
| 10 | +use alloy_primitives::utils::format_ether; |
| 11 | +use alloy_primitives::U256; |
| 12 | +use parking_lot::Mutex; |
| 13 | +use rbuilder::live_builder::block_output::bid_value_source::interfaces::{ |
| 14 | + BidValueObs, BidValueSource, CompetitionBid, |
| 15 | +}; |
| 16 | +use serde::Deserialize; |
| 17 | +use tokio::net::TcpStream; |
| 18 | +use tokio_stream::StreamExt; |
| 19 | +use tokio_tungstenite::tungstenite::client::IntoClientRequest; |
| 20 | +use tokio_tungstenite::tungstenite::handshake::client::Request; |
| 21 | +use tokio_tungstenite::tungstenite::Error; |
| 22 | +use tokio_tungstenite::{connect_async_with_config, tungstenite::protocol::Message}; |
| 23 | +use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; |
| 24 | +use tokio_util::sync::CancellationToken; |
| 25 | +use tracing::{error, trace, warn}; |
| 26 | + |
| 27 | +use crate::metrics::inc_non_0_competition_bids; |
| 28 | + |
| 29 | +type Connection = WebSocketStream<MaybeTlsStream<TcpStream>>; |
| 30 | + |
| 31 | +const MAX_IO_ERRORS: usize = 5; |
| 32 | + |
| 33 | +// time that we wait for a new value before reconnecting |
| 34 | +const READ_TIMEOUT: Duration = Duration::from_secs(5); |
| 35 | + |
| 36 | +#[derive(Debug, Clone, Deserialize, Default)] |
| 37 | +#[serde(rename_all = "camelCase")] |
| 38 | +pub struct BestBidValue { |
| 39 | + pub block_number: u64, |
| 40 | + pub slot_number: u64, |
| 41 | + pub block_top_bid: U256, |
| 42 | +} |
| 43 | + |
| 44 | +#[derive(Debug)] |
| 45 | +pub struct Subscription { |
| 46 | + pub block_number: u64, |
| 47 | + pub slot_number: u64, |
| 48 | + pub obs: Arc<dyn BidValueObs>, |
| 49 | +} |
| 50 | + |
| 51 | +/// Struct that connects to a websocket feed with best bids from the competition. |
| 52 | +/// Allows to subscribe so listen for changes on a particular slot. |
| 53 | +/// Usage: |
| 54 | +/// - call sub = subscribe |
| 55 | +/// - monitor the value as long as needed: |
| 56 | +/// - await wait_for_change. This will wake when a change happens, no need for polling. |
| 57 | +/// - Ask top_bid |
| 58 | +/// - call unsubscribe(sub) |
| 59 | +#[derive(Debug)] |
| 60 | +pub struct BestBidWSConnector { |
| 61 | + connection_request: Request, |
| 62 | + subscriptions: Arc<Mutex<Vec<Subscription>>>, |
| 63 | +} |
| 64 | + |
| 65 | +impl BestBidWSConnector { |
| 66 | + pub fn new(url: &str, basic_auth: &str) -> eyre::Result<Self> { |
| 67 | + let mut connection_request = url.into_client_request()?; |
| 68 | + connection_request |
| 69 | + .headers_mut() |
| 70 | + .insert("Authorization", format!("Basic {basic_auth}").parse()?); |
| 71 | + |
| 72 | + Ok(Self { |
| 73 | + connection_request, |
| 74 | + subscriptions: Default::default(), |
| 75 | + }) |
| 76 | + } |
| 77 | + |
| 78 | + pub async fn run_ws_stream( |
| 79 | + &self, |
| 80 | + // We must try_send on every non 0 bid or the process will be killed |
| 81 | + watch_dog_sender: flume::Sender<()>, |
| 82 | + cancellation_token: CancellationToken, |
| 83 | + ) { |
| 84 | + run_async_loop_with_reconnect( |
| 85 | + "ws_top_bid_connection", |
| 86 | + || connect(self.connection_request.clone()), |
| 87 | + |conn| { |
| 88 | + run_command( |
| 89 | + conn, |
| 90 | + cancellation_token.clone(), |
| 91 | + watch_dog_sender.clone(), |
| 92 | + self.subscriptions.clone(), |
| 93 | + ) |
| 94 | + }, |
| 95 | + None, |
| 96 | + cancellation_token.clone(), |
| 97 | + ) |
| 98 | + .await; |
| 99 | + } |
| 100 | +} |
| 101 | + |
| 102 | +async fn connect<R>(request: R) -> Result<Connection, Error> |
| 103 | +where |
| 104 | + R: IntoClientRequest + Unpin, |
| 105 | +{ |
| 106 | + connect_async_with_config( |
| 107 | + request, None, true, // TODO: naggle, decide |
| 108 | + ) |
| 109 | + .await |
| 110 | + .map(|(c, _)| c) |
| 111 | +} |
| 112 | + |
| 113 | +async fn run_command( |
| 114 | + mut conn: Connection, |
| 115 | + cancellation_token: CancellationToken, |
| 116 | + watch_dog_sender: flume::Sender<()>, |
| 117 | + subscriptions: Arc<Mutex<Vec<Subscription>>>, |
| 118 | +) -> RunCommand { |
| 119 | + let mut io_error_count = 0; |
| 120 | + loop { |
| 121 | + if cancellation_token.is_cancelled() { |
| 122 | + break; |
| 123 | + } |
| 124 | + if io_error_count >= MAX_IO_ERRORS { |
| 125 | + warn!("Too many read errors, reconnecting"); |
| 126 | + return RunCommand::Reconnect; |
| 127 | + } |
| 128 | + |
| 129 | + let next_message = tokio::time::timeout(READ_TIMEOUT, conn.next()); |
| 130 | + let res = match next_message.await { |
| 131 | + Ok(res) => res, |
| 132 | + Err(err) => { |
| 133 | + warn!(?err, "Timeout error"); |
| 134 | + return RunCommand::Reconnect; |
| 135 | + } |
| 136 | + }; |
| 137 | + let message = match res { |
| 138 | + Some(Ok(message)) => message, |
| 139 | + Some(Err(err)) => { |
| 140 | + warn!(?err, "Error reading WS stream"); |
| 141 | + io_error_count += 1; |
| 142 | + continue; |
| 143 | + } |
| 144 | + None => { |
| 145 | + warn!("Connection read stream is closed, reconnecting"); |
| 146 | + return RunCommand::Reconnect; |
| 147 | + } |
| 148 | + }; |
| 149 | + let data = match &message { |
| 150 | + Message::Text(msg) => msg.as_bytes(), |
| 151 | + Message::Binary(msg) => msg.as_ref(), |
| 152 | + Message::Ping(_) => { |
| 153 | + error!(ws_message = "ping", "Received unexpected message"); |
| 154 | + continue; |
| 155 | + } |
| 156 | + Message::Pong(_) => { |
| 157 | + error!(ws_message = "pong", "Received unexpected message"); |
| 158 | + continue; |
| 159 | + } |
| 160 | + Message::Frame(_) => { |
| 161 | + error!(ws_message = "frame", "Received unexpected message"); |
| 162 | + continue; |
| 163 | + } |
| 164 | + Message::Close(_) => { |
| 165 | + warn!("Connection closed, reconnecting"); |
| 166 | + return RunCommand::Reconnect; |
| 167 | + } |
| 168 | + }; |
| 169 | + |
| 170 | + let bid_value: BestBidValue = match serde_json::from_slice(data) { |
| 171 | + Ok(value) => value, |
| 172 | + Err(err) => { |
| 173 | + error!(?err, "Failed to parse best bid value"); |
| 174 | + continue; |
| 175 | + } |
| 176 | + }; |
| 177 | + |
| 178 | + if !bid_value.block_top_bid.is_zero() { |
| 179 | + inc_non_0_competition_bids(); |
| 180 | + let _ = watch_dog_sender.try_send(()); |
| 181 | + } |
| 182 | + |
| 183 | + trace!( |
| 184 | + block = bid_value.block_number, |
| 185 | + slot = bid_value.slot_number, |
| 186 | + value = format_ether(bid_value.block_top_bid), |
| 187 | + "Updated best bid value" |
| 188 | + ); |
| 189 | + |
| 190 | + for sub in subscriptions.lock().deref_mut() { |
| 191 | + if sub.block_number == bid_value.block_number |
| 192 | + && sub.slot_number == bid_value.slot_number |
| 193 | + { |
| 194 | + sub.obs |
| 195 | + .update_new_bid(CompetitionBid::new(bid_value.block_top_bid)); |
| 196 | + } |
| 197 | + } |
| 198 | + } |
| 199 | + RunCommand::Finish |
| 200 | +} |
| 201 | + |
| 202 | +impl BidValueSource for BestBidWSConnector { |
| 203 | + fn subscribe(&self, block_number: u64, slot_number: u64, obs: Arc<dyn BidValueObs>) { |
| 204 | + self.subscriptions.lock().push(Subscription { |
| 205 | + block_number, |
| 206 | + slot_number, |
| 207 | + obs, |
| 208 | + }); |
| 209 | + } |
| 210 | + |
| 211 | + fn unsubscribe(&self, obs: Arc<dyn BidValueObs>) { |
| 212 | + self.subscriptions |
| 213 | + .lock() |
| 214 | + .retain(|s| !Arc::ptr_eq(&s.obs, &obs)); |
| 215 | + } |
| 216 | +} |
0 commit comments