Add merge command.
[lightning.git] / merge.go
1 package main
2
3 import (
4         "bufio"
5         "bytes"
6         "encoding/gob"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         _ "net/http/pprof"
14         "os"
15         "sync"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         log "github.com/sirupsen/logrus"
19 )
20
21 type merger struct {
22         stdin   io.Reader
23         inputs  []string
24         output  io.WriteCloser
25         tagSet  [][]byte
26         tilelib *tileLibrary
27         mapped  map[string]map[tileLibRef]tileVariantID
28         todo    []liftCompactGenome
29         mtxTags sync.Mutex
30         errs    chan error
31 }
32
33 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
34         var err error
35         defer func() {
36                 if err != nil {
37                         fmt.Fprintf(stderr, "%s\n", err)
38                 }
39         }()
40         flags := flag.NewFlagSet("", flag.ContinueOnError)
41         flags.SetOutput(stderr)
42         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
43         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
44         projectUUID := flags.String("project", "", "project `UUID` for output data")
45         priority := flags.Int("priority", 500, "container request priority")
46         outputFilename := flags.String("o", "-", "output `file`")
47         err = flags.Parse(args)
48         if err == flag.ErrHelp {
49                 err = nil
50                 return 0
51         } else if err != nil {
52                 return 2
53         }
54         cmd.stdin = stdin
55         cmd.inputs = flags.Args()
56
57         if *pprof != "" {
58                 go func() {
59                         log.Println(http.ListenAndServe(*pprof, nil))
60                 }()
61         }
62
63         if !*runlocal {
64                 if *outputFilename != "-" {
65                         err = errors.New("cannot specify output file in container mode: not implemented")
66                         return 1
67                 }
68                 runner := arvadosContainerRunner{
69                         Name:        "lightning filter",
70                         Client:      arvados.NewClientFromEnv(),
71                         ProjectUUID: *projectUUID,
72                         RAM:         64000000000,
73                         VCPUs:       2,
74                         Priority:    *priority,
75                 }
76                 for i := range cmd.inputs {
77                         err = runner.TranslatePaths(&cmd.inputs[i])
78                         if err != nil {
79                                 return 1
80                         }
81                 }
82                 runner.Args = append([]string{"merge", "-local=true",
83                         "-o", "/mnt/output/library.gob",
84                 }, cmd.inputs...)
85                 var output string
86                 output, err = runner.Run()
87                 if err != nil {
88                         return 1
89                 }
90                 fmt.Fprintln(stdout, output+"/library.gob")
91                 return 0
92         }
93
94         if *outputFilename == "-" {
95                 cmd.output = nopCloser{stdout}
96         } else {
97                 cmd.output, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
98                 if err != nil {
99                         return 1
100                 }
101                 defer cmd.output.Close()
102         }
103
104         err = cmd.doMerge()
105         if err != nil {
106                 return 1
107         }
108         err = cmd.output.Close()
109         if err != nil {
110                 return 1
111         }
112         return 0
113 }
114
115 func (cmd *merger) mergeLibraryEntry(ent *LibraryEntry, src string) error {
116         mapped := cmd.mapped[src]
117         if len(cmd.errs) > 0 {
118                 return errors.New("stopping after error in other goroutine")
119         }
120         if len(ent.TagSet) > 0 {
121                 // We don't need the tagset to do a merge, but if it
122                 // does appear in the input, we (a) output it once,
123                 // and (b) do a sanity check, erroring out if the
124                 // inputs have different tagsets.
125                 cmd.mtxTags.Lock()
126                 defer cmd.mtxTags.Unlock()
127                 if len(cmd.tagSet) == 0 {
128                         cmd.tagSet = ent.TagSet
129                         if cmd.tilelib.encoder != nil {
130                                 go cmd.tilelib.encoder.Encode(LibraryEntry{
131                                         TagSet: cmd.tagSet,
132                                 })
133                         }
134                 } else if len(cmd.tagSet) != len(ent.TagSet) {
135                         return fmt.Errorf("cannot merge libraries with differing tagsets")
136                 } else {
137                         for i := range ent.TagSet {
138                                 if !bytes.Equal(ent.TagSet[i], cmd.tagSet[i]) {
139                                         return fmt.Errorf("cannot merge libraries with differing tagsets")
140                                 }
141                         }
142                 }
143         }
144         for _, tv := range ent.TileVariants {
145                 // Assign a new variant ID (unique across all inputs)
146                 // for each input variant.
147                 mapped[tileLibRef{tag: tv.Tag, variant: tv.Variant}] = cmd.tilelib.getRef(tv.Tag, tv.Sequence).variant
148         }
149         for _, cg := range ent.CompactGenomes {
150                 cmd.todo = append(cmd.todo, liftCompactGenome{cg, mapped})
151         }
152         return nil
153 }
154
155 type liftCompactGenome struct {
156         CompactGenome
157         mapped map[tileLibRef]tileVariantID
158 }
159
160 // Translate old variant IDs to new (mapped) variant IDs.
161 func (cg liftCompactGenome) lift() error {
162         for i, variant := range cg.Variants {
163                 if variant == 0 {
164                         continue
165                 }
166                 tag := tagID(i / 2)
167                 newvariant, ok := cg.mapped[tileLibRef{tag: tag, variant: variant}]
168                 if !ok {
169                         return fmt.Errorf("oops: ent.CompactGenomes[] (%q) refs tag %d variant %d which is not in library", cg.Name, tag, variant)
170                 }
171                 cg.Variants[tag] = newvariant
172         }
173         return nil
174 }
175
176 func (cmd *merger) setError(err error) {
177         select {
178         case cmd.errs <- err:
179         default:
180         }
181 }
182
183 func (cmd *merger) doMerge() error {
184         w := bufio.NewWriter(cmd.output)
185         cmd.errs = make(chan error, 1)
186         cmd.tilelib = &tileLibrary{
187                 encoder:        gob.NewEncoder(w),
188                 includeNoCalls: true,
189         }
190
191         cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
192         for _, input := range cmd.inputs {
193                 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
194         }
195
196         var wg sync.WaitGroup
197         for _, input := range cmd.inputs {
198                 var infile io.ReadCloser
199                 if input == "-" {
200                         infile = ioutil.NopCloser(cmd.stdin)
201                 } else {
202                         var err error
203                         infile, err = os.Open(input)
204                         if err != nil {
205                                 return err
206                         }
207                         defer infile.Close()
208                 }
209                 wg.Add(1)
210                 go func(input string) {
211                         defer wg.Done()
212                         log.Printf("%s: reading", input)
213                         err := DecodeLibrary(infile, func(ent *LibraryEntry) error {
214                                 return cmd.mergeLibraryEntry(ent, input)
215                         })
216                         if err != nil {
217                                 cmd.setError(fmt.Errorf("%s: decode: %w", input, err))
218                                 return
219                         }
220                         err = infile.Close()
221                         if err != nil {
222                                 cmd.setError(fmt.Errorf("%s: close: %w", input, err))
223                                 return
224                         }
225                         log.Printf("%s: done", input)
226                 }(input)
227         }
228         wg.Wait()
229         go close(cmd.errs)
230         if err := <-cmd.errs; err != nil {
231                 return err
232         }
233
234         var cgs []CompactGenome
235         for _, cg := range cmd.todo {
236                 err := cg.lift()
237                 if err != nil {
238                         return err
239                 }
240                 cgs = append(cgs, cg.CompactGenome)
241         }
242         err := cmd.tilelib.encoder.Encode(LibraryEntry{
243                 CompactGenomes: cgs,
244         })
245         if err != nil {
246                 return err
247         }
248
249         log.Print("flushing")
250         err = w.Flush()
251         if err != nil {
252                 return err
253         }
254         return nil
255 }