X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/dcfed469c5d20237cbee87c0d955c3fafa77bd6f..de9aa35ea67f3aab2c97b126adfdffabff754521:/annotate.go diff --git a/annotate.go b/annotate.go index 7a6ee3b538..25651889aa 100644 --- a/annotate.go +++ b/annotate.go @@ -1,7 +1,12 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( "bufio" + "context" "errors" "flag" "fmt" @@ -22,8 +27,11 @@ import ( ) type annotatecmd struct { - variantHash bool - maxTileSize int + dropTiles []bool + variantHash bool + maxTileSize int + tag2tagid map[string]tagID + reportAnnotation func(tag tagID, outcol int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant) } func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int { @@ -73,13 +81,13 @@ func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, if err != nil { return 1 } - runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.tsv"} + runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.csv"} var output string output, err = runner.Run() if err != nil { return 1 } - fmt.Fprintln(stdout, output+"/tilevariants.tsv") + fmt.Fprintln(stdout, output+"/tilevariants.csv") return 0 } @@ -104,9 +112,17 @@ func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, } defer output.Close() } - bufw := bufio.NewWriter(output) + bufw := bufio.NewWriterSize(output, 4*1024*1024) - err = cmd.exportTileDiffs(bufw, input) + tilelib := &tileLibrary{ + retainNoCalls: true, + retainTileSequences: true, + } + err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz")) + if err != nil { + return 1 + } + err = cmd.exportTileDiffs(bufw, tilelib) if err != nil { return 1 } @@ -125,45 +141,24 @@ func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, return 0 } -func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error { - var refs []CompactSequence - var tiles [][]TileVariant - var tagset [][]byte - var taglen int - err := DecodeLibrary(librdr, func(ent *LibraryEntry) error { - if len(ent.TagSet) > 0 { - if tagset != nil { - return errors.New("error: not implemented: input has multiple tagsets") - } - tagset = ent.TagSet - taglen = len(tagset[0]) - tiles = make([][]TileVariant, len(tagset)) - } - for _, tv := range ent.TileVariants { - if tv.Tag >= tagID(len(tiles)) { - return fmt.Errorf("error: reading tile variant for tag %d but only %d tags were loaded", tv.Tag, len(tiles)) - } - for len(tiles[tv.Tag]) <= int(tv.Variant) { - tiles[tv.Tag] = append(tiles[tv.Tag], TileVariant{}) - } - tiles[tv.Tag][tv.Variant] = tv - } - for _, cs := range ent.CompactSequences { - refs = append(refs, cs) - } - return nil - }) - if err != nil { - return err +func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error { + tagset := tilelib.taglib.Tags() + if len(tagset) == 0 { + return errors.New("cannot annotate library without tags") + } + taglen := len(tagset[0]) + var refs []string + for name := range tilelib.refseqs { + refs = append(refs, name) } - tag2tagid := make(map[string]tagID, len(tagset)) + cmd.tag2tagid = make(map[string]tagID, len(tagset)) for tagid, tagseq := range tagset { - tag2tagid[string(tagseq)] = tagID(tagid) + cmd.tag2tagid[string(tagseq)] = tagID(tagid) } - sort.Slice(refs, func(i, j int) bool { return refs[i].Name < refs[j].Name }) + sort.Strings(refs) log.Infof("len(refs) %d", len(refs)) - outch := make(chan string, 1) + outch := make(chan string, runtime.NumCPU()*2) var outwg sync.WaitGroup defer outwg.Wait() outwg.Add(1) @@ -175,109 +170,142 @@ func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, librdr io.Reader) error }() defer close(outch) - limiter := make(chan bool, runtime.NumCPU()+1) - var diffwg sync.WaitGroup - defer diffwg.Wait() + nseqs := 0 + for _, refcs := range tilelib.refseqs { + nseqs += len(refcs) + } + + throttle := &throttle{Max: runtime.NumCPU()*2 + nseqs*2 + 1} + defer throttle.Wait() - for _, refcs := range refs { - refcs := refcs + for _, refname := range refs { + refname := refname + refcs := tilelib.refseqs[refname] var seqnames []string - for seqname := range refcs.TileSequences { + for seqname := range refcs { seqnames = append(seqnames, seqname) } sort.Strings(seqnames) for _, seqname := range seqnames { seqname := seqname - var refseq []byte - // tilestart[123] is the index into refseq - // where the tile for tag 123 was placed. - tilestart := map[tagID]int{} - tileend := map[tagID]int{} - for _, libref := range refcs.TileSequences[seqname] { - if libref.Variant < 1 { - return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refcs.Name, seqname, libref.Tag) - } - seq := tiles[libref.Tag][libref.Variant].Sequence - if len(seq) < taglen { - return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refcs.Name, seqname, libref.Tag, libref.Variant, len(seq), taglen) - } - overlap := taglen - if len(refseq) == 0 { - overlap = 0 - } - tilestart[libref.Tag] = len(refseq) - overlap - refseq = append(refseq, seq[overlap:]...) - tileend[libref.Tag] = len(refseq) + throttle.Acquire() + if throttle.Err() != nil { + break } - log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart)) - for tag, tvs := range tiles { - tag := tagID(tag) - refstart, ok := tilestart[tag] - if !ok { - // Tag didn't place on this - // reference sequence. (It - // might place on the same - // chromosome in a genome - // anyway, but we don't output - // the annotations that would - // result.) - continue - } - for variant, tv := range tvs { - variant, tv := variant, tv - if variant == 0 { - continue - } - if len(tv.Sequence) < taglen { - return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tv.Sequence), taglen) - } - var refpart []byte - endtag := string(tv.Sequence[len(tv.Sequence)-taglen:]) - if endtagid, ok := tag2tagid[endtag]; !ok { - // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome. - refpart = refseq[refstart:] - log.Warnf("%x tilevar %d,%d endtag not in ref: %s", tv.Blake2b[:13], tag, variant, endtag) - } else if refendtagstart, ok := tilestart[endtagid]; !ok { - // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't. - // Give up. (TODO: something smarter) - log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", tv.Blake2b[:13], tag, variant, endtagid) - continue + go func() { + defer throttle.Release() + throttle.Report(cmd.annotateSequence(throttle, outch, tilelib, taglen, refname, seqname, refcs[seqname], len(refs) > 1)) + }() + } + } + throttle.Wait() + return throttle.Err() +} + +func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string, tilelib *tileLibrary, taglen int, refname, seqname string, reftiles []tileLibRef, refnamecol bool) error { + refnamefield := "" + if refnamecol { + refnamefield = "," + trimFilenameForLabel(refname) + } + var refseq []byte + // tilestart[123] is the index into refseq + // where the tile for tag 123 was placed. + tilestart := map[tagID]int{} + tileend := map[tagID]int{} + for _, libref := range reftiles { + if libref.Variant < 1 { + return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag) + } + seq := tilelib.TileVariantSequence(libref) + if len(seq) < taglen { + return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen) + } + overlap := taglen + if len(refseq) == 0 { + overlap = 0 + } + tilestart[libref.Tag] = len(refseq) - overlap + refseq = append(refseq, seq[overlap:]...) + tileend[libref.Tag] = len(refseq) + } + log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart)) + // outtag is tag's index in the subset of tags that aren't + // dropped. If there are 10M tags and half are dropped by + // dropTiles, tag ranges from 0 to 10M-1 and outtag ranges + // from 0 to 5M-1. + // + // IOW, in the matrix built by cgs2array(), {tag} is + // represented by columns {outtag}*2 and {outtag}*2+1. + outcol := -1 + for tag, tvs := range tilelib.variant { + if len(cmd.dropTiles) > tag && cmd.dropTiles[tag] { + continue + } + tag := tagID(tag) + outcol++ + // Must shadow outcol var to use safely in goroutine below. + outcol := outcol + refstart, ok := tilestart[tag] + if !ok { + // Tag didn't place on this reference + // sequence. (It might place on the same + // chromosome in a genome anyway, but we don't + // output the annotations that would result.) + // outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield) + continue + } + for variant := 1; variant <= len(tvs); variant++ { + variant, hash := tileVariantID(variant), tvs[variant-1] + tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant}) + if len(tileseq) == 0 { + continue + } else if len(tileseq) < taglen { + return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen) + } + var refpart []byte + endtag := string(tileseq[len(tileseq)-taglen:]) + if endtagid, ok := cmd.tag2tagid[endtag]; !ok { + // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome. + refpart = refseq[refstart:] + log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag) + } else if refendtagstart, ok := tilestart[endtagid]; !ok { + // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't. + // Give up. (TODO: something smarter) + log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid) + continue + } else { + // Non-terminal tile vs. non-terminal reference. + refpart = refseq[refstart : refendtagstart+taglen] + log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart) + } + if len(refpart) > cmd.maxTileSize { + log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart)) + continue + } + if len(tileseq) > cmd.maxTileSize { + log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq)) + continue + } + // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq) + + throttle.Acquire() + go func() { + defer throttle.Release() + diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0) + for _, diff := range diffs { + diff.Position += refstart + var varid string + if cmd.variantHash { + varid = fmt.Sprintf("%x", hash)[:13] } else { - // Non-terminal tile vs. non-terminal reference. - refpart = refseq[refstart : refendtagstart+taglen] - log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", tv.Blake2b[:13], tag, variant, endtag, endtagid, refendtagstart) - } - if len(refpart) > cmd.maxTileSize { - log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, refstart, len(refpart)) - continue + varid = fmt.Sprintf("%d", variant) } - if len(tv.Sequence) > cmd.maxTileSize { - log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", tv.Blake2b[:13], tag, variant, refcs.Name, seqname, len(tv.Sequence)) - continue + outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String()) + if cmd.reportAnnotation != nil { + cmd.reportAnnotation(tag, outcol, variant, refname, seqname, diff) } - // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tv.Sequence) - - diffwg.Add(1) - limiter <- true - go func() { - defer func() { - <-limiter - diffwg.Done() - }() - diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tv.Sequence)), 0) - for _, diff := range diffs { - diff.Position += refstart - var varid string - if cmd.variantHash { - varid = fmt.Sprintf("%x", tv.Blake2b)[:13] - } else { - varid = strconv.Itoa(variant) - } - outch <- fmt.Sprintf("%d\t%s\t%s\t%s:g.%s\n", tag, varid, refcs.Name, seqname, diff.String()) - } - }() } - } + }() } } return nil