diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 713f27d..7060186 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -29,7 +29,7 @@ jobs: uses: actions-rs/cargo@v1 with: command: build - args: --release + args: --release --all-features - uses: montudor/action-zip@v1 with: args: zip -j -r galois.zip target/release/galois target/release/sidecar galois.toml.example sidecar.toml.example README.md LICENSE diff --git a/CHANGELOG.md b/CHANGELOG.md index 243a229..5bb1b84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# v0.7.0-rc.13 + +- online test of migration + # v0.5.1 - enable polygon diff --git a/Cargo.lock b/Cargo.lock index ebf12a1..c814a2b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -695,7 +695,7 @@ dependencies = [ "js-sys", "num-integer", "num-traits", - "time", + "time 0.1.45", "wasm-bindgen", "winapi 0.3.9", ] @@ -1032,6 +1032,12 @@ dependencies = [ "pem-rfc7468", ] +[[package]] +name = "deranged" +version = "0.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" + [[package]] name = "derive_more" version = "0.99.17" @@ -1622,6 +1628,7 @@ dependencies = [ "rust_decimal_macros", "serde", "serde_json", + "signal-hook", "sp-core", "sp-runtime", "sparse-merkle-tree", @@ -1631,6 +1638,7 @@ dependencies = [ "syn 1.0.109", "tempdir", "thiserror", + "tokio", "toml", ] @@ -4180,6 +4188,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" +[[package]] +name = "signal-hook" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8621587d4798caf8eb44879d42e56b9a93ea5dcd315a6487c357130095b62801" +dependencies = [ + "libc", + "signal-hook-registry", +] + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -4778,6 +4796,8 @@ dependencies 
= [ "rand 0.8.5", "rsa", "rust_decimal", + "rustls", + "rustls-pemfile", "sha1", "sha2 0.10.6", "smallvec 1.10.0", @@ -4785,7 +4805,10 @@ dependencies = [ "sqlx-rt", "stringprep", "thiserror", + "time 0.3.26", + "tokio-stream", "url", + "webpki-roots", ] [[package]] @@ -4812,6 +4835,11 @@ name = "sqlx-rt" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "804d3f245f894e61b1e6263c84b23ca675d96753b5abfd5cc8597d86806e8024" +dependencies = [ + "once_cell", + "tokio", + "tokio-rustls", +] [[package]] name = "ss58-registry" @@ -5089,6 +5117,34 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "time" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a79d09ac6b08c1ab3906a2f7cc2e81a0e27c7ae89c63812df75e52bef0751e07" +dependencies = [ + "deranged", + "itoa", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" + +[[package]] +name = "time-macros" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75c65469ed6b3a4809d987a41eb1dc918e9bc1d92211cbad7ae82931846f7451" +dependencies = [ + "time-core", +] + [[package]] name = "tiny-bip39" version = "0.8.2" diff --git a/engine/Cargo.toml b/engine/Cargo.toml index 0c08412..123e8ba 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -9,7 +9,7 @@ description = "High performance matching system" [features] default = [] -v1-to-v2 = ["sqlx"] +v1-to-v2 = ["sqlx", "tokio"] [dependencies] rust_decimal = { version = "1.22", features = ["serde-bincode"] } @@ -20,7 +20,8 @@ async-trait = "0.1.63" serde_json = "1.0" rocksdb = "0.21" flate2 = { version = "1.0", features = ["zlib"], default-features = false } -sqlx = { version = "0.6.2", features = ["mysql", "decimal", "chrono"], optional = true } +sqlx = { 
version = "0.6.2", features = ["mysql", "decimal", "chrono", "runtime-tokio-rustls", "time"], optional = true } +tokio = { version = "1", features = ["full"], optional = true } toml = "0.5" lazy_static = "1.4" linked-hash-map = { version = "0.5.3", features = ["serde_impl"] } @@ -45,6 +46,7 @@ memmap = "0.7" dashmap = "5.4.0" indexmap = "1.9.2" rand = "0.8.5" +signal-hook = "0.3" smt = { git = "https://github.com/uinb/sparse-merkle-tree", tag = "v0.1.8", package = "sparse-merkle-tree", features = ["serde-rs", "blake2b"] } sub-api = { package = "substrate-api-client", git = "https://github.com/uinb/fusotao-rust-client.git", branch = "master" } node-api = { package = "ac-node-api", git = "https://github.com/uinb/fusotao-rust-client.git", branch = "master" } diff --git a/engine/src/config.rs b/engine/src/config.rs index 24eabae..f90d9f3 100644 --- a/engine/src/config.rs +++ b/engine/src/config.rs @@ -68,7 +68,7 @@ pub struct MigrateCmd { help = "The old coredump file path" )] pub input_path: String, - #[arg(long, action=clap::ArgAction::SetFalse, help = "Migrate coredump file only if set")] + #[arg(long, action=clap::ArgAction::SetTrue, help = "Migrate coredump file only if set")] pub core_only: bool, } diff --git a/engine/src/executor/mod.rs b/engine/src/executor/mod.rs index fedfc37..0043b7e 100644 --- a/engine/src/executor/mod.rs +++ b/engine/src/executor/mod.rs @@ -20,9 +20,9 @@ pub mod orders; use crate::{ core::*, - input::{Event, Message}, + input::{self, Event, Message}, orderbook::*, - output::Output, + output::{Depth, Output}, prover, snapshot, }; use anyhow::anyhow; @@ -64,7 +64,7 @@ pub fn init(recv: DriverChannel, market: MarketChannel, response: ResponseChanne log::debug!("event {} rejected: {}", id, e); let msg = json!({"error": e.to_string()}); let v = to_vec(&msg).unwrap_or_default(); - let _ = response.send((session, Message::new(req_id, v))); + let _ = response.send((session, Message::new_req(req_id, v))); } Err(EventsError::EventIgnored(id, e)) => 
{ log::info!("event {} ignored: {}", id, e); @@ -135,11 +135,12 @@ fn do_execute( quote_fee: Decimal::zero(), }); // compatiable with old version since we don't use mysql auto increment id anymore + // session=0 indicates replaying from snapshot if session != 0 { response .send(( session, - Message::new( + Message::new_req( req_id, to_vec(&json!({ "id": mr.taker.order_id @@ -148,6 +149,17 @@ fn do_execute( ), )) .map_err(|_| EventsError::Interrupted(id))?; + let orderbook: &_ = orderbook; + let depth: Depth = (cmd.symbol, orderbook).into(); + response + .send(( + 0, + Message::new_broadcast( + input::DEPTH_UPDATED, + to_vec(&depth).unwrap_or_default(), + ), + )) + .map_err(|_| EventsError::Interrupted(id))?; } let out = clearing::clear( &mut data.accounts, @@ -163,7 +175,13 @@ fn do_execute( if session != 0 { // broadcast to all sessions response - .send((0, Message::new(0, to_vec(&o).unwrap_or_default()))) + .send(( + 0, + Message::new_broadcast( + input::ORDER_MATCHED, + to_vec(&o).unwrap_or_default(), + ), + )) .map_err(|_| EventsError::Interrupted(id))?; } } @@ -231,7 +249,7 @@ fn do_execute( response .send(( session, - Message::new( + Message::new_req( req_id, to_vec(&json!({ "id": cmd.order_id @@ -240,6 +258,17 @@ fn do_execute( ), )) .map_err(|_| EventsError::Interrupted(id))?; + let orderbook: &_ = orderbook; + let depth: Depth = (cmd.symbol, orderbook).into(); + response + .send(( + 0, + Message::new_broadcast( + input::DEPTH_UPDATED, + to_vec(&depth).unwrap_or_default(), + ), + )) + .map_err(|_| EventsError::Interrupted(id))?; } let out = clearing::clear( &mut data.accounts, @@ -398,42 +427,46 @@ fn do_execute( .map_or(vec![], |order| to_vec(order).unwrap_or_default()), None => vec![], }; - let _ = response.send((session, Message::new(req_id, v))); + let _ = response.send((session, Message::new_req(req_id, v))); Ok(()) } Event::QueryUserOrders(symbol, user_id, session, req_id) => { let o = data.orders.list(user_id, symbol); let v = 
to_vec(&o).unwrap_or_default(); - let _ = response.send((session, Message::new(req_id, v))); + let _ = response.send((session, Message::new_req(req_id, v))); Ok(()) } Event::QueryBalance(user_id, currency, session, req_id) => { let a = assets::get_balance_to_owned(&data.accounts, &user_id, currency); let v = to_vec(&a).unwrap_or_default(); - let _ = response.send((session, Message::new(req_id, v))); + let _ = response.send((session, Message::new_req(req_id, v))); Ok(()) } Event::QueryAccounts(user_id, session, req_id) => { let a = assets::get_account_to_owned(&data.accounts, &user_id); let v = to_vec(&a).unwrap_or_default(); - let _ = response.send((session, Message::new(req_id, v))); + let _ = response.send((session, Message::new_req(req_id, v))); Ok(()) } Event::QueryExchangeFee(symbol, session, req_id) => { - let mut v: HashMap = HashMap::new(); - let orderbook = data.orderbooks.get(&symbol); - match orderbook { - Some(book) => { - v.insert(String::from("maker_fee"), book.maker_fee); - v.insert(String::from("taker_fee"), book.taker_fee); - } - _ => { - v.insert(String::from("maker_fee"), Decimal::new(0, 0)); - v.insert(String::from("taker_fee"), Decimal::new(0, 0)); - } - } - let v = to_vec(&v).unwrap_or_default(); - let _ = response.send((session, Message::new(req_id, v))); + let (maker, taker) = data + .orderbooks + .get(&symbol) + .map(|b| (b.maker_fee, b.taker_fee)) + .unwrap_or_default(); + let map = HashMap::from([("maker_fee", maker), ("taker_fee", taker)]); + let v = to_vec(&map).unwrap_or_default(); + let _ = response.send((session, Message::new_req(req_id, v))); + Ok(()) + } + Event::QueryAllOrderbooks(session, req_id) => { + let depth = data + .orderbooks + .iter() + .map(|(s, o)| (*s, o).into()) + .collect::>(); + let v = to_vec(&depth).unwrap_or_default(); + let _ = response.send((session, Message::new_req(req_id, v))); Ok(()) } Event::Dump(id) => { diff --git a/engine/src/executor/orderbook.rs b/engine/src/executor/orderbook.rs index 
7e7e4bb..84c04a8 100644 --- a/engine/src/executor/orderbook.rs +++ b/engine/src/executor/orderbook.rs @@ -12,9 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::core::{Amount, Fee, OrderId, Price, Symbol, UserId}; +use crate::core::{Amount, Fee, OrderId, Price, UserId}; use linked_hash_map::LinkedHashMap; -use rust_decimal::{prelude::Zero, Decimal}; +use rust_decimal::prelude::Zero; use serde::{Deserialize, Serialize}; use std::collections::{btree_map::OccupiedEntry, BTreeMap, HashMap}; @@ -116,7 +116,7 @@ impl OrderPage { } } - pub fn as_level(&self, base_scale: u32, quote_scale: u32, total: Amount) -> Level { + pub fn merge(&self, base_scale: u32, quote_scale: u32, total: Amount) -> Level { let mut amount = self.amount; let mut price = self.price; amount.rescale(base_scale); @@ -167,14 +167,6 @@ pub struct OrderBook { pub max_id: OrderId, } -#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] -pub struct Depth { - pub asks: Vec, - pub bids: Vec, - pub depth: usize, - pub symbol: Symbol, -} - impl OrderBook { pub fn new( base_scale: u32, @@ -221,29 +213,6 @@ impl OrderBook { ) } - pub fn as_depth(&self, level: usize, symbol: Symbol) -> Depth { - let mut asks = Vec::::new(); - let mut bids = Vec::::new(); - let mut ask_total = Decimal::zero(); - for (_, ask) in self.asks.iter().take(level) { - let level = ask.as_level(self.base_scale, self.quote_scale, ask_total); - ask_total = level.2; - asks.push(level); - } - let mut bid_total = Decimal::zero(); - for (_, bid) in self.bids.iter().rev().take(level) { - let level = bid.as_level(self.base_scale, self.quote_scale, bid_total); - bid_total = level.2; - bids.push(level); - } - Depth { - asks, - bids, - depth: level, - symbol, - } - } - pub fn insert(&mut self, order: Order, ask_or_bid: AskOrBid) { match ask_or_bid { AskOrBid::Ask => Self::insert_into(&mut self.asks, &mut self.indices, order), diff --git a/engine/src/input/mod.rs 
b/engine/src/input/mod.rs index 0838ffc..ec190be 100644 --- a/engine/src/input/mod.rs +++ b/engine/src/input/mod.rs @@ -211,6 +211,7 @@ impl TryInto for Input { self.session, self.req_id, )), + QUERY_ALL_ORDERBOOKS => Ok(Event::QueryAllOrderbooks(self.session, self.req_id)), DUMP => Ok(Event::Dump(self.cmd.event_id.ok_or(anyhow!(""))?)), _ => Err(anyhow!("Unsupported Command")), } @@ -233,11 +234,23 @@ pub enum Event { QueryAccounts(UserId, u64, u64), QueryExchangeFee(Symbol, u64, u64), QueryUserOrders(Symbol, UserId, u64, u64), + QueryAllOrderbooks(u64, u64), // the `EventId` has been executed Dump(EventId), } -impl Event {} +impl Event { + pub fn should_save(&self) -> bool { + matches!( + self, + Self::Limit(..) + | Self::Cancel(..) + | Self::TransferOut(..) + | Self::TransferIn(..) + | Self::UpdateSymbol(..) + ) + } +} #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LimitCmd { @@ -329,6 +342,7 @@ pub mod cmd { pub const GET_NONCE_FOR_BROKER: u32 = 26; pub const QUERY_FUSOTAO_PROGRESS: u32 = 27; pub const QUERY_USER_ORDERS: u32 = 28; + pub const QUERY_ALL_ORDERBOOKS: u32 = 29; } #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)] @@ -404,7 +418,11 @@ impl Command { pub const fn is_querying_core_data(&self) -> bool { matches!( self.cmd, - QUERY_ACCOUNTS | QUERY_BALANCE | QUERY_ORDER | QUERY_EXCHANGE_FEE + QUERY_ACCOUNTS + | QUERY_BALANCE + | QUERY_ORDER + | QUERY_EXCHANGE_FEE + | QUERY_ALL_ORDERBOOKS ) } @@ -419,15 +437,12 @@ impl Command { | QUERY_SCAN_HEIGHT ) } - - pub const fn is_internally_generated(&self) -> bool { - matches!(self.cmd, DUMP) - } } #[derive(Clone, Debug)] pub struct Message { pub req_id: u64, + pub broadcast_type: u8, pub payload: Vec, } @@ -436,13 +451,29 @@ const _PAYLOAD_MASK: u64 = 0x0000_ffff_0000_0000; const _CHK_SUM_MASK: u64 = 0x0000_0000_ffff_0000; const _ERR_RSP_MASK: u64 = 0x0000_0000_0000_0001; const _NXT_FRM_MASK: u64 = 0x0000_0000_0000_0002; +const _BRD_TYP_MASK: u64 = 0x0000_0000_0000_ff00; + +pub 
const ORDER_MATCHED: u8 = 0x01; +pub const DEPTH_UPDATED: u8 = 0x02; /// header = 0x0316<2bytes payload len><2bytes cheskcum><2bytes flag> pub const MAX_FRAME_SIZE: usize = 64 * 1024; impl Message { - pub fn new(req_id: u64, payload: Vec) -> Self { - Self { req_id, payload } + pub fn new_req(req_id: u64, payload: Vec) -> Self { + Self { + req_id, + broadcast_type: 0, + payload, + } + } + + pub fn new_broadcast(broadcast_type: u8, payload: Vec) -> Self { + Self { + req_id: 0, + broadcast_type, + payload, + } } pub fn encode(self) -> Vec { @@ -452,6 +483,7 @@ impl Message { for i in 0..frame_count - 1 { let mut header = _MAGIC_N_MASK; header |= (MAX_FRAME_SIZE as u64) << 32; + header |= (self.broadcast_type as u64) << 16; header |= 1; payload_len -= MAX_FRAME_SIZE; all.extend_from_slice(&header.to_be_bytes()); @@ -460,6 +492,7 @@ impl Message { } let mut header = _MAGIC_N_MASK; header |= (payload_len as u64) << 32; + header |= (self.broadcast_type as u64) << 16; all.extend_from_slice(&header.to_be_bytes()); all.extend_from_slice(&self.req_id.to_be_bytes()); all.extend_from_slice(&self.payload[(frame_count - 1) * MAX_FRAME_SIZE..]); @@ -474,12 +507,11 @@ impl Message { ((header & _PAYLOAD_MASK) >> 32) as usize } - #[allow(dead_code)] - pub const fn get_checksum(header: u64) -> u16 { - ((header & _CHK_SUM_MASK) >> 16) as u16 - } - pub const fn has_next_frame(header: u64) -> bool { (header & _NXT_FRM_MASK) == _NXT_FRM_MASK } + + pub const fn get_broadcast_type(header: u64) -> u8 { + ((header & _BRD_TYP_MASK) >> 16) as u8 + } } diff --git a/engine/src/input/sequencer.rs b/engine/src/input/sequencer.rs index 4125961..8bb5c18 100644 --- a/engine/src/input/sequencer.rs +++ b/engine/src/input/sequencer.rs @@ -29,6 +29,9 @@ pub fn init( recovery ); if C.dry_run.is_some() { + use signal_hook::{consts::SIGINT, iterator::Signals}; + let mut signals = Signals::new(&[SIGINT]).unwrap(); + signals.forever().next(); return; } std::thread::spawn(move || -> anyhow::Result<()> { @@ 
-38,15 +41,19 @@ pub fn init( let (session, req_id) = (input.session, input.req_id); input.sequence = current_id; let cmd = serde_json::to_vec(&input.cmd)?; - if let Ok(event) = input.try_into() { - save(current_id, cmd)?; - to_executor.send(event)?; - if current_id % C.sequence.checkpoint == 0 { - to_executor.send(Event::Dump(current_id))?; + if let Ok(event) = >::try_into(input) { + if event.should_save() { + save(current_id, cmd)?; + to_executor.send(event)?; + if current_id % C.sequence.checkpoint == 0 { + to_executor.send(Event::Dump(current_id))?; + } + } else { + to_executor.send(event)?; } current_id += 1; } else { - to_server.send((session, Message::new(req_id, vec![])))?; + to_server.send((session, Message::new_req(req_id, vec![])))?; } } }); diff --git a/engine/src/input/server.rs b/engine/src/input/server.rs index 0a61229..1d0bd9f 100644 --- a/engine/src/input/server.rs +++ b/engine/src/input/server.rs @@ -214,7 +214,7 @@ async fn handle_req( if cmd.is_querying_share_data() { let w = shared.handle_req(&cmd)?; to_session - .send(Message::new(req_id, w)) + .send(Message::new_req(req_id, w)) .await .map_err(|e| anyhow::anyhow!("read loop -> write loop -> {:?}", e))?; Ok(()) diff --git a/engine/src/migration.rs b/engine/src/migration.rs index b56e955..0fc670e 100644 --- a/engine/src/migration.rs +++ b/engine/src/migration.rs @@ -15,7 +15,11 @@ pub fn migrate(c: crate::config::MigrateCmd) { cfg_if::cfg_if! 
{ if #[cfg(feature = "v1-to-v2")] { - v1_to_v2::migrate(c); + use tokio::runtime::Runtime; + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + v1_to_v2::migrate(c).await; + }); } else { println!("{:?}", c); panic!("The binary doesn't contain the feature, please re-compile with feature `v1-to-v2` to enable"); @@ -26,12 +30,11 @@ pub fn migrate(c: crate::config::MigrateCmd) { #[cfg(feature = "v1-to-v2")] mod v1_to_v2 { use crate::{config::*, core, input::Command}; - use rust_decimal::Decimal; use sqlx::mysql::MySqlConnectOptions; use sqlx::{MySql, Pool, Row}; use std::str::FromStr; - pub fn migrate(c: MigrateCmd) { + pub async fn migrate(c: MigrateCmd) { lazy_static::initialize(&C); let input_file = c.input_path; let output_file = c.output_path; @@ -58,20 +61,17 @@ mod v1_to_v2 { user_id: core::UserId::from_str(row.get("f_user_id")).unwrap(), symbol: s, direction: row.get("f_order_type"), - create_timestamp: row.get("f_timestamp"), - amount: Decimal::from_str(row.get("f_amount")).unwrap(), - price: Decimal::from_str(row.get("f_price")).unwrap(), + create_timestamp: row + .get::("f_timestamp") + .timestamp_millis() + as u64, + amount: row.get("f_amount"), + price: row.get("f_price"), status: row.get("f_status"), - matched_quote_amount: Decimal::from_str( - row.get("f_matched_quote_amount"), - ) - .unwrap(), - matched_base_amount: Decimal::from_str( - row.get("f_matched_base_amount"), - ) - .unwrap(), - base_fee: Decimal::from_str(row.get("f_base_fee")).unwrap(), - quote_fee: Decimal::from_str(row.get("f_quote_fee")).unwrap(), + matched_quote_amount: row.get("f_matched_quote_amount"), + matched_base_amount: row.get("f_matched_base_amount"), + base_fee: row.get("f_base_fee"), + quote_fee: row.get("f_quote_fee"), } }) .fetch_all(p.as_ref()) @@ -88,27 +88,45 @@ mod v1_to_v2 { .open(output_file) .unwrap(); data.into_raw(file).unwrap(); + log::info!("coredump file migrated"); if !ignore_sequences { - let r = futures::executor::block_on(async move { - let sql = 
format!( - "select f_id,f_cmd,f_status, f_time from t_sequence where f_event_id > {}", - event_id - ); - sqlx::query(&sql) - .map(|row: sqlx::mysql::MySqlRow| -> (u64, Command, u8) { - let mut cmd: Command = serde_json::from_str(row.get("f_cmd")).unwrap(); - cmd.timestamp = Some(row.get("f_time")); - (row.get("f_id"), cmd, row.get("f_status")) - }) - .fetch_all(pool.as_ref()) - .await - .unwrap() - }); - for cmd in r { - if cmd.2 != 2 { - crate::sequencer::save(cmd.0, serde_json::to_vec(&cmd.1).unwrap()).unwrap(); + let mut cursor = event_id; + log::info!("starting to migrate event from {}", cursor); + loop { + let new = migrate_sequences(&pool, cursor, 1000).await; + log::info!("migrating sequences {} to {}", cursor, new); + if cursor == new { + break; } + cursor = new; + } + } + } + + async fn migrate_sequences(pool: &Pool, event_id: u64, limit: usize) -> u64 { + let sql = format!( + "select f_id,f_cmd,f_status,f_timestamp from t_sequence where f_id > {} limit {}", + event_id, limit + ); + let r = sqlx::query(&sql) + .map(|row: sqlx::mysql::MySqlRow| -> (u64, Command, u8) { + let mut cmd: Command = serde_json::from_str(row.get("f_cmd")).unwrap(); + cmd.timestamp = Some( + row.get::("f_timestamp") + .unix_timestamp() as u64, + ); + (row.get("f_id"), cmd, row.get("f_status")) + }) + .fetch_all(pool) + .await + .unwrap(); + let mut cursor = event_id; + for cmd in r { + if cmd.2 != 2 { + crate::sequencer::save(cmd.0, serde_json::to_vec(&cmd.1).unwrap()).unwrap(); + cursor = cmd.0; } } + cursor } } diff --git a/engine/src/output/mod.rs b/engine/src/output/mod.rs index c21c6a4..4b012f7 100644 --- a/engine/src/output/mod.rs +++ b/engine/src/output/mod.rs @@ -13,6 +13,9 @@ // limitations under the License. 
use crate::core::*; +use crate::executor::orderbook::Level; +use rust_decimal::{prelude::Zero, Decimal}; +use serde::{Deserialize, Serialize}; pub mod market; @@ -36,3 +39,34 @@ pub struct Output { pub base_frozen: Amount, pub timestamp: u64, } + +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] +pub struct Depth { + pub asks: Vec, + pub bids: Vec, + pub symbol: Symbol, +} + +impl From<(Symbol, &OrderBook)> for Depth { + fn from(orderbook: (Symbol, &OrderBook)) -> Self { + let mut asks = Vec::::new(); + let mut bids = Vec::::new(); + let mut ask_total = Decimal::zero(); + for (_, ask) in orderbook.1.asks.iter() { + let level = ask.merge(orderbook.1.base_scale, orderbook.1.quote_scale, ask_total); + ask_total = level.2; + asks.push(level); + } + let mut bid_total = Decimal::zero(); + for (_, bid) in orderbook.1.bids.iter().rev() { + let level = bid.merge(orderbook.1.base_scale, orderbook.1.quote_scale, bid_total); + bid_total = level.2; + bids.push(level); + } + Depth { + asks, + bids, + symbol: orderbook.0, + } + } +} diff --git a/sidecar/src/backend.rs b/sidecar/src/backend.rs index 01b8ad8..3385a06 100644 --- a/sidecar/src/backend.rs +++ b/sidecar/src/backend.rs @@ -20,6 +20,7 @@ use galois_engine::{ input::{cmd::*, Command, Message}, orderbook::Order, orders::PendingOrder, + output::Depth, }; use rust_decimal::Decimal; use serde_json::{json, to_vec, Value as JsonValue}; @@ -37,7 +38,7 @@ use x25519_dalek::StaticSecret; type ToBackend = Sender>; type FromFrontend = Receiver>; type Notifier = Sender; -type Broadcast = UnboundedSender; +type Broadcast = UnboundedSender<(u8, JsonValue)>; #[derive(Clone, Debug)] pub struct BackendConnection { @@ -121,7 +122,8 @@ impl BackendConnection { } }; if req_id == 0 { - let _ = broadcast.send(json); + let typ = Message::get_broadcast_type(header); + let _ = broadcast.send((typ, json)); } else if let Some((_, noti)) = req.remove(&req_id) { let _ = noti.send(json).await; } @@ -145,7 +147,7 @@ impl BackendConnection 
{ req_id += 1; let Req { payload, notifier } = req; sink.insert(req_id, notifier); - let msg = Message::new(req_id, payload); + let msg = Message::new_req(req_id, payload); match stream.write_all(&msg.encode()).await { Ok(_) => log::debug!("write to galois -> OK"), Err(e) => log::debug!("write to galois -> {:?}", e), @@ -307,6 +309,15 @@ impl BackendConnection { serde_json::from_value::>(r).map_err(|_| anyhow::anyhow!("galois?")) } + pub async fn get_orderbooks(&self) -> anyhow::Result> { + let r = self + .request(to_vec(&json!({ "cmd": QUERY_ALL_ORDERBOOKS })).expect("jsonser;qed")) + .await + .inspect_err(|e| log::debug!("{:?}", e)) + .map_err(|_| anyhow::anyhow!("Galois not available"))?; + serde_json::from_value::>(r).map_err(|_| anyhow::anyhow!("galois?")) + } + pub async fn get_x25519(&self) -> anyhow::Result { let r = self .request(to_vec(&json!({ "cmd": GET_X25519_KEY })).expect("jsonser;qed")) diff --git a/sidecar/src/context.rs b/sidecar/src/context.rs index 470cce8..e502567 100644 --- a/sidecar/src/context.rs +++ b/sidecar/src/context.rs @@ -21,7 +21,7 @@ use crate::{ AccountId32, Sr25519Pair, Sr25519Public, Sr25519Signature, }; use dashmap::DashMap; -use galois_engine::{core::*, fusotao::OffchainSymbol, orders::PendingOrder}; +use galois_engine::{core::*, fusotao::OffchainSymbol, input, orders::PendingOrder, output::Depth}; use hyper::{Body, Request, Response}; use parity_scale_codec::{Decode, Encode}; use rocksdb::DB; @@ -49,8 +49,12 @@ pub struct Context { pub x25519: StaticSecret, pub db: DB, pub subscribers: Arc>>, - pub session_nonce: Arc>, + // broker -> channel map to notify the active brokers + pub active_brokers: Arc>>, + // we simply maintain the symbol -> orderbook_depth map in the form of a string + pub orderbooks: Arc>, pub markets: Arc, OffchainSymbol)>>, + pub session_nonce: Arc>, } impl Context { @@ -64,39 +68,62 @@ impl Context { String, UnboundedSender<(String, PendingOrderWrapper)>, >::default()); + let active_brokers =
Arc::new(DashMap::default()); let conn = backend.clone(); let markets = futures::executor::block_on(async move { conn.get_markets().await.map(|markets| { - Arc::new(DashMap::from_iter(markets.into_iter().map(|m| { - (m.symbol.clone(), (Arc::new(AtomicBool::new(false)), m)) - }))) + let map = DashMap::from_iter( + markets + .into_iter() + .map(|m| (m.symbol.clone(), (Arc::new(AtomicBool::new(false)), m))), + ); + Arc::new(map) }) }) .unwrap(); log::debug!("Loading marketings from backend: {:?}", markets); + let conn = backend.clone(); + let orderbooks = futures::executor::block_on(async move { + conn.get_orderbooks().await.map(|orderbooks| { + let map = DashMap::from_iter(orderbooks.into_iter().map(|d| (d.symbol.clone(), d))); + Arc::new(map) + }) + }) + .unwrap(); let sub = subscribers.clone(); + let depth = orderbooks.clone(); + // let notify_when_depth_updated = active_brokers.clone(); tokio::spawn(async move { loop { let v = dispatcher.recv().await; if v.is_none() { continue; } - let v = v.unwrap(); - // TODO support multi-types broadcasting messages from engine - if let Ok(o) = serde_json::from_value::(v) { - let user_id = o.user_id.to_string(); - let r = if let Some(u) = sub.get(&user_id) { - u.value().send((user_id.clone(), o.into())) - } else { - Ok(()) - }; - match r { - Err(e) => { - log::debug!("sending order to channel error: {}", e); - sub.remove(&user_id); + let (typ, payload) = v.unwrap(); + match typ { + input::ORDER_MATCHED => { + if let Ok(o) = serde_json::from_value::(payload) { + let user_id = o.user_id.to_string(); + let r = if let Some(u) = sub.get(&user_id) { + u.value().send((user_id.clone(), o.into())) + } else { + Ok(()) + }; + match r { + Err(e) => { + log::debug!("sending order to channel error: {}", e); + sub.remove(&user_id); + } + Ok(_) => {} + } + } + } + input::DEPTH_UPDATED => { + if let Ok(d) = serde_json::from_value::(payload) { + depth.insert(d.symbol.clone(), d); } - Ok(_) => {} } + _ => {} } } }); @@ -104,9 +131,11 @@ impl 
Context { backend, x25519, db, - session_nonce: Arc::new(DashMap::default()), + active_brokers, + orderbooks, subscribers, markets, + session_nonce: Arc::new(DashMap::default()), } } @@ -361,9 +390,6 @@ pub fn validate_signature_should_work() { let key: [u8; 32] = hex::decode(seed).unwrap().try_into().unwrap(); let p = Sr25519Pair::from_seed(&key); let signature = p.sign(&nonce); - println!("pubkey: 0x{}", hex::encode(&p.public())); - println!("nonce: 0x{}", hex::encode(&nonce)); - println!("signature: 0x{}", hex::encode(&signature)); assert!(Sr25519Pair::verify(&signature, nonce, &p.public())); } @@ -375,11 +401,5 @@ pub fn validate_deser_signature_should_work() { let public = Sr25519Public::from_raw(*AccountId32::from_ss58check(ss58).unwrap().as_ref()); let signature = hex::decode(&signature.trim_start_matches("0x")).unwrap(); let signature = Sr25519Signature::decode(&mut &signature[..]).unwrap(); - println!("pubkey: 0x{}", hex::encode(&public)); - println!("nonce: 0x{}", hex::encode(&nonce)); - println!( - "signature: 0x{}", - hex::encode::<&[u8; 64]>(signature.as_ref()) - ); assert!(Sr25519Pair::verify(&signature, nonce, &public)); }