From 7d5c4c9a0577256e130e19771e6b811b22958447 Mon Sep 17 00:00:00 2001 From: Sameer Srivastava Date: Sun, 18 Feb 2024 23:12:11 +0100 Subject: [PATCH] Implementation for text_grep - Issue (Searcher (rustcore) #1715) Working code for text_grep - Issue Searcher (rustcore) #1715 --- application/apps/indexer/Cargo.lock | 196 +++++++++++------- application/apps/text_grep/.gitignore | 3 + application/apps/text_grep/Cargo.toml | 17 ++ application/apps/text_grep/src/lib.rs | 193 +++++++++++++++++ .../apps/text_grep/src/wc_1_ahocorasick.rs | 165 +++++++++++++++ application/apps/text_grep/src/wc_reviewed.rs | 154 ++++++++++++++ 6 files changed, 652 insertions(+), 76 deletions(-) create mode 100644 application/apps/text_grep/.gitignore create mode 100644 application/apps/text_grep/Cargo.toml create mode 100644 application/apps/text_grep/src/lib.rs create mode 100644 application/apps/text_grep/src/wc_1_ahocorasick.rs create mode 100644 application/apps/text_grep/src/wc_reviewed.rs diff --git a/application/apps/indexer/Cargo.lock b/application/apps/indexer/Cargo.lock index 12224d2c7..864ffe954 100644 --- a/application/apps/indexer/Cargo.lock +++ b/application/apps/indexer/Cargo.lock @@ -65,15 +65,6 @@ dependencies = [ "zerocopy", ] -[[package]] -name = "aho-corasick" -version = "0.7.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" -dependencies = [ - "memchr", -] - [[package]] name = "aho-corasick" version = "1.1.2" @@ -219,9 +210,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.0.2" +version = "2.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "487f1e0fcbe47deb8b0574e646def1c903389d95241dd1bbcc6ce4a715dfc0c1" +checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" [[package]] name = "blake3" @@ -761,12 +752,12 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f258a7194e7f7c2a7837a8913aeab7fd8c383457034fa20ce4dd3dcb813e8eb8" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -790,12 +781,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "1.9.0" +version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" -dependencies = [ - "instant", -] +checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" [[package]] name = "fd-lock" @@ -804,7 +792,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ae6b3d9530211fb3b12a95374b8b0823be812f53d09e18c5675c0146b09642" dependencies = [ "cfg-if", - "rustix", + "rustix 0.37.27", "windows-sys 0.48.0", ] @@ -928,40 +916,38 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" [[package]] name = "grep-matcher" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3902ca28f26945fe35cad349d776f163981d777fee382ccd6ef451126f51b319" +checksum = "47a3141a10a43acfedc7c98a60a834d7ba00dfe7bec9071cbfc19b55b292ac02" dependencies = [ "memchr", ] [[package]] name = "grep-regex" -version = "0.1.11" +version = "0.1.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "997598b41d53a37a2e3fc5300d5c11d825368c054420a9c65125b8fe1078463f" +checksum = "f748bb135ca835da5cbc67ca0e6955f968db9c5df74ca4f56b18e1ddbc68230d" dependencies = [ - "aho-corasick 0.7.20", "bstr", "grep-matcher", "log", - "regex", - "regex-syntax 0.6.29", - "thread_local", + "regex-automata", + "regex-syntax", ] [[package]] name = "grep-searcher" -version = "0.1.11" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5601c4b9f480f0c9ebb40b1f6cbf447b8a50c5369223937a6c5214368c58779f" +checksum = "ba536ae4f69bec62d8839584dd3153d3028ef31bb229f04e09fb5a9e5a193c54" dependencies = [ "bstr", - "bytecount", "encoding_rs", "encoding_rs_io", "grep-matcher", "log", + "memchr", "memmap2", ] @@ -1147,7 +1133,7 @@ checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" dependencies = [ "hermit-abi 0.3.3", "io-lifetimes", - "rustix", + "rustix 0.37.27", "windows-sys 0.48.0", ] @@ -1183,9 +1169,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.150" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "libredox" @@ -1193,9 +1179,9 @@ version = "0.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85c833ca1e66078851dba29046874e38f08b2c883700aa29a03ddd3b23814ee8" dependencies = [ - "bitflags 2.0.2", + "bitflags 2.4.2", "libc", - "redox_syscall 0.4.1", + "redox_syscall", ] [[package]] @@ -1224,6 +1210,12 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" +[[package]] +name = "linux-raw-sys" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" + [[package]] name = "lock_api" version = "0.4.11" @@ -1275,9 +1267,9 @@ checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "memmap2" -version = "0.5.10" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83faa42c0a078c393f6b29d5db232d8be22776a891f8f56e5284faee4a20b327" +checksum = "fe751422e4a8caa417e13c3ea66452215d7d63e19e604f4980461212f3ae1322" dependencies = [ "libc", ] @@ -1474,7 +1466,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall 0.4.1", + "redox_syscall", "smallvec", "windows-targets 0.48.5", ] @@ -1772,15 +1764,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "redox_syscall" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "redox_syscall" version = "0.4.1" @@ -1807,10 +1790,10 @@ version = "1.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" dependencies = [ - "aho-corasick 1.1.2", + "aho-corasick", "memchr", "regex-automata", - "regex-syntax 0.8.2", + "regex-syntax", ] [[package]] @@ -1819,17 +1802,11 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ - "aho-corasick 1.1.2", + "aho-corasick", "memchr", - "regex-syntax 0.8.2", + "regex-syntax", ] -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.2" @@ -1876,10 +1853,23 @@ dependencies = [ "errno", "io-lifetimes", "libc", - "linux-raw-sys", + "linux-raw-sys 0.3.8", "windows-sys 0.48.0", ] +[[package]] +name = "rustix" +version = "0.38.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +dependencies = [ + "bitflags 2.4.2", + "errno", + "libc", + "linux-raw-sys 0.4.13", + "windows-sys 0.52.0", +] + [[package]] name = "rustyline" version = "11.0.0" @@ -1975,7 +1965,7 @@ checksum = "c32634e2bd4311420caa504404a55fad2131292c485c97014cbed89a5899885f" dependencies = [ "CoreFoundation-sys", "IOKit-sys", - "bitflags 2.0.2", + "bitflags 1.3.2", "cfg-if", "libudev", "mach2", @@ -2208,16 +2198,14 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.6.0" +version = "3.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6" +checksum = "a365e8cd18e44762ef95d87f284f4b5cd04107fec2ff3052bd6a3e6069669e67" dependencies = [ - "autocfg", "cfg-if", "fastrand", - "redox_syscall 0.3.5", - "rustix", - "windows-sys 0.48.0", + "rustix 0.38.31", + "windows-sys 0.52.0", ] [[package]] @@ -2264,16 +2252,6 @@ dependencies = [ "syn 2.0.39", ] -[[package]] -name = "thread_local" -version = "1.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" -dependencies = [ - "cfg-if", - "once_cell", -] - [[package]] name = "tinytemplate" version = "1.2.1" @@ -2585,6 +2563,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-targets" version = "0.42.2" @@ -2615,6 +2602,21 @@ dependencies = [ "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -2627,6 +2629,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.42.2" @@ -2639,6 +2647,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.42.2" @@ -2651,6 +2665,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.42.2" @@ -2663,6 +2683,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.42.2" @@ -2675,6 +2701,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -2687,6 +2719,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.42.2" @@ -2699,6 +2737,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "yansi" version = "0.5.1" diff --git a/application/apps/text_grep/.gitignore b/application/apps/text_grep/.gitignore new file mode 100644 index 000000000..ee289d4c5 --- /dev/null +++ b/application/apps/text_grep/.gitignore @@ -0,0 +1,3 @@ +.* +!.gitignore +target/ \ No newline at end of file diff --git a/application/apps/text_grep/Cargo.toml b/application/apps/text_grep/Cargo.toml new file mode 100644 index 000000000..c9273120c --- /dev/null +++ b/application/apps/text_grep/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "text_grep" +version = "0.1.0" +edition = "2021" + +[lib] +name = "text_grep" +crate-type = ["lib"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[dependencies] +tokio-util = "0.7.10" +tempfile = "3.10.0" +rayon = "1.8.1" +aho-corasick = "1.1.2" +grep-searcher = "0.1.13" +grep-regex = "0.1.12" \ No newline at end of file diff --git a/application/apps/text_grep/src/lib.rs b/application/apps/text_grep/src/lib.rs new file mode 100644 index 000000000..cf58b6111 --- /dev/null +++ b/application/apps/text_grep/src/lib.rs @@ -0,0 +1,193 @@ +use grep_regex::RegexMatcher; +use grep_searcher::sinks::UTF8; +use grep_searcher::Searcher; +use rayon::prelude::*; +use std::collections::HashMap; +use std::io::Read; +use std::path::{Path, PathBuf}; +use std::sync::mpsc::Sender; +use std::sync::{mpsc, Arc, Mutex}; +use std::thread; +use std::time::Instant; +use tokio_util::sync::CancellationToken; + +#[derive(Debug)] +pub struct SearchResult { + pub file_path: String, + pub pattern_counts: HashMap, + pub error_message: Option, +} + +pub struct TextGrep; + +impl Default for TextGrep { + fn default() -> Self { + TextGrep::new() + } +} + +impl TextGrep { + pub fn new() -> Self { + TextGrep + } + + pub async fn count_occurrences( + &self, + patterns: Vec<&str>, + file_paths: Vec<&str>, + chunk_size: usize, + cancel_token: CancellationToken, + ) -> Result, String> { + let mut results = Vec::new(); + let cancel_token_clone = cancel_token.clone(); + + let (sender, receiver) = mpsc::channel(); + let (error_sender, error_receiver) = mpsc::channel(); + + let patterns_arc: Vec<_> = patterns.iter().map(|&p| Arc::from(p)).collect(); + let file_paths_arc: Vec<_> = file_paths.iter().map(|&fp| Arc::from(fp)).collect(); + + let thread_handles: Vec<_> = file_paths_arc + .par_iter() + .map(|file_path| { + let patterns = patterns_arc.clone(); + let sender = sender.clone(); + let error_sender_clone = error_sender.clone(); + let cancel_token = cancel_token_clone.clone(); + let file_path = Arc::clone(file_path); + thread::spawn(move || { + if let Err(err) = + process_file(&file_path, &patterns, chunk_size, &cancel_token, &sender) + { + if error_sender_clone.send(err.to_string()).is_err() { + eprintln!("Error sending error message through channel"); + } + } + }) + }) + .collect(); + + for handle in thread_handles { + handle.join().unwrap(); + } + + while let Ok(err_msg) = error_receiver.try_recv() { + eprintln!("Error processing file: {:?}", err_msg); + results.push(SearchResult { + file_path: "".to_string(), + pattern_counts: HashMap::new(), + error_message: Some(err_msg.to_string()), + }); + } + + while let Ok(search_result) = receiver.try_recv() { + results.push(search_result?); + } + + Ok(results) + } +} + +fn process_file( + file_path: &Arc, + patterns: &[Arc], + chunk_size: usize, + cancel_token: &CancellationToken, + sender: &Sender>, +) -> Result<(), String> { + let file_path = PathBuf::from(&**file_path); + + if !is_text_file(&file_path) { + let error_msg = format!("File '{}' is not a text file", file_path.display()); + if sender.send(Err(error_msg.clone())).is_err() { + eprintln!("Error sending search result through channel"); + } + return Ok(()); + } + + let start_time = Instant::now(); + let pattern_counts = HashMap::new(); + + let mut file = std::fs::File::open(&file_path).map_err(|e| e.to_string())?; + let mut buffer = vec![0; chunk_size]; // define a buffer to read chunks of data + + // create matchers for each pattern and store them with their corresponding patterns in a hashmap + let mut matchers = HashMap::new(); + for pattern in patterns { + let pattern_string = pattern.as_ref().to_string(); + let matcher = RegexMatcher::new(&pattern_string).map_err(|e| e.to_string())?; + matchers.insert(pattern_string, matcher); + } + + let pattern_counts_mutex = Arc::new(Mutex::new(pattern_counts)); + + let mut threads = vec![]; + + loop { + if cancel_token.is_cancelled() { + return Err("Operation cancelled".to_string()); + } + let bytes_read = file.read(&mut buffer).map_err(|e| e.to_string())?; + if bytes_read == 0 { + break; // Reached EOF + } + + let matchers_clone = matchers.clone(); + let pattern_counts_mutex_clone = pattern_counts_mutex.clone(); + let buffer_clone = buffer.clone(); + + let thread_handle = thread::spawn(move || { + let mut local_pattern_counts = HashMap::new(); + + for (pattern, matcher) in &matchers_clone { + let mut total_count = 0; + let mut searcher = Searcher::new(); + + searcher + .search_reader( + matcher, + &buffer_clone[..bytes_read], + UTF8(|_, _| { + total_count += 1; + Ok(true) + }), + ) + .map_err(|e| e.to_string()) + .unwrap(); + + local_pattern_counts.insert(pattern.clone(), total_count); + } + + let mut pattern_counts_mutex_guard = pattern_counts_mutex_clone.lock().unwrap(); + for (pattern, count) in local_pattern_counts { + *pattern_counts_mutex_guard.entry(pattern).or_insert(0) += count; + } + }); + + threads.push(thread_handle); + } + + for thread in threads { + thread.join().unwrap(); + } + + let end_time = start_time.elapsed(); + eprintln!("Time taken {:?}", end_time); + + let pattern_counts_mutex_guard = pattern_counts_mutex.lock().unwrap(); + let aggregated_pattern_counts = pattern_counts_mutex_guard.clone(); + + sender + .send(Ok(SearchResult { + file_path: file_path.to_string_lossy().into_owned(), + pattern_counts: aggregated_pattern_counts, + error_message: None, + })) + .map_err(|e| e.to_string())?; + + Ok(()) +} + +fn is_text_file(_file_path: &Path) -> bool { + true +} diff --git a/application/apps/text_grep/src/wc_1_ahocorasick.rs b/application/apps/text_grep/src/wc_1_ahocorasick.rs new file mode 100644 index 000000000..c012340c8 --- /dev/null +++ b/application/apps/text_grep/src/wc_1_ahocorasick.rs @@ -0,0 +1,165 @@ +use std::sync::{mpsc, Arc}; +use std::thread; +use tokio_util::sync::CancellationToken; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::fs::File; +use std::io::{BufReader, Read}; +use std::time::Instant; + +#[derive(Debug)] +pub struct SearchResult { + pub file_path: String, + pub pattern_counts: HashMap, + pub error_message: Option, // Include an optional error message field +} + +pub struct TextGrep; + +impl TextGrep { + pub fn new() -> Self { + TextGrep + } + + pub async fn count_occurrences( + &self, + patterns: Vec<&str>, + file_paths: Vec<&str>, + chunk_size: usize, + cancel_token: CancellationToken, + ) -> Result, String> { + let mut results = Vec::new(); + + let cancel_token_clone = cancel_token.clone(); + + // Create a channel for sending search results from threads to the main thread + let (sender, receiver) = mpsc::channel(); + let (error_sender, error_receiver) = mpsc::channel(); + + + let patterns_arc: Vec<_> = patterns.iter().map(|&p| Arc::from(p)).collect(); + let file_paths_arc: Vec<_> = file_paths.iter().map(|&fp| Arc::from(fp)).collect(); + + // spawn a thread for each file to read it concurrently + let thread_handles: Vec<_> = file_paths_arc + .iter() + .map(|file_path| { + let patterns = patterns_arc.clone(); + let sender = sender.clone(); + let error_sender_clone = error_sender.clone(); + let cancel_token = cancel_token_clone.clone(); + let file_path = Arc::clone(file_path); + thread::spawn(move || { + if let Err(err) = process_file(&file_path, &patterns, chunk_size, &cancel_token, &sender) { + if error_sender_clone.send(err.to_string()).is_err() { + eprintln!("Error sending error message through channel"); + } + } + }) + }) + .collect(); + + eprintln!("Reached till before join of handles"); + // Wait for all threads to finish + for handle in thread_handles { + handle.join().unwrap(); + } + + eprintln!("Reached till after join of handles"); + + while let Ok(err_msg) = error_receiver.try_recv() { + eprintln!("Error processing file: {:?}", err_msg); + // Push a SearchResult with the error message + results.push(SearchResult { + file_path: "".to_string(), + pattern_counts: HashMap::new(), + error_message: Some(err_msg.to_string()), + }); + } + + // Loop to receive search results + while let Ok(search_result) = receiver.try_recv() { + results.push(search_result?); + } + + Ok(results) + } +} + +fn process_file( + file_path: &Arc, + patterns: &[Arc], + chunk_size: usize, + cancel_token: &CancellationToken, + sender: &mpsc::Sender>, +) -> Result<(), String> { + let file_path = PathBuf::from(&**file_path); + if !is_text_file(&file_path) { + let error_msg = format!("File '{}' is not a text file", file_path.display()); + if sender.send(Err(error_msg.clone())).is_err() { + eprintln!("Error sending search result through channel"); + } + return Ok(()); + } + + let mut pattern_counts = HashMap::new(); + let start_time = Instant::now(); + eprintln!("Trying to process file {:?}", file_path); + // Preparing searchers for all patterns + let searchers: Vec<_> = patterns + .iter() + .map(|pattern| { + aho_corasick::AhoCorasickBuilder::new() + .match_kind(aho_corasick::MatchKind::Standard) + .ascii_case_insensitive(true) + .build(vec![&**pattern]) + }) + .collect::>() + .map_err(|e| e.to_string())?; + + eprintln!("Trying to Open file {:?}", file_path); + let file = File::open(&file_path).map_err(|e| e.to_string())?; + let mut reader = BufReader::new(file); + let mut buffer = vec![0; chunk_size]; + + loop { + if cancel_token.is_cancelled() { + return Err("Operation cancelled".to_string()); + } + + // eprintln!("Trying to Read file {:?}", file_path); + let bytes_read = reader.read(&mut buffer).map_err(|e| e.to_string())?; + + // eprintln!("After reading chunk in file {:?}", file_path); + if bytes_read == 0 { + break; + } + + let content = String::from_utf8_lossy(&buffer[..bytes_read]); + for (pattern_index, searcher) in searchers.iter().enumerate() { + let mut count = 0; + for _mat in searcher.find_iter(&*content) { + count += 1; + } + pattern_counts.insert((*patterns[pattern_index]).to_string(), count); + // eprintln!("After reading chunk in file {:?}, count is {} ", file_path, count); + } + } + + let end_time = start_time.elapsed(); + eprintln!("Time taken {:?}", end_time); + + sender.send(Ok(SearchResult { + file_path: file_path.to_string_lossy().into_owned(), + pattern_counts, + error_message: None, // No error message for successful result + })).map_err(|e| e.to_string())?; + + eprintln!("Finally outside the loop 2"); + + Ok(()) +} + +fn is_text_file(_file_path: &Path) -> bool { + true +} \ No newline at end of file diff --git a/application/apps/text_grep/src/wc_reviewed.rs b/application/apps/text_grep/src/wc_reviewed.rs new file mode 100644 index 000000000..c6311cf34 --- /dev/null +++ b/application/apps/text_grep/src/wc_reviewed.rs @@ -0,0 +1,154 @@ +//Code reviewed on Friday with Dmitry. +use std::io::Read; +use std::sync::{Arc, mpsc}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::thread; +use grep_regex::RegexMatcher; +use grep_searcher::Searcher; +use grep_searcher::sinks::UTF8; +use std::sync::mpsc::Sender; +use std::time::Instant; +use tokio_util::sync::CancellationToken; + + +#[derive(Debug)] +pub struct SearchResult { + pub file_path: String, + pub pattern_counts: HashMap, + pub error_message: Option, +} + +pub struct TextGrep; + +impl TextGrep { + pub fn new() -> Self { + TextGrep + } + + pub async fn count_occurrences( + &self, + patterns: Vec<&str>, + file_paths: Vec<&str>, + chunk_size: usize, + cancel_token: CancellationToken, + ) -> Result, String> { + let mut results = Vec::new(); + let cancel_token_clone = cancel_token.clone(); + + let (sender, receiver) = mpsc::channel(); + let (error_sender, error_receiver) = mpsc::channel(); + + let patterns_arc: Vec<_> = patterns.iter().map(|&p| Arc::from(p)).collect(); + let file_paths_arc: Vec<_> = file_paths.iter().map(|&fp| Arc::from(fp)).collect(); + + let thread_handles: Vec<_> = file_paths_arc + .iter() + .map(|file_path| { + let patterns = patterns_arc.clone(); + let sender = sender.clone(); + let error_sender_clone = error_sender.clone(); + let cancel_token = cancel_token_clone.clone(); + let file_path = Arc::clone(file_path); + thread::spawn(move || { + if let Err(err) = process_file(&file_path, &patterns, chunk_size, &cancel_token, &sender) { + if error_sender_clone.send(err.to_string()).is_err() { + eprintln!("Error sending error message through channel"); + } + } + }) + }) + .collect(); + + for handle in thread_handles { + handle.join().unwrap(); + } + + while let Ok(err_msg) = error_receiver.try_recv() { + eprintln!("Error processing file: {:?}", err_msg); + results.push(SearchResult { + file_path: "".to_string(), + pattern_counts: HashMap::new(), + error_message: Some(err_msg.to_string()), + }); + } + + while let Ok(search_result) = receiver.try_recv() { + results.push(search_result?); + } + + Ok(results) + } +} + +fn process_file( + file_path: &Arc, + patterns: &[Arc], + chunk_size: usize, + cancel_token: &CancellationToken, + sender: &Sender>, +) -> Result<(), String> { + let file_path = PathBuf::from(&**file_path); + + if !is_text_file(&file_path) { + let error_msg = format!("File '{}' is not a text file", file_path.display()); + if sender.send(Err(error_msg.clone())).is_err() { + eprintln!("Error sending search result through channel"); + } + return Ok(()); + } + + let start_time = Instant::now(); + let mut pattern_counts = HashMap::new(); + + let mut file = std::fs::File::open(&file_path).map_err(|e| e.to_string())?; + let mut buffer = vec![0; chunk_size]; + + // matchers for each pattern and store them with their corresponding patterns in a hashmap + let mut matchers = HashMap::new(); + for pattern in patterns { + let pattern_string = pattern.as_ref().to_string(); + let matcher = RegexMatcher::new(&pattern_string).map_err(|e| e.to_string())?; + matchers.insert(pattern_string, matcher); + } + + loop { + if cancel_token.is_cancelled() { + return Err("Operation cancelled".to_string()); + } + let bytes_read = file.read(&mut buffer).map_err(|e| e.to_string())?; + if bytes_read == 0 { + break; // Reached EOF + } + + // process each pattern for this chunk of text + for (pattern, matcher) in &matchers { + let mut total_count = 0; + let mut searcher = Searcher::new(); + // let reader = text.as_bytes(); + searcher.search_reader( + matcher, + &buffer[..bytes_read], + UTF8(|_, _| { + total_count += 1; + Ok(true) + }), + ).map_err(|e| e.to_string())?; + *pattern_counts.entry(pattern.clone()).or_insert(0) += total_count; + } + } + + let end_time = start_time.elapsed(); + eprintln!("Time taken {:?}", end_time); + sender.send(Ok(SearchResult { + file_path: file_path.to_string_lossy().into_owned(), + pattern_counts, + error_message: None, + })).map_err(|e| e.to_string())?; + + Ok(()) +} + +fn is_text_file(_file_path: &Path) -> bool { + true +} \ No newline at end of file