Source code for maspy.featuremethods

"""
#TODO: module description
"""

#  Copyright 2015-2017 David M. Hollenstein, Jakob J. Hollenstein
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

######################### Python 2 and 3 compatibility #########################
from __future__ import absolute_import, division, print_function
from __future__ import unicode_literals
from future.utils import viewitems, viewkeys, viewvalues, listitems, listvalues

try:
    #python 2.7
    from itertools import izip as zip
except ImportError:
    #python 3 series
    pass
################################################################################

import bisect
from collections import defaultdict as ddict
import functools
import io
import operator
import os

#TODO: remove matplotlib from this modul, return a report object
from matplotlib import pyplot as plt
import numpy

import maspy.auxiliary as aux
import maspy.core
import maspy.sil


# --- Functions to work with FeatureItems --- #
[docs]def matchToFeatures(fiContainer, specContainer, specfiles=None, fMassKey='mz', sMassKey='obsMz', isotopeErrorList=(0), precursorTolerance=5, toleranceUnit='ppm', rtExpansionUp=0.10, rtExpansionDown=0.05, matchCharge=True, scoreKey='pep', largerBetter=False): """Annotate :class:`Fi <maspy.core.Fi>` (Feature items) by matching :class:`Si <maspy.core.Si>` (Spectrum items) or :class:`Sii <maspy.core.Sii>` (Spectrum identification items). :param fiContainer: :class:`maspy.core.FeatureContainer`, contains ``Fi``. :param specContainer: :class:`maspy.core.MsrunContainer` or :class:`maspy.core.SiiContainer`, contains ``Si`` or ``Sii``. :param specfiles: filenames of ms-run files, if specified consider only items from those files :type specfiles: str, list or None :param fMassKey: mass attribute key in :attr:`Fi.__dict__` :param sMassKey: mass attribute key in :attr:`Si.__dict__` or :attr:`Sii.__dict__` (eg 'obsMz', 'excMz') :param isotopeErrorList: allowed isotope errors relative to the spectrum mass, for example "0" or "1". If no feature has been matched with isotope error 0, the spectrum mass is increased by the mass difference of carbon isotopes 12 and 13 and matched again. The different isotope error values are tested in the specified order therefore "0" should normally be the first value of the list. :type isotopeErrorList: list or tuple of int :param precursorTolerance: the largest allowed mass deviation of ``Si`` or ``Sii`` relative to ``Fi`` :param toleranceUnit: defines how the ``precursorTolerance`` is applied to the mass value of ``Fi``. ``"ppm": mass * (1 +/- tolerance*1E-6)`` or ``"da": mass +/- value`` :param rtExpansionUp: relative upper expansion of ``Fi`` retention time area. ``limitHigh = Fi.rtHigh + (Fi.rtHigh - Fi.rtLow) * rtExpansionUp`` :param rtExpansionDown: relative lower expansion of ``Fi`` retention time area. ``limitLow = Fi.rtLow - (Fi.rtHigh - Fi.rtLow) * rtExpansionDown`` :param matchCharge: bool, True if ``Fi`` and ``Si`` or ``Sii`` must have the same ``charge`` state to be matched. :param scoreKey: ``Sii`` attribute name used for scoring the identification reliability :param largerBetter: bool, True if higher score value means a better identification reliability .. note: Concerning the feature retention area expansion. If ``Si`` or ``Sii`` is matched to multiple ``Fi`` the rt expansion is removed and the matching is repeated. .. note: If the ``specContainer`` is a ``SiiContainer`` then matched ``Fi`` are annotated with :attr:`Sii.peptide`, if multiple ``Sii`` are matched to ``Fi`` the one with the best score is used. #TODO: this function is nested pretty badly and should maybe be rewritten #TODO: replace tolerance unit "ppm" by tolerance mode "relative" and change repsective calculations """ isotopeErrorList = aux.toList(isotopeErrorList) if specContainer.__class__.__name__ == 'MsrunContainer': listKeySpecIds = 'siIds' else: listKeySpecIds = 'siiIds' specContainerSpecfiles = [_ for _ in viewkeys(specContainer.info)] if specfiles is not None: specfiles = aux.toList(specfiles) else: specfiles = [_ for _ in viewkeys(fiContainer.info)] specfiles = list(set(specfiles).intersection(set(specContainerSpecfiles))) for specfile in specfiles: multiMatchCounter = int() isotopeErrorMatchCounter = int() specArrays = specContainer.getArrays([sMassKey, 'rt', 'charge', 'msLevel'], specfiles=specfile ) featureArrays = fiContainer.getArrays(['rtHigh', 'rtLow', 'charge', fMassKey], specfiles=specfile, sort=fMassKey ) featureArrays['rtHighExpanded'] = (featureArrays['rtHigh'] + (featureArrays['rtHigh'] - featureArrays['rtLow']) * rtExpansionUp ) featureArrays['rtLowExpanded'] = (featureArrays['rtLow'] - (featureArrays['rtHigh'] - featureArrays['rtLow']) * rtExpansionDown ) specFeatureDict = dict() ## key = scanNr, value = set(featureKeys) featureSpecDict = dict() ## key = featureKey, value = set(scanNrs) for specPos, specId in enumerate(specArrays['id']): specZ = specArrays['charge'][specPos] if specZ is None: continue specMass = specArrays[sMassKey][specPos] specRt = specArrays['rt'][specPos] matchComplete = False isotopeErrorPos = 0 while not matchComplete: isotopeError = isotopeErrorList[isotopeErrorPos] # calculate mass limits for each isotope error if toleranceUnit.lower() == 'ppm': specMassHigh = ((specMass + isotopeError * 1.003355 / specZ) * (1 + precursorTolerance*1E-6) ) specMassLow = ((specMass + isotopeError * 1.003355 / specZ) * (1 - precursorTolerance*1E-6) ) elif toleranceUnit.lower() == 'da': specMassHigh = ((specMass + isotopeError * 1.003355 / specZ) + precursorTolerance ) specMassLow = ((specMass + isotopeError * 1.003355 / specZ) - precursorTolerance ) posL = bisect.bisect_left(featureArrays[fMassKey], specMassLow ) posR = bisect.bisect_right(featureArrays[fMassKey], specMassHigh ) if matchCharge: chargeMask = (featureArrays['charge'][posL:posR] == specZ) fRtHighKey = 'rtHighExpanded' fRtLowKey = 'rtLowExpanded' for fRtHighKey, fRtLowKey in [('rtHighExpanded', 'rtLowExpanded'), ('rtHigh', 'rtLow') ]: rtMask = ((featureArrays[fRtLowKey][posL:posR] <= specRt) & (featureArrays[fRtHighKey][posL:posR] >= specRt) ) if matchCharge: matchedFeatureIds = featureArrays['id'][posL:posR][rtMask & chargeMask] else: matchedFeatureIds = featureArrays['id'][posL:posR][rtMask] if len(matchedFeatureIds) <= 1: break # if exactly one feature has been matched, if len(matchedFeatureIds) > 0: if len(matchedFeatureIds) == 1: matchComplete = True if isotopeErrorList[isotopeErrorPos] != 0: isotopeErrorMatchCounter += 1 else: #Stop if Spectrum can be matched to multiple features multiMatchCounter += 1 break isotopeErrorPos += 1 if isotopeErrorPos >= len(isotopeErrorList): #Stop if all allowed isotope errors have been tested break if matchComplete: for featureId in matchedFeatureIds: getattr(fiContainer.container[specfile][featureId], listKeySpecIds ).append(specId) fiContainer.container[specfile][featureId].isMatched = True specFeatureDict[specId] = featureId featureSpecDict[featureId] = specId stats = dict() stats['totalFeatures'] = len(featureArrays['id']) stats['matchedFeatures'] = len(featureSpecDict) stats['relMatchedFeatures'] = round(100*stats['matchedFeatures']/stats['totalFeatures'], 1) stats['totalSpectra'] = len(specArrays['id'][(specArrays['msLevel'] != 1)]) stats['matchedSpectra'] = len(specFeatureDict) stats['relMatchedSpectra'] = round(100*stats['matchedSpectra']/stats['totalSpectra'], 1) print('------', specfile, '------') print('Annotated features:\t\t\t', stats['matchedFeatures'], '/', stats['totalFeatures'], '=', stats['relMatchedFeatures'], '%') print('Spectra matched to features:\t\t', stats['matchedSpectra'], '/', stats['totalSpectra'], '=', stats['relMatchedSpectra'], '%') if multiMatchCounter != 0: print('Discarded because of multiple matches:\t', multiMatchCounter) if isotopeErrorMatchCounter != 0: print('Isotope error matched spectra:\t\t', isotopeErrorMatchCounter) #annotate feature with sii information (peptide, sequence, score) if isinstance(specContainer, maspy.core.SiiContainer): for featureId in viewkeys(featureSpecDict): matches = list() for specId in fiContainer.container[specfile][featureId].siiIds: _sii = specContainer.getValidItem(specfile, specId) score = getattr(_sii, scoreKey) peptide = _sii.peptide sequence = _sii.sequence matches.append([score, peptide, sequence]) matches.sort(reverse=largerBetter) fiContainer.container[specfile][featureId].isAnnotated = True fiContainer.container[specfile][featureId].score = matches[0][0] fiContainer.container[specfile][featureId].peptide = matches[0][1] fiContainer.container[specfile][featureId].sequence = matches[0][2]
[docs]def rtCalibration(fiContainer, allowedRtDev=60, allowedMzDev=2.5, reference=None, specfiles=None, showPlots=False, plotDir=None, minIntensity=1e5): """Performs a retention time calibration between :class:`FeatureItem` of multiple specfiles. :ivar fiContainer: Perform alignment on :class:`FeatureItem` in :attr:`FeatureContainer.specfiles` :ivar allowedRtDev: maxium retention time difference of two features in two runs to be matched :ivar allowedMzDev: maxium relative m/z difference (in ppm) of two features in two runs to be matched :ivar showPlots: boolean, True if a plot should be generated which shows to results of the calibration :ivar plotDir: if not None and showPlots is True, the plots are saved to this location. :ivar reference: Can be used to specifically specify a reference specfile :ivar specfiles: Limit alignment to those specfiles in the fiContainer :ivar minIntensity: consider only features with an intensity above this value """ #TODO: long function, maybe split into subfunctions specfiles = [_ for _ in viewkeys(fiContainer.info)] if specfiles is None else specfiles matchCharge = True refMzKey = 'mz' mzKey = 'mz' if reference is not None: if reference in specfiles: specfiles = [reference] + list(set(specfiles).difference(set([reference]))) else: print('Specified reference specfile not present, using reference: ', specfiles[0]) for featureItem in fiContainer.getItems(specfiles=specfiles): if not hasattr(featureItem, 'obsRt'): setattr(featureItem, 'obsRt', featureItem.rt) referenceArrays = None for specfile in specfiles: featureArrays = fiContainer.getArrays(['rt', 'charge', 'mz', 'intensity'], specfiles=specfile, sort='rt' ) if minIntensity is not None: intensityMask = (featureArrays['intensity'] > minIntensity) for key in list(viewkeys(featureArrays)): featureArrays[key] = featureArrays[key][intensityMask] if referenceArrays is None: referenceArrays = featureArrays if showPlots: print('Reference: '+specfile) continue rtPosList = list() rtDevList = list() mzDevRelList = list() mzDevAbsList = list() for featurePos in range(len(featureArrays[mzKey])): currRt = featureArrays['rt'][featurePos] currMz = featureArrays[mzKey][featurePos] currZ = featureArrays['charge'][featurePos] mzLimitUp = currMz*(1+allowedMzDev*1E-6) mzLimitLow = currMz*(1-allowedMzDev*1E-6) rtLimitUp = currRt+allowedRtDev rtLimitLow = currRt-allowedRtDev posL = bisect.bisect_left(referenceArrays['rt'], rtLimitLow) posU = bisect.bisect_right(referenceArrays['rt'], rtLimitUp) refMask = (referenceArrays[refMzKey][posL:posU] <= mzLimitUp) & (referenceArrays[refMzKey][posL:posU] >= mzLimitLow) if matchCharge: refMask = refMask & (referenceArrays['charge'][posL:posU] == currZ) currMzDev = abs(referenceArrays[refMzKey][posL:posU][refMask] - currMz) bestHitMask = currMzDev.argsort() for refRt, refMz in zip(referenceArrays['rt'][posL:posU][refMask][bestHitMask], referenceArrays[refMzKey][posL:posU][refMask][bestHitMask]): rtPosList.append(currRt) rtDevList.append(currRt - refRt) mzDevRelList.append((1 - currMz / refMz)*1E6) mzDevAbsList.append(currMz - refMz) break rtPosList = numpy.array(rtPosList) rtDevList = numpy.array(rtDevList) splineInitialKnots = int(max(rtPosList) - min(rtPosList)) dataFit = aux.DataFit(rtDevList, rtPosList) dataFit.splineInitialKnots = splineInitialKnots dataFit.splineTerminalExpansion = 0.2 dataFit.processInput(dataAveraging='median', windowSize=10) dataFit.generateSplines() if showPlots: corrDevArr = rtDevList - dataFit.corrArray(rtPosList) timePoints = [min(rtPosList) + x for x in range(int(max(rtPosList)-min(rtPosList)))] corrValues = dataFit.corrArray(timePoints) fig, ax = plt.subplots(3, 2, sharex=False, sharey=False, figsize=(20, 18)) fig.suptitle(specfile) ax[0][0].hist(rtDevList, bins=100, color='grey', alpha=0.5, label='observed') ax[0][0].hist(corrDevArr, bins=100, color='red', alpha=0.5, label='corrected') ax[0][0].set_title('Retention time deviation') ax[0][0].legend() ax[0][0].set_xlim(allowedRtDev*-1, allowedRtDev) ax[0][1].hist(mzDevRelList, bins=100, color='grey') ax[0][1].set_title('Mz deviation [ppm]') ax[1][0].scatter(rtPosList, rtDevList, color='grey', alpha=0.1, label='observed') ax[1][0].plot(timePoints,corrValues, color='red', alpha=0.5, label='correction function') ax[1][0].set_title('Retention time deviation over time') ax[1][0].legend() ax[1][0].set_ylim(allowedRtDev*-1, allowedRtDev) ax[1][1].scatter(rtPosList, mzDevRelList, color='grey', alpha=0.1) ax[1][1].set_title('Mz deviation over time') ax[1][1].set_ylim(allowedMzDev*-1, allowedMzDev) ax[2][0].scatter(rtPosList, corrDevArr, color='grey', alpha=0.1) ax[2][0].set_title('Aligned retention time deviation over time') ax[2][0].set_ylim(allowedRtDev*-1, allowedRtDev) if plotDir is not None: plotloc = aux.joinpath(plotDir, specfile+'.rtAlign.png') fig.savefig(plotloc) else: fig.show() featureArrays = fiContainer.getArrays(['rt'], specfiles=specfile, sort='rt') featureArrays['corrRt'] = featureArrays['rt'] - dataFit.corrArray(featureArrays['rt']) for featureId, corrRt, rt in zip(featureArrays['id'], featureArrays['corrRt'], featureArrays['rt']): fiContainer.container[specfile][featureId].rt = corrRt
##TODO: Code is deprecated, new classes are currently located in maspy.featuregrouping #class FeatureGroupItem(object): # """Representation of a group of :class:`FeatureItem`. # # :ivar isMatched: False by default, True if any :class:`FeatureItem` in the group are matched. # :ivar isAnnotated: False by default, True if any :class:`FeatureItem` in the group are annotated. # :ivar siIds: containerId values of matched Si entries # :ivar siiIds: containerId values of matched Sii entries # :ivar featureIds: containerId values of :class:`FeatureItem` in the feature group # :ivar peptide: peptide sequence of best scoring Sii match # :ivar sequence: plain amino acid sequence of best scoring Sii match, used to retrieve protein information # :ivar score: score of best scoring Sii match # :ivar matchMatrix: structured representation of :attr:`FeatureItem.containerId` in the feature group. # :ivar intensityMatrix: similar to :attr:`matchMatrix` but contains :attr:`FeatureItem.intensity` values. # {chargeState: 2d numpy.array with specfiles as 1st dimension and labelState as 2nd dimension} # """ # def __init__(self): # self.isMatched = None # self.isAnnotated = None # self.siIds = list() # self.siiIds = list() # self.featureIds = list() # self.peptide = None # self.sequence = None # self.score = None # self.matchMatrix = dict() # self.intensityMatrix = dict() # # #class FeatureGroupContainer(object): # """ItemContainer for peptide feature groups :class`FeatureGroupItem`. # # :ivar container: Storage list of :class:`FeatureGroupItem` # :ivar index: Use :attr:`FeatureItem.containerId` to which :class:`FeatureGroupItem` the feature was grouped # :ivar labelDescriptor: :class:`maspy.sil.LabelDescriptor` describes the label setup of an experiment # :ivar specfiles: List of keywords (filenames) representing files # :ivar specfilePositions: {specfile:arrayPosition, ...} # arrayPosition respresents the array position of a specfile in :attr:`FeatureGroupItem.matchMatrix` # """ # def __init__(self, specfiles, labelDescriptor=None): # self.container = dict() # self.labelDescriptor = maspy.sil.LabelDescriptor() if labelDescriptor is None else labelDescriptor # self._index = 0 # # self.info = dict() # for position, specfile in enumerate(specfiles): # self.info[specfile] = {'matrixPosition': position} # # def getItems(self, specfiles=None, sort=False, reverse=False, selector=lambda fgi: True): # """Generator that yields filtered and/or sorted :class:`Si` objects from :instance:`self.sic` # # :param specfiles: filenames of msrun files - if specified return only items from those files # :type specfiles: str or [str, str, ...] # :param sort: if "sort" is specified the returned list of items is sorted according to the :class:`Si` # attribute specified by "sort", if the attribute is not present the item is skipped. # :param reverse: boolean to reverse sort order # :param selector: a function which is called with each :class:`Si` item and returns # True (include item) or False (discard item). If not specified all items are returned # """ # specfiles = [_ for _ in viewkeys(self.info)] if specfiles is None else aux.toList(specfiles) # return _getItems(self.container, specfiles, sort, reverse, selector) # # def getArrays(self, report='lfq', attr=None, specfiles=None, sort=False, reverse=False, selector=lambda si: True, defaultValue=None): # """Return a condensed array of data selected from :class:`Si` objects of :instance:`self.sic` # for fast and convenient data processing. # # :param attr: list of :class:`Si` item attributes that should be added to the returned array. # If an attribute is not present the "defaultValue" is added instead. The attributes "id" and "specfile" # are always included, in combination they serve as a unique id. # :param specfiles: filenames of msrun files - if specified return only items from those files # :type specfiles: str or [str, str, ...] # :param sort: if "sort" is specified the returned list of items is sorted according to the :class:`Si` # attribute specified by "sort", if the attribute is not present the item is skipped. # :param reverse: boolean to reverse sort order # :param selector: a function which is called with each :class:`Si` item and returns # True (include item) or False (discard item). If not specified all items are returned # # return {'attribute1': numpy.array(), 'attribute1': numpy.array(), ...} # """ # attr = attr if attr is not None else [] # attr = set(['id', 'specfile'] + aux.toList(attr)) # specfiles = [_ for _ in viewkeys(self.info)] if specfiles is None else aux.toList(specfiles) # # arrays = arrays = dict([(key, []) for key in attr]) # reportAttributes = list() # if report == 'lfq': # arrays['charge'] = list() # arrays['labelState'] = list() # for specfile in self.specfiles: # arrays[specfile] = list() # reportAttributes.append(specfile) # elif report == 'sil': # arrays['charge'] = list() # arrays['specfile'] = list() # for labelState in list(viewkeys(self.labelDescriptor.labels)) + [-1]: # labelAttributeName = ' '.join(('label:', str(labelState))) # arrays[labelAttributeName] = list() # reportAttributes.append(labelAttributeName) # # if report == 'sil': # for item in _getItems(self.container, specfiles, sort, reverse, selector): # for charge in viewkeys(item.intensityMatrix): # for specfile in specfiles: # specfilePosition = self.info[specfile]['matrixPosition'] # for key in attributes: # arrays[key].append(getattr(item, key, None)) # arrays['charge'].append(charge) # arrays['specfile'].append(specfile) # for labelState in list(viewkeys(self.labelDescriptor.labels)) + [-1]: # labelAttributeName = ' '.join(('label:', str(labelState))) # arrays[labelAttributeName].append(item.intensityMatrix[charge][specfilePosition, labelState]) # elif report == 'lfq': # for item in _getItems(self.container, specfiles, sort, reverse, selector): # for charge in viewkeys(item.intensityMatrix): # for labelState in list(viewkeys(self.labelDescriptor.labels)) + [-1]: # for key in attributes: # arrays[key].append(getattr(item, key, None)) # arrays['charge'].append(charge) # arrays['labelState'].append(labelState) # for specfile in specfiles: # specfilePosition = self.info[specfile]['matrixPosition'] # arrays[specfile].append(item.intensityMatrix[charge][specfilePosition, labelState]) # else: # raise Exception('report must be either "lfq" or "sil", not '+report)## # # for key in [_ for _ in viewkeys(arrays)]: # if key in reportAttributes: # arrays[key] = numpy.array(arrays[key], dtype=numpy.float64) # else: # arrays[key] = numpy.array(arrays[key]) # return arrays