@@ -20,27 +20,56 @@ pub type Message = JdsMessages<'static>;
20
20
pub type StdFrame = StandardSv2Frame < Message > ;
21
21
pub type EitherFrame = StandardEitherFrame < Message > ;
22
22
23
+ /// Represents the Job Declarator Server role in a Stratum V2 setup.
24
+ ///
25
+ /// Stratum V2 protocol separates the Job Declaration role into two parts: the Job Declarator Server
26
+ /// (JDS) and the Job Declarator Client (JDC).
27
+ ///
28
+ /// JDS is responsible for maintaining a copy of the mempool by requesting updates from a Bitcoin
29
+ /// node through the RPC interface. It is also acting as an upstream for JDC, allowing it to submit
30
+ /// solutions and verify transactions.
31
+ ///
32
+ /// JDS is usually run by a mining pool operator.
23
33
#[ derive( Debug , Clone ) ]
24
34
pub struct JobDeclaratorServer {
25
35
config : JobDeclaratorServerConfig ,
26
36
}
27
37
28
38
impl JobDeclaratorServer {
39
+ /// Creates a new instance of the Job Declarator Server.
29
40
pub fn new ( config : JobDeclaratorServerConfig ) -> Result < Self , Box < JdsError > > {
30
- let url = config. core_rpc_url ( ) . to_string ( ) + ":" + & config. core_rpc_port ( ) . to_string ( ) ;
41
+ let url =
42
+ config. core_rpc_url ( ) . to_string ( ) + ":" + & config. core_rpc_port ( ) . clone ( ) . to_string ( ) ;
31
43
if !is_valid_url ( & url) {
32
44
return Err ( Box :: new ( JdsError :: InvalidRPCUrl ) ) ;
33
45
}
34
46
Ok ( Self { config } )
35
47
}
48
+ /// Starts the Job Declarator Server.
49
+ ///
50
+ /// This will start the Job Declarator Server and run it until it is interrupted.
51
+ ///
52
+ /// JDS initialization starts with initialization of the mempool, which is done by connecting to
53
+ /// Bitcoin node. An async job is then started in order to update the mempool at regular
54
+ /// intervals. After that, JDS will start a TCP server to listen for incoming connections
55
+ /// from JDC(s).
56
+ ///
57
+ /// In total JDS maintains three channels:
58
+ /// - `new_block_receiver` is used to manage new blocks found by downstreams(JDCs).
59
+ /// - `status_rx` is used to manage JDS internal state.
60
+ /// - `receiver_add_txs_to_mempool` is used to update local mempool with transactions coming
61
+ /// from JDC(s).
36
62
pub async fn start ( & self ) -> Result < ( ) , JdsError > {
37
63
let config = self . config . clone ( ) ;
38
64
let url = config. core_rpc_url ( ) . to_string ( ) + ":" + & config. core_rpc_port ( ) . to_string ( ) ;
39
65
let username = config. core_rpc_user ( ) ;
40
66
let password = config. core_rpc_pass ( ) ;
41
- // TODO should we manage what to do when the limit is reaced?
67
+ // This channel is managing new blocks found by downstreams(JDCs).
68
+ // JDS will listen for new blocks at `new_block_receiver` and update the mempool
69
+ // accordingly.
42
70
let ( new_block_sender, new_block_receiver) : ( Sender < String > , Receiver < String > ) =
43
71
bounded ( 10 ) ;
72
+ // new empty mempool
44
73
let mempool = Arc :: new ( Mutex :: new ( mempool:: JDsMempool :: new (
45
74
url. clone ( ) ,
46
75
username. to_string ( ) ,
@@ -50,16 +79,19 @@ impl JobDeclaratorServer {
50
79
let mempool_update_interval = config. mempool_update_interval ( ) ;
51
80
let mempool_cloned_ = mempool. clone ( ) ;
52
81
let mempool_cloned_1 = mempool. clone ( ) ;
82
+ // make sure we can access bitcoin node through RPC
53
83
if let Err ( e) = mempool:: JDsMempool :: health ( mempool_cloned_1. clone ( ) ) . await {
54
84
error ! ( "{:?}" , e) ;
55
85
return Err ( JdsError :: MempoolError ( e) ) ;
56
86
}
87
+ // This channel is managing JDS internal state.
57
88
let ( status_tx, status_rx) = unbounded ( ) ;
58
89
let sender = status:: Sender :: Downstream ( status_tx. clone ( ) ) ;
59
90
let mut last_empty_mempool_warning =
60
91
std:: time:: Instant :: now ( ) . sub ( std:: time:: Duration :: from_secs ( 60 ) ) ;
61
92
62
93
let sender_update_mempool = sender. clone ( ) ;
94
+ // update the mempool at regular intervals
63
95
task:: spawn ( async move {
64
96
loop {
65
97
let update_mempool_result: Result < ( ) , mempool:: error:: JdsMempoolError > =
@@ -96,6 +128,9 @@ impl JobDeclaratorServer {
96
128
97
129
let mempool_cloned = mempool. clone ( ) ;
98
130
let sender_submit_solution = sender. clone ( ) ;
131
+ // * start an async job to submit solutions to the mempool
132
+ // * this job will take solutions from JDC and submit them to the mempool
133
+ // * the job is transferred to the mempool module via a channel(new_block_receiver/sender)
99
134
task:: spawn ( async move {
100
135
loop {
101
136
let result = mempool:: JDsMempool :: on_submit ( mempool_cloned. clone ( ) ) . await ;
@@ -120,7 +155,10 @@ impl JobDeclaratorServer {
120
155
121
156
let cloned = config. clone ( ) ;
122
157
let mempool_cloned = mempool. clone ( ) ;
158
+ // JDS will update the local mempool when a new transaction is received from JDC(s) through
159
+ // this channel
123
160
let ( sender_add_txs_to_mempool, receiver_add_txs_to_mempool) = unbounded ( ) ;
161
+ // start a TCP server to listen for incoming connections from JDC(s)
124
162
task:: spawn ( async move {
125
163
JobDeclarator :: start (
126
164
cloned,
@@ -131,6 +169,7 @@ impl JobDeclaratorServer {
131
169
)
132
170
. await
133
171
} ) ;
172
+ // start a task to update local mempool with transactions coming from JDC(s)
134
173
task:: spawn ( async move {
135
174
loop {
136
175
if let Ok ( add_transactions_to_mempool) = receiver_add_txs_to_mempool. recv ( ) . await {
0 commit comments