a55f47228db43b99c124a949c06f1642afa9df45
[lightning.git] / merge.go
1 package main
2
3 import (
4         "bufio"
5         "context"
6         "encoding/gob"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         _ "net/http/pprof"
14         "os"
15         "sync"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         log "github.com/sirupsen/logrus"
19 )
20
21 type merger struct {
22         stdin   io.Reader
23         inputs  []string
24         output  io.WriteCloser
25         tagSet  [][]byte
26         tilelib *tileLibrary
27         mapped  map[string]map[tileLibRef]tileVariantID
28         mtxTags sync.Mutex
29         errs    chan error
30 }
31
32 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
33         var err error
34         defer func() {
35                 if err != nil {
36                         fmt.Fprintf(stderr, "%s\n", err)
37                 }
38         }()
39         flags := flag.NewFlagSet("", flag.ContinueOnError)
40         flags.SetOutput(stderr)
41         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
42         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
43         projectUUID := flags.String("project", "", "project `UUID` for output data")
44         priority := flags.Int("priority", 500, "container request priority")
45         outputFilename := flags.String("o", "-", "output `file`")
46         err = flags.Parse(args)
47         if err == flag.ErrHelp {
48                 err = nil
49                 return 0
50         } else if err != nil {
51                 return 2
52         }
53         cmd.stdin = stdin
54         cmd.inputs = flags.Args()
55
56         if *pprof != "" {
57                 go func() {
58                         log.Println(http.ListenAndServe(*pprof, nil))
59                 }()
60         }
61
62         if !*runlocal {
63                 if *outputFilename != "-" {
64                         err = errors.New("cannot specify output file in container mode: not implemented")
65                         return 1
66                 }
67                 runner := arvadosContainerRunner{
68                         Name:        "lightning filter",
69                         Client:      arvados.NewClientFromEnv(),
70                         ProjectUUID: *projectUUID,
71                         RAM:         64000000000,
72                         VCPUs:       2,
73                         Priority:    *priority,
74                 }
75                 for i := range cmd.inputs {
76                         err = runner.TranslatePaths(&cmd.inputs[i])
77                         if err != nil {
78                                 return 1
79                         }
80                 }
81                 runner.Args = append([]string{"merge", "-local=true",
82                         "-o", "/mnt/output/library.gob",
83                 }, cmd.inputs...)
84                 var output string
85                 output, err = runner.Run()
86                 if err != nil {
87                         return 1
88                 }
89                 fmt.Fprintln(stdout, output+"/library.gob")
90                 return 0
91         }
92
93         if *outputFilename == "-" {
94                 cmd.output = nopCloser{stdout}
95         } else {
96                 cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
97                 if err != nil {
98                         return 1
99                 }
100                 defer cmd.output.Close()
101         }
102
103         err = cmd.doMerge()
104         if err != nil {
105                 return 1
106         }
107         err = cmd.output.Close()
108         if err != nil {
109                 return 1
110         }
111         return 0
112 }
113
114 func (cmd *merger) setError(err error) {
115         select {
116         case cmd.errs <- err:
117         default:
118         }
119 }
120
121 func (cmd *merger) doMerge() error {
122         w := bufio.NewWriter(cmd.output)
123         encoder := gob.NewEncoder(w)
124
125         ctx, cancel := context.WithCancel(context.Background())
126         defer cancel()
127
128         cmd.errs = make(chan error, 1)
129         cmd.tilelib = &tileLibrary{
130                 encoder:        encoder,
131                 includeNoCalls: true,
132                 onLoadGenome: func(cg CompactGenome) {
133                         err := encoder.Encode(LibraryEntry{CompactGenomes: []CompactGenome{cg}})
134                         if err != nil {
135                                 cmd.setError(err)
136                                 cancel()
137                         }
138                 },
139         }
140
141         cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
142         for _, input := range cmd.inputs {
143                 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
144         }
145
146         var wg sync.WaitGroup
147         for _, input := range cmd.inputs {
148                 var infile io.ReadCloser
149                 if input == "-" {
150                         infile = ioutil.NopCloser(cmd.stdin)
151                 } else {
152                         var err error
153                         infile, err = os.Open(input)
154                         if err != nil {
155                                 return err
156                         }
157                         defer infile.Close()
158                 }
159                 wg.Add(1)
160                 go func(input string) {
161                         defer wg.Done()
162                         log.Printf("%s: reading", input)
163                         err := cmd.tilelib.LoadGob(ctx, infile, nil)
164                         if err != nil {
165                                 cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
166                                 cancel()
167                                 return
168                         }
169                         err = infile.Close()
170                         if err != nil {
171                                 cmd.setError(fmt.Errorf("%s: error closing input file: %w", input, err))
172                                 cancel()
173                                 return
174                         }
175                         log.Printf("%s: done", input)
176                 }(input)
177         }
178         wg.Wait()
179         go close(cmd.errs)
180         if err := <-cmd.errs; err != nil {
181                 return err
182         }
183         log.Print("flushing")
184         err := w.Flush()
185         if err != nil {
186                 return err
187         }
188         return nil
189 }