Fix some tests.
[lightning.git] / slice.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "encoding/gob"
10         "errors"
11         "flag"
12         "fmt"
13         "io"
14         "net/http"
15         _ "net/http/pprof"
16         "os"
17         "path/filepath"
18         "runtime"
19         "strings"
20         "sync"
21         "sync/atomic"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/klauspost/pgzip"
25         log "github.com/sirupsen/logrus"
26 )
27
28 type slicecmd struct{}
29
30 func (cmd *slicecmd) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
31         var err error
32         defer func() {
33                 if err != nil {
34                         fmt.Fprintf(stderr, "%s\n", err)
35                 }
36         }()
37         flags := flag.NewFlagSet("", flag.ContinueOnError)
38         flags.SetOutput(stderr)
39         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
40         pprofdir := flags.String("pprof-dir", "", "write Go profile data to `directory` periodically")
41         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
42         projectUUID := flags.String("project", "", "project `UUID` for output data")
43         priority := flags.Int("priority", 500, "container request priority")
44         preemptible := flags.Bool("preemptible", true, "request preemptible instance")
45         outputDir := flags.String("output-dir", "./out", "output `directory`")
46         tagsPerFile := flags.Int("tags-per-file", 50000, "tags per file (nfiles will be ~10M÷x)")
47         err = flags.Parse(args)
48         if err == flag.ErrHelp {
49                 err = nil
50                 return 0
51         } else if err != nil {
52                 return 2
53         }
54         inputDirs := flags.Args()
55         if len(inputDirs) == 0 {
56                 err = errors.New("no input dirs specified")
57                 return 2
58         }
59
60         if *pprof != "" {
61                 go func() {
62                         log.Println(http.ListenAndServe(*pprof, nil))
63                 }()
64         }
65         if *pprofdir != "" {
66                 go writeProfilesPeriodically(*pprofdir)
67         }
68
69         if !*runlocal {
70                 runner := arvadosContainerRunner{
71                         Name:        "lightning slice",
72                         Client:      arvados.NewClientFromEnv(),
73                         ProjectUUID: *projectUUID,
74                         RAM:         500000000000,
75                         VCPUs:       64,
76                         Priority:    *priority,
77                         KeepCache:   2,
78                         APIAccess:   true,
79                         Preemptible: *preemptible,
80                 }
81                 for i := range inputDirs {
82                         err = runner.TranslatePaths(&inputDirs[i])
83                         if err != nil {
84                                 return 1
85                         }
86                 }
87                 runner.Args = append([]string{"slice", "-local=true",
88                         "-pprof", ":6060",
89                         "-output-dir", "/mnt/output",
90                 }, inputDirs...)
91                 var output string
92                 output, err = runner.Run()
93                 if err != nil {
94                         return 1
95                 }
96                 fmt.Fprintln(stdout, output)
97                 return 0
98         }
99
100         err = Slice(*tagsPerFile, *outputDir, inputDirs)
101         if err != nil {
102                 return 1
103         }
104         return 0
105 }
106
107 // Read tags+tiles+genomes from srcdir, write to dstdir with (up to)
108 // the specified number of tags per file.
109 func Slice(tagsPerFile int, dstdir string, srcdirs []string) error {
110         var infiles []string
111         for _, srcdir := range srcdirs {
112                 files, err := allFiles(srcdir, matchGobFile)
113                 if err != nil {
114                         return err
115                 }
116                 infiles = append(infiles, files...)
117         }
118         // dirNamespace[dir] is an int in [0,len(dirNamespace)), used below to
119         // namespace variant numbers from different dirs.
120         dirNamespace := map[string]tileVariantID{}
121         for _, path := range infiles {
122                 dir, _ := filepath.Split(path)
123                 if _, ok := dirNamespace[dir]; !ok {
124                         dirNamespace[dir] = tileVariantID(len(dirNamespace))
125                 }
126         }
127         namespaces := tileVariantID(len(dirNamespace))
128
129         var (
130                 tagset     [][]byte
131                 tagsetOnce sync.Once
132                 fs         []*os.File
133                 bufws      []*bufio.Writer
134                 gzws       []*pgzip.Writer
135                 encs       []*gob.Encoder
136
137                 countTileVariants int64
138                 countGenomes      int64
139                 countReferences   int64
140         )
141
142         throttle := throttle{Max: runtime.GOMAXPROCS(0)}
143         for _, infile := range infiles {
144                 infile := infile
145                 throttle.Go(func() error {
146                         f, err := open(infile)
147                         if err != nil {
148                                 return err
149                         }
150                         defer f.Close()
151                         dir, _ := filepath.Split(infile)
152                         namespace := dirNamespace[dir]
153                         log.Printf("reading %s (namespace %d)", infile, namespace)
154                         return DecodeLibrary(f, strings.HasSuffix(infile, ".gz"), func(ent *LibraryEntry) error {
155                                 if err := throttle.Err(); err != nil {
156                                         return err
157                                 }
158                                 if len(ent.TagSet) > 0 {
159                                         tagsetOnce.Do(func() {
160                                                 tagset = ent.TagSet
161                                                 var err error
162                                                 fs, bufws, gzws, encs, err = openOutFiles(dstdir, len(ent.TagSet), tagsPerFile)
163                                                 if err != nil {
164                                                         throttle.Report(err)
165                                                         return
166                                                 }
167                                                 for _, enc := range encs {
168                                                         err = enc.Encode(LibraryEntry{TagSet: tagset})
169                                                         if err != nil {
170                                                                 throttle.Report(err)
171                                                                 return
172                                                         }
173                                                 }
174                                         })
175                                 }
176                                 if err := throttle.Err(); err != nil {
177                                         return err
178                                 }
179                                 atomic.AddInt64(&countTileVariants, int64(len(ent.TileVariants)))
180                                 for _, tv := range ent.TileVariants {
181                                         tv.Variant = tv.Variant*namespaces + namespace
182                                         fileno := 0
183                                         if !tv.Ref {
184                                                 fileno = int(tv.Tag) / tagsPerFile
185                                         }
186                                         err := encs[fileno].Encode(LibraryEntry{
187                                                 TileVariants: []TileVariant{tv},
188                                         })
189                                         if err != nil {
190                                                 return err
191                                         }
192                                 }
193                                 // Here, each output file gets a
194                                 // CompactGenome entry for each
195                                 // genome, even if there are no
196                                 // variants in the relevant range.
197                                 // Easier for downstream code.
198                                 atomic.AddInt64(&countGenomes, int64(len(ent.CompactGenomes)))
199                                 for _, cg := range ent.CompactGenomes {
200                                         for i, v := range cg.Variants {
201                                                 if v > 0 {
202                                                         cg.Variants[i] = v*namespaces + namespace
203                                                 }
204                                         }
205                                         for i, enc := range encs {
206                                                 start := i * tagsPerFile
207                                                 end := start + tagsPerFile
208                                                 if max := len(cg.Variants)/2 + int(cg.StartTag); end > max {
209                                                         end = max
210                                                 }
211                                                 if start < int(cg.StartTag) {
212                                                         start = int(cg.StartTag)
213                                                 }
214                                                 var variants []tileVariantID
215                                                 if start < end {
216                                                         variants = cg.Variants[(start-int(cg.StartTag))*2 : (end-int(cg.StartTag))*2]
217                                                 }
218                                                 err := enc.Encode(LibraryEntry{CompactGenomes: []CompactGenome{{
219                                                         Name:     cg.Name,
220                                                         Variants: variants,
221                                                         StartTag: tagID(start),
222                                                         EndTag:   tagID(start + tagsPerFile),
223                                                 }}})
224                                                 if err != nil {
225                                                         return err
226                                                 }
227                                         }
228                                 }
229                                 // Write all ref seqs to the first
230                                 // slice. Easier for downstream code.
231                                 atomic.AddInt64(&countReferences, int64(len(ent.CompactSequences)))
232                                 if len(ent.CompactSequences) > 0 {
233                                         for _, cs := range ent.CompactSequences {
234                                                 for _, tseq := range cs.TileSequences {
235                                                         for i, libref := range tseq {
236                                                                 tseq[i].Variant = libref.Variant*namespaces + namespace
237                                                         }
238                                                 }
239                                         }
240                                         err := encs[0].Encode(LibraryEntry{CompactSequences: ent.CompactSequences})
241                                         if err != nil {
242                                                 return err
243                                         }
244                                 }
245                                 return nil
246                         })
247                 })
248         }
249         throttle.Wait()
250         if throttle.Err() != nil {
251                 closeOutFiles(fs, bufws, gzws, encs)
252                 return throttle.Err()
253         }
254         defer log.Printf("Total %d tile variants, %d genomes, %d reference sequences", countTileVariants, countGenomes, countReferences)
255         return closeOutFiles(fs, bufws, gzws, encs)
256 }
257
258 func openOutFiles(dstdir string, tags, tagsPerFile int) (fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder, err error) {
259         nfiles := (tags + tagsPerFile - 1) / tagsPerFile
260         fs = make([]*os.File, nfiles)
261         bufws = make([]*bufio.Writer, nfiles)
262         gzws = make([]*pgzip.Writer, nfiles)
263         encs = make([]*gob.Encoder, nfiles)
264         for i := 0; i*tagsPerFile < tags; i++ {
265                 fs[i], err = os.Create(dstdir + fmt.Sprintf("/library%04d.gob.gz", i))
266                 if err != nil {
267                         return
268                 }
269                 bufws[i] = bufio.NewWriterSize(fs[i], 1<<26)
270                 gzws[i] = pgzip.NewWriter(bufws[i])
271                 encs[i] = gob.NewEncoder(gzws[i])
272         }
273         return
274 }
275
276 func closeOutFiles(fs []*os.File, bufws []*bufio.Writer, gzws []*pgzip.Writer, encs []*gob.Encoder) error {
277         var firstErr error
278         for _, gzw := range gzws {
279                 if gzw != nil {
280                         err := gzw.Close()
281                         if err != nil && firstErr == nil {
282                                 firstErr = err
283                         }
284                 }
285         }
286         for _, bufw := range bufws {
287                 if bufw != nil {
288                         err := bufw.Flush()
289                         if err != nil && firstErr == nil {
290                                 firstErr = err
291                         }
292                 }
293         }
294         for _, f := range fs {
295                 if f != nil {
296                         err := f.Close()
297                         if err != nil && firstErr == nil {
298                                 firstErr = err
299                         }
300                 }
301         }
302         return firstErr
303 }