import os
import io
import glob
import contextlib
import collections
from beers.cluster_packet import ClusterPacket
from beers.utilities.demultiplex import demultiplexer
from beers_utils.constants import CONSTANTS
from beers.flowcell import Flowcell
[docs]class FastQ:
"""
The FastQ object generates a FastQ report for the run for a given flowcell and for each direction in the case of
paired end reads. This report function is called by the controller but ONLY when all cluster packets have been
processed.
"""
def __init__(
self,
flowcell: Flowcell,
sample_id: str,
sample_barcode: str,
):
"""
The FastQ object requires the flowcell, the top level directory housing the cluster packets that have
emerged from the sequence pipeline (they will be in the data directory under the sequence pipeline stage name),
and the output directory for the fasta files (they will be in the data directory under the controller stage
name).
Parameters
----------
flowcell:
The flowcell to which this fastQ object applies.
sample_id:
id of the sample whose fastq files we generate
sample_barcode:
barcodes as a string of the format '{i5}+{i7}'. Demultiplexing is done off these
"""
self.flowcell = flowcell
self.sample_id = sample_id
self.sample_barcode = sample_barcode
[docs] def generate_report(self, cluster_packet_paths, output_file_paths, bad_barcode_file_paths, sort_by_coordinates=False):
"""
The principal method of this object generates one or two reports depending upon whether paired end reads are
called for. All the information needed to create the FASTQ files is found in the cluster packets themselves.
For each cluster packet, we identify whether there is one called sequence or two (paired ends). The first
called sequence in the list is always the forward one. We find each cluster in the cluster packet that is
affixed to the lanes of interest. A fasta header is generated internally for each of those remaining clusters
for the given direction. The remaining clusters are sorted by their coordinates and each entry is written to
the FASTQ file - header, called sequence, + quality score.
Output files are named according to barcode_S#_L#_R#.fastq specifying sample, lane and read direction numbers.
Parameters
----------
cluster_packet_paths:
list of cluster packet file paths to a generate report from
output_file_paths:
list of two (one for R1/R2 read directions) lists of sam/bam file to output to for each lane, in the same order as config.flowcell.lanes_to_use
bad_barcode_file_paths:
list of two (one for R1/R2 read directions) list of sam/bam files to output to for each lane, in the same order as config.flowcell.lanes_to_use
sort_by_coordinates:
Whether to sort output by coordinates, as would typically be done with a fastq
file from Illumina. Default: False. Setting this to True will consume considerably more memory
as all reads are read in at once.
"""
def cluster_generator():
def inner_cluster_generator():
for cluster_packet_path in cluster_packet_paths:
cluster_packet = ClusterPacket.deserialize(cluster_packet_path, skip_base_counts=True)
yield from cluster_packet.clusters
if sort_by_coordinates:
yield from sorted(inner_cluster_generator(), key=lambda cluster: cluster.coordinates)
else:
yield from inner_cluster_generator()
with contextlib.ExitStack() as stack:
# Open all the files
fastq_output_file_path = {direction: {lane: path for lane, path in zip(self.flowcell.lanes_to_use, out_paths)}
for direction, out_paths in zip(CONSTANTS.DIRECTION_CONVENTION, output_file_paths)}
bad_barcode_file_path = {direction: {lane: path for lane, path in zip(self.flowcell.lanes_to_use, bad_paths)}
for direction, bad_paths in zip(CONSTANTS.DIRECTION_CONVENTION, bad_barcode_file_paths)}
print(f"Writing out demultiplexed fastq files to:")
for direction in fastq_output_file_path.values():
for fastq in direction.values():
print(fastq)
bad_barcode_files = {direction: {lane: stack.enter_context(open(bad_barcode_file_path[direction][lane], "w"))
for lane in self.flowcell.lanes_to_use}
for direction in CONSTANTS.DIRECTION_CONVENTION}
def fastq_by_barcode(direction, lane):
fastq_files = {self.sample_barcode: stack.enter_context(open(fastq_output_file_path[direction][lane],'w'))}
return collections.defaultdict(
lambda : bad_barcode_files[direction][lane],
fastq_files
)
fastq_output_files = {direction: {lane: fastq_by_barcode(direction, lane)
for lane in self.flowcell.lanes_to_use}
for direction in CONSTANTS.DIRECTION_CONVENTION}
demuxes = {direction: {lane: demultiplexer(fastqs) for lane, fastqs in fastq_output_files[direction].items()}
for direction in CONSTANTS.DIRECTION_CONVENTION}
# Iterate through all the cluster packets
for cluster in cluster_generator():
for direction_num in CONSTANTS.DIRECTION_CONVENTION:
cluster.generate_fasta_header(direction_num)
barcode = cluster.called_barcode
fastq_file = demuxes[direction_num][cluster.lane](barcode)
fastq_file.write(cluster.header + "\n")
fastq_file.write(cluster.called_sequences[direction_num - 1] + "\n")
fastq_file.write("+\n")
fastq_file.write(cluster.quality_scores[direction_num - 1] + "\n")