Skip to content

Commit 04391be

Browse files
authored
fix: fire and forget update tx (#102)
The old behaviour causes the exporter not to publishing prices regularly. This change spawns a separate thread to unblock the exporter loop. Also, because of the batch staggering behaviour to avoid calling rpc for all batches at the same time, the old approach would always results in waiting longer than the publishing period.
1 parent c76ccc4 commit 04391be

File tree

3 files changed

+39
-17
lines changed

3 files changed

+39
-17
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-agent"
3-
version = "2.4.3"
3+
version = "2.4.4"
44
edition = "2021"
55

66
[[bin]]

src/agent/solana/exporter.rs

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use {
6161
BTreeMap,
6262
HashMap,
6363
},
64+
sync::Arc,
6465
time::Duration,
6566
},
6667
tokio::{
@@ -228,7 +229,7 @@ pub fn spawn_exporter(
228229
/// Exporter is responsible for exporting data held in the local store
229230
/// to the global Pyth Network.
230231
pub struct Exporter {
231-
rpc_client: RpcClient,
232+
rpc_client: Arc<RpcClient>,
232233

233234
config: Config,
234235

@@ -292,7 +293,10 @@ impl Exporter {
292293
) -> Self {
293294
let publish_interval = time::interval(config.publish_interval_duration);
294295
Exporter {
295-
rpc_client: RpcClient::new_with_timeout(rpc_url.to_string(), rpc_timeout),
296+
rpc_client: Arc::new(RpcClient::new_with_timeout(
297+
rpc_url.to_string(),
298+
rpc_timeout,
299+
)),
296300
config,
297301
network,
298302
publish_interval,
@@ -536,7 +540,7 @@ impl Exporter {
536540
let mut batch_send_interval = time::interval(
537541
self.config
538542
.publish_interval_duration
539-
.div_f64(num_batches as f64),
543+
.div_f64((num_batches + 1) as f64), // +1 to give enough time for the last batch
540544
);
541545
let mut batch_state = HashMap::new();
542546
let mut batch_futures = vec![];
@@ -796,19 +800,37 @@ impl Exporter {
796800
network_state.blockhash,
797801
);
798802

799-
let signature = self
800-
.rpc_client
801-
.send_transaction_with_config(
802-
&transaction,
803-
RpcSendTransactionConfig {
804-
skip_preflight: true,
805-
..RpcSendTransactionConfig::default()
806-
},
807-
)
808-
.await?;
809-
debug!(self.logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts));
803+
let tx = self.inflight_transactions_tx.clone();
804+
let logger = self.logger.clone();
805+
let rpc_client = self.rpc_client.clone();
806+
807+
// Fire this off in a separate task so we don't block the main thread of the exporter
808+
tokio::spawn(async move {
809+
let signature = match rpc_client
810+
.send_transaction_with_config(
811+
&transaction,
812+
RpcSendTransactionConfig {
813+
skip_preflight: true,
814+
..RpcSendTransactionConfig::default()
815+
},
816+
)
817+
.await
818+
{
819+
Ok(signature) => signature,
820+
Err(err) => {
821+
error!(logger, "{}", err);
822+
debug!(logger, "error context"; "context" => format!("{:?}", err));
823+
return;
824+
}
825+
};
826+
827+
debug!(logger, "sent upd_price transaction"; "signature" => signature.to_string(), "instructions" => instructions.len(), "price_accounts" => format!("{:?}", price_accounts));
810828

811-
self.inflight_transactions_tx.send(signature).await?;
829+
if let Err(err) = tx.send(signature).await {
830+
error!(logger, "{}", err);
831+
debug!(logger, "error context"; "context" => format!("{:?}", err));
832+
}
833+
});
812834

813835
Ok(())
814836
}

0 commit comments

Comments
 (0)