Container request priority flag.
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "fmt"
7         "io/ioutil"
8         "net/url"
9         "os"
10         "regexp"
11         "strings"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16         "git.arvados.org/arvados.git/sdk/go/keepclient"
17         log "github.com/sirupsen/logrus"
18         "golang.org/x/crypto/blake2b"
19         "golang.org/x/net/websocket"
20 )
21
// arvadosContainerRunner holds the parameters for submitting a
// container request and waiting for it to finish (see Run).
type arvadosContainerRunner struct {
        // Client is used for all API requests, and its AuthToken is
        // used when connecting to the websocket log stream.
        Client      *arvados.Client
        // Name is sent as the container request's "name".
        Name        string
        // ProjectUUID is sent as "owner_uuid" for the container
        // request and for the command collection. Run fails if it is
        // empty.
        ProjectUUID string
        // VCPUs and RAM are copied into the request's runtime
        // constraints. VCPUs also scales KeepCacheRAM.
        VCPUs       int
        RAM         int64
        Prog        string // if empty, run /proc/self/exe
        // Args are appended to the program path to form the container
        // command.
        Args        []string
        // Mounts are added to (and may override) the default mounts
        // set up by Run. TranslatePaths adds entries here.
        Mounts      map[string]map[string]interface{}
        // Priority is the requested container request priority.
        Priority    int
}
33
34 func (runner *arvadosContainerRunner) Run() (string, error) {
35         if runner.ProjectUUID == "" {
36                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
37         }
38
39         mounts := map[string]map[string]interface{}{
40                 "/mnt/output": {
41                         "kind":     "tmp",
42                         "writable": true,
43                         "capacity": 100000000000,
44                 },
45         }
46         for path, mnt := range runner.Mounts {
47                 mounts[path] = mnt
48         }
49
50         prog := runner.Prog
51         if prog == "" {
52                 prog = "/mnt/cmd/lightning"
53                 cmdUUID, err := runner.makeCommandCollection()
54                 if err != nil {
55                         return "", err
56                 }
57                 mounts["/mnt/cmd"] = map[string]interface{}{
58                         "kind": "collection",
59                         "uuid": cmdUUID,
60                 }
61         }
62         command := append([]string{prog}, runner.Args...)
63
64         priority := runner.Priority
65         if priority < 1 {
66                 priority = 500
67         }
68         rc := arvados.RuntimeConstraints{
69                 VCPUs:        runner.VCPUs,
70                 RAM:          runner.RAM,
71                 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
72         }
73         var cr arvados.ContainerRequest
74         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
75                 "container_request": map[string]interface{}{
76                         "owner_uuid":          runner.ProjectUUID,
77                         "name":                runner.Name,
78                         "container_image":     "lightning-runtime",
79                         "command":             command,
80                         "mounts":              mounts,
81                         "use_existing":        true,
82                         "output_path":         "/mnt/output",
83                         "runtime_constraints": rc,
84                         "priority":            runner.Priority,
85                         "state":               arvados.ContainerRequestStateCommitted,
86                 },
87         })
88         log.Printf("container request UUID: %s", cr.UUID)
89         log.Printf("container UUID: %s", cr.ContainerUUID)
90
91         var logch <-chan eventMessage
92         var logstream *logStream
93         defer func() {
94                 if logstream != nil {
95                         logstream.Close()
96                 }
97         }()
98
99         ticker := time.NewTicker(5 * time.Second)
100         defer ticker.Stop()
101
102         lastState := cr.State
103         refreshCR := func() {
104                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
105                 if err != nil {
106                         log.Printf("error getting container request: %s", err)
107                         return
108                 }
109                 if lastState != cr.State {
110                         log.Printf("container state: %s", cr.State)
111                         lastState = cr.State
112                 }
113         }
114
115         subscribedUUID := ""
116         for cr.State != arvados.ContainerRequestStateFinal {
117                 if logch == nil && cr.ContainerUUID != subscribedUUID {
118                         if logstream != nil {
119                                 logstream.Close()
120                         }
121                         logstream = runner.logStream(cr.ContainerUUID)
122                         logch = logstream.C
123                 }
124                 select {
125                 case msg, ok := <-logch:
126                         if !ok {
127                                 logstream.Close()
128                                 logstream = nil
129                                 logch = nil
130                                 break
131                         }
132                         switch msg.EventType {
133                         case "update":
134                                 refreshCR()
135                         default:
136                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
137                                         if line != "" {
138                                                 log.Print(line)
139                                         }
140                                 }
141                         }
142                 case <-ticker.C:
143                         refreshCR()
144                 }
145         }
146
147         var c arvados.Container
148         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
149         if err != nil {
150                 return "", err
151         }
152         if c.ExitCode != 0 {
153                 return "", fmt.Errorf("container exited %d", c.ExitCode)
154         }
155         return cr.OutputUUID, err
156 }
157
// collectionInPathRe matches a path that contains either 32 hex
// digits followed by "+" and a decimal number (presumably a
// collection portable data hash -- confirm against the Arvados docs)
// or a 5-5-15 character identifier of lowercase letters and digits
// separated by hyphens (presumably an Arvados UUID). Submatch 1 is
// everything before the identifier (ending in "/", or empty),
// submatch 2 is the identifier itself, and submatch 3 is the rest of
// the path (starting with "/", or empty).
var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
159
160 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
161         if runner.Mounts == nil {
162                 runner.Mounts = make(map[string]map[string]interface{})
163         }
164         for _, path := range paths {
165                 if *path == "" || *path == "-" {
166                         continue
167                 }
168                 m := collectionInPathRe.FindStringSubmatch(*path)
169                 if m == nil {
170                         return fmt.Errorf("cannot find uuid in path: %q", *path)
171                 }
172                 uuid := m[2]
173                 mnt, ok := runner.Mounts["/mnt/"+uuid]
174                 if !ok {
175                         mnt = map[string]interface{}{
176                                 "kind": "collection",
177                                 "uuid": uuid,
178                         }
179                         runner.Mounts["/mnt/"+uuid] = mnt
180                 }
181                 *path = "/mnt/" + uuid + m[3]
182         }
183         return nil
184 }
185
186 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
187         exe, err := ioutil.ReadFile("/proc/self/exe")
188         if err != nil {
189                 return "", err
190         }
191         b2 := blake2b.Sum256(exe)
192         cname := fmt.Sprintf("lightning-%x", b2)
193         var existing arvados.CollectionList
194         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
195                 Limit: 1,
196                 Count: "none",
197                 Filters: []arvados.Filter{
198                         {Attr: "name", Operator: "=", Operand: cname},
199                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
200                 },
201         })
202         if err != nil {
203                 return "", err
204         }
205         if len(existing.Items) > 0 {
206                 uuid := existing.Items[0].UUID
207                 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
208                 return uuid, nil
209         }
210         log.Printf("writing lightning binary to new collection %q", cname)
211         ac, err := arvadosclient.New(runner.Client)
212         if err != nil {
213                 return "", err
214         }
215         kc := keepclient.New(ac)
216         var coll arvados.Collection
217         fs, err := coll.FileSystem(runner.Client, kc)
218         if err != nil {
219                 return "", err
220         }
221         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
222         if err != nil {
223                 return "", err
224         }
225         _, err = f.Write(exe)
226         if err != nil {
227                 return "", err
228         }
229         err = f.Close()
230         if err != nil {
231                 return "", err
232         }
233         mtxt, err := fs.MarshalManifest(".")
234         if err != nil {
235                 return "", err
236         }
237         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
238                 "collection": map[string]interface{}{
239                         "owner_uuid":    runner.ProjectUUID,
240                         "manifest_text": mtxt,
241                         "name":          cname,
242                 },
243         })
244         if err != nil {
245                 return "", err
246         }
247         log.Printf("stored lightning binary in new collection %s", coll.UUID)
248         return coll.UUID, nil
249 }
250
// eventMessage is the subset of a JSON message received from the
// websocket service that this program uses (see the logStream
// method, which decodes these).
type eventMessage struct {
        Status     int
        // ObjectUUID is compared with the subscribed UUID; messages
        // about other objects are dropped by the logStream method.
        ObjectUUID string `json:"object_uuid"`
        // EventType "update" makes Run refresh the container request;
        // for any other type Run logs Properties.Text line by line.
        EventType  string `json:"event_type"`
        Properties struct {
                Text string
        }
}
259
// logStream is a handle on a running event subscription created by
// the arvadosContainerRunner logStream method.
type logStream struct {
        // C delivers received events. It is closed when the
        // subscription's goroutine exits.
        C     <-chan eventMessage
        // Close signals the subscription's goroutine to stop. As
        // constructed by the logStream method it closes a channel, so
        // it must not be called more than once.
        Close func() error
}
264
265 func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
266         ch := make(chan eventMessage)
267         done := make(chan struct{})
268         go func() {
269                 defer close(ch)
270                 var cluster arvados.Cluster
271                 runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
272                 wsURL := cluster.Services.Websocket.ExternalURL
273                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
274                 wsURL.Path = "/websocket"
275                 wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
276                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
277                 if err != nil {
278                         log.Printf("websocket error: %s", err)
279                         return
280                 }
281                 w := json.NewEncoder(conn)
282                 go w.Encode(map[string]interface{}{
283                         "method": "subscribe",
284                         "filters": [][]interface{}{
285                                 {"object_uuid", "=", uuid},
286                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
287                         },
288                 })
289                 r := json.NewDecoder(conn)
290                 for {
291                         var msg eventMessage
292                         err := r.Decode(&msg)
293                         if err != nil {
294                                 log.Printf("error decoding websocket message: %s", err)
295                                 return
296                         }
297                         if msg.ObjectUUID == uuid {
298                                 ch <- msg
299                         }
300                         select {
301                         case <-done:
302                                 return
303                         default:
304                         }
305                 }
306         }()
307         return &logStream{
308                 C: ch,
309                 Close: func() error {
310                         close(done)
311                         return nil
312                 },
313         }
314 }