__author__ = "sibirrer", "ajshajib"
from scipy.interpolate import interp1d
from scipy.interpolate import RectBivariateSpline
from scipy.interpolate import RegularGridInterpolator
import numpy as np
class KinScalingParamManager(object):
    """Class to handle the sorting of parameters in the kinematics scaling.

    The manager keeps the ordered list of parameter names that span the
    interpolation grid and converts between keyword dictionaries and arrays
    sorted in that order.
    """

    # Names routed by param_array2kwargs to the lens-model and deprojection
    # dictionaries; every other name ends up in the anisotropy dictionary.
    _LENS_PARAM_NAMES = ("gamma_in", "gamma_pl", "log_m2l")
    _DEPROJECTION_PARAM_NAMES = ("q_intrinsic",)

    def __init__(self, j_kin_scaling_param_name_list):
        """
        :param j_kin_scaling_param_name_list: list of strings for the parameters as they are interpolated in the same
            order as j_kin_scaling_grid; None is treated as an empty list
        """
        if j_kin_scaling_param_name_list is None:
            self._param_list = []
        else:
            self._param_list = j_kin_scaling_param_name_list
        self._num_param = len(self._param_list)

    @property
    def num_scaling_dim(self):
        """Number of parameter dimensions for kinematic scaling.

        :return: number of scaling dimensions
        :rtype: int
        """
        return self._num_param

    def kwargs2param_array(self, kwargs):
        """Converts dictionary to sorted array in same order as interpolation grid.

        :param kwargs: dictionary of all model components, must include the ones that are
            interpolated; additional keys are ignored
        :return: sorted list of parameters to interpolate
        :raises ValueError: if one of the interpolated parameters is missing in kwargs
        """
        param_array = []
        for param in self._param_list:
            try:
                value = kwargs[param]
            except KeyError:
                # keep raising ValueError (not KeyError) as callers expect
                raise ValueError(
                    "key %s not in parameters and hence kinematic scaling not possible"
                    % param
                )
            param_array.append(value)
        return param_array

    def param_array2kwargs(self, param_array):
        """Inverse function of kwargs2param_array for a given param_array returns the
        dictionary split in anisotropy, lens and deprojection parameters.

        :param param_array: parameter values in the same order as the parameter name list
        :return: kwargs_anisotropy, kwargs_lens, kwargs_deprojection
        """
        kwargs_anisotropy, kwargs_lens, kwargs_deprojection = {}, {}, {}
        for name, value in zip(self._param_list, param_array):
            if name in self._LENS_PARAM_NAMES:
                kwargs_lens[name] = value
            elif name in self._DEPROJECTION_PARAM_NAMES:
                kwargs_deprojection[name] = value
            else:
                kwargs_anisotropy[name] = value
        return kwargs_anisotropy, kwargs_lens, kwargs_deprojection
class ParameterScalingSingleMeasurement(object):
    """Class to manage anisotropy scaling for single slit observation.

    Depending on the number of grid axes, the scaling grid is wrapped in a 1D
    interpolator, a 2D rectangular spline or an N-dimensional regular-grid
    interpolator.
    """

    def __init__(self, param_grid_axes, j_kin_scaling_grid):
        """
        :param param_grid_axes: list of arrays of interpolated parameter values; anything that is not a list is
            treated as the single axis of a one-dimensional grid
        :param j_kin_scaling_grid: array with the scaling of J() for single measurement bin in same dimensions as the
            param_arrays
        """
        self._evaluate_scaling = False
        if param_grid_axes is None or j_kin_scaling_grid is None:
            # nothing to interpolate; j_scaling() then returns the neutral value 1
            return

        if isinstance(param_grid_axes, list):
            self._dim_scaling = len(param_grid_axes)
        else:
            self._dim_scaling = 1
            param_grid_axes = [param_grid_axes]

        if self._dim_scaling == 1:
            # linear interpolation that also extrapolates outside the grid
            self._f_ani = interp1d(
                param_grid_axes[0],
                j_kin_scaling_grid,
                kind="linear",
                fill_value="extrapolate",
            )
        elif self._dim_scaling == 2:
            # spline degree is at most cubic and is lowered for axes with
            # fewer than four grid points
            self._f_ani = RectBivariateSpline(
                param_grid_axes[0],
                param_grid_axes[1],
                j_kin_scaling_grid,
                kx=min(len(param_grid_axes[0]) - 1, 3),
                ky=min(len(param_grid_axes[1]) - 1, 3),
            )
        else:
            # interpolator is built with its default settings
            self._f_ani = RegularGridInterpolator(
                tuple(param_grid_axes),
                j_kin_scaling_grid,
            )
        self._evaluate_scaling = True

    def j_scaling(self, param_array):
        """
        :param param_array: sorted list of parameters for the interpolation function
        :return: scaling J(a_ani) for single slit; 1 if no interpolation grid was provided or param_array is empty
        """
        if not self._evaluate_scaling or len(param_array) == 0:
            return 1
        if self._dim_scaling == 1:
            return self._f_ani(param_array[0])
        if self._dim_scaling == 2:
            # the spline evaluates on a grid and returns a 2D array; pick its
            # single entry
            return self._f_ani(param_array[0], param_array[1])[0].T[0]
        # a single point is evaluated, so the result is a one-element array
        return self._f_ani(param_array)[0]
class KinScaling(KinScalingParamManager):
    """Class to manage model parameter and anisotropy scalings for IFU data."""

    def __init__(
        self,
        j_kin_scaling_param_axes=None,
        j_kin_scaling_grid_list=None,
        j_kin_scaling_param_name_list=None,
    ):
        """
        :param j_kin_scaling_param_axes: array of parameter values for each axes of j_kin_scaling_grid
        :param j_kin_scaling_grid_list: list of array with the scalings of J() for each IFU
        :param j_kin_scaling_param_name_list: list of strings for the parameters as they are interpolated in the same
            order as j_kin_scaling_grid
        """
        axes_given_as_list = isinstance(j_kin_scaling_param_axes, list)

        # store the axes as a list so that they can be indexed per parameter
        if not axes_given_as_list and j_kin_scaling_param_name_list is not None:
            self._param_arrays = [j_kin_scaling_param_axes]
        else:
            self._param_arrays = j_kin_scaling_param_axes

        self._evaluate_scaling = False
        self._is_log_m2l_population_level = False

        # the scaling is only evaluated when axes, grids and names are all given
        if (
            j_kin_scaling_param_axes is not None
            and j_kin_scaling_grid_list is not None
            and j_kin_scaling_param_name_list is not None
        ):
            self._evaluate_scaling = True
            self._f_ani_list = []
            # one interpolation object per measurement grid
            self._j_scaling_ifu = [
                ParameterScalingSingleMeasurement(
                    j_kin_scaling_param_axes, scaling_grid
                )
                for scaling_grid in j_kin_scaling_grid_list
            ]

        # set unconditionally: kin_scaling() reads it for its neutral return
        # value even when no scaling is configured
        if axes_given_as_list:
            self._dim_scaling = len(j_kin_scaling_param_axes)
        else:
            self._dim_scaling = 1

        KinScalingParamManager.__init__(
            self, j_kin_scaling_param_name_list=j_kin_scaling_param_name_list
        )

    def param_bounds_interpol(self):
        """Minimum and maximum bounds of parameters that are being used to call
        interpolation function.

        :return: dictionaries of minimum and maximum bounds; both are empty when no scaling is evaluated
        """
        kwargs_min, kwargs_max = {}, {}
        if self._evaluate_scaling:
            for key, axis in zip(self._param_list, self._param_arrays):
                kwargs_min[key] = min(axis)
                kwargs_max[key] = max(axis)
        return kwargs_min, kwargs_max

    def kin_scaling(self, kwargs_param):
        """
        :param kwargs_param: dictionary of parameters for scaling the kinematics
        :return: scaling J(a_ani) for the IFU's; an array of ones when kwargs_param is None, when no scaling is
            evaluated or when there are no parameters to interpolate
        :raises ValueError: if an interpolated parameter is missing in kwargs_param
        """
        # NOTE(review): the neutral array has one entry per scaling parameter
        # dimension, not per measurement -- confirm that callers only rely on
        # broadcasting against it.
        if kwargs_param is None:
            return np.ones(self._dim_scaling)
        param_array = self.kwargs2param_array(kwargs_param)
        if not self._evaluate_scaling or len(param_array) == 0:
            return np.ones(self._dim_scaling)
        return np.array(
            [
                scaling_class.j_scaling(param_array)
                for scaling_class in self._j_scaling_ifu
            ]
        )