# Licensed under an MIT open source license - see LICENSE
from __future__ import print_function, absolute_import, division
import numpy as np
from scipy.stats import chisquare
from scipy.optimize import curve_fit
import astropy.units as u
from astropy.table import Table
from ..stats_utils import standardize, padwithzeros
from ..base_statistic import BaseStatisticMixIn
from ...io import common_types, twod_types
[docs]
class Tsallis(BaseStatisticMixIn):
"""
The Tsallis Distribution (see Tofflemire et al., 2011)
Parameters
----------
img : %(dtypes)s
2D image.
header : FITS header, optional
The image header. Needed for the pixel scale.
lags : `~astropy.units.Quantity`, optional
Give the spatial lag values to compute the distribution at. The
default lag sizes are powers of 2 up to half the image size (so for a
128 by 128 image, the lags will be [1, 2, 4, 8, 16, 32, 64]).
distance : `~astropy.units.Quantity`, optional
Physical distance to the region in the data.
"""
__doc__ %= {"dtypes": " or ".join(common_types + twod_types)}
def __init__(self, img, header=None, lags=None, distance=None):
self.input_data_header(img, header)
if distance is not None:
self.distance = distance
if lags is None:
# Find the next smallest power of 2 from the smallest axis
max_power = \
np.floor(np.log2(min(self.data.shape) / 2.)).astype(int)
self.lags = [2**i for i in range(max_power + 1)] * u.pix
else:
self.lags = lags
@property
def lags(self):
'''
Lag values to calculate the statistics at.
'''
return self._lags
@lags.setter
def lags(self, values):
if not isinstance(values, u.Quantity):
raise TypeError("lags must be given as a astropy.units.Quantity.")
# Now make sure that we can convert into pixels before setting.
try:
pix_rad = self._to_pixel(values)
except Exception as e:
raise e
# The radius should be larger than a pixel
if np.any(pix_rad.value < 1):
raise ValueError("One of the chosen lags is smaller than one "
"pixel."
" Ensure that all lag values are larger than one "
"pixel.")
half_comp = (np.floor(pix_rad.value) - min(self.data.shape) / 2.)
if np.any(half_comp > 1e-10):
raise ValueError("At least one of the lags is larger than half of"
" the image size. Remove these lags from the "
"array.")
self._lags = values
[docs]
def make_tsallis(self, periodic=True, num_bins=None):
'''
Calculate the Tsallis distribution at each lag.
We standardize each distribution such that it has a mean of zero and
variance of one before fitting.
If the lag values are fractions of a pixel when converted to pixel
units, the lag is rounded down to the next smallest integer value.
Parameters
----------
periodic : bool, optional
Use for simulations with periodic boundaries.
num_bins : int, optional
Number of bins to use in the histograms. Defaults to the
square-root of the number of finite points in the image.
'''
if num_bins is None:
num_bins = \
np.ceil(np.sqrt(np.isfinite(self.data).sum())).astype(int)
self._lag_arrays = np.empty((len(self.lags),
self.data.shape[0],
self.data.shape[1]))
self._lag_distribs = np.empty((len(self.lags), 2, num_bins))
# Convert the lags into pixels
pix_lags = np.floor(self._to_pixel(self.lags).value).astype(int)
for i, lag in enumerate(pix_lags):
if periodic:
pad_img = self.data
else:
pad_img = np.pad(self.data, lag, padwithzeros)
rolls = np.roll(pad_img, lag, axis=0) +\
np.roll(pad_img, -lag, axis=0) +\
np.roll(pad_img, lag, axis=1) +\
np.roll(pad_img, -lag, axis=1)
# Remove the padding
if periodic:
clip_resulting = (rolls / 4.) - pad_img
else:
clip_resulting = (rolls[lag:-lag, lag:-lag] / 4.) -\
pad_img[lag:-lag, lag:-lag]
# Normalize the data
data = standardize(clip_resulting)
# Ignore nans for the histogram
hist, bin_edges = np.histogram(data[~np.isnan(data)],
bins=num_bins)
bin_centres = (bin_edges[:-1] + bin_edges[1:]) / 2
normlog_hist = np.log10(hist / np.sum(hist, dtype="float"))
# Keep results
self._lag_arrays[i, :] = data
self._lag_distribs[i, 0, :] = bin_centres
self._lag_distribs[i, 1, :] = normlog_hist
@property
def lag_arrays(self):
'''
Arrays of the image computed at different lags.
'''
return self._lag_arrays
@property
def lag_distribs(self):
'''
Histogram bins and values compute from `~Tsallis.lag_arrays`. The
histogram values are in log10.
'''
return self._lag_distribs
[docs]
def fit_tsallis(self, sigma_clip=5):
'''
Fit the Tsallis distributions.
Parameters
----------
sigma_clip : float
Sets the sigma value to clip data at. If `None`,
no clipping is performed on the data. Defaults to 5.
'''
if not hasattr(self, 'lag_distribs'):
raise Exception("Calculate the distributions first with "
"Tsallis.make_tsallis.")
self._sigma_clip = sigma_clip
self._tsallis_params = np.empty((len(self.lags), 3))
self._tsallis_stderrs = np.empty((len(self.lags), 3))
self._tsallis_chisq = np.empty((len(self.lags), 1))
for i, dist in enumerate(self.lag_distribs):
if sigma_clip is None:
# Keep all finite data
finite_mask = np.logical_and(np.isfinite(dist[0]),
np.isfinite(dist[1]))
clipped = dist[0][finite_mask], dist[1][finite_mask]
else:
clipped = clip_to_sigma(dist[0], dist[1], sigma=sigma_clip)
params, pcov = curve_fit(tsallis_function, clipped[0], clipped[1],
p0=(-np.max(clipped[1]), 1., 2.),
maxfev=100 * len(dist[0]))
fitted_vals = tsallis_function(clipped[0], *params)
self._tsallis_params[i] = params
self._tsallis_stderrs[i] = np.sqrt(np.diag(pcov))
self._tsallis_chisq[i] = np.sum((np.exp(fitted_vals) - np.exp(clipped[1]))**2. / np.exp(clipped[1]))
@property
def tsallis_params(self):
'''
Parameters of the Tsallis distribution fit at each lag value.
'''
return self._tsallis_params
@property
def tsallis_stderrs(self):
'''
Standard errors of the Tsallis distribution fit at each lag value.
'''
return self._tsallis_stderrs
@property
def tsallis_chisq(self):
'''
Reduced chi-squared values for the fit at each lag value.
'''
return self._tsallis_chisq
@property
def tsallis_table(self):
'''
Return the fit parameters, standard error, and chi-squared values as
an `~astropy.table.Table`.
'''
data = [self.lags] + [col for col in self.tsallis_params.T] + \
[col for col in self.tsallis_stderrs.T] + [self.tsallis_chisq]
names = ['lags', 'logA', 'w2', 'q', 'logA_stderr', 'w2_stderr',
'q_stderr', 'redchisq']
return Table(data, names=names)
[docs]
def plot_parameters(self, save_name=None, **kwargs):
'''
Plot the fit parameters as a function of lag.
Parameters
----------
save_name : str,optional
Save name for the figure. Enables saving the plot.
kwargs : passed to `~matplotlib.pyplot.errorbar`.
'''
import matplotlib.pyplot as plt
fig, axes = plt.subplots(3, 1, sharex=True)
ax1 = axes[0]
ax1.errorbar(self.lags.value, self.tsallis_table['logA'],
yerr=self.tsallis_table['logA_stderr'],
**kwargs)
ax1.set_ylabel(r"log A")
ax1.grid()
ax2 = axes[1]
ax2.errorbar(self.lags.value, self.tsallis_table['w2'],
yerr=self.tsallis_table['w2_stderr'],
**kwargs)
ax2.set_ylabel(r"$w^2$")
ax2.grid()
ax3 = axes[2]
ax3.errorbar(self.lags.value, self.tsallis_table['q'],
yerr=self.tsallis_table['q_stderr'],
**kwargs)
ax3.set_ylabel(r"q")
ax3.grid()
if save_name is not None:
plt.savefig(save_name)
plt.close()
else:
plt.show()
[docs]
def plot_fit(self, save_name=None, color='r',
fit_color='k'):
'''
Plot the distributions and fits to the Tsallis function.
Parameters
----------
save_name : str, optional
Save name for the figure. Enables saving the plot.
'''
import matplotlib.pyplot as plt
if fit_color is None:
fit_color = color
fig, axes = plt.subplots(len(self.lags), 1, sharex=True)
for vals in zip(self.lags, self.lag_distribs,
self.lag_arrays, self.tsallis_params,
axes):
lag, dist, arr, params, ax = vals
ax.plot(dist[0], dist[1], 'D', color=color,
label="Lag {}".format(lag), alpha=0.5)
ax.plot(dist[0], tsallis_function(dist[0], *params),
color=fit_color)
# Indicate which data was used for the fits.
# Only if sigma-clipping is applied.
if self._sigma_clip is not None:
ax.axvline(self._sigma_clip, color='r', linestyle='--',
alpha=0.7)
ax.axvline(-self._sigma_clip, color='r', linestyle='--',
alpha=0.7)
ax.legend(frameon=True, loc='best')
if save_name is not None:
plt.savefig(save_name)
plt.close()
else:
plt.show()
[docs]
def run(self, verbose=False, num_bins=None, periodic=True, sigma_clip=5,
save_name=None):
'''
Run all steps.
Parameters
----------
verbose : bool, optional
Enables plotting.
num_bins : int, optional
Sets the number of bins to use in the lag histograms. Passed
to `~Tsallis.make_tsallis`.
periodic : bool, optional
Treat periodic boundaries. Passed
to `~Tsallis.make_tsallis`. Enabled by default.
sigma_clip : float
Sets the sigma value to clip data at.
Passed to :func:`fit_tsallis`.
save_name : str,optional
Save the figure when a file name is given.
'''
self.make_tsallis(num_bins=num_bins, periodic=periodic)
self.fit_tsallis(sigma_clip=sigma_clip)
if verbose:
# print the table of parameters
print(self.tsallis_table)
self.plot_fit(save_name=save_name)
return self
# class Tsallis_Distance(object):
# '''
# Distance Metric for the Tsallis Distribution.
# Parameters
# ----------
# array1 : %(dtypes)s
# 2D datas.
# array2 : %(dtypes)s
# 2D datas.
# lags : `~astropy.units.Quantity`
# Lags to calculate at.
# fiducial_model : Tsallis
# Computed Tsallis object. use to avoid recomputing.
# tsallis1_kwargs : dict, optional
# Pass kwargs to `~Tsallis.run` for array1.
# tsallis2_kwargs : dict, optional
# Pass kwargs to `~Tsallis.run` for array2.
# '''
# __doc__ %= {"dtypes": " or ".join(common_types + twod_types)}
# def __init__(self, array1, array2, lags=None, tsallis1_kwargs={},
# tsallis2_kwargs={}, fiducial_model=None,):
# super(Tsallis_Distance, self).__init__()
# if fiducial_model is not None:
# self.tsallis1 = fiducial_model
# else:
# self.tsallis1 = \
# Tsallis(array1, lags=lags).run(verbose=False,
# **tsallis1_kwargs)
# self.tsallis2 = \
# Tsallis(array2, lags=lags).run(verbose=False,
# **tsallis2_kwargs)
# self.distance = None
# def distance_metric(self, verbose=False, save_name=None):
# '''
# We do not consider the parameter a in the distance metric. Since we
# are fitting to a PDF, a is related to the number of data points and
# is therefore not a true measure of the differences between the data
# sets. The distance is computed by summing the squared difference of
# the parameters, normalized by the sums of the squares, for each lag.
# The total distance the sum between the two parameters.
# Parameters
# ----------
# verbose : bool, optional
# Enables plotting.
# save_name : str,optional
# Save the figure when a file name is given.
# '''
# w1 = self.tsallis1.tsallis_params[:, 1]
# w2 = self.tsallis2.tsallis_params[:, 1]
# q1 = self.tsallis1.tsallis_params[:, 2]
# q2 = self.tsallis2.tsallis_params[:, 2]
# # diff_a = (a1-a2)**2.
# diff_w = (w1 - w2) ** 2. / (w1 ** 2. + w2 ** 2.)
# diff_q = (q1 - q2) ** 2. / (q1 ** 2. + q2 ** 2.)
# self.distance = np.sum(diff_w + diff_q)
# if verbose:
# import matplotlib.pyplot as p
# lags = self.tsallis1.lags
# p.plot(lags, diff_w, "rD", label="Difference of w")
# p.plot(lags, diff_q, "go", label="Difference of q")
# p.legend()
# p.xscale('log', basex=2)
# p.ylabel("Normalized Difference")
# p.xlabel("Lags (pixels)")
# p.grid(True)
# if save_name is not None:
# p.savefig(save_name)
# p.close()
# else:
# p.show()
# return self
def tsallis_function(x, *p):
'''
Tsallis distribution function as given in Tofflemire
Implemented in log form. The expected parameters are
log A, w^2, and q.
Parameters
----------
x : numpy.ndarray or list
x-data
params : list
Contains the three parameter values.
'''
loga, wsquare, q = p
return (-1 / (q - 1)) * (np.log10(1 + (q - 1) *
(x ** 2. / wsquare)) + loga)
def clip_to_sigma(x, y, sigma=2):
'''
Clip to values between +/- sigma.
Parameters
----------
x : numpy.ndarray
x-data
y : numpy.ndarray
y-data
'''
clip_mask = np.logical_and(y < sigma, y > -sigma)
# And ensure all data is finite for fitting
finite_mask = np.logical_and(np.isfinite(y), np.isfinite(x))
all_mask = np.logical_and(clip_mask, finite_mask)
return x[all_mask], y[all_mask]