From 775305086d35989a1adb63a9246045a53bd865c6 Mon Sep 17 00:00:00 2001 From: jking-aus <72330194+jking-aus@users.noreply.github.com> Date: Mon, 14 Oct 2024 12:40:49 +1100 Subject: [PATCH] Merge QBFT to unstable (#11) Co-authored-by: Age Manning --- Cargo.lock | 139 ++++++--- Cargo.toml | 25 +- anchor/Cargo.toml | 8 +- anchor/qbft/Cargo.toml | 11 + anchor/qbft/src/config.rs | 185 ++++++++++++ anchor/qbft/src/error.rs | 19 ++ anchor/qbft/src/lib.rs | 590 ++++++++++++++++++++++++++++++++++++++ anchor/qbft/src/tests.rs | 412 ++++++++++++++++++++++++++ docs/architecture.md | 15 + 9 files changed, 1347 insertions(+), 57 deletions(-) create mode 100644 anchor/qbft/Cargo.toml create mode 100644 anchor/qbft/src/config.rs create mode 100644 anchor/qbft/src/error.rs create mode 100644 anchor/qbft/src/lib.rs create mode 100644 anchor/qbft/src/tests.rs create mode 100644 docs/architecture.md diff --git a/Cargo.lock b/Cargo.lock index 212ba2cd..d2d9c2d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -58,7 +58,7 @@ dependencies = [ "derive_arbitrary", "derive_more", "getrandom", - "hashbrown", + "hashbrown 0.14.5", "hex-literal", "indexmap", "itoa", @@ -94,7 +94,7 @@ checksum = "4d0f2d905ebd295e7effec65e5f6868d153936130ae718352771de3e7d03c75c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -331,7 +331,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -342,14 +342,14 @@ checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] name = "autocfg" -version = "1.3.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "axum" @@ -565,9 +565,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.21" +version = "1.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07b1695e2c7e8fc85310cde85aeaab7e3097f593c91d209d3f9df76c928100f0" +checksum = "812acba72f0a070b003d3697490d2b55b837230ae7c6c6497f05cc2ddbb8d938" dependencies = [ "shlex", ] @@ -580,18 +580,18 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "clap" -version = "4.5.18" +version = "4.5.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0956a43b323ac1afaffc053ed5c4b7c1f1800bacd1683c353aabbb752515dd3" +checksum = "7be5744db7978a28d9df86a214130d106a89ce49644cbc4e3f0c22c3fba30615" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.5.18" +version = "4.5.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d72166dd41634086d5803a47eb71ae740e61d84709c36f3c34110173db3961b" +checksum = "a5fbc17d3ef8278f55b282b2a2e75ae6f6c7d4bb70ed3d0382375104bfafdb4b" dependencies = [ "anstream", "anstyle", @@ -834,7 +834,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -856,7 +856,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core 0.20.10", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -888,7 +888,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -908,7 +908,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", "unicode-xid", ] @@ -1075,7 +1075,7 @@ dependencies = [ "darling 0.20.10", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1232,7 +1232,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1310,7 +1310,7 @@ checksum = "53010ccb100b96a67bc32c0175f0ed1426b31b655d562898e57325f81c023ac0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -1343,13 +1343,19 @@ dependencies = [ "serde", ] +[[package]] +name = "hashbrown" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" + [[package]] name = "hashlink" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown", + "hashbrown 0.14.5", ] [[package]] @@ -1430,9 +1436,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.9.4" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fcc0b4a115bf80b728eb8ea024ad5bd707b615bfed49e0665b6e0f86fd082d9" +checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" [[package]] name = "httpdate" @@ -1513,13 +1519,13 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "arbitrary", "equivalent", - "hashbrown", + "hashbrown 0.15.0", "serde", ] @@ -1679,6 +1685,15 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matchit" version = "0.7.3" @@ -1843,9 +1858,12 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.19.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "82881c4be219ab5faaf2ad5e5e5ecdff8c66bd7402ca3160975c93b24961afd1" +dependencies = [ + "portable-atomic", +] [[package]] name = "option-ext" @@ -1968,6 +1986,12 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -2035,7 +2059,7 @@ dependencies = [ "rand", "rand_chacha", "rand_xorshift", - "regex-syntax", + "regex-syntax 0.8.5", "rusty-fork", "tempfile", "unarray", @@ -2049,7 +2073,7 @@ checksum = "6ff7ff745a347b87471d859a377a9a404361e7efc2a971d73424a6d183c0fc77" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2058,6 +2082,16 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "qbft" +version = "0.1.0" +dependencies = [ + "futures", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -2141,9 +2175,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.5" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62871f2d65009c0256aed1b9cfeeb8ac272833c404e13d53d400cd0dad7a2ac0" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" dependencies = [ "bitflags 2.6.0", ] @@ -2167,8 +2201,17 @@ checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" dependencies = [ "aho-corasick", "memchr", - "regex-automata", - "regex-syntax", + "regex-automata 0.4.8", + "regex-syntax 0.8.5", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", ] [[package]] @@ -2179,9 +2222,15 @@ checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.5", ] +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + [[package]] name = "regex-syntax" version = "0.8.5" @@ -2439,7 +2488,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2702,9 +2751,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.77" +version = "2.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f35bcdf61fd8e7be6caf75f429fdca8beb3ed76584befb503b1569faee373ed" +checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" dependencies = [ "proc-macro2", "quote", @@ -2786,7 +2835,7 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2856,7 +2905,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2924,7 +2973,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -2954,10 +3003,14 @@ version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -2982,7 +3035,7 @@ dependencies = [ "darling 0.20.10", "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3375,7 +3428,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] [[package]] @@ -3395,5 +3448,5 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.77", + "syn 2.0.79", ] diff --git a/Cargo.toml b/Cargo.toml index 90b43c5a..e951c36a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,10 +1,6 @@ [workspace] # Extra tooling projects will be added. -members = [ - "anchor", - "anchor/client", - "anchor/http_api", -] +members = ["anchor", "anchor/client", "anchor/http_api", "anchor/qbft"] resolver = "2" [workspace.package] @@ -12,10 +8,13 @@ edition = "2021" [workspace.dependencies] client = { path = "anchor/client" } +qbft = { path = "anchor/qbft" } http_api = { path = "anchor/http_api" } -task_executor = { git = "https://github.com/sigp/lighthouse", branch = "anchor", default-features = false, features = ["tracing"] } -sensitive_url = { git = "https://github.com/sigp/lighthouse", branch = "anchor"} -slot_clock = { git = "https://github.com/sigp/lighthouse", branch = "anchor"} +task_executor = { git = "https://github.com/sigp/lighthouse", branch = "anchor", default-features = false, features = [ + "tracing", +] } +sensitive_url = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } +slot_clock = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } async-channel = "1.9" axum = "0.7.7" clap = "4.5.15" @@ -24,9 +23,15 @@ futures = "0.3.30" # dirs = "3" hyper = "1.4" serde = { version = "1.0.208", features = ["derive"] } -tokio = { version = "1.39.2", features = ["rt", "rt-multi-thread", "time", "signal", "macros"] } +tokio = { version = "1.39.2", features = [ + "rt", + "rt-multi-thread", + "time", + "signal", + "macros", +] } tracing = "0.1.40" -tracing-subscriber = { version = "0.3", features = ["fmt"] } +tracing-subscriber = { version = "0.3.18", features = ["fmt", "env-filter"] } [profile.maxperf] inherits = "release" diff --git a/anchor/Cargo.toml b/anchor/Cargo.toml index 1920d660..83574fc8 100644 --- a/anchor/Cargo.toml +++ b/anchor/Cargo.toml @@ -11,10 +11,10 @@ sensitive_url = { workspace = true } async-channel = { workspace = true } clap = { workspace = true } dirs = { workspace = true } -futures = { workspace = true } -serde = { workspace = true } -tokio = { workspace = true } -tracing = { workspace = true } +futures = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } client = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/anchor/qbft/Cargo.toml b/anchor/qbft/Cargo.toml new file mode 100644 index 00000000..1e5f6608 --- /dev/null +++ b/anchor/qbft/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "qbft" +version = "0.1.0" +authors = ["Sigma Prime +where + F: LeaderFunction + Clone, +{ + pub operator_id: usize, + pub instance_height: usize, + pub round: usize, + pub state: InstanceState, + pub pr: usize, + pub committee_size: usize, + pub committee_members: Vec, + pub quorum_size: usize, + pub round_time: Duration, + pub leader_fn: F, +} + +/// Generic LeaderFunction trait to allow for future implementations of the QBFT module +pub trait LeaderFunction { + /// Returns true if we are the leader + fn leader_function( + &self, + operator_id: usize, + round: usize, + instance_height: usize, + committee_size: usize, + ) -> bool; +} + +/*#[derive(Debug, Clone)] +pub enum InstanceState { + AwaitingProposal, + Propose, + Prepare, + Commit, + SentRoundChange, + Complete, +}*/ + +// TODO: Remove this allow +#[allow(dead_code)] +impl Config { + /// A unique identification number assigned to the QBFT consensus and given to all members of + /// the committee + pub fn operator_id(&self) -> usize { + self.operator_id + } + /// The committee size + pub fn committee_size(&self) -> usize { + self.committee_size + } + + pub fn commmittee_members(&self) -> Vec { + self.committee_members.clone() + } + + /// The quorum size required for the committee to reach consensus + pub fn quorum_size(&self) -> usize { + self.quorum_size + } + + /// The round number -- likely always 0 at initialisation unless we want to implement re-joining an existing + /// instance that has been dropped locally + pub fn round(&self) -> usize { + self.round + } + + /// How long the round will last + pub fn round_time(&self) -> Duration { + self.round_time + } + + /// Whether the operator is the lead of the committee for the round -- need to properly + /// implement this in a way that is deterministic based on node IDs + pub fn leader_fn(&self) -> F { + // TODO: This clone is bad, we don't want to clone but return a + // reference. When we generalise this will be changed + self.leader_fn.clone() + } +} + +impl Default for Config { + fn default() -> Self { + //use the builder to also validate defaults + ConfigBuilder::default() + .build() + .expect("Default parameters should be valid") + } +} +/// Builder struct for constructing the QBFT instance configuration +pub struct ConfigBuilder { + config: Config, +} + +impl Default for ConfigBuilder { + fn default() -> Self { + ConfigBuilder { + config: Config { + operator_id: 0, + state: InstanceState::AwaitingProposal, + instance_height: 0, + committee_size: 5, + committee_members: vec![0, 1, 2, 3, 4], + quorum_size: 4, + round: 0, + pr: 0, + round_time: Duration::new(2, 0), + leader_fn: DefaultLeaderFunction {}, + }, + } + } +} +impl From> for ConfigBuilder { + fn from(config: Config) -> Self { + ConfigBuilder { config } + } +} + +// TODO: Remove this lint later, just removes warnings for now +#[allow(dead_code)] +impl ConfigBuilder { + pub fn operator_id(&mut self, operator_id: usize) -> &mut Self { + self.config.operator_id = operator_id; + self + } + + pub fn instance_height(&mut self, instance_height: usize) -> &mut Self { + self.config.instance_height = instance_height; + self + } + + pub fn committee_size(&mut self, committee_size: usize) -> &mut Self { + self.config.committee_size = committee_size; + self + } + + pub fn quorum_size(&mut self, quorum_size: usize) -> &mut Self { + self.config.quorum_size = quorum_size; + self + } + + pub fn round(&mut self, round: usize) -> &mut Self { + self.config.round = round; + self + } + + pub fn round_time(&mut self, round_time: Duration) -> &mut Self { + self.config.round_time = round_time; + self + } + pub fn leader_fn(&mut self, leader_fn: F) -> &mut Self { + self.config.leader_fn = leader_fn; + self + } + + pub fn build(&self) -> Result, ConfigBuilderError> { + if self.config.quorum_size < 1 { + return Err(ConfigBuilderError::QuorumSizeTooSmall); + } + + Ok(self.config.clone()) + } +} + +// input parameters for leader function need to include the round and the node ID +// +/// TODO: Input will be passed to instance in config by client processor when creating new instance +#[derive(Debug, Clone)] +pub struct DefaultLeaderFunction {} + +impl LeaderFunction for DefaultLeaderFunction { + fn leader_function( + &self, + operator_id: usize, + round: usize, + instance_height: usize, + committee_size: usize, + ) -> bool { + operator_id == (round + instance_height) % committee_size + } +} diff --git a/anchor/qbft/src/error.rs b/anchor/qbft/src/error.rs new file mode 100644 index 00000000..2868a842 --- /dev/null +++ b/anchor/qbft/src/error.rs @@ -0,0 +1,19 @@ +/// Error associated with Config building. +// TODO: Remove this allow +#[derive(Debug)] +pub enum ConfigBuilderError { + /// Quorum size too small + QuorumSizeTooSmall, +} + +impl std::error::Error for ConfigBuilderError {} + +impl std::fmt::Display for ConfigBuilderError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Self::QuorumSizeTooSmall => { + write!(f, "Quorum size too small") + } + } + } +} diff --git a/anchor/qbft/src/lib.rs b/anchor/qbft/src/lib.rs new file mode 100644 index 00000000..e982a305 --- /dev/null +++ b/anchor/qbft/src/lib.rs @@ -0,0 +1,590 @@ +use config::{Config, LeaderFunction}; +use std::cmp::Eq; +use std::collections::HashMap; +use std::fmt::Debug; +use std::hash::Hash; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tracing::{debug, error, warn}; + +mod config; +mod error; + +#[cfg(test)] +mod tests; + +type Round = usize; +type OperatorId = usize; + +/// The structure that defines the Quorum Based Fault Tolerance (Qbft) instance +pub struct Qbft +where + F: LeaderFunction + Clone, + D: Debug + Clone + Eq + Hash, +{ + config: Config, + instance_height: usize, + current_round: usize, + data: Option, + /// The messages received this round that we have collected to reach quorum + prepare_messages: HashMap>>, + commit_messages: HashMap>>, + round_change_messages: HashMap>>, + // Channel that links the Qbft instance to the client processor and is where messages are sent + // to be distributed to the committee + message_out: UnboundedSender>, + // Channel that receives messages from the client processor + message_in: UnboundedReceiver>, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub enum InstanceState { + AwaitingProposal, + Propose, + Prepare, + Commit, + SentRoundChange, + Complete, +} + +/// Generic Data trait to allow for future implementations of the QBFT module +// Messages that can be received from the message_in channel +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub enum InMessage { + /// A request for data to form consensus on if we are the leader. + RecvData(RecvData), + /// A PROPOSE message to be sent on the network. + Propose(ProposeMessage), + /// A PREPARE message to be sent on the network. + Prepare(PrepareMessage), + /// A commit message to be sent on the network. + Commit(CommitMessage), + /// Round change message received from network + RoundChange(RoundChange), + /// Close instance message received from the client processor + RequestClose(CloseMessage), +} + +/// Messages that may be sent to the message_out channel from the instance to the client processor +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub enum OutMessage { + /// A request for data to form consensus on if we are the leader. + GetData(GetData), + /// A PROPOSE message to be sent on the network. + Propose(ProposeMessage), + /// A PREPARE message to be sent on the network. + Prepare(PrepareMessage), + /// A commit message to be sent on the network. + Commit(CommitMessage), + /// The round has ended, send this message to the network to inform all participants. + RoundChange(RoundChange), + /// The consensus instance has completed. + Completed(CompletedMessage), +} +/// Type definitions for the allowable messages +#[allow(dead_code)] +#[derive(Debug, Clone)] + +pub struct RoundChange { + operator_id: usize, + instance_height: usize, + round_new: usize, + pr: usize, + pv: D, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct GetData { + operator_id: usize, + instance_height: usize, + round: usize, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct RecvData { + operator_id: usize, + instance_height: usize, + round: usize, + value: D, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct ProposeMessage { + operator_id: usize, + instance_height: usize, + round: usize, + value: D, +} + +#[allow(dead_code)] +#[derive(Debug, Clone, Default)] +pub struct PrepareMessage { + operator_id: usize, + instance_height: usize, + round: usize, + value: D, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct CommitMessage { + operator_id: usize, + instance_height: usize, + round: usize, + value: D, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct CompletedMessage { + operator_id: usize, + instance_height: usize, + round: usize, + completion_status: Completed, +} + +#[derive(Debug, Clone)] +pub struct CloseMessage { + operator_id: usize, +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +/// The consensus instance has finished. +pub enum Completed { + /// The instance has timed out. + TimedOut, + /// Consensus was reached on the provided data. + Success(D), +} + +// TODO: Make a builder and validate config +// TODO: getters and setters for the config fields +// TODO: Remove this allow + +#[allow(dead_code)] +impl Qbft +where + F: LeaderFunction + Clone, + D: Debug + Clone + Eq + Hash + Hash + Eq, +{ + pub fn new( + config: Config, + ) -> ( + UnboundedSender>, + UnboundedReceiver>, + Self, + ) { + let (in_sender, message_in) = tokio::sync::mpsc::unbounded_channel(); + let (message_out, out_receiver) = tokio::sync::mpsc::unbounded_channel(); + + let estimated_map_size = config.committee_size; + let instance = Qbft { + current_round: config.round, + instance_height: config.instance_height, + config, + data: None, + prepare_messages: HashMap::with_capacity(estimated_map_size), + commit_messages: HashMap::with_capacity(estimated_map_size), + round_change_messages: HashMap::with_capacity(estimated_map_size), + message_out, + message_in, + }; + + (in_sender, out_receiver, instance) + } + + pub async fn start_instance(mut self) { + let mut round_end = tokio::time::interval(self.config.round_time); + self.start_round(); + loop { + tokio::select! { + message = self.message_in.recv() => { + match message { + // When a receive data message is received, run the + // received_data function + Some(InMessage::RecvData(received_data)) => self.received_data(received_data), + //When a Propose message is received, run the + // received_propose function + Some(InMessage::Propose(propose_message)) => self.received_propose(propose_message), + // When a Prepare message is received, run the + // received_prepare function + Some(InMessage::Prepare(received_prepare)) => self.received_prepare(received_prepare), + // When a Commit message is received, run the + // received_commit function + Some(InMessage::Commit(commit_message)) => self.received_commit(commit_message), + // When a RoundChange message is received, run the + // received_roundChange function + Some(InMessage::RoundChange(round_change_message)) => self.received_round_change(round_change_message), + // When a CloseRequest is received, close the instance + Some(InMessage::RequestClose(close_message)) => { + //stub function in case we want to do anything pre-close + self.received_request_close(); + if close_message.operator_id == self.operator_id(){ + break; + } + } + + None => { }// Channel is closed + } + + } + _ = round_end.tick() => { + + // TODO: Leaving implement + debug!("ID{}: Round {} failed, incrementing round", self.config.operator_id, self.current_round); + self.increment_round(); + if self.current_round > 2 { + self.send_completed(Completed::TimedOut); + // May not need break if can reliably close from client but keeping for now in case of bugs + break; + } + } + } + } + debug!("ID{}: Instance killed", self.config.operator_id); + } + + //Get and set functions + fn set_state(&mut self, new_state: InstanceState) { + self.config.state = new_state; + } + fn set_data(&mut self, data: D) { + self.data = Some(data); + } + fn operator_id(&self) -> usize { + self.config.operator_id + } + fn committee_members(&self) -> Vec { + self.config.committee_members.clone() + } + fn send_message(&mut self, message: OutMessage) { + let _ = self.message_out.send(message); + } + fn set_pr(&mut self, round: Round) { + self.config.pr = round; + } + fn increment_round(&mut self) { + self.current_round += 1; + self.start_round(); + } + fn store_messages(&mut self, in_message: InMessage) { + match in_message { + InMessage::RecvData(_message) => { + warn! {"ID {}: called store message on RecvData", self.operator_id()} + } + + InMessage::Propose(_message) => { + warn! {"ID {}: called store message on Propose", self.operator_id()} + } + + InMessage::RequestClose(_message) => { + warn! {"ID {}: called store message on RequestClose", self.operator_id()} + } + + InMessage::Prepare(message) => { + if self + .prepare_messages + .entry(message.round) + .or_default() + .insert(message.operator_id, message.clone()) + .is_some() + { + warn!( + "ID {}: Operator {} sent duplicate prepare", + self.operator_id(), + message.operator_id + ) + }; + } + + InMessage::Commit(message) => { + if self + .commit_messages + .entry(message.round) + .or_default() + .insert(message.operator_id, message.clone()) + .is_some() + { + warn!( + "ID {}: Operator {} sent duplicate commit", + self.operator_id(), + message.operator_id + ); + } + } + + InMessage::RoundChange(message) => { + if self + .round_change_messages + .entry(message.round_new) + .or_default() + .insert(message.operator_id, message.clone()) + .is_some() + { + warn!( + "ID {}: Operator {} sent duplicate RoundChange request", + self.operator_id(), + message.operator_id + ); + } + } + } + } + + //Validation and check functions + fn check_leader(&self, operator_id: usize) -> bool { + self.config.leader_fn.leader_function( + operator_id, + self.current_round, + self.instance_height, + self.config.committee_size, + ) + } + fn validate_data(&self, data: D) -> Option { + Some(data) + } + fn check_committee(&self, operator_id: &usize) -> bool { + self.committee_members().contains(operator_id) + } + + //Round start function + fn start_round(&mut self) { + self.set_state(InstanceState::AwaitingProposal); + debug!( + "ID{}: Round {} starting", + self.operator_id(), + self.current_round + ); + + if self.check_leader(self.operator_id()) { + debug!("ID{}: believes they are the leader", self.operator_id()); + + self.send_message(OutMessage::GetData(GetData { + operator_id: self.operator_id(), + instance_height: self.instance_height, + round: self.current_round, + })); + }; + } + + /// Received message functions + /// Received data to be sent as proposal + fn received_data(&mut self, message: RecvData) { + // Check that we are the leader to make sure this is a timely response, for whilst we are + // still the leader and that we're awaiting a proposal + if self.check_leader(self.operator_id()) + && self.check_committee(&self.operator_id()) + && matches!(self.config.state, InstanceState::AwaitingProposal) + { + if let Some(data) = self.validate_data(message.value.clone()) { + debug!( + "ID{}: received data {:?}", + self.operator_id(), + message.value.clone() + ); + self.set_data(data.clone()); + self.send_proposal(data.clone()); + self.send_prepare(data); + } else { + error!("ID{}: Received invalid data", self.operator_id()); + } + } + } + /// We have received a proposal message + fn received_propose(&mut self, propose_message: ProposeMessage) { + // Check if proposal is from the leader we expect + if self.check_leader(propose_message.operator_id) + && self.check_committee(&propose_message.operator_id) + && matches!(self.config.state, InstanceState::AwaitingProposal) + { + let self_operator_id = self.operator_id(); + debug!( + "ID {}: Proposal is from round leader with ID {}", + self_operator_id, propose_message.operator_id, + ); + // Validate the proposal with a local function that is is passed in from the config + // similar to the leaderfunction for now return bool -> true + if let Some(data) = self.validate_data(propose_message.value) { + // If of valid type, set data locally then send prepare + self.set_data(data.clone()); + self.send_prepare(data); + } + } + } + + /// We have received a prepare message + fn received_prepare(&mut self, prepare_message: PrepareMessage) { + // Check if the prepare message is from the committee and the data is valid + if self.check_committee(&prepare_message.operator_id) + && self.validate_data(prepare_message.value.clone()).is_some() + { + self.store_messages(InMessage::Prepare(prepare_message.clone())); + // If we have stored round messages + if let Some(round_messages) = self.prepare_messages.get(&prepare_message.round) { + // Check the quorum size + if round_messages.len() >= self.config.quorum_size { + let counter = round_messages.values().fold( + HashMap::<&D, usize>::new(), + |mut counter, message| { + *counter.entry(&message.value).or_default() += 1; + counter + }, + ); + if let Some((data, count)) = counter.into_iter().max_by_key(|&(_, v)| v) { + if count >= self.config.quorum_size + && matches!(self.config.state, InstanceState::Prepare) + { + self.send_commit(data.clone()); + } + } + } + } + } + } + + ///We have received a commit message + fn received_commit(&mut self, commit_message: CommitMessage) { + if self.check_committee(&commit_message.operator_id) + && self.validate_data(commit_message.value.clone()).is_some() + { + // Store the received commit message + self.store_messages(InMessage::Commit(commit_message.clone())); + if let Some(round_messages) = self.prepare_messages.get(&commit_message.round) { + // Check the quorum size + if round_messages.len() >= self.config.quorum_size { + let counter = round_messages.values().fold( + HashMap::<&D, usize>::new(), + |mut counter, message| { + *counter.entry(&message.value).or_default() += 1; + counter + }, + ); + if let Some((data, count)) = counter.into_iter().max_by_key(|&(_, v)| v) { + if count >= self.config.quorum_size + && matches!(self.config.state, InstanceState::Commit) + { + self.send_completed(Completed::Success(data.clone())); + } + } + } + } + } + } + + fn received_round_change(&mut self, round_change_message: RoundChange) { + // Store the received commit message + if self + .round_change_messages + .entry(round_change_message.round_new) + .or_default() + .insert( + round_change_message.operator_id, + round_change_message.clone(), + ) + .is_some() + { + warn!( + operator = round_change_message.operator_id, + "Operator sent round change request" + ); + } + } + + fn received_request_close(&self) { + debug!( + "ID{}: State - {:?} -- Received close request", + self.operator_id(), + self.config.state + ); + } + + //Send message functions + fn send_proposal(&mut self, data: D) { + self.send_message(OutMessage::Propose(ProposeMessage { + operator_id: self.operator_id(), + instance_height: self.instance_height, + round: self.current_round, + value: data, + })); + self.set_state(InstanceState::Propose); + debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + } + + fn send_prepare(&mut self, data: D) { + self.send_message(OutMessage::Prepare(PrepareMessage { + operator_id: self.operator_id(), + instance_height: self.instance_height, + round: self.current_round, + value: data.clone(), + })); + //And store a prepare locally + let operator_id = self.operator_id(); + self.prepare_messages + .entry(self.current_round) + .or_default() + .insert( + operator_id, + PrepareMessage { + operator_id, + instance_height: self.instance_height, + round: self.current_round, + value: data, + }, + ); + self.set_state(InstanceState::Prepare); + debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + } + + fn send_commit(&mut self, data: D) { + self.send_message(OutMessage::Commit(CommitMessage { + operator_id: self.operator_id(), + instance_height: self.instance_height, + round: self.current_round, + value: data.clone(), + })); + //And store a commit locally + let operator_id = self.operator_id(); + self.commit_messages + .entry(self.current_round) + .or_default() + .insert( + operator_id, + CommitMessage { + operator_id, + instance_height: self.instance_height, + round: self.current_round, + value: data, + }, + ); + self.set_state(InstanceState::Commit); + debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + } + + fn send_round_change(&mut self, data: D) { + self.send_message(OutMessage::RoundChange(RoundChange { + operator_id: self.operator_id(), + instance_height: self.instance_height, + round_new: self.current_round + 1, + pr: self.config.pr, + pv: data, + })); + self.set_state(InstanceState::SentRoundChange); + debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + } + fn send_completed(&mut self, completion_status: Completed) { + self.send_message(OutMessage::Completed(CompletedMessage { + operator_id: self.operator_id(), + instance_height: self.instance_height, + round: self.current_round, + completion_status, + })); + self.set_state(InstanceState::Complete); + debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + } +} diff --git a/anchor/qbft/src/tests.rs b/anchor/qbft/src/tests.rs new file mode 100644 index 00000000..069e049a --- /dev/null +++ b/anchor/qbft/src/tests.rs @@ -0,0 +1,412 @@ +//! A collection of unit tests for the QBFT Protocol. +//! +//! These test individual components and also provide full end-to-end tests of the entire protocol. + +use super::*; +use config::DefaultLeaderFunction; +use futures::stream::select_all; +use futures::StreamExt; +use std::cmp::Eq; +use std::hash::Hash; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tracing::debug; +use tracing_subscriber::filter::EnvFilter; + +// HELPER FUNCTIONS FOR TESTS + +/// Enable debug logging for tests +const ENABLE_TEST_LOGGING: bool = true; + +/// The ID for the instances. +type Id = usize; +/// A struct to help build and initialise a test of running instances +#[allow(dead_code)] +struct TestQBFTCommitteeBuilder { + /// The size of the test committee. (Default is 5). + committee_size: usize, + /// The configuration to use for all the instances. + config: Config, + /// Whether we should send back dummy validation input to each instance when it requests it. + emulate_client_processor: bool, + /// Whether to emulate a broadcast network and have all network-related messages be relayed to + /// teach instance. + emulate_broadcast_network: bool, +} + +impl Default for TestQBFTCommitteeBuilder { + fn default() -> Self { + TestQBFTCommitteeBuilder { + committee_size: 5, + config: Config::default(), + emulate_client_processor: true, + emulate_broadcast_network: true, + } + } +} + +#[allow(dead_code)] +impl TestQBFTCommitteeBuilder { + /// Sets the size of the testing committee. + pub fn committee_size(mut self, committee_size: usize) -> Self { + self.committee_size = committee_size; + self + } + + /// Set whether to emulate validation or not + pub fn emulate_validation(mut self, emulate: bool) -> Self { + self.emulate_client_processor = emulate; + self + } + /// Set whether to emulate network or not + pub fn emulate_broadcast_network(mut self, emulate: bool) -> Self { + self.emulate_broadcast_network = emulate; + self + } + + /// Sets the config for all instances to run + pub fn set_config(mut self, config: Config) -> Self { + self.config = config; + self + } + + /// Consumes self and runs a test scenario. This returns a [`TestQBFTCommittee`] which + /// represents a running quorum. + pub fn run( + self, + data: D, + ) -> TestQBFTCommittee { + if ENABLE_TEST_LOGGING { + let env_filter = EnvFilter::new("debug"); + tracing_subscriber::fmt().with_env_filter(env_filter).init(); + } + + let (senders, mut receivers) = + construct_and_run_committee(self.config, self.committee_size); + + if self.emulate_client_processor { + receivers = emulate_client_processor(receivers, senders.clone(), data); + } + + if self.emulate_broadcast_network { + receivers = emulate_broadcast_network(receivers, senders.clone()); + } + + TestQBFTCommittee { senders, receivers } + } +} + +/// A testing structure representing a committee of running instances +struct TestQBFTCommittee { + /// Channels to receive all the messages coming out of all the running qbft instances + receivers: Vec>>, + /// Channels to send messages to all the running qbft instances + senders: Vec>>, +} + +#[allow(dead_code)] +impl TestQBFTCommittee +where + D: Debug + Default + Clone + Send + Sync + 'static + Eq + Hash, +{ + /// Waits until all the instances have ended + pub async fn wait_until_end(&mut self) { + debug!("Waiting for completion"); + // Loops through and waits for messages from all channels until there is nothing left. + + // Cheeky Hack, might need to change in the future + let receivers = std::mem::take(&mut self.receivers); + + let mut all_recievers = select_all( + receivers + .into_iter() + .enumerate() + .map(|(id, receiver)| InstanceStream:: { id, receiver }), + ); + while all_recievers.next().await.is_some() {} + debug!("Completed"); + } + + /// Sends a message to an instance. Specify its index (or id) and the message you want to send. + pub fn send_message(&mut self, instance: usize, message: InMessage) { + let _ = self.senders[instance].send(message); + } +} + +// Helper type to handle Streams with instance ids. +// +// I wanted a Stream that returns the instance id as well as the message when it becomes ready. +// TODO: Can probably group this thing via a MAP in a stream function. +struct InstanceStream { + id: Id, + receiver: UnboundedReceiver>, +} + +impl futures::Stream for InstanceStream +where + D: Debug + Default + Clone + Eq + Hash, +{ + type Item = (Id, OutMessage); + + // Required method + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.receiver.poll_recv(cx) { + Poll::Ready(Some(message)) => Poll::Ready(Some((self.id, message))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +/// Constructs and runs committee of QBFT Instances +/// +/// This will create instances and spawn them in a task and return the sender/receiver channels for +/// all created instances. +#[allow(clippy::type_complexity)] +fn construct_and_run_committee( + mut config: Config, + committee_size: usize, +) -> ( + Vec>>, + Vec>>, +) { + // The ID of a committee is just an integer in [0,committee_size) + + // A collection of channels to send messages to each instance. + let mut senders = Vec::with_capacity(committee_size); + // A collection of channels to receive messages from each instances. + // We will redirect messages to each instance, simulating a broadcast network. + let mut receivers = Vec::with_capacity(committee_size); + + for id in 0..committee_size { + // Creates a new instance + // 0 config.id = 0 + config.operator_id = id; + let (sender, receiver, instance) = Qbft::new(config.clone()); + senders.push(sender); + receivers.push(receiver); + + // spawn the instance + // TODO: Make the round time adjustable, to get deterministic results for testing. + debug!(id, "Starting instance"); + tokio::spawn(instance.start_instance()); + } + + (senders, receivers) +} + +/// This will collect all the outbound messages that are destined for local not network +/// interaction. +/// +/// Specifically it handles: +/// - GetData +/// +/// It will respond to these messages back to the instance that requested them with arbitrary data. +/// In order to just respond to these messages and forward others on, this function takes ownership +/// of the receive channels and replaces them with new ones in the return value. The sending +/// channel can be cloned and put in here. +/// +/// We duplicate the messages that we consume, so the returned receive channels behave identically +/// to the ones we take ownership of. +fn emulate_client_processor( + receivers: Vec>>, + senders: Vec>>, + data: D, +) -> Vec>> { + debug!("Emulating client processor requests"); + let handle_out_messages_fn = + move |message: OutMessage, + index: usize, + senders: &mut Vec>>, + new_senders: &mut Vec>>| { + // Duplicate the message to the new channel + let _ = new_senders[index].send(message.clone()); + if let OutMessage::GetData(request) = message.clone() { + let _ = senders[index].send(InMessage::RecvData(RecvData { + operator_id: request.operator_id, + round: request.round, + instance_height: request.instance_height, + value: data.clone(), + })); + + debug!("responding to GetData") + } + if let OutMessage::Completed(completed_message) = message.clone() { + let _ = senders[index].send(InMessage::RequestClose(CloseMessage { + operator_id: completed_message.operator_id, + })); + } + }; + + // Get messages from each instance, apply the function above and return the resulting channels + generically_handle_messages(receivers, senders, handle_out_messages_fn) +} + +/// This function takes the senders and receivers and will duplicate messages from all instances +/// and send those messages to all other instances. +/// This simulates a kind of broadcast network. +/// Specifically it handles: +/// ProposeMessage +/// PrepareMessage +/// CommitMessage +/// RoundChange +/// And forwards the others untouched. +fn emulate_broadcast_network( + receivers: Vec>>, + senders: Vec>>, +) -> Vec>> { + debug!("Emulating a gossip network"); + let emulate_gossip_network_fn = + |message: OutMessage, + index: usize, + senders: &mut Vec>>, + new_senders: &mut Vec>>| { + // Duplicate the message to the new channel + let _ = new_senders[index].send(message.clone()); + + match message { + OutMessage::Propose(propose_message) => { + // Send the message to all other nodes + senders + .iter_mut() + .enumerate() + .for_each(|(current_index, sender)| { + if current_index != index { + let _ = sender.send(InMessage::Propose(propose_message.clone())); + } + }); + } + OutMessage::Prepare(prepare_message) => { + senders + .iter_mut() + .enumerate() + .for_each(|(current_index, sender)| { + if current_index != index { + let _ = sender.send(InMessage::Prepare(prepare_message.clone())); + } + }); + } + OutMessage::Commit(commit_message) => { + senders + .iter_mut() + .enumerate() + .for_each(|(current_index, sender)| { + if current_index != index { + let _ = sender.send(InMessage::Commit(commit_message.clone())); + } + }); + } + OutMessage::RoundChange(round_change) => { + senders + .iter_mut() + .enumerate() + .for_each(|(current_index, sender)| { + if current_index != index { + let _ = sender.send(InMessage::RoundChange(round_change.clone())); + } + }); + } + _ => {} // We don't interact with any of the others + }; + }; + + generically_handle_messages(receivers, senders, emulate_gossip_network_fn) +} + +/// This is a base function to prevent duplication of code. It's used by `emulate_gossip_network` +/// and `handle_all_out_messages`. It groups the logic of taking the channels, cloning them and +/// returning new channels. Leaving the logic of message handling as a parameter. +fn generically_handle_messages( + receivers: Vec>>, + mut senders: Vec>>, + // This is a function that takes the outbound message from the instances and the old inbound + // sending channel and the new inbound sending channel. Given the outbound message, we can send a + // response to the old inbound sender, and potentially duplicate the message to the new receiver + // via the second Sender. + mut message_handling: T, +) -> Vec>> +where + T: FnMut( + OutMessage, + usize, + &mut Vec>>, + &mut Vec>>, + ) + + 'static + + Send + + Sync, +{ + // Build a new set of channels to replace the ones we have taken ownership of. We will just + // forward network messages to these channels + let mut new_receivers = Vec::with_capacity(receivers.len()); + let mut new_senders = Vec::with_capacity(senders.len()); + + // Populate the new channels. + for _id in 0..receivers.len() { + let (new_sender, new_receiver) = tokio::sync::mpsc::unbounded_channel::>(); + new_receivers.push(new_receiver); + new_senders.push(new_sender); + } + + // Run a task to handle all the out messages + + tokio::spawn(async move { + // First need to group all the receive channels into a single Stream that we can await. + // We will use a FuturesUnordered which groups a collection of futures. + // We also need to know the number of which receiver sent us the message so we know + // which sender to forward to. For this reason we make a little intermediate type with the + // index. + + let mut grouped_receivers = select_all(receivers.into_iter().enumerate().map( + |(index, receiver)| InstanceStream { + id: index, + receiver, + }, + )); + + while let Some((index, out_message)) = grouped_receivers.next().await { + /*debug!( + ?out_message, + "Instance" = index, + "Handling message from instance" + );*/ + // Custom handling of the out message + message_handling(out_message, index, &mut senders, &mut new_senders); + // Add back a new future to await for the next message + } + + /* loop { + match grouped_receivers.next().await { + Some((index, out_message)) => { + debug!( + ?out_message, + "Instance" = index, + "Handling message from instance" + ); + // Custom handling of the out message + message_handling(out_message, index, &mut senders, &mut new_senders); + // Add back a new future to await for the next message + } + None => { + // At least one instance has finished. + break; + } + } + }*/ + debug!("Task shutdown"); + }); + + // Return the channels that will just handle network messages + new_receivers +} + +#[tokio::test] +async fn test_basic_committee() { + // Construct and run a test committee + let mut test_instance = TestQBFTCommitteeBuilder::default().run::(21); + + // assert_eq!(1, 2); + + // Wait until consensus is reached or all the instances have ended + test_instance.wait_until_end().await; +} diff --git a/docs/architecture.md b/docs/architecture.md new file mode 100644 index 00000000..b15cb655 --- /dev/null +++ b/docs/architecture.md @@ -0,0 +1,15 @@ +# Client Architecture + +# Terminology + +- Consensus Group - Committee +- Individual SSV Node Instance - Operator +- Validator/SSV validator - Ethereum validator representing 32 ETH + +An operator exists within a committee. A committee is formed from a group of +operators. + +The operators hold a share of the validator - the share is broadcast by the +smart contract. + +