f121542117f3e7b64e856a4a371c0f3a26246388
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "errors"
5         "fmt"
6         "io/ioutil"
7         "log"
8         "os"
9         "regexp"
10
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
13         "git.arvados.org/arvados.git/sdk/go/keepclient"
14         "golang.org/x/crypto/blake2b"
15 )
16
// arvadosContainerRunner submits a lightning command to Arvados as a
// container request (see Run).
type arvadosContainerRunner struct {
        // Client is used for every Arvados API request made by the runner.
        Client      *arvados.Client
        // Name is sent as the "name" of the container request.
        Name        string
        // ProjectUUID is sent as the "owner_uuid" of the container request
        // and of the collection holding the lightning binary. Run returns
        // an error if it is empty.
        ProjectUUID string
        // Args are appended to the program path to form the container
        // command.
        Args        []string
        // Mounts maps a collection identifier to the path where that
        // collection is mounted inside the container. TranslatePaths
        // creates and populates it as needed.
        Mounts      map[string]string
}
24
// collectionInPathRe matches a path in which one complete path
// component is a collection identifier: either 32 hex digits followed
// by "+" and a decimal size (the portable data hash form), or a
// 5-5-15 character UUID.
//
// Submatch 1 is the optional leading part of the path (ending in "/"),
// submatch 2 is the identifier itself, and submatch 3 is the optional
// remainder of the path (starting with "/").
var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
28
29 func (runner *arvadosContainerRunner) Run() error {
30         if runner.ProjectUUID == "" {
31                 return errors.New("cannot run arvados container: ProjectUUID not provided")
32         }
33         prog := "/mnt/cmd/lightning"
34         cmdUUID, err := runner.makeCommandCollection()
35         if err != nil {
36                 return err
37         }
38         command := append([]string{prog}, runner.Args...)
39         mounts := map[string]map[string]interface{}{
40                 "/mnt/cmd": {
41                         "kind": "collection",
42                         "uuid": cmdUUID,
43                 },
44                 "/mnt/output": {
45                         "kind":     "tmp",
46                         "writable": true,
47                         "capacity": 100000000000,
48                 },
49         }
50         for uuid, mnt := range runner.Mounts {
51                 mounts[mnt] = map[string]interface{}{
52                         "kind": "collection",
53                         "uuid": uuid,
54                 }
55         }
56         cpus := 16
57         rc := arvados.RuntimeConstraints{
58                 VCPUs:        cpus,
59                 RAM:          64000000000,
60                 KeepCacheRAM: (1 << 26) * 2 * int64(cpus),
61         }
62         var cr arvados.ContainerRequest
63         err = runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
64                 "container_request": map[string]interface{}{
65                         "owner_uuid":          runner.ProjectUUID,
66                         "name":                runner.Name,
67                         "container_image":     "lightning-runtime",
68                         "command":             command,
69                         "mounts":              mounts,
70                         "use_existing":        true,
71                         "output_path":         "/mnt/output",
72                         "runtime_constraints": rc,
73                         "priority":            1,
74                         "state":               arvados.ContainerRequestStateCommitted,
75                 },
76         })
77         log.Print(cr.UUID)
78         return err
79 }
80
81 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
82         if runner.Mounts == nil {
83                 runner.Mounts = make(map[string]string)
84         }
85         for _, path := range paths {
86                 if *path == "" {
87                         continue
88                 }
89                 m := collectionInPathRe.FindStringSubmatch(*path)
90                 if m == nil {
91                         return fmt.Errorf("cannot find uuid in path: %q", *path)
92                 }
93                 uuid := m[2]
94                 mnt, ok := runner.Mounts[uuid]
95                 if !ok {
96                         mnt = "/mnt/" + uuid
97                         runner.Mounts[uuid] = mnt
98                 }
99                 *path = mnt + m[3]
100         }
101         return nil
102 }
103
104 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
105         exe, err := ioutil.ReadFile("/proc/self/exe")
106         if err != nil {
107                 return "", err
108         }
109         b2 := blake2b.Sum256(exe)
110         cname := fmt.Sprintf("lightning-%x", b2)
111         var existing arvados.CollectionList
112         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
113                 Limit: 1,
114                 Count: "none",
115                 Filters: []arvados.Filter{
116                         {"name", "=", cname},
117                         {"owner_uuid", "=", runner.ProjectUUID},
118                 },
119         })
120         if err != nil {
121                 return "", err
122         }
123         if len(existing.Items) > 0 {
124                 uuid := existing.Items[0].UUID
125                 log.Printf("using existing collection %q named %q (did not verify whether content matches)", uuid, cname)
126                 return uuid, nil
127         }
128         log.Printf("writing lightning binary to new collection %q", cname)
129         ac, err := arvadosclient.New(runner.Client)
130         if err != nil {
131                 return "", err
132         }
133         kc := keepclient.New(ac)
134         var coll arvados.Collection
135         fs, err := coll.FileSystem(runner.Client, kc)
136         if err != nil {
137                 return "", err
138         }
139         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
140         if err != nil {
141                 return "", err
142         }
143         _, err = f.Write(exe)
144         if err != nil {
145                 return "", err
146         }
147         err = f.Close()
148         if err != nil {
149                 return "", err
150         }
151         mtxt, err := fs.MarshalManifest(".")
152         if err != nil {
153                 return "", err
154         }
155         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
156                 "collection": map[string]interface{}{
157                         "owner_uuid":    runner.ProjectUUID,
158                         "manifest_text": mtxt,
159                         "name":          cname,
160                 },
161         })
162         if err != nil {
163                 return "", err
164         }
165         log.Printf("collection: %#v", coll)
166         return coll.UUID, nil
167 }