"""
This module provides the `RandomPositionsSampler` class.

TODO: Currently, this only works with sequences from
`selene_sdk.sequences.Genome`. We would like to generalize this to
`selene_sdk.sequences.Sequence` if possible.
"""
from collections import namedtuple
import logging
import random
import numpy as np
from functools import wraps
from .online_sampler import OnlineSampler
from ..utils import get_indices_and_probabilities
logger = logging.getLogger(__name__)


SampleIndices = namedtuple("SampleIndices", ("indices", "weights"))
"""
Pairs the indices of some samples with the weight to allot to each
index when randomly drawing from them.

TODO: this is common to both the intervals sampler and the
random positions sampler. Can we move this to utils or
somewhere else?

Parameters
----------
indices : list(int)
    The numeric index of each sample.
weights : list(float)
    The amount of weight assigned to each sample.

Attributes
----------
indices : list(int)
    The numeric index of each sample.
weights : list(float)
    The amount of weight assigned to each sample.

"""
class RandomPositionsSampler(OnlineSampler):
    """This sampler randomly selects a position in the genome and queries for
    a sequence centered at that position for input to the model.

    TODO: generalize to selene_sdk.sequences.Sequence?

    Parameters
    ----------
    reference_sequence : selene_sdk.sequences.Genome
        A reference sequence from which to create examples.
    target_path : str
        Path to tabix-indexed, compressed BED file (`*.bed.gz`) of genomic
        coordinates mapped to the genomic features we want to predict.
    features : list(str)
        List of distinct features that we aim to predict.
    seed : int, optional
        Default is 436. Sets the random seed for sampling.
    validation_holdout : list(str) or float, optional
        Default is `['chr6', 'chr7']`. Holdout can be regional or
        proportional. If regional, expects a list (e.g. `['chrX', 'chrY']`).
        Regions must match those specified in the first column of the
        tabix-indexed BED file. If proportional, specify a percentage
        between (0.0, 1.0). Typically 0.10 or 0.20.
    test_holdout : list(str) or float, optional
        Default is `['chr8', 'chr9']`. See documentation for
        `validation_holdout` for additional information.
    sequence_length : int, optional
        Default is 1000. Model is trained on sequences of `sequence_length`
        where genomic features are annotated to the center regions of
        these sequences.
    center_bin_to_predict : int, optional
        Default is 200. Query the tabix-indexed file for a region of
        length `center_bin_to_predict`.
    feature_thresholds : float [0.0, 1.0], optional
        Default is 0.5. The `feature_threshold` to pass to the
        `GenomicFeatures` object.
    mode : {'train', 'validate', 'test'}
        Default is `'train'`. The mode to run the sampler in.
    save_datasets : list(str), optional
        Default is `[]`. The list of modes for which we should
        save the sampled data to file.
    output_dir : str or None, optional
        Default is None. The path to the directory where we should
        save sampled examples for a mode. If `save_datasets` is
        a non-empty list, `output_dir` must be specified. If
        the path in `output_dir` does not exist it will be created
        automatically.

    Attributes
    ----------
    reference_sequence : selene_sdk.sequences.Genome
        The reference sequence that examples are created from.
    target : selene_sdk.targets.Target
        The `selene_sdk.targets.Target` object holding the features that we
        would like to predict.
    validation_holdout : list(str) or float
        The samples to hold out for validating model performance. These
        can be "regional" or "proportional". If regional, this is a list
        of region names (e.g. `['chrX', 'chrY']`). These regions must
        match those specified in the first column of the tabix-indexed
        BED file. If proportional, this is the fraction of total samples
        that will be held out.
    test_holdout : list(str) or float
        The samples to hold out for testing model performance. See the
        documentation for `validation_holdout` for more details.
    sequence_length : int
        The length of the sequences to train the model on.
    modes : list(str)
        The list of modes that the sampler can be run in.
    mode : str
        The current mode that the sampler is running in. Must be one of
        the modes listed in `modes`.
    sample_from_intervals : list(tuple(str, int, int))
        One `(chrom, start, end)` entry per chromosome, giving the range
        that sampled positions are drawn from. Filled in lazily on the
        first call to `sample`.
    interval_lengths : list(int)
        One length per entry of `sample_from_intervals`, used to weight
        the intervals when drawing from them. Filled in lazily on the
        first call to `sample`.

    """

    def __init__(self,
                 reference_sequence,
                 target_path,
                 features,
                 seed=436,
                 validation_holdout=['chr6', 'chr7'],
                 test_holdout=['chr8', 'chr9'],
                 sequence_length=1000,
                 center_bin_to_predict=200,
                 feature_thresholds=0.5,
                 mode="train",
                 save_datasets=[],
                 output_dir=None):
        """
        Constructs a new `RandomPositionsSampler` object.

        All arguments are forwarded unchanged to the parent
        `OnlineSampler` constructor. The list defaults are kept as-is for
        interface compatibility; this class never mutates them.
        """
        super(RandomPositionsSampler, self).__init__(
            reference_sequence,
            target_path,
            features,
            seed=seed,
            validation_holdout=validation_holdout,
            test_holdout=test_holdout,
            sequence_length=sequence_length,
            center_bin_to_predict=center_bin_to_predict,
            feature_thresholds=feature_thresholds,
            mode=mode,
            save_datasets=save_datasets,
            output_dir=output_dir)

        # Per-mode sampling state. The actual partitioning of the genome
        # is deferred until the first call to a method wrapped by `init`.
        self._sample_from_mode = {}
        self._randcache = {}
        for mode in self.modes:
            self._sample_from_mode[mode] = None
            self._randcache[mode] = {"cache_indices": None, "sample_next": 0}

        self.sample_from_intervals = []
        self.interval_lengths = []
        self._initialized = False

    def init(func):
        """
        Decorator that lazily partitions the genome and fills the
        per-mode random caches the first time the wrapped method runs.
        """
        # delay initialization to allow multiprocessing
        @wraps(func)
        def dfunc(self, *args, **kwargs):
            if not self._initialized:
                if self._holdout_type == "chromosome":
                    self._partition_genome_by_chromosome()
                else:
                    self._partition_genome_by_proportion()

                for mode in self.modes:
                    self._update_randcache(mode=mode)
                self._initialized = True
            return func(self, *args, **kwargs)
        return dfunc

    def _partition_genome_by_proportion(self):
        """
        Assigns whole chromosomes to the validate/test/train modes by
        shuffling the chromosome indices and slicing off the fractions
        given by `validation_holdout` and `test_holdout`.
        """
        for chrom, len_chrom in self.reference_sequence.get_chr_lens():
            self.sample_from_intervals.append(
                (chrom,
                 self.sequence_length,
                 len_chrom - self.sequence_length))
            # NOTE(review): `_partition_genome_by_chromosome` weights each
            # interval by `len_chrom - 2 * self.sequence_length`, whereas
            # the full chromosome length is used here. Confirm whether the
            # difference is intentional before unifying them.
            self.interval_lengths.append(len_chrom)

        n_intervals = len(self.sample_from_intervals)
        select_indices = list(range(n_intervals))
        np.random.shuffle(select_indices)

        n_indices_validate = int(n_intervals * self.validation_holdout)
        val_indices, val_weights = get_indices_and_probabilities(
            self.interval_lengths, select_indices[:n_indices_validate])
        self._sample_from_mode["validate"] = SampleIndices(
            val_indices, val_weights)

        if self.test_holdout:
            n_indices_test = int(n_intervals * self.test_holdout)
            test_indices_end = n_indices_test + n_indices_validate
            test_indices, test_weights = get_indices_and_probabilities(
                self.interval_lengths,
                select_indices[n_indices_validate:test_indices_end])
            self._sample_from_mode["test"] = SampleIndices(
                test_indices, test_weights)

            # Everything after the validation and test slices is training.
            tr_indices, tr_weights = get_indices_and_probabilities(
                self.interval_lengths, select_indices[test_indices_end:])
            self._sample_from_mode["train"] = SampleIndices(
                tr_indices, tr_weights)
        else:
            # No test holdout: everything after the validation slice is
            # training.
            tr_indices, tr_weights = get_indices_and_probabilities(
                self.interval_lengths, select_indices[n_indices_validate:])
            self._sample_from_mode["train"] = SampleIndices(
                tr_indices, tr_weights)

    def _partition_genome_by_chromosome(self):
        """
        Assigns each chromosome to the validate, test, or train mode
        according to whether its name appears in `validation_holdout`
        or `test_holdout`, then computes the sampling weights per mode.
        """
        for mode in self.modes:
            self._sample_from_mode[mode] = SampleIndices([], [])

        chr_lens = self.reference_sequence.get_chr_lens()
        for index, (chrom, len_chrom) in enumerate(chr_lens):
            if chrom in self.validation_holdout:
                self._sample_from_mode["validate"].indices.append(
                    index)
            elif self.test_holdout and chrom in self.test_holdout:
                self._sample_from_mode["test"].indices.append(
                    index)
            else:
                self._sample_from_mode["train"].indices.append(
                    index)

            self.sample_from_intervals.append(
                (chrom,
                 self.sequence_length,
                 len_chrom - self.sequence_length))
            self.interval_lengths.append(len_chrom - 2 * self.sequence_length)

        # Replace the raw index lists collected above with the indices
        # and weights returned by `get_indices_and_probabilities`.
        for mode in self.modes:
            sample_indices = self._sample_from_mode[mode].indices
            indices, weights = get_indices_and_probabilities(
                self.interval_lengths, sample_indices)
            self._sample_from_mode[mode] = \
                self._sample_from_mode[mode]._replace(
                    indices=indices, weights=weights)

    def _retrieve(self, chrom, position):
        """
        Retrieves the encoded sequence and the target features for the
        example centered at `position` on `chrom`.

        Parameters
        ----------
        chrom : str
            The name of the chromosome to query.
        position : int
            The position that the example is centered on.

        Returns
        -------
        tuple(numpy.ndarray, numpy.ndarray) or None
            `(retrieved_seq, retrieved_targets)` on success. `None` if
            the window around `position` is shorter than
            `self.sequence_length`, if no sequence could be retrieved,
            or if more than 30% of the entries in the encoded sequence
            equal 0.25 (treated as ambiguous 'N' bases).

        """
        bin_start = position - self._start_radius
        bin_end = position + self._end_radius
        retrieved_targets = self.target.get_feature_data(
            chrom, bin_start, bin_end)

        window_start = position - self._start_window_radius
        window_end = position + self._end_window_radius
        if window_end - window_start < self.sequence_length:
            # Previously a bare debug `print`; routed through the module
            # logger like the other rejected-sample paths below.
            logger.info(
                "Window around {0} position {1} is shorter than the "
                "sequence length (bin {2}-{3}; radii {4}, {5}; "
                "window radii {6}, {7}). Sampling again.".format(
                    chrom, position, bin_start, bin_end,
                    self._start_radius, self._end_radius,
                    self._start_window_radius, self._end_window_radius))
            return None

        strand = self.STRAND_SIDES[random.randint(0, 1)]
        retrieved_seq = \
            self.reference_sequence.get_encoding_from_coords(
                chrom, window_start, window_end, strand)

        if retrieved_seq.shape[0] == 0:
            logger.info("Full sequence centered at {0} position {1} "
                        "could not be retrieved. Sampling again.".format(
                            chrom, position))
            return None
        elif np.mean(retrieved_seq == 0.25) > 0.30:
            logger.info("Over 30% of the bases in the sequence centered "
                        "at {0} position {1} are ambiguous ('N'). "
                        "Sampling again.".format(chrom, position))
            return None

        # NOTE(review): the record is stored under `self.mode`, not under
        # a `mode` passed to `sample` -- confirm this is intended.
        if self.mode in self._save_datasets:
            feature_indices = ';'.join(
                [str(f) for f in np.nonzero(retrieved_targets)[0]])
            self._save_datasets[self.mode].append(
                [chrom,
                 window_start,
                 window_end,
                 strand,
                 feature_indices])
            # Flush the buffered records to file once enough accumulate.
            if len(self._save_datasets[self.mode]) > 200000:
                self.save_dataset_to_file(self.mode)
        return (retrieved_seq, retrieved_targets)

    def _update_randcache(self, mode=None):
        """
        Refills the cache of randomly drawn interval indices for `mode`
        and resets its read position to the start of the cache.

        Parameters
        ----------
        mode : str or None, optional
            Default is None. The mode whose cache should be refilled. If
            None, the current mode `self.mode` is used.

        """
        if not mode:
            mode = self.mode
        self._randcache[mode]["cache_indices"] = np.random.choice(
            self._sample_from_mode[mode].indices,
            size=200000,
            replace=True,
            p=self._sample_from_mode[mode].weights)
        self._randcache[mode]["sample_next"] = 0

    @init
    def sample(self, batch_size=1, mode=None):
        """
        Randomly draws a mini-batch of examples and their corresponding
        labels.

        Parameters
        ----------
        batch_size : int, optional
            Default is 1. The number of examples to include in the
            mini-batch.
        mode : str, optional
            Default is None. The operating mode that the object should run in.
            If None, will use the current mode `self.mode`.

        Returns
        -------
        sequences, targets : tuple(numpy.ndarray, numpy.ndarray)
            A tuple containing the numeric representation of the
            sequence examples and their corresponding labels. The
            shape of `sequences` will be
            :math:`B \\times L \\times N`, where :math:`B` is
            `batch_size`, :math:`L` is the sequence length, and
            :math:`N` is the size of the sequence type's alphabet.
            The shape of `targets` will be :math:`B \\times F`,
            where :math:`F` is the number of features.

        """
        mode = mode if mode else self.mode
        sequences = np.zeros((batch_size, self.sequence_length, 4))
        targets = np.zeros((batch_size, self.n_features))
        n_samples_drawn = 0
        while n_samples_drawn < batch_size:
            sample_index = self._randcache[mode]["sample_next"]
            if sample_index == len(self._randcache[mode]["cache_indices"]):
                # Refill the cache of the mode being sampled. Calling
                # this without `mode` would refresh `self.mode` instead
                # and let this mode's read position run off the end of
                # its cache.
                self._update_randcache(mode=mode)
                sample_index = 0

            rand_interval_index = \
                self._randcache[mode]["cache_indices"][sample_index]
            self._randcache[mode]["sample_next"] += 1

            chrom, cstart, cend = \
                self.sample_from_intervals[rand_interval_index]
            position = np.random.randint(cstart, cend)

            retrieve_output = self._retrieve(chrom, position)
            if retrieve_output is None:
                # The position was rejected; draw another one.
                continue
            seq, seq_targets = retrieve_output
            sequences[n_samples_drawn, :, :] = seq
            targets[n_samples_drawn, :] = seq_targets
            n_samples_drawn += 1
        return (sequences, targets)