19073: Fix dup tag detection.
[lightning.git] / tilestats.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "flag"
10         "fmt"
11         "io"
12         "net/http"
13         _ "net/http/pprof"
14         "os"
15         "path/filepath"
16         "sort"
17         "strings"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         log "github.com/sirupsen/logrus"
21 )
22
23 type tilingStats struct {
24 }
25
26 func (cmd *tilingStats) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
27         var err error
28         defer func() {
29                 if err != nil {
30                         fmt.Fprintf(stderr, "%s\n", err)
31                 }
32         }()
33         flags := flag.NewFlagSet("", flag.ContinueOnError)
34         flags.SetOutput(stderr)
35         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
36         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
37         projectUUID := flags.String("project", "", "project `UUID` for output data")
38         priority := flags.Int("priority", 500, "container request priority")
39         inputDir := flags.String("input-dir", "./in", "input `directory`")
40         outputDir := flags.String("output-dir", "./out", "output `directory`")
41         err = flags.Parse(args)
42         if err == flag.ErrHelp {
43                 err = nil
44                 return 0
45         } else if err != nil {
46                 return 2
47         }
48
49         if *pprof != "" {
50                 go func() {
51                         log.Println(http.ListenAndServe(*pprof, nil))
52                 }()
53         }
54
55         if !*runlocal {
56                 runner := arvadosContainerRunner{
57                         Name:        "lightning tiling-stats",
58                         Client:      arvados.NewClientFromEnv(),
59                         ProjectUUID: *projectUUID,
60                         RAM:         8000000000,
61                         VCPUs:       2,
62                         Priority:    *priority,
63                         KeepCache:   2,
64                         APIAccess:   true,
65                 }
66                 err = runner.TranslatePaths(inputDir)
67                 if err != nil {
68                         return 1
69                 }
70                 runner.Args = []string{"tiling-stats", "-local=true",
71                         "-pprof=:6060",
72                         "-input-dir=" + *inputDir,
73                         "-output-dir=/mnt/output",
74                 }
75                 var output string
76                 output, err = runner.Run()
77                 if err != nil {
78                         return 1
79                 }
80                 fmt.Fprintln(stdout, output)
81                 return 0
82         }
83
84         infiles, err := allFiles(*inputDir, matchGobFile)
85         if err != nil {
86                 return 1
87         }
88         if len(infiles) == 0 {
89                 err = fmt.Errorf("no input files found in %s", *inputDir)
90                 return 1
91         }
92         sort.Strings(infiles)
93
94         var refseqs []CompactSequence
95         var reftiledata = make(map[tileLibRef][]byte, 11000000)
96         in0, err := open(infiles[0])
97         if err != nil {
98                 return 1
99         }
100         defer in0.Close()
101         var taglen int
102         err = DecodeLibrary(in0, strings.HasSuffix(infiles[0], ".gz"), func(ent *LibraryEntry) error {
103                 if len(ent.TagSet) > 0 {
104                         taglen = len(ent.TagSet[0])
105                 }
106                 refseqs = append(refseqs, ent.CompactSequences...)
107                 for _, tv := range ent.TileVariants {
108                         if tv.Ref {
109                                 reftiledata[tileLibRef{tv.Tag, tv.Variant}] = tv.Sequence
110                         }
111                 }
112                 return nil
113         })
114         if err != nil {
115                 return 1
116         }
117         in0.Close()
118         if len(refseqs) == 0 {
119                 err = fmt.Errorf("%s: reference sequence not found", infiles[0])
120                 return 1
121         }
122         if taglen == 0 {
123                 err = fmt.Errorf("%s: tagset not found", infiles[0])
124                 return 1
125         }
126
127         for _, cseq := range refseqs {
128                 _, basename := filepath.Split(cseq.Name)
129                 bedname := fmt.Sprintf("%s/%s.bed", *outputDir, basename)
130                 log.Infof("writing %s", bedname)
131                 var f *os.File
132                 f, err = os.OpenFile(bedname, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0777)
133                 if err != nil {
134                         return 1
135                 }
136                 defer f.Close()
137                 bufw := bufio.NewWriterSize(f, 1<<24)
138                 seqnames := make([]string, 0, len(cseq.TileSequences))
139                 for seqname := range cseq.TileSequences {
140                         seqnames = append(seqnames, seqname)
141                 }
142                 sort.Strings(seqnames)
143                 // Mark duplicate tags (tags that place more than once
144                 // on the reference)
145                 duptag := map[tagID]bool{}
146                 for _, seqname := range seqnames {
147                         for _, libref := range cseq.TileSequences[seqname] {
148                                 if dup, seen := duptag[libref.Tag]; seen && !dup {
149                                         duptag[libref.Tag] = true
150                                 } else {
151                                         duptag[libref.Tag] = false
152                                 }
153                         }
154                 }
155                 for _, seqname := range seqnames {
156                         pos := 0
157                         for _, libref := range cseq.TileSequences[seqname] {
158                                 if duptag[libref.Tag] {
159                                         continue
160                                 }
161                                 tiledata := reftiledata[libref]
162                                 if len(tiledata) <= taglen {
163                                         err = fmt.Errorf("bogus input data: ref tile libref %v has len %d < taglen %d", libref, len(tiledata), taglen)
164                                         return 1
165                                 }
166                                 score := 1000 * countBases(tiledata) / len(tiledata)
167                                 _, err = fmt.Fprintf(bufw, "%s %d %d %d %d . %d %d\n",
168                                         seqname,
169                                         pos, pos+len(tiledata),
170                                         libref.Tag,
171                                         score,
172                                         pos+taglen, pos+len(tiledata)-taglen)
173                                 if err != nil {
174                                         return 1
175                                 }
176                                 pos += len(tiledata) - taglen
177                         }
178                 }
179                 err = bufw.Flush()
180                 if err != nil {
181                         return 1
182                 }
183                 err = f.Close()
184                 if err != nil {
185                         return 1
186                 }
187         }
188         return 0
189 }