Bigger output buffer for annotate.
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "net/url"
11         "os"
12         "regexp"
13         "strings"
14         "sync"
15         "time"
16
17         "git.arvados.org/arvados.git/lib/cmd"
18         "git.arvados.org/arvados.git/sdk/go/arvados"
19         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
20         "git.arvados.org/arvados.git/sdk/go/keepclient"
21         log "github.com/sirupsen/logrus"
22         "golang.org/x/crypto/blake2b"
23         "golang.org/x/net/websocket"
24 )
25
// eventMessage is one JSON message decoded from the Arvados websocket
// event stream.
type eventMessage struct {
	// Status has no JSON tag, so it is filled from a "status" key
	// when one is present in the message.
	Status int
	// ObjectUUID identifies the object the event is about; it is the
	// key used to route the message to subscribers.
	ObjectUUID string `json:"object_uuid"`
	// EventType is the kind of event, e.g. "update", "stderr" or
	// "crunchstat".
	EventType string `json:"event_type"`
	// Properties holds the event payload; only its text is decoded.
	Properties struct{ Text string }
}
34
35 type arvadosClient struct {
36         *arvados.Client
37         notifying map[string]map[chan<- eventMessage]int
38         wantClose chan struct{}
39         wsconn    *websocket.Conn
40         mtx       sync.Mutex
41 }
42
43 // Listen for events concerning the given uuids. When an event occurs
44 // (and after connecting/reconnecting to the event stream), send each
45 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
46 // be sent only once for each update, but two Unsubscribe calls will
47 // be needed to stop sending them.
48 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
49         client.mtx.Lock()
50         defer client.mtx.Unlock()
51         if client.notifying == nil {
52                 client.notifying = map[string]map[chan<- eventMessage]int{}
53                 client.wantClose = make(chan struct{})
54                 go client.runNotifier()
55         }
56         chmap := client.notifying[uuid]
57         if chmap == nil {
58                 chmap = map[chan<- eventMessage]int{}
59                 client.notifying[uuid] = chmap
60         }
61         needSub := true
62         for _, nch := range chmap {
63                 if nch > 0 {
64                         needSub = false
65                         break
66                 }
67         }
68         chmap[ch]++
69         if needSub && client.wsconn != nil {
70                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
71                         "method": "subscribe",
72                         "filters": [][]interface{}{
73                                 {"object_uuid", "=", uuid},
74                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
75                         },
76                 })
77         }
78 }
79
80 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
81         client.mtx.Lock()
82         defer client.mtx.Unlock()
83         chmap := client.notifying[uuid]
84         if n := chmap[ch] - 1; n == 0 {
85                 delete(chmap, ch)
86                 if len(chmap) == 0 {
87                         delete(client.notifying, uuid)
88                 }
89                 if client.wsconn != nil {
90                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
91                                 "method": "unsubscribe",
92                                 "filters": [][]interface{}{
93                                         {"object_uuid", "=", uuid},
94                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
95                                 },
96                         })
97                 }
98         } else if n > 0 {
99                 chmap[ch] = n
100         }
101 }
102
103 func (client *arvadosClient) Close() {
104         client.mtx.Lock()
105         defer client.mtx.Unlock()
106         if client.notifying != nil {
107                 client.notifying = nil
108                 close(client.wantClose)
109         }
110 }
111
112 func (client *arvadosClient) runNotifier() {
113 reconnect:
114         for {
115                 var cluster arvados.Cluster
116                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
117                 if err != nil {
118                         log.Warnf("error getting cluster config: %s", err)
119                         time.Sleep(5 * time.Second)
120                         continue reconnect
121                 }
122                 wsURL := cluster.Services.Websocket.ExternalURL
123                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
124                 wsURL.Path = "/websocket"
125                 wsURLNoToken := wsURL.String()
126                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
127                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
128                 if err != nil {
129                         log.Warnf("websocket connection error: %s", err)
130                         time.Sleep(5 * time.Second)
131                         continue reconnect
132                 }
133                 log.Printf("connected to websocket at %s", wsURLNoToken)
134
135                 client.mtx.Lock()
136                 client.wsconn = conn
137                 resubscribe := make([]string, 0, len(client.notifying))
138                 for uuid := range client.notifying {
139                         resubscribe = append(resubscribe, uuid)
140                 }
141                 client.mtx.Unlock()
142
143                 go func() {
144                         w := json.NewEncoder(conn)
145                         for _, uuid := range resubscribe {
146                                 w.Encode(map[string]interface{}{
147                                         "method": "subscribe",
148                                         "filters": [][]interface{}{
149                                                 {"object_uuid", "=", uuid},
150                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
151                                         },
152                                 })
153                         }
154                 }()
155
156                 r := json.NewDecoder(conn)
157                 for {
158                         var msg eventMessage
159                         err := r.Decode(&msg)
160                         select {
161                         case <-client.wantClose:
162                                 return
163                         default:
164                                 if err != nil {
165                                         log.Printf("error decoding websocket message: %s", err)
166                                         client.mtx.Lock()
167                                         client.wsconn = nil
168                                         client.mtx.Unlock()
169                                         go conn.Close()
170                                         continue reconnect
171                                 }
172                                 client.mtx.Lock()
173                                 for ch := range client.notifying[msg.ObjectUUID] {
174                                         ch <- msg
175                                 }
176                                 client.mtx.Unlock()
177                         }
178                 }
179         }
180 }
181
// refreshTicker ticks every 5 seconds. RunContext re-reads the
// container request on each tick it receives. The ticker is shared by
// the whole process and is never stopped.
var refreshTicker = time.NewTicker(time.Second * 5)
183
184 type arvadosContainerRunner struct {
185         Client      *arvados.Client
186         Name        string
187         OutputName  string
188         ProjectUUID string
189         APIAccess   bool
190         VCPUs       int
191         RAM         int64
192         Prog        string // if empty, run /proc/self/exe
193         Args        []string
194         Mounts      map[string]map[string]interface{}
195         Priority    int
196         KeepCache   int // cache buffers per VCPU (0 for default)
197 }
198
199 func (runner *arvadosContainerRunner) Run() (string, error) {
200         return runner.RunContext(context.Background())
201 }
202
203 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
204         if runner.ProjectUUID == "" {
205                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
206         }
207
208         mounts := map[string]map[string]interface{}{
209                 "/mnt/output": {
210                         "kind":     "collection",
211                         "writable": true,
212                 },
213         }
214         for path, mnt := range runner.Mounts {
215                 mounts[path] = mnt
216         }
217
218         prog := runner.Prog
219         if prog == "" {
220                 prog = "/mnt/cmd/lightning"
221                 cmdUUID, err := runner.makeCommandCollection()
222                 if err != nil {
223                         return "", err
224                 }
225                 mounts["/mnt/cmd"] = map[string]interface{}{
226                         "kind": "collection",
227                         "uuid": cmdUUID,
228                 }
229         }
230         command := append([]string{prog}, runner.Args...)
231
232         priority := runner.Priority
233         if priority < 1 {
234                 priority = 500
235         }
236         keepCache := runner.KeepCache
237         if keepCache < 1 {
238                 keepCache = 2
239         }
240         rc := arvados.RuntimeConstraints{
241                 API:          &runner.APIAccess,
242                 VCPUs:        runner.VCPUs,
243                 RAM:          runner.RAM,
244                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
245         }
246         outname := &runner.OutputName
247         if *outname == "" {
248                 outname = nil
249         }
250         var cr arvados.ContainerRequest
251         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
252                 "container_request": map[string]interface{}{
253                         "owner_uuid":          runner.ProjectUUID,
254                         "name":                runner.Name,
255                         "container_image":     "lightning-runtime",
256                         "command":             command,
257                         "mounts":              mounts,
258                         "use_existing":        true,
259                         "output_path":         "/mnt/output",
260                         "output_name":         outname,
261                         "runtime_constraints": rc,
262                         "priority":            runner.Priority,
263                         "state":               arvados.ContainerRequestStateCommitted,
264                 },
265         })
266         if err != nil {
267                 return "", err
268         }
269         log.Printf("container request UUID: %s", cr.UUID)
270         log.Printf("container UUID: %s", cr.ContainerUUID)
271
272         logch := make(chan eventMessage)
273         client := arvadosClient{Client: runner.Client}
274         defer client.Close()
275         subscribedUUID := ""
276         defer func() {
277                 if subscribedUUID != "" {
278                         client.Unsubscribe(logch, subscribedUUID)
279                 }
280         }()
281
282         neednewline := ""
283
284         lastState := cr.State
285         refreshCR := func() {
286                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
287                 if err != nil {
288                         fmt.Fprint(os.Stderr, neednewline)
289                         log.Printf("error getting container request: %s", err)
290                         return
291                 }
292                 if lastState != cr.State {
293                         fmt.Fprint(os.Stderr, neednewline)
294                         log.Printf("container request state: %s", cr.State)
295                         lastState = cr.State
296                 }
297                 if subscribedUUID != cr.ContainerUUID {
298                         fmt.Fprint(os.Stderr, neednewline)
299                         neednewline = ""
300                         if subscribedUUID != "" {
301                                 client.Unsubscribe(logch, subscribedUUID)
302                         }
303                         client.Subscribe(logch, cr.ContainerUUID)
304                         subscribedUUID = cr.ContainerUUID
305                 }
306         }
307
308         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
309 waitctr:
310         for cr.State != arvados.ContainerRequestStateFinal {
311                 select {
312                 case <-ctx.Done():
313                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
314                                 "container_request": map[string]interface{}{
315                                         "priority": 0,
316                                 },
317                         })
318                         if err != nil {
319                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
320                         }
321                         break waitctr
322                 case <-refreshTicker.C:
323                         refreshCR()
324                 case msg := <-logch:
325                         switch msg.EventType {
326                         case "update":
327                                 refreshCR()
328                         case "stderr":
329                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
330                                         if line != "" {
331                                                 fmt.Fprint(os.Stderr, neednewline)
332                                                 neednewline = ""
333                                                 log.Print(line)
334                                         }
335                                 }
336                         case "crunchstat":
337                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
338                                         mem := reCrunchstat.FindString(line)
339                                         if mem != "" {
340                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
341                                                 neednewline = "\n"
342                                         }
343                                 }
344                         }
345                 }
346         }
347         fmt.Fprint(os.Stderr, neednewline)
348
349         if err := ctx.Err(); err != nil {
350                 return "", err
351         }
352
353         var c arvados.Container
354         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
355         if err != nil {
356                 return "", err
357         } else if c.State != arvados.ContainerStateComplete {
358                 return "", fmt.Errorf("container did not complete: %s", c.State)
359         } else if c.ExitCode != 0 {
360                 return "", fmt.Errorf("container exited %d", c.ExitCode)
361         }
362         return cr.OutputUUID, err
363 }
364
// collectionInPathRe finds a collection identifier that makes up a
// whole path component. Submatch 1 is everything before the
// identifier (empty, or ending in "/"), submatch 2 is the identifier
// itself, and submatch 3 is the rest of the path (empty, or starting
// with "/"). If several components qualify, the last one is matched.
var collectionInPathRe = regexp.MustCompile(
	`^(.*/)?` + // optional leading path components
		`([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})` + // 32 hex digits "+" size, or a 5-5-15 UUID
		`(/.*)?$`, // optional path below the identifier
)
366
367 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
368         if runner.Mounts == nil {
369                 runner.Mounts = make(map[string]map[string]interface{})
370         }
371         for _, path := range paths {
372                 if *path == "" || *path == "-" {
373                         continue
374                 }
375                 m := collectionInPathRe.FindStringSubmatch(*path)
376                 if m == nil {
377                         return fmt.Errorf("cannot find uuid in path: %q", *path)
378                 }
379                 uuid := m[2]
380                 mnt, ok := runner.Mounts["/mnt/"+uuid]
381                 if !ok {
382                         mnt = map[string]interface{}{
383                                 "kind": "collection",
384                                 "uuid": uuid,
385                         }
386                         runner.Mounts["/mnt/"+uuid] = mnt
387                 }
388                 *path = "/mnt/" + uuid + m[3]
389         }
390         return nil
391 }
392
393 var mtxMakeCommandCollection sync.Mutex
394
395 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
396         mtxMakeCommandCollection.Lock()
397         defer mtxMakeCommandCollection.Unlock()
398         exe, err := ioutil.ReadFile("/proc/self/exe")
399         if err != nil {
400                 return "", err
401         }
402         b2 := blake2b.Sum256(exe)
403         cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
404         var existing arvados.CollectionList
405         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
406                 Limit: 1,
407                 Count: "none",
408                 Filters: []arvados.Filter{
409                         {Attr: "name", Operator: "=", Operand: cname},
410                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
411                         {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
412                 },
413         })
414         if err != nil {
415                 return "", err
416         }
417         if len(existing.Items) > 0 {
418                 coll := existing.Items[0]
419                 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
420                 return coll.UUID, nil
421         }
422         log.Printf("writing lightning binary to new collection %q", cname)
423         ac, err := arvadosclient.New(runner.Client)
424         if err != nil {
425                 return "", err
426         }
427         kc := keepclient.New(ac)
428         var coll arvados.Collection
429         fs, err := coll.FileSystem(runner.Client, kc)
430         if err != nil {
431                 return "", err
432         }
433         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
434         if err != nil {
435                 return "", err
436         }
437         _, err = f.Write(exe)
438         if err != nil {
439                 return "", err
440         }
441         err = f.Close()
442         if err != nil {
443                 return "", err
444         }
445         mtxt, err := fs.MarshalManifest(".")
446         if err != nil {
447                 return "", err
448         }
449         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
450                 "collection": map[string]interface{}{
451                         "owner_uuid":    runner.ProjectUUID,
452                         "manifest_text": mtxt,
453                         "name":          cname,
454                         "properties": map[string]interface{}{
455                                 "blake2b": fmt.Sprintf("%x", b2),
456                         },
457                 },
458         })
459         if err != nil {
460                 return "", err
461         }
462         log.Printf("stored lightning binary in new collection %s", coll.UUID)
463         return coll.UUID, nil
464 }
465
466 var arvadosClientFromEnv = arvados.NewClientFromEnv()
467
468 func open(fnm string) (io.ReadCloser, error) {
469         if os.Getenv("ARVADOS_API_HOST") == "" {
470                 return os.Open(fnm)
471         }
472         m := collectionInPathRe.FindStringSubmatch(fnm)
473         if m == nil {
474                 return os.Open(fnm)
475         }
476         uuid := m[2]
477         mnt := "/mnt/" + uuid + "/"
478         if !strings.HasPrefix(fnm, mnt) {
479                 return os.Open(fnm)
480         }
481
482         log.Infof("reading %q from %s using Arvados client library", fnm[len(mnt):], uuid)
483         ac, err := arvadosclient.New(arvadosClientFromEnv)
484         if err != nil {
485                 return nil, err
486         }
487         ac.Client = arvados.DefaultSecureClient
488         kc := keepclient.New(ac)
489         // Don't use keepclient's default short timeouts.
490         kc.HTTPClient = arvados.DefaultSecureClient
491         // Don't cache more than one block for this file.
492         kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 1}
493
494         var coll arvados.Collection
495         err = arvadosClientFromEnv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+uuid, nil, arvados.GetOptions{Select: []string{"uuid", "manifest_text"}})
496         if err != nil {
497                 return nil, err
498         }
499         fs, err := coll.FileSystem(arvadosClientFromEnv, kc)
500         if err != nil {
501                 return nil, err
502         }
503         return fs.Open(fnm[len(mnt):])
504 }