811d512e65339bbfcade4f93121256d809b07be5
[lightning.git] / arvados.go
1 package lightning
2
3 import (
4         "bufio"
5         "context"
6         "encoding/json"
7         "errors"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "net/url"
12         "os"
13         "regexp"
14         "runtime"
15         "strings"
16         "sync"
17         "time"
18
19         "git.arvados.org/arvados.git/lib/cmd"
20         "git.arvados.org/arvados.git/sdk/go/arvados"
21         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
22         "git.arvados.org/arvados.git/sdk/go/keepclient"
23         "github.com/klauspost/pgzip"
24         log "github.com/sirupsen/logrus"
25         "golang.org/x/crypto/blake2b"
26         "golang.org/x/net/websocket"
27 )
28
// eventMessage is a single JSON message decoded from the Arvados
// websocket event stream by runNotifier.
type eventMessage struct {
	// Status is decoded from the message but not otherwise used in
	// this file. NOTE(review): presumably the status code of a
	// server response to a subscribe request -- confirm.
	Status int
	// ObjectUUID selects which subscribers (see Subscribe) receive
	// the message.
	ObjectUUID string `json:"object_uuid"`
	// EventType is the kind of event; RunContext acts on "update",
	// "stderr" and "crunchstat".
	EventType  string `json:"event_type"`
	Properties struct {
		// Text is the event payload; for "stderr" and
		// "crunchstat" events RunContext splits it into lines.
		Text string
	}
}
37
38 type arvadosClient struct {
39         *arvados.Client
40         notifying map[string]map[chan<- eventMessage]int
41         wantClose chan struct{}
42         wsconn    *websocket.Conn
43         mtx       sync.Mutex
44 }
45
46 // Listen for events concerning the given uuids. When an event occurs
47 // (and after connecting/reconnecting to the event stream), send each
48 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
49 // be sent only once for each update, but two Unsubscribe calls will
50 // be needed to stop sending them.
51 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
52         client.mtx.Lock()
53         defer client.mtx.Unlock()
54         if client.notifying == nil {
55                 client.notifying = map[string]map[chan<- eventMessage]int{}
56                 client.wantClose = make(chan struct{})
57                 go client.runNotifier()
58         }
59         chmap := client.notifying[uuid]
60         if chmap == nil {
61                 chmap = map[chan<- eventMessage]int{}
62                 client.notifying[uuid] = chmap
63         }
64         needSub := true
65         for _, nch := range chmap {
66                 if nch > 0 {
67                         needSub = false
68                         break
69                 }
70         }
71         chmap[ch]++
72         if needSub && client.wsconn != nil {
73                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
74                         "method": "subscribe",
75                         "filters": [][]interface{}{
76                                 {"object_uuid", "=", uuid},
77                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
78                         },
79                 })
80         }
81 }
82
83 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
84         client.mtx.Lock()
85         defer client.mtx.Unlock()
86         chmap := client.notifying[uuid]
87         if n := chmap[ch] - 1; n == 0 {
88                 delete(chmap, ch)
89                 if len(chmap) == 0 {
90                         delete(client.notifying, uuid)
91                 }
92                 if client.wsconn != nil {
93                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
94                                 "method": "unsubscribe",
95                                 "filters": [][]interface{}{
96                                         {"object_uuid", "=", uuid},
97                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
98                                 },
99                         })
100                 }
101         } else if n > 0 {
102                 chmap[ch] = n
103         }
104 }
105
106 func (client *arvadosClient) Close() {
107         client.mtx.Lock()
108         defer client.mtx.Unlock()
109         if client.notifying != nil {
110                 client.notifying = nil
111                 close(client.wantClose)
112         }
113 }
114
115 func (client *arvadosClient) runNotifier() {
116 reconnect:
117         for {
118                 var cluster arvados.Cluster
119                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
120                 if err != nil {
121                         log.Warnf("error getting cluster config: %s", err)
122                         time.Sleep(5 * time.Second)
123                         continue reconnect
124                 }
125                 wsURL := cluster.Services.Websocket.ExternalURL
126                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
127                 wsURL.Path = "/websocket"
128                 wsURLNoToken := wsURL.String()
129                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
130                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
131                 if err != nil {
132                         log.Warnf("websocket connection error: %s", err)
133                         time.Sleep(5 * time.Second)
134                         continue reconnect
135                 }
136                 log.Printf("connected to websocket at %s", wsURLNoToken)
137
138                 client.mtx.Lock()
139                 client.wsconn = conn
140                 resubscribe := make([]string, 0, len(client.notifying))
141                 for uuid := range client.notifying {
142                         resubscribe = append(resubscribe, uuid)
143                 }
144                 client.mtx.Unlock()
145
146                 go func() {
147                         w := json.NewEncoder(conn)
148                         for _, uuid := range resubscribe {
149                                 w.Encode(map[string]interface{}{
150                                         "method": "subscribe",
151                                         "filters": [][]interface{}{
152                                                 {"object_uuid", "=", uuid},
153                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
154                                         },
155                                 })
156                         }
157                 }()
158
159                 r := json.NewDecoder(conn)
160                 for {
161                         var msg eventMessage
162                         err := r.Decode(&msg)
163                         select {
164                         case <-client.wantClose:
165                                 return
166                         default:
167                                 if err != nil {
168                                         log.Printf("error decoding websocket message: %s", err)
169                                         client.mtx.Lock()
170                                         client.wsconn = nil
171                                         client.mtx.Unlock()
172                                         go conn.Close()
173                                         continue reconnect
174                                 }
175                                 client.mtx.Lock()
176                                 for ch := range client.notifying[msg.ObjectUUID] {
177                                         ch <- msg
178                                 }
179                                 client.mtx.Unlock()
180                         }
181                 }
182         }
183 }
184
// refreshTicker fires every 5 seconds. RunContext re-fetches its
// container request on each tick it receives. The ticker is a single
// package-level value, so concurrent RunContext calls share its ticks.
var refreshTicker = time.NewTicker(5 * time.Second)
186
// arvadosContainerRunner describes a command to be run in an Arvados
// container via Run or RunContext.
type arvadosContainerRunner struct {
	Client      *arvados.Client // used for all API requests
	Name        string          // container request name
	OutputName  string          // output collection name; if empty, none is requested
	ProjectUUID string          // owner of the container request (required)
	APIAccess   bool            // passed as the "API" runtime constraint
	VCPUs       int
	RAM         int64
	Prog        string // if empty, run /proc/self/exe
	Args        []string
	Mounts      map[string]map[string]interface{} // extra mounts, keyed by path inside the container
	Priority    int
	KeepCache   int // cache buffers per VCPU (0 for default)
}
201
202 func (runner *arvadosContainerRunner) Run() (string, error) {
203         return runner.RunContext(context.Background())
204 }
205
206 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
207         if runner.ProjectUUID == "" {
208                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
209         }
210
211         mounts := map[string]map[string]interface{}{
212                 "/mnt/output": {
213                         "kind":     "collection",
214                         "writable": true,
215                 },
216         }
217         for path, mnt := range runner.Mounts {
218                 mounts[path] = mnt
219         }
220
221         prog := runner.Prog
222         if prog == "" {
223                 prog = "/mnt/cmd/lightning"
224                 cmdUUID, err := runner.makeCommandCollection()
225                 if err != nil {
226                         return "", err
227                 }
228                 mounts["/mnt/cmd"] = map[string]interface{}{
229                         "kind": "collection",
230                         "uuid": cmdUUID,
231                 }
232         }
233         command := append([]string{prog}, runner.Args...)
234
235         priority := runner.Priority
236         if priority < 1 {
237                 priority = 500
238         }
239         keepCache := runner.KeepCache
240         if keepCache < 1 {
241                 keepCache = 2
242         }
243         rc := arvados.RuntimeConstraints{
244                 API:          &runner.APIAccess,
245                 VCPUs:        runner.VCPUs,
246                 RAM:          runner.RAM,
247                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
248         }
249         outname := &runner.OutputName
250         if *outname == "" {
251                 outname = nil
252         }
253         var cr arvados.ContainerRequest
254         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
255                 "container_request": map[string]interface{}{
256                         "owner_uuid":          runner.ProjectUUID,
257                         "name":                runner.Name,
258                         "container_image":     "lightning-runtime",
259                         "command":             command,
260                         "mounts":              mounts,
261                         "use_existing":        true,
262                         "output_path":         "/mnt/output",
263                         "output_name":         outname,
264                         "runtime_constraints": rc,
265                         "priority":            runner.Priority,
266                         "state":               arvados.ContainerRequestStateCommitted,
267                         "scheduling_parameters": arvados.SchedulingParameters{
268                                 Preemptible: true,
269                                 Partitions:  []string{},
270                         },
271                 },
272         })
273         if err != nil {
274                 return "", err
275         }
276         log.Printf("container request UUID: %s", cr.UUID)
277         log.Printf("container UUID: %s", cr.ContainerUUID)
278
279         logch := make(chan eventMessage)
280         client := arvadosClient{Client: runner.Client}
281         defer client.Close()
282         subscribedUUID := ""
283         defer func() {
284                 if subscribedUUID != "" {
285                         client.Unsubscribe(logch, subscribedUUID)
286                 }
287         }()
288
289         neednewline := ""
290
291         lastState := cr.State
292         refreshCR := func() {
293                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
294                 if err != nil {
295                         fmt.Fprint(os.Stderr, neednewline)
296                         log.Printf("error getting container request: %s", err)
297                         return
298                 }
299                 if lastState != cr.State {
300                         fmt.Fprint(os.Stderr, neednewline)
301                         log.Printf("container request state: %s", cr.State)
302                         lastState = cr.State
303                 }
304                 if subscribedUUID != cr.ContainerUUID {
305                         fmt.Fprint(os.Stderr, neednewline)
306                         neednewline = ""
307                         if subscribedUUID != "" {
308                                 client.Unsubscribe(logch, subscribedUUID)
309                         }
310                         client.Subscribe(logch, cr.ContainerUUID)
311                         subscribedUUID = cr.ContainerUUID
312                 }
313         }
314
315         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
316 waitctr:
317         for cr.State != arvados.ContainerRequestStateFinal {
318                 select {
319                 case <-ctx.Done():
320                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
321                                 "container_request": map[string]interface{}{
322                                         "priority": 0,
323                                 },
324                         })
325                         if err != nil {
326                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
327                         }
328                         break waitctr
329                 case <-refreshTicker.C:
330                         refreshCR()
331                 case msg := <-logch:
332                         switch msg.EventType {
333                         case "update":
334                                 refreshCR()
335                         case "stderr":
336                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
337                                         if line != "" {
338                                                 fmt.Fprint(os.Stderr, neednewline)
339                                                 neednewline = ""
340                                                 log.Print(line)
341                                         }
342                                 }
343                         case "crunchstat":
344                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
345                                         mem := reCrunchstat.FindString(line)
346                                         if mem != "" {
347                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
348                                                 neednewline = "\n"
349                                         }
350                                 }
351                         }
352                 }
353         }
354         fmt.Fprint(os.Stderr, neednewline)
355
356         if err := ctx.Err(); err != nil {
357                 return "", err
358         }
359
360         var c arvados.Container
361         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
362         if err != nil {
363                 return "", err
364         } else if c.State != arvados.ContainerStateComplete {
365                 return "", fmt.Errorf("container did not complete: %s", c.State)
366         } else if c.ExitCode != 0 {
367                 return "", fmt.Errorf("container exited %d", c.ExitCode)
368         }
369         return cr.OutputUUID, err
370 }
371
372 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
373
374 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
375         if runner.Mounts == nil {
376                 runner.Mounts = make(map[string]map[string]interface{})
377         }
378         for _, path := range paths {
379                 if *path == "" || *path == "-" {
380                         continue
381                 }
382                 m := collectionInPathRe.FindStringSubmatch(*path)
383                 if m == nil {
384                         return fmt.Errorf("cannot find uuid in path: %q", *path)
385                 }
386                 uuid := m[2]
387                 mnt, ok := runner.Mounts["/mnt/"+uuid]
388                 if !ok {
389                         mnt = map[string]interface{}{
390                                 "kind": "collection",
391                                 "uuid": uuid,
392                         }
393                         runner.Mounts["/mnt/"+uuid] = mnt
394                 }
395                 *path = "/mnt/" + uuid + m[3]
396         }
397         return nil
398 }
399
400 var mtxMakeCommandCollection sync.Mutex
401
402 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
403         mtxMakeCommandCollection.Lock()
404         defer mtxMakeCommandCollection.Unlock()
405         exe, err := ioutil.ReadFile("/proc/self/exe")
406         if err != nil {
407                 return "", err
408         }
409         b2 := blake2b.Sum256(exe)
410         cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
411         var existing arvados.CollectionList
412         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
413                 Limit: 1,
414                 Count: "none",
415                 Filters: []arvados.Filter{
416                         {Attr: "name", Operator: "=", Operand: cname},
417                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
418                         {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
419                 },
420         })
421         if err != nil {
422                 return "", err
423         }
424         if len(existing.Items) > 0 {
425                 coll := existing.Items[0]
426                 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
427                 return coll.UUID, nil
428         }
429         log.Printf("writing lightning binary to new collection %q", cname)
430         ac, err := arvadosclient.New(runner.Client)
431         if err != nil {
432                 return "", err
433         }
434         kc := keepclient.New(ac)
435         var coll arvados.Collection
436         fs, err := coll.FileSystem(runner.Client, kc)
437         if err != nil {
438                 return "", err
439         }
440         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
441         if err != nil {
442                 return "", err
443         }
444         _, err = f.Write(exe)
445         if err != nil {
446                 return "", err
447         }
448         err = f.Close()
449         if err != nil {
450                 return "", err
451         }
452         mtxt, err := fs.MarshalManifest(".")
453         if err != nil {
454                 return "", err
455         }
456         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
457                 "collection": map[string]interface{}{
458                         "owner_uuid":    runner.ProjectUUID,
459                         "manifest_text": mtxt,
460                         "name":          cname,
461                         "properties": map[string]interface{}{
462                                 "blake2b": fmt.Sprintf("%x", b2),
463                         },
464                 },
465         })
466         if err != nil {
467                 return "", err
468         }
469         log.Printf("stored lightning binary in new collection %s", coll.UUID)
470         return coll.UUID, nil
471 }
472
473 // zopen returns a reader for the given file, using the arvados API
474 // instead of arv-mount/fuse where applicable, and transparently
475 // decompressing the input if fnm ends with ".gz".
476 func zopen(fnm string) (io.ReadCloser, error) {
477         f, err := open(fnm)
478         if err != nil || !strings.HasSuffix(fnm, ".gz") {
479                 return f, err
480         }
481         rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
482         if err != nil {
483                 f.Close()
484                 return nil, err
485         }
486         return gzipr{rdr, f}, nil
487 }
488
// gzipr pairs a ReadCloser (the decompressor) with a Closer (the
// underlying file). Reads go to the ReadCloser; Close closes both.
type gzipr struct {
	io.ReadCloser
	io.Closer
}

// Close closes both wrapped objects, always attempting both. It
// returns the ReadCloser's error if there is one, otherwise the
// Closer's.
func (gr gzipr) Close() error {
	readerErr := gr.ReadCloser.Close()
	if closerErr := gr.Closer.Close(); readerErr == nil {
		return closerErr
	}
	return readerErr
}
504
505 var (
506         arvadosClientFromEnv = arvados.NewClientFromEnv()
507         siteFS               arvados.CustomFileSystem
508         siteFSMtx            sync.Mutex
509 )
510
511 func open(fnm string) (io.ReadCloser, error) {
512         if os.Getenv("ARVADOS_API_HOST") == "" {
513                 return os.Open(fnm)
514         }
515         m := collectionInPathRe.FindStringSubmatch(fnm)
516         if m == nil {
517                 return os.Open(fnm)
518         }
519         uuid := m[2]
520         mnt := "/mnt/" + uuid + "/"
521         if !strings.HasPrefix(fnm, mnt) {
522                 return os.Open(fnm)
523         }
524
525         siteFSMtx.Lock()
526         defer siteFSMtx.Unlock()
527         if siteFS == nil {
528                 log.Info("setting up Arvados client")
529                 ac, err := arvadosclient.New(arvadosClientFromEnv)
530                 if err != nil {
531                         return nil, err
532                 }
533                 ac.Client = arvados.DefaultSecureClient
534                 kc := keepclient.New(ac)
535                 // Don't use keepclient's default short timeouts.
536                 kc.HTTPClient = arvados.DefaultSecureClient
537                 // Guess max concurrent readers, hope to avoid cache
538                 // thrashing.
539                 kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3}
540                 siteFS = arvadosClientFromEnv.SiteFileSystem(kc)
541         }
542
543         log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
544         return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):])
545 }