Source code for beers.sequence.bridge_amplification_step

import numpy as np
from beers.cluster_packet import ClusterPacket

[docs]class BridgeAmplificationStep: """ This step serves to simulate the amplification of the molecule in each cluster on the flowcell. Introducing substitutions is allowed via a substitution_rate parameter. Config Example:: # Number of cycles to perform. Generates 2^N molecules in the cluster # via a PCR-like bridge amplification process. cycles: 10 # Substitution rate per base in the cluster formation # No indels are generated here, for computation simplicity. substitution_rate: 0.01 """ CYCLE_AT_WHICH_TO_IGNORE_SUBS = 4 name = "Bridge Amplification Step" def __init__(self, step_log_file_path: str, parameters: dict, global_config: dict): """ Initializes the step with a file path to the step log and a dictionary of parameters. Missing parameters that control non-idealized behavior are replaced with default values that produce idealized step behavior. Parameters ---------- step_log_file_path: location of step logfile parameters: json-like object of parameters. global_config: json-like object of configuration for the overall BEERS2 run. """ self.log_filepath = step_log_file_path self.cycles: int = parameters["cycles"] self.substitution_rate: float = parameters.get("substitution_rate", 0) self.global_config = global_config print(f"{BridgeAmplificationStep.name} instantiated")
[docs] def execute(self, cluster_packet: ClusterPacket, rng: np.random.Generator) -> ClusterPacket: """ Amplifies the molecule in the cluster for each cluster in the cluster packet by the given number of cycles and keeps track of which base positions have incurred substitutions Parameters ---------- cluster_packet: The input cluster packet rng: Random number generator instance Returns ------ The updated cluster packet """ with open(self.log_filepath, "wt") as log: for cluster in cluster_packet.clusters: # Generate the initial base_count values cluster.initialize_base_counts() for cycle in range(1,self.cycles + 1): for cluster in cluster_packet.clusters: assert cluster.base_counts is not None # Start with a perfect copy copies = cluster.base_counts.copy() if cycle < self.CYCLE_AT_WHICH_TO_IGNORE_SUBS: # For low cycles, we produce subsitutions # in later ones, they don't matter enough to include # Number of substitutions in each position and of each base type substitutions = rng.binomial(copies, p = self.substitution_rate) # First remove substituted bases copies -= substitutions # then replace them with random draws from A, C, G, T copies += multinomial(substitutions.sum(axis=0), [0.25, 0.25, 0.25, 0.25], rng) # Add to existing molecules cluster.base_counts += copies cluster.molecule_count *= 2 print(f"Molecules per cluster after amplification is {cluster_packet.clusters[0].molecule_count}.") print(f"Total molecules over all clusters is {len(cluster_packet.clusters)*cluster_packet.clusters[0].molecule_count}") return cluster_packet
@staticmethod def validate(parameters, global_config): errors = [] if 'cycles' not in parameters: errors.append("Must specify 'cycles' for number of bridge amplification cycles.") elif not isinstance(parameters['cycles'], int) or parameters['cycles'] <= 0: errors.append("'cycles' must be a positive integer") substitution_rate = parameters.get("substitution_rate", 0) if substitution_rate < 0 or substitution_rate > 1: errors.append(f"The substitution rate must be between 0 and 1") return errors
[docs]def multinomial(n: np.ndarray, p: list, rng: np.random.Generator): ''' Partially vectorized version of np.random.multinomial, see https://stackoverflow.com/questions/55818845/fast-vectorized-multinomial-in-python Parameters ---------- n: 1-d array of non-negative integers denoting the 'number of experiments' p: 1-d array of probabilities of each possibility being drawn Returns ------ ndarray A 2-d array where the first dimension corresponds to the possible draws (length of p) and the second to the length of n. ''' n = np.array(n) p_array = np.array(p) count = n.copy() out = np.empty((len(p_array), len(n)), dtype=int) ps = p_array.cumsum(axis=-1) # Conditional probabilities with np.errstate(divide='ignore', invalid='ignore'): condp = p / ps condp[np.isnan(condp)] = 0.0 for i in range(p_array.shape[-1]-1, 0, -1): binsample = rng.binomial(count, condp[i]) out[i] = binsample count -= binsample out[0] = count return out