Source code for turbustat.statistics.scf.scf

# Licensed under an MIT open source license - see LICENSE
from __future__ import print_function, absolute_import, division

import numpy as np
from astropy import units as u
from astropy.wcs import WCS
from six import string_types
import statsmodels.api as sm
from warnings import warn
from astropy.utils.console import ProgressBar
from itertools import product

from ..psds import pspec, make_radial_arrays
from ..base_statistic import BaseStatisticMixIn
from ...io import common_types, threed_types, input_data
from ..stats_utils import common_scale, fourier_shift, pixel_shift
from ..fitting_utils import clip_func, residual_bootstrap
from ..elliptical_powerlaw import (fit_elliptical_powerlaw,
                                   inverse_interval_transform,
                                   inverse_interval_transform_stderr)
from ..stats_warnings import TurbuStatMetricWarning


class SCF(BaseStatisticMixIn):
    '''
    Computes the Spectral Correlation Function of a data cube
    (Rosolowsky et al., 1999).

    Parameters
    ----------
    cube : %(dtypes)s
        Data cube.
    header : FITS header, optional
        Header for the cube.
    size : int, optional
        The total size of the lags used in one dimension in pixels. The
        maximum lag size will be (size - 1) / 2 in each direction.
    roll_lags : `~numpy.ndarray` or `~astropy.units.Quantity`, optional
        Pass a custom array of lag values. An odd number of lags, centered
        at 0, must be given. If no units are given, it is assumed that the
        lags are in pixels. The lags should have symmetric positive and
        negative values (e.g., [-1, 0, 1]).
    distance : `~astropy.units.Quantity`, optional
        Physical distance to the region in the data.

    Examples
    --------
    >>> from spectral_cube import SpectralCube
    >>> from turbustat.statistics import SCF
    >>> cube = SpectralCube.read("Design4.13co.fits")  # doctest: +SKIP
    >>> scf = SCF(cube)  # doctest: +SKIP
    >>> scf.run(verbose=True)  # doctest: +SKIP
    '''

    __doc__ %= {"dtypes": " or ".join(common_types + threed_types)}

    def __init__(self, cube, header=None, size=11, roll_lags=None,
                 distance=None):
        super(SCF, self).__init__()

        # Set data and header
        self.input_data_header(cube, header)

        if distance is not None:
            self.distance = distance

        if roll_lags is None:
            if size % 2 == 0:
                warn("Size must be odd. Reducing size to next lowest odd"
                     " number.")
                size = size - 1

            self.roll_lags = (np.arange(size) - size // 2) * u.pix
        else:
            if roll_lags.size % 2 == 0:
                warn("Size of roll_lags must be odd. Reducing size to next"
                     " lowest odd number.")
                roll_lags = roll_lags[:-1]

            if isinstance(roll_lags, u.Quantity):
                pass
            elif isinstance(roll_lags, np.ndarray):
                roll_lags = roll_lags * u.pix
            else:
                raise TypeError("roll_lags must be an astropy.units.Quantity"
                                " array or a numpy.ndarray.")

            self.roll_lags = roll_lags

        # Make sure that we can convert the lags
        self._to_pixel(self.roll_lags)

        self.size = self.roll_lags.size

        self._scf_surface = None
        self._scf_spectrum_stddev = None

        self._fit2D_flag = False

    @property
    def roll_lags(self):
        '''
        Pixel values that the cube is rolled by to compute the SCF
        correlation surface.
        '''
        return self._roll_lags

    @roll_lags.setter
    def roll_lags(self, value):
        # Needs to be a quantity with a unit
        if not hasattr(value, "unit"):
            raise ValueError("roll_lags must be an astropy.units.Quantity.")

        try:
            self._to_pixel(value)
        except u.UnitConversionError:
            raise u.UnitConversionError("Cannot convert given roll lags to "
                                        "pixel units. `roll_lags` must have"
                                        " pixel, angular, or physical (if a"
                                        " distance is given) units.")

        self._roll_lags = value

    @property
    def scf_surface(self):
        '''
        SCF correlation array
        '''
        return self._scf_surface

    @property
    def scf_spectrum(self):
        '''
        Azimuthally averaged 1D SCF spectrum
        '''
        return self._scf_spectrum

    @property
    def scf_spectrum_stddev(self):
        '''
        Standard deviation of `~SCF.scf_spectrum`
        '''
        return self._scf_spectrum_stddev

    @property
    def lags(self):
        '''
        Values of the lags, in pixels, that the SCF is computed at
        '''
        return self._lags
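    # Editor's note -- a usage sketch, not part of the original TurbuStat
    # source. Constructing an SCF instance with a custom, odd-length set of
    # roll lags centred at zero; `cube` (read as in the Examples above) and
    # the lag values are assumptions for illustration only.
    #
    # >>> import numpy as np  # doctest: +SKIP
    # >>> import astropy.units as u  # doctest: +SKIP
    # >>> custom_lags = np.arange(-5, 6) * u.pix  # 11 lags: -5, ..., 5
    # >>> scf = SCF(cube, roll_lags=custom_lags)  # doctest: +SKIP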
    def compute_surface(self, boundary='continuous', show_progress=True):
        '''
        Computes the SCF up to the given lag value. This is an expensive
        operation and could take a long time to calculate.

        Parameters
        ----------
        boundary : {"continuous", "cut"}
            Treat the boundary as continuous (wrap-around) or cut values
            beyond the edge (i.e., for most observational data).
        show_progress : bool, optional
            Show a progress bar when computing the surface.
        '''

        if boundary not in ["continuous", "cut"]:
            raise ValueError("boundary must be 'continuous' or 'cut'.")

        self._scf_surface = np.zeros((self.size, self.size))

        # Convert the lags into pixel units.
        pix_lags = self._to_pixel(self.roll_lags).value

        dx = pix_lags.copy()
        dy = pix_lags.copy()

        if show_progress:
            bar = ProgressBar(len(dx) * len(dy))

        for n, (x_shift, y_shift) in enumerate(product(dx, dy)):

            i, j = np.unravel_index(n, (len(dx), len(dy)))

            if x_shift == 0 and y_shift == 0:
                self._scf_surface[j, i] = 1.

            if x_shift == 0:
                tmp = self.data
            else:
                if float(x_shift).is_integer():
                    shift_func = pixel_shift
                else:
                    shift_func = fourier_shift

                tmp = shift_func(self.data, x_shift, axis=1)

            if y_shift != 0:
                if float(y_shift).is_integer():
                    shift_func = pixel_shift
                else:
                    shift_func = fourier_shift

                tmp = shift_func(tmp, y_shift, axis=2)

            if boundary == "cut":
                # Always round up to the nearest integer.
                x_shift = np.ceil(x_shift).astype(int)
                y_shift = np.ceil(y_shift).astype(int)

                if x_shift < 0:
                    x_slice_data = slice(None, tmp.shape[1] + x_shift)
                    x_slice_tmp = slice(-x_shift, None)
                else:
                    x_slice_data = slice(x_shift, None)
                    x_slice_tmp = slice(None, tmp.shape[1] - x_shift)

                if y_shift < 0:
                    y_slice_data = slice(None, tmp.shape[2] + y_shift)
                    y_slice_tmp = slice(-y_shift, None)
                else:
                    y_slice_data = slice(y_shift, None)
                    y_slice_tmp = slice(None, tmp.shape[2] - y_shift)

                data_slice = (slice(None), x_slice_data, y_slice_data)
                tmp_slice = (slice(None), x_slice_tmp, y_slice_tmp)

            elif boundary == "continuous":
                data_slice = (slice(None),) * 3
                tmp_slice = (slice(None),) * 3

            values = \
                np.nansum(((self.data[data_slice] - tmp[tmp_slice]) ** 2),
                          axis=0) / \
                (np.nansum(self.data[data_slice] ** 2, axis=0) +
                 np.nansum(tmp[tmp_slice] ** 2, axis=0))

            scf_value = 1. - \
                np.sqrt(np.nansum(values) / np.sum(np.isfinite(values)))

            if scf_value > 1:
                raise ValueError("Cannot have a correlation above 1. Check "
                                 "your input data. Contact the TurbuStat "
                                 "authors if the problem persists.")

            self._scf_surface[j, i] = scf_value

            if show_progress:
                bar.update(n + 1)
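    # Editor's note -- not part of the original source. For each lag pair
    # (dx, dy) the loop above evaluates
    #
    #     S(dx, dy) = 1 - sqrt( < sum_v [T(x, y, v) - T(x + dx, y + dy, v)]^2
    #                           / (sum_v T(x, y, v)^2
    #                              + sum_v T(x + dx, y + dy, v)^2) > ),
    #
    # where the sums run over the spectral axis and < > is the mean over all
    # finite spatial positions. A hedged usage sketch, with `scf` assumed to
    # be an SCF instance built as above:
    #
    # >>> scf.compute_surface(boundary='cut', show_progress=False)  # doctest: +SKIP
    # >>> scf.scf_surface.shape  # doctest: +SKIP
    # (11, 11)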
    def compute_spectrum(self, **kwargs):
        '''
        Compute the 1D spectrum as a function of lag. kwargs are passed into
        the `~turbustat.statistics.psds.pspec` function, which provides many
        options. The default settings are applicable in nearly all use
        cases. Note that log-spaced bins are disabled, as they do not work
        well for the SCF.

        Parameters
        ----------
        kwargs : passed to `turbustat.statistics.psds.pspec`
        '''

        # If scf_surface hasn't been computed, do it
        if self.scf_surface is None:
            self.compute_surface()

        if "logspacing" in kwargs:
            warn("Disabled log-spaced bins. This does not work well for the"
                 " SCF.", TurbuStatMetricWarning)
            kwargs.pop('logspacing')

        if "theta_0" in kwargs:
            azim_constraint_flag = True
        else:
            azim_constraint_flag = False

        out = pspec(self.scf_surface, return_stddev=True,
                    logspacing=False, return_freqs=False,
                    **kwargs)

        self._azim_constraint_flag = azim_constraint_flag

        if azim_constraint_flag:
            self._lags, self._scf_spectrum, self._scf_spectrum_stddev, \
                self._azim_mask = out
        else:
            self._lags, self._scf_spectrum, self._scf_spectrum_stddev = out

        roll_lag_diff = np.abs(self.roll_lags[1] - self.roll_lags[0])

        self._lags = self._lags * roll_lag_diff
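    # Editor's note -- a usage sketch, not part of the original source. The
    # default radial binning is appropriate in most cases; passing a
    # `theta_0` keyword (forwarded to `pspec`) additionally builds the
    # azimuthal mask used later by `fit_2Dplaw(use_azimmask=True)`.
    #
    # >>> scf.compute_spectrum()  # doctest: +SKIP
    # >>> scf.lags, scf.scf_spectrum  # doctest: +SKIP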
    def fit_plaw(self, xlow=None, xhigh=None, verbose=False, bootstrap=False,
                 **bootstrap_kwargs):
        '''
        Fit a power-law to the SCF spectrum.

        Parameters
        ----------
        xlow : `~astropy.units.Quantity`, optional
            Lower lag value limit to consider in the fit.
        xhigh : `~astropy.units.Quantity`, optional
            Upper lag value limit to consider in the fit.
        verbose : bool, optional
            Show fit summary when enabled.
        bootstrap : bool, optional
            Bootstrap the fit residuals to estimate the slope standard error.
        bootstrap_kwargs : passed to the residual bootstrap routine.
        '''

        pix_lags = self._to_pixel(self.lags)

        x = np.log10(pix_lags.value)
        y = np.log10(self.scf_spectrum)

        if xlow is not None:
            if not isinstance(xlow, u.Quantity):
                raise TypeError("xlow must be an astropy.units.Quantity.")

            # Convert xlow into the same units as the lags
            xlow = self._to_pixel(xlow)

            self._xlow = xlow

            lower_limit = x >= np.log10(xlow.value)
        else:
            lower_limit = \
                np.ones_like(self.scf_spectrum, dtype=bool)
            self._xlow = np.abs(self.lags).min()

        if xhigh is not None:
            if not isinstance(xhigh, u.Quantity):
                raise TypeError("xhigh must be an astropy.units.Quantity.")

            # Convert xhigh into the same units as the lags
            xhigh = self._to_pixel(xhigh)

            self._xhigh = xhigh

            upper_limit = x <= np.log10(xhigh.value)
        else:
            upper_limit = \
                np.ones_like(self.scf_spectrum, dtype=bool)
            self._xhigh = np.abs(self.lags).max()

        within_limits = np.logical_and(lower_limit, upper_limit)

        if not within_limits.any():
            raise ValueError("Limits have removed all lag values. Make xlow"
                             " and xhigh less restrictive.")

        y = y[within_limits]
        x = x[within_limits]

        x = sm.add_constant(x)

        # If the std were computed, use them as weights.
        # Converting to the log stds doesn't matter since the weights
        # remain proportional to 1/sigma^2, and an overall normalization is
        # applied in the fitting routine.
        weights = self.scf_spectrum_stddev[within_limits] ** -2

        model = sm.WLS(y, x, missing='drop', weights=weights)

        self.fit = model.fit(cov_type='HC3')

        self._slope = self.fit.params[1]

        if bootstrap:
            stderrs = residual_bootstrap(self.fit,
                                         **bootstrap_kwargs)

            self._slope_err = stderrs[1]
        else:
            self._slope_err = self.fit.bse[1]

        self._bootstrap_flag = bootstrap

        if verbose:
            print(self.fit.summary())

            if self._bootstrap_flag:
                print("Bootstrapping used to find stderrs! "
                      "Errors may not equal those shown above.")
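    # Editor's note -- a usage sketch, not part of the original source. Fit
    # limits must be astropy Quantities; the 1 and 4 pixel limits below are
    # arbitrary illustrative values.
    #
    # >>> scf.fit_plaw(xlow=1 * u.pix, xhigh=4 * u.pix, verbose=True)  # doctest: +SKIP
    # >>> scf.slope, scf.slope_err  # doctest: +SKIP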
    @property
    def slope(self):
        '''
        SCF spectrum slope
        '''
        return self._slope

    @property
    def slope_err(self):
        '''
        1-sigma error on the SCF spectrum slope
        '''
        return self._slope_err

    @property
    def xlow(self):
        '''
        Lower limit for lags to consider in fits.
        '''
        return self._xlow

    @property
    def xhigh(self):
        '''
        Upper limit for lags to consider in fits.
        '''
        return self._xhigh
    def fitted_model(self, xvals):
        '''
        Computes the fitted power-law model at the given x values.

        Parameters
        ----------
        xvals : `~astropy.units.Quantity`
            Values of lags to compute the model at.

        Returns
        -------
        model_values : `~numpy.ndarray`
            Values of the model at the given lags, on the same (linear)
            scale as the SCF spectrum.
        '''

        if not isinstance(xvals, u.Quantity):
            raise TypeError("xvals must be an astropy.units.Quantity.")

        # Convert into the lag units used for the fit
        xvals = self._to_pixel(xvals).value

        model_values = \
            self.fit.params[0] + self.fit.params[1] * np.log10(xvals)

        return 10**model_values
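    # Editor's note -- a usage sketch, not part of the original source.
    # `fitted_model` expects a Quantity and returns the power-law model
    # evaluated on the same scale as the SCF spectrum; the lag grid below is
    # an assumption for illustration.
    #
    # >>> model_lags = np.linspace(1, 5, 10) * u.pix  # doctest: +SKIP
    # >>> scf.fitted_model(model_lags)  # doctest: +SKIP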
    def fit_2Dplaw(self, fit_method='LevMarq', p0=(), xlow=None,
                   xhigh=None, bootstrap=True, niters=100,
                   use_azimmask=False):
        '''
        Model the 2D SCF surface with an elliptical power-law model.

        Parameters
        ----------
        fit_method : str, optional
            The fitting algorithm to use. Only 'LevMarq' is currently
            available.
        p0 : tuple, optional
            Initial parameters for fitting. If no values are given, the
            initial parameters start from the 1D fit parameters.
        xlow : `~astropy.units.Quantity`, optional
            Lower lag value limit to consider in the fit.
        xhigh : `~astropy.units.Quantity`, optional
            Upper lag value limit to consider in the fit.
        bootstrap : bool, optional
            Bootstrap using the model residuals to estimate the parameter
            standard errors. This tends to give more realistic intervals
            than the covariance matrix.
        niters : int, optional
            Number of bootstrap iterations.
        use_azimmask : bool, optional
            Use the azimuthal mask defined for the 1D spectrum, when
            azimuthal limits have been given.
        '''

        # Adjust the distance based on the separation of the lags
        pix_lag_diff = np.diff(self._to_pixel(self.lags))[0].value

        if xlow is not None:
            # Convert xlow into the same units as the lags
            xlow = self._to_pixel(xlow)

            self._xlow = xlow
        else:
            self._xlow = np.abs(self.lags).min()

        if xhigh is not None:
            # Convert xhigh into the same units as the lags
            xhigh = self._to_pixel(xhigh)

            self._xhigh = xhigh
        else:
            self._xhigh = np.abs(self.lags).max()

        xlow_pix = self._to_pixel(self.xlow).value
        xhigh_pix = self._to_pixel(self.xhigh).value

        yy, xx = make_radial_arrays(self.scf_surface.shape)

        # Needed to make sure the definition of theta is consistent with
        # azimuthal masking and the elliptical p-law
        yy = yy[::-1]
        xx = xx[::-1]

        dists = np.sqrt(yy**2 + xx**2) * pix_lag_diff

        mask = clip_func(dists, xlow_pix, xhigh_pix)

        if hasattr(self, "_azim_mask") and use_azimmask:
            mask = np.logical_and(mask, self._azim_mask)

        if not mask.any():
            raise ValueError("Limits have removed all lag values. Make xlow"
                             " and xhigh less restrictive.")

        if len(p0) == 0:
            if hasattr(self, 'slope'):
                slope_guess = self.slope
                amp_guess = self.fit.params[0]
            else:
                # Let's guess it's going to be ~ -0.2
                slope_guess = -0.2
                amp_guess = 1.0

            # Use an initial guess of pi / 2 for theta
            theta = np.pi / 2.

            # For ellip = 0.5
            ellip_conv = 0

            p0 = (amp_guess, ellip_conv, theta, slope_guess)

        params, stderrs, fit_2Dmodel, fitter = \
            fit_elliptical_powerlaw(np.log10(self.scf_surface[mask]),
                                    xx[mask], yy[mask], p0,
                                    fit_method=fit_method,
                                    bootstrap=bootstrap,
                                    niters=niters)

        self.fit2D = fit_2Dmodel
        self._fitter = fitter

        self._slope2D = params[3]
        self._slope2D_err = stderrs[3]

        self._theta2D = params[2] % np.pi
        self._theta2D_err = stderrs[2]

        # Apply transforms to convert back to the [0, 1) ellipticity range
        self._ellip2D = inverse_interval_transform(params[1], 0, 1)
        self._ellip2D_err = \
            inverse_interval_transform_stderr(stderrs[1], params[1], 0, 1)

        self._fit2D_flag = True
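    # Editor's note -- a usage sketch, not part of the original source.
    # Bootstrapping the residuals is the default and tends to give more
    # realistic errors; the limits are the same illustrative values used
    # for the 1D fit above.
    #
    # >>> scf.fit_2Dplaw(xlow=1 * u.pix, xhigh=4 * u.pix, niters=100)  # doctest: +SKIP
    # >>> scf.slope2D, scf.ellip2D, scf.theta2D  # doctest: +SKIP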
    @property
    def slope2D(self):
        '''
        Fitted slope of the 2D power-law.
        '''
        return self._slope2D

    @property
    def slope2D_err(self):
        '''
        Slope standard error of the 2D power-law.
        '''
        return self._slope2D_err

    @property
    def theta2D(self):
        '''
        Fitted position angle of the 2D power-law.
        '''
        return self._theta2D

    @property
    def theta2D_err(self):
        '''
        Position angle standard error of the 2D power-law.
        '''
        return self._theta2D_err

    @property
    def ellip2D(self):
        '''
        Fitted ellipticity of the 2D power-law.
        '''
        return self._ellip2D

    @property
    def ellip2D_err(self):
        '''
        Ellipticity standard error of the 2D power-law.
        '''
        return self._ellip2D_err
    def plot_fit(self, save_name=None, show_radial=True, show_residual=True,
                 show_surface=True, contour_color='k', cmap='viridis',
                 data_color='r', fit_color='k', xunit=u.pix):
        '''
        Plot the SCF surface, radial profiles, and associated fits.

        Parameters
        ----------
        save_name : str, optional
            Save name for the figure. Enables saving the plot.
        show_radial : bool, optional
            Show the azimuthally-averaged 1D SCF spectrum and fit.
        show_surface : bool, optional
            Show the SCF surface and (if performed) fit.
        show_residual : bool, optional
            Plot the residuals for the 1D SCF fit.
        contour_color : {str, RGB tuple}, optional
            Color of the 2D fit contours.
        cmap : {str, matplotlib color map}, optional
            Colormap to use in the plots. Default is viridis.
        data_color : {str, RGB tuple}, optional
            Color of the azimuthally-averaged data.
        fit_color : {str, RGB tuple}, optional
            Color of the 1D fit.
        xunit : `~astropy.units.Unit`, optional
            Unit of the x-axis in the plot in pixel, angular, or physical
            units.
        '''

        import matplotlib.pyplot as plt
        from mpl_toolkits.axes_grid1 import make_axes_locatable

        fig = plt.gcf()
        axes = plt.gcf().get_axes()

        if len(axes) == 3:
            ax, ax2, ax_r = axes
        elif len(axes) == 2:
            if show_surface and not show_residual:
                ax, ax2 = axes
            else:
                ax2, ax_r = axes
        elif len(axes) == 1:
            if show_radial:
                ax = axes[0]
            else:
                ax2 = axes[0]
        else:
            if show_surface:
                if show_radial:
                    ax = plt.subplot2grid((4, 4), (0, 2), colspan=2,
                                          rowspan=4)
                    if show_residual:
                        ax2 = plt.subplot2grid((4, 4), (0, 0), colspan=2,
                                               rowspan=3)
                        ax_r = plt.subplot2grid((4, 4), (3, 0), colspan=2,
                                                rowspan=1, sharex=ax2)
                    else:
                        ax2 = plt.subplot2grid((4, 4), (0, 0), colspan=2,
                                               rowspan=4)
                else:
                    ax = plt.subplot2grid((4, 4), (0, 0), colspan=4,
                                          rowspan=4)
            else:
                if show_residual:
                    ax2 = plt.subplot2grid((4, 4), (0, 0), colspan=4,
                                           rowspan=3)
                    ax_r = plt.subplot2grid((4, 4), (3, 0), colspan=4,
                                            rowspan=1, sharex=ax2)
                else:
                    ax2 = plt.subplot2grid((4, 4), (0, 0), colspan=4,
                                           rowspan=4)

        if show_surface:
            im1 = ax.imshow(self.scf_surface, origin="lower",
                            interpolation="nearest", cmap=cmap)

            divider = make_axes_locatable(ax)
            cax = divider.append_axes("right", "5%", pad="3%")
            cb = plt.colorbar(im1, cax=cax)
            cb.set_label("SCF Value")

            yy, xx = make_radial_arrays(self.scf_surface.shape)

            pix_lag_diff = np.diff(self._to_pixel(self.lags))[0].value

            dists = np.sqrt(yy**2 + xx**2) * pix_lag_diff

            xlow_pix = self._to_pixel(self.xlow).value
            xhigh_pix = self._to_pixel(self.xhigh).value

            mask = clip_func(dists, xlow_pix, xhigh_pix)

            if not mask.all():
                ax.contour(mask, colors='b', linestyles='-.', levels=[0.5])

            if self._fit2D_flag:
                ax.contour(self.fit2D(xx, yy)[::-1], colors=contour_color,
                           linestyles='-')

            if self._azim_constraint_flag:
                if not np.all(self._azim_mask):
                    ax.contour(self._azim_mask, colors='b', linestyles='-.',
                               levels=[0.5])
                else:
                    warn("Azimuthal mask includes all data. No contours will "
                         "be drawn.")

        if show_radial:
            pix_lags = self._to_pixel(self.lags)
            lags = self._spatial_unit_conversion(pix_lags, xunit).value

            ax2.errorbar(lags, self.scf_spectrum,
                         yerr=self.scf_spectrum_stddev,
                         fmt='o', color=data_color, markersize=5)

            ax2.set_xscale("log")
            ax2.set_yscale("log")

            ax2.set_xlim(lags.min() * 0.75, lags.max() * 1.25)
            ax2.set_ylim(np.nanmin(self.scf_spectrum) * 0.75,
                         np.nanmax(self.scf_spectrum) * 1.25)

            # Overlay the fit. Use points 5% beyond the min and max.
            xvals = np.linspace(lags.min() * 0.95,
                                lags.max() * 1.05,
                                50) * xunit

            ax2.loglog(xvals, self.fitted_model(xvals), '--',
                       linewidth=2, label='Fit', color=fit_color)

            # Show the fit limits
            xlow = self._spatial_unit_conversion(self._xlow, xunit).value
            xhigh = self._spatial_unit_conversion(self._xhigh, xunit).value

            ax2.axvline(xlow, color='b', alpha=0.5, linestyle='-.')
            ax2.axvline(xhigh, color='b', alpha=0.5, linestyle='-.')

            ax2.legend()

            ax2.set_ylabel("SCF Value")

            if show_residual:
                resids = self.scf_spectrum - self.fitted_model(pix_lags)

                ax_r.errorbar(lags, resids,
                              yerr=self.scf_spectrum_stddev,
                              fmt='o', color=data_color, markersize=5)

                ax_r.axvline(xlow, color='b', alpha=0.5, linestyle='-.')
                ax_r.axvline(xhigh, color='b', alpha=0.5, linestyle='-.')

                ax_r.axhline(0., color=fit_color, linestyle='--')

                ax_r.set_ylabel("Residuals")
                ax_r.set_xlabel("Lag ({})".format(xunit))
            else:
                ax2.set_xlabel("Lag ({})".format(xunit))

        plt.tight_layout()
        fig.subplots_adjust(hspace=0.1)

        if save_name is not None:
            plt.savefig(save_name)
            plt.close()
        else:
            plt.show()
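    # Editor's note -- a usage sketch, not part of the original source. The
    # file name and angular unit below are illustrative; physical x-axis
    # units additionally require a distance to have been given.
    #
    # >>> scf.plot_fit(xunit=u.arcmin, save_name='scf_fit.png')  # doctest: +SKIP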
    def run(self, boundary='continuous', show_progress=True, xlow=None,
            xhigh=None, fit_kwargs={}, fit_2D=True, fit_2D_kwargs={},
            radialavg_kwargs={}, verbose=False, xunit=u.pix,
            save_name=None):
        '''
        Computes all SCF outputs.

        Parameters
        ----------
        boundary : {"continuous", "cut"}
            Treat the boundary as continuous (wrap-around) or cut values
            beyond the edge (i.e., for most observational data).
        show_progress : bool, optional
            Show a progress bar while computing the SCF surface.
        xlow : `~astropy.units.Quantity`, optional
            See `~SCF.fit_plaw`.
        xhigh : `~astropy.units.Quantity`, optional
            See `~SCF.fit_plaw`.
        fit_kwargs : dict, optional
            Keyword arguments for `~SCF.fit_plaw`. Use the `xlow` and
            `xhigh` keywords above to provide fit limits.
        fit_2D : bool, optional
            Fit an elliptical power-law model to the 2D SCF surface.
        fit_2D_kwargs : dict, optional
            Keyword arguments for `~SCF.fit_2Dplaw`. Use the `xlow` and
            `xhigh` keywords above to provide fit limits.
        radialavg_kwargs : dict, optional
            Passed to `~SCF.compute_spectrum`.
        verbose : bool, optional
            Enables plotting.
        xunit : `~astropy.units.Unit`, optional
            Unit of the x-axis in the plot in pixel, angular, or physical
            units.
        save_name : str, optional
            Save the figure when a file name is given.
        '''

        self.compute_surface(boundary=boundary, show_progress=show_progress)
        self.compute_spectrum(**radialavg_kwargs)

        self.fit_plaw(verbose=verbose, xlow=xlow, xhigh=xhigh, **fit_kwargs)

        if fit_2D:
            self.fit_2Dplaw(xlow=xlow, xhigh=xhigh, **fit_2D_kwargs)

        if verbose:
            self.plot_fit(save_name=save_name, xunit=xunit)

        return self
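# Editor's note -- a usage sketch, not part of the original TurbuStat source.
# `run` chains the surface, spectrum, 1D fit, and (optionally) 2D fit; the
# cube object and fit limits are assumptions for illustration.
#
# >>> scf = SCF(cube, size=11)  # doctest: +SKIP
# >>> scf.run(verbose=True, boundary='cut', xlow=1 * u.pix, xhigh=4 * u.pix)  # doctest: +SKIP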
class SCF_Distance(object):
    '''
    Calculates the distance between two data cubes based on their SCF
    surfaces. The distance is the L2 norm between the surfaces, with the
    squared differences weighted by 1/r, where r is the lag distance from
    the centre.

    .. note:: When a pre-computed `~SCF` class is given for `cube1` or
              `cube2`, it needs to have the same set of lags between the
              cubes, defined as the angular scales based on the FITS header.
              If the lags are not equivalent, the SCF will be re-computed
              with new lags.

    Parameters
    ----------
    cube1 : %(dtypes)s or `~SCF`
        Data cube. Or a `~SCF` class can be passed which may be
        pre-computed.
    cube2 : %(dtypes)s or `~SCF`
        Data cube. Or a `~SCF` class can be passed which may be
        pre-computed.
    size : int, optional
        Maximum roll size, in pixels, over which the SCF will be calculated.
        If the angular scale is different between the data cubes, the lags
        are scaled to have the same angular scales.
    boundary : {"continuous", "cut"}
        Treat the boundary as continuous (wrap-around) or cut values beyond
        the edge (i.e., for most observational data). A two element list can
        also be passed for treating the boundaries differently between the
        given cubes.
    show_progress : bool, optional
        Show a progress bar while computing the SCF surfaces.
    '''

    __doc__ %= {"dtypes": " or ".join(common_types + threed_types)}

    def __init__(self, cube1, cube2, size=11, boundary='continuous',
                 show_progress=True):

        if isinstance(cube1, SCF):
            self.scf1 = cube1
            _has_data1 = False
        else:
            dataset1 = input_data(cube1, no_header=False)
            _has_data1 = True

        if isinstance(cube2, SCF):
            self.scf2 = cube2
            _has_data2 = False
        else:
            dataset2 = input_data(cube2, no_header=False)
            _has_data2 = True

        # Create a default set of lags, in pixels
        if size % 2 == 0:
            warn("Size must be odd. Reducing size to next lowest odd"
                 " number.")
            size = size - 1

        self.size = size
        roll_lags = (np.arange(size) - size // 2) * u.pix

        # Now adjust the lags so that they have a common scaling when the
        # datasets are not on a common grid.
        wcs1 = WCS(dataset1[1]) if _has_data1 else self.scf1._wcs
        wcs2 = WCS(dataset2[1]) if _has_data2 else self.scf2._wcs

        scale = common_scale(wcs1, wcs2)

        if scale == 1.0:
            if not _has_data1:
                roll_lags1 = self.scf1._to_pixel(self.scf1.roll_lags)
            else:
                roll_lags1 = roll_lags

            if not _has_data2:
                roll_lags2 = self.scf2._to_pixel(self.scf2.roll_lags)
            else:
                roll_lags2 = roll_lags

            if not (roll_lags1 == roll_lags2).all():
                raise ValueError("The roll lags must match when using "
                                 "pre-computed SCFs.")

        elif scale > 1.0:
            roll_lags1 = scale * roll_lags
            roll_lags2 = roll_lags
        else:
            roll_lags1 = roll_lags
            roll_lags2 = roll_lags / float(scale)

        if not isinstance(boundary, string_types):
            if len(boundary) != 2:
                raise ValueError("If boundary is not a string, it must be a "
                                 "list or array of 2 string elements.")
        else:
            boundary = [boundary, boundary]

        # if fiducial_model is not None:
        #     self.scf1 = fiducial_model
        if _has_data1:
            self.scf1 = SCF(cube1, roll_lags=roll_lags1)
            needs_run1 = True
        else:
            needs_run1 = False

            if roll_lags1.size != self.scf1.roll_lags.size:
                lag_check = False
            else:
                lag_check = \
                    (roll_lags1 ==
                     self.scf1._to_pixel(self.scf1.roll_lags)).all()

            if not lag_check:
                warn("SCF given as cube1 needs to be recomputed as the lags"
                     " must match the common set of lags between the two"
                     " data sets. Recomputing SCF.")
                needs_run1 = True
                self.scf1.roll_lags = roll_lags1

            compute_check = hasattr(self.scf1, "_scf_spectrum")
            if not compute_check:
                warn("SCF given as cube1 does not have an SCF"
                     " spectrum computed. Recomputing SCF.")
                needs_run1 = True

        if needs_run1:
            self.scf1.compute_surface(boundary=boundary[0],
                                      show_progress=show_progress)
            # This is for the plot, not the distance, so stick with default
            # params
            self.scf1.compute_spectrum()

        if _has_data2:
            self.scf2 = SCF(cube2, roll_lags=roll_lags2)
            needs_run2 = True
        else:
            needs_run2 = False

            if roll_lags2.size != self.scf2.roll_lags.size:
                lag_check = False
            else:
                lag_check = \
                    (roll_lags2 ==
                     self.scf2._to_pixel(self.scf2.roll_lags)).all()

            if not lag_check:
                warn("SCF given as cube2 needs to be recomputed as the lags"
                     " must match the common set of lags between the two"
                     " data sets. Recomputing SCF.")
                needs_run2 = True
                self.scf2.roll_lags = roll_lags2

            compute_check = hasattr(self.scf2, "_scf_spectrum")
            if not compute_check:
                warn("SCF given as cube2 does not have an SCF"
                     " spectrum computed. Recomputing SCF.")
                needs_run2 = True

        if needs_run2:
            self.scf2.compute_surface(boundary=boundary[1],
                                      show_progress=show_progress)
            # This is for the plot, not the distance, so stick with default
            # params
            self.scf2.compute_spectrum()

        if not needs_run1 and not needs_run2:
            self.size = self.scf1.size
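    # Editor's note -- a usage sketch, not part of the original source.
    # Pre-computed `SCF` instances can be reused here, provided their roll
    # lags match the common lag set; `cube1` and `cube2` are assumed inputs.
    #
    # >>> scf_dist = SCF_Distance(cube1, cube2, size=11, boundary='cut')  # doctest: +SKIP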
    def distance_metric(self, weighted=True, verbose=False,
                        plot_kwargs1={'color': 'b', 'marker': 'D',
                                      'label': '1'},
                        plot_kwargs2={'color': 'g', 'marker': 'o',
                                      'label': '2'},
                        xunit=u.pix, save_name=None):
        '''
        Compute the distance between the surfaces.

        Parameters
        ----------
        weighted : bool, optional
            Sets whether to apply the 1/r weighting to the distance.
        verbose : bool, optional
            Enables plotting.
        plot_kwargs1 : dict, optional
            Pass kwargs to `~matplotlib.pyplot.plot` for `cube1`.
        plot_kwargs2 : dict, optional
            Pass kwargs to `~matplotlib.pyplot.plot` for `cube2`.
        xunit : `~astropy.units.Unit`, optional
            Unit of the x-axis in the plot in pixel, angular, or physical
            units.
        save_name : str, optional
            Save the figure when a file name is given.
        '''

        # Since the angular scales are matched, we can assume that they will
        # have the same weights. So just use the shape of the lags to create
        # the weight surface.
        dx = np.arange(self.size) - self.size // 2
        dy = np.arange(self.size) - self.size // 2

        a, b = np.meshgrid(dx, dy)

        if weighted:
            dist_weight = 1 / np.sqrt(a ** 2 + b ** 2)
            # Centre pixel set to 1
            dist_weight[np.where((a == 0) & (b == 0))] = 1.
        else:
            dist_weight = np.ones((self.size, self.size))

        difference = (self.scf1.scf_surface - self.scf2.scf_surface) ** 2. * \
            dist_weight
        self.distance = np.sqrt(np.sum(difference) / np.sum(dist_weight))

        if verbose:
            import matplotlib.pyplot as plt

            defaults1 = {'color': 'b', 'marker': 'D', 'label': '1'}
            defaults2 = {'color': 'g', 'marker': 'o', 'label': '2'}

            for key in defaults1:
                if key not in plot_kwargs1:
                    plot_kwargs1[key] = defaults1[key]
            for key in defaults2:
                if key not in plot_kwargs2:
                    plot_kwargs2[key] = defaults2[key]

            fig = plt.figure()

            ax0 = fig.add_subplot(2, 2, 1)
            ax1 = fig.add_subplot(2, 2, 2, sharex=ax0, sharey=ax0)
            ax2 = fig.add_subplot(2, 2, 3, sharex=ax0, sharey=ax0)
            ax3 = fig.add_subplot(2, 2, 4)

            vmin = min(self.scf1.scf_surface.min(),
                       self.scf2.scf_surface.min())

            im0 = ax0.imshow(self.scf1.scf_surface, origin="lower",
                             interpolation="nearest", vmin=vmin)
            ax0.set_title(plot_kwargs1['label'])
            fig.colorbar(im0, ax=ax0)

            im1 = ax1.imshow(self.scf2.scf_surface, origin="lower",
                             interpolation="nearest", vmin=vmin)
            ax1.set_title(plot_kwargs2['label'])
            fig.colorbar(im1, ax=ax1)

            im2 = ax2.imshow(difference, origin="lower",
                             interpolation="nearest")
            ax2.set_title("")
            fig.colorbar(im2, ax=ax2)

            pix_lags1 = self.scf1._to_pixel(self.scf1.lags)
            lags1 = self.scf1._spatial_unit_conversion(pix_lags1,
                                                       xunit).value

            pix_lags2 = self.scf2._to_pixel(self.scf2.lags)
            lags2 = self.scf2._spatial_unit_conversion(pix_lags2,
                                                       xunit).value

            ax3.errorbar(lags1, self.scf1.scf_spectrum,
                         yerr=self.scf1.scf_spectrum_stddev,
                         fmt=plot_kwargs1['marker'],
                         color=plot_kwargs1['color'],
                         markersize=5, label=plot_kwargs1['label'])
            ax3.errorbar(lags2, self.scf2.scf_spectrum,
                         yerr=self.scf2.scf_spectrum_stddev,
                         fmt=plot_kwargs2['marker'],
                         color=plot_kwargs2['color'],
                         markersize=5, label=plot_kwargs2['label'])

            ax3.set_xscale("log")
            ax3.set_yscale("log")

            ax3.set_xlim(min(lags1.min(), lags2.min()) * 0.75,
                         max(lags1.max(), lags2.max()) * 1.25)
            ax3.set_ylim(min(self.scf1.scf_spectrum.min(),
                             self.scf2.scf_spectrum.min()) * 0.75,
                         max(self.scf1.scf_spectrum.max(),
                             self.scf2.scf_spectrum.max()) * 1.25)

            ax3.grid(True)
            ax3.set_xlabel("Lags ({})".format(xunit))

            plt.tight_layout()

            if save_name is not None:
                plt.savefig(save_name)
                plt.close()
            else:
                plt.show()

        return self
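# Editor's note -- a usage sketch, not part of the original TurbuStat source,
# showing the full distance workflow with assumed cube inputs.
#
# >>> scf_dist = SCF_Distance(cube1, cube2, size=11)  # doctest: +SKIP
# >>> scf_dist.distance_metric(verbose=True, xunit=u.pix)  # doctest: +SKIP
# >>> scf_dist.distance  # doctest: +SKIP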