More memory + direct Keep access for merge and exportnumpy.
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "net/url"
11         "os"
12         "regexp"
13         "strings"
14         "sync"
15         "time"
16
17         "git.arvados.org/arvados.git/sdk/go/arvados"
18         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
19         "git.arvados.org/arvados.git/sdk/go/keepclient"
20         log "github.com/sirupsen/logrus"
21         "golang.org/x/crypto/blake2b"
22         "golang.org/x/net/websocket"
23 )
24
// eventMessage is one message decoded from the websocket event
// stream.
type eventMessage struct {
	// Status has no JSON tag, so encoding/json fills it from a
	// "status" key (matched case-insensitively).
	Status int
	// ObjectUUID identifies the object the event is about; the
	// notifier uses it to find the subscribed channels.
	ObjectUUID string `json:"object_uuid"`
	// EventType is the kind of event, e.g. "update", "stderr" or
	// "crunchstat".
	EventType string `json:"event_type"`
	// Properties is the event payload. Text carries the log text of
	// "stderr" and "crunchstat" events.
	Properties struct {
		Text string
	}
}
33
34 type arvadosClient struct {
35         *arvados.Client
36         notifying map[string]map[chan<- eventMessage]int
37         wantClose chan struct{}
38         wsconn    *websocket.Conn
39         mtx       sync.Mutex
40 }
41
42 // Listen for events concerning the given uuids. When an event occurs
43 // (and after connecting/reconnecting to the event stream), send each
44 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
45 // be sent only once for each update, but two Unsubscribe calls will
46 // be needed to stop sending them.
47 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
48         client.mtx.Lock()
49         defer client.mtx.Unlock()
50         if client.notifying == nil {
51                 client.notifying = map[string]map[chan<- eventMessage]int{}
52                 client.wantClose = make(chan struct{})
53                 go client.runNotifier()
54         }
55         chmap := client.notifying[uuid]
56         if chmap == nil {
57                 chmap = map[chan<- eventMessage]int{}
58                 client.notifying[uuid] = chmap
59         }
60         needSub := true
61         for _, nch := range chmap {
62                 if nch > 0 {
63                         needSub = false
64                         break
65                 }
66         }
67         chmap[ch]++
68         if needSub && client.wsconn != nil {
69                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
70                         "method": "subscribe",
71                         "filters": [][]interface{}{
72                                 {"object_uuid", "=", uuid},
73                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
74                         },
75                 })
76         }
77 }
78
79 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
80         client.mtx.Lock()
81         defer client.mtx.Unlock()
82         chmap := client.notifying[uuid]
83         if n := chmap[ch] - 1; n == 0 {
84                 delete(chmap, ch)
85                 if len(chmap) == 0 {
86                         delete(client.notifying, uuid)
87                 }
88                 if client.wsconn != nil {
89                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
90                                 "method": "unsubscribe",
91                                 "filters": [][]interface{}{
92                                         {"object_uuid", "=", uuid},
93                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
94                                 },
95                         })
96                 }
97         } else if n > 0 {
98                 chmap[ch] = n
99         }
100 }
101
102 func (client *arvadosClient) Close() {
103         client.mtx.Lock()
104         defer client.mtx.Unlock()
105         if client.notifying != nil {
106                 client.notifying = nil
107                 close(client.wantClose)
108         }
109 }
110
111 func (client *arvadosClient) runNotifier() {
112 reconnect:
113         for {
114                 var cluster arvados.Cluster
115                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
116                 if err != nil {
117                         log.Warnf("error getting cluster config: %s", err)
118                         time.Sleep(5 * time.Second)
119                         continue reconnect
120                 }
121                 wsURL := cluster.Services.Websocket.ExternalURL
122                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
123                 wsURL.Path = "/websocket"
124                 wsURLNoToken := wsURL.String()
125                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
126                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
127                 if err != nil {
128                         log.Warnf("websocket connection error: %s", err)
129                         time.Sleep(5 * time.Second)
130                         continue reconnect
131                 }
132                 log.Printf("connected to websocket at %s", wsURLNoToken)
133
134                 client.mtx.Lock()
135                 client.wsconn = conn
136                 resubscribe := make([]string, 0, len(client.notifying))
137                 for uuid := range client.notifying {
138                         resubscribe = append(resubscribe, uuid)
139                 }
140                 client.mtx.Unlock()
141
142                 go func() {
143                         w := json.NewEncoder(conn)
144                         for _, uuid := range resubscribe {
145                                 w.Encode(map[string]interface{}{
146                                         "method": "subscribe",
147                                         "filters": [][]interface{}{
148                                                 {"object_uuid", "=", uuid},
149                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
150                                         },
151                                 })
152                         }
153                 }()
154
155                 r := json.NewDecoder(conn)
156                 for {
157                         var msg eventMessage
158                         err := r.Decode(&msg)
159                         select {
160                         case <-client.wantClose:
161                                 return
162                         default:
163                                 if err != nil {
164                                         log.Printf("error decoding websocket message: %s", err)
165                                         client.mtx.Lock()
166                                         client.wsconn = nil
167                                         client.mtx.Unlock()
168                                         go conn.Close()
169                                         continue reconnect
170                                 }
171                                 client.mtx.Lock()
172                                 for ch := range client.notifying[msg.ObjectUUID] {
173                                         ch <- msg
174                                 }
175                                 client.mtx.Unlock()
176                         }
177                 }
178         }
179 }
180
// refreshTicker paces the periodic container request refresh in
// RunContext. It is a single package-level ticker, so all RunContext
// calls in the process receive from the same channel.
var refreshTicker = time.NewTicker(5 * time.Second)
182
183 type arvadosContainerRunner struct {
184         Client      *arvados.Client
185         Name        string
186         OutputName  string
187         ProjectUUID string
188         APIAccess   bool
189         VCPUs       int
190         RAM         int64
191         Prog        string // if empty, run /proc/self/exe
192         Args        []string
193         Mounts      map[string]map[string]interface{}
194         Priority    int
195         KeepCache   int // cache buffers per VCPU (0 for default)
196 }
197
198 func (runner *arvadosContainerRunner) Run() (string, error) {
199         return runner.RunContext(context.Background())
200 }
201
202 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
203         if runner.ProjectUUID == "" {
204                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
205         }
206
207         mounts := map[string]map[string]interface{}{
208                 "/mnt/output": {
209                         "kind":     "collection",
210                         "writable": true,
211                 },
212         }
213         for path, mnt := range runner.Mounts {
214                 mounts[path] = mnt
215         }
216
217         prog := runner.Prog
218         if prog == "" {
219                 prog = "/mnt/cmd/lightning"
220                 cmdUUID, err := runner.makeCommandCollection()
221                 if err != nil {
222                         return "", err
223                 }
224                 mounts["/mnt/cmd"] = map[string]interface{}{
225                         "kind": "collection",
226                         "uuid": cmdUUID,
227                 }
228         }
229         command := append([]string{prog}, runner.Args...)
230
231         priority := runner.Priority
232         if priority < 1 {
233                 priority = 500
234         }
235         keepCache := runner.KeepCache
236         if keepCache < 1 {
237                 keepCache = 2
238         }
239         rc := arvados.RuntimeConstraints{
240                 API:          &runner.APIAccess,
241                 VCPUs:        runner.VCPUs,
242                 RAM:          runner.RAM,
243                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
244         }
245         outname := &runner.OutputName
246         if *outname == "" {
247                 outname = nil
248         }
249         var cr arvados.ContainerRequest
250         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
251                 "container_request": map[string]interface{}{
252                         "owner_uuid":          runner.ProjectUUID,
253                         "name":                runner.Name,
254                         "container_image":     "lightning-runtime",
255                         "command":             command,
256                         "mounts":              mounts,
257                         "use_existing":        true,
258                         "output_path":         "/mnt/output",
259                         "output_name":         outname,
260                         "runtime_constraints": rc,
261                         "priority":            runner.Priority,
262                         "state":               arvados.ContainerRequestStateCommitted,
263                 },
264         })
265         if err != nil {
266                 return "", err
267         }
268         log.Printf("container request UUID: %s", cr.UUID)
269         log.Printf("container UUID: %s", cr.ContainerUUID)
270
271         logch := make(chan eventMessage)
272         client := arvadosClient{Client: runner.Client}
273         defer client.Close()
274         subscribedUUID := ""
275         defer func() {
276                 if subscribedUUID != "" {
277                         client.Unsubscribe(logch, subscribedUUID)
278                 }
279         }()
280
281         neednewline := ""
282
283         lastState := cr.State
284         refreshCR := func() {
285                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
286                 if err != nil {
287                         fmt.Fprint(os.Stderr, neednewline)
288                         log.Printf("error getting container request: %s", err)
289                         return
290                 }
291                 if lastState != cr.State {
292                         fmt.Fprint(os.Stderr, neednewline)
293                         log.Printf("container request state: %s", cr.State)
294                         lastState = cr.State
295                 }
296                 if subscribedUUID != cr.ContainerUUID {
297                         fmt.Fprint(os.Stderr, neednewline)
298                         neednewline = ""
299                         if subscribedUUID != "" {
300                                 client.Unsubscribe(logch, subscribedUUID)
301                         }
302                         client.Subscribe(logch, cr.ContainerUUID)
303                         subscribedUUID = cr.ContainerUUID
304                 }
305         }
306
307         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
308 waitctr:
309         for cr.State != arvados.ContainerRequestStateFinal {
310                 select {
311                 case <-ctx.Done():
312                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
313                                 "container_request": map[string]interface{}{
314                                         "priority": 0,
315                                 },
316                         })
317                         if err != nil {
318                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
319                         }
320                         break waitctr
321                 case <-refreshTicker.C:
322                         refreshCR()
323                 case msg := <-logch:
324                         switch msg.EventType {
325                         case "update":
326                                 refreshCR()
327                         case "stderr":
328                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
329                                         if line != "" {
330                                                 fmt.Fprint(os.Stderr, neednewline)
331                                                 neednewline = ""
332                                                 log.Print(line)
333                                         }
334                                 }
335                         case "crunchstat":
336                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
337                                         mem := reCrunchstat.FindString(line)
338                                         if mem != "" {
339                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
340                                                 neednewline = "\n"
341                                         }
342                                 }
343                         }
344                 }
345         }
346         fmt.Fprint(os.Stderr, neednewline)
347
348         if err := ctx.Err(); err != nil {
349                 return "", err
350         }
351
352         var c arvados.Container
353         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
354         if err != nil {
355                 return "", err
356         } else if c.State != arvados.ContainerStateComplete {
357                 return "", fmt.Errorf("container did not complete: %s", c.State)
358         } else if c.ExitCode != 0 {
359                 return "", fmt.Errorf("container exited %d", c.ExitCode)
360         }
361         return cr.OutputUUID, err
362 }
363
// collectionInPathRe finds a collection identifier used as one
// component of a slash-separated path. Submatch 1 is everything up to
// and including the preceding slash (if any). Submatch 2 is the
// identifier: either 32 hex digits, "+", and a decimal number, or
// three dash-separated groups of 5, 5 and 15 characters from
// [0-9a-z]. Submatch 3 is the rest of the path, starting with a slash
// (if any).
var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
365
366 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
367         if runner.Mounts == nil {
368                 runner.Mounts = make(map[string]map[string]interface{})
369         }
370         for _, path := range paths {
371                 if *path == "" || *path == "-" {
372                         continue
373                 }
374                 m := collectionInPathRe.FindStringSubmatch(*path)
375                 if m == nil {
376                         return fmt.Errorf("cannot find uuid in path: %q", *path)
377                 }
378                 uuid := m[2]
379                 mnt, ok := runner.Mounts["/mnt/"+uuid]
380                 if !ok {
381                         mnt = map[string]interface{}{
382                                 "kind": "collection",
383                                 "uuid": uuid,
384                         }
385                         runner.Mounts["/mnt/"+uuid] = mnt
386                 }
387                 *path = "/mnt/" + uuid + m[3]
388         }
389         return nil
390 }
391
392 var mtxMakeCommandCollection sync.Mutex
393
394 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
395         mtxMakeCommandCollection.Lock()
396         defer mtxMakeCommandCollection.Unlock()
397         exe, err := ioutil.ReadFile("/proc/self/exe")
398         if err != nil {
399                 return "", err
400         }
401         b2 := blake2b.Sum256(exe)
402         cname := fmt.Sprintf("lightning-%x", b2)
403         var existing arvados.CollectionList
404         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
405                 Limit: 1,
406                 Count: "none",
407                 Filters: []arvados.Filter{
408                         {Attr: "name", Operator: "=", Operand: cname},
409                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
410                 },
411         })
412         if err != nil {
413                 return "", err
414         }
415         if len(existing.Items) > 0 {
416                 uuid := existing.Items[0].UUID
417                 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
418                 return uuid, nil
419         }
420         log.Printf("writing lightning binary to new collection %q", cname)
421         ac, err := arvadosclient.New(runner.Client)
422         if err != nil {
423                 return "", err
424         }
425         kc := keepclient.New(ac)
426         var coll arvados.Collection
427         fs, err := coll.FileSystem(runner.Client, kc)
428         if err != nil {
429                 return "", err
430         }
431         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
432         if err != nil {
433                 return "", err
434         }
435         _, err = f.Write(exe)
436         if err != nil {
437                 return "", err
438         }
439         err = f.Close()
440         if err != nil {
441                 return "", err
442         }
443         mtxt, err := fs.MarshalManifest(".")
444         if err != nil {
445                 return "", err
446         }
447         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
448                 "collection": map[string]interface{}{
449                         "owner_uuid":    runner.ProjectUUID,
450                         "manifest_text": mtxt,
451                         "name":          cname,
452                 },
453         })
454         if err != nil {
455                 return "", err
456         }
457         log.Printf("stored lightning binary in new collection %s", coll.UUID)
458         return coll.UUID, nil
459 }
460
461 var arvadosClientFromEnv = arvados.NewClientFromEnv()
462
463 func open(fnm string) (io.ReadCloser, error) {
464         if os.Getenv("ARVADOS_API_HOST") == "" {
465                 return os.Open(fnm)
466         }
467         m := collectionInPathRe.FindStringSubmatch(fnm)
468         if m == nil {
469                 return os.Open(fnm)
470         }
471         uuid := m[2]
472         mnt := "/mnt/" + uuid + "/"
473         if !strings.HasPrefix(fnm, mnt) {
474                 return os.Open(fnm)
475         }
476
477         log.Infof("reading %q from %s using Arvados client library", fnm[len(mnt):], uuid)
478         ac, err := arvadosclient.New(arvadosClientFromEnv)
479         if err != nil {
480                 return nil, err
481         }
482         ac.Client = arvados.DefaultSecureClient
483         kc := keepclient.New(ac)
484         // Don't use keepclient's default short timeouts.
485         kc.HTTPClient = arvados.DefaultSecureClient
486         // Don't cache more than one block for this file.
487         kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 1}
488
489         var coll arvados.Collection
490         err = arvadosClientFromEnv.RequestAndDecode(&coll, "GET", "arvados/v1/collections/"+uuid, nil, arvados.GetOptions{Select: []string{"uuid", "manifest_text"}})
491         if err != nil {
492                 return nil, err
493         }
494         fs, err := coll.FileSystem(arvadosClientFromEnv, kc)
495         if err != nil {
496                 return nil, err
497         }
498         return fs.Open(fnm[len(mnt):])
499 }