Write hgvs-based numpy matrix.
[lightning.git] / arvados.go
1 package lightning
2
3 import (
4         "bufio"
5         "context"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/url"
12         "os"
13         "regexp"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/cmd"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
21         "git.arvados.org/arvados.git/sdk/go/keepclient"
22         "github.com/klauspost/pgzip"
23         log "github.com/sirupsen/logrus"
24         "golang.org/x/crypto/blake2b"
25         "golang.org/x/net/websocket"
26 )
27
// eventMessage is one message decoded from the Arvados websocket event
// stream. JSON fields not listed here are ignored when decoding.
type eventMessage struct {
	// Status is the message's "status" field, if present.
	Status int
	// ObjectUUID is the UUID of the object the event concerns.
	ObjectUUID string `json:"object_uuid"`
	// EventType is the kind of event, e.g. "update", "stderr",
	// "crunchstat".
	EventType string `json:"event_type"`
	// Properties holds event details; Text carries the log text of
	// log-type events.
	Properties struct {
		Text string
	}
}
36
37 type arvadosClient struct {
38         *arvados.Client
39         notifying map[string]map[chan<- eventMessage]int
40         wantClose chan struct{}
41         wsconn    *websocket.Conn
42         mtx       sync.Mutex
43 }
44
45 // Listen for events concerning the given uuids. When an event occurs
46 // (and after connecting/reconnecting to the event stream), send each
47 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
48 // be sent only once for each update, but two Unsubscribe calls will
49 // be needed to stop sending them.
50 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
51         client.mtx.Lock()
52         defer client.mtx.Unlock()
53         if client.notifying == nil {
54                 client.notifying = map[string]map[chan<- eventMessage]int{}
55                 client.wantClose = make(chan struct{})
56                 go client.runNotifier()
57         }
58         chmap := client.notifying[uuid]
59         if chmap == nil {
60                 chmap = map[chan<- eventMessage]int{}
61                 client.notifying[uuid] = chmap
62         }
63         needSub := true
64         for _, nch := range chmap {
65                 if nch > 0 {
66                         needSub = false
67                         break
68                 }
69         }
70         chmap[ch]++
71         if needSub && client.wsconn != nil {
72                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
73                         "method": "subscribe",
74                         "filters": [][]interface{}{
75                                 {"object_uuid", "=", uuid},
76                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
77                         },
78                 })
79         }
80 }
81
82 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
83         client.mtx.Lock()
84         defer client.mtx.Unlock()
85         chmap := client.notifying[uuid]
86         if n := chmap[ch] - 1; n == 0 {
87                 delete(chmap, ch)
88                 if len(chmap) == 0 {
89                         delete(client.notifying, uuid)
90                 }
91                 if client.wsconn != nil {
92                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
93                                 "method": "unsubscribe",
94                                 "filters": [][]interface{}{
95                                         {"object_uuid", "=", uuid},
96                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
97                                 },
98                         })
99                 }
100         } else if n > 0 {
101                 chmap[ch] = n
102         }
103 }
104
105 func (client *arvadosClient) Close() {
106         client.mtx.Lock()
107         defer client.mtx.Unlock()
108         if client.notifying != nil {
109                 client.notifying = nil
110                 close(client.wantClose)
111         }
112 }
113
114 func (client *arvadosClient) runNotifier() {
115 reconnect:
116         for {
117                 var cluster arvados.Cluster
118                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
119                 if err != nil {
120                         log.Warnf("error getting cluster config: %s", err)
121                         time.Sleep(5 * time.Second)
122                         continue reconnect
123                 }
124                 wsURL := cluster.Services.Websocket.ExternalURL
125                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
126                 wsURL.Path = "/websocket"
127                 wsURLNoToken := wsURL.String()
128                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
129                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
130                 if err != nil {
131                         log.Warnf("websocket connection error: %s", err)
132                         time.Sleep(5 * time.Second)
133                         continue reconnect
134                 }
135                 log.Printf("connected to websocket at %s", wsURLNoToken)
136
137                 client.mtx.Lock()
138                 client.wsconn = conn
139                 resubscribe := make([]string, 0, len(client.notifying))
140                 for uuid := range client.notifying {
141                         resubscribe = append(resubscribe, uuid)
142                 }
143                 client.mtx.Unlock()
144
145                 go func() {
146                         w := json.NewEncoder(conn)
147                         for _, uuid := range resubscribe {
148                                 w.Encode(map[string]interface{}{
149                                         "method": "subscribe",
150                                         "filters": [][]interface{}{
151                                                 {"object_uuid", "=", uuid},
152                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
153                                         },
154                                 })
155                         }
156                 }()
157
158                 r := json.NewDecoder(conn)
159                 for {
160                         var msg eventMessage
161                         err := r.Decode(&msg)
162                         select {
163                         case <-client.wantClose:
164                                 return
165                         default:
166                                 if err != nil {
167                                         log.Printf("error decoding websocket message: %s", err)
168                                         client.mtx.Lock()
169                                         client.wsconn = nil
170                                         client.mtx.Unlock()
171                                         go conn.Close()
172                                         continue reconnect
173                                 }
174                                 client.mtx.Lock()
175                                 for ch := range client.notifying[msg.ObjectUUID] {
176                                         ch <- msg
177                                 }
178                                 client.mtx.Unlock()
179                         }
180                 }
181         }
182 }
183
// refreshTicker paces the periodic container request refresh in
// RunContext. It is a single process-wide ticker shared by all
// RunContext calls, so each tick is received by only one of them.
var refreshTicker = time.NewTicker(time.Second * 5)
185
186 type arvadosContainerRunner struct {
187         Client      *arvados.Client
188         Name        string
189         OutputName  string
190         ProjectUUID string
191         APIAccess   bool
192         VCPUs       int
193         RAM         int64
194         Prog        string // if empty, run /proc/self/exe
195         Args        []string
196         Mounts      map[string]map[string]interface{}
197         Priority    int
198         KeepCache   int // cache buffers per VCPU (0 for default)
199 }
200
201 func (runner *arvadosContainerRunner) Run() (string, error) {
202         return runner.RunContext(context.Background())
203 }
204
205 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
206         if runner.ProjectUUID == "" {
207                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
208         }
209
210         mounts := map[string]map[string]interface{}{
211                 "/mnt/output": {
212                         "kind":     "collection",
213                         "writable": true,
214                 },
215         }
216         for path, mnt := range runner.Mounts {
217                 mounts[path] = mnt
218         }
219
220         prog := runner.Prog
221         if prog == "" {
222                 prog = "/mnt/cmd/lightning"
223                 cmdUUID, err := runner.makeCommandCollection()
224                 if err != nil {
225                         return "", err
226                 }
227                 mounts["/mnt/cmd"] = map[string]interface{}{
228                         "kind": "collection",
229                         "uuid": cmdUUID,
230                 }
231         }
232         command := append([]string{prog}, runner.Args...)
233
234         priority := runner.Priority
235         if priority < 1 {
236                 priority = 500
237         }
238         keepCache := runner.KeepCache
239         if keepCache < 1 {
240                 keepCache = 2
241         }
242         rc := arvados.RuntimeConstraints{
243                 API:          &runner.APIAccess,
244                 VCPUs:        runner.VCPUs,
245                 RAM:          runner.RAM,
246                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
247         }
248         outname := &runner.OutputName
249         if *outname == "" {
250                 outname = nil
251         }
252         var cr arvados.ContainerRequest
253         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
254                 "container_request": map[string]interface{}{
255                         "owner_uuid":          runner.ProjectUUID,
256                         "name":                runner.Name,
257                         "container_image":     "lightning-runtime",
258                         "command":             command,
259                         "mounts":              mounts,
260                         "use_existing":        true,
261                         "output_path":         "/mnt/output",
262                         "output_name":         outname,
263                         "runtime_constraints": rc,
264                         "priority":            runner.Priority,
265                         "state":               arvados.ContainerRequestStateCommitted,
266                         "scheduling_parameters": arvados.SchedulingParameters{
267                                 Preemptible: true,
268                                 Partitions:  []string{},
269                         },
270                 },
271         })
272         if err != nil {
273                 return "", err
274         }
275         log.Printf("container request UUID: %s", cr.UUID)
276         log.Printf("container UUID: %s", cr.ContainerUUID)
277
278         logch := make(chan eventMessage)
279         client := arvadosClient{Client: runner.Client}
280         defer client.Close()
281         subscribedUUID := ""
282         defer func() {
283                 if subscribedUUID != "" {
284                         log.Printf("unsubscribe container UUID: %s", subscribedUUID)
285                         client.Unsubscribe(logch, subscribedUUID)
286                 }
287         }()
288
289         neednewline := ""
290
291         lastState := cr.State
292         refreshCR := func() {
293                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
294                 if err != nil {
295                         fmt.Fprint(os.Stderr, neednewline)
296                         log.Printf("error getting container request: %s", err)
297                         return
298                 }
299                 if lastState != cr.State {
300                         fmt.Fprint(os.Stderr, neednewline)
301                         log.Printf("container request state: %s", cr.State)
302                         lastState = cr.State
303                 }
304                 if subscribedUUID != cr.ContainerUUID {
305                         fmt.Fprint(os.Stderr, neednewline)
306                         neednewline = ""
307                         if subscribedUUID != "" {
308                                 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
309                                 client.Unsubscribe(logch, subscribedUUID)
310                         }
311                         log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
312                         client.Subscribe(logch, cr.ContainerUUID)
313                         subscribedUUID = cr.ContainerUUID
314                 }
315         }
316
317         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
318 waitctr:
319         for cr.State != arvados.ContainerRequestStateFinal {
320                 select {
321                 case <-ctx.Done():
322                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
323                                 "container_request": map[string]interface{}{
324                                         "priority": 0,
325                                 },
326                         })
327                         if err != nil {
328                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
329                         }
330                         break waitctr
331                 case <-refreshTicker.C:
332                         refreshCR()
333                 case msg := <-logch:
334                         switch msg.EventType {
335                         case "update":
336                                 refreshCR()
337                         case "stderr":
338                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
339                                         if line != "" {
340                                                 fmt.Fprint(os.Stderr, neednewline)
341                                                 neednewline = ""
342                                                 log.Print(line)
343                                         }
344                                 }
345                         case "crunchstat":
346                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
347                                         mem := reCrunchstat.FindString(line)
348                                         if mem != "" {
349                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
350                                                 neednewline = "\n"
351                                         }
352                                 }
353                         }
354                 }
355         }
356         fmt.Fprint(os.Stderr, neednewline)
357
358         if err := ctx.Err(); err != nil {
359                 return "", err
360         }
361
362         var c arvados.Container
363         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
364         if err != nil {
365                 return "", err
366         } else if c.State != arvados.ContainerStateComplete {
367                 return "", fmt.Errorf("container did not complete: %s", c.State)
368         } else if c.ExitCode != 0 {
369                 return "", fmt.Errorf("container exited %d", c.ExitCode)
370         }
371         return cr.OutputUUID, err
372 }
373
// collectionInPathRe finds an Arvados collection identifier — either
// a portable data hash ("{32 hex}+{size}") or a UUID
// ("xxxxx-xxxxx-{15 chars}") — that forms a complete path component.
// Submatches: [1] the prefix up to and including the preceding "/"
// (may be empty), [2] the identifier, [3] the remainder starting with
// "/" (may be empty).
var collectionInPathRe *regexp.Regexp = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
375
376 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
377         if runner.Mounts == nil {
378                 runner.Mounts = make(map[string]map[string]interface{})
379         }
380         for _, path := range paths {
381                 if *path == "" || *path == "-" {
382                         continue
383                 }
384                 m := collectionInPathRe.FindStringSubmatch(*path)
385                 if m == nil {
386                         return fmt.Errorf("cannot find uuid in path: %q", *path)
387                 }
388                 uuid := m[2]
389                 mnt, ok := runner.Mounts["/mnt/"+uuid]
390                 if !ok {
391                         mnt = map[string]interface{}{
392                                 "kind": "collection",
393                                 "uuid": uuid,
394                         }
395                         runner.Mounts["/mnt/"+uuid] = mnt
396                 }
397                 *path = "/mnt/" + uuid + m[3]
398         }
399         return nil
400 }
401
// mtxMakeCommandCollection serializes makeCommandCollection, so that
// concurrent callers find a collection stored by an earlier caller
// rather than racing to store duplicates.
var mtxMakeCommandCollection sync.Mutex
403
404 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
405         mtxMakeCommandCollection.Lock()
406         defer mtxMakeCommandCollection.Unlock()
407         exe, err := ioutil.ReadFile("/proc/self/exe")
408         if err != nil {
409                 return "", err
410         }
411         b2 := blake2b.Sum256(exe)
412         cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
413         var existing arvados.CollectionList
414         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
415                 Limit: 1,
416                 Count: "none",
417                 Filters: []arvados.Filter{
418                         {Attr: "name", Operator: "=", Operand: cname},
419                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
420                         {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
421                 },
422         })
423         if err != nil {
424                 return "", err
425         }
426         if len(existing.Items) > 0 {
427                 coll := existing.Items[0]
428                 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
429                 return coll.UUID, nil
430         }
431         log.Printf("writing lightning binary to new collection %q", cname)
432         ac, err := arvadosclient.New(runner.Client)
433         if err != nil {
434                 return "", err
435         }
436         kc := keepclient.New(ac)
437         var coll arvados.Collection
438         fs, err := coll.FileSystem(runner.Client, kc)
439         if err != nil {
440                 return "", err
441         }
442         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
443         if err != nil {
444                 return "", err
445         }
446         _, err = f.Write(exe)
447         if err != nil {
448                 return "", err
449         }
450         err = f.Close()
451         if err != nil {
452                 return "", err
453         }
454         mtxt, err := fs.MarshalManifest(".")
455         if err != nil {
456                 return "", err
457         }
458         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
459                 "collection": map[string]interface{}{
460                         "owner_uuid":    runner.ProjectUUID,
461                         "manifest_text": mtxt,
462                         "name":          cname,
463                         "properties": map[string]interface{}{
464                                 "blake2b": fmt.Sprintf("%x", b2),
465                         },
466                 },
467         })
468         if err != nil {
469                 return "", err
470         }
471         log.Printf("stored lightning binary in new collection %s", coll.UUID)
472         return coll.UUID, nil
473 }
474
475 // zopen returns a reader for the given file, using the arvados API
476 // instead of arv-mount/fuse where applicable, and transparently
477 // decompressing the input if fnm ends with ".gz".
478 func zopen(fnm string) (io.ReadCloser, error) {
479         f, err := open(fnm)
480         if err != nil || !strings.HasSuffix(fnm, ".gz") {
481                 return f, err
482         }
483         rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
484         if err != nil {
485                 f.Close()
486                 return nil, err
487         }
488         return gzipr{rdr, f}, nil
489 }
490
// gzipr pairs a (decompressing) ReadCloser with the Closer it reads
// from. Reads go to the ReadCloser; Close closes both.
type gzipr struct {
	io.ReadCloser
	io.Closer
}

// Close closes the ReadCloser and then the Closer, always attempting
// both. If both fail, the ReadCloser's error is the one returned.
func (gr gzipr) Close() error {
	errs := [2]error{gr.ReadCloser.Close(), gr.Closer.Close()}
	for _, err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}
506
507 var (
508         arvadosClientFromEnv = arvados.NewClientFromEnv()
509         keepClient           *keepclient.KeepClient
510         siteFS               arvados.CustomFileSystem
511         siteFSMtx            sync.Mutex
512 )
513
514 func open(fnm string) (io.ReadCloser, error) {
515         if os.Getenv("ARVADOS_API_HOST") == "" {
516                 return os.Open(fnm)
517         }
518         m := collectionInPathRe.FindStringSubmatch(fnm)
519         if m == nil {
520                 return os.Open(fnm)
521         }
522         uuid := m[2]
523         mnt := "/mnt/" + uuid + "/"
524         if !strings.HasPrefix(fnm, mnt) {
525                 return os.Open(fnm)
526         }
527
528         siteFSMtx.Lock()
529         defer siteFSMtx.Unlock()
530         if siteFS == nil {
531                 log.Info("setting up Arvados client")
532                 ac, err := arvadosclient.New(arvadosClientFromEnv)
533                 if err != nil {
534                         return nil, err
535                 }
536                 ac.Client = arvados.DefaultSecureClient
537                 keepClient = keepclient.New(ac)
538                 // Don't use keepclient's default short timeouts.
539                 keepClient.HTTPClient = arvados.DefaultSecureClient
540                 keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
541                 siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
542         } else {
543                 keepClient.BlockCache.MaxBlocks++
544         }
545
546         log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
547         return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):])
548 }