An simple interval class for DNA sequences from FASTA files that provides
fast access to sequences and methods for interval logic on those sequences.

Usually you will create a `Genome` and then use that object to create
intervals.  The intervals have a sequence property that will look up the
actual sequence::

    >>> from fastinterval import Genome, Interval
    >>> test_genome = Genome('test/example.fa')
    >>> int1 = test_genome.interval(100, 150, chrom='1')
    >>> print int1
    >>> print int1.sequence

fastinterval uses pyfasta to retrieve the sequence, so the access is mmapped
(i.e fast).  It supports strandedness, which will be respected when accessing
the sequence::

    >>> int2 = test_genome.interval(100, 150, chrom='1', strand=-1)
    >>> print int2.sequence

The Interval class supports many interval operations::

    >>> int1 = test_genome.interval(100, 150, chrom='1')
    >>> int2 = test_genome.interval(125, 175, chrom='1')
    >>> int1.distance(int2)
    >>> int1.span(int2)
    Interval(100, 175)
    >>> int1.overlaps(int2)
    >>> int1.is_contiguous(int2)
    >>> int1 in int2
    >>> int1.intersection(int2)
    Interval(125, 150)
    >>> int1.union(int2)
    Interval(100, 175)
    >>> Interval.merge([int1, int2, test_genome.interval(200,250, chrom='1')])
    [Interval(100, 175), Interval(200, 250)]

The Interval class is also based on bx python intervals.  So you can pass in
a value attritbue to point to an external object, and create interval trees and
so on.

    >>> from bx.intervals.intersection import IntervalTree
    >>> int3 = test_genome.interval(150, 200, chrom='1', value='foo')
    >>> tree = IntervalTree()
    >>> _ = map(tree.insert_interval, (int1, int2, int3))
    >>> tree.find(190, 195)
    [Interval(150, 200, value=foo)]


VERSION = '0.1.1'

from pyfasta import Fasta
from bx.intervals import Interval as BaseInterval

def _convert_strand(strand):
    """ convert UCSC +/- to +1/-1"""
    if strand == '-': return -1
    if strand == '+': return 1
    return strand

[docs]class Interval(BaseInterval): """ A genomic interval """ def __init__(self, start, stop, genome=None, **kws): self.genome = genome if 'strand' in kws: kws['strand'] = _convert_strand(kws['strand']) BaseInterval.__init__(self, start, stop, **kws) @property
[docs] def sequence(self): """ Return the DNA from this intecal as a string """ if not self.genome: raise Exception('Cannot retrieve sequence without a genome') return self.genome.sequence(dict( start = self.start, stop = self.end, chr = self.chrom, strand = self.strand ), one_based=False).upper()
def __str__(self): """ Return a chr:start-stop:strand representation of the interval """ return "%s:%s-%s:%s" % (self.chrom, self.start, self.end, self.strand if self.strand else '') def __len__(self): """ Return interval length """ return self.end - self.start @classmethod
[docs] def from_string(cls, loc, **kws): """ Create an interval from a chrx:start-end style string """ # chr1:10858-10967:1 toks = loc.split(':') chrom = toks[0] start, end = map(int, toks[1].split('-')) if len(toks) == 3: try: strand = int(toks[2]) except ValueError: strand = toks[2] if strand == '+': strand = 1 elif strand == '-': strand = -1 elif strand == '': strand = None else: raise Exception('Unknown strand %s' % strand) else: strand = None return cls(start, end, chrom=chrom, strand=strand, **kws)
[docs] def distance(self, other): """ return the distance between two intervals """ if self.chrom != other.chrom: return float('Inf') if self.start > other.end: return self.start - other.end if self.end < other.start: return other.start - self.end return 0
[docs] def span(self, other, **kws): """ Return an interval spanning two intervals """ if self.chrom != other.chrom: raise Exception('cannot get span over two chromosomes') return self.copy( start = min([self.start, other.start]), end = max([self.end, other.end]), **kws )
[docs] def span_between(self, other, **kws): """Return an Inteval spanning the gap between two intervals """ if self.chrom != other.chrom: raise Exception('cannot get span over two chromosomes') if self.overlaps(other): return None return self.copy( start = min([self.end, other.end]), end = max([self.start, other.start]), **kws )
[docs] def overlaps(self, other): """ Return True if the intervals share at least one base """ return self.distance(other) == 0 and not (self.start==other.end or self.end==other.start)
[docs] def is_contiguous(self, other): """ Return True if the intervals are overlapping or contiguous """ return self.distance(other) == 0
def __contains__(self, other): """ Return true if one interval contains the other """ return self.start <= other.start and self.end >= other.end
[docs] def intersection(self, other): """ Return the interval containing the intersection of two intervals """ if not self.overlaps(other): return None return Interval( max(self.start, other.start), min(self.end, other.end), chrom = self.chrom )
[docs] def union(self, other, merge_contiguous=False): """ Return an interval containing the interval of two overlapping interval""" test = Interval.is_contiguous if merge_contiguous else Interval.overlaps if not test(self, other): raise Exception('cannot get union of non overlapping intervals') return self.span(other)
[docs] def copy(self, **kws): """ Copy this interval, and optionally provide a dict of new attrs """ if 'start' in kws: start = kws.pop('start') else: start = self.start if 'end' in kws: end = kws.pop('end') else : end = self.end template = dict(chrom=self.chrom, strand=self.strand, value=self.value, genome=self.genome) template.update(kws) return Interval(start, end, **template)
def __sub__(self, other): """ Subtract an interval and return a list of intervals i.e. (10,50) - (20,30) returns [(10,20), (30,50)] """ if not self.overlaps(other): return [self] if self in other: return [] elif other in self and not (self.start == other.start or self.end == other.end): left = self.copy() left.end = other.start right = self.copy() right.start = other.end return [left, right] elif self.start >= other.start: right = self.copy() right.start = other.end return [right] elif self.end <= other.end: left = self.copy() left.end = other.start return [left] raise Exception('unhandled subtraction') @classmethod
[docs] def merge(cls, intervals, merge_contiguous=False, **kwargs): """ merge a list of intervals and return a list of intervals By default, the intervals must be overlapping to be merged. If you want to merge contiguous intervals, set merge_contiguous to True. """ is_overlapping = cls.is_contiguous if merge_contiguous else cls.overlaps intervals = sorted(intervals, key=lambda x: (x.chrom, x.end)) if len(intervals) > 1: done, todo = [intervals[0].copy(**kwargs)], intervals[1:] while todo: item = todo.pop(0).copy(**kwargs) while done and is_overlapping(item, done[-1]): item = done.pop().union(item, merge_contiguous=merge_contiguous) done.append(item) else: done = intervals return done
@classmethod def coverage(cls, intervals): if not intervals: return [] chrom = intervals[0].chrom starts = [x.start for x in intervals] ends = [x.end for x in intervals] breaks = sorted(set(starts + ends)) return [ cls(start, end, chrom=chrom, value=1 + len([x for x in starts if x<= start]) - len([x for x in ends if x <= end]) ) for start, end in zip(breaks, breaks[1:]) ]
[docs] def add_border(self, size=0, upstream=0, downstream=0): """ return interval with some bases added to each end """ if not (size or upstream or downstream): return self.copy() if size and (upstream or downstream): raise Exception('please either size or upstream/downstream') if size: return self.copy(start=self.start-size, end = self.end+size) if not self.strand: raise Exception('Cannot add upstrea/downstream to strandless interval') if self.strand == 1: return self.copy(start=self.start - upstream, end = self.end + downstream) else: return self.copy(start=self.start - downstream, end = self.end + upstream)
[docs] def truncate(self, size): """ truncate this interval to size, respecting the orientation """ if self.strand is None: raise Exception('cannot truncate unstranded interval') elif self.strand > 0: return self.copy(end=min(self.start+size, self.end)) elif self.strand < 0: return self.copy(start=max(self.end-size, self.start))
[docs]class Genome(object): """ A convienience for creating intervals on the same genome """ def __init__(self, fname, *args, **kws): """ Create a genome using a Fasta file. Other args passed to pyfasta """ self.fasta = Fasta(fname, *args, **kws)
[docs] def interval(self, start, end, **kws): """ return an interval on this genome """ return Interval(start, end, genome=self.fasta, **kws)
[docs] def from_string(self, data): """docstring for from_string""" return Interval.from_string(data, genome=self.fasta)
[docs]class MinimalSpanningSet(object): """ Create a minimal spanning set for target intervals from a set of candidates """ def score_candidate(self, candidate): return sum([ len(candidate.intersection(x)) for x in self.remaining_targets if candidate.overlaps(x) ]) def coverage(self, choices): """ work out the covered based for a set of choices""" covered = Interval.merge( [ choice.intersection(x) for x in self.targets for choice in choices if choice.overlaps(x) ] ) return sum(map(len, covered)) def __init__(self, targets, candidates, score_function=None, sort_key=None): self.targets = targets self.remaining_targets = list(targets) self.candidates = candidates self.chosen = [] self.sort_key = sort_key if score_function is None: self.score_function = MinimalSpanningSet.score_candidate self._find_set() def _find_set(self): """ main loop """ while True: scores = {} # work out the score by summing the length of overlaps with the target for candidate in self.candidates: scores[candidate] = self.score_candidate(candidate) # choose the best candidates rankings = sorted(self.candidates, key=scores.get, reverse=True) if rankings == []: break best = rankings[0] # break if no improvement is possible if scores[best] == 0: break if self.sort_key: equiv = [x for x in rankings if scores.get(x)==scores.get(best)] equiv = sorted(equiv, key=self.sort_key) best = equiv[-1] # update the data self.chosen.append(best) self.candidates.remove(best) self._update_targets(best) # break if no targets or candidates left if len(self.targets) == 0 or len(self.candidates) == 0: break self._remove_redundant() def _update_targets(self, choice): """ remove chosen interval from targets """ new_targets = [] for target in self.remaining_targets: new_targets.extend(target - choice) self.remaining_targets = new_targets def _remove_redundant(self): """ drop any candidates that are completely covered by the rest""" total_coverage = self.coverage(self.chosen) for c in list(self.chosen): # if the coverage without a choice is the same, remove it test = list(self.chosen) test.remove(c) if self.coverage(test) == total_coverage: self.chosen.remove(c)

