# turbustat.statistics.pdf.compare_pdf

# Licensed under an MIT open source license - see LICENSE

import numpy as np
from scipy.stats import ks_2samp, lognorm  # , anderson_ksamp
from statsmodels.distributions.empirical_distribution import ECDF
from statsmodels.base.model import GenericLikelihoodModel
from warnings import warn

from ..stats_utils import hellinger, common_histogram_bins, data_normalization
from ..base_statistic import BaseStatisticMixIn
from ...io import common_types, twod_types, threed_types, input_data


class PDF(BaseStatisticMixIn):
    '''
    Create the PDF of a given array.

    Parameters
    ----------
    img : %(dtypes)s
        A 1-3D array.
    min_val : float, optional
        Minimum value to keep in the given image.
    bins : list or numpy.ndarray or int, optional
        Bins to compute the PDF from.
    weights : %(dtypes)s, optional
        Weights to apply to the image. Must have the same shape as the
        image.
    normalization_type : {"standardize", "center", "normalize", "normalize_by_mean"}, optional
        See `~turbustat.statistics.stat_utils.data_normalization`.

    Example
    -------
    >>> from turbustat.statistics import PDF
    >>> from astropy.io import fits
    >>> moment0 = fits.open("Design4_21_0_0_flatrho_0021_13co.moment0.fits")[0]  # doctest: +SKIP
    >>> pdf_mom0 = PDF(moment0).run(verbose=True)  # doctest: +SKIP
    '''

    __doc__ %= {"dtypes": " or ".join(common_types + twod_types +
                                      threed_types)}

    def __init__(self, img, min_val=-np.inf, bins=None, weights=None,
                 normalization_type=None):
        super(PDF, self).__init__()

        self.need_header_flag = False
        self.header = None

        output_data = input_data(img, no_header=True)

        self.img = output_data

        # We want to remove NaNs and values below the threshold.
        keep_values = np.logical_and(np.isfinite(output_data),
                                     output_data > min_val)
        self.data = output_data[keep_values]

        # Do the same for the weights, then apply the weights to the data.
        if weights is not None:
            output_weights = input_data(weights, no_header=True)

            self.weights = output_weights[keep_values]

            isfinite = np.isfinite(self.weights)

            self.data = self.data[isfinite] * self.weights[isfinite]

        if normalization_type is not None:
            self._normalization_type = normalization_type
            self.data = data_normalization(self.data,
                                           norm_type=normalization_type)
        else:
            self._normalization_type = "None"

        self._bins = bins

        self._pdf = None
        self._ecdf = None

    def make_pdf(self, bins=None):
        '''
        Create the PDF.

        Parameters
        ----------
        bins : list or numpy.ndarray or int, optional
            Bins to compute the PDF from. Overrides initial bin input.
        '''

        if bins is not None:
            self._bins = bins

        # If the number of bins is not given, use the square root of the
        # number of data points.
        if self.bins is None:
            self._bins = int(np.round(np.sqrt(self.data.shape[0])))

        self._pdf, bin_edges = np.histogram(self.data, bins=self.bins,
                                            density=True)

        self._bins = (bin_edges[:-1] + bin_edges[1:]) / 2

    @property
    def normalization_type(self):
        return self._normalization_type

    @property
    def pdf(self):
        '''
        PDF values in `~PDF.bins`.
        '''
        return self._pdf

    @property
    def bins(self):
        '''
        Bin centers.
        '''
        return self._bins

    def make_ecdf(self):
        '''
        Create the ECDF.
        '''

        if self.pdf is None:
            self.make_pdf()

        self._ecdf_function = ECDF(self.data)

        self._ecdf = self._ecdf_function(self.bins)

    @property
    def ecdf(self):
        '''
        ECDF values in `~PDF.bins`.
        '''
        return self._ecdf

    def find_percentile(self, values):
        '''
        Return the percentiles of given values from the data distribution.

        Parameters
        ----------
        values : float or np.ndarray
            Value or array of values.
        '''

        if self.ecdf is None:
            self.make_ecdf()

        return self._ecdf_function(values) * 100.

    def find_at_percentile(self, percentiles):
        '''
        Return the values at the given percentiles.

        Parameters
        ----------
        percentiles : float or np.ndarray
            Percentile or array of percentiles. Must be between 0 and 100.
        '''

        if np.any(np.logical_or(percentiles > 100, percentiles < 0.)):
            raise ValueError("Percentiles must be between 0 and 100.")

        return np.percentile(self.data, percentiles)
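
    # A minimal usage sketch for the two percentile helpers above, following
    # the doctest convention from the class docstring. `pdf_mom0` is the
    # hypothetical instance from that example; the values are illustrative.
    # >>> pdf_mom0.find_percentile(10.)  # percentile of intensity 10  # doctest: +SKIP
    # >>> pdf_mom0.find_at_percentile(50.)  # median of the data  # doctest: +SKIP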

    def fit_pdf(self, model=lognorm, verbose=False,
                fit_type='mle', **kwargs):
        '''
        Fit a model to the PDF. Uses statsmodels' generic likelihood setup
        to provide uncertainty estimates and fit diagnostics.

        Parameters
        ----------
        model : scipy.stats distribution, optional
            Pass any scipy distribution. NOTE: All fits assume `loc` can be
            fixed to 0. This is reasonable for all realistic PDF forms in
            the ISM.
        verbose : bool, optional
            Enable printing of the fit results.
        fit_type : {'mle', 'mcmc'}, optional
            Type of fitting to use. By default Maximum Likelihood Estimation
            ('mle') is used. An MCMC approach ('mcmc') may also be used.
            This requires the optional `emcee` package to be installed.
        kwargs : Passed to the MCMC fitting routine; accepts `burnin`,
            `steps`, and `thin`.
        '''

        if fit_type not in ['mle', 'mcmc']:
            raise ValueError("fit_type must be 'mle' or 'mcmc'.")

        class Likelihood(GenericLikelihoodModel):

            # Get the number of parameters from shapes.
            # Add one for scale, since we're assuming loc is frozen.
            # Keeping loc=0 is appropriate for log-normal models.
            nparams = 1 if model.shapes is None else \
                len(model.shapes.split(",")) + 1

            def loglike(self, params):
                if np.isnan(params).any():
                    return -np.inf

                loglikes = \
                    model.logpdf(self.endog, *params[:-1],
                                 scale=params[-1], loc=0)

                if not np.isfinite(loglikes).all():
                    return -np.inf
                else:
                    return loglikes.sum()

        def emcee_fit(model, init_params, burnin=200, steps=2000, thin=10):
            try:
                import emcee
            except ImportError:
                raise ImportError("emcee must be installed for MCMC "
                                  "fitting.")

            ndim = len(init_params)
            nwalkers = ndim * 10

            # Initialize the walkers in a small ball around the initial
            # parameter estimates.
            p0 = np.zeros((nwalkers, ndim))
            for i, val in enumerate(init_params):
                p0[:, i] = np.random.randn(nwalkers) * 0.1 + val

            sampler = emcee.EnsembleSampler(nwalkers, ndim,
                                            model.loglike, args=[])

            pos, prob, state = sampler.run_mcmc(p0, burnin)
            sampler.reset()

            pos, prob, state = sampler.run_mcmc(pos, steps, thin=thin)

            return sampler

        # Do an initial fit with the scipy model, then drop the frozen loc
        # parameter while keeping the scale.
        init_params = model.fit(self.data, floc=0.0)
        init_params = np.append(init_params[:-2], init_params[-1])

        self._model = Likelihood(self.data)

        if fit_type == 'mle':
            fitted_model = \
                self._model.fit(start_params=init_params, method='nm')
            self._mle_fit = fitted_model
            fitted_model.df_model = len(init_params)
            fitted_model.df_resid = len(self.data) - len(init_params)

            self._model_params = fitted_model.params.copy()
            try:
                self._model_stderrs = fitted_model.bse.copy()
                cov_calc_failed = False
            except ValueError:
                warn("Variance calculation failed.")
                self._model_stderrs = np.ones_like(self.model_params) * np.nan
                cov_calc_failed = True

        elif fit_type == 'mcmc':
            chain = emcee_fit(self._model, init_params.copy(), **kwargs)
            self._model_params = np.mean(chain.flatchain, axis=0)
            self._model_stderrs = np.percentile(chain.flatchain, [15, 85],
                                                axis=0)
            self._mcmc_chain = chain

        if verbose:
            if fit_type == 'mle':
                if cov_calc_failed:
                    print("Fitted parameters: {}".format(self.model_params))
                    print("Covariance calculation failed.")
                else:
                    print(fitted_model.summary())
            else:
                print("Ran chain for {0} iterations".format(chain.iterations))
                print("Used {} walkers".format(chain.acceptance_fraction.size))
                print("Mean acceptance fraction of {}"
                      .format(np.mean(chain.acceptance_fraction)))
                print("Parameter values: {}".format(self.model_params))
                print("15th to 85th percentile ranges: {}"
                      .format(self.model_stderrs[1] - self.model_stderrs[0]))
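
    # A minimal usage sketch for `fit_pdf`, again using the hypothetical
    # `pdf_mom0` instance from the class docstring. The MCMC kwargs shown
    # (`burnin`, `steps`) are the defaults of the internal `emcee_fit`
    # helper above.
    # >>> pdf_mom0.fit_pdf(fit_type='mle', verbose=True)  # doctest: +SKIP
    # >>> pdf_mom0.model_params  # (shape, scale) of the lognormal  # doctest: +SKIP
    # >>> pdf_mom0.fit_pdf(fit_type='mcmc', burnin=200, steps=2000)  # doctest: +SKIP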

    @property
    def model_params(self):
        '''
        Parameters of the fitted model.
        '''
        if hasattr(self, "_model_params"):
            return self._model_params
        # Raise an AttributeError so checks like the try/except in
        # `PDF_Distance.compute_lognormal_distance` behave as expected.
        raise AttributeError("No model has been fit. Run `fit_pdf` first.")

    @property
    def model_stderrs(self):
        '''
        Standard errors of the fitted model. If using an MCMC, the 15th and
        85th percentiles are returned.
        '''
        if hasattr(self, "_model_stderrs"):
            return self._model_stderrs
        raise AttributeError("No model has been fit. Run `fit_pdf` first.")

    def corner_plot(self, **kwargs):
        '''
        Create a corner plot from the MCMC. Requires the 'corner' package.

        Parameters
        ----------
        kwargs : Passed to `~corner.corner`.
        '''

        if not hasattr(self, "_mcmc_chain"):
            raise Exception("Must run MCMC fitting first.")

        try:
            import corner
        except ImportError:
            raise ImportError("The optional package 'corner' is not "
                              "installed.")

        corner.corner(self._mcmc_chain.flatchain, **kwargs)

    def run(self, verbose=False, save_name=None, bins=None, do_fit=True,
            model=lognorm, **kwargs):
        '''
        Compute the PDF and ECDF. Enabling verbose provides a summary plot.

        Parameters
        ----------
        verbose : bool, optional
            Enables plotting of the results.
        save_name : str, optional
            Save the figure when a file name is given.
        bins : list or numpy.ndarray or int, optional
            Bins to compute the PDF from. Overrides initial bin input.
        do_fit : bool, optional
            Enables (by default) fitting a given model.
        model : scipy.stats distribution, optional
            Pass any scipy distribution. See `~PDF.fit_pdf`.
        kwargs : Passed to `~PDF.fit_pdf`.
        '''

        self.make_pdf(bins=bins)
        self.make_ecdf()

        if do_fit:
            self.fit_pdf(model=model, verbose=verbose, **kwargs)

        if verbose:
            import matplotlib.pyplot as p

            if self.normalization_type == "standardize":
                xlabel = r"z-score"
            elif self.normalization_type == "center":
                xlabel = r"$I - \bar{I}$"
            elif self.normalization_type == "normalize_by_mean":
                xlabel = r"$I/\bar{I}$"
            else:
                xlabel = r"Intensity"

            # PDF
            p.subplot(121)
            p.semilogy(self.bins, self.pdf, 'b-', label='Data')
            if do_fit:
                # Plot the fitted model.
                vals = np.linspace(self.bins[0], self.bins[-1], 1000)
                p.semilogy(vals, model.pdf(vals, *self.model_params[:-1],
                                           scale=self.model_params[-1],
                                           loc=0), 'r--', label='Fit')
                p.legend(loc='best')
            p.grid(True)
            p.xlabel(xlabel)
            p.ylabel("PDF")

            # ECDF
            ax2 = p.subplot(122)
            ax2.yaxis.tick_right()
            ax2.yaxis.set_label_position("right")
            if self.normalization_type != "None":
                ax2.plot(self.bins, self.ecdf, 'b-')
                if do_fit:
                    ax2.plot(vals, model.cdf(vals, *self.model_params[:-1],
                                             scale=self.model_params[-1],
                                             loc=0), 'r--')
            else:
                ax2.semilogx(self.bins, self.ecdf, 'b-')
                if do_fit:
                    ax2.semilogx(vals, model.cdf(vals,
                                                 *self.model_params[:-1],
                                                 scale=self.model_params[-1],
                                                 loc=0), 'r--')
            p.grid(True)
            p.xlabel(xlabel)
            p.ylabel("ECDF")

            p.tight_layout()

            if save_name is not None:
                p.savefig(save_name)
                p.close()
            else:
                p.show()

        return self
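

# A minimal sketch of running `PDF` on synthetic data, in the doctest style
# of the class docstring. The names here (`fake_img`, `pdf_obj`) are
# illustrative and not part of the module.
# >>> import numpy as np  # doctest: +SKIP
# >>> fake_img = np.random.lognormal(sigma=0.5, size=(128, 128))  # doctest: +SKIP
# >>> pdf_obj = PDF(fake_img, normalization_type='normalize_by_mean')  # doctest: +SKIP
# >>> pdf_obj = pdf_obj.run(verbose=False)  # doctest: +SKIP
# >>> pdf_obj.model_params  # fitted lognormal (shape, scale)  # doctest: +SKIP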


class PDF_Distance(object):
    '''
    Calculate the distance between two arrays using their PDFs.

    Parameters
    ----------
    img1 : %(dtypes)s
        Array (1-3D).
    img2 : %(dtypes)s
        Array (1-3D).
    min_val1 : float, optional
        Minimum value to keep in img1.
    min_val2 : float, optional
        Minimum value to keep in img2.
    do_fit : bool, optional
        Enables fitting a lognormal distribution to each data set.
    normalization_type : {"normalize", "normalize_by_mean"}, optional
        See `~turbustat.statistics.stat_utils.data_normalization`.
    nbins : int, optional
        Manually set the number of bins to use for creating the PDFs.
    weights1 : %(dtypes)s, optional
        Weights to be used with img1.
    weights2 : %(dtypes)s, optional
        Weights to be used with img2.
    '''

    __doc__ %= {"dtypes": " or ".join(common_types + twod_types +
                                      threed_types)}

    def __init__(self, img1, img2, min_val1=-np.inf, min_val2=-np.inf,
                 do_fit=True, normalization_type=None, nbins=None,
                 weights1=None, weights2=None):
        super(PDF_Distance, self).__init__()

        if do_fit:
            if normalization_type in ["standardize", "center"]:
                raise Exception("Cannot perform lognormal fit when using"
                                " 'standardize' or 'center'.")

        self.normalization_type = normalization_type

        self.PDF1 = PDF(img1, min_val=min_val1,
                        normalization_type=normalization_type,
                        weights=weights1)

        self.PDF2 = PDF(img2, min_val=min_val2,
                        normalization_type=normalization_type,
                        weights=weights2)

        self.bins, self.bin_centers = \
            common_histogram_bins(self.PDF1.data, self.PDF2.data,
                                  return_centered=True, nbins=nbins)

        self._do_fit = do_fit

        # Feed the common set of bins to be used in the PDFs.
        self.PDF1.run(verbose=False, bins=self.bins, do_fit=do_fit)
        self.PDF2.run(verbose=False, bins=self.bins, do_fit=do_fit)

    def compute_hellinger_distance(self):
        '''
        Compute the Hellinger distance between the two PDFs.
        '''

        # We're using the same bins, so normalize each to unity to keep the
        # distance normalized.
        self.hellinger_distance = \
            hellinger(self.PDF1.pdf / self.PDF1.pdf.sum(),
                      self.PDF2.pdf / self.PDF2.pdf.sum())
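
    # For reference: the discrete Hellinger distance between normalized
    # histograms p and q is
    #     H(p, q) = (1 / sqrt(2)) * sqrt(sum_i (sqrt(p_i) - sqrt(q_i))**2),
    # which ranges from 0 (identical PDFs) to 1 (disjoint PDFs). A NumPy
    # one-liner, assuming `hellinger` in `stats_utils` follows this
    # definition:
    #     np.sqrt(np.sum((np.sqrt(p) - np.sqrt(q))**2)) / np.sqrt(2)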

    def compute_ks_distance(self):
        '''
        Compute the distance using the KS test.
        '''

        D, p = ks_2samp(self.PDF1.data, self.PDF2.data)

        self.ks_distance = D
        self.ks_pval = p

    def compute_ad_distance(self):
        '''
        Compute the distance using the Anderson-Darling test.
        '''
        raise NotImplementedError(
            "Use of the Anderson-Darling test has been disabled"
            " due to the occurrence of overflow errors.")

        # D, _, p = anderson_ksamp([self.PDF1.data, self.PDF2.data])
        # self.ad_distance = D
        # self.ad_pval = p

    def compute_lognormal_distance(self):
        '''
        Compute a t-statistic for the difference in the widths of the
        fitted lognormal distributions.
        '''

        try:
            self.PDF1.model_params
            self.PDF2.model_params
        except AttributeError:
            raise Exception("Fitting has not been performed. 'do_fit' must "
                            "first be enabled.")

        diff = np.abs(self.PDF1.model_params[0] - self.PDF2.model_params[0])

        denom = np.sqrt(self.PDF1.model_stderrs[0]**2 +
                        self.PDF2.model_stderrs[0]**2)

        self.lognormal_distance = diff / denom
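
    # For reference, the statistic computed above is
    #     t = |sigma_1 - sigma_2| / sqrt(SE_1**2 + SE_2**2),
    # where sigma_i is the fitted lognormal width (shape) parameter of data
    # set i and SE_i is its standard error from the MLE fit. For MCMC fits,
    # `model_stderrs` holds percentiles instead, so this is best interpreted
    # in the MLE case.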

    def distance_metric(self, statistic='all', verbose=False,
                        label1="Data 1", label2="Data 2",
                        save_name=None):
        '''
        Calculate the distance.

        *NOTE:* The data are standardized before comparing to ensure the
        distance is calculated on the same scales.

        Parameters
        ----------
        statistic : 'all', 'hellinger', 'ks', 'lognormal'
            Which measure of distance to use.
        verbose : bool, optional
            Enables plotting.
        label1 : str, optional
            Object or region name for img1.
        label2 : str, optional
            Object or region name for img2.
        save_name : str, optional
            Save the figure when a file name is given.
        '''

        if statistic == 'all':
            self.compute_hellinger_distance()
            self.compute_ks_distance()
            if self._do_fit:
                self.compute_lognormal_distance()
        elif statistic == 'hellinger':
            self.compute_hellinger_distance()
        elif statistic == 'ks':
            self.compute_ks_distance()
        elif statistic == 'lognormal':
            if not self._do_fit:
                raise Exception("Fitting must be enabled to compute the"
                                " lognormal distance.")
            self.compute_lognormal_distance()
        else:
            raise TypeError("statistic must be 'all', "
                            "'hellinger', 'ks', or 'lognormal'.")

        if verbose:
            import matplotlib.pyplot as p

            if self.normalization_type == "standardize":
                xlabel = r"z-score"
            elif self.normalization_type == "center":
                xlabel = r"$I - \bar{I}$"
            elif self.normalization_type == "normalize_by_mean":
                xlabel = r"$I/\bar{I}$"
            else:
                xlabel = r"Intensity"

            # Print fit summaries if fitting was performed.
            if self._do_fit:
                try:
                    print(self.PDF1._mle_fit.summary())
                except ValueError:
                    warn("Covariance calculation failed. Check the fit "
                         "quality for data set 1!")
                try:
                    print(self.PDF2._mle_fit.summary())
                except ValueError:
                    warn("Covariance calculation failed. Check the fit "
                         "quality for data set 2!")

            # PDF
            p.subplot(121)
            p.semilogy(self.bin_centers, self.PDF1.pdf, 'bD-', label=label1)
            p.semilogy(self.bin_centers, self.PDF2.pdf, 'go-', label=label2)
            if self._do_fit:
                # Plot the fitted models.
                vals = np.linspace(self.bin_centers[0], self.bin_centers[-1],
                                   1000)
                fit_params1 = self.PDF1.model_params
                p.semilogy(vals, lognorm.pdf(vals, *fit_params1[:-1],
                                             scale=fit_params1[-1],
                                             loc=0), 'b-', label='Fit 1')
                fit_params2 = self.PDF2.model_params
                p.semilogy(vals, lognorm.pdf(vals, *fit_params2[:-1],
                                             scale=fit_params2[-1],
                                             loc=0), 'g-', label='Fit 2')
            p.grid(True)
            p.xlabel(xlabel)
            p.ylabel("PDF")

            # ECDF
            ax2 = p.subplot(122)
            ax2.yaxis.tick_right()
            ax2.yaxis.set_label_position("right")
            if self.normalization_type is not None:
                ax2.plot(self.bin_centers, self.PDF1.ecdf, 'bD-')
                ax2.plot(self.bin_centers, self.PDF2.ecdf, 'go-')
                if self._do_fit:
                    ax2.plot(vals, lognorm.cdf(vals, *fit_params1[:-1],
                                               scale=fit_params1[-1],
                                               loc=0), 'b-')
                    ax2.plot(vals, lognorm.cdf(vals, *fit_params2[:-1],
                                               scale=fit_params2[-1],
                                               loc=0), 'g-')
            else:
                ax2.semilogx(self.bin_centers, self.PDF1.ecdf, 'bD-')
                ax2.semilogx(self.bin_centers, self.PDF2.ecdf, 'go-')
                if self._do_fit:
                    ax2.semilogx(vals, lognorm.cdf(vals, *fit_params1[:-1],
                                                   scale=fit_params1[-1],
                                                   loc=0), 'b-')
                    ax2.semilogx(vals, lognorm.cdf(vals, *fit_params2[:-1],
                                                   scale=fit_params2[-1],
                                                   loc=0), 'g-')
            p.grid(True)
            p.xlabel(xlabel)
            p.ylabel("ECDF")

            p.tight_layout()

            if save_name is not None:
                p.savefig(save_name)
                p.close()
            else:
                p.show()

        return self
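

# A minimal end-to-end sketch comparing two synthetic fields. Everything
# below (array sizes, lognormal parameters, variable names) is illustrative
# and not part of the turbustat API.
if __name__ == "__main__":

    np.random.seed(42)

    # Two synthetic "images" drawn from lognormals with different widths.
    img_a = np.random.lognormal(mean=0.0, sigma=0.5, size=(64, 64))
    img_b = np.random.lognormal(mean=0.0, sigma=0.8, size=(64, 64))

    dist = PDF_Distance(img_a, img_b,
                        normalization_type='normalize_by_mean')
    dist.distance_metric(verbose=False)

    print("Hellinger distance: {}".format(dist.hellinger_distance))
    print("KS distance: {0} (p = {1})".format(dist.ks_distance,
                                              dist.ks_pval))
    print("Lognormal width distance: {}".format(dist.lognormal_distance))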