Updates based on testing
charles-cowart committed Jan 9, 2025
1 parent 63faccb commit 3cbc89e
Showing 2 changed files with 34 additions and 27 deletions.
43 changes: 25 additions & 18 deletions qp_klp/Protocol.py
@@ -118,8 +118,10 @@ def convert_raw_to_fastq(self):
         if 'TellReadJob' not in self.skip_steps:
             job.run(callback=self.job_callback)

-        self.pipeline.get_sample_ids()
-        failed_samples = []
+        # audit the results to determine which samples failed to convert
+        # properly. Append these to the failed-samples report and also
+        # return the list directly to the caller.
+        failed_samples = job.audit()
         if hasattr(self, 'fsr'):
             # NB 16S does not require a failed samples report and
             # it is not performed by SPP.
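Both hunks in this file converge on a single audit() call as the source of
truth for per-sample conversion failures. A minimal sketch of that pattern,
assuming audit() diffs the expected sample-ids against the samples that
produced output (the attribute names here are illustrative, not the real
qp_klp API):

    # Hypothetical sketch of the audit pattern; sample_ids and
    # completed_ids are assumed inputs, not actual Job attributes.
    class JobSketch:
        def __init__(self, sample_ids, completed_ids):
            self.sample_ids = set(sample_ids)
            self.completed_ids = set(completed_ids)

        def audit(self):
            # Samples that were expected but produced no usable output.
            return sorted(self.sample_ids - self.completed_ids)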
@@ -129,33 +131,39 @@ def convert_raw_to_fastq(self):

     def generate_sequence_counts(self):
         config = self.pipeline.get_software_configuration('tell-seq')
-        # filter on corrected.err_barcode_removed
+
+        files_to_count_path = join(self.pipeline.output_path,
+                                   'files_to_count.txt')
+
+        with open(files_to_count_path, 'w') as f:
+            # for raw_counts_r1r2, count corrected.err_barcode_removed files
+            # (TellReadJob final output).
+            for root, dirs, files in walk(self.raw_fastq_files_path):
+                for _file in files:
+                    if 'corrected.err_barcode_removed' in _file:
+                        print(join(root, _file), file=f)
+
         job = SeqCountsJob(self.pipeline.run_dir,
                            self.pipeline.output_path,
                            self.pipeline.input_file_path,
                            config['queue'],
                            config['nodes'],
                            config['wallclock_time_in_minutes'],
                            config['normcount_mem_limit'],
                            config['modules_to_load'],
                            self.master_qiita_job_id,
-                           '',
-                           config['integrate_script_path'],
-                           self.pipeline.qiita_job_id)
+                           config['job_max_array_length'],
+                           files_to_count_path,
+                           self.pipeline.get_sample_sheet_path(),
+                           cores_per_task=config['tellread_cores'])

         if 'SeqCountsJob' not in self.skip_steps:
             job.run(callback=self.job_callback)

-        # audit the results to determine which samples failed to convert
-        # properly. Append these to the failed-samples report and also
-        # return the list directly to the caller.
-        failed_samples = job.audit_me(self.pipeline.get_sample_ids())
-        if hasattr(self, 'fsr'):
-            # NB 16S does not require a failed samples report and
-            # it is not performed by SPP.
-            self.fsr.write(failed_samples, job.__class__.__name__)
-
-        return failed_samples
+        # Do not add an entry to fsr because w/respect to counting, either
+        # all jobs are going to fail or none are going to fail. It's not
+        # likely that we're going to fail to count sequences for only some
+        # of the samples.

     def integrate_results(self):
         config = self.pipeline.get_software_configuration('tell-seq')
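The file-listing step added above can be exercised on its own. A standalone
sketch of the same logic, assuming a tree of TellReadJob output in which the
final per-sample fastqs carry the 'corrected.err_barcode_removed' substring:

    # Standalone version of the files_to_count.txt step; the directory
    # layout is assumed for illustration.
    from os import walk
    from os.path import join

    def list_files_to_count(raw_fastq_files_path, files_to_count_path):
        with open(files_to_count_path, 'w') as f:
            for root, dirs, files in walk(raw_fastq_files_path):
                for _file in files:
                    if 'corrected.err_barcode_removed' in _file:
                        print(join(root, _file), file=f)
        return files_to_count_path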
@@ -173,7 +181,6 @@ def integrate_results(self):
                            config['integrate_mem_limit'],
                            config['modules_to_load'],
                            self.master_qiita_job_id,
-                           "foo",
                            config['integrate_script_path'],
                            # NB: sample_index_list used may vary
                            # from project to project in the future.
@@ -224,7 +231,7 @@ def integrate_results(self):
         # audit the results to determine which samples failed to convert
         # properly. Append these to the failed-samples report and also
         # return the list directly to the caller.
-        failed_samples = job.audit_me(self.pipeline.get_sample_ids())
+        failed_samples = job.audit()

         if hasattr(self, 'fsr'):
             # NB 16S does not require a failed samples report and
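Taken together, the three steps now share one failure-handling idiom:
convert_raw_to_fastq() and integrate_results() return the audited failures,
while generate_sequence_counts() deliberately reports none. A hedged sketch
of a caller, assuming a Protocol-like instance named proto (the real driver
sits outside this diff):

    # Hypothetical caller; 'proto' and the step ordering are assumptions.
    failed = []
    failed += proto.convert_raw_to_fastq()
    proto.generate_sequence_counts()  # all-or-nothing; no per-sample report
    failed += proto.integrate_results()
    print(f"{len(failed)} sample(s) failed conversion/integration")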
18 changes: 9 additions & 9 deletions qp_klp/klp.py
@@ -106,6 +106,14 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     user_input_file = parameters.pop('sample_sheet')
     lane_number = parameters.pop('lane_number')

+    if {'body', 'content_type', 'filename'} != set(user_input_file):
+        return False, None, ("This doesn't appear to be a valid sample "
+                             "sheet or mapping file; please review.")
+    uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
+    # save raw data to file
+    with open(uif_path, 'w') as f:
+        f.write(user_input_file['body'])
+
     # the run_identifier must be saved because it is not always preserved
     # in a dependable location downstream. The user input file must be
     # saved because it is always a unique name and it cannot be guaranteed
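The added guard treats user_input_file as a dict-like payload with exactly
the keys body, content_type, and filename. A quick illustration with a
made-up payload (all values are assumptions):

    # Hypothetical payload shaped the way the guard above expects.
    user_input_file = {'body': 'sample_id,lane\nS1,1\n',
                       'content_type': 'text/csv',
                       'filename': 'my sheet.csv'}
    assert {'body', 'content_type', 'filename'} == set(user_input_file)
    # After the space -> underscore replacement, the sheet is saved as
    # out_path('my_sheet.csv').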
@@ -114,15 +122,7 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     # the user_input file on the first run.
     restart_file_path = out_path('restart_me')
     with open(restart_file_path, 'w') as f:
-        f.write(f"{run_identifier}\n{user_input_file}")
-
-    if {'body', 'content_type', 'filename'} != set(user_input_file):
-        return False, None, ("This doesn't appear to be a valid sample "
-                             "sheet or mapping file; please review.")
-    uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
-    # save raw data to file
-    with open(uif_path, 'w') as f:
-        f.write(user_input_file['body'])
+        f.write(f"{run_identifier}\n{uif_path}")

     final_results_path = out_path('final_results')
     makedirs(final_results_path, exist_ok=True)
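With the reordering, restart_me now records the path of the saved sheet
(uif_path) instead of the repr of the user_input_file dict, so a later run
can recover both values. A minimal sketch of a consumer, assuming the
two-line format written above:

    # Hypothetical restart reader; the two-line format matches the write
    # above, but this helper is not part of the diff.
    def read_restart_file(restart_file_path):
        with open(restart_file_path) as f:
            run_identifier, uif_path = f.read().splitlines()[:2]
        return run_identifier, uif_path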
