Source code for selene_sdk.evaluate_model

"""
This module provides the EvaluateModel class.
"""
import logging
import os
import warnings

import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable

from .sequences import Genome
from .utils import _is_lua_trained_model
from .utils import initialize_logger
from .utils import load_model_from_state_dict
from .utils import PerformanceMetrics


logger = logging.getLogger("selene")


[docs]class EvaluateModel(object): """ Evaluate model on a test set of sequences with known targets. Parameters ---------- model : torch.nn.Module The model architecture. criterion : torch.nn._Loss The loss function that was optimized during training. data_sampler : selene_sdk.samplers.Sampler Used to retrieve samples from the test set for evaluation. features : list(str) List of distinct features the model predicts. trained_model_path : str Path to the trained model file, saved using `torch.save`. output_dir : str The output directory in which to save model evaluation and logs. batch_size : int, optional Default is 64. Specify the batch size to process examples. Should be a power of 2. n_test_samples : int or None, optional Default is `None`. Use `n_test_samples` if you want to limit the number of samples on which you evaluate your model. If you are using a sampler of type `selene_sdk.samplers.OnlineSampler`, by default it will draw 640000 samples if `n_test_samples` is `None`. report_gt_feature_n_positives : int, optional Default is 10. In the final test set, each class/feature must have more than `report_gt_feature_n_positives` positive samples in order to be considered in the test performance computation. The output file that states each class' performance will report 'NA' for classes that do not have enough positive samples. use_cuda : bool, optional Default is `False`. Specify whether a CUDA-enabled GPU is available for torch to use during training. data_parallel : bool, optional Default is `False`. Specify whether multiple GPUs are available for torch to use during training. use_features_ord : list(str) or None, optional Default is None. Specify an ordered list of features for which to run the evaluation. The features in this list must be identical to or a subset of `features`, and in the order you want the resulting `test_targets.npz` and `test_predictions.npz` to be saved. Attributes ---------- model : torch.nn.Module The trained model. criterion : torch.nn._Loss The model was trained using this loss function. sampler : selene_sdk.samplers.Sampler The example generator. features : list(str) List of distinct features the model predicts. batch_size : int The batch size to process examples. Should be a power of 2. use_cuda : bool If `True`, use a CUDA-enabled GPU. If `False`, use the CPU. data_parallel : bool Whether to use multiple GPUs or not. """ def __init__(self, model, criterion, data_sampler, features, trained_model_path, output_dir, batch_size=64, n_test_samples=None, report_gt_feature_n_positives=10, use_cuda=False, data_parallel=False, use_features_ord=None): self.criterion = criterion trained_model = torch.load( trained_model_path, map_location=lambda storage, location: storage) if "state_dict" in trained_model: self.model = load_model_from_state_dict( trained_model["state_dict"], model) else: self.model = load_model_from_state_dict( trained_model, model) self.model.eval() self.sampler = data_sampler self.output_dir = output_dir os.makedirs(output_dir, exist_ok=True) self.features = features self._use_ixs = list(range(len(features))) if use_features_ord is not None: feature_ixs = {f: ix for (ix, f) in enumerate(features)} self._use_ixs = [] self.features = [] for f in use_features_ord: if f in feature_ixs: self._use_ixs.append(feature_ixs[f]) self.features.append(f) else: warnings.warn(("Feature {0} in `use_features_ord` " "does not match any features in the list " "`features` and will be skipped.").format(f)) self._write_features_ordered_to_file() initialize_logger( os.path.join(self.output_dir, "{0}.log".format( __name__)), verbosity=2) self.data_parallel = data_parallel if self.data_parallel: self.model = nn.DataParallel(model) logger.debug("Wrapped model in DataParallel") self.use_cuda = use_cuda if self.use_cuda: self.model.cuda() self.batch_size = batch_size self._metrics = PerformanceMetrics( self._get_feature_from_index, report_gt_feature_n_positives=report_gt_feature_n_positives) self._test_data, self._all_test_targets = \ self.sampler.get_data_and_targets(self.batch_size, n_test_samples) # TODO: we should be able to do this on the sampler end instead of # here. the current workaround is problematic, since # self._test_data still has the full featureset in it, and we # select the subset during `evaluate` self._all_test_targets = self._all_test_targets[:, self._use_ixs] # reset Genome base ordering when applicable. if (hasattr(self.sampler, "reference_sequence") and isinstance(self.sampler.reference_sequence, Genome)): if _is_lua_trained_model(model): Genome.update_bases_order(['A', 'G', 'C', 'T']) else: Genome.update_bases_order(['A', 'C', 'G', 'T']) def _write_features_ordered_to_file(self): """ Write the feature ordering specified by `use_features_ord` after matching it with the `features` list from the class initialization parameters. """ fp = os.path.join(self.output_dir, 'use_features_ord.txt') with open(fp, 'w+') as file_handle: for f in self.features: file_handle.write('{0}\n'.format(f)) def _get_feature_from_index(self, index): """ Gets the feature at an index in the features list. Parameters ---------- index : int Returns ------- str The name of the feature/target at the specified index. """ return self.features[index]
[docs] def evaluate(self): """ Passes all samples retrieved from the sampler to the model in batches and returns the predictions. Also reports the model's performance on these examples. Returns ------- dict A dictionary, where keys are the features and the values are each a dict of the performance metrics (currently ROC AUC and AUPR) reported for each feature the model predicts. """ batch_losses = [] all_predictions = [] for (inputs, targets) in self._test_data: inputs = torch.Tensor(inputs) targets = torch.Tensor(targets[:, self._use_ixs]) if self.use_cuda: inputs = inputs.cuda() targets = targets.cuda() with torch.no_grad(): inputs = Variable(inputs) targets = Variable(targets) predictions = None if _is_lua_trained_model(self.model): predictions = self.model.forward( inputs.transpose(1, 2).contiguous().unsqueeze_(2)) else: predictions = self.model.forward( inputs.transpose(1, 2)) predictions = predictions[:, self._use_ixs] loss = self.criterion(predictions, targets) all_predictions.append(predictions.data.cpu().numpy()) batch_losses.append(loss.item()) all_predictions = np.vstack(all_predictions) average_scores = self._metrics.update( all_predictions, self._all_test_targets) self._metrics.visualize( all_predictions, self._all_test_targets, self.output_dir) np.savez_compressed( os.path.join(self.output_dir, "test_predictions.npz"), data=all_predictions) np.savez_compressed( os.path.join(self.output_dir, "test_targets.npz"), data=self._all_test_targets) loss = np.average(batch_losses) logger.info("test loss: {0}".format(loss)) for name, score in average_scores.items(): logger.info("test {0}: {1}".format(name, score)) test_performance = os.path.join( self.output_dir, "test_performance.txt") feature_scores_dict = self._metrics.write_feature_scores_to_file( test_performance) return feature_scores_dict