ec2701bcdc7c96664601351225aedb83f3b5e16a
[lightning.git] / arvados.go
1 package lightning
2
3 import (
4         "bufio"
5         "context"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/url"
12         "os"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/cmd"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
21         "git.arvados.org/arvados.git/sdk/go/keepclient"
22         "github.com/klauspost/pgzip"
23         log "github.com/sirupsen/logrus"
24         "golang.org/x/crypto/blake2b"
25         "golang.org/x/net/websocket"
26 )
27
// eventMessage is a message decoded from the Arvados websocket event
// stream. Only the fields declared here are decoded; anything else in
// the message is ignored by encoding/json.
type eventMessage struct {
	// Status has no json tag, so encoding/json fills it from a
	// "status" key (matched case-insensitively). Presumably this is
	// the status in subscribe/unsubscribe responses — TODO confirm.
	Status     int
	// ObjectUUID is the UUID of the object the event concerns;
	// runNotifier uses it to pick which subscribers receive the event.
	ObjectUUID string `json:"object_uuid"`
	// EventType is matched by RunContext against "update", "stderr"
	// and "crunchstat".
	EventType  string `json:"event_type"`
	Properties struct {
		// Text is the log text that RunContext reads for "stderr"
		// and "crunchstat" events.
		Text string
	}
}
36
// arvadosClient is an *arvados.Client with a lazily started websocket
// event listener. Subscribe starts the listener goroutine on first use,
// and Close stops it.
type arvadosClient struct {
	*arvados.Client
	// notifying maps object UUID -> subscribed channel -> number of
	// outstanding Subscribe calls for that pair. It is nil until the
	// first Subscribe, and Close sets it back to nil.
	notifying map[string]map[chan<- eventMessage]int
	// wantClose is created by the first Subscribe and closed by Close,
	// which tells runNotifier to stop.
	wantClose chan struct{}
	// wsconn is the current websocket connection. It is nil before the
	// first successful connection and after a decode error, until
	// runNotifier reconnects.
	wsconn    *websocket.Conn
	// mtx guards notifying, wantClose creation, and wsconn.
	mtx       sync.Mutex
}
44
45 // Listen for events concerning the given uuids. When an event occurs
46 // (and after connecting/reconnecting to the event stream), send each
47 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
48 // be sent only once for each update, but two Unsubscribe calls will
49 // be needed to stop sending them.
50 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
51         client.mtx.Lock()
52         defer client.mtx.Unlock()
53         if client.notifying == nil {
54                 client.notifying = map[string]map[chan<- eventMessage]int{}
55                 client.wantClose = make(chan struct{})
56                 go client.runNotifier()
57         }
58         chmap := client.notifying[uuid]
59         if chmap == nil {
60                 chmap = map[chan<- eventMessage]int{}
61                 client.notifying[uuid] = chmap
62         }
63         needSub := true
64         for _, nch := range chmap {
65                 if nch > 0 {
66                         needSub = false
67                         break
68                 }
69         }
70         chmap[ch]++
71         if needSub && client.wsconn != nil {
72                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
73                         "method": "subscribe",
74                         "filters": [][]interface{}{
75                                 {"object_uuid", "=", uuid},
76                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
77                         },
78                 })
79         }
80 }
81
82 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
83         client.mtx.Lock()
84         defer client.mtx.Unlock()
85         chmap := client.notifying[uuid]
86         if n := chmap[ch] - 1; n == 0 {
87                 delete(chmap, ch)
88                 if len(chmap) == 0 {
89                         delete(client.notifying, uuid)
90                 }
91                 if client.wsconn != nil {
92                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
93                                 "method": "unsubscribe",
94                                 "filters": [][]interface{}{
95                                         {"object_uuid", "=", uuid},
96                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
97                                 },
98                         })
99                 }
100         } else if n > 0 {
101                 chmap[ch] = n
102         }
103 }
104
105 func (client *arvadosClient) Close() {
106         client.mtx.Lock()
107         defer client.mtx.Unlock()
108         if client.notifying != nil {
109                 client.notifying = nil
110                 close(client.wantClose)
111         }
112 }
113
114 func (client *arvadosClient) runNotifier() {
115 reconnect:
116         for {
117                 var cluster arvados.Cluster
118                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
119                 if err != nil {
120                         log.Warnf("error getting cluster config: %s", err)
121                         time.Sleep(5 * time.Second)
122                         continue reconnect
123                 }
124                 wsURL := cluster.Services.Websocket.ExternalURL
125                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
126                 wsURL.Path = "/websocket"
127                 wsURLNoToken := wsURL.String()
128                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
129                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
130                 if err != nil {
131                         log.Warnf("websocket connection error: %s", err)
132                         time.Sleep(5 * time.Second)
133                         continue reconnect
134                 }
135                 log.Printf("connected to websocket at %s", wsURLNoToken)
136
137                 client.mtx.Lock()
138                 client.wsconn = conn
139                 resubscribe := make([]string, 0, len(client.notifying))
140                 for uuid := range client.notifying {
141                         resubscribe = append(resubscribe, uuid)
142                 }
143                 client.mtx.Unlock()
144
145                 go func() {
146                         w := json.NewEncoder(conn)
147                         for _, uuid := range resubscribe {
148                                 w.Encode(map[string]interface{}{
149                                         "method": "subscribe",
150                                         "filters": [][]interface{}{
151                                                 {"object_uuid", "=", uuid},
152                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
153                                         },
154                                 })
155                         }
156                 }()
157
158                 r := json.NewDecoder(conn)
159                 for {
160                         var msg eventMessage
161                         err := r.Decode(&msg)
162                         select {
163                         case <-client.wantClose:
164                                 return
165                         default:
166                                 if err != nil {
167                                         log.Printf("error decoding websocket message: %s", err)
168                                         client.mtx.Lock()
169                                         client.wsconn = nil
170                                         client.mtx.Unlock()
171                                         go conn.Close()
172                                         continue reconnect
173                                 }
174                                 client.mtx.Lock()
175                                 for ch := range client.notifying[msg.ObjectUUID] {
176                                         ch <- msg
177                                 }
178                                 client.mtx.Unlock()
179                         }
180                 }
181         }
182 }
183
// refreshTicker paces RunContext's periodic re-fetch of its container
// request. It is a single package-level ticker shared by every
// RunContext call, so each tick is received by only one of the calls
// waiting on it.
// NOTE(review): with many concurrent runners, each one polls less often.
// This may be intentional (a global cap on polling rate) — confirm.
var refreshTicker = time.NewTicker(5 * time.Second)

// arvadosContainerRunner describes an Arvados container request to
// submit and wait for with Run/RunContext.
type arvadosContainerRunner struct {
	Client      *arvados.Client
	// Name is the container request name.
	Name        string
	// OutputName is the output collection name. RunContext sends
	// null instead if it is empty.
	OutputName  string
	// ProjectUUID owns the container request (and the command
	// collection). RunContext refuses to run without it.
	ProjectUUID string
	// APIAccess is passed as runtime_constraints.API.
	APIAccess   bool
	VCPUs       int
	RAM         int64
	Prog        string // if empty, run /proc/self/exe
	Args        []string
	// Mounts are extra container mounts keyed by mount path.
	// TranslatePaths adds collection mounts here.
	Mounts      map[string]map[string]interface{}
	// Priority is the container request priority.
	Priority    int
	KeepCache   int // cache buffers per VCPU (0 for default)
}
200
201 func (runner *arvadosContainerRunner) Run() (string, error) {
202         return runner.RunContext(context.Background())
203 }
204
205 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
206         if runner.ProjectUUID == "" {
207                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
208         }
209
210         mounts := map[string]map[string]interface{}{
211                 "/mnt/output": {
212                         "kind":     "collection",
213                         "writable": true,
214                 },
215         }
216         for path, mnt := range runner.Mounts {
217                 mounts[path] = mnt
218         }
219
220         prog := runner.Prog
221         if prog == "" {
222                 prog = "/mnt/cmd/lightning"
223                 cmdUUID, err := runner.makeCommandCollection()
224                 if err != nil {
225                         return "", err
226                 }
227                 mounts["/mnt/cmd"] = map[string]interface{}{
228                         "kind": "collection",
229                         "uuid": cmdUUID,
230                 }
231         }
232         command := append([]string{prog}, runner.Args...)
233
234         priority := runner.Priority
235         if priority < 1 {
236                 priority = 500
237         }
238         keepCache := runner.KeepCache
239         if keepCache < 1 {
240                 keepCache = 2
241         }
242         rc := arvados.RuntimeConstraints{
243                 API:          &runner.APIAccess,
244                 VCPUs:        runner.VCPUs,
245                 RAM:          runner.RAM,
246                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
247         }
248         outname := &runner.OutputName
249         if *outname == "" {
250                 outname = nil
251         }
252         var cr arvados.ContainerRequest
253         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
254                 "container_request": map[string]interface{}{
255                         "owner_uuid":          runner.ProjectUUID,
256                         "name":                runner.Name,
257                         "container_image":     "lightning-runtime",
258                         "command":             command,
259                         "mounts":              mounts,
260                         "use_existing":        true,
261                         "output_path":         "/mnt/output",
262                         "output_name":         outname,
263                         "runtime_constraints": rc,
264                         "priority":            runner.Priority,
265                         "state":               arvados.ContainerRequestStateCommitted,
266                         "scheduling_parameters": arvados.SchedulingParameters{
267                                 Preemptible: true,
268                                 Partitions:  []string{},
269                         },
270                         "environment": map[string]string{
271                                 "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs),
272                         },
273                 },
274         })
275         if err != nil {
276                 return "", err
277         }
278         log.Printf("container request UUID: %s", cr.UUID)
279         log.Printf("container UUID: %s", cr.ContainerUUID)
280
281         logch := make(chan eventMessage)
282         client := arvadosClient{Client: runner.Client}
283         defer client.Close()
284         subscribedUUID := ""
285         defer func() {
286                 if subscribedUUID != "" {
287                         log.Printf("unsubscribe container UUID: %s", subscribedUUID)
288                         client.Unsubscribe(logch, subscribedUUID)
289                 }
290         }()
291
292         neednewline := ""
293
294         lastState := cr.State
295         refreshCR := func() {
296                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
297                 if err != nil {
298                         fmt.Fprint(os.Stderr, neednewline)
299                         log.Printf("error getting container request: %s", err)
300                         return
301                 }
302                 if lastState != cr.State {
303                         fmt.Fprint(os.Stderr, neednewline)
304                         log.Printf("container request state: %s", cr.State)
305                         lastState = cr.State
306                 }
307                 if subscribedUUID != cr.ContainerUUID {
308                         fmt.Fprint(os.Stderr, neednewline)
309                         neednewline = ""
310                         if subscribedUUID != "" {
311                                 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
312                                 client.Unsubscribe(logch, subscribedUUID)
313                         }
314                         log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
315                         client.Subscribe(logch, cr.ContainerUUID)
316                         subscribedUUID = cr.ContainerUUID
317                 }
318         }
319
320         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
321 waitctr:
322         for cr.State != arvados.ContainerRequestStateFinal {
323                 select {
324                 case <-ctx.Done():
325                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
326                                 "container_request": map[string]interface{}{
327                                         "priority": 0,
328                                 },
329                         })
330                         if err != nil {
331                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
332                         }
333                         break waitctr
334                 case <-refreshTicker.C:
335                         refreshCR()
336                 case msg := <-logch:
337                         switch msg.EventType {
338                         case "update":
339                                 refreshCR()
340                         case "stderr":
341                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
342                                         if line != "" {
343                                                 fmt.Fprint(os.Stderr, neednewline)
344                                                 neednewline = ""
345                                                 log.Print(line)
346                                         }
347                                 }
348                         case "crunchstat":
349                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
350                                         mem := reCrunchstat.FindString(line)
351                                         if mem != "" {
352                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
353                                                 neednewline = "\n"
354                                         }
355                                 }
356                         }
357                 }
358         }
359         fmt.Fprint(os.Stderr, neednewline)
360
361         if err := ctx.Err(); err != nil {
362                 return "", err
363         }
364
365         var c arvados.Container
366         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
367         if err != nil {
368                 return "", err
369         } else if c.State != arvados.ContainerStateComplete {
370                 return "", fmt.Errorf("container did not complete: %s", c.State)
371         } else if c.ExitCode != 0 {
372                 return "", fmt.Errorf("container exited %d", c.ExitCode)
373         }
374         return cr.OutputUUID, err
375 }
376
377 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
378
379 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
380         if runner.Mounts == nil {
381                 runner.Mounts = make(map[string]map[string]interface{})
382         }
383         for _, path := range paths {
384                 if *path == "" || *path == "-" {
385                         continue
386                 }
387                 m := collectionInPathRe.FindStringSubmatch(*path)
388                 if m == nil {
389                         return fmt.Errorf("cannot find uuid in path: %q", *path)
390                 }
391                 uuid := m[2]
392                 mnt, ok := runner.Mounts["/mnt/"+uuid]
393                 if !ok {
394                         mnt = map[string]interface{}{
395                                 "kind": "collection",
396                                 "uuid": uuid,
397                         }
398                         runner.Mounts["/mnt/"+uuid] = mnt
399                 }
400                 *path = "/mnt/" + uuid + m[3]
401         }
402         return nil
403 }
404
405 var mtxMakeCommandCollection sync.Mutex
406
407 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
408         mtxMakeCommandCollection.Lock()
409         defer mtxMakeCommandCollection.Unlock()
410         exe, err := ioutil.ReadFile("/proc/self/exe")
411         if err != nil {
412                 return "", err
413         }
414         b2 := blake2b.Sum256(exe)
415         cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
416         var existing arvados.CollectionList
417         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
418                 Limit: 1,
419                 Count: "none",
420                 Filters: []arvados.Filter{
421                         {Attr: "name", Operator: "=", Operand: cname},
422                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
423                         {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
424                 },
425         })
426         if err != nil {
427                 return "", err
428         }
429         if len(existing.Items) > 0 {
430                 coll := existing.Items[0]
431                 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
432                 return coll.UUID, nil
433         }
434         log.Printf("writing lightning binary to new collection %q", cname)
435         ac, err := arvadosclient.New(runner.Client)
436         if err != nil {
437                 return "", err
438         }
439         kc := keepclient.New(ac)
440         var coll arvados.Collection
441         fs, err := coll.FileSystem(runner.Client, kc)
442         if err != nil {
443                 return "", err
444         }
445         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
446         if err != nil {
447                 return "", err
448         }
449         _, err = f.Write(exe)
450         if err != nil {
451                 return "", err
452         }
453         err = f.Close()
454         if err != nil {
455                 return "", err
456         }
457         mtxt, err := fs.MarshalManifest(".")
458         if err != nil {
459                 return "", err
460         }
461         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
462                 "collection": map[string]interface{}{
463                         "owner_uuid":    runner.ProjectUUID,
464                         "manifest_text": mtxt,
465                         "name":          cname,
466                         "properties": map[string]interface{}{
467                                 "blake2b": fmt.Sprintf("%x", b2),
468                         },
469                 },
470         })
471         if err != nil {
472                 return "", err
473         }
474         log.Printf("stored lightning binary in new collection %s", coll.UUID)
475         return coll.UUID, nil
476 }
477
478 // zopen returns a reader for the given file, using the arvados API
479 // instead of arv-mount/fuse where applicable, and transparently
480 // decompressing the input if fnm ends with ".gz".
481 func zopen(fnm string) (io.ReadCloser, error) {
482         f, err := open(fnm)
483         if err != nil || !strings.HasSuffix(fnm, ".gz") {
484                 return f, err
485         }
486         rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
487         if err != nil {
488                 f.Close()
489                 return nil, err
490         }
491         return gzipr{rdr, f}, nil
492 }
493
// gzipr pairs a decompressing ReadCloser with the Closer it reads from.
// Reads are served by the embedded ReadCloser; Close closes both.
type gzipr struct {
	io.ReadCloser
	io.Closer
}

// Close closes the ReadCloser first and then the Closer, always
// attempting both. It returns the first non-nil error in that order, or
// nil if both succeed.
func (gr gzipr) Close() error {
	results := [...]error{gr.ReadCloser.Close(), gr.Closer.Close()}
	for _, err := range results {
		if err != nil {
			return err
		}
	}
	return nil
}
509
// Package-level state shared by open.
var (
	// arvadosClientFromEnv is built from the environment by
	// arvados.NewClientFromEnv at package initialization.
	arvadosClientFromEnv = arvados.NewClientFromEnv()
	// keepClient and siteFS are created lazily by open the first time
	// it reads through the Arvados API. siteFSMtx guards them (and
	// keepClient.BlockCache.MaxBlocks, which open increments).
	keepClient           *keepclient.KeepClient
	siteFS               arvados.CustomFileSystem
	siteFSMtx            sync.Mutex
)

// file is the interface returned by open. It is satisfied both by the
// *os.File from os.Open and by files opened from siteFS.
type file interface {
	io.ReadCloser
	Readdir(n int) ([]os.FileInfo, error)
}
521
522 func open(fnm string) (file, error) {
523         if os.Getenv("ARVADOS_API_HOST") == "" {
524                 return os.Open(fnm)
525         }
526         m := collectionInPathRe.FindStringSubmatch(fnm)
527         if m == nil {
528                 return os.Open(fnm)
529         }
530         uuid := m[2]
531         mnt := "/mnt/" + uuid
532         if fnm != mnt && !strings.HasPrefix(fnm, mnt+"/") {
533                 return os.Open(fnm)
534         }
535
536         siteFSMtx.Lock()
537         defer siteFSMtx.Unlock()
538         if siteFS == nil {
539                 log.Info("setting up Arvados client")
540                 ac, err := arvadosclient.New(arvadosClientFromEnv)
541                 if err != nil {
542                         return nil, err
543                 }
544                 ac.Client = arvados.DefaultSecureClient
545                 keepClient = keepclient.New(ac)
546                 // Don't use keepclient's default short timeouts.
547                 keepClient.HTTPClient = arvados.DefaultSecureClient
548                 keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
549                 siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
550         } else {
551                 keepClient.BlockCache.MaxBlocks++
552         }
553
554         log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
555         return siteFS.Open("by_id/" + uuid + fnm[len(mnt):])
556 }