Source code for turbustat.statistics.genus.genus

# Licensed under an MIT open source license - see LICENSE
from __future__ import print_function, absolute_import, division

import numpy as np
import scipy.ndimage as nd
from scipy.interpolate import InterpolatedUnivariateSpline
from astropy.convolution import Gaussian2DKernel, convolve_fft
from astropy.wcs import WCS
import astropy.units as u
from warnings import warn

from ..stats_utils import standardize, common_scale
from ..base_statistic import BaseStatisticMixIn
from ...io import common_types, twod_types, input_data, find_beam_properties


[docs] class Genus(BaseStatisticMixIn): """ Genus Statistics based off of Chepurnov et al. (2008). Parameters ---------- img : %(dtypes)s 2D image. min_value : `~astropy.units.Quantity` or float, optional Minimum value in the data to consider. If None, the minimum is used. When `img` has an attached brightness unit, `min_value` must have the same units. max_value : `~astropy.units.Quantity` or float, optional Maximum value in the data to consider. If None, the maximum is used. When `img` has an attached brightness unit, `min_value` must have the same units. lowdens_percent : float, optional Lower percentile of the data to use. Defaults to the minimum value. Overrides `min_value` when the value of this percentile is greater than `min_value`. highdens_percent : float, optional Upper percentile of the data to use. Defaults to the maximum value. Overrides `max_value` when the value of this percentile is lower than `max_value`. numpts : int, optional Number of thresholds to calculate statistic at. thresholds : numpy.ndarray or `~astropy.units.Quantity`, optional Pass a custom set of thresholds to compute the Genus statistic at. Note that this overrides the `min/max_value`, `low/highdens_percent`, and `numpts` keywords. enable_smoothing : bool, optional When `True`, uses the set of `smoothing_radii` to convolve the input data with 2D Gaussian kernels. When `False`, no smoothing is applied to the data. smoothing_radii : np.ndarray or `astropy.units.Quantity`, optional Kernel radii to smooth data to. If units are not attached, the radii are assumed to be in pixels. If no radii are given, 5 smoothing radii will be used ranging from 1 pixel to one-tenth the smallest dimension size. distance : `~astropy.units.Quantity`, optional Physical distance to the region in the data. Examples -------- >>> from turbustat.statistics import Genus >>> from astropy.io import fits >>> import astropy.units as u >>> import numpy as np >>> moment0 = fits.open("Design4_21_0_0_flatrho_0021_13co.moment0.fits")[0] # doctest: +SKIP >>> genus = Genus(moment0, lowdens_percent=15, highdens_percent=85) # doctest: +SKIP >>> genus.run() # doctest: +SKIP """ __doc__ %= {"dtypes": " or ".join(common_types + twod_types)} def __init__(self, img, min_value=None, max_value=None, lowdens_percent=0, highdens_percent=100, numpts=100, thresholds=None, enable_smoothing=True, smoothing_radii=None, distance=None): super(Genus, self).__init__() if isinstance(img, np.ndarray): self.need_header_flag = False self.data = input_data(img, no_header=True) self.header = None else: self.need_header_flag = True self.data, self.header = input_data(img, no_header=False) if distance is not None: self.distance = distance if thresholds is not None: if hasattr(self.data, 'unit'): if not hasattr(thresholds, 'unit'): raise TypeError("data has units of {}. 'thresholds' must " "have equivalent units." .format(self.data.unit)) if not thresholds.unit.is_equivalent(self.data.unit): raise u.UnitsError("thresholds does not have an equivalent " "units to the img unit.") thresholds_match = thresholds.to(self.data.unit) else: thresholds_match = thresholds self._thresholds = thresholds_match else: if min_value is None: if lowdens_percent == 0.0: min_value = np.nanmin(self.data) else: min_value = np.nanpercentile(self.data, lowdens_percent) if not hasattr(min_value, 'unit') and hasattr(self.data, 'unit'): min_value *= self.data.unit if hasattr(self.data, 'unit'): if not hasattr(min_value, 'unit'): raise TypeError("data has units of {}. 'min_value' must " "have equivalent units." .format(self.data.unit)) if not min_value.unit.is_equivalent(self.data.unit): raise u.UnitsError("min_value does not have an equivalent " "units to the img unit.") min_value = min_value.to(self.data.unit) if max_value is None: if highdens_percent == 100.0: max_value = np.nanmax(self.data) else: max_value = np.nanpercentile(self.data, highdens_percent) if not hasattr(max_value, 'unit') and hasattr(self.data, 'unit'): max_value *= self.data.unit if hasattr(max_value, 'unit') and hasattr(self.data, 'unit'): if not hasattr(max_value, 'unit'): raise TypeError("data has units of {}. 'max_value' must " "have equivalent units." .format(self.data.unit)) if not max_value.unit.is_equivalent(self.data.unit): raise u.UnitsError("max_value does not have an equivalent " "units to the img unit.") max_value = max_value.to(self.data.unit) self._thresholds = np.linspace(min_value, max_value, numpts) self._enable_smoothing = enable_smoothing if enable_smoothing: if smoothing_radii is None: self.smoothing_radii = np.array([1.0]) else: if isinstance(smoothing_radii, u.Quantity): self.smoothing_radii = self._to_pixel(smoothing_radii).value else: self.smoothing_radii = smoothing_radii else: self.smoothing_radii = [] @property def thresholds(self): ''' Values of the data to compute the Genus statistics at. ''' return self._thresholds @property def smoothing_radii(self): ''' Pixel radii used to smooth the data. ''' return self._smoothing_radii @smoothing_radii.setter def smoothing_radii(self, values): # Allow passing an empty list/array when no smoothing is chosen. if self._enable_smoothing: if np.any(values < 1.0): raise ValueError("All smoothing radii must be larger than one" " pixel.") if np.any(values > 0.5 * min(self.data.shape)): raise ValueError("All smoothing radii must be smaller than half of" " the image shape.") self._smoothing_radii = values else: self._smoothing_radii = [None] @property def smoothed_images(self): ''' List of smoothed versions of the image, using the radii in `~Genus.smoothing_radii`. ''' if not hasattr(self, '_smoothed_images'): raise ValueError("Set `keep_smoothed_images=True` in " "Genus.make_genus_curve") return self._smoothed_images @property def smoothed_means(self): ''' Means from the smoothed images. Needed to convert the thresholds into standardized values. ''' return self._smoothed_means @property def smoothed_stds(self): ''' Standard deviations from the smoothed images. Needed to convert the thresholds into standardized values. ''' return self._smoothed_stds
[docs] def make_genus_curve(self, enable_small_removal=True, use_beam=False, min_size=4, connectivity=2, keep_smoothed_images=False, match_kernel=False, **convolution_kwargs): ''' Smooth the data with a Gaussian kernel to create the genus curve from at the specified thresholds. Parameters ---------- enable_small_removal : bool, optional Make use of the small region removal. Default is `True`. If `False`, the `use_beam` and `min_size` keywords are not used. use_beam : bool, optional When enabled, will use the given `beam_fwhm` or try to load it from the header. When disabled, the minimum size is set by `min_size`. min_size : int or `~astropy.units.Quantity`, optional Directly specify the minimum area a region must have to be counted. Integer values with no units are assumed to be in pixels. connectivity : {1, 2}, optional Connectivity used when removing regions below min_size. keep_smoothed_images : bool, optional Keep the convolved images in the `~Genus.smoothed_images` list. Default is `False`. match_kernel : bool, optional Match kernel shape to the data shape when convolving. Default is `False`. Enable to reproduce behaviour of `~Genus` prior to version 1.0 of TurbuStat. convolution_kwargs: Passed to `~astropy.convolve.convolve_fft`. ''' if keep_smoothed_images: self._smoothed_images = [] if use_beam: major, minor = find_beam_properties(self.header)[:2] major = self._to_pixel(major) minor = self._to_pixel(minor) # the area of a Gaussian beam is 2 pi sigma^2, and major/minor are FWHMs pix_area = 2 * np.pi * major * minor / np.sqrt(8*np.log(2)) min_size = int(np.floor(pix_area.value)) else: if isinstance(min_size, u.Quantity): # Convert to pixel area min_size = self._to_pixel_area(min_size) min_size = int(np.floor(min_size.value)) else: min_size = int(min_size) self._genus_stats = np.empty((len(self.smoothing_radii), len(self.thresholds))) self._smoothed_means = np.empty(len(self.smoothing_radii)) self._smoothed_stds = np.empty(len(self.smoothing_radii)) if hasattr(self.data, 'unit'): self._smoothed_means *= self.data.unit self._smoothed_stds *= self.data.unit conn_kernel = nd.generate_binary_structure(2, connectivity) for j, width in enumerate(self.smoothing_radii): # Skip smoothing when none is given if width is None: smooth_img = self.data else: if match_kernel: kernel = Gaussian2DKernel(width, x_size=self.data.shape[0], y_size=self.data.shape[1]) else: kernel = Gaussian2DKernel(width) smooth_img = convolve_fft(self.data, kernel, **convolution_kwargs) # Append the mean/std from the smoothed image: self._smoothed_means[j] = np.nanmean(smooth_img) self._smoothed_stds[j] = np.nanstd(smooth_img) if keep_smoothed_images: self._smoothed_images.append(smooth_img) for i, thresh in enumerate(self.thresholds): high_density = smooth_img > thresh low_density = smooth_img < thresh if enable_small_removal: high_density = remove_small_objects(high_density, min_size=min_size, connectivity=connectivity) low_density = remove_small_objects(low_density, min_size=min_size, connectivity=connectivity) # eight-connectivity to count the regions high_density_labels, high_density_num = nd.label(high_density, conn_kernel) low_density_labels, low_density_num = nd.label(low_density, conn_kernel) self._genus_stats[j, i] = high_density_num - low_density_num
@property def genus_stats(self): ''' Array of genus statistic values for all smoothed images (0th axis) and all threshold values (1st axis). ''' return self._genus_stats
[docs] def make_genus_stats_table(self): ''' Returns an astropy table with genus curve and thresholds with 1 table per smoothed image. ''' from astropy.table import Table, Column genus_tables = [] for ii, radius in enumerate(self.smoothing_radii): tab = Table() tab.add_column(Column(self.thresholds, name='thresholds')) tab.add_column(Column(self.genus_stats[ii], name='genus_value')) mean_val = self.smoothed_means[ii] if hasattr(mean_val, 'unit'): mean_vals = ([mean_val.value] * len(self.thresholds)) * mean_val.unit else: mean_vals = np.array([mean_val] * len(self.thresholds)) tab.add_column(Column(mean_vals, name='data_mean')) std_val = self.smoothed_stds[ii] if hasattr(std_val, 'unit'): std_vals = ([std_val.value] * len(self.thresholds)) * std_val.unit else: std_vals = np.array([std_val] * len(self.thresholds)) tab.add_column(Column(std_vals, name='data_std')) genus_tables.append(tab) if hasattr(radius, 'unit'): radius_vals = ([radius.value] * len(self.thresholds)) * radius.unit else: radius_vals = np.array([radius] * len(self.thresholds)) tab.add_column(Column(radius_vals, name='smooth_radius')) genus_tables.append(tab) return genus_tables
[docs] def plot_fit(self, save_name=None, color='r', symbol='o', standardize=False): ''' Plot the Genus curves. Parameters ---------- save_name : str,optional Save the figure when a file name is given. color : {str, RGB tuple}, optional Color to show the Genus curves in. ''' import matplotlib.pyplot as plt num = len(self.smoothing_radii) num_cols = num // 2 if num % 2 == 0 else (num // 2) + 1 for i in range(1, num + 1): if num == 1: ax = plt.subplot(111) else: ax = plt.subplot(num_cols, 2, i) if self.smoothing_radii[i-1] is not None: textstring = "Smooth Size: {0:.2f}".format(self.smoothing_radii[i-1]) else: textstring = "No smoothing" ax.text(0.3, 0.1, textstring, transform=ax.transAxes, fontsize=12) if standardize: thresholds = (self.thresholds - self.smoothed_means[i-1]) / self.smoothed_stds[i-1] else: thresholds = self.thresholds plt.plot(thresholds, self.genus_stats[i - 1], "{}-".format(symbol), color=color) plt.grid(True) if (num - i + 1) <= 2: if standardize: plt.xlabel("Standardized Intensity") else: plt.xlabel("Intensity") else: plt.setp(ax.get_xticklabels(), visible=False) plt.tight_layout() if save_name is not None: plt.savefig(save_name) plt.close() else: plt.show()
[docs] def run(self, verbose=False, save_name=None, color='r', symbol='o', standardize_intensity=False, **kwargs): ''' Run the whole statistic. Parameters ---------- verbose : bool, optional Enables plotting. save_name : str,optional Save the figure when a file name is given. Must have `verbose` enabled for plotting. kwargs : See `~Genus.make_genus_curve`. ''' self.make_genus_curve(**kwargs) if verbose: self.plot_fit(save_name=save_name, color=color, symbol=symbol, standardize=standardize_intensity) return self
[docs] class Genus_Distance(object): """ Distance Metric for the Genus Statistic. .. note:: Since the data need to be normalized for the distance metrics, there is no option to pass a pre-compute `~Genus` statistic. Parameters ---------- img1 : %(dtypes)s 2D image. img2 : %(dtypes)s 2D image. smoothing_radii : list, optional Kernel radii to smooth data to. See `~Genus`. numpts : int, optional Number of thresholds to calculate statistic at. See `~Genus`. min_value : `~astropy.units.Quantity` or float or list, optional Minimum value to use for Genus statistic. When a two-element list is given, the first item is used for `img1` and the second for `img2`. See `~Genus`. max_value : `~astropy.units.Quantity` or float, optional Maximum value to use for Genus statistic. When a two-element list is given, the first item is used for `img1` and the second for `img2`. See `~Genus`. lowdens_percent : float, optional Lowest percentile of the data to use for Genus statistic. When a two-element list is given, the first item is used for `img1` and the second for `img2`. See `~Genus`. highdens_percent : float, optional Highest percentile of the data to use for Genus statistic. When a two-element list is given, the first item is used for `img1` and the second for `img2`. See `~Genus`. genus_kwargs : dict, optional Dictionary passed to `~Genus.run`. genus2_kwargs : None or dict, optional Dictionary passed to `~Genus.run` for `img2`. When `None` is given, settings from `genus_kwargs` are used for `img2`. """ __doc__ %= {"dtypes": " or ".join(common_types + twod_types)} def __init__(self, img1, img2, smoothing_radii=None, numpts=100, min_value=None, max_value=None, lowdens_percent=0, highdens_percent=100, genus_kwargs={}, genus2_kwargs=None): # Check if list for inputs, where first is for img1 and second is # for img2 if not isinstance(min_value, list): min_value = [min_value] * 2 if not isinstance(max_value, list): max_value = [max_value] * 2 if not isinstance(lowdens_percent, list): lowdens_percent = [lowdens_percent] * 2 if not isinstance(highdens_percent, list): highdens_percent = [highdens_percent] * 2 if genus2_kwargs is None: genus2_kwargs = genus_kwargs # Standardize the intensity values in the images img1, hdr1 = input_data(img1) img2, hdr2 = input_data(img2) img1 = standardize(img1) img2 = standardize(img2) self.genus1 = Genus(img1, smoothing_radii=smoothing_radii, min_value=min_value[0], max_value=max_value[0], lowdens_percent=lowdens_percent[0], highdens_percent=highdens_percent[0]) self.genus1.run(**genus_kwargs) self.genus2 = Genus(img2, smoothing_radii=smoothing_radii, min_value=min_value[1], max_value=max_value[1], lowdens_percent=lowdens_percent[1], highdens_percent=highdens_percent[1]) self.genus2.run(**genus2_kwargs) # When normalizing the genus curves for the distance metric, find # the scaling between the angular size of the grids. self.scale = common_scale(WCS(hdr1), WCS(hdr2))
[docs] def distance_metric(self, verbose=False, label1=None, label2=None, save_name=None, color1='b', color2='g', marker1='D', marker2='o'): ''' Data is centered and normalized (via normalize). The distance is the difference between cubic splines of the curves. All values are normalized by the area of the image they were calculated from. Parameters ---------- verbose : bool, optional Enables plotting. label1 : str, optional Object or region name for img1 label2 : str, optional Object or region name for img2 save_name : str,optional Save the figure when a file name is given. ''' # 2 times the average number between the two num_pts = \ int((len(self.genus1.thresholds) + len(self.genus2.thresholds)) / 2) # Get the min and the max of the thresholds min_pt = max(np.min(self.genus1.thresholds), np.min(self.genus2.thresholds)) max_pt = min(np.max(self.genus1.thresholds), np.max(self.genus2.thresholds)) points = np.linspace(min_pt, max_pt, 2 * num_pts) # Divide each by the area of the data. genus1 is additionally # adjusted by the scale factor of the angular size between the # datasets. genus1 = self.genus1.genus_stats[0, :] / \ float(self.genus1.data.size / self.scale) genus2 = self.genus2.genus_stats[0, :] / float(self.genus2.data.size) interp1 = \ InterpolatedUnivariateSpline(self.genus1.thresholds, genus1, k=3) interp2 = \ InterpolatedUnivariateSpline(self.genus2.thresholds, genus2, k=3) self.distance = np.linalg.norm(interp1(points) - interp2(points)) if verbose: import matplotlib.pyplot as plt plt.plot(self.genus1.thresholds, genus1, color=color1, marker=marker1, label=label1) plt.plot(self.genus2.thresholds, genus2, color=color2, marker=marker2, label=label2) plt.plot(points, interp1(points), color1) plt.plot(points, interp2(points), color2) plt.xlabel("z-score") plt.ylabel("Genus Score") plt.grid(True) plt.legend(loc="best") if save_name is not None: plt.savefig(save_name) plt.close() else: plt.show() return self
[docs] def GenusDistance(*args, **kwargs): ''' Old name for the Genus_Distance class. ''' warn("Use the new 'Genus_Distance' class. 'GenusDistance' is deprecated and will" " be removed in a future release.", Warning) return Genus_Distance(*args, **kwargs)
def remove_small_objects(arr, min_size, connectivity=2): ''' Remove objects less than the given size. Function is based on skimage.morphology.remove_small_objects Parameters ---------- arr : numpy.ndarray Binary array containing the mask. min_size : int Smallest allowed size. connectivity : int, optional Connectivity of the neighborhood. ''' struct = nd.generate_binary_structure(arr.ndim, connectivity) labels, num = nd.label(arr, struct) sizes = nd.sum(arr, labels, range(1, num + 1)) for i, size in enumerate(sizes): if size >= min_size: continue posns = np.where(labels == i + 1) arr[posns] = 0 return arr def model_gaussian_genus(x, A, theta_C): ''' Analytic Genus model for a Gaussian field using the formalism from Coles (1988) (https://ui.adsabs.harvard.edu/abs/1988MNRAS.234..509C/abstract). .. note:: The A and theta_C parameters will be degenerate as they set the normalization of the Genus curve, not its shape. Parameters ---------- x : numpy.ndarray or float Standardized value ((val - mean) / std). A : float A is the area of the total field. For a 2D image with uncorrelated noise, this is equal to the number of pixels. With correlated values, this will be closer to the number of independent regions. theta_C : float theta_C is the 'coherence angle' and will be close to the width of a 2D Gaussian matching the spatial correlation scale (i.e., the beam or PSF width). ''' return (A / theta_C**2) * x * np.exp(-x**2 / 2.) / (2 * np.pi)**1.5