Source code for pyms.Gapfill.Function

Functions to fill missing peak objects.

#                                                                              #
#    PyMassSpec software for processing of mass-spectrometry data              #
#    Copyright (C) 2005-2012 Vladimir Likic                                    #
#    Copyright (C) 2019-2020 Dominic Davis-Foster                              #
#                                                                              #
#    This program is free software; you can redistribute it and/or modify      #
#    it under the terms of the GNU General Public License version 2 as         #
#    published by the Free Software Foundation.                                #
#                                                                              #
#    This program is distributed in the hope that it will be useful,           #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of            #
#    GNU General Public License for more details.                              #
#                                                                              #
#    You should have received a copy of the GNU General Public License         #
#    along with this program; if not, write to the Free Software               #
#    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.                 #
#                                                                              #

# stdlib
import pathlib
from typing import List, Optional

# 3rd party
import pandas  # type: ignore[import]
from domdf_python_tools.typing import PathLike
from enum_tools import IntEnum

# this package
from pyms.BillerBiemann import get_maxima_list_reduced
from pyms.Gapfill.Class import MissingPeak, Sample
from pyms.IntensityMatrix import build_intensity_matrix_i
from pyms.Noise.SavitzkyGolay import savitzky_golay
from pyms.Peak.Function import ion_area
from pyms.TopHat import tophat
from pyms.Utils.IO import prepare_filepath
from pyms.Utils.Utils import is_path

__all__ = [

[docs]class MissingPeakFiletype(IntEnum): """ Flag to indicate the filetype for :func:`pyms.Gapfill.Function.missing_peak_finder`. .. versionadded:: 2.3.0 """ MZML = 1 NETCDF = 2
MZML = MissingPeakFiletype.MZML NETCDF = MissingPeakFiletype.NETCDF
[docs]def file2dataframe(file_name: PathLike) -> pandas.DataFrame: """ Convert a .csv file to a pandas DataFrame. :param file_name: CSV file to read. :authors: Jairus Bowne, Sean O'Callaghan, Dominic Davis-Foster (pathlib support) .. versionadded:: 2.3.0 """ if not is_path(file_name): raise TypeError("'file_name' must be a string or a PathLike object") file_name = prepare_filepath(file_name, mkdirs=False) return pandas.read_csv( file_name, delimiter=',', quotechar='"', header=0, )
[docs]def missing_peak_finder( sample: Sample, file_name: str, points: int = 3, null_ions: Optional[List] = None, crop_ions: Optional[List] = None, threshold: int = 1000, rt_window: float = 1, filetype: MissingPeakFiletype = MZML, ) -> None: r""" Integrates raw data around missing peak locations to fill ``NA``\s in the data matrix. :param sample: The sample object containing missing peaks :param file_name: Name of the raw data file :param points: Peak finding - Peak if maxima over 'points' number of scans. :param null_ions: Ions to be deleted in the matrix. :default null_ions: ``[73, 147]`` :param crop_ions: Range of Ions to be considered. :default crop_ions: ``[50, 540]`` :param threshold: Minimum intensity of IonChromatogram allowable to fill. :param rt_window: Window in seconds around average RT to look for. :param filetype: :author: Sean O'Callaghan """ if not null_ions: null_ions = [73, 147] if not crop_ions: crop_ions = [50, 540] # TODO: some error checks on null and crop ions # TODO: a for root,files,dirs in os.path.walk(): loop print("Sample:",, "File:", file_name) if filetype == NETCDF: # this package from pyms.GCMS.IO.ANDI import ANDI_reader data = ANDI_reader(file_name) elif filetype == MZML: # this package from pyms.GCMS.IO.MZML import mzML_reader data = mzML_reader(file_name) else: print("file type not valid") # build integer intensity matrix im = build_intensity_matrix_i(data) for null_ion in null_ions: im.null_mass(null_ion) im.crop_mass(crop_ions[0], crop_ions[1]) # get the size of the intensity matrix n_scan, n_mz = im.size # smooth data for ii in range(n_mz): ic = im.get_ic_at_index(ii) ic1 = savitzky_golay(ic, points) ic_smooth = savitzky_golay(ic1, points) ic_base = tophat(ic_smooth, struct="1.5m") im.set_ic_at_index(ii, ic_base) for mp in sample.missing_peaks: mp_rt = mp.rt common_ion = mp.common_ion qual_ion_1 = float(mp.qual_ion1) qual_ion_2 = float(mp.qual_ion2) ci_ion_chrom = im.get_ic_at_mass(common_ion) print("ci = ", common_ion) qi1_ion_chrom = im.get_ic_at_mass(qual_ion_1) print("qi1 = ", qual_ion_1) qi2_ion_chrom = im.get_ic_at_mass(qual_ion_2) print("qi2 = ", qual_ion_2) ###### # Integrate the CI around that particular RT ####### # Convert time to points # How long between scans? points_1 = ci_ion_chrom.get_index_at_time(float(mp_rt)) points_2 = ci_ion_chrom.get_index_at_time(float(mp_rt) - rt_window) print("rt_window = ", points_1 - points_2) rt_window_points = points_1 - points_2 maxima_list = get_maxima_list_reduced(ci_ion_chrom, mp_rt, rt_window_points) large_peaks = [] for rt, intens in maxima_list: if intens > threshold: q1_index = qi1_ion_chrom.get_index_at_time(rt) q2_index = qi2_ion_chrom.get_index_at_time(rt) q1_intensity = qi1_ion_chrom.get_intensity_at_index(q1_index) q2_intensity = qi2_ion_chrom.get_intensity_at_index(q2_index) if q1_intensity > threshold / 2 and q2_intensity > threshold / 2: large_peaks.append([rt, intens]) print(f"found {len(large_peaks):d} peaks above threshold") areas = [] for peak in large_peaks: apex = ci_ion_chrom.get_index_at_time(peak[0]) ia = ci_ion_chrom.intensity_array.tolist() area, left, right, l_share, r_share = ion_area(ia, apex, 0) areas.append(area) ######################## areas.sort() if len(areas) > 0: biggest_area = areas[-1] mp.common_ion_area = biggest_area # mp.exact_rt = f"{float(mp_rt) / 60.0:.3f}" mp.exact_rt = float(mp_rt) / 60.0 print("found area:", biggest_area, "at rt:", mp_rt) else: print("Missing peak at rt = ", mp_rt) mp.common_ion_area = None
[docs]def mp_finder(input_matrix: List) -> List[Sample]: r""" Finds the ``'NA'``\s in the transformed ``area_ci.csv`` file and makes :class:`pyms.Gapfill.Class.Sample` objects with them :param input_matrix: Data matrix derived from the ``area_ci.csv`` file. :rtype: :authors: Jairus Bowne, Sean O'Callaghan """ # noqa: D400 sample_list = [] try: ci_pos = input_matrix[0].index(' "Quant Ion"') print("found Quant Ion position:", ci_pos) except ValueError: ci_pos = input_matrix[0].index('"Quant Ion"') uid_pos = input_matrix[0].index("UID") # Set up the sample objects # All entries on line 1 beyond the Qual Ion position are sample names for i, sample_name in enumerate(input_matrix[0][ci_pos:]): print(sample_name) sample = Sample(sample_name, i + 3) # add 4 to allow for UID, RT,QualIon sample_list.append(sample) for line in input_matrix[1:]: uid = line[uid_pos] common_ion = line[ci_pos] qual_ion_1 = uid.split('-')[0] qual_ion_2 = uid.split('-')[1] rt = uid.split('-')[-1] for i, area in enumerate(line[ci_pos:]): if area == "NA": missing_peak = MissingPeak(common_ion, qual_ion_1, qual_ion_2, rt) sample_list[i].add_missing_peak(missing_peak) return sample_list
[docs]def write_filled_csv( sample_list: List[Sample], area_file: PathLike, filled_area_file: PathLike, ) -> None: r""" Creates a new ``area_ci.csv`` file, replacing NAs with values from the sample_list objects where possible. :param sample_list: :param area_file: The file ``'area_ci.csv'`` from PyMassSpec output. :param filled_area_file: the new output file which has ``'NA'``\s values replaced. :authors: Jairus Bowne, Sean O'Callaghan, Dominic Davis-Foster """ if not is_path(filled_area_file): raise TypeError("'filled_area_file' must be a string or a pathlib.Path object") filled_area_file = prepare_filepath(filled_area_file) df = file2dataframe(area_file) uid_list: List[str] = df["UID"] rt_list: List[float] = [] for uid in uid_list: rt = uid.split('-')[-1] rt_list.append(float(rt)) for sample_name in df.columns[3:]: for sample in sample_list: if sample_name in rt_area_dict = sample.rt_areas break else: raise ValueError(f"Sample {sample_name!r} not found in sample_list.") for i, part in enumerate(df[sample_name]): if part == "NA": try: df[sample_name][i] = rt_area_dict[rt_list[i]] except KeyError: pass df.to_csv(filled_area_file, index=False, na_rep="NA")
[docs]def write_filled_rt_csv( sample_list: List[Sample], rt_file: PathLike, filled_rt_file: PathLike, ) -> None: r""" Creates a new rt.csv file, replacing ``'NA'``\s with values from the sample_list objects where possible. :param sample_list: A list of samples. :param rt_file: the file ``rt.csv`` from PyMassSpec output. :param filled_rt_file: the new output file which has ``NA`` values replaced. :authors: Jairus Bowne, Sean O'Callaghan, Dominic Davis-Foster """ if not isinstance(filled_rt_file, (str, pathlib.Path)): raise TypeError("'filled_rt_file' must be a string or a pathlib.Path object") filled_rt_file = prepare_filepath(filled_rt_file) df = file2dataframe(rt_file) uid_list: List[str] = df["UID"] rt_list: List[str] = [] for uid in uid_list: rt = uid.split('-')[-1] rt_list.append(rt) for sample_name in df.columns[3:]: for sample in sample_list: if sample_name in rt_exact_rt_dict = sample.get_mp_rt_exact_rt_dict() break else: raise ValueError(f"Sample {sample_name!r} not found in sample_list.") for i, part in enumerate(df[sample_name]): if part == "NA": try: df[sample_name][i] = rt_exact_rt_dict[float(rt_list[i])] except KeyError: pass df.to_csv(filled_rt_file, index=False, na_rep="NA")