diff --git a/src/net/offline_fluereflows.rs b/src/net/offline_fluereflows.rs index 4385ded..72c08d5 100644 --- a/src/net/offline_fluereflows.rs +++ b/src/net/offline_fluereflows.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fs, path::Path, time::Instant}; +use std::{collections::{HashMap, BTreeMap}, fs, path::Path, time::Instant}; use crate::{ net::{ @@ -34,7 +34,6 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { }; let start = Instant::now(); - // let file_path = cur_time_file(csv_file.as_str(), file_dir, ".csv"); let file_noext = format!( "{}_converted.csv", Path::new(&file_name) @@ -45,10 +44,9 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { let output_file_path = format!("{}/{}", file_dir, file_noext); let file = fs::File::create(output_file_path.clone()).unwrap(); - //let mut wtr = csv::Writer::from_writer(file); - let mut records: Vec = Vec::new(); let mut active_flow: HashMap = HashMap::new(); + let mut flow_expirations: BTreeMap> = BTreeMap::new(); info!("Converting file: {}", file_name); @@ -56,6 +54,7 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { while let Ok(packet) = cap.next_packet() { trace!("Parsing packet"); + let (mut key_value, mut reverse_key) = match parse_keys(packet.clone()) { Ok(keys) => keys, Err(_) => continue, @@ -73,24 +72,41 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { continue; } }; + + // Define `packet_time` before any usage + let packet_time = parse_microseconds( + packet.header.ts.tv_sec as u64, + packet.header.ts.tv_usec as u64, + ); + let flags = TcpFlags::new(raw_flags); - //pushing packet in to active_flows if it is not present + // Pushing packet into active_flows if it is not present let is_reverse = match active_flow.get(&key_value) { None => match active_flow.get(&reverse_key) { None => { - // if the protocol is TCP, check if is a syn packet + // If the protocol is TCP, check if it's a SYN packet if flowdata.prot == 6 { if flags.syn > 0 { + let expiration_time = packet_time + (flow_timeout * 1_000); // Convert milliseconds to microseconds + flow_expirations + .entry(expiration_time) + .or_insert_with(Vec::new) + .push(key_value.clone()); active_flow.insert(key_value, flowdata); - trace!("flow established"); + trace!("Flow established"); } else { continue; } } else { + let expiration_time = packet_time + (flow_timeout * 1_000); // Convert milliseconds to microseconds + flow_expirations + .entry(expiration_time) + .or_insert_with(Vec::new) + .push(key_value.clone()); active_flow.insert(key_value, flowdata); - trace!("flow established"); + trace!("Flow established"); } false @@ -100,14 +116,9 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { Some(_) => false, }; - let time = parse_microseconds( - packet.header.ts.tv_sec as u64, - packet.header.ts.tv_usec as u64, - ); let pkt = flowdata.min_pkt; let ttl = flowdata.min_ttl; - //println!("active flows: {:?}", active_flow.len()); - //println!("current inputed flow{:?}", active_flow.get(&key_value).unwrap()); + let flow_key = if is_reverse { &reverse_key } else { &key_value }; if let Some(flow) = active_flow.get_mut(flow_key) { let update_key = UDFlowKey { @@ -115,7 +126,7 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { pkt, ttl, flags, - time, + time: packet_time, // Use the correct variable here }; update_flow(flow, is_reverse, update_key); @@ -126,27 +137,29 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { ); if flags.fin == 1 || flags.rst == 1 { - trace!("flow finished"); - trace!("flow data: {:?}", flow); + trace!("Flow finished"); + trace!("Flow data: {:?}", flow); records.push(*flow); active_flow.remove(flow_key); } } - // Before processing a new packet, check for and handle expired flows - let mut expired_flows = Vec::new(); - for (key, flow) in active_flow.iter() { - if flow_timeout > 0 && time > (flow.last + (flow_timeout * 1000)) { - // Assuming flow.last is in microseconds - trace!("flow expired"); - trace!("flow data: {:?}", flow); - records.push(*flow); - expired_flows.push(*key); + // Remove expired flows before processing the next packet + let current_time = packet_time; + let expired_times: Vec = flow_expirations + .range(..=current_time) + .map(|(&time, _)| time) + .collect(); + + for expiration_time in expired_times { + if let Some(keys) = flow_expirations.remove(&expiration_time) { + for key in keys { + if let Some(flow) = active_flow.remove(&key) { + records.push(flow); + } + } } } - - // Remove expired flows from the active flows map - active_flow.retain(|key, _| !expired_flows.contains(key)); } bar.finish(); info!("Converted in {:?}", start.elapsed()); @@ -156,6 +169,7 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { for (_key, flow) in active_flow.clone().iter() { records.push(*flow); } + let tasks = task::spawn(async { fluere_exporter(records, file).await; }); @@ -163,7 +177,7 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { let result = tasks.await; info!("Export {} result: {:?}", output_file_path, result); - info!("Active flow {:?}", ac_flow_cnt); - info!("Ended flow {:?}", ended_flow_cnt); + info!("Active flows: {:?}", ac_flow_cnt); + info!("Ended flows: {:?}", ended_flow_cnt); Ok(()) -} +} \ No newline at end of file