Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document pub code in JDS #1522

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions roles/jd-server/src/lib/error.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use crate::mempool::error::JdsMempoolError;
use roles_logic_sv2::parsers::Mining;
use std::{
convert::From,
fmt::Debug,
sync::{MutexGuard, PoisonError},
};

use roles_logic_sv2::parsers::Mining;

use crate::mempool::error::JdsMempoolError;

/// Represents an error that can occur in the JDS.
#[derive(std::fmt::Debug)]
pub enum JdsError {
Io(std::io::Error),
Expand Down
12 changes: 12 additions & 0 deletions roles/jd-server/src/lib/job_declarator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ pub struct AddTrasactionsToMempool {
pub sender_add_txs_to_mempool: Sender<AddTrasactionsToMempoolInner>,
}

/// Represents a downstream connection.
///
/// This struct is used in order to give a representation to each downstream connection that the
/// JDS has.
#[derive(Debug)]
pub struct JobDeclaratorDownstream {
async_mining_allowed: bool,
Expand All @@ -72,6 +76,7 @@ pub struct JobDeclaratorDownstream {
}

impl JobDeclaratorDownstream {
/// Create a new [`JobDeclaratorDownstream`] instance.
pub fn new(
async_mining_allowed: bool,
receiver: Receiver<EitherFrame>,
Expand Down Expand Up @@ -193,6 +198,7 @@ impl JobDeclaratorDownstream {
known_transactions
}

/// Send a message to the downstream connection.
pub async fn send(
self_mutex: Arc<Mutex<Self>>,
message: roles_logic_sv2::parsers::JobDeclaration<'static>,
Expand All @@ -202,6 +208,8 @@ impl JobDeclaratorDownstream {
sender.send(sv2_frame.into()).await.map_err(|_| ())?;
Ok(())
}

/// Start the downstream connection handler.
pub fn start(
self_mutex: Arc<Mutex<Self>>,
tx_status: status::Sender,
Expand Down Expand Up @@ -425,9 +433,13 @@ fn _get_random_token() -> B0255<'static> {
inner.to_vec().try_into().unwrap()
}

/// Represents a server that JDS uses to communicate with downstream(usually JDC) connections.
pub struct JobDeclarator {}

impl JobDeclarator {
/// Start a TCP server that listens for incoming connections.
///
/// The server will accept incoming connections and spawn a new task for each new one.
pub async fn start(
config: JobDeclaratorServerConfig,
status_tx: crate::status::Sender,
Expand Down
1 change: 1 addition & 0 deletions roles/jd-server/src/lib/mempool/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use rpc_sv2::mini_rpc_client::RpcError;
use std::{convert::From, sync::PoisonError};
use tracing::{error, warn};

/// Represents the possible errors that can occur in the JDS mempool.
#[derive(Debug)]
pub enum JdsMempoolError {
EmptyMempool,
Expand Down
18 changes: 14 additions & 4 deletions roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ pub struct TransactionWithHash {
pub tx: Option<(Transaction, u32)>,
}

/// A struct that represents the mempool version of JDS.
///
/// This struct is used in order to maintain an internal copy of the mempool of the JDS. This is
/// useful in order to keep track of the transactions that are in the mempool and to be able to
/// provide the transactions to JDCs when they request them.
#[derive(Clone, Debug)]
pub struct JDsMempool {
pub mempool: HashMap<Txid, Option<(Transaction, u32)>>,
Expand All @@ -24,6 +29,9 @@ pub struct JDsMempool {
}

impl JDsMempool {
/// Return Bitcoin node RPC client.
///
/// This will return none if the URL is not an http or https URL.
pub fn get_client(&self) -> Option<mini_rpc_client::MiniRpcClient> {
let url = self.url.as_str();
if url.contains("http") {
Expand All @@ -43,6 +51,7 @@ impl JDsMempool {
tx_list_
}

/// Create a new [`JDsMempool`] instance.
pub fn new(
url: String,
username: String,
Expand All @@ -59,17 +68,15 @@ impl JDsMempool {
}
}

/// Checks if the rpc client is accessible.
/// Checks if Bitcoin node RPC client is accessible.
pub async fn health(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {
let client = self_
.safe_lock(|a| a.get_client())?
.ok_or(JdsMempoolError::NoClient)?;
client.health().await.map_err(JdsMempoolError::Rpc)
}

// this functions fill in the mempool the transactions with the given txid and insert the given
// transactions. The ids are for the transactions that are already known to the node, the
// unknown transactions are provided directly as a vector
/// Publish transactions to the Bitcoin node.
pub async fn add_tx_data_to_mempool(
self_: Arc<Mutex<Self>>,
add_txs_to_mempool_inner: AddTrasactionsToMempoolInner,
Expand Down Expand Up @@ -123,6 +130,7 @@ impl JDsMempool {
Ok(())
}

/// Get the latest version of the mempool from the Bitcoin node and update the internal copy.
pub async fn update_mempool(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {
let client = self_
.safe_lock(|x| x.get_client())?
Expand Down Expand Up @@ -155,6 +163,7 @@ impl JDsMempool {
}
}

/// Handle a new block received from downstream connections.
pub async fn on_submit(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {
let new_block_receiver: Receiver<String> =
self_.safe_lock(|x| x.new_block_receiver.clone())?;
Expand All @@ -171,6 +180,7 @@ impl JDsMempool {
Ok(())
}

/// will be deprecated soon
pub fn to_short_ids(&self, nonce: u64) -> Option<HashMap<[u8; 6], TransactionWithHash>> {
let mut ret = HashMap::new();
for tx in &self.mempool {
Expand Down
43 changes: 41 additions & 2 deletions roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,56 @@ pub type Message = JdsMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
pub type EitherFrame = StandardEitherFrame<Message>;

/// Represents the Job Declarator Server role in a Stratum V2 setup.
///
/// Stratum V2 protocol separates the Job Declaration role into two parts: the Job Declarator Server
/// (JDS) and the Job Declarator Client (JDC).
///
/// JDS is responsible for maintaining a copy of the mempool by requesting updates from a Bitcoin
/// node through the RPC interface. It is also acting as an upstream for JDC, allowing it to submit
/// solutions and verify transactions.
///
/// JDS is usually run by a mining pool operator.
#[derive(Debug, Clone)]
pub struct JobDeclaratorServer {
config: JobDeclaratorServerConfig,
}

impl JobDeclaratorServer {
/// Creates a new instance of the Job Declarator Server.
pub fn new(config: JobDeclaratorServerConfig) -> Result<Self, Box<JdsError>> {
let url = config.core_rpc_url().to_string() + ":" + &config.core_rpc_port().to_string();
let url =
config.core_rpc_url().to_string() + ":" + &config.core_rpc_port().clone().to_string();
if !is_valid_url(&url) {
return Err(Box::new(JdsError::InvalidRPCUrl));
}
Ok(Self { config })
}
/// Starts the Job Declarator Server.
///
/// This will start the Job Declarator Server and run it until it is interrupted.
///
/// JDS initialization starts with initialization of the mempool, which is done by connecting to
/// Bitcoin node. An async job is then started in order to update the mempool at regular
/// intervals. After that, JDS will start a TCP server to listen for incoming connections
/// from JDC(s).
///
/// In total JDS maintains three channels:
/// - `new_block_receiver` is used to manage new blocks found by downstreams(JDCs).
/// - `status_rx` is used to manage JDS internal state.
/// - `receiver_add_txs_to_mempool` is used to update local mempool with transactions coming
/// from JDC(s).
pub async fn start(&self) -> Result<(), JdsError> {
let config = self.config.clone();
let url = config.core_rpc_url().to_string() + ":" + &config.core_rpc_port().to_string();
let username = config.core_rpc_user();
let password = config.core_rpc_pass();
// TODO should we manage what to do when the limit is reaced?
// This channel is managing new blocks found by downstreams(JDCs).
// JDS will listen for new blocks at `new_block_receiver` and update the mempool
// accordingly.
let (new_block_sender, new_block_receiver): (Sender<String>, Receiver<String>) =
bounded(10);
// new empty mempool
let mempool = Arc::new(Mutex::new(mempool::JDsMempool::new(
url.clone(),
username.to_string(),
Expand All @@ -50,16 +79,19 @@ impl JobDeclaratorServer {
let mempool_update_interval = config.mempool_update_interval();
let mempool_cloned_ = mempool.clone();
let mempool_cloned_1 = mempool.clone();
// make sure we can access bitcoin node through RPC
if let Err(e) = mempool::JDsMempool::health(mempool_cloned_1.clone()).await {
error!("{:?}", e);
return Err(JdsError::MempoolError(e));
}
// This channel is managing JDS internal state.
let (status_tx, status_rx) = unbounded();
let sender = status::Sender::Downstream(status_tx.clone());
let mut last_empty_mempool_warning =
std::time::Instant::now().sub(std::time::Duration::from_secs(60));

let sender_update_mempool = sender.clone();
// update the mempool at regular intervals
task::spawn(async move {
loop {
let update_mempool_result: Result<(), mempool::error::JdsMempoolError> =
Expand Down Expand Up @@ -96,6 +128,9 @@ impl JobDeclaratorServer {

let mempool_cloned = mempool.clone();
let sender_submit_solution = sender.clone();
// * start an async job to submit solutions to the mempool
// * this job will take solutions from JDC and submit them to the mempool
// * the job is transferred to the mempool module via a channel(new_block_receiver/sender)
task::spawn(async move {
loop {
let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await;
Expand All @@ -120,7 +155,10 @@ impl JobDeclaratorServer {

let cloned = config.clone();
let mempool_cloned = mempool.clone();
// JDS will update the local mempool when a new transaction is received from JDC(s) through
// this channel
let (sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded();
// start a TCP server to listen for incoming connections from JDC(s)
task::spawn(async move {
JobDeclarator::start(
cloned,
Expand All @@ -131,6 +169,7 @@ impl JobDeclaratorServer {
)
.await
});
// start a task to update local mempool with transactions coming from JDC(s)
task::spawn(async move {
loop {
if let Ok(add_transactions_to_mempool) = receiver_add_txs_to_mempool.recv().await {
Expand Down
Loading