Bump slice-numpy memory.
[lightning.git] / arvados.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/url"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "time"
21
22         "git.arvados.org/arvados.git/lib/cmd"
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
25         "git.arvados.org/arvados.git/sdk/go/keepclient"
26         "github.com/klauspost/pgzip"
27         log "github.com/sirupsen/logrus"
28         "golang.org/x/crypto/blake2b"
29         "golang.org/x/net/websocket"
30 )
31
// eventMessage is one message decoded from the websocket event
// stream. ObjectUUID is used to route the message to subscribers,
// EventType selects how the message is handled (the visible code
// handles "update", "stderr" and "crunchstat"), and Properties.Text
// carries the log text of "stderr" and "crunchstat" events. Status is
// decoded but not read anywhere in the visible code.
type eventMessage struct {
	Status     int
	ObjectUUID string `json:"object_uuid"`
	EventType  string `json:"event_type"`
	Properties struct {
		Text string
	}
}
40
// arvadosClient wraps an arvados.Client and adds websocket event
// subscriptions (Subscribe, Unsubscribe, Close).
//
// notifying maps object UUID -> subscriber channel -> number of
// outstanding Subscribe calls for that pair; it is nil until the
// first Subscribe call and is reset to nil by Close. wantClose is
// closed by Close to tell runNotifier to stop. wsconn is the current
// websocket connection, or nil while runNotifier is not connected.
// mtx is held around accesses to notifying and wsconn.
type arvadosClient struct {
	*arvados.Client
	notifying map[string]map[chan<- eventMessage]int
	wantClose chan struct{}
	wsconn    *websocket.Conn
	mtx       sync.Mutex
}
48
49 // Listen for events concerning the given uuids. When an event occurs
50 // (and after connecting/reconnecting to the event stream), send each
51 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
52 // be sent only once for each update, but two Unsubscribe calls will
53 // be needed to stop sending them.
54 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
55         client.mtx.Lock()
56         defer client.mtx.Unlock()
57         if client.notifying == nil {
58                 client.notifying = map[string]map[chan<- eventMessage]int{}
59                 client.wantClose = make(chan struct{})
60                 go client.runNotifier()
61         }
62         chmap := client.notifying[uuid]
63         if chmap == nil {
64                 chmap = map[chan<- eventMessage]int{}
65                 client.notifying[uuid] = chmap
66         }
67         needSub := true
68         for _, nch := range chmap {
69                 if nch > 0 {
70                         needSub = false
71                         break
72                 }
73         }
74         chmap[ch]++
75         if needSub && client.wsconn != nil {
76                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
77                         "method": "subscribe",
78                         "filters": [][]interface{}{
79                                 {"object_uuid", "=", uuid},
80                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
81                         },
82                 })
83         }
84 }
85
86 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
87         client.mtx.Lock()
88         defer client.mtx.Unlock()
89         chmap := client.notifying[uuid]
90         if n := chmap[ch] - 1; n == 0 {
91                 delete(chmap, ch)
92                 if len(chmap) == 0 {
93                         delete(client.notifying, uuid)
94                 }
95                 if client.wsconn != nil {
96                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
97                                 "method": "unsubscribe",
98                                 "filters": [][]interface{}{
99                                         {"object_uuid", "=", uuid},
100                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
101                                 },
102                         })
103                 }
104         } else if n > 0 {
105                 chmap[ch] = n
106         }
107 }
108
109 func (client *arvadosClient) Close() {
110         client.mtx.Lock()
111         defer client.mtx.Unlock()
112         if client.notifying != nil {
113                 client.notifying = nil
114                 close(client.wantClose)
115         }
116 }
117
118 func (client *arvadosClient) runNotifier() {
119 reconnect:
120         for {
121                 var cluster arvados.Cluster
122                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
123                 if err != nil {
124                         log.Warnf("error getting cluster config: %s", err)
125                         time.Sleep(5 * time.Second)
126                         continue reconnect
127                 }
128                 wsURL := cluster.Services.Websocket.ExternalURL
129                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
130                 wsURL.Path = "/websocket"
131                 wsURLNoToken := wsURL.String()
132                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
133                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
134                 if err != nil {
135                         log.Warnf("websocket connection error: %s", err)
136                         time.Sleep(5 * time.Second)
137                         continue reconnect
138                 }
139                 log.Printf("connected to websocket at %s", wsURLNoToken)
140
141                 client.mtx.Lock()
142                 client.wsconn = conn
143                 resubscribe := make([]string, 0, len(client.notifying))
144                 for uuid := range client.notifying {
145                         resubscribe = append(resubscribe, uuid)
146                 }
147                 client.mtx.Unlock()
148
149                 go func() {
150                         w := json.NewEncoder(conn)
151                         for _, uuid := range resubscribe {
152                                 w.Encode(map[string]interface{}{
153                                         "method": "subscribe",
154                                         "filters": [][]interface{}{
155                                                 {"object_uuid", "=", uuid},
156                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
157                                         },
158                                 })
159                         }
160                 }()
161
162                 r := json.NewDecoder(conn)
163                 for {
164                         var msg eventMessage
165                         err := r.Decode(&msg)
166                         select {
167                         case <-client.wantClose:
168                                 return
169                         default:
170                                 if err != nil {
171                                         log.Printf("error decoding websocket message: %s", err)
172                                         client.mtx.Lock()
173                                         client.wsconn = nil
174                                         client.mtx.Unlock()
175                                         go conn.Close()
176                                         continue reconnect
177                                 }
178                                 client.mtx.Lock()
179                                 for ch := range client.notifying[msg.ObjectUUID] {
180                                         ch <- msg
181                                 }
182                                 client.mtx.Unlock()
183                         }
184                 }
185         }
186 }
187
// refreshTicker fires every 5 seconds. RunContext re-fetches its
// container request on each tick it receives. This is a single
// package-level ticker, so concurrent RunContext calls receive from
// the same channel.
var refreshTicker = time.NewTicker(5 * time.Second)
189
// arvadosContainerRunner describes a command to run in an Arvados
// container (see RunContext).
//
// ProjectUUID is required; it becomes the owner of the container
// request. Name and OutputName are passed through as the request's
// name and output name (an empty OutputName is sent as null). Mounts
// are added to the container's mounts alongside the writable
// /mnt/output collection. VCPUs and RAM become runtime constraints,
// and VCPUs is also exported to the container as GOMAXPROCS.
type arvadosContainerRunner struct {
	Client      *arvados.Client
	Name        string
	OutputName  string
	ProjectUUID string
	APIAccess   bool
	VCPUs       int
	RAM         int64
	Prog        string // if empty, run /proc/self/exe
	Args        []string
	Mounts      map[string]map[string]interface{}
	Priority    int
	KeepCache   int // cache buffers per VCPU (0 for default)
}
204
205 func (runner *arvadosContainerRunner) Run() (string, error) {
206         return runner.RunContext(context.Background())
207 }
208
209 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
210         if runner.ProjectUUID == "" {
211                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
212         }
213
214         mounts := map[string]map[string]interface{}{
215                 "/mnt/output": {
216                         "kind":     "collection",
217                         "writable": true,
218                 },
219         }
220         for path, mnt := range runner.Mounts {
221                 mounts[path] = mnt
222         }
223
224         prog := runner.Prog
225         if prog == "" {
226                 prog = "/mnt/cmd/lightning"
227                 cmdUUID, err := runner.makeCommandCollection()
228                 if err != nil {
229                         return "", err
230                 }
231                 mounts["/mnt/cmd"] = map[string]interface{}{
232                         "kind": "collection",
233                         "uuid": cmdUUID,
234                 }
235         }
236         command := append([]string{prog}, runner.Args...)
237
238         priority := runner.Priority
239         if priority < 1 {
240                 priority = 500
241         }
242         keepCache := runner.KeepCache
243         if keepCache < 1 {
244                 keepCache = 2
245         }
246         rc := arvados.RuntimeConstraints{
247                 API:          &runner.APIAccess,
248                 VCPUs:        runner.VCPUs,
249                 RAM:          runner.RAM,
250                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
251         }
252         outname := &runner.OutputName
253         if *outname == "" {
254                 outname = nil
255         }
256         var cr arvados.ContainerRequest
257         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
258                 "container_request": map[string]interface{}{
259                         "owner_uuid":          runner.ProjectUUID,
260                         "name":                runner.Name,
261                         "container_image":     "lightning-runtime",
262                         "command":             command,
263                         "mounts":              mounts,
264                         "use_existing":        true,
265                         "output_path":         "/mnt/output",
266                         "output_name":         outname,
267                         "runtime_constraints": rc,
268                         "priority":            runner.Priority,
269                         "state":               arvados.ContainerRequestStateCommitted,
270                         "scheduling_parameters": arvados.SchedulingParameters{
271                                 Preemptible: true,
272                                 Partitions:  []string{},
273                         },
274                         "environment": map[string]string{
275                                 "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs),
276                         },
277                 },
278         })
279         if err != nil {
280                 return "", err
281         }
282         log.Printf("container request UUID: %s", cr.UUID)
283         log.Printf("container UUID: %s", cr.ContainerUUID)
284
285         logch := make(chan eventMessage)
286         client := arvadosClient{Client: runner.Client}
287         defer client.Close()
288         subscribedUUID := ""
289         defer func() {
290                 if subscribedUUID != "" {
291                         log.Printf("unsubscribe container UUID: %s", subscribedUUID)
292                         client.Unsubscribe(logch, subscribedUUID)
293                 }
294         }()
295
296         neednewline := ""
297
298         lastState := cr.State
299         refreshCR := func() {
300                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
301                 if err != nil {
302                         fmt.Fprint(os.Stderr, neednewline)
303                         log.Printf("error getting container request: %s", err)
304                         return
305                 }
306                 if lastState != cr.State {
307                         fmt.Fprint(os.Stderr, neednewline)
308                         log.Printf("container request state: %s", cr.State)
309                         lastState = cr.State
310                 }
311                 if subscribedUUID != cr.ContainerUUID {
312                         fmt.Fprint(os.Stderr, neednewline)
313                         neednewline = ""
314                         if subscribedUUID != "" {
315                                 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
316                                 client.Unsubscribe(logch, subscribedUUID)
317                         }
318                         log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
319                         client.Subscribe(logch, cr.ContainerUUID)
320                         subscribedUUID = cr.ContainerUUID
321                 }
322         }
323
324         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
325 waitctr:
326         for cr.State != arvados.ContainerRequestStateFinal {
327                 select {
328                 case <-ctx.Done():
329                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
330                                 "container_request": map[string]interface{}{
331                                         "priority": 0,
332                                 },
333                         })
334                         if err != nil {
335                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
336                         }
337                         break waitctr
338                 case <-refreshTicker.C:
339                         refreshCR()
340                 case msg := <-logch:
341                         switch msg.EventType {
342                         case "update":
343                                 refreshCR()
344                         case "stderr":
345                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
346                                         if line != "" {
347                                                 fmt.Fprint(os.Stderr, neednewline)
348                                                 neednewline = ""
349                                                 log.Print(line)
350                                         }
351                                 }
352                         case "crunchstat":
353                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
354                                         mem := reCrunchstat.FindString(line)
355                                         if mem != "" {
356                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
357                                                 neednewline = "\n"
358                                         }
359                                 }
360                         }
361                 }
362         }
363         fmt.Fprint(os.Stderr, neednewline)
364
365         if err := ctx.Err(); err != nil {
366                 return "", err
367         }
368
369         var c arvados.Container
370         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
371         if err != nil {
372                 return "", err
373         } else if c.State != arvados.ContainerStateComplete {
374                 return "", fmt.Errorf("container did not complete: %s", c.State)
375         } else if c.ExitCode != 0 {
376                 return "", fmt.Errorf("container exited %d", c.ExitCode)
377         }
378         return cr.OutputUUID, err
379 }
380
// collectionInPathRe matches a path that has a collection identifier
// as one complete path component. The identifier is either 32 hex
// digits followed by "+" and digits, or three groups of 5, 5 and 15
// lowercase alphanumerics separated by "-". Submatch 1 is everything
// up to and including the "/" before the identifier (if any),
// submatch 2 is the identifier, and submatch 3 is the rest of the
// path starting with "/" (if any).
var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
382
383 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
384         if runner.Mounts == nil {
385                 runner.Mounts = make(map[string]map[string]interface{})
386         }
387         for _, path := range paths {
388                 if *path == "" || *path == "-" {
389                         continue
390                 }
391                 m := collectionInPathRe.FindStringSubmatch(*path)
392                 if m == nil {
393                         return fmt.Errorf("cannot find uuid in path: %q", *path)
394                 }
395                 collID := m[2]
396                 mnt, ok := runner.Mounts["/mnt/"+collID]
397                 if !ok {
398                         mnt = map[string]interface{}{
399                                 "kind": "collection",
400                         }
401                         if len(collID) == 27 {
402                                 mnt["uuid"] = collID
403                         } else {
404                                 mnt["portable_data_hash"] = collID
405                         }
406                         runner.Mounts["/mnt/"+collID] = mnt
407                 }
408                 *path = "/mnt/" + collID + m[3]
409         }
410         return nil
411 }
412
413 var mtxMakeCommandCollection sync.Mutex
414
415 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
416         mtxMakeCommandCollection.Lock()
417         defer mtxMakeCommandCollection.Unlock()
418         exe, err := ioutil.ReadFile("/proc/self/exe")
419         if err != nil {
420                 return "", err
421         }
422         b2 := blake2b.Sum256(exe)
423         cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
424         var existing arvados.CollectionList
425         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
426                 Limit: 1,
427                 Count: "none",
428                 Filters: []arvados.Filter{
429                         {Attr: "name", Operator: "=", Operand: cname},
430                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
431                         {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
432                 },
433         })
434         if err != nil {
435                 return "", err
436         }
437         if len(existing.Items) > 0 {
438                 coll := existing.Items[0]
439                 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
440                 return coll.UUID, nil
441         }
442         log.Printf("writing lightning binary to new collection %q", cname)
443         ac, err := arvadosclient.New(runner.Client)
444         if err != nil {
445                 return "", err
446         }
447         kc := keepclient.New(ac)
448         var coll arvados.Collection
449         fs, err := coll.FileSystem(runner.Client, kc)
450         if err != nil {
451                 return "", err
452         }
453         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
454         if err != nil {
455                 return "", err
456         }
457         _, err = f.Write(exe)
458         if err != nil {
459                 return "", err
460         }
461         err = f.Close()
462         if err != nil {
463                 return "", err
464         }
465         mtxt, err := fs.MarshalManifest(".")
466         if err != nil {
467                 return "", err
468         }
469         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
470                 "collection": map[string]interface{}{
471                         "owner_uuid":    runner.ProjectUUID,
472                         "manifest_text": mtxt,
473                         "name":          cname,
474                         "properties": map[string]interface{}{
475                                 "blake2b": fmt.Sprintf("%x", b2),
476                         },
477                 },
478         })
479         if err != nil {
480                 return "", err
481         }
482         log.Printf("stored lightning binary in new collection %s", coll.UUID)
483         return coll.UUID, nil
484 }
485
486 // zopen returns a reader for the given file, using the arvados API
487 // instead of arv-mount/fuse where applicable, and transparently
488 // decompressing the input if fnm ends with ".gz".
489 func zopen(fnm string) (io.ReadCloser, error) {
490         f, err := open(fnm)
491         if err != nil || !strings.HasSuffix(fnm, ".gz") {
492                 return f, err
493         }
494         rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
495         if err != nil {
496                 f.Close()
497                 return nil, err
498         }
499         return gzipr{rdr, f}, nil
500 }
501
// gzipr pairs a ReadCloser (the decompressor) with the Closer of the
// file underneath it. Reads go to the ReadCloser; Close closes both.
type gzipr struct {
	io.ReadCloser
	io.Closer
}

// Close closes the ReadCloser first and then the Closer. Both are
// always closed. If the ReadCloser fails to close, its error is
// returned; otherwise the Closer's result is returned.
func (gr gzipr) Close() error {
	if err := gr.ReadCloser.Close(); err != nil {
		// Still release the underlying file; the first error wins.
		gr.Closer.Close()
		return err
	}
	return gr.Closer.Close()
}
517
// Shared state used by open() to read files through the Arvados API.
// arvadosClientFromEnv is created at package initialization.
// keepClient and siteFS are set up by the first open() call that
// needs them; open() holds siteFSMtx while checking and initializing
// them and while growing keepClient's block cache.
var (
	arvadosClientFromEnv = arvados.NewClientFromEnv()
	keepClient           *keepclient.KeepClient
	siteFS               arvados.CustomFileSystem
	siteFSMtx            sync.Mutex
)
524
// file is the set of methods available on files returned by open():
// reading, seeking, closing, and listing directory entries. open()
// returns either an *os.File or a wrapper around a file opened
// through siteFS.
type file interface {
	io.ReadCloser
	io.Seeker
	Readdir(n int) ([]os.FileInfo, error)
}
530
531 func open(fnm string) (file, error) {
532         if os.Getenv("ARVADOS_API_HOST") == "" {
533                 return os.Open(fnm)
534         }
535         m := collectionInPathRe.FindStringSubmatch(fnm)
536         if m == nil {
537                 return os.Open(fnm)
538         }
539         collectionUUID := m[2]
540         collectionPath := m[3]
541
542         siteFSMtx.Lock()
543         defer siteFSMtx.Unlock()
544         if siteFS == nil {
545                 log.Info("setting up Arvados client")
546                 ac, err := arvadosclient.New(arvadosClientFromEnv)
547                 if err != nil {
548                         return nil, err
549                 }
550                 ac.Client = arvados.DefaultSecureClient
551                 keepClient = keepclient.New(ac)
552                 // Don't use keepclient's default short timeouts.
553                 keepClient.HTTPClient = arvados.DefaultSecureClient
554                 keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
555                 siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
556         } else {
557                 keepClient.BlockCache.MaxBlocks += 2
558         }
559
560         log.Infof("reading %q from %s using Arvados client", collectionPath, collectionUUID)
561         f, err := siteFS.Open("by_id/" + collectionUUID + collectionPath)
562         if err != nil {
563                 return nil, err
564         }
565         return &reduceCacheOnClose{file: f}, nil
566 }
567
568 type reduceCacheOnClose struct {
569         file
570         once sync.Once
571 }
572
573 func (rc *reduceCacheOnClose) Close() error {
574         rc.once.Do(func() { keepClient.BlockCache.MaxBlocks -= 2 })
575         return rc.file.Close()
576 }