| 
 | 1 | +use {  | 
 | 2 | +    borsh::BorshDeserialize,  | 
 | 3 | +    clap::Parser,  | 
 | 4 | +    posted_message::PostedMessageUnreliableData,  | 
 | 5 | +    secp256k1::SecretKey,  | 
 | 6 | +    signed_body::SignedBody,  | 
 | 7 | +    solana_account_decoder::UiAccountEncoding,  | 
 | 8 | +    solana_client::{  | 
 | 9 | +        nonblocking::pubsub_client::PubsubClient,  | 
 | 10 | +        pubsub_client::PubsubClientError,  | 
 | 11 | +        rpc_config::{  | 
 | 12 | +            RpcAccountInfoConfig,  | 
 | 13 | +            RpcProgramAccountsConfig,  | 
 | 14 | +        },  | 
 | 15 | +        rpc_filter::{  | 
 | 16 | +            Memcmp,  | 
 | 17 | +            RpcFilterType,  | 
 | 18 | +        },  | 
 | 19 | +    },  | 
 | 20 | +    solana_sdk::pubkey::Pubkey,  | 
 | 21 | +    std::{  | 
 | 22 | +        fs,  | 
 | 23 | +        str::FromStr,  | 
 | 24 | +        time::Duration,  | 
 | 25 | +    },  | 
 | 26 | +    tokio::time::sleep,  | 
 | 27 | +    tokio_stream::StreamExt,  | 
 | 28 | +    wormhole_sdk::{  | 
 | 29 | +        vaa::Body,  | 
 | 30 | +        Address,  | 
 | 31 | +        Chain,  | 
 | 32 | +    },  | 
 | 33 | +};  | 
 | 34 | + | 
 | 35 | +mod config;  | 
 | 36 | +mod posted_message;  | 
 | 37 | +mod signed_body;  | 
 | 38 | + | 
 | 39 | +struct ListenerConfig {  | 
 | 40 | +    ws_url:              String,  | 
 | 41 | +    secret_key:          SecretKey,  | 
 | 42 | +    wormhole_pid:        Pubkey,  | 
 | 43 | +    accumulator_address: Pubkey,  | 
 | 44 | +}  | 
 | 45 | + | 
 | 46 | +fn find_message_pda(wormhole_pid: &Pubkey, slot: u64) -> Pubkey {  | 
 | 47 | +    let ring_index = (slot % 10_000) as u32;  | 
 | 48 | +    Pubkey::find_program_address(  | 
 | 49 | +        &[b"AccumulatorMessage", &ring_index.to_be_bytes()],  | 
 | 50 | +        wormhole_pid,  | 
 | 51 | +    )  | 
 | 52 | +    .0  | 
 | 53 | +}  | 
 | 54 | + | 
 | 55 | +async fn run_listener(config: ListenerConfig) -> Result<(), PubsubClientError> {  | 
 | 56 | +    let client = PubsubClient::new(config.ws_url.as_str()).await?;  | 
 | 57 | +    let (mut stream, unsubscribe) = client  | 
 | 58 | +        .program_subscribe(  | 
 | 59 | +            &config.wormhole_pid,  | 
 | 60 | +            Some(RpcProgramAccountsConfig {  | 
 | 61 | +                filters:        Some(vec![RpcFilterType::Memcmp(Memcmp::new(  | 
 | 62 | +                    0,  | 
 | 63 | +                    solana_client::rpc_filter::MemcmpEncodedBytes::Bytes(b"msu".to_vec()),  | 
 | 64 | +                ))]),  | 
 | 65 | +                account_config: RpcAccountInfoConfig {  | 
 | 66 | +                    encoding:         Some(UiAccountEncoding::Base64),  | 
 | 67 | +                    data_slice:       None,  | 
 | 68 | +                    commitment:       Some(  | 
 | 69 | +                        solana_sdk::commitment_config::CommitmentConfig::confirmed(),  | 
 | 70 | +                    ),  | 
 | 71 | +                    min_context_slot: None,  | 
 | 72 | +                },  | 
 | 73 | +                with_context:   None,  | 
 | 74 | +                sort_results:   None,  | 
 | 75 | +            }),  | 
 | 76 | +        )  | 
 | 77 | +        .await?;  | 
 | 78 | + | 
 | 79 | +    while let Some(update) = stream.next().await {  | 
 | 80 | +        if find_message_pda(&config.wormhole_pid, update.context.slot).to_string()  | 
 | 81 | +            != update.value.pubkey  | 
 | 82 | +        {  | 
 | 83 | +            continue; // Skip updates that are not for the expected PDA  | 
 | 84 | +        }  | 
 | 85 | + | 
 | 86 | +        let unreliable_data: PostedMessageUnreliableData = {  | 
 | 87 | +            let data = match update.value.account.data.decode() {  | 
 | 88 | +                Some(data) => data,  | 
 | 89 | +                None => {  | 
 | 90 | +                    tracing::error!("Failed to decode account data");  | 
 | 91 | +                    continue;  | 
 | 92 | +                }  | 
 | 93 | +            };  | 
 | 94 | + | 
 | 95 | +            match BorshDeserialize::deserialize(&mut data.as_slice()) {  | 
 | 96 | +                Ok(data) => data,  | 
 | 97 | +                Err(e) => {  | 
 | 98 | +                    tracing::error!(error = ?e, "Invalid unreliable data format");  | 
 | 99 | +                    continue;  | 
 | 100 | +                }  | 
 | 101 | +            }  | 
 | 102 | +        };  | 
 | 103 | + | 
 | 104 | +        if Chain::Pythnet != unreliable_data.emitter_chain.into() {  | 
 | 105 | +            continue;  | 
 | 106 | +        }  | 
 | 107 | +        if config.accumulator_address != Pubkey::from(unreliable_data.emitter_address) {  | 
 | 108 | +            continue;  | 
 | 109 | +        }  | 
 | 110 | + | 
 | 111 | +        let body = Body {  | 
 | 112 | +            timestamp:         unreliable_data.submission_time,  | 
 | 113 | +            nonce:             unreliable_data.nonce,  | 
 | 114 | +            emitter_chain:     unreliable_data.emitter_chain.into(),  | 
 | 115 | +            emitter_address:   Address(unreliable_data.emitter_address),  | 
 | 116 | +            sequence:          unreliable_data.sequence,  | 
 | 117 | +            consistency_level: unreliable_data.consistency_level,  | 
 | 118 | +            payload:           unreliable_data.payload.clone(),  | 
 | 119 | +        };  | 
 | 120 | + | 
 | 121 | +        match SignedBody::try_new(body, config.secret_key) {  | 
 | 122 | +            Ok(signed_body) => println!("Signed Body: {:?}", signed_body),  | 
 | 123 | +            Err(e) => tracing::error!(error = ?e, "Failed to sign body"),  | 
 | 124 | +        };  | 
 | 125 | +    }  | 
 | 126 | + | 
 | 127 | +    tokio::spawn(async move { unsubscribe().await });  | 
 | 128 | + | 
 | 129 | +    Err(PubsubClientError::ConnectionClosed(  | 
 | 130 | +        "Stream ended".to_string(),  | 
 | 131 | +    ))  | 
 | 132 | +}  | 
 | 133 | + | 
 | 134 | +fn load_secret_key(path: String) -> SecretKey {  | 
 | 135 | +    let bytes = fs::read(path.clone()).expect("Invalid secret key file");  | 
 | 136 | +    if bytes.len() == 32 {  | 
 | 137 | +        let byte_array: [u8; 32] = bytes.try_into().expect("Invalid secret key length");  | 
 | 138 | +        return SecretKey::from_byte_array(byte_array).expect("Invalid secret key length");  | 
 | 139 | +    }  | 
 | 140 | + | 
 | 141 | +    let content = fs::read_to_string(path)  | 
 | 142 | +        .expect("Invalid secret key file")  | 
 | 143 | +        .trim()  | 
 | 144 | +        .to_string();  | 
 | 145 | +    SecretKey::from_str(&content).expect("Invalid secret key")  | 
 | 146 | +}  | 
 | 147 | + | 
 | 148 | +#[tokio::main]  | 
 | 149 | +async fn main() {  | 
 | 150 | +    let run_options = config::RunOptions::parse();  | 
 | 151 | +    let secret_key = load_secret_key(run_options.secret_key_path);  | 
 | 152 | +    let client = PubsubClient::new(&run_options.pythnet_url)  | 
 | 153 | +        .await  | 
 | 154 | +        .expect("Invalid WebSocket URL");  | 
 | 155 | +    drop(client); // Drop the client to avoid holding the connection open  | 
 | 156 | +    let accumulator_address = Pubkey::from_str("G9LV2mp9ua1znRAfYwZz5cPiJMAbo1T6mbjdQsDZuMJg")  | 
 | 157 | +        .expect("Invalid accumulator address");  | 
 | 158 | +    let wormhole_pid =  | 
 | 159 | +        Pubkey::from_str(&run_options.wormhole_pid).expect("Invalid Wormhole program ID");  | 
 | 160 | + | 
 | 161 | +    loop {  | 
 | 162 | +        if let Err(e) = run_listener(ListenerConfig {  | 
 | 163 | +            ws_url: run_options.pythnet_url.clone(),  | 
 | 164 | +            secret_key,  | 
 | 165 | +            wormhole_pid,  | 
 | 166 | +            accumulator_address,  | 
 | 167 | +        })  | 
 | 168 | +        .await  | 
 | 169 | +        {  | 
 | 170 | +            tracing::error!(error = ?e, "Error listening to messages");  | 
 | 171 | +            sleep(Duration::from_millis(200)).await; // Wait before retrying  | 
 | 172 | +        }  | 
 | 173 | +    }  | 
 | 174 | +}  | 
0 commit comments