from __future__ import print_function
import numpy as np
from six import iteritems
from six.moves import xrange
[docs]def gradient_purePython(mzs, intensities, min_intensity, peak_width_bins):
"""This function takes a vector of intensities and finds local maxima based on turning points in the first
differential.
It is a trivial algorithm that assumes sufficient signal processing has been performed that the spectra are locally smooth.
Input
mzs: list of mz values for profile spectrum
intensities: list of intensity values. must be same length as mzs
Output:
mzs_c: list of apex mzs
intensites_c: list of apex intensities
"""
# Input Checks
if not len(mzs) == len(intensities):
raise ValueError("Length of mzs must match length of intensities")
# Calculate first and second differential
dmz = [intensities[ii] - intensities[ii - 1]
for ii in range(1, len(intensities))]
dmz2 = [dmz[ii] - dmz[ii - 1] for ii in range(1, len(dmz))]
# Find crossing points
cPoint = [dmz[ii] * dmz[ii - 1] <= 0 for ii in range(1, len(dmz))]
mPoint = [d2 < 0 for d2 in dmz2]
# Get indicies of crossing points
# Could check left/right of crossing point
indices_list_l = [ii for ii in range(
0, len(cPoint)) if all((cPoint[ii], mPoint[ii]))]
indices_list_r = [il + 1 for il in indices_list_l]
indices_list = [il if intensities[il] > intensities[ir]
else ir for il, ir in zip(indices_list_l, indices_list_r)]
indices_list = list(set(indices_list)) # unique values
mzs_list = [mzs[ix] for ix in indices_list]
intensities_list = [intensities[ix] for ix in indices_list]
# Only keep peak above min intensity
mzs_list = [mzs_list[ii] for ii in range(len(mzs_list)) if intensities_list[
ii] > min_intensity]
intensities_list = [intensities_list[ii] for ii in range(
len(intensities_list)) if intensities_list[ii] > min_intensity]
# Quick and dirty approximation of centroids
mzs_c, intensities_c = estimate_centroid_simple_weighting(
mzs, intensities, indices_list, peak_width_bins)
return mzs_c, intensities_c
[docs]def estimate_centroid_simple_weighting(mzs, intensities, indices_list,
peak_width_bins):
"""Simple averaging to estimate the peak centroid
Input:
mzs: profile spectrum mzs
intensities: profile spectrum intensities
indices_list: mz bin of local maxima
Output:
mzs_c: centroids mzs
intensities_c: centroid intensity (==peak maxima)
"""
if not peak_width_bins % 2 == 1:
raise ValueError("peak width should be an odd number of bins")
weighted_bins = int(peak_width_bins) / 2 # note integer division
mzs_c = []
intensities_c = []
for ii in range(len(indices_list)):
s = w = 0.0
max_intensity_idx = 0
max_intensity = -1
for k in xrange(-weighted_bins, weighted_bins + 1):
idx = indices_list[ii] + k
mz = mzs[idx]
intensity = intensities[idx]
w += intensity
s += mz * intensity
if intensity > max_intensity:
max_intensity = intensity
mzs_c.append(s / w)
intensities_c.append(max_intensity)
return mzs_c, intensities_c
[docs]def gradient(mzs, intensities, **opt_args):
function_args = {'max_output': -1, 'weighted_bins': 1,
'min_intensity': 1e-5, 'grad_type': 'gradient'}
for key, val in iteritems(opt_args):
if key in function_args.keys():
function_args[key] = val
else:
print('possible arguments:')
for i in function_args.keys():
print(i)
raise NameError('gradient does not take argument: %s' % key)
# TODO: temporary workaround to disable the parameter until it is fixed.
mzs = np.asarray(mzs)
intensities = np.asarray(intensities)
mzMaxNum = function_args['max_output']
weighted_bins = function_args['weighted_bins']
min_intensity = function_args['min_intensity']
gradient_type = function_args['grad_type']
assert mzMaxNum < len(mzs)
assert len(mzs) == len(intensities)
assert weighted_bins < len(mzs) / 2.
# calc first§ond differential
if gradient_type == 'gradient':
MZgrad = np.gradient(intensities)
MZgrad2 = np.gradient(MZgrad)[0:-1]
elif gradient_type == 'diff':
MZgrad = np.concatenate((np.diff(intensities), [1]))
MZgrad2 = np.diff(MZgrad)
else:
raise ValueError('gradient type {} not known'.format(gradient_type))
# detect crossing points
cPoint = MZgrad[0:-1] * MZgrad[1:] <= 0
mPoint = MZgrad2 < 0
indices = cPoint & mPoint
# bool->list of indices
# Could check left/right of crossing point
indices_list_l = np.where(cPoint & mPoint)[0]
indices_list_r = indices_list_l + 1
indices_list = np.where(
intensities[indices_list_l] > intensities[indices_list_r],
indices_list_l, indices_list_r)
indices_list = np.unique(indices_list)
# Remove any 'peaks' that aren't real
indices_list = indices_list[intensities[indices_list] > min_intensity]
# Select the peaks
intensities_list = intensities[indices_list]
mzs_list = mzs[indices_list]
# Tidy up if required
if mzMaxNum > 0:
if len(mzs_list) > mzMaxNum:
sort_idx = np.argsort(intensities_list)
intensities_list = intensities_list[sort_idx[-mzMaxNum:]]
mzs_list = mzs_list[sort_idx[-mzMaxNum:]]
indices_list = indices_list[sort_idx[-mzMaxNum:]]
# elif len(mzs) < mzMaxNum:
# # FIXME: for what purpose are we appending zeros here?
# lengthDiff = mzMaxNum - len(indices_list)
# pad = np.zeros((lengthDiff, 1))
# mzs_list = np.concatenate((mzs_list, pad))
# intensities_list = np.concatenate((intensities_list, pad))
# indices_list = np.concatenate((indices_list, pad))
if weighted_bins > 0:
# check no peaks within bin width of spectrum edge
good_idx = (indices_list > weighted_bins) & (
indices_list < (len(mzs) - weighted_bins))
mzs_list = mzs_list[good_idx]
intensities_list = intensities_list[good_idx]
indices_list = indices_list[good_idx]
r = pick_max_(mzs, intensities, mzs_list, intensities_list,
indices_list, weighted_bins)
mzs_list = r[0, :]
intensities_list = r[1, :]
indices_list = r[2, :].astype(int)
return (mzs_list, intensities_list, indices_list)
[docs]def pick_max_(mzs, intensities, mzs_list, intensities_list, indices_list,
weighted_bins):
result = np.zeros((3, len(mzs_list)))
for ii in xrange(len(mzs_list)):
s = w = 0.0
max_intensity_idx = 0
max_intensity = -1
for k in xrange(-weighted_bins, weighted_bins + 1):
idx = indices_list[ii] + k
mz = mzs[idx]
intensity = intensities[idx]
w += intensity
s += mz * intensity
if intensity > max_intensity:
max_intensity = intensity
max_intensity_idx = idx
result[0][ii] = s / w
result[1][ii] = max_intensity
result[2][ii] = max_intensity_idx
return result
try:
from numba import njit
pick_max_ = njit(pick_max_)
except ImportError:
pass