Run "filter" and "export" in arvados containers.
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "errors"
5         "fmt"
6         "io/ioutil"
7         "log"
8         "os"
9         "regexp"
10
11         "git.arvados.org/arvados.git/sdk/go/arvados"
12         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
13         "git.arvados.org/arvados.git/sdk/go/keepclient"
14         "golang.org/x/crypto/blake2b"
15 )
16
17 type arvadosContainerRunner struct {
18         Client      *arvados.Client
19         Name        string
20         ProjectUUID string
21         VCPUs       int
22         RAM         int64
23         Args        []string
24         Mounts      map[string]string
25 }
26
// collectionInPathRe extracts an Arvados collection identifier from a path.
//
//   - Submatch 1 is the optional directory prefix, ending in "/".
//   - Submatch 2 is the identifier. It is either a portable data hash
//     ("<32 hex digits>+<size>") or a UUID of the form
//     "xxxxx-xxxxx-xxxxxxxxxxxxxxx".
//   - Submatch 3 is the optional remainder, starting with "/".
//
// Because the prefix group is greedy, if several identifier-shaped segments
// appear, the last one that is followed by "/" or end of string is chosen.
var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
30
31 func (runner *arvadosContainerRunner) Run() error {
32         if runner.ProjectUUID == "" {
33                 return errors.New("cannot run arvados container: ProjectUUID not provided")
34         }
35         prog := "/mnt/cmd/lightning"
36         cmdUUID, err := runner.makeCommandCollection()
37         if err != nil {
38                 return err
39         }
40         command := append([]string{prog}, runner.Args...)
41         mounts := map[string]map[string]interface{}{
42                 "/mnt/cmd": {
43                         "kind": "collection",
44                         "uuid": cmdUUID,
45                 },
46                 "/mnt/output": {
47                         "kind":     "tmp",
48                         "writable": true,
49                         "capacity": 100000000000,
50                 },
51         }
52         for uuid, mnt := range runner.Mounts {
53                 mounts[mnt] = map[string]interface{}{
54                         "kind": "collection",
55                         "uuid": uuid,
56                 }
57         }
58         rc := arvados.RuntimeConstraints{
59                 VCPUs:        runner.VCPUs,
60                 RAM:          runner.RAM,
61                 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
62         }
63         var cr arvados.ContainerRequest
64         err = runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
65                 "container_request": map[string]interface{}{
66                         "owner_uuid":          runner.ProjectUUID,
67                         "name":                runner.Name,
68                         "container_image":     "lightning-runtime",
69                         "command":             command,
70                         "mounts":              mounts,
71                         "use_existing":        true,
72                         "output_path":         "/mnt/output",
73                         "runtime_constraints": rc,
74                         "priority":            1,
75                         "state":               arvados.ContainerRequestStateCommitted,
76                 },
77         })
78         log.Print(cr.UUID)
79         return err
80 }
81
82 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
83         if runner.Mounts == nil {
84                 runner.Mounts = make(map[string]string)
85         }
86         for _, path := range paths {
87                 if *path == "" {
88                         continue
89                 }
90                 m := collectionInPathRe.FindStringSubmatch(*path)
91                 if m == nil {
92                         return fmt.Errorf("cannot find uuid in path: %q", *path)
93                 }
94                 uuid := m[2]
95                 mnt, ok := runner.Mounts[uuid]
96                 if !ok {
97                         mnt = "/mnt/" + uuid
98                         runner.Mounts[uuid] = mnt
99                 }
100                 *path = mnt + m[3]
101         }
102         return nil
103 }
104
105 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
106         exe, err := ioutil.ReadFile("/proc/self/exe")
107         if err != nil {
108                 return "", err
109         }
110         b2 := blake2b.Sum256(exe)
111         cname := fmt.Sprintf("lightning-%x", b2)
112         var existing arvados.CollectionList
113         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
114                 Limit: 1,
115                 Count: "none",
116                 Filters: []arvados.Filter{
117                         {Attr: "name", Operator: "=", Operand: cname},
118                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
119                 },
120         })
121         if err != nil {
122                 return "", err
123         }
124         if len(existing.Items) > 0 {
125                 uuid := existing.Items[0].UUID
126                 log.Printf("using existing collection %q named %q (did not verify whether content matches)", uuid, cname)
127                 return uuid, nil
128         }
129         log.Printf("writing lightning binary to new collection %q", cname)
130         ac, err := arvadosclient.New(runner.Client)
131         if err != nil {
132                 return "", err
133         }
134         kc := keepclient.New(ac)
135         var coll arvados.Collection
136         fs, err := coll.FileSystem(runner.Client, kc)
137         if err != nil {
138                 return "", err
139         }
140         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
141         if err != nil {
142                 return "", err
143         }
144         _, err = f.Write(exe)
145         if err != nil {
146                 return "", err
147         }
148         err = f.Close()
149         if err != nil {
150                 return "", err
151         }
152         mtxt, err := fs.MarshalManifest(".")
153         if err != nil {
154                 return "", err
155         }
156         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
157                 "collection": map[string]interface{}{
158                         "owner_uuid":    runner.ProjectUUID,
159                         "manifest_text": mtxt,
160                         "name":          cname,
161                 },
162         })
163         if err != nil {
164                 return "", err
165         }
166         log.Printf("collection: %#v", coll)
167         return coll.UUID, nil
168 }