From 9a0eff045388e452c50bed36be2676a184806c30 Mon Sep 17 00:00:00 2001 From: r-sarma Date: Tue, 2 Jul 2024 12:26:07 +0200 Subject: [PATCH 01/18] Addition of loggers and dist strategy --- use-cases/xtclim/README.md | 38 ++- use-cases/xtclim/Requirements.txt | 24 +- use-cases/xtclim/pipeline.yaml | 24 +- .../preprocessing/preprocess_2d_seasons.py | 6 +- use-cases/xtclim/src/anomaly.py | 202 ++++++++-------- use-cases/xtclim/src/initialization.py | 4 +- use-cases/xtclim/src/pipeline.yaml | 11 - use-cases/xtclim/src/trainer.py | 219 +++++++++++++----- use-cases/xtclim/startscript | 99 ++++++++ 9 files changed, 435 insertions(+), 192 deletions(-) delete mode 100644 use-cases/xtclim/src/pipeline.yaml create mode 100755 use-cases/xtclim/startscript diff --git a/use-cases/xtclim/README.md b/use-cases/xtclim/README.md index b62d2771..fcfe9ba7 100644 --- a/use-cases/xtclim/README.md +++ b/use-cases/xtclim/README.md @@ -24,7 +24,16 @@ The file `train.py` trains the network. Caution: It will overwrite the weights o The `anomaly.py` file evaluates the network on the available datasets - train, test, and projection. -## How to launch pipeline +## Installation + +Please follow the documentation to install the itwinai environment. +After that, install the required libraries with the itwinai environment with: + +```bash +pip install -r Requirements.txt +``` + +## How to launch pipeline locally The config file `pipeline.yaml` contains all the steps to execute the workflow. You can launch it from the root of the repository with: @@ -33,5 +42,28 @@ python train.py -p pipeline.yaml ``` -## TODOs -Integration of post-processing step + distributed strategies +## How to launch pipeline on an HPC system + +The `startscript` job script can be used to launch a pipeline with SLURM on an HPC system. +These steps should be followed to export the environment variables required by the script. 
+ +```bash +# Distributed training with torch DistributedDataParallel +PYTHON_VENV="../../envAI_hdfml" +DIST_MODE="ddp" +RUN_NAME="ddp-cerfacs" +sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",PYTHON_VENV="$PYTHON_VENV" \ + startscript +``` + +The results and/or errors are available in `job.out` and `job.err` log files. + +With MLFLow logger, the logs can be visualized in the MLFlow UI: + +```bash +mlflow ui --backend-store-uri mllogs/mlflow + +# In background +mlflow ui --backend-store-uri mllogs/mlflow > /dev/null 2>&1 & +``` + diff --git a/use-cases/xtclim/Requirements.txt b/use-cases/xtclim/Requirements.txt index d0cf004b..b1318c58 100644 --- a/use-cases/xtclim/Requirements.txt +++ b/use-cases/xtclim/Requirements.txt @@ -1,17 +1,15 @@ -cartopy -cftime -codecarbon -dask -datetime +cartopy +cftime +codecarbon +dask +datetime imageio -ipykernel -matplotlib -numpy -pandas -torch -torchvision -tqdm -urllib3==1.26.13 +ipykernel +matplotlib +numpy +pandas +tqdm +urllib3==1.26.13 xarray netCDF4 h5netcdf diff --git a/use-cases/xtclim/pipeline.yaml b/use-cases/xtclim/pipeline.yaml index b65e70d9..550cc470 100644 --- a/use-cases/xtclim/pipeline.yaml +++ b/use-cases/xtclim/pipeline.yaml @@ -4,7 +4,8 @@ epochs: 3 batch_size: 10 lr: 0.001 scenario: '245' -strategy: 'ddp' +strategy: ddp +evaluation: 'past' # Workflows pipeline: @@ -21,11 +22,24 @@ pipeline: init_args: scenario: ${scenario} training-step: - #class_path: src.trainer_dist.XTClimTrainer - class_path: src.trainer.TorchTrainer + class_path: src.trainer.XTClimTrainer init_args: epochs: ${epochs} batch_size: ${batch_size} lr: ${lr} - #strategy: ${strategy} - + strategy: ${strategy} + logger: + class_path: itwinai.loggers.LoggersCollection + init_args: + loggers: + - class_path: itwinai.loggers.MLFlowLogger + init_args: + experiment_name: XTClim (Cerfacs) + log_freq: epoch + - class_path: itwinai.loggers.WandBLogger + init_args: + log_freq: epoch + evaluation-step: + class_path: 
src.anomaly.XTClimPredictor + init_args: + evaluation: ${evaluation} diff --git a/use-cases/xtclim/preprocessing/preprocess_2d_seasons.py b/use-cases/xtclim/preprocessing/preprocess_2d_seasons.py index 9b85a152..d9280366 100644 --- a/use-cases/xtclim/preprocessing/preprocess_2d_seasons.py +++ b/use-cases/xtclim/preprocessing/preprocess_2d_seasons.py @@ -112,8 +112,8 @@ def execute(self): scenarios = [self.scenario] # Load preprocessed "daily temperature images" and time series - train_images = np.load("input/preprocessed_2d_train_data_allssp.npy") - test_images = np.load("input/preprocessed_2d_test_data_allssp.npy") + train_images = np.load("input/preprocessed_2d_train_data_allssp.npy", allow_pickle=True) + test_images = np.load("input/preprocessed_2d_test_data_allssp.npy", allow_pickle=True) train_time = pd.read_csv("input/dates_train_data.csv") test_time = pd.read_csv("input/dates_test_data.csv") @@ -131,7 +131,7 @@ def execute(self): ##### 4. Apply to Projection Datasets for scenario in scenarios: - proj_images = np.load(f"input/preprocessed_2d_proj{scenario}_data_allssp.npy") + proj_images = np.load(f"input/preprocessed_2d_proj{scenario}_data_allssp.npy", allow_pickle=True) proj_time = pd.read_csv("input/dates_proj_data.csv") proj_season_images, proj_season_time = self.season_split( diff --git a/use-cases/xtclim/src/anomaly.py b/use-cases/xtclim/src/anomaly.py index 8f07af0e..1cb7a6a4 100644 --- a/use-cases/xtclim/src/anomaly.py +++ b/use-cases/xtclim/src/anomaly.py @@ -1,114 +1,132 @@ +from typing import Literal import torch import numpy as np import pandas as pd from operator import add +#from itwinai.torch.inference import TorchPredictor +from itwinai.components import monitor_exec, Predictor + import model from torch.utils.data import DataLoader from engine import evaluate -from initialization import device, beta, criterion, n_avg, pixel_wise_criterion +from initialization import beta, criterion, n_avg, pixel_wise_criterion + +# TODO: itwinai doesnt support 
distributed inference at the moment!! +device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # pick the season to study among: # '' (none, i.e. full dataset), 'winter_', 'spring_', 'summer_', 'autumn_' +# This is now moved to the configuration file # choose wether to evaluate train and test data, and/or projections -past_evaluation = False -future_evaluation = True - - -if past_evaluation: - for season in ['winter_', 'spring_', 'summer_', 'autumn_']: - - # load previously trained model - cvae_model = model.ConvVAE().to(device) - cvae_model.load_state_dict(torch.load(f'../outputs/cvae_model_{season}1d.pth')) - - # train set and data loader - train_time = pd.read_csv(f"../input/dates_train_{season}data.csv") - train_data = np.load(f"../input/preprocessed_1d_train_{season}data_allssp.npy") - n_train = len(train_data) - trainset = [ ( torch.from_numpy(np.reshape(train_data[i], (3, 32, 32))), +#past_evaluation = False +#future_evaluation = True + +class XTClimPredictor(Predictor): + def __init__( + self, + evaluation: Literal["past", "future"] = 'past' + ): + super().__init__(model) + self.evaluation = evaluation + + @monitor_exec + def execute(self): + + if self.evaluation=='past': + for season in ['winter_', 'spring_', 'summer_', 'autumn_']: + + # load previously trained model + cvae_model = model.ConvVAE().to(device) + cvae_model.load_state_dict(torch.load(f'outputs/cvae_model_{season}1d_1memb.pth')['model_state_dict'], strict=False) + + # train set and data loader + train_time = pd.read_csv(f"input/dates_train_{season}data_1memb.csv") + train_data = np.load(f"input/preprocessed_1d_train_{season}data_1memb.npy") + n_train = len(train_data) + trainset = [ ( torch.from_numpy(np.reshape(train_data[i], (2, 32, 32))), train_time['0'][i] ) for i in range(n_train) ] - trainloader = DataLoader( - trainset, batch_size=1, shuffle=False - ) - - # test set and data loader - test_time = pd.read_csv(f"../input/dates_test_{season}data.csv") - test_data = 
np.load(f"../input/preprocessed_1d_test_{season}data_allssp.npy") - n_test = len(test_data) - testset = [ ( torch.from_numpy(np.reshape(test_data[i], (3, 32, 32))), + trainloader = DataLoader( + trainset, batch_size=1, shuffle=False + ) + + # test set and data loader + test_time = pd.read_csv(f"input/dates_test_{season}data_1memb.csv") + test_data = np.load(f"input/preprocessed_1d_test_{season}data_1memb.npy") + n_test = len(test_data) + testset = [ ( torch.from_numpy(np.reshape(test_data[i], (2, 32, 32))), test_time['0'][i] ) for i in range(n_test) ] - testloader = DataLoader( - testset, batch_size=1, shuffle=False - ) - - # average over a few iterations - # for a better reconstruction estimate - train_avg_losses, _, tot_train_losses, _ = evaluate(cvae_model, trainloader, - trainset, device, - criterion, + testloader = DataLoader( + testset, batch_size=1, shuffle=False + ) + + # average over a few iterations + # for a better reconstruction estimate + train_avg_losses, _, tot_train_losses, _ = evaluate(cvae_model, trainloader, + trainset, device, + criterion, pixel_wise_criterion) - test_avg_losses, _, tot_test_losses, _ = evaluate(cvae_model, testloader, - testset, device, criterion, + test_avg_losses, _, tot_test_losses, _ = evaluate(cvae_model, testloader, + testset, device, criterion, pixel_wise_criterion) - for i in range(1, n_avg): - train_avg_loss, _, train_losses, _ = evaluate(cvae_model, trainloader, - trainset, device, criterion, + for i in range(1, n_avg): + train_avg_loss, _, train_losses, _ = evaluate(cvae_model, trainloader, + trainset, device, criterion, pixel_wise_criterion) - tot_train_losses = list(map(add, tot_train_losses, train_losses)) - train_avg_losses += train_avg_loss - test_avg_loss, _, test_losses, _ = evaluate(cvae_model, testloader, - testset, device, criterion, + tot_train_losses = list(map(add, tot_train_losses, train_losses)) + train_avg_losses += train_avg_loss + test_avg_loss, _, test_losses, _ = evaluate(cvae_model, testloader, + 
testset, device, criterion, pixel_wise_criterion) - tot_test_losses = list(map(add, tot_test_losses, test_losses)) - test_avg_losses += test_avg_loss - tot_train_losses = np.array(tot_train_losses)/n_avg - tot_test_losses = np.array(tot_test_losses)/n_avg - train_avg_losses = train_avg_losses/n_avg - test_avg_losses = test_avg_losses/n_avg - - pd.DataFrame(tot_train_losses).to_csv(f"../outputs/train_losses_{season}1d_allssp.csv") - pd.DataFrame(tot_test_losses).to_csv(f"../outputs/test_losses_{season}1d_allssp.csv") - print('Train average loss:', train_avg_losses) - print('Test average loss:', test_avg_losses) - - -if future_evaluation: - for season in ['winter_', 'spring_', 'summer_', 'autumn_']: - - # load previously trained model - cvae_model = model.ConvVAE().to(device) - cvae_model.load_state_dict(torch.load(f'../outputs/cvae_model_{season}1d.pth')) - - for scenario in ['585', '370', '245', '126']: - - # projection set and data loader - proj_time = pd.read_csv(f"../input/dates_proj_{season}data.csv") - proj_data = np.load(f"../input/preprocessed_1d_proj{scenario}_{season}data_allssp.npy") - n_proj = len(proj_data) - projset = [ ( torch.from_numpy(np.reshape(proj_data[i], (3, 32, 32))), + tot_test_losses = list(map(add, tot_test_losses, test_losses)) + test_avg_losses += test_avg_loss + tot_train_losses = np.array(tot_train_losses)/n_avg + tot_test_losses = np.array(tot_test_losses)/n_avg + train_avg_losses = train_avg_losses/n_avg + test_avg_losses = test_avg_losses/n_avg + + pd.DataFrame(tot_train_losses).to_csv(f"outputs/train_losses_{season}1d_allssp.csv") + pd.DataFrame(tot_test_losses).to_csv(f"outputs/test_losses_{season}1d_allssp.csv") + print('Train average loss:', train_avg_losses) + print('Test average loss:', test_avg_losses) + + + else: + for season in ['winter_', 'spring_', 'summer_', 'autumn_']: + + # load previously trained model + cvae_model = model.ConvVAE().to(device) + 
cvae_model.load_state_dict(torch.load(f'outputs/cvae_model_{season}1d_1memb.pth')['model_state_dict'], strict=False) + + for scenario in ['585', '245']: + + # projection set and data loader + proj_time = pd.read_csv(f"input/dates_proj_{season}data_1memb.csv") + proj_data = np.load(f"input/preprocessed_1d_proj{scenario}_{season}data_1memb.npy") + n_proj = len(proj_data) + projset = [ ( torch.from_numpy(np.reshape(proj_data[i], (3, 32, 32))), proj_time['0'][i] ) for i in range(n_proj) ] - projloader = DataLoader( - projset, batch_size=1, shuffle=False - ) - - # get the losses for each data set - # on various experiments to have representative statistics - proj_avg_losses, _, tot_proj_losses, _ = evaluate(cvae_model, projloader, - projset, device, criterion, + projloader = DataLoader( + projset, batch_size=1, shuffle=False + ) + + # get the losses for each data set + # on various experiments to have representative statistics + proj_avg_losses, _, tot_proj_losses, _ = evaluate(cvae_model, projloader, + projset, device, criterion, pixel_wise_criterion) - - for i in range(1, n_avg): - proj_avg_loss, _, proj_losses, _ = evaluate(cvae_model, projloader, - projset, device, criterion, + + for i in range(1, n_avg): + proj_avg_loss, _, proj_losses, _ = evaluate(cvae_model, projloader, + projset, device, criterion, pixel_wise_criterion) - tot_proj_losses = list(map(add, tot_proj_losses, proj_losses)) - proj_avg_losses += proj_avg_loss - - tot_proj_losses = np.array(tot_proj_losses)/n_avg - proj_avg_losses = proj_avg_losses/n_avg - - # save the losses time series - pd.DataFrame(tot_proj_losses).to_csv(f"../outputs/proj{scenario}_losses_{season}1d_allssp.csv") - print(f'SSP{scenario} Projection average loss:', proj_avg_losses, 'for', season[:-1]) \ No newline at end of file + tot_proj_losses = list(map(add, tot_proj_losses, proj_losses)) + proj_avg_losses += proj_avg_loss + + tot_proj_losses = np.array(tot_proj_losses)/n_avg + proj_avg_losses = proj_avg_losses/n_avg + + # save 
the losses time series + pd.DataFrame(tot_proj_losses).to_csv(f"outputs/proj{scenario}_losses_{season}1d_allssp.csv") + print(f'SSP{scenario} Projection average loss:', proj_avg_losses, 'for', season[:-1]) diff --git a/use-cases/xtclim/src/initialization.py b/use-cases/xtclim/src/initialization.py index e0030627..f1af9c7d 100644 --- a/use-cases/xtclim/src/initialization.py +++ b/use-cases/xtclim/src/initialization.py @@ -1,8 +1,8 @@ import torch import torch.nn as nn - -device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') +# Device is now obtained from itwinai backend +#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # Mean-Squared Error as the average difference between the pixels # in the original image vs. the reconstructed one diff --git a/use-cases/xtclim/src/pipeline.yaml b/use-cases/xtclim/src/pipeline.yaml deleted file mode 100644 index 8274071d..00000000 --- a/use-cases/xtclim/src/pipeline.yaml +++ /dev/null @@ -1,11 +0,0 @@ -pipeline: - class_path: itwinai.pipeline.Pipeline - init_args: - steps: - training-step: - class_path: trainer.TorchTrainer - init_args: - epochs: 3 - batch_size: 10 - lr: 0.0001 - diff --git a/use-cases/xtclim/src/trainer.py b/use-cases/xtclim/src/trainer.py index 4314060a..687280be 100644 --- a/use-cases/xtclim/src/trainer.py +++ b/use-cases/xtclim/src/trainer.py @@ -14,33 +14,78 @@ tracker.start() """ +from typing import Literal, Optional import torch import torch.optim as optim import numpy as np import pandas as pd +import sys +import os + +from itwinai.components import monitor_exec +from itwinai.torch.trainer import TorchTrainer +from itwinai.torch.distributed import ( + distributed_resources_available, + TorchDistributedStrategy, + TorchDDPStrategy, + HorovodStrategy, + DeepSpeedStrategy, + NonDistributedStrategy +) +from itwinai.loggers import Logger +from itwinai.torch.config import TrainingConfiguration -from itwinai.components import Trainer, monitor_exec import model from 
torch.utils.data import DataLoader from torchvision.utils import make_grid from engine import train, validate from utils import save_reconstructed_images, save_loss_plot, save_ex -from initialization import device, beta, criterion +from initialization import beta, criterion, pixel_wise_criterion -class TorchTrainer(Trainer): +class XTClimTrainer(TorchTrainer): def __init__( self, epochs: int, batch_size: int, - lr: float + lr: float, + strategy: Literal["ddp", "deepspeed", "horovod"] = 'ddp', + save_best: bool = True, + logger: Optional[Logger] = None ): - super().__init__() - self.epochs = epochs + super().__init__( + epochs=epochs, + config={}, + strategy=strategy, + logger=logger + ) self.batch_size = batch_size + self.epochs = epochs self.lr = lr + # Global training configuration + self.config = TrainingConfiguration( + batch_size=self.batch_size, + save_best=save_best, + ) + + def final_loss(self, bce_loss, mu, logvar, beta=0.1): + """ + Adds up reconstruction loss (BCELoss) and Kullback-Leibler divergence. + KL-Divergence = 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2) + + Parameters: + bce_loss: recontruction loss + mu: the mean from the latent vector + logvar: log variance from the latent vector + beta: weight over the KL-Divergence + """ + BCE = bce_loss + KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) + return BCE + beta*KLD + @monitor_exec def execute(self): + # pick the season to study among: # '' (none, i.e. 
full dataset), 'winter_', 'spring_', 'summer_', 'autumn_' seasons = ["winter_", "spring_", "summer_", "autumn_"] @@ -48,22 +93,35 @@ def execute(self): # number of members used for the training of the network n_memb = 1 - # initialize learning parameters - #lr0 = 0.001 - #batch_size = 64 - #epochs = 100 #early stopping parameters stop_delta = 0.01 # under 1% improvement consider the model starts converging patience = 15 # wait for a few epochs to be sure before actually stopping early_count = 0 # count when validation loss < stop_delta old_valid_loss = 0 # keep track of validation loss at t-1 + # Initialize distributed backend + self._init_distributed_strategy() for season in seasons: - # initialize the model - cvae_model = model.ConvVAE().to(device) + cvae_model = model.ConvVAE() optimizer = optim.Adam(cvae_model.parameters(), lr=self.lr) + # First, define strategy-wise optional configurations + if isinstance(self.strategy, DeepSpeedStrategy): + # Batch size definition is not optional for DeepSpeedStrategy! 
+ distribute_kwargs = dict( + config_params=dict( + train_micro_batch_size_per_gpu=self.config.batch_size + ) + ) + else: + distribute_kwargs = {} + + # Distributed model, optimizer, and scheduler + cvae_model, optimizer, _ = self.strategy.distributed( + cvae_model, optimizer, **distribute_kwargs + ) + # load training set and train data train_time = pd.read_csv(f"input/dates_train_{season}data_{n_memb}memb.csv") train_data = np.load( @@ -75,7 +133,8 @@ def execute(self): for i in range(n_train) ] # load train set, shuffle it, and create batches - trainloader = DataLoader(trainset, batch_size=self.batch_size, shuffle=True) + trainloader = self.strategy.create_dataloader(trainset, batch_size=self.batch_size, + shuffle=True, pin_memory=True) # load validation set and validation data test_time = pd.read_csv(f"input/dates_test_{season}data_{n_memb}memb.csv") @@ -85,77 +144,106 @@ def execute(self): (torch.from_numpy(np.reshape(test_data[i], (2, 32, 32))), test_time["0"][i]) for i in range(n_test) ] - testloader = DataLoader(testset, batch_size=self.batch_size, shuffle=False) + testloader = self.strategy.create_dataloader(testset, batch_size=self.batch_size, + shuffle=False, pin_memory=True) + + if self.strategy.is_main_worker and self.logger: + self.logger.create_logger_context() # a list to save all the reconstructed images in PyTorch grid format grid_images = [] # a list to save the loss evolutions train_loss = [] valid_loss = [] - min_valid_epoch_loss = 100 # random high value + min_valid_epoch_loss = float('inf') # random high value for epoch in range(self.epochs): - print(f"Epoch {epoch+1} of {self.epochs}") + if self.strategy.is_main_worker: + print(f"Epoch {epoch+1} of {self.epochs}") + + if self.strategy.is_distributed: + # Inform the sampler that a new epoch started: shuffle + # may be needed + trainloader.sampler.set_epoch(epoch) + testloader.sampler.set_epoch(epoch) # train the model train_epoch_loss = train( - cvae_model, trainloader, trainset, device, 
optimizer, criterion, beta + cvae_model, trainloader, trainset, self.device, optimizer, criterion, beta ) # evaluate the model on the test set valid_epoch_loss, recon_images = validate( - cvae_model, testloader, testset, device, criterion, beta + cvae_model, testloader, testset, self.device, criterion, beta ) + self.log(train_epoch_loss, + 'epoch_train_loss', + kind='metric' + ) + self.log(valid_epoch_loss, + 'epoch_valid_loss', + kind='metric' + ) + # keep track of the losses train_loss.append(train_epoch_loss) valid_loss.append(valid_epoch_loss) - # save the reconstructed images from the validation loop - #save_reconstructed_images(recon_images, epoch+1, season) - - # convert the reconstructed images to PyTorch image grid format - image_grid = make_grid(recon_images.detach().cpu()) - grid_images.append(image_grid) - # save one example of reconstructed image before and after training - - #if epoch == 0 or epoch == self.epochs-1: - # save_ex(recon_images[0], epoch, season) - - # decreasing learning rate - if (epoch + 1) % 20 == 0: - lr = lr / 5 - -#------- - - - # early stopping to avoid overfitting -# if ( -# epoch > 1 -# and (old_valid_loss - valid_epoch_loss) / old_valid_loss < stop_delta -# ): - # if the marginal improvement in validation loss is too small -# early_count += 1 - - #if early_count > patience: - # if too small improvement for a few epochs in a row, stop learning - # save_ex(recon_images[0], epoch, season) - #break - -# else: - # if the condition is not verified anymore, reset the count -# early_count = 0 -# old_valid_loss = valid_epoch_loss - -#--------------- - - # save best model - if valid_epoch_loss < min_valid_epoch_loss: - min_valid_epoch_loss = valid_epoch_loss - torch.save( - cvae_model.state_dict(), - f"outputs/cvae_model_{season}1d_{n_memb}memb.pth", - ) + # save the reconstructed images from the validation loop + #save_reconstructed_images(recon_images, epoch+1, season) + + # convert the reconstructed images to PyTorch image grid format 
+ image_grid = make_grid(recon_images.detach().cpu()) + grid_images.append(image_grid) + # save one example of reconstructed image before and after training + + #if epoch == 0 or epoch == self.epochs-1: + # save_ex(recon_images[0], epoch, season) + + # decreasing learning rate + if (epoch + 1) % 20 == 0: + lr = lr / 5 + + # early stopping to avoid overfitting + if ( + epoch > 1 + and (old_valid_loss - valid_epoch_loss) / old_valid_loss < stop_delta + ): + # if the marginal improvement in validation loss is too small + early_count += 1 + if early_count > patience: + # if too small improvement for a few epochs in a row, stop learning + save_ex(recon_images[0], epoch, season) + break + + else: + # if the condition is not verified anymore, reset the count + early_count = 0 + old_valid_loss = valid_epoch_loss + + # save best model + worker_val_losses = self.strategy.gather_obj(valid_epoch_loss) + if self.strategy.is_main_worker: + # Save only in the main worker + # avg_loss has a meaning only in the main worker + avg_loss = np.mean(worker_val_losses) + if self.config.save_best and avg_loss < min_valid_epoch_loss: + min_valid_epoch_loss = avg_loss + checkpoint = { + 'epoch': epoch, + 'model_state_dict': cvae_model.state_dict(), + 'optim_state_dict': optimizer.state_dict(), + 'val_loss': valid_epoch_loss, + } + # save checkpoint only if it is better than + # the previous ones + checkpoint_filename = f"outputs/cvae_model_{season}1d_{n_memb}memb.pth" + torch.save(checkpoint, checkpoint_filename) + # itwinai - log checkpoint as artifact + self.log(checkpoint_filename, + os.path.basename(checkpoint_filename), + kind='artifact') print(f"Train Loss: {train_epoch_loss:.4f}") print(f"Val Loss: {valid_epoch_loss:.4f}") @@ -169,5 +257,10 @@ def execute(self): f"outputs/test_loss_indiv_{season}1d_{n_memb}memb.csv" ) + # Clean-up + if self.strategy.is_main_worker and self.logger: + self.logger.destroy_logger_context() + self.strategy.clean_up() + # emissions = tracker.stop() # 
print(f"Emissions from this training run: {emissions:.5f} kg CO2eq") diff --git a/use-cases/xtclim/startscript b/use-cases/xtclim/startscript new file mode 100755 index 00000000..fa192a80 --- /dev/null +++ b/use-cases/xtclim/startscript @@ -0,0 +1,99 @@ +#!/bin/bash + +# general configuration of the job +#SBATCH --job-name=cerfacs-IT +#SBATCH --account=intertwin +#SBATCH --partition=batch +#SBATCH --output=job.out +#SBATCH --error=job.err +#SBATCH --time=01:00:00 +#SBATCH --nodes=1 +#SBATCH --ntasks-per-node=1 +#SBATCH --cpus-per-task=1 +#SBATCH --gpus-per-node=1 +#SBATCH --exclusive + +# command +COMMAND="train.py -p pipeline.yaml" +EXEC="$COMMAND" + +# set modules +ml --force purge +ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py + +# Job info +echo "DEBUG: TIME: $(date)" +sysN="$(uname -n | cut -f2- -d.)" +sysN="${sysN%%[0-9]*}" +echo "Running on system: $sysN" +echo "DEBUG: EXECUTE: $EXEC" +echo "DEBUG: SLURM_SUBMIT_DIR: $SLURM_SUBMIT_DIR" +echo "DEBUG: SLURM_JOB_ID: $SLURM_JOB_ID" +echo "DEBUG: SLURM_JOB_NODELIST: $SLURM_JOB_NODELIST" +echo "DEBUG: SLURM_NNODES: $SLURM_NNODES" +echo "DEBUG: SLURM_NTASKS: $SLURM_NTASKS" +echo "DEBUG: SLURM_TASKS_PER_NODE: $SLURM_TASKS_PER_NODE" +echo "DEBUG: SLURM_SUBMIT_HOST: $SLURM_SUBMIT_HOST" +echo "DEBUG: SLURMD_NODENAME: $SLURMD_NODENAME" +echo "DEBUG: CUDA_VISIBLE_DEVICES: $CUDA_VISIBLE_DEVICES" +if [ "$DEBUG" = true ] ; then + echo "DEBUG: NCCL_DEBUG=INFO" + export NCCL_DEBUG=INFO +fi +echo + +# set vars +export NCCL_DEBUG=INFO +export SRUN_CPUS_PER_TASK=${SLURM_CPUS_PER_TASK} +export OMP_NUM_THREADS=1 +if [ "$SLURM_CPUS_PER_TASK" > 0 ] ; then + export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK +fi +export CUDA_VISIBLE_DEVICES="0,1,2,3" + +# Env vairables check +if [ -z "$DIST_MODE" ]; then + >&2 echo "ERROR: env variable DIST_MODE is not set. 
Allowed values are 'horovod', 'ddp' or 'deepspeed'" + exit 1 +fi +if [ -z "$RUN_NAME" ]; then + >&2 echo "WARNING: env variable RUN_NAME is not set. It's a way to identify some specific run of an experiment." + RUN_NAME=$DIST_MODE +fi +if [ -z "$PYTHON_VENV" ]; then + >&2 echo "WARNING: env variable PYTHON_VENV is not set. It's the path to a python virtual environment." +else + # Activate Python virtual env + source $PYTHON_VENV/bin/activate +fi + +# Launch training +if [ "$DIST_MODE" == "ddp" ] ; then + echo "DDP training" + srun --cpu-bind=none --ntasks-per-node=1 \ + bash -c "torchrun \ + --log_dir='logs_torchrun' \ + --nnodes=$SLURM_NNODES \ + --nproc_per_node=$SLURM_GPUS_PER_NODE \ + --rdzv_id=$SLURM_JOB_ID \ + --rdzv_conf=is_host=\$(((SLURM_NODEID)) && echo 0 || echo 1) \ + --rdzv_backend=c10d \ + --rdzv_endpoint='$(scontrol show hostnames "$SLURM_JOB_NODELIST" | head -n 1)'i:29500 \ + $EXEC" + +elif [ "$DIST_MODE" == "deepspeed" ] ; then + echo "DEEPSPEED training" + MASTER_ADDR=$(scontrol show hostnames "\$SLURM_JOB_NODELIST" | head -n 1)i + export MASTER_ADDR + export MASTER_PORT=29500 + srun --cpu-bind=none python -u $EXEC + +elif [ "$DIST_MODE" == "horovod" ] ; then + echo "HOROVOD training" + srun --cpu-bind=none python -u $EXEC +else + >&2 echo "ERROR: unrecognized \$DIST_MODE env variable" + exit 1 +fi + +#eof From e89d103578250641ab608c2fac414aa18a90caa4 Mon Sep 17 00:00:00 2001 From: r-sarma Date: Mon, 29 Jul 2024 15:27:40 +0200 Subject: [PATCH 02/18] seasons launch iteratively and fixes --- use-cases/xtclim/README.md | 6 +- use-cases/xtclim/pipeline.yaml | 1 + use-cases/xtclim/src/trainer.py | 344 ++++++++++++++++---------------- use-cases/xtclim/startscript | 2 +- use-cases/xtclim/train.py | 34 ++-- 5 files changed, 197 insertions(+), 190 deletions(-) diff --git a/use-cases/xtclim/README.md b/use-cases/xtclim/README.md index fcfe9ba7..058c8834 100644 --- a/use-cases/xtclim/README.md +++ b/use-cases/xtclim/README.md @@ -35,10 +35,12 @@ pip install 
-r Requirements.txt ## How to launch pipeline locally -The config file `pipeline.yaml` contains all the steps to execute the workflow. You can launch it from the root of the repository with: +The config file `pipeline.yaml` contains all the steps to execute the workflow. +This file also contains all the seasons, and a separate run is launched for each season. +You can launch it from the root of the repository with: ```bash -python train.py -p pipeline.yaml +python train.py ``` diff --git a/use-cases/xtclim/pipeline.yaml b/use-cases/xtclim/pipeline.yaml index 550cc470..b67e1391 100644 --- a/use-cases/xtclim/pipeline.yaml +++ b/use-cases/xtclim/pipeline.yaml @@ -5,6 +5,7 @@ batch_size: 10 lr: 0.001 scenario: '245' strategy: ddp +seasons: ['winter_', 'spring_', 'summer_', 'autumn_'] evaluation: 'past' # Workflows diff --git a/use-cases/xtclim/src/trainer.py b/use-cases/xtclim/src/trainer.py index 687280be..2edbbac6 100644 --- a/use-cases/xtclim/src/trainer.py +++ b/use-cases/xtclim/src/trainer.py @@ -48,6 +48,7 @@ def __init__( epochs: int, batch_size: int, lr: float, + seasons: Literal["winter_", "spring_", "summer_", "autumn_"] = 'winter_', strategy: Literal["ddp", "deepspeed", "horovod"] = 'ddp', save_best: bool = True, logger: Optional[Logger] = None @@ -58,13 +59,18 @@ def __init__( strategy=strategy, logger=logger ) - self.batch_size = batch_size self.epochs = epochs - self.lr = lr + self.seasons = seasons # Global training configuration self.config = TrainingConfiguration( - batch_size=self.batch_size, + batch_size = batch_size, + lr = lr, save_best=save_best, + n_memb = 1, # number of members used in training the network + stop_delta = 0.01, # under 1% improvement consider the model starts converging + patience = 15, # wait for a few epochs to be sure before actually stopping + early_count = 0, # count when validation loss < stop_delta + old_valid_loss = 0 # keep track of validation loss at t-1 ) def final_loss(self, bce_loss, mu, logvar, beta=0.1): @@ -72,195 
+78,187 @@ def final_loss(self, bce_loss, mu, logvar, beta=0.1): Adds up reconstruction loss (BCELoss) and Kullback-Leibler divergence. KL-Divergence = 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2) - Parameters: - bce_loss: recontruction loss - mu: the mean from the latent vector - logvar: log variance from the latent vector - beta: weight over the KL-Divergence + Args: + - bce_loss (torch.Tensor): recontruction loss + - mu (torch.Tensor): the mean from the latent vector + - logvar (torch.Tensor): log variance from the latent vector + - beta (torch.Tensor): weight over the KL-Divergence + + Returns: + - total loss (torch.Tensor) """ BCE = bce_loss KLD = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) return BCE + beta*KLD - @monitor_exec def execute(self): # pick the season to study among: - # '' (none, i.e. full dataset), 'winter_', 'spring_', 'summer_', 'autumn_' - seasons = ["winter_", "spring_", "summer_", "autumn_"] - - # number of members used for the training of the network - n_memb = 1 - - #early stopping parameters - stop_delta = 0.01 # under 1% improvement consider the model starts converging - patience = 15 # wait for a few epochs to be sure before actually stopping - early_count = 0 # count when validation loss < stop_delta - old_valid_loss = 0 # keep track of validation loss at t-1 + season = self.seasons # Initialize distributed backend self._init_distributed_strategy() - for season in seasons: - # initialize the model - cvae_model = model.ConvVAE() - optimizer = optim.Adam(cvae_model.parameters(), lr=self.lr) - - # First, define strategy-wise optional configurations - if isinstance(self.strategy, DeepSpeedStrategy): - # Batch size definition is not optional for DeepSpeedStrategy! 
- distribute_kwargs = dict( - config_params=dict( - train_micro_batch_size_per_gpu=self.config.batch_size - ) + # initialize the model + cvae_model = model.ConvVAE() + optimizer = optim.Adam(cvae_model.parameters(), lr=self.config.lr) + + # First, define strategy-wise optional configurations + if isinstance(self.strategy, DeepSpeedStrategy): + # Batch size definition is not optional for DeepSpeedStrategy! + distribute_kwargs = dict( + config_params=dict( + train_micro_batch_size_per_gpu=self.config.batch_size ) - else: - distribute_kwargs = {} + ) + else: + distribute_kwargs = {} + + # Distributed model, optimizer, and scheduler + cvae_model, optimizer, _ = self.strategy.distributed( + cvae_model, optimizer, **distribute_kwargs + ) - # Distributed model, optimizer, and scheduler - cvae_model, optimizer, _ = self.strategy.distributed( - cvae_model, optimizer, **distribute_kwargs + # load training set and train data + train_time = pd.read_csv(f"input/dates_train_{season}data_{self.config.n_memb}memb.csv") + train_data = np.load( + f"input/preprocessed_1d_train_{season}data_{self.config.n_memb}memb.npy" + ) + n_train = len(train_data) + trainset = [ + (torch.from_numpy(np.reshape(train_data[i], (2, 32, 32))), train_time["0"][i]) + for i in range(n_train) + ] + # load train set, shuffle it, and create batches + trainloader = self.strategy.create_dataloader(trainset, batch_size=self.config.batch_size, + shuffle=True, pin_memory=True) + + # load validation set and validation data + test_time = pd.read_csv(f"input/dates_test_{season}data_{self.config.n_memb}memb.csv") + test_data = np.load(f"input/preprocessed_1d_test_{season}data_{self.config.n_memb}memb.npy") + n_test = len(test_data) + testset = [ + (torch.from_numpy(np.reshape(test_data[i], (2, 32, 32))), test_time["0"][i]) + for i in range(n_test) + ] + testloader = self.strategy.create_dataloader(testset, batch_size=self.config.batch_size, + shuffle=False, pin_memory=True) + + if self.strategy.is_main_worker and 
self.logger: + self.logger.create_logger_context() + + # a list to save all the reconstructed images in PyTorch grid format + grid_images = [] + # a list to save the loss evolutions + train_loss = [] + valid_loss = [] + min_valid_epoch_loss = float('inf') # random high value + + for epoch in range(self.epochs): + if self.strategy.is_main_worker: + print(f"Epoch {epoch+1} of {self.epochs}") + + if self.strategy.is_distributed: + # Inform the sampler that a new epoch started: shuffle + # may be needed + trainloader.sampler.set_epoch(epoch) + testloader.sampler.set_epoch(epoch) + + # train the model + train_epoch_loss = train( + cvae_model, trainloader, trainset, self.device, optimizer, criterion, beta ) - # load training set and train data - train_time = pd.read_csv(f"input/dates_train_{season}data_{n_memb}memb.csv") - train_data = np.load( - f"input/preprocessed_1d_train_{season}data_{n_memb}memb.npy" + # evaluate the model on the test set + valid_epoch_loss, recon_images = validate( + cvae_model, testloader, testset, self.device, criterion, beta ) - n_train = len(train_data) - trainset = [ - (torch.from_numpy(np.reshape(train_data[i], (2, 32, 32))), train_time["0"][i]) - for i in range(n_train) - ] - # load train set, shuffle it, and create batches - trainloader = self.strategy.create_dataloader(trainset, batch_size=self.batch_size, - shuffle=True, pin_memory=True) - - # load validation set and validation data - test_time = pd.read_csv(f"input/dates_test_{season}data_{n_memb}memb.csv") - test_data = np.load(f"input/preprocessed_1d_test_{season}data_{n_memb}memb.npy") - n_test = len(test_data) - testset = [ - (torch.from_numpy(np.reshape(test_data[i], (2, 32, 32))), test_time["0"][i]) - for i in range(n_test) - ] - testloader = self.strategy.create_dataloader(testset, batch_size=self.batch_size, - shuffle=False, pin_memory=True) - - if self.strategy.is_main_worker and self.logger: - self.logger.create_logger_context() - - # a list to save all the reconstructed 
images in PyTorch grid format - grid_images = [] - # a list to save the loss evolutions - train_loss = [] - valid_loss = [] - min_valid_epoch_loss = float('inf') # random high value - - for epoch in range(self.epochs): - if self.strategy.is_main_worker: - print(f"Epoch {epoch+1} of {self.epochs}") - - if self.strategy.is_distributed: - # Inform the sampler that a new epoch started: shuffle - # may be needed - trainloader.sampler.set_epoch(epoch) - testloader.sampler.set_epoch(epoch) - - # train the model - train_epoch_loss = train( - cvae_model, trainloader, trainset, self.device, optimizer, criterion, beta - ) - # evaluate the model on the test set - valid_epoch_loss, recon_images = validate( - cvae_model, testloader, testset, self.device, criterion, beta - ) + self.log(train_epoch_loss, + 'epoch_train_loss', + kind='metric' + ) + self.log(valid_epoch_loss, + 'epoch_valid_loss', + kind='metric' + ) - self.log(train_epoch_loss, - 'epoch_train_loss', - kind='metric' - ) - self.log(valid_epoch_loss, - 'epoch_valid_loss', - kind='metric' - ) - - # keep track of the losses - train_loss.append(train_epoch_loss) - valid_loss.append(valid_epoch_loss) - - # save the reconstructed images from the validation loop - #save_reconstructed_images(recon_images, epoch+1, season) - - # convert the reconstructed images to PyTorch image grid format - image_grid = make_grid(recon_images.detach().cpu()) - grid_images.append(image_grid) - # save one example of reconstructed image before and after training - - #if epoch == 0 or epoch == self.epochs-1: - # save_ex(recon_images[0], epoch, season) - - # decreasing learning rate - if (epoch + 1) % 20 == 0: - lr = lr / 5 - - # early stopping to avoid overfitting - if ( - epoch > 1 - and (old_valid_loss - valid_epoch_loss) / old_valid_loss < stop_delta - ): - # if the marginal improvement in validation loss is too small - early_count += 1 - if early_count > patience: - # if too small improvement for a few epochs in a row, stop learning - 
save_ex(recon_images[0], epoch, season) - break - - else: - # if the condition is not verified anymore, reset the count - early_count = 0 - old_valid_loss = valid_epoch_loss - - # save best model - worker_val_losses = self.strategy.gather_obj(valid_epoch_loss) - if self.strategy.is_main_worker: - # Save only in the main worker - # avg_loss has a meaning only in the main worker - avg_loss = np.mean(worker_val_losses) - if self.config.save_best and avg_loss < min_valid_epoch_loss: - min_valid_epoch_loss = avg_loss - checkpoint = { - 'epoch': epoch, - 'model_state_dict': cvae_model.state_dict(), - 'optim_state_dict': optimizer.state_dict(), - 'val_loss': valid_epoch_loss, - } - # save checkpoint only if it is better than - # the previous ones - checkpoint_filename = f"outputs/cvae_model_{season}1d_{n_memb}memb.pth" - torch.save(checkpoint, checkpoint_filename) - # itwinai - log checkpoint as artifact - self.log(checkpoint_filename, - os.path.basename(checkpoint_filename), - kind='artifact') - - print(f"Train Loss: {train_epoch_loss:.4f}") - print(f"Val Loss: {valid_epoch_loss:.4f}") - - save_loss_plot(train_loss, valid_loss, season) - # save the loss evolutions - pd.DataFrame(train_loss).to_csv( - f"outputs/train_loss_indiv_{season}1d_{n_memb}memb.csv" - ) - pd.DataFrame(valid_loss).to_csv( - f"outputs/test_loss_indiv_{season}1d_{n_memb}memb.csv" - ) + # keep track of the losses + train_loss.append(train_epoch_loss) + valid_loss.append(valid_epoch_loss) + + # save the reconstructed images from the validation loop + #save_reconstructed_images(recon_images, epoch+1, season) + + # convert the reconstructed images to PyTorch image grid format + image_grid = make_grid(recon_images.detach().cpu()) + grid_images.append(image_grid) + # save one example of reconstructed image before and after training + + #if epoch == 0 or epoch == self.epochs-1: + # save_ex(recon_images[0], epoch, season) + + # decreasing learning rate + if (epoch + 1) % 20 == 0: + self.config.lr = 
self.config.lr / 5 + + # early stopping to avoid overfitting + if ( + epoch > 1 + and (self.config.old_valid_loss - valid_epoch_loss) / self.config.old_valid_loss < self.config.stop_delta + ): + # if the marginal improvement in validation loss is too small + self.config.early_count += 1 + if self.config.early_count > self.config.patience: + # if too small improvement for a few epochs in a row, stop learning + save_ex(recon_images[0], epoch, season) + break + + else: + # if the condition is not verified anymore, reset the count + self.config.early_count = 0 + self.config.old_valid_loss = valid_epoch_loss + + # save best model + worker_val_losses = self.strategy.gather_obj(valid_epoch_loss) + if self.strategy.is_main_worker: + # Save only in the main worker + # avg_loss has a meaning only in the main worker + avg_loss = np.mean(worker_val_losses) + if self.config.save_best and avg_loss < min_valid_epoch_loss: + min_valid_epoch_loss = avg_loss + checkpoint = { + 'epoch': epoch, + 'model_state_dict': cvae_model.state_dict(), + 'optim_state_dict': optimizer.state_dict(), + 'val_loss': valid_epoch_loss, + } + # save checkpoint only if it is better than + # the previous ones + checkpoint_filename = f"outputs/cvae_model_{season}1d_{self.config.n_memb}memb.pth" + torch.save(checkpoint, checkpoint_filename) + # itwinai - log checkpoint as artifact + self.log(checkpoint_filename, + os.path.basename(checkpoint_filename), + kind='artifact') + + print(f"Train Loss: {train_epoch_loss:.4f}") + print(f"Val Loss: {valid_epoch_loss:.4f}") + + save_loss_plot(train_loss, valid_loss, season) + # save the loss evolutions + pd.DataFrame(train_loss).to_csv( + f"outputs/train_loss_indiv_{season}1d_{self.config.n_memb}memb.csv" + ) + pd.DataFrame(valid_loss).to_csv( + f"outputs/test_loss_indiv_{season}1d_{self.config.n_memb}memb.csv" + ) + + # Clean-up + if self.strategy.is_main_worker and self.logger: + self.logger.destroy_logger_context() - # Clean-up - if self.strategy.is_main_worker and 
self.logger: - self.logger.destroy_logger_context() self.strategy.clean_up() - # emissions = tracker.stop() - # print(f"Emissions from this training run: {emissions:.5f} kg CO2eq") + # emissions = tracker.stop() + # print(f"Emissions from this training run: {emissions:.5f} kg CO2eq") diff --git a/use-cases/xtclim/startscript b/use-cases/xtclim/startscript index fa192a80..27c27d64 100755 --- a/use-cases/xtclim/startscript +++ b/use-cases/xtclim/startscript @@ -14,7 +14,7 @@ #SBATCH --exclusive # command -COMMAND="train.py -p pipeline.yaml" +COMMAND="train.py" EXEC="$COMMAND" # set modules diff --git a/use-cases/xtclim/train.py b/use-cases/xtclim/train.py index 4ed63633..cd8e5441 100644 --- a/use-cases/xtclim/train.py +++ b/use-cases/xtclim/train.py @@ -7,24 +7,30 @@ import argparse import logging from datetime import datetime +import yaml sys.path.append(os.path.join(os.path.dirname(__file__), 'src')) sys.path.append(os.path.join(os.path.dirname(__file__), 'preprocessing')) -from itwinai.parser import ConfigParser, ArgumentParser +from itwinai.parser import ConfigParser -if __name__ == "__main__": - parser = ArgumentParser() - parser.add_argument( - "-p", "--pipeline", type=str, required=True, - help='Configuration file to the pipeline to execute.' 
- ) - args = parser.parse_args() - - pipe_parser = ConfigParser( - config=args.pipeline, - ) +def read_config(file_path): + with open(file_path, 'r') as f: + config = yaml.safe_load(f) + return config - pipeline = pipe_parser.parse_pipeline() - pipeline.execute() +if __name__ == "__main__": + # read the config file defined in pipeline.yaml + config = read_config('pipeline.yaml') + # load the list of seasons + seasons_list = config['seasons'] + # loop over the seasons and launch pipelines iteratively + for season in seasons_list: + config['pipeline']['init_args']['steps']['training-step']['init_args']['seasons'] = season + pipe_parser = ConfigParser( + config=config, + ) + pipeline = pipe_parser.parse_pipeline() + print(f"Running pipeline for season: {season}") + pipeline.execute() From 1d9ce4f0913f352d494fc26bcf25f8e60977d9f1 Mon Sep 17 00:00:00 2001 From: r-sarma Date: Wed, 2 Oct 2024 14:17:25 +0200 Subject: [PATCH 03/18] Distributed inference for CERFACS and src --- src/itwinai/torch/inference.py | 180 ++++++++++++++++++---- use-cases/xtclim/pipeline.yaml | 21 ++- use-cases/xtclim/src/anomaly.py | 264 +++++++++++++++++++------------- use-cases/xtclim/train.py | 7 +- 4 files changed, 329 insertions(+), 143 deletions(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index 0a3e9567..e38fe630 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -9,15 +9,24 @@ import abc import os -from typing import Any, Dict, Optional, Union +from typing import Any, Dict, List, Literal, Optional, Union import torch from torch import nn from torch.utils.data import DataLoader, Dataset from ..components import Predictor, monitor_exec +from ..loggers import Logger from ..serialization import ModelLoader -from ..utils import clear_key, dynamically_import_class +from .config import TrainingConfiguration +from .distributed import ( + DeepSpeedStrategy, + HorovodStrategy, + NonDistributedStrategy, + TorchDDPStrategy, + 
TorchDistributedStrategy, + distributed_resources_available, +) from .type import Batch @@ -42,8 +51,10 @@ def __call__(self) -> nn.Module: """ if os.path.exists(self.model_uri): # Model is on local filesystem. - model = torch.load(self.model_uri) - return model.eval() + device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + model = torch.load(self.model_uri, map_location=device) + return model + if self.model_uri.startswith("mlflow+"): # Model is on an MLFLow server @@ -79,43 +90,114 @@ def __call__(self) -> nn.Module: class TorchPredictor(Predictor): """Applies a pre-trained torch model to unseen data.""" + _strategy: TorchDistributedStrategy = None + #: PyTorch ``DataLoader`` for inference dataset. + inference_dataloader: DataLoader = None #: Pre-trained PyTorch model used to make predictions. model: nn.Module = None - #: ``Dataset`` on which to make predictions (ML inference). - test_dataset: Dataset - #: ``DataLoader`` for test dataset. - test_dataloader: DataLoader = None + #: itwinai ``itwinai.Logger`` + logger: Logger = None def __init__( self, + config: Union[Dict, TrainingConfiguration], model: Union[nn.Module, ModelLoader], - test_dataloader_class: str = "torch.utils.data.DataLoader", - test_dataloader_kwargs: Optional[Dict] = None, - name: str = None, + strategy: Literal["ddp", "deepspeed", "horovod"] = 'ddp', + logger: Optional[Logger] = None, + checkpoints_location: str = "checkpoints", + name: str = None ) -> None: super().__init__(model=model, name=name) self.save_parameters(**self.locals2params(locals())) - self.model = self.model.eval() - - # Train and validation dataloaders - self.test_dataloader_class = dynamically_import_class(test_dataloader_class) - test_dataloader_kwargs = ( - test_dataloader_kwargs if test_dataloader_kwargs is not None else {} + if isinstance(config, dict): + self.config = TrainingConfiguration(**config) + else: + self.config = config + self.model = self.model + self.strategy = strategy + self.logger = 
logger + self.checkpoints_location = checkpoints_location + + @property + def strategy(self) -> TorchDistributedStrategy: + """Strategy currently in use.""" + return self._strategy + + @strategy.setter + def strategy(self, strategy: Union[str, TorchDistributedStrategy]) -> None: + if isinstance(strategy, TorchDistributedStrategy): + self._strategy = strategy + else: + self._strategy = self._detect_strategy(strategy) + + @property + def device(self) -> str: + """Current device from distributed strategy.""" + return self.strategy.device() + + def _detect_strategy(self, strategy: str) -> TorchDistributedStrategy: + if not distributed_resources_available(): + print("WARNING: falling back to non-distributed strategy.") + dist_str = NonDistributedStrategy() + elif strategy == 'ddp': + dist_str = TorchDDPStrategy(backend='nccl') + elif strategy == 'horovod': + dist_str = HorovodStrategy() + elif strategy == 'deepspeed': + dist_str = DeepSpeedStrategy(backend='nccl') + else: + raise NotImplementedError( + f"Strategy '{strategy}' is not recognized/implemented.") + return dist_str + + def _init_distributed_strategy(self) -> None: + if not self.strategy.is_initialized: + self.strategy.init() + + def distribute_model(self) -> None: + """ + Distribute the torch model with the chosen strategy. + """ + if self.model is None: + raise ValueError( + "self.model is None! Mandatory constructor argument " + ) + distribute_kwargs = {} + # Distributed model, optimizer, and scheduler + (self.model,) = self.strategy.distributed( + self.model, None, None, **distribute_kwargs ) - self.test_dataloader_kwargs = clear_key( - test_dataloader_kwargs, "train_dataloader_kwargs", "dataset" + + def create_dataloaders( + self, + inference_dataset: Dataset + ) -> None: + """ + Create inference dataloader. + + Args: + inference_dataset (Dataset): inference dataset object. 
+ """ + + self.inference_dataloader = self.strategy.create_dataloader( + dataset=inference_dataset, + batch_size=self.config.batch_size, + num_workers=self.config.num_workers_dataloader, + pin_memory=self.config.pin_gpu_memory, + generator=self.torch_rng, + shuffle=self.config.shuffle_test ) @monitor_exec def execute( self, - test_dataset: Dataset, + inference_dataset: Dataset, model: nn.Module = None, ) -> Dict[str, Any]: """Applies a torch model to a dataset for inference. Args: - test_dataset (Dataset[str, Any]): each item in this dataset is a + inference_dataset (Dataset[str, Any]): each item in this dataset is a couple (item_unique_id, item) model (nn.Module, optional): torch model. Overrides the existing model, if given. Defaults to None. @@ -124,19 +206,27 @@ def execute( Dict[str, Any]: maps each item ID to the corresponding predicted value(s). """ + self._init_distributed_strategy() if model is not None: # Overrides existing "internal" model self.model = model - test_dataloader = self.test_dataloader_class( - test_dataset, **self.test_dataloader_kwargs + self.create_dataloaders( + inference_dataset=inference_dataset ) + self.distribute_model() + + if self.logger: + self.logger.create_logger_context(rank=self.strategy.global_rank()) + hparams = self.config.model_dump() + hparams['distributed_strategy'] = self.strategy.__class__.__name__ + self.logger.save_hyperparameters(hparams) + all_predictions = dict() - for samples_ids, samples in test_dataloader: + for samples_ids, samples in inference_dataset: with torch.no_grad(): pred = self.model(samples) - pred = self.transform_predictions(pred) for idx, pre in zip(samples_ids, pred): # For each item in the batch if pre.numel() == 1: @@ -144,13 +234,45 @@ def execute( else: pre = pre.to_dense().tolist() all_predictions[idx] = pre + + if self.logger: + self.logger.destroy_logger_context() + + self.strategy.clean_up() + return all_predictions - @abc.abstractmethod - def transform_predictions(self, batch: Batch) -> 
Batch: - """Post-process the predictions of the torch model (e.g., apply - threshold in case of multi-label classifier). + def log( + self, + item: Union[Any, List[Any]], + identifier: Union[str, List[str]], + kind: str = 'metric', + step: Optional[int] = None, + batch_idx: Optional[int] = None, + **kwargs + ) -> None: + """Log ``item`` with ``identifier`` name of ``kind`` type at ``step`` + time step. + + Args: + item (Union[Any, List[Any]]): element to be logged (e.g., metric). + identifier (Union[str, List[str]]): unique identifier for the + element to log(e.g., name of a metric). + kind (str, optional): type of the item to be logged. Must be one + among the list of self.supported_types. Defaults to 'metric'. + step (Optional[int], optional): logging step. Defaults to None. + batch_idx (Optional[int], optional): DataLoader batch counter + (i.e., batch idx), if available. Defaults to None. """ + if self.logger: + self.logger.log( + item=item, + identifier=identifier, + kind=kind, + step=step, + batch_idx=batch_idx, + **kwargs + ) class MulticlassTorchPredictor(TorchPredictor): diff --git a/use-cases/xtclim/pipeline.yaml b/use-cases/xtclim/pipeline.yaml index b67e1391..2a062716 100644 --- a/use-cases/xtclim/pipeline.yaml +++ b/use-cases/xtclim/pipeline.yaml @@ -21,7 +21,7 @@ pipeline: preprocessing-split-step: class_path: preprocessing.preprocess_2d_seasons.SplitPreprocessedData init_args: - scenario: ${scenario} + scenario: ${scenario} training-step: class_path: src.trainer.XTClimTrainer init_args: @@ -29,6 +29,7 @@ pipeline: batch_size: ${batch_size} lr: ${lr} strategy: ${strategy} + # season is dynamically imported logger: class_path: itwinai.loggers.LoggersCollection init_args: @@ -36,11 +37,23 @@ pipeline: - class_path: itwinai.loggers.MLFlowLogger init_args: experiment_name: XTClim (Cerfacs) - log_freq: epoch - - class_path: itwinai.loggers.WandBLogger - init_args: log_freq: epoch + # - class_path: itwinai.loggers.WandBLogger + # init_args: + # log_freq: 
epoch evaluation-step: class_path: src.anomaly.XTClimPredictor init_args: evaluation: ${evaluation} + batch_size: ${batch_size} + strategy: ${strategy} + # model_uri and season are dynamically imported + logger: + class_path: itwinai.loggers.LoggersCollection + init_args: + loggers: + - class_path: itwinai.loggers.MLFlowLogger + init_args: + experiment_name: XTClim (Cerfacs) + log_freq: epoch + diff --git a/use-cases/xtclim/src/anomaly.py b/use-cases/xtclim/src/anomaly.py index 1cb7a6a4..59af4dc0 100644 --- a/use-cases/xtclim/src/anomaly.py +++ b/use-cases/xtclim/src/anomaly.py @@ -1,132 +1,182 @@ -from typing import Literal +from typing import Literal, Optional import torch import numpy as np import pandas as pd from operator import add -#from itwinai.torch.inference import TorchPredictor -from itwinai.components import monitor_exec, Predictor +from itwinai.torch.inference import TorchModelLoader, TorchPredictor +from itwinai.loggers import Logger +from itwinai.components import monitor_exec +from itwinai.torch.config import TrainingConfiguration -import model +import model as NNmodel from torch.utils.data import DataLoader from engine import evaluate from initialization import beta, criterion, n_avg, pixel_wise_criterion -# TODO: itwinai doesnt support distributed inference at the moment!! -device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') - -# pick the season to study among: -# '' (none, i.e. 
full dataset), 'winter_', 'spring_', 'summer_', 'autumn_' - -# This is now moved to the configuration file -# choose wether to evaluate train and test data, and/or projections -#past_evaluation = False -#future_evaluation = True - -class XTClimPredictor(Predictor): +class XTClimPredictor(TorchPredictor): def __init__( self, - evaluation: Literal["past", "future"] = 'past' + batch_size: int, + evaluation: Literal["past", "future"] = 'past', + seasons: Literal["winter_", "spring_", "summer_", "autumn_"] = 'winter_', + strategy: Literal["ddp", "deepspeed", "horovod"] = 'ddp', + model_uri: str = None, + logger: Optional[Logger] = None, + checkpoints_location: str = "checkpoints", + name: str = None ): - super().__init__(model) + model_loader = TorchModelLoader(model_uri) if model_uri else None + super().__init__( + model=model_loader, + config={}, + strategy=strategy, + logger=logger, + checkpoints_location=checkpoints_location, + name=name + ) self.evaluation = evaluation + self.batch_size = batch_size + self.seasons = seasons + self.config = TrainingConfiguration( + batch_size=batch_size, + ) @monitor_exec def execute(self): - + # Initialize distributed backend + self._init_distributed_strategy() if self.evaluation=='past': - for season in ['winter_', 'spring_', 'summer_', 'autumn_']: - - # load previously trained model - cvae_model = model.ConvVAE().to(device) - cvae_model.load_state_dict(torch.load(f'outputs/cvae_model_{season}1d_1memb.pth')['model_state_dict'], strict=False) - - # train set and data loader - train_time = pd.read_csv(f"input/dates_train_{season}data_1memb.csv") - train_data = np.load(f"input/preprocessed_1d_train_{season}data_1memb.npy") - n_train = len(train_data) - trainset = [ ( torch.from_numpy(np.reshape(train_data[i], (2, 32, 32))), - train_time['0'][i] ) for i in range(n_train) ] - trainloader = DataLoader( - trainset, batch_size=1, shuffle=False - ) + season = self.seasons + # load previously trained model + cvae_model = 
NNmodel.ConvVAE().to(self.device) + cvae_model.load_state_dict(self.model['model_state_dict'], strict=False) + + distribute_kwargs = {} + # Distributed model, optimizer, and scheduler + cvae_model, _, _ = self.strategy.distributed( + cvae_model, None, None, **distribute_kwargs + ) + + # train set and data loader + train_time = pd.read_csv(f"input/dates_train_{season}data_1memb.csv") + train_data = np.load(f"input/preprocessed_1d_train_{season}data_1memb.npy") + n_train = len(train_data) + trainset = [ ( torch.from_numpy(np.reshape(train_data[i], (2, 32, 32))), + train_time['0'][i] ) for i in range(n_train) ] + trainloader = self.strategy.create_dataloader( + trainset, batch_size=self.config.batch_size, shuffle=False + ) + + # test set and data loader + test_time = pd.read_csv(f"input/dates_test_{season}data_1memb.csv") + test_data = np.load(f"input/preprocessed_1d_test_{season}data_1memb.npy") + n_test = len(test_data) + testset = [ ( torch.from_numpy(np.reshape(test_data[i], (2, 32, 32))), + test_time['0'][i] ) for i in range(n_test) ] + testloader = self.strategy.create_dataloader( + testset, batch_size=self.config.batch_size, shuffle=False + ) + + if self.strategy.is_main_worker and self.logger: + self.logger.create_logger_context() + + # average over a few iterations + # for a better reconstruction estimate + train_avg_losses, _, tot_train_losses, _ = evaluate(cvae_model, trainloader, + trainset, self.device, + criterion, + pixel_wise_criterion) + test_avg_losses, _, tot_test_losses, _ = evaluate(cvae_model, testloader, + testset, self.device, criterion, + pixel_wise_criterion) + for i in range(1, n_avg): + train_avg_loss, _, train_losses, _ = evaluate(cvae_model, trainloader, + trainset, self.device, criterion, + pixel_wise_criterion) + tot_train_losses = list(map(add, tot_train_losses, train_losses)) + train_avg_losses += train_avg_loss + test_avg_loss, _, test_losses, _ = evaluate(cvae_model, testloader, + testset, self.device, criterion, + 
pixel_wise_criterion) + tot_test_losses = list(map(add, tot_test_losses, test_losses)) + test_avg_losses += test_avg_loss + tot_train_losses = np.array(tot_train_losses)/n_avg + tot_test_losses = np.array(tot_test_losses)/n_avg + train_avg_losses = train_avg_losses/n_avg + test_avg_losses = test_avg_losses/n_avg + + pd.DataFrame(tot_train_losses).to_csv(f"outputs/train_losses_{season}1d_allssp.csv") + pd.DataFrame(tot_test_losses).to_csv(f"outputs/test_losses_{season}1d_allssp.csv") + print('Train average loss:', train_avg_losses) + print('Test average loss:', test_avg_losses) + + self.log(train_avg_losses, + 'Average train loss', + kind='metric') + self.log(test_avg_losses, + 'Average test loss', + kind='metric') + + # Clean-up + if self.strategy.is_main_worker and self.logger: + self.logger.destroy_logger_context() + + self.strategy.clean_up() - # test set and data loader - test_time = pd.read_csv(f"input/dates_test_{season}data_1memb.csv") - test_data = np.load(f"input/preprocessed_1d_test_{season}data_1memb.npy") - n_test = len(test_data) - testset = [ ( torch.from_numpy(np.reshape(test_data[i], (2, 32, 32))), - test_time['0'][i] ) for i in range(n_test) ] - testloader = DataLoader( - testset, batch_size=1, shuffle=False + else: + season = self.seasons + # load previously trained model + cvae_model = NNmodel.ConvVAE().to(self.device) + cvae_model.load_state_dict(self.model['model_state_dict'], strict=False) + + distribute_kwargs = {} + # Distributed model, optimizer, and scheduler + cvae_model, _, _ = self.strategy.distributed( + cvae_model, **distribute_kwargs + ) + + if self.strategy.is_main_worker and self.logger: + self.logger.create_logger_context() + + for scenario in ['585', '245']: + + # projection set and data loader + proj_time = pd.read_csv(f"input/dates_proj_{season}data_1memb.csv") + proj_data = np.load(f"input/preprocessed_1d_proj{scenario}_{season}data_1memb.npy") + n_proj = len(proj_data) + projset = [ ( torch.from_numpy(np.reshape(proj_data[i], 
(3, 32, 32))), + proj_time['0'][i] ) for i in range(n_proj) ] + projloader = self.create_dataloaders( + projset, batch_size=1, shuffle=False ) - # average over a few iterations - # for a better reconstruction estimate - train_avg_losses, _, tot_train_losses, _ = evaluate(cvae_model, trainloader, - trainset, device, - criterion, - pixel_wise_criterion) - test_avg_losses, _, tot_test_losses, _ = evaluate(cvae_model, testloader, - testset, device, criterion, - pixel_wise_criterion) + # get the losses for each data set + # on various experiments to have representative statistics + proj_avg_losses, _, tot_proj_losses, _ = evaluate(cvae_model, projloader, + projset, self.device, criterion, + pixel_wise_criterion) + for i in range(1, n_avg): - train_avg_loss, _, train_losses, _ = evaluate(cvae_model, trainloader, - trainset, device, criterion, - pixel_wise_criterion) - tot_train_losses = list(map(add, tot_train_losses, train_losses)) - train_avg_losses += train_avg_loss - test_avg_loss, _, test_losses, _ = evaluate(cvae_model, testloader, - testset, device, criterion, - pixel_wise_criterion) - tot_test_losses = list(map(add, tot_test_losses, test_losses)) - test_avg_losses += test_avg_loss - tot_train_losses = np.array(tot_train_losses)/n_avg - tot_test_losses = np.array(tot_test_losses)/n_avg - train_avg_losses = train_avg_losses/n_avg - test_avg_losses = test_avg_losses/n_avg + proj_avg_loss, _, proj_losses, _ = evaluate(cvae_model, projloader, + projset, self.device, criterion, + pixel_wise_criterion) + tot_proj_losses = list(map(add, tot_proj_losses, proj_losses)) + proj_avg_losses += proj_avg_loss - pd.DataFrame(tot_train_losses).to_csv(f"outputs/train_losses_{season}1d_allssp.csv") - pd.DataFrame(tot_test_losses).to_csv(f"outputs/test_losses_{season}1d_allssp.csv") - print('Train average loss:', train_avg_losses) - print('Test average loss:', test_avg_losses) + tot_proj_losses = np.array(tot_proj_losses)/n_avg + proj_avg_losses = proj_avg_losses/n_avg + # save the 
losses time series + pd.DataFrame(tot_proj_losses).to_csv(f"outputs/proj{scenario}_losses_{season}1d_allssp.csv") + print(f'SSP{scenario} Projection average loss:', proj_avg_losses, 'for', season[:-1]) - else: - for season in ['winter_', 'spring_', 'summer_', 'autumn_']: - - # load previously trained model - cvae_model = model.ConvVAE().to(device) - cvae_model.load_state_dict(torch.load(f'outputs/cvae_model_{season}1d_1memb.pth')['model_state_dict'], strict=False) - - for scenario in ['585', '245']: - - # projection set and data loader - proj_time = pd.read_csv(f"input/dates_proj_{season}data_1memb.csv") - proj_data = np.load(f"input/preprocessed_1d_proj{scenario}_{season}data_1memb.npy") - n_proj = len(proj_data) - projset = [ ( torch.from_numpy(np.reshape(proj_data[i], (3, 32, 32))), - proj_time['0'][i] ) for i in range(n_proj) ] - projloader = DataLoader( - projset, batch_size=1, shuffle=False - ) - - # get the losses for each data set - # on various experiments to have representative statistics - proj_avg_losses, _, tot_proj_losses, _ = evaluate(cvae_model, projloader, - projset, device, criterion, - pixel_wise_criterion) - - for i in range(1, n_avg): - proj_avg_loss, _, proj_losses, _ = evaluate(cvae_model, projloader, - projset, device, criterion, - pixel_wise_criterion) - tot_proj_losses = list(map(add, tot_proj_losses, proj_losses)) - proj_avg_losses += proj_avg_loss - - tot_proj_losses = np.array(tot_proj_losses)/n_avg - proj_avg_losses = proj_avg_losses/n_avg - - # save the losses time series - pd.DataFrame(tot_proj_losses).to_csv(f"outputs/proj{scenario}_losses_{season}1d_allssp.csv") - print(f'SSP{scenario} Projection average loss:', proj_avg_losses, 'for', season[:-1]) + self.log(proj_avg_losses, + 'Average projection loss', + kind='metric') + + # Clean-up + if self.strategy.is_main_worker and self.logger: + self.logger.destroy_logger_context() + + self.strategy.clean_up() diff --git a/use-cases/xtclim/train.py b/use-cases/xtclim/train.py index 
cd8e5441..75614e48 100644 --- a/use-cases/xtclim/train.py +++ b/use-cases/xtclim/train.py @@ -20,13 +20,14 @@ def read_config(file_path): return config if __name__ == "__main__": - # read the config file defined in pipeline.yaml + config = read_config('pipeline.yaml') - # load the list of seasons seasons_list = config['seasons'] - # loop over the seasons and launch pipelines iteratively for season in seasons_list: config['pipeline']['init_args']['steps']['training-step']['init_args']['seasons'] = season + model_uri = f"outputs/cvae_model_{season}1d_1memb.pth" + config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['model_uri'] = model_uri + config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['seasons'] = season pipe_parser = ConfigParser( config=config, ) From ea624b2cd5828892e4ed8a321fa03d5f52f675c5 Mon Sep 17 00:00:00 2001 From: r-sarma Date: Wed, 2 Oct 2024 14:35:59 +0200 Subject: [PATCH 04/18] Linter errors --- src/itwinai/torch/inference.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index e38fe630..ae8439ee 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -30,6 +30,7 @@ from .type import Batch + class TorchModelLoader(ModelLoader): """Loads a torch model from somewhere. 
@@ -55,8 +56,7 @@ def __call__(self) -> nn.Module: model = torch.load(self.model_uri, map_location=device) return model - - if self.model_uri.startswith("mlflow+"): + if self.model_uri.startswith('mlflow+'): # Model is on an MLFLow server # Form is 'mlflow+MLFLOW_TRACKING_URI+RUN_ID+ARTIFACT_PATH' import mlflow From 57a6a4778230ab336f1d0fbeba395e57dcb188f8 Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Wed, 2 Oct 2024 14:51:11 +0200 Subject: [PATCH 05/18] Update inference.py --- src/itwinai/torch/inference.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index ae8439ee..9b14c481 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -30,7 +30,6 @@ from .type import Batch - class TorchModelLoader(ModelLoader): """Loads a torch model from somewhere. From 2e2594448220725034a8f57efd7c0fcff820483f Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Mon, 7 Oct 2024 11:58:38 +0200 Subject: [PATCH 06/18] Update config.yaml --- use-cases/mnist/torch/config.yaml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/use-cases/mnist/torch/config.yaml b/use-cases/mnist/torch/config.yaml index efa9c26b..cec0d7e9 100644 --- a/use-cases/mnist/torch/config.yaml +++ b/use-cases/mnist/torch/config.yaml @@ -98,11 +98,9 @@ inference_pipeline: class_path: itwinai.torch.inference.TorchModelLoader init_args: model_uri: ${inference_model_mlflow_uri} - test_dataloader_kwargs: - batch_size: ${batch_size} - class_path: saver.TorchMNISTLabelSaver init_args: save_dir: ${predictions_dir} predictions_file: ${predictions_file} - class_labels: ${class_labels} \ No newline at end of file + class_labels: ${class_labels} From 3e31f8e57bd1262dc4cc6c99628691a4cc82c27f Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Mon, 7 Oct 2024 13:28:46 +0200 Subject: [PATCH 07/18] 
Update test_mnist.py --- tests/use-cases/test_mnist.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/use-cases/test_mnist.py b/tests/use-cases/test_mnist.py index 7d6fe753..b0f58329 100644 --- a/tests/use-cases/test_mnist.py +++ b/tests/use-cases/test_mnist.py @@ -87,7 +87,8 @@ def test_mnist_inference_torch(torch_env, install_requirements): ) with tempfile.TemporaryDirectory() as temp_dir: # Create fake inference dataset and checkpoint - generate_model_cmd = f"{torch_env}/bin/python {exec} " f"--root {temp_dir}" + generate_model_cmd = (f"{torch_env}/bin/python {exec} " + f"--root {temp_dir}") subprocess.run(generate_model_cmd.split(), check=True, cwd=temp_dir) # Running inference From c9a97afd1f487d8852fe28e8500092a277bf14e3 Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Tue, 8 Oct 2024 14:43:58 +0200 Subject: [PATCH 08/18] Update config.yaml --- use-cases/mnist/torch/config.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/use-cases/mnist/torch/config.yaml b/use-cases/mnist/torch/config.yaml index cec0d7e9..79d60cf9 100644 --- a/use-cases/mnist/torch/config.yaml +++ b/use-cases/mnist/torch/config.yaml @@ -98,6 +98,11 @@ inference_pipeline: class_path: itwinai.torch.inference.TorchModelLoader init_args: model_uri: ${inference_model_mlflow_uri} + config: + batch_size: 32 + num_workers_dataloader: 4 + pin_gpu_memory: true + shuffle_test: false - class_path: saver.TorchMNISTLabelSaver init_args: From af542ad38e466e1b8faff8e3060c97c9c1cec04e Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Tue, 8 Oct 2024 15:19:41 +0200 Subject: [PATCH 09/18] Update inference.py --- src/itwinai/torch/inference.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index 9b14c481..31002dfb 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -94,6 +94,8 @@ class 
TorchPredictor(Predictor): inference_dataloader: DataLoader = None #: Pre-trained PyTorch model used to make predictions. model: nn.Module = None + #: PyTorch random number generator (PRNG). + torch_rng: torch.Generator = None #: itwinai ``itwinai.Logger`` logger: Logger = None From 87ef6b884eb4420527c7fc78ca86d85de5c8589c Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Tue, 8 Oct 2024 20:28:10 +0200 Subject: [PATCH 10/18] Update inference.py --- src/itwinai/torch/inference.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index 31002dfb..3b8c1b68 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -165,7 +165,7 @@ def distribute_model(self) -> None: ) distribute_kwargs = {} # Distributed model, optimizer, and scheduler - (self.model,) = self.strategy.distributed( + self.model,_,_ = self.strategy.distributed( self.model, None, None, **distribute_kwargs ) From 6d17c9846664e4625973ca90da4a11d10cb27ccc Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Mon, 14 Oct 2024 10:41:19 +0200 Subject: [PATCH 11/18] Update inference.py --- src/itwinai/torch/inference.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index 3b8c1b68..0525cf13 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -114,7 +114,7 @@ def __init__( self.config = TrainingConfiguration(**config) else: self.config = config - self.model = self.model + self.model = self.model.eval() self.strategy = strategy self.logger = logger self.checkpoints_location = checkpoints_location @@ -165,7 +165,7 @@ def distribute_model(self) -> None: ) distribute_kwargs = {} # Distributed model, optimizer, and scheduler - self.model,_,_ = self.strategy.distributed( + self.model, _, _ = 
self.strategy.distributed( self.model, None, None, **distribute_kwargs ) @@ -225,9 +225,10 @@ def execute( self.logger.save_hyperparameters(hparams) all_predictions = dict() - for samples_ids, samples in inference_dataset: + for ids, (samples_ids, samples) in enumerate(self.inference_dataloader): with torch.no_grad(): - pred = self.model(samples) + pred = self.model(samples.to(self.device)) + pred = self.transform_predictions(pred) for idx, pre in zip(samples_ids, pred): # For each item in the batch if pre.numel() == 1: @@ -274,6 +275,13 @@ def log( batch_idx=batch_idx, **kwargs ) + + @abc.abstractmethod + def transform_predictions(self, batch: Batch) -> Batch: + """ + Post-process the predictions of the torch model (e.g., apply + threshold in case of multi-label classifier). + """ class MulticlassTorchPredictor(TorchPredictor): From 5a07a76e3c793fb76e615ed86cf40a44f5f63766 Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Mon, 14 Oct 2024 10:59:43 +0200 Subject: [PATCH 12/18] Update inference.py --- src/itwinai/torch/inference.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index 0525cf13..636f0e8c 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -275,7 +275,7 @@ def log( batch_idx=batch_idx, **kwargs ) - + @abc.abstractmethod def transform_predictions(self, batch: Batch) -> Batch: """ From b88ecf15ae17c18cff43f45bea7f4fc2b7e2c735 Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Mon, 14 Oct 2024 11:19:00 +0200 Subject: [PATCH 13/18] Update config.yaml --- use-cases/mnist/torch/config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/use-cases/mnist/torch/config.yaml b/use-cases/mnist/torch/config.yaml index 79d60cf9..90338932 100644 --- a/use-cases/mnist/torch/config.yaml +++ b/use-cases/mnist/torch/config.yaml @@ -103,6 +103,7 @@ inference_pipeline: 
num_workers_dataloader: 4 pin_gpu_memory: true shuffle_test: false + strategy: ${strategy} - class_path: saver.TorchMNISTLabelSaver init_args: From d6b550268201762b1e161afd34eb89823c07d5d7 Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Mon, 14 Oct 2024 11:30:08 +0200 Subject: [PATCH 14/18] Update inference.py --- src/itwinai/torch/inference.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index 636f0e8c..1b6a587a 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -164,7 +164,7 @@ def distribute_model(self) -> None: "self.model is None! Mandatory constructor argument " ) distribute_kwargs = {} - # Distributed model, optimizer, and scheduler + # Distributed model self.model, _, _ = self.strategy.distributed( self.model, None, None, **distribute_kwargs ) From 5da81073140b19d1604c6a8f309b1d4541aa8908 Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:07:25 +0200 Subject: [PATCH 15/18] Update inference.py --- src/itwinai/torch/inference.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index 1b6a587a..41ae28f3 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -78,7 +78,7 @@ def __call__(self) -> nn.Module: tracking_uri=mlflow.get_tracking_uri(), ) model = torch.load(ckpt_path) - return model.eval() + return model raise ValueError( "Unrecognized model URI: model may not be there! 
" @@ -114,7 +114,6 @@ def __init__( self.config = TrainingConfiguration(**config) else: self.config = config - self.model = self.model.eval() self.strategy = strategy self.logger = logger self.checkpoints_location = checkpoints_location @@ -276,7 +275,6 @@ def log( **kwargs ) - @abc.abstractmethod def transform_predictions(self, batch: Batch) -> Batch: """ Post-process the predictions of the torch model (e.g., apply From a26365505d0bc874f3994210911b7866fc695f8d Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Mon, 21 Oct 2024 15:10:55 +0200 Subject: [PATCH 16/18] Update inference.py --- src/itwinai/torch/inference.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/itwinai/torch/inference.py b/src/itwinai/torch/inference.py index 41ae28f3..ac232c69 100644 --- a/src/itwinai/torch/inference.py +++ b/src/itwinai/torch/inference.py @@ -5,9 +5,9 @@ # # Credit: # - Matteo Bunino - CERN +# - Rakesh Sarma - FZJ # -------------------------------------------------------------------------------------- -import abc import os from typing import Any, Dict, List, Literal, Optional, Union From bea7e5435fa3d5a7de7a2b0b86b36806f0dc40a2 Mon Sep 17 00:00:00 2001 From: r-sarma Date: Wed, 23 Oct 2024 14:03:42 +0200 Subject: [PATCH 17/18] Additional of HPO to CERFACS usecase --- use-cases/xtclim/README.md | 19 ++- use-cases/xtclim/hpo.py | 263 ++++++++++++++++++++++++++++++++ use-cases/xtclim/pipeline.yaml | 12 +- use-cases/xtclim/slurm_ray.sh | 93 +++++++++++ use-cases/xtclim/src/anomaly.py | 6 +- use-cases/xtclim/src/engine.py | 2 +- use-cases/xtclim/src/trainer.py | 15 +- 7 files changed, 394 insertions(+), 16 deletions(-) create mode 100644 use-cases/xtclim/hpo.py create mode 100644 use-cases/xtclim/slurm_ray.sh diff --git a/use-cases/xtclim/README.md b/use-cases/xtclim/README.md index 058c8834..475620fe 100644 --- a/use-cases/xtclim/README.md +++ b/use-cases/xtclim/README.md @@ -27,7 +27,7 @@ The `anomaly.py` file 
evaluates the network on the available datasets - train, t ## Installation Please follow the documentation to install the itwinai environment. -After that, install the required libraries with the itwinai environment with: +After that, install the required libraries within the itwinai environment with: ```bash pip install -r Requirements.txt @@ -37,7 +37,7 @@ pip install -r Requirements.txt The config file `pipeline.yaml` contains all the steps to execute the workflow. This file also contains all the seasons, and a separate run is launched for each season. -You can launch it from the root of the repository with: +You can launch the pipeline through `train.py` from the root of the repository with: ```bash python train.py @@ -59,6 +59,8 @@ sbatch --export=ALL,DIST_MODE="$DIST_MODE",RUN_NAME="$RUN_NAME",PYTHON_VENV="$PY ``` The results and/or errors are available in `job.out` and `job.err` log files. +Training and inference steps are defined in the pipeline, where distributed resources +are exploited in both steps. With MLFLow logger, the logs can be visualized in the MLFlow UI: @@ -69,3 +71,16 @@ mlflow ui --backend-store-uri mllogs/mlflow mlflow ui --backend-store-uri mllogs/mlflow > /dev/null 2>&1 & ``` +### Hyperparameter Optimization (HPO) + +The repository also provides functionality to perform HPO with Ray. With HPO, +multiple trials with different hyperparameter configurations are run in a distributed +infrastructure, typically in an HPC environment. This allows finding the optimal +configuration that provides the minimal/maximal loss for the investigated network. +The `hpo.py` file contains the implementation, which launches the `pipeline.yaml` pipeline. +To launch an HPO experiment, simply run: +```bash +sbatch slurm_ray.sh +``` +The command-line arguments of the `hpo.py` file can be changed to customize the parameters +that need to be considered in the HPO process. 
diff --git a/use-cases/xtclim/hpo.py b/use-cases/xtclim/hpo.py new file mode 100644 index 00000000..a2717dbb --- /dev/null +++ b/use-cases/xtclim/hpo.py @@ -0,0 +1,263 @@ +import argparse +import os +import sys +from pathlib import Path +from typing import Dict +import yaml + +import matplotlib.pyplot as plt +import ray +import torch +from ray import train, tune + +from itwinai.parser import ConfigParser + +sys.path.append(os.path.join(os.path.dirname(__file__), 'src')) +sys.path.append(os.path.join(os.path.dirname(__file__), 'preprocessing')) + +def read_config(file_path): + with open(file_path, 'r') as f: + config = yaml.safe_load(f) + return config + +def run_trial(config: Dict, data: Dict): + """Execute a single trial using the given configuration (config). + This runs a full training pipeline - you can also specify a pipeline as a dictionary, + e.g. if you only want to run certain parts without changing your config.yaml file + (see below). + + Args: + config (dict): A dictionary containing hyperparameters, such as: + - 'batch_size' (int): The size of the batch for training. + - 'lr' (float): The learning rate for the optimizer. 
+ data (dict): A dictionary containing a "pipeline_path" field, which points to the yaml + file containing the pipeline definition + """ + config = read_config('pipeline.yaml') + seasons_list = config['seasons'] + for season in seasons_list: + config['pipeline']['init_args']['steps']['training-step']['init_args']['seasons'] = season + model_uri = f"outputs/cvae_model_{season}1d_1memb.pth" + config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['model_uri'] = model_uri + config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['seasons'] = season + parser = ConfigParser( + config=config, + override_keys={ + # Set hyperparameters controlled by ray + 'batch_size': config['batch_size'], + 'lr': config['lr'], + # Override logger field, because performance is logged by ray + #'training_pipeline.init_args.steps.2.init_args.logger': None + } + ) + my_pipeline = parser.parse_pipeline( + pipeline_nested_key=data["pipeline_name"], + verbose=False + ) + print(f"Running pipeline for season: {season}") + my_pipeline.execute() + + +def run_hpo(args): + """Run hyperparameter optimization using Ray Tune. + Either starts a new optimization run or resumes from previous results. + + Args: + - args: Command-line arguments parsed by argparse. 
+ """ + if not args.load_old_results: + + # Initialize Ray with cluster configuration from environment variables + ray.init( + address=os.environ["ip_head"], + _node_ip_address=os.environ["head_node_ip"], + ) + + # Define the search space for hyperparameters + search_space = { + 'batch_size': tune.choice([64, 128, 256]), + 'lr': tune.uniform(1e-5, 1e-3) + } + + # TuneConfig for configuring search algorithm and scheduler + tune_config = tune.TuneConfig( + metric=args.metric, # Metric to optimize (loss by default) + mode="min", # Minimize the loss + num_samples=args.num_samples # Number of trials to run + ) + + # Ray's RunConfig for experiment name and stopping criteria + run_config = train.RunConfig( + name="XTClim-Ray-Experiment", + stop={"training_iteration": args.max_iterations} + ) + + # Determine GPU and CPU utilization per trial + # We are allocating all available ressources per node evenly across trials + ngpus_per_trial = max(1, args.ngpus // args.num_samples) + ncpus_per_trial = max(1, args.ncpus // args.num_samples) + + # Set up Ray Tune Tuner with resources and parameters + resources_per_trial = {"gpu": ngpus_per_trial, "cpu": ncpus_per_trial} + trainable_with_resources = tune.with_resources( + run_trial, + resources=resources_per_trial + ) + + data = {"pipeline_name": args.pipeline_name} + trainable_with_parameters = tune.with_parameters( + trainable_with_resources, + data=data + ) + + tuner = tune.Tuner( + trainable_with_parameters, + tune_config=tune_config, + run_config=run_config, + param_space=search_space + ) + + # Run the hyperparameter optimization and get results + result_grid = tuner.fit() + + else: + # Load results from an earlier Ray Tune run + print(f"Loading results from {args.experiment_path}...") + + # Restore tuner from saved results + restored_tuner = tune.Tuner.restore( + args.experiment_path, + trainable=run_trial + ) + result_grid = restored_tuner.get_results() + + # Display experiment statistics + print(f"Number of errored trials: 
{result_grid.num_errors}") + print(f"Number of terminated trials: {result_grid.num_terminated}") + print(f"Ray Tune experiment path: {result_grid.experiment_path}") + + # Get the best result based on the last 10 iterations' average + best_result = result_grid.get_best_result( + scope="last-10-avg", + metric=args.metric, + mode="min" + ) + print(f"Best result: {best_result}") + + # Print a dataframe with all trial results + result_df = result_grid.get_dataframe() + print(f"All results dataframe: {result_df}") + print(f"All result columns: {result_df.columns}") + + # Plot the results for all trials + plot_results( + result_grid, + metric=args.metric, + filename="ray-loss-plot.png" + ) + plot_results( + result_grid, + metric="valid_loss", + filename="ray-valid_loss-plot.png" + ) + + +def plot_results(result_grid, metric="loss", filename="plot.png"): + """Plot the results for all trials and save the plot to a file. + + Args: + - result_grid: Results from Ray Tune trials. + - metric: The metric to plot (e.g., 'loss'). + - filename: Name of the file to save the plot. + """ + ax = None + for result in result_grid: + label = f"lr={result.config['lr']:.6f}, batch size={result.config['batch_size']}" + if ax is None: + ax = result.metrics_dataframe.plot( + "training_iteration", metric, label=label + ) + else: + result.metrics_dataframe.plot( + "training_iteration", metric, ax=ax, label=label + ) + + ax.set_title( + f"{metric.capitalize()} vs. Training Iteration for All Trials" + ) + ax.set_ylabel(metric.capitalize()) + + plt.savefig(filename) + + # Show the plot + plt.show() + + +# Main entry point for script execution +if __name__ == "__main__": + + # Parse command-line arguments + parser = argparse.ArgumentParser( + description='Hyperparameter Optimization with Ray Tune' + ) + parser.add_argument( + '--load_old_results', + type=bool, + default=False, + help='Set this to true if you want to load results from an older ray run.' 
+ ) + parser.add_argument( + '--pipeline_name', + type=str, + default='training_pipeline', + help='Name of the training pipeline to be used. \ + This pipeline has to be defined in a file called "config.yaml". \ + Defaults to "training_pipeline"' + ) + parser.add_argument( + '--experiment_path', + type=str, + default='~/ray_results/XTClim-Ray-Experiment', + help='Directory where the results of the previous run are stored. \ + Set this only if load_old_results is set to True. \ + Defaults to ~/ray_results/XTClim-Ray-Experiment' + ) + parser.add_argument( + '--num_samples', + type=int, + default=10, help='Number of trials to run' + ) + parser.add_argument( + '--ngpus', + type=int, + help='Number of GPUs available on node.' + ) + parser.add_argument( + '--ncpus', + type=int, + help='Number of CPUs available on node.' + ) + parser.add_argument( + '--metric', + type=str, + default='loss', + help='Metric to optimise.' + ) + parser.add_argument( + '--max_iterations', + type=int, + default='20', + help='Maximum iterations per trial' + ) + + args = parser.parse_args() # Parse the command-line arguments + + # Check for available GPU + if torch.cuda.is_available(): + device = 'cuda' + print(f"Using GPU: {torch.cuda.get_device_name(torch.cuda.current_device())}") + else: + device = 'cpu' + print("Using CPU") + + run_hpo(args) diff --git a/use-cases/xtclim/pipeline.yaml b/use-cases/xtclim/pipeline.yaml index 2a062716..a1180e70 100644 --- a/use-cases/xtclim/pipeline.yaml +++ b/use-cases/xtclim/pipeline.yaml @@ -38,15 +38,15 @@ pipeline: init_args: experiment_name: XTClim (Cerfacs) log_freq: epoch - # - class_path: itwinai.loggers.WandBLogger - # init_args: - # log_freq: epoch + - class_path: itwinai.loggers.WandBLogger + init_args: + log_freq: epoch evaluation-step: class_path: src.anomaly.XTClimPredictor init_args: - evaluation: ${evaluation} - batch_size: ${batch_size} - strategy: ${strategy} + evaluation: ${evaluation} + batch_size: ${batch_size} + strategy: ${strategy} # 
model_uri and season are dynamically imported logger: class_path: itwinai.loggers.LoggersCollection diff --git a/use-cases/xtclim/slurm_ray.sh b/use-cases/xtclim/slurm_ray.sh new file mode 100644 index 00000000..b133ccfd --- /dev/null +++ b/use-cases/xtclim/slurm_ray.sh @@ -0,0 +1,93 @@ +#!/bin/bash + +# Job configuration +#SBATCH --job-name=ray_tune_hpo +#SBATCH --account=intertwin +#SBATCH --time 01:00:00 + +# Resources allocation +#SBATCH --cpus-per-task=8 +#SBATCH --nodes=1 +#SBATCH --ntasks-per-node=1 +#SBATCH --gpus-per-node=4 +#SBATCH --partition=batch +#SBATCH --exclusive + +# Output and error logs +#SBATCH -o job.out +#SBATCH -e job.err + +# Load environment modules +ml Stages/2024 GCC OpenMPI CUDA/12 MPI-settings/CUDA Python HDF5 PnetCDF libaio mpi4py + +# Set and activate virtual environment +PYTHON_VENV="envAI_hdfml" +source $PYTHON_VENV/bin/activate + +# make sure CUDA devices are visible +export CUDA_VISIBLE_DEVICES="0,1,2,3" + +num_gpus=$SLURM_GPUS_PER_NODE +num_cpus=$SLURM_CPUS_PER_TASK + +# This tells Tune to not change the working directory to the trial directory +# which makes relative paths accessible from inside a trial +export RAY_CHDIR_TO_TRIAL_DIR=0 + +export RAY_USAGE_STATS_DISABLE=1 + +######### Set up Ray cluster ######## + +# Get the node names +nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST") +nodes_array=($nodes) + +# The head node will act as the central manager (head) of the Ray cluster. +head_node=${nodes_array[0]} +port=7639 # This port will be used by Ray to communicate with worker nodes. + +# This is so that the ray.init() command called from the hpo.py script knows which ports to connect to +export ip_head="$head_node"i:"$port" +export head_node_ip="$head_node"i + +echo "Starting HEAD at $head_node" +# Start Ray on the head node. +# The `--head` option specifies that this node will be the head of the Ray cluster. 
+# `srun` submits a job that runs on the head node to start the Ray head with the specified +# number of CPUs and GPUs. +srun --nodes=1 --ntasks=1 -w "$head_node" \ + ray start --head --node-ip-address="$head_node"i --port=$port \ + --num-cpus "$num_cpus" --num-gpus "$num_gpus" --block & + +# Wait for a few seconds to ensure that the head node has fully initialized. +sleep 10 + +echo HEAD node started. + +# Start Ray worker nodes +# These nodes will connect to the head node and become part of the Ray cluster. +worker_num=$((SLURM_JOB_NUM_NODES - 1)) # Total number of worker nodes (excluding the head node). +for ((i = 1; i <= worker_num; i++)); do + node_i=${nodes_array[$i]} # Get the current worker node hostname. + echo "Starting WORKER $i at $node_i" + + # Use srun to start Ray on the worker node and connect it to the head node. + # The `--address` option tells the worker node where to find the head node. + srun --nodes=1 --ntasks=1 -w "$node_i" \ + ray start --address "$head_node"i:"$port" --redis-password='5241580000000000' \ + --num-cpus "$num_cpus" --num-gpus "$num_gpus" --block & + + sleep 5 # Wait for a few seconds before starting the next worker to prevent race conditions. +done +echo All Ray workers started. + +############################################################################################## + + +# Run the Python script using Ray +echo 'Starting HPO.' 
+ +python hpo.py --num_samples 8 --max_iterations 2 --ngpus $num_gpus --ncpus $num_cpus --pipeline_name pipeline + +# Shutdown Ray after completion +ray stop diff --git a/use-cases/xtclim/src/anomaly.py b/use-cases/xtclim/src/anomaly.py index 59af4dc0..b854ca93 100644 --- a/use-cases/xtclim/src/anomaly.py +++ b/use-cases/xtclim/src/anomaly.py @@ -9,10 +9,10 @@ from itwinai.components import monitor_exec from itwinai.torch.config import TrainingConfiguration -import model as NNmodel +import src.model as NNmodel from torch.utils.data import DataLoader -from engine import evaluate -from initialization import beta, criterion, n_avg, pixel_wise_criterion +from src.engine import evaluate +from src.initialization import beta, criterion, n_avg, pixel_wise_criterion class XTClimPredictor(TorchPredictor): def __init__( diff --git a/use-cases/xtclim/src/engine.py b/use-cases/xtclim/src/engine.py index 28032291..4ce70c0b 100644 --- a/use-cases/xtclim/src/engine.py +++ b/use-cases/xtclim/src/engine.py @@ -1,6 +1,6 @@ from tqdm import tqdm import torch -from initialization import pixel_wise_criterion +from src.initialization import pixel_wise_criterion def final_loss(bce_loss, mu, logvar, beta=0.1): """ diff --git a/use-cases/xtclim/src/trainer.py b/use-cases/xtclim/src/trainer.py index 2edbbac6..d409d762 100644 --- a/use-cases/xtclim/src/trainer.py +++ b/use-cases/xtclim/src/trainer.py @@ -35,12 +35,13 @@ from itwinai.loggers import Logger from itwinai.torch.config import TrainingConfiguration -import model +from ray import train as train_ray +import src.model as model from torch.utils.data import DataLoader from torchvision.utils import make_grid -from engine import train, validate -from utils import save_reconstructed_images, save_loss_plot, save_ex -from initialization import beta, criterion, pixel_wise_criterion +from src.engine import train, validate +from src.utils import save_reconstructed_images, save_loss_plot, save_ex +from src.initialization import beta, criterion, 
pixel_wise_criterion class XTClimTrainer(TorchTrainer): def __init__( @@ -245,6 +246,12 @@ def execute(self): print(f"Train Loss: {train_epoch_loss:.4f}") print(f"Val Loss: {valid_epoch_loss:.4f}") + # Report training metrics of last epoch to Ray + train_ray.report( + {"loss": train_epoch_loss, + "valid_loss": valid_epoch_loss} + ) + save_loss_plot(train_loss, valid_loss, season) # save the loss evolutions pd.DataFrame(train_loss).to_csv( From 4b35c963998417e597670ee728bc48be0397ae6f Mon Sep 17 00:00:00 2001 From: r-sarma <126173968+r-sarma@users.noreply.github.com> Date: Thu, 24 Oct 2024 17:50:00 +0200 Subject: [PATCH 18/18] Update hpo.py --- use-cases/xtclim/hpo.py | 40 ++++++++++++++++++---------------------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/use-cases/xtclim/hpo.py b/use-cases/xtclim/hpo.py index a2717dbb..fe528f76 100644 --- a/use-cases/xtclim/hpo.py +++ b/use-cases/xtclim/hpo.py @@ -15,10 +15,6 @@ sys.path.append(os.path.join(os.path.dirname(__file__), 'src')) sys.path.append(os.path.join(os.path.dirname(__file__), 'preprocessing')) -def read_config(file_path): - with open(file_path, 'r') as f: - config = yaml.safe_load(f) - return config def run_trial(config: Dict, data: Dict): """Execute a single trial using the given configuration (config). 
@@ -33,27 +29,27 @@ def run_trial(config: Dict, data: Dict): data (dict): A dictionary containing a "pipeline_path" field, which points to the yaml file containing the pipeline definition """ - config = read_config('pipeline.yaml') - seasons_list = config['seasons'] + with open('pipeline.yaml', 'r') as f: + yaml_config = yaml.safe_load(f) + + # Override keys of hyperparameters to be tuned from Ray config + yaml_config['batch_size'] = config['batch_size'] + yaml_config['lr'] = config['lr'] + + # Set loggers to None as Ray logs the runs + yaml_config['pipeline']['init_args']['steps']['training-step']['init_args']['logger'] = None + yaml_config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['logger'] = None + + seasons_list = yaml_config['seasons'] for season in seasons_list: - config['pipeline']['init_args']['steps']['training-step']['init_args']['seasons'] = season + yaml_config['pipeline']['init_args']['steps']['training-step']['init_args']['seasons'] = season model_uri = f"outputs/cvae_model_{season}1d_1memb.pth" - config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['model_uri'] = model_uri - config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['seasons'] = season + yaml_config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['model_uri'] = model_uri + yaml_config['pipeline']['init_args']['steps']['evaluation-step']['init_args']['seasons'] = season parser = ConfigParser( - config=config, - override_keys={ - # Set hyperparameters controlled by ray - 'batch_size': config['batch_size'], - 'lr': config['lr'], - # Override logger field, because performance is logged by ray - #'training_pipeline.init_args.steps.2.init_args.logger': None - } - ) - my_pipeline = parser.parse_pipeline( - pipeline_nested_key=data["pipeline_name"], - verbose=False + config=yaml_config, ) + my_pipeline = parser.parse_pipeline() print(f"Running pipeline for season: {season}") my_pipeline.execute() @@ -93,7 +89,7 @@ 
def run_hpo(args): ) # Determine GPU and CPU utilization per trial - # We are allocating all available ressources per node evenly across trials + # We are allocating all available resources per node evenly across trials ngpus_per_trial = max(1, args.ngpus // args.num_samples) ncpus_per_trial = max(1, args.ncpus // args.num_samples)