From b10800348b8a4d4d39f9b6d6eaf48615579d2cf0 Mon Sep 17 00:00:00 2001 From: jking-aus <72330194+jking-aus@users.noreply.github.com> Date: Mon, 21 Oct 2024 12:15:28 +1100 Subject: [PATCH] Qbft testing (#12) Co-authored-by: Age Manning --- Cargo.lock | 17 + Cargo.toml | 1 + anchor/qbft/Cargo.toml | 1 + anchor/qbft/src/config.rs | 96 +--- anchor/qbft/src/lib.rs | 916 ++++++++++++++++++---------------- anchor/qbft/src/tests.rs | 248 ++++----- anchor/qbft/src/types.rs | 136 +++++ anchor/qbft/src/validation.rs | 32 ++ anchor/src/main.rs | 1 - 9 files changed, 803 insertions(+), 645 deletions(-) create mode 100644 anchor/qbft/src/types.rs create mode 100644 anchor/qbft/src/validation.rs diff --git a/Cargo.lock b/Cargo.lock index d2d9c2d6..3bf0bbbd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -675,6 +675,15 @@ version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2459377285ad874054d797f3ccebf984978aa39129f6eafde5cdc8315b612f8" +[[package]] +name = "convert_case" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "cpufeatures" version = "0.2.14" @@ -906,6 +915,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ + "convert_case", "proc-macro2", "quote", "syn 2.0.79", @@ -2086,6 +2096,7 @@ checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" name = "qbft" version = "0.1.0" dependencies = [ + "derive_more", "futures", "tokio", "tracing", @@ -3148,6 +3159,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + [[package]] name = "unicode-xid" version = "0.2.6" diff --git a/Cargo.toml b/Cargo.toml index e951c36a..c650c268 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ task_executor = { git = "https://github.com/sigp/lighthouse", branch = "anchor", ] } sensitive_url = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } slot_clock = { git = "https://github.com/sigp/lighthouse", branch = "anchor" } +derive_more = { version = "1.0.0", features = ["full"] } async-channel = "1.9" axum = "0.7.7" clap = "4.5.15" diff --git a/anchor/qbft/Cargo.toml b/anchor/qbft/Cargo.toml index 1e5f6608..c6f032d2 100644 --- a/anchor/qbft/Cargo.toml +++ b/anchor/qbft/Cargo.toml @@ -9,3 +9,4 @@ tokio = { workspace = true, features = ["sync"] } tracing = { workspace = true } futures = { workspace = true } tracing-subscriber = { workspace = true } +derive_more = { workspace = true } diff --git a/anchor/qbft/src/config.rs b/anchor/qbft/src/config.rs index 5111672b..6d70e9ab 100644 --- a/anchor/qbft/src/config.rs +++ b/anchor/qbft/src/config.rs @@ -1,5 +1,7 @@ use super::error::ConfigBuilderError; -use crate::InstanceState; +use crate::types::{DefaultLeaderFunction, InstanceHeight, LeaderFunction, OperatorId, Round}; +use std::collections::HashSet; +use std::fmt::Debug; use std::time::Duration; #[derive(Clone, Debug)] @@ -7,46 +9,22 @@ pub struct Config where F: LeaderFunction + Clone, { - pub operator_id: usize, - pub instance_height: usize, - pub round: usize, - pub state: InstanceState, + pub operator_id: OperatorId, + pub instance_height: 
InstanceHeight, + pub round: Round, pub pr: usize, pub committee_size: usize, - pub committee_members: Vec, + pub committee_members: HashSet, pub quorum_size: usize, pub round_time: Duration, + pub max_rounds: usize, pub leader_fn: F, } -/// Generic LeaderFunction trait to allow for future implementations of the QBFT module -pub trait LeaderFunction { - /// Returns true if we are the leader - fn leader_function( - &self, - operator_id: usize, - round: usize, - instance_height: usize, - committee_size: usize, - ) -> bool; -} - -/*#[derive(Debug, Clone)] -pub enum InstanceState { - AwaitingProposal, - Propose, - Prepare, - Commit, - SentRoundChange, - Complete, -}*/ - -// TODO: Remove this allow -#[allow(dead_code)] impl Config { /// A unique identification number assigned to the QBFT consensus and given to all members of /// the committee - pub fn operator_id(&self) -> usize { + pub fn operator_id(&self) -> OperatorId { self.operator_id } /// The committee size @@ -54,7 +32,7 @@ impl Config { self.committee_size } - pub fn commmittee_members(&self) -> Vec { + pub fn commmittee_members(&self) -> HashSet { self.committee_members.clone() } @@ -65,7 +43,7 @@ impl Config { /// The round number -- likely always 0 at initialisation unless we want to implement re-joining an existing /// instance that has been dropped locally - pub fn round(&self) -> usize { + pub fn round(&self) -> Round { self.round } @@ -74,19 +52,20 @@ impl Config { self.round_time } + pub fn max_rounds(&self) -> usize { + self.max_rounds + } + /// Whether the operator is the lead of the committee for the round -- need to properly /// implement this in a way that is deterministic based on node IDs - pub fn leader_fn(&self) -> F { - // TODO: This clone is bad, we don't want to clone but return a - // reference. 
When we generalise this will be changed - self.leader_fn.clone() + pub fn leader_fn(&self) -> &F { + &self.leader_fn } } - impl Default for Config { fn default() -> Self { //use the builder to also validate defaults - ConfigBuilder::default() + ConfigBuilder::::default() .build() .expect("Default parameters should be valid") } @@ -100,15 +79,15 @@ impl Default for ConfigBuilder { fn default() -> Self { ConfigBuilder { config: Config { - operator_id: 0, - state: InstanceState::AwaitingProposal, - instance_height: 0, - committee_size: 5, - committee_members: vec![0, 1, 2, 3, 4], + operator_id: OperatorId::default(), + instance_height: InstanceHeight::default(), + committee_size: 0, + committee_members: HashSet::new(), quorum_size: 4, - round: 0, + round: Round::default(), pr: 0, round_time: Duration::new(2, 0), + max_rounds: 4, leader_fn: DefaultLeaderFunction {}, }, } @@ -120,15 +99,13 @@ impl From> for ConfigBuilder { } } -// TODO: Remove this lint later, just removes warnings for now -#[allow(dead_code)] impl ConfigBuilder { - pub fn operator_id(&mut self, operator_id: usize) -> &mut Self { + pub fn operator_id(&mut self, operator_id: OperatorId) -> &mut Self { self.config.operator_id = operator_id; self } - pub fn instance_height(&mut self, instance_height: usize) -> &mut Self { + pub fn instance_height(&mut self, instance_height: InstanceHeight) -> &mut Self { self.config.instance_height = instance_height; self } @@ -143,11 +120,10 @@ impl ConfigBuilder { self } - pub fn round(&mut self, round: usize) -> &mut Self { + pub fn round(&mut self, round: Round) -> &mut Self { self.config.round = round; self } - pub fn round_time(&mut self, round_time: Duration) -> &mut Self { self.config.round_time = round_time; self @@ -165,21 +141,3 @@ impl ConfigBuilder { Ok(self.config.clone()) } } - -// input parameters for leader function need to include the round and the node ID -// -/// TODO: Input will be passed to instance in config by client processor when creating new instance -#[derive(Debug, Clone)] -pub struct DefaultLeaderFunction {} - -impl LeaderFunction for DefaultLeaderFunction { - fn leader_function( - &self, - operator_id: usize, - round: usize, - instance_height: usize, - committee_size: usize, - ) -> bool { - operator_id == (round + instance_height) % committee_size - } -} diff --git a/anchor/qbft/src/lib.rs b/anchor/qbft/src/lib.rs index e982a305..d51e2cfd 100644 --- a/anchor/qbft/src/lib.rs +++ b/anchor/qbft/src/lib.rs @@ -1,180 +1,71 @@ -use config::{Config, LeaderFunction}; +pub use config::{Config, ConfigBuilder}; use std::cmp::Eq; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::hash::Hash; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tracing::{debug, error, warn}; +use tracing::{debug, instrument, warn, Level}; +pub use validation::{validate_consensus_data, ValidatedData, ValidationError}; + +pub use types::{ + Completed, ConsensusData, InMessage, InstanceHeight, InstanceState, LeaderFunction, OperatorId, + OutMessage, Round, +}; mod config; mod error; +mod types; +mod validation; #[cfg(test)] mod tests; -type Round = usize; -type OperatorId = usize; +type RoundChangeMap = HashMap>>>; -/// The structure that defines the Quorum Based Fault Tolerance (Qbft) instance +/// The structure that defines the Quorum Based Fault Tolerance (QBFT) instance. +/// +/// This builds and runs an entire QBFT process until it completes. 
It can complete either +/// successfully (i.e. it has successfully come to consensus), or through a timeout where enough +/// round changes have elapsed before coming to consensus. pub struct Qbft where F: LeaderFunction + Clone, D: Debug + Clone + Eq + Hash, { + /// The initial configuration used to establish this instance of QBFT. config: Config, - instance_height: usize, - current_round: usize, - data: Option, - /// The messages received this round that we have collected to reach quorum - prepare_messages: HashMap>>, - commit_messages: HashMap>>, - round_change_messages: HashMap>>, - // Channel that links the Qbft instance to the client processor and is where messages are sent + /// Initial data that we will propose if we are the leader. + start_data: ValidatedData, + /// The instance height acts as an ID for the current instance and helps distinguish it from + /// other instances. + instance_height: InstanceHeight, + /// The current round this instance state is in. + current_round: Round, + /// If we have come to consensus in a previous round this is set here. + past_consensus: HashMap>, + /// The messages received this round that we have collected to reach quorum. + prepare_messages: HashMap, HashSet>>, + commit_messages: HashMap, HashSet>>, + /// Stores the round change messages. The second hashmap stores optional past consensus + /// data for each round change message. + round_change_messages: HashMap>, + // Channel that links the QBFT instance to the client processor and is where messages are sent // to be distributed to the committee message_out: UnboundedSender>, // Channel that receives messages from the client processor message_in: UnboundedReceiver>, + /// The current state of the instance + state: InstanceState, } -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub enum InstanceState { - AwaitingProposal, - Propose, - Prepare, - Commit, - SentRoundChange, - Complete, -} - -/// Generic Data trait to allow for future implementations of the QBFT module -// Messages that can be received from the message_in channel -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub enum InMessage { - /// A request for data to form consensus on if we are the leader. - RecvData(RecvData), - /// A PROPOSE message to be sent on the network. - Propose(ProposeMessage), - /// A PREPARE message to be sent on the network. - Prepare(PrepareMessage), - /// A commit message to be sent on the network. - Commit(CommitMessage), - /// Round change message received from network - RoundChange(RoundChange), - /// Close instance message received from the client processor - RequestClose(CloseMessage), -} - -/// Messages that may be sent to the message_out channel from the instance to the client processor -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub enum OutMessage { - /// A request for data to form consensus on if we are the leader. - GetData(GetData), - /// A PROPOSE message to be sent on the network. - Propose(ProposeMessage), - /// A PREPARE message to be sent on the network. - Prepare(PrepareMessage), - /// A commit message to be sent on the network. - Commit(CommitMessage), - /// The round has ended, send this message to the network to inform all participants. - RoundChange(RoundChange), - /// The consensus instance has completed. 
- Completed(CompletedMessage), -} -/// Type definitions for the allowable messages -#[allow(dead_code)] -#[derive(Debug, Clone)] - -pub struct RoundChange { - operator_id: usize, - instance_height: usize, - round_new: usize, - pr: usize, - pv: D, -} - -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub struct GetData { - operator_id: usize, - instance_height: usize, - round: usize, -} - -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub struct RecvData { - operator_id: usize, - instance_height: usize, - round: usize, - value: D, -} - -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub struct ProposeMessage { - operator_id: usize, - instance_height: usize, - round: usize, - value: D, -} - -#[allow(dead_code)] -#[derive(Debug, Clone, Default)] -pub struct PrepareMessage { - operator_id: usize, - instance_height: usize, - round: usize, - value: D, -} - -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub struct CommitMessage { - operator_id: usize, - instance_height: usize, - round: usize, - value: D, -} - -#[allow(dead_code)] -#[derive(Debug, Clone)] -pub struct CompletedMessage { - operator_id: usize, - instance_height: usize, - round: usize, - completion_status: Completed, -} - -#[derive(Debug, Clone)] -pub struct CloseMessage { - operator_id: usize, -} - -#[allow(dead_code)] -#[derive(Debug, Clone)] -/// The consensus instance has finished. -pub enum Completed { - /// The instance has timed out. - TimedOut, - /// Consensus was reached on the provided data. - Success(D), -} - -// TODO: Make a builder and validate config -// TODO: getters and setters for the config fields -// TODO: Remove this allow - -#[allow(dead_code)] impl Qbft where F: LeaderFunction + Clone, - D: Debug + Clone + Eq + Hash + Hash + Eq, + D: Debug + Clone + Hash + Eq, { pub fn new( config: Config, + start_data: ValidatedData, ) -> ( UnboundedSender>, UnboundedReceiver>, @@ -184,161 +75,115 @@ where let (message_out, out_receiver) = tokio::sync::mpsc::unbounded_channel(); let estimated_map_size = config.committee_size; + let instance = Qbft { current_round: config.round, instance_height: config.instance_height, config, - data: None, + start_data, + past_consensus: HashMap::with_capacity(2), prepare_messages: HashMap::with_capacity(estimated_map_size), commit_messages: HashMap::with_capacity(estimated_map_size), round_change_messages: HashMap::with_capacity(estimated_map_size), message_out, message_in, + state: InstanceState::AwaitingProposal, }; (in_sender, out_receiver, instance) } + // This adds the fields to all our logs for this instance. + #[instrument(name = "QBFT",skip_all, fields(operator_id=*self.config.operator_id,instance_height=*self.config.instance_height), level= Level::ERROR)] pub async fn start_instance(mut self) { let mut round_end = tokio::time::interval(self.config.round_time); self.start_round(); loop { + // If we reached a critical error, end gracefully + if matches!(self.state, InstanceState::Complete) { + return; + } + tokio::select! 
{ message = self.message_in.recv() => { match message { - // When a receive data message is received, run the - // received_data function - Some(InMessage::RecvData(received_data)) => self.received_data(received_data), - //When a Propose message is received, run the + // When a Propose message is received, run the // received_propose function - Some(InMessage::Propose(propose_message)) => self.received_propose(propose_message), + Some(InMessage::Propose(operator_id, consensus_data)) => self.received_propose(operator_id, consensus_data), // When a Prepare message is received, run the // received_prepare function - Some(InMessage::Prepare(received_prepare)) => self.received_prepare(received_prepare), + Some(InMessage::Prepare(operator_id, consensus_data)) => self.received_prepare(operator_id, consensus_data), // When a Commit message is received, run the // received_commit function - Some(InMessage::Commit(commit_message)) => self.received_commit(commit_message), - // When a RoundChange message is received, run the - // received_roundChange function - Some(InMessage::RoundChange(round_change_message)) => self.received_round_change(round_change_message), + Some(InMessage::Commit(operator_id, consensus_data)) => self.received_commit(operator_id, consensus_data), + // When a RoundChange message is received, run the received_round_change function + Some(InMessage::RoundChange(operator_id, round, maybe_past_consensus_data)) => self.received_round_change(operator_id, round, maybe_past_consensus_data), // When a CloseRequest is received, close the instance - Some(InMessage::RequestClose(close_message)) => { - //stub function in case we want to do anything pre-close - self.received_request_close(); - if close_message.operator_id == self.operator_id(){ - break; - } - } - - None => { }// Channel is closed + None => { } // Channel is closed } } _ = round_end.tick() => { - // TODO: Leaving implement - debug!("ID{}: Round {} failed, incrementing round", self.config.operator_id, self.current_round); - self.increment_round(); - if self.current_round > 2 { + debug!(round = *self.current_round, "Incrementing round"); + if *self.current_round > self.config.max_rounds() { self.send_completed(Completed::TimedOut); - // May not need break if can reliably close from client but keeping for now in case of bugs break; - } + } + self.send_round_change(self.current_round.next()); + // Start a new round + self.set_round(self.current_round.next()); } } } - debug!("ID{}: Instance killed", self.config.operator_id); + debug!("Instance killed"); } - //Get and set functions - fn set_state(&mut self, new_state: InstanceState) { - self.config.state = new_state; - } - fn set_data(&mut self, data: D) { - self.data = Some(data); - } - fn operator_id(&self) -> usize { + /// Returns the operator id for this instance. + fn operator_id(&self) -> OperatorId { self.config.operator_id } - fn committee_members(&self) -> Vec { - self.config.committee_members.clone() + + /// Obtains the maximum number of faulty nodes that this consensus can tolerate + fn get_f(&self) -> usize { + let f = (self.config.committee_size - 1) / 3; + if f > 0 { + f + } else { + 1 + } } + + /// Sends an outbound message fn send_message(&mut self, message: OutMessage) { - let _ = self.message_out.send(message); - } - fn set_pr(&mut self, round: Round) { - self.config.pr = round; - } - fn increment_round(&mut self) { - self.current_round += 1; - self.start_round(); + if self.message_out.send(message).is_err() { + // The outbound channel has been closed. 
This instance can no longer progress. We + // should terminate the current running instance warn!( instance_height = *self.config.instance_height, "Receiver channel closed. Terminating" ); self.state = InstanceState::Complete } } - fn store_messages(&mut self, in_message: InMessage) { - match in_message { - InMessage::RecvData(_message) => { - warn! {"ID {}: called store message on RecvData", self.operator_id()} - } - - InMessage::Propose(_message) => { - warn! {"ID {}: called store message on Propose", self.operator_id()} - } - - InMessage::RequestClose(_message) => { - warn! {"ID {}: called store message on RequestClose", self.operator_id()} - } - InMessage::Prepare(message) => { - if self - .prepare_messages - .entry(message.round) - .or_default() - .insert(message.operator_id, message.clone()) - .is_some() - { - warn!( - "ID {}: Operator {} sent duplicate prepare", - self.operator_id(), - message.operator_id - ) - }; - } - - InMessage::Commit(message) => { - if self - .commit_messages - .entry(message.round) - .or_default() - .insert(message.operator_id, message.clone()) - .is_some() - { - warn!( - "ID {}: Operator {} sent duplicate commit", - self.operator_id(), - message.operator_id - ); - } - } - - InMessage::RoundChange(message) => { - if self - .round_change_messages - .entry(message.round_new) - .or_default() - .insert(message.operator_id, message.clone()) - .is_some() - { - warn!( - "ID {}: Operator {} sent duplicate RoundChange request", - self.operator_id(), - message.operator_id - ); - } - } } } + /// Once we have achieved consensus on a PREPARE round, we add the data to mapping to match + /// against later. + fn insert_consensus(&mut self, round: Round, data: ValidatedData) { + debug!(round = *round, ?data, "Reached prepare consensus"); + if let Some(past_data) = self.past_consensus.insert(round, data.clone()) { + warn!(round = *round, ?data, past_data = ?past_data, "Adding duplicate consensus data"); } } - //Validation and check functions - fn check_leader(&self, operator_id: usize) -> bool { + /// Shifts this instance into a new round. + fn set_round(&mut self, new_round: Round) { + self.current_round.set(new_round); + self.start_round(); + } + + // Validation and check functions. + fn check_leader(&self, operator_id: &OperatorId) -> bool { self.config.leader_fn.leader_function( operator_id, self.current_round, @@ -346,245 +191,448 @@ where self.config.committee_size, ) } - fn validate_data(&self, data: D) -> Option { - Some(data) + + /// Checks to make sure any given operator is in this instance's committee. + fn check_committee(&self, operator_id: &OperatorId) -> bool { + self.config.committee_members.contains(operator_id) } - fn check_committee(&self, operator_id: &usize) -> bool { - self.committee_members().contains(operator_id) + + /// Justify the round change quorum + /// In order to justify a round change quorum, we find the maximum round of the quorum set that + /// had achieved a past consensus. If we have also seen consensus on this round for the + /// suggested data, then it is justified and this function returns that data. + /// If there is no past consensus data in the round change quorum, or we disagree with the + /// quorum's data, this function will return None and we obtain the data as if we were beginning this + /// instance. + fn justify_round_change_quorum(&self) -> Option<&ValidatedData> { + // If we have messages for the current round + if let Some(new_round_messages) = self.round_change_messages.get(&self.current_round) { + // If we have a quorum + if new_round_messages.len() >= self.config.quorum_size { + // Find the maximum (round, value) pair + let max_consensus_data = new_round_messages + .values() + .max_by_key(|maybe_past_consensus_data| { + maybe_past_consensus_data + .as_ref() + .map(|consensus_data| *consensus_data.round) + .unwrap_or(0) + })? + .clone()?; + + // We have a maximum; check to make sure we have seen quorum on this + let past_data = self.past_consensus.get(&max_consensus_data.round)?; + if *past_data == max_consensus_data.data { + return Some(past_data); + } + } + } + None } - //Round start function + // Handles the beginning of a round. fn start_round(&mut self) { - self.set_state(InstanceState::AwaitingProposal); - debug!( - "ID{}: Round {} starting", - self.operator_id(), - self.current_round - ); - - if self.check_leader(self.operator_id()) { - debug!("ID{}: believes they are the leader", self.operator_id()); - - self.send_message(OutMessage::GetData(GetData { - operator_id: self.operator_id(), - instance_height: self.instance_height, - round: self.current_round, - })); - }; - } + debug!(round = *self.current_round, "Starting new round"); - /// Received message functions - /// Received data to be sent as proposal - fn received_data(&mut self, message: RecvData) { - // Check that we are the leader to make sure this is a timely response, for whilst we are - // still the leader and that we're awaiting a proposal - if self.check_leader(self.operator_id()) - && self.check_committee(&self.operator_id()) - && matches!(self.config.state, InstanceState::AwaitingProposal) - { - if let Some(data) = self.validate_data(message.value.clone()) { + // Remove round change messages that would be for previous rounds + self.round_change_messages .retain(|&round, _value| round >= self.current_round); + + // Initialise the instance state for the round + self.state = InstanceState::AwaitingProposal; + + // Check if we are the leader + if self.check_leader(&self.operator_id()) { + // We are the leader + debug!("Current leader"); + // Check justification of round change quorum + if let Some(validated_data) = self.justify_round_change_quorum().cloned() { debug!( - "ID{}: received data {:?}", - self.operator_id(), - message.value.clone() - ); - self.set_data(data.clone()); - self.send_proposal(data.clone()); - self.send_prepare(data); + old_data = ?validated_data, + "Using consensus data from a previous round"); + self.send_proposal(validated_data.clone()); + self.send_prepare(validated_data); } else { - error!("ID{}: Received invalid data", self.operator_id()); + debug!("Using initialised data"); + self.send_proposal(self.start_data.clone()); + self.send_prepare(self.start_data.clone()); } } } + /// We have received a proposal message - fn received_propose(&mut self, propose_message: ProposeMessage) { + fn received_propose(&mut self, operator_id: OperatorId, consensus_data: ConsensusData) { // Check if proposal is from the leader we expect - if self.check_leader(propose_message.operator_id) - && self.check_committee(&propose_message.operator_id) - && matches!(self.config.state, InstanceState::AwaitingProposal) - { - let self_operator_id = self.operator_id(); - debug!( - "ID {}: Proposal is from round leader with ID {}", - self_operator_id, propose_message.operator_id, + if !(self.check_leader(&operator_id)) { 
warn!(from = *operator_id, "PROPOSE message from non-leader"); + return; + } + // Check that this operator is in our committee + if !self.check_committee(&operator_id) { + warn!( + from = *operator_id, + "PROPOSE message from non-committee operator" ); - // Validate the proposal with a local function that is is passed in from the config - // similar to the leaderfunction for now return bool -> true - if let Some(data) = self.validate_data(propose_message.value) { - // If of valid type, set data locally then send prepare - self.set_data(data.clone()); - self.send_prepare(data); + return; + } + + // Check that we are awaiting a proposal + if !matches!(self.state, InstanceState::AwaitingProposal) { + warn!(from=*operator_id, ?self.state, "PROPOSE message while in invalid state"); + return; + } + // Ensure that this message is for the correct round + if !(self.current_round == consensus_data.round) { + warn!( + from = *operator_id, + current_round = *self.current_round, + propose_round = *consensus_data.round, + "PROPOSE message received for the wrong round" + ); + return; + } + + // Validate the data + let Ok(consensus_data) = validate_consensus_data(consensus_data) else { + warn!( + from = *operator_id, + current_round = *self.current_round, + "PROPOSE message is invalid" + ); + return; + }; + + debug!(from = *operator_id, "PROPOSE received"); + + // Justify the proposal by checking the round changes + if let Some(justified_data) = self.justify_round_change_quorum() { + if *justified_data != consensus_data.data { + // The data doesn't match the justified value we expect. Drop the message + warn!( + from = *operator_id, + ?consensus_data, + ?justified_data, + "PROPOSE message isn't justified" + ); + return; } + self.send_prepare(consensus_data.data); + } else { + // We have no previous consensus data + // If of valid type, set data locally then send prepare + self.send_prepare(consensus_data.data); } } /// We have received a prepare message - fn received_prepare(&mut self, prepare_message: PrepareMessage) { - // Check if the prepare message is from the committee and the data is valid - if self.check_committee(&prepare_message.operator_id) - && self.validate_data(prepare_message.value.clone()).is_some() + fn received_prepare(&mut self, operator_id: OperatorId, consensus_data: ConsensusData) { + // Check that this operator is in our committee + if !self.check_committee(&operator_id) { + warn!( + from = *operator_id, + "PREPARE message from non-committee operator" + ); + return; + } + + // Check that we are in the correct state + if (self.state as u8) >= (InstanceState::SentRoundChange as u8) { + warn!(from=*operator_id, ?self.state, "PREPARE message while in invalid state"); + return; + } + + // Ensure that this message is for the correct round + if !(self.current_round == consensus_data.round) { + warn!( + from = *operator_id, + current_round = *self.current_round, + propose_round = *consensus_data.round, + "PREPARE message received for the wrong round" + ); + return; + } + + // Validate the data + let Ok(consensus_data) = validate_consensus_data(consensus_data) else { + warn!( + from = *operator_id, + current_round = *self.current_round, + "PREPARE message is invalid" + ); + return; + }; + + debug!(from = *operator_id, "PREPARE received"); + + // Store the prepare message + if !self + .prepare_messages + .entry(consensus_data.round) + .or_default() + .entry(consensus_data.data) + .or_default() + .insert(operator_id) { - self.store_messages(InMessage::Prepare(prepare_message.clone())); - // If 
we have stored round messages - if let Some(round_messages) = self.prepare_messages.get(&prepare_message.round) { - // Check the quorum size - if round_messages.len() >= self.config.quorum_size { - let counter = round_messages.values().fold( - HashMap::<&D, usize>::new(), - |mut counter, message| { - *counter.entry(&message.value).or_default() += 1; - counter - }, - ); - if let Some((data, count)) = counter.into_iter().max_by_key(|&(_, v)| v) { - if count >= self.config.quorum_size - && matches!(self.config.state, InstanceState::Prepare) - { - self.send_commit(data.clone()); - } - } + warn!(from = *operator_id, "PREPARE message is a duplicate") + }; + + // Check if we have reached quorum, if so send commit messages and store the fact that we + // have reached consensus on this quorum. + let mut update_data = None; + if let Some(prepare_messages) = self.prepare_messages.get(&self.current_round) { + // Check the quorum size + if let Some((data, operators)) = prepare_messages + .iter() + .max_by_key(|(_data, operators)| operators.len()) + { + if operators.len() >= self.config.quorum_size + && matches!(self.state, InstanceState::Prepare) + { + // We reached quorum on this data + update_data = Some(data.clone()); } } } + + // Send the data + if let Some(data) = update_data { + self.send_commit(data.clone()); + self.insert_consensus(self.current_round, data.clone()); + } } ///We have received a commit message - fn received_commit(&mut self, commit_message: CommitMessage) { - if self.check_committee(&commit_message.operator_id) - && self.validate_data(commit_message.value.clone()).is_some() + fn received_commit(&mut self, operator_id: OperatorId, consensus_data: ConsensusData) { + // Check that this operator is in our committee + if !self.check_committee(&operator_id) { + warn!( + from = *operator_id, + "COMMIT message from non-committee operator" + ); + return; + } + + // Check that we are awaiting a proposal + if (self.state as u8) >= (InstanceState::SentRoundChange as u8) { + warn!(from=*operator_id, ?self.state, "COMMIT message while in invalid state"); + return; + } + + // Ensure that this message is for the correct round + if !(self.current_round == consensus_data.round) { + warn!( + from = *operator_id, + current_round = *self.current_round, + propose_round = *consensus_data.round, + "COMMIT message received for the wrong round" + ); + return; + } + + // Validate the data + let Ok(consensus_data) = validate_consensus_data(consensus_data) else { + warn!( + from = *operator_id, + current_round = *self.current_round, + "COMMIT message is invalid" + ); + return; + }; + + debug!(from = *operator_id, "COMMIT received"); + + // Store the received commit message + if !self + .commit_messages + .entry(self.current_round) + .or_default() + .entry(consensus_data.data) + .or_default() + .insert(operator_id) { - // Store the received commit message - self.store_messages(InMessage::Commit(commit_message.clone())); - if let Some(round_messages) = self.prepare_messages.get(&commit_message.round) { - // Check the quorum size - if round_messages.len() >= self.config.quorum_size { - let counter = round_messages.values().fold( - HashMap::<&D, usize>::new(), - |mut counter, message| { - *counter.entry(&message.value).or_default() += 1; - counter - }, - ); - if let Some((data, count)) = counter.into_iter().max_by_key(|&(_, v)| v) { - if count >= self.config.quorum_size - && matches!(self.config.state, InstanceState::Commit) - { - self.send_completed(Completed::Success(data.clone())); - } - } + warn!(from = 
*operator_id, "Received duplicate commit"); + } + + // Check if we have reached quorum + if let Some(commit_messages) = self.commit_messages.get(&self.current_round) { + // Check the quorum size + if let Some((data, operators)) = commit_messages + .iter() + .max_by_key(|(_data, operators)| operators.len()) + { + if operators.len() >= self.config.quorum_size + && matches!(self.state, InstanceState::Commit) + { + self.send_completed(Completed::Success(data.data.clone())); + self.state = InstanceState::Complete; } } } } - fn received_round_change(&mut self, round_change_message: RoundChange) { - // Store the received commit message + /// We have received a round change message. + fn received_round_change( + &mut self, + operator_id: OperatorId, + round: Round, + maybe_past_consensus_data: Option>, + ) { + // Check that this operator is in our committee + if !self.check_committee(&operator_id) { + warn!( + from = *operator_id, + "ROUNDCHANGE message from non-committee operator" + ); + return; + } + + // Check that we are awaiting a proposal + // NOTE: THis is not necessary, but putting it here as these functions can be grouped for + // later + if (self.state as u8) >= (InstanceState::Complete as u8) { + warn!(from=*operator_id, ?self.state, "ROUNDCHANGE message while in invalid state"); + return; + } + + // Ensure that this message is for the correct round + if round < self.current_round || *round > self.config.max_rounds { + warn!( + from = *operator_id, + current_round = *self.current_round, + propose_round = *round, + max_rounds = self.config.max_rounds, + "ROUNDCHANGE message received for the wrong round" + ); + return; + } + + // Validate the data, if it exists + let maybe_past_consensus_data = match maybe_past_consensus_data { + Some(consensus_data) => { + let Ok(consensus_data) = validate_consensus_data(consensus_data) else { + warn!( + from = *operator_id, + current_round = *self.current_round, + "ROUNDCHANGE message is invalid" + ); + return; + }; + Some(consensus_data) + } + None => None, + }; + + debug!(from = *operator_id, "ROUNDCHANGE received"); + + // Store the round change message, for the round the message references if self .round_change_messages - .entry(round_change_message.round_new) + .entry(round) .or_default() - .insert( - round_change_message.operator_id, - round_change_message.clone(), - ) + .insert(operator_id, maybe_past_consensus_data.clone()) .is_some() { - warn!( - operator = round_change_message.operator_id, - "Operator sent round change request" - ); + warn!(from = *operator_id, "ROUNDCHANGE duplicate request",); } - } - fn received_request_close(&self) { - debug!( - "ID{}: State - {:?} -- Received close request", - self.operator_id(), - self.config.state - ); + // There are two cases to check here + // 1. If we receive f+1 round change messages, we need to send our own round-change message + // 2. If we have received a quorum of round change messages, we need to start a new round + + // Check if we have any messages for the suggested round + if let Some(new_round_messages) = self.round_change_messages.get(&round) { + // Check the quorum size + if new_round_messages.len() >= self.config.quorum_size + && matches!(self.state, InstanceState::SentRoundChange) + { + // 1. If we have reached a quorum for this round, advance to that round. 
+ debug!(operator_id = ?self.operator_id(), round = *round, "Round change quorum reached"); + self.set_round(round); + } else if new_round_messages.len() > self.get_f() + && !(matches!(self.state, InstanceState::SentRoundChange)) + { + // 2. We have seen 2f + 1 messtages for this round. + self.send_round_change(round); + } + } } - //Send message functions - fn send_proposal(&mut self, data: D) { - self.send_message(OutMessage::Propose(ProposeMessage { - operator_id: self.operator_id(), - instance_height: self.instance_height, + // Send message functions + fn send_proposal(&mut self, data: ValidatedData) { + self.send_message(OutMessage::Propose(ConsensusData { round: self.current_round, - value: data, + data: data.data, })); - self.set_state(InstanceState::Propose); - debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + self.state = InstanceState::Prepare; + debug!(?self.state, "State Changed"); } - fn send_prepare(&mut self, data: D) { - self.send_message(OutMessage::Prepare(PrepareMessage { - operator_id: self.operator_id(), - instance_height: self.instance_height, + fn send_prepare(&mut self, data: ValidatedData) { + let consensus_data = ConsensusData { round: self.current_round, - value: data.clone(), - })); - //And store a prepare locally + data, + }; + self.send_message(OutMessage::Prepare(consensus_data.clone().into())); + // And store a prepare locally let operator_id = self.operator_id(); self.prepare_messages .entry(self.current_round) .or_default() - .insert( - operator_id, - PrepareMessage { - operator_id, - instance_height: self.instance_height, - round: self.current_round, - value: data, - }, - ); - self.set_state(InstanceState::Prepare); - debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + .entry(consensus_data.data) + .or_default() + .insert(operator_id); + + self.state = InstanceState::Prepare; + debug!(?self.state, "State Changed"); } - fn send_commit(&mut self, data: D) { - self.send_message(OutMessage::Commit(CommitMessage { - operator_id: self.operator_id(), - instance_height: self.instance_height, + fn send_commit(&mut self, data: ValidatedData) { + let consensus_data = ConsensusData { round: self.current_round, - value: data.clone(), - })); - //And store a commit locally + data, + }; + self.send_message(OutMessage::Commit(consensus_data.clone().into())); //And store a commit locally let operator_id = self.operator_id(); self.commit_messages .entry(self.current_round) .or_default() - .insert( - operator_id, - CommitMessage { - operator_id, - instance_height: self.instance_height, - round: self.current_round, - value: data, - }, - ); - self.set_state(InstanceState::Commit); - debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + .entry(consensus_data.data) + .or_default() + .insert(operator_id); + self.state = InstanceState::Commit; + debug!(?self.state, "State changed", ); } - fn send_round_change(&mut self, data: D) { - self.send_message(OutMessage::RoundChange(RoundChange { - operator_id: self.operator_id(), - instance_height: self.instance_height, - round_new: self.current_round + 1, - pr: self.config.pr, - pv: data, - })); - self.set_state(InstanceState::SentRoundChange); - debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + fn send_round_change(&mut self, round: Round) { + // Get the maximum round we have come to consensus on + let best_consensus = self + .past_consensus + .iter() + .max_by_key(|(&round, _v)| *round) + .map(|(&round, data)| ConsensusData { + round, + data: data.clone(), + 
}); + + self.send_message(OutMessage::RoundChange( + round, + best_consensus.clone().map(|v| v.into()), + )); + + // And store locally + let operator_id = self.operator_id(); + self.round_change_messages + .entry(round) + .or_default() + .insert(operator_id, best_consensus); + + self.state = InstanceState::SentRoundChange; + debug!(state = ?self.state, "New State"); } + fn send_completed(&mut self, completion_status: Completed) { - self.send_message(OutMessage::Completed(CompletedMessage { - operator_id: self.operator_id(), - instance_height: self.instance_height, - round: self.current_round, - completion_status, - })); - self.set_state(InstanceState::Complete); - debug!("ID{}: State - {:?}", self.operator_id(), self.config.state); + self.send_message(OutMessage::Completed(completion_status)); + self.state = InstanceState::Complete; + debug!(state = ?self.state, "New State"); } } diff --git a/anchor/qbft/src/tests.rs b/anchor/qbft/src/tests.rs index 069e049a..8239d7ec 100644 --- a/anchor/qbft/src/tests.rs +++ b/anchor/qbft/src/tests.rs @@ -3,7 +3,7 @@ //! These test individual components and also provide full end-to-end tests of the entire protocol. use super::*; -use config::DefaultLeaderFunction; +use crate::validation::{validate_data, ValidatedData}; use futures::stream::select_all; use futures::StreamExt; use std::cmp::Eq; @@ -12,19 +12,15 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tracing::debug; use tracing_subscriber::filter::EnvFilter; +use types::DefaultLeaderFunction; // HELPER FUNCTIONS FOR TESTS /// Enable debug logging for tests const ENABLE_TEST_LOGGING: bool = true; -/// The ID for the instances. -type Id = usize; /// A struct to help build and initialise a test of running instances -#[allow(dead_code)] struct TestQBFTCommitteeBuilder { - /// The size of the test committee. (Default is 5). - committee_size: usize, /// The configuration to use for all the instances. config: Config, /// Whether we should send back dummy validation input to each instance when it requests it. @@ -36,9 +32,16 @@ struct TestQBFTCommitteeBuilder { impl Default for TestQBFTCommitteeBuilder { fn default() -> Self { - TestQBFTCommitteeBuilder { + let config = Config:: { + // Set a default committee size of 5. committee_size: 5, - config: Config::default(), + // Populate the committee members + committee_members: (0..5).map(OperatorId::from).collect::>(), + ..Default::default() + }; + + TestQBFTCommitteeBuilder { + config, emulate_client_processor: true, emulate_broadcast_network: true, } @@ -49,7 +52,7 @@ impl Default for TestQBFTCommitteeBuilder { impl TestQBFTCommitteeBuilder { /// Sets the size of the testing committee. pub fn committee_size(mut self, committee_size: usize) -> Self { - self.committee_size = committee_size; + self.config.committee_size = committee_size; self } @@ -72,21 +75,22 @@ impl TestQBFTCommitteeBuilder { /// Consumes self and runs a test scenario. This returns a [`TestQBFTCommittee`] which /// represents a running quorum. 
- pub fn run( - self, - data: D, - ) -> TestQBFTCommittee { + pub fn run(self, data: D) -> TestQBFTCommittee + where + D: Debug + Default + Clone + Send + Sync + 'static + Eq + Hash, + { if ENABLE_TEST_LOGGING { let env_filter = EnvFilter::new("debug"); - tracing_subscriber::fmt().with_env_filter(env_filter).init(); + tracing_subscriber::fmt() + .compact() + .with_env_filter(env_filter) + .init(); } - let (senders, mut receivers) = - construct_and_run_committee(self.config, self.committee_size); + // Validate the data + let validated_data = validate_data(data).unwrap(); - if self.emulate_client_processor { - receivers = emulate_client_processor(receivers, senders.clone(), data); - } + let (senders, mut receivers) = construct_and_run_committee(self.config, validated_data); if self.emulate_broadcast_network { receivers = emulate_broadcast_network(receivers, senders.clone()); @@ -97,14 +101,14 @@ impl TestQBFTCommitteeBuilder { } /// A testing structure representing a committee of running instances +#[allow(dead_code)] struct TestQBFTCommittee { /// Channels to receive all the messages coming out of all the running qbft instances - receivers: Vec>>, + receivers: HashMap>>, /// Channels to send messages to all the running qbft instances - senders: Vec>>, + senders: HashMap>>, } -#[allow(dead_code)] impl TestQBFTCommittee where D: Debug + Default + Clone + Send + Sync + 'static + Eq + Hash, @@ -117,19 +121,23 @@ where // Cheeky Hack, might need to change in the future let receivers = std::mem::take(&mut self.receivers); - let mut all_recievers = select_all( - receivers - .into_iter() - .enumerate() - .map(|(id, receiver)| InstanceStream:: { id, receiver }), - ); + let mut all_recievers = + select_all( + receivers + .into_iter() + .map(|(operator_id, receiver)| InstanceStream:: { + operator_id, + receiver, + }), + ); while all_recievers.next().await.is_some() {} debug!("Completed"); } /// Sends a message to an instance. Specify its index (or id) and the message you want to send. - pub fn send_message(&mut self, instance: usize, message: InMessage) { - let _ = self.senders[instance].send(message); + #[allow(dead_code)] + pub fn send_message(&mut self, operator_id: &OperatorId, message: InMessage) { + let _ = self.senders.get(operator_id).unwrap().send(message); } } @@ -138,7 +146,7 @@ where // I wanted a Stream that returns the instance id as well as the message when it becomes ready. // TODO: Can probably group this thing via a MAP in a stream function. struct InstanceStream { - id: Id, + operator_id: OperatorId, receiver: UnboundedReceiver>, } @@ -146,12 +154,12 @@ impl futures::Stream for InstanceStream where D: Debug + Default + Clone + Eq + Hash, { - type Item = (Id, OutMessage); + type Item = (OperatorId, OutMessage); // Required method fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match self.receiver.poll_recv(cx) { - Poll::Ready(Some(message)) => Poll::Ready(Some((self.id, message))), + Poll::Ready(Some(message)) => Poll::Ready(Some((self.operator_id, message))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, } @@ -165,29 +173,27 @@ where #[allow(clippy::type_complexity)] fn construct_and_run_committee( mut config: Config, - committee_size: usize, + validated_data: ValidatedData, ) -> ( - Vec>>, - Vec>>, + HashMap>>, + HashMap>>, ) { // The ID of a committee is just an integer in [0,committee_size) // A collection of channels to send messages to each instance. 
- let mut senders = Vec::with_capacity(committee_size); + let mut senders = HashMap::with_capacity(config.committee_size); // A collection of channels to receive messages from each instances. // We will redirect messages to each instance, simulating a broadcast network. - let mut receivers = Vec::with_capacity(committee_size); + let mut receivers = HashMap::with_capacity(config.committee_size); - for id in 0..committee_size { + for id in 0..config.committee_size { // Creates a new instance - // 0 config.id = 0 - config.operator_id = id; - let (sender, receiver, instance) = Qbft::new(config.clone()); - senders.push(sender); - receivers.push(receiver); + config.operator_id = OperatorId::from(id); + let (sender, receiver, instance) = Qbft::new(config.clone(), validated_data.clone()); + senders.insert(config.operator_id, sender); + receivers.insert(config.operator_id, receiver); // spawn the instance - // TODO: Make the round time adjustable, to get deterministic results for testing. debug!(id, "Starting instance"); tokio::spawn(instance.start_instance()); } @@ -195,53 +201,6 @@ fn construct_and_run_committee( - receivers: Vec>>, - senders: Vec>>, - data: D, -) -> Vec>> { - debug!("Emulating client processor requests"); - let handle_out_messages_fn = - move |message: OutMessage, - index: usize, - senders: &mut Vec>>, - new_senders: &mut Vec>>| { - // Duplicate the message to the new channel - let _ = new_senders[index].send(message.clone()); - if let OutMessage::GetData(request) = message.clone() { - let _ = senders[index].send(InMessage::RecvData(RecvData { - operator_id: request.operator_id, - round: request.round, - instance_height: request.instance_height, - value: data.clone(), - })); - - debug!("responding to GetData") - } - if let OutMessage::Completed(completed_message) = message.clone() { - let _ = senders[index].send(InMessage::RequestClose(CloseMessage { - operator_id: completed_message.operator_id, - })); - } - }; - - // Get messages from each instance, apply the function above and return the resulting channels - generically_handle_messages(receivers, senders, handle_out_messages_fn) -} - /// This function takes the senders and receivers and will duplicate messages from all instances /// and send those messages to all other instances. /// This simulates a kind of broadcast network. 
@@ -252,57 +211,63 @@ fn emulate_client_processor( - receivers: Vec>>, - senders: Vec>>, -) -> Vec>> { + receivers: HashMap>>, + senders: HashMap>>, +) -> HashMap>> { debug!("Emulating a gossip network"); let emulate_gossip_network_fn = |message: OutMessage, - index: usize, - senders: &mut Vec>>, - new_senders: &mut Vec>>| { + operator_id: &OperatorId, + senders: &mut HashMap>>, + new_senders: &mut HashMap>>| { // Duplicate the message to the new channel - let _ = new_senders[index].send(message.clone()); + let _ = new_senders.get(operator_id).unwrap().send(message.clone()); match message { - OutMessage::Propose(propose_message) => { + OutMessage::Propose(consensus_data) => { // Send the message to all other nodes senders .iter_mut() - .enumerate() - .for_each(|(current_index, sender)| { - if current_index != index { - let _ = sender.send(InMessage::Propose(propose_message.clone())); + .for_each(|(current_operator_id, sender)| { + if current_operator_id != operator_id { + let _ = sender + .send(InMessage::Propose(*operator_id, consensus_data.clone())); } }); } OutMessage::Prepare(prepare_message) => { senders .iter_mut() - .enumerate() - .for_each(|(current_index, sender)| { - if current_index != index { - let _ = sender.send(InMessage::Prepare(prepare_message.clone())); + .for_each(|(current_operator_id, sender)| { + if current_operator_id != operator_id { + let _ = sender.send(InMessage::Prepare( + *operator_id, + prepare_message.clone(), + )); } }); } OutMessage::Commit(commit_message) => { + // Ignoring commits in round 2 for testing senders .iter_mut() - .enumerate() - .for_each(|(current_index, sender)| { - if current_index != index { - let _ = sender.send(InMessage::Commit(commit_message.clone())); + .for_each(|(current_operator_id, sender)| { + if current_operator_id != operator_id { + let _ = sender + .send(InMessage::Commit(*operator_id, commit_message.clone())); } - }); + }) } - OutMessage::RoundChange(round_change) => { + OutMessage::RoundChange(round, optional_data) => { senders .iter_mut() - .enumerate() - .for_each(|(current_index, sender)| { - if current_index != index { - let _ = sender.send(InMessage::RoundChange(round_change.clone())); + .for_each(|(current_operator_id, sender)| { + if current_operator_id != operator_id { + let _ = sender.send(InMessage::RoundChange( + *operator_id, + round, + optional_data.clone(), + )); } }); } @@ -317,20 +282,20 @@ fn emulate_broadcast_network( - receivers: Vec>>, - mut senders: Vec>>, + receivers: HashMap>>, + mut senders: HashMap>>, // This is a function that takes the outbound message from the instances and the old inbound // sending channel and the new inbound sending channel. Given the outbound message, we can send a // response to the old inbound sender, and potentially duplicate the message to the new receiver // via the second Sender. mut message_handling: T, -) -> Vec>> +) -> HashMap>> where T: FnMut( OutMessage, - usize, - &mut Vec>>, - &mut Vec>>, + &OperatorId, + &mut HashMap>>, + &mut HashMap>>, ) + 'static + Send @@ -338,14 +303,14 @@ where { // Build a new set of channels to replace the ones we have taken ownership of. We will just // forward network messages to these channels - let mut new_receivers = Vec::with_capacity(receivers.len()); - let mut new_senders = Vec::with_capacity(senders.len()); + let mut new_receivers = HashMap::with_capacity(receivers.len()); + let mut new_senders = HashMap::with_capacity(senders.len()); // Populate the new channels. 
- for _id in 0..receivers.len() { + for operator_id in receivers.keys() { let (new_sender, new_receiver) = tokio::sync::mpsc::unbounded_channel::>(); - new_receivers.push(new_receiver); - new_senders.push(new_sender); + new_receivers.insert(*operator_id, new_receiver); + new_senders.insert(*operator_id, new_sender); } // Run a task to handle all the out messages @@ -357,21 +322,23 @@ where // which sender to forward to. For this reason we make a little intermediate type with the // index. - let mut grouped_receivers = select_all(receivers.into_iter().enumerate().map( - |(index, receiver)| InstanceStream { - id: index, + let mut grouped_receivers = select_all(receivers.into_iter().map( + |(operator_id, receiver)| InstanceStream { + operator_id, receiver, }, )); - while let Some((index, out_message)) = grouped_receivers.next().await { - /*debug!( - ?out_message, - "Instance" = index, - "Handling message from instance" - );*/ + while let Some((operator_id, out_message)) = grouped_receivers.next().await { + /* + debug!( + ?out_message, + operator = *operator_id, + "Handling message from instance" + ); + */ // Custom handling of the out message - message_handling(out_message, index, &mut senders, &mut new_senders); + message_handling(out_message, &operator_id, &mut senders, &mut new_senders); // Add back a new future to await for the next message } @@ -403,9 +370,8 @@ where #[tokio::test] async fn test_basic_committee() { // Construct and run a test committee - let mut test_instance = TestQBFTCommitteeBuilder::default().run::(21); - // assert_eq!(1, 2); + let mut test_instance = TestQBFTCommitteeBuilder::default().run(21); // Wait until consensus is reached or all the instances have ended test_instance.wait_until_end().await; diff --git a/anchor/qbft/src/types.rs b/anchor/qbft/src/types.rs new file mode 100644 index 00000000..ff7ab577 --- /dev/null +++ b/anchor/qbft/src/types.rs @@ -0,0 +1,136 @@ +//! A collection of types used by the QBFT modules +use crate::validation::ValidatedData; +use derive_more::{Add, Deref, From}; +use std::cmp::Eq; +use std::fmt::Debug; +use std::hash::Hash; + +/// Generic LeaderFunction trait to allow for future implementations of the QBFT module +pub trait LeaderFunction { + /// Returns true if we are the leader + fn leader_function( + &self, + operator_id: &OperatorId, + round: Round, + instance_height: InstanceHeight, + committee_size: usize, + ) -> bool; +} + +#[derive(Debug, Clone)] +pub struct DefaultLeaderFunction {} + +impl LeaderFunction for DefaultLeaderFunction { + fn leader_function( + &self, + operator_id: &OperatorId, + round: Round, + instance_height: InstanceHeight, + committee_size: usize, + ) -> bool { + *operator_id == ((*round + *instance_height) % committee_size).into() + } +} + +/// This represents an individual round, these change on regular time intervals +#[derive(Clone, Copy, Debug, Deref, Default, Add, PartialEq, Eq, Hash, PartialOrd)] +pub struct Round(usize); + +impl Round { + /// Returns the next round + pub fn next(&self) -> Round { + Round(self.0 + 1) + } + + /// Sets the current round + pub fn set(&mut self, round: Round) { + *self = round; + } +} + +/// The operator that is participating in the consensus instance. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Hash, From, Deref)] +pub struct OperatorId(usize); + +/// The instance height behaves like an "ID" for the QBFT instance. It is used to uniquely identify +/// different instances, that have the same operator id. 
+#[derive(Clone, Copy, Debug, Default)] +pub struct InstanceHeight(usize); + +impl Deref for InstanceHeight { + type Target = usize; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[derive(Debug, Clone, Copy)] +#[repr(u8)] +pub enum InstanceState { + /// Awaiting a propose from a leader + AwaitingProposal, + /// Awaiting consensus on PREPARE messages + Prepare = 1, + /// Awaiting consensus on COMMIT messages + Commit, + /// We have sent a round change message + SentRoundChange = 4, + /// The consensus instance is complete + Complete, +} + +/// Generic Data trait to allow for future implementations of the QBFT module +// Messages that can be received from the message_in channel +#[derive(Debug, Clone)] +pub enum InMessage { + /// A PROPOSE message to be sent on the network. + Propose(OperatorId, ConsensusData), + /// A PREPARE message to be sent on the network. + Prepare(OperatorId, ConsensusData), + /// A commit message to be sent on the network. + Commit(OperatorId, ConsensusData), + /// Round change message received from network + RoundChange(OperatorId, Round, Option>), +} + +/// Messages that may be sent to the message_out channel from the instance to the client processor +#[derive(Debug, Clone)] +pub enum OutMessage { + /// A PROPOSE message to be sent on the network. + Propose(ConsensusData), + /// A PREPARE message to be sent on the network. + Prepare(ConsensusData), + /// A commit message to be sent on the network. + Commit(ConsensusData), + /// The round has ended, send this message to the network to inform all participants. + RoundChange(Round, Option>), + /// The consensus instance has completed. + Completed(Completed), +} +/// Type definitions for the allowable messages +/// This holds the consensus data for a given round. +#[derive(Debug, Clone)] +pub struct ConsensusData { + /// The round that this data corresponds to + pub round: Round, + /// The actual value we reached consensus on. + pub data: D, +} + +impl From>> for ConsensusData { + fn from(value: ConsensusData>) -> Self { + ConsensusData { + round: value.round, + data: value.data.data, + } + } +} + +#[derive(Debug, Clone)] +/// The consensus instance has finished. +pub enum Completed { + /// The instance has timed out. + TimedOut, + /// Consensus was reached on the provided data. + Success(D), +} diff --git a/anchor/qbft/src/validation.rs b/anchor/qbft/src/validation.rs new file mode 100644 index 00000000..d6cbdd9c --- /dev/null +++ b/anchor/qbft/src/validation.rs @@ -0,0 +1,32 @@ +//! Validation for data function + +use crate::types::ConsensusData; + +/// The list of possible validation errors that can occur +#[derive(Debug)] +pub enum ValidationError { + Invalid, +} + +/// Data that has been validated by our validation function. +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +pub struct ValidatedData { + pub data: D, +} + +/// This verifies the data is correct an appropriate to use for consensus. 
+pub fn validate_data(data: D) -> Result, ValidationError> { + Ok(ValidatedData { data }) +} + +// Validates consensus data +pub fn validate_consensus_data( + consensus_data: ConsensusData, +) -> Result>, ValidationError> { + let round = consensus_data.round; + let validated_data = validate_data(consensus_data.data)?; + Ok(ConsensusData { + round, + data: validated_data, + }) +} diff --git a/anchor/src/main.rs b/anchor/src/main.rs index b95b244a..b817f662 100644 --- a/anchor/src/main.rs +++ b/anchor/src/main.rs @@ -1,7 +1,6 @@ use tracing::{error, info}; mod environment; - use client::Client; use environment::Environment; use task_executor::ShutdownReason;
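For reference, a minimal sketch of how a caller might drive one of the instances this patch builds, modelled on the harness in anchor/qbft/src/tests.rs. It assumes the snippet lives inside the qbft crate (so the private types and validation modules are reachable, just as they are from tests.rs), that the other committee members are emulated elsewhere and reply through in_tx, and that the function name run_operator_zero and the usize payload are illustrative placeholders rather than anything defined by this change.

use std::time::Duration;

use crate::types::DefaultLeaderFunction;
use crate::validation::validate_data;
use crate::{Config, OperatorId, OutMessage, Qbft};

// Drive a single QBFT instance for operator 0 and print what it wants to broadcast.
async fn run_operator_zero() {
    // Committee of five operators with a quorum of four, matching the builder defaults.
    let config = Config::<DefaultLeaderFunction> {
        operator_id: OperatorId::from(0),
        committee_size: 5,
        committee_members: (0..5).map(OperatorId::from).collect(),
        quorum_size: 4,
        round_time: Duration::from_secs(2),
        ..Default::default()
    };

    // The data this operator proposes if the leader function elects it.
    let start_data = validate_data(42usize).expect("data is valid");

    // `in_tx` feeds committee messages into the instance; `out_rx` yields the
    // messages the instance wants broadcast to the rest of the committee.
    let (in_tx, mut out_rx, instance) = Qbft::new(config, start_data);
    tokio::spawn(instance.start_instance());

    while let Some(message) = out_rx.recv().await {
        match message {
            // Either Completed::Success(data) or Completed::TimedOut.
            OutMessage::Completed(status) => {
                println!("instance finished: {status:?}");
                break;
            }
            // Propose/Prepare/Commit/RoundChange would be gossiped to the other
            // operators here; their replies come back as InMessage values on `in_tx`.
            other => println!("broadcast to committee: {other:?}"),
        }
    }

    // Dropping the inbound sender closes the instance's receive channel.
    drop(in_tx);
}

In a real deployment the outbound messages would be signed and gossiped over the network by the client processor, and the matching InMessage values from the other operators would be fed back through in_tx, which is exactly what the broadcast emulation in tests.rs simulates.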