Ask for preemptible instances.
[lightning.git] / merge.go
1 package lightning
2
3 import (
4         "bufio"
5         "context"
6         "encoding/gob"
7         "errors"
8         "flag"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         _ "net/http/pprof"
14         "os"
15         "strings"
16         "sync"
17
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "github.com/klauspost/pgzip"
20         log "github.com/sirupsen/logrus"
21 )
22
// merger holds the state of one merge command invocation. It is
// populated by RunCommand and doMerge.
type merger struct {
	// stdin is the reader passed to RunCommand; doMerge reads from
	// it for an input named "-".
	stdin   io.Reader
	// inputs are the positional command line arguments, i.e., the
	// libraries to merge.
	inputs  []string
	// output is where doMerge writes the gob-encoded result.
	output  io.Writer
	// NOTE(review): tagSet is not referenced by RunCommand or
	// doMerge; presumably used elsewhere in the package -- confirm.
	tagSet  [][]byte
	// tilelib is the library that all inputs are loaded into. It is
	// created by doMerge.
	tilelib *tileLibrary
	// mapped has one entry per input name. doMerge initializes each
	// entry to an empty map.
	mapped  map[string]map[tileLibRef]tileVariantID
	// NOTE(review): mtxTags is not referenced by RunCommand or
	// doMerge; presumably guards tag data used elsewhere -- confirm.
	mtxTags sync.Mutex
	// errs carries the error reported via setError. doMerge creates
	// it with a buffer size of 1.
	errs    chan error
}
33
34 func (cmd *merger) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
35         var err error
36         defer func() {
37                 if err != nil {
38                         fmt.Fprintf(stderr, "%s\n", err)
39                 }
40         }()
41         flags := flag.NewFlagSet("", flag.ContinueOnError)
42         flags.SetOutput(stderr)
43         pprof := flags.String("pprof", "", "serve Go profile data at http://`[addr]:port`")
44         runlocal := flags.Bool("local", false, "run on local host (default: run in an arvados container)")
45         projectUUID := flags.String("project", "", "project `UUID` for output data")
46         priority := flags.Int("priority", 500, "container request priority")
47         outputFilename := flags.String("o", "-", "output `file`")
48         err = flags.Parse(args)
49         if err == flag.ErrHelp {
50                 err = nil
51                 return 0
52         } else if err != nil {
53                 return 2
54         }
55         cmd.stdin = stdin
56         cmd.inputs = flags.Args()
57
58         if *pprof != "" {
59                 go func() {
60                         log.Println(http.ListenAndServe(*pprof, nil))
61                 }()
62         }
63
64         if !*runlocal {
65                 if *outputFilename != "-" {
66                         err = errors.New("cannot specify output file in container mode: not implemented")
67                         return 1
68                 }
69                 runner := arvadosContainerRunner{
70                         Name:        "lightning merge",
71                         Client:      arvados.NewClientFromEnv(),
72                         ProjectUUID: *projectUUID,
73                         RAM:         250000000000,
74                         VCPUs:       16,
75                         Priority:    *priority,
76                         APIAccess:   true,
77                         KeepCache:   1,
78                 }
79                 for i := range cmd.inputs {
80                         err = runner.TranslatePaths(&cmd.inputs[i])
81                         if err != nil {
82                                 return 1
83                         }
84                 }
85                 runner.Args = append([]string{"merge", "-local=true",
86                         "-o", "/mnt/output/library.gob.gz",
87                 }, cmd.inputs...)
88                 var output string
89                 output, err = runner.Run()
90                 if err != nil {
91                         return 1
92                 }
93                 fmt.Fprintln(stdout, output+"/library.gob.gz")
94                 return 0
95         }
96
97         var outf, outw io.WriteCloser
98         if *outputFilename == "-" {
99                 outw = nopCloser{stdout}
100         } else {
101                 outf, err = os.OpenFile(*outputFilename, os.O_CREATE|os.O_WRONLY, 0777)
102                 if err != nil {
103                         return 1
104                 }
105                 defer outf.Close()
106                 if strings.HasSuffix(*outputFilename, ".gz") {
107                         outw = pgzip.NewWriter(outf)
108                 } else {
109                         outw = nopCloser{outf}
110                 }
111         }
112         bufw := bufio.NewWriterSize(outw, 64*1024*1024)
113         cmd.output = bufw
114         err = cmd.doMerge()
115         if err != nil {
116                 return 1
117         }
118         err = bufw.Flush()
119         if err != nil {
120                 return 1
121         }
122         err = outw.Close()
123         if err != nil {
124                 return 1
125         }
126         if outf != nil {
127                 err = outf.Close()
128                 if err != nil {
129                         return 1
130                 }
131         }
132         return 0
133 }
134
135 func (cmd *merger) setError(err error) {
136         select {
137         case cmd.errs <- err:
138         default:
139         }
140 }
141
142 func (cmd *merger) doMerge() error {
143         w := bufio.NewWriter(cmd.output)
144         encoder := gob.NewEncoder(w)
145
146         ctx, cancel := context.WithCancel(context.Background())
147         defer cancel()
148
149         cmd.errs = make(chan error, 1)
150         cmd.tilelib = &tileLibrary{
151                 encoder:       encoder,
152                 retainNoCalls: true,
153         }
154
155         cmd.mapped = map[string]map[tileLibRef]tileVariantID{}
156         for _, input := range cmd.inputs {
157                 cmd.mapped[input] = map[tileLibRef]tileVariantID{}
158         }
159
160         var wg sync.WaitGroup
161         for _, input := range cmd.inputs {
162                 rdr := ioutil.NopCloser(cmd.stdin)
163                 if input != "-" {
164                         var err error
165                         rdr, err = open(input)
166                         if err != nil {
167                                 return err
168                         }
169                         defer rdr.Close()
170                 }
171                 rdr = ioutil.NopCloser(bufio.NewReaderSize(rdr, 8*1024*1024))
172                 wg.Add(1)
173                 go func(input string) {
174                         defer wg.Done()
175                         log.Printf("%s: reading", input)
176                         err := cmd.tilelib.LoadGob(ctx, rdr, strings.HasSuffix(input, ".gz"), nil)
177                         if err != nil {
178                                 cmd.setError(fmt.Errorf("%s: load failed: %w", input, err))
179                                 cancel()
180                                 return
181                         }
182                         log.Printf("%s: done", input)
183                 }(input)
184         }
185         wg.Wait()
186         go close(cmd.errs)
187         if err := <-cmd.errs; err != nil {
188                 return err
189         }
190         log.Print("flushing")
191         err := w.Flush()
192         if err != nil {
193                 return err
194         }
195         return nil
196 }