Skip to content

Commit 89e2dcd

Browse files
authored
feat(lazer): add new formats to protocol (#2414)
* feat(lazer): add new formats to protocol * chore: remove json format * chore(lazer): bump protocol version
1 parent 4e12098 commit 89e2dcd

File tree

13 files changed

+487
-155
lines changed

13 files changed

+487
-155
lines changed

lazer/Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lazer/sdk/rust/client/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ description = "A Rust client for Pyth Lazer"
66
license = "Apache-2.0"
77

88
[dependencies]
9-
pyth-lazer-protocol = "0.5.0"
9+
pyth-lazer-protocol = { path = "../protocol", version = "0.6.0" }
1010
tokio = { version = "1", features = ["full"] }
1111
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
1212
futures-util = "0.3"
@@ -16,6 +16,7 @@ base64 = "0.22.1"
1616
anyhow = "1.0"
1717
tracing = "0.1"
1818
url = "2.4"
19+
derive_more = { version = "1.0.0", features = ["from"] }
1920

2021
[dev-dependencies]
2122
bincode = "1.3.3"

lazer/sdk/rust/client/examples/subscribe_price_feeds.rs

Lines changed: 119 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
use base64::Engine;
22
use futures_util::StreamExt;
3-
use pyth_lazer_client::LazerClient;
4-
use pyth_lazer_protocol::message::{EvmMessage, SolanaMessage};
3+
use pyth_lazer_client::{AnyResponse, LazerClient};
4+
use pyth_lazer_protocol::message::{
5+
EvmMessage, LeEcdsaMessage, LeUnsignedMessage, Message, SolanaMessage,
6+
};
57
use pyth_lazer_protocol::payload::PayloadData;
68
use pyth_lazer_protocol::router::{
7-
Chain, Channel, DeliveryFormat, FixedRate, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
9+
Channel, DeliveryFormat, FixedRate, Format, JsonBinaryEncoding, PriceFeedId, PriceFeedProperty,
810
SubscriptionParams, SubscriptionParamsRepr,
911
};
1012
use pyth_lazer_protocol::subscription::{Request, Response, SubscribeRequest, SubscriptionId};
13+
use tokio::pin;
1114

1215
fn get_lazer_access_token() -> String {
1316
// Place your access token in your env at LAZER_ACCESS_TOKEN or set it here
@@ -22,7 +25,8 @@ async fn main() -> anyhow::Result<()> {
2225
"wss://pyth-lazer.dourolabs.app/v1/stream",
2326
&get_lazer_access_token(),
2427
)?;
25-
let mut stream = client.start().await?;
28+
let stream = client.start().await?;
29+
pin!(stream);
2630

2731
let subscription_requests = vec![
2832
// Example subscription: Parsed JSON feed targeting Solana
@@ -36,7 +40,7 @@ async fn main() -> anyhow::Result<()> {
3640
PriceFeedProperty::BestAskPrice,
3741
PriceFeedProperty::BestBidPrice,
3842
],
39-
chains: vec![Chain::Solana],
43+
formats: vec![Format::Solana],
4044
delivery_format: DeliveryFormat::Json,
4145
json_binary_encoding: JsonBinaryEncoding::Base64,
4246
parsed: true,
@@ -57,7 +61,7 @@ async fn main() -> anyhow::Result<()> {
5761
PriceFeedProperty::BestAskPrice,
5862
PriceFeedProperty::BestBidPrice,
5963
],
60-
chains: vec![Chain::Evm, Chain::Solana],
64+
formats: vec![Format::Evm, Format::Solana],
6165
delivery_format: DeliveryFormat::Binary,
6266
json_binary_encoding: JsonBinaryEncoding::Base64,
6367
parsed: false,
@@ -80,49 +84,108 @@ async fn main() -> anyhow::Result<()> {
8084
while let Some(msg) = stream.next().await {
8185
// The stream gives us base64-encoded binary messages. We need to decode, parse, and verify them.
8286
match msg? {
83-
Response::StreamUpdated(update) => {
84-
if let Some(evm_data) = update.payload.evm {
85-
// Decode binary data
86-
let binary_data =
87-
base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
88-
let evm_message = EvmMessage::deserialize_slice(&binary_data)?;
89-
90-
// Parse and verify the EVM message
91-
let payload = parse_and_verify_evm_message(&evm_message);
92-
println!("EVM payload: {payload:?}\n");
93-
}
87+
AnyResponse::Json(msg) => match msg {
88+
Response::StreamUpdated(update) => {
89+
println!("Received a JSON update for {:?}", update.subscription_id);
90+
if let Some(evm_data) = update.payload.evm {
91+
// Decode binary data
92+
let binary_data =
93+
base64::engine::general_purpose::STANDARD.decode(&evm_data.data)?;
94+
let evm_message = EvmMessage::deserialize_slice(&binary_data)?;
95+
96+
// Parse and verify the EVM message
97+
let payload = parse_and_verify_evm_message(&evm_message);
98+
println!("EVM payload: {payload:?}");
99+
}
94100

95-
if let Some(solana_data) = update.payload.solana {
96-
// Decode binary data
97-
let binary_data =
98-
base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
99-
let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;
101+
if let Some(solana_data) = update.payload.solana {
102+
// Decode binary data
103+
let binary_data =
104+
base64::engine::general_purpose::STANDARD.decode(&solana_data.data)?;
105+
let solana_message = SolanaMessage::deserialize_slice(&binary_data)?;
100106

101-
// Parse and verify the Solana message
102-
let payload = parse_and_verify_solana_message(&solana_message);
103-
println!("Solana payload: {payload:?}\n");
104-
}
107+
// Parse and verify the Solana message
108+
let payload = parse_and_verify_solana_message(&solana_message);
109+
println!("Solana payload: {payload:?}");
110+
}
105111

106-
if let Some(parsed) = update.payload.parsed {
107-
// Parsed payloads (`parsed: true`) are already decoded and ready to use
108-
for feed in parsed.price_feeds {
109-
println!(
110-
"Parsed payload: {:?}: {:?} at {:?}\n",
111-
feed.price_feed_id, feed, parsed.timestamp_us
112-
);
112+
if let Some(data) = update.payload.le_ecdsa {
113+
// Decode binary data
114+
let binary_data =
115+
base64::engine::general_purpose::STANDARD.decode(&data.data)?;
116+
let message = LeEcdsaMessage::deserialize_slice(&binary_data)?;
117+
118+
// Parse and verify the message
119+
let payload = parse_and_verify_le_ecdsa_message(&message);
120+
println!("LeEcdsa payload: {payload:?}");
121+
}
122+
123+
if let Some(data) = update.payload.le_unsigned {
124+
// Decode binary data
125+
let binary_data =
126+
base64::engine::general_purpose::STANDARD.decode(&data.data)?;
127+
let message = LeUnsignedMessage::deserialize_slice(&binary_data)?;
128+
129+
// Parse the message
130+
let payload = PayloadData::deserialize_slice_le(&message.payload)?;
131+
println!("LE unsigned payload: {payload:?}");
132+
}
133+
134+
if let Some(parsed) = update.payload.parsed {
135+
// Parsed payloads (`parsed: true`) are already decoded and ready to use
136+
for feed in parsed.price_feeds {
137+
println!(
138+
"Parsed payload: {:?}: {:?} at {:?}",
139+
feed.price_feed_id, feed, parsed.timestamp_us
140+
);
141+
}
142+
}
143+
}
144+
msg => println!("Received non-update message: {msg:?}"),
145+
},
146+
AnyResponse::Binary(msg) => {
147+
println!("Received a binary update for {:?}", msg.subscription_id);
148+
for message in msg.messages {
149+
match message {
150+
Message::Evm(message) => {
151+
// Parse and verify the EVM message
152+
let payload = parse_and_verify_evm_message(&message);
153+
println!("EVM payload: {payload:?}");
154+
}
155+
Message::Solana(message) => {
156+
// Parse and verify the Solana message
157+
let payload = parse_and_verify_solana_message(&message);
158+
println!("Solana payload: {payload:?}");
159+
}
160+
Message::LeEcdsa(message) => {
161+
let payload = parse_and_verify_le_ecdsa_message(&message);
162+
println!("LeEcdsa payload: {payload:?}");
163+
}
164+
Message::LeUnsigned(message) => {
165+
let payload = PayloadData::deserialize_slice_le(&message.payload)?;
166+
println!("LeUnsigned payload: {payload:?}");
167+
}
168+
Message::Json(message) => {
169+
for feed in message.price_feeds {
170+
println!(
171+
"JSON payload: {:?}: {:?} at {:?}",
172+
feed.price_feed_id, feed, message.timestamp_us
173+
);
174+
}
175+
}
113176
}
114177
}
115178
}
116-
_ => println!("Received non-update message"),
117179
}
180+
println!();
118181

119182
count += 1;
120183
if count >= 50 {
121184
break;
122185
}
123186
}
124187

125-
// Unsubscribe before exiting
188+
// Unsubscribe example
126189
for sub_id in [SubscriptionId(1), SubscriptionId(2)] {
127190
client.unsubscribe(sub_id).await?;
128191
println!("Unsubscribed from {:?}", sub_id);
@@ -147,12 +210,32 @@ fn parse_and_verify_solana_message(solana_message: &SolanaMessage) -> anyhow::Re
147210

148211
fn parse_and_verify_evm_message(evm_message: &EvmMessage) -> anyhow::Result<PayloadData> {
149212
// Recover pubkey from message
150-
libsecp256k1::recover(
213+
let public_key = libsecp256k1::recover(
151214
&libsecp256k1::Message::parse(&alloy_primitives::keccak256(&evm_message.payload)),
152215
&libsecp256k1::Signature::parse_standard(&evm_message.signature)?,
153216
&libsecp256k1::RecoveryId::parse(evm_message.recovery_id)?,
154217
)?;
218+
println!(
219+
"evm address recovered from signature: {:?}",
220+
hex::encode(&alloy_primitives::keccak256(&public_key.serialize()[1..])[12..])
221+
);
155222

156223
let payload = PayloadData::deserialize_slice_be(&evm_message.payload)?;
157224
Ok(payload)
158225
}
226+
227+
fn parse_and_verify_le_ecdsa_message(message: &LeEcdsaMessage) -> anyhow::Result<PayloadData> {
228+
// Recover pubkey from message
229+
let public_key = libsecp256k1::recover(
230+
&libsecp256k1::Message::parse(&alloy_primitives::keccak256(&message.payload)),
231+
&libsecp256k1::Signature::parse_standard(&message.signature)?,
232+
&libsecp256k1::RecoveryId::parse(message.recovery_id)?,
233+
)?;
234+
println!(
235+
"evm address recovered from signature: {:?}",
236+
hex::encode(&alloy_primitives::keccak256(&public_key.serialize()[1..])[12..])
237+
);
238+
239+
let payload = PayloadData::deserialize_slice_le(&message.payload)?;
240+
Ok(payload)
241+
}

lazer/sdk/rust/client/src/lib.rs

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use anyhow::Result;
2-
use futures_util::{SinkExt, StreamExt};
3-
use pyth_lazer_protocol::subscription::{
4-
ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest,
2+
use derive_more::From;
3+
use futures_util::{SinkExt, StreamExt, TryStreamExt};
4+
use pyth_lazer_protocol::{
5+
binary_update::BinaryWsUpdate,
6+
subscription::{ErrorResponse, Request, Response, SubscriptionId, UnsubscribeRequest},
57
};
68
use tokio_tungstenite::{connect_async, tungstenite::Message};
79
use url::Url;
10+
811
/// A WebSocket client for consuming Pyth Lazer price feed updates
912
///
1013
/// This client provides a simple interface to:
@@ -25,6 +28,12 @@ pub struct LazerClient {
2528
>,
2629
}
2730

31+
#[derive(Debug, Clone, PartialEq, Eq, Hash, From)]
32+
pub enum AnyResponse {
33+
Json(Response),
34+
Binary(BinaryWsUpdate),
35+
}
36+
2837
impl LazerClient {
2938
/// Creates a new Lazer client instance
3039
///
@@ -48,7 +57,7 @@ impl LazerClient {
4857
///
4958
/// # Returns
5059
/// Returns a stream of responses from the server
51-
pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<Response>>> {
60+
pub async fn start(&mut self) -> Result<impl futures_util::Stream<Item = Result<AnyResponse>>> {
5261
let url = self.endpoint.clone();
5362
let mut request =
5463
tokio_tungstenite::tungstenite::client::IntoClientRequest::into_client_request(url)?;
@@ -62,19 +71,27 @@ impl LazerClient {
6271
let (ws_sender, ws_receiver) = ws_stream.split();
6372

6473
self.ws_sender = Some(ws_sender);
65-
let response_stream = ws_receiver.map(|msg| -> Result<Response> {
66-
let msg = msg?;
67-
match msg {
68-
Message::Text(text) => Ok(serde_json::from_str(&text)?),
69-
Message::Binary(data) => Ok(Response::from_binary(&data)?),
70-
Message::Close(_) => Ok(Response::Error(ErrorResponse {
71-
error: "WebSocket connection closed".to_string(),
72-
})),
73-
_ => Ok(Response::Error(ErrorResponse {
74-
error: "Unexpected message type".to_string(),
75-
})),
76-
}
77-
});
74+
let response_stream =
75+
ws_receiver
76+
.map_err(anyhow::Error::from)
77+
.try_filter_map(|msg| async {
78+
let r: Result<Option<AnyResponse>> = match msg {
79+
Message::Text(text) => {
80+
Ok(Some(serde_json::from_str::<Response>(&text)?.into()))
81+
}
82+
Message::Binary(data) => {
83+
Ok(Some(BinaryWsUpdate::deserialize_slice(&data)?.into()))
84+
}
85+
Message::Close(_) => Ok(Some(
86+
Response::Error(ErrorResponse {
87+
error: "WebSocket connection closed".to_string(),
88+
})
89+
.into(),
90+
)),
91+
_ => Ok(None),
92+
};
93+
r
94+
});
7895

7996
Ok(response_stream)
8097
}

lazer/sdk/rust/protocol/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "pyth-lazer-protocol"
3-
version = "0.5.1"
3+
version = "0.6.0"
44
edition = "2021"
55
description = "Pyth Lazer SDK - protocol types."
66
license = "Apache-2.0"

lazer/sdk/rust/protocol/examples/parse_and_verify.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
use {
22
anyhow::bail,
3-
byteorder::{ReadBytesExt, BE},
3+
byteorder::{ReadBytesExt, LE},
44
pyth_lazer_protocol::{
5-
message::{EvmMessage, SolanaMessage},
6-
payload::{PayloadData, EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC_BE},
5+
message::{
6+
format_magics_le::{EVM_FORMAT_MAGIC, SOLANA_FORMAT_MAGIC},
7+
EvmMessage, SolanaMessage,
8+
},
9+
payload::PayloadData,
710
},
811
std::io::{stdin, BufRead, Cursor},
912
};
@@ -12,8 +15,8 @@ fn main() -> anyhow::Result<()> {
1215
println!("Reading hex encoded payloads from stdin...");
1316
for line in stdin().lock().lines() {
1417
let message = hex::decode(line?.trim())?;
15-
let magic = Cursor::new(&message).read_u32::<BE>()?;
16-
if magic == SOLANA_FORMAT_MAGIC_BE {
18+
let magic = Cursor::new(&message).read_u32::<LE>()?;
19+
if magic == SOLANA_FORMAT_MAGIC {
1720
println!("this is a solana payload");
1821
let message = SolanaMessage::deserialize_slice(&message)?;
1922
println!(

lazer/sdk/rust/protocol/src/api.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
use serde::{Deserialize, Serialize};
22

33
use crate::router::{
4-
Chain, Channel, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty,
4+
Channel, Format, JsonBinaryEncoding, JsonUpdate, PriceFeedId, PriceFeedProperty,
55
};
66

77
#[derive(Debug, Clone, Serialize, Deserialize)]
88
#[serde(rename_all = "camelCase")]
99
pub struct LatestPriceRequest {
1010
pub price_feed_ids: Vec<PriceFeedId>,
1111
pub properties: Vec<PriceFeedProperty>,
12-
pub chains: Vec<Chain>,
12+
// "chains" was renamed to "formats". "chains" is still supported for compatibility.
13+
#[serde(alias = "chains")]
14+
pub formats: Vec<Format>,
1315
#[serde(default)]
1416
pub json_binary_encoding: JsonBinaryEncoding,
1517
/// If `true`, the stream update will contain a JSON object containing

0 commit comments

Comments
 (0)