From 2faac59ae36228ca6195ef8c1f94efd2bd9c7977 Mon Sep 17 00:00:00 2001 From: Kachinsky <22611640+kb1ns@users.noreply.github.com> Date: Tue, 14 Nov 2023 23:25:54 +0800 Subject: [PATCH] add migrate sub command (#121) * add migrate sub command --------- Co-authored-by: Cyberaurora <22611640+cyberaurora@users.noreply.github.com> --- Cargo.lock | 136 ++-------------------------------- Cargo.toml | 1 + bin/src/galois.rs | 3 +- engine/Cargo.toml | 8 +- engine/src/config.rs | 63 +++++++++------- engine/src/core.rs | 17 +++-- engine/src/input/sequencer.rs | 2 +- engine/src/input/server.rs | 2 +- engine/src/lib.rs | 1 + engine/src/migration.rs | 114 ++++++++++++++++++++++++++++ engine/src/output/market.rs | 6 +- engine/src/output/mod.rs | 45 +---------- sidecar/Cargo.toml | 5 +- sidecar/src/context.rs | 40 +++++----- 14 files changed, 207 insertions(+), 236 deletions(-) create mode 100644 engine/src/migration.rs diff --git a/Cargo.lock b/Cargo.lock index 6e0bda7..2f0b269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -147,12 +147,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "arc-swap" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" - [[package]] name = "array-bytes" version = "4.2.0" @@ -1038,17 +1032,6 @@ dependencies = [ "pem-rfc7468", ] -[[package]] -name = "derivative" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "derive_more" version = "0.99.17" @@ -1071,12 +1054,6 @@ dependencies = [ "opaque-debug 0.3.0", ] -[[package]] -name = "destructure_traitobject" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c877555693c14d2f84191cfd3ad8582790fc52b5e2274b40b59cf5f5cea25c7" - [[package]] name = "digest" version = "0.8.1" @@ -1197,9 +1174,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.10.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece" dependencies = [ "humantime", "is-terminal", @@ -1625,6 +1602,7 @@ dependencies = [ "chrono", "clap 4.1.7", "dashmap", + "env_logger", "flate2", "futures", "generic-array 0.14.6", @@ -1634,7 +1612,6 @@ dependencies = [ "lazy_static", "linked-hash-map", "log", - "log4rs", "lz4_flex", "magic-crypt", "memmap", @@ -1648,6 +1625,7 @@ dependencies = [ "sp-core", "sp-runtime", "sparse-merkle-tree", + "sqlx", "structopt", "substrate-api-client", "syn 1.0.109", @@ -2533,9 +2511,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.139" +version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "201de327520df007757c1f0adce6e827fe8562fbc28bfd9c15571c66ca1f5f79" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" [[package]] name = "libloading" @@ -2683,39 +2661,6 @@ dependencies = [ "value-bag", ] -[[package]] -name = "log-mdc" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a94d21414c1f4a51209ad204c1776a3d0765002c76c6abcb602a6f09f1e881c7" - -[[package]] -name = "log4rs" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d36ca1786d9e79b8193a68d480a0907b612f109537115c6ff655a3a1967533fd" -dependencies = [ - "anyhow", - "arc-swap", - "chrono", - "derivative", - "fnv", - "humantime", - "libc", - "log", - "log-mdc", - "parking_lot 0.12.1", - "serde", - "serde-value", - "serde_json", - "serde_yaml", - "thiserror", - "thread-id", - "toml", - "typemap-ors", - "winapi 0.3.9", -] - [[package]] name = "lru" version = "0.7.8" @@ -3148,15 +3093,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "ordered-float" -version = "2.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" -dependencies = [ - "num-traits", -] - [[package]] name = "os_str_bytes" version = "6.4.1" @@ -4125,16 +4061,6 @@ dependencies = [ "serde_derive", ] -[[package]] -name = "serde-value" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" -dependencies = [ - "ordered-float", - "serde", -] - [[package]] name = "serde_derive" version = "1.0.152" @@ -4157,18 +4083,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_yaml" -version = "0.8.26" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "578a7433b776b56a35785ed5ce9a7e777ac0598aac5a6dd1b4b18a307c7fc71b" -dependencies = [ - "indexmap", - "ryu", - "serde", - "yaml-rust", -] - [[package]] name = "sha-1" version = "0.8.2" @@ -5153,17 +5067,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "thread-id" -version = "4.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fdfe0627923f7411a43ec9ec9c39c3a9b4151be313e0922042581fb6c9b717f" -dependencies = [ - "libc", - "redox_syscall", - "winapi 0.3.9", -] - [[package]] name = "thread_local" version = "1.1.4" @@ -5511,15 +5414,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "typemap-ors" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a68c24b707f02dd18f1e4ccceb9d49f2058c2fb86384ef9972592904d7a28867" -dependencies = [ - "unsafe-any-ors", -] - [[package]] name = "typenum" version = "1.16.0" @@ -5583,15 +5477,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" -[[package]] -name = "unsafe-any-ors" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a303d30665362d9680d7d91d78b23f5f899504d4f08b3c4cf08d055d87c0ad" -dependencies = [ - "destructure_traitobject", -] - [[package]] name = "untrusted" version = "0.7.1" @@ -5993,15 +5878,6 @@ dependencies = [ "zeroize", ] -[[package]] -name = "yaml-rust" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56c1936c4cc7a1c9ab21a1ebb602eb942ba868cbd44a99cb7cdc5892335e1c85" -dependencies = [ - "linked-hash-map", -] - [[package]] name = "zeroize" version = "1.5.7" diff --git a/Cargo.toml b/Cargo.toml index 40a7bd4..693c8c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,3 +4,4 @@ members = [ "sidecar", "bin", ] +resolver = "2" diff --git a/bin/src/galois.rs b/bin/src/galois.rs index f746403..f157eba 100644 --- a/bin/src/galois.rs +++ b/bin/src/galois.rs @@ -74,7 +74,8 @@ fn main() { env_logger::init(); let opts = config::GaloisCli::parse(); match opts.sub { - Some(config::SubCmd::EncryptConfig) => config::print_config(&opts.file).unwrap(), + Some(config::SubCmd::Encrypt) => config::print_config(&opts.file).unwrap(), + Some(config::SubCmd::Migrate(c)) => migration::migrate(c), None => { print_banner(); lazy_static::initialize(&C); diff --git a/engine/Cargo.toml b/engine/Cargo.toml index 5a05857..0c08412 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -8,7 +8,8 @@ repository = "https://github.com/uinb/galois" description = "High performance matching system" [features] -v1-to-v2 = [] +default = [] +v1-to-v2 = ["sqlx"] [dependencies] rust_decimal = { version = "1.22", features = ["serde-bincode"] } @@ -19,8 +20,7 @@ async-trait = "0.1.63" serde_json = "1.0" rocksdb = "0.21" flate2 = { version = "1.0", features = ["zlib"], default-features = false } -# mysql = "23.0" -# redis = { version = "0.17", features = ["tls", "tokio-rt-core", "tokio-tls-comp", "native-tls","async-native-tls", "async-std-tls-comp"] } +sqlx = { version = "0.6.2", features = ["mysql", "decimal", "chrono"], optional = true } toml = "0.5" lazy_static = "1.4" linked-hash-map = { version = "0.5.3", features = ["serde_impl"] } @@ -30,7 +30,7 @@ chashmap = "2.2" syn = "1.0.107" lz4_flex = "0.10.0" log = { version = "0.4", features = ["serde"] } -log4rs = { version = "1.0", features = ["json_encoder", "toml_format"] } +env_logger = "0.10.1" chrono = "0.4" magic-crypt = "3.1" anyhow = "1" diff --git a/engine/src/config.rs b/engine/src/config.rs index d070d66..24eabae 100644 --- a/engine/src/config.rs +++ b/engine/src/config.rs @@ -16,7 +16,7 @@ use clap::Parser; use serde::{Deserialize, Serialize}; #[derive(Debug, Parser)] -#[command(author, version)] +#[command(author = "UINB Tech", version)] pub struct GaloisCli { #[arg(short('c'), long("config"), required = true, value_name = "FILE")] pub file: std::path::PathBuf, @@ -29,27 +29,56 @@ pub struct GaloisCli { } #[derive(Debug, clap::Subcommand)] -#[command(version)] pub enum SubCmd { - EncryptConfig, + #[clap( + name = "encrypt", + about = "Encrypt config file using environment variable MAGIC_KEY as the key" + )] + Encrypt, + #[clap( + name = "migrate", + about = "Migrate coredump file and sequence storages" + )] + Migrate(MigrateCmd), } #[derive(Debug, clap::Args)] -#[command(version)] pub struct RunCmd { #[arg( long, - value_name = "EVENT-ID", - help = "Run galois in `dry-run` mode, skipping all output." + value_name = "EVENT_ID", + help = "Run galois in `dry-run` mode, skipping all outputs." )] dry_run: Option, } +#[derive(Debug, clap::Args)] +pub struct MigrateCmd { + #[arg( + long, + short = 'o', + value_name = "PATH", + help = "The new coredump file path" + )] + pub output_path: String, + #[arg( + long, + short = 'i', + value_name = "PATH", + help = "The old coredump file path" + )] + pub input_path: String, + #[arg(long, action=clap::ArgAction::SetFalse, help = "Migrate coredump file only if set")] + pub core_only: bool, +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Config { pub server: ServerConfig, pub sequence: SequenceConfig, pub fusotao: FusotaoConfig, + #[cfg(feature = "v1-to-v2")] + pub mysql: MysqlConfig, #[serde(skip_serializing)] pub dry_run: Option, } @@ -213,25 +242,3 @@ fn init_config(toml: &str, key: Option) -> anyhow::Result { } Ok(cfg) } - -#[test] -pub fn test_config() { - let toml = r#" - [server] - bind_addr = "127.0.0.1:8097" - data_home = "/tmp/galois" - - [sequence] - checkpoint = 100000 - enable_from_genesis = true - - [fusotao] - node_url = "ws://localhost:9944" - key_seed = "//Alice" - x25519_priv = "0xedcff0c69e4c0fa7e9a36e2e6d07f2cc355c8d25907a0ad2ab7e03b24f8e90f3" - proof_batch_limit = 20 - claim_block = 1 - "#; - let config = init_config(&toml, None); - assert!(config.is_ok()); -} diff --git a/engine/src/core.rs b/engine/src/core.rs index 646ebaa..0053f30 100644 --- a/engine/src/core.rs +++ b/engine/src/core.rs @@ -99,6 +99,13 @@ impl std::str::FromStr for B256 { } } +impl std::string::ToString for B256 { + fn to_string(&self) -> String { + use sp_core::crypto::Ss58Codec; + self.to_ss58check() + } +} + impl std::fmt::Debug for B256 { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { use sp_core::crypto::Ss58Codec; @@ -262,23 +269,21 @@ pub mod v1 { } } - impl TryInto for (DataV1, Vec) { - type Error = anyhow::Error; - - fn try_into(self) -> Result { + impl Into for (DataV1, Vec) { + fn into(self) -> Data { let (data, pending_orders) = self; let mut orders = UserOrders::new(); pending_orders.into_iter().for_each(|order| { orders.insert(order); }); - Ok(Data { + Data { orderbooks: data.orderbooks, accounts: data.accounts, merkle_tree: data.merkle_tree, current_event_id: data.current_event_id, tvl: data.tvl, orders, - }) + } } } } diff --git a/engine/src/input/sequencer.rs b/engine/src/input/sequencer.rs index e0d7a5d..4125961 100644 --- a/engine/src/input/sequencer.rs +++ b/engine/src/input/sequencer.rs @@ -52,7 +52,7 @@ pub fn init( }); } -fn save(id: u64, cmd: Vec) -> anyhow::Result<()> { +pub fn save(id: u64, cmd: Vec) -> anyhow::Result<()> { SEQ_STORE.put(id_to_key(id), cmd)?; Ok(()) } diff --git a/engine/src/input/server.rs b/engine/src/input/server.rs index 3b3b5f3..0a61229 100644 --- a/engine/src/input/server.rs +++ b/engine/src/input/server.rs @@ -57,7 +57,7 @@ pub fn init(receiver: FromBackend, sender: ToBackend, shared: Shared) { log::info!("bye!"); } -/// relay the messages from backend to session, need to switch the runtime using async +/// relay the messages from backend to session, using block_on to switch to async fn relay(receiver: FromBackend, sessions: Arc>) -> Result<()> { loop { let (session_id, msg) = receiver.recv()?; diff --git a/engine/src/lib.rs b/engine/src/lib.rs index 267803a..0c144c4 100644 --- a/engine/src/lib.rs +++ b/engine/src/lib.rs @@ -25,6 +25,7 @@ pub mod core; pub mod executor; pub mod fusotao; pub mod input; +pub mod migration; pub mod output; pub mod shared; pub mod snapshot; diff --git a/engine/src/migration.rs b/engine/src/migration.rs new file mode 100644 index 0000000..3e4f14d --- /dev/null +++ b/engine/src/migration.rs @@ -0,0 +1,114 @@ +// Copyright 2021-2023 UINB Technologies Pte. Ltd. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub fn migrate(c: crate::config::MigrateCmd) { + cfg_if::cfg_if! { + if #[cfg(feature = "v1-to-v2")] { + v1_to_v2::migrate(c); + } else { + println!("{:?}", c); + panic!("The binary doesn't contain the feature, please re-compile with feature `v1-to-v2` to enable"); + } + } +} + +#[cfg(feature = "v1-to-v2")] +mod v1_to_v2 { + use crate::{config::*, core, input::Command}; + use rust_decimal::Decimal; + use sqlx::mysql::MySqlConnectOptions; + use sqlx::{MySql, Pool, Row}; + use std::str::FromStr; + + pub fn migrate(c: MigrateCmd) { + lazy_static::initialize(&C); + let input_file = c.input_path; + let output_file = c.output_path; + let ignore_sequences = c.core_only; + let f = std::fs::File::open(input_file).unwrap(); + let data = core::v1::DataV1::from_raw(f).unwrap(); + let option: MySqlConnectOptions = C.mysql.url.parse().unwrap(); + let pool: Pool = + futures::executor::block_on(async move { Pool::connect_with(option).await.unwrap() }); + let mut pendings = vec![]; + let pool = std::sync::Arc::new(pool); + for (symbol, _) in data.orderbooks.iter() { + let sql = format!( + "select * from t_order_{}_{} where f_status in (0, 3)", + symbol.0, symbol.1 + ); + let s = *symbol; + let p = pool.clone(); + let r = futures::executor::block_on(async move { + sqlx::query(&sql) + .map(|row: sqlx::mysql::MySqlRow| -> core::PendingOrder { + core::PendingOrder { + order_id: row.get("f_id"), + user_id: core::UserId::from_str(row.get("f_user_id")).unwrap(), + symbol: s, + direction: row.get("f_order_type"), + create_timestamp: row.get("f_timestamp"), + amount: Decimal::from_str(row.get("f_amount")).unwrap(), + price: Decimal::from_str(row.get("f_price")).unwrap(), + status: row.get("f_status"), + matched_quote_amount: Decimal::from_str( + row.get("f_matched_quote_amount"), + ) + .unwrap(), + matched_base_amount: Decimal::from_str( + row.get("f_matched_base_amount"), + ) + .unwrap(), + base_fee: Decimal::from_str(row.get("f_base_fee")).unwrap(), + quote_fee: Decimal::from_str(row.get("f_quote_fee")).unwrap(), + } + }) + .fetch_all(p.as_ref()) + .await + .unwrap() + }); + pendings.extend(r); + } + let data: core::Data = (data, pendings).into(); + let event_id = data.current_event_id; + let file = std::fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(output_file) + .unwrap(); + data.into_raw(file).unwrap(); + if !ignore_sequences { + let r = futures::executor::block_on(async move { + let sql = format!( + "select f_id,f_cmd,f_status, f_time from t_sequence where f_event_id > {}", + event_id + ); + sqlx::query(&sql) + .map(|row: sqlx::mysql::MySqlRow| -> (u64, Command, u8) { + let mut cmd: Command = serde_json::from_str(row.get("f_cmd")).unwrap(); + cmd.timestamp = Some(row.get("f_time")); + (row.get("f_id"), cmd, row.get("f_status")) + }) + .fetch_all(pool.as_ref()) + .await + .unwrap() + }); + for cmd in r { + if cmd.2 != 2 { + crate::sequencer::save(cmd.0, serde_json::to_vec(&cmd.1).unwrap()).unwrap(); + } + } + } + } +} diff --git a/engine/src/output/market.rs b/engine/src/output/market.rs index 03fa1a9..a0e0dc4 100644 --- a/engine/src/output/market.rs +++ b/engine/src/output/market.rs @@ -13,16 +13,16 @@ // limitations under the License. use crate::{config::C, input::*, output::*}; -use rust_decimal::{prelude::Zero, Decimal}; use std::sync::mpsc::{Receiver, Sender}; type MarketChannel = Receiver>; type ResponseChannel = Sender<(u64, Message)>; -pub fn init(rx: MarketChannel, tx: ResponseChannel) { +// TODO build kline +pub fn init(rx: MarketChannel, _tx: ResponseChannel) { std::thread::spawn(move || -> anyhow::Result<()> { loop { - let crs = rx.recv()?; + let _crs = rx.recv()?; if C.dry_run.is_none() {} } }); diff --git a/engine/src/output/mod.rs b/engine/src/output/mod.rs index f425af3..c21c6a4 100644 --- a/engine/src/output/mod.rs +++ b/engine/src/output/mod.rs @@ -12,10 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::{core::*, matcher::*, orderbook::*}; -use rocksdb::{Direction, IteratorMode, WriteBatchWithTransaction}; -use rust_decimal::Decimal; -use serde::{Deserialize, Serialize}; +use crate::core::*; pub mod market; @@ -25,7 +22,7 @@ pub struct Output { pub order_id: u64, pub user_id: UserId, pub symbol: Symbol, - pub state: State, + pub state: OrderState, pub role: Role, pub ask_or_bid: AskOrBid, pub price: Price, @@ -39,41 +36,3 @@ pub struct Output { pub base_frozen: Amount, pub timestamp: u64, } - -// 24 + 32 + 4 + 4 => prefix + user_id + symbol -// pub(crate) fn id_to_key(user_id: &UserId, symbol: &Symbol) -> [u8; 64] { -// let mut key = [0xaf; 64]; -// key[24..].copy_from_slice(&user_id.0[..]); -// key[56..].copy_from_slice(&symbol.0.to_be_bytes()); -// key[60..].copy_from_slice(&symbol.1.to_be_bytes()); -// key -// } - -// pub(crate) fn lower_key() -> [u8; 64] { -// let mut key = [0xaf; 64]; -// key[24..].copy_from_slice(&[0x00; 40]); -// key -// } - -// pub(crate) fn key_to_id(key: &[u8]) -> (UserId, Symbol) { -// let mut user_id = [0u8; 32]; -// let mut base = [0u8; 4]; -// let mut quote = [0u8; 4]; -// user_id.copy_from_slice(&key[24..56]); -// base.copy_from_slice(&key[56..60]); -// quote.copy_from_slice(&key[60..]); -// ( -// B256::new(user_id), -// (u32::from_be_bytes(base), u32::from_be_bytes(quote)), -// ) -// } - -// pub(crate) fn value_to_order(value: &[u8]) -> anyhow::Result { -// let order = bincode::deserialize(value)?; -// Ok(order) -// } - -// pub(crate) fn order_to_value(order: &PendingOrder) -> anyhow::Result> { -// let b = bincode::serialize(order)?; -// Ok(b) -// } diff --git a/sidecar/Cargo.toml b/sidecar/Cargo.toml index dce4d19..6fbc4b8 100644 --- a/sidecar/Cargo.toml +++ b/sidecar/Cargo.toml @@ -23,7 +23,8 @@ toml = "0.5" tower = "0.4.13" clap = { version = "4.1.7", features = ["derive"] } parity-scale-codec = { version = "3", features = ["derive"] } -env_logger = "0.10" +env_logger = "0.10.1" +log = { version = "0.4", features = ["serde"] } x25519-dalek = "1.1.1" sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "mysql", "decimal", "chrono"] } hex = "0.4" @@ -35,10 +36,10 @@ dashmap = "5.4.0" magic-crypt = "3.1" anyhow = "1" thiserror = "1" -log = { version = "0.4", features = ["serde"] } sp-core = { git = "https://github.com/paritytech/substrate.git", branch = "polkadot-v0.9.30", package = "sp-core" } sp-io = { git = "https://github.com/paritytech/substrate.git", branch = "polkadot-v0.9.30", package = "sp-io" } galois-engine = { path = "../engine" } hex-literal = "0.4.1" + [dev-dependencies] sp-keyring = { git = "https://github.com/paritytech/substrate.git", branch = "polkadot-v0.9.30" } diff --git a/sidecar/src/context.rs b/sidecar/src/context.rs index 67891d0..6bb444b 100644 --- a/sidecar/src/context.rs +++ b/sidecar/src/context.rs @@ -70,7 +70,10 @@ impl Context { Pool::connect_with(option).await }) .unwrap(); - let subscribers = Arc::new(DashMap::default()); + let subscribers = Arc::new(DashMap::< + String, + UnboundedSender<(String, PendingOrderWrapper)>, + >::default()); let conn = backend.clone(); let markets = futures::executor::block_on(async move { conn.get_markets().await.map(|markets| { @@ -85,20 +88,26 @@ impl Context { tokio::spawn(async move { loop { let v = dispatcher.recv().await; + if v.is_none() { + continue; + } + let v = v.unwrap(); // TODO support multi-types broadcasting messages from engine - - // let r = if let Some(u) = sub.get(&user_id) { - // u.value().send((user_id.clone(), (symbol, order).into())) - // } else { - // Ok(()) - // }; - // match r { - // Err(e) => { - // log::debug!("sending order to channel error: {}", e); - // sub.remove(&user_id); - // } - // Ok(_) => {} - // } + if let Ok(o) = serde_json::from_value::(v) { + let user_id = o.user_id.to_string(); + let r = if let Some(u) = sub.get(&user_id) { + u.value().send((user_id.clone(), o.into())) + } else { + Ok(()) + }; + match r { + Err(e) => { + log::debug!("sending order to channel error: {}", e); + sub.remove(&user_id); + } + Ok(_) => {} + } + } } }); Self { @@ -296,9 +305,6 @@ where .ok_or(anyhow::anyhow!("")) .map(|v| v.to_str().map_err(|_| anyhow::anyhow!(""))) .flatten()?; - log::debug!("address: {}", ss58); - log::debug!("nonce: {}", nonce); - log::debug!("signature: {}", signature); let from_galois = conn.get_nonce(ss58).await.ok_or(anyhow::anyhow!(""))?; let nonce = nonce .parse::()