Gzip gob files.
[lightning.git] / merge.go
1 package main
2
3 import (
4         "bufio"
5         "compress/gzip"
6         "context"
7         "encoding/gob"
8         "errors"
9         "flag"
10         "fmt"
11         "io"
12         "io/ioutil"
13         "net/http"
14         _ "net/http/pprof"
15         "os"
16         "strings"
17         "sync"
18
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         log "github.com/sirupsen/logrus"
21 )
22
23 type merger struct {
24         stdin   io.Reader
25         inputs  []string
26         output  io.WriteCloser
27         tagSet  [][]byte
28         tilelib *tileLibrary
29         mapped  map[string]map[tileLibRef]tileVariantID
30         mtxTags sync.Mutex
31         errs    chan error
32 }
33
34 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
35         var err error
36         defer func() {
37                 if err != nil {
38                         fmt.Fprintf(stderr, "%s\n", err)
39                 }
40         }()
41         flags := flag.NewFlagSet("", flag.ContinueOnError)
42         flags.SetOutput(stderr)
43         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
44         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
45         projectUUID := flags.String("project", "", "project `UUID` for output data")
46         priority := flags.Int("priority", 500, "container request priority")
47         outputFilename := flags.String("o", "-", "output `file`")
48         err = flags.Parse(args)
49         if err == flag.ErrHelp {
50                 err = nil
51                 return 0
52         } else if err != nil {
53                 return 2
54         }
55         cmd.stdin = stdin
56         cmd.inputs = flags.Args()
57
58         if *pprof != "" {
59                 go func() {
60                         log.Println(http.ListenAndServe(*pprof, nil))
61                 }()
62         }
63
64         if !*runlocal {
65                 if *outputFilename != "-" {
66                         err = errors.New("cannot specify output file in container mode: not implemented")
67                         return 1
68                 }
69                 runner := arvadosContainerRunner{
70                         Name:        "lightning filter",
71                         Client:      arvados.NewClientFromEnv(),
72                         ProjectUUID: *projectUUID,
73                         RAM:         64000000000,
74                         VCPUs:       2,
75                         Priority:    *priority,
76                 }
77                 for i := range cmd.inputs {
78                         err = runner.TranslatePaths(&cmd.inputs[i])
79                         if err != nil {
80                                 return 1
81                         }
82                 }
83                 runner.Args = append([]string{"merge", "-local=true",
84                         "-o", "/mnt/output/library.gob.gz",
85                 }, cmd.inputs...)
86                 var output string
87                 output, err = runner.Run()
88                 if err != nil {
89                         return 1
90                 }
91                 fmt.Fprintln(stdout, output+"/library.gob.gz")
92                 return 0
93         }
94
95         var outf, outw io.WriteCloser
96         if *outputFilename == "-" {
97                 outw = nopCloser{stdout}
98         } else {
99                 outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
100                 if err != nil {
101                         return 1
102                 }
103                 defer outf.Close()
104                 if strings.HasSuffix(*outputFilename, ".gz") {
105                         outw = gzip.NewWriter(outf)
106                 } else {
107                         outw = outf
108                 }
109         }
110         cmd.output = outw
111         err = cmd.doMerge()
112         if err != nil {
113                 return 1
114         }
115         err = outw.Close()
116         if err != nil {
117                 return 1
118         }
119         if outf != nil && outf != outw {
120                 err = outf.Close()
121                 if err != nil {
122                         return 1
123                 }
124         }
125         return 0
126 }
127
128 func (cmd *merger) setError(err error) {
129         select {
130         case cmd.errs <- err:
131         default:
132         }
133 }
134
135 func (cmd *merger) doMerge() error {
136         w := bufio.NewWriter(cmd.output)
137         encoder := gob.NewEncoder(w)
138
139         ctx, cancel := context.WithCancel(context.Background())
140         defer cancel()
141
142         cmd.errs = make(chan error, 1)
143         cmd.tilelib = &tileLibrary{
144                 encoder:       encoder,
145                 retainNoCalls: true,
146         }
147
148         cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
149         for _, input := range cmd.inputs {
150                 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
151         }
152
153         var wg sync.WaitGroup
154         for _, input := range cmd.inputs {
155                 var infile io.ReadCloser
156                 if input == "-" {
157                         infile = ioutil.NopCloser(cmd.stdin)
158                 } else {
159                         var err error
160                         infile, err = os.Open(input)
161                         if err != nil {
162                                 return err
163                         }
164                         defer infile.Close()
165                 }
166                 wg.Add(1)
167                 go func(input string) {
168                         defer wg.Done()
169                         log.Printf("%s: reading", input)
170                         err := cmd.tilelib.LoadGob(ctx, infile, strings.HasSuffix(input, ".gz"), nil)
171                         if err != nil {
172                                 cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
173                                 cancel()
174                                 return
175                         }
176                         log.Printf("%s: done", input)
177                 }(input)
178         }
179         wg.Wait()
180         go close(cmd.errs)
181         if err := <-cmd.errs; err != nil {
182                 return err
183         }
184         log.Print("flushing")
185         err := w.Flush()
186         if err != nil {
187                 return err
188         }
189         return nil
190 }