# Licensed under an MIT open source license - see LICENSE
from __future__ import print_function, absolute_import, division
import numpy as np
from numpy.fft import fftshift
import astropy.units as u
from warnings import warn
import sys
from copy import deepcopy
if sys.version_info[0] >= 3:
import _pickle as pickle
else:
import cPickle as pickle
from ..base_pspec2 import StatisticBase_PSpec2D
from ..base_statistic import BaseStatisticMixIn
from ...io import input_data, common_types, twod_types
from ..fitting_utils import check_fit_limits
from ..rfft_to_fft import rfft_to_fft
[docs]
class MVC(BaseStatisticMixIn, StatisticBase_PSpec2D):
"""
Implementation of Modified Velocity Centroids (Lazarian & Esquivel, 03)
Parameters
----------
centroid : %(dtypes)s
Normalized first moment array.
moment0 : %(dtypes)s
Moment 0 array.
linewidth : %(dtypes)s
Normalized second moment array
header : FITS header
Header of any of the arrays. Used only to get the
spatial scale.
distance : `~astropy.units.Quantity`, optional
Physical distance to the region in the data.
"""
__doc__ %= {"dtypes": " or ".join(common_types + twod_types)}
def __init__(self, centroid, moment0, linewidth, header=None,
distance=None, beam=None):
# data property not used here
# self.no_data_flag = True
# self.data = None
if header is None:
try:
self._centroid, self.header = input_data(centroid,
no_header=False)
except TypeError:
warn("Could not load header from centroid. No header has been"
" specified.")
self._centroid = input_data(centroid, no_header=True)
else:
self._centroid = input_data(centroid, no_header=True)
self.header = header
self.data = self.centroid
if distance is not None:
self.distance = distance
self._moment0 = input_data(moment0, no_header=True)
self._linewidth = input_data(linewidth, no_header=True)
shape_check1 = self.centroid.shape == self.moment0.shape
shape_check2 = self.centroid.shape == self.linewidth.shape
if not shape_check1 or not shape_check2:
raise IndexError("The centroid, moment0, and linewidth arrays must"
"have the same shape.")
self.shape = self.centroid.shape
# Get rid of nans.
isnan = np.logical_and(np.isnan(self.centroid),
np.isnan(self.moment0))
isnan = np.logical_and(isnan,
np.isnan(self.linewidth))
if isnan.any():
# Need to make copies to avoid making changes to original data
self._centroid = self._centroid.copy()
self._centroid[isnan] = 0.
self._moment0 = self._moment0.copy()
self._moment0[isnan] = 0.
self._linewidth = self._linewidth.copy()
self._linewidth[isnan] = 0.
self.load_beam(beam=beam)
@property
def centroid(self):
'''
Normalized centroid array.
'''
return self._centroid
@property
def moment0(self):
'''
Zeroth moment (integrated intensity) array.
'''
return self._moment0
@property
def linewidth(self):
'''
Linewidth array. Square root of the velocity dispersion.
'''
return self._linewidth
[docs]
def compute_pspec(self, beam_correct=False,
apodize_kernel=None, alpha=0.3, beta=0.0,
use_pyfftw=False, threads=1, **pyfftw_kwargs):
'''
Compute the 2D power spectrum.
The quantity calculated here is the same as Equation 3 in Lazarian &
Esquivel (2003), but the inputted arrays are not in the same form as
described. We can, however, adjust for the use of normalized Centroids
and the linewidth.
An unnormalized centroid can be constructed by multiplying the centroid
array by the moment0. Velocity dispersion is the square of the
linewidth subtracted by the square of the normalized centroid.
Parameters
----------
beam_correct : bool, optional
If a beam object was given, divide the 2D FFT by the beam response.
apodize_kernel : None or 'splitcosinebell', 'hanning', 'tukey', 'cosinebell', 'tophat'
If None, no apodization kernel is applied. Otherwise, the type of
apodizing kernel is given.
alpha : float, optional
alpha shape parameter of the apodization kernel. See
`~turbustat.apodizing_kernel` for more information.
beta : float, optional
beta shape parameter of the apodization kernel. See
`~turbustat.apodizing_kernel` for more information.
use_pyfftw : bool, optional
Enable to use pyfftw, if it is installed.
threads : int, optional
Number of threads to use in FFT when using pyfftw.
pyfftw_kwargs : Passed to
`~turbustat.statistics.rfft_to_fft.rfft_to_fft`. See
`here <http://hgomersall.github.io/pyFFTW/pyfftw/builders/builders.html>`__
for a list of accepted kwargs.
'''
if apodize_kernel is not None:
apod_kernel = self.apodizing_kernel(kernel_type=apodize_kernel,
alpha=alpha,
beta=beta)
term1_data = self.centroid * self.moment0 * apod_kernel
term2_data = self.linewidth**2 + self.centroid**2 * apod_kernel
mom0_data = self.moment0 * apod_kernel
else:
term1_data = self.centroid * self.moment0
term2_data = self.linewidth**2 + self.centroid**2
mom0_data = self.moment0
if pyfftw_kwargs.get('threads') is not None:
pyfftw_kwargs.pop('threads')
term1 = rfft_to_fft(term1_data,
use_pyfftw=use_pyfftw,
threads=threads,
**pyfftw_kwargs)
fft_mom0 = rfft_to_fft(mom0_data,
use_pyfftw=use_pyfftw,
threads=threads,
**pyfftw_kwargs)
# Account for normalization in the line width.
term2 = np.nanmean(term2_data)
mvc_fft = term1 - term2 * fft_mom0
# Shift to the center
mvc_fft = fftshift(mvc_fft)
self._ps2D = np.abs(mvc_fft) ** 2.
if beam_correct:
self.compute_beam_pspec()
if beam_correct:
self._ps2D /= self._beam_pow
[docs]
def save_results(self, output_name, keep_data=False):
'''
Save the results of the SCF to avoid re-computing.
The pickled file will not include the data cube by default.
Parameters
----------
output_name : str
Name of the outputted pickle file.
keep_data : bool, optional
Save the data cube in the pickle file when enabled.
'''
if not output_name.endswith(".pkl"):
output_name += ".pkl"
self_copy = deepcopy(self)
# Don't keep the whole cube unless keep_data enabled.
if not keep_data:
self_copy._centroid = None
self_copy._moment0 = None
self_copy._linewidth = None
with open(output_name, 'wb') as output:
pickle.dump(self_copy, output, -1)
[docs]
def run(self, verbose=False, beam_correct=False,
apodize_kernel=None, alpha=0.2, beta=0.0,
use_pyfftw=False, threads=1, pyfftw_kwargs={},
radial_pspec_kwargs={},
low_cut=None, high_cut=None,
fit_kwargs={}, fit_unbinned=False,
fit_2D=True, fit_2D_kwargs={},
save_name=None, xunit=u.pix**-1, use_wavenumber=False):
'''
Full computation of MVC. For fitting parameters and radial binning
options, see `~turbustat.statistics.base_pspec2.StatisticBase_PSpec2D`.
Parameters
----------
verbose : bool, optional
Enables plotting.
use_pyfftw : bool, optional
Enable to use pyfftw, if it is installed.
threads : int, optional
Number of threads to use in FFT when using pyfftw.
pyfft_kwargs : Passed to
`~turbustat.statistics.rfft_to_fft.rfft_to_fft`. See
`here <https://hgomersall.github.io/pyFFTW/pyfftw/interfaces/interfaces.html#interfaces-additional-args>`_
for a list of accepted kwargs.
radial_pspec_kwargs : dict, optional
Passed to `~PowerSpectrum.compute_radial_pspec`.
low_cut : `~astropy.units.Quantity`, optional
Low frequency cut off in frequencies used in the fitting.
high_cut : `~astropy.units.Quantity`, optional
High frequency cut off in frequencies used in the fitting.
fit_kwargs : dict, optional
Passed to `~PowerSpectrum.fit_pspec`.
fit_unbinned : bool, optional
Passed to `~PowerSpectrum.fit_pspec`. Default is False.
fit_2D : bool, optional
Fit an elliptical power-law model to the 2D power spectrum.
fit_2D_kwargs : dict, optional
Keyword arguments for `~MVC.fit_2Dpspec`. Use the
`low_cut` and `high_cut` keywords to provide fit limits.
save_name : str,optional
Save the figure when a file name is given.
xunit : u.Unit, optional
Choose the unit to convert the x-axis in the plot to.
use_wavenumber : bool, optional
Plot the x-axis as the wavenumber rather than spatial frequency.
fit_kwargs : Passed to `~MVC.fit_pspec`.
'''
# Remove threads if in dict
if pyfftw_kwargs.get('threads') is not None:
pyfftw_kwargs.pop('threads')
self.compute_pspec(apodize_kernel=apodize_kernel,
alpha=alpha, beta=beta,
beam_correct=beam_correct,
use_pyfftw=use_pyfftw, threads=threads,
**pyfftw_kwargs)
self.compute_radial_pspec(**radial_pspec_kwargs)
self.fit_pspec(low_cut=low_cut, high_cut=high_cut,
fit_unbinned=fit_unbinned,
**fit_kwargs)
if fit_2D:
self.fit_2Dpspec(low_cut=low_cut, high_cut=high_cut,
**fit_2D_kwargs)
if verbose:
print(self.fit.summary())
if self._bootstrap_flag:
print("Bootstrapping used to find stderrs! "
"Errors may not equal those shown above.")
self.plot_fit(show_2D=True,
xunit=xunit, save_name=save_name,
use_wavenumber=use_wavenumber)
if save_name is not None:
import matplotlib.pyplot as p
p.close()
return self
[docs]
class MVC_Distance(object):
"""
Distance metric for MVC.
Parameters
----------
data1 : dict or `~MVC`
A dictionary containing the centroid, moment 0, and linewidth arrays
of a spectral cube. This output is created by Moments.to_dict.
The minimum expected keys are 'centroid', 'moment0' and 'linewidth'. If
weight_by_error is enabled, the dictionary should also contain the
error arrays, where the keys are the three above with '_error'
appended to the end. An `~MVC` class may also be passed which may be
pre-computed.
data2 : dict or `~MVC`
See `data1` above.
weight_by_error : bool, optional
When enabled, the property arrays are weighted by the inverse
squared of the error arrays.
low_cut : `~astropy.units.Quantity` or np.ndarray, optional
The lower frequency fitting limit. An array with 2 elements can be
passed to give separate lower limits for the datasets.
high_cut : `~astropy.units.Quantity` or np.ndarray, optional
The upper frequency fitting limit. See `low_cut` above. Defaults to
0.5.
breaks : `~astropy.units.Quantity`, list or array, optional
Specify where the break point is with appropriate units.
If none is given, no break point will be used in the fit.
pspec_kwargs : dict, optional
Passed to `radial_pspec_kwargs` in `~MVC.run`.
pspec2_kwargs : dict or None, optional
Passed to `radial_pspec_kwargs` in `~MVC.run` for `data2`. When
`None` is given, setting from `pspec_kwargs` are used for `data2`.
"""
def __init__(self, data1, data2,
weight_by_error=False, low_cut=None, high_cut=0.5 / u.pix,
breaks=None, pspec_kwargs={}, pspec2_kwargs=None):
if isinstance(data1, MVC):
self.mvc1 = data1
_has_data1 = False
else:
_has_data1 = True
if weight_by_error:
centroid1 = data1["centroid"][0] * \
data1["centroid_error"][0] ** -2.
moment01 = data1["moment0"][0] * \
data1["moment0_error"][0] ** -2.
linewidth1 = data1["linewidth"][0] * \
data1["linewidth_error"][0] ** -2.
else:
centroid1 = data1["centroid"][0]
moment01 = data1["moment0"][0]
linewidth1 = data1["linewidth"][0]
if isinstance(data2, MVC):
self.mvc2 = data2
_has_data2 = False
else:
_has_data2 = True
if weight_by_error:
centroid2 = data2["centroid"][0] * \
data2["centroid_error"][0] ** -2.
moment02 = data2["moment0"][0] * \
data2["moment0_error"][0] ** -2.
linewidth2 = data2["linewidth"][0] * \
data2["linewidth_error"][0] ** -2.
else:
centroid2 = data2["centroid"][0]
moment02 = data2["moment0"][0]
linewidth2 = data2["linewidth"][0]
low_cut, high_cut = check_fit_limits(low_cut, high_cut)
if not isinstance(breaks, list) and not isinstance(breaks, np.ndarray):
breaks = [breaks] * 2
if pspec2_kwargs is None:
pspec2_kwargs = pspec_kwargs
# if fiducial_model is not None:
# self.mvc1 = fiducial_model
if _has_data1:
self.mvc1 = MVC(centroid1, moment01, linewidth1,
data1["centroid"][1])
need_run = True
else:
need_run = False
if not hasattr(self.mvc1, '_slope'):
need_run = True
warn("MVC given as data1 does not have a fitted"
" slope. Re-running MVC.")
if need_run:
self.mvc1.run(radial_pspec_kwargs=pspec_kwargs,
high_cut=high_cut[0],
low_cut=low_cut[0],
fit_kwargs={'brk': breaks[0]}, fit_2D=False)
if _has_data2:
self.mvc2 = MVC(centroid2, moment02, linewidth2,
data2["centroid"][1])
need_run = True
else:
need_run = False
if not hasattr(self.mvc2, '_slope'):
need_run = True
warn("MVC given as data2 does not have a fitted"
" slope. Re-running MVC.")
if need_run:
self.mvc2.run(radial_pspec_kwargs=pspec2_kwargs,
high_cut=high_cut[1],
low_cut=low_cut[1],
fit_kwargs={'brk': breaks[1]}, fit_2D=False)
[docs]
def distance_metric(self, verbose=False, xunit=u.pix**-1,
save_name=None, plot_kwargs1={},
plot_kwargs2={},
use_wavenumber=False):
'''
Implements the distance metric for 2 MVC transforms.
We fit the linear portion of the transform to represent the powerlaw
A linear model with an interaction term is fit to the two powerlaws.
The distance is the t-statistic of the interaction.
Parameters
----------
verbose : bool, optional
Enables plotting.
xunit : `~astropy.units.Unit`, optional
Unit of the x-axis in the plot in pixel, angular, or
physical units.
save_name : str, optional
Name of the save file. Enables saving the figure.
plot_kwargs1 : dict, optional
Pass kwargs to `~turbustat.statistics.MVC.plot_fit`
for `data1`.
plot_kwargs2 : dict, optional
Pass kwargs to `~turbustat.statistics.MVC.plot_fit`
for `data2`.
use_wavenumber : bool, optional
Plot the x-axis as the wavenumber rather than spatial frequency.
'''
# Construct t-statistic
self.distance = \
np.abs((self.mvc1.slope - self.mvc2.slope) /
np.sqrt(self.mvc1.slope_err**2 +
self.mvc2.slope_err**2))
if verbose:
print(self.mvc1.fit.summary())
print(self.mvc2.fit.summary())
import matplotlib.pyplot as plt
defaults1 = {'color': 'b', 'symbol': 'D', 'label': '1'}
defaults2 = {'color': 'g', 'symbol': 'o', 'label': '2'}
for key in defaults1:
if key not in plot_kwargs1:
plot_kwargs1[key] = defaults1[key]
for key in defaults2:
if key not in plot_kwargs2:
plot_kwargs2[key] = defaults2[key]
if 'xunit' in plot_kwargs1:
del plot_kwargs1['xunit']
if 'xunit' in plot_kwargs2:
del plot_kwargs2['xunit']
self.mvc1.plot_fit(xunit=xunit,
use_wavenumber=use_wavenumber,
**plot_kwargs1)
self.mvc2.plot_fit(xunit=xunit,
use_wavenumber=use_wavenumber,
**plot_kwargs2)
axes = plt.gcf().get_axes()
axes[0].legend(loc='best', frameon=True)
if save_name is not None:
plt.savefig(save_name)
plt.close()
else:
plt.show()
return self