Skip to content

Commit

Permalink
pindexer: properly record transactions in dex_ex table
Browse files Browse the repository at this point in the history
  • Loading branch information
cronokirby committed Feb 19, 2025
1 parent 28d6652 commit f82c0a4
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 19 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ penumbra-sdk-num = {workspace = true, default-features = false}
penumbra-sdk-asset = {workspace = true, default-features = false}
penumbra-sdk-proto = {workspace = true, default-features = false}
penumbra-sdk-sct = {workspace = true, default-features = false}
penumbra-sdk-transaction = {workspace = true, default-features = false}
prost = {workspace = true}
tracing = {workspace = true}
tokio = {workspace = true, features = ["full"]}
Expand Down
35 changes: 21 additions & 14 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use penumbra_sdk_num::Amount;
use penumbra_sdk_proto::event::EventDomainType;
use penumbra_sdk_proto::DomainType;
use penumbra_sdk_sct::event::EventBlockRoot;
use penumbra_sdk_transaction::Transaction;
use sqlx::types::BigDecimal;
use sqlx::{prelude::Type, Row};
use std::collections::{BTreeMap, HashMap, HashSet};
Expand Down Expand Up @@ -703,8 +704,6 @@ struct Events {
position_open_txs: BTreeMap<PositionId, [u8; 32]>,
position_close_txs: BTreeMap<PositionId, [u8; 32]>,
position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
// Track transactions
transactions: HashMap<TransactionId, Transaction>,
}

impl Events {
Expand All @@ -725,7 +724,6 @@ impl Events {
position_open_txs: BTreeMap::new(),
position_close_txs: BTreeMap::new(),
position_withdrawal_txs: BTreeMap::new(),
transactions: HashMap::new(),
}
}

Expand Down Expand Up @@ -891,8 +889,6 @@ impl Events {
.entry(e.trading_pair)
.or_insert_with(Vec::new)
.push(e);
} else if let Ok(e) = EventBlockTransaction::try_from_event(&event.event) {
out.transactions.insert(e.transaction_id, e.transaction);
}
}
Ok(out)
Expand Down Expand Up @@ -1407,7 +1403,7 @@ impl Component {
&self,
dbtx: &mut PgTransaction<'_>,
time: DateTime,
height: i32,
height: u64,
transaction_id: [u8; 32],
transaction: Transaction,
) -> anyhow::Result<()> {
Expand All @@ -1420,14 +1416,29 @@ impl Component {
) VALUES ($1, $2, $3, $4)",
)
.bind(transaction_id)
.bind(transaction)
.bind(height)
.bind(transaction.encode_to_vec())
.bind(i32::try_from(height)?)
.bind(time)
.execute(dbtx.as_mut())
.await?;

Ok(())
}

async fn record_all_transactions(
&self,
dbtx: &mut PgTransaction<'_>,
time: DateTime,
block: &BlockEvents,
) -> anyhow::Result<()> {
for (tx_id, tx_bytes) in block.transactions() {
let tx = Transaction::try_from(tx_bytes)?;
let height = block.height();
self.record_transaction(dbtx, time, height, tx_id, tx)
.await?;
}
Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -1463,6 +1474,8 @@ impl AppView for Component {
.expect(&format!("no block root event at height {}", block.height));
last_time = Some(time);

self.record_all_transactions(dbtx, time, block).await?;

// Load any missing positions before processing events
events.load_positions(dbtx).await?;

Expand Down Expand Up @@ -1506,12 +1519,6 @@ impl AppView for Component {
.await?;
}

// Record transactions
for (transaction_id, transaction) in &events.transactions {
self.record_transaction(dbtx, time, events.height, transaction_id, transaction)
.await?;
}

for (pair, candle) in &events.candles {
for window in Window::all() {
let key = (pair.start, pair.end, window);
Expand Down
7 changes: 2 additions & 5 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,14 @@ CREATE TABLE IF NOT EXISTS dex_ex_block_summary (
CREATE INDEX ON dex_ex_block_summary (time, height);

CREATE TABLE IF NOT EXISTS dex_ex_transactions (
-- The primary key
rowid SERIAL PRIMARY KEY,
-- The unique identifier of the transaction
transaction_id BYTEA NOT NULL UNIQUE,
transaction_id BYTEA NOT NULL PRIMARY KEY,
-- The raw transaction bytes
transaction BYTEA NOT NULL,
-- The block height at which this transaction was included
height INTEGER NOT NULL,
-- The timestamp when this transaction was included in a block
time TIMESTAMPTZ NOT NULL,
PRIMARY KEY (transaction_id)
time TIMESTAMPTZ NOT NULL
);

CREATE INDEX ON dex_ex_transactions (time, height);
Expand Down

0 comments on commit f82c0a4

Please sign in to comment.