00d5d6fddac4296c3b7f35c1ba0463e8bc8d48c5
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "net/url"
11         "os"
12         "regexp"
13         "runtime"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/cmd"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
21         "git.arvados.org/arvados.git/sdk/go/keepclient"
22         log "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/blake2b"
24         "golang.org/x/net/websocket"
25 )
26
// eventMessage holds the fields decoded from one JSON message read
// from the websocket event stream.
type eventMessage struct {
	Status int
	// ObjectUUID selects which subscribers receive the message.
	ObjectUUID string `json:"object_uuid"`
	// EventType is the kind of event, e.g. "update", "stderr" or
	// "crunchstat".
	EventType string `json:"event_type"`
	// Properties.Text carries the (possibly multi-line) log text of
	// "stderr" and "crunchstat" events.
	Properties struct {
		Text string
	}
}
35
36 type arvadosClient struct {
37         *arvados.Client
38         notifying map[string]map[chan<- eventMessage]int
39         wantClose chan struct{}
40         wsconn    *websocket.Conn
41         mtx       sync.Mutex
42 }
43
44 // Listen for events concerning the given uuids. When an event occurs
45 // (and after connecting/reconnecting to the event stream), send each
46 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
47 // be sent only once for each update, but two Unsubscribe calls will
48 // be needed to stop sending them.
49 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
50         client.mtx.Lock()
51         defer client.mtx.Unlock()
52         if client.notifying == nil {
53                 client.notifying = map[string]map[chan<- eventMessage]int{}
54                 client.wantClose = make(chan struct{})
55                 go client.runNotifier()
56         }
57         chmap := client.notifying[uuid]
58         if chmap == nil {
59                 chmap = map[chan<- eventMessage]int{}
60                 client.notifying[uuid] = chmap
61         }
62         needSub := true
63         for _, nch := range chmap {
64                 if nch > 0 {
65                         needSub = false
66                         break
67                 }
68         }
69         chmap[ch]++
70         if needSub && client.wsconn != nil {
71                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
72                         "method": "subscribe",
73                         "filters": [][]interface{}{
74                                 {"object_uuid", "=", uuid},
75                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
76                         },
77                 })
78         }
79 }
80
81 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
82         client.mtx.Lock()
83         defer client.mtx.Unlock()
84         chmap := client.notifying[uuid]
85         if n := chmap[ch] - 1; n == 0 {
86                 delete(chmap, ch)
87                 if len(chmap) == 0 {
88                         delete(client.notifying, uuid)
89                 }
90                 if client.wsconn != nil {
91                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
92                                 "method": "unsubscribe",
93                                 "filters": [][]interface{}{
94                                         {"object_uuid", "=", uuid},
95                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
96                                 },
97                         })
98                 }
99         } else if n > 0 {
100                 chmap[ch] = n
101         }
102 }
103
104 func (client *arvadosClient) Close() {
105         client.mtx.Lock()
106         defer client.mtx.Unlock()
107         if client.notifying != nil {
108                 client.notifying = nil
109                 close(client.wantClose)
110         }
111 }
112
113 func (client *arvadosClient) runNotifier() {
114 reconnect:
115         for {
116                 var cluster arvados.Cluster
117                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
118                 if err != nil {
119                         log.Warnf("error getting cluster config: %s", err)
120                         time.Sleep(5 * time.Second)
121                         continue reconnect
122                 }
123                 wsURL := cluster.Services.Websocket.ExternalURL
124                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
125                 wsURL.Path = "/websocket"
126                 wsURLNoToken := wsURL.String()
127                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
128                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
129                 if err != nil {
130                         log.Warnf("websocket connection error: %s", err)
131                         time.Sleep(5 * time.Second)
132                         continue reconnect
133                 }
134                 log.Printf("connected to websocket at %s", wsURLNoToken)
135
136                 client.mtx.Lock()
137                 client.wsconn = conn
138                 resubscribe := make([]string, 0, len(client.notifying))
139                 for uuid := range client.notifying {
140                         resubscribe = append(resubscribe, uuid)
141                 }
142                 client.mtx.Unlock()
143
144                 go func() {
145                         w := json.NewEncoder(conn)
146                         for _, uuid := range resubscribe {
147                                 w.Encode(map[string]interface{}{
148                                         "method": "subscribe",
149                                         "filters": [][]interface{}{
150                                                 {"object_uuid", "=", uuid},
151                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
152                                         },
153                                 })
154                         }
155                 }()
156
157                 r := json.NewDecoder(conn)
158                 for {
159                         var msg eventMessage
160                         err := r.Decode(&msg)
161                         select {
162                         case <-client.wantClose:
163                                 return
164                         default:
165                                 if err != nil {
166                                         log.Printf("error decoding websocket message: %s", err)
167                                         client.mtx.Lock()
168                                         client.wsconn = nil
169                                         client.mtx.Unlock()
170                                         go conn.Close()
171                                         continue reconnect
172                                 }
173                                 client.mtx.Lock()
174                                 for ch := range client.notifying[msg.ObjectUUID] {
175                                         ch <- msg
176                                 }
177                                 client.mtx.Unlock()
178                         }
179                 }
180         }
181 }
182
183 var refreshTicker = time.NewTicker(5 * time.Second)
184
185 type arvadosContainerRunner struct {
186         Client      *arvados.Client
187         Name        string
188         OutputName  string
189         ProjectUUID string
190         APIAccess   bool
191         VCPUs       int
192         RAM         int64
193         Prog        string // if empty, run /proc/self/exe
194         Args        []string
195         Mounts      map[string]map[string]interface{}
196         Priority    int
197         KeepCache   int // cache buffers per VCPU (0 for default)
198 }
199
200 func (runner *arvadosContainerRunner) Run() (string, error) {
201         return runner.RunContext(context.Background())
202 }
203
204 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
205         if runner.ProjectUUID == "" {
206                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
207         }
208
209         mounts := map[string]map[string]interface{}{
210                 "/mnt/output": {
211                         "kind":     "collection",
212                         "writable": true,
213                 },
214         }
215         for path, mnt := range runner.Mounts {
216                 mounts[path] = mnt
217         }
218
219         prog := runner.Prog
220         if prog == "" {
221                 prog = "/mnt/cmd/lightning"
222                 cmdUUID, err := runner.makeCommandCollection()
223                 if err != nil {
224                         return "", err
225                 }
226                 mounts["/mnt/cmd"] = map[string]interface{}{
227                         "kind": "collection",
228                         "uuid": cmdUUID,
229                 }
230         }
231         command := append([]string{prog}, runner.Args...)
232
233         priority := runner.Priority
234         if priority < 1 {
235                 priority = 500
236         }
237         keepCache := runner.KeepCache
238         if keepCache < 1 {
239                 keepCache = 2
240         }
241         rc := arvados.RuntimeConstraints{
242                 API:          &runner.APIAccess,
243                 VCPUs:        runner.VCPUs,
244                 RAM:          runner.RAM,
245                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
246         }
247         outname := &runner.OutputName
248         if *outname == "" {
249                 outname = nil
250         }
251         var cr arvados.ContainerRequest
252         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
253                 "container_request": map[string]interface{}{
254                         "owner_uuid":          runner.ProjectUUID,
255                         "name":                runner.Name,
256                         "container_image":     "lightning-runtime",
257                         "command":             command,
258                         "mounts":              mounts,
259                         "use_existing":        true,
260                         "output_path":         "/mnt/output",
261                         "output_name":         outname,
262                         "runtime_constraints": rc,
263                         "priority":            runner.Priority,
264                         "state":               arvados.ContainerRequestStateCommitted,
265                 },
266         })
267         if err != nil {
268                 return "", err
269         }
270         log.Printf("container request UUID: %s", cr.UUID)
271         log.Printf("container UUID: %s", cr.ContainerUUID)
272
273         logch := make(chan eventMessage)
274         client := arvadosClient{Client: runner.Client}
275         defer client.Close()
276         subscribedUUID := ""
277         defer func() {
278                 if subscribedUUID != "" {
279                         client.Unsubscribe(logch, subscribedUUID)
280                 }
281         }()
282
283         neednewline := ""
284
285         lastState := cr.State
286         refreshCR := func() {
287                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
288                 if err != nil {
289                         fmt.Fprint(os.Stderr, neednewline)
290                         log.Printf("error getting container request: %s", err)
291                         return
292                 }
293                 if lastState != cr.State {
294                         fmt.Fprint(os.Stderr, neednewline)
295                         log.Printf("container request state: %s", cr.State)
296                         lastState = cr.State
297                 }
298                 if subscribedUUID != cr.ContainerUUID {
299                         fmt.Fprint(os.Stderr, neednewline)
300                         neednewline = ""
301                         if subscribedUUID != "" {
302                                 client.Unsubscribe(logch, subscribedUUID)
303                         }
304                         client.Subscribe(logch, cr.ContainerUUID)
305                         subscribedUUID = cr.ContainerUUID
306                 }
307         }
308
309         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
310 waitctr:
311         for cr.State != arvados.ContainerRequestStateFinal {
312                 select {
313                 case <-ctx.Done():
314                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
315                                 "container_request": map[string]interface{}{
316                                         "priority": 0,
317                                 },
318                         })
319                         if err != nil {
320                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
321                         }
322                         break waitctr
323                 case <-refreshTicker.C:
324                         refreshCR()
325                 case msg := <-logch:
326                         switch msg.EventType {
327                         case "update":
328                                 refreshCR()
329                         case "stderr":
330                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
331                                         if line != "" {
332                                                 fmt.Fprint(os.Stderr, neednewline)
333                                                 neednewline = ""
334                                                 log.Print(line)
335                                         }
336                                 }
337                         case "crunchstat":
338                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
339                                         mem := reCrunchstat.FindString(line)
340                                         if mem != "" {
341                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
342                                                 neednewline = "\n"
343                                         }
344                                 }
345                         }
346                 }
347         }
348         fmt.Fprint(os.Stderr, neednewline)
349
350         if err := ctx.Err(); err != nil {
351                 return "", err
352         }
353
354         var c arvados.Container
355         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
356         if err != nil {
357                 return "", err
358         } else if c.State != arvados.ContainerStateComplete {
359                 return "", fmt.Errorf("container did not complete: %s", c.State)
360         } else if c.ExitCode != 0 {
361                 return "", fmt.Errorf("container exited %d", c.ExitCode)
362         }
363         return cr.OutputUUID, err
364 }
365
// collectionInPathRe matches a path that has a collection identifier
// as one of its components: either 32 hex digits, "+", and a decimal
// number, or three dash-separated groups of 5, 5 and 15 lowercase
// alphanumerics. Submatch 1 is everything before the identifier
// (ending in "/", or empty), submatch 2 is the identifier, and
// submatch 3 is the rest of the path (starting with "/", or empty).
var collectionInPathRe = regexp.MustCompile(
	`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`,
)
367
368 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
369         if runner.Mounts == nil {
370                 runner.Mounts = make(map[string]map[string]interface{})
371         }
372         for _, path := range paths {
373                 if *path == "" || *path == "-" {
374                         continue
375                 }
376                 m := collectionInPathRe.FindStringSubmatch(*path)
377                 if m == nil {
378                         return fmt.Errorf("cannot find uuid in path: %q", *path)
379                 }
380                 uuid := m[2]
381                 mnt, ok := runner.Mounts["/mnt/"+uuid]
382                 if !ok {
383                         mnt = map[string]interface{}{
384                                 "kind": "collection",
385                                 "uuid": uuid,
386                         }
387                         runner.Mounts["/mnt/"+uuid] = mnt
388                 }
389                 *path = "/mnt/" + uuid + m[3]
390         }
391         return nil
392 }
393
394 var mtxMakeCommandCollection sync.Mutex
395
396 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
397         mtxMakeCommandCollection.Lock()
398         defer mtxMakeCommandCollection.Unlock()
399         exe, err := ioutil.ReadFile("/proc/self/exe")
400         if err != nil {
401                 return "", err
402         }
403         b2 := blake2b.Sum256(exe)
404         cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
405         var existing arvados.CollectionList
406         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
407                 Limit: 1,
408                 Count: "none",
409                 Filters: []arvados.Filter{
410                         {Attr: "name", Operator: "=", Operand: cname},
411                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
412                         {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
413                 },
414         })
415         if err != nil {
416                 return "", err
417         }
418         if len(existing.Items) > 0 {
419                 coll := existing.Items[0]
420                 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
421                 return coll.UUID, nil
422         }
423         log.Printf("writing lightning binary to new collection %q", cname)
424         ac, err := arvadosclient.New(runner.Client)
425         if err != nil {
426                 return "", err
427         }
428         kc := keepclient.New(ac)
429         var coll arvados.Collection
430         fs, err := coll.FileSystem(runner.Client, kc)
431         if err != nil {
432                 return "", err
433         }
434         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
435         if err != nil {
436                 return "", err
437         }
438         _, err = f.Write(exe)
439         if err != nil {
440                 return "", err
441         }
442         err = f.Close()
443         if err != nil {
444                 return "", err
445         }
446         mtxt, err := fs.MarshalManifest(".")
447         if err != nil {
448                 return "", err
449         }
450         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
451                 "collection": map[string]interface{}{
452                         "owner_uuid":    runner.ProjectUUID,
453                         "manifest_text": mtxt,
454                         "name":          cname,
455                         "properties": map[string]interface{}{
456                                 "blake2b": fmt.Sprintf("%x", b2),
457                         },
458                 },
459         })
460         if err != nil {
461                 return "", err
462         }
463         log.Printf("stored lightning binary in new collection %s", coll.UUID)
464         return coll.UUID, nil
465 }
466
467 var (
468         arvadosClientFromEnv = arvados.NewClientFromEnv()
469         siteFS               arvados.CustomFileSystem
470         siteFSMtx            sync.Mutex
471 )
472
473 func open(fnm string) (io.ReadCloser, error) {
474         if os.Getenv("ARVADOS_API_HOST") == "" {
475                 return os.Open(fnm)
476         }
477         m := collectionInPathRe.FindStringSubmatch(fnm)
478         if m == nil {
479                 return os.Open(fnm)
480         }
481         uuid := m[2]
482         mnt := "/mnt/" + uuid + "/"
483         if !strings.HasPrefix(fnm, mnt) {
484                 return os.Open(fnm)
485         }
486
487         siteFSMtx.Lock()
488         defer siteFSMtx.Unlock()
489         if siteFS == nil {
490                 log.Info("setting up Arvados client")
491                 ac, err := arvadosclient.New(arvadosClientFromEnv)
492                 if err != nil {
493                         return nil, err
494                 }
495                 ac.Client = arvados.DefaultSecureClient
496                 kc := keepclient.New(ac)
497                 // Don't use keepclient's default short timeouts.
498                 kc.HTTPClient = arvados.DefaultSecureClient
499                 // Guess max concurrent readers, hope to avoid cache
500                 // thrashing.
501                 kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3}
502                 siteFS = arvadosClientFromEnv.SiteFileSystem(kc)
503         }
504
505         log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
506         return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):])
507 }