Fix mem usage, improve logging.
[lightning.git] / slice.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "flag"
11         "fmt"
12         "io"
13         "net/http"
14         _ "net/http/pprof"
15         "os"
16         "runtime"
17         "strings"
18         "sync"
19
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "github.com/klauspost/pgzip"
22         log "github.com/sirupsen/logrus"
23 )
24
25 type slicecmd struct{}
26
27 func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
28         var err error
29         defer func() {
30                 if err != nil {
31                         fmt.Fprintf(stderr, "%s\n", err)
32                 }
33         }()
34         flags := flag.NewFlagSet("", flag.ContinueOnError)
35         flags.SetOutput(stderr)
36         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
37         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
38         projectUUID := flags.String("project", "", "project `UUID` for output data")
39         priority := flags.Int("priority", 500, "container request priority")
40         inputDir := flags.String("input-dir", "./in", "input `directory`")
41         outputDir := flags.String("output-dir", "./out", "output `directory`")
42         tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
43         err = flags.Parse(args)
44         if err == flag.ErrHelp {
45                 err = nil
46                 return 0
47         } else if err != nil {
48                 return 2
49         }
50
51         if *pprof != "" {
52                 go func() {
53                         log.Println(http.ListenAndServe(*pprof, nil))
54                 }()
55         }
56
57         if !*runlocal {
58                 runner := arvadosContainerRunner{
59                         Name:        "lightning slice",
60                         Client:      arvados.NewClientFromEnv(),
61                         ProjectUUID: *projectUUID,
62                         RAM:         200000000000,
63                         VCPUs:       32,
64                         Priority:    *priority,
65                         KeepCache:   50,
66                         APIAccess:   true,
67                 }
68                 err = runner.TranslatePaths(inputDir)
69                 if err != nil {
70                         return 1
71                 }
72                 runner.Args = []string{"slice", "-local=true",
73                         "-pprof", ":6060",
74                         "-input-dir", *inputDir,
75                         "-output-dir", "/mnt/output",
76                 }
77                 var output string
78                 output, err = runner.Run()
79                 if err != nil {
80                         return 1
81                 }
82                 fmt.Fprintln(stdout, output)
83                 return 0
84         }
85
86         err = Slice(*outputDir, *inputDir, *tagsPerFile)
87         if err != nil {
88                 return 1
89         }
90         return 0
91 }
92
93 // Read tags+tiles+genomes from srcdir, write to dstdir with (up to)
94 // the specified number of tags per file.
95 func Slice(dstdir, srcdir string, tagsPerFile int) error {
96         infiles, err := allGobFiles(srcdir)
97         if err != nil {
98                 return err
99         }
100
101         var (
102                 tagset     [][]byte
103                 tagsetOnce sync.Once
104                 fs         []*os.File
105                 bufws      []*bufio.Writer
106                 gzws       []*pgzip.Writer
107                 encs       []*gob.Encoder
108         )
109
110         throttle := throttle{Max: runtime.GOMAXPROCS(0)}
111         for _, path := range infiles {
112                 path := path
113                 throttle.Acquire()
114                 go func() {
115                         defer throttle.Release()
116                         f, err := open(path)
117                         if err != nil {
118                                 throttle.Report(err)
119                                 return
120                         }
121                         defer f.Close()
122                         log.Printf("reading %s", path)
123                         err = DecodeLibrary(f, strings.HasSuffix(path, ".gz"), func(ent *LibraryEntry) error {
124                                 if err := throttle.Err(); err != nil {
125                                         return err
126                                 }
127                                 if len(ent.TagSet) > 0 {
128                                         tagsetOnce.Do(func() {
129                                                 tagset = ent.TagSet
130                                                 var err error
131                                                 fs, bufws, gzws, encs, err = openOutFiles(dstdir, len(ent.TagSet), tagsPerFile)
132                                                 if err != nil {
133                                                         throttle.Report(err)
134                                                         return
135                                                 }
136                                                 for _, enc := range encs {
137                                                         err = enc.Encode(LibraryEntry{TagSet: tagset})
138                                                         if err != nil {
139                                                                 throttle.Report(err)
140                                                                 return
141                                                         }
142                                                 }
143                                         })
144                                 }
145                                 if err := throttle.Err(); err != nil {
146                                         return err
147                                 }
148                                 for _, tv := range ent.TileVariants {
149                                         err := encs[int(tv.Tag)/tagsPerFile].Encode(LibraryEntry{
150                                                 TileVariants: []TileVariant{tv},
151                                         })
152                                         if err != nil {
153                                                 return err
154                                         }
155                                 }
156                                 // Here, each output file gets a
157                                 // CompactGenome entry for each
158                                 // genome, even if there are no
159                                 // variants in the relevant range.
160                                 // Easier for downstream code.
161                                 for _, cg := range ent.CompactGenomes {
162                                         for i, enc := range encs {
163                                                 start := i * tagsPerFile
164                                                 end := start + tagsPerFile
165                                                 if max := len(cg.Variants)/2 + int(cg.StartTag); end > max {
166                                                         end = max
167                                                 }
168                                                 if start < int(cg.StartTag) {
169                                                         start = int(cg.StartTag)
170                                                 }
171                                                 var variants []tileVariantID
172                                                 if start < end {
173                                                         variants = cg.Variants[(start-int(cg.StartTag))*2 : (end-int(cg.StartTag))*2]
174                                                 }
175                                                 err := enc.Encode(LibraryEntry{CompactGenomes: []CompactGenome{{
176                                                         Name:     cg.Name,
177                                                         Variants: variants,
178                                                         StartTag: tagID(start),
179                                                         EndTag:   tagID(start + tagsPerFile),
180                                                 }}})
181                                                 if err != nil {
182                                                         return err
183                                                 }
184                                         }
185                                 }
186                                 // Write all ref seqs to the first
187                                 // slice. Easier for downstream code.
188                                 if len(ent.CompactSequences) > 0 {
189                                         err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
190                                         if err != nil {
191                                                 return err
192                                         }
193                                 }
194                                 return nil
195                         })
196                         throttle.Report(err)
197                 }()
198         }
199         throttle.Wait()
200         if throttle.Err() != nil {
201                 closeOutFiles(fs, bufws, gzws, encs)
202                 return throttle.Err()
203         }
204         return closeOutFiles(fs, bufws, gzws, encs)
205 }
206
207 func openOutFiles(dstdir string, tags, tagsPerFile int) (fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder, err error) {
208         nfiles := (tags + tagsPerFile - 1) / tagsPerFile
209         fs = make([]*os.File, nfiles)
210         bufws = make([]*bufio.Writer, nfiles)
211         gzws = make([]*pgzip.Writer, nfiles)
212         encs = make([]*gob.Encoder, nfiles)
213         for i := 0; i*tagsPerFile < tags; i++ {
214                 fs[i], err = os.Create(dstdir + fmt.Sprintf("/library%04d.gob.gz", i))
215                 if err != nil {
216                         return
217                 }
218                 bufws[i] = bufio.NewWriterSize(fs[i], 1<<26)
219                 gzws[i] = pgzip.NewWriter(bufws[i])
220                 encs[i] = gob.NewEncoder(gzws[i])
221         }
222         return
223 }
224
225 func closeOutFiles(fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder) error {
226         var firstErr error
227         for _, gzw := range gzws {
228                 if gzw != nil {
229                         err := gzw.Close()
230                         if err != nil && firstErr == nil {
231                                 firstErr = err
232                         }
233                 }
234         }
235         for _, bufw := range bufws {
236                 if bufw != nil {
237                         err := bufw.Flush()
238                         if err != nil && firstErr == nil {
239                                 firstErr = err
240                         }
241                 }
242         }
243         for _, f := range fs {
244                 if f != nil {
245                         err := f.Close()
246                         if err != nil && firstErr == nil {
247                                 firstErr = err
248                         }
249                 }
250         }
251         return firstErr
252 }