More memory for import.
[lightning.git] / import.go
1 package main
2
3 import (
4         "bufio"
5         "compress/gzip"
6         "encoding/gob"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "net/http"
12         _ "net/http/pprof"
13         "os"
14         "os/exec"
15         "path/filepath"
16         "regexp"
17         "runtime"
18         "sort"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         log "github.com/sirupsen/logrus"
26 )
27
// importer implements the "import" subcommand. It tiles FASTA and VCF
// inputs against a tag library and writes the results to an output file
// as a gob stream.
type importer struct {
	// Values bound to command-line flags in RunCommand.
	tagLibraryFile string // -tag-library: tag library fasta file (required)
	refFile        string // -ref: reference fasta file (needed for VCF inputs)
	outputFile     string // -o: output file; "-" means stdout
	projectUUID    string // -project: project UUID for output data
	runLocal       bool   // -local: run on this host instead of in an arvados container
	skipOOO        bool   // -skip-ooo: skip out-of-order tags

	// encoder writes library entries to the output; set by RunCommand.
	encoder *gob.Encoder
}
37
38 func (cmd *importer) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
39         var err error
40         defer func() {
41                 if err != nil {
42                         fmt.Fprintf(stderr, "%s\n", err)
43                 }
44         }()
45         flags := flag.NewFlagSet("", flag.ContinueOnError)
46         flags.SetOutput(stderr)
47         flags.StringVar(&cmd.tagLibraryFile, "tag-library", "", "tag library fasta `file`")
48         flags.StringVar(&cmd.refFile, "ref", "", "reference fasta `file`")
49         flags.StringVar(&cmd.outputFile, "o", "-", "output `file`")
50         flags.StringVar(&cmd.projectUUID, "project", "", "project `UUID` for output data")
51         flags.BoolVar(&cmd.runLocal, "local", false, "run on local host (default: run in an arvados container)")
52         flags.BoolVar(&cmd.skipOOO, "skip-ooo", false, "skip out-of-order tags")
53         priority := flags.Int("priority", 500, "container request priority")
54         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
55         err = flags.Parse(args)
56         if err == flag.ErrHelp {
57                 err = nil
58                 return 0
59         } else if err != nil {
60                 return 2
61         } else if cmd.tagLibraryFile == "" {
62                 fmt.Fprintln(os.Stderr, "cannot import without -tag-library argument")
63                 return 2
64         } else if flags.NArg() == 0 {
65                 flags.Usage()
66                 return 2
67         }
68
69         if *pprof != "" {
70                 go func() {
71                         log.Println(http.ListenAndServe(*pprof, nil))
72                 }()
73         }
74
75         if !cmd.runLocal {
76                 runner := arvadosContainerRunner{
77                         Name:        "lightning import",
78                         Client:      arvados.NewClientFromEnv(),
79                         ProjectUUID: cmd.projectUUID,
80                         RAM:         60000000000,
81                         VCPUs:       16,
82                         Priority:    *priority,
83                 }
84                 err = runner.TranslatePaths(&cmd.tagLibraryFile, &cmd.refFile, &cmd.outputFile)
85                 if err != nil {
86                         return 1
87                 }
88                 inputs := flags.Args()
89                 for i := range inputs {
90                         err = runner.TranslatePaths(&inputs[i])
91                         if err != nil {
92                                 return 1
93                         }
94                 }
95                 if cmd.outputFile == "-" {
96                         cmd.outputFile = "/mnt/output/library.gob"
97                 } else {
98                         // Not yet implemented, but this should write
99                         // the collection to an existing collection,
100                         // possibly even an in-place update.
101                         err = errors.New("cannot specify output file in container mode: not implemented")
102                         return 1
103                 }
104                 runner.Args = append([]string{"import", "-local=true", fmt.Sprintf("-skip-ooo=%v", cmd.skipOOO), "-tag-library", cmd.tagLibraryFile, "-ref", cmd.refFile, "-o", cmd.outputFile}, inputs...)
105                 var output string
106                 output, err = runner.Run()
107                 if err != nil {
108                         return 1
109                 }
110                 fmt.Fprintln(stdout, output+"/library.gob")
111                 return 0
112         }
113
114         infiles, err := listInputFiles(flags.Args())
115         if err != nil {
116                 return 1
117         }
118
119         tilelib, err := cmd.loadTileLibrary()
120         if err != nil {
121                 return 1
122         }
123         go func() {
124                 for range time.Tick(10 * time.Minute) {
125                         log.Printf("tilelib.Len() == %d", tilelib.Len())
126                 }
127         }()
128
129         var output io.WriteCloser
130         if cmd.outputFile == "-" {
131                 output = nopCloser{stdout}
132         } else {
133                 output, err = os.OpenFile(cmd.outputFile, os.O_CREATE|os.O_WRONLY, 0777)
134                 if err != nil {
135                         return 1
136                 }
137                 defer output.Close()
138         }
139         bufw := bufio.NewWriter(output)
140         cmd.encoder = gob.NewEncoder(bufw)
141
142         err = cmd.tileInputs(tilelib, infiles)
143         if err != nil {
144                 return 1
145         }
146         err = bufw.Flush()
147         if err != nil {
148                 return 1
149         }
150         err = output.Close()
151         if err != nil {
152                 return 1
153         }
154         return 0
155 }
156
157 func (cmd *importer) tileFasta(tilelib *tileLibrary, infile string) (tileSeq, error) {
158         var input io.ReadCloser
159         input, err := os.Open(infile)
160         if err != nil {
161                 return nil, err
162         }
163         defer input.Close()
164         if strings.HasSuffix(infile, ".gz") {
165                 input, err = gzip.NewReader(input)
166                 if err != nil {
167                         return nil, err
168                 }
169                 defer input.Close()
170         }
171         return tilelib.TileFasta(infile, input)
172 }
173
174 func (cmd *importer) loadTileLibrary() (*tileLibrary, error) {
175         log.Printf("tag library %s load starting", cmd.tagLibraryFile)
176         f, err := os.Open(cmd.tagLibraryFile)
177         if err != nil {
178                 return nil, err
179         }
180         defer f.Close()
181         var rdr io.ReadCloser = f
182         if strings.HasSuffix(cmd.tagLibraryFile, ".gz") {
183                 rdr, err = gzip.NewReader(f)
184                 if err != nil {
185                         return nil, fmt.Errorf("%s: gzip: %s", cmd.tagLibraryFile, err)
186                 }
187                 defer rdr.Close()
188         }
189         var taglib tagLibrary
190         err = taglib.Load(rdr)
191         if err != nil {
192                 return nil, err
193         }
194         if taglib.Len() < 1 {
195                 return nil, fmt.Errorf("cannot tile: tag library is empty")
196         }
197         log.Printf("tag library %s load done", cmd.tagLibraryFile)
198         return &tileLibrary{taglib: &taglib, skipOOO: cmd.skipOOO}, nil
199 }
200
// listInputFiles expands command-line paths into the list of files to import.
//
// A path naming a non-directory is used as given, except for
// second-haplotype FASTA files (*.2.fasta, *.2.fasta.gz). Those are
// dropped because tileInputs reads them alongside the matching *.1.fasta
// file. A directory contributes its *.vcf, *.vcf.gz, *.1.fasta and
// *.1.fasta.gz entries, in name order.
//
// Every selected file that is not a *.1.fasta[.gz] must have a ".csi" or
// ".tbi" index file next to it; otherwise an error is returned.
func listInputFiles(paths []string) (files []string, err error) {
	for _, path := range paths {
		fi, err := os.Stat(path)
		if err != nil {
			return nil, fmt.Errorf("%s: stat failed: %s", path, err)
		}
		if !fi.IsDir() {
			// Parenthesized on purpose: skip BOTH plain and gzipped
			// second-haplotype files.
			if !(strings.HasSuffix(path, ".2.fasta") || strings.HasSuffix(path, ".2.fasta.gz")) {
				files = append(files, path)
			}
			continue
		}
		d, err := os.Open(path)
		if err != nil {
			return nil, fmt.Errorf("%s: open failed: %s", path, err)
		}
		names, err := d.Readdirnames(0)
		// Close right away rather than deferring: a defer inside this
		// loop would keep every directory open until the function returns.
		d.Close()
		if err != nil {
			return nil, fmt.Errorf("%s: readdir failed: %s", path, err)
		}
		sort.Strings(names)
		for _, name := range names {
			switch {
			case strings.HasSuffix(name, ".vcf"), strings.HasSuffix(name, ".vcf.gz"),
				strings.HasSuffix(name, ".1.fasta"), strings.HasSuffix(name, ".1.fasta.gz"):
				files = append(files, filepath.Join(path, name))
			}
		}
	}
	for _, file := range files {
		if strings.HasSuffix(file, ".1.fasta") || strings.HasSuffix(file, ".1.fasta.gz") {
			continue
		}
		if _, err := os.Stat(file + ".csi"); err == nil {
			continue
		}
		if _, err := os.Stat(file + ".tbi"); err == nil {
			continue
		}
		return nil, fmt.Errorf("%s: cannot read without .tbi or .csi index file", file)
	}
	return files, nil
}
243
244 func (cmd *importer) tileInputs(tilelib *tileLibrary, infiles []string) error {
245         starttime := time.Now()
246         errs := make(chan error, 1)
247         todo := make(chan func() error, len(infiles)*2)
248         var encodeJobs sync.WaitGroup
249         for _, infile := range infiles {
250                 infile := infile
251                 var phases sync.WaitGroup
252                 phases.Add(2)
253                 variants := make([][]tileVariantID, 2)
254                 if strings.HasSuffix(infile, ".1.fasta") || strings.HasSuffix(infile, ".1.fasta.gz") {
255                         todo <- func() error {
256                                 defer phases.Done()
257                                 log.Printf("%s starting", infile)
258                                 defer log.Printf("%s done", infile)
259                                 tseqs, err := cmd.tileFasta(tilelib, infile)
260                                 variants[0] = tseqs.Variants()
261                                 return err
262                         }
263                         infile2 := regexp.MustCompile(`\.1\.fasta(\.gz)?$`).ReplaceAllString(infile, `.2.fasta$1`)
264                         todo <- func() error {
265                                 defer phases.Done()
266                                 log.Printf("%s starting", infile2)
267                                 defer log.Printf("%s done", infile2)
268                                 tseqs, err := cmd.tileFasta(tilelib, infile2)
269                                 variants[1] = tseqs.Variants()
270                                 return err
271                         }
272                 } else {
273                         for phase := 0; phase < 2; phase++ {
274                                 phase := phase
275                                 todo <- func() error {
276                                         defer phases.Done()
277                                         log.Printf("%s phase %d starting", infile, phase+1)
278                                         defer log.Printf("%s phase %d done", infile, phase+1)
279                                         tseqs, err := cmd.tileGVCF(tilelib, infile, phase)
280                                         variants[phase] = tseqs.Variants()
281                                         return err
282                                 }
283                         }
284                 }
285                 encodeJobs.Add(1)
286                 go func() {
287                         defer encodeJobs.Done()
288                         phases.Wait()
289                         if len(errs) > 0 {
290                                 return
291                         }
292                         ntags := len(variants[0])
293                         if ntags < len(variants[1]) {
294                                 ntags = len(variants[1])
295                         }
296                         flat := make([]tileVariantID, ntags*2)
297                         for i := 0; i < ntags; i++ {
298                                 for hap := 0; hap < 2; hap++ {
299                                         if i < len(variants[hap]) {
300                                                 flat[i*2+hap] = variants[hap][i]
301                                         }
302                                 }
303                         }
304                         err := cmd.encoder.Encode(LibraryEntry{
305                                 CompactGenomes: []CompactGenome{{Name: infile, Variants: flat}},
306                         })
307                         if err != nil {
308                                 select {
309                                 case errs <- err:
310                                 default:
311                                 }
312                         }
313                 }()
314         }
315         go close(todo)
316         var tileJobs sync.WaitGroup
317         var running int64
318         for i := 0; i < runtime.NumCPU()*9/8+1; i++ {
319                 tileJobs.Add(1)
320                 atomic.AddInt64(&running, 1)
321                 go func() {
322                         defer tileJobs.Done()
323                         defer atomic.AddInt64(&running, -1)
324                         for fn := range todo {
325                                 if len(errs) > 0 {
326                                         return
327                                 }
328                                 err := fn()
329                                 if err != nil {
330                                         select {
331                                         case errs <- err:
332                                         default:
333                                         }
334                                 }
335                                 remain := len(todo) + int(atomic.LoadInt64(&running)) - 1
336                                 ttl := time.Now().Sub(starttime) * time.Duration(remain) / time.Duration(cap(todo)-remain)
337                                 eta := time.Now().Add(ttl)
338                                 log.Printf("progress %d/%d, eta %v (%v)", cap(todo)-remain, cap(todo), eta, ttl)
339                         }
340                 }()
341         }
342         tileJobs.Wait()
343         encodeJobs.Wait()
344         go close(errs)
345         return <-errs
346 }
347
348 func (cmd *importer) tileGVCF(tilelib *tileLibrary, infile string, phase int) (tileseq tileSeq, err error) {
349         if cmd.refFile == "" {
350                 err = errors.New("cannot import vcf: reference data (-ref) not specified")
351                 return
352         }
353         args := []string{"bcftools", "consensus", "--fasta-ref", cmd.refFile, "-H", fmt.Sprint(phase + 1), infile}
354         indexsuffix := ".tbi"
355         if _, err := os.Stat(infile + ".csi"); err == nil {
356                 indexsuffix = ".csi"
357         }
358         if out, err := exec.Command("docker", "image", "ls", "-q", "lightning-runtime").Output(); err == nil && len(out) > 0 {
359                 args = append([]string{
360                         "docker", "run", "--rm",
361                         "--log-driver=none",
362                         "--volume=" + infile + ":" + infile + ":ro",
363                         "--volume=" + infile + indexsuffix + ":" + infile + indexsuffix + ":ro",
364                         "--volume=" + cmd.refFile + ":" + cmd.refFile + ":ro",
365                         "lightning-runtime",
366                 }, args...)
367         }
368         consensus := exec.Command(args[0], args[1:]...)
369         consensus.Stderr = os.Stderr
370         stdout, err := consensus.StdoutPipe()
371         defer stdout.Close()
372         if err != nil {
373                 return
374         }
375         err = consensus.Start()
376         if err != nil {
377                 return
378         }
379         defer consensus.Wait()
380         tileseq, err = tilelib.TileFasta(fmt.Sprintf("%s phase %d", infile, phase+1), stdout)
381         if err != nil {
382                 return
383         }
384         err = stdout.Close()
385         if err != nil {
386                 return
387         }
388         err = consensus.Wait()
389         if err != nil {
390                 err = fmt.Errorf("%s phase %d: bcftools: %s", infile, phase, err)
391                 return
392         }
393         return
394 }