"""
Classes for peak alignment by dynamic programming.
"""
################################################################################
# #
# PyMassSpec software for processing of mass-spectrometry data #
# Copyright (C) 2005-2012 Vladimir Likic #
# Copyright (C) 2019-2020 Dominic Davis-Foster #
# #
# This program is free software; you can redistribute it and/or modify #
# it under the terms of the GNU General Public License version 2 as #
# published by the Free Software Foundation. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program; if not, write to the Free Software #
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. #
# #
################################################################################
# stdlib
import copy
import math
import operator
import pathlib
from typing import Dict, List, Optional, Sequence
# 3rd party
import numpy
import pandas # type: ignore[import]
from domdf_python_tools.typing import PathLike
from pandas import DataFrame
# this package
from pyms.Experiment import Experiment
from pyms.Peak import Peak
from pyms.Peak.List.Function import composite_peak
from pyms.Utils.IO import prepare_filepath
from pyms.Utils.Utils import _number_types, is_path, is_sequence, is_sequence_of
__all__ = ["Alignment", "exprl2alignment"]
# Ensure that the intersphinx links are correct.
DataFrame.__module__ = "pandas"
[docs]class Alignment:
"""
Models an alignment of peak lists.
:param expr: The experiment to be converted into an alignment object.
:authors: Woon Wai Keen, Qiao Wang, Vladimir Likic, Dominic Davis-Foster.
"""
#:
peakpos: List[List[Peak]]
#: List of experiment codes.
expr_code: List[str]
#:
peakalgt: List[List[Peak]]
#:
similarity: Optional[float]
def __init__(self, expr: Optional[Experiment]):
if expr is None:
self.peakpos = []
self.peakalgt = []
self.expr_code = []
self.similarity = None
elif not isinstance(expr, Experiment):
raise TypeError("'expr' must be an 'Experiment' object")
else:
# for peak in expr.peak_list:
# if peak.area() == None or peak.area() <= 0:
# error("All peaks must have an area for alignment")
self.peakpos = [copy.deepcopy(expr.peak_list)]
self.peakalgt = numpy.transpose(self.peakpos).tolist() # type: ignore[arg-type]
self.expr_code = [expr.expr_code]
self.similarity = None
[docs] def __len__(self) -> int:
"""
Returns the length of the alignment, defined as the number of
peak positions in the alignment.
:rtype:
:authors: Qiao Wang, Vladimir Likic
""" # noqa: D400
return len(self.peakalgt)
[docs] def aligned_peaks(self, minutes: bool = False) -> Sequence[Optional[Peak]]:
"""
Returns a list of Peak objects where each peak has the combined spectra
and average retention time of all peaks that aligned.
:param minutes: Whether retention times are in minutes.
If :py:obj:`False`, retention time are in seconds.
:return: A list of composite peaks based on the alignment.
:author: Andrew Isaac
.. TODO:: minutes currently does nothing
""" # noqa: D400
# for all peaks found
peak_list = []
for peak_idx in range(len(self.peakpos[0])):
# get aligned peaks, ignore missing
new_peak_list = []
for align_idx in range(len(self.peakpos)):
peak = self.peakpos[align_idx][peak_idx]
if peak is not None:
new_peak_list.append(peak)
# create composite
new_peak = composite_peak(new_peak_list)
peak_list.append(new_peak)
return peak_list
[docs] def common_ion(self) -> List[float]:
"""
Calculates a common ion among the peaks of an aligned peak.
:return: A list of the highest intensity common ion for all aligned peaks.
:author: Sean O'Callaghan
"""
# print("#peak lists =", len(self.peakpos))
# print("#peaks =", len(self.peakpos[0]))
# a list to contain the dictionaries
# each dictionary contains the
# top ions and their frequency for each peak
# in the alignment
list_of_top_ion_dicts: List = []
empty_count_list: List[int] = []
for peak_list in self.peakpos:
# (re)initialise the peak index
index = 0
for peak in peak_list:
# if the dict has not been created, create it
# will only run on first peak list
if len(list_of_top_ion_dicts) < len(peak_list):
list_of_top_ion_dicts.append({})
# copy the list entry to a local dict for code clarity
top_ion_dict = list_of_top_ion_dicts[index]
# count the empty peaks
if (len(empty_count_list)) < len(peak_list):
empty_count = 0
else:
empty_count = empty_count_list[index]
# make sure the peak is present
if peak is not None:
# get the top 5 ions
top_5 = peak.ion_areas.keys()
for ion in top_5:
if ion in top_ion_dict:
# if we have seen it, increment the count
top_ion_dict[ion] += 1
else:
# if we haven't seen it before, add it
top_ion_dict[ion] = 1
empty_count += 1
# copy the dict back to the list
list_of_top_ion_dicts[index] = top_ion_dict
if len(empty_count_list) < len(peak_list):
empty_count_list.append(empty_count)
else:
empty_count_list[index] = empty_count
# increment for the next peak
index += 1
# print("length of list of dicts", len(list_of_top_ion_dicts))
# index = 0
# for entry in list_of_top_ion_dicts:
# for ion in sorted(entry, key=entry.get, reverse=True)[0:3]:
# print(ion,":", entry[ion],end='')
# print(' empty:', empty_count_list[index])
# index += 1
top_ion_list = []
for entry in list_of_top_ion_dicts:
# This was in the Repo and commented in easyGC
# top_ion_list.append(self.get_highest_mz_ion(entry))
# This was in the version being used by GSMatch and easyGC
top_ion_list.append(max(entry, key=entry.get))
# TODO: Change over to the new version
return top_ion_list
[docs] def filter_min_peaks(self, min_peaks: int) -> None:
"""
Filters alignment positions that have less peaks than ``min_peaks``.
This function is useful only for within state alignment.
:param min_peaks: Minimum number of peaks required for the alignment
position to survive filtering.
:author: Qiao Wang
"""
if not isinstance(min_peaks, int):
raise TypeError("'min_peaks' must be an integer")
filtered_list = []
for pos in range(len(self.peakalgt)):
# if len(filter(None,self.peakalgt[pos])) >= min_peaks:
if len([x for x in self.peakalgt[pos] if x is not None]) >= min_peaks:
filtered_list.append(self.peakalgt[pos])
self.peakalgt = filtered_list
self.peakpos = numpy.transpose(self.peakalgt).tolist() # type: ignore[arg-type]
[docs] @staticmethod
def get_highest_mz_ion(ion_dict: Dict[float, int]) -> float:
"""
Returns the preferred ion for quantitiation.
Looks at the list of candidate ions, selects those which have
highest occurrence, and selects the heaviest of those.
:param ion_dict: a dictionary of *m/z* value: number of occurrences.
:return ion: The ion with the highest *m/z* value.
"""
max_occurrences = max(ion_dict.values())
most_freq_mzs = []
for key, value in ion_dict.items():
if value == max_occurrences:
most_freq_mzs.append(key)
return max(most_freq_mzs)
[docs] def write_csv(
self,
rt_file_name: PathLike,
area_file_name: PathLike,
minutes: bool = True,
) -> None:
"""
Writes the alignment to CSV files.
This function writes two files: one containing the alignment of peak
retention times and the other containing the alignment of peak areas.
:param rt_file_name: The name for the retention time alignment file.
:param area_file_name: The name for the areas alignment file.
:param minutes: Whether to save retention times in minutes.
If :py:obj:`False`, retention time will be saved in seconds.
:authors: Woon Wai Keen, Andrew Isaac, Vladimir Likic, David Kainer,
Dominic Davis-Foster (pathlib support)
"""
if not isinstance(rt_file_name, (str, pathlib.Path)):
raise TypeError("'rt_file_name' must be a string or a pathlib.Path object")
if not isinstance(area_file_name, (str, pathlib.Path)):
raise TypeError("'area_file_name' must be a string or a pathlib.Path object")
rt_file_name = prepare_filepath(rt_file_name)
area_file_name = prepare_filepath(area_file_name)
with rt_file_name.open('w', encoding="UTF-8") as fp1, area_file_name.open('w', encoding="UTF-8") as fp2:
# create header
header = ["UID", "RTavg"]
for item in self.expr_code:
header.append(f'"{item}"')
# write headers
fp1.write(','.join(header) + '\n')
fp2.write(','.join(header) + '\n')
# for each alignment position write alignment's peak and area
for peak_idx in range(len(self.peakpos[0])): # loop through peak lists (rows)
rts = []
areas = []
new_peak_list = []
for align_idx in range(len(self.peakpos)):
peak = self.peakpos[align_idx][peak_idx]
if peak is not None:
if minutes:
rt = peak.rt / 60.0
else:
rt = peak.rt
rts.append(rt)
areas.append(peak.area)
new_peak_list.append(peak)
else:
rts.append(None)
areas.append(None)
compo_peak = composite_peak(new_peak_list)
if compo_peak is None:
continue
# write to retention times file
fp1.write(compo_peak.UID)
if minutes:
fp1.write(f",{float(compo_peak.rt / 60):.3f}")
else:
fp1.write(f",{compo_peak.rt:.3f}")
for rt in rts:
if rt is None or numpy.isnan(rt):
fp1.write(",NA")
else:
fp1.write(f",{rt:.3f}")
fp1.write('\n')
# write to peak areas file
fp2.write(compo_peak.UID)
if minutes:
fp2.write(f",{float(compo_peak.rt / 60):.3f}")
else:
fp2.write(f",{compo_peak.rt:.3f}")
for area in areas:
if area is None:
fp2.write(",NA")
else:
fp2.write(f",{area:.0f}")
fp2.write('\n')
[docs] def write_common_ion_csv(
self,
area_file_name: PathLike,
top_ion_list: Sequence[float],
minutes: bool = True,
) -> None:
"""
Writes the alignment to CSV files.
This function writes two files: one containing the alignment of peak
retention times and the other containing the alignment of peak areas.
:param area_file_name: The name for the areas alignment file.
:param top_ion_list: A list of the highest intensity common ion along the aligned peaks.
:param minutes: Whether to save retention times in minutes.
If :py:obj:`False`, retention time will be saved in seconds.
:authors: Woon Wai Keen, Andrew Isaac, Sean O'Callaghan, Vladimir Likic,
Dominic Davis-Foster (pathlib support)
.. TODO:: minutes currently does nothing
"""
if not is_path(area_file_name):
raise TypeError("'area_file_name' must be a string or a PathLike object")
if not is_sequence_of(top_ion_list, _number_types):
raise TypeError(f"'top_ion_list' must be a Sequence of numbers")
area_file_name = prepare_filepath(area_file_name)
with area_file_name.open('w') as fp:
# create header
header = ['"UID"', '"RTavg"', '"Quant Ion"']
for item in self.expr_code:
header.append(f'"{item}"')
# write headers
fp.write(','.join(header) + '\n')
rtsums: List[float] = []
rtcounts = []
# The following two arrays will become list of lists
# such that:
# areas = [ [align1_peak1, align2_peak1, .....,alignn_peak1]
# [align1_peak2, ................................]
# .............................................
# [align1_peakm,....................,alignn_peakm] ]
areas: List[List] = []
new_peak_lists: List[List[Peak]] = []
for peak_list in self.peakpos:
index = 0
for peak in peak_list:
# one the first iteration, populate the lists
if len(areas) < len(peak_list):
areas.append([])
new_peak_lists.append([])
rtsums.append(0)
rtcounts.append(0)
if peak is not None:
rt = peak.rt
# get the area of the common ion for the peak
# an area of 'na' shows that while the peak was
# aligned, the common ion was not present
area = peak.get_ion_area(top_ion_list[index])
areas[index].append(area)
new_peak_lists[index].append(peak)
# The following code to the else statement is
# just for calculating the average rt
rtsums[index] += rt
rtcounts[index] += 1
else:
areas[index].append(None)
index += 1
out_strings = []
index = 0
# now write the strings for the file
for area_list in areas:
# write initial info:
# peak unique id, peak average rt
compo_peak = composite_peak(new_peak_lists[index])
if compo_peak is None:
continue
peak_UID = compo_peak.UID
peak_UID_string = f'"{peak_UID}"'
rt_avg = rtsums[index] / rtcounts[index]
out_strings.append(f"{peak_UID_string},{rt_avg / 60:.3f},{top_ion_list[index]:f}")
for area in area_list:
if area is not None:
out_strings[index] += f",{area:.4f}"
else:
out_strings[index] += ",NA"
index += 1
# now write the file
# print("length of areas[0]", len(areas[0]))
# print("length of areas", len(areas))
# print("length of out_strings", len(out_strings))
for row in out_strings:
fp.write(row + '\n')
[docs] def write_ion_areas_csv(self, ms_file_name: PathLike, minutes: bool = True) -> None:
"""
Write Ion Areas to CSV File.
:param ms_file_name: The name of the file
:param minutes: Whether to save retention times in minutes.
If :py:obj:`False`, retention time will be saved in seconds.
:authors: David Kainer, Dominic Davis-Foster (pathlib support)
"""
if not is_path(ms_file_name):
raise TypeError("'ms_file_name' must be a string or a PathLike object")
ms_file_name = prepare_filepath(ms_file_name)
with ms_file_name.open('w') as fp1:
# create header
header = ['"UID"', '"RTavg"']
for item in self.expr_code:
header.append(f'"{item}"')
# write headers
fp1.write('|'.join(header) + '\n')
for peak_idx in range(len(self.peakpos[0])):
ias = []
new_peak_list = []
for align_idx in range(len(self.peakpos)):
peak = self.peakpos[align_idx][peak_idx]
if peak is not None:
ia = peak.ion_areas
ia.update((mass, math.floor(intensity)) for mass, intensity in ia.items())
sorted_ia = sorted(ia.items(), key=operator.itemgetter(1), reverse=True)
ias.append(sorted_ia)
new_peak_list.append(peak)
compo_peak = composite_peak(new_peak_list)
if compo_peak is None:
continue
# write to ms file
fp1.write(compo_peak.UID)
if minutes:
fp1.write(f"|{compo_peak.rt/60:.3f}")
else:
fp1.write(f"|{compo_peak.rt:.3f}")
for intensity_array in ias:
if intensity_array is None:
fp1.write("|NA")
else:
fp1.write(f"|{intensity_array}")
fp1.write('\n')
[docs] def get_peak_alignment(
self,
minutes: bool = True,
require_all_expr: bool = True,
) -> DataFrame:
"""
Returns a Pandas dataframe of aligned retention times.
:param minutes: Whether to return retention times in minutes.
If :py:obj:`False`, retention time will be returned in seconds.
:param require_all_expr: Whether the peak must be present in
all experiments to be included in the data frame.
:authors: Woon Wai Keen, Andrew Isaac, Vladimir Likic, Dominic Davis-Foster
"""
rt_table = []
# for each alignment position write alignment's RT
for peak_idx in range(len(self.peakpos[0])):
rts = []
countrt = 0
for align_idx in range(len(self.peakpos)):
peak = self.peakpos[align_idx][peak_idx]
if peak is not None:
if minutes:
rt = peak.rt / 60.0
else:
rt = peak.rt
rts.append(rt)
countrt = countrt + 1
else:
rts.append(None)
if (require_all_expr and countrt == len(self.expr_code)) or not require_all_expr:
rt_table.append(rts)
rt_alignment = pandas.DataFrame(rt_table, columns=self.expr_code)
rt_alignment = rt_alignment.reindex(sorted(rt_alignment.columns), axis=1)
return rt_alignment
[docs] def get_ms_alignment(self, require_all_expr: bool = True) -> DataFrame:
"""
Returns a Pandas dataframe of mass spectra for the aligned peaks.
:param require_all_expr: Whether the peak must be present in
all experiments to be included in the data frame.
:authors: Woon Wai Keen, Andrew Isaac, Vladimir Likic, Dominic Davis-Foster
"""
ms_table = []
# for each alignment position write alignment's ms
for peak_idx in range(len(self.peakpos[0])):
specs = []
countms = 0
for align_idx in range(len(self.peakpos)):
peak = self.peakpos[align_idx][peak_idx]
if peak is not None:
ms = peak.mass_spectrum
specs.append(ms)
countms = countms + 1
else:
specs.append(None)
if (require_all_expr and countms == len(self.expr_code)) or not require_all_expr:
ms_table.append(specs)
ms_alignment = pandas.DataFrame(ms_table, columns=self.expr_code)
ms_alignment = ms_alignment.reindex(sorted(ms_alignment.columns), axis=1)
return ms_alignment
[docs] def get_peaks_alignment(self, require_all_expr: bool = True) -> DataFrame:
"""
Returns a Pandas dataframe of Peak objects for the aligned peaks.
:param require_all_expr: Whether the peak must be present in
all experiments to be included in the data frame.
:authors: Woon Wai Keen, Andrew Isaac, Vladimir Likic, Dominic Davis-Foster
"""
peaks_table = []
# for each alignment position write alignment's ms
for peak_idx in range(len(self.peakpos[0])):
peaks = []
count_peaks = 0
for align_idx in range(len(self.peakpos)):
peak = self.peakpos[align_idx][peak_idx]
if peak is not None:
peaks.append(peak)
count_peaks = count_peaks + 1
else:
peaks.append(None)
if (require_all_expr and count_peaks == len(self.expr_code)) or not require_all_expr:
peaks_table.append(peaks)
peak_alignment = pandas.DataFrame(peaks_table, columns=self.expr_code)
peak_alignment = peak_alignment.reindex(sorted(peak_alignment.columns), axis=1)
return peak_alignment
[docs] def get_area_alignment(self, require_all_expr: bool = True) -> DataFrame:
"""
Returns a Pandas dataframe containing the peak areas of the aligned peaks.
:param require_all_expr: Whether the peak must be present in all experiments
to be included in the data frame.
:authors: Woon Wai Keen, Andrew Isaac, Vladimir Likic, Dominic Davis-Foster
"""
areas_table = []
# for each alignment position write alignment's ms
for peak_idx in range(len(self.peakpos[0])):
areas = []
count_areas = 0
for align_idx in range(len(self.peakpos)):
peak = self.peakpos[align_idx][peak_idx]
if peak is not None:
area = peak.area
areas.append(area)
count_areas = count_areas + 1
else:
areas.append(None)
if (require_all_expr and count_areas == len(self.expr_code)) or not require_all_expr:
areas_table.append(areas)
area_alignment = pandas.DataFrame(areas_table, columns=self.expr_code)
area_alignment = area_alignment.reindex(sorted(area_alignment.columns), axis=1)
return area_alignment
[docs]def exprl2alignment(expr_list: List[Experiment]) -> List[Alignment]:
"""
Converts a list of experiments into a list of alignments.
:param expr_list: The list of experiments to be converted into an alignment objects.
:return: A list of alignment objects for the experiments.
:author: Vladimir Likic
"""
if not is_sequence(expr_list):
raise TypeError("'expr_list' must be a Sequence")
alignments = []
for item in expr_list:
if not isinstance(item, Experiment):
raise TypeError("list items must be 'Experiment' instances")
alignments.append(Alignment(item))
return alignments