Skip to content

Commit

Permalink
use rayon in sort subcli
Browse files Browse the repository at this point in the history
  • Loading branch information
sharkLoc committed Dec 19, 2024
1 parent 9134937 commit 93980e9
Show file tree
Hide file tree
Showing 9 changed files with 669 additions and 632 deletions.
1,234 changes: 628 additions & 606 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ name = "fqkit"
version = "0.4.11"
edition = "2021"
authors = ["sharkLoc <[email protected]>"]
rust-version = "1.77.2"
rust-version = "1.83.0"
homepage = "https://github.com/sharkLoc/fqkit"
repository = "https://github.com/sharkLoc/fqkit"
categories = ["science"]
Expand Down Expand Up @@ -36,9 +36,11 @@ flate2 = "1.0.24"
log = "0.4.20"
lowcharts = "0.5.8"
nthash = "0.5.1"
num_cpus = "1.16.0"
plotters = "0.3.4"
rand = "0.8.5"
rand_pcg = "0.3.1"
rayon = "1.10.0"
regex = "1.9.5"
rgb = "0.8.36"
term_size = "0.3.2"
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ Commands:
help Print this message or the help of the given subcommand(s)

Global Arguments:
-@, --threads <INT> threads number [default: 1]
--compress-level <INT> set gzip/bzip2/xz compression level 1 (compress faster) - 9 (compress better) for gzip/bzip2/xz output file, just work with option -o/--out [default: 6]
--output-type <u|g|b|x> output type for stdout: 'g' gzip; 'b' bzip2; 'x' xz; 'u' uncompressed txt format [default: u
--log <FILE> if file name specified, write log message to this file, or write to stderr
Expand Down
6 changes: 3 additions & 3 deletions src/cli/filter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{error::FqkitError, utils::*};
use anyhow::{Ok, Result};
use bio::io::fastq;
use crossbeam::channel::unbounded;
use crossbeam::channel::bounded;
use log::*;

#[allow(clippy::too_many_arguments)]
Expand Down Expand Up @@ -112,7 +112,7 @@ pub fn filter_fastq(
chunk = 5000;
}

let (tx, rx) = unbounded();
let (tx, rx) = bounded(5000);
let mut fq_iter1 = fq_reader1.records();
let mut fq_iter2 = fq_reader2.records();
loop {
Expand All @@ -130,7 +130,7 @@ pub fn filter_fastq(
drop(tx);

crossbeam::scope(|s| {
let (tx2, rx2) = unbounded();
let (tx2, rx2) = bounded(5000);
let _handles: Vec<_> = (0..ncpu)
.map(|_| {
let tx_tmp = tx2.clone();
Expand Down
7 changes: 4 additions & 3 deletions src/cli/size.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::utils::*;
use anyhow::Result;
use bio::io::fastq;
use crossbeam::channel::unbounded;
//use crossbeam::channel::unbounded;
use crossbeam::channel::bounded;
use log::*;

#[derive(Clone, Copy)]
Expand Down Expand Up @@ -79,7 +80,7 @@ pub fn size_fastq(
}
bases = base.a + base.t + base.g + base.c + base.n;
} else {
let (tx, rx) = unbounded();
let (tx, rx) = bounded(5000);//unbounded();
let mut fqiter = fq_reader.records();
loop {
let chunk: Vec<_> = fqiter.by_ref().take(chunk).map_while(Result::ok).collect();
Expand All @@ -91,7 +92,7 @@ pub fn size_fastq(
drop(tx);

crossbeam::scope(|s| {
let (tx2, rx2) = unbounded();
let (tx2, rx2) = bounded(5000); //unbounded();
let _handles: Vec<_> = (0..ncpu)
.map(|_| {
let rx_tmp = rx.clone();
Expand Down
23 changes: 12 additions & 11 deletions src/cli/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::utils::*;
use anyhow::{Error, Ok};
use bio::io::fastq;
use log::*;
use rayon::prelude::*;

#[allow(clippy::too_many_arguments)]
pub fn sort_fastq(
Expand All @@ -13,7 +14,7 @@ pub fn sort_fastq(
reverse: bool,
out: Option<&String>,
compression_level: u32,
stdout_type: char,
stdout_type: char
) -> Result<(), Error> {

let mut n = 0;
Expand Down Expand Up @@ -57,28 +58,28 @@ pub fn sort_fastq(
if sort_by_name {
info!("sort read by name");
if reverse {
vec_reads.sort_by(|a, b| {
vec_reads.par_sort_by(|a, b| {
let read_name1 = if let Some(des) = a.desc() {
format!("{} {}", a.id(), des)
} else {
a.id().to_string()
};
let read_name2 = if let Some(des) = b.desc() {
format!("{} {}", a.id(), des)
format!("{} {}", b.id(), des)
} else {
b.id().to_string()
};
read_name2.cmp(&read_name1)
});
} else {
vec_reads.sort_by(|a, b| {
vec_reads.par_sort_by(|a, b| {
let read_name1 = if let Some(des) = a.desc() {
format!("{} {}", a.id(), des)
} else {
a.id().to_string()
};
let read_name2 = if let Some(des) = b.desc() {
format!("{} {}", a.id(), des)
format!("{} {}", b.id(), des)
} else {
b.id().to_string()
};
Expand All @@ -88,23 +89,23 @@ pub fn sort_fastq(
} else if sort_by_seq {
info!("sort read by sequence");
if reverse {
vec_reads.sort_by(|a, b| b.seq().cmp(a.seq()));
vec_reads.par_sort_by(|a, b| b.seq().cmp(a.seq()));
} else {
vec_reads.sort_by(|a, b| a.seq().cmp(b.seq()));
vec_reads.par_sort_by(|a, b| a.seq().cmp(b.seq()));
}
} else if sort_by_length {
info!("sort read by length");
if reverse {
//vec_reads.sort_by(|a, b| b.seq().len().cmp(&a.seq().len()));
vec_reads.sort_by_key(|b| std::cmp::Reverse(b.seq().len()))
vec_reads.par_sort_by_key(|b| std::cmp::Reverse(b.seq().len()))
} else {
//vec_reads.sort_by(|a, b| a.seq().len().cmp(&b.seq().len()));
vec_reads.sort_by_key(|a| a.seq().len())
vec_reads.par_sort_by_key(|a| a.seq().len())
}
} else if sort_by_gc {
info!("sort read by gc content");
if reverse {
vec_reads.sort_by(|a, b| {
vec_reads.par_sort_by(|a, b| {
let r1_gc = a
.seq()
.iter()
Expand All @@ -120,7 +121,7 @@ pub fn sort_fastq(
r2_gc.partial_cmp(&r1_gc).unwrap()
});
} else {
vec_reads.sort_by(|a, b| {
vec_reads.par_sort_by(|a, b| {
let r1_gc = a
.seq()
.iter()
Expand Down
7 changes: 5 additions & 2 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ pub struct Args {
#[clap(subcommand)]
pub command: Subcli,

/// threads number
#[arg(short = '@', long = "threads", default_value_t = 1, global = true, value_name = "INT", help_heading = Some("Global Arguments"))]
pub threads: usize,

/// set gzip/bzip2/xz compression level 1 (compress faster) - 9 (compress better) for gzip/bzip2/xz output file, just work with option -o/--out
#[arg(long = "compress-level", default_value_t = 6, global = true,
value_parser = value_parser!(u32).range(1..=9), value_name = "INT",
help_heading = Some("Global Arguments")
value_parser = value_parser!(u32).range(1..=9), value_name = "INT", help_heading = Some("Global Arguments")
)]
pub compression_level: u32,

Expand Down
15 changes: 9 additions & 6 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum FqkitError {
#[error("stdin not detected")]
#[error("Stdin not detected")]
StdinNotDetected,

#[error("failed to open file: {0}")]
#[error("Failed to open file: {0}")]
IoError(#[from] io::Error),

#[error("invalid output dir: {0}")]
#[error("Invalid output dir: {0}")]
InvalidOutputDir(String),

#[error("empty file: {0}")]
#[error("ThreadPoolBuildError error")]
ThreadPoolBuildError(#[from] rayon::ThreadPoolBuildError),

#[error("Empty file: {0}")]
EmptyFile(String),

#[error("invalid phred value")]
#[error("Invalid phred value")]
InvalidPhredValue,

#[error("invalid figure types")]
#[error("Invalid figure types")]
InvalidFigureType,
}
4 changes: 4 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ fn main() -> Result<(), Error> {
logger(arg.verbose, arg.logfile, arg.quiet)?;
let start = Instant::now();
info!("version: {}", VERSION);

let cpus = num_cpus::get();
info!("cpu numbers: {}", cpus);
rayon::ThreadPoolBuilder::new().num_threads(arg.threads).build_global()?;

match arg.command {
Subcli::topn { input, num, out } => {
Expand Down

0 comments on commit 93980e9

Please sign in to comment.