"""
This module provides the EvaluateModel class.
"""
import logging
import os
import warnings
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable
from .sequences import Genome
from .utils import _is_lua_trained_model
from .utils import initialize_logger
from .utils import load_model_from_state_dict
from .utils import PerformanceMetrics
# Module-level logger registered under the name "selene"; EvaluateModel
# uses it for its debug and info messages.
logger = logging.getLogger("selene")
class EvaluateModel(object):
    """
    Evaluate model on a test set of sequences with known targets.

    Parameters
    ----------
    model : torch.nn.Module
        The model architecture.
    criterion : torch.nn._Loss
        The loss function that was optimized during training.
    data_sampler : selene_sdk.samplers.Sampler
        Used to retrieve samples from the test set for evaluation.
    features : list(str)
        List of distinct features the model predicts.
    trained_model_path : str
        Path to the trained model file, saved using `torch.save`.
    output_dir : str
        The output directory in which to save model evaluation and logs.
        It is created if it does not exist.
    batch_size : int, optional
        Default is 64. Specify the batch size to process examples.
        Should be a power of 2.
    n_test_samples : int or None, optional
        Default is `None`. Use `n_test_samples` if you want to limit the
        number of samples on which you evaluate your model. If you are
        using a sampler of type `selene_sdk.samplers.OnlineSampler`,
        by default it will draw 640000 samples if `n_test_samples` is
        `None`.
    report_gt_feature_n_positives : int, optional
        Default is 10. In the final test set, each class/feature must have
        more than `report_gt_feature_n_positives` positive samples in
        order to be considered in the test performance computation. The
        output file that states each class' performance will report 'NA'
        for classes that do not have enough positive samples.
    use_cuda : bool, optional
        Default is `False`. Specify whether a CUDA-enabled GPU is available
        for torch to use during evaluation.
    data_parallel : bool, optional
        Default is `False`. Specify whether multiple GPUs are available
        for torch to use during evaluation.
    use_features_ord : list(str) or None, optional
        Default is None. Specify an ordered list of features for which to
        run the evaluation. The features in this list must be identical to
        or a subset of `features`, and in the order you want the resulting
        `test_targets.npz` and `test_predictions.npz` to be saved.
        Entries that are not in `features` are skipped with a warning.

    Attributes
    ----------
    model : torch.nn.Module
        The trained model.
    criterion : torch.nn._Loss
        The model was trained using this loss function.
    sampler : selene_sdk.samplers.Sampler
        The example generator.
    features : list(str)
        List of distinct features the model predicts. When
        `use_features_ord` is given, this holds only the matched
        features, in that order.
    output_dir : str
        Directory that receives the log file and evaluation outputs.
    batch_size : int
        The batch size to process examples. Should be a power of 2.
    use_cuda : bool
        If `True`, use a CUDA-enabled GPU. If `False`, use the CPU.
    data_parallel : bool
        Whether to use multiple GPUs or not.

    """

    def __init__(self,
                 model,
                 criterion,
                 data_sampler,
                 features,
                 trained_model_path,
                 output_dir,
                 batch_size=64,
                 n_test_samples=None,
                 report_gt_feature_n_positives=10,
                 use_cuda=False,
                 data_parallel=False,
                 use_features_ord=None):
        """
        Load the trained weights, prepare the output directory and
        logger, and retrieve the test data from the sampler.
        """
        self.criterion = criterion

        # NOTE(security): `torch.load` unpickles the file, so only load
        # checkpoints that come from a trusted source.
        # The `map_location` callable returns the storage unchanged; the
        # model is moved to the GPU further down when `use_cuda` is set.
        trained_model = torch.load(
            trained_model_path,
            map_location=lambda storage, location: storage)
        # A checkpoint is either a dict with a "state_dict" entry or the
        # state dict itself.
        if "state_dict" in trained_model:
            state_dict = trained_model["state_dict"]
        else:
            state_dict = trained_model
        self.model = load_model_from_state_dict(state_dict, model)
        self.model.eval()

        self.sampler = data_sampler

        self.output_dir = output_dir
        os.makedirs(output_dir, exist_ok=True)

        # By default every feature is evaluated, in its original order.
        self.features = features
        self._use_ixs = list(range(len(features)))
        if use_features_ord is not None:
            feature_ixs = {f: ix for (ix, f) in enumerate(features)}
            self._use_ixs = []
            self.features = []
            for f in use_features_ord:
                if f in feature_ixs:
                    self._use_ixs.append(feature_ixs[f])
                    self.features.append(f)
                else:
                    warnings.warn(("Feature {0} in `use_features_ord` "
                                   "does not match any features in the list "
                                   "`features` and will be skipped.").format(f))
            self._write_features_ordered_to_file()

        initialize_logger(
            os.path.join(self.output_dir, "{0}.log".format(__name__)),
            verbosity=2)

        self.data_parallel = data_parallel
        if self.data_parallel:
            # Wrap the model that actually carries the loaded weights
            # (and the eval-mode flag), not the raw constructor argument.
            self.model = nn.DataParallel(self.model)
            logger.debug("Wrapped model in DataParallel")

        self.use_cuda = use_cuda
        if self.use_cuda:
            self.model.cuda()

        self.batch_size = batch_size

        self._metrics = PerformanceMetrics(
            self._get_feature_from_index,
            report_gt_feature_n_positives=report_gt_feature_n_positives)

        self._test_data, self._all_test_targets = \
            self.sampler.get_data_and_targets(self.batch_size, n_test_samples)
        # TODO: we should be able to do this on the sampler end instead of
        # here. the current workaround is problematic, since
        # self._test_data still has the full featureset in it, and we
        # select the subset during `evaluate`
        self._all_test_targets = self._all_test_targets[:, self._use_ixs]

        # reset Genome base ordering when applicable.
        # NOTE(review): this runs after `get_data_and_targets` above, so it
        # is unclear from this class alone whether the new base order
        # applies to the test data already retrieved -- confirm against
        # the sampler implementation.
        if (hasattr(self.sampler, "reference_sequence") and
                isinstance(self.sampler.reference_sequence, Genome)):
            if _is_lua_trained_model(model):
                Genome.update_bases_order(['A', 'G', 'C', 'T'])
            else:
                Genome.update_bases_order(['A', 'C', 'G', 'T'])

    def _write_features_ordered_to_file(self):
        """
        Write the feature ordering specified by `use_features_ord`
        after matching it with the `features` list from the class
        initialization parameters.

        The features are written one per line to `use_features_ord.txt`
        in `output_dir`.
        """
        fp = os.path.join(self.output_dir, 'use_features_ord.txt')
        with open(fp, 'w') as file_handle:
            for f in self.features:
                file_handle.write('{0}\n'.format(f))

    def _get_feature_from_index(self, index):
        """
        Gets the feature at an index in the features list.

        Parameters
        ----------
        index : int

        Returns
        -------
        str
            The name of the feature/target at the specified index.

        """
        return self.features[index]

    def evaluate(self):
        """
        Passes all samples retrieved from the sampler to the model in
        batches and returns the predictions. Also reports the model's
        performance on these examples.

        Predictions and targets are saved to `test_predictions.npz` and
        `test_targets.npz` in `output_dir`, and per-feature scores are
        written to `test_performance.txt`.

        Returns
        -------
        dict
            A dictionary, where keys are the features and the values are
            each a dict of the performance metrics (currently ROC AUC and
            AUPR) reported for each feature the model predicts.

        Raises
        ------
        ValueError
            If the sampler provided no test batches.

        """
        batch_losses = []
        all_predictions = []
        # The model does not change while evaluating, so check this once.
        is_lua_model = _is_lua_trained_model(self.model)

        for (inputs, targets) in self._test_data:
            inputs = torch.Tensor(inputs)
            # Batches still carry the full feature set; keep only the
            # selected feature columns, in the requested order.
            targets = torch.Tensor(targets[:, self._use_ixs])

            if self.use_cuda:
                inputs = inputs.cuda()
                targets = targets.cuda()

            with torch.no_grad():
                # Swap dimensions 1 and 2 before the forward pass
                # (presumably (batch, length, bases) -> (batch, bases,
                # length); confirm against the sampler's output).
                model_inputs = inputs.transpose(1, 2)
                if is_lua_model:
                    # Lua-trained models additionally receive an extra
                    # axis inserted at position 2.
                    model_inputs = model_inputs.contiguous().unsqueeze_(2)
                # Call the module rather than `.forward` so that any
                # registered hooks run.
                predictions = self.model(model_inputs)
                predictions = predictions[:, self._use_ixs]
                loss = self.criterion(predictions, targets)

            all_predictions.append(predictions.detach().cpu().numpy())
            batch_losses.append(loss.item())

        if not all_predictions:
            # `np.vstack` would raise a ValueError with a less helpful
            # message on an empty list.
            raise ValueError(
                "No test batches were provided by the sampler; "
                "cannot evaluate the model.")

        all_predictions = np.vstack(all_predictions)

        average_scores = self._metrics.update(
            all_predictions, self._all_test_targets)

        self._metrics.visualize(
            all_predictions, self._all_test_targets, self.output_dir)

        np.savez_compressed(
            os.path.join(self.output_dir, "test_predictions.npz"),
            data=all_predictions)

        np.savez_compressed(
            os.path.join(self.output_dir, "test_targets.npz"),
            data=self._all_test_targets)

        # Unweighted mean of the per-batch losses.
        loss = np.average(batch_losses)
        logger.info("test loss: {0}".format(loss))
        for name, score in average_scores.items():
            logger.info("test {0}: {1}".format(name, score))

        test_performance = os.path.join(
            self.output_dir, "test_performance.txt")
        feature_scores_dict = self._metrics.write_feature_scores_to_file(
            test_performance)

        return feature_scores_dict