Skip to content

Commit

Permalink
refactor: improve expired flow detection
Browse files Browse the repository at this point in the history
  • Loading branch information
SkuldNorniern committed Oct 4, 2024
1 parent 72d511c commit c873d99
Showing 1 changed file with 46 additions and 32 deletions.
78 changes: 46 additions & 32 deletions src/net/offline_fluereflows.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, fs, path::Path, time::Instant};
use std::{collections::{HashMap, BTreeMap}, fs, path::Path, time::Instant};

use crate::{
net::{
Expand Down Expand Up @@ -34,7 +34,6 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
};

let start = Instant::now();
// let file_path = cur_time_file(csv_file.as_str(), file_dir, ".csv");
let file_noext = format!(
"{}_converted.csv",
Path::new(&file_name)
Expand All @@ -45,17 +44,17 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
let output_file_path = format!("{}/{}", file_dir, file_noext);
let file = fs::File::create(output_file_path.clone()).unwrap();

//let mut wtr = csv::Writer::from_writer(file);

let mut records: Vec<FluereRecord> = Vec::new();
let mut active_flow: HashMap<Key, FluereRecord> = HashMap::new();
let mut flow_expirations: BTreeMap<u64, Vec<Key>> = BTreeMap::new();

info!("Converting file: {}", file_name);

let bar = ProgressBar::new_spinner();

while let Ok(packet) = cap.next_packet() {
trace!("Parsing packet");

let (mut key_value, mut reverse_key) = match parse_keys(packet.clone()) {
Ok(keys) => keys,
Err(_) => continue,
Expand All @@ -73,24 +72,41 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
continue;
}
};

// Define `packet_time` before any usage
let packet_time = parse_microseconds(
packet.header.ts.tv_sec as u64,
packet.header.ts.tv_usec as u64,
);

let flags = TcpFlags::new(raw_flags);
//pushing packet in to active_flows if it is not present
// Pushing packet into active_flows if it is not present
let is_reverse = match active_flow.get(&key_value) {
None => match active_flow.get(&reverse_key) {
None => {
// if the protocol is TCP, check if is a syn packet
// If the protocol is TCP, check if it's a SYN packet
if flowdata.prot == 6 {
if flags.syn > 0 {
let expiration_time = packet_time + (flow_timeout * 1_000); // Convert milliseconds to microseconds
flow_expirations
.entry(expiration_time)
.or_insert_with(Vec::new)
.push(key_value.clone());
active_flow.insert(key_value, flowdata);

trace!("flow established");
trace!("Flow established");
} else {
continue;
}
} else {
let expiration_time = packet_time + (flow_timeout * 1_000); // Convert milliseconds to microseconds
flow_expirations
.entry(expiration_time)
.or_insert_with(Vec::new)
.push(key_value.clone());
active_flow.insert(key_value, flowdata);

trace!("flow established");
trace!("Flow established");
}

false
Expand All @@ -100,22 +116,17 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
Some(_) => false,
};

let time = parse_microseconds(
packet.header.ts.tv_sec as u64,
packet.header.ts.tv_usec as u64,
);
let pkt = flowdata.min_pkt;
let ttl = flowdata.min_ttl;
//println!("active flows: {:?}", active_flow.len());
//println!("current inputed flow{:?}", active_flow.get(&key_value).unwrap());

let flow_key = if is_reverse { &reverse_key } else { &key_value };
if let Some(flow) = active_flow.get_mut(flow_key) {
let update_key = UDFlowKey {
doctets,
pkt,
ttl,
flags,
time,
time: packet_time, // Use the correct variable here
};

update_flow(flow, is_reverse, update_key);
Expand All @@ -126,27 +137,29 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
);

if flags.fin == 1 || flags.rst == 1 {
trace!("flow finished");
trace!("flow data: {:?}", flow);
trace!("Flow finished");
trace!("Flow data: {:?}", flow);
records.push(*flow);
active_flow.remove(flow_key);
}
}

// Before processing a new packet, check for and handle expired flows
let mut expired_flows = Vec::new();
for (key, flow) in active_flow.iter() {
if flow_timeout > 0 && time > (flow.last + (flow_timeout * 1000)) {
// Assuming flow.last is in microseconds
trace!("flow expired");
trace!("flow data: {:?}", flow);
records.push(*flow);
expired_flows.push(*key);
// Remove expired flows before processing the next packet
let current_time = packet_time;
let expired_times: Vec<u64> = flow_expirations
.range(..=current_time)
.map(|(&time, _)| time)
.collect();

for expiration_time in expired_times {
if let Some(keys) = flow_expirations.remove(&expiration_time) {
for key in keys {
if let Some(flow) = active_flow.remove(&key) {
records.push(flow);
}
}
}
}

// Remove expired flows from the active flows map
active_flow.retain(|key, _| !expired_flows.contains(key));
}
bar.finish();
info!("Converted in {:?}", start.elapsed());
Expand All @@ -156,14 +169,15 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
for (_key, flow) in active_flow.clone().iter() {
records.push(*flow);
}

let tasks = task::spawn(async {
fluere_exporter(records, file).await;
});

let result = tasks.await;
info!("Export {} result: {:?}", output_file_path, result);

info!("Active flow {:?}", ac_flow_cnt);
info!("Ended flow {:?}", ended_flow_cnt);
info!("Active flows: {:?}", ac_flow_cnt);
info!("Ended flows: {:?}", ended_flow_cnt);
Ok(())
}
}

0 comments on commit c873d99

Please sign in to comment.