from io import StringIO
import math
from typing import Optional, ClassVar
from dataclasses import dataclass, field
from contextlib import closing
import numpy as np
from beers_utils.general_utils import GeneralUtils
from beers_utils.molecule import Molecule
from beers_utils.constants import CONSTANTS
import beers_utils.cigar
BASE_ORDER = ["A", "C", "G", "T"]
[docs]@dataclass
class Cluster:
"""
The cluster object represents initially, a single molecule bound to a flowcell which is ultimately amplified
and read from either or both directions. As such it contains base counts for each position in the original
molecule's sequence, the sequence(s) called as a result of reading those base counts in either or both directions,
and the sample barcodes (both 5' and 3' ends). A collection of these objects forms a cluster_packet.
Attributes
----------
cluster_id:
The unique id of this cluster
molecule:
The original molecule object from which the cluster is derived.
lane:
The flowcell lane where this cluster is found
coordinates:
The other flowcell coordinates (tile, x, y) identifying the cluster's location in the flowcell
molecule_count:
The total number of molecules contained within the cluster (starts a 1 and geometrically increases
with each bridge amplification cycle
diameter:
The physical space take up by the cluster (currently unused)
called_sequences:
An array of sequences that result from single or paired end reads. The first
sequence in the array is the forward read. The second is the reverse read.
called_barcode:
A string representing the read 5' barcode + 3' barcode - used in the flowcell header
quality_scores:
An array of quality score sequences corresponding to the called sequences array.
read_starts:
An array of alignment start position, one for each read, aligning them to the reference genome
read_cigars:
An array of alignment cigar strings, one for each read, aligning them to the reference genome
read_strands:
An array of alignment strand strings, one for each read, aligning them to the reference genome
base_counts:
An 4xn array of counts for each of the 4 nt bases for each position in the original molecule.
In the order of A, C, G, T counts. The initial base counts array exactly matches the original
molecule sequence.
"""
cluster_id: int
molecule: Molecule
lane: int
coordinates: tuple[int, int, int]
molecule_count: int =1
diameter: int = 0
called_sequences: list[str] = field(default_factory=list)
called_barcode: Optional[str] = None
quality_scores: list[str] = field(default_factory=list)
read_starts: list[int] = field(default_factory=list)
read_cigars: list[str] = field(default_factory=list)
read_strands: list[str] = field(default_factory=list)
base_counts: Optional[np.ndarray] = None
next_cluster_id: ClassVar[float] = 1 # Static variable for creating increasing cluster id's
[docs] def initialize_base_counts(self):
"""
Generates basecounts from the starting molecule
"""
# From sequence, we start with 1 count for each base in the sequence
encoded = np.frombuffer(self.molecule.sequence.encode("ascii"), dtype='uint8')
self.base_counts = np.array([encoded == ord(nt) for nt in BASE_ORDER]).astype(int)
self.molecule_count = 1
[docs] def assign_coordinates(self, coordinates: tuple[int, int, int]):
"""
Assign a lane coordinates object to the cluster.
Parameters
----------
coordinates:
(tile, x, y)
"""
self.coordinates = coordinates
[docs] def encode_sequence_identifier(self) -> str:
"""
Creates a sequence identifier for the cluster based upon the information contained in the cluster. The
instrument is BEERS. The run id is the identifier set by the user when running the software. The lane and the
coordinates define the location on the flowcell. The flowcell id is just filler for now.
Returns
-------
The sequence identifier as defined for the FASTQ format used here.
"""
# TODO the 1 is a placeholder for flowcell. What should we do with this?
tile, x, y = self.coordinates
return f"BEERS:1:{self.lane}:{tile}:{x}:{y}:{self.molecule.molecule_id}"
[docs] def get_base_counts_by_position(self, index: int) -> tuple[int, int, int, int]:
"""
Conveninece method to gather base counts by index (position)
Parameters
---------
index:
index (position) of interest
Returns
------
ndarray
tuple of base counts in ACGT order.
"""
return tuple(self.base_counts[:,index]) # type: ignore
def __str__(self) -> str:
"""
String representation of a cluster that may be displayed when a cluster is printed. This may not be a
complete representation. Depends on what is useful for debugging.
Returns string representation.
"""
header = f"cluster_id: {self.cluster_id}, molecule_id: {self.molecule.molecule_id}, " \
f"molecule_count: {self.molecule_count}, lane: {self.lane}, coordinates: {self.coordinates}\n"
for index in range(len(self.called_sequences)):
header += f"called sequence: {self.called_sequences[index]}\n"
header += f"quality score: {self.quality_scores[index]}\n"
with closing(StringIO()) as output:
output.write("pos\tA\tC\tG\tT\torig\n")
for index in range(len(self.molecule.sequence)):
output.write(f"{index}\t")
[output.write(f"{base_count}\t") for base_count in self.get_base_counts_by_position(index)]
output.write(f"{self.molecule.sequence[index]}\n")
return header + output.getvalue()
[docs] def serialize(self) -> str:
"""
Method to serialize data into a string. The initial line contains single value attrbitutes and is preceded
by a '#'. The second line contains serialized versions of the coordinates and the molecules and is also
preceded by a '#'. Next, may be 0 to 2 lines preceded by '##', each containing a called sequence and
a called quality score. The remaining line have no preceding hashes but list the base counts for each
position on the molecule.
Returns
-------
str
The serialized string output.
"""
tile, x, y = self.coordinates
output = f"#{self.cluster_id}\t{self.molecule_count}\t{self.diameter}\t{self.lane}\t" \
f"{self.called_barcode}\n"
output += f"#{tile}\t{x}\t{y}\n#{self.molecule.serialize()}\n"
for index in range(len(self.called_sequences)):
output += f"##{self.called_sequences[index]}\t{self.quality_scores[index]}\t{self.read_starts[index]}\t{self.read_cigars[index]}\t{self.read_strands[index]}\n"
with closing(StringIO()) as counts:
if self.molecule_count == 1 or self.base_counts is None:
# Shortcut: we only have the one molecule which is already saved
# so we indicate that here and don't write out all the counts
counts.write("None\n")
else:
for index in range(len(self.molecule.sequence)):
counts.write("\t".join([str(base_count) for base_count in self.get_base_counts_by_position(index)]))
counts.write("\n")
output += counts.getvalue()
return output
[docs] @staticmethod
def deserialize(data: str, skip_base_counts: bool =False):
"""
Method to re-render the result of the serialize method above back into a complete cluster object (without
losing or scrambling state)
Parameters
----------
data:
The string data containing the serialized version of the object
skip_base_counts:
If True, don't load the base_count data (for memory efficiency)
Returns
-------
Cluster
The Cluster object created from the data provided.
"""
data = data.rstrip('\n')
called_sequences = []
quality_scores = []
read_starts = []
read_cigars = []
read_strands = []
base_counts_not_supplied = False
g_counts = []
a_counts = []
t_counts = []
c_counts = []
for line_number, line in enumerate(data.split("\n")):
if line.startswith("##"):
called_sequence, quality_score, read_start, read_cigar, read_strand = line[2:].rstrip('\n').split("\t")
called_sequences.append(called_sequence)
quality_scores.append(quality_score)
read_starts.append(int(read_start))
read_cigars.append(read_cigar)
read_strands.append(read_strand)
elif line.startswith("#"):
if line_number == 0:
cluster_id_, molecule_count_, diameter_, lane_, called_barcode \
= line[1:].rstrip('\n').split("\t")
cluster_id = int(cluster_id_)
molecule_count = int(molecule_count_)
diameter = int(diameter_)
lane = int(lane_)
if line_number == 1:
coordinates = tuple(int(x) for x in (line[1:].rstrip('\n').split("\t")))
if line_number == 2:
molecule = Molecule.deserialize(line[1:].rstrip('\n'))
elif line == "None" or skip_base_counts:
base_counts_not_supplied = True
else:
a_count, c_count, g_count, t_count = line.rstrip('\n').split("\t")
a_counts.append(int(a_count))
c_counts.append(int(c_count))
g_counts.append(int(g_count))
t_counts.append(int(t_count))
if base_counts_not_supplied:
base_counts = None
else:
base_counts = np.array((a_counts, c_counts, g_counts, t_counts))
return Cluster(
cluster_id = cluster_id,
molecule = molecule,
lane = int(lane),
coordinates = coordinates, # type: ignore
molecule_count = int(molecule_count),
diameter = int(diameter),
called_sequences = called_sequences,
called_barcode = called_barcode,
quality_scores = quality_scores,
base_counts = base_counts,
read_starts = read_starts,
read_cigars = read_cigars,
read_strands = read_strands,
)