Source code for mavis.pairing.pairing

from .constants import DEFAULTS, PAIRING_DISTANCES

from ..annotate.variant import determine_prime
from ..breakpoint import Breakpoint
from ..constants import CALL_METHOD, COLUMNS, ORIENT, PRIME, PROTOCOL, STRAND
from ..error import NotSpecifiedError
from ..interval import Interval


[docs]def predict_transcriptome_breakpoint(breakpoint, transcript): """ for a given genomic breakpoint and the target transcript. Predicts the possible transcriptomic breakpoints that would be expected based on the splicing model for abrogated splice sites Args: breakpoint (Breakpoint): the genomic breakpoint transcript (PreTranscript): the transcript see :ref:`theory - pairing similar events <theory-pairing-similar-events>` """ prime = determine_prime(transcript, breakpoint) exons = transcript.exons[:] if not Interval.overlaps(breakpoint, transcript): raise AssertionError('breakpoint does not overlap the transcript', breakpoint, transcript) if transcript.get_strand() == STRAND.NEG: exons.reverse() tbreaks = [] for i, curr in enumerate(exons): temp = curr.acceptor_splice_site | curr.donor_splice_site if Interval.overlaps(breakpoint, temp): # overlaps a splice site or exon if len(breakpoint) > 1: raise NotSpecifiedError( 'breakpoint overlaps an exon or splice site and is not specific (i.e. has ' 'an interval greater than 1)') elif prime == PRIME.FIVE: if i > 0: prev = exons[i - 1] tbreaks.append( Breakpoint( breakpoint.chr, prev.donor, strand=breakpoint.strand, orient=breakpoint.orient )) else: # prime == PRIME.THREE if i + 1 < len(exons): nexxt = exons[i + 1] tbreaks.append( Breakpoint( breakpoint.chr, nexxt.acceptor, strand=breakpoint.strand, orient=breakpoint.orient )) tbreaks.append( Breakpoint( breakpoint.chr, breakpoint.start, breakpoint.end, strand=breakpoint.strand, orient=breakpoint.orient )) elif i > 0: # look at the previous intron prev = exons[i - 1] try: intron_start, intron_end = sorted([prev.donor_splice_site.end, curr.acceptor_splice_site.start]) intron = Interval(intron_start + 1, intron_end - 1) if Interval.overlaps(breakpoint, intron): if prime == PRIME.FIVE: tbreaks.append( Breakpoint( breakpoint.chr, prev.donor, strand=breakpoint.strand, orient=breakpoint.orient )) else: tbreaks.append( Breakpoint( breakpoint.chr, curr.acceptor, strand=breakpoint.strand, orient=breakpoint.orient )) except AttributeError: # for introns that are smaller than this ignore (covered by exon check) pass if not tbreaks: raise AssertionError('could not predict breakpoint') return sorted(tbreaks)
def _equivalent_events(event1, event2): # basic checks if any([ event1.break1.chr != event2.break1.chr, event1.break2.chr != event2.break2.chr, len(set([STRAND.NS, event1.break1.strand, event2.break1.strand])) > 2, len(set([STRAND.NS, event1.break2.strand, event2.break2.strand])) > 2, len(set([ORIENT.NS, event1.break1.orient, event2.break1.orient])) > 2, len(set([ORIENT.NS, event1.break2.orient, event2.break2.orient])) > 2, event1.opposing_strands != event2.opposing_strands ]): return False return True
[docs]def comparison_distance(event1, event2, input_distances=None): distances = {} distances.update(PAIRING_DISTANCES.items()) if input_distances is not None: distances.update(input_distances) max_distance = max( distances[event1.data.get(COLUMNS.call_method, CALL_METHOD.CONTIG)], distances[event2.data.get(COLUMNS.call_method, CALL_METHOD.CONTIG)]) return max_distance
[docs]def equivalent(event1, event2, distances=None): """ compares two events by breakpoint position to see if they are equivalent """ max_distance = comparison_distance(event1, event2, distances) if not _equivalent_events(event1, event2): return False # location comparison if any([ not _equivalent_events(event1, event2), abs(Interval.dist(event1.break1, event2.break1)) > max_distance, abs(Interval.dist(event1.break2, event2.break2)) > max_distance, event1.data[COLUMNS.event_type] != event2.data[COLUMNS.event_type] ]): return False return True
[docs]def inferred_equivalent(event1, event2, reference_transcripts, distances=None, product_sequences=None): """ comparison of events using product prediction and breakpoint prediction """ product_sequences = dict() if product_sequences is None else product_sequences # basic checks if not _equivalent_events(event1, event2): return False if event1.data[COLUMNS.protocol] != PROTOCOL.GENOME: event1, event2 = event2, event1 max_distance = comparison_distance(event1, event2, distances) if event1.data[COLUMNS.fusion_sequence_fasta_id] and event2.data[COLUMNS.fusion_sequence_fasta_id]: fusion1 = product_sequences[event1.data[COLUMNS.fusion_sequence_fasta_id]] fusion2 = product_sequences[event2.data[COLUMNS.fusion_sequence_fasta_id]] if fusion1 != fusion2: return False for col in [COLUMNS.fusion_cdna_coding_start, COLUMNS.fusion_cdna_coding_end]: if event1.data[col] != event2.data[col]: return False return True break1_match = False break2_match = False if event1.data[COLUMNS.protocol] != event2.data[COLUMNS.protocol]: # mixed # predict genome breakpoints to compare by location transcript1 = reference_transcripts.get(event1.data[COLUMNS.transcript1], None) if transcript1: try: pbreaks = predict_transcriptome_breakpoint(event1.break1, transcript1) for breakpoint in pbreaks: if abs(Interval.dist(breakpoint, event2.break1)) <= max_distance: break1_match = True break except NotSpecifiedError: pass transcript2 = reference_transcripts.get(event1.data[COLUMNS.transcript2], None) if transcript2: try: pbreaks = predict_transcriptome_breakpoint(event1.break2, transcript2) for breakpoint in pbreaks: if abs(Interval.dist(breakpoint, event2.break2)) <= max_distance: break2_match = True break except NotSpecifiedError: pass elif event1.data[COLUMNS.event_type] != event2.data[COLUMNS.event_type]: return False if abs(Interval.dist(event1.break1, event2.break1)) <= max_distance: break1_match = True if abs(Interval.dist(event1.break2, event2.break2)) <= max_distance: break2_match = True return break1_match and break2_match