5f78a36408bdbbb8e2aba29b4571fbfd87749c85
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "fmt"
8         "io/ioutil"
9         "net/url"
10         "os"
11         "regexp"
12         "strings"
13         "sync"
14         "time"
15
16         "git.arvados.org/arvados.git/sdk/go/arvados"
17         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
18         "git.arvados.org/arvados.git/sdk/go/keepclient"
19         log "github.com/sirupsen/logrus"
20         "golang.org/x/crypto/blake2b"
21         "golang.org/x/net/websocket"
22 )
23
// eventMessage is one message decoded from the Arvados websocket event
// stream by runNotifier. Status and Properties carry no JSON tags, so
// encoding/json matches them to the "status" and "properties" keys
// case-insensitively.
type eventMessage struct {
	Status     int
	ObjectUUID string `json:"object_uuid"`
	// EventType values handled by RunContext: "update", "stderr",
	// "crunchstat".
	EventType  string `json:"event_type"`
	Properties struct {
		// Text is the log text for log events; RunContext splits it
		// on "\n" and handles each line separately.
		Text string
	}
}
32
// arvadosClient is an *arvados.Client plus a websocket event
// subscription mechanism (Subscribe / Unsubscribe / Close). The
// notifier goroutine is started lazily by the first Subscribe call.
type arvadosClient struct {
	*arvados.Client
	// notifying maps object UUID -> subscribed channel -> number of
	// outstanding Subscribe calls for that {channel, UUID} pair. It is
	// nil before the first Subscribe and again after Close.
	notifying map[string]map[chan<- eventMessage]int
	// wantClose is created by the first Subscribe and closed by Close
	// to tell runNotifier to stop.
	wantClose chan struct{}
	// wsconn is the current websocket connection, or nil while
	// disconnected.
	wsconn    *websocket.Conn
	// mtx is held while reading or modifying notifying and wsconn, and
	// while creating or closing wantClose.
	mtx       sync.Mutex
}
40
41 // Listen for events concerning the given uuids. When an event occurs
42 // (and after connecting/reconnecting to the event stream), send each
43 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
44 // be sent only once for each update, but two Unsubscribe calls will
45 // be needed to stop sending them.
46 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
47         client.mtx.Lock()
48         defer client.mtx.Unlock()
49         if client.notifying == nil {
50                 client.notifying = map[string]map[chan<- eventMessage]int{}
51                 client.wantClose = make(chan struct{})
52                 go client.runNotifier()
53         }
54         chmap := client.notifying[uuid]
55         if chmap == nil {
56                 chmap = map[chan<- eventMessage]int{}
57                 client.notifying[uuid] = chmap
58         }
59         needSub := true
60         for _, nch := range chmap {
61                 if nch > 0 {
62                         needSub = false
63                         break
64                 }
65         }
66         chmap[ch]++
67         if needSub && client.wsconn != nil {
68                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
69                         "method": "subscribe",
70                         "filters": [][]interface{}{
71                                 {"object_uuid", "=", uuid},
72                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
73                         },
74                 })
75         }
76 }
77
78 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
79         client.mtx.Lock()
80         defer client.mtx.Unlock()
81         chmap := client.notifying[uuid]
82         if n := chmap[ch] - 1; n == 0 {
83                 delete(chmap, ch)
84                 if len(chmap) == 0 {
85                         delete(client.notifying, uuid)
86                 }
87                 if client.wsconn != nil {
88                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
89                                 "method": "unsubscribe",
90                                 "filters": [][]interface{}{
91                                         {"object_uuid", "=", uuid},
92                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
93                                 },
94                         })
95                 }
96         } else if n > 0 {
97                 chmap[ch] = n
98         }
99 }
100
101 func (client *arvadosClient) Close() {
102         client.mtx.Lock()
103         defer client.mtx.Unlock()
104         if client.notifying != nil {
105                 client.notifying = nil
106                 close(client.wantClose)
107         }
108 }
109
110 func (client *arvadosClient) runNotifier() {
111 reconnect:
112         for {
113                 var cluster arvados.Cluster
114                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
115                 if err != nil {
116                         log.Warnf("error getting cluster config: %s", err)
117                         time.Sleep(5 * time.Second)
118                         continue reconnect
119                 }
120                 wsURL := cluster.Services.Websocket.ExternalURL
121                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
122                 wsURL.Path = "/websocket"
123                 wsURLNoToken := wsURL.String()
124                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
125                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
126                 if err != nil {
127                         log.Warnf("websocket connection error: %s", err)
128                         time.Sleep(5 * time.Second)
129                         continue reconnect
130                 }
131                 log.Printf("connected to websocket at %s", wsURLNoToken)
132
133                 client.mtx.Lock()
134                 client.wsconn = conn
135                 resubscribe := make([]string, 0, len(client.notifying))
136                 for uuid := range client.notifying {
137                         resubscribe = append(resubscribe, uuid)
138                 }
139                 client.mtx.Unlock()
140
141                 go func() {
142                         w := json.NewEncoder(conn)
143                         for _, uuid := range resubscribe {
144                                 w.Encode(map[string]interface{}{
145                                         "method": "subscribe",
146                                         "filters": [][]interface{}{
147                                                 {"object_uuid", "=", uuid},
148                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
149                                         },
150                                 })
151                         }
152                 }()
153
154                 r := json.NewDecoder(conn)
155                 for {
156                         var msg eventMessage
157                         err := r.Decode(&msg)
158                         select {
159                         case <-client.wantClose:
160                                 return
161                         default:
162                                 if err != nil {
163                                         log.Printf("error decoding websocket message: %s", err)
164                                         client.mtx.Lock()
165                                         client.wsconn = nil
166                                         client.mtx.Unlock()
167                                         go conn.Close()
168                                         continue reconnect
169                                 }
170                                 client.mtx.Lock()
171                                 for ch := range client.notifying[msg.ObjectUUID] {
172                                         ch <- msg
173                                 }
174                                 client.mtx.Unlock()
175                         }
176                 }
177         }
178 }
179
// refreshTicker fires every 5 seconds. It is a single package-level
// ticker, so when several goroutines receive from refreshTicker.C each
// tick is delivered to only one of them.
var refreshTicker = time.NewTicker(5 * time.Second)

// arvadosContainerRunner describes a container to run on an Arvados
// cluster; RunContext submits it as a container request and waits for
// it to finish.
type arvadosContainerRunner struct {
	Client      *arvados.Client
	Name        string // container request name
	OutputName  string // output collection name; empty is sent as null
	ProjectUUID string // owner of the container request; required by RunContext
	VCPUs       int
	RAM         int64
	Prog        string // if empty, run /proc/self/exe
	Args        []string // arguments passed after the program path
	// Mounts are extra container mounts keyed by path inside the
	// container. RunContext applies them over its default /mnt/output
	// mount; TranslatePaths adds collection mounts here.
	Mounts      map[string]map[string]interface{}
	Priority    int // container request priority
}
194
195 func (runner *arvadosContainerRunner) Run() (string, error) {
196         return runner.RunContext(context.Background())
197 }
198
199 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
200         if runner.ProjectUUID == "" {
201                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
202         }
203
204         mounts := map[string]map[string]interface{}{
205                 "/mnt/output": {
206                         "kind":     "collection",
207                         "writable": true,
208                 },
209         }
210         for path, mnt := range runner.Mounts {
211                 mounts[path] = mnt
212         }
213
214         prog := runner.Prog
215         if prog == "" {
216                 prog = "/mnt/cmd/lightning"
217                 cmdUUID, err := runner.makeCommandCollection()
218                 if err != nil {
219                         return "", err
220                 }
221                 mounts["/mnt/cmd"] = map[string]interface{}{
222                         "kind": "collection",
223                         "uuid": cmdUUID,
224                 }
225         }
226         command := append([]string{prog}, runner.Args...)
227
228         priority := runner.Priority
229         if priority < 1 {
230                 priority = 500
231         }
232         rc := arvados.RuntimeConstraints{
233                 VCPUs:        runner.VCPUs,
234                 RAM:          runner.RAM,
235                 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
236         }
237         outname := &runner.OutputName
238         if *outname == "" {
239                 outname = nil
240         }
241         var cr arvados.ContainerRequest
242         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
243                 "container_request": map[string]interface{}{
244                         "owner_uuid":          runner.ProjectUUID,
245                         "name":                runner.Name,
246                         "container_image":     "lightning-runtime",
247                         "command":             command,
248                         "mounts":              mounts,
249                         "use_existing":        true,
250                         "output_path":         "/mnt/output",
251                         "output_name":         outname,
252                         "runtime_constraints": rc,
253                         "priority":            runner.Priority,
254                         "state":               arvados.ContainerRequestStateCommitted,
255                 },
256         })
257         if err != nil {
258                 return "", err
259         }
260         log.Printf("container request UUID: %s", cr.UUID)
261         log.Printf("container UUID: %s", cr.ContainerUUID)
262
263         logch := make(chan eventMessage)
264         client := arvadosClient{Client: runner.Client}
265         defer client.Close()
266         subscribedUUID := ""
267         defer func() {
268                 if subscribedUUID != "" {
269                         client.Unsubscribe(logch, subscribedUUID)
270                 }
271         }()
272
273         neednewline := ""
274
275         lastState := cr.State
276         refreshCR := func() {
277                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
278                 if err != nil {
279                         fmt.Fprint(os.Stderr, neednewline)
280                         log.Printf("error getting container request: %s", err)
281                         return
282                 }
283                 if lastState != cr.State {
284                         fmt.Fprint(os.Stderr, neednewline)
285                         log.Printf("container request state: %s", cr.State)
286                         lastState = cr.State
287                 }
288                 if subscribedUUID != cr.ContainerUUID {
289                         fmt.Fprint(os.Stderr, neednewline)
290                         neednewline = ""
291                         if subscribedUUID != "" {
292                                 client.Unsubscribe(logch, subscribedUUID)
293                         }
294                         client.Subscribe(logch, cr.ContainerUUID)
295                         subscribedUUID = cr.ContainerUUID
296                 }
297         }
298
299         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
300 waitctr:
301         for cr.State != arvados.ContainerRequestStateFinal {
302                 select {
303                 case <-ctx.Done():
304                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
305                                 "container_request": map[string]interface{}{
306                                         "priority": 0,
307                                 },
308                         })
309                         if err != nil {
310                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
311                         }
312                         break waitctr
313                 case <-refreshTicker.C:
314                         refreshCR()
315                 case msg := <-logch:
316                         switch msg.EventType {
317                         case "update":
318                                 refreshCR()
319                         case "stderr":
320                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
321                                         if line != "" {
322                                                 fmt.Fprint(os.Stderr, neednewline)
323                                                 neednewline = ""
324                                                 log.Print(line)
325                                         }
326                                 }
327                         case "crunchstat":
328                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
329                                         mem := reCrunchstat.FindString(line)
330                                         if mem != "" {
331                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
332                                                 neednewline = "\n"
333                                         }
334                                 }
335                         }
336                 }
337         }
338         fmt.Fprint(os.Stderr, neednewline)
339
340         if err := ctx.Err(); err != nil {
341                 return "", err
342         }
343
344         var c arvados.Container
345         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
346         if err != nil {
347                 return "", err
348         } else if c.State != arvados.ContainerStateComplete {
349                 return "", fmt.Errorf("container did not complete: %s", c.State)
350         } else if c.ExitCode != 0 {
351                 return "", fmt.Errorf("container exited %d", c.ExitCode)
352         }
353         return cr.OutputUUID, err
354 }
355
356 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
357
358 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
359         if runner.Mounts == nil {
360                 runner.Mounts = make(map[string]map[string]interface{})
361         }
362         for _, path := range paths {
363                 if *path == "" || *path == "-" {
364                         continue
365                 }
366                 m := collectionInPathRe.FindStringSubmatch(*path)
367                 if m == nil {
368                         return fmt.Errorf("cannot find uuid in path: %q", *path)
369                 }
370                 uuid := m[2]
371                 mnt, ok := runner.Mounts["/mnt/"+uuid]
372                 if !ok {
373                         mnt = map[string]interface{}{
374                                 "kind": "collection",
375                                 "uuid": uuid,
376                         }
377                         runner.Mounts["/mnt/"+uuid] = mnt
378                 }
379                 *path = "/mnt/" + uuid + m[3]
380         }
381         return nil
382 }
383
384 var mtxMakeCommandCollection sync.Mutex
385
386 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
387         mtxMakeCommandCollection.Lock()
388         defer mtxMakeCommandCollection.Unlock()
389         exe, err := ioutil.ReadFile("/proc/self/exe")
390         if err != nil {
391                 return "", err
392         }
393         b2 := blake2b.Sum256(exe)
394         cname := fmt.Sprintf("lightning-%x", b2)
395         var existing arvados.CollectionList
396         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
397                 Limit: 1,
398                 Count: "none",
399                 Filters: []arvados.Filter{
400                         {Attr: "name", Operator: "=", Operand: cname},
401                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
402                 },
403         })
404         if err != nil {
405                 return "", err
406         }
407         if len(existing.Items) > 0 {
408                 uuid := existing.Items[0].UUID
409                 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
410                 return uuid, nil
411         }
412         log.Printf("writing lightning binary to new collection %q", cname)
413         ac, err := arvadosclient.New(runner.Client)
414         if err != nil {
415                 return "", err
416         }
417         kc := keepclient.New(ac)
418         var coll arvados.Collection
419         fs, err := coll.FileSystem(runner.Client, kc)
420         if err != nil {
421                 return "", err
422         }
423         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
424         if err != nil {
425                 return "", err
426         }
427         _, err = f.Write(exe)
428         if err != nil {
429                 return "", err
430         }
431         err = f.Close()
432         if err != nil {
433                 return "", err
434         }
435         mtxt, err := fs.MarshalManifest(".")
436         if err != nil {
437                 return "", err
438         }
439         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
440                 "collection": map[string]interface{}{
441                         "owner_uuid":    runner.ProjectUUID,
442                         "manifest_text": mtxt,
443                         "name":          cname,
444                 },
445         })
446         if err != nil {
447                 return "", err
448         }
449         log.Printf("stored lightning binary in new collection %s", coll.UUID)
450         return coll.UUID, nil
451 }