Source code for turbustat.statistics.delta_variance.delta_variance

# Licensed under an MIT open source license - see LICENSE


'''

Implementation of the Delta-Variance Method from Stutzki et al. 1998.

'''

import numpy as np
from scipy.stats import nanmean
from astropy.convolution import convolve_fft
from astropy import units as u


[docs]class DeltaVariance(object): """ The delta-variance technique as described in Ossenkopf et al. (2008). Parameters ---------- img : numpy.ndarray The image calculate the delta-variance of. header : FITS header Image header. weights : numpy.ndarray Weights to be used. diam_ratio : float, optional The ratio between the kernel sizes. lags : numpy.ndarray or list, optional The pixel scales to compute the delta-variance at. """ def __init__(self, img, header, weights=None, diam_ratio=1.5, lags=None): super(DeltaVariance, self).__init__() self.img = img self.header = header self.diam_ratio = diam_ratio if weights is None: self.weights = np.ones(img.shape) else: self.weights = weights self.nanflag = False if np.isnan(self.img).any() or np.isnan(self.weights).any(): self.nanflag = True if lags is None: try: self.ang_size = self.header["CDELT2"] * u.deg min_size = (0.1 * u.arcmin) / self.ang_size.to(u.arcmin) # Can't be smaller than one pixel, set to 3 to # avoid noisy pixels. Can't be bigger than half the image #(deals with issue from sim cubes). if min_size < 3.0 or min_size > min(self.img.shape) / 2.: min_size = 3.0 except ValueError: print "No CDELT2 in header. Using pixel scales." self.ang_size = 1.0 * u.astrophys.pixel min_size = 3.0 self.lags = np.logspace(np.log10(min_size), np.log10(min(self.img.shape) / 2.), 25) else: self.lags = lags self.convolved_arrays = [] self.convolved_weights = [] self.delta_var = np.empty((len(self.lags, ))) self.delta_var_error = np.empty((2, len(self.lags)))
[docs] def do_convolutions(self): for i, lag in enumerate(self.lags): core = core_kernel(lag, self.img.shape[0], self.img.shape[1]) annulus = annulus_kernel( lag, self.diam_ratio, self.img.shape[0], self.img.shape[1]) # Extend to avoid boundary effects from non-periodicity pad_weights = np.pad(self.weights, int(lag), padwithzeros) pad_img = np.pad(self.img, int(lag), padwithzeros) * pad_weights interpolate_nan = False if self.nanflag: interpolate_nan = True img_core = convolve_fft( pad_img, core, normalize_kernel=True, interpolate_nan=interpolate_nan, ignore_edge_zeros=True) img_annulus = convolve_fft( pad_img, annulus, normalize_kernel=True, interpolate_nan=interpolate_nan, ignore_edge_zeros=True) weights_core = convolve_fft( pad_weights, core, normalize_kernel=True, interpolate_nan=interpolate_nan, ignore_edge_zeros=True) weights_annulus = convolve_fft( pad_weights, annulus, normalize_kernel=True, interpolate_nan=interpolate_nan, ignore_edge_zeros=True) weights_core[np.where(weights_core == 0)] = np.NaN weights_annulus[np.where(weights_annulus == 0)] = np.NaN self.convolved_arrays.append( (img_core / weights_core) - (img_annulus / weights_annulus)) self.convolved_weights.append(weights_core * weights_annulus) return self
[docs] def compute_deltavar(self, bootstrap=False, nsamples=100, alpha=0.05): for i, (convolved_array, convolved_weight) in \ enumerate(zip(self.convolved_arrays, self.convolved_weights)): delta_var_val = _delvar(convolved_array, convolved_weight) # bootstrap to find an error if bootstrap: bootstrap_delvar = np.empty((nsamples, )) for n in range(nsamples): resample = bootstrap_resample(convolved_array) bootstrap_delvar[n] = _delvar(resample, convolved_weight) stat = np.sort(bootstrap_delvar) error = (stat[int((alpha/2.0)*nsamples)], stat[int((1-alpha/2.0)*nsamples)]) else: error = (0.0, 0.0) self.delta_var[i] = delta_var_val self.delta_var_error[:, i] = error return self
[docs] def run(self, bootstrap=False, verbose=False, ang_units=True): self.do_convolutions() self.compute_deltavar(bootstrap=bootstrap) if ang_units: self.lags *= self.ang_size if verbose: import matplotlib.pyplot as p ax = p.subplot(111) ax.set_xscale("log", nonposx="clip") ax.set_yscale("log", nonposx="clip") error_bar = [self.delta_var_error[0, :], self.delta_var_error[1, :]] p.errorbar(self.lags, self.delta_var, yerr=error_bar, fmt="bD-") ax.grid(True) ax.set_xlabel("Lag (arcmin)") ax.set_ylabel(r"$\sigma^{2}_{\Delta}$") p.show() return self
def core_kernel(lag, x_size, y_size): ''' Core Kernel for convolution. Parameters ---------- lag : int Size of the lag. Set the kernel size. x_size : int Grid size to use in the x direction y_size_size : int Grid size to use in the y_size direction Returns ------- kernel : numpy.ndarray Normalized kernel. ''' x, y = np.meshgrid(np.arange(-x_size / 2, x_size / 2 + 1, 1), np.arange(-y_size / 2, y_size / 2 + 1, 1)) kernel = ((4 / np.pi * lag)) * \ np.exp(-(x ** 2. + y ** 2.) / (lag / 2.) ** 2.) return kernel / np.sum(kernel) def annulus_kernel(lag, diam_ratio, x_size, y_size): ''' Annulus Kernel for convolution. Parameters ---------- lag : int Size of the lag. Set the kernel size. diam_ratio : float Ratio between kernel diameters. x_size : int Grid size to use in the x direction y_size_size : int Grid size to use in the y_size direction Returns ------- kernel : numpy.ndarray Normalized kernel. ''' x, y = np.meshgrid(np.arange(-x_size / 2, x_size / 2 + 1, 1), np.arange(-y_size / 2, y_size / 2 + 1, 1)) inner = np.exp(-(x ** 2. + y ** 2.) / (lag / 2.) ** 2.) outer = np.exp(-(x ** 2. + y ** 2.) / (diam_ratio * lag / 2.) ** 2.) kernel = 4 / (np.pi * lag * (diam_ratio ** 2. - 1)) * (outer - inner) return kernel / np.sum(kernel) def padwithzeros(vector, pad_width, iaxis, kwargs): ''' Pad array with zeros. ''' vector[:pad_width[0]] = 0 vector[-pad_width[1]:] = 0 return vector
[docs]class DeltaVariance_Distance(object): """ Compares 2 datasets using delta-variance. The distance between them is given by the Euclidean distance between the curves weighted by the bootstrapped errors. Parameters ---------- dataset1 : FITS hdu Contains the data and header for one dataset. dataset2 : FITS hdu See above. weights1 : numpy.ndarray Weights for dataset1. weights2 : numpy.ndarray See above. diam_ratio : float, optional The ratio between the kernel sizes. lags : numpy.ndarray or list, optional The pixel scales to compute the delta-variance at. fiducial_model : DeltaVariance A computed DeltaVariance model. Used to avoid recomputing. """ def __init__(self, dataset1, dataset2, weights1=None, weights2=None, diam_ratio=1.5, lags=None, fiducial_model=None): super(DeltaVariance_Distance, self).__init__() if fiducial_model is not None: self.delvar1 = fiducial_model else: self.delvar1 = DeltaVariance(dataset1[0], dataset1[1], weights=weights1, diam_ratio=diam_ratio, lags=lags) self.delvar1.run() self.delvar2 = DeltaVariance(dataset2[0], dataset2[1], weights=weights2, diam_ratio=diam_ratio, lags=lags) self.delvar2.run() self.distance = None
[docs] def distance_metric(self, verbose=False): ''' Applies the Euclidean distance to the delta-variance curves. Parameters ---------- verbose : bool, optional Enables plotting. ''' errors1 = np.abs(self.delvar1.delta_var_error[1, :] - self.delvar1.delta_var_error[0, :]) errors2 = np.abs(self.delvar2.delta_var_error[1, :] - self.delvar2.delta_var_error[0, :]) # Check for NaNs and negatives nans1 = np.logical_or(np.isnan(self.delvar1.delta_var), self.delvar1.delta_var <= 0.0) nans2 = np.logical_or(np.isnan(self.delvar2.delta_var), self.delvar2.delta_var <= 0.0) if nans1.any() or nans2.any(): all_nans = np.logical_or(nans1, nans2) deltavar1 = np.log10(self.delvar1.delta_var)[~all_nans] deltavar2 = np.log10(self.delvar2.delta_var)[~all_nans] else: deltavar1 = np.log10(self.delvar1.delta_var) deltavar2 = np.log10(self.delvar2.delta_var) self.distance = np.linalg.norm(deltavar1 - deltavar2) if verbose: import matplotlib.pyplot as p ax = p.subplot(111) ax.set_xscale("log", nonposx="clip") ax.set_yscale("log", nonposx="clip") error_bar1 = [self.delvar1.delta_var_error[0, :], self.delvar1.delta_var_error[1, :]] p.errorbar(self.delvar1.lags, self.delvar1.delta_var, yerr=error_bar1, fmt="bD-", label="Delta Var 1") error_bar = [self.delvar2.delta_var_error[0, :], self.delvar2.delta_var_error[1, :]] p.errorbar(self.delvar2.lags, self.delvar2.delta_var, yerr=error_bar2, fmt="gD-", label="Delta Var 2") ax.legend(loc='best') ax.grid(True) ax.set_xlabel("Lag (arcmin)") ax.set_ylabel(r"$\sigma^{2}_{\Delta}$") p.show() return self
def _delvar(array, weight): ''' Computes the delta variance of the given array. ''' avg_value = nanmean(array, axis=None) val = np.nansum((array - avg_value) ** 2. * weight) /\ np.nansum(weight) return val def bootstrap_resample(X, n=None): """ Code based on: http://nbviewer.ipython.org/gist/aflaxman/6871948 Bootstrap resample an array_like Parameters ---------- X : array_like data to resample n : int, optional length of resampled array, equal to len(X) if n==None Returns ------- returns X_resamples """ if n is None: n = len(X) resample_i = np.floor(np.random.rand(n)*len(X)).astype(int) X_resample = X[resample_i] return X_resample