Skip to content

Commit 97283ed

Browse files
authored
attempt parallelized demultiplexing (#200)
* start work on per-sample parallel demuxing * fix lambda * fix status variable names * rm dep code, swap fetch() to non-indexed version * parallelized over samples mode * rm need to index * add duplicate for testing purposes * better/nicer errors
1 parent ad259db commit 97283ed

13 files changed

+267
-140
lines changed

harpy/_validations.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ def validate_demuxschema(infile):
265265
try:
266266
sample, segment_id = line.rstrip().split()
267267
if not segment_pattern.match(segment_id):
268-
print_error("invalid segment format", f"Segment ID '{segment_id}' does not follow the expected format.")
268+
print_error("invalid segment format", f"Segment ID [green]{segment_id}[/green] does not follow the expected format.")
269269
print_solution("This haplotagging design expects segments to follow the format of letter [green bold]A-D[/green bold] followed by [bold]two digits[/bold], e.g. [green bold]C51[/green bold]). Check that your ID segments or formatted correctly and that you are attempting to demultiplex with a workflow appropriate for your data design.")
270270
sys.exit(1)
271271
code_letters.add(segment_id[0])
@@ -283,10 +283,15 @@ def validate_demuxschema(infile):
283283
# skip rows without two columns
284284
continue
285285
if not code_letters:
286-
print_error("incorrect schema format", f"Schema file {os.path.basename(infile)} has no valid rows. Rows should be sample<tab>segment, e.g. sample_01<tab>C75")
286+
print_error("incorrect schema format", f"Schema file [blue]{os.path.basename(infile)}[/blue] has no valid rows. Rows should be sample<tab>segment, e.g. sample_01<tab>C75")
287287
sys.exit(1)
288288
if len(code_letters) > 1:
289-
print("invalid schema", f"Schema file {os.path.basename(infile)} has sample IDs expected to be identified across multiple barcode segments. All sample IDs for this technology should be in a single segment, such as [bold green]C[/bold green] or [bold green]D[/bold green].")
289+
print_error("invalid schema", f"Schema file [blue]{os.path.basename(infile)}[/blue] has sample IDs occurring in different barcode segments.")
290+
print_solution_with_culprits(
291+
"All sample IDs for this barcode design should be in a single segment, such as [bold green]C[/bold green] or [bold green]D[/bold green]. Make sure the schema contains only one segment.",
292+
"The segments identified in the schema:"
293+
)
294+
click.echo(", ".join(code_letters))
290295
sys.exit(1)
291296

292297
def validate_regions(regioninput, genome):

harpy/bin/bx_stats.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def writestats(x, writechrom, destination):
5757
all_bx = set()
5858
LAST_CONTIG = None
5959

60-
for read in alnfile.fetch():
60+
for read in alnfile.fetch(until_eof=True):
6161
chrom = read.reference_name
6262
# check if the current chromosome is different from the previous one
6363
# if so, print the dict to file and empty it (a consideration for RAM usage)

harpy/bin/check_bam.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
parser.error(f"{args.input} was not found")
3131

3232
bam_in = args.input
33-
if bam_in.lower().endswith(".bam"):
34-
if not os.path.exists(bam_in + ".bai"):
35-
pysam.index(bam_in)
33+
#if bam_in.lower().endswith(".bam"):
34+
# if not os.path.exists(bam_in + ".bai"):
35+
# pysam.index(bam_in)
3636

3737
# regex for EXACTLY AXXCXXBXXDXX
3838
haplotag = re.compile('^A[0-9][0-9]C[0-9][0-9]B[0-9][0-9]D[0-9][0-9]')
@@ -52,7 +52,7 @@
5252
BX_NOT_LAST = 0
5353
NO_MI = 0
5454

55-
for record in alnfile.fetch():
55+
for record in alnfile.fetch(until_eof=True):
5656
N_READS += 1
5757
tags = [i[0] for i in record.get_tags()]
5858
# is there a bx tag?

harpy/bin/concatenate_bam.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@
5959
haplotag_limit = 96**4
6060

6161
# instantiate output file
62-
if aln_list[0].lower().endswith(".bam"):
63-
if not os.path.exists(f"{aln_list[0]}.bai"):
64-
pysam.index(aln_list[0])
65-
# for housekeeping
66-
DELETE_FIRST_INDEX = True
67-
else:
68-
DELETE_FIRST_INDEX = False
62+
#if aln_list[0].lower().endswith(".bam"):
63+
# if not os.path.exists(f"{aln_list[0]}.bai"):
64+
# pysam.index(aln_list[0])
65+
# # for housekeeping
66+
# DELETE_FIRST_INDEX = True
67+
# else:
68+
# DELETE_FIRST_INDEX = False
6969
with pysam.AlignmentFile(aln_list[0]) as xam_in:
7070
header = xam_in.header.to_dict()
7171
# Remove all @PG lines
@@ -99,14 +99,14 @@
9999
# create MI dict for this sample
100100
MI_LOCAL = {}
101101
# create index if it doesn't exist
102-
if xam.lower().endswith(".bam"):
103-
if not os.path.exists(f"{xam}.bai"):
104-
pysam.index(xam)
105-
DELETE_INDEX = True
106-
else:
107-
DELETE_INDEX = False
102+
#if xam.lower().endswith(".bam"):
103+
# if not os.path.exists(f"{xam}.bai"):
104+
# pysam.index(xam)
105+
# DELETE_INDEX = True
106+
# else:
107+
# DELETE_INDEX = False
108108
with pysam.AlignmentFile(xam) as xamfile:
109-
for record in xamfile.fetch():
109+
for record in xamfile.fetch(until_eof=True):
110110
try:
111111
mi = record.get_tag("MI")
112112
# if previously converted for this sample, use that
@@ -136,8 +136,7 @@
136136
except KeyError:
137137
pass
138138
bam_out.write(record)
139-
if DELETE_INDEX:
140-
Path.unlink(f"{xam}.bai")
139+
141140
# just for consistent housekeeping
142-
if DELETE_FIRST_INDEX:
143-
Path.unlink(f"{aln_list[0]}.bai")
141+
#if DELETE_FIRST_INDEX:
142+
# Path.unlink(f"{aln_list[0]}.bai")

harpy/bin/deconvolve_alignments.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -95,18 +95,18 @@ def write_missingbx(bam, alnrecord):
9595
# MI is the name of the current molecule, starting a 1 (0+1)
9696
MI = 0
9797

98-
if bam_input.lower().endswith(".bam") and not os.path.exists(bam_input + ".bai"):
99-
try:
100-
pysam.index(bam_input)
101-
except (OSError, pysam.SamtoolsError) as e:
102-
sys.stderr.write(f"Error indexing BAM file: {e}\n")
103-
sys.exit(1)
98+
#if bam_input.lower().endswith(".bam") and not os.path.exists(bam_input + ".bai"):
99+
# try:
100+
# pysam.index(bam_input)
101+
# except (OSError, pysam.SamtoolsError) as e:
102+
# sys.stderr.write(f"Error indexing BAM file: {e}\n")
103+
# sys.exit(1)
104104

105105
with (
106106
pysam.AlignmentFile(bam_input) as alnfile,
107107
pysam.AlignmentFile(sys.stdout.buffer, "wb", template = alnfile) as outfile
108108
):
109-
for record in alnfile.fetch():
109+
for record in alnfile.fetch(until_eof=True):
110110
chrm = record.reference_name
111111
bp = record.query_alignment_length
112112
# check if the current chromosome is different from the previous one

harpy/bin/leviathan_bx_shim.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
args = parser.parse_args()
2828
if not os.path.exists(args.input):
2929
parser.error(f"{args.input} was not found")
30-
if not os.path.exists(args.input + ".bai"):
31-
parser.error(f"{args.input}.bai was not found")
30+
#if not os.path.exists(args.input + ".bai"):
31+
# parser.error(f"{args.input}.bai was not found")
3232

3333
# set up a generator for the BX tags
3434
bc_range = [f"{i}".zfill(2) for i in range(1,97)]
@@ -37,7 +37,7 @@
3737

3838
with pysam.AlignmentFile(args.input) as bam_in, pysam.AlignmentFile(sys.stdout.buffer, 'wb', header=bam_in.header) as bam_out:
3939
# iterate through the bam file
40-
for record in bam_in.fetch():
40+
for record in bam_in.fetch(until_eof=True):
4141
try:
4242
mi = record.get_tag("MI")
4343
if mi not in MI_BX:

harpy/demultiplex.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def demultiplex():
2828
"harpy demultiplex gen1": [
2929
{
3030
"name": "Parameters",
31-
"options": ["--qx-rx","--schema"],
31+
"options": ["--keep-unknown", "--qx-rx","--schema"],
3232
},
3333
{
3434
"name": "Workflow Controls",
@@ -38,10 +38,11 @@ def demultiplex():
3838
}
3939

4040
@click.command(no_args_is_help = True, context_settings=dict(allow_interspersed_args=False), epilog = "Documentation: https://pdimens.github.io/harpy/workflows/demultiplex/")
41+
@click.option('-u', '--keep-unknown', is_flag = True, default = False, help = 'Keep reads that could not be demultiplexed')
42+
@click.option('-q', '--qx-rx', is_flag = True, default = False, help = 'Include the `QX:Z` and `RX:Z` tags in the read header')
4143
@click.option('-s', '--schema', required = True, type=click.Path(exists=True, dir_okay=False, readable=True), help = 'File of `sample`\\<TAB\\>`barcode`')
42-
@click.option('-t', '--threads', default = 4, show_default = True, type = click.IntRange(min = 1, max_open = True), help = 'Number of threads to use')
44+
@click.option('-t', '--threads', default = 4, show_default = True, type = click.IntRange(min = 2, max_open = True), help = 'Number of threads to use')
4345
@click.option('-o', '--output-dir', type = click.Path(exists = False), default = "Demultiplex", show_default=True, help = 'Output directory name')
44-
@click.option('-q', '--qx-rx', is_flag = True, default = False, help = 'Include the `QX:Z` and `RX:Z` tags in the read header')
4546
@click.option('--container', is_flag = True, default = False, help = 'Use a container instead of conda')
4647
@click.option('--setup-only', is_flag = True, hidden = True, default = False, help = 'Setup the workflow and exit')
4748
@click.option('--hpc', type = HPCProfile(), help = 'Directory with HPC submission `config.yaml` file')
@@ -52,7 +53,7 @@ def demultiplex():
5253
@click.argument('R2_FQ', required=True, type=click.Path(exists=True, dir_okay=False, readable=True))
5354
@click.argument('I1_FQ', required=True, type=click.Path(exists=True, dir_okay=False, readable=True))
5455
@click.argument('I2_FQ', required=True, type=click.Path(exists=True, dir_okay=False, readable=True))
55-
def gen1(r1_fq, r2_fq, i1_fq, i2_fq, output_dir, schema, qx_rx, threads, snakemake, skip_reports, quiet, hpc, container, setup_only):
56+
def gen1(r1_fq, r2_fq, i1_fq, i2_fq, output_dir, keep_unknown, schema, qx_rx, threads, snakemake, skip_reports, quiet, hpc, container, setup_only):
5657
"""
5758
Demultiplex Generation I haplotagged FASTQ files
5859
@@ -84,12 +85,13 @@ def gen1(r1_fq, r2_fq, i1_fq, i2_fq, output_dir, schema, qx_rx, threads, snakema
8485
"workflow" : "demultiplex gen1",
8586
"snakemake_log" : sm_log,
8687
"output_directory" : output_dir,
88+
"include_qx_rx_tags" : qx_rx,
89+
"keep_unknown" : keep_unknown,
8790
"workflow_call" : command.rstrip(),
8891
"conda_environments" : conda_envs,
8992
"reports" : {
9093
"skip": skip_reports
9194
},
92-
"include_qx_rx_tags" : qx_rx,
9395
"inputs" : {
9496
"demultiplex_schema" : Path(schema).resolve().as_posix(),
9597
"R1": Path(r1_fq).resolve().as_posix(),

harpy/scripts/demultiplex_gen1.py

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,6 @@ def read_barcodes(file_path, segment):
2020
continue
2121
return data_dict
2222

23-
def read_schema(file_path):
24-
"""Read and parse schema file of sample<tab>id_segment"""
25-
# one sample can have more than one code
26-
# {segment : sample}
27-
data_dict = {}
28-
# codes can be Axx, Bxx, Cxx, Dxx
29-
code_letters = set()
30-
with open(file_path, 'r') as file:
31-
for line in file:
32-
try:
33-
sample, segment_id = line.rstrip().split()
34-
data_dict[segment_id] = sample
35-
code_letters.add(segment_id[0])
36-
except ValueError:
37-
# skip rows without two columns
38-
continue
39-
id_letter = code_letters.pop()
40-
return id_letter, data_dict
41-
4223
def get_min_dist(needle, code_letter):
4324
minDist = 999
4425
nbFound = 0
@@ -81,7 +62,9 @@ def get_read_codes(index_read, left_segment, right_segment):
8162
sys.stderr = sys.stdout = f
8263
outdir = snakemake.params.outdir
8364
qxrx = snakemake.params.qxrx
84-
schema = snakemake.input.schema
65+
sample_name = snakemake.params.sample
66+
id_segments = snakemake.params.id_segments
67+
id_letter = id_segments[0][0]
8568
r1 = snakemake.input.R1
8669
r2 = snakemake.input.R2
8770
i1 = snakemake.input.I1
@@ -97,14 +80,6 @@ def get_read_codes(index_read, left_segment, right_segment):
9780
"D" : read_barcodes(bx_d, "D"),
9881
}
9982

100-
#read schema
101-
id_letter, samples_dict = read_schema(schema)
102-
samples = list(set(samples_dict.values()))
103-
samples.append("unidentified_data")
104-
#create an array of files (one per sample) for writing
105-
R1_output = {sample: open(f"{outdir}/{sample}.R1.fq", 'w') for sample in samples}
106-
R2_output = {sample: open(f"{outdir}/{sample}.R2.fq", 'w') for sample in samples}
107-
10883
segments = {'A':'', 'B':'', 'C':'', 'D':''}
10984
unclear_read_map={}
11085
clear_read_map={}
@@ -113,49 +88,46 @@ def get_read_codes(index_read, left_segment, right_segment):
11388
pysam.FastxFile(r2) as R2,
11489
pysam.FastxFile(i1, persist = False) as I1,
11590
pysam.FastxFile(i2, persist = False) as I2,
116-
open(snakemake.output.valid, 'w') as clearBC_log,
117-
open(snakemake.output.invalid, 'w') as unclearBC_log
91+
open(f"{outdir}/{sample_name}.R1.fq", 'w') as R1_out,
92+
open(f"{outdir}/{sample_name}.R2.fq", 'w') as R2_out,
93+
open(snakemake.output.bx_info, 'w') as BC_log
11894
):
11995
for r1_rec, r2_rec, i1_rec, i2_rec in zip(R1, R2, I1, I2):
120-
segments['A'], segments['C'], statusR1 = get_read_codes(i1_rec.sequence, "C", "A")
121-
segments['B'], segments['D'], statusR2 = get_read_codes(i2_rec.sequence, "D", "B")
96+
segments['A'], segments['C'], R1_status = get_read_codes(i1_rec.sequence, "C", "A")
97+
segments['B'], segments['D'], R2_status = get_read_codes(i2_rec.sequence, "D", "B")
98+
if segments[id_letter] not in id_segments:
99+
continue
100+
statuses = [R1_status, R2_status]
122101
BX_code = segments['A'] + segments['C'] + segments['B']+ segments['D']
123102
bc_tags = f"BX:Z:{BX_code}"
124103
if qxrx:
125104
bc_tags = f"RX:Z:{i1_rec.sequence}+{i2_rec.sequence}\tQX:Z:{i1_rec.quality}+{i2_rec.quality}\t{bc_tags}"
126105
r1_rec.comment += f"\t{bc_tags}"
127106
r2_rec.comment += f"\t{bc_tags}"
128-
# search sample name
129-
sample_name = samples_dict.get(segments[id_letter], "unidentified_data")
130-
R1_output[sample_name].write(f"{r1_rec}\n")
131-
R2_output[sample_name].write(f"{r2_rec}\n")
107+
R1_out.write(f"{r1_rec}\n")
108+
R2_out.write(f"{r2_rec}\n")
132109

133-
if (statusR1 == "unclear" or statusR2 == "unclear"):
110+
# logging barcode identification
111+
if "unclear" in statuses:
134112
if BX_code in unclear_read_map:
135113
unclear_read_map[BX_code] += 1
136114
else:
137115
unclear_read_map[BX_code] = 1
138116
else:
139-
if (statusR1 == "corrected" or statusR2 == "corrected"):
117+
if "corrected" in statuses:
140118
if BX_code in clear_read_map:
141119
clear_read_map[BX_code][1] += 1
142120
else:
143121
clear_read_map[BX_code] = [0,1]
144122
else:
145-
if (statusR1 == "found" and statusR2 == "found"):
123+
if all(status == "found" for status in statuses):
146124
if BX_code in clear_read_map:
147125
clear_read_map[BX_code][0] += 1
148126
else:
149-
clear_read_map[BX_code] = [1,0]
150-
151-
for sample_name in samples:
152-
R1_output[sample_name].close()
153-
R2_output[sample_name].close()
127+
clear_read_map[BX_code] = [1,0]
154128

155-
clearBC_log.write("Barcode\tCorrect reads\tCorrected reads\n" )
129+
BC_log.write("Barcode\tTotal_Reads\tCorrect_Reads\tCorrected_Reads\n")
156130
for code in clear_read_map:
157-
clearBC_log.write(code+"\t"+"\t".join(str(x) for x in clear_read_map[code])+"\n")
158-
159-
unclearBC_log.write("Barcode\tReads\n")
131+
BC_log.write(f"{code}\t{sum(clear_read_map[code])}\t{clear_read_map[code][0]}\t{clear_read_map[code][1]}\n")
160132
for code in unclear_read_map:
161-
unclearBC_log.write(code +"\t"+str(unclear_read_map [code])+"\n")
133+
BC_log.write(f"{code}\t{unclear_read_map[code]}\t0\t0\n")

0 commit comments

Comments
 (0)