X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/6fbb05e454523001fb0a3f0b8e045dc3b8eeb155..69b71af4136fdaeeb5c2afbc559208dc5f428c48:/annotate.go diff --git a/annotate.go b/annotate.go index 5afc19505f..ff4ab88717 100644 --- a/annotate.go +++ b/annotate.go @@ -2,6 +2,7 @@ package main import ( "bufio" + "context" "errors" "flag" "fmt" @@ -24,6 +25,7 @@ import ( type annotatecmd struct { variantHash bool maxTileSize int + tag2tagid map[string]tagID } func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -106,7 +108,15 @@ func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, } bufw := bufio.NewWriter(output) - err = cmd.exportTileDiffs(bufw, input) + tilelib := &tileLibrary{ + retainNoCalls: true, + retainTileSequences: true, + } + err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"), nil) + if err != nil { + return 1 + } + err = cmd.exportTileDiffs(bufw, tilelib) if err != nil { return 1 } @@ -125,45 +135,24 @@ func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, return 0 } -func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error { - var refs []CompactSequence - var tiles [][]TileVariant - var tagset [][]byte - var taglen int - err := DecodeLibrary(librdr, func(ent *LibraryEntry) error { - if len(ent.TagSet) > 0 { - if tagset != nil { - return errors.New("error: not implemented: input has multiple tagsets") - } - tagset = ent.TagSet - taglen = len(tagset[0]) - tiles = make([][]TileVariant, len(tagset)) - } - for _, tv := range ent.TileVariants { - if tv.Tag >= tagID(len(tiles)) { - return fmt.Errorf("error: reading tile variant for tag %d but only %d tags were loaded", tv.Tag, len(tiles)) - } - for len(tiles[tv.Tag]) <= int(tv.Variant) { - tiles[tv.Tag] = append(tiles[tv.Tag], TileVariant{}) - } - tiles[tv.Tag][tv.Variant] = tv - } - for _, cs := range ent.CompactSequences { - refs = append(refs, cs) - } - return nil - }) - if err != nil { - return err +func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error { + tagset := tilelib.taglib.Tags() + if len(tagset) == 0 { + return errors.New("cannot annotate library without tags") + } + taglen := len(tagset[0]) + var refs []string + for name := range tilelib.refseqs { + refs = append(refs, name) } - tag2tagid := make(map[string]tagID, len(tagset)) + cmd.tag2tagid = make(map[string]tagID, len(tagset)) for tagid, tagseq := range tagset { - tag2tagid[string(tagseq)] = tagID(tagid) + cmd.tag2tagid[string(tagseq)] = tagID(tagid) } - sort.Slice(refs, func(i, j int) bool { return refs[i].Name < refs[j].Name }) + sort.Strings(refs) log.Infof("len(refs) %d", len(refs)) - outch := make(chan string, 1) + outch := make(chan string, runtime.NumCPU()*2) var outwg sync.WaitGroup defer outwg.Wait() outwg.Add(1) @@ -175,109 +164,124 @@ func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error }() defer close(outch) - limiter := make(chan bool, runtime.NumCPU()+1) - var diffwg sync.WaitGroup - defer diffwg.Wait() + nseqs := 0 + for _, refcs := range tilelib.refseqs { + nseqs += len(refcs) + } + + throttle := &throttle{Max: runtime.NumCPU()*2 + nseqs*2 + 1} + defer throttle.Wait() - for _, refcs := range refs { - refcs := refcs + for _, refname := range refs { + refname := refname + refcs := tilelib.refseqs[refname] var seqnames []string - for seqname := range refcs.TileSequences { + for seqname := range refcs { seqnames = append(seqnames, seqname) } sort.Strings(seqnames) for _, seqname := range seqnames { seqname := seqname - var refseq []byte - // tilestart[123] is the index into refseq - // where the tile for tag 123 was placed. - tilestart := map[tagID]int{} - tileend := map[tagID]int{} - for _, libref := range refcs.TileSequences[seqname] { - if libref.Variant < 1 { - return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refcs.Name, seqname, libref.Tag) - } - seq := tiles[libref.Tag][libref.Variant].Sequence - if len(seq) < taglen { - return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refcs.Name, seqname, libref.Tag, libref.Variant, len(seq), taglen) - } - overlap := taglen - if len(refseq) == 0 { - overlap = 0 - } - tilestart[libref.Tag] = len(refseq) - overlap - refseq = append(refseq, seq[overlap:]...) - tileend[libref.Tag] = len(refseq) + throttle.Acquire() + go func() { + defer throttle.Release() + throttle.Report(cmd.annotateSequence(throttle, outch, tilelib, taglen, refname, seqname, refcs[seqname], len(refs) > 1)) + }() + } + } + throttle.Wait() + return throttle.Err() +} + +func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string, tilelib *tileLibrary, taglen int, refname, seqname string, reftiles []tileLibRef, refnamecol bool) error { + var refseq []byte + // tilestart[123] is the index into refseq + // where the tile for tag 123 was placed. + tilestart := map[tagID]int{} + tileend := map[tagID]int{} + for _, libref := range reftiles { + if libref.Variant < 1 { + return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag) + } + seq := tilelib.TileVariantSequence(libref) + if len(seq) < taglen { + return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen) + } + overlap := taglen + if len(refseq) == 0 { + overlap = 0 + } + tilestart[libref.Tag] = len(refseq) - overlap + refseq = append(refseq, seq[overlap:]...) + tileend[libref.Tag] = len(refseq) + } + log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart)) + for tag, tvs := range tilelib.variant { + tag := tagID(tag) + refstart, ok := tilestart[tag] + if !ok { + // Tag didn't place on this + // reference sequence. (It + // might place on the same + // chromosome in a genome + // anyway, but we don't output + // the annotations that would + // result.) + continue + } + for variant := 1; variant <= len(tvs); variant++ { + variant, hash := tileVariantID(variant), tvs[variant-1] + tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant}) + if len(tileseq) == 0 { + continue + } else if len(tileseq) < taglen { + return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen) } - log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart)) - for tag, tvs := range tiles { - tag := tagID(tag) - refstart, ok := tilestart[tag] - if !ok { - // Tag didn't place on this - // reference sequence. (It - // might place on the same - // chromosome in a genome - // anyway, but we don't output - // the annotations that would - // result.) - continue - } - for variant, tv := range tvs { - variant, tv := variant, tv - if variant == 0 { - continue - } - if len(tv.Sequence) < taglen { - return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tv.Sequence), taglen) - } - var refpart []byte - endtag := string(tv.Sequence[len(tv.Sequence)-taglen:]) - if endtagid, ok := tag2tagid[endtag]; !ok { - // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome. - refpart = refseq[refstart:] - log.Warnf("%x tilevar %d,%d endtag not in ref: %s", tv.Blake2b[:13], tag, variant, endtag) - } else if refendtagstart, ok := tilestart[endtagid]; !ok { - // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't. - // Give up. (TODO: something smarter) - log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", tv.Blake2b[:13], tag, variant, endtagid) - continue + var refpart []byte + endtag := string(tileseq[len(tileseq)-taglen:]) + if endtagid, ok := cmd.tag2tagid[endtag]; !ok { + // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome. + refpart = refseq[refstart:] + log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag) + } else if refendtagstart, ok := tilestart[endtagid]; !ok { + // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't. + // Give up. (TODO: something smarter) + log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid) + continue + } else { + // Non-terminal tile vs. non-terminal reference. + refpart = refseq[refstart : refendtagstart+taglen] + log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart) + } + if len(refpart) > cmd.maxTileSize { + log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart)) + continue + } + if len(tileseq) > cmd.maxTileSize { + log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq)) + continue + } + // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq) + + throttle.Acquire() + go func() { + defer throttle.Release() + diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0) + for _, diff := range diffs { + diff.Position += refstart + var varid string + if cmd.variantHash { + varid = fmt.Sprintf("%x", hash)[:13] } else { - // Non-terminal tile vs. non-terminal reference. - refpart = refseq[refstart : refendtagstart+taglen] - log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", tv.Blake2b[:13], tag, variant, endtag, endtagid, refendtagstart) - } - if len(refpart) > cmd.maxTileSize { - log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s ref len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(refpart)) - continue + varid = fmt.Sprintf("%d", variant) } - if len(tv.Sequence) > cmd.maxTileSize { - log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(tv.Sequence)) - continue + refnamefield := "" + if refnamecol { + refnamefield = "\t" + refname } - // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tv.Sequence) - - diffwg.Add(1) - limiter <- true - go func() { - defer func() { - <-limiter - diffwg.Done() - }() - diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tv.Sequence)), 0) - for _, diff := range diffs { - diff.Position += refstart - var varid string - if cmd.variantHash { - varid = fmt.Sprintf("%x", tv.Blake2b)[:13] - } else { - varid = strconv.Itoa(variant) - } - outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refcs.Name, seqname, diff.String()) - } - }() + outch <- fmt.Sprintf("%d\t%s%s\t%s:g.%s\n", tag, varid, refnamefield, seqname, diff.String()) } - } + }() } } return nil