##################################################################################
# This file is part of NNERO.
#
# Copyright (c) 2024, Gaétan Facchinetti
#
# NNERO is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or any
# later version. NNERO is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU
# General Public License along with NNERO.
# If not, see <https://www.gnu.org/licenses/>.
#
##################################################################################
import numpy as np
import torch
import torch.nn as nn
from .data import TorchDataset, DataSet, uniform_to_true
from .network import NeuralNetwork
from .cosmology import optical_depth_no_rad
import os
import pkg_resources
DATA_PATH = pkg_resources.resource_filename('nnero', 'nn_data/')
class Regressor(NeuralNetwork):
"""
Daughter class of :py:class:`NeuralNetwork` specialised for regressors.
Parameters
----------
model: torch.nn.Module | None
        If not None, the model that will be used for the regressor.
Otherwise, a new model is constructed from `n_input`, `n_hidden_features`
and `n_hidden_layers`. Default is None.
    n_input: int, optional
        Number of inputs of the neural network
        (corresponds to the number of parameters).
        Default is 16.
    n_output: int, optional
        Number of outputs of the neural network
        (corresponds to the number of redshift bins).
        Default is 50. Overridden if `dataset` is specified.
n_hidden_features: int, optional
Number of hidden features per layer. Default is 80.
n_hidden_layers: int, optional
Number of layers. Default is 5.
name: str | None
        Name of the neural network. If None, automatically set to DefaultRegressor.
Default is None.
    dataset: DataSet | None
        Dataset on which the model will be trained.
        If provided, `n_input` and `n_output` are read from the data and override the user input values.
use_pca: bool, optional
If `True`, decompose the interpolated function on the principal component eigenbasis.
Default is True.
pca_precision: float, optional
        If `use_pca` is `True`, sets how many eigenvectors need to be considered.
Only consider eigenvectors with eigenvalues > precision^2 * max(eigenvalues).
Default is 1e-3.
    alpha_tau: float, optional
        Weighting of the relative error on X_e and optical depth in the cost function.
        Default is 0.5.
Attributes
----------
    name : str
        The name of the model.
"""
def __init__(self,
*,
n_input: int = 16,
n_output: int = 50,
n_hidden_features: int = 80,
n_hidden_layers: int = 5,
alpha_tau : float = 0.5,
model = None,
name: str | None = None,
use_pca: bool = True,
pca_precision: float = 1e-3,
dataset: DataSet | None = None):
if name is None:
name = "DefaultRegressor"
        # save as attribute whether or not we want to work
        # in the principal component eigenbasis
self._use_pca = use_pca
self._pca_precision = pca_precision
        # if we provide a dataset, pca_precision is used to override
        # the number of outputs so that it corresponds to the number of
        # useful eigenvectors; otherwise it is set to the number
        # of redshift bins used
        # if no dataset is given, we need to know in advance how
        # many useful eigenvectors there are and pass it as n_output
# ----
# this also initialises the pca vectors in the metadata
# attribute of the dataset object
# ----
# when loading from a file we do not provide a dataset
# and therefore do not redo the initialisation of the pca,
# pca eigenvectors and all are read from the saved metadata
if dataset is not None:
n_input = len(dataset.metadata.parameters_name)
if use_pca is True:
n_output = dataset.init_principal_components(self.pca_precision)
else:
n_output = len(dataset.metadata.z)
        # give a default empty array for the structure
        # (stays empty if a custom model is passed as input)
struct = np.empty(0)
# definition of the model if none is given as input
if model is None:
hidden_layers = []
for _ in range(n_hidden_layers):
hidden_layers.append(nn.Linear(n_hidden_features, n_hidden_features))
hidden_layers.append(nn.ReLU())
# create a sequential model
model = nn.Sequential(nn.Linear(n_input, n_hidden_features), *hidden_layers, nn.Linear(n_hidden_features, n_output))
# save the structure of this sequential model and more
struct = np.array([n_input, n_output, n_hidden_features, n_hidden_layers, alpha_tau, use_pca, pca_precision])
# call the (grand)parent constructors
super(Regressor, self).__init__(name)
super(NeuralNetwork, self).__init__()
# structure of the model
self._struct = struct
# define the model
self._model = model
# define parameters in the loss
self._alpha_tau = alpha_tau
# if the dataset is already given, set it as the dataset of the network
if dataset is not None:
self.set_check_metadata_and_partition(dataset)
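
    # Example (illustrative sketch): constructing a regressor directly from a dataset.
    # Here `dataset` is assumed to be an existing nnero DataSet instance built elsewhere;
    # n_input and n_output are then inferred from its metadata, so only the architecture
    # and the PCA options need to be chosen (the values below are placeholders).
    #
    #     regressor = Regressor(dataset=dataset,
    #                           n_hidden_features=80,
    #                           n_hidden_layers=5,
    #                           use_pca=True,
    #                           pca_precision=1e-3)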
@classmethod
def load(cls, path: str | None = None):
"""
Loads a regressor.
Parameters
----------
path: str | None
Path to the saved files containing the regressor data.
            If None, automatically fetches the DefaultRegressor.
Returns
-------
Regressor
"""
if path is None:
path = os.path.join(DATA_PATH, "DefaultRegressor")
name = path.split('/')[-1]
if os.path.isfile(path + '_struct.npy'):
with open(path + '_struct.npy', 'rb') as file:
struct = np.load(file)
if len(struct) == 7:
regressor = Regressor(n_input=int(struct[0]),
n_output=int(struct[1]),
n_hidden_features=int(struct[2]),
n_hidden_layers=int(struct[3]),
alpha_tau=struct[4],
use_pca=bool(struct[5]),
pca_precision=struct[6],
name=name)
regressor.load_weights_and_extras(path)
regressor.eval()
                print('Model ' + str(name) + ' successfully loaded')
return regressor
# if the struct read is not of the right size
# check for a pickled save of the full class
# (although this is not recommended)
if os.path.isfile(path + '.pth') :
regressor = torch.load(path + ".pth")
regressor.eval()
            print('Model ' + str(name) + ' successfully loaded from a .pth archive')
return regressor
raise ValueError("Could not find a fully saved regressor model at: " + path)
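
    # Example (illustrative sketch): loading a saved regressor. With no argument, the
    # DefaultRegressor shipped in nn_data/ is fetched; a custom path is assumed to point
    # to files saved with the same convention (<path>_struct.npy plus the weight files
    # read by load_weights_and_extras, or a <path>.pth pickle).
    #
    #     regressor = Regressor.load()                        # default model
    #     custom    = Regressor.load("path/to/MyRegressor")   # hypothetical custom path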
def forward(self, x):
"""
Forward evaluation of the model.
Parameters
----------
x: torch.Tensor
            Input features.
"""
if self.use_pca is True:
# reconstruct the value of the function in the principal component eigenbasis
y = torch.matmul(self._model(x), self.metadata.torch_pca_eigenvectors[0:self.metadata.pca_n_eigenvectors, :])
y = y + self.metadata.torch_pca_mean_values
y = 10**y
else:
y = self._model(x)
return torch.clamp(y, max=1.0)
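
    # Note on the PCA branch above: the network outputs coefficients c_i in the principal
    # component eigenbasis, and the forward pass reconstructs, schematically,
    #
    #     log10 X_e(z_j) = mean_j + sum_i c_i * eigenvector_{i, j}
    #
    # before exponentiating and clamping the result to X_e <= 1.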
def tau_ion(self, x, y):
"""
Optical depth to reionization.
Parameters
----------
x: torch.Tensor
Input features.
y: torch.Tensor
Output of the Regressor. Corresponds to X_e(z).
"""
z_tensor = torch.tensor(self.metadata.z, dtype=torch.float32)
omega_b = uniform_to_true(x[:, self.metadata.pos_omega_b], self.metadata.min_omega_b, self.metadata.max_omega_b)
omega_c = uniform_to_true(x[:, self.metadata.pos_omega_dm], self.metadata.min_omega_c, self.metadata.max_omega_c)
hlittle = uniform_to_true(x[:, self.metadata.pos_hlittle], self.metadata.min_hlittle, self.metadata.max_hlittle)
return optical_depth_no_rad(z_tensor, y, omega_b, omega_c, hlittle)
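
    # Schematically (exact prefactors and helium treatment are those implemented in
    # cosmology.optical_depth_no_rad), the quantity returned above is the standard
    # optical depth integral
    #
    #     tau(z_max) ~ c * sigma_T * n_H(z=0) * int_0^z_max X_e(z) (1+z)^2 / H(z) dz
    #
    # with H(z) evaluated without the radiation contribution, hence the _no_rad suffix.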
    def loss_xHII(self, output, target):
        """Mean relative error on X_e(z); the last column of `target` (the optical depth) is excluded."""
        return torch.mean(torch.abs(1.0-torch.div(output, target[:, :-1])))
    def loss_tau(self, tau_pred, target):
        """Mean relative error on the optical depth, stored in the last column of `target`."""
        return torch.mean(torch.abs(1.0 - torch.div(tau_pred, target[:, -1])))
def test_tau(self, dataset:DataSet) -> np.ndarray:
"""
        Tests how accurately the regressor reconstructs the optical depth to reionization.
Parameters
----------
dataset : DataSet
DataSet containing the training partition and the test partition.
Returns
-------
np.ndarray
Distribution of relative error between the predicted and true optical depth.
Array with the size of the test dataset.
"""
self.set_check_metadata_and_partition(dataset, check_only = True)
x_test = torch.tensor(dataset.x_array[dataset.partition.early_test], dtype=torch.float32)
tau_test = torch.tensor(dataset.y_regressor[dataset.partition.early_test, -1], dtype=torch.float32)
self.eval()
with torch.no_grad():
y_pred = self.forward(x_test)
tau_pred = self.tau_ion(x_test, y_pred)
return (1.0-tau_pred/tau_test).numpy()
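
    # Example (illustrative sketch): checking the accuracy of tau on the test partition,
    # assuming `regressor` is a trained Regressor and `dataset` the DataSet it was
    # trained on.
    #
    #     rel_err = regressor.test_tau(dataset)
    #     print(f"median |relative error| on tau: {np.median(np.abs(rel_err)):.2e}")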
def test_xHII(self, dataset:DataSet) -> tuple[np.ndarray, np.ndarray]:
"""
        Tests how accurately the regressor reconstructs the free electron fraction X_e.
Parameters
----------
dataset : DataSet
DataSet containing the training partition and the test partition.
Returns
-------
tuple(np.ndarray, np.ndarray)
Prediction for X_e and true values.
"""
self.set_check_metadata_and_partition(dataset, check_only = True)
x_test = torch.tensor(dataset.x_array[dataset.partition.early_test], dtype=torch.float32)
y_test = torch.tensor(dataset.y_regressor[dataset.partition.early_test, :-1], dtype=torch.float32)
self.eval()
with torch.no_grad():
y_pred = self.forward(x_test)
return y_pred.numpy(), y_test.numpy()
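
    # Example (illustrative sketch): comparing predicted and true X_e(z) on the test
    # partition, under the same assumptions as for test_tau above.
    #
    #     y_pred, y_true = regressor.test_xHII(dataset)
    #     rel_err = np.abs(1.0 - y_pred / y_true)   # shape (n_test, n_redshift_bins)
    #     print("worst relative error per redshift bin:", rel_err.max(axis=0))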
@property
def alpha_tau(self):
return self._alpha_tau
@property
def use_pca(self):
return self._use_pca
@property
def pca_precision(self):
return self._pca_precision
@property
def z(self):
return self._metadata.z
def train_regressor(model: Regressor,
dataset: DataSet,
optimizer: torch.optim.Optimizer,
*,
epochs = 50,
learning_rate = 1e-3,
verbose = True,
batch_size = 64,
**kwargs):
"""
Trains a given regressor.
Parameters
----------
model : Regressor
Regressor model to train.
dataset : DataSet
Dataset on which to train the regressor.
optimizer : torch.optim.Optimizer
Optimizer used for training.
epochs : int, optional
Number of epochs, by default 50.
learning_rate : float, optional
Learning rate for training, by default 1e-3.
verbose : bool, optional
If true, outputs a summary of the losses at each epoch, by default True.
batch_size : int, optional
Size of the training batches, by default 64.
"""
    # if we use pca but have not yet initialised it in metadata, do it at the first training step
    # this can happen if no dataset is given to initialise the model while having use_pca = True
if (len(model.train_loss) == 0) and (model.use_pca is True) and (len(dataset.metadata.pca_eigenvalues) == 0):
dataset.init_principal_components(model.pca_precision)
    # set or check the metadata and partition attributes of the model
model.set_check_metadata_and_partition(dataset)
# format the data for the regressor
train_dataset = TorchDataset(dataset.x_array[dataset.partition.early_train], dataset.y_regressor[dataset.partition.early_train])
valid_dataset = TorchDataset(dataset.x_array[dataset.partition.early_valid], dataset.y_regressor[dataset.partition.early_valid])
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, **kwargs)
valid_loader = torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, shuffle=True, **kwargs)
# we have only one param_group here
# we modify the learning rate of that group
optimizer.param_groups[0]['lr'] = learning_rate
# start loop on the epochs
for epoch in range(epochs):
train_loss = np.array([])
valid_loss = np.array([])
train_accuracy = np.array([])
valid_accuracy = np.array([])
# training mode
model.train()
for batch in train_loader:
x_batch, y_batch = batch
optimizer.zero_grad()
y_pred = model.forward(x_batch)
tau_pred = model.tau_ion(x_batch, y_pred)
loss_xHII = model.loss_xHII(y_pred, y_batch)
loss_tau = model.loss_tau(tau_pred, y_batch)
loss = (1.0-model.alpha_tau) * loss_xHII + model.alpha_tau * loss_tau
loss.backward()
optimizer.step()
train_loss = np.append(train_loss, loss.item())
train_accuracy = np.append(train_accuracy, 1-loss_tau.item())
# evaluation mode
model.eval()
with torch.no_grad():
for batch in valid_loader:
x_batch, y_batch = batch
y_pred = model.forward(x_batch) # forward pass
tau_pred = model.tau_ion(x_batch, y_pred)
loss_xHII = model.loss_xHII(y_pred, y_batch)
loss_tau = model.loss_tau(tau_pred, y_batch)
loss = (1.0-model.alpha_tau) * loss_xHII + model.alpha_tau * loss_tau
valid_loss = np.append(valid_loss, loss.item())
valid_accuracy = np.append(valid_accuracy, 1-loss_tau.item())
# get the mean of all batches
model._train_loss = np.append(model._train_loss, np.mean(train_loss))
model._valid_loss = np.append(model._valid_loss, np.mean(valid_loss))
model._train_accuracy = np.append(model._train_accuracy, np.mean(train_accuracy))
model._valid_accuracy = np.append(model._valid_accuracy, np.mean(valid_accuracy))
if verbose:
print(f'Epoch [{epoch+1}/{epochs}], loss: ({model.train_loss[-1]:.4f}, {model.valid_loss[-1]:.4f}), accuracy = ({model.train_accuracy[-1]:.4f}, {model.valid_accuracy[-1]:.4f})')
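
# Example (illustrative sketch): a minimal end-to-end training run. `dataset` is assumed
# to be a nnero DataSet prepared elsewhere; the optimizer choice, learning rate, number
# of epochs and batch size are placeholders rather than tuned values.
#
#     regressor = Regressor(dataset=dataset)
#     optimizer = torch.optim.Adam(regressor.parameters(), lr=1e-3)
#     train_regressor(regressor, dataset, optimizer,
#                     epochs=50, learning_rate=1e-3, batch_size=64)
#     rel_err = regressor.test_tau(dataset)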