diff --git a/qp_klp/Protocol.py b/qp_klp/Protocol.py
index ae6422b..160ca86 100644
--- a/qp_klp/Protocol.py
+++ b/qp_klp/Protocol.py
@@ -118,8 +118,10 @@ def convert_raw_to_fastq(self):
         if 'TellReadJob' not in self.skip_steps:
             job.run(callback=self.job_callback)
 
-        self.pipeline.get_sample_ids()
-        failed_samples = []
+        # audit the results to determine which samples failed to convert
+        # properly. Append these to the failed-samples report and also
+        # return the list directly to the caller.
+        failed_samples = job.audit()
 
         if hasattr(self, 'fsr'):
             # NB 16S does not require a failed samples report and
             # it is not performed by SPP.
@@ -129,33 +131,39 @@ def convert_raw_to_fastq(self):
     def generate_sequence_counts(self):
         config = self.pipeline.get_software_configuration('tell-seq')
 
+        # filter on corrected.err_barcode_removed
+
+        files_to_count_path = join(self.pipeline.output_path,
+                                   'files_to_count.txt')
+
+        with open(files_to_count_path, 'w') as f:
+            # for raw_counts_r1r2, count corrected.err_barcode_removed files
+            # (TellReadJob final output).
+            for root, dirs, files in walk(self.raw_fastq_files_path):
+                for _file in files:
+                    if 'corrected.err_barcode_removed' in _file:
+                        print(join(root, _file), file=f)
+
         job = SeqCountsJob(self.pipeline.run_dir,
                            self.pipeline.output_path,
-                           self.pipeline.input_file_path,
                            config['queue'],
                            config['nodes'],
                            config['wallclock_time_in_minutes'],
                            config['normcount_mem_limit'],
                            config['modules_to_load'],
                            self.master_qiita_job_id,
-                           '',
-                           config['integrate_script_path'],
-                           self.pipeline.qiita_job_id)
+                           config['job_max_array_length'],
+                           files_to_count_path,
+                           self.pipeline.get_sample_sheet_path(),
+                           cores_per_task=config['tellread_cores'])
 
         if 'SeqCountsJob' not in self.skip_steps:
             job.run(callback=self.job_callback)
 
-        # audit the results to determine which samples failed to convert
-        # properly. Append these to the failed-samples report and also
-        # return the list directly to the caller.
-        failed_samples = job.audit_me(self.pipeline.get_sample_ids())
-
-        if hasattr(self, 'fsr'):
-            # NB 16S does not require a failed samples report and
-            # it is not performed by SPP.
-            self.fsr.write(failed_samples, job.__class__.__name__)
-
-        return failed_samples
+        # Do not add an entry to fsr because w/respect to counting, either
+        # all jobs are going to fail or none are going to fail. It's not
+        # likely that we're going to fail to count sequences for only some
+        # of the samples.
 
     def integrate_results(self):
         config = self.pipeline.get_software_configuration('tell-seq')
@@ -173,7 +181,6 @@ def integrate_results(self):
                            config['integrate_mem_limit'],
                            config['modules_to_load'],
                            self.master_qiita_job_id,
-                           "foo",
                            config['integrate_script_path'],
                            # NB: sample_index_list used may vary
                            # from project to project in the future.
@@ -224,7 +231,7 @@ def integrate_results(self):
         # audit the results to determine which samples failed to convert
         # properly. Append these to the failed-samples report and also
         # return the list directly to the caller.
-        failed_samples = job.audit_me(self.pipeline.get_sample_ids())
+        failed_samples = job.audit()
 
         if hasattr(self, 'fsr'):
             # NB 16S does not require a failed samples report and
diff --git a/qp_klp/klp.py b/qp_klp/klp.py
index ca6c015..6168b4e 100644
--- a/qp_klp/klp.py
+++ b/qp_klp/klp.py
@@ -106,6 +106,14 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     user_input_file = parameters.pop('sample_sheet')
     lane_number = parameters.pop('lane_number')
 
+    if {'body', 'content_type', 'filename'} != set(user_input_file):
+        return False, None, ("This doesn't appear to be a valid sample "
+                             "sheet or mapping file; please review.")
+    uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
+    # save raw data to file
+    with open(uif_path, 'w') as f:
+        f.write(user_input_file['body'])
+
     # the run_identifier must be saved because it is not always preserved
     # in a dependable location downstream. The user input file must be
     # saved because it is always a unique name and it cannot be guaranteed
@@ -114,15 +122,7 @@ def sequence_processing_pipeline(qclient, job_id, parameters, out_dir):
     # the user_input file on the first run.
     restart_file_path = out_path('restart_me')
     with open(restart_file_path, 'w') as f:
-        f.write(f"{run_identifier}\n{user_input_file}")
-
-    if {'body', 'content_type', 'filename'} != set(user_input_file):
-        return False, None, ("This doesn't appear to be a valid sample "
-                             "sheet or mapping file; please review.")
-    uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
-    # save raw data to file
-    with open(uif_path, 'w') as f:
-        f.write(user_input_file['body'])
+        f.write(f"{run_identifier}\n{uif_path}")
 
     final_results_path = out_path('final_results')
     makedirs(final_results_path, exist_ok=True)
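Below is a minimal standalone sketch of the enumeration step added to generate_sequence_counts(), for reference. It assumes only the Python standard library; raw_fastq_files_path, the 'corrected.err_barcode_removed' marker, and the files_to_count.txt manifest name come from the diff above, while write_files_to_count() itself is a hypothetical helper, not part of the patch.

    from os import walk
    from os.path import join

    def write_files_to_count(raw_fastq_files_path, output_path):
        """Record each TellReadJob final output, one path per line."""
        files_to_count_path = join(output_path, 'files_to_count.txt')
        with open(files_to_count_path, 'w') as f:
            for root, _dirs, files in walk(raw_fastq_files_path):
                for _file in files:
                    # TellReadJob's final per-sample outputs carry this marker
                    if 'corrected.err_barcode_removed' in _file:
                        print(join(root, _file), file=f)
        return files_to_count_path

SeqCountsJob then reads this manifest (passed as files_to_count_path) instead of deriving the file list itself.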
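The klp.py hunks reorder the input handling so the sample sheet is validated and written to disk before the restart file is created; restart_me now records the saved file's path rather than the repr of the upload payload. A condensed sketch of that ordering follows, assuming the {'body', 'content_type', 'filename'} payload shape shown in the diff; save_inputs() and the out_path callable are hypothetical stand-ins for the surrounding plugin code.

    def save_inputs(user_input_file, run_identifier, out_path):
        # reject payloads that are not shaped like an uploaded file
        if {'body', 'content_type', 'filename'} != set(user_input_file):
            return False, None, ("This doesn't appear to be a valid sample "
                                 "sheet or mapping file; please review.")

        # persist the raw upload under a filesystem-safe name first ...
        uif_path = out_path(user_input_file['filename'].replace(' ', '_'))
        with open(uif_path, 'w') as f:
            f.write(user_input_file['body'])

        # ... so the restart file can point at a real on-disk path that a
        # restarted job can reuse, rather than a stringified dict.
        with open(out_path('restart_me'), 'w') as f:
            f.write(f"{run_identifier}\n{uif_path}")

        return True, uif_path, None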