c1b54d98236661136a5112672950c17ed6605807
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "errors"
5         "fmt"
6         "io/ioutil"
7         "log"
8         "os"
9         "regexp"
10
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
13         "git.arvados.org/arvados.git/sdk/go/keepclient"
14         "golang.org/x/crypto/blake2b"
15 )
16
17 type arvadosContainerRunner struct {
18         Client      *arvados.Client
19         Name        string
20         ProjectUUID string
21         VCPUs       int
22         RAM         int64
23         Prog        string // if empty, run /proc/self/exe
24         Args        []string
25         Mounts      map[string]map[string]interface{}
26 }
27
28 func (runner *arvadosContainerRunner) Run() error {
29         if runner.ProjectUUID == "" {
30                 return errors.New("cannot run arvados container: ProjectUUID not provided")
31         }
32
33         mounts := map[string]map[string]interface{}{
34                 "/mnt/output": {
35                         "kind":     "tmp",
36                         "writable": true,
37                         "capacity": 100000000000,
38                 },
39         }
40         for path, mnt := range runner.Mounts {
41                 mounts[path] = mnt
42         }
43
44         prog := runner.Prog
45         if prog == "" {
46                 prog = "/mnt/cmd/lightning"
47                 cmdUUID, err := runner.makeCommandCollection()
48                 if err != nil {
49                         return err
50                 }
51                 mounts["/mnt/cmd"] = map[string]interface{}{
52                         "kind": "collection",
53                         "uuid": cmdUUID,
54                 }
55         }
56         command := append([]string{prog}, runner.Args...)
57
58         rc := arvados.RuntimeConstraints{
59                 VCPUs:        runner.VCPUs,
60                 RAM:          runner.RAM,
61                 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
62         }
63         var cr arvados.ContainerRequest
64         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
65                 "container_request": map[string]interface{}{
66                         "owner_uuid":          runner.ProjectUUID,
67                         "name":                runner.Name,
68                         "container_image":     "lightning-runtime",
69                         "command":             command,
70                         "mounts":              mounts,
71                         "use_existing":        true,
72                         "output_path":         "/mnt/output",
73                         "runtime_constraints": rc,
74                         "priority":            1,
75                         "state":               arvados.ContainerRequestStateCommitted,
76                 },
77         })
78         log.Print(cr.UUID)
79         return err
80 }
81
82 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
83
84 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
85         if runner.Mounts == nil {
86                 runner.Mounts = make(map[string]map[string]interface{})
87         }
88         for _, path := range paths {
89                 if *path == "" {
90                         continue
91                 }
92                 m := collectionInPathRe.FindStringSubmatch(*path)
93                 if m == nil {
94                         return fmt.Errorf("cannot find uuid in path: %q", *path)
95                 }
96                 uuid := m[2]
97                 mnt, ok := runner.Mounts["/mnt/"+uuid]
98                 if !ok {
99                         mnt = map[string]interface{}{
100                                 "kind": "collection",
101                                 "uuid": uuid,
102                         }
103                         runner.Mounts["/mnt/"+uuid] = mnt
104                 }
105                 *path = "/mnt/" + uuid + m[3]
106         }
107         return nil
108 }
109
110 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
111         exe, err := ioutil.ReadFile("/proc/self/exe")
112         if err != nil {
113                 return "", err
114         }
115         b2 := blake2b.Sum256(exe)
116         cname := fmt.Sprintf("lightning-%x", b2)
117         var existing arvados.CollectionList
118         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
119                 Limit: 1,
120                 Count: "none",
121                 Filters: []arvados.Filter{
122                         {Attr: "name", Operator: "=", Operand: cname},
123                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
124                 },
125         })
126         if err != nil {
127                 return "", err
128         }
129         if len(existing.Items) > 0 {
130                 uuid := existing.Items[0].UUID
131                 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
132                 return uuid, nil
133         }
134         log.Printf("writing lightning binary to new collection %q", cname)
135         ac, err := arvadosclient.New(runner.Client)
136         if err != nil {
137                 return "", err
138         }
139         kc := keepclient.New(ac)
140         var coll arvados.Collection
141         fs, err := coll.FileSystem(runner.Client, kc)
142         if err != nil {
143                 return "", err
144         }
145         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
146         if err != nil {
147                 return "", err
148         }
149         _, err = f.Write(exe)
150         if err != nil {
151                 return "", err
152         }
153         err = f.Close()
154         if err != nil {
155                 return "", err
156         }
157         mtxt, err := fs.MarshalManifest(".")
158         if err != nil {
159                 return "", err
160         }
161         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
162                 "collection": map[string]interface{}{
163                         "owner_uuid":    runner.ProjectUUID,
164                         "manifest_text": mtxt,
165                         "name":          cname,
166                 },
167         })
168         if err != nil {
169                 return "", err
170         }
171         log.Printf("stored lightning binary in new collection %s", coll.UUID)
172         return coll.UUID, nil
173 }