Source code for beers.sam

import os
import glob
import contextlib
import collections
import pysam
from beers.cluster_packet import ClusterPacket
from beers_utils.general_utils import GeneralUtils
from beers_utils.constants import CONSTANTS
from beers.utilities.demultiplex import demultiplexer
from beers.flowcell import Flowcell

[docs]class SAM: """ The SAM object generates a SAM/BAM report for the run for a given flowcell and for each direction in the case of paired end reads. This report function is called by the controller but ONLY when all cluster packets have been processed. """ def __init__( self, flowcell: Flowcell, sample_id: str, sample_barcode: str, ): """ The SAM object requires the flowcell, the top level directory housing the cluster packets that have emerged from the sequence pipeline (they will be in the data directory under the sequence pipeline stage name), and the output directory for the fasta files (they will be in the data directory under the controller stage name). Parameters ---------- flowcell: The flowcell to which this SAM object applies. sample_id: id of the sample whose SAM files we generate sample_barcode: barcodes as a string of the format '{i5}+{i7}'. Demultiplexing is done off these """ self.flowcell = flowcell self.sample_id = sample_id self.sample_barcode = sample_barcode
[docs] def generate_report(self, cluster_packet_paths, output_file_paths, bad_barcode_file_paths, reference_seqs, BAM=False, sort_by_coordinates=False): """ The principal method of this object generates one or two reports depending upon whether paired end reads are called for. All the information needed to create the SAM files is found in the cluster packets themselves. For each cluster packet, we identify whether there is one called sequence or two (paired ends). The first called sequence in the list is always the forward one. We find each cluster in the cluster packet that is affixed to the lanes of interest. The remaining clusters are sorted by their coordinates and each entry is written to the SAM file. Parameters ---------- cluster_packet_paths: list of cluster packet file paths to a generate report from output_file_paths: list of sam/bam file to output to for each lane, in the same order as config.flowcell.lanes_to_use bad_barcode_file_paths: list of sam/bam files to output to for each lane, in the same order as config.flowcell.lanes_to_use reference_seqs: dictionary mapping reference names to reference sequences, used for SAM header BAM: if true, output in BAM format, else SAM (default) sort_by_coordinates: Whether to sort output by coordinates, as would typically be done with a fastq file from Illumina. Default: False. Setting this to True will consume considerably more memory as all reads are read in at once. """ sam_header = { "HD": { "VN": "1.0"}, "SQ": [ {'SN': chrom_name.split()[0], 'LN': len(seq)} for chrom_name, seq in reference_seqs.items() ] } chrom_list = [sq['SN'] for sq in sam_header['SQ']] print("Loading from", cluster_packet_paths) def cluster_generator(): def inner_cluster_generator(): for cluster_packet_path in cluster_packet_paths: cluster_packet = ClusterPacket.deserialize(cluster_packet_path, skip_base_counts=True) yield from cluster_packet.clusters if sort_by_coordinates: yield from sorted(inner_cluster_generator(), key=lambda cluster: cluster.coordinates) else: yield from inner_cluster_generator() with contextlib.ExitStack() as stack: # Open all the files sam_output_file_path = {lane: path for lane, path in zip(self.flowcell.lanes_to_use, output_file_paths)} bad_barcode_file_path = {lane: path for lane, path in zip(self.flowcell.lanes_to_use, bad_barcode_file_paths)} print(f"Writing out demultiplexed sam files to:") for sam_path in sam_output_file_path.values(): print(sam_path) bad_barcode_files = {lane: stack.enter_context(pysam.AlignmentFile(bad_barcode_file_path[lane], ('wb' if BAM else 'w'), header=sam_header)) for lane in self.flowcell.lanes_to_use} def sam_by_barcode(lane): sam_files = {self.sample_barcode: stack.enter_context(pysam.AlignmentFile(sam_output_file_path[lane], ('wb' if BAM else 'w'), header=sam_header))} return collections.defaultdict( lambda : bad_barcode_files[lane], **sam_files ) sam_output_files = {lane: sam_by_barcode(lane) for lane in self.flowcell.lanes_to_use} demuxes = {lane: demultiplexer(output_files) for lane, output_files in sam_output_files.items()} for cluster in cluster_generator(): paired = len(cluster.called_sequences) == 2 sam = demuxes[cluster.lane](cluster.called_barcode) for direction, (seq, qual, start, cigar) in enumerate(zip(cluster.called_sequences, cluster.quality_scores, cluster.read_starts, cluster.read_cigars)): a = pysam.AlignedSegment() a.query_name = cluster.encode_sequence_identifier() rev_strand = ((cluster.molecule.source_strand == '-' and direction == 0) or (cluster.molecule.source_strand == '+' and direction == 1)) a.flag = (0x01*paired) + 0x02 + (0x40 if (direction == 0) else 0x80) + (0x10 if rev_strand else 0x20) a.query_sequence = seq if not rev_strand else GeneralUtils.create_complement_strand(seq) a.reference_id = chrom_list.index(cluster.molecule.source_chrom) a.reference_start = start - 1 # pysam uses 0-based index, we use 1-based a.mapping_quality = 255 a.cigarstring = cigar a.query_qualities = pysam.qualitystring_to_array(qual) sam.write(a)