Faster merge output.
[lightning.git] / merge.go
1 package main
2
3 import (
4         "bufio"
5         "context"
6         "encoding/gob"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         _ "net/http/pprof"
14         "os"
15         "strings"
16         "sync"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "github.com/klauspost/pgzip"
20         log "github.com/sirupsen/logrus"
21 )
22
23 type merger struct {
24         stdin   io.Reader
25         inputs  []string
26         output  io.Writer
27         tagSet  [][]byte
28         tilelib *tileLibrary
29         mapped  map[string]map[tileLibRef]tileVariantID
30         mtxTags sync.Mutex
31         errs    chan error
32 }
33
34 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
35         var err error
36         defer func() {
37                 if err != nil {
38                         fmt.Fprintf(stderr, "%s\n", err)
39                 }
40         }()
41         flags := flag.NewFlagSet("", flag.ContinueOnError)
42         flags.SetOutput(stderr)
43         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
44         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
45         projectUUID := flags.String("project", "", "project `UUID` for output data")
46         priority := flags.Int("priority", 500, "container request priority")
47         outputFilename := flags.String("o", "-", "output `file`")
48         err = flags.Parse(args)
49         if err == flag.ErrHelp {
50                 err = nil
51                 return 0
52         } else if err != nil {
53                 return 2
54         }
55         cmd.stdin = stdin
56         cmd.inputs = flags.Args()
57
58         if *pprof != "" {
59                 go func() {
60                         log.Println(http.ListenAndServe(*pprof, nil))
61                 }()
62         }
63
64         if !*runlocal {
65                 if *outputFilename != "-" {
66                         err = errors.New("cannot specify output file in container mode: not implemented")
67                         return 1
68                 }
69                 runner := arvadosContainerRunner{
70                         Name:        "lightning merge",
71                         Client:      arvados.NewClientFromEnv(),
72                         ProjectUUID: *projectUUID,
73                         RAM:         150000000000,
74                         VCPUs:       16,
75                         Priority:    *priority,
76                         APIAccess:   true,
77                 }
78                 for i := range cmd.inputs {
79                         err = runner.TranslatePaths(&cmd.inputs[i])
80                         if err != nil {
81                                 return 1
82                         }
83                 }
84                 runner.Args = append([]string{"merge", "-local=true",
85                         "-o", "/mnt/output/library.gob.gz",
86                 }, cmd.inputs...)
87                 var output string
88                 output, err = runner.Run()
89                 if err != nil {
90                         return 1
91                 }
92                 fmt.Fprintln(stdout, output+"/library.gob.gz")
93                 return 0
94         }
95
96         var outf, outw io.WriteCloser
97         if *outputFilename == "-" {
98                 outw = nopCloser{stdout}
99         } else {
100                 outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
101                 if err != nil {
102                         return 1
103                 }
104                 defer outf.Close()
105                 if strings.HasSuffix(*outputFilename, ".gz") {
106                         outw = pgzip.NewWriter(outf)
107                 } else {
108                         outw = nopCloser{outf}
109                 }
110         }
111         bufw := bufio.NewWriterSize(outw, 64*1024*1024)
112         cmd.output = bufw
113         err = cmd.doMerge()
114         if err != nil {
115                 return 1
116         }
117         err = bufw.Flush()
118         if err != nil {
119                 return 1
120         }
121         err = outw.Close()
122         if err != nil {
123                 return 1
124         }
125         if outf != nil {
126                 err = outf.Close()
127                 if err != nil {
128                         return 1
129                 }
130         }
131         return 0
132 }
133
134 func (cmd *merger) setError(err error) {
135         select {
136         case cmd.errs <- err:
137         default:
138         }
139 }
140
141 func (cmd *merger) doMerge() error {
142         w := bufio.NewWriter(cmd.output)
143         encoder := gob.NewEncoder(w)
144
145         ctx, cancel := context.WithCancel(context.Background())
146         defer cancel()
147
148         cmd.errs = make(chan error, 1)
149         cmd.tilelib = &tileLibrary{
150                 encoder:       encoder,
151                 retainNoCalls: true,
152         }
153
154         cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
155         for _, input := range cmd.inputs {
156                 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
157         }
158
159         var wg sync.WaitGroup
160         for _, input := range cmd.inputs {
161                 rdr := ioutil.NopCloser(cmd.stdin)
162                 if input != "-" {
163                         var err error
164                         rdr, err = open(input)
165                         if err != nil {
166                                 return err
167                         }
168                         defer rdr.Close()
169                 }
170                 rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024))
171                 wg.Add(1)
172                 go func(input string) {
173                         defer wg.Done()
174                         log.Printf("%s: reading", input)
175                         err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"), nil)
176                         if err != nil {
177                                 cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
178                                 cancel()
179                                 return
180                         }
181                         log.Printf("%s: done", input)
182                 }(input)
183         }
184         wg.Wait()
185         go close(cmd.errs)
186         if err := <-cmd.errs; err != nil {
187                 return err
188         }
189         log.Print("flushing")
190         err := w.Flush()
191         if err != nil {
192                 return err
193         }
194         return nil
195 }