1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
24 "git.arvados.org/arvados.git/sdk/go/arvados"
25 "github.com/arvados/lightning/hgvs"
26 log "github.com/sirupsen/logrus"
29 type annotatecmd struct {
33 tag2tagid map[string]tagID
34 reportAnnotation func(tag tagID, outcol int, variant tileVariantID, refname string, seqname string, pdi hgvs.Variant)
37 func (cmd *annotatecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
41 fmt.Fprintf(stderr, "%s\n", err)
44 flags := flag.NewFlagSet("", flag.ContinueOnError)
45 flags.SetOutput(stderr)
46 pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
47 runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
48 projectUUID := flags.String("project", "", "project `UUID` for output data")
49 priority := flags.Int("priority", 500, "container request priority")
50 inputFilename := flags.String("i", "-", "input `file` (library)")
51 outputFilename := flags.String("o", "-", "output `file`")
52 flags.BoolVar(&cmd.variantHash, "variant-hash", false, "output variant hash instead of index")
53 flags.IntVar(&cmd.maxTileSize, "max-tile-size", 50000, "don't try to make annotations for tiles bigger than given `size`")
54 err = flags.Parse(args)
55 if err == flag.ErrHelp {
58 } else if err != nil {
60 } else if flags.NArg() > 0 {
61 err = fmt.Errorf("errant command line arguments after parsed flags: %v", flags.Args())
67 log.Println(http.ListenAndServe(*pprof, nil))
71 if *outputFilename != "-" {
72 err = errors.New("cannot specify output file in container mode: not implemented")
75 runner := arvadosContainerRunner{
76 Name: "lightning annotate",
77 Client: arvados.NewClientFromEnv(),
78 ProjectUUID: *projectUUID,
83 err = runner.TranslatePaths(inputFilename)
87 runner.Args = []string{"annotate", "-local=true", fmt.Sprintf("-variant-hash=%v", cmd.variantHash), "-max-tile-size", strconv.Itoa(cmd.maxTileSize), "-i", *inputFilename, "-o", "/mnt/output/tilevariants.csv"}
89 output, err = runner.Run()
93 fmt.Fprintln(stdout, output+"/tilevariants.csv")
97 var input io.ReadCloser
98 if *inputFilename == "-" {
99 input = ioutil.NopCloser(stdin)
101 input, err = os.Open(*inputFilename)
108 var output io.WriteCloser
109 if *outputFilename == "-" {
110 output = nopCloser{stdout}
112 output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0666)
118 bufw := bufio.NewWriterSize(output, 4*1024*1024)
120 tilelib := &tileLibrary{
122 retainTileSequences: true,
124 err = tilelib.LoadGob(context.Background(), input, strings.HasSuffix(*inputFilename, ".gz"))
128 err = cmd.exportTileDiffs(bufw, tilelib)
147 func (cmd *annotatecmd) exportTileDiffs(outw io.Writer, tilelib *tileLibrary) error {
148 tagset := tilelib.taglib.Tags()
149 if len(tagset) == 0 {
150 return errors.New("cannot annotate library without tags")
152 taglen := len(tagset[0])
154 for name := range tilelib.refseqs {
155 refs = append(refs, name)
157 cmd.tag2tagid = make(map[string]tagID, len(tagset))
158 for tagid, tagseq := range tagset {
159 cmd.tag2tagid[string(tagseq)] = tagID(tagid)
162 log.Infof("len(refs) %d", len(refs))
164 outch := make(chan string, runtime.NumCPU()*2)
165 var outwg sync.WaitGroup
170 for s := range outch {
171 io.WriteString(outw, s)
177 for _, refcs := range tilelib.refseqs {
181 throttle := &throttle{Max: runtime.NumCPU()*2 + nseqs*2 + 1}
182 defer throttle.Wait()
184 for _, refname := range refs {
186 refcs := tilelib.refseqs[refname]
187 var seqnames []string
188 for seqname := range refcs {
189 seqnames = append(seqnames, seqname)
191 sort.Strings(seqnames)
192 for _, seqname := range seqnames {
195 if throttle.Err() != nil {
199 defer throttle.Release()
200 throttle.Report(cmd.annotateSequence(throttle, outch, tilelib, taglen, refname, seqname, refcs[seqname], len(refs) > 1))
205 return throttle.Err()
208 func (cmd *annotatecmd) annotateSequence(throttle *throttle, outch chan<- string, tilelib *tileLibrary, taglen int, refname, seqname string, reftiles []tileLibRef, refnamecol bool) error {
211 refnamefield = "," + trimFilenameForLabel(refname)
214 // tilestart[123] is the index into refseq
215 // where the tile for tag 123 was placed.
216 tilestart := map[tagID]int{}
217 tileend := map[tagID]int{}
218 for _, libref := range reftiles {
219 if libref.Variant < 1 {
220 return fmt.Errorf("reference %q seq %q uses variant zero at tag %d", refname, seqname, libref.Tag)
222 seq := tilelib.TileVariantSequence(libref)
223 if len(seq) < taglen {
224 return fmt.Errorf("reference %q seq %q uses tile %d variant %d with sequence len %d < taglen %d", refname, seqname, libref.Tag, libref.Variant, len(seq), taglen)
227 if len(refseq) == 0 {
230 tilestart[libref.Tag] = len(refseq) - overlap
231 refseq = append(refseq, seq[overlap:]...)
232 tileend[libref.Tag] = len(refseq)
234 log.Infof("seq %s len(refseq) %d len(tilestart) %d", seqname, len(refseq), len(tilestart))
235 // outtag is tag's index in the subset of tags that aren't
236 // dropped. If there are 10M tags and half are dropped by
237 // dropTiles, tag ranges from 0 to 10M-1 and outtag ranges
240 // IOW, in the matrix built by cgs2array(), {tag} is
241 // represented by columns {outtag}*2 and {outtag}*2+1.
243 for tag, tvs := range tilelib.variant {
244 if len(cmd.dropTiles) > tag && cmd.dropTiles[tag] {
249 // Must shadow outcol var to use safely in goroutine below.
251 refstart, ok := tilestart[tag]
253 // Tag didn't place on this reference
254 // sequence. (It might place on the same
255 // chromosome in a genome anyway, but we don't
256 // output the annotations that would result.)
257 // outch <- fmt.Sprintf("%d,%d,-1%s\n", tag, outcol, refnamefield)
260 for variant := 1; variant <= len(tvs); variant++ {
261 variant, hash := tileVariantID(variant), tvs[variant-1]
262 tileseq := tilelib.TileVariantSequence(tileLibRef{Tag: tag, Variant: variant})
263 if len(tileseq) == 0 {
265 } else if len(tileseq) < taglen {
266 return fmt.Errorf("tilevar %d,%d has sequence len %d < taglen %d", tag, variant, len(tileseq), taglen)
269 endtag := string(tileseq[len(tileseq)-taglen:])
270 if endtagid, ok := cmd.tag2tagid[endtag]; !ok {
271 // Tile variant doesn't end on a tag, so it can only place at the end of a chromosome.
272 refpart = refseq[refstart:]
273 log.Warnf("%x tilevar %d,%d endtag not in ref: %s", hash[:13], tag, variant, endtag)
274 } else if refendtagstart, ok := tilestart[endtagid]; !ok {
275 // Ref ends a chromsome with a (possibly very large) variant of this tile, but genomes with this tile don't.
276 // Give up. (TODO: something smarter)
277 log.Debugf("%x not annotating tilevar %d,%d because end tag %d is not in ref", hash[:13], tag, variant, endtagid)
280 // Non-terminal tile vs. non-terminal reference.
281 refpart = refseq[refstart : refendtagstart+taglen]
282 log.Tracef("\n%x tilevar %d,%d endtag %s endtagid %d refendtagstart %d", hash[:13], tag, variant, endtag, endtagid, refendtagstart)
284 if len(refpart) > cmd.maxTileSize {
285 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s pos %d ref len %d", hash[:13], tag, variant, refname, seqname, refstart, len(refpart))
288 if len(tileseq) > cmd.maxTileSize {
289 log.Warnf("%x tilevar %d,%d skipping long diff, ref %s seq %s variant len %d", hash[:13], tag, variant, refname, seqname, len(tileseq))
292 // log.Printf("\n%x @ refstart %d \n< %s\n> %s\n", tv.Blake2b, refstart, refpart, tileseq)
296 defer throttle.Release()
297 diffs, _ := hgvs.Diff(strings.ToUpper(string(refpart)), strings.ToUpper(string(tileseq)), 0)
298 for _, diff := range diffs {
299 diff.Position += refstart
302 varid = fmt.Sprintf("%x", hash)[:13]
304 varid = fmt.Sprintf("%d", variant)
306 outch <- fmt.Sprintf("%d,%d,%s%s,%s:g.%s\n", tag, outcol, varid, refnamefield, seqname, diff.String())
307 if cmd.reportAnnotation != nil {
308 cmd.reportAnnotation(tag, outcol, variant, refname, seqname, diff)