Skip to content

more efficient owned records, and use a fork of flate2 that supports mutli-part gzip #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ keywords = ["bio", "fastq", "parser"]
[dependencies]
memchr = ">=0.1"
lz4 = "^1.20"
flate2 = { version = "^0.2", features = ["zlib"], default-features = false }
flate2 = { version = "^0.2.18", features = ["zlib"], default-features = false }

[dev-dependencies]
parasailors = "^0.3"
bio = "^0.10"
13 changes: 7 additions & 6 deletions examples/alignment_count.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use fastq::{parse_path, Record};
use std::env::args;
use parasailors as align;
use bio::alignment::pairwise::*;

extern crate fastq;
extern crate parasailors;
extern crate bio;

const ADAPTER: &'static [u8] = b"AATGATACGGCGACCACCGAGA\
TCTACACTCTTTCCCTACACGA\
Expand All @@ -19,14 +19,15 @@ fn main() {
let results = parse_path(path, |parser| {
let nthreads = 2;
let results: Vec<usize> = parser.parallel_each(nthreads, |record_sets| {
let matrix = align::Matrix::new(align::MatrixType::Identity);
let profile = align::Profile::new(ADAPTER, &matrix);

let score = |a: u8, b: u8| if a == b {1i32} else {-1i32};
let mut aligner = Aligner::new(-5, -1, &score);

let mut thread_total = 0;

for record_set in record_sets {
for record in record_set.iter() {
let score = align::local_alignment_score(
&profile, record.seq(), 8, 1);
let score = aligner.semiglobal(ADAPTER, record.seq()).score;
if score > 10 {
thread_total += 1;
}
Expand Down
5 changes: 5 additions & 0 deletions src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ impl Buffer {
let new_end = (n_in_buffer + 15) & !0x0f; // make sure next read is aligned
let new_start = new_end.checked_sub(n_in_buffer).unwrap();

// Never move the data forward in the buffer
if new_start >= self.start {
return 0;
}

let dest = self.data[new_start..].as_mut_ptr();
let src = self.data[self.start..].as_ptr();

Expand Down
119 changes: 78 additions & 41 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@
//! ```rust,no_run
//! use fastq::{parse_path, Record};
//! use std::env::args;
//! use parasailors as align;
//! use bio::alignment::pairwise::*;
//!
//! extern crate fastq;
//! extern crate parasailors;
//! extern crate bio;
//!
//! fn main() {
//! let filename = args().nth(1);
Expand All @@ -78,13 +78,16 @@
//! let results: Vec<usize> = parser.parallel_each(nthreads, |record_sets| {
//! // we can initialize thread local variables here.
//! let adapter = b"AATGATACGGCGACCACCGAGATCTACACTCTTTCCCTACACGACGCTCTTCCGATCT";
//! let matrix = align::Matrix::new(align::MatrixType::Identity);
//! let profile = align::Profile::new(adapter, &matrix);
//!
//! let score = |a: u8, b: u8| if a == b {1i32} else {-1i32};
//! let mut aligner = Aligner::new(-5, -1, &score);
//!
//! let mut thread_total = 0;
//!
//! for record_set in record_sets {
//! for record in record_set.iter() {
//! let score = align::local_alignment_score(&profile, record.seq(), 5, 1);
//! let alignment = aligner.semiglobal(adapter, record.seq());
//! let score = alignment.score;
//! if score > 8 {
//! thread_total += 1;
//! }
Expand Down Expand Up @@ -112,11 +115,9 @@ use std::sync::Arc;
use std::iter::FromIterator;
use std::path::Path;
use memchr::memchr;
use lz4::Decoder;
use flate2::read::GzDecoder;
use flate2::read::MultiGzDecoder;

extern crate memchr;
extern crate lz4;
extern crate flate2;

mod thread_reader;
Expand Down Expand Up @@ -170,10 +171,12 @@ pub struct RefRecord<'a> {
/// A fastq record that ownes its data arrays.
#[derive(Debug)]
pub struct OwnedRecord {
pub head: Vec<u8>,
pub seq: Vec<u8>,
pub sep: Option<Vec<u8>>,
pub qual: Vec<u8>,
// (start, stop), but might include \r at the end
head: usize,
seq: usize,
sep: usize,
qual: usize,
data: Vec<u8>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -223,34 +226,26 @@ impl<'a> Record for RefRecord<'a> {


impl Record for OwnedRecord {
#[inline]
fn head(&self) -> &[u8] {
// skip the '@' at the beginning
&self.head
trim_winline(&self.data[1 .. self.head])
}

#[inline]
fn seq(&self) -> &[u8] {
&self.seq
trim_winline(&self.data[self.head + 1 .. self.seq])
}

#[inline]
fn qual(&self) -> &[u8] {
&self.qual
trim_winline(&self.data[self.sep + 1 .. self.qual])
}

#[inline]
fn write<W: Write>(&self, writer: &mut W) -> Result<usize> {
let mut written = 0;
written += writer.write(b"@")?;
written += writer.write(self.head())?;
written += writer.write(b"\n")?;
written += writer.write(self.seq())?;
written += writer.write(b"\n")?;
match self.sep {
Some(ref s) => { written += writer.write(s)? }
None => { written += writer.write(b"+")? }
}
written += writer.write(b"\n")?;
written += writer.write(self.qual())?;
written += writer.write(b"\n")?;
Ok(written)
writer.write_all(&self.data)?;
Ok(self.data.len())
}
}

Expand Down Expand Up @@ -294,10 +289,11 @@ impl<'a> RefRecord<'a> {
/// Copy the borrowed data array and return an owned record.
pub fn to_owned_record(&self) -> OwnedRecord {
OwnedRecord {
seq: self.seq().to_vec(),
qual: self.qual().to_vec(),
head: self.head().to_vec(),
sep: Some(trim_winline(&self.data[self.seq + 1..self.sep]).to_vec())
head: self.head,
seq: self.seq,
sep: self.sep,
qual: self.qual,
data: self.data.to_vec(),
}
}
}
Expand Down Expand Up @@ -429,16 +425,10 @@ pub fn parse_path<P, F, O>(path: Option<P>, func: F) -> Result<O>
let mut magic_bytes = [0u8; 4];
reader.read_exact(&mut magic_bytes)?;
let mut reader = Cursor::new(magic_bytes.to_vec()).chain(reader);
if unsafe { std::mem::transmute::<_, u32>(magic_bytes.clone()) }.to_le() == 0x184D2204 {
let bufsize = 1<<22;
let queuelen = 2;
return Ok(thread_reader(bufsize, queuelen, Decoder::new(reader)?, |reader| {
func(Parser::new(Box::new(reader)))
}).expect("lz4 reader thread paniced"))
} else if &magic_bytes[..2] == b"\x1f\x8b" {
if &magic_bytes[..2] == b"\x1f\x8b" {
let bufsize = 1<<22;
let queuelen = 2;
let reader = GzDecoder::new(reader)?;
let reader = MultiGzDecoder::new(reader)?;
return Ok(thread_reader(bufsize, queuelen, reader, |reader| {
func(Parser::new(Box::new(reader)))
}).expect("gzip reader thread paniced"))
Expand Down Expand Up @@ -527,6 +517,43 @@ impl RecordSet {
}
}

pub struct OwnedRecordIter<R: Read> {
record_set_iter: RecordSetIter<R>,
current_record_set: Option<RecordSet>,
pos: usize,
}

impl<R: Read> Iterator for OwnedRecordIter<R> {
type Item = OwnedRecord;

#[inline]
fn next(&mut self) -> Option<OwnedRecord> {

if self.current_record_set.is_none() || self.current_record_set.as_ref().unwrap().records.len() == self.pos {
match self.record_set_iter.next() {
Some(Ok(rs)) => {
self.current_record_set = Some(rs);
self.pos = 0;
},
_ => {
self.current_record_set = None;
return None;
}
}
}

match self.current_record_set {
Some(ref rs) => {
let ref idx_record = rs.records[self.pos];
let ref_rec = idx_record.to_ref_record(&rs.buffer);
self.pos += 1;
Some(ref_rec.to_owned_record())
},
_ => None
}
}
}


pub struct RecordSetItems<'a> {
idx_records: ::std::slice::Iter<'a, IdxRecord>,
Expand Down Expand Up @@ -628,6 +655,16 @@ impl<R: Read> Parser<R> {
}
}

///
pub fn owned_records(self) -> OwnedRecordIter<R> {
let rec_sets = self.record_sets();
OwnedRecordIter {
record_set_iter: rec_sets,
current_record_set: None,
pos: 0,
}
}

/// Apply a function to each record, but distribute the work on a number of threads.
///
/// # Parameters
Expand Down