# Licensed under an MIT open source license - see LICENSE
from __future__ import print_function, absolute_import, division
import numpy as np
from scipy.stats import ks_2samp, lognorm # , anderson_ksamp
from statsmodels.distributions.empirical_distribution import ECDF
from statsmodels.base.model import GenericLikelihoodModel
from warnings import warn
from ..stats_utils import hellinger, common_histogram_bins, data_normalization
from ..base_statistic import BaseStatisticMixIn
from ...io import common_types, twod_types, threed_types, input_data
class PDF(BaseStatisticMixIn):
    '''
    Create the PDF of a given array.

    Finite values above ``min_val`` are kept; optional weights are applied
    multiplicatively and an optional normalization is applied before the
    histogram is built.

    Parameters
    ----------
    img : %(dtypes)s
        A 1-3D array.
    min_val : float, optional
        Minimum value to keep in the given image.
    bins : list or numpy.ndarray or int, optional
        Bins to compute the PDF from.
    weights : %(dtypes)s, optional
        Weights to apply to the image. Must have the same shape as the image.
    normalization_type : {"standardize", "center", "normalize", "normalize_by_mean"}, optional
        See `~turbustat.statistics.stat_utils.data_normalization`.

    Examples
    --------
    >>> from turbustat.statistics import PDF
    >>> from astropy.io import fits
    >>> moment0 = fits.open("Design4_21_0_0_flatrho_0021_13co.moment0.fits")[0]  # doctest: +SKIP
    >>> pdf_mom0 = PDF(moment0).run(verbose=True)  # doctest: +SKIP
    '''

    # Substitute the accepted input types into the docstring placeholders.
    __doc__ %= {"dtypes": " or ".join(common_types + twod_types +
                                      threed_types)}
def __init__(self, img, min_val=-np.inf, bins=None, weights=None,
normalization_type=None):
super(PDF, self).__init__()
self.need_header_flag = False
self.header = None
output_data = input_data(img, no_header=True)
self.img = output_data
# We want to remove NaNs and value below the threshold.
keep_values = np.logical_and(np.isfinite(output_data),
output_data > min_val)
self.data = output_data[keep_values]
# Do the same for the weights, then apply weights to the data.
if weights is not None:
output_weights = input_data(weights, no_header=True)
self.weights = output_weights[keep_values]
isfinite = np.isfinite(self.weights)
self.data = self.data[isfinite] * self.weights[isfinite]
if normalization_type is not None:
self._normalization_type = normalization_type
self.data = data_normalization(self.data,
norm_type=normalization_type)
else:
self._normalization_type = "None"
self._bins = bins
self._pdf = None
self._ecdf = None
self._do_fit = False
[docs]
def make_pdf(self, bins=None):
'''
Create the PDF.
Parameters
----------
bins : list or numpy.ndarray or int, optional
Bins to compute the PDF from. Overrides initial bin input.
'''
if bins is not None:
self._bins = bins
# If the number of bins is not given, use sqrt of data length.
if self.bins is None:
self._bins = np.sqrt(self.data.shape[0])
self._bins = int(np.round(self.bins))
# norm_weights = np.ones_like(self.data) / self.data.shape[0]
self._pdf, bin_edges = np.histogram(self.data, bins=self.bins,
density=True)
# weights=norm_weights)
self._bins = (bin_edges[:-1] + bin_edges[1:]) / 2
    @property
    def normalization_type(self):
        '''
        Normalization applied to the data (the string "None" when no
        normalization was requested).
        '''
        return self._normalization_type
    @property
    def pdf(self):
        '''
        PDF values in `~PDF.bins`. `None` until `~PDF.make_pdf` is run.
        '''
        return self._pdf
    @property
    def bins(self):
        '''
        Bin centers (bin edges or a bin count before `~PDF.make_pdf` runs).
        '''
        return self._bins
[docs]
def make_ecdf(self):
'''
Create the ECDF.
'''
if self.pdf is None:
self.make_pdf()
self._ecdf_function = ECDF(self.data)
self._ecdf = self._ecdf_function(self.bins)
    @property
    def ecdf(self):
        '''
        ECDF values in `~PDF.bins`. `None` until `~PDF.make_ecdf` is run.
        '''
        return self._ecdf
[docs]
def find_percentile(self, values):
'''
Return the percentiles of given values from the
data distribution.
Parameters
----------
values : float or np.ndarray
Value or array of values.
'''
if self.ecdf is None:
self.make_ecdf()
return self._ecdf_function(values) * 100.
[docs]
def find_at_percentile(self, percentiles):
'''
Return the values at the given percentiles.
Parameters
----------
percentiles : float or np.ndarray
Percentile or array of percentiles. Must be between 0 and 100.
'''
if np.any(np.logical_or(percentiles > 100, percentiles < 0.)):
raise ValueError("Percentiles must be between 0 and 100.")
return np.percentile(self.data, percentiles)
[docs]
def fit_pdf(self, model=lognorm, verbose=False,
fit_type='mle', floc=True, loc=0.0, fscale=False, scale=1.0,
**kwargs):
'''
Fit a model to the PDF. Use statsmodel's generalized likelihood
setup to get uncertainty estimates and such.
Parameters
----------
model : scipy.stats distribution, optional
Pass any scipy distribution. NOTE: All fits assume `loc` can be
fixed to 0. This is reasonable for all realistic PDF forms in the
ISM.
verbose : bool, optional
Enable printing of the fit results.
fit_type : {'mle', 'mcmc'}, optional
Type of fitting to use. By default Maximum Likelihood Estimation
('mle') is used. An MCMC approach ('mcmc') may also be used. This
requires the optional `emcee` to be installed. kwargs can be
passed to adjust various properties of the MCMC chain.
floc : bool, optional
Fix the `loc` parameter when fitting.
loc : float, optional
Value to set `loc` to when fixed.
fscale : bool, optional
Fix the `scale` parameter when fitting.
scale : float, optional
Value to set `scale` to when fixed.
kwargs : Passed to `~emcee.EnsembleSampler`.
'''
if fit_type not in ['mle', 'mcmc']:
raise ValueError("fit_type must be 'mle' or 'mcmc'.")
self._fit_fixes = {"loc": [floc, loc], "scale": [fscale, scale]}
self._do_fit = True
class Likelihood(GenericLikelihoodModel):
# Get the number of parameters from shapes.
# Add one for scales, since we're assuming loc is frozen.
# Keeping loc=0 is appropriate for log-normal models.
nparams = 1 if model.shapes is None else \
len(model.shapes.split(",")) + 1
_loc = loc
_scale = scale
def loglike(self, params):
if np.isnan(params).any():
return - np.inf
if not floc and not fscale:
loc = params[-2]
scale = params[-1]
cut = -2
elif not floc:
loc = params[-1]
scale = self._scale
cut = -1
elif not fscale:
scale = params[-1]
loc = self._loc
cut = -1
loglikes = \
model.logpdf(self.endog, *params[:cut],
scale=scale,
loc=loc)
if not np.isfinite(loglikes).all():
return -np.inf
else:
return loglikes.sum()
def emcee_fit(model, init_params, burnin=200, steps=2000, thin=10):
try:
import emcee
except ImportError:
raise ImportError("emcee must be installed for MCMC fitting.")
ndim = len(init_params)
nwalkers = ndim * 10
p0 = np.zeros((nwalkers, ndim))
for i, val in enumerate(init_params):
p0[:, i] = np.random.randn(nwalkers) * 0.1 + val
sampler = emcee.EnsembleSampler(nwalkers,
ndim,
model.loglike,
args=[])
pos, prob, state = sampler.run_mcmc(p0, burnin)
sampler.reset()
pos, prob, state = sampler.run_mcmc(pos, steps, thin=thin)
return sampler
# Do an initial fit with the scipy model
if floc and fscale:
init_params = model.fit(self.data)
elif floc:
init_params = model.fit(self.data, floc=loc)
# Remove loc from the params
init_params = np.append(init_params[:-2], init_params[-1])
elif fscale:
init_params = model.fit(self.data, fscale=scale)
# Remove scale from the params
init_params = np.append(init_params[:-2], init_params[-2])
else:
init_params = model.fit(self.data)
init_params = np.array(init_params)
self._model = Likelihood(self.data)
self._scipy_model = model
if fit_type == 'mle':
fitted_model = \
self._model.fit(start_params=init_params, method='nm')
self._mle_fit = fitted_model
fitted_model.df_model = len(init_params)
fitted_model.df_resid = len(self.data) - len(init_params)
self._model_params = fitted_model.params.copy()
try:
self._model_stderrs = fitted_model.bse.copy()
cov_calc_failed = False
except ValueError:
warn("Variance calculation failed.")
self._model_stderrs = np.ones_like(self.model_params) * np.NaN
cov_calc_failed = True
elif fit_type == 'mcmc':
chain = emcee_fit(self._model,
init_params.copy(),
**kwargs)
self._model_params = np.mean(chain.flatchain, axis=0)
self._model_stderrs = np.percentile(chain.flatchain, [15, 85],
axis=0)
self._mcmc_chain = chain
if verbose:
if fit_type == 'mle':
if cov_calc_failed:
print("Fitted parameters: {}".format(self.model_params))
print("Covariance calculation failed.")
else:
print(fitted_model.summary())
else:
print("Ran chain for {0} iterations".format(chain.iterations))
print("Used {} walkers".format(chain.acceptance_fraction.size))
print("Mean acceptance fraction of {}"
.format(np.mean(chain.acceptance_fraction)))
print("Parameter values: {}".format(self.model_params))
print("15th to 85th percentile ranges: {}"
.format(self.model_stderrs[1] - self.model_stderrs[0]))
@property
def model_params(self):
'''
Parameters of the fitted model.
'''
if hasattr(self, "_model_params"):
return self._model_params
raise Exception("No model has been fit. Run `fit_pdf` first.")
@property
def model_stderrs(self):
'''
Standard errors of the fitted model. If using an MCMC, the 15th and
85th percentiles are returned.
'''
if hasattr(self, "_model_stderrs"):
return self._model_stderrs
raise Exception("No model has been fit. Run `fit_pdf` first.")
[docs]
def corner_plot(self, **kwargs):
'''
Create a corner plot from the MCMC. Requires the 'corner' package.
Parameters
----------
kwargs : Passed to `~corner.corner`.
'''
if not hasattr(self, "_mcmc_chain"):
raise Exception("Must run MCMC fitting first.")
try:
import corner
except ImportError:
raise ImportError("The optional package 'corner' is not "
"installed.")
corner.corner(self._mcmc_chain.flatchain, **kwargs)
[docs]
def trace_plot(self, **kwargs):
'''
Create a trace plot from the MCMC.
Parameters
----------
kwargs : Passed to `~matplotlib.pyplot.plot`.
'''
if not hasattr(self, "_mcmc_chain"):
raise Exception("Must run MCMC fitting first.")
npars = self._mcmc_chain.flatchain.shape[1]
import matplotlib.pyplot as plt
fig, axes = plt.subplots(npars, 1, sharex=True)
for i, ax in enumerate(axes.ravel()):
ax.plot(self._mcmc_chain.flatchain[:, i], **kwargs)
ax.set_ylabel("par{}".format(i + 1))
axes.ravel()[-1].set_xlabel("Iterations")
plt.tight_layout()
[docs]
def plot_distrib(self, save_name=None, color='r', fit_color='k',
show_ecdf=True):
'''
Plot the PDF distribution and (if fitted) the best fit model.
Optionally show the ECDF and fit ECDF, too.
Parameters
----------
save_name : str,optional
Save the figure when a file name is given.
color : {str, RGB tuple}, optional
Color to show the Genus curves in.
fit_color : {str, RGB tuple}, optional
Color of the fitted line. Defaults to `color` when no input is
given.
show_ecdf : bool, optional
Plot the ECDF when enabled.
'''
import matplotlib.pyplot as plt
if self.normalization_type == "standardize":
xlabel = r"z-score"
elif self.normalization_type == "center":
xlabel = r"$I - \bar{I}$"
elif self.normalization_type == "normalize_by_mean":
xlabel = r"$I/\bar{I}$"
else:
xlabel = r"Intensity"
if fit_color is None:
fit_color = color
# PDF
if show_ecdf:
plt.subplot(121)
else:
plt.subplot(111)
plt.semilogy(self.bins, self.pdf, '-', color=color, label='Data')
if self._do_fit:
# Plot the fitted model.
vals = np.linspace(self.bins[0], self.bins[-1], 1000)
# Check which of the parameters were kept fixed
if self._fit_fixes['loc'][0] and self._fit_fixes['scale'][0]:
loc = self._fit_fixes['loc'][1]
scale = self._fit_fixes['scale'][1]
params = self.model_params
elif self._fit_fixes['loc'][0]:
loc = self._fit_fixes['loc'][1]
scale = self.model_params[-1]
params = self.model_params[:-1]
elif self._fit_fixes['scale'][0]:
loc = self.model_params[-1]
scale = self._fit_fixes['scale'][1]
params = self.model_params[:-1]
else:
loc = self.model_params[-2]
scale = self.model_params[-1]
params = self.model_params[:-2]
plt.semilogy(vals,
self._scipy_model.pdf(vals, *params,
scale=scale,
loc=loc),
'--', color=fit_color, label='Fit')
plt.legend(loc='best')
plt.grid(True)
plt.xlabel(xlabel)
plt.ylabel("PDF")
# ECDF
if show_ecdf:
ax2 = plt.subplot(122)
ax2.yaxis.tick_right()
ax2.yaxis.set_label_position("right")
if self.normalization_type != "None":
ax2.plot(self.bins, self.ecdf, '-', color=color)
if self._do_fit:
ax2.plot(vals,
self._scipy_model.cdf(vals, *params,
scale=scale,
loc=loc),
'--', color=fit_color)
else:
ax2.semilogx(self.bins, self.ecdf, '-', color=color)
if self._do_fit:
ax2.semilogx(vals,
self._scipy_model.cdf(vals, *params,
scale=scale,
loc=0),
'--', color=fit_color)
plt.grid(True)
plt.xlabel(xlabel)
plt.ylabel("ECDF")
plt.tight_layout()
if save_name is not None:
plt.savefig(save_name)
plt.close()
else:
plt.show()
[docs]
def run(self, verbose=False, save_name=None, bins=None, do_fit=True,
model=lognorm, color=None, **kwargs):
'''
Compute the PDF and ECDF. Enabling verbose provides
a summary plot.
Parameters
----------
verbose : bool, optional
Enables plotting of the results.
save_name : str,optional
Save the figure when a file name is given.
bins : list or numpy.ndarray or int, optional
Bins to compute the PDF from. Overrides initial bin input.
do_fit : bool, optional
Enables (by default) fitting a given model.
model : scipy.stats distribution, optional
Pass any scipy distribution. See `~PDF.fit_pdf`.
color : {str, RGB tuple}, optional
Color to show the Genus curves in when `verbose=True`.
kwargs : Passed to `~PDF.fit_pdf`.
'''
self.make_pdf(bins=bins)
self.make_ecdf()
if do_fit:
self.fit_pdf(model=model, verbose=verbose, **kwargs)
if verbose:
self.plot_distrib(save_name=save_name, color=color)
return self
class PDF_Distance(object):
    '''
    Calculate the distance between two arrays using their PDFs.

    .. note:: Pre-computed `~PDF` classes cannot be passed to `~PDF_Distance`
        as the data need to be normalized and the PDFs should use the
        same set of histogram bins.

    Parameters
    ----------
    img1 : %(dtypes)s
        Array (1-3D).
    img2 : %(dtypes)s
        Array (1-3D).
    min_val1 : float, optional
        Minimum value to keep in img1
    min_val2 : float, optional
        Minimum value to keep in img2
    do_fit : bool, optional
        Enables fitting a lognormal distribution to each data set.
    normalization_type : {"normalize", "normalize_by_mean"}, optional
        See `~turbustat.statistics.stat_utils.data_normalization`.
    nbins : int, optional
        Manually set the number of bins to use for creating the PDFs.
    weights1 : %(dtypes)s, optional
        Weights to be used with img1
    weights2 : %(dtypes)s, optional
        Weights to be used with img2
    bin_min : float, optional
        Minimum value to use for the histogram bins *after* normalization is
        applied.
    bin_max : float, optional
        Maximum value to use for the histogram bins *after* normalization is
        applied.
    '''

    # Substitute the accepted input types into the docstring placeholders.
    __doc__ %= {"dtypes": " or ".join(common_types + twod_types +
                                      threed_types)}
def __init__(self, img1, img2, min_val1=-np.inf, min_val2=-np.inf,
do_fit=True, normalization_type=None,
nbins=None, weights1=None, weights2=None,
bin_min=None, bin_max=None):
super(PDF_Distance, self).__init__()
if do_fit:
if normalization_type in ["standardize", "center"]:
raise Exception("Cannot perform lognormal fit when using"
" 'standardize' or 'center'.")
self.normalization_type = normalization_type
self.PDF1 = PDF(img1, min_val=min_val1,
normalization_type=normalization_type,
weights=weights1)
self.PDF2 = PDF(img2, min_val=min_val2,
normalization_type=normalization_type,
weights=weights2)
self.bins, self.bin_centers = \
common_histogram_bins(self.PDF1.data, self.PDF2.data,
return_centered=True, nbins=nbins,
min_val=bin_min, max_val=bin_max)
# Feed the common set of bins to be used in the PDFs
self._do_fit = do_fit
self.PDF1.run(verbose=False, bins=self.bins, do_fit=do_fit)
self.PDF2.run(verbose=False, bins=self.bins, do_fit=do_fit)
[docs]
def compute_hellinger_distance(self):
'''
Computes the Hellinger Distance between the two PDFs.
'''
# We're using the same bins, so normalize each to unity to keep the
# distance normalized.
self.hellinger_distance = \
hellinger(self.PDF1.pdf / self.PDF1.pdf.sum(),
self.PDF2.pdf / self.PDF2.pdf.sum())
[docs]
def compute_ks_distance(self):
'''
Compute the distance using the KS Test.
'''
D, p = ks_2samp(self.PDF1.data, self.PDF2.data)
self.ks_distance = D
self.ks_pval = p
    def compute_ad_distance(self):
        '''
        Compute the distance using the Anderson-Darling Test.

        .. note:: Currently disabled — calling this method always raises
            `NotImplementedError`.
        '''
        raise NotImplementedError(
            "Use of the Anderson-Darling test has been disabled"
            " due to occurence of overflow errors.")
        # Former implementation, kept for reference:
        # D, _, p = anderson_ksamp([self.PDF1.data, self.PDF2.data])
        # self.ad_distance = D
        # self.ad_pval = p
[docs]
def compute_lognormal_distance(self):
'''
Compute the combined t-statistic for the difference in the widths of
a lognormal distribution.
'''
try:
self.PDF1.model_params
self.PDF2.model_params
except AttributeError:
raise Exception("Fitting has not been performed. 'do_fit' must "
"first be enabled.")
diff = np.abs(self.PDF1.model_params[0] - self.PDF2.model_params[0])
denom = np.sqrt(self.PDF1.model_stderrs[0]**2 +
self.PDF2.model_stderrs[0]**2)
self.lognormal_distance = diff / denom
[docs]
def distance_metric(self, statistic='all', verbose=False,
plot_kwargs1={'color': 'b', 'marker': 'D',
'label': '1'},
plot_kwargs2={'color': 'g', 'marker': 'o',
'label': '2'},
save_name=None):
'''
Calculate the distance.
*NOTE:* The data are standardized before comparing to ensure the
distance is calculated on the same scales.
Parameters
----------
statistic : 'all', 'hellinger', 'ks', 'lognormal'
Which measure of distance to use.
labels : tuple, optional
Sets the labels in the output plot.
verbose : bool, optional
Enables plotting.
plot_kwargs1 : dict, optional
Pass kwargs to `~matplotlib.pyplot.plot` for
`dataset1`.
plot_kwargs2 : dict, optional
Pass kwargs to `~matplotlib.pyplot.plot` for
`dataset2`.
save_name : str,optional
Save the figure when a file name is given.
'''
if statistic is 'all':
self.compute_hellinger_distance()
self.compute_ks_distance()
# self.compute_ad_distance()
if self._do_fit:
self.compute_lognormal_distance()
elif statistic is 'hellinger':
self.compute_hellinger_distance()
elif statistic is 'ks':
self.compute_ks_distance()
elif statistic is 'lognormal':
if not self._do_fit:
raise Exception("Fitting must be enabled to compute the"
" lognormal distance.")
self.compute_lognormal_distance()
# elif statistic is 'ad':
# self.compute_ad_distance()
else:
raise TypeError("statistic must be 'all',"
"'hellinger', 'ks', or 'lognormal'.")
# "'hellinger', 'ks' or 'ad'.")
if verbose:
import matplotlib.pyplot as plt
defaults1 = {'color': 'b', 'marker': 'D', 'label': '1'}
defaults2 = {'color': 'g', 'marker': 'o', 'label': '2'}
for key in defaults1:
if key not in plot_kwargs1:
plot_kwargs1[key] = defaults1[key]
for key in defaults2:
if key not in plot_kwargs2:
plot_kwargs2[key] = defaults2[key]
if self.normalization_type == "standardize":
xlabel = r"z-score"
elif self.normalization_type == "center":
xlabel = r"$I - \bar{I}$"
elif self.normalization_type == "normalize_by_mean":
xlabel = r"$I/\bar{I}$"
else:
xlabel = r"Intensity"
# Print fit summaries if using fitting
if self._do_fit:
try:
print(self.PDF1._mle_fit.summary())
except ValueError:
warn("Covariance calculation failed. Check the fit quality"
" for data set 1!")
try:
print(self.PDF2._mle_fit.summary())
except ValueError:
warn("Covariance calculation failed. Check the fit quality"
" for data set 2!")
# PDF
plt.subplot(121)
plt.semilogy(self.bin_centers, self.PDF1.pdf,
color=plot_kwargs1['color'], linestyle='none',
marker=plot_kwargs1['marker'],
label=plot_kwargs1['label'])
plt.semilogy(self.bin_centers, self.PDF2.pdf,
color=plot_kwargs2['color'], linestyle='none',
marker=plot_kwargs2['marker'],
label=plot_kwargs2['label'])
if self._do_fit:
# Plot the fitted model.
vals = np.linspace(self.bin_centers[0], self.bin_centers[-1],
1000)
fit_params1 = self.PDF1.model_params
plt.semilogy(vals,
lognorm.pdf(vals, *fit_params1[:-1],
scale=fit_params1[-1],
loc=0),
color=plot_kwargs1['color'], linestyle='-')
fit_params2 = self.PDF2.model_params
plt.semilogy(vals,
lognorm.pdf(vals, *fit_params2[:-1],
scale=fit_params2[-1],
loc=0),
color=plot_kwargs2['color'], linestyle='-')
plt.grid(True)
plt.xlabel(xlabel)
plt.ylabel("PDF")
plt.legend(frameon=True)
# ECDF
ax2 = plt.subplot(122)
ax2.yaxis.tick_right()
ax2.yaxis.set_label_position("right")
if self.normalization_type is not None:
ax2.plot(self.bin_centers, self.PDF1.ecdf,
color=plot_kwargs1['color'], linestyle='-',
marker=plot_kwargs1['marker'],
label=plot_kwargs1['label'])
ax2.plot(self.bin_centers, self.PDF2.ecdf,
color=plot_kwargs2['color'], linestyle='-',
marker=plot_kwargs2['marker'],
label=plot_kwargs2['label'])
if self._do_fit:
ax2.plot(vals,
lognorm.cdf(vals,
*fit_params1[:-1],
scale=fit_params1[-1],
loc=0),
color=plot_kwargs1['color'], linestyle='-',)
ax2.plot(vals,
lognorm.cdf(vals,
*fit_params2[:-1],
scale=fit_params2[-1],
loc=0),
color=plot_kwargs2['color'], linestyle='-',)
else:
ax2.semilogx(self.bin_centers, self.PDF1.ecdf,
color=plot_kwargs1['color'], linestyle='-',
marker=plot_kwargs1['marker'],
label=plot_kwargs1['label'])
ax2.semilogx(self.bin_centers, self.PDF2.ecdf,
color=plot_kwargs2['color'], linestyle='-',
marker=plot_kwargs2['marker'],
label=plot_kwargs2['label'])
if self._do_fit:
ax2.semilogx(vals,
lognorm.cdf(vals, *fit_params1[:-1],
scale=fit_params1[-1],
loc=0),
color=plot_kwargs1['color'], linestyle='-',)
ax2.semilogx(vals,
lognorm.cdf(vals, *fit_params2[:-1],
scale=fit_params2[-1],
loc=0),
color=plot_kwargs2['color'], linestyle='-',)
plt.grid(True)
plt.xlabel(xlabel)
plt.ylabel("ECDF")
plt.tight_layout()
if save_name is not None:
plt.savefig(save_name)
plt.close()
else:
plt.show()
return self