Source code for pyms.GCMS.Class

"""
Class to model GC-MS data.
"""

################################################################################
#                                                                              #
#    PyMassSpec software for processing of mass-spectrometry data              #
#    Copyright (C) 2005-2012 Vladimir Likic                                    #
#    Copyright (C) 2019-2020 Dominic Davis-Foster                              #
#                                                                              #
#    This program is free software; you can redistribute it and/or modify      #
#    it under the terms of the GNU General Public License version 2 as         #
#    published by the Free Software Foundation.                                #
#                                                                              #
#    This program is distributed in the hope that it will be useful,           #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of            #
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the             #
#    GNU General Public License for more details.                              #
#                                                                              #
#    You should have received a copy of the GNU General Public License         #
#    along with this program; if not, write to the Free Software               #
#    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.                 #
#                                                                              #
################################################################################

# stdlib
import copy
import pathlib
from statistics import mean, median, stdev
from typing import List, Optional, Sequence, TypeVar, cast

# 3rd party
import numpy
from domdf_python_tools.doctools import prettify_docstrings
from domdf_python_tools.typing import PathLike

# this package
from pyms.Base import pymsBaseClass
from pyms.IonChromatogram import IonChromatogram
from pyms.Mixins import GetIndexTimeMixin, MaxMinMassMixin, TimeListMixin
from pyms.Spectrum import MassSpectrum, Scan
from pyms.Utils.IO import prepare_filepath
from pyms.Utils.Time import time_str_secs
from pyms.Utils.Utils import _number_types, is_path, is_sequence_of, signedinteger

# Public names exported by this module.
__all__ = ["GCMS_data", "IntStr"]

MassSpectrum = MassSpectrum  # Re-bound for legacy imports; stops PyCharm complaining. TODO: Remove eventually.

# Type variable constrained to ``int`` or ``str``; annotates the ``begin`` / ``end``
# arguments of ``GCMS_data.trim`` (scan number or time string).
IntStr = TypeVar("IntStr", int, str)


@prettify_docstrings
class GCMS_data(pymsBaseClass, TimeListMixin, MaxMinMassMixin, GetIndexTimeMixin):
    """
    Generic object for GC-MS data.

    Contains the raw data as a list of scans and a list of times.

    :param time_list: Scan retention times. Must be strictly ascending.
    :param scan_list: The :class:`~pyms.Spectrum.Scan` objects making up the data.

    :raises TypeError: If ``time_list`` is not a sequence of numbers or
        ``scan_list`` is not a sequence of ``Scan`` objects.
    :raises ValueError: If the retention times are not in ascending order.

    :authors: Qiao Wang, Andrew Isaac, Vladimir Likic, Dominic Davis-Foster (type assertions and properties)
    """

    # NOTE(review): ``time_list`` and ``scan_list`` are presumably expected to have the
    # same length, but that is not checked in this class -- confirm against callers.

    def __init__(self, time_list: Sequence[float], scan_list: Sequence[Scan]):
        if not is_sequence_of(time_list, _number_types):
            raise TypeError("'time_list' must be a Sequence of numbers")

        if not is_sequence_of(scan_list, Scan):
            raise TypeError("'scan_list' must be a Sequence of Scan objects")

        # Store private copies so later mutation of the caller's sequences has no effect.
        self._time_list = list(time_list)
        self._scan_list = list(scan_list)

        # Derived attributes; these must be recomputed whenever the lists change.
        self._set_time()
        self._set_min_max_mass()
        self._calc_tic()

    def __eq__(self, other) -> bool:  # noqa: MAN001
        """
        Return whether this GCMS_data object is equal to another object.

        Two objects are equal when their time lists and scan lists are equal.

        :param other: The other object to test equality with.
        """

        if isinstance(other, self.__class__):
            # Compare the private lists directly: the public ``scan_list`` property
            # deep-copies every scan, which is wasted work for a read-only comparison.
            # The cheap time comparison goes first so it can short-circuit.
            return self._time_list == other._time_list and self._scan_list == other._scan_list

        return NotImplemented

    def __len__(self) -> int:
        """
        Returns the length of the data object, defined as the number of scans.

        :author: Vladimir Likic
        """

        return len(self._scan_list)

    def __repr__(self) -> str:
        """
        Return a summary giving the retention time range, time step and number of scans.
        """

        return f"<GCMS_data({self.min_rt} - {self.max_rt} seconds, time step {self.time_step}, {len(self)} scans)>"

    def __str__(self) -> str:
        """
        Return the same summary as :meth:`__repr__`.
        """

        return self.__repr__()

    def _calc_tic(self) -> None:
        """
        Calculate the total ion chromatogram.

        The intensity at each time point is the sum of that scan's intensities.

        :authors: Qiao Wang, Andrew Isaac, Vladimir Likic
        """

        intensities = [sum(scan.intensity_list) for scan in self._scan_list]

        # The chromatogram gets its own copy of the time list.
        self._tic = IonChromatogram(numpy.array(intensities), copy.deepcopy(self._time_list))

    def _set_time(self) -> None:
        """
        Sets time-related properties of the data.

        Computes the mean time step, its standard deviation, and the minimum and
        maximum retention times.

        .. note::

            :func:`statistics.stdev` needs at least two differences, so at least
            three scans are required; fewer raise :exc:`statistics.StatisticsError`.

        :raises ValueError: If the retention times are not in ascending order.

        :author: Vladimir Likic
        """

        # Calculate the differences between consecutive times and, along the way,
        # check that retention times are increasing.
        time_diff_list = []

        for earlier, later in zip(self._time_list, self._time_list[1:]):
            if not later > earlier:
                raise ValueError("Retention times are not in ascending order!")

            time_diff_list.append(later - earlier)

        # Compute both statistics before assigning, so a failure leaves the
        # previous values untouched.
        time_step = mean(time_diff_list)
        time_step_std = stdev(time_diff_list)

        self._time_step = time_step
        self._time_step_std = time_step_std
        self._min_rt = min(self._time_list)
        self._max_rt = max(self._time_list)

    def _set_min_max_mass(self) -> None:
        """
        Sets the min and max mass value.

        Scans whose ``min_mass`` / ``max_mass`` is :py:obj:`None` are ignored; the
        result is :py:obj:`None` if no scan provides a value.

        :authors: Qiao Wang, Andrew Isaac, Vladimir Likic
        """

        min_mass = None
        max_mass = None

        for scan in self._scan_list:
            if min_mass is None:
                min_mass = scan.min_mass
            elif scan.min_mass is not None:
                min_mass = min(scan.min_mass, min_mass)

            if max_mass is None:
                max_mass = scan.max_mass
            elif scan.max_mass is not None:
                max_mass = max(scan.max_mass, max_mass)

        self._min_mass = min_mass
        self._max_mass = max_mass

    def info(self, print_scan_n: bool = False) -> None:
        """
        Prints some information about the data.

        :param print_scan_n: If set to :py:obj:`True` will print the number
            of *m/z* values in each scan.

        :author: Vladimir Likic
        """

        # print the summary of simple attributes
        print(f" Data retention time range: {self._min_rt / 60.0:.3f} min -- {self._max_rt / 60:.3f} min")
        print(f" Time step: {self._time_step:.3f} s (std={self._time_step_std:.3f} s)")
        print(f" Number of scans: {len(self._scan_list):d}")
        print(f" Minimum m/z measured: {self._min_mass:.3f}")
        print(f" Maximum m/z measured: {self._max_mass:.3f}")

        # calculate the mean and median number of m/z values measured per scan
        n_list = []

        for scan in self._scan_list:
            n_values = len(scan)
            n_list.append(n_values)

            if print_scan_n:
                print(n_values)

        mz_mean = mean(n_list)
        mz_median = median(n_list)
        print(f" Mean number of m/z values per scan: {mz_mean:.0f}")
        print(f" Median number of m/z values per scan: {mz_median:.0f}")

    @property
    def scan_list(self) -> List[Scan]:
        """
        Return a list of the scan objects.

        The list is a deep copy, so modifying it does not affect this object.

        :authors: Qiao Wang, Andrew Isaac, Vladimir Likic
        """

        return copy.deepcopy(self._scan_list)

    @property
    def time_list(self) -> List[float]:
        """
        Return a copy of the time list.
        """

        return self._time_list[:]

    @property
    def tic(self) -> IonChromatogram:
        """
        Returns the total ion chromatogram.

        :author: Andrew Isaac
        """

        return self._tic

    @property
    def min_rt(self) -> float:
        """
        Returns the minimum retention time for the data in seconds.
        """

        return self._min_rt

    @property
    def max_rt(self) -> float:
        """
        Returns the maximum retention time for the data in seconds.
        """

        return self._max_rt

    @property
    def time_step(self) -> float:
        """
        Returns the time step of the data.
        """

        return self._time_step

    @property
    def time_step_std(self) -> float:
        """
        Returns the standard deviation of the time step of the data.
        """

        return self._time_step_std

    def trim(
            self,
            begin: Optional[IntStr] = None,
            end: Optional[IntStr] = None,
            ) -> None:
        """
        Trims data in the time domain.

        The arguments ``begin`` and ``end`` can be either integers (in which case
        they are taken as the first/last scan number for trimming) or strings
        in which case they are treated as time strings and converted to scan
        numbers.

        At present both ``begin`` and ``end`` must be of the same type, either both
        scan numbers or time strings.

        At least one of ``begin`` and ``end`` is required.

        The data is modified in place, and the time properties, mass range and
        total ion chromatogram are recalculated afterwards.

        :param begin: The start time or scan number
        :param end: The end time or scan number

        :raises SyntaxError: If neither ``begin`` nor ``end`` is given.
        :raises TypeError: If ``begin`` or ``end`` is not an integer or a string,
            or if no scan index is found for a time string.
        :raises ValueError: If the resulting scan range is empty, reversed, or
            outside the data.

        :author: Vladimir Likic
        """

        # trim called with defaults, or silly arguments
        if begin is None and end is None:
            raise SyntaxError("At least one of 'begin' and 'end' is required")

        N = len(self._scan_list)

        # Process 'begin' into ``first_scan``, the zero-based index of the first scan kept.
        if begin is None:
            first_scan = 0
        elif isinstance(begin, (int, signedinteger)):
            # Integer scan numbers count from one.
            first_scan = cast(int, begin) - 1
        elif isinstance(begin, str):
            begin_secs = time_str_secs(begin)
            begin_index = self.get_index_at_time(begin_secs)

            if begin_index is None:
                raise TypeError("invalid 'begin' argument")

            first_scan = begin_index + 1
        else:
            raise TypeError("invalid 'begin' argument")

        # Process 'end' into ``last_scan``, the zero-based index of the last scan kept (inclusive).
        # NOTE(review): unlike 'begin', an integer 'end' is not decremented -- this
        # looks like a long-standing quirk and is preserved for compatibility.
        if end is None:
            last_scan = N - 1
        elif isinstance(end, (int, signedinteger)):
            last_scan = cast(int, end)
        elif isinstance(end, str):
            end_secs = time_str_secs(end)
            end_index = self.get_index_at_time(end_secs)

            if end_index is None:
                raise TypeError("invalid 'end' argument")

            last_scan = end_index + 1
        else:
            raise TypeError("invalid 'end' argument")

        # sanity checks
        if not last_scan > first_scan:
            raise ValueError(f"last scan={last_scan:d}, first scan={first_scan:d}")
        elif first_scan < 0:
            # Scan number 1 is valid; only values below it end up here.
            raise ValueError("scan number must be greater than or equal to one")
        elif last_scan > N - 1:
            raise ValueError(f"last scan={last_scan:d}, total number of scans={N:d}")

        print(f"Trimming data to between {first_scan + 1:d} and {last_scan + 1:d} scans")

        # The checks above guarantee 0 <= first_scan < last_scan <= N - 1,
        # so an inclusive slice selects exactly the scans in range.
        keep = slice(first_scan, last_scan + 1)
        self._scan_list = self._scan_list[keep]
        self._time_list = self._time_list[keep]

        # update info
        self._set_time()
        self._set_min_max_mass()
        self._calc_tic()

    def write(self, file_root: PathLike) -> None:
        """
        Writes the entire raw data to two CSV files:

        - :file:`{<file_root>}.I.csv`, containing the intensities; and
        - :file:`{<file_root>}.mz.csv`, containing the corresponding *m/z* values.

        Each scan is written as one comma-separated row, with values formatted
        to four decimal places.

        In general these are not two-dimensional matrices, because different
        scans may have different numbers of *m/z* values recorded.

        :param file_root: The root for the output file names

        :raises TypeError: If ``file_root`` is not a string or a :class:`pathlib.Path`.

        :authors: Vladimir Likic, Dominic Davis-Foster (pathlib support)
        """  # noqa: D400

        if not isinstance(file_root, (str, pathlib.Path)):
            raise TypeError("'file_root' must be a string or a pathlib.Path object")

        file_root = prepare_filepath(file_root)

        file_name1 = str(file_root) + ".I.csv"
        file_name2 = str(file_root) + ".mz.csv"

        print(f" -> Writing intensities to '{file_name1}'")
        print(f" -> Writing m/z values to '{file_name2}'")

        with open(file_name1, 'w', encoding="UTF-8") as fp1, open(file_name2, 'w', encoding="UTF-8") as fp2:
            for scan in self._scan_list:
                # One row per scan; an empty scan yields an empty line.
                fp1.write(','.join(f"{intensity:.4f}" for intensity in scan.intensity_list))
                fp1.write('\n')

                fp2.write(','.join(f"{mass:.4f}" for mass in scan.mass_list))
                fp2.write('\n')

    def write_intensities_stream(self, file_name: PathLike) -> None:
        """
        Loop over all scans and, for each scan, write the intensities to the
        given file, one intensity per line.

        Intensities from different scans are joined without any delimiters.

        :param file_name: Output file name.

        :raises TypeError: If ``file_name`` is not a string or a PathLike object.

        :authors: Vladimir Likic, Dominic Davis-Foster (pathlib support)
        """  # noqa: D400

        if not is_path(file_name):
            raise TypeError("'file_name' must be a string or a PathLike object")

        file_name = prepare_filepath(file_name)

        print(" -> Writing scans to a file")

        with file_name.open('w', encoding="UTF-8") as fp:
            for scan in self._scan_list:
                fp.writelines(f"{intensity:8.4f}\n" for intensity in scan.intensity_list)