"""
Provides the possibility to write a new mzML file from an MsrunContainer
instance, which is the maspy representation of a specfile.
"""
# Copyright 2015-2017 David M. Hollenstein, Jakob J. Hollenstein
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
######################## Python 2 and 3 compatibility #########################
from __future__ import absolute_import, division, print_function
from __future__ import unicode_literals
from future.utils import viewitems, viewkeys, viewvalues, listitems, listvalues
try:
#python 2.7
from itertools import izip as zip
except ImportError:
#python 3 series
pass
################################################################################
import hashlib
import io
import os
from lxml import etree as ETREE
import numpy
import maspy.auxiliary as aux
import maspy.xml
##########################################################
### mzml import and export methods #######################
##########################################################
def writeMzml(specfile, msrunContainer, outputdir, spectrumIds=None,
              chromatogramIds=None, writeIndex=True):
    """Write one specfile of an MsrunContainer to a new mzML file.

    The xml document is assembled in an in-memory buffer and afterwards
    written to the file ``<specfile>.mzML`` inside ``outputdir``.

    :param specfile: name of the ms-run file; used as key to access the
        ``rmc``, ``smic``, ``saic`` and ``cic`` containers of
        ``msrunContainer`` and as base name of the output file
    :param msrunContainer: container instance that provides the run metadata
        tree (``.rmc``), the spectrum metadata items (``.smic``), the spectrum
        array items (``.saic``) and the chromatogram items (``.cic``)
    :param outputdir: directory to which the mzML file is written
    :param spectrumIds: keys of ``msrunContainer.smic[specfile]`` that should
        be written, in the order they should appear. If None, all keys are
        written, sorted by their integer value; if ``specfile`` is not present
        in ``.smic`` no spectra are written.
    :param chromatogramIds: keys of ``msrunContainer.cic[specfile]`` that
        should be written. If None, all keys are written; if ``specfile`` is
        not present in ``.cic`` no chromatograms are written.
    :param writeIndex: bool, if True the ``mzML`` node is wrapped into an
        ``indexedmzML`` node and the index list, the index list offset and
        the file checksum are appended.

    :raises ValueError: if spectra or chromatograms should be written but the
        ``run`` node of the metadata tree contains no corresponding
        ``spectrumList`` / ``chromatogramList`` node to read the
        ``defaultDataProcessingRef`` from.
    """
    #TODO: maybe change to use aux.openSafeReplace
    outputFile = io.BytesIO()

    #TODO: perform check that specfile is present in msrunContainer and at
    # least the metadatanode.
    metadataTree = msrunContainer.rmc[specfile]

    # Generate the list of spectrum ids that should be written to mzML. An
    # absent specfile entry results in an empty list instead of ``None``,
    # which could not be passed to len().
    if spectrumIds is None:
        spectrumIds = []
        if specfile in msrunContainer.smic:
            keyTuples = sorted(
                (int(key), key)
                for key in viewkeys(msrunContainer.smic[specfile])
            )
            spectrumIds = [key for _, key in keyTuples]

    # Generate the list of chromatogram ids that should be written to mzML
    if chromatogramIds is None:
        chromatogramIds = []
        if specfile in msrunContainer.cic:
            chromatogramIds = list(viewkeys(msrunContainer.cic[specfile]))

    nsmap = {None: 'http://psi.hupo.org/ms/mzml',
             'xsi': 'http://www.w3.org/2001/XMLSchema-instance'
             }
    mzmlAttrib = {
        '{http://www.w3.org/2001/XMLSchema-instance}schemaLocation':
            'http://psi.hupo.org/ms/mzml '
            'http://psidev.info/files/ms/mzML/xsd/mzML1.1.0.xsd',
        'version': '1.1.0', 'id': metadataTree.attrib['id']
    }

    spectrumIndexList = list()
    chromatogramIndexList = list()

    # The writer is opened unbuffered because outputFile.tell() is used to
    # record the byte offsets of the written elements for the index.
    with ETREE.xmlfile(outputFile, encoding='ISO-8859-1',
                       buffered=False) as xmlWriter:
        xmlWriter.write_declaration()

        # The indexedmzML wrapper is optional and therefore has to be
        # entered and left manually instead of by a "with" statement.
        if writeIndex:
            xmlIndexedMzml = xmlWriter.element('indexedmzML', nsmap=nsmap)
            xmlIndexedMzml.__enter__()
            xmlWriter.write('\n')

        with xmlWriter.element('mzML', mzmlAttrib, nsmap=nsmap):
            xmlWriter.write('\n')
            for metadataNode in metadataTree:
                if metadataNode.tag != 'run':
                    xmlWriter.write(maspy.xml.recCopyElement(metadataNode),
                                    pretty_print=True
                                    )
                    continue
                specOffsets, chromOffsets = _writeMzmlRunNode(
                    xmlWriter, outputFile, metadataNode, msrunContainer,
                    specfile, spectrumIds, chromatogramIds
                )
                spectrumIndexList.extend(specOffsets)
                chromatogramIndexList.extend(chromOffsets)

        # Optional: write the indexedMzml nodes and close the indexedMzml node
        if writeIndex:
            xmlWriter.write('\n')
            indexListOffset = outputFile.tell()
            _writeMzmlIndexList(xmlWriter, spectrumIndexList,
                                chromatogramIndexList
                                )
            _writeIndexListOffset(xmlWriter, indexListOffset)
            _writeMzmlChecksum(xmlWriter, outputFile)
            xmlIndexedMzml.__exit__(None, None, None)

    # Write the output mzML file
    filepath = aux.joinpath(outputdir, specfile+'.mzML')
    with open(filepath, 'wb') as openfile:
        openfile.write(outputFile.getvalue())


def _writeMzmlRunNode(xmlWriter, outputFile, runNode, msrunContainer,
                      specfile, spectrumIds, chromatogramIds):
    """Write the ``run`` node together with its spectra and chromatograms.

    :param xmlWriter: incremental xml writer that writes to ``outputFile``
    :param outputFile: buffer written to by ``xmlWriter``; its ``tell()``
        position is recorded right before each spectrum / chromatogram
    :param runNode: the ``run`` element of the metadata tree
    :param msrunContainer: see :func:`writeMzml`
    :param specfile: see :func:`writeMzml`
    :param spectrumIds: list of spectrum keys to write, must not be None
    :param chromatogramIds: list of chromatogram keys to write, must not be
        None

    :returns: tuple ``(spectrumIndexList, chromatogramIndexList)``, both are
        lists of ``(offset, id)`` tuples

    :raises ValueError: if elements should be written but ``runNode`` has no
        matching list node providing the ``defaultDataProcessingRef``
    """
    spectrumIndexList = list()
    chromatogramIndexList = list()
    specDefaultProcRef = None
    chromDefaultProcRef = None

    with xmlWriter.element(runNode.tag, runNode.attrib):
        xmlWriter.write('\n')
        for runChild in runNode:
            if runChild.tag == 'spectrumList':
                specDefaultProcRef = runChild.attrib['defaultDataProcessingRef']
            elif runChild.tag == 'chromatogramList':
                chromDefaultProcRef = runChild.attrib['defaultDataProcessingRef']
            else:
                # The element context returned by xmlWriter.element() only
                # opens and closes the tag; child nodes have to be written
                # through the writer itself.
                xmlWriter.write(maspy.xml.recCopyElement(runChild),
                                pretty_print=True
                                )

        # If any spectra should be written, generate the spectrumList node.
        if len(spectrumIds) > 0:
            if specDefaultProcRef is None:
                raise ValueError('The run metadata of "' + specfile + '" '
                                 'contains no spectrumList node, the '
                                 'defaultDataProcessingRef is unknown.'
                                 )
            specListAttribs = {'count': str(len(spectrumIds)),
                               'defaultDataProcessingRef': specDefaultProcRef
                               }
            with xmlWriter.element('spectrumList', specListAttribs):
                xmlWriter.write('\n')
                for index, key in enumerate(spectrumIds):
                    smi = msrunContainer.smic[specfile][key]
                    sai = msrunContainer.saic[specfile][key]
                    # Store the spectrum element offset here
                    spectrumIndexList.append((outputFile.tell(),
                                              smi.attributes['id']
                                              ))
                    xmlWriter.write(xmlSpectrumFromSmi(index, smi, sai),
                                    pretty_print=True
                                    )
            xmlWriter.write('\n')

        # If any chromatograms should be written, generate the
        # chromatogramList node.
        if len(chromatogramIds) > 0:
            if chromDefaultProcRef is None:
                raise ValueError('The run metadata of "' + specfile + '" '
                                 'contains no chromatogramList node, the '
                                 'defaultDataProcessingRef is unknown.'
                                 )
            chromListAttribs = {'count': str(len(chromatogramIds)),
                                'defaultDataProcessingRef': chromDefaultProcRef
                                }
            with xmlWriter.element('chromatogramList', chromListAttribs):
                xmlWriter.write('\n')
                for index, key in enumerate(chromatogramIds):
                    ci = msrunContainer.cic[specfile][key]
                    # Store the chromatogram element offset here
                    chromatogramIndexList.append((outputFile.tell(), ci.id))
                    xmlWriter.write(xmlChromatogramFromCi(index, ci),
                                    pretty_print=True
                                    )
            xmlWriter.write('\n')
    xmlWriter.write('\n')
    return spectrumIndexList, chromatogramIndexList
def _writeMzmlIndexList(xmlWriter, spectrumIndexList, chromatogramIndexList):
""" #TODO: docstring
:param xmlWriter: #TODO: docstring
:param spectrumIndexList: #TODO: docstring
:param chromatogramIndexList: #TODO: docstring
"""
counts = 0
if spectrumIndexList:
counts += 1
if chromatogramIndexList:
counts += 1
if counts == 0:
return None
#Create indexList node
xmlIndexList = xmlWriter.element('indexList', {'count': str(counts)})
xmlIndexList.__enter__()
xmlWriter.write('\n')
_writeIndexListElement(xmlWriter, 'spectrum', spectrumIndexList)
_writeIndexListElement(xmlWriter, 'chromatogram', chromatogramIndexList)
#Close indexList node
xmlIndexList.__exit__(None, None, None)
xmlWriter.write('\n')
def _writeIndexListElement(xmlWriter, elementName, indexList):
""" #TODO: docstring
:param xmlWriter: #TODO: docstring
:param elementName: #TODO: docstring
:param indexList: #TODO: docstring
"""
if indexList:
xmlIndex = xmlWriter.element('index', {'name': elementName})
xmlIndex.__enter__()
xmlWriter.write('\n')
for offset, indexId in indexList:
offsetElement = ETREE.Element('offset', {'idRef': indexId})
offsetElement.text = str(offset)
xmlWriter.write(offsetElement, pretty_print=True)
xmlIndex.__exit__(None, None, None)
xmlWriter.write('\n')
def _writeMzmlChecksum(xmlWriter, outputFile):
    """Write the ``fileChecksum`` node of an indexed mzML file.

    The SHA-1 digest is calculated from all bytes that are present in
    ``outputFile`` when this function is called, followed by the opening tag
    ``<fileChecksum>``.

    :param xmlWriter: incremental xml writer the node is written to
    :param outputFile: buffer providing the bytes written so far through its
        ``getvalue()`` method
    """
    sha = hashlib.sha1(outputFile.getvalue())
    # Has to be a bytes literal: this module imports unicode_literals, a
    # plain literal is therefore text and rejected by hashlib on Python 3.
    sha.update(b'<fileChecksum>')

    xmlChecksumElement = ETREE.Element('fileChecksum')
    xmlChecksumElement.text = sha.hexdigest()
    xmlWriter.write(xmlChecksumElement, pretty_print=True)
def _writeIndexListOffset(xmlWriter, offset):
    """Write the ``indexListOffset`` node of an indexed mzML file.

    :param xmlWriter: incremental xml writer the node is written to
    :param offset: position that is written, converted with ``str()``, as
        text of the node
    """
    offsetNode = ETREE.Element('indexListOffset')
    offsetNode.text = str(offset)
    xmlWriter.write(offsetNode, pretty_print=True)
# --- generate mzml elements from maspy objects --- #
def xmlGenScanList(scanList, scanListParams):
    """Generate a ``scanList`` xml element.

    :params scanList: sequence of scan entries; each entry is accessed with
        the keys "params" and "scanWindowList"
    :params scanListParams: parameters of the scanList node, passed on to
        ``maspy.xml.xmlAddParams()``

    :returns: the ``scanList`` element with one ``scan`` child per entry. A
        ``scanWindowList`` is only added to a scan that has at least one
        scan window.
    """
    scanListNode = ETREE.Element('scanList', {'count': str(len(scanList))})
    maspy.xml.xmlAddParams(scanListNode, scanListParams)

    for scanEntry in scanList:
        #Note: no attributes supported
        scanNode = ETREE.Element('scan', {})
        maspy.xml.xmlAddParams(scanNode, scanEntry['params'])

        #Generate the scanWindowList entry
        scanWindows = scanEntry['scanWindowList']
        if len(scanWindows) > 0:
            windowListNode = ETREE.Element('scanWindowList',
                                           {'count': str(len(scanWindows))}
                                           )
            scanNode.append(windowListNode)
            for windowParams in scanWindows:
                windowNode = ETREE.Element('scanWindow')
                maspy.xml.xmlAddParams(windowNode, windowParams)
                windowListNode.append(windowNode)

        scanListNode.append(scanNode)
    return scanListNode
def xmlGenPrecursorList(precursorList):
    """Generate a ``precursorList`` xml element.

    :params precursorList: sequence of precursor entries; each entry is
        accessed with the keys "spectrumRef", "isolationWindow",
        "selectedIonList" and "activation"

    :returns: the ``precursorList`` element with one ``precursor`` child per
        entry. The ``spectrumRef`` attribute and the ``isolationWindow`` node
        are skipped if their value is None, the ``selectedIonList`` node is
        skipped if it would be empty, the ``activation`` node is always
        written.
    """
    precursorListNode = ETREE.Element('precursorList',
                                      {'count': str(len(precursorList))}
                                      )
    for entry in precursorList:
        #Note: no attributes for external referencing supported
        attributes = {}
        if entry['spectrumRef'] is not None:
            attributes['spectrumRef'] = entry['spectrumRef']
        precursorNode = ETREE.Element('precursor', attributes)

        #Add isolationWindow element
        if entry['isolationWindow'] is not None:
            isolationWindowNode = ETREE.Element('isolationWindow')
            maspy.xml.xmlAddParams(isolationWindowNode,
                                   entry['isolationWindow']
                                   )
            precursorNode.append(isolationWindowNode)

        #Add selectedIonList element
        selectedIons = entry['selectedIonList']
        if len(selectedIons) > 0:
            ionListNode = ETREE.Element('selectedIonList',
                                        {'count': str(len(selectedIons))}
                                        )
            for ionParams in selectedIons:
                ionNode = ETREE.Element('selectedIon')
                maspy.xml.xmlAddParams(ionNode, ionParams)
                ionListNode.append(ionNode)
            precursorNode.append(ionListNode)

        #Add activation element
        activationNode = ETREE.Element('activation')
        maspy.xml.xmlAddParams(activationNode, entry['activation'])
        precursorNode.append(activationNode)

        precursorListNode.append(precursorNode)
    return precursorListNode
def xmlGenProductList(productList):
    """Placeholder for the generation of a ``productList`` xml element.

    :params productList: currently ignored

    :raises NotImplementedError: always, writing of product lists is not
        supported yet
    """
    message = 'xmlGenProductList() is not yet implemented'
    raise NotImplementedError(message)
def xmlGenBinaryDataArrayList(binaryDataInfo, binaryDataDict,
                              compression='zlib', arrayTypes=None):
    """Generate a ``binaryDataArrayList`` xml element.

    :params binaryDataInfo: dict, ``{arrayType: info}``; every ``info`` is
        accessed with the keys "params" (a sequence of parameter tuples whose
        first item is the accession) and "dataProcessingRef"
    :params binaryDataDict: dict, ``{arrayType: array}``; the arrays have to
        provide the attributes ``dtype`` and ``size``
    :params compression: "zlib" marks the arrays as zlib compressed, any
        other value marks them as not compressed. The value is also passed on
        to ``maspy.xml.encodeBinaryData()``.
    :params arrayTypes: iterable of the array types that should be written,
        in the order they should appear. If None, all keys of
        ``binaryDataInfo`` are written.

    :returns: the ``binaryDataArrayList`` element; its ``count`` attribute is
        the number of ``binaryDataArray`` children that were generated
    """
    #Note: Use arrayTypes parameter to specify the order of the arrays
    if arrayTypes is None:
        arrayTypes = list(viewkeys(binaryDataInfo))
    else:
        arrayTypes = list(arrayTypes)

    # "count" has to reflect the arrays that are actually written, which can
    # be fewer than the entries of binaryDataInfo if arrayTypes was specified.
    xmlBinaryDataArrayList = ETREE.Element('binaryDataArrayList',
                                           {'count': str(len(arrayTypes))}
                                           )

    # The accessions for bit encoding and compression are generated below,
    # stored parameters with these accessions are dropped to avoid
    # contradicting or duplicate entries.
    mandatoryAccessions = ('MS:1000523', 'MS:1000521', 'MS:1000574',
                           'MS:1000576'
                           )

    for arrayType in arrayTypes:
        arrayInfo = binaryDataInfo[arrayType]
        # NOTE(review): the return value is not used; the call is only kept
        # in case the helper validates the params - confirm and remove.
        maspy.xml.findBinaryDataType(arrayInfo['params'])
        binaryData = binaryDataDict[arrayType]
        # Arrays of dtype '<f8' are written as 64 bit, all others as 32 bit
        bitEncoding = '64' if binaryData.dtype.str == '<f8' else '32'
        if binaryData.size > 0:
            binaryData, _ = maspy.xml.encodeBinaryData(binaryData,
                                                       bitEncoding,
                                                       compression
                                                       )
        else:
            binaryData = ''

        # --- define binaryDataArray parameters --- #
        params = list()
        if bitEncoding == '64':
            params.append(('MS:1000523', None, None))
        else:
            params.append(('MS:1000521', None, None))
        #Note: any other value for "compression" than "zlib" results in no
        # compression
        if compression == 'zlib':
            params.append(('MS:1000574', None, None))
        else:
            params.append(('MS:1000576', None, None))
        params.extend(param for param in arrayInfo['params']
                      if param[0] not in mandatoryAccessions
                      )

        #Note: not all attributes supported
        binaryDataArrayAttrib = {'encodedLength': str(len(binaryData))}
        for attr in ['dataProcessingRef']:
            if arrayInfo[attr] is not None:
                binaryDataArrayAttrib[attr] = arrayInfo[attr]

        xmlBinaryDataArray = ETREE.Element('binaryDataArray',
                                           binaryDataArrayAttrib
                                           )
        maspy.xml.xmlAddParams(xmlBinaryDataArray, params)

        xmlBinary = ETREE.Element('binary')
        xmlBinary.text = binaryData
        xmlBinaryDataArray.append(xmlBinary)
        xmlBinaryDataArrayList.append(xmlBinaryDataArray)
    return xmlBinaryDataArrayList
def xmlSpectrumFromSmi(index, smi, sai=None, compression='zlib'):
    """Generate a ``spectrum`` xml element from maspy spectrum items.

    :param index: The zero-based, consecutive index of the spectrum in the
        SpectrumList. (mzML specification)
    :param smi: a SpectrumMetadataItem instance
    :param sai: a SpectrumArrayItem instance, if none is specified no
        binaryDataArrayList is written
    :param compression: passed on to :func:`xmlGenBinaryDataArrayList`

    :returns: the ``spectrum`` element. The scan list, precursor list and
        product list are only added if the respective list of ``smi`` is not
        empty.

    :raises Exception: if the arrays of ``sai`` do not all have the same size
    """
    if sai is None:
        defaultArrayLength = 0
    else:
        arraySizes = [array.size for array in viewvalues(sai.arrays)]
        if len(set(arraySizes)) != 1:
            raise Exception('Unequal size for different array in sai.arrays')
        defaultArrayLength = arraySizes[0]

    spectrumAttrib = {'index': str(index), 'id': smi.attributes['id'],
                      'defaultArrayLength': str(defaultArrayLength)}
    spectrumNode = ETREE.Element('spectrum', **spectrumAttrib)
    maspy.xml.xmlAddParams(spectrumNode, smi.params)

    #Add the optional child lists
    if len(smi.scanList) > 0:
        scanListNode = xmlGenScanList(smi.scanList, smi.scanListParams)
        spectrumNode.append(scanListNode)
    if len(smi.precursorList) > 0:
        precursorListNode = xmlGenPrecursorList(smi.precursorList)
        spectrumNode.append(precursorListNode)
    if len(smi.productList) > 0:
        productListNode = xmlGenProductList(smi.productList)
        spectrumNode.append(productListNode)

    if sai is not None:
        binaryListNode = xmlGenBinaryDataArrayList(sai.arrayInfo, sai.arrays,
                                                   compression=compression
                                                   )
        spectrumNode.append(binaryListNode)
    return spectrumNode
def xmlChromatogramFromCi(index, ci, compression='zlib'):
    """Generate a ``chromatogram`` xml element from a chromatogram item.

    :param index: index of the chromatogram in the chromatogramList, written
        as ``index`` attribute
    :param ci: chromatogram item; the attributes ``id``, ``attrib``,
        ``params``, ``product``, ``precursor``, ``arrays`` and ``arrayInfo``
        are accessed
    :param compression: passed on to :func:`xmlGenBinaryDataArrayList`

    :returns: the ``chromatogram`` element

    :raises Exception: if the arrays of ``ci`` do not all have the same size
    :raises NotImplementedError: if ``ci.product`` or ``ci.precursor`` is not
        None, writing of these entries is not supported yet
    """
    arrayLength = [array.size for array in viewvalues(ci.arrays)]
    if len(set(arrayLength)) != 1:
        raise Exception('Unequal size for different array in ci.arrays')
    arrayLength = arrayLength[0]

    chromatogramAttrib = {'index': str(index), 'id': ci.id,
                          'defaultArrayLength': str(arrayLength)}
    if 'dataProcessingRef' in ci.attrib:
        chromatogramAttrib['dataProcessingRef'] = ci.attrib['dataProcessingRef']

    xmlChromatogram = ETREE.Element('chromatogram', **chromatogramAttrib)
    maspy.xml.xmlAddParams(xmlChromatogram, ci.params)

    #TODO: add appropriate functions for precursor and product
    if ci.product is not None:
        raise NotImplementedError()
    if ci.precursor is not None:
        raise NotImplementedError()

    #Sort the array keys, that 'rt' is always the first, necessary for example
    # for the software "SeeMS" to properly display chromatograms. The
    # remaining keys are sorted to make the output order reproducible.
    arrayTypes = sorted(arrayType for arrayType in ci.arrayInfo
                        if arrayType != 'rt'
                        )
    if 'rt' in ci.arrayInfo:
        arrayTypes.insert(0, 'rt')

    xmlChromatogram.append(xmlGenBinaryDataArrayList(ci.arrayInfo,
                                                     ci.arrays,
                                                     compression=compression,
                                                     arrayTypes=arrayTypes
                                                     )
                           )
    return xmlChromatogram