Source code for beers.cluster_packet

from beers_utils.sample import Sample
from beers.cluster import Cluster
import os
import gzip

class ClusterPacket:
    """
    Unit of exchange passed between the steps of the sequence pipeline.

    Every step of the sequence pipeline receives a cluster packet and hands back a
    cluster packet, usually altered in some way.  A packet is chiefly a collection of
    clusters, each derived from a molecule, and the packet as a whole is derived from
    the molecule packet that held those precursor molecules.  When FASTQ reports are
    produced at the end of the pipeline, they are populated from the data held in the
    cluster packet.
    """

    next_cluster_packet_id = 0  # Static variable for creating increasing cluster packet id's

    def __init__(self, cluster_packet_id: int, sample: Sample, clusters: list[Cluster]):
        """
        Wrap a list of clusters together with the sample they originate from.

        A cluster packet therefore always represents a portion of exactly one sample.

        Parameters
        ---------
        cluster_packet_id:
            Unique identifier of this packet.  Normally embedded in filenames so that
            cluster data saved to disk does not collide.
        sample:
            The sample object from which the contained clusters descend.
        clusters:
            The clusters carried by this packet.
        """
        self.cluster_packet_id = cluster_packet_id
        self.sample = sample
        self.clusters = clusters

    def __str__(self):
        """
        Short, human-readable summary shown when the packet is printed.

        Only the packet id, the sample name and the number of clusters are reported,
        so this is not a complete representation of the packet.

        Returns
        -------
        A string summarizing the cluster packet.
        """
        summary = [
            f"cluster_packet_id: {self.cluster_packet_id}",
            f"sample_name: {self.sample.sample_name}",
            f"# of clusters: {len(self.clusters)}",
        ]
        return ", ".join(summary)

    def serialize(self, file_path: str):
        """
        Write this packet, gzip-compressed, to the given file.

        File layout: the first line is '#' followed by the cluster packet id, the
        second line is '#' followed by the serialized sample, and after that comes
        each serialized cluster, every one followed by a line holding a single '-'.

        Parameters
        ----------
        file_path:
            Location of the file that receives the serialized, compressed data.
        """
        with gzip.open(file_path, 'wb') as packed:
            header = f"#{self.cluster_packet_id}\n#{self.sample.serialize()}\n"
            packed.write(header.encode())
            for member in self.clusters:
                packed.write(member.serialize().encode())
                # Clusters take up a variable number of lines, so we need a separator
                packed.write(b"-\n")

    @staticmethod
    def deserialize(file_path: str, skip_base_counts: bool=False) -> 'ClusterPacket':
        """
        Rebuild a ClusterPacket from a gzipped file in the layout written by serialize().

        The first line supplies the packet id (its leading character is dropped before
        conversion to int).  The second line is handed to Sample.deserialize unchanged.
        The remaining lines are gathered into groups; each line consisting solely of
        '-' ends the current group, which is then passed to Cluster.deserialize.
        Lines after the final '-' separator are not turned into a cluster.

        Parameters
        ----------
        file_path:
            The location of the gzipped file containing the serialized object.
        skip_base_counts:
            If True, don't load base counts (for memory efficiency).  The flag is
            forwarded to Cluster.deserialize.

        Returns
        -------
        ClusterPacket
        """
        packet_id = 0
        sample = None
        clusters = []
        pending: list[str] = []
        with gzip.open(file_path, 'rb') as packed:
            for index, raw in enumerate(packed):
                payload = raw.rstrip(b'\n')
                if index == 0:
                    # Header line 1: '#<cluster_packet_id>'
                    packet_id = int(payload[1:].decode())
                    continue
                if index == 1:
                    # Header line 2: the sample line, passed on as-is
                    sample = Sample.deserialize(payload.decode())
                    continue
                text = payload.decode()
                if text != '-':
                    pending.append(text)
                    continue
                # Separator reached: close out the cluster gathered so far, if any
                if pending:
                    clusters.append(Cluster.deserialize("\n".join(pending), skip_base_counts))
                    pending = []
        return ClusterPacket(packet_id, sample, clusters)