import argparse
import importlib
import pathlib
import time
import re
import os
import sys
import resource
import json
import math
import pickle
from typing import Optional
import numpy as np
from beers_utils.sample import Sample
from beers_utils.constants import CONSTANTS
from beers_utils.molecule import Molecule
from beers_utils.molecule_packet import MoleculePacket
from beers_utils.general_utils import GeneralUtils
from camparee.molecule_maker import MoleculeMakerStep
from beers.logger import Logger
[docs]class LibraryPrepPipeline():
"""
The class runs all the steps in the library_prep pipeline as described and wired together in the configuration
file. The point of entry into this class is the static method main().
"""
stage_name = "library_prep_pipeline"
package = "beers.library_prep"
def __init__(self):
pass
[docs] @staticmethod
def validate(configuration: dict, global_config: dict):
"""
Static method to run each step validate process to identify errant parameters. If any errors are found,
a BeersLibraryPrepValidationException is raised. Prints error messages to stderr.
Parameters
----------
configuration:
json-like object containing the SequencePipeline-specific configuration
global_config:
json-like object containing the full config of the BEERS run
"""
#if not all([molecule.validate() for molecule in self.molecule_packet.molecules]):
# raise BeersLibraryPrepValidationException("Validation error in molecule packet: see stderr for details.")
# Gather the step classes
validation_okay = True
for step in configuration['steps']:
module_name, step_name = step["step_name"].rsplit(".")
parameters = step["parameters"]
module = importlib.import_module(f'.{module_name}', package=LibraryPrepPipeline.package)
step_class = getattr(module, step_name)
# Valdiate steps's parameters and print out their errors (if any)
errors = step_class.validate(parameters, global_config)
if len(errors) > 0:
validation_okay = False
print(f"Validation errors in {step['step_name']}:", file=sys.stderr)
for error in errors:
print('\t' + error, file=sys.stderr)
# Check that the input samples match those in global config
input_samples = set(configuration['input']['from_distribution_data'].keys())
global_samples = set(global_config['samples'].keys())
missing_samples = input_samples.difference(global_samples)
if len(missing_samples) > 0:
print(f"Every sample in library_prep_input.from_distribution_data must also be specified in global.samples but missing {missing_samples}", file=sys.stderr)
if not validation_okay:
raise BeersLibraryPrepValidationException("Validation error: see stderr for details.")
[docs] def execute(self,
configuration: dict,
global_config: dict,
packet_file: str,
output_quant_file: str,
input_quant_file: str,
log_paths: str,
input_molecule_packet: MoleculePacket,
rng: np.random.Generator,
full_logs: bool,
):
"""
Performs the Sequence Pipeline on a MoleculePacket.
Parameters
----------
configuration:
json-like object of the configuration data relevant to this pipeline stage.
global_config:
json-like object of the full configuration data
packet_file:
path to write out the packet file to
output_quant_file:
path to write out the quantified counts of the lib prep output
input_quant_file:
path to write out the quantified counts file to
log_paths:
list of paths to output the pipeline step logs to, in the same
order as the steps appear in the config file
input_molecule_packet:
the molecule packet to run through this pipeline stage
rng:
random number generator to use
full_logs:
whether to output molecule logs at intermediate steps
"""
original_ids = set(str(m.molecule_id) for m in input_molecule_packet.molecules)
self.print_summary(original_ids, input_molecule_packet.molecules)
packet_num = input_molecule_packet.molecule_packet_id
# Initialize steps prior to executing them below.
pipeline_steps = []
step_names = []
for step in configuration['steps']:
module_name, step_name = step["step_name"].rsplit(".")
parameters = step["parameters"]
module = importlib.import_module(f'.{module_name}', package=LibraryPrepPipeline.package)
step_class = getattr(module, step_name)
pipeline_steps.append(step_class(parameters, global_config))
step_names.append(step_name)
pipeline_start = time.time()
molecule_packet = input_molecule_packet
molecule_packet.write_quantification_file(input_quant_file)
for step, step_name, step_logpath in zip(pipeline_steps, step_names, log_paths):
print(f"Opening {step_logpath}")
step_start = time.time()
with Logger(step_logpath, compression = None, full_logs = full_logs) as log:
molecule_packet = step.execute(molecule_packet, rng, log)
elapsed_time = time.time() - step_start
self.print_summary(original_ids, molecule_packet.molecules, elapsed_time)
pipeline_elapsed_time = time.time() - pipeline_start
print(f"Finished {LibraryPrepPipeline.stage_name} in {pipeline_elapsed_time:.1f} seconds")
# Write final sample to a gzip file for inspection
molecule_packet.serialize(packet_file)
print(f"Output final sample to {packet_file}")
molecule_packet.write_quantification_file(output_quant_file)
print(f"Output final sample quantification to {output_quant_file}")
[docs] def print_summary(self, original_ids: set[str], sample: list[Molecule], elapsed_time:Optional[float]=None):
"""Output a summary of the sample (number of molecules, time taken, etc.).
original_ids:
Molecule IDs from input molecule packet
sample:
list of molecules in the sample
elapsed_time:
elapsed time to display, if not None
"""
if elapsed_time is not None:
print(f"Step took {elapsed_time:.3} seconds")
print(f"Sample has {len(sample)} molecules")
parent_ids = set(str(m.molecule_id).split(".")[0] for m in sample)
#TODO: this does not seem to be calculated correctly: always gives 0
percent_original_represented = len(original_ids.intersection(parent_ids))/len(original_ids)
print(f"Percent of the original ids that are still represented: {percent_original_represented:0.2%}")
size_bin_cutoffs = [0, 100, 500, 1000, 1_000_000_000]
size_counts, _ = np.histogram([len(mol) for mol in sample], bins=size_bin_cutoffs)
print(f"Counts of molecules in size ranges:")
for i in range(len(size_counts)-1):
print(f" <={size_bin_cutoffs[i+1]}: {size_counts[i]}")
print(f">{size_bin_cutoffs[-2]}: {size_counts[-1]}")
[docs] @staticmethod
def main(
seed: str,
configuration: str,
global_configuration: str,
packet_file: str,
output_quant_file: str,
input_quant_file: str,
log_paths: str,
molecule_packet_filename: Optional[str],
packet_id: str,
sample_id: str,
distribution_directory: Optional[str] = None,
molecules_per_packet_from_distribution: int = 10000,
full_logs: bool = False,
):
"""
This method would be called by a command line script in the bin directory. It sets a random seed, loads a
directory containing the relevant parts of the user's configuration file, unmarshalls a molecule packet from
the provided molecule packet filename, initializes and validates the library prep pipeline stage and then
executes it for the molecule packet.
Parameters
----------
seed:
value to use as the seed for the random number generator
configuration:
the json string containing the configration data specific to the library prep pipeline
global_configuration:
json string containing the configuration data for the whole run
packet_file:
path to write out the packet file to
output_quant_file:
path to write out the quantified counts of the lib prep output
input_quant_file:
path to write out the quantified counts file to
log_paths:
list of paths to output the pipeline step logs to, in the same
order as the steps appear in the config file
molecule_packet_filename:
the file from which to unmarshall the molecule packet
packet_id:
id number to assign the packet
sample_id:
id number of the sample
distribution_directory:
directory of CAMPAREE output distribution datas from which to generate molecules
(default None, must supply molecule_packet_filename instead)
molecules_per_packet_from_distribution:
packet size to generate if using distributions
full_logs:
whether to output all logs for intermediate steps
If True, output may be very large
"""
config = json.loads(configuration)
global_config = json.loads(global_configuration)
molecule_maker_parameters = global_config['molecule_maker_parameters']
packet_id_num = int(packet_id)
# Seed the RNG by the given (global) seed, plus the sample ID, the packet ID, and a constant for the library prep stage of 1
# This ensures that each packet is repeatable but that each gets their own seed state and that the seed state differs
# from each of the stages it goes through.
seed_list = [seed, sample_id, packet_id_num, 1]
print(f"Initializing with seed {seed_list}")
rng = np.random.default_rng(seed_list) # type: ignore
if molecule_packet_filename:
if molecule_packet_filename.endswith(".txt"):
molecule_packet = MoleculePacket.from_CAMPAREE_molecule_file(molecule_packet_filename, packet_id_num)
elif molecule_packet_filename.endswith(".pickle"):
with open(molecule_packet_filename, "rb") as pickle_file:
molecule_packet = pickle.load(pickle_file)
# Make sure we use BEERS's packet ids so that they all are distinct
molecule_packet.molecule_packet_id = packet_id_num
elif distribution_directory:
distribution_dir = pathlib.Path(distribution_directory)
sample = Sample(
sample_id = sample_id,
sample_name = sample_id,
adapter_sequences = [],
fastq_file_paths = [],
pooled = False,
)
# TODO: move MoleculeMaker to accept its log filepath rather than taking a directory
# that contains all the MoleculeMaker log files
mm_log_dir = pathlib.Path(log_paths[0]).parent.parent
molecule_maker = MoleculeMakerStep(
log_directory_path = mm_log_dir,
parameters = molecule_maker_parameters,
)
molecule_packets = list(molecule_maker.execute(
sample = sample,
sample_data_directory = distribution_dir,
output_type = "generator",
output_molecule_count = molecules_per_packet_from_distribution, # we only generate one packet
seed = seed,
molecules_per_packet = molecules_per_packet_from_distribution,
rng = rng,
))
molecule_packet = molecule_packets[0]
molecule_packet.molecule_packet_id = packet_id
else:
raise BeersLibraryPrepValidationException("Neither molecule packet filename nor distribution directory provided")
library_prep_pipeline = LibraryPrepPipeline()
library_prep_pipeline.execute(
config,
global_config,
packet_file,
output_quant_file,
input_quant_file,
log_paths,
molecule_packet,
rng,
full_logs
)
[docs]class BeersLibraryPrepValidationException(Exception):
pass