# Licensed under an MIT open source license - see LICENSE
import numpy as np
import numpy.random as ra
from ..psds import pspec
from ..rfft_to_fft import rfft_to_fft
import statsmodels.formula.api as sm
from pandas import Series, DataFrame
from numpy.fft import fft2, fftshift
[docs]class PowerSpectrum(object):
"""
Compute the power spectrum of a given image. (Burkhart et al., 2010)
Parameters
----------
img : numpy.ndarray
2D image.
header : FITS header
The image header. Needed for the pixel scale.
"""
def __init__(self, img, header, weights=None):
super(PowerSpectrum, self).__init__()
self.img = img
# Get rid of nans
self.img[np.isnan(self.img)] = 0.0
self.header = header
self.degperpix = np.abs(header["CDELT2"])
if weights is None:
weights = np.ones(img.shape)
else:
# Get rid of all NaNs
weights[np.isnan(weights)] = 0.0
weights[np.isnan(self.img)] = 0.0
self.img[np.isnan(self.img)] = 0.0
self.weighted_img = self.img * weights
self.ps2D = None
self.ps1D = None
self.freq = None
[docs] def compute_pspec(self):
'''
Compute the 2D power spectrum.
'''
fft = fftshift(rfft_to_fft(self.weighted_img))
self.ps2D = np.power(fft, 2.)
return self
[docs] def compute_radial_pspec(self, return_index=True, wavenumber=False,
return_stddev=False, azbins=1,
binsize=1.0, view=False, **kwargs):
'''
Computes the radially averaged power spectrum.
This uses Adam Ginsburg's code (see https://github.com/keflavich/agpy).
See the above url for parameter explanations.
'''
self.freq, self.ps1D = pspec(self.ps2D, return_index=return_index,
wavenumber=wavenumber,
return_stddev=return_stddev,
azbins=azbins, binsize=binsize,
view=view, **kwargs)
return self
[docs] def run(self, phys_units=False, verbose=False):
'''
Full computation of the Spatial Power Spectrum.
Parameters
----------
phys_units : bool, optional
Sets frequency scale to physical units.
verbose: bool, optional
Enables plotting.
'''
self.compute_pspec()
self.compute_radial_pspec(logspacing=True)
if phys_units:
self.freq *= self.degperpix ** -1
if verbose:
import matplotlib.pyplot as p
p.subplot(121)
p.imshow(
np.log10(self.ps2D), origin="lower", interpolation="nearest")
p.colorbar()
p.subplot(122)
p.loglog(self.freq, self.ps1D, "bD")
p.xlabel("log K")
p.ylabel("Power (K)")
p.show()
return self
[docs]class PSpec_Distance(object):
"""
Distance metric for the spatial power spectrum. A linear model with an
interaction term is fit to the powerlaws. The distance is the
t-statistic of the interaction term.
Parameters
----------
data1 : dict
Dictionary containing necessary property arrays.
data2 : dict
Dictionary containing necessary property arrays.
fiducial_model : PowerSpectrum
Computed PowerSpectrum object. use to avoid recomputing.
"""
def __init__(self, data1, data2, weights1=None, weights2=None,
fiducial_model=None):
super(PSpec_Distance, self).__init__()
self.shape1 = data1[0].shape
self.shape2 = data2[0].shape
if fiducial_model is None:
self.pspec1 = PowerSpectrum(data1[0],
data1[1],
weights=weights1)
self.pspec1.run()
else:
self.pspec1 = fiducial_model
self.pspec2 = PowerSpectrum(data2[0],
data2[1],
weights=weights2)
self.pspec2.run()
self.results = None
self.distance = None
[docs] def distance_metric(self, low_cut=2.0, high_cut=64.0, verbose=False):
'''
Implements the distance metric for 2 Power Spectrum transforms.
We fit the linear portion of the transform to represent the powerlaw
A linear model with an interaction term is fit to the two powerlaws.
The distance is the t-statistic of the interaction.
Parameters
----------
low_cut : int or float, optional
Set the cut-off for low spatial frequencies. Visually, below ~2
deviates from the power law (for the simulation set).
high_cut : int or float, optional
Set the cut-off for high spatial frequencies. Values beyond the
size of the root grid are found to have no meaningful contribution
verbose : bool, optional
Enables plotting.
'''
clip_freq1 = \
self.pspec1.freq[clip_func(self.pspec1.freq, low_cut, high_cut)]
clip_ps1D1 = \
self.pspec1.ps1D[clip_func(self.pspec1.freq, low_cut, high_cut)]
clip_freq2 = \
self.pspec2.freq[clip_func(self.pspec2.freq, low_cut, high_cut)]
clip_ps1D2 = \
self.pspec2.ps1D[clip_func(self.pspec2.freq, low_cut, high_cut)]
dummy = [0] * len(clip_freq1) + [1] * len(clip_freq2)
x = np.concatenate((np.log10(clip_freq1), np.log10(clip_freq2)))
regressor = x.T * dummy
log_ps1D = np.concatenate((np.log10(clip_ps1D1), np.log10(clip_ps1D2)))
d = {"dummy": Series(dummy), "scales": Series(
x), "log_ps1D": Series(log_ps1D), "regressor": Series(regressor)}
df = DataFrame(d)
model = sm.ols(
formula="log_ps1D ~ dummy + scales + regressor", data=df)
self.results = model.fit()
self.distance = np.abs(self.results.tvalues["regressor"])
if verbose:
print self.results.summary()
import matplotlib.pyplot as p
p.plot(np.log10(clip_freq1), np.log10(clip_ps1D1), "bD",
np.log10(clip_freq2), np.log10(clip_ps1D2), "gD")
p.plot(df["scales"][:len(clip_freq1)],
self.results.fittedvalues[:len(clip_freq1)], "b",
df["scales"][-len(clip_freq2):],
self.results.fittedvalues[-len(clip_freq2):], "g")
p.grid(True)
p.xlabel("log K")
p.ylabel("Power (K)")
p.show()
return self
[docs]class BiSpectrum(object):
"""
Computes the bispectrum (three-point correlation function) of the given
image (Burkhart et al., 2010).
The bispectrum and the bicoherence are returned. The bicoherence is
a normalized version (real and to unity) of the bispectrum.
Parameters
----------
img : numpy.ndarray
2D image.
"""
def __init__(self, img):
super(BiSpectrum, self).__init__()
self.img = img
self.shape = img.shape
# Set nans to min
self.img[np.isnan(self.img)] = np.nanmin(self.img)
self.kx = np.arange(0., self.shape[0] / 2., 1)
self.ky = np.arange(0., self.shape[1] / 2., 1)
self.bispectrum = np.zeros(
(int(self.shape[0] / 2.) - 1, int(self.shape[1] / 2.) - 1),
dtype=np.complex)
self.bicoherence = np.zeros(
(int(self.shape[0] / 2.) - 1, int(self.shape[1] / 2.) - 1))
self.bispectrum_amp = None
self.accumulator = np.zeros(
(int(self.shape[0] / 2.) - 1, int(self.shape[1] / 2.) - 1))
self.tracker = np.zeros(self.shape)
[docs] def compute_bispectrum(self, nsamples=100, seed=1000):
'''
Do the computation.
Parameters
----------
nsamples : int, optional
Sets the number of samples to take at each vector
magnitude.
seed : int, optional
Sets the seed for the distribution draws.
'''
fft = np.fft.fft2(self.img)
conjfft = np.conj(fft)
ra.seed(seed)
for k1mag in range(int(fft.shape[0] / 2.)):
for k2mag in range(int(fft.shape[1] / 2.)):
phi1 = ra.uniform(0, 2 * np.pi, nsamples)
phi2 = ra.uniform(0, 2 * np.pi, nsamples)
k1x = np.asarray([int(k1mag * np.cos(angle))
for angle in phi1])
k2x = np.asarray([int(k2mag * np.cos(angle))
for angle in phi2])
k1y = np.asarray([int(k1mag * np.sin(angle))
for angle in phi1])
k2y = np.asarray([int(k2mag * np.sin(angle))
for angle in phi2])
k3x = k1x + k2x
k3y = k1y + k2y
x = np.asarray([int(np.sqrt(i ** 2. + j ** 2.))
for i, j in zip(k1x, k1y)])
y = np.asarray([int(np.sqrt(i ** 2. + j ** 2.))
for i, j in zip(k2x, k2y)])
for n in range(nsamples):
self.bispectrum[x[n], y[n]] +=\
fft[k1x[n], k1y[n]] *\
fft[k2x[n], k2y[n]] *\
conjfft[k3x[n], k3y[n]]
self.bicoherence[x[n], y[n]] +=\
np.abs(fft[k1x[n], k1y[n]] *
fft[k2x[n], k2y[n]] *
conjfft[k3x[n], k3y[n]])
self.accumulator[x[n], y[n]] += 1.
# Track where we're sampling from in fourier space
self.tracker[k1x[n], k1y[n]] += 1
self.tracker[k2x[n], k2y[n]] += 1
self.tracker[k3x[n], k3y[n]] += 1
self.bicoherence = (np.abs(self.bispectrum) / self.bicoherence)
self.bispectrum /= self.accumulator
self.bispectrum_amp = np.log10(np.abs(self.bispectrum) ** 2.)
return self
[docs] def run(self, nsamples=100, verbose=False):
'''
Compute the bispectrum. Necessary to maintiain package standards.
Parameters
----------
nsamples : int, optional
Sets the number of samples to take at each vector magnitude.
verbose : bool, optional
Enables plotting.
'''
self.compute_bispectrum(nsamples=nsamples)
if verbose:
import matplotlib.pyplot as p
p.subplot(1, 2, 1)
p.title("Bispectrum")
p.imshow(
self.bispectrum_amp, origin="lower", interpolation="nearest")
p.colorbar()
p.contour(self.bispectrum_amp, colors="k")
p.xlabel("k1")
p.ylabel("k2")
p.subplot(1, 2, 2)
p.title("Bicoherence")
p.imshow(self.bicoherence, origin="lower", interpolation="nearest")
p.colorbar()
p.xlabel("k1")
p.ylabel("k2")
p.show()
[docs]class BiSpectrum_Distance(object):
'''
Calculate the distance between two images based on their bicoherence.
The distance is the L2 norm between the bicoherence surfaces.
Parameters
----------
data1 : FITS hdu
Contains the data and header of the image.
data2 : FITS hdu
Contains the data and header of the image.
nsamples : int, optional
Sets the number of samples to take at each vector magnitude.
fiducial_model : Bispectrum
Computed Bispectrum object. use to avoid recomputing.
'''
def __init__(self, data1, data2, nsamples=100, fiducial_model=None):
super(BiSpectrum_Distance, self).__init__()
if fiducial_model is not None:
self.bispec1 = fiducial_model
else:
self.bispec1 = BiSpectrum(data1[0])
self.bispec1.run()
self.bispec2 = BiSpectrum(data2[0])
self.bispec2.run()
self.distance = None
[docs] def distance_metric(self, verbose=False):
self.distance = np.linalg.norm(self.bispec1.bicoherence.ravel() -
self.bispec2.bicoherence.ravel())
if verbose:
import matplotlib.pyplot as p
p.subplot(1, 3, 1)
p.title("Bicoherence 1")
p.imshow(
self.bispec1.bicoherence, origin="lower",
interpolation="nearest")
p.colorbar()
p.xlabel("k1")
p.ylabel("k2")
p.subplot(1, 3, 2)
p.title("Bicoherence 2")
p.imshow(
self.bispec2.bicoherence, origin="lower",
interpolation="nearest")
p.colorbar()
p.xlabel("k1")
p.ylabel("k2")
p.subplot(1, 3, 3)
p.title("Difference")
p.imshow(np.abs(self.bispec1.bicoherence - self.bispec2.bicoherence),
origin="lower", interpolation="nearest",
vmax=1.0, vmin=0.0)
p.colorbar()
p.xlabel("k1")
p.ylabel("k2")
p.show()
return self
def clip_func(arr, low, high):
return np.logical_and(arr > low, arr < high)