Source code for turbustat.statistics.delta_variance.delta_variance

# Licensed under an MIT open source license - see LICENSE
from __future__ import (print_function, absolute_import, division,
                        unicode_literals)

import numpy as np
from astropy import units as u
from astropy.wcs import WCS
from copy import copy
import statsmodels.api as sm
from warnings import warn
from astropy.utils.console import ProgressBar

from ..base_statistic import BaseStatisticMixIn
from ...io import common_types, twod_types, input_data
from ..stats_utils import common_scale, padwithzeros
from ..fitting_utils import check_fit_limits, residual_bootstrap
from .kernels import core_kernel, annulus_kernel
from ..stats_warnings import TurbuStatMetricWarning
from ..lm_seg import Lm_Seg
from ..convolve_wrapper import convolution_wrapper


class DeltaVariance(BaseStatisticMixIn):
    """
    The delta-variance technique as described in Ossenkopf et al. (2008).

    Parameters
    ----------
    img : %(dtypes)s
        The image calculate the delta-variance of.
    header : FITS header, optional
        Image header. Required when img is a `~numpy.ndarray`.
    weights : %(dtypes)s
        Weights to be used.
    diam_ratio : float, optional
        The ratio between the kernel sizes.
    lags : numpy.ndarray or list, optional
        The pixel scales to compute the delta-variance at.
    nlags : int, optional
        Number of lags to use.
    distance : `~astropy.units.Quantity`, optional
        Physical distance to the region in the data.

    Examples
    --------
    >>> from turbustat.statistics import DeltaVariance
    >>> from astropy.io import fits
    >>> moment0 = fits.open("2D.fits")  # doctest: +SKIP
    >>> delvar = DeltaVariance(moment0)  # doctest: +SKIP
    >>> delvar.run(verbose=True)  # doctest: +SKIP
    """

    __doc__ %= {"dtypes": " or ".join(common_types + twod_types)}

    def __init__(self, img, header=None, weights=None, diam_ratio=1.5,
                 lags=None, nlags=25, distance=None):
        super(DeltaVariance, self).__init__()

        # Set the data and perform checks
        self.input_data_header(img, header)

        self.diam_ratio = diam_ratio

        if weights is None:
            # Unit weight for finite pixels; NaN pixels get zero weight.
            self.weights = np.isfinite(self.data).astype(float)
        else:
            self.weights = input_data(weights, no_header=True)

        if distance is not None:
            self.distance = distance

        if lags is None:
            # Default lags: log-spaced from 3 pixels up to half of the
            # smallest image dimension.
            min_size = 3.0
            self.lags = np.logspace(np.log10(min_size),
                                    np.log10(min(self.data.shape) / 2.),
                                    nlags) * u.pix
        else:
            # Anything without a `value` attribute is not a Quantity;
            # default such inputs to pixel units.
            if hasattr(lags, "value"):
                self.lags = self._to_pixel(lags)
            else:
                self.lags = lags * u.pix

        self._convolved_arrays = []
        self._convolved_weights = []

    @property
    def lags(self):
        '''
        Lag values.
        '''
        return self._lags

    @lags.setter
    def lags(self, values):
        if not isinstance(values, u.Quantity):
            raise TypeError("lags must be given as an astropy.units.Quantity.")

        pix_lags = self._to_pixel(values)

        if np.any(pix_lags.value < 1):
            raise ValueError("At least one of the lags is smaller than one "
                             "pixel. Remove these lags from the array.")

        # Catch floating point issues in comparing to half the image shape
        half_comp = (np.floor(pix_lags.value) - min(self.data.shape) / 2.)

        if np.any(half_comp > 1e-10):
            raise ValueError("At least one of the lags is larger than half of"
                             " the image size. Remove these lags from the "
                             "array.")

        self._lags = values

    @property
    def weights(self):
        '''
        Array of weights.
        '''
        return self._weights

    @weights.setter
    def weights(self, arr):
        if arr.shape != self.data.shape:
            raise ValueError("Given weight array does not match the shape of "
                             "the given image.")

        self._weights = arr
[docs] def compute_deltavar(self, allow_huge=False, boundary='wrap', min_weight_frac=0.01, nan_treatment='fill', preserve_nan=False, use_pyfftw=False, threads=1, pyfftw_kwargs={}, show_progress=True, keep_convolve_arrays=False): ''' Perform the convolution and calculate the delta variance at all lags. Parameters ---------- allow_huge : bool, optional Passed to `~astropy.convolve.convolve_fft`. Allows operations on images larger than 1 Gb. boundary : {"wrap", "fill"}, optional Use "wrap" for periodic boundaries, and "fill" for non-periodic. min_weight_frac : float, optional Set the fraction of the peak of the weight array to mask below. Default is 0.01. This will remove most edge artifacts, but is not guaranteed to! Increase this value if artifacts are encountered (this typically results in large spikes in the delta-variance curve). nan_treatment : {'interpolate', 'fill'}, optional Enable to interpolate over NaNs in the convolution. Default is 'fill'. use_pyfftw : bool, optional Enable to use pyfftw, if it is installed. threads : int, optional Number of threads to use in FFT when using pyfftw. pyfftw_kwargs : Passed to See `here <http://hgomersall.github.io/pyFFTW/pyfftw/builders/builders.html>`_ for a list of accepted kwargs. show_progress : bool, optional Show a progress bar while convolving the image at each lag. keep_convolve_arrays : bool, optional Keeps the convolved arrays at each lag. Disabled by default to minimize memory usage. 
''' self._delta_var = np.empty((len(self.lags))) self._delta_var_error = np.empty((len(self.lags))) if show_progress: bar = ProgressBar(len(self.lags)) for i, lag in enumerate(self.lags.value): core = core_kernel(lag, self.data.shape[0], self.data.shape[1]) annulus = annulus_kernel(lag, self.diam_ratio, self.data.shape[0], self.data.shape[1]) if boundary == "wrap": # Don't pad for periodic boundaries pad_weights = self.weights pad_img = self.data * self.weights elif boundary == "fill": # Extend to avoid boundary effects from non-periodicity pad_weights = np.pad(self.weights, int(lag), padwithzeros) pad_img = np.pad(self.data, int(lag), padwithzeros) * \ pad_weights else: raise ValueError("boundary must be 'wrap' or 'fill'. " "Given {}".format(boundary)) img_core = \ convolution_wrapper(pad_img, core, boundary=boundary, fill_value=0. if nan_treatment=='fill' else np.NaN, allow_huge=allow_huge, nan_treatment=nan_treatment, use_pyfftw=use_pyfftw, threads=threads, pyfftw_kwargs=pyfftw_kwargs) img_annulus = \ convolution_wrapper(pad_img, annulus, boundary=boundary, fill_value=0. if nan_treatment=='fill' else np.NaN, allow_huge=allow_huge, nan_treatment=nan_treatment, use_pyfftw=use_pyfftw, threads=threads, pyfftw_kwargs=pyfftw_kwargs) weights_core = \ convolution_wrapper(pad_weights, core, boundary=boundary, fill_value=0. if nan_treatment=='fill' else np.NaN, allow_huge=allow_huge, nan_treatment=nan_treatment, use_pyfftw=use_pyfftw, threads=threads, pyfftw_kwargs=pyfftw_kwargs) weights_annulus = \ convolution_wrapper(pad_weights, annulus, boundary=boundary, fill_value=0. 
if nan_treatment=='fill' else np.NaN, allow_huge=allow_huge, nan_treatment=nan_treatment, use_pyfftw=use_pyfftw, threads=threads, pyfftw_kwargs=pyfftw_kwargs) cutoff_val = min_weight_frac * self.weights.max() weights_core[np.where(weights_core <= cutoff_val)] = np.NaN weights_annulus[np.where(weights_annulus <= cutoff_val)] = np.NaN conv_arr = (img_core / weights_core) - \ (img_annulus / weights_annulus) conv_weight = weights_core * weights_annulus if preserve_nan: conv_arr[np.isnan(pad_img)] = np.NaN if keep_convolve_arrays: self._convolved_arrays.append(conv_arr) self._convolved_weights.append(weights_core * weights_annulus) val, err = _delvar(conv_arr, conv_weight, lag) if (val <= 0) or (err <= 0) or np.isnan(val) or np.isnan(err): self._delta_var[i] = np.NaN self._delta_var_error[i] = np.NaN else: self._delta_var[i] = val self._delta_var_error[i] = err if show_progress: bar.update(i + 1)
@property def convolve_arrays(self): if len(self._convolved_arrays) == 0: warn("Run `DeltaVariance.compute_deltavar` with " "`keep_convolve_arrays=True`") return self._convolve_arrays @property def convolve_weights(self): if len(self._convolved_weights) == 0: warn("Run `DeltaVariance.compute_deltavar` with " "`keep_convolve_arrays=True`") return self._convolve_arrays @property def delta_var(self): ''' Delta Variance values. ''' return self._delta_var @property def delta_var_error(self): ''' 1-sigma errors on the Delta variance values. ''' return self._delta_var_error
[docs] def fit_plaw(self, xlow=None, xhigh=None, brk=None, verbose=False, bootstrap=False, bootstrap_kwargs={}, **fit_kwargs): ''' Fit a power-law to the Delta-variance spectrum. Parameters ---------- xlow : `~astropy.units.Quantity`, optional Lower lag value to consider in the fit. xhigh : `~astropy.units.Quantity`, optional Upper lag value to consider in the fit. brk : `~astropy.units.Quantity`, optional Give an initial guess for a break point. This enables fitting with a `turbustat.statistics.Lm_Seg`. bootstrap : bool, optional Bootstrap using the model residuals to estimate the standard errors. bootstrap_kwargs : dict, optional Pass keyword arguments to `~turbustat.statistics.fitting_utils.residual_bootstrap`. verbose : bool, optional Show fit summary when enabled. ''' x = np.log10(self.lags.value) y = np.log10(self.delta_var) if xlow is not None: xlow = self._to_pixel(xlow) lower_limit = x >= np.log10(xlow.value) else: lower_limit = \ np.ones_like(self.delta_var, dtype=bool) xlow = self.lags.min() * 0.99 if xhigh is not None: xhigh = self._to_pixel(xhigh) upper_limit = x <= np.log10(xhigh.value) else: upper_limit = \ np.ones_like(self.delta_var, dtype=bool) xhigh = self.lags.max() * 1.01 self._fit_range = [xlow, xhigh] within_limits = np.logical_and(lower_limit, upper_limit) y = y[within_limits] x = x[within_limits] weights = self.delta_var_error[within_limits] ** -2 min_fits_pts = 3 if brk is not None: # Try fitting a segmented model pix_brk = self._to_pixel(brk) if pix_brk < xlow or pix_brk > xhigh: raise ValueError("brk must be within xlow and xhigh.") model = Lm_Seg(x, y, np.log10(pix_brk.value), weights=weights) fit_kwargs['verbose'] = verbose fit_kwargs['cov_type'] = 'HC3' model.fit_model(**fit_kwargs) self.fit = model.fit if model.params.size == 5: # Check to make sure this leaves enough to fit to. if sum(x < model.brk) < min_fits_pts: warn("Not enough points to fit to." 
+ " Ignoring break.") self._brk = None else: good_pts = x.copy() < model.brk x = x[good_pts] y = y[good_pts] self._brk = 10**model.brk * u.pix self._slope = model.slopes if bootstrap: stderrs = residual_bootstrap(model.fit, **bootstrap_kwargs) self._slope_err = stderrs[1:-1] self._brk_err = np.log(10) * self.brk.value * \ stderrs[-1] * u.pix else: self._slope_err = model.slope_errs self._brk_err = np.log(10) * self.brk.value * \ model.brk_err * u.pix self.fit = model.fit else: self._brk = None # Break fit failed, revert to normal model warn("Model with break failed, reverting to model\ without break.") else: self._brk = None # Revert to model without break if none is given, or if the segmented # model failed. if self.brk is None: x = sm.add_constant(x) # model = sm.OLS(y, x, missing='drop') model = sm.WLS(y, x, missing='drop', weights=weights) self.fit = model.fit(cov_type='HC3') self._slope = self.fit.params[1] if bootstrap: stderrs = residual_bootstrap(self.fit, **bootstrap_kwargs) self._slope_err = stderrs[1] else: self._slope_err = self.fit.bse[1] self._bootstrap_flag = bootstrap if verbose: print(self.fit.summary()) if self._bootstrap_flag: print("Bootstrapping used to find stderrs! " "Errors may not equal those shown above.") self._model = model
@property def brk(self): ''' Fitted break point. ''' return self._brk @property def brk_err(self): ''' 1-sigma on the break point in the segmented linear model. ''' return self._brk_err @property def slope(self): ''' Fitted slope. ''' return self._slope @property def slope_err(self): ''' Standard error on the fitted slope. ''' return self._slope_err @property def fit_range(self): ''' Range of lags used in the fit. ''' return self._fit_range
[docs] def fitted_model(self, xvals): ''' Computes the fitted power-law in log-log space using the given x values. Parameters ---------- xvals : `~numpy.ndarray` Values of log(lags) to compute the model at (base 10 log). Returns ------- model_values : `~numpy.ndarray` Values of the model at the given values. ''' if isinstance(self._model, Lm_Seg): return self._model.model(xvals) else: return self.fit.params[0] + self.fit.params[1] * xvals
[docs] def plot_fit(self, save_name=None, xunit=u.pix, symbol='o', color='r', fit_color='k', label=None, show_residual=True): ''' Plot the delta-variance curve and the fit. Parameters ---------- save_name : str,optional Save the figure when a file name is given. xunit : u.Unit, optional The unit to show the x-axis in. symbol : str, optional Shape to plot the data points with. color : {str, RGB tuple}, optional Color to show the delta-variance curve in. fit_color : {str, RGB tuple}, optional Color of the fitted line. Defaults to `color` when no input is given. label : str, optional Label to later be used in a legend. show_residual : bool, optional Plot the fit residuals. ''' if fit_color is None: fit_color = color import matplotlib.pyplot as plt fig = plt.gcf() axes = plt.gcf().get_axes() if len(axes) == 0: if show_residual: ax = plt.subplot2grid((4, 1), (0, 0), colspan=1, rowspan=3) ax_r = plt.subplot2grid((4, 1), (3, 0), colspan=1, rowspan=1, sharex=ax) else: ax = plt.subplot(111) elif len(axes) == 1: ax = axes[0] else: ax = axes[0] ax_r = axes[1] ax.set_xscale("log") ax.set_yscale("log") lags = self._spatial_unit_conversion(self.lags, xunit).value # Check for NaNs fin_vals = np.logical_or(np.isfinite(self.delta_var), np.isfinite(self.delta_var_error)) ax.errorbar(lags[fin_vals], self.delta_var[fin_vals], yerr=self.delta_var_error[fin_vals], fmt="{}-".format(symbol), color=color, label=label, zorder=-1) xvals = np.linspace(self._fit_range[0].value, self._fit_range[1].value, 100) * self.lags.unit xvals_conv = self._spatial_unit_conversion(xvals, xunit).value ax.plot(xvals_conv, 10**self.fitted_model(np.log10(xvals.value)), '--', color=fit_color, linewidth=2) xlow = \ self._spatial_unit_conversion(self._fit_range[0], xunit).value xhigh = \ self._spatial_unit_conversion(self._fit_range[1], xunit).value ax.axvline(xlow, color=color, alpha=0.5, linestyle='-.') ax.axvline(xhigh, color=color, alpha=0.5, linestyle='-.') # ax.legend(loc='best') ax.grid(True) if 
show_residual: resids = self.delta_var - 10**self.fitted_model(np.log10(lags)) ax_r.errorbar(lags[fin_vals], resids[fin_vals], yerr=self.delta_var_error[fin_vals], fmt="{}-".format(symbol), color=color, zorder=-1) ax_r.set_ylabel("Residuals") ax_r.set_xlabel("Lag ({})".format(xunit)) ax_r.axhline(0., color=fit_color, linestyle='--') ax_r.axvline(xlow, color=color, alpha=0.5, linestyle='-.') ax_r.axvline(xhigh, color=color, alpha=0.5, linestyle='-.') ax_r.grid() plt.setp(ax.get_xticklabels(), visible=False) else: ax.set_xlabel("Lag ({})".format(xunit)) ax.set_ylabel(r"$\sigma^{2}_{\Delta}$") plt.tight_layout() fig.subplots_adjust(hspace=0.1) if save_name is not None: plt.savefig(save_name) plt.close() else: plt.show()
[docs] def run(self, show_progress=True, verbose=False, xunit=u.pix, nan_treatment='fill', preserve_nan=False, allow_huge=False, boundary='wrap', use_pyfftw=False, threads=1, pyfftw_kwargs={}, xlow=None, xhigh=None, brk=None, fit_kwargs={}, save_name=None): ''' Compute the delta-variance. Parameters ---------- show_progress : bool, optional Show a progress bar during the creation of the covariance matrix. verbose : bool, optional Plot delta-variance transform. xunit : u.Unit, optional The unit to show the x-axis in. allow_huge : bool, optional See `~DeltaVariance.do_convolutions`. nan_treatment : bool, optional Enable to interpolate over NaNs in the convolution. Default is True. boundary : {"wrap", "fill"}, optional Use "wrap" for periodic boundaries, and "cut" for non-periodic. use_pyfftw : bool, optional Enable to use pyfftw, if it is installed. threads : int, optional Number of threads to use in FFT when using pyfftw. pyfftw_kwargs : Passed to See `here <http://hgomersall.github.io/pyFFTW/pyfftw/builders/builders.html>`_ for a list of accepted kwargs. xlow : `~astropy.units.Quantity`, optional Lower lag value to consider in the fit. xhigh : `~astropy.units.Quantity`, optional Upper lag value to consider in the fit. brk : `~astropy.units.Quantity`, optional Give an initial break point guess. Enables fitting a segmented linear model. fit_kwargs : dict, optional Passed to `~turbustat.statistics.lm_seg.Lm_Seg.fit_model` when using a broken linear fit. save_name : str,optional Save the figure when a file name is given. ''' self.compute_deltavar(allow_huge=allow_huge, boundary=boundary, nan_treatment=nan_treatment, preserve_nan=preserve_nan, use_pyfftw=use_pyfftw, threads=threads, pyfftw_kwargs=pyfftw_kwargs, show_progress=show_progress) self.fit_plaw(xlow=xlow, xhigh=xhigh, brk=brk, verbose=verbose, **fit_kwargs) if verbose: self.plot_fit(save_name=save_name, xunit=xunit) return self
class DeltaVariance_Distance(object):
    """
    Compares 2 datasets using delta-variance. The distance between them is
    given by the Euclidean distance between the curves weighted by the
    bootstrapped errors.

    .. note:: When passing a computed `~DeltaVariance` class for `dataset1`
              or `dataset2`, it may be necessary to recompute the
              delta-variance if `use_common_lags=True` and the existing
              lags do not match the common lags.

    Parameters
    ----------
    dataset1 : %(dtypes)s or `~DeltaVariance` class
        Contains the data and header for one dataset. Or pass a
        `~DeltaVariance` class that may be pre-computed.
    dataset2 : %(dtypes)s or `~DeltaVariance` class
        See `dataset1` above.
    weights1 : %(dtypes)s
        Weights for dataset1.
    weights2 : %(dtypes)s
        See above.
    diam_ratio : float, optional
        The ratio between the kernel sizes.
    lags : numpy.ndarray or list, optional
        The pixel scales to compute the delta-variance at.
    use_common_lags : bool, optional
        Use a set of common lags that have the same angular sizes for both
        datasets. This is required for
        `DeltaVariance_Distance.curve_distance` metric.
    delvar_kwargs : dict, optional
        Pass kwargs to `~DeltaVariance.run`.
    delvar2_kwargs : dict, optional
        Pass kwargs to `~DeltaVariance.run` for `dataset2`. When `None` is
        given, the kwargs in `delvar_kwargs` are used for both datasets.
    lags2 : numpy.ndarray or list, optional
        The pixel scales for the delta-variance of `dataset2`. Ignored if
        `use_common_lags=True`.
    """

    __doc__ %= {"dtypes": " or ".join(common_types + twod_types)}

    def __init__(self, dataset1, dataset2, weights1=None, weights2=None,
                 diam_ratio=1.5, lags=None, use_common_lags=True,
                 delvar_kwargs=None, delvar2_kwargs=None, lags2=None):
        super(DeltaVariance_Distance, self).__init__()

        # Avoid a shared mutable default argument.
        if delvar_kwargs is None:
            delvar_kwargs = {}

        if isinstance(dataset1, DeltaVariance):
            _given_data1 = False
            self.delvar1 = dataset1
        else:
            _given_data1 = True
            dataset1 = copy(input_data(dataset1, no_header=False))

        # BUG FIX: this check previously tested `dataset1`, so a
        # pre-computed DeltaVariance passed as dataset2 was never
        # recognized.
        if isinstance(dataset2, DeltaVariance):
            _given_data2 = False
            self.delvar2 = dataset2
        else:
            _given_data2 = True
            dataset2 = copy(input_data(dataset2, no_header=False))

        self._common_lags = use_common_lags

        # Create a default set of lags, in pixels
        if use_common_lags:
            if lags is None:
                min_size = 3.0
                nlags = 25

                if _given_data1:
                    shape1 = dataset1[0].shape
                else:
                    shape1 = self.delvar1.data.shape

                if _given_data2:
                    shape2 = dataset2[0].shape
                else:
                    shape2 = self.delvar2.data.shape

                # Limit the lags by the smaller of the two datasets.
                if min(shape1) > min(shape2):
                    lags = np.logspace(np.log10(min_size),
                                       np.log10(min(shape2) / 2.),
                                       nlags) * u.pix
                else:
                    lags = np.logspace(np.log10(min_size),
                                       np.log10(min(shape1) / 2.),
                                       nlags) * u.pix

            # Now adjust the lags such they have a common scaling when the
            # datasets are not on a common grid.
            if _given_data1:
                wcs1 = WCS(dataset1[1])
            else:
                wcs1 = self.delvar1._wcs

            if _given_data2:
                wcs2 = WCS(dataset2[1])
            else:
                wcs2 = self.delvar2._wcs

            scale = common_scale(wcs1, wcs2)

            if scale == 1.0:
                lags1 = lags
                lags2 = lags
            elif scale > 1.0:
                lags1 = scale * lags
                lags2 = lags
            else:
                lags1 = lags
                lags2 = lags / float(scale)
        else:
            # BUG FIX: `lags2` was referenced here without being a
            # parameter, raising a NameError when use_common_lags=False.
            if lags2 is None and lags is not None:
                lags2 = lags

            if lags is not None:
                lags1 = lags
            else:
                lags1 = None

        if _given_data1:
            self.delvar1 = DeltaVariance(dataset1,
                                         weights=weights1,
                                         diam_ratio=diam_ratio, lags=lags1)
            self.delvar1.run(**delvar_kwargs)
        else:
            # Check if we need to re-run the statistic if the lags are
            # wrong.
            if lags1 is not None:
                if not (self.delvar1.lags == lags1).all():
                    self.delvar1.run(**delvar_kwargs)

            if not hasattr(self.delvar1, "_slope"):
                warn("DeltaVariance given as dataset1 does not have a fitted"
                     " slope. Re-running delta variance.")
                if lags1 is not None:
                    self.delvar1._lags = lags1
                self.delvar1.run(**delvar_kwargs)

        if delvar2_kwargs is None:
            delvar2_kwargs = delvar_kwargs

        if _given_data2:
            self.delvar2 = DeltaVariance(dataset2,
                                         weights=weights2,
                                         diam_ratio=diam_ratio, lags=lags2)
            self.delvar2.run(**delvar2_kwargs)
        else:
            if lags2 is not None:
                if not (self.delvar2.lags == lags2).all():
                    self.delvar2.run(**delvar2_kwargs)

            if not hasattr(self.delvar2, "_slope"):
                warn("DeltaVariance given as dataset2 does not have a fitted"
                     " slope. Re-running delta variance.")
                if lags2 is not None:
                    self.delvar2._lags = lags2
                # BUG FIX: previously re-ran with `delvar_kwargs` instead
                # of `delvar2_kwargs`.
                self.delvar2.run(**delvar2_kwargs)

    @property
    def curve_distance(self):
        '''
        The L2 norm between the delta-variance curves.
        '''
        return self._curve_distance

    @property
    def slope_distance(self):
        '''
        The t-statistic of the difference in the delta-variance slopes.
        '''
        return self._slope_distance
[docs] def distance_metric(self, verbose=False, xunit=u.pix, save_name=None, plot_kwargs1={}, plot_kwargs2={}): ''' Applies the Euclidean distance to the delta-variance curves. Parameters ---------- verbose : bool, optional Enables plotting. xunit : `~astropy.units.Unit`, optional Unit of the x-axis in the plot in pixel, angular, or physical units. save_name : str, optional Name of the save file. Enables saving the figure. plot_kwargs1 : dict, optional Pass kwargs to `~turbustat.statistics.DeltaVariance.plot_fit` for `dataset1`. plot_kwargs2 : dict, optional Pass kwargs to `~turbustat.statistics.DeltaVariance.plot_fit` for `dataset2`. ''' # curve distance is only defined if the delta-variance is measured at # the same lags if self._common_lags: # Check for NaNs and negatives nans1 = np.logical_or(np.isnan(self.delvar1.delta_var), self.delvar1.delta_var <= 0.0) nans2 = np.logical_or(np.isnan(self.delvar2.delta_var), self.delvar2.delta_var <= 0.0) all_nans = np.logical_or(nans1, nans2) # Cut the curves at the specified xlow and xhigh points fit_range1 = self.delvar1.fit_range fit_range2 = self.delvar2.fit_range # The curve metric only makes sense if the same range is used for # both check_range = fit_range1[0] == fit_range2[0] and \ fit_range1[1] == fit_range2[1] if check_range: # Lags are always in pixels. 
As are the limits cuts1 = np.logical_and(self.delvar1.lags >= fit_range1[0], self.delvar1.lags <= fit_range1[1]) cuts2 = np.logical_and(self.delvar2.lags >= fit_range2[0], self.delvar2.lags <= fit_range2[1]) valids1 = np.logical_and(cuts1, ~all_nans) valids2 = np.logical_and(cuts2, ~all_nans) deltavar1_sum = np.sum(self.delvar1.delta_var[valids1]) deltavar1 = \ np.log10(self.delvar1.delta_var[valids1] / deltavar1_sum) deltavar2_sum = np.sum(self.delvar2.delta_var[valids2]) deltavar2 = \ np.log10(self.delvar2.delta_var[valids2] / deltavar2_sum) # Distance between two normalized curves self._curve_distance = np.linalg.norm(deltavar1 - deltavar2) else: warn("The curve distance is only defined when the fit " "range and lags for both datasets are equal. " "Setting curve_distance to NaN.", TurbuStatMetricWarning) self._curve_distance = np.NaN else: self._curve_distance = np.NaN # Distance between the fitted slopes (combined t-statistic) self._slope_distance = \ np.abs(self.delvar1.slope - self.delvar2.slope) / \ np.sqrt(self.delvar1.slope_err**2 + self.delvar2.slope_err**2) if verbose: import matplotlib.pyplot as plt print(self.delvar1.fit.summary()) print(self.delvar2.fit.summary()) defaults1 = {'color': 'b', 'symbol': 'D', 'label': '1'} defaults2 = {'color': 'g', 'symbol': 'o', 'label': '2'} for key in defaults1: if key not in plot_kwargs1: plot_kwargs1[key] = defaults1[key] for key in defaults2: if key not in plot_kwargs2: plot_kwargs2[key] = defaults2[key] if 'xunit' in plot_kwargs1: del plot_kwargs1['xunit'] if 'xunit' in plot_kwargs2: del plot_kwargs2['xunit'] self.delvar1.plot_fit(xunit=xunit, **plot_kwargs1) self.delvar2.plot_fit(xunit=xunit, **plot_kwargs2) axes = plt.gcf().get_axes() axes[0].legend(loc='best', frameon=True) if save_name is not None: plt.savefig(save_name) plt.close() else: plt.show() return self
def _delvar(array, weight, lag): ''' Computes the delta variance of the given array. ''' arr_cent = array.copy() - np.nanmean(array, axis=None) val = np.nansum(arr_cent ** 2. * weight) /\ np.nansum(weight) # The error needs to be normalized by the number of independent # pixels in the array. # Take width to be 1/2 FWHM. Note that lag is defined as 2*sigma. # So 2ln(2) sigma^2 = ln(2)/2 * lag^2 kern_area = np.ceil(0.5 * np.pi * np.log(2) * lag**2).astype(int) nindep = np.sqrt(np.isfinite(arr_cent).sum() // kern_area) val_err = np.sqrt((np.nansum(arr_cent ** 4. * weight) / np.nansum(weight)) - val**2) / nindep return val, val_err