Log total variant/genome/ref counts.
[lightning.git] / slice.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "flag"
11         "fmt"
12         "io"
13         "net/http"
14         _ "net/http/pprof"
15         "os"
16         "runtime"
17         "strings"
18         "sync"
19         "sync/atomic"
20
21         "git.arvados.org/arvados.git/sdk/go/arvados"
22         "github.com/klauspost/pgzip"
23         log "github.com/sirupsen/logrus"
24 )
25
26 type slicecmd struct{}
27
28 func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
29         var err error
30         defer func() {
31                 if err != nil {
32                         fmt.Fprintf(stderr, "%s\n", err)
33                 }
34         }()
35         flags := flag.NewFlagSet("", flag.ContinueOnError)
36         flags.SetOutput(stderr)
37         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
38         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
39         projectUUID := flags.String("project", "", "project `UUID` for output data")
40         priority := flags.Int("priority", 500, "container request priority")
41         inputDir := flags.String("input-dir", "./in", "input `directory`")
42         outputDir := flags.String("output-dir", "./out", "output `directory`")
43         tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
44         err = flags.Parse(args)
45         if err == flag.ErrHelp {
46                 err = nil
47                 return 0
48         } else if err != nil {
49                 return 2
50         }
51
52         if *pprof != "" {
53                 go func() {
54                         log.Println(http.ListenAndServe(*pprof, nil))
55                 }()
56         }
57
58         if !*runlocal {
59                 runner := arvadosContainerRunner{
60                         Name:        "lightning slice",
61                         Client:      arvados.NewClientFromEnv(),
62                         ProjectUUID: *projectUUID,
63                         RAM:         200000000000,
64                         VCPUs:       32,
65                         Priority:    *priority,
66                         KeepCache:   50,
67                         APIAccess:   true,
68                 }
69                 err = runner.TranslatePaths(inputDir)
70                 if err != nil {
71                         return 1
72                 }
73                 runner.Args = []string{"slice", "-local=true",
74                         "-pprof", ":6060",
75                         "-input-dir", *inputDir,
76                         "-output-dir", "/mnt/output",
77                 }
78                 var output string
79                 output, err = runner.Run()
80                 if err != nil {
81                         return 1
82                 }
83                 fmt.Fprintln(stdout, output)
84                 return 0
85         }
86
87         err = Slice(*outputDir, *inputDir, *tagsPerFile)
88         if err != nil {
89                 return 1
90         }
91         return 0
92 }
93
94 // Read tags+tiles+genomes from srcdir, write to dstdir with (up to)
95 // the specified number of tags per file.
96 func Slice(dstdir, srcdir string, tagsPerFile int) error {
97         infiles, err := allGobFiles(srcdir)
98         if err != nil {
99                 return err
100         }
101
102         var (
103                 tagset     [][]byte
104                 tagsetOnce sync.Once
105                 fs         []*os.File
106                 bufws      []*bufio.Writer
107                 gzws       []*pgzip.Writer
108                 encs       []*gob.Encoder
109
110                 countTileVariants int64
111                 countGenomes      int64
112                 countReferences   int64
113         )
114
115         throttle := throttle{Max: runtime.GOMAXPROCS(0)}
116         for _, path := range infiles {
117                 path := path
118                 throttle.Acquire()
119                 go func() {
120                         defer throttle.Release()
121                         f, err := open(path)
122                         if err != nil {
123                                 throttle.Report(err)
124                                 return
125                         }
126                         defer f.Close()
127                         log.Printf("reading %s", path)
128                         err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
129                                 if err := throttle.Err(); err != nil {
130                                         return err
131                                 }
132                                 if len(ent.TagSet) > 0 {
133                                         tagsetOnce.Do(func() {
134                                                 tagset = ent.TagSet
135                                                 var err error
136                                                 fs, bufws, gzws, encs, err = openOutFiles(dstdir, len(ent.TagSet), tagsPerFile)
137                                                 if err != nil {
138                                                         throttle.Report(err)
139                                                         return
140                                                 }
141                                                 for _, enc := range encs {
142                                                         err = enc.Encode(LibraryEntry{TagSet: tagset})
143                                                         if err != nil {
144                                                                 throttle.Report(err)
145                                                                 return
146                                                         }
147                                                 }
148                                         })
149                                 }
150                                 if err := throttle.Err(); err != nil {
151                                         return err
152                                 }
153                                 atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
154                                 for _, tv := range ent.TileVariants {
155                                         err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
156                                                 TileVariants: []TileVariant{tv},
157                                         })
158                                         if err != nil {
159                                                 return err
160                                         }
161                                 }
162                                 // Here, each output file gets a
163                                 // CompactGenome entry for each
164                                 // genome, even if there are no
165                                 // variants in the relevant range.
166                                 // Easier for downstream code.
167                                 atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
168                                 for _, cg := range ent.CompactGenomes {
169                                         for i, enc := range encs {
170                                                 start := i * tagsPerFile
171                                                 end := start + tagsPerFile
172                                                 if max := len(cg.Variants)/2 + int(cg.StartTag); end > max {
173                                                         end = max
174                                                 }
175                                                 if start < int(cg.StartTag) {
176                                                         start = int(cg.StartTag)
177                                                 }
178                                                 var variants []tileVariantID
179                                                 if start < end {
180                                                         variants = cg.Variants[(start-int(cg.StartTag))*2 : (end-int(cg.StartTag))*2]
181                                                 }
182                                                 err := enc.Encode(LibraryEntry{CompactGenomes: []CompactGenome{{
183                                                         Name:     cg.Name,
184                                                         Variants: variants,
185                                                         StartTag: tagID(start),
186                                                         EndTag:   tagID(start + tagsPerFile),
187                                                 }}})
188                                                 if err != nil {
189                                                         return err
190                                                 }
191                                         }
192                                 }
193                                 // Write all ref seqs to the first
194                                 // slice. Easier for downstream code.
195                                 atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
196                                 if len(ent.CompactSequences) > 0 {
197                                         err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
198                                         if err != nil {
199                                                 return err
200                                         }
201                                 }
202                                 return nil
203                         })
204                         throttle.Report(err)
205                 }()
206         }
207         throttle.Wait()
208         if throttle.Err() != nil {
209                 closeOutFiles(fs, bufws, gzws, encs)
210                 return throttle.Err()
211         }
212         defer log.Printf("Total %d tile variants, %d genomes, %d reference sequences", countTileVariants, countGenomes, countReferences)
213         return closeOutFiles(fs, bufws, gzws, encs)
214 }
215
216 func openOutFiles(dstdir string, tags, tagsPerFile int) (fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder, err error) {
217         nfiles := (tags + tagsPerFile - 1) / tagsPerFile
218         fs = make([]*os.File, nfiles)
219         bufws = make([]*bufio.Writer, nfiles)
220         gzws = make([]*pgzip.Writer, nfiles)
221         encs = make([]*gob.Encoder, nfiles)
222         for i := 0; i*tagsPerFile < tags; i++ {
223                 fs[i], err = os.Create(dstdir + fmt.Sprintf("/library%04d.gob.gz", i))
224                 if err != nil {
225                         return
226                 }
227                 bufws[i] = bufio.NewWriterSize(fs[i], 1<<26)
228                 gzws[i] = pgzip.NewWriter(bufws[i])
229                 encs[i] = gob.NewEncoder(gzws[i])
230         }
231         return
232 }
233
234 func closeOutFiles(fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder) error {
235         var firstErr error
236         for _, gzw := range gzws {
237                 if gzw != nil {
238                         err := gzw.Close()
239                         if err != nil && firstErr == nil {
240                                 firstErr = err
241                         }
242                 }
243         }
244         for _, bufw := range bufws {
245                 if bufw != nil {
246                         err := bufw.Flush()
247                         if err != nil && firstErr == nil {
248                                 firstErr = err
249                         }
250                 }
251         }
252         for _, f := range fs {
253                 if f != nil {
254                         err := f.Close()
255                         if err != nil && firstErr == nil {
256                                 firstErr = err
257                         }
258                 }
259         }
260         return firstErr
261 }