# Licensed under an MIT open source license - see LICENSE

import numpy as np
from astropy.convolution import convolve_fft
from astropy import units as u
from astropy.wcs import WCS
from copy import copy
import statsmodels.api as sm

from ..base_statistic import BaseStatisticMixIn
from ...io import common_types, twod_types, input_data
from ..stats_utils import common_scale, standardize
from ..fitting_utils import check_fit_limits


class DeltaVariance(BaseStatisticMixIn):
    """
    The delta-variance technique as described in Ossenkopf et al. (2008).

    Parameters
    ----------
    img : %(dtypes)s
        The image to calculate the delta-variance of.
    header : FITS header, optional
        Image header. Required when img is a `~numpy.ndarray`.
    weights : %(dtypes)s
        Weights to be used.
    diam_ratio : float, optional
        The ratio between the kernel sizes.
    lags : numpy.ndarray or list, optional
        The pixel scales to compute the delta-variance at.
    nlags : int, optional
        Number of lags to use.

    Example
    -------
    >>> from turbustat.statistics import DeltaVariance
    >>> from astropy.io import fits
    >>> moment0 = fits.open("Design4_21_0_0_flatrho_0021_13co.moment0.fits")  # doctest: +SKIP
    >>> delvar = DeltaVariance(moment0)  # doctest: +SKIP
    >>> delvar.run(verbose=True)  # doctest: +SKIP
    """

    __doc__ %= {"dtypes": " or ".join(common_types + twod_types)}

    def __init__(self, img, header=None, weights=None, diam_ratio=1.5,
                 lags=None, nlags=25):
        super(DeltaVariance, self).__init__()

        # Set the data and perform checks
        self.input_data_header(img, header)

        self.diam_ratio = diam_ratio

        if weights is None:
            self.weights = np.ones(self.data.shape)
        else:
            self.weights = input_data(weights, no_header=True)

        self.nanflag = False
        if np.isnan(self.data).any() or np.isnan(self.weights).any():
            self.nanflag = True

        if lags is None:
            min_size = 3.0
            self.lags = \
                np.logspace(np.log10(min_size),
                            np.log10(min(self.data.shape) / 2.),
                            nlags) * u.pix
        else:
            # Check if the given lags are a Quantity.
            # Default to pixel scales if they aren't.
            if not hasattr(lags, "value"):
                self.lags = lags * u.pix
            else:
                self.lags = self.to_pixel(lags)

        self.convolved_arrays = []
        self.convolved_weights = []

    @property
    def weights(self):
        '''
        Array of weights.
        '''
        return self._weights

    @weights.setter
    def weights(self, arr):
        if arr.shape != self.data.shape:
            raise ValueError("Given weight array does not match the shape "
                             "of the given image.")
        self._weights = arr

    def do_convolutions(self, allow_huge=False, boundary='wrap'):
        '''
        Perform the convolutions at all lags.

        Parameters
        ----------
        allow_huge : bool, optional
            Passed to `~astropy.convolution.convolve_fft`. Allows
            operations on images larger than 1 GB.
        boundary : {"wrap", "fill"}, optional
            Use "wrap" for periodic boundaries, and "fill" for
            non-periodic.
        '''
        for lag in self.lags.value:
            core = core_kernel(lag, self.data.shape[0], self.data.shape[1])
            annulus = annulus_kernel(lag, self.diam_ratio,
                                     self.data.shape[0], self.data.shape[1])

            if boundary == "wrap":
                # Don't pad for periodic boundaries
                pad_weights = self.weights
                pad_img = self.data * self.weights
            elif boundary == "fill":
                # Extend to avoid boundary effects from non-periodicity
                pad_weights = np.pad(self.weights, int(lag), padwithzeros)
                pad_img = np.pad(self.data, int(lag), padwithzeros) * \
                    pad_weights
            else:
                raise ValueError("boundary must be 'wrap' or 'fill'. "
                                 "Given {}".format(boundary))

            img_core = convolve_fft(
                pad_img, core, normalize_kernel=True,
                interpolate_nan=self.nanflag,
                ignore_edge_zeros=True,
                allow_huge=allow_huge,
                boundary=boundary)
            img_annulus = convolve_fft(
                pad_img, annulus, normalize_kernel=True,
                interpolate_nan=self.nanflag,
                ignore_edge_zeros=True,
                allow_huge=allow_huge,
                boundary=boundary)
            weights_core = convolve_fft(
                pad_weights, core, normalize_kernel=True,
                interpolate_nan=self.nanflag,
                ignore_edge_zeros=True,
                allow_huge=allow_huge,
                boundary=boundary)
            weights_annulus = convolve_fft(
                pad_weights, annulus, normalize_kernel=True,
                interpolate_nan=self.nanflag,
                ignore_edge_zeros=True,
                allow_huge=allow_huge,
                boundary=boundary)

            weights_core[np.where(weights_core == 0)] = np.NaN
            weights_annulus[np.where(weights_annulus == 0)] = np.NaN

            self.convolved_arrays.append((img_core / weights_core) -
                                         (img_annulus / weights_annulus))
            self.convolved_weights.append(weights_core * weights_annulus)

    def compute_deltavar(self):
        '''
        Computes the delta-variance values and errors.
        '''
        self._delta_var = np.empty((len(self.lags)))
        self._delta_var_error = np.empty((len(self.lags)))

        for i, (conv_arr, conv_weight, lag) in \
                enumerate(zip(self.convolved_arrays,
                              self.convolved_weights,
                              self.lags.value)):

            val, err = _delvar(conv_arr, conv_weight, lag)

            if (val <= 0) or (err <= 0) or np.isnan(val) or np.isnan(err):
                self._delta_var[i] = np.NaN
                self._delta_var_error[i] = np.NaN
            else:
                self._delta_var[i] = val
                self._delta_var_error[i] = err

    @property
    def delta_var(self):
        '''
        Delta-variance values.
        '''
        return self._delta_var

    @property
    def delta_var_error(self):
        '''
        1-sigma errors on the delta-variance values.
        '''
        return self._delta_var_error

    def fit_plaw(self, xlow=None, xhigh=None, verbose=False):
        '''
        Fit a power-law to the delta-variance curve.

        Parameters
        ----------
        xlow : float, optional
            Lower lag value to consider in the fit.
        xhigh : float, optional
            Upper lag value to consider in the fit.
        verbose : bool, optional
            Show fit summary when enabled.
        '''
        x = np.log10(self.lags.value)
        y = np.log10(self.delta_var)

        if xlow is not None:
            lower_limit = x >= np.log10(xlow)
        else:
            lower_limit = np.ones_like(self.delta_var, dtype=bool)

        if xhigh is not None:
            upper_limit = x <= np.log10(xhigh)
        else:
            upper_limit = np.ones_like(self.delta_var, dtype=bool)

        self._fit_range = [xlow, xhigh]

        within_limits = np.logical_and(lower_limit, upper_limit)

        y = y[within_limits]
        x = x[within_limits]

        x = sm.add_constant(x)

        # If the stds were computed, use them as weights
        weighted_fit = True
        if weighted_fit:
            # Converting to the log stds doesn't matter since the weights
            # remain proportional to 1/sigma^2, and an overall
            # normalization is applied in the fitting routine.
            weights = self.delta_var_error[within_limits] ** -2

            model = sm.WLS(y, x, missing='drop', weights=weights)
        else:
            model = sm.OLS(y, x, missing='drop')

        self.fit = model.fit()

        if verbose:
            print(self.fit.summary())

        self._slope = self.fit.params[1]
        self._slope_err = self.fit.bse[1]

    @property
    def slope(self):
        '''
        Fitted power-law slope.
        '''
        return self._slope

    @property
    def slope_err(self):
        '''
        1-sigma error on the fitted slope.
        '''
        return self._slope_err

    @property
    def fit_range(self):
        '''
        Lag limits used in the power-law fit.
        '''
        return self._fit_range

    def fitted_model(self, xvals):
        '''
        Computes the fitted power-law in log-log space using the given x
        values.

        Parameters
        ----------
        xvals : `~numpy.ndarray`
            Values of log(lags) to compute the model at (base 10 log).

        Returns
        -------
        model_values : `~numpy.ndarray`
            Values of the model at the given values. Equivalent to log
            values of the delta-variance curve.
        '''
        model_values = self.fit.params[0] + self.fit.params[1] * xvals

        return model_values

    def run(self, verbose=False, ang_units=False, unit=u.deg,
            allow_huge=False, boundary='wrap', xlow=None, xhigh=None,
            save_name=None):
        '''
        Compute the delta-variance.

        Parameters
        ----------
        verbose : bool, optional
            Plot delta-variance transform.
        ang_units : bool, optional
            Convert frequencies to angular units using the given header.
        unit : u.Unit, optional
            Choose the angular unit to convert to when ang_units is
            enabled.
        allow_huge : bool, optional
            See `~DeltaVariance.do_convolutions`.
        boundary : {"wrap", "fill"}, optional
            Use "wrap" for periodic boundaries, and "fill" for
            non-periodic.
        xlow : float, optional
            Lower lag value to consider in the fit.
        xhigh : float, optional
            Upper lag value to consider in the fit.
        save_name : str, optional
            Save the figure when a file name is given.
        '''
        self.do_convolutions(allow_huge=allow_huge, boundary=boundary)
        self.compute_deltavar()
        self.fit_plaw(xlow=xlow, xhigh=xhigh, verbose=verbose)

        if verbose:
            import matplotlib.pyplot as p

            ax = p.subplot(111)
            ax.set_xscale("log", nonposx="clip")
            ax.set_yscale("log", nonposy="clip")

            if ang_units:
                lags = \
                    self.lags.to(unit,
                                 equivalencies=self.angular_equiv).value
            else:
                lags = self.lags.value

            # Check for NaNs
            fin_vals = np.logical_or(np.isfinite(self.delta_var),
                                     np.isfinite(self.delta_var_error))

            p.errorbar(lags[fin_vals], self.delta_var[fin_vals],
                       yerr=self.delta_var_error[fin_vals], fmt="bD-",
                       label="Data")

            xvals = \
                np.linspace(self.lags.min().value if xlow is None else xlow,
                            self.lags.max().value if xhigh is None
                            else xhigh,
                            100)

            p.plot(xvals, 10**self.fitted_model(np.log10(xvals)), 'r--',
                   linewidth=2, label='Fit')

            p.legend(loc='best')
            ax.grid(True)

            if ang_units:
                ax.set_xlabel("Lag ({})".format(unit))
            else:
                ax.set_xlabel("Lag (pixels)")
            ax.set_ylabel(r"$\sigma^{2}_{\Delta}$")

            if save_name is not None:
                p.savefig(save_name)
                p.close()
            else:
                p.show()

        return self
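

# A minimal usage sketch for `DeltaVariance` (illustrative only; this helper
# is not part of the turbustat API). It assumes a synthetic Gaussian-noise
# image; the image size and fit limits below are arbitrary choices.
def _example_delta_variance():
    from astropy.io import fits

    # Any 2D image plus a header (here, the minimal one a PrimaryHDU
    # carries) works the same way.
    img_hdu = fits.PrimaryHDU(np.random.randn(128, 128))

    delvar = DeltaVariance(img_hdu, diam_ratio=1.5, nlags=15)
    # Restrict the power-law fit to intermediate lags (values in pixels).
    delvar.run(boundary='wrap', xlow=4., xhigh=32.)

    return delvar.slope, delvar.slope_err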


def core_kernel(lag, x_size, y_size):
    '''
    Core kernel for convolution.

    Parameters
    ----------
    lag : int
        Size of the lag. Sets the kernel size.
    x_size : int
        Grid size to use in the x direction.
    y_size : int
        Grid size to use in the y direction.

    Returns
    -------
    kernel : numpy.ndarray
        Normalized kernel.
    '''
    x, y = np.meshgrid(np.arange(-x_size / 2, x_size / 2 + 1, 1),
                       np.arange(-y_size / 2, y_size / 2 + 1, 1))

    # Gaussian core; the prefactor is immaterial after the explicit
    # normalization below.
    kernel = (4 / (np.pi * lag**2)) * \
        np.exp(-(x ** 2. + y ** 2.) / (lag / 2.) ** 2.)

    return kernel / np.sum(kernel)


def annulus_kernel(lag, diam_ratio, x_size, y_size):
    '''
    Annulus kernel for convolution.

    Parameters
    ----------
    lag : int
        Size of the lag. Sets the kernel size.
    diam_ratio : float
        Ratio between kernel diameters.
    x_size : int
        Grid size to use in the x direction.
    y_size : int
        Grid size to use in the y direction.

    Returns
    -------
    kernel : numpy.ndarray
        Normalized kernel.
    '''
    x, y = np.meshgrid(np.arange(-x_size / 2, x_size / 2 + 1, 1),
                       np.arange(-y_size / 2, y_size / 2 + 1, 1))

    inner = np.exp(-(x ** 2. + y ** 2.) / (lag / 2.) ** 2.)
    outer = np.exp(-(x ** 2. + y ** 2.) / (diam_ratio * lag / 2.) ** 2.)

    kernel = 4 / (np.pi * lag**2 * (diam_ratio ** 2. - 1)) * (outer - inner)

    return kernel / np.sum(kernel)


def padwithzeros(vector, pad_width, iaxis, kwargs):
    '''
    Pad array with zeros.
    '''
    vector[:pad_width[0]] = 0
    vector[-pad_width[1]:] = 0
    return vector
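

# Quick numerical sanity check (illustrative only; not part of the module's
# API): both kernels are explicitly normalized to unit sum, so the effective
# delta-variance filter, core minus annulus, carries zero total weight and
# responds only to structure near the chosen lag.
def _check_kernel_normalization(lag=8, size=64, diam_ratio=1.5):
    core = core_kernel(lag, size, size)
    annulus = annulus_kernel(lag, diam_ratio, size, size)

    assert np.isclose(core.sum(), 1.0)
    assert np.isclose(annulus.sum(), 1.0)
    # The band-pass filter used by the transform integrates to zero.
    assert np.isclose((core - annulus).sum(), 0.0)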


class DeltaVariance_Distance(object):
    """
    Compares two datasets using the delta-variance. The distance between
    them is given by the Euclidean distance between the curves weighted by
    the bootstrapped errors.

    Parameters
    ----------
    dataset1 : %(dtypes)s
        Contains the data and header for one dataset.
    dataset2 : %(dtypes)s
        See above.
    weights1 : %(dtypes)s
        Weights for dataset1.
    weights2 : %(dtypes)s
        See above.
    diam_ratio : float, optional
        The ratio between the kernel sizes.
    lags : numpy.ndarray or list, optional
        The pixel scales to compute the delta-variance at.
    fiducial_model : DeltaVariance
        A computed DeltaVariance model. Used to avoid recomputing.
    xlow : float or np.ndarray, optional
        The lower lag fitting limit. An array with 2 elements can be
        passed to give separate lower limits for the datasets.
    xhigh : float or np.ndarray, optional
        The upper lag fitting limit. See `xlow` above.
    boundary : str, np.ndarray or list, optional
        Set how boundaries should be handled. If a string is not passed, a
        two-element list/array with separate boundary conditions is
        expected.
    """

    __doc__ %= {"dtypes": " or ".join(common_types + twod_types)}

    def __init__(self, dataset1, dataset2, weights1=None, weights2=None,
                 diam_ratio=1.5, lags=None, fiducial_model=None,
                 xlow=None, xhigh=None, boundary='wrap'):
        super(DeltaVariance_Distance, self).__init__()

        dataset1 = copy(input_data(dataset1, no_header=False))
        dataset2 = copy(input_data(dataset2, no_header=False))

        # Enforce standardization on both datasets. Values for the
        # delta-variance then represent a relative fraction of structure
        # on different scales.
        # dataset1[0] = standardize(dataset1[0])
        # dataset2[0] = standardize(dataset2[0])

        # Create a default set of lags, in pixels
        if lags is None:
            min_size = 3.0
            nlags = 25
            shape1 = dataset1[0].shape
            shape2 = dataset2[0].shape
            if min(shape1) > min(shape2):
                lags = \
                    np.logspace(np.log10(min_size),
                                np.log10(min(shape2) / 2.),
                                nlags) * u.pix
            else:
                lags = \
                    np.logspace(np.log10(min_size),
                                np.log10(min(shape1) / 2.),
                                nlags) * u.pix

        # Check the limits are given in an understandable form.
        # The returned xlow and xhigh are arrays.
        xlow, xhigh = check_fit_limits(xlow, xhigh)

        # Allow separate boundary conditions to be passed
        if isinstance(boundary, str):
            boundary = [boundary] * 2
        else:
            if not len(boundary) == 2:
                raise ValueError("boundary must be a two-element list/array"
                                 " when a string is not passed.")

        # Now adjust the lags so that they have a common scaling when the
        # datasets are not on a common grid.
        scale = common_scale(WCS(dataset1[1]), WCS(dataset2[1]))

        if scale == 1.0:
            lags1 = lags
            lags2 = lags
        elif scale > 1.0:
            lags1 = scale * lags
            lags2 = lags
        else:
            lags1 = lags
            lags2 = lags / float(scale)

        if fiducial_model is not None:
            self.delvar1 = fiducial_model
        else:
            self.delvar1 = DeltaVariance(dataset1, weights=weights1,
                                         diam_ratio=diam_ratio, lags=lags1)
            self.delvar1.run(xlow=xlow[0], xhigh=xhigh[0],
                             boundary=boundary[0])

        self.delvar2 = DeltaVariance(dataset2, weights=weights2,
                                     diam_ratio=diam_ratio, lags=lags2)
        self.delvar2.run(xlow=xlow[1], xhigh=xhigh[1], boundary=boundary[1])

    def distance_metric(self, verbose=False, label1=None, label2=None,
                        ang_units=False, unit=u.deg, save_name=None):
        '''
        Applies the Euclidean distance to the delta-variance curves.

        Parameters
        ----------
        verbose : bool, optional
            Enables plotting.
        label1 : str, optional
            Object or region name for dataset1.
        label2 : str, optional
            Object or region name for dataset2.
        ang_units : bool, optional
            Convert frequencies to angular units using the given header.
        unit : u.Unit, optional
            Choose the angular unit to convert to when ang_units is
            enabled.
        save_name : str, optional
            Save the figure when a file name is given.
        '''
        # Check for NaNs and negatives
        nans1 = np.logical_or(np.isnan(self.delvar1.delta_var),
                              self.delvar1.delta_var <= 0.0)
        nans2 = np.logical_or(np.isnan(self.delvar2.delta_var),
                              self.delvar2.delta_var <= 0.0)

        all_nans = np.logical_or(nans1, nans2)

        deltavar1_sum = np.sum(self.delvar1.delta_var[~all_nans])
        deltavar1 = np.log10(self.delvar1.delta_var[~all_nans] /
                             deltavar1_sum)

        deltavar2_sum = np.sum(self.delvar2.delta_var[~all_nans])
        deltavar2 = np.log10(self.delvar2.delta_var[~all_nans] /
                             deltavar2_sum)

        # Distance between two normalized curves
        self.curve_distance = np.linalg.norm(deltavar1 - deltavar2)

        # Distance between the fitted slopes (combined t-statistic)
        self.slope_distance = \
            np.abs(self.delvar1.slope - self.delvar2.slope) / \
            np.sqrt(self.delvar1.slope_err**2 + self.delvar2.slope_err**2)

        if verbose:
            import matplotlib.pyplot as p

            ax = p.subplot(111)
            ax.set_xscale("log", nonposx="clip")
            ax.set_yscale("log", nonposy="clip")

            if ang_units:
                ang_equiv1 = self.delvar1.angular_equiv
                ang_equiv2 = self.delvar2.angular_equiv
                lags1 = \
                    self.delvar1.lags.to(unit,
                                         equivalencies=ang_equiv1).value
                lags2 = \
                    self.delvar2.lags.to(unit,
                                         equivalencies=ang_equiv2).value
            else:
                lags1 = self.delvar1.lags.value
                lags2 = self.delvar2.lags.value

            lags1 = lags1[~all_nans]
            lags2 = lags2[~all_nans]

            # Normalize the errors for plotting. NOT log-scaled.
            deltavar1_err = \
                self.delvar1.delta_var_error[~all_nans] / deltavar1_sum
            deltavar2_err = \
                self.delvar2.delta_var_error[~all_nans] / deltavar2_sum

            p.errorbar(lags1, 10**deltavar1, yerr=deltavar1_err,
                       fmt="bD-", label=label1)
            p.errorbar(lags2, 10**deltavar2, yerr=deltavar2_err,
                       fmt="go-", label=label2)

            lims1 = self.delvar1.fit_range
            xvals1 = \
                np.linspace(self.delvar1.lags.min().value
                            if lims1[0] is None else lims1[0],
                            self.delvar1.lags.max().value
                            if lims1[1] is None else lims1[1],
                            100)
            lims2 = self.delvar2.fit_range
            xvals2 = \
                np.linspace(self.delvar2.lags.min().value
                            if lims2[0] is None else lims2[0],
                            self.delvar2.lags.max().value
                            if lims2[1] is None else lims2[1],
                            100)

            if ang_units:
                # The fit x values are in pixels; attach the unit before
                # converting for plotting.
                xvals1_conv = (xvals1 * u.pix).to(
                    unit, equivalencies=ang_equiv1).value
                xvals2_conv = (xvals2 * u.pix).to(
                    unit, equivalencies=ang_equiv2).value
            else:
                xvals1_conv = xvals1
                xvals2_conv = xvals2

            p.plot(xvals1_conv,
                   10**self.delvar1.fitted_model(np.log10(xvals1)) /
                   deltavar1_sum,
                   'b--', linewidth=8, alpha=0.75)
            p.plot(xvals2_conv,
                   10**self.delvar2.fitted_model(np.log10(xvals2)) /
                   deltavar2_sum,
                   'g--', linewidth=8, alpha=0.75)

            # Vertical lines to indicate fit region
            p.axvline(self.delvar1.lags.min().value
                      if lims1[0] is None else lims1[0],
                      color='b', alpha=0.5, linestyle='-')
            p.axvline(self.delvar1.lags.max().value
                      if lims1[1] is None else lims1[1],
                      color='b', alpha=0.5, linestyle='-')
            p.axvline(self.delvar2.lags.min().value
                      if lims2[0] is None else lims2[0],
                      color='g', alpha=0.5, linestyle='-')
            p.axvline(self.delvar2.lags.max().value
                      if lims2[1] is None else lims2[1],
                      color='g', alpha=0.5, linestyle='-')

            ax.legend(loc='best')
            ax.grid(True)

            if ang_units:
                ax.set_xlabel("Lag ({})".format(unit))
            else:
                ax.set_xlabel("Lag (pixels)")
            ax.set_ylabel(r"$\sigma^{2}_{\Delta}$")

            if save_name is not None:
                p.savefig(save_name)
                p.close()
            else:
                p.show()

        return self
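

# A minimal usage sketch for `DeltaVariance_Distance` (illustrative only;
# not part of the turbustat API). The header values below are arbitrary and
# identical for both datasets, so the lags need no rescaling between them.
def _example_distance():
    from astropy.io import fits

    hdr = fits.Header()
    hdr['CDELT1'] = -2.8e-4
    hdr['CDELT2'] = 2.8e-4
    hdr['CTYPE1'] = 'RA---TAN'
    hdr['CTYPE2'] = 'DEC--TAN'

    hdu1 = fits.PrimaryHDU(np.random.randn(64, 64), header=hdr)
    hdu2 = fits.PrimaryHDU(np.random.randn(64, 64), header=hdr)

    dist = DeltaVariance_Distance(hdu1, hdu2, boundary='wrap')
    dist.distance_metric()

    return dist.curve_distance, dist.slope_distance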


def _delvar(array, weight, lag):
    '''
    Computes the delta-variance of the given array.
    '''
    arr_cent = array.copy() - np.nanmean(array, axis=None)

    val = np.nansum(arr_cent ** 2. * weight) /\
        np.nansum(weight)

    # The error needs to be normalized by the number of independent
    # pixels in the array.
    # Take width to be 1/2 FWHM. Note that lag is defined as 2*sigma.
    # So 2ln(2) sigma^2 = ln(2)/2 * lag^2
    kern_area = np.ceil(0.5 * np.pi * np.log(2) * lag**2).astype(int)
    nindep = np.sqrt(np.isfinite(arr_cent).sum() / kern_area)

    val_err = np.sqrt((np.nansum(arr_cent ** 4. * weight) /
                       np.nansum(weight)) - val**2) / nindep

    return val, val_err
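

# Worked example for `_delvar` (illustrative only; not part of the module's
# API): on unit-variance Gaussian noise with uniform weights, `val` is close
# to 1, while `val_err` is roughly sqrt(E[x^4] - 1) = sqrt(2) divided by the
# number of independent kernel-sized patches, `nindep`.
def _example_delvar():
    arr = np.random.randn(128, 128)
    weight = np.ones_like(arr)

    val, val_err = _delvar(arr, weight, lag=4.)
    return val, val_err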