Add CalledBases to stats.
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "fmt"
7         "io/ioutil"
8         "net/url"
9         "os"
10         "regexp"
11         "strings"
12         "sync"
13         "time"
14
15         "git.arvados.org/arvados.git/sdk/go/arvados"
16         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
17         "git.arvados.org/arvados.git/sdk/go/keepclient"
18         log "github.com/sirupsen/logrus"
19         "golang.org/x/crypto/blake2b"
20         "golang.org/x/net/websocket"
21 )
22
// eventMessage is the structure each JSON message received on the
// websocket event stream is decoded into, and the value delivered to
// subscriber channels.
type eventMessage struct {
	Status int
	// ObjectUUID is used to look up the subscriber channels that
	// should receive this message.
	ObjectUUID string `json:"object_uuid"`
	// EventType distinguishes the kinds of event handled by
	// subscribers, e.g. "update", "stderr", "crunchstat".
	EventType  string `json:"event_type"`
	Properties struct {
		// Text carries the log text for "stderr" and
		// "crunchstat" events; it may contain multiple
		// newline-separated lines.
		Text string
	}
}
31
// arvadosClient wraps an arvados.Client and adds a websocket event
// subscription facility (Subscribe, Unsubscribe, Close).
type arvadosClient struct {
	*arvados.Client
	// notifying maps an object UUID to the channels subscribed to
	// it, each with a count of outstanding Subscribe calls. It is
	// nil before the first Subscribe call and after Close.
	notifying map[string]map[chan<- eventMessage]int
	// wantClose is created by the first Subscribe call and closed
	// by Close to tell the notifier goroutine to stop.
	wantClose chan struct{}
	// wsconn is the current websocket connection, or nil while
	// not connected.
	wsconn    *websocket.Conn
	// mtx is held while accessing notifying and wsconn.
	mtx       sync.Mutex
}
39
40 // Listen for events concerning the given uuids. When an event occurs
41 // (and after connecting/reconnecting to the event stream), send each
42 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
43 // be sent only once for each update, but two Unsubscribe calls will
44 // be needed to stop sending them.
45 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
46         client.mtx.Lock()
47         defer client.mtx.Unlock()
48         if client.notifying == nil {
49                 client.notifying = map[string]map[chan<- eventMessage]int{}
50                 client.wantClose = make(chan struct{})
51                 go client.runNotifier()
52         }
53         chmap := client.notifying[uuid]
54         if chmap == nil {
55                 chmap = map[chan<- eventMessage]int{}
56                 client.notifying[uuid] = chmap
57         }
58         needSub := true
59         for _, nch := range chmap {
60                 if nch > 0 {
61                         needSub = false
62                         break
63                 }
64         }
65         chmap[ch]++
66         if needSub && client.wsconn != nil {
67                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
68                         "method": "subscribe",
69                         "filters": [][]interface{}{
70                                 {"object_uuid", "=", uuid},
71                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
72                         },
73                 })
74         }
75 }
76
77 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
78         client.mtx.Lock()
79         defer client.mtx.Unlock()
80         chmap := client.notifying[uuid]
81         if n := chmap[ch] - 1; n == 0 {
82                 delete(chmap, ch)
83                 if len(chmap) == 0 {
84                         delete(client.notifying, uuid)
85                 }
86                 if client.wsconn != nil {
87                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
88                                 "method": "unsubscribe",
89                                 "filters": [][]interface{}{
90                                         {"object_uuid", "=", uuid},
91                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
92                                 },
93                         })
94                 }
95         } else if n > 0 {
96                 chmap[ch] = n
97         }
98 }
99
100 func (client *arvadosClient) Close() {
101         client.mtx.Lock()
102         defer client.mtx.Unlock()
103         if client.notifying != nil {
104                 client.notifying = nil
105                 close(client.wantClose)
106         }
107 }
108
109 func (client *arvadosClient) runNotifier() {
110 reconnect:
111         for {
112                 var cluster arvados.Cluster
113                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
114                 if err != nil {
115                         log.Warnf("error getting cluster config: %s", err)
116                         time.Sleep(5 * time.Second)
117                         continue reconnect
118                 }
119                 wsURL := cluster.Services.Websocket.ExternalURL
120                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
121                 wsURL.Path = "/websocket"
122                 wsURLNoToken := wsURL.String()
123                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
124                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
125                 if err != nil {
126                         log.Warnf("websocket connection error: %s", err)
127                         time.Sleep(5 * time.Second)
128                         continue reconnect
129                 }
130                 log.Printf("connected to websocket at %s", wsURLNoToken)
131
132                 client.mtx.Lock()
133                 client.wsconn = conn
134                 resubscribe := make([]string, 0, len(client.notifying))
135                 for uuid := range client.notifying {
136                         resubscribe = append(resubscribe, uuid)
137                 }
138                 client.mtx.Unlock()
139
140                 go func() {
141                         w := json.NewEncoder(conn)
142                         for _, uuid := range resubscribe {
143                                 w.Encode(map[string]interface{}{
144                                         "method": "subscribe",
145                                         "filters": [][]interface{}{
146                                                 {"object_uuid", "=", uuid},
147                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
148                                         },
149                                 })
150                         }
151                 }()
152
153                 r := json.NewDecoder(conn)
154                 for {
155                         var msg eventMessage
156                         err := r.Decode(&msg)
157                         select {
158                         case <-client.wantClose:
159                                 return
160                         default:
161                                 if err != nil {
162                                         log.Printf("error decoding websocket message: %s", err)
163                                         client.mtx.Lock()
164                                         client.wsconn = nil
165                                         client.mtx.Unlock()
166                                         go conn.Close()
167                                         continue reconnect
168                                 }
169                                 client.mtx.Lock()
170                                 for ch := range client.notifying[msg.ObjectUUID] {
171                                         ch <- msg
172                                 }
173                                 client.mtx.Unlock()
174                         }
175                 }
176         }
177 }
178
// arvadosContainerRunner describes a command to be run in an Arvados
// container. Fill in the fields, optionally call TranslatePaths to
// set up collection mounts, then call Run.
type arvadosContainerRunner struct {
	// Client is used for all API requests.
	Client      *arvados.Client
	// Name is sent as the container request's name.
	Name        string
	// ProjectUUID is the owner of the container request (and of
	// the command collection, if one is created). Run fails if it
	// is empty.
	ProjectUUID string
	// VCPUs and RAM are sent as runtime constraints. VCPUs is
	// also used to compute the keep cache size constraint.
	VCPUs       int
	RAM         int64
	Prog        string // if empty, run /proc/self/exe
	// Args are the command line arguments appended after Prog.
	Args        []string
	// Mounts are added to the container's mounts, keyed by mount
	// path inside the container. TranslatePaths adds entries.
	Mounts      map[string]map[string]interface{}
	// Priority of the container request; see Run for how it is
	// applied.
	Priority    int
}
190
191 func (runner *arvadosContainerRunner) Run() (string, error) {
192         if runner.ProjectUUID == "" {
193                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
194         }
195
196         mounts := map[string]map[string]interface{}{
197                 "/mnt/output": {
198                         "kind":     "collection",
199                         "writable": true,
200                 },
201         }
202         for path, mnt := range runner.Mounts {
203                 mounts[path] = mnt
204         }
205
206         prog := runner.Prog
207         if prog == "" {
208                 prog = "/mnt/cmd/lightning"
209                 cmdUUID, err := runner.makeCommandCollection()
210                 if err != nil {
211                         return "", err
212                 }
213                 mounts["/mnt/cmd"] = map[string]interface{}{
214                         "kind": "collection",
215                         "uuid": cmdUUID,
216                 }
217         }
218         command := append([]string{prog}, runner.Args...)
219
220         priority := runner.Priority
221         if priority < 1 {
222                 priority = 500
223         }
224         rc := arvados.RuntimeConstraints{
225                 VCPUs:        runner.VCPUs,
226                 RAM:          runner.RAM,
227                 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
228         }
229         var cr arvados.ContainerRequest
230         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
231                 "container_request": map[string]interface{}{
232                         "owner_uuid":          runner.ProjectUUID,
233                         "name":                runner.Name,
234                         "container_image":     "lightning-runtime",
235                         "command":             command,
236                         "mounts":              mounts,
237                         "use_existing":        true,
238                         "output_path":         "/mnt/output",
239                         "runtime_constraints": rc,
240                         "priority":            runner.Priority,
241                         "state":               arvados.ContainerRequestStateCommitted,
242                 },
243         })
244         if err != nil {
245                 return "", err
246         }
247         log.Printf("container request UUID: %s", cr.UUID)
248         log.Printf("container UUID: %s", cr.ContainerUUID)
249
250         logch := make(chan eventMessage)
251         client := arvadosClient{Client: runner.Client}
252         defer client.Close()
253         subscribedUUID := ""
254         defer func() {
255                 if subscribedUUID != "" {
256                         client.Unsubscribe(logch, subscribedUUID)
257                 }
258         }()
259
260         ticker := time.NewTicker(5 * time.Second)
261         defer ticker.Stop()
262
263         neednewline := ""
264
265         lastState := cr.State
266         refreshCR := func() {
267                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
268                 if err != nil {
269                         fmt.Fprint(os.Stderr, neednewline)
270                         log.Printf("error getting container request: %s", err)
271                         return
272                 }
273                 if lastState != cr.State {
274                         fmt.Fprint(os.Stderr, neednewline)
275                         log.Printf("container request state: %s", cr.State)
276                         lastState = cr.State
277                 }
278                 if subscribedUUID != cr.ContainerUUID {
279                         fmt.Fprint(os.Stderr, neednewline)
280                         neednewline = ""
281                         if subscribedUUID != "" {
282                                 client.Unsubscribe(logch, subscribedUUID)
283                         }
284                         client.Subscribe(logch, cr.ContainerUUID)
285                         subscribedUUID = cr.ContainerUUID
286                 }
287         }
288
289         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
290         for cr.State != arvados.ContainerRequestStateFinal {
291                 select {
292                 case <-ticker.C:
293                         refreshCR()
294                 case msg := <-logch:
295                         switch msg.EventType {
296                         case "update":
297                                 refreshCR()
298                         case "stderr":
299                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
300                                         if line != "" {
301                                                 fmt.Fprint(os.Stderr, neednewline)
302                                                 neednewline = ""
303                                                 log.Print(line)
304                                         }
305                                 }
306                         case "crunchstat":
307                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
308                                         mem := reCrunchstat.FindString(line)
309                                         if mem != "" {
310                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
311                                                 neednewline = "\n"
312                                         }
313                                 }
314                         }
315                 }
316         }
317         fmt.Fprint(os.Stderr, neednewline)
318
319         var c arvados.Container
320         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
321         if err != nil {
322                 return "", err
323         } else if c.State != arvados.ContainerStateComplete {
324                 return "", fmt.Errorf("container did not complete: %s", c.State)
325         } else if c.ExitCode != 0 {
326                 return "", fmt.Errorf("container exited %d", c.ExitCode)
327         }
328         return cr.OutputUUID, err
329 }
330
331 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
332
333 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
334         if runner.Mounts == nil {
335                 runner.Mounts = make(map[string]map[string]interface{})
336         }
337         for _, path := range paths {
338                 if *path == "" || *path == "-" {
339                         continue
340                 }
341                 m := collectionInPathRe.FindStringSubmatch(*path)
342                 if m == nil {
343                         return fmt.Errorf("cannot find uuid in path: %q", *path)
344                 }
345                 uuid := m[2]
346                 mnt, ok := runner.Mounts["/mnt/"+uuid]
347                 if !ok {
348                         mnt = map[string]interface{}{
349                                 "kind": "collection",
350                                 "uuid": uuid,
351                         }
352                         runner.Mounts["/mnt/"+uuid] = mnt
353                 }
354                 *path = "/mnt/" + uuid + m[3]
355         }
356         return nil
357 }
358
359 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
360         exe, err := ioutil.ReadFile("/proc/self/exe")
361         if err != nil {
362                 return "", err
363         }
364         b2 := blake2b.Sum256(exe)
365         cname := fmt.Sprintf("lightning-%x", b2)
366         var existing arvados.CollectionList
367         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
368                 Limit: 1,
369                 Count: "none",
370                 Filters: []arvados.Filter{
371                         {Attr: "name", Operator: "=", Operand: cname},
372                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
373                 },
374         })
375         if err != nil {
376                 return "", err
377         }
378         if len(existing.Items) > 0 {
379                 uuid := existing.Items[0].UUID
380                 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
381                 return uuid, nil
382         }
383         log.Printf("writing lightning binary to new collection %q", cname)
384         ac, err := arvadosclient.New(runner.Client)
385         if err != nil {
386                 return "", err
387         }
388         kc := keepclient.New(ac)
389         var coll arvados.Collection
390         fs, err := coll.FileSystem(runner.Client, kc)
391         if err != nil {
392                 return "", err
393         }
394         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
395         if err != nil {
396                 return "", err
397         }
398         _, err = f.Write(exe)
399         if err != nil {
400                 return "", err
401         }
402         err = f.Close()
403         if err != nil {
404                 return "", err
405         }
406         mtxt, err := fs.MarshalManifest(".")
407         if err != nil {
408                 return "", err
409         }
410         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
411                 "collection": map[string]interface{}{
412                         "owner_uuid":    runner.ProjectUUID,
413                         "manifest_text": mtxt,
414                         "name":          cname,
415                 },
416         })
417         if err != nil {
418                 return "", err
419         }
420         log.Printf("stored lightning binary in new collection %s", coll.UUID)
421         return coll.UUID, nil
422 }