"""
Functions to perform Biller and Biemann deconvolution.
"""
################################################################################
# #
# PyMassSpec software for processing of mass-spectrometry data #
# Copyright (C) 2005-2012 Vladimir Likic #
# Copyright (C) 2019-2020 Dominic Davis-Foster #
# #
# This program is free software; you can redistribute it and/or modify #
# it under the terms of the GNU General Public License version 2 as #
# published by the Free Software Foundation. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program; if not, write to the Free Software #
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. #
# #
################################################################################
# stdlib
import copy
from typing import List, Sequence, Tuple, Union
# 3rd party
import numpy
# this package
from pyms.IntensityMatrix import BaseIntensityMatrix
from pyms.IonChromatogram import IonChromatogram
from pyms.Peak.Class import Peak
from pyms.Peak.List.Function import is_peak_list
from pyms.Spectrum import MassSpectrum
from pyms.Utils.Utils import _number_types, is_number, is_sequence_of
__all__ = [
"BillerBiemann",
"get_maxima_indices",
"get_maxima_list",
"get_maxima_list_reduced",
"get_maxima_matrix",
"num_ions_threshold",
"rel_threshold",
"sum_maxima",
]
#######################
# structure
# 1) find local maxima per ion, store intensity and scan index
# 2) sum across N scans to compensate for scan type
# 3) sum ions belonging to each maxima scan
#######################
[docs]def BillerBiemann(im: BaseIntensityMatrix, points: int = 3, scans: int = 1) -> List[Peak]:
"""
Deconvolution based on the algorithm of Biller and Biemann (1974).
:param im:
:param points: Number of scans over which to consider a maxima to be a peak.
:param scans: Number of scans to combine peaks from to compensate for spectra skewing.
:return: List of detected peaks
:authors: Andrew Isaac, Dominic Davis-Foster (type assertions)
"""
if not isinstance(im, BaseIntensityMatrix):
raise TypeError("'im' must be an IntensityMatrix object")
if not isinstance(points, int):
raise TypeError("'points' must be an integer")
if not isinstance(scans, int):
raise TypeError("'scans' must be an integer")
rt_list = im.time_list
mass_list = im.mass_list
peak_list = []
maxima_im = get_maxima_matrix(im, points, scans)
for row_idx, row in enumerate(maxima_im):
if sum(row) > 0:
rt = rt_list[row_idx]
ms = MassSpectrum(mass_list, row)
peak = Peak(rt, ms)
peak.bounds = (0, row_idx, 0) # store IM index for convenience
# TODO: can the bounds be determined from the intensity matrix?
peak_list.append(peak)
return peak_list
[docs]def get_maxima_indices(ion_intensities: Union[Sequence, numpy.ndarray], points: int = 3) -> List[int]:
"""
Returns the scan indices for the apexes of the ion.
:param ion_intensities: A list of intensities for a single ion.
:param points: Number of scans over which to consider a maxima to be a peak.
:author: Andrew Isaac, Dominic Davis-Foster (type assertions)
**Example:**
.. code-block:: python
>>> # A trivial set of data with two clear peaks
>>> data = [1, 2, 3, 4, 5, 4, 3, 2, 1, 2, 3, 4, 5, 6, 5, 4, 3, 2, 1]
>>> get_maxima_indices(data)
[4, 13]
>>> # Wider window (more points)
>>> get_maxima_indices(data, points=10)
[13]
"""
if not is_sequence_of(ion_intensities, _number_types):
raise TypeError("'ion_intensities' must be a sequence of numbers")
if not isinstance(points, int):
raise TypeError("'points' must be an integer")
# find peak inflection points
# use a 'points' point window
# for a plateau after a rise, need to check if it is the left edge of a peak
peak_point = []
edge = -1
points = int(points)
half = int(points / 2)
points = 2 * half + 1 # ensure odd number of points
for index in range(len(ion_intensities) - points + 1):
left = ion_intensities[index:index + half]
mid = ion_intensities[index + half]
right = ion_intensities[index + half + 1:index + points]
# print(left, mid, right)
if mid > max(left) and mid > max(right):
# the max value is in the middle
peak_point.append(index + half)
edge = -1 # ignore previous rising edge
elif mid > max(left) and mid == max(right):
# start of plateau following rise (left of peak?)
edge = index + half # ignore previous rising edge, update latest
elif mid == max(left) and mid > max(right):
# start of fall from plateau
if edge > -1:
centre = int((edge + index + half) / 2) # mid point
peak_point.append(centre)
edge = -1
return peak_point
[docs]def get_maxima_list(ic: IonChromatogram, points: int = 3) -> List[List[float]]:
"""
List of retention time and intensity of local maxima for ion.
:param ic:
:param points: Number of scans over which to consider a maxima to be a peak.
:return: A list of retention time and intensity of local maxima for ion.
:author: Andrew Isaac, Dominic Davis-Foster (type assertions)
"""
if not isinstance(ic, IonChromatogram):
raise TypeError("'ic' must be an IonChromatogram object")
if not isinstance(points, int):
raise TypeError("'points' must be an integer")
peak_point = get_maxima_indices(ic.intensity_array, points)
mlist = []
for index in range(len(peak_point)):
rt = ic.get_time_at_index(peak_point[index])
intensity = ic.get_intensity_at_index(peak_point[index])
mlist.append([rt, intensity])
return mlist
[docs]def get_maxima_list_reduced(
ic: IonChromatogram,
mp_rt: float,
points: int = 13,
window: int = 3,
) -> List[Tuple[float, float]]:
"""
List of retention time and intensity of local maxima for ion.
| Only peaks around a specific retention time are recorded.
| Created for use with gap filling algorithm.
:param ic:
:param mp_rt: The retention time of the missing peak
:param points: Number of scans over which to consider a maxima to be a peak.
:param window: The window around ``mp_rt`` where peaks should be recorded.
:return: A list of 2-element tuple containing the retention time and
intensity of local maxima for each ion.
:author: Andrew Isaac, Dominic Davis-Foster (type assertions)
"""
if not isinstance(ic, IonChromatogram):
raise TypeError("'ic' must be an IonChromatogram object")
if not is_number(mp_rt):
raise TypeError("'mp_rt' must be a number")
peak_point = get_maxima_indices(ic.intensity_array, points)
maxima_list = []
for index in range(len(peak_point)):
rt = ic.get_time_at_index(peak_point[index])
if (rt > float(mp_rt) - window) and (rt < float(mp_rt) + window):
intensity = ic.get_intensity_at_index(peak_point[index])
maxima_list.append((rt, intensity))
return maxima_list
[docs]def get_maxima_matrix(im: BaseIntensityMatrix, points: int = 3, scans: int = 1) -> numpy.ndarray:
"""
Constructs a matrix containing only data for scans in which particular ions apexed.
The data can be optionally consolidated into the scan within a range
with the highest total intensity by adjusting the ``scans`` parameter.
By default this is ``1``, which does not consolidate the data.
The columns are ion masses and the rows are scans.
Get matrix of local maxima for each ion.
:param im:
:param points: Number of scans over which to consider a maxima to be a peak.
:param scans: Number of scans to combine peaks from to compensate for spectra skewing.
:return: A matrix of giving the intensities of ion masses (columns) and for each scan (rows).
:author: Andrew Isaac, Dominic Davis-Foster (type assertions)
"""
if not isinstance(im, BaseIntensityMatrix):
raise TypeError("'im' must be an IntensityMatrix object")
if not isinstance(points, int):
raise TypeError("'points' must be an integer")
if not isinstance(scans, int):
raise TypeError("'scans' must be an integer")
numrows, numcols = im.size # scans, masses
# zeroed matrix, size numrows*numcols
maxima_im = numpy.zeros((numrows, numcols))
raw_im = im.intensity_array
# Construct a 2d array which is all zeros apart from the apexing ions
for col in range(numcols): # assume all rows have same width
# 1st, find maxima
maxima = get_maxima_indices(raw_im[:, col], points)
# 2nd, fill intensities
for row in maxima:
maxima_im[row, col] = raw_im[row, col]
# combine spectra within 'scans' scans.
half = int(scans / 2)
for row_idx in range(numrows):
# print(f"{row_idx=}")
# tic = 0
best = 0
loc = 0
# find best in scans
for ii in range(scans):
# print(f"{ii=}")
# Check the scan window around row_idx is not
# out of range on left (0) or right (numrows)
if 0 <= row_idx - half + ii < numrows:
# print(row_idx - half + ii)
tic = maxima_im[row_idx - half + ii].sum()
# find the index of the scan in the window with the highest TIC intensity
if tic > best:
best = tic
loc = ii
for ii in range(scans):
# Consolidate data in scan with highest TIC
source_idx = row_idx - half + ii # the scan to move data from
dest_idx = row_idx - half + loc # the scan to move data into
if 0 <= source_idx < numrows and ii != loc:
for col in range(numcols):
maxima_im[dest_idx, col] += maxima_im[source_idx, col]
maxima_im[source_idx, col] = 0
return maxima_im
[docs]def num_ions_threshold(
pl: Sequence[Peak],
n: int,
cutoff: float,
copy_peaks: bool = True,
) -> List[Peak]:
"""
Remove Peaks where there are fewer than ``n`` ions with intensities above the given threshold.
:param pl:
:param n: Minimum number of ions that must have intensities above the cutoff.
:param cutoff: The minimum intensity threshold.
:param copy_peaks: Whether the returned peak list should contain copies of the peaks.
:return: A new list of Peak objects.
:author: Andrew Isaac, Dominic Davis-Foster (type assertions)
"""
if not is_peak_list(pl):
raise TypeError("'pl' must be a list of Peak objects")
if not isinstance(n, int):
raise TypeError("'n' must be an integer")
if not is_number(cutoff):
raise TypeError("'cutoff' must be a number")
if copy_peaks:
pl = copy.deepcopy(pl)
new_pl = []
for p in pl:
ms = p.mass_spectrum
ia = ms.mass_spec
ions = 0
for i in range(len(ia)):
if ia[i] >= cutoff:
ions += 1
if ions >= n:
new_pl.append(p)
return new_pl
[docs]def rel_threshold(pl: Sequence[Peak], percent: float = 2, copy_peaks: bool = True) -> List[Peak]:
"""
Remove ions with relative intensities less than the given relative
percentage of the maximum intensity.
:param pl:
:param percent: Threshold for relative percentage of intensity.
:default percent: ``2%``
:param copy_peaks: Whether the returned peak list should contain copies of the peaks.
:return: A new list of Peak objects with threshold ions.
:author: Andrew Isaac, Dominic Davis-Foster (type assertions)
""" # noqa: D400
peak_list = pl
if not is_peak_list(peak_list):
raise TypeError("'pl' must be a list of Peak objects")
if not is_number(percent):
raise TypeError("'percent' must be a number > 0")
if percent <= 0:
raise ValueError("'percent' must be a number > 0")
if copy_peaks:
peak_list = copy.deepcopy(peak_list)
new_peak_list = []
for peak in peak_list:
ms = peak.mass_spectrum
# if ms is None:
# raise ValueError("The peak has no mass spectrum.")
ia = ms.mass_spec
# assume max(ia) big so /100 1st
cutoff = (max(ia) / 100.0) * float(percent)
for i in range(len(ia)):
if ia[i] < cutoff:
ia[i] = 0
ms.mass_spec = ia
peak.mass_spectrum = ms
new_peak_list.append(peak)
return new_peak_list
[docs]def sum_maxima(im: BaseIntensityMatrix, points: int = 3, scans: int = 1) -> IonChromatogram:
"""
Reconstruct the TIC as sum of maxima.
:param im:
:param points: Peak if maxima over 'points' number of scans.
:param scans: Number of scans to combine peaks from to compensate for spectra skewing.
:return: The reconstructed TIC.
:author: Andrew Isaac, Dominic Davis-Foster (type assertions)
"""
if not isinstance(im, BaseIntensityMatrix):
raise TypeError("'im' must be an IntensityMatrix object")
if not isinstance(points, int):
raise TypeError("'points' must be an integer")
if not isinstance(scans, int):
raise TypeError("'scans' must be an integer")
maxima_im = get_maxima_matrix(im, points)
sums = []
numrows = len(maxima_im)
half = int(scans / 2)
for row in range(numrows):
val = 0
for ii in range(scans):
if 0 <= row - half + ii < numrows:
val += maxima_im[row - half + ii].sum()
sums.append(val)
tic = IonChromatogram(numpy.array(sums), im.time_list)
return tic