1+ use std:: collections:: { HashMap , HashSet } ;
2+
13use flarch:: {
24 broker:: { SubsystemHandler , TranslateFrom , TranslateInto } ,
35 nodeids:: { NodeID , U256 } ,
46 platform_async_trait,
57} ;
6- use rand:: seq:: SliceRandom ;
8+ use itertools:: Itertools ;
9+ use rand:: { seq:: SliceRandom , Rng } ;
710use serde:: { Deserialize , Serialize } ;
811use tokio:: sync:: watch;
912
1013use crate :: {
14+ dht_storage:: messages:: MessageClosest ,
1115 nodeconfig:: NodeInfo ,
1216 router:: messages:: { NetworkWrapper , RouterIn , RouterOut } ,
1317 timer:: TimerMessage ,
@@ -19,6 +23,7 @@ use super::{
1923} ;
2024
2125pub static mut EVIL_NO_FORWARD : bool = false ;
26+ pub static mut LOCAL_BLACKLISTS : bool = false ;
2227
2328/// These are the messages which will be exchanged between the nodes for this
2429/// module.
@@ -73,6 +78,11 @@ pub(super) struct Messages {
7378 // This is different than core.active, because there can be connections from other
7479 // modules, or connections from another node.
7580 connected : Vec < NodeID > ,
81+
82+ // readFlo id -> node id
83+ requests_in_flight : HashMap < U256 , U256 > ,
84+
85+ blacklisted_nodes : HashSet < U256 > ,
7686}
7787
7888impl Messages {
@@ -84,6 +94,8 @@ impl Messages {
8494 core : Kademlia :: new ( root, cfg) ,
8595 tx : Some ( tx) ,
8696 connected : vec ! [ ] ,
97+ requests_in_flight : HashMap :: new ( ) ,
98+ blacklisted_nodes : HashSet :: new ( ) ,
8799 } ,
88100 rx,
89101 )
@@ -233,22 +245,131 @@ impl Messages {
233245 }
234246 }
235247
248+ fn get_readflo_id ( & self , msg : NetworkWrapper ) -> Option < U256 > {
249+ if msg. module == "DHTStorage" {
250+ match msg. unwrap_yaml ( "DHTStorage" ) {
251+ Some ( msg_readflo) => match msg_readflo {
252+ MessageClosest :: ReadFlo ( _id, request_id) => {
253+ log:: warn!( "sending MessageClosest::ReadFlo {}" , msg. msg. clone( ) ) ;
254+ Some ( request_id. clone ( ) )
255+ }
256+ _ => None ,
257+ } ,
258+ None => None ,
259+ }
260+ } else {
261+ None
262+ }
263+ }
264+
265+ fn blacklist_bad_nodes ( & mut self ) {
266+ unsafe {
267+ if !self :: LOCAL_BLACKLISTS {
268+ return ;
269+ }
270+ }
271+
272+ let in_flight = self . requests_in_flight . clone ( ) ;
273+ in_flight
274+ . iter ( )
275+ . map ( |( _request_id, node_id) | node_id)
276+ . counts ( )
277+ . iter ( )
278+ . for_each ( |( node_id_request, count) | {
279+ let node_id1 = ( * node_id_request) . clone ( ) ;
280+ if count. clone ( ) > 10 {
281+ // made 10 requests to this node
282+ // remove the node from requests_in_flight to reset the counter
283+ // and blacklist the node
284+ log:: warn!( "blacklisting {node_id1}." ) ;
285+ let request_ids = self
286+ . requests_in_flight
287+ . clone ( )
288+ . iter ( )
289+ . filter ( |( _request_id, node_id2) | node_id1 == * * node_id2)
290+ . map ( |( request_id, _node_id) | request_id)
291+ . copied ( )
292+ . collect_vec ( ) ;
293+ for value in request_ids {
294+ self . requests_in_flight . remove_entry ( & value) ;
295+ }
296+ self . core . remove_node ( & node_id1) ;
297+ self . blacklisted_nodes . insert ( node_id1) ;
298+ }
299+ } ) ;
300+ }
301+
302+ fn randomly_whitelist ( & mut self ) {
303+ unsafe {
304+ if !self :: LOCAL_BLACKLISTS {
305+ return ;
306+ }
307+ }
308+
309+ let mut rng = rand:: thread_rng ( ) ;
310+ if !self . blacklisted_nodes . is_empty ( ) && rng. gen_bool ( 0.05 ) {
311+ let len = self . blacklisted_nodes . len ( ) ;
312+ if len > 0 {
313+ let random_index = rng. gen_range ( 0 ..len) ;
314+ let node_id_opt = self . blacklisted_nodes . iter ( ) . nth ( random_index) . clone ( ) ;
315+ if let Some ( node_id) = node_id_opt {
316+ log:: warn!( "whitelisting {node_id}." ) ;
317+ self . core . add_node ( node_id. clone ( ) ) ;
318+ self . blacklisted_nodes . remove ( & ( node_id. clone ( ) ) ) ;
319+ }
320+ }
321+ }
322+ }
323+
324+ fn log_requests_in_flight ( & self ) {
325+ log:: info!( "counts:" ) ;
326+ self . requests_in_flight
327+ . iter ( )
328+ . map ( |tuple| tuple. 1 )
329+ . counts ( )
330+ . iter ( )
331+ . for_each ( |item| log:: info!( " {} -> {}" , item. 0 , item. 1 ) )
332+ }
333+
236334 fn message_closest (
237- & self ,
335+ & mut self ,
238336 orig : NodeID ,
239337 last_hop : NodeID ,
240338 key : U256 ,
241339 msg : NetworkWrapper ,
242340 ) -> Vec < InternOut > {
243- match self
341+ let readflo_id_opt = self . get_readflo_id ( msg. clone ( ) ) ;
342+
343+ self . blacklist_bad_nodes ( ) ;
344+
345+ let closest = self
244346 . closest_or_connected ( key. clone ( ) , Some ( & last_hop) )
245347 . first ( )
246- {
247- Some ( & next_hop) => vec ! [
248- ModuleMessage :: Closest ( orig, key, msg. clone( ) ) . wrapper_network( next_hop) ,
249- DHTRouterOut :: MessageRouting ( orig, last_hop, next_hop, key, msg) . into( ) ,
250- ] ,
348+ . copied ( ) ;
349+
350+ self . randomly_whitelist ( ) ;
351+
352+ match closest. clone ( ) {
353+ Some ( next_hop) => {
354+ if self . blacklisted_nodes . contains ( & next_hop) {
355+ log:: error!( "IMPOSSIBLE?: sending a message to a blacklisted node." ) ;
356+ }
357+
358+ if let Some ( readflo_id) = readflo_id_opt {
359+ log:: info!( "NEXT HOP: {}" , next_hop) ;
360+ self . requests_in_flight
361+ . insert ( readflo_id. clone ( ) , next_hop. clone ( ) ) ;
362+ self . log_requests_in_flight ( ) ;
363+ }
364+ vec ! [
365+ ModuleMessage :: Closest ( orig, key, msg. clone( ) ) . wrapper_network( next_hop) ,
366+ DHTRouterOut :: MessageRouting ( orig, last_hop, next_hop, key, msg) . into( ) ,
367+ ]
368+ }
251369 None => {
370+ if readflo_id_opt. is_some ( ) {
371+ log:: warn!( "NO NEXT HOP!" ) ;
372+ }
252373 if key == self . core . root {
253374 vec ! [ DHTRouterOut :: MessageDest ( orig, last_hop, msg) . into( ) ]
254375 } else {
0 commit comments