Source code for turbustat.simulator.make_cube


import numpy as np
import astropy.constants as co
import astropy.units as u
from itertools import product
from warnings import warn
from multiprocessing import Pool
from astropy.utils.console import ProgressBar
from astropy.io import fits

from ..io.sim_tools import create_cube_header
from .spectrum import generate_spectrum

SQRT_2PI = np.sqrt(2 * np.pi)
conv_to_K = 1.823e13 * u.cm**-2 / (u.K * u.cm / u.s)


[docs] def make_ppv(vel_field, dens_field, los_axis=0, m=1.4 * co.m_p, T=100 * u.K, los_length=1 * u.pc, vel_disp=None, chan_width=None, v_min=None, v_max=None, threads=1, max_chan=1000, vel_struct_index=0.5, verbose=False, return_hdu=True, pixel_ang_scale=1 * u.arcmin, restfreq=1.42 * u.GHz): ''' Generate a mock, optically-thin HI PPV cube from a given velocity and density field. Currently, the conversion to K assumes the 21-cm column density conversion. Parameters ---------- vel_field : `~astropy.units.Quantity` Three-dimensional isotropic, velocity field. Must have units of velocity. dens_field : `~astropy.units.Quantity` Three-dimensional isotropic, velocity field. Must have units of number density. los_axis : int, optional The axis used to produce the PPV cube. m : `~astropy.units.Quantity`, optional Average particle mass. Defaults to 1.4 time the proton mass, appropriate for the neutral ISM at solar metallicity. Used to calculate the thermal line width. T : `~astropy.units.Quantity`, optional Temperature of the gas. Defaults to 100 K. Used to calculate the thermal line width. los_length : `~astropy.units.Quantity`, optional Set the total physical length of the density and velocity fields along the line-of-sight. Defaults to 1 pc. vel_disp : `~astropy.units.Quantity`, optional Velocity dispersion of the 3D velocity field. When none is given, the mean of the projected standard deviation of the velocity field along `los_axis` is used. Used to set the extent of the velocity channels and estimate the channel widths, when not given. chan_width : `~astropy.units.Quantity`, optional Width of a velocity channel. When the width is not given, an estimate is made based on Eq. 12 from `Esquivel+2003 <https://ui.adsabs.harvard.edu/#abs/2003MNRAS.342..325E/abstract>`_. v_min : `~astropy.units.Quantity`, optional Minimum velocity channel. Set to `vel_field - 4 * v_lim`, where `v_lim = sqrt(vel_disp**2 + v_therm**2)`, when a limit is not given. v_max : `~astropy.units.Quantity`, optional Maximum velocity channel. Set to `vel_field + 4 * v_lim`, where `v_lim = sqrt(vel_disp**2 + v_therm**2)`, when a limit is not given. threads : int, optional Number of cores to run on. Defaults to 1. max_chan : int, optional Sets an upper limit on the number of velocity channels (default of 1000) to avoid using excessive amounts of memory. If the number of channels exceeds this limit, a `ValueError` is raised. vel_struct_index : float, optional Index of the velocity field. Used when automatically determining the channel width. Defaults to 0.5. verbose : bool, optional Print out the min and max velocity extents and the channel width prior to computing the cube. return_hdu : bool, optional Return the cube as a FITS HDU. Enabled by default. pixel_ang_scale : `~astropy.units.Quantity`, optional Specify the angular scale of one spatial pixel to set values in the FITS header. Defaults to 1 arcmin. restfreq : `~astropy.units.Quantity`, optional Rest frequency of the spectral line passed to the FITS header. Defaults to 1.42 GHz, roughly the 21-cm HI rest frequency. Returns ------- cube : `~astropy.units.Quantity` or `~astropy.io.fits.PrimaryHDU` The PPV cube as an array (`return_hdu=False`) or a FITS HDU (`return_hdu=True`). vel_axis : `~astropy.units.Quantity` When `return_hdu=False` is returned, the values for the velocity axis are returned. ''' # Densities better be postive if (dens_field.value < 0.).any(): raise ValueError("The density field contains negative values.") v_therm_sq = (co.k_B * T / m).to(vel_field.unit**2) # Estimate the velocity dispersion when not given. # Use the if vel_disp is None: vel_disp = np.std(vel_field, axis=los_axis).mean() # Make a line width estimate with thermal broadening for setting velocity # extent v_lim = np.sqrt(vel_disp**2 + v_therm_sq) # Setup velocity axis if v_min is None: v_min = vel_field.min() - 4 * v_lim else: if not v_min.unit.is_equivalent(u.km / u.s): raise u.UnitsError("v_min must be given in velocity units.") if v_max is None: v_max = vel_field.max() + 4 * v_lim else: if not v_max.unit.is_equivalent(u.km / u.s): raise u.UnitsError("v_max must be given in velocity units.") if v_min > v_max: raise ValueError("v_min > v_max is not allowed.") del_v = v_max - v_min # m=1 assumption. if vel_struct_index is None: warn("Setting channels for velocity structure index of 0.5.") vel_struct_index = 0.5 if chan_width is None: # Eq. 13 from Esquivel+2003. Number of channels to recover the thin # slice regime down to 2 pix. # Note that the equation in the paper has a negative sign in the sqrt # that should be a +. # Take the effective width to be ~1/2 of the effective channel width # given by Eq. 12 in Esquivel+2003, for scales down to 2 pix. dv_sq = vel_disp**2 * \ (2. / float(vel_field.shape[los_axis]))**(vel_struct_index) v_eff = np.sqrt(dv_sq + 2 * v_therm_sq).to(vel_field.unit) N_chan = int(np.ceil(del_v / (v_eff / 2.)).value) else: N_chan = int(np.ceil(del_v / chan_width)) if verbose: print("Number of spectral channels {}".format(N_chan)) print("Min velocity {}".format(v_min)) print("Max velocity {}".format(v_max)) print("Channel width {}".format(chan_width)) if N_chan < 6: warn("<6 channels will be used ({} channels). Recommend increasing the" " number of channels. See Esquivel et al. (2003) and Chepurnov & " "Lazarian (2009).") if N_chan > max_chan: raise ValueError("Number of channels larger than set maximum" " ({})".format(max_chan)) # When computing the spectrum, we want the edges of the velocity channels # numpy in py27 isn't handling the units correctly in linspace. vel_edges = np.linspace(v_min.value, v_max.value, N_chan + 1) * v_min.unit vel_axis = 0.5 * (vel_edges[1:] + vel_edges[:-1]) # Length of one pixel pix_scale = los_length.to(u.cm) / float(vel_field.shape[los_axis]) shape = list(vel_field.shape) shape.pop(los_axis) spec_gen = ((vel_edges, vel_field[field_slice(y, x, los_axis)], dens_field[field_slice(y, x, los_axis)], v_therm_sq, pix_scale, y, x) for y, x in product(range(shape[0]), range(shape[1]))) if threads == 1: spectra = list(map(_mapper, spec_gen)) else: with Pool(threads) as pool: spectra = pool.map(_mapper, spec_gen) # Map spectra into a cube cube = np.empty(vel_axis.shape + tuple(shape)) * u.K for out in spectra: spec, y, x = out cube[:, y, x] = spec if return_hdu: header = create_cube_header(pixel_ang_scale, np.diff(vel_axis)[0], 0.0 * u.arcsec, cube.shape, restfreq, u.K, v0=vel_axis[0]) return fits.PrimaryHDU(cube.value, header) return cube, vel_axis
def _spectrum_maker(vel_edges, vel_slice, dens_slice, v_therm_sq, pix_scale, y, x): ''' Generate an optically-thin spectrum. ''' spectrum = np.zeros_like(vel_edges.value[:-1]) * u.cm**-2 / (u.cm / u.s) # Make derivative of los velocity. dvdz = (np.gradient(vel_slice) / pix_scale).to(1 / u.s) v_cents = (0.5 * (vel_edges[1:] + vel_edges[:-1])).to(u.cm / u.s) spectrum = generate_spectrum(vel_slice.to(u.cm / u.s).value, dens_slice.to(u.cm**-3).value, vel_edges.to(u.cm / u.s).value, v_cents.value, dvdz.value, v_therm_sq.to(u.cm**2 / u.s**2).value, pix_scale.to(u.cm).value) spectrum = spectrum * u.cm**-2 / (u.cm / u.s) spectrum /= conv_to_K return spectrum, y, x def _mapper(inps): ''' Use with `multiprocessing.Pool.map`. ''' return _spectrum_maker(*inps) def field_slice(y, x, los_axis): ''' Slice out spatial slices of a 3D field without the axis along the observer's LOS. ''' los_slice = slice(None) spat_slices = [y, x] spat_slices.insert(los_axis, los_slice) return tuple(spat_slices)