19995: Add -max-frequency filter.
[lightning.git] / arvados.go
1 // Copyright (C) The Lightning Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package lightning
6
7 import (
8         "bufio"
9         "context"
10         "encoding/json"
11         "errors"
12         "fmt"
13         "io"
14         "io/ioutil"
15         "net/url"
16         "os"
17         "regexp"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.arvados.org/arvados.git/lib/cmd"
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
26         "git.arvados.org/arvados.git/sdk/go/keepclient"
27         "github.com/klauspost/pgzip"
28         log "github.com/sirupsen/logrus"
29         "golang.org/x/crypto/blake2b"
30         "golang.org/x/net/websocket"
31 )
32
// eventMessage is one message decoded from the websocket event
// stream. Fields without an explicit tag are matched against the
// incoming JSON keys by encoding/json's default (case-insensitive)
// rules.
type eventMessage struct {
	// Status is the numeric "status" value of the message.
	Status int
	// ObjectUUID is the "object_uuid" value; runNotifier uses it to
	// find the channels subscribed to this message.
	ObjectUUID string `json:"object_uuid"`
	// EventType is the "event_type" value, e.g. "stderr",
	// "crunchstat" or "update".
	EventType string `json:"event_type"`
	// Properties carries the event payload; only its text is decoded.
	Properties struct {
		Text string
	}
}
41
42 type arvadosClient struct {
43         *arvados.Client
44         notifying map[string]map[chan<- eventMessage]int
45         wantClose chan struct{}
46         wsconn    *websocket.Conn
47         mtx       sync.Mutex
48 }
49
50 // Listen for events concerning the given uuids. When an event occurs
51 // (and after connecting/reconnecting to the event stream), send each
52 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
53 // be sent only once for each update, but two Unsubscribe calls will
54 // be needed to stop sending them.
55 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
56         client.mtx.Lock()
57         defer client.mtx.Unlock()
58         if client.notifying == nil {
59                 client.notifying = map[string]map[chan<- eventMessage]int{}
60                 client.wantClose = make(chan struct{})
61                 go client.runNotifier()
62         }
63         chmap := client.notifying[uuid]
64         if chmap == nil {
65                 chmap = map[chan<- eventMessage]int{}
66                 client.notifying[uuid] = chmap
67         }
68         needSub := true
69         for _, nch := range chmap {
70                 if nch > 0 {
71                         needSub = false
72                         break
73                 }
74         }
75         chmap[ch]++
76         if needSub && client.wsconn != nil {
77                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
78                         "method": "subscribe",
79                         "filters": [][]interface{}{
80                                 {"object_uuid", "=", uuid},
81                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
82                         },
83                 })
84         }
85 }
86
87 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
88         client.mtx.Lock()
89         defer client.mtx.Unlock()
90         chmap := client.notifying[uuid]
91         if n := chmap[ch] - 1; n == 0 {
92                 delete(chmap, ch)
93                 if len(chmap) == 0 {
94                         delete(client.notifying, uuid)
95                 }
96                 if client.wsconn != nil {
97                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
98                                 "method": "unsubscribe",
99                                 "filters": [][]interface{}{
100                                         {"object_uuid", "=", uuid},
101                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
102                                 },
103                         })
104                 }
105         } else if n > 0 {
106                 chmap[ch] = n
107         }
108 }
109
110 func (client *arvadosClient) Close() {
111         client.mtx.Lock()
112         defer client.mtx.Unlock()
113         if client.notifying != nil {
114                 client.notifying = nil
115                 close(client.wantClose)
116         }
117 }
118
119 func (client *arvadosClient) runNotifier() {
120 reconnect:
121         for {
122                 var cluster arvados.Cluster
123                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
124                 if err != nil {
125                         log.Warnf("error getting cluster config: %s", err)
126                         time.Sleep(5 * time.Second)
127                         continue reconnect
128                 }
129                 wsURL := cluster.Services.Websocket.ExternalURL
130                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
131                 wsURL.Path = "/websocket"
132                 wsURLNoToken := wsURL.String()
133                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
134                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
135                 if err != nil {
136                         log.Warnf("websocket connection error: %s", err)
137                         time.Sleep(5 * time.Second)
138                         continue reconnect
139                 }
140                 log.Printf("connected to websocket at %s", wsURLNoToken)
141
142                 client.mtx.Lock()
143                 client.wsconn = conn
144                 resubscribe := make([]string, 0, len(client.notifying))
145                 for uuid := range client.notifying {
146                         resubscribe = append(resubscribe, uuid)
147                 }
148                 client.mtx.Unlock()
149
150                 go func() {
151                         w := json.NewEncoder(conn)
152                         for _, uuid := range resubscribe {
153                                 w.Encode(map[string]interface{}{
154                                         "method": "subscribe",
155                                         "filters": [][]interface{}{
156                                                 {"object_uuid", "=", uuid},
157                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
158                                         },
159                                 })
160                         }
161                 }()
162
163                 r := json.NewDecoder(conn)
164                 for {
165                         var msg eventMessage
166                         err := r.Decode(&msg)
167                         select {
168                         case <-client.wantClose:
169                                 return
170                         default:
171                                 if err != nil {
172                                         log.Printf("error decoding websocket message: %s", err)
173                                         client.mtx.Lock()
174                                         client.wsconn = nil
175                                         client.mtx.Unlock()
176                                         go conn.Close()
177                                         continue reconnect
178                                 }
179                                 client.mtx.Lock()
180                                 for ch := range client.notifying[msg.ObjectUUID] {
181                                         go func() { ch <- msg }()
182                                 }
183                                 client.mtx.Unlock()
184                         }
185                 }
186         }
187 }
188
// refreshTicker fires every 5 seconds. RunContext receives from it to
// decide when to poll its container request. It is a single
// package-level ticker, so every receiver reads from the same
// channel.
var refreshTicker = time.NewTicker(time.Second * 5)
190
191 type arvadosContainerRunner struct {
192         Client      *arvados.Client
193         Name        string
194         OutputName  string
195         ProjectUUID string
196         APIAccess   bool
197         VCPUs       int
198         RAM         int64
199         Prog        string // if empty, run /proc/self/exe
200         Args        []string
201         Mounts      map[string]map[string]interface{}
202         Priority    int
203         KeepCache   int // cache buffers per VCPU (0 for default)
204         Preemptible bool
205 }
206
207 func (runner *arvadosContainerRunner) Run() (string, error) {
208         return runner.RunContext(context.Background())
209 }
210
211 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
212         if runner.ProjectUUID == "" {
213                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
214         }
215
216         mounts := map[string]map[string]interface{}{
217                 "/mnt/output": {
218                         "kind":     "collection",
219                         "writable": true,
220                 },
221         }
222         for path, mnt := range runner.Mounts {
223                 mounts[path] = mnt
224         }
225
226         prog := runner.Prog
227         if prog == "" {
228                 prog = "/mnt/cmd/lightning"
229                 cmdUUID, err := runner.makeCommandCollection()
230                 if err != nil {
231                         return "", err
232                 }
233                 mounts["/mnt/cmd"] = map[string]interface{}{
234                         "kind": "collection",
235                         "uuid": cmdUUID,
236                 }
237         }
238         command := append([]string{prog}, runner.Args...)
239
240         priority := runner.Priority
241         if priority < 1 {
242                 priority = 500
243         }
244         keepCache := runner.KeepCache
245         if keepCache < 1 {
246                 keepCache = 2
247         }
248         rc := arvados.RuntimeConstraints{
249                 API:          runner.APIAccess,
250                 VCPUs:        runner.VCPUs,
251                 RAM:          runner.RAM,
252                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
253         }
254         outname := &runner.OutputName
255         if *outname == "" {
256                 outname = nil
257         }
258         var cr arvados.ContainerRequest
259         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
260                 "container_request": map[string]interface{}{
261                         "owner_uuid":          runner.ProjectUUID,
262                         "name":                runner.Name,
263                         "container_image":     "lightning-runtime",
264                         "command":             command,
265                         "mounts":              mounts,
266                         "use_existing":        true,
267                         "output_path":         "/mnt/output",
268                         "output_name":         outname,
269                         "runtime_constraints": rc,
270                         "priority":            runner.Priority,
271                         "state":               arvados.ContainerRequestStateCommitted,
272                         "scheduling_parameters": arvados.SchedulingParameters{
273                                 Preemptible: runner.Preemptible,
274                                 Partitions:  []string{},
275                         },
276                         "environment": map[string]string{
277                                 "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs),
278                         },
279                         "container_count_max": 1,
280                 },
281         })
282         if err != nil {
283                 return "", err
284         }
285         log.Printf("container request UUID: %s", cr.UUID)
286         log.Printf("container UUID: %s", cr.ContainerUUID)
287
288         logch := make(chan eventMessage)
289         client := arvadosClient{Client: runner.Client}
290         defer client.Close()
291         subscribedUUID := ""
292         defer func() {
293                 if subscribedUUID != "" {
294                         log.Printf("unsubscribe container UUID: %s", subscribedUUID)
295                         client.Unsubscribe(logch, subscribedUUID)
296                 }
297         }()
298
299         neednewline := ""
300
301         lastState := cr.State
302         refreshCR := func() {
303                 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Minute))
304                 defer cancel()
305                 err = runner.Client.RequestAndDecodeContext(ctx, &cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
306                 if err != nil {
307                         fmt.Fprint(os.Stderr, neednewline)
308                         neednewline = ""
309                         log.Printf("error getting container request: %s", err)
310                         return
311                 }
312                 if lastState != cr.State {
313                         fmt.Fprint(os.Stderr, neednewline)
314                         neednewline = ""
315                         log.Printf("container request state: %s", cr.State)
316                         lastState = cr.State
317                 }
318                 if subscribedUUID != cr.ContainerUUID {
319                         fmt.Fprint(os.Stderr, neednewline)
320                         neednewline = ""
321                         if subscribedUUID != "" {
322                                 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
323                                 client.Unsubscribe(logch, subscribedUUID)
324                         }
325                         log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
326                         client.Subscribe(logch, cr.ContainerUUID)
327                         subscribedUUID = cr.ContainerUUID
328                 }
329         }
330
331         var reCrunchstat = regexp.MustCompile(`mem .* (\d+) rss`)
332 waitctr:
333         for cr.State != arvados.ContainerRequestStateFinal {
334                 select {
335                 case <-ctx.Done():
336                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
337                                 "container_request": map[string]interface{}{
338                                         "priority": 0,
339                                 },
340                         })
341                         if err != nil {
342                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
343                         }
344                         break waitctr
345                 case <-refreshTicker.C:
346                         refreshCR()
347                 case msg := <-logch:
348                         switch msg.EventType {
349                         case "update":
350                                 refreshCR()
351                         case "stderr":
352                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
353                                         if line != "" {
354                                                 fmt.Fprint(os.Stderr, neednewline)
355                                                 neednewline = ""
356                                                 log.Print(line)
357                                         }
358                                 }
359                         case "crunchstat":
360                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
361                                         m := reCrunchstat.FindStringSubmatch(line)
362                                         if m != nil {
363                                                 rss, _ := strconv.ParseInt(m[1], 10, 64)
364                                                 fmt.Fprintf(os.Stderr, "%s rss %.3f GB           \r", cr.UUID, float64(rss)/1e9)
365                                                 neednewline = "\n"
366                                         }
367                                 }
368                         }
369                 }
370         }
371         fmt.Fprint(os.Stderr, neednewline)
372
373         if err := ctx.Err(); err != nil {
374                 return "", err
375         }
376
377         var c arvados.Container
378         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
379         if err != nil {
380                 return "", err
381         } else if c.State != arvados.ContainerStateComplete {
382                 return "", fmt.Errorf("container did not complete: %s", c.State)
383         } else if c.ExitCode != 0 {
384                 return "", fmt.Errorf("container exited %d", c.ExitCode)
385         }
386         return cr.OutputUUID, err
387 }
388
// collectionInPathRe finds a collection identifier inside a path. An
// identifier is either 32 hex digits followed by "+" and a decimal
// number, or three groups of 5, 5 and 15 lowercase alphanumerics
// joined by "-". Submatch 1 is everything before the identifier
// (ending in "/", or empty), submatch 2 is the identifier itself, and
// submatch 3 is the rest of the path (starting with "/", or empty).
var collectionInPathRe = regexp.MustCompile(
	`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`,
)
390
391 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
392         if runner.Mounts == nil {
393                 runner.Mounts = make(map[string]map[string]interface{})
394         }
395         for _, path := range paths {
396                 if *path == "" || *path == "-" {
397                         continue
398                 }
399                 m := collectionInPathRe.FindStringSubmatch(*path)
400                 if m == nil {
401                         return fmt.Errorf("cannot find uuid in path: %q", *path)
402                 }
403                 collID := m[2]
404                 mnt, ok := runner.Mounts["/mnt/"+collID]
405                 if !ok {
406                         mnt = map[string]interface{}{
407                                 "kind": "collection",
408                         }
409                         if len(collID) == 27 {
410                                 mnt["uuid"] = collID
411                         } else {
412                                 mnt["portable_data_hash"] = collID
413                         }
414                         runner.Mounts["/mnt/"+collID] = mnt
415                 }
416                 *path = "/mnt/" + collID + m[3]
417         }
418         return nil
419 }
420
421 var mtxMakeCommandCollection sync.Mutex
422
423 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
424         mtxMakeCommandCollection.Lock()
425         defer mtxMakeCommandCollection.Unlock()
426         exe, err := ioutil.ReadFile("/proc/self/exe")
427         if err != nil {
428                 return "", err
429         }
430         b2 := blake2b.Sum256(exe)
431         cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
432         var existing arvados.CollectionList
433         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
434                 Limit: 1,
435                 Count: "none",
436                 Filters: []arvados.Filter{
437                         {Attr: "name", Operator: "=", Operand: cname},
438                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
439                         {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
440                 },
441         })
442         if err != nil {
443                 return "", err
444         }
445         if len(existing.Items) > 0 {
446                 coll := existing.Items[0]
447                 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
448                 return coll.UUID, nil
449         }
450         log.Printf("writing lightning binary to new collection %q", cname)
451         ac, err := arvadosclient.New(runner.Client)
452         if err != nil {
453                 return "", err
454         }
455         kc := keepclient.New(ac)
456         var coll arvados.Collection
457         fs, err := coll.FileSystem(runner.Client, kc)
458         if err != nil {
459                 return "", err
460         }
461         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
462         if err != nil {
463                 return "", err
464         }
465         _, err = f.Write(exe)
466         if err != nil {
467                 return "", err
468         }
469         err = f.Close()
470         if err != nil {
471                 return "", err
472         }
473         mtxt, err := fs.MarshalManifest(".")
474         if err != nil {
475                 return "", err
476         }
477         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
478                 "collection": map[string]interface{}{
479                         "owner_uuid":    runner.ProjectUUID,
480                         "manifest_text": mtxt,
481                         "name":          cname,
482                         "properties": map[string]interface{}{
483                                 "blake2b": fmt.Sprintf("%x", b2),
484                         },
485                 },
486         })
487         if err != nil {
488                 return "", err
489         }
490         log.Printf("stored lightning binary in new collection %s", coll.UUID)
491         return coll.UUID, nil
492 }
493
494 // zopen returns a reader for the given file, using the arvados API
495 // instead of arv-mount/fuse where applicable, and transparently
496 // decompressing the input if fnm ends with ".gz".
497 func zopen(fnm string) (io.ReadCloser, error) {
498         f, err := open(fnm)
499         if err != nil || !strings.HasSuffix(fnm, ".gz") {
500                 return f, err
501         }
502         rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
503         if err != nil {
504                 f.Close()
505                 return nil, err
506         }
507         return gzipr{rdr, f}, nil
508 }
509
// gzipr pairs a ReadCloser with a second Closer. Reads go to the
// ReadCloser; Close() closes both.
type gzipr struct {
	io.ReadCloser
	io.Closer
}

// Close closes the ReadCloser first and then the Closer. Both are
// always closed. The ReadCloser's error, if any, takes precedence
// over the Closer's.
func (gr gzipr) Close() error {
	// Calls in a composite literal are evaluated left to right.
	errs := [2]error{gr.ReadCloser.Close(), gr.Closer.Close()}
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}
525
526 var (
527         arvadosClientFromEnv = arvados.NewClientFromEnv()
528         keepClient           *keepclient.KeepClient
529         siteFS               arvados.CustomFileSystem
530         siteFSMtx            sync.Mutex
531 )
532
// file is the set of methods open() promises on the files it
// returns: reading, seeking, closing, and listing directory entries.
type file interface {
	io.ReadSeeker
	io.Closer
	Readdir(n int) ([]os.FileInfo, error)
}
538
539 func open(fnm string) (file, error) {
540         if os.Getenv("ARVADOS_API_HOST") == "" {
541                 return os.Open(fnm)
542         }
543         m := collectionInPathRe.FindStringSubmatch(fnm)
544         if m == nil {
545                 return os.Open(fnm)
546         }
547         collectionUUID := m[2]
548         collectionPath := m[3]
549
550         siteFSMtx.Lock()
551         defer siteFSMtx.Unlock()
552         if siteFS == nil {
553                 log.Info("setting up Arvados client")
554                 ac, err := arvadosclient.New(arvadosClientFromEnv)
555                 if err != nil {
556                         return nil, err
557                 }
558                 ac.Client = arvados.DefaultSecureClient
559                 keepClient = keepclient.New(ac)
560                 // Don't use keepclient's default short timeouts.
561                 keepClient.HTTPClient = arvados.DefaultSecureClient
562                 keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
563                 siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
564         } else {
565                 keepClient.BlockCache.MaxBlocks += 2
566         }
567
568         log.Infof("reading %q from %s using Arvados client", collectionPath, collectionUUID)
569         f, err := siteFS.Open("by_id/" + collectionUUID + collectionPath)
570         if err != nil {
571                 return nil, err
572         }
573         return &reduceCacheOnClose{file: f}, nil
574 }
575
576 type reduceCacheOnClose struct {
577         file
578         once sync.Once
579 }
580
581 func (rc *reduceCacheOnClose) Close() error {
582         rc.once.Do(func() { keepClient.BlockCache.MaxBlocks -= 2 })
583         return rc.file.Close()
584 }