Source code for nnero.analysis

##################################################################################
# This file is part of NNERO.
#
# Copyright (c) 2024, Gaétan Facchinetti
#
# NNERO is free software: you can redistribute it and/or modify it 
# under the terms of the GNU General Public License as published by 
# the Free Software Foundation, either version 3 of the License, or any 
# later version. NNERO is distributed in the hope that it will be useful, 
# but WITHOUT ANY WARRANTY; without even the implied warranty of 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU 
# General Public License along with NNERO. 
# If not, see <https://www.gnu.org/licenses/>.
#
##################################################################################

##################
#
# Definition of some functions to analyse chains and plot them
#
##################



import glob
import warnings
import os

import numpy as np
from copy import copy

from scipy.ndimage import gaussian_filter, gaussian_filter1d


from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Self

from .data         import MP_KEY_CORRESPONDANCE
from .predictor    import DEFAULT_VALUES
from .regressor    import Regressor
from .classifier   import Classifier
from .interpolator import Interpolator
from .predictor    import predict_Xe_numpy, predict_tau_numpy, predict_parameter_numpy


EMCEE_IMPORTED = False

try:
    import emcee
    EMCEE_IMPORTED = True
except:
    pass


def neutrino_masses(mnu_1, mnu_2 = 0.0, mnu_3 = 0.0, hierarchy = 'NORMAL'):

    ## DEFINE NEUTRINO MASSES BEFORE ANYTHING ELSE
    delta_m21_2    = 7.5e-5
    delta_m31_NO_2 = 2.55e-3
    delta_m31_IO_2 = 2.45e-3

    # degenerate neutrino masses (all equal to the first one)
    if hierarchy == 'DEGENERATE':
        mnu_2 = mnu_1
        mnu_3 = mnu_1

    # normal ordering
    if hierarchy == "NORMAL":
        mnu_2 = np.sqrt(mnu_1**2 + delta_m21_2)
        mnu_3 = np.sqrt(mnu_1**2 + delta_m31_NO_2)

    # inverse ordering
    if hierarchy == "INVERSE":
        mnu_2 = np.sqrt(mnu_1**2 + delta_m31_IO_2)
        mnu_3 = np.sqrt(mnu_1**2 + delta_m31_IO_2 + delta_m21_2)

    # create an array of neutrino masses
    m_neutrinos = np.array([mnu_1, mnu_2, mnu_3])

    return m_neutrinos

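# Illustrative usage sketch (not part of the module): the numbers below are made up
# and only show how neutrino_masses() is meant to be called.
#
#   m_nu    = neutrino_masses(0.02, hierarchy='NORMAL')  # masses of the three states in eV
#   sum_mnu = np.sum(m_nu)                                # total mass entering 'sum_mnu'
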
def to_CLASS_names(array: list[str] | np.ndarray):

    is_array = isinstance(array, np.ndarray)

    labels_correspondance = {value: key for key, value in MP_KEY_CORRESPONDANCE.items()}
    array = [labels_correspondance[value] if value in labels_correspondance else value for value in array]

    if is_array is True:
        array = np.array(array)

    return array


def to_21cmFAST_names(array: list[str] | np.ndarray):

    is_array = isinstance(array, np.ndarray)

    array = [MP_KEY_CORRESPONDANCE[value] if value in MP_KEY_CORRESPONDANCE else value for value in array]

    if is_array is True:
        array = np.array(array)

    return array

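# Illustrative sketch (hypothetical parameter names): these helpers map between the
# MontePython/CLASS naming convention and the 21cmFAST one, leaving unknown names untouched.
#
#   names_21cmfast = to_21cmFAST_names(['tau_reio', 'some_unknown_name'])
#   names_class    = to_CLASS_names(np.array(names_21cmfast))
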
#################################################
## CHAIN ANALYSIS TOOLS

class MPChain:
    """
    Class MPChain reading chains from MontePython output files.

    Parameters
    ----------
    filename : str
        Path to the file where the chain is stored.
    """

    def __init__(self, filename: str):

        self._filename: str = filename
        self.load()

    def load(self) -> None:
        """
        Load the chain and automatically remove the non-markovian points.
        """

        # read the file to get the chain
        with open(self._filename, 'r') as f:
            data = np.loadtxt(f)

        self.weights = data[:, 0]
        self.lnlkl   = - data[:, 1]
        self._values = data[:, 2:].T

        self._total_n_steps = np.sum(self.weights, dtype=int)
        self._total_length  = self._values.shape[1]

        # reread the file to find the non markovian part of the chain
        with open(self._filename, 'r') as f:
            self._markov_index = 0
            for line in f:
                if (line.strip().startswith('#')) and ('update proposal' in line):
                    self._markov_index = int(line.split(' ')[2])

        # remove the non markovian part by default
        self._values = self._values[:, self._markov_index:]
        self.weights = self.weights[self._markov_index:]
        self.lnlkl   = self.lnlkl[self._markov_index:]

        self._max_lnlkl = np.max(self.lnlkl)
        index_max_lnlkl = np.argmax(self.lnlkl)
        self._best_fit  = self._values[:, index_max_lnlkl]

        self._n_params = self._values.shape[0]

        self._mean_value: np.ndarray = np.zeros(self._n_params)
        self._var_value:  np.ndarray = np.zeros(self._n_params)

    def remove_burnin(self, global_max_lnlkl: float) -> None:
        """
        Remove the burn-in points according to the value of the maximum log-likelihood
        over all chains. Only the points of the chain coming after its first crossing
        of (global_max_lnlkl - 3) are kept.

        Parameters
        ----------
        global_max_lnlkl : float
            Global maximum log-likelihood over all chains.
        """

        if np.all(self.lnlkl < (global_max_lnlkl - 3)):
            self._burnin_index = len(self.lnlkl)
        else:
            burnin_index = np.where(self.lnlkl >= global_max_lnlkl - 3)[0]
            if len(burnin_index) > 0:
                self._burnin_index = burnin_index[0]
            else:
                self._burnin_index = 0

        self._values = self._values[:, self._burnin_index:]
        self.weights = self.weights[self._burnin_index:]
        self.lnlkl   = self.lnlkl[self._burnin_index:]

    def values(self, discard: int = 0, thin: int = 1) -> np.ndarray:
        """
        Get the values of the chain.

        Parameters
        ----------
        discard : int, optional
            Number of initial points to discard, by default 0.
        thin : int, optional
            Thinning factor (keep only one point every `thin`), by default 1.

        Returns
        -------
        np.ndarray with dimension (number of parameters, length of chain)
        """

        if discard > self._values.shape[-1]:
            discard = self._values.shape[-1]
            warnings.warn("All points in chain " + self._filename + " discarded (discard >= chain length)")

        return self._values[:, discard::thin]

    def compute_stats(self) -> None:
        """
        Compute the mean and standard deviation within the chain.
        Should be called after `remove_burnin()`.
        """

        n = np.sum(self.weights)
        self._mean_value = np.sum(self._values * self.weights[None, :], axis=-1) / n
        self._var_value  = (np.sum((self._values**2) * self.weights[None, :], axis=-1) - n * (self._mean_value)**2) / (n-1)

    @property
    def markov_index(self):
        return self._markov_index

    @property
    def max_lnlkl(self):
        return self._max_lnlkl

    @property
    def burnin_index(self):
        return self._burnin_index

    @property
    def best_fit(self):
        return self._best_fit

    @property
    def mean_value(self):
        return self._mean_value

    @property
    def var_value(self):
        return self._var_value

    @property
    def length(self) -> int:
        """
        Number of accepted steps, not counting burn-in and non-markovian points.

        Returns
        -------
        int
        """
        return self._values.shape[1]

    @property
    def total_length(self) -> int:
        """
        Total number of accepted steps.

        Returns
        -------
        int
        """
        return self._total_length

    @property
    def total_n_steps(self) -> int:
        """
        Total number of steps.

        Returns
        -------
        int
        """
        return self._total_n_steps

    @property
    def n_steps(self) -> int:
        """
        Number of steps, not counting burn-in and non-markovian points.

        Returns
        -------
        int
        """
        return np.sum(self.weights, dtype=int)

    @property
    def n_params(self):
        return self._n_params

    @property
    def acceptance_rate(self):
        return self.total_length/self.total_n_steps

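# Illustrative usage sketch (hypothetical chain file): reading a single MontePython
# chain, removing burn-in with respect to a given maximum log-likelihood and
# accessing thinned values.
#
#   chain = MPChain('chains/run/2024-01-01_100000__1.txt')
#   chain.remove_burnin(global_max_lnlkl=chain.max_lnlkl)
#   chain.compute_stats()
#   values = chain.values(discard=0, thin=10)   # shape (n_params, n_points)
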
class Samples(ABC):
    """
    Class containing all chains of an MCMC analysis.

    Parameters
    ----------
    path : str
        Path to the chains.
    ids : list | np.ndarray | None, optional
        List of chains to take into account. If None, all chains found are added.
        By default None.
    """

    def __init__(self, path: str, ids: list[int] | np.ndarray | None = None) -> None:

        self._path = path
        self._ids  = np.array(ids) if ids is not None else None

    @abstractmethod
    def flat(self, discard: np.ndarray | None = None, thin: None | int = None, **kwargs) -> np.ndarray:
        pass

    @property
    def path(self):
        return self._path

    @property
    def ids(self):
        return self._ids

    @property
    @abstractmethod
    def param_names(self) -> np.ndarray:
        pass

    @property
    @abstractmethod
    def scaling_factor(self) -> dict:
        pass


@dataclass
class GaussianInfo:

    mean: np.ndarray | None = None
    cov: np.ndarray | None = None
    param_names: np.ndarray | list[str] | None = None

    def compatible_with(self, other: Self) -> bool:

        all_params = set(list(to_CLASS_names(self.param_names)) + list(to_CLASS_names(other.param_names)))

        for param in all_params:
            if param in to_CLASS_names(self.param_names) and param in to_CLASS_names(other.param_names):

                index_1 = list(to_CLASS_names(self.param_names)).index(param)
                index_2 = list(to_CLASS_names(other.param_names)).index(param)

                if self.mean is not None and other.mean is not None:
                    if self.mean[index_1] != other.mean[index_2]:
                        return False

        return True

class GaussianSamples(Samples):
    """
    Daughter class of Samples for gaussian generated chains.
    """

    def __init__(self,
                 gaussians: list[GaussianInfo | str] | GaussianInfo | str,
                 add_tau: bool = False,
                 params: list[str] | None = None,
                 *,
                 n: int = 200_000) -> None:

        super().__init__("", None)

        self._add_tau = add_tau

        # define a list of extra gaussians to add on top
        self._gaussians = gaussians
        if (not isinstance(self._gaussians, list)):
            self._gaussians = [self._gaussians]

        # if given a string input, read the data
        # and check for compatibility between mean values
        for ig in range(len(self._gaussians)):

            if isinstance(self._gaussians[ig], str):
                self._gaussians[ig] = self.load_data(self._gaussians[ig])

            if not self._gaussians[0].compatible_with(self._gaussians[ig]):
                raise ValueError('fiducial parameters should agree')

        # parameters that will be contained
        # in the total covariance matrix
        self._params = params

        if self._params is None and self._gaussians is not None:
            self._params = self._gaussians[0].param_names

        self._params = to_CLASS_names(self._params)

        # get all the parameters in total
        all_params = []
        for g in self._gaussians:
            for p in g.param_names:
                if p not in all_params:
                    all_params.append(p)

        # build a big inverse covariance matrix
        # that contains all parameters in the input
        # covariance matrices
        all_n = len(all_params)
        all_inv_cov = np.zeros((all_n, all_n))
        all_mean    = np.zeros(all_n)

        for g in self._gaussians:

            this_params = list(to_CLASS_names(g.param_names))
            indices = [all_params.index(param) for param in this_params]

            all_inv_cov[np.ix_(indices, indices)] += np.linalg.inv(g.cov)

            if g.mean is not None:
                all_mean[indices] = g.mean

        # select the part of the covariance matrix for the desired parameters
        all_cov = np.linalg.inv(all_inv_cov)
        self._indices = [all_params.index(param) for param in self._params]

        # construct a GaussianInfo object from the total cov and mean defined above
        self._gaussian = GaussianInfo(all_mean[self._indices], all_cov[np.ix_(self._indices, self._indices)], self._params)

        self._generated_data = np.random.multivariate_normal(all_mean, all_cov, n).T
        self._all_params = all_params

        # add tau_reio as a parameter
        if self._add_tau is True:
            self._params = np.array(['tau_reio'] + list(self._params))

    def load_data(self, filename) -> GaussianInfo:

        gaussian = GaussianInfo()

        with open(filename, 'rb') as file:

            data = np.load(file)

            gaussian.cov = data.get('cov', None)
            gaussian.param_names = list(to_CLASS_names(data['params'])) if 'params' in data else None
            gaussian.mean = data.get('fiducial', None)

            if gaussian.mean is None:
                gaussian.mean = data.get('mean', None)

        return gaussian

    def flat(self, discard: np.ndarray | None = None, thin: None | int = None, **kwargs) -> np.ndarray:

        if discard is None:
            discard = 0

        if thin is None:
            thin = 1

        flat_chain = self._generated_data[:, discard::thin]

        if self._add_tau is True:

            classifier = kwargs.get('classifier', None)
            regressor  = kwargs.get('regressor', None)

            if regressor is None:
                regressor = Regressor.load()

            mask = np.full_like(flat_chain, fill_value=True, dtype=bool)

            for ip, param in enumerate(to_CLASS_names(regressor.parameters_name)):
                if param in self._all_params:

                    # only take from index 1 as tau is the zeroth
                    index = list(self._all_params).index(param)
                    val_range = regressor.parameters_range

                    mask[index, :] = flat_chain[index, :] < val_range[ip, 1]
                    mask[index, :] = (flat_chain[index, :] > val_range[ip, 0]) & mask[index, :]

            valid_columns = np.all(mask, axis=0)
            flat_chain = flat_chain[:, valid_columns]

            tau = compute_tau(flat_chain, self._all_params, classifier, regressor)
            return np.vstack((tau[None, :], flat_chain[self._indices, :]))

        return flat_chain[self._indices, :]

    @property
    def param_names(self) -> np.ndarray:
        return self._params

    @property
    def all_param_names(self) -> np.ndarray:
        return self._all_params

    @property
    def scaling_factor(self) -> dict:
        return {name: 1.0 for name in self.param_names}

    @property
    def gaussian(self) -> GaussianInfo:
        return self._gaussian

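# Illustrative usage sketch (hypothetical file name, parameter names and numbers):
# combining a covariance matrix stored on disk with an extra GaussianInfo prior and
# drawing a flattened Gaussian sample.
#
#   prior   = GaussianInfo(mean=np.array([0.056]), cov=np.array([[0.007**2]]), param_names=['tau_reio'])
#   samples = GaussianSamples(['forecast_cov.npz', prior], params=['tau_reio'])
#   flat    = samples.flat(thin=10)   # shape (n_params, n_points)
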
class EMCEESamples(Samples):
    """
    Daughter class of Samples for emcee chains.
    """

    def __init__(self, path: str, add_tau: bool = False) -> None:

        if EMCEE_IMPORTED is False:
            print("Cannot import emcee, therefore cannot work with emcee chains")
            return None

        super().__init__(path, None)

        self._add_tau = add_tau
        self.load_chains()

        # check for convergence criterion
        self._has_converged = False
        self._autocorr = None

        try:
            self._autocorr = self._reader.get_autocorr_time()
            self._has_converged = True
        except emcee.autocorr.AutocorrError:
            self._autocorr = self._reader.get_autocorr_time(quiet=True)

    def load_chains(self):

        self._reader = emcee.backends.HDFBackend(self.path)

        filename = os.path.join(*self.path.split('/')[:-1], *self.path.split('/')[-1].split('.')[:-1])

        with open(filename + '.npz', 'rb') as file:

            data = np.load(file)

            self._parameters_theta = data.get('parameters_theta', None)
            self._parameters_xi    = data.get('parameters_xi', None)
            self._xi               = data.get('xi', None)

        if self._add_tau is False:
            self._param_names = np.array(list(self._parameters_theta) + list(self._parameters_xi))
        else:
            self._param_names = np.array(['tau_reio'] + list(self._parameters_theta) + list(self._parameters_xi))

    def flat(self, discard: np.ndarray | None = None, thin: None | int = None, **kwargs) -> np.ndarray:

        burnin_discard = int(np.max(2*self._autocorr)) if self._autocorr is not None else 0

        if discard is None:
            discard = 0

        if thin is None:
            thin = 1

        flat_chain = self._reader.get_chain(discard=burnin_discard + discard, thin=thin, flat=True)
        flat_chain = np.hstack((flat_chain, np.tile(self._xi, (flat_chain.shape[0], 1)))).T

        if self._add_tau is True:

            classifier = kwargs.get('classifier', None)
            regressor  = kwargs.get('regressor', None)

            tau = compute_tau(flat_chain, self.param_names, classifier, regressor)
            flat_chain = np.vstack((tau[None, :], flat_chain))

        return flat_chain

    def convergence(self):

        if self._has_converged is False:
            print("Not converged yet")

        print(self._autocorr)

    @property
    def reader(self):
        return self._reader

    @property
    def autocorr(self) -> np.ndarray:
        return self._autocorr

    @property
    def param_names(self) -> np.ndarray:
        return self._param_names

    @property
    def scaling_factor(self) -> dict:
        return {name: 1.0 for name in self.param_names}

def save_sampling_parameters(filename: str, parameters_theta: list[str], parameters_xi: list[str], xi: np.ndarray):

    with open(filename.split('.')[0] + '.npz', 'wb') as file:
        np.savez(file,
                 parameters_theta=to_CLASS_names(parameters_theta),
                 parameters_xi=to_CLASS_names(parameters_xi),
                 xi=xi)

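# Illustrative usage sketch (hypothetical paths and parameter names): the .npz file
# written by save_sampling_parameters() sits next to the emcee HDF5 backend and is
# read back by EMCEESamples.load_chains().
#
#   save_sampling_parameters('output/run.h5', parameters_theta=['omega_b', 'omega_dm'],
#                            parameters_xi=['z_reio'], xi=np.array([7.5]))
#   samples = EMCEESamples('output/run.h5', add_tau=False)
#   flat    = samples.flat(thin=5)   # shape (n_params, n_points)
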
def compute_tau(flat_chain: np.ndarray,
                param_names: list[str] | np.ndarray,
                classifier: Classifier | None = None,
                regressor: Regressor | None = None) -> np.ndarray:

    # get the ordered list of parameters
    if regressor is None:
        regressor = Regressor.load()

    if classifier is None:
        classifier = Classifier.load()

    r_parameters = to_CLASS_names(regressor.metadata.parameters_name)
    parameters   = to_CLASS_names(copy(param_names))

    if isinstance(parameters, list):
        parameters = np.array(parameters)

    if 'tau_reio' in parameters:
        parameters = parameters[parameters != 'tau_reio']

    for param in parameters:
        if param == 'sum_mnu':
            param = 'm_nu1'

    # make a list of the given parameters that are needed for the regressor
    # in the order they are given
    data_for_param = []
    for ip, param in enumerate(parameters):
        if param == 'sum_mnu':
            parameters[ip] = 'm_nu1'
        if param in r_parameters:
            data_for_param.append(param)

    # get the data sample
    data = np.empty((len(r_parameters), flat_chain.shape[-1]))

    # find the ordering in which data_sample is set in prepare_data_plot
    indices_to_plot = [list(parameters).index(param) for param in data_for_param]

    for ip, param in enumerate(r_parameters):

        # if we ran the MCMC over that parameter
        if param in parameters[indices_to_plot]:
            index = list(parameters[indices_to_plot]).index(param)
            data[ip, :] = flat_chain[index, :]
        else:
            data[ip, :] = DEFAULT_VALUES[to_21cmFAST_names([param])[0]]

    tau = predict_tau_numpy(data.T, classifier, regressor)

    return tau


def compute_parameter(flat_chain: np.ndarray,
                      param_names: list[str] | np.ndarray,
                      classifier: Classifier | None = None,
                      interpolator: Interpolator | None = None,
                      parameter: str | None = None) -> np.ndarray:

    # get the ordered list of parameters
    if interpolator is None:
        if parameter is not None:
            interpolator = Interpolator.load("DefaultInterpolator_" + parameter)
        else:
            raise ValueError("Need to pass the parameter to interpolate to the predictor.")

    if classifier is None:
        classifier = Classifier.load()

    # parameters that the interpolator needs
    i_parameters = to_CLASS_names(interpolator.metadata.parameters_name)

    # parameters that are given in the sample
    parameters = to_CLASS_names(copy(param_names))

    if isinstance(parameters, list):
        parameters = np.array(parameters)

    # if tau_reio is given in the parameters we remove it
    if 'tau_reio' in parameters:
        flat_chain = flat_chain[parameters != 'tau_reio']
        parameters = parameters[parameters != 'tau_reio']

    # make a list of the given parameters that are needed for the interpolator
    # in the order they are given
    data_for_param = []
    for ip, param in enumerate(parameters):
        if param == 'sum_mnu':
            parameters[ip] = 'm_nu1'
        if param in i_parameters:
            data_for_param.append(param)

    # get the data sample
    data = np.empty((len(i_parameters), flat_chain.shape[-1]))

    # find the ordering in which data_sample is set in prepare_data_plot
    indices_to_plot = [list(parameters).index(param) for param in data_for_param]

    for ip, param in enumerate(i_parameters):

        # if we ran the MCMC over that parameter
        if param in parameters[indices_to_plot]:
            index = list(parameters[indices_to_plot]).index(param)
            data[ip, :] = flat_chain[index, :]
        else:
            data[ip, :] = DEFAULT_VALUES[param]

    return predict_parameter_numpy(data.T, classifier, interpolator, parameter)

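# Illustrative usage sketch (hypothetical sample object and parameter name): evaluating
# the optical depth to reionization for every point of a flat sample with the default
# networks, and interpolating an extra derived quantity.
#
#   flat = samples.flat(thin=10)                     # shape (n_params, n_points)
#   tau  = compute_tau(flat, samples.param_names)    # uses Classifier.load() / Regressor.load()
#   xhii = compute_parameter(flat, samples.param_names, parameter='x_HII')  # 'x_HII' is a hypothetical name
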
class MPSamples(Samples):
    """
    Daughter class of Samples for MontePython chains.
    """

    def __init__(self, path: str, ids: list[int] | np.ndarray | None = None):

        super().__init__(path, ids)

        self._chains: list[MPChain] = []

        self.load_chains()
        self.load_paramnames()

        self._max_lnlkl = np.max([chain.max_lnlkl for chain in self._chains])
        chain_max_lnlkl = np.argmax([chain.max_lnlkl for chain in self._chains])
        self._best_fit  = self._chains[chain_max_lnlkl].best_fit

        for chain in self._chains:
            chain.remove_burnin(self._max_lnlkl)

        #######################
        # print some results

        max_markov_index = len(str(int(np.max([chain.markov_index for chain in self._chains]))))
        max_burnin_index = len(str(int(np.max([chain.burnin_index for chain in self._chains]))))

        for ic, chain in enumerate(self._chains):
            print(f'Chain {ic+1:<3} : Removed {chain.markov_index:<{max_markov_index}} non-markovian points, '
                  + f'{chain.burnin_index:<{max_burnin_index}} points of burn-in, keep ' + str(chain._values.shape[1])
                  + f' steps | (max_lnlkl = {chain.max_lnlkl:.2f}, acceptance_rate = {chain.acceptance_rate:.2f})')

        #######################
        # compute some stats

        # define some global quantities (total number of steps and overall mean of the parameters)
        self._total_steps = np.sum(np.array([chain.n_steps for chain in self._chains]), dtype=int)
        self._total_mean  = np.zeros(self.n_params)

        for chain in self._chains:
            if chain.length > 0:
                chain.compute_stats()
                self._total_mean = self._total_mean + chain.n_steps * chain.mean_value

        self._total_mean = self._total_mean / self._total_steps

        self.load_scaling_factor()

    def load_chains(self) -> None:

        # look for chains in the folder
        chains_path = self.path + '_*.txt'
        self._chains_name = np.array(glob.glob(chains_path))

        ids = np.array([int(name.split('.')[-2].split('_')[-1]) for name in self._chains_name], dtype=int)
        self._chains_name = self._chains_name[np.argsort(ids)]

        # raise an error if no chain is found
        if len(self._chains_name) == 0:
            raise ValueError("No chain found at " + chains_path)

        # redefine the chain name list from the given ids
        if self.ids is not None:
            self._chains_name = self._chains_name[self.ids]

        # define the number of chains
        self.n_chains = len(self._chains_name)

        # prepare an array for the non markovian chain
        self._markov = np.zeros(self.n_chains, dtype=int)

        # read all chains
        self._chains = [MPChain(filename) for filename in self._chains_name]
        self.n_params = self._chains[0].values().shape[0]

    def load_paramnames(self):

        self._names = np.empty(0)
        self._latex_names = np.empty(0)

        with open(self.path + '.paramnames', 'r') as f:
            for line in f:
                ls = line.split('\t')
                self._names = np.append(self._names, ls[0][:-1])
                self._latex_names = np.append(self._latex_names, r'${}$'.format(ls[1][1:-2]))

    def load_scaling_factor(self) -> None:

        self._scaling_factor = {}

        with open(os.path.join(*[*(self._path.split('/')[:-1]), 'log.param']), 'r') as file:
            for line in file:
                if (not line.strip().startswith('#')) and ('data.parameters' in line):
                    name = line.split('[')[1].strip("[] ='")
                    value = float(line.split('[')[2].strip("[] ='\n").split(',')[-2].strip("[] ='"))
                    self._scaling_factor[name] = value

    def flat(self, discard: np.ndarray | None = None, thin: None | int = None, **kwargs) -> np.ndarray:
        """
        Flatten the samples of all chains.

        Parameters
        ----------
        discard : np.ndarray | None, optional
            Number of points to discard at the beginning of each chain, by default None (0).
        thin : None | int, optional
            Reduce the size of the sample by taking one point every `thin`, by default None.
            If None, the thinning factor is computed such that the total length of the
            sample is 10000.

        Returns
        -------
        np.ndarray
            Sample in a 2-dimensional array of shape (# of parameters, # of points).
        """

        if isinstance(discard, int):
            discard = np.full(self.n_chains, discard)

        if discard is None:
            discard = np.zeros(self.n_chains, dtype=int)

        if thin is None:

            m_total_length = 0
            for ichain, chain in enumerate(self.chains):
                m_total_length = m_total_length + chain.values(discard=discard[ichain], thin=1).shape[1]

            if m_total_length > 1e+4:
                thin = int(m_total_length/10000)
            else:
                thin = 1

        res = np.empty((self.n_params, 0))
        for ichain, chain in enumerate(self.chains):
            res = np.concatenate((res, chain.values(discard=discard[ichain], thin=thin)), axis=-1)

        return res

    def convergence(self) -> np.ndarray:
        """
        Gelman-Rubin criterion weighted by the length of the chains,
        as implemented in MontePython.

        Returns
        -------
        np.ndarray
            R-1 for all parameters.
        """

        within  = np.zeros(self.n_params)
        between = np.zeros(self.n_params)

        for chain in self.chains:
            within  = within + chain.n_steps * chain.var_value
            between = between + chain.n_steps * (chain.mean_value - self.total_mean)**2

        within  = within / self.total_steps
        between = between / (self.total_steps - 1)

        return between/within

    def covmat(self, discard: np.ndarray | None = None, params_in_cov: list[str] | None = None) -> np.ndarray:
        """
        Covariance matrix.

        Parameters
        ----------
        discard : np.ndarray | None, optional
            Number of points to discard at the beginning of each chain, by default None (0).
        params_in_cov : list[str] | None, optional
            List of parameters to put in the covariance matrix (in the order of that list).
            If None, consider all available parameters.

        Returns
        -------
        np.ndarray
            Covariance matrix as an (n, n) array.
        """

        points = prepare_data_plot(self, params_in_cov, discard, thin=1)
        return np.cov(points)

    def print_best_fit(self, discard: np.ndarray | None = None, **kwargs):

        samples_flat = self.flat(discard=discard, thin=1, **kwargs)

        med  = np.median(samples_flat, axis=1)
        mean = np.mean(samples_flat, axis=1)

        nc = np.zeros(len(self.param_names), dtype=int)
        for ip, param in enumerate(self.param_names):
            nc[ip] = len(param)

        max_nc = np.max(nc)

        for ip, param in enumerate(self.param_names):
            fill = " " * (max_nc - nc[ip])
            print(param + fill + ' : \t' + str(self.best_fit[ip]) + " | " + str(med[ip]) + " | " + str(mean[ip]))

    @property
    def chains(self):
        return self._chains

    @property
    def max_lnlkl(self):
        return self._max_lnlkl

    @property
    def best_fit(self):
        return self._best_fit

    @property
    def param_names(self):
        return self._names

    @property
    def param_names_latex(self):
        return self._latex_names

    @property
    def scaling_factor(self):
        return self._scaling_factor

    @property
    def total_steps(self):
        return self._total_steps

    @property
    def total_mean(self):
        return self._total_mean

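# Illustrative usage sketch (hypothetical chain prefix): loading a set of MontePython
# chains, checking the Gelman-Rubin criterion and extracting a covariance matrix.
#
#   samples = MPSamples('chains/run/2024-01-01_100000', ids=None)
#   print(samples.convergence())                       # R-1 for every parameter
#   cov  = samples.covmat(params_in_cov=['omega_b', 'tau_reio'])
#   flat = samples.flat()                              # thinned to roughly 10000 points
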
def prepare_data_plot(samples: Samples, data_to_plot, discard=0, thin=1, **kwargs):

    data_to_plot = to_CLASS_names(data_to_plot)

    # give the transformation rule between parameters if there is one
    def transform_data(data, name, param_names):

        if name == 'sum_mnu':

            if 'm_nu1' in param_names:

                index_mnu = data_to_plot.index('sum_mnu')
                data[index_mnu] = np.sum(neutrino_masses(data[index_mnu], hierarchy='NORMAL'), axis=0)

                # change also the param_names
                param_names[index_mnu] = name

            else:
                raise ValueError('Impossible to obtain ' + name + ' from the input data sample')

        return param_names

    data = samples.flat(discard=discard, thin=thin, **kwargs)
    param_names = copy(samples.param_names)

    # rescale the data according to the scaling factor
    for iname, name in enumerate(samples.param_names):
        if (samples.scaling_factor[name] != 1) and (name != '1/m_wdm' and name != 'm_wdm'):
            # note that we keep the warm dark matter parameters transformed,
            # which just means that we need to be careful with the units
            data[iname] = samples.scaling_factor[name] * data[iname]

    # first transform the data if necessary
    if 'sum_mnu' in data_to_plot:
        param_names = transform_data(data, 'sum_mnu', param_names)

    # reduce the data to the indices to plot
    indices_to_plot = [np.where(param_names == param)[0][0] for param in data_to_plot if param in param_names]
    data = data[indices_to_plot]

    # remove outliers for tau_reio
    if 'tau_reio' in data_to_plot:

        index_tau_reio = data_to_plot.index('tau_reio')
        mask = data[index_tau_reio] < 0.1
        n_outliers = np.count_nonzero(~mask)

        if n_outliers > 0:
            warnings.warn(f'Removing {n_outliers} outlier points for tau_reio: ' + str(data[index_tau_reio, ~mask]))

        data = data[:, mask]

    return data

#################################################
## PLOTTING TOOLS

MATPLOTLIB_IMPORTED = False

try:

    import matplotlib.pyplot as plt
    import matplotlib.colors as mpc

    from dataclasses import dataclass

    MATPLOTLIB_IMPORTED = True

    class AxesGrid:

        def __init__(self, n: int, *,
                     scale: float = 2.0,
                     wspace: float = 0.05,
                     hspace: float = 0.05,
                     labels: list[str] | None = None,
                     names: list[str] | None = None,
                     **kwargs):

            # close all pre-existing figures and disable interactive mode
            plt.close('all')

            # define a figure object and the grid spec
            self._fig  = plt.figure(figsize=(scale*n, scale*n), constrained_layout=False)
            self._spec = self._fig.add_gridspec(ncols=n, nrows=n, wspace=wspace, hspace=hspace)

            # initialise the length
            self._size: int = n

            # initialise an empty list of axes
            self._axs: list[plt.Axes] = [None for i in range(int(n*(n+1)/2))]

            # define the edges of the axes
            self._edges: np.ndarray = np.full((n, 2), fill_value=None)

            # define the labels of the axes
            self._labels: list[str] = [r'${{{}}}_{}$'.format(r'\theta', i) for i in range(self.size)] if labels is None else labels

            # define the name of the parameter attached to each axis
            self._names: list[str] = self._labels if names is None else names

            # define the titles of the axes (showing mean and quantiles if asked)
            self._titles: list[list[str]] = [[] for i in range(self.size)]

            # define the text objects holding the titles
            self._titles_text: list[list[plt.Text | None]] = [[] for i in range(self.size)]

            # define the axes on the grid
            for i in range(n):
                for j in range(i+1):

                    k = self.index_1D(i, j)
                    self._axs[k] = self._fig.add_subplot(self._spec[i, j])

                    if i < n-1:
                        self._axs[k].xaxis.set_ticklabels([])
                    if j > 0:
                        self._axs[k].yaxis.set_ticklabels([])

            self._axs[0].yaxis.set_ticklabels([])

            # define default font and rotation of ticks
            self._fontsize: float = self._axs[0].xaxis.label.get_size()
            self._ticks_rotation: float = 50

            self._titles_color: list[str] = [[] for i in range(self.size)]
            self._default_color: str = 'blue'

            self.update_labels(**kwargs)

        def get(self, i: int | str, j: int | str):
            i, j = (self.index_from_name(k) if isinstance(k, str) else k for k in [i, j])
            return self._axs[self.index_1D(i, j)]

        def index_1D(self, i: int, j: int):
            if j > i:
                raise ValueError("j must be less or equal than i")
            return int(i*(i+1)/2 + j)

        def indices_2D(self, k):
            i = np.arange(0, k+2, dtype=int)
            ind_triangle = i*(i+1)/2
            row = np.searchsorted(ind_triangle, k, side='right') - 1
            column = int(k - ind_triangle[row])
            return row, column

        # show the plot
        def show(self):
            self._fig.show()

        # change one label name
        def set_label(self, i: int, name: str):
            self._labels[i] = name
            if i > 0:
                self.get(i, 0).set_ylabel(name)
            self.get(self.size-1, i).set_xlabel(name)

        # update all the label properties
        def update_labels(self, **kwargs):

            self._labels         = kwargs.get('labels', self._labels)
            self._fontsize       = kwargs.get('fontsize', self._fontsize)
            self._ticks_rotation = kwargs.get('ticks_rotation', self._ticks_rotation)

            for i in range(1, self.size):
                k = self.index_1D(i, 0)
                self._axs[k].set_ylabel(self._labels[i], fontsize=self._fontsize)
                self._axs[k].tick_params(axis='y', labelsize=self._fontsize-2)
                for tick in self._axs[k].get_yticklabels():
                    tick.set_rotation(self._ticks_rotation)

            for j in range(self.size):
                k = self.index_1D(self.size-1, j)
                self._axs[k].set_xlabel(self._labels[j], fontsize=self._fontsize)
                self._axs[k].tick_params(axis='x', labelsize=self._fontsize-2)
                for tick in self._axs[k].get_xticklabels():
                    tick.set_rotation(self._ticks_rotation)

        # update the edges
        def update_edges(self, axis: int | str, min: float, max: float):

            j = self.index_from_name(axis) if isinstance(axis, str) else axis
            self._edges[j, :] = np.array([min, max])

            for i in range(j, self.size):
                self.get(i, j).set_xlim([self._edges[j, 0], self._edges[j, -1]])

            for k in range(0, j):
                self.get(j, k).set_ylim([self._edges[j, 0], self._edges[j, -1]])

        # update the titles properties
        def add_title(self, axis: int, new_titles: str, color: str | None = None):
            self._titles_color[axis].append(color if color is not None else self._default_color)
            self._titles[axis].append(new_titles)
            self._titles_text[axis].append(None)

        def update_titles(self, height=1.05, spacing=1.9, fontsize=None):

            if fontsize is None:
                fontsize = self._fontsize

            for j in range(self.size):

                k = self.index_1D(j, j)

                for it, title in enumerate(self._titles[j]):

                    total_height = height if it == 0 else height + it*spacing*1e-2*fontsize

                    if self._titles_text[j][it] is None:
                        self._titles_text[j][it] = self._axs[k].text(0.5, total_height, title,
                                                                     fontsize=fontsize,
                                                                     color=self._titles_color[j][it],
                                                                     ha='center',
                                                                     transform=self._axs[k].transAxes)
                    else:
                        self._titles_text[j][it].set_position((0.5, total_height))
                        self._titles_text[j][it].set_text(title)
                        self._titles_text[j][it].set_fontsize(fontsize)

        def index_from_name(self, name: str | list[str]):
            if isinstance(name, str):
                return self.names.index(name)
            else:
                return [self.names.index(na) for na in name]

        # List of properties
        @property
        def fig(self):
            return self._fig

        @property
        def spec(self):
            return self._spec

        @property
        def size(self):
            return self._size

        @property
        def edges(self):
            return self._edges

        @edges.setter
        def edges(self, value: np.ndarray):
            self._edges = value

        @property
        def labels(self):
            return self._labels

        @labels.setter
        def labels(self, value: list[str]):
            self._labels = value
            self.update_labels()

        @property
        def names(self):
            return self._names

    @dataclass
    class ProcessedData:

        hists_1D:  np.ndarray | None = None
        hists_2D:  np.ndarray | None = None
        edges:     np.ndarray | None = None
        centers:   np.ndarray | None = None
        levels:    np.ndarray | None = None
        q:         np.ndarray | None = None
        mean:      np.ndarray | None = None
        median:    np.ndarray | None = None
        bestfit:   np.ndarray | None = None
        quantiles: np.ndarray | None = None
        samples:   np.ndarray | None = None
        size:      int | None = None

    def compute_quantiles(sample: np.ndarray, q: float, bins: int | np.ndarray = 30) -> tuple[np.ndarray, np.ndarray]:
        """
        Give the q-th quantile of the input sample.

        Parameters
        ----------
        sample : np.ndarray
            1D array of data points.
        q : float
            Quantile value.
        bins : int | np.ndarray, optional
            Binning of the histogram that is used for a first approximation
            of the quantile edges, by default 30.

        Returns
        -------
        tuple[np.ndarray, np.ndarray]
            (min, max) bounds
        """

        def split_in_groups(lst):

            res = [[int(lst[0])]]
            i = 0

            for il, element in enumerate(lst):
                element = int(element)
                if il > 0:
                    if element - lst[il-1] == 1:
                        res[i].append(element)
                    else:
                        res.append([element])
                        i = i+1

            return res

        # define the histogram from the sample
        hist, edges   = np.histogram(sample, bins=bins, density=True)
        hist_count, _ = np.histogram(sample, bins=bins)

        # normalise the histogram to 1
        hist = hist/np.max(hist)

        # define the total sum of the histogram
        s_hist = np.sum(hist)

        # order the histogram values
        ordered_hist  = np.sort(hist)[::-1]
        order_of_hist = np.argsort(hist)[::-1]

        # compute the cumulative sum and get all indices that
        # contribute to the quantile
        sum_hist = np.cumsum(ordered_hist)/s_hist
        indices  = np.arange(0, len(hist), dtype=int)[sum_hist < q]

        # the first bin is already high enough, we return its boundaries
        if len(indices) == 0:

            e_min = np.empty((1, 2))
            e_max = np.empty((1, 2))

            index_one = order_of_hist[0]

            e_min[0, 0] = edges[:-1][index_one]
            e_max[0, 0] = edges[:-1][index_one]
            e_min[0, 1] = edges[1:][index_one]
            e_max[0, 1] = edges[1:][index_one]

            ilg = 0
            total_count_small = 0

            x_min = np.zeros(1)
            x_max = np.zeros(1)

            m = np.median(sample)

        else:

            # add an extra index to be sure we capture all the good bins
            final_index = int(indices[-1]+1)

            # if we have same size hist values where sum_hist = q then
            # we need to consider all possible bins that can contribute
            # we put all these indices into a table
            multiple_indices = [final_index]
            while final_index < len(ordered_hist)-1 and (ordered_hist[final_index] == ordered_hist[final_index+1]):
                multiple_indices.append(final_index+1)
                final_index = final_index+1

            # check, for each possible last index, which one minimises the number of groups
            len_groups = []
            for index in multiple_indices:
                bin_indices = np.sort(order_of_hist[np.concatenate((indices, np.array([index])))])  # all bins contributing
                len_groups.append(len(split_in_groups(bin_indices)))

            index_min_groups = np.argmin(len_groups)
            bin_indices = np.sort(order_of_hist[np.concatenate((indices, np.array([multiple_indices[index_min_groups]])))])
            groups = split_in_groups(bin_indices)

            # here size is different from length,
            # size refers to the 'physical' size
            size_groups   = []
            count_groups  = []
            sample_groups = []

            # get what is inside the groups
            # approximate values of the other groups
            e_min = np.empty((len(groups), 2))
            e_max = np.empty((len(groups), 2))
            x_min = np.empty(len(groups))
            x_max = np.empty(len(groups))

            for ig, group in enumerate(groups):

                index_min, index_max = np.min(group), np.max(group)

                e_max[ig, 0], e_max[ig, 1] = edges[:-1][index_max], edges[1:][index_max]
                e_min[ig, 0], e_min[ig, 1] = edges[:-1][index_min], edges[1:][index_min]

                size_groups.append(float(e_max[ig, 1]-e_min[ig, 0]))
                count_groups.append(int(np.sum(hist_count[group])))

                mask = (sample > e_min[ig, 0]) & (sample < e_max[ig, 1])
                sample_groups.append(sample[mask])

                x_min[ig] = e_min[ig, 0]
                x_max[ig] = e_max[ig, 1]

            # adjust the exact value for the largest group
            # and keep only the bins of the smaller groups
            ilg = np.argmax(size_groups)
            total_count_small = np.sum(count_groups) - count_groups[ilg]

            m = np.median(sample_groups[ilg]) if len(groups) > 0 else np.median(sample)

        # define the grid for the finer search
        x_m = np.linspace(e_min[ilg, 0], e_min[ilg, 1], 30)
        x_p = np.linspace(e_max[ilg, 0], e_max[ilg, 1], 30)

        # for each value of x_m find the value of x_p such that the quantile is q
        mask  = (sample >= x_m[:, None, None]) & (sample <= x_p[None, :, None])
        count = (np.count_nonzero(mask, axis=-1) + total_count_small)/len(sample)
        m_x_p = x_p[np.argmin(np.abs(count - q), axis=1)]

        if count[0, -2] < q and count[1, -1] < q and count[1, -2] < q:
            x_min[ilg], x_max[ilg] = x_m[0], x_p[-1]
            return x_min, x_max

        # remove the values where x_p is just equal to the max
        # at the very edge of the grid
        mask_xp = np.argmin(m_x_p != x_p[-1])

        # if we have removed all points then there is a problem;
        # we actually do not mask, it is just that we
        # cannot really find the q contour with our
        # resolution. Return the max value
        if mask_xp == 0:
            x_min[ilg] = x_m[-1]
            x_max[ilg] = x_p[-1]
        else:
            xm = x_m[:mask_xp+1]
            xp = m_x_p[:mask_xp+1]
            index = np.argmin(np.sqrt((2*m - xm - xp)**2))
            x_min[ilg], x_max[ilg] = xm[index], xp[index]

        return x_min, x_max

    def generate_contours(samples: np.ndarray,
                          bins: int = 20,
                          q = [0.68, 0.95],
                          smooth_2D: bool = False,
                          smooth_1D: bool = False,
                          sigma_smooth: float = 1.5) -> ProcessedData:

        data = ProcessedData()

        n = samples.shape[0]

        hists_1D = np.empty((n, bins))
        hists_2D = np.empty((n, n, bins, bins))   # 2D array of 2D arrays
        levels   = np.empty((n, n, len(q)+1))     # 2D array of 1D arrays

        quantiles = np.full((len(q), n, 2), fill_value=np.nan)

        # check that the input levels are in ascending order
        q = np.array(q)
        if np.any(q != sorted(q)):
            raise ValueError('levels should be given in ascending order')

        # edges and centers of the histograms
        edges   = np.vstack([np.linspace(np.min(s), np.max(s), bins+1) for s in samples])
        centers = (edges[:, :-1] + edges[:, 1:]) / 2

        mean   = np.mean(samples, axis=-1)
        median = np.median(samples, axis=-1)

        # loop over all places with 1D histograms
        for i in range(n):

            hists_1D[i, :], _ = np.histogram(samples[i, :], bins=edges[i, :], density=True)
            hists_1D[i] = gaussian_filter1d(hists_1D[i], sigma=sigma_smooth) if smooth_1D is True else hists_1D[i]
            hists_1D[i] = hists_1D[i] / np.max(hists_1D[i])

            # evaluate the quantiles
            for il, q_val in enumerate(q):

                try:
                    x_min, x_max = compute_quantiles(samples[i, :], q_val, bins=bins)
                except:
                    warnings.warn(f"Impossible to compute quantiles for entry {i}")

                if len(x_min) > 1 or len(x_max) > 1:
                    warnings.warn(f"Quantiles not given for what appears to be a multimodal distribution {i}")
                else:
                    try:
                        quantiles[il, i, 0], quantiles[il, i, 1] = x_min[0], x_max[0]
                    except Exception as e:
                        print(quantiles[il, i, 0], quantiles[il, i, 1], x_min, x_max)
                        raise e

        # loop over all places with 2D histograms
        for i in range(1, n):
            for j in range(i):

                hists_2D[i, j, :, :], _, _ = np.histogram2d(samples[j, :], samples[i, :], bins=[edges[j, :], edges[i, :]], density=True)
                hists_2D[i, j] = gaussian_filter(hists_2D[i, j], sigma=sigma_smooth) if smooth_2D is True else hists_2D[i, j]

                # flatten the histogram and sort it for the cumulative density
                sorted_hist = np.sort(hists_2D[i, j].ravel())[::-1]  # sort in descending order

                # compute the cumulative density
                cumulative = np.cumsum(sorted_hist) / np.sum(sorted_hist)

                # find the threshold levels for the desired confidence intervals
                levels[i, j, :-1] = sorted_hist[np.searchsorted(cumulative, q[::-1])]
                levels[i, j, -1]  = 1.1*np.max(sorted_hist)

        data.hists_2D  = hists_2D
        data.hists_1D  = hists_1D
        data.samples   = samples
        data.edges     = edges
        data.centers   = centers
        data.levels    = levels
        data.mean      = mean
        data.median    = median
        data.quantiles = quantiles
        data.size      = hists_1D.shape[0]
        data.q         = q

        return data

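    # Illustrative usage sketch (the Samples object and parameter names are assumed):
    # turning a flat sample into binned 1D/2D histograms, confidence levels and quantiles.
    #
    #   flat     = prepare_data_plot(samples, ['omega_b', 'tau_reio'], discard=0, thin=1)
    #   contours = generate_contours(flat, bins=25, q=[0.68, 0.95], smooth_1D=True)
    #   print(contours.quantiles[0])   # 68% bounds for each parameter
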
    def plot_data(grid: AxesGrid,
                  data: ProcessedData,
                  show_hist: bool = False,
                  show_surface: bool = True,
                  show_contour: bool = False,
                  show_quantiles: list[bool] = [False, False],
                  show_mean: bool = False,
                  show_title: bool = True,
                  show_points: bool = False,
                  redefine_edges: bool = True,
                  q_in_title: float = 0.68,
                  colors: list[str] = ['orange'],
                  axes: list[int] | np.ndarray | None = None,
                  exclude_quantiles: int | str | list[int] | list[str] = [],
                  exclude_mean: int | str | list[int] | list[str] = [],
                  exclude_title: int | str | list[int] | list[str] = [],
                  alphas: list[float] = 1.0):

        alphas, colors, exclude_quantiles, exclude_mean, exclude_title = ([array] if isinstance(array, float) else array for array in [alphas, colors, exclude_quantiles, exclude_mean, exclude_title])
        exclude_quantiles, exclude_mean, exclude_title = (grid.index_from_name(exclude) if (len(exclude) > 0 and isinstance(exclude[0], str)) else exclude for exclude in [exclude_quantiles, exclude_mean, exclude_title])

        if axes is None:
            axes = np.arange(0, data.size)

        # first define the colors we will need to use
        contour_colors = [mpc.to_rgba(color, alphas[ic]) if isinstance(color, str) else color for ic, color in enumerate(colors)]

        # if we provide one color and ask for more levels then
        # we define new colors automatically
        if len(contour_colors) == 1:

            pastelness = np.array([0.7]) if len(data.levels[0, 0]) == 3 else np.linspace(0.5, 0.8, len(data.levels[0, 0])-2)
            pastelness = pastelness[:, None] * np.ones((1, 4))
            pastelness[:, -1] = 0

            # add custom pastel colors to the stack of colors
            contour_colors = np.vstack(((1.0 - pastelness) * np.array(contour_colors) + pastelness, contour_colors))

        # plot the contours and points
        for i in range(1, data.size):
            for j in range(i):

                hist = data.hists_2D[i, j]

                if show_points is True:
                    # first thin the samples so that we plot only 5000 points
                    n = data.samples.shape[-1]
                    r = np.max([1, int(n/5000)])
                    grid.get(axes[i], axes[j]).scatter(data.samples[j, ::r], data.samples[i, ::r], marker='o', edgecolors='none', color=contour_colors[-1], s=2, alpha=0.5)

                if show_hist is True:
                    extent = [data.edges[j, 0], data.edges[j, -1], data.edges[i, 0], data.edges[i, -1]]
                    grid.get(axes[i], axes[j]).imshow(hist.T, origin='lower', extent=extent, cmap='Greys', aspect='auto')

                if show_surface is True:
                    try:
                        grid.get(axes[i], axes[j]).contourf(*np.meshgrid(data.centers[j], data.centers[i]), hist.T, levels=data.levels[i, j], colors=contour_colors)
                    except ValueError as e:
                        print("Error for axis : ", i, j)
                        raise e

                if show_contour is True:
                    grid.get(axes[i], axes[j]).contour(*np.meshgrid(data.centers[j], data.centers[i]), hist.T, levels=data.levels[i, j], colors=contour_colors)

        # fill in the 1D histograms
        for i in range(0, data.size):

            grid.get(axes[i], axes[i]).stairs(data.hists_1D[i], edges=data.edges[i, :], color=colors[0])

            if (show_mean is True) and (i not in exclude_mean):
                grid.get(axes[i], axes[i]).axvline(data.mean[i], color=contour_colors[0], linewidth=0.5, linestyle='--')

            for iq, quantile in enumerate(data.quantiles[::-1]):
                if (show_quantiles[iq] is True) and (i not in exclude_quantiles):

                    # fill the histogram in terms of the first quantile
                    mask_edges = (data.edges[i, :] <= quantile[i, 1]) & (data.edges[i, :] >= quantile[i, 0])

                    # corresponding mask for the histogram values
                    mask_hist = (mask_edges[:-1]*mask_edges[1:] == 1)

                    if len(data.hists_1D[i, mask_hist]) > 0:
                        # plot the quantiles with a shaded histogram
                        grid.get(axes[i], axes[i]).stairs(data.hists_1D[i, mask_hist], edges=data.edges[i, mask_edges], color=contour_colors[iq], fill=True, alpha=0.5)

            title_color = contour_colors[1]
            title_color[-1] = 1.0

            # get the index of the quantile shown in the title
            if q_in_title in data.q:
                jq = np.where(q_in_title == data.q)[0][0]
            else:
                raise ValueError('quantile in title should be a q value given in generate_contours')

            if (show_title is True) and (i not in exclude_title):
                if not np.isnan(data.quantiles[jq, i, 0]) and not np.isnan(data.quantiles[jq, i, 1]):
                    grid.add_title(axes[i], r'${:.3g}$'.format(data.median[i]) + '$^{{ +{:.2g} }}_{{ -{:.2g} }}$'.format(data.quantiles[jq, i, 1] - data.median[i], data.median[i] - data.quantiles[jq, i, 0]), color=title_color)
                else:
                    grid.add_title(axes[i], r'${:.3g}$'.format(data.median[i]), color=title_color)

        grid.update_titles()

        # (re)define the grid edges
        if redefine_edges is True:

            new_boundaries = data.edges[:, [0, -1]]
            old_boundaries = grid.edges[axes]
            old_boundaries[old_boundaries[:, 0] == None] = data.edges[old_boundaries[:, 0] == None, :][:, [0, -1]]

            new_min = np.minimum(new_boundaries[:, 0], old_boundaries[:, 0])
            new_max = np.maximum(new_boundaries[:, 1], old_boundaries[:, 1])

            new_edges = np.vstack((new_min, new_max)).T
            grid.edges[axes, :] = new_edges

            for j in range(0, data.size):
                for i in range(j, data.size):
                    grid.get(axes[i], axes[j]).set_xlim([grid.edges[axes[j], 0], grid.edges[axes[j], -1]])
                    if i > j:
                        grid.get(axes[i], axes[j]).set_ylim([grid.edges[axes[i], 0], grid.edges[axes[i], -1]])

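    # Illustrative usage sketch (hypothetical labels): drawing a triangle plot from the
    # contours computed above on an AxesGrid and saving the figure.
    #
    #   grid = AxesGrid(contours.size, labels=[r'$\omega_{\rm b}$', r'$\tau_{\rm reio}$'],
    #                   names=['omega_b', 'tau_reio'])
    #   plot_data(grid, contours, show_contour=True, show_quantiles=[True, False], colors=['orange'])
    #   grid.fig.savefig('triangle_plot.pdf', bbox_inches='tight')
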
    def plot_2D_marginal(ax: plt.Axes,
                         data: ProcessedData,
                         i: int,
                         j: int,
                         show_hist: bool = False,
                         show_surface: bool = True,
                         show_contour: bool = False,
                         show_points: bool = False,
                         colors: list[str] = ['orange'],
                         alphas: list[float] = 1.0):

        if i < j:
            i, j = j, i

        hist = data.hists_2D[i, j].T

        alphas, colors = ([array] if isinstance(array, float) else array for array in [alphas, colors])

        # first define the colors we will need to use
        contour_colors = [mpc.to_rgba(color, alphas[ic]) if isinstance(color, str) else color for ic, color in enumerate(colors)]

        # if we provide one color and ask for more levels then
        # we define new colors automatically
        if len(contour_colors) == 1:

            pastelness = np.array([0.7]) if len(data.levels[0, 0]) == 3 else np.linspace(0.5, 0.8, len(data.levels[0, 0])-2)
            pastelness = pastelness[:, None] * np.ones((1, 4))
            pastelness[:, -1] = 0

            # add custom pastel colors to the stack of colors
            contour_colors = np.vstack(((1.0 - pastelness) * np.array(contour_colors) + pastelness, contour_colors))

        if show_points is True:
            # first thin the samples so that we plot only 10000 points
            n = data.samples.shape[-1]
            r = np.max([1, int(n/10000)])
            ax.scatter(data.samples[j, ::r], data.samples[i, ::r], marker='o', edgecolors='none', color=contour_colors[-1], s=2, alpha=0.5)

        if show_hist is True:
            extent = [data.edges[j, 0], data.edges[j, -1], data.edges[i, 0], data.edges[i, -1]]
            ax.imshow(hist, origin='lower', extent=extent, cmap='Greys', aspect='auto')

        if show_surface is True:
            try:
                ax.contourf(*np.meshgrid(data.centers[j], data.centers[i]), hist, levels=data.levels[i, j], colors=contour_colors)
            except ValueError as e:
                print("Error for axis : ", i, j)
                raise e

        if show_contour is True:
            ax.contour(*np.meshgrid(data.centers[j], data.centers[i]), hist, levels=data.levels[i, j], colors=contour_colors)

    def prepare_data_Xe(samples: Samples,
                        data_to_plot: list[str] | np.ndarray,
                        discard: int = 0,
                        thin: int = 100,
                        *,
                        classifier: Classifier | None = None,
                        regressor: Regressor | None = None):

        data_for_Xe = []
        data_to_plot = to_CLASS_names(data_to_plot)

        # get the ordered list of parameters
        if regressor is None:
            regressor = Regressor.load()

        if classifier is None:
            classifier = Classifier.load()

        parameters = regressor.metadata.parameters_name

        for param in data_to_plot:

            if param == 'sum_mnu':
                param = 'm_nu1'

            if (param in MP_KEY_CORRESPONDANCE) and (MP_KEY_CORRESPONDANCE[param] in parameters):
                data_for_Xe.append(param)

        labels_correspondance = {value: key for key, value in MP_KEY_CORRESPONDANCE.items()}

        # get the data sample
        data_sample = prepare_data_plot(samples, data_for_Xe, discard=discard, thin=thin, regressor=regressor, classifier=classifier)
        data = np.empty((len(parameters), data_sample.shape[-1]))

        # find the ordering in which data_sample is set in prepare_data_plot
        indices_to_plot = [np.where(samples.param_names == param)[0][0] for param in data_for_Xe if param in samples.param_names]

        for ip, param in enumerate(parameters):

            # if we ran the MCMC over that parameter
            if labels_correspondance[param] in samples.param_names[indices_to_plot]:
                index = list(samples.param_names[indices_to_plot]).index(labels_correspondance[param])
                data[ip] = data_sample[index, :]
            else:
                data[ip, :] = DEFAULT_VALUES[param]

        return data

    def get_Xe_stats(samples: Samples,
                     data_to_plot: list[str] | np.ndarray,
                     nbins: int = 100,
                     discard: int = 0,
                     thin: int = 100,
                     *,
                     classifier: Classifier | None = None,
                     regressor: Regressor | None = None,
                     smooth: bool = False,
                     sigma_smooth: float = 1.5,
                     **kwargs):

        data = prepare_data_Xe(samples, data_to_plot, discard, thin, classifier=classifier, regressor=regressor).T

        for kw, val in kwargs.items():
            if kw in regressor.parameters_name:
                ikw = list(regressor.parameters_name).index(kw)
                data[:, ikw] = val
            else:
                raise ValueError("Need to pass a kwarg that is in the parameter list of the classifier / regressor")

        Xe = predict_Xe_numpy(theta=data, classifier=classifier, regressor=regressor)

        # here remove some outliers that should not have
        # passed the likelihood condition
        if np.count_nonzero(Xe[:, -1] == -1)/len(Xe) > 0.01:
            warnings.warn("More than 1 percent of outliers with late reionization")

        Xe = Xe[Xe[:, -1] > 0]

        z = regressor.metadata.z

        mean, med = np.mean(Xe, axis=0), np.median(Xe, axis=0)
        min, max  = np.min(Xe), np.max(Xe)

        log_bins = np.linspace(np.log10(min), np.log10(max), nbins)
        log_hist = np.zeros((len(z), len(log_bins)-1))

        lin_bins = np.linspace(min, max, nbins)
        lin_hist = np.zeros((len(z), len(lin_bins)-1))

        # make a histogram for each value of z
        for iz, x in enumerate(Xe.T):

            log_hist[iz], _ = np.histogram(np.log10(x), bins=log_bins, density=True)
            log_hist[iz] = gaussian_filter1d(log_hist[iz], sigma=sigma_smooth) if smooth is True else log_hist[iz]
            log_hist[iz] = log_hist[iz]/np.max(log_hist[iz])

            lin_hist[iz], _ = np.histogram(x, bins=lin_bins, density=True)
            lin_hist[iz] = gaussian_filter1d(lin_hist[iz], sigma=sigma_smooth) if smooth is True else lin_hist[iz]
            lin_hist[iz] = lin_hist[iz]/np.max(lin_hist[iz])

        log_bins = 10**((log_bins[:-1] + log_bins[1:])/2.0)
        lin_bins = (lin_bins[:-1] + lin_bins[1:])/2.0

        return z, mean, med, log_hist, log_bins, lin_hist, lin_bins

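    # Illustrative usage sketch (the Samples object and parameter names are assumed):
    # reconstructing the free-electron fraction X_e(z) statistics from a sample with the
    # default classifier and regressor.
    #
    #   z, mean, med, log_hist, log_bins, lin_hist, lin_bins = get_Xe_stats(
    #       samples, ['omega_b', 'tau_reio'], nbins=80, thin=50,
    #       classifier=Classifier.load(), regressor=Regressor.load())
    #   plt.plot(z, med)   # median reionization history
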
    def get_Xe_tanh_stats(samples: Samples,
                          nbins: int = 100,
                          discard: int = 0,
                          thin: int = 100,
                          x_inf: float = 2e-4,
                          *,
                          smooth: bool = False,
                          sigma_smooth: float = 1.5,
                          **kwargs):

        def Xe_class(z, z_reio=8.0):
            return 0.5 * (1 + np.tanh((1+z_reio)/(1.5*0.5)*(1-((1+z)/(1+z_reio))**(1.5))))

        index = list(samples.param_names).index('z_reio')
        z_reio = samples.flat(discard=discard, thin=thin, **kwargs)[index, :]

        z = np.linspace(0, 35, 100)
        Xe = Xe_class(z, z_reio[:, None]) + x_inf

        mean, med = np.mean(Xe, axis=0), np.median(Xe, axis=0)
        min, max  = np.min(Xe), np.max(Xe)

        log_bins = np.linspace(np.log10(min), np.log10(max), nbins)
        log_hist = np.zeros((len(z), len(log_bins)-1))

        lin_bins = np.linspace(min, max, nbins)
        lin_hist = np.zeros((len(z), len(lin_bins)-1))

        # make a histogram for each value of z
        for iz, x in enumerate(Xe.T):

            log_hist[iz], _ = np.histogram(np.log10(x), bins=log_bins, density=True)
            log_hist[iz] = gaussian_filter1d(log_hist[iz], sigma=sigma_smooth) if smooth is True else log_hist[iz]
            log_hist[iz] = log_hist[iz]/np.max(log_hist[iz])

            lin_hist[iz], _ = np.histogram(x, bins=lin_bins, density=True)
            lin_hist[iz] = gaussian_filter1d(lin_hist[iz], sigma=sigma_smooth) if smooth is True else lin_hist[iz]
            lin_hist[iz] = lin_hist[iz]/np.max(lin_hist[iz])

        log_bins = 10**((log_bins[:-1] + log_bins[1:])/2.0)
        lin_bins = (lin_bins[:-1] + lin_bins[1:])/2.0

        return z, mean, med, log_hist, log_bins, lin_hist, lin_bins

except:
    pass