Source code for pyms.DPA.IO

Functions for writing peak alignment to various file formats.

#                                                                              #
#    PyMassSpec software for processing of mass-spectrometry data              #
#    Copyright (C) 2005-2012 Vladimir Likic                                    #
#    Copyright (C) 2019-2020 Dominic Davis-Foster                              #
#                                                                              #
#    This program is free software; you can redistribute it and/or modify      #
#    it under the terms of the GNU General Public License version 2 as         #
#    published by the Free Software Foundation.                                #
#                                                                              #
#    This program is distributed in the hope that it will be useful,           #
#    but WITHOUT ANY WARRANTY; without even the implied warranty of            #
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the              #
#    GNU General Public License for more details.                              #
#                                                                              #
#    You should have received a copy of the GNU General Public License         #
#    along with this program; if not, write to the Free Software               #
#    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.                 #
#                                                                              #

# stdlib
import operator
from typing import List

# 3rd party
from domdf_python_tools.typing import PathLike
from openpyxl import Workbook  # type: ignore[import]
from openpyxl.comments import Comment  # type: ignore[import]
from openpyxl.formatting.rule import ColorScaleRule  # type: ignore[import]
from openpyxl.styles import PatternFill  # type: ignore[import]
from openpyxl.utils import get_column_letter  # type: ignore[import]

# this package
from pyms.DPA.Alignment import Alignment
from pyms.Peak.List.Function import composite_peak
from pyms.Utils.IO import prepare_filepath
from pyms.Utils.Utils import is_path

# Public API: the names exported by ``from pyms.DPA.IO import *``.
__all__ = ["write_mass_hunter_csv", "write_excel", "write_transposed_output"]

def _ratio_to_common_ion(ion_intensity, common_ion_intensity) -> float:
    """
    Return ``ion_intensity / common_ion_intensity`` as a fraction.

    :param ion_intensity: intensity of the qualifying ion.
    :param common_ion_intensity: intensity of the common ion.

    :return: The ratio, ``0.0`` if either intensity cannot be converted to a
        float (``TypeError``, e.g. it is :py:obj:`None`), or ``0.01`` if the
        common ion intensity is zero.
    """
    try:
        return float(ion_intensity) / float(common_ion_intensity)
    except TypeError:
        # no intensity available for one of the ions
        return 0.0
    except ZeroDivisionError:
        # the common ion is present but has zero intensity
        return 0.01


def write_mass_hunter_csv(
    alignment: Alignment,
    file_name: PathLike,
    top_ion_list: List[int],
) -> None:
    """
    Create a CSV file for interpretation by Mass Hunter.

    One row is written per alignment position for which a composite peak can
    be built. Each row holds the composite peak's UID, the common ion, two
    qualifying ions with their intensity ratio to the common ion (in percent),
    and the left / right retention time window deltas. The deltas are padded
    by 1.5 and divided by 60 before being written.

    :param alignment: alignment object to write to file.
    :param file_name: name of the output file.
    :param top_ion_list: a list of the common ions for each peak in the
        averaged peak list for the alignment.

    :raises TypeError: if ``file_name`` is not a string or PathLike object.
    :raises ValueError: if ``top_ion_list`` is :py:obj:`None`.
    """
    if not is_path(file_name):
        raise TypeError("'file_name' must be a string or a PathLike object")

    # Validate every argument before the output path is prepared.
    if top_ion_list is None:
        raise ValueError("List of common ions must be supplied")

    file_name = prepare_filepath(file_name)

    # Per alignment position: the peaks found there (one per sample at most)
    # and the smallest / largest retention time among them.
    peaks_at_position: List[list] = []
    rtmin: List[float] = []
    rtmax: List[float] = []

    for peak_list in alignment.peakpos:
        # Grow the accumulators so there is one slot per position.
        for _ in range(len(peak_list) - len(peaks_at_position)):
            peaks_at_position.append([])
            # Start from +inf so the first real retention time always wins;
            # no sentinel value that a real retention time could collide with.
            rtmin.append(float("inf"))
            rtmax.append(0.0)

        for index, peak in enumerate(peak_list):
            if peak is None:
                continue

            rt = peak.rt
            peaks_at_position[index].append(peak)
            rtmin[index] = min(rtmin[index], rt)
            rtmax[index] = max(rtmax[index], rt)

    rows: List[str] = []

    # ``enumerate`` keeps ``index`` tied to the position even when a position
    # is skipped because no composite peak could be built for it.
    for index, position_peaks in enumerate(peaks_at_position):
        compo_peak = composite_peak(position_peaks)
        if compo_peak is None:
            continue

        peak_UID = compo_peak.UID

        # time from the leftmost / rightmost peak to the composite peak
        l_window_delta = compo_peak.rt - rtmin[index]
        r_window_delta = rtmax[index] - compo_peak.rt

        common_ion = top_ion_list[index]

        # The first two '-' separated fields of the UID are read as the
        # qualifying ions.
        uid_fields = str(peak_UID).split('-')
        qual_ion_1 = int(uid_fields[0])
        qual_ion_2 = int(uid_fields[1])

        # A qualifying ion must differ from the common ion; substitute the
        # third highest m/z when they coincide.
        if qual_ion_1 == common_ion:
            qual_ion_1 = compo_peak.get_third_highest_mz()
        elif qual_ion_2 == common_ion:
            qual_ion_2 = compo_peak.get_third_highest_mz()

        ci_intensity = compo_peak.get_int_of_ion(common_ion)
        q1_ci_ratio = _ratio_to_common_ion(compo_peak.get_int_of_ion(qual_ion_1), ci_intensity)
        q2_ci_ratio = _ratio_to_common_ion(compo_peak.get_int_of_ion(qual_ion_2), ci_intensity)

        rows.append(
            ','.join([
                peak_UID,
                f"{common_ion}",
                f"{qual_ion_1}",
                f"{q1_ci_ratio * 100:.1f}",
                f"{qual_ion_2}",
                f"{q2_ci_ratio * 100:.1f}",
                f"{(l_window_delta + 1.5) / 60:.2f}",
                f"{(r_window_delta + 1.5) / 60:.2f}",
            ])
        )

    # The file is opened only once every row has been computed, so a failure
    # above does not leave a header-only file behind.
    with open(file_name, 'w', encoding="UTF-8") as fp:
        fp.write(
            '"UID","Common Ion","Qual Ion 1","ratio QI1/CI","Qual Ion 2",'
            '"ratio QI2/CI","l window delta","r window delta"\n'
        )
        fp.writelines(f"{row}\n" for row in rows)
def write_excel(
    alignment: Alignment,
    file_name: PathLike,
    minutes: bool = True,
) -> None:
    """
    Write the alignment to an Excel file, with colouring showing possible mis-alignments.

    The sheet has one row per alignment position and one column per sample.
    Each cell holds the peak's retention time (or ``"NA"``), with the peak
    area and scaled ion areas attached as a cell comment.

    :param alignment: :class:`pyms.DPA.Alignment.Alignment` object to write to file.
    :param file_name: The name for the retention time alignment file.
    :param minutes: Whether to save retention times in minutes.
        If :py:obj:`False`, retention time will be saved in seconds.

    :raises TypeError: if ``file_name`` is not a string or PathLike object.

    :author: David Kainer
    """
    if not is_path(file_name):
        raise TypeError("'file_name' must be a string or a PathLike object")

    file_name = prepare_filepath(file_name)

    wb = Workbook()
    ws = wb.active
    ws.title = "Aligned RT"

    # create header row
    ws["A1"] = "UID"
    ws["B1"] = "RTavg"
    for i, item in enumerate(alignment.expr_code):
        header_cell = ws.cell(row=1, column=i + 3, value=f"{item}")
        header_cell.comment = Comment("sample " + str(i), "dave")

    peakpos = alignment.peakpos
    # An alignment without any samples has no positions to write.
    n_positions = len(peakpos[0]) if len(peakpos) else 0

    # for each alignment position write alignment's peak and area
    for peak_idx in range(n_positions):  # rows
        sheet_row = 2 + peak_idx
        new_peak_list = []

        for align_idx, sample_peaks in enumerate(peakpos):  # columns
            peak = sample_peaks[peak_idx]
            sheet_col = 3 + align_idx

            if peak is None:
                na_cell = ws.cell(row=sheet_row, column=sheet_col, value="NA")
                na_cell.comment = Comment("Area: NA", "dave")
                continue

            rt = peak.rt / 60.0 if minutes else peak.rt
            area = peak.area
            new_peak_list.append(peak)

            # write the RT into the cell in the excel file
            rt_cell = ws.cell(row=sheet_row, column=sheet_col, value=round(rt, 3))

            # Mini mass spec for this peak, with the ion areas divided by 1000
            # to shorten them. A new dict is built so that whatever
            # ``peak.ion_areas`` returns is never modified.
            scaled_ia = {mass: int(intensity / 1000) for mass, intensity in peak.ion_areas.items()}
            sorted_ia = sorted(scaled_ia.items(), key=operator.itemgetter(1), reverse=True)

            # NOTE(review): assumes ``peak.area`` may be unset (None) -- confirm.
            area_text = "NA" if area is None else f"{area:.0f}"

            # write the peak area and mass spec into the comment for the cell
            rt_cell.comment = Comment(f"Area: {area_text} | MassSpec: {sorted_ia}", "dave")

        compo_peak = composite_peak(new_peak_list)

        if compo_peak is not None:
            ws.cell(row=sheet_row, column=1, value=f'"{compo_peak.UID}"')
            # NOTE(review): RTavg is always written in minutes, even when
            # ``minutes`` is False -- kept as-is; confirm whether intended.
            ws.cell(row=sheet_row, column=2, value=f"{float(compo_peak.rt / 60):.3f}")

    # colour the cells in each row based on their RT percentile for that row
    first_letter = get_column_letter(3)
    for row_number, row in enumerate(ws.rows, start=1):
        if len(row) < 3:
            # no sample columns, so there is nothing to colour
            continue

        cell_range = f"{first_letter}{row_number}:{get_column_letter(len(row))}{row_number}"
        ws.conditional_formatting.add(
            cell_range,
            ColorScaleRule(
                start_type="percentile",
                start_value=1,
                start_color="E5FFCC",
                mid_type="percentile",
                mid_value=50,
                mid_color="FFFFFF",
                end_type="percentile",
                end_value=99,
                end_color="FFE5CC",
            ),
        )

    wb.save(file_name)
def write_transposed_output(
    alignment: Alignment,
    file_name: PathLike,
    minutes: bool = True,
) -> None:
    """
    Write an alignment to an Excel workbook.

    Two sheets are written, "Aligned RT" and "Aligned Area", each with one row
    per sample and one column per alignment position. Cells belonging to peaks
    flagged as outliers are filled with a highlight colour.

    :param alignment: :class:`pyms.DPA.Alignment.Alignment` object to write to file
    :param file_name: The name of the file
    :param minutes: Whether to save the per-sample retention times in minutes.
        If :py:obj:`False`, they are saved unconverted.

    :raises TypeError: if ``file_name`` is not a string or PathLike object.
    """
    if not is_path(file_name):
        raise TypeError("'file_name' must be a string or a PathLike object")

    file_name = prepare_filepath(file_name)

    wb = Workbook()
    # NOTE(review): the default worksheet created by ``Workbook()`` is left in
    # place, so the saved file has an extra sheet -- confirm whether intended.
    ws1 = wb.create_sheet(title="Aligned RT")
    ws2 = wb.create_sheet(title="Aligned Area")

    for sheet in (ws1, ws2):
        sheet["A1"] = "Peak"
        sheet["A2"] = "RTavg"

    style_outlier = PatternFill(fill_type="solid", fgColor="FFAE19", bgColor="FFAE19")

    # write column with sample IDs
    for i, item in enumerate(alignment.expr_code):
        ws1.cell(column=1, row=i + 3, value=f"{item}")
        ws2.cell(column=1, row=i + 3, value=f"{item}")

    peakpos = alignment.peakpos
    # An alignment without any samples has no positions to write.
    n_positions = len(peakpos[0]) if len(peakpos) else 0

    # for each alignment position write alignment's peak and area
    for peak_idx in range(n_positions):
        cell_col = 2 + peak_idx

        # tuples of (peak, col, row) for the non-NA peaks at this position
        new_peak_list = []

        for align_idx, sample_peaks in enumerate(peakpos):
            peak = sample_peaks[peak_idx]
            cell_row = 3 + align_idx

            if peak is None:
                na_cell = ws1.cell(column=cell_col, row=cell_row, value="NA")
                ws2.cell(column=cell_col, row=cell_row, value="NA")
                na_cell.comment = Comment("Area: NA", "dave")
                continue

            rt = peak.rt / 60.0 if minutes else peak.rt
            area = peak.area

            # these are the col,row coords of the peak in the output matrix
            new_peak_list.append((peak, cell_col, cell_row))

            # NOTE(review): assumes ``peak.area`` may be unset (None) -- confirm.
            if area is None:
                area_value = "NA"
                area_text = "NA"
            else:
                area_value = round(area, 3)
                area_text = f"{area:.0f}"

            # write the RT and the area into their sheets
            rt_cell = ws1.cell(column=cell_col, row=cell_row, value=round(rt, 3))
            ws2.cell(column=cell_col, row=cell_row, value=area_value)

            # Mini mass spec for this peak, with the ion areas divided by 1000
            # to shorten them. A new dict is built so that whatever
            # ``peak.ion_areas`` returns is never modified.
            scaled_ia = {mass: int(intensity / 1000) for mass, intensity in peak.ion_areas.items()}
            sorted_ia = sorted(scaled_ia.items(), key=operator.itemgetter(1), reverse=True)

            # write the peak area and mass spec into the comment for the cell
            rt_cell.comment = Comment(f"Area: {area_text} | MassSpec: {sorted_ia}", "dave")

        compo_peak = composite_peak([entry[0] for entry in new_peak_list])

        if compo_peak is not None:
            uid_text = f'"{compo_peak.UID}"'
            # NOTE(review): RTavg is always written in minutes, even when
            # ``minutes`` is False -- kept as-is; confirm whether intended.
            rt_avg_text = f"{float(compo_peak.rt / 60):.3f}"

            for sheet in (ws1, ws2):
                sheet.cell(column=cell_col, row=1, value=uid_text)
                sheet.cell(column=cell_col, row=2, value=rt_avg_text)

            # highlight outlier cells in the current peak list
            for peak, col, row in new_peak_list:
                if peak.is_outlier:
                    ws1.cell(column=col, row=row).fill = style_outlier
                    ws2.cell(column=col, row=row).fill = style_outlier

    wb.save(file_name)