Skip to content

Commit f61ab5e

Browse files
committed
Lazer symbol fetch retry and async task
1 parent 657e639 commit f61ab5e

File tree

1 file changed

+111
-26
lines changed

1 file changed

+111
-26
lines changed

src/agent/services/lazer_exporter.rs

Lines changed: 111 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use {
33
anyhow::{
44
Context,
55
Result,
6-
anyhow,
76
bail,
87
},
98
backoff::{
@@ -353,8 +352,12 @@ mod lazer_exporter {
353352
std::{
354353
collections::HashMap,
355354
sync::Arc,
355+
time::Duration,
356+
},
357+
tokio::sync::{
358+
broadcast::Sender,
359+
mpsc,
356360
},
357-
tokio::sync::broadcast::Sender,
358361
url::Url,
359362
};
360363

@@ -367,22 +370,43 @@ mod lazer_exporter {
367370
S: LocalStore,
368371
S: Send + Sync + 'static,
369372
{
370-
let mut lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
373+
// We can't publish to Lazer without symbols, so crash the process if it fails.
374+
let mut lazer_symbols = match get_lazer_symbol_map(&config.history_url).await {
375+
Ok(symbol_map) => {
376+
if symbol_map.is_empty() {
377+
panic!("Retrieved zero Lazer symbols from {}", config.history_url);
378+
}
379+
symbol_map
380+
}
381+
Err(_) => {
382+
tracing::error!(
383+
"Failed to retrieve Lazer symbols from {}",
384+
config.history_url
385+
);
386+
panic!(
387+
"Failed to retrieve Lazer symbols from {}",
388+
config.history_url
389+
);
390+
}
391+
};
392+
371393
tracing::info!(
372394
"Retrieved {} Lazer feeds with hermes symbols from symbols endpoint: {}",
373395
lazer_symbols.len(),
374396
&config.history_url
375397
);
376398

399+
let (symbols_sender, mut symbols_receiver) = mpsc::channel(1);
400+
tokio::spawn(get_lazer_symbols_task(
401+
config.history_url.clone(),
402+
config.symbol_fetch_interval_duration.clone(),
403+
symbols_sender,
404+
));
405+
377406
let mut publish_interval = tokio::time::interval(config.publish_interval_duration);
378-
let mut symbol_fetch_interval =
379-
tokio::time::interval(config.symbol_fetch_interval_duration);
380407

381408
loop {
382409
tokio::select! {
383-
_ = symbol_fetch_interval.tick() => {
384-
lazer_symbols = get_lazer_symbol_map(&config.history_url).await;
385-
},
386410
_ = publish_interval.tick() => {
387411
let publisher_timestamp = MessageField::some(Timestamp::now());
388412
let mut publisher_update = PublisherUpdate {
@@ -449,33 +473,94 @@ mod lazer_exporter {
449473
tracing::error!("Error sending transaction to relayer receivers: {e}");
450474
}
451475
}
476+
},
477+
latest_symbol_map = symbols_receiver.recv() => {
478+
match latest_symbol_map {
479+
Some(symbol_map) => {
480+
tracing::info!("Refreshing Lazer symbol map with {} symbols", symbol_map.len());
481+
lazer_symbols = symbol_map
482+
}
483+
None => {
484+
// agent can continue but will eventually have a stale symbol set unless the process is cycled.
485+
tracing::error!("Lazer symbol refresh channel closed")
486+
}
487+
}
488+
},
489+
}
490+
}
491+
}
492+
493+
async fn get_lazer_symbols_task(
494+
history_url: Url,
495+
fetch_interval_duration: Duration,
496+
sender: mpsc::Sender<HashMap<pyth_sdk::Identifier, SymbolResponse>>,
497+
) {
498+
let mut symbol_fetch_interval = tokio::time::interval(fetch_interval_duration);
499+
500+
loop {
501+
tokio::select! {
502+
_ = symbol_fetch_interval.tick() => {
503+
tracing::info!("Refreshing Lazer symbol map from history service...");
504+
match get_lazer_symbol_map(&history_url).await {
505+
Ok(symbol_map) => {
506+
if symbol_map.is_empty() {
507+
tracing::error!("Retrieved zero Lazer symbols from {}", history_url);
508+
continue;
509+
}
510+
match sender.send(symbol_map).await {
511+
Ok(_) => (),
512+
Err(e) => {
513+
// agent can continue but will eventually have a stale symbol set unless the process is cycled.
514+
tracing::error!("Error sending refreshed symbol map to exporter task: {e}");
515+
}
516+
}
517+
},
518+
Err(_) => {
519+
tracing::error!("Failed to retrieve Lazer symbols from {} in refresh task", history_url);
520+
}
521+
}
452522
}
453523
}
454524
}
455525
}
456526

457527
async fn get_lazer_symbol_map(
458528
history_url: &Url,
459-
) -> HashMap<pyth_sdk::Identifier, SymbolResponse> {
460-
match fetch_symbols(history_url).await {
461-
Ok(symbols) => symbols
462-
.into_iter()
463-
.filter_map(|symbol| {
464-
let hermes_id = symbol.hermes_id.clone()?;
465-
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
466-
Ok(id) => Some((id, symbol)),
467-
Err(e) => {
468-
tracing::warn!("Failed to parse hermes_id {}: {e:?}", hermes_id);
469-
None
470-
}
471-
}
472-
})
473-
.collect(),
474-
Err(e) => {
475-
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
476-
HashMap::new()
529+
) -> anyhow::Result<HashMap<pyth_sdk::Identifier, SymbolResponse>> {
530+
const NUM_RETRIES: usize = 3;
531+
const RETRY_INTERVAL: Duration = Duration::from_secs(1);
532+
let mut retry_count = 0;
533+
534+
while retry_count < NUM_RETRIES {
535+
match fetch_symbols(history_url).await {
536+
Ok(symbols) => {
537+
let symbol_map = symbols
538+
.into_iter()
539+
.filter_map(|symbol| {
540+
let hermes_id = symbol.hermes_id.clone()?;
541+
match pyth_sdk::Identifier::from_hex(hermes_id.clone()) {
542+
Ok(id) => Some((id, symbol)),
543+
Err(e) => {
544+
tracing::warn!(
545+
"Failed to parse hermes_id {}: {e:?}",
546+
hermes_id
547+
);
548+
None
549+
}
550+
}
551+
})
552+
.collect();
553+
return Ok(symbol_map);
554+
}
555+
Err(e) => {
556+
tracing::error!("Failed to fetch Lazer symbols: {e:?}");
557+
558+
retry_count += 1;
559+
tokio::time::sleep(RETRY_INTERVAL).await;
560+
}
477561
}
478562
}
563+
anyhow::bail!("Lazer symbol map fetch failed after {NUM_RETRIES} attempts");
479564
}
480565
}
481566

0 commit comments

Comments
 (0)