Ask for preemptible instances.
[lightning.git] / arvados.go
1 package lightning
2
3 import (
4         "context"
5         "encoding/json"
6         "errors"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "net/url"
11         "os"
12         "regexp"
13         "runtime"
14         "strings"
15         "sync"
16         "time"
17
18         "git.arvados.org/arvados.git/lib/cmd"
19         "git.arvados.org/arvados.git/sdk/go/arvados"
20         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
21         "git.arvados.org/arvados.git/sdk/go/keepclient"
22         log "github.com/sirupsen/logrus"
23         "golang.org/x/crypto/blake2b"
24         "golang.org/x/net/websocket"
25 )
26
// eventMessage is the structure each JSON message received from the
// websocket event stream is decoded into (see runNotifier).
type eventMessage struct {
	Status     int
	// ObjectUUID is used by runNotifier to find the channels
	// subscribed to this event.
	ObjectUUID string `json:"object_uuid"`
	// EventType is inspected by RunContext, which handles "update",
	// "stderr" and "crunchstat".
	EventType  string `json:"event_type"`
	Properties struct {
		// Text is split into lines by RunContext for "stderr" and
		// "crunchstat" events.
		Text string
	}
}
35
// arvadosClient adds websocket event subscriptions (Subscribe,
// Unsubscribe, Close) to an embedded *arvados.Client.
type arvadosClient struct {
	*arvados.Client
	// notifying maps object uuid -> subscribed channel -> number of
	// outstanding Subscribe calls for that pair. It is nil until
	// the first Subscribe call, and again after Close.
	notifying map[string]map[chan<- eventMessage]int
	// wantClose is created by Subscribe when the notifier is
	// started, and closed by Close to tell runNotifier to stop.
	wantClose chan struct{}
	// wsconn is the connection established by runNotifier, or nil
	// while there is no usable connection.
	wsconn    *websocket.Conn
	// mtx is held while reading or changing notifying and wsconn.
	mtx       sync.Mutex
}
43
44 // Listen for events concerning the given uuids. When an event occurs
45 // (and after connecting/reconnecting to the event stream), send each
46 // uuid to ch. If a {ch, uuid} pair is subscribed twice, the uuid will
47 // be sent only once for each update, but two Unsubscribe calls will
48 // be needed to stop sending them.
49 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
50         client.mtx.Lock()
51         defer client.mtx.Unlock()
52         if client.notifying == nil {
53                 client.notifying = map[string]map[chan<- eventMessage]int{}
54                 client.wantClose = make(chan struct{})
55                 go client.runNotifier()
56         }
57         chmap := client.notifying[uuid]
58         if chmap == nil {
59                 chmap = map[chan<- eventMessage]int{}
60                 client.notifying[uuid] = chmap
61         }
62         needSub := true
63         for _, nch := range chmap {
64                 if nch > 0 {
65                         needSub = false
66                         break
67                 }
68         }
69         chmap[ch]++
70         if needSub && client.wsconn != nil {
71                 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
72                         "method": "subscribe",
73                         "filters": [][]interface{}{
74                                 {"object_uuid", "=", uuid},
75                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
76                         },
77                 })
78         }
79 }
80
81 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
82         client.mtx.Lock()
83         defer client.mtx.Unlock()
84         chmap := client.notifying[uuid]
85         if n := chmap[ch] - 1; n == 0 {
86                 delete(chmap, ch)
87                 if len(chmap) == 0 {
88                         delete(client.notifying, uuid)
89                 }
90                 if client.wsconn != nil {
91                         go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
92                                 "method": "unsubscribe",
93                                 "filters": [][]interface{}{
94                                         {"object_uuid", "=", uuid},
95                                         {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
96                                 },
97                         })
98                 }
99         } else if n > 0 {
100                 chmap[ch] = n
101         }
102 }
103
104 func (client *arvadosClient) Close() {
105         client.mtx.Lock()
106         defer client.mtx.Unlock()
107         if client.notifying != nil {
108                 client.notifying = nil
109                 close(client.wantClose)
110         }
111 }
112
113 func (client *arvadosClient) runNotifier() {
114 reconnect:
115         for {
116                 var cluster arvados.Cluster
117                 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
118                 if err != nil {
119                         log.Warnf("error getting cluster config: %s", err)
120                         time.Sleep(5 * time.Second)
121                         continue reconnect
122                 }
123                 wsURL := cluster.Services.Websocket.ExternalURL
124                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
125                 wsURL.Path = "/websocket"
126                 wsURLNoToken := wsURL.String()
127                 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
128                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
129                 if err != nil {
130                         log.Warnf("websocket connection error: %s", err)
131                         time.Sleep(5 * time.Second)
132                         continue reconnect
133                 }
134                 log.Printf("connected to websocket at %s", wsURLNoToken)
135
136                 client.mtx.Lock()
137                 client.wsconn = conn
138                 resubscribe := make([]string, 0, len(client.notifying))
139                 for uuid := range client.notifying {
140                         resubscribe = append(resubscribe, uuid)
141                 }
142                 client.mtx.Unlock()
143
144                 go func() {
145                         w := json.NewEncoder(conn)
146                         for _, uuid := range resubscribe {
147                                 w.Encode(map[string]interface{}{
148                                         "method": "subscribe",
149                                         "filters": [][]interface{}{
150                                                 {"object_uuid", "=", uuid},
151                                                 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
152                                         },
153                                 })
154                         }
155                 }()
156
157                 r := json.NewDecoder(conn)
158                 for {
159                         var msg eventMessage
160                         err := r.Decode(&msg)
161                         select {
162                         case <-client.wantClose:
163                                 return
164                         default:
165                                 if err != nil {
166                                         log.Printf("error decoding websocket message: %s", err)
167                                         client.mtx.Lock()
168                                         client.wsconn = nil
169                                         client.mtx.Unlock()
170                                         go conn.Close()
171                                         continue reconnect
172                                 }
173                                 client.mtx.Lock()
174                                 for ch := range client.notifying[msg.ObjectUUID] {
175                                         ch <- msg
176                                 }
177                                 client.mtx.Unlock()
178                         }
179                 }
180         }
181 }
182
// refreshTicker fires every 5 seconds. RunContext re-fetches its
// container request each time it receives a tick. Because this is a
// single package-level ticker, it is shared by all concurrent
// RunContext calls, and each tick is received by only one of them.
var refreshTicker = time.NewTicker(5 * time.Second)
184
// arvadosContainerRunner describes a command to be run in an Arvados
// container. Run/RunContext submit it as a container request and wait
// for it to finish.
type arvadosContainerRunner struct {
	// Client is used for all API requests.
	Client      *arvados.Client
	// Name is the name given to the container request.
	Name        string
	// OutputName is sent as the container request's output_name
	// (null if empty).
	OutputName  string
	// ProjectUUID is the owner of the container request and of the
	// command collection. RunContext fails if it is empty.
	ProjectUUID string
	// APIAccess is passed as the "API" runtime constraint.
	APIAccess   bool
	// VCPUs and RAM are passed as runtime constraints.
	VCPUs       int
	RAM         int64
	Prog        string // if empty, run /proc/self/exe
	// Args are appended to the program name to form the container
	// command.
	Args        []string
	// Mounts are added to the container's mounts, keyed by mount
	// point. TranslatePaths adds entries here.
	Mounts      map[string]map[string]interface{}
	// Priority is the container request priority.
	Priority    int
	KeepCache   int // cache buffers per VCPU (0 for default)
}
199
200 func (runner *arvadosContainerRunner) Run() (string, error) {
201         return runner.RunContext(context.Background())
202 }
203
204 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
205         if runner.ProjectUUID == "" {
206                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
207         }
208
209         mounts := map[string]map[string]interface{}{
210                 "/mnt/output": {
211                         "kind":     "collection",
212                         "writable": true,
213                 },
214         }
215         for path, mnt := range runner.Mounts {
216                 mounts[path] = mnt
217         }
218
219         prog := runner.Prog
220         if prog == "" {
221                 prog = "/mnt/cmd/lightning"
222                 cmdUUID, err := runner.makeCommandCollection()
223                 if err != nil {
224                         return "", err
225                 }
226                 mounts["/mnt/cmd"] = map[string]interface{}{
227                         "kind": "collection",
228                         "uuid": cmdUUID,
229                 }
230         }
231         command := append([]string{prog}, runner.Args...)
232
233         priority := runner.Priority
234         if priority < 1 {
235                 priority = 500
236         }
237         keepCache := runner.KeepCache
238         if keepCache < 1 {
239                 keepCache = 2
240         }
241         rc := arvados.RuntimeConstraints{
242                 API:          &runner.APIAccess,
243                 VCPUs:        runner.VCPUs,
244                 RAM:          runner.RAM,
245                 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
246         }
247         outname := &runner.OutputName
248         if *outname == "" {
249                 outname = nil
250         }
251         var cr arvados.ContainerRequest
252         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
253                 "container_request": map[string]interface{}{
254                         "owner_uuid":          runner.ProjectUUID,
255                         "name":                runner.Name,
256                         "container_image":     "lightning-runtime",
257                         "command":             command,
258                         "mounts":              mounts,
259                         "use_existing":        true,
260                         "output_path":         "/mnt/output",
261                         "output_name":         outname,
262                         "runtime_constraints": rc,
263                         "priority":            runner.Priority,
264                         "state":               arvados.ContainerRequestStateCommitted,
265                         "scheduling_parameters": arvados.SchedulingParameters{
266                                 Preemptible: true,
267                                 Partitions:  []string{},
268                         },
269                 },
270         })
271         if err != nil {
272                 return "", err
273         }
274         log.Printf("container request UUID: %s", cr.UUID)
275         log.Printf("container UUID: %s", cr.ContainerUUID)
276
277         logch := make(chan eventMessage)
278         client := arvadosClient{Client: runner.Client}
279         defer client.Close()
280         subscribedUUID := ""
281         defer func() {
282                 if subscribedUUID != "" {
283                         client.Unsubscribe(logch, subscribedUUID)
284                 }
285         }()
286
287         neednewline := ""
288
289         lastState := cr.State
290         refreshCR := func() {
291                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
292                 if err != nil {
293                         fmt.Fprint(os.Stderr, neednewline)
294                         log.Printf("error getting container request: %s", err)
295                         return
296                 }
297                 if lastState != cr.State {
298                         fmt.Fprint(os.Stderr, neednewline)
299                         log.Printf("container request state: %s", cr.State)
300                         lastState = cr.State
301                 }
302                 if subscribedUUID != cr.ContainerUUID {
303                         fmt.Fprint(os.Stderr, neednewline)
304                         neednewline = ""
305                         if subscribedUUID != "" {
306                                 client.Unsubscribe(logch, subscribedUUID)
307                         }
308                         client.Subscribe(logch, cr.ContainerUUID)
309                         subscribedUUID = cr.ContainerUUID
310                 }
311         }
312
313         var reCrunchstat = regexp.MustCompile(`mem .* rss`)
314 waitctr:
315         for cr.State != arvados.ContainerRequestStateFinal {
316                 select {
317                 case <-ctx.Done():
318                         err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
319                                 "container_request": map[string]interface{}{
320                                         "priority": 0,
321                                 },
322                         })
323                         if err != nil {
324                                 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
325                         }
326                         break waitctr
327                 case <-refreshTicker.C:
328                         refreshCR()
329                 case msg := <-logch:
330                         switch msg.EventType {
331                         case "update":
332                                 refreshCR()
333                         case "stderr":
334                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
335                                         if line != "" {
336                                                 fmt.Fprint(os.Stderr, neednewline)
337                                                 neednewline = ""
338                                                 log.Print(line)
339                                         }
340                                 }
341                         case "crunchstat":
342                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
343                                         mem := reCrunchstat.FindString(line)
344                                         if mem != "" {
345                                                 fmt.Fprintf(os.Stderr, "%s               \r", mem)
346                                                 neednewline = "\n"
347                                         }
348                                 }
349                         }
350                 }
351         }
352         fmt.Fprint(os.Stderr, neednewline)
353
354         if err := ctx.Err(); err != nil {
355                 return "", err
356         }
357
358         var c arvados.Container
359         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
360         if err != nil {
361                 return "", err
362         } else if c.State != arvados.ContainerStateComplete {
363                 return "", fmt.Errorf("container did not complete: %s", c.State)
364         } else if c.ExitCode != 0 {
365                 return "", fmt.Errorf("container exited %d", c.ExitCode)
366         }
367         return cr.OutputUUID, err
368 }
369
// collectionInPathRe matches a path that contains a collection
// identifier as one complete path component. Submatch 1 is everything
// before the identifier (empty, or ending in "/"), submatch 2 is the
// identifier itself -- either 32 hex digits followed by "+" and a
// decimal number, or three groups of 5, 5 and 15 lowercase
// alphanumeric characters joined by "-" -- and submatch 3 is the rest
// of the path (empty, or starting with "/").
var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
371
372 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
373         if runner.Mounts == nil {
374                 runner.Mounts = make(map[string]map[string]interface{})
375         }
376         for _, path := range paths {
377                 if *path == "" || *path == "-" {
378                         continue
379                 }
380                 m := collectionInPathRe.FindStringSubmatch(*path)
381                 if m == nil {
382                         return fmt.Errorf("cannot find uuid in path: %q", *path)
383                 }
384                 uuid := m[2]
385                 mnt, ok := runner.Mounts["/mnt/"+uuid]
386                 if !ok {
387                         mnt = map[string]interface{}{
388                                 "kind": "collection",
389                                 "uuid": uuid,
390                         }
391                         runner.Mounts["/mnt/"+uuid] = mnt
392                 }
393                 *path = "/mnt/" + uuid + m[3]
394         }
395         return nil
396 }
397
// mtxMakeCommandCollection is held for the duration of every
// makeCommandCollection call, so that only one call at a time looks
// for (and possibly creates) the command collection.
var mtxMakeCommandCollection sync.Mutex
399
400 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
401         mtxMakeCommandCollection.Lock()
402         defer mtxMakeCommandCollection.Unlock()
403         exe, err := ioutil.ReadFile("/proc/self/exe")
404         if err != nil {
405                 return "", err
406         }
407         b2 := blake2b.Sum256(exe)
408         cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
409         var existing arvados.CollectionList
410         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
411                 Limit: 1,
412                 Count: "none",
413                 Filters: []arvados.Filter{
414                         {Attr: "name", Operator: "=", Operand: cname},
415                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
416                         {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
417                 },
418         })
419         if err != nil {
420                 return "", err
421         }
422         if len(existing.Items) > 0 {
423                 coll := existing.Items[0]
424                 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
425                 return coll.UUID, nil
426         }
427         log.Printf("writing lightning binary to new collection %q", cname)
428         ac, err := arvadosclient.New(runner.Client)
429         if err != nil {
430                 return "", err
431         }
432         kc := keepclient.New(ac)
433         var coll arvados.Collection
434         fs, err := coll.FileSystem(runner.Client, kc)
435         if err != nil {
436                 return "", err
437         }
438         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
439         if err != nil {
440                 return "", err
441         }
442         _, err = f.Write(exe)
443         if err != nil {
444                 return "", err
445         }
446         err = f.Close()
447         if err != nil {
448                 return "", err
449         }
450         mtxt, err := fs.MarshalManifest(".")
451         if err != nil {
452                 return "", err
453         }
454         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
455                 "collection": map[string]interface{}{
456                         "owner_uuid":    runner.ProjectUUID,
457                         "manifest_text": mtxt,
458                         "name":          cname,
459                         "properties": map[string]interface{}{
460                                 "blake2b": fmt.Sprintf("%x", b2),
461                         },
462                 },
463         })
464         if err != nil {
465                 return "", err
466         }
467         log.Printf("stored lightning binary in new collection %s", coll.UUID)
468         return coll.UUID, nil
469 }
470
var (
	// arvadosClientFromEnv is created once, at package
	// initialization, by arvados.NewClientFromEnv. open() uses it
	// to build siteFS.
	arvadosClientFromEnv = arvados.NewClientFromEnv()
	// siteFS is nil until open() first needs to read a file
	// through the Arvados client; after that it is reused by all
	// later open() calls.
	siteFS               arvados.CustomFileSystem
	// siteFSMtx is held by open() while it initializes and uses
	// siteFS.
	siteFSMtx            sync.Mutex
)
476
477 func open(fnm string) (io.ReadCloser, error) {
478         if os.Getenv("ARVADOS_API_HOST") == "" {
479                 return os.Open(fnm)
480         }
481         m := collectionInPathRe.FindStringSubmatch(fnm)
482         if m == nil {
483                 return os.Open(fnm)
484         }
485         uuid := m[2]
486         mnt := "/mnt/" + uuid + "/"
487         if !strings.HasPrefix(fnm, mnt) {
488                 return os.Open(fnm)
489         }
490
491         siteFSMtx.Lock()
492         defer siteFSMtx.Unlock()
493         if siteFS == nil {
494                 log.Info("setting up Arvados client")
495                 ac, err := arvadosclient.New(arvadosClientFromEnv)
496                 if err != nil {
497                         return nil, err
498                 }
499                 ac.Client = arvados.DefaultSecureClient
500                 kc := keepclient.New(ac)
501                 // Don't use keepclient's default short timeouts.
502                 kc.HTTPClient = arvados.DefaultSecureClient
503                 // Guess max concurrent readers, hope to avoid cache
504                 // thrashing.
505                 kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3}
506                 siteFS = arvadosClientFromEnv.SiteFileSystem(kc)
507         }
508
509         log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid)
510         return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):])
511 }