Skip to content
65 changes: 38 additions & 27 deletions fasta/interleave_fasta.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,45 @@
#!/usr/bin/env python
#!/usr/bin/env python3

# encoding:utf8
# author: Sébastien Boisvert
# part of Ray distribution
"""This script takes two fasta files and interleaves them

Usage:
interleave-fasta.py fasta_file1 fasta_file2

Author: Sébastien Boisvert
Py3 conversion: RPINerd
"""

# Importing modules
import sys

# Main
if __name__ == '__main__':
try:
file1 = sys.argv[1]
file2 = sys.argv[2]
except:
print __doc__
sys.exit(1)

with open(file1) as f1:
with open(file2) as f2:
while True:
line = f1.readline()
if line.strip() == "":
break

print line.strip()
print f1.readline().strip()
print f2.readline().strip()
print f2.readline().strip()
import argparse
import os


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Interleave two fasta files")
parser.add_argument("-1", "--fasta1", type=str, help="First fasta file")
parser.add_argument("-2", "--fasta2", type=str, help="Second fasta file")
return parser.parse_args()


def main(args: argparse.Namespace) -> None:

# Verify the files exist
if not os.path.exists(args.fasta1):
raise FileNotFoundError(f"Error: file {args.fasta1} does not exist\n")
if not os.path.exists(args.fasta2):
raise FileNotFoundError(f"Error: file {args.fasta2} does not exist\n")

with open(args.fasta1, "r") as f1, open(args.fasta2, "r") as f2:
while True:
line = f1.readline()
if line.strip() == "":
break

print(line.strip())
print(f1.readline().strip())
print(f2.readline().strip())
print(f2.readline().strip())


if __name__ == "__main__":
args = parse_args()
main(args)
112 changes: 67 additions & 45 deletions fastq/append_to_fastq_read_header.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@
"""
Running with no options other than -i and -o, this script will transform headers like these:

@SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT
@SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT

to headers like this:

@SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1
@SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1

That is, it will see the '1' or '2' after the whitespace and transform the header to this instead (in FASTA):

>SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1
>SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1

Otherwise, if you want a specific string to be appended to all headers, use the --append option.

Expand All @@ -24,70 +24,92 @@
import re
import sys

def main():
parser = argparse.ArgumentParser( description='Append strings to the end of FASTQ read headers')

def parse_args() -> argparse.Namespace:
"""
Simple argument parser for this script.

:return: argparse.Namespace object containing the input options
"""

parser = argparse.ArgumentParser(description="Append strings to the end of FASTQ read headers")

## output file to be written
parser.add_argument('-i', '--input_file', type=str, required=True, help='Path to an input file to be read (gzip supported)' )
parser.add_argument('-o', '--output_file', type=str, required=False, help='Path to an output file to be created' )
parser.add_argument('-a', '--append', type=str, required=False, help='String to be appended to each read header' )
args = parser.parse_args()
parser.add_argument(
"-i", "--input_file", type=str, required=True, help="Path to an input file to be read (gzip supported)"
)
parser.add_argument("-o", "--output_file", type=str, required=False, help="Path to an output file to be created")
parser.add_argument("-a", "--append", type=str, required=False, help="String to be appended to each read header")

return parser.parse_args()


def append_header(line: str, append: str) -> str:
"""
Append a custom string to the end of a FASTQ header line.

:param line: FASTQ header line
:param append: string to append

:return: FASTQ header line with appended string
"""

header = re.search("^(\@\S+)", line)
if header:
return f"{header.group(1)}{append}\n"
else:
raise Exception(f"ERROR: FASTQ header line found that didn't match expected format: {line}")


def reformat_header(line: str) -> str:
"""
Reformat a FASTQ header line to the standard format.

:param line: FASTQ header line

:return: reformatted FASTQ header line
"""

header = re.search("^(\@.+?) (\d)", line)
if header:
return f"{header.group(1)}/{header.group(2)}\n"
else:
raise Exception(f"ERROR: FASTQ header line found that didn't match expected format: {line}")


def main(args: argparse.Namespace) -> None:

if args.output_file is None:
ofh = sys.stdout
else:
ofh = open( args.output_file, 'wt' )
ofh = open(args.output_file, "wt")

if args.append is None:
detect_direction = True
else:
detect_direction = False

line_count = 0

if args.input_file.endswith('.gz'):
fh = gzip.open( args.input_file, 'rb')
if args.input_file.endswith(".gz"):
fh = gzip.open(args.input_file, "rb")
is_compressed = True
else:
fh = open(args.input_file, 'rU')
fh = open(args.input_file, "rU")
is_compressed = False

for line in fh:
if is_compressed:
line = line.decode()

line_count += 1

if line_count % 4 == 1:

if detect_direction:
m = re.search('^(\@.+?) (\d)', line)
if m:
ofh.write("{0}/{1}\n".format(m.group(1), m.group(2)))
else:
raise Exception("ERROR: FASTQ header line found that didn't match expected format: {0}".format(line))
else:
m = re.search('^(\@\S+)', line)
if m:
ofh.write("{0}{1}\n".format(m.group(1), args.append))
else:
raise Exception("ERROR: FASTQ header line found that didn't match expected format: {0}".format(line))


header = append_header(line, args.append) if args.append else reformat_header(line)
ofh.write(header)
else:
ofh.write(line)

ofh.close()




if __name__ == '__main__':
main()




fh.close()
ofh.close()


if __name__ == "__main__":

args = parse_args()
main(args)
113 changes: 64 additions & 49 deletions fastq/convert_fastq_to_fasta.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,30 +12,30 @@
- 2 minutes, 8 seconds when run with the --width=60 option
- 3 minutes, 17 seconds when run with both --detect_direction and --width=60 options

** Timing was run on a laptop with an Intel(R) Core(TM) i7-3740QM CPU @ 2.70GHz,
** Timing was run on a laptop with an Intel(R) Core(TM) i7-3740QM CPU @ 2.70GHz,
utilizing only one core.

OUTPUT
------

Running with no options other than -i and -o, this script will transform headers like these:

@SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT
@61JCNAAXX100503:5:100:10001:18267/2
@SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT
@61JCNAAXX100503:5:100:10001:18267/2

to this in the FASTA output:
>SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT
>61JCNAAXX100503:5:100:10001:18267/2
to this in the FASTA output:

>SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT
>61JCNAAXX100503:5:100:10001:18267/2

If you have have headers like this:

@SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT
@SN7001163:78:C0YG5ACXX:6:1101:1129:2043 1:N:0:CCTAGGT

The optional --detect_direction option will see the '1' or '2' after the whitespace and transform
the header to this instead (in FASTA):

>SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1
>SN7001163:78:C0YG5ACXX:6:1101:1129:2043/1

Note that this can increase the runtime up to 3x (from my testing), since a regex is used.

Expand All @@ -51,67 +51,82 @@
"""

import argparse
import os
import re
import sys

def main():
parser = argparse.ArgumentParser( description='Convert FASTQ to FASTA format')

def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Convert FASTQ to FASTA format")

## output file to be written
parser.add_argument('-i', '--input_file', type=str, required=True, help='Path to an input file to be read' )
parser.add_argument('-o', '--output_file', type=str, required=False, help='Path to an output file to be created' )
parser.add_argument('-d', '--detect_direction', action='store_true', help='Pass this flag to auto-detect the mate pair direction. See full documentation for more info' )
parser.add_argument('-w', '--width', type=int, required=False, help='Defines the width (or number of bases) on each FASTA sequence line' )
args = parser.parse_args()
parser.add_argument("-i", "--input_file", type=str, required=True, help="Path to an input file to be read")
parser.add_argument("-o", "--output_file", type=str, required=False, help="Path to an output file to be created")
parser.add_argument(
"-d",
"--detect_direction",
action="store_true",
help="Pass this flag to auto-detect the mate pair direction. See full documentation for more info",
)
parser.add_argument(
"-w",
"--width",
type=int,
required=False,
help="Defines the width (or number of bases) on each FASTA sequence line",
)

return parser.parse_args()


def main(args: argparse.Namespace) -> None:

if args.output_file is None:
ofh = sys.stdout
else:
ofh = open( args.output_file, 'wt' )
ofh = open(args.output_file, "w")

line_count = 0
record_count = 0
last_header = None

for line in open(args.input_file, 'rU'):
line_count += 1
with open(args.input_file, "r") as fh:
for line in fh:
line_count += 1

if line_count % 4 == 1:
record_count += 1
if line_count % 4 == 1:
record_count += 1

if args.detect_direction:
m = re.search('^\@(.+?) (\d)', line)
if m:
last_header = "{0}/{1}\n".format(m.group(1), m.group(2))
if args.detect_direction:
m = re.search(r"^@(.+?) (\d)", line)
if m:
last_header = f"{m.group(1)}/{m.group(2)}\n"
else:
raise Exception(f"ERROR: FASTQ header line found that didn't match expected format: {line}")
else:
raise Exception("ERROR: FASTQ header line found that didn't match expected format: {0}".format(line))
else:
line = line.lstrip('@')
last_header = line

elif line_count % 4 == 2:
if args.width:
ofh.write(">{0}{1}\n".format(last_header, wrapped(line, args.width)))
else:
ofh.write(">{0}{1}".format(last_header, line))

ofh.close()

print("{0} records written to the output FASTA file".format(record_count))
line = line.lstrip("@")
last_header = line

elif line_count % 4 == 2:
if args.width:
ofh.write(f">{last_header}{wrapped(line, args.width)}\n")
else:
ofh.write(f">{last_header}{line}")

def wrapped(string, every=60):
string = string.rstrip()
''' this runs orders of magnitude faster than using the textwrap module '''
return '\n'.join(string[i:i+every] for i in range(0, len(string), every))


if __name__ == '__main__':
main()

print(f"{record_count} records written to the output FASTA file")


def wrapped(string, every=60):
string = string.rstrip()
""" this runs orders of magnitude faster than using the textwrap module """
return "\n".join(string[i : i + every] for i in range(0, len(string), every))


if __name__ == "__main__":
args = parse_args()

# Verify input file exists
if not os.path.exists(args.input_file):
raise FileNotFoundError(f"ERROR: Input file '{args.input_file}' not found\n")

main(args)
Loading