Skip to content

Commit ef036ee

Browse files
committed
added proper scheduled reconnection and matching_orders logic
1 parent 92cb2f0 commit ef036ee

File tree

6 files changed

+100
-97
lines changed

6 files changed

+100
-97
lines changed

Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ rustc-hex = "2.1.0"
2525
schemars = "0.8.0"
2626
serde = { version = "1.0.198", features = ["derive"] }
2727
serde_json = "1.0.116"
28-
spark-market-sdk = "0.6.4"
28+
spark-market-sdk = "0.6.5"
2929
pangea-client = { git = "https://github.com/nazgull08/pangea-client/"}
3030
thiserror = "1.0.63"
3131
tokio = { version = "1.41.0", features = ["rt", "macros", "time"] }

src/indexer/order_event_handler.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::indexer::spot_order::{LimitType, OrderStatus, OrderType, SpotOrder};
2+
use crate::storage::matching_orders::MatchingOrders;
23
use crate::storage::order_book::OrderBook;
34
use chrono::Utc;
45
use log::{error, info};
@@ -27,7 +28,8 @@ pub struct PangeaOrderEvent {
2728
pub limit_type: Option<String>,
2829
}
2930

30-
pub async fn handle_order_event(order_book: Arc<OrderBook>, event: PangeaOrderEvent) {
31+
pub async fn handle_order_event(order_book: Arc<OrderBook>, matching_orders: Arc<MatchingOrders>,
32+
event: PangeaOrderEvent) {
3133
if let Some(event_type) = event.event_type.as_deref() {
3234
match event_type {
3335
"Open" => {
@@ -37,13 +39,17 @@ pub async fn handle_order_event(order_book: Arc<OrderBook>, event: PangeaOrderEv
3739
}
3840
}
3941
"Trade" => {
42+
matching_orders.remove(&event.order_id);
43+
info!("Order {} removed from matching_orders", &event.order_id);
4044
if let Some(match_size) = event.amount {
4145
let o_type = event.order_type_to_enum();
4246
let l_type = event.limit_type_to_enum();
4347
process_trade(&order_book, &event.order_id, match_size, o_type, l_type);
4448
}
4549
}
4650
"Cancel" => {
51+
matching_orders.remove(&event.order_id);
52+
info!("Order {} removed from matching_orders", &event.order_id);
4753
order_book.remove_order(&event.order_id, event.order_type_to_enum());
4854
info!(
4955
"Removed order with id: {} due to Cancel event",

src/indexer/pangea.rs

Lines changed: 76 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use pangea_client::{
66
futures::StreamExt, provider::FuelProvider, query::Bound, requests::fuel::GetSparkOrderRequest,
77
ClientBuilder, Format, WsProvider,
88
};
9-
use tokio::time::sleep;
9+
use tokio::time::{interval, sleep};
1010
use std::collections::HashSet;
1111
use std::str::FromStr;
1212
use std::sync::Arc;
@@ -16,14 +16,16 @@ use crate::config::env::ev;
1616
use crate::error::Error;
1717
use crate::indexer::order_event_handler::handle_order_event;
1818
use crate::indexer::order_event_handler::PangeaOrderEvent;
19+
use crate::storage::matching_orders::MatchingOrders;
1920
use crate::storage::order_book::OrderBook;
2021

2122
pub async fn initialize_pangea_indexer(
2223
tasks: &mut Vec<tokio::task::JoinHandle<()>>,
2324
order_book: Arc<OrderBook>,
25+
matching_orders: Arc<MatchingOrders>,
2426
) -> Result<(), Error> {
2527
let ws_task_pangea = tokio::spawn(async move {
26-
if let Err(e) = start_pangea_indexer(order_book).await {
28+
if let Err(e) = start_pangea_indexer(order_book, matching_orders).await {
2729
eprintln!("Pangea error: {}", e);
2830
}
2931
});
@@ -32,22 +34,24 @@ pub async fn initialize_pangea_indexer(
3234
Ok(())
3335
}
3436

35-
async fn start_pangea_indexer(order_book: Arc<OrderBook>) -> Result<(), Error> {
37+
async fn start_pangea_indexer(order_book: Arc<OrderBook>, matching_orders: Arc<MatchingOrders>) -> Result<(), Error> {
3638
let client = create_pangea_client().await?;
3739

3840
let contract_start_block: i64 = ev("CONTRACT_START_BLOCK")?.parse()?;
3941
let contract_h256 = H256::from_str(&ev("CONTRACT_ID")?)?;
4042

4143
let mut last_processed_block =
42-
fetch_historical_data(&client, &order_book, contract_start_block, contract_h256).await?;
44+
fetch_historical_data(&client, &order_book, &matching_orders,
45+
contract_start_block, contract_h256).await?;
4346

4447
if last_processed_block == 0 {
4548
last_processed_block = contract_start_block;
4649
}
4750

4851
info!("Switching to listening for new orders (deltas)");
4952

50-
listen_for_new_deltas(&client, &order_book, last_processed_block, contract_h256).await
53+
listen_for_new_deltas(&client, &order_book, &matching_orders,
54+
last_processed_block, contract_h256).await
5155
}
5256

5357
async fn create_pangea_client() -> Result<Client<WsProvider>, Error> {
@@ -79,6 +83,7 @@ async fn get_latest_block(chain_id: ChainId) -> Result<i64, Error> {
7983
async fn fetch_historical_data(
8084
client: &Client<WsProvider>,
8185
order_book: &Arc<OrderBook>,
86+
matching_orders: &Arc<MatchingOrders>,
8287
contract_start_block: i64,
8388
contract_h256: H256,
8489
) -> Result<i64, Error> {
@@ -115,7 +120,8 @@ async fn fetch_historical_data(
115120
Ok(data) => {
116121
let data = String::from_utf8(data)?;
117122
let order: PangeaOrderEvent = serde_json::from_str(&data)?;
118-
handle_order_event(order_book.clone(), order).await;
123+
handle_order_event(order_book.clone(), matching_orders.clone(),
124+
order).await;
119125
}
120126
Err(e) => {
121127
error!("Error in the stream of historical orders: {e}");
@@ -134,87 +140,92 @@ async fn fetch_historical_data(
134140
Ok(last_processed_block)
135141
}
136142

143+
137144
async fn listen_for_new_deltas(
138145
client: &Client<WsProvider>,
139146
order_book: &Arc<OrderBook>,
147+
matching_orders: &Arc<MatchingOrders>,
140148
mut last_processed_block: i64,
141149
contract_h256: H256,
142150
) -> Result<(), Error> {
143-
let max_retries = 5;
144-
let mut retry_count = 0;
145-
let retry_delay = Duration::from_secs(5);
151+
let mut retry_delay = Duration::from_secs(1);
152+
let reconnect_interval = Duration::from_secs(10*60);
153+
let mut reconnect_timer = interval(reconnect_interval);
154+
let mut processing = false;
146155

147156
loop {
148-
if retry_count > max_retries {
149-
error!("Maximum number of retries exceeded. Exiting listen_for_new_deltas.");
150-
return Err(Error::MaxRetriesExceeded);
151-
}
152-
153-
let fuel_chain = match ev("CHAIN")?.as_str() {
154-
"FUEL" => ChainId::FUEL,
155-
_ => ChainId::FUELTESTNET,
156-
};
157-
158-
let request_deltas = GetSparkOrderRequest {
159-
from_block: Bound::Exact(last_processed_block + 1),
160-
to_block: Bound::Subscribe,
161-
market_id__in: HashSet::from([contract_h256]),
162-
chains: HashSet::from([fuel_chain]),
163-
..Default::default()
164-
};
165-
166-
let stream_deltas_result = client
167-
.get_fuel_spark_orders_by_format(request_deltas, Format::JsonStream, true)
168-
.await;
169-
170-
let stream_deltas = match stream_deltas_result {
171-
Ok(stream) => {
172-
retry_count = 0;
173-
stream
174-
}
175-
Err(e) => {
176-
error!("Failed to initiate stream: {}", e);
177-
retry_count += 1;
178-
error!("Retrying in {} seconds...", retry_delay.as_secs());
179-
sleep(retry_delay).await;
180-
continue;
181-
}
182-
};
183-
184-
pangea_client::futures::pin_mut!(stream_deltas);
185-
186-
while let Some(data_result) = stream_deltas.next().await {
187-
match data_result {
188-
Ok(data) => {
189-
retry_count = 0;
190-
191-
if let Err(e) = process_order_data(&data, order_book, &mut last_processed_block).await {
192-
error!("Failed to process order data: {}", e);
157+
tokio::select! {
158+
_ = reconnect_timer.tick(), if !processing => {
159+
info!("Scheduled reconnect to refresh connection...");
160+
processing = false; // reset the processing state
161+
},
162+
result = async {
163+
processing = true;
164+
let fuel_chain = match ev("CHAIN")?.as_str() {
165+
"FUEL" => ChainId::FUEL,
166+
_ => ChainId::FUELTESTNET,
167+
};
168+
169+
let request_deltas = GetSparkOrderRequest {
170+
from_block: Bound::Exact(last_processed_block + 1),
171+
to_block: Bound::Subscribe,
172+
market_id__in: HashSet::from([contract_h256]),
173+
chains: HashSet::from([fuel_chain]),
174+
..Default::default()
175+
};
176+
177+
match client
178+
.get_fuel_spark_orders_by_format(request_deltas, Format::JsonStream, true)
179+
.await
180+
{
181+
Ok(stream_deltas) => {
182+
retry_delay = Duration::from_secs(1);
183+
pangea_client::futures::pin_mut!(stream_deltas);
184+
185+
while let Some(data_result) = stream_deltas.next().await {
186+
match data_result {
187+
Ok(data) => {
188+
if let Err(e) = process_order_data(&data, order_book, matching_orders, &mut last_processed_block).await {
189+
error!("Failed to process order data: {}", e);
190+
}
191+
}
192+
Err(e) => {
193+
error!("Error in the stream of new orders (deltas): {}", e);
194+
break;
195+
}
196+
}
197+
}
198+
}
199+
Err(e) => {
200+
error!("Failed to initiate stream: {}", e);
193201
}
194202
}
195-
Err(e) => {
196-
error!("Error in the stream of new orders (deltas): {}", e);
197-
retry_count += 1;
198-
error!("Retrying connection in {} seconds...", retry_delay.as_secs());
199-
sleep(retry_delay).await;
200-
break;
203+
204+
info!("Reconnecting to listen for new deltas in {} seconds...", retry_delay.as_secs());
205+
sleep(retry_delay).await;
206+
retry_delay = (retry_delay * 2).min(Duration::from_secs(60));
207+
Ok::<(), Error>(())
208+
} => {
209+
processing = false;
210+
if let Err(e) = result {
211+
error!("Error in listen_for_new_deltas: {:?}", e);
201212
}
202-
}
213+
},
203214
}
204-
205-
info!("Reconnecting to listen for new deltas...");
206-
sleep(retry_delay).await;
207215
}
208216
}
209217

210218
async fn process_order_data(
211219
data: &[u8],
212220
order_book: &Arc<OrderBook>,
221+
matching_orders: &Arc<MatchingOrders>,
213222
last_processed_block: &mut i64,
214223
) -> Result<(), Error> {
215224
let data_str = String::from_utf8(data.to_vec())?;
216225
let order_event: PangeaOrderEvent = serde_json::from_str(&data_str)?;
217226
*last_processed_block = order_event.block_number;
218-
handle_order_event(order_book.clone(), order_event).await;
227+
handle_order_event(order_book.clone(),
228+
matching_orders.clone(),
229+
order_event).await;
219230
Ok(())
220231
}

src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ async fn main() -> Result<(), Error> {
2828
let matching_orders = Arc::new(MatchingOrders::new());
2929
let mut tasks = vec![];
3030

31-
initialize_pangea_indexer(&mut tasks, Arc::clone(&order_book)).await?;
31+
initialize_pangea_indexer(&mut tasks, Arc::clone(&order_book), Arc::clone(&matching_orders)).await?;
3232

3333
let port = ev("SERVER_PORT")?.parse()?;
3434
let rocket_task = tokio::spawn(run_rocket_server(port, Arc::clone(&order_book)));
3535
tasks.push(rocket_task);
3636
let matcher_ws_port = ev("MATCHERS_PORT")?.parse()?;
37-
let matcher_websocket = Arc::new(MatcherWebSocket::new(order_book.clone()));
37+
let matcher_websocket = Arc::new(MatcherWebSocket::new(order_book.clone(), matching_orders.clone()));
3838
let matcher_ws_task = tokio::spawn(run_matcher_websocket_server(
3939
matcher_websocket.clone(),
4040
matcher_ws_port

src/matchers/websocket.rs

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,29 @@
11
use crate::config::env::ev;
22
use crate::indexer::spot_order::SpotOrder;
33
use crate::matchers::types::{MatcherRequest, MatcherResponse};
4+
use crate::storage::matching_orders::MatchingOrders;
45
use crate::storage::order_book::OrderBook;
56
use futures_util::{SinkExt, StreamExt};
67
use log::{error, info};
7-
use std::collections::HashSet;
88
use std::sync::Arc;
99
use tokio::net::TcpStream;
10-
use tokio::sync::Mutex;
1110
use tokio_tungstenite::tungstenite::protocol::Message;
1211
use tokio_tungstenite::WebSocketStream;
1312

1413
use super::types::{MatcherConnectRequest, MatcherOrderUpdate};
1514

1615
pub struct MatcherWebSocket {
1716
pub order_book: Arc<OrderBook>,
18-
pub matching_orders: Arc<Mutex<HashSet<String>>>,
17+
pub matching_orders: Arc<MatchingOrders>
1918
}
2019

2120
impl MatcherWebSocket {
22-
pub fn new(order_book: Arc<OrderBook>) -> Self {
21+
pub fn new(order_book: Arc<OrderBook>,
22+
matching_orders: Arc<MatchingOrders>,
23+
) -> Self {
2324
Self {
2425
order_book,
25-
matching_orders: Arc::new(Mutex::new(HashSet::new())),
26+
matching_orders,
2627
}
2728
}
2829

@@ -74,33 +75,20 @@ impl MatcherWebSocket {
7475
if let Err(e) = write.send(Message::Text(response_text)).await {
7576
error!("Failed to send response to matcher {}: {}", uuid, e);
7677

77-
78-
let mut matching_orders = self.matching_orders.lock().await;
7978
for order in available_orders {
80-
matching_orders.remove(&order.id);
79+
self.matching_orders.remove(&order.id);
8180
}
8281
}
8382
}
8483

8584
async fn handle_order_updates(&self, order_updates: Vec<MatcherOrderUpdate>, _uuid: String) {
8685
info!("Processing order updates: {:?}", order_updates);
87-
88-
let mut matching_orders = self.matching_orders.lock().await;
89-
for update in order_updates {
90-
matching_orders.remove(&update.order_id);
91-
info!("Order {} removed from matching_orders", update.order_id);
92-
}
9386
}
9487

9588
async fn get_available_orders(&self, batch_size: usize) -> Vec<SpotOrder> {
9689
let mut available_orders = Vec::new();
9790

98-
99-
let matching_order_ids = {
100-
let matching_orders = self.matching_orders.lock().await;
101-
matching_orders.clone()
102-
};
103-
91+
let matching_order_ids = self.matching_orders.get_all();
10492

10593
let buy_orders = self.order_book.get_buy_orders()
10694
.values()
@@ -166,11 +154,9 @@ impl MatcherWebSocket {
166154
}
167155

168156

169-
{
170-
let mut matching_orders = self.matching_orders.lock().await;
171-
for order_id in new_matching_order_ids {
172-
matching_orders.insert(order_id);
173-
}
157+
for order_id in new_matching_order_ids {
158+
self.matching_orders.add(&order_id);
159+
info!("----Order {} added to matching_orders", order_id);
174160
}
175161

176162
available_orders

0 commit comments

Comments
 (0)