Source code for turbustat.statistics.pspec_bispec.bispec

# Licensed under an MIT open source license - see LICENSE
# from __future__ import print_function, absolute_import, division

import numpy as np
import numpy.random as ra
import astropy.units as u
from scipy.stats import binned_statistic
from warnings import warn
from astropy.utils.console import ProgressBar
from astropy.utils import NumpyRNGContext
from itertools import product

try:
    from pyfftw.interfaces.numpy_fft import fft2
    PYFFTW_FLAG = True
except ImportError:
    PYFFTW_FLAG = False

from ..base_statistic import BaseStatisticMixIn
from ...io import common_types, twod_types, input_data
from ..psds import make_radial_arrays


[docs] class Bispectrum(BaseStatisticMixIn): """ Computes the bispectrum (three-point correlation function) of the given image (Burkhart et al., 2010). The bispectrum and the bicoherence are returned. The bicoherence is a normalized version (real and to unity) of the bispectrum. Parameters ---------- img : %(dtypes)s 2D image. Examples -------- >>> from turbustat.statistics import Bispectrum >>> from astropy.io import fits >>> moment0 = fits.open("Design4_21_0_0_flatrho_0021_13co.moment0.fits") # doctest: +SKIP >>> bispec = Bispectrum(moment0) # doctest: +SKIP >>> bispec.run(verbose=True, nsamples=1000) # doctest: +SKIP """ __doc__ %= {"dtypes": " or ".join(common_types + twod_types)} def __init__(self, img): self.need_header_flag = False self.header = None self.data = input_data(img, no_header=True) self.shape = self.data.shape # Set nans to min isnan = np.isnan(self.data) if isnan.any(): self.data = self.data.copy() self.data[isnan] = 0.
[docs] def compute_bispectrum(self, show_progress=True, use_pyfftw=False, threads=1, nsamples=100, seed=1000, mean_subtract=False, **pyfftw_kwargs): ''' Do the computation. Parameters ---------- show_progress : optional, bool Show progress bar while sampling the bispectrum. use_pyfftw : bool, optional Enable to use pyfftw, if it is installed. threads : int, optional Number of threads to use in FFT when using pyfftw. nsamples : int, optional Sets the number of samples to take at each vector magnitude. seed : int, optional Sets the seed for the distribution draws. mean_subtract : bool, optional Subtract the mean from the data before computing. This removes the "zero frequency" (i.e., constant) portion of the power, resulting in a loss of phase coherence along the k_1=k_2 line. pyfft_kwargs : Passed to `~turbustat.statistics.rfft_to_fft.rfft_to_fft`. See `here <https://hgomersall.github.io/pyFFTW/pyfftw/interfaces/interfaces.html#interfaces-additional-args>`_ for a list of accepted kwargs. ''' if mean_subtract: norm_data = self.data - self.data.mean() else: norm_data = self.data if use_pyfftw: if PYFFTW_FLAG: if pyfftw_kwargs.get('threads') is not None: pyfftw_kwargs.pop('threads') fftarr = fft2(norm_data, threads=threads, **pyfftw_kwargs) else: warn("pyfftw not installed. Reverting to using numpy.") use_pyfftw = False if not use_pyfftw: fftarr = np.fft.fft2(norm_data) conjfft = np.conj(fftarr) bispec_shape = (int(self.shape[0] / 2.), int(self.shape[1] / 2.)) self._bispectrum = np.zeros(bispec_shape, dtype=complex) self._bicoherence = np.zeros(bispec_shape, dtype=float) self._tracker = np.zeros(self.shape, dtype=np.int16) biconorm = np.ones_like(self.bispectrum, dtype=float) if show_progress: bar = ProgressBar(np.prod(fftarr.shape) / 4.) prod = product(range(int(fftarr.shape[0] / 2.)), range(int(fftarr.shape[1] / 2.))) with NumpyRNGContext(seed): for n, (k1mag, k2mag) in enumerate(prod): phi1 = ra.uniform(0, 2 * np.pi, nsamples) phi2 = ra.uniform(0, 2 * np.pi, nsamples) k1x = np.asarray([int(k1mag * np.cos(angle)) for angle in phi1]) k2x = np.asarray([int(k2mag * np.cos(angle)) for angle in phi2]) k1y = np.asarray([int(k1mag * np.sin(angle)) for angle in phi1]) k2y = np.asarray([int(k2mag * np.sin(angle)) for angle in phi2]) k3x = np.asarray([int(k1mag * np.cos(ang1) + k2mag * np.cos(ang2)) for ang1, ang2 in zip(phi1, phi2)]) k3y = np.asarray([int(k1mag * np.sin(ang1) + k2mag * np.sin(ang2)) for ang1, ang2 in zip(phi1, phi2)]) samps = fftarr[k1x, k1y] * fftarr[k2x, k2y] * conjfft[k3x, k3y] self._bispectrum[k1mag, k2mag] = np.sum(samps) biconorm[k1mag, k2mag] = np.sum(np.abs(samps)) # Track where we're sampling from in fourier space self._tracker[k1x, k1y] += 1 self._tracker[k2x, k2y] += 1 self._tracker[k3x, k3y] += 1 if show_progress: bar.update(n + 1) self._bicoherence = (np.abs(self.bispectrum) / biconorm) self._bispectrum_amp = np.log10(np.abs(self.bispectrum))
@property def bispectrum(self): ''' Bispectrum array. ''' return self._bispectrum @property def bispectrum_logamp(self): ''' log amplitudes of the bispectrum. ''' return self._bispectrum_amp @property def bicoherence(self): ''' Bicoherence array. ''' return self._bicoherence @property def tracker(self): ''' Array showing the number of samples in each k_1 k_2 bin. ''' return self._tracker
[docs] def azimuthal_slice(self, radii, delta_radii, bin_width=5. * u.deg, value='bispectrum', return_masks=False): ''' Create an azimuthal slice of the bispectrum or bicoherence surfaces. Parameters ---------- radii : float or np.ndarray Radii in the bispectrum plane to extract slices at. Multiple slices are returned if radii is an array. delta_radii : float or np.ndarray The width around the radii in the bispectrum plane. If multiple radii are given, `delta_radii` must match the length of `radii`. bin_width : `~astropy.units.Quantity`, optional The angular size of the bins used to create the slice. value : str, optional Which surface to create a profile from. Can be "bispectrum" (default), "bispectrum_logamp", or "bicoherence". return_masks : bool, optional Return the radial masks used to create the slices. Returns ------- azimuthal_slices : dict Dictionary with the azimuthal slices. Each radius given in radii is a key in the dictionary. Each slice is a numpy array containing the bin centers, the averaged values, and the standard deviations in the azimuthal bins. ''' if value == "bispectrum": value_arr = self.bispectrum elif value == "bispectrum_logamp": value_arr = self.bispectrum_logamp elif value == "bicoherence": value_arr = self.bicoherence else: raise TypeError("value must be 'bispectrum'" ", 'bispectrum_logamp', or 'bicoherence'") # Keep a finite value array for binned_statistic for scipy >1.14 finite_mask = np.isfinite(value_arr) if isinstance(radii, np.ndarray) or isinstance(radii, list): if not isinstance(delta_radii, np.ndarray): delta_radii = np.array([delta_radii] * len(radii)) if len(radii) != len(delta_radii): raise ValueError("Length of radii and delta_radii must match.") else: radii = np.array([radii]) if not isinstance(delta_radii, np.ndarray): delta_radii = np.array([delta_radii]) if not hasattr(bin_width, "unit"): raise TypeError("bin_width must have an attached angular unit.") elif not bin_width.unit.is_equivalent(u.rad): raise TypeError("bin_width must have an attached angular unit.") else: bin_width = bin_width.to(u.rad).value kky, kkx = make_radial_arrays(self.bispectrum.shape, y_center=0, x_center=0) dist = np.sqrt(kky**2 + kkx**2) theta = np.arctan2(kky, kkx) nbins = np.floor(np.pi / bin_width).astype(int) bins = np.linspace(0, np.pi, nbins) azimuthal_slices = {} if return_masks: masks = [] for rad, del_rad in zip(radii, delta_radii): # Create the mask of the radii to extract the profile at. mask = np.logical_and(dist >= rad - del_rad / 2., dist <= rad + del_rad / 2.) vals, bin_edge, cts = binned_statistic(theta[mask & finite_mask].ravel(), value_arr[mask & finite_mask].ravel(), bins=bins, statistic=np.nanmean) stds, bin_edge, cts = binned_statistic(theta[mask & finite_mask].ravel(), value_arr[mask & finite_mask].ravel(), bins=bins, statistic=np.nanstd) bin_cents = (bin_edge[1:] + bin_edge[:-1]) / 2. azimuthal_slices[rad] = np.array([bin_cents, vals, stds]) if return_masks: masks.append(mask) if return_masks: return azimuthal_slices, masks return azimuthal_slices
[docs] def radial_slice(self, thetas, delta_thetas, bin_width=1., value='bispectrum', return_masks=False): ''' Create a radial slice of the bispectrum (or bicoherence) plane. Parameters ---------- thetas : `~astropy.units.Quantity` Azimuthal angles in the bispectrum plane to extract slices at. Multiple slices are returned if `thetas` is an array. delta_thetas : `~astropy.units.Quantity` The width around the angle in the bispectrum plane. If multiple angles are given, `delta_thetas` must match the length of `thetas`. bin_width : float, optional The radial size of the bins used to create the slice. value : str, optional Which surface to create a profile from. Can be "bispectrum" (default), "bispectrum_logamp", or "bicoherence". return_masks : bool, optional Return the radial masks used to create the slices. Returns ------- radial_slices : dict Dictionary with the radial slices. Each angle given in thetas is a key in the dictionary. Each slice is a numpy array containing the bin centers, the averaged values, and the standard deviations in the radial bins. ''' if value == "bispectrum": value_arr = self.bispectrum elif value == "bispectrum_logamp": value_arr = self.bispectrum_logamp elif value == "bicoherence": value_arr = self.bicoherence else: raise TypeError("value must be 'bispectrum'" ", 'bispectrum_logamp', or 'bicoherence'") # Keep a finite value array for binned_statistic for scipy >1.14 finite_mask = np.isfinite(value_arr) orig_thetas = thetas.copy().value # Check units if not hasattr(thetas, "unit") or not hasattr(delta_thetas, "unit"): raise TypeError("thetas must have an attached angular unit.") elif (not thetas.unit.is_equivalent(u.rad) or not delta_thetas.unit.is_equivalent(u.rad)): raise TypeError("thetas must have an attached angular unit.") else: thetas = thetas.to(u.rad).value delta_thetas = delta_thetas.to(u.rad).value # Make sure the lengths match if multiple values are given. if isinstance(thetas, np.ndarray): if not isinstance(delta_thetas, np.ndarray): delta_thetas = np.array([delta_thetas] * len(thetas)) if len(thetas) != len(delta_thetas): raise ValueError("Length of thetas and delta_thetas must " "match.") else: thetas = np.array([thetas]) orig_thetas = np.array([orig_thetas]) if not isinstance(delta_thetas, np.ndarray): delta_thetas = np.array([delta_thetas]) kky, kkx = make_radial_arrays(self.bispectrum.shape, y_center=0, x_center=0) radial_slices = dict.fromkeys(orig_thetas) if return_masks: masks = [] dist = np.sqrt(kky**2 + kkx**2) theta_arr = np.arctan2(kky, kkx) nbins = np.floor(dist.max() / bin_width).astype(int) bins = np.linspace(0, dist.max(), nbins) for theta, del_theta, theta0 in zip(thetas, delta_thetas, orig_thetas): # Create the mask of the radii to extract the profile at. mask = np.logical_and(theta_arr >= theta - del_theta / 2., theta_arr <= theta + del_theta / 2.) vals, bin_edge, cts = binned_statistic(dist[mask & finite_mask].ravel(), value_arr[mask & finite_mask].ravel(), bins=bins, statistic=np.nanmean) stds, bin_edge, cts = binned_statistic(dist[mask & finite_mask].ravel(), value_arr[mask & finite_mask].ravel(), bins=bins, statistic=np.nanstd) bin_cents = (bin_edge[1:] + bin_edge[:-1]) / 2. radial_slices[theta0] = np.array([bin_cents, vals, stds]) if return_masks: masks.append(mask) if return_masks: return radial_slices, masks return radial_slices
[docs] def plot_surface(self, save_name=None, show_bicoh=True, cmap='viridis', contour_color='k'): ''' Plot the bispectrum amplitude and (optionally) the bicoherence. Parameters ---------- save_name : str, optional Save name for the figure. Enables saving the plot. show_bicoh : bool, optional Plot the bicoherence surface. Enabled by default. cmap : {str, matplotlib color map}, optional Colormap to use in the plots. Default is viridis. contour_color : {str, RGB tuple}, optional Color of the amplitude contours. ''' import matplotlib.pyplot as plt from mpl_toolkits.axes_grid1 import make_axes_locatable if show_bicoh: ax1 = plt.subplot(1, 2, 1) else: ax1 = plt.subplot(1, 1, 1) im1 = ax1.imshow(self.bispectrum_logamp, origin="lower", interpolation="nearest", cmap=cmap) divider = make_axes_locatable(ax1) cax = divider.append_axes("right", size="5%", pad=0.05) cbar1 = plt.colorbar(im1, cax=cax) cbar1.set_label(r"log$_{10}$ Bispectrum Amplitude") ax1.contour(self.bispectrum_logamp, colors=contour_color) ax1.set_xlabel(r"$k_1$") ax1.set_ylabel(r"$k_2$") if show_bicoh: ax2 = plt.subplot(1, 2, 2) im2 = ax2.imshow(self.bicoherence, origin="lower", interpolation="nearest", cmap=cmap) divider = make_axes_locatable(ax2) cax2 = divider.append_axes("right", size="5%", pad=0.05) cbar2 = plt.colorbar(im2, cax=cax2) cbar2.set_label("Bicoherence") ax2.set_xlabel(r"$k_1$") ax2.set_ylabel(r"$k_2$") plt.tight_layout() if save_name is not None: plt.savefig(save_name) plt.close() else: plt.show()
[docs] def run(self, show_progress=True, use_pyfftw=False, threads=1, nsamples=100, seed=1000, mean_subtract=False, verbose=False, save_name=None, **pyfftw_kwargs): ''' Compute the bispectrum. Necessary to maintain package standards. Parameters ---------- show_progress : optional, bool Show progress bar while sampling the bispectrum. use_pyfftw : bool, optional Enable to use pyfftw, if it is installed. threads : int, optional Number of threads to use in FFT when using pyfftw. nsamples : int, optional See `~BiSpectrum.compute_bispectrum`. seed : int, optional See `~BiSpectrum.compute_bispectrum`. mean_subtract : bool, optional See `~BiSpectrum.compute_bispectrum`. verbose : bool, optional Enables plotting. save_name : str,optional Save the figure when a file name is given. pyfftw_kwargs : Passed to `~turbustat.statistics.rfft_to_fft.rfft_to_fft`. See `here <https://hgomersall.github.io/pyFFTW/pyfftw/interfaces/interfaces.html#interfaces-additional-args>`_ for a list of accepted kwargs. ''' self.compute_bispectrum(show_progress=show_progress, use_pyfftw=use_pyfftw, threads=threads, nsamples=nsamples, mean_subtract=mean_subtract, seed=seed, **pyfftw_kwargs) if verbose: self.plot_surface(save_name=save_name) return self
[docs] def BiSpectrum(*args, **kwargs): ''' Old name for the Bispectrum class. ''' warn("Use the new 'Bispectrum' class. 'BiSpectrum' is deprecated and will" " be removed in a future release.", Warning) return Bispectrum(*args, **kwargs)
[docs] class Bispectrum_Distance(object): ''' Calculate the distance between two images based on their bicoherence. The distance is the L2 norm between the bicoherence surfaces. Parameters ---------- data1 : %(dtypes)s or `~Bispectrum` Contains the data and header of the image. Or a `~Bispectrum` class may be given which can be pre-computed. data2 : %(dtypes)s or `~Bispectrum` Contains the data and header of the second image. Or a `~Bispectrum` class may be given which can be pre-computed. stat_kwargs : dict, optional Passed to `~Bispectrum.run`. ''' __doc__ %= {"dtypes": " or ".join(common_types + twod_types)} def __init__(self, data1, data2, stat_kwargs={}): # if fiducial_model is not None: # self.bispec1 = fiducial_model if isinstance(data1, Bispectrum): self.bispec1 = data1 needs_run = False if not hasattr(self.bispec1, '_bicoherence'): needs_run = True warn("Bispectrum given as data1 does not have a" " bicoherence surface. Re-running Bispectrum.") else: self.bispec1 = BiSpectrum(data1) needs_run = True if needs_run: self.bispec1.run(**stat_kwargs) if isinstance(data2, Bispectrum): self.bispec2 = data2 needs_run = False if not hasattr(self.bispec2, '_bicoherence'): needs_run = True warn("Bispectrum given as data2 does not have a" " bicoherence surface. Re-running Bispectrum.") else: self.bispec2 = BiSpectrum(data2) needs_run = True if needs_run: self.bispec2.run(**stat_kwargs) @property def surface_distance(self): ''' L2 distance between the bicoherence surfaces. ''' return self._surface_distance @property def mean_distance(self): ''' Absolute difference between the mean bicoherence. ''' return self._mean_distance
[docs] def distance_metric(self, verbose=False, label1=None, label2=None, save_name=None, cmap='viridis'): ''' verbose : bool, optional Enable plotting. label1 : str, optional Object or region name for data1 label2 : str, optional Object or region name for data2 save_name : str,optional Save the figure when a file name is given. cmap : str, optional Colormap to show the bicoherence surfaces. ''' if self.bispec1.bicoherence.shape == self.bispec2.bicoherence.shape: self._surface_distance = \ np.linalg.norm(self.bispec1.bicoherence.ravel() - self.bispec2.bicoherence.ravel()) else: warn("Bicoherence surface must have equal shapes for the surface" " distance metric.") self._surface_distance = np.NaN self._mean_distance = np.abs(self.bispec1.bicoherence.mean() - self.bispec2.bicoherence.mean()) if verbose: import matplotlib.pyplot as plt fig = plt.gcf() if label1 is None: label1 = "1" if label2 is None: label2 = "2" fig = plt.gcf() ax1 = fig.add_subplot(121) ax1.set_title(label1) ax1.imshow(self.bispec1.bicoherence, origin="lower", cmap=cmap, interpolation="nearest", vmax=1.0, vmin=0.0) ax1.set_xlabel(r"$k_1$") ax1.set_ylabel(r"$k_2$") ax2 = plt.subplot(122) ax2.set_title(label2) im = plt.imshow(self.bispec2.bicoherence, origin="lower", cmap=cmap, interpolation="nearest", vmax=1.0, vmin=0.0) ax2.set_xlabel(r"$k_1$") ax2.set_yticklabels([]) fig.subplots_adjust(right=0.8) cbar_ax = fig.add_axes([0.85, 0.15, 0.05, 0.7]) fig.colorbar(im, cax=cbar_ax) if save_name is not None: plt.savefig(save_name) plt.close() else: plt.show() return self
[docs] def BiSpectrum_Distance(*args, **kwargs): ''' Old name for the Bispectrum class. ''' warn("Use the new 'Bispectrum_Distance' class. 'BiSpectrum_Distance'" " is deprecated and will be removed in a future release.", Warning) return Bispectrum_Distance(*args, **kwargs)