I/O pipeline, show arvados container logs.
[lightning.git] / arvados.go
1 package main
2
import (
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net/url"
	"os"
	"regexp"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
	"git.arvados.org/arvados.git/sdk/go/keepclient"
	"golang.org/x/crypto/blake2b"
	"golang.org/x/net/websocket"
)
21
22 type arvadosContainerRunner struct {
23         Client      *arvados.Client
24         Name        string
25         ProjectUUID string
26         VCPUs       int
27         RAM         int64
28         Prog        string // if empty, run /proc/self/exe
29         Args        []string
30         Mounts      map[string]map[string]interface{}
31 }
32
33 func (runner *arvadosContainerRunner) Run() (string, error) {
34         if runner.ProjectUUID == "" {
35                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
36         }
37
38         mounts := map[string]map[string]interface{}{
39                 "/mnt/output": {
40                         "kind":     "tmp",
41                         "writable": true,
42                         "capacity": 100000000000,
43                 },
44         }
45         for path, mnt := range runner.Mounts {
46                 mounts[path] = mnt
47         }
48
49         prog := runner.Prog
50         if prog == "" {
51                 prog = "/mnt/cmd/lightning"
52                 cmdUUID, err := runner.makeCommandCollection()
53                 if err != nil {
54                         return "", err
55                 }
56                 mounts["/mnt/cmd"] = map[string]interface{}{
57                         "kind": "collection",
58                         "uuid": cmdUUID,
59                 }
60         }
61         command := append([]string{prog}, runner.Args...)
62
63         rc := arvados.RuntimeConstraints{
64                 VCPUs:        runner.VCPUs,
65                 RAM:          runner.RAM,
66                 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
67         }
68         var cr arvados.ContainerRequest
69         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
70                 "container_request": map[string]interface{}{
71                         "owner_uuid":          runner.ProjectUUID,
72                         "name":                runner.Name,
73                         "container_image":     "lightning-runtime",
74                         "command":             command,
75                         "mounts":              mounts,
76                         "use_existing":        true,
77                         "output_path":         "/mnt/output",
78                         "runtime_constraints": rc,
79                         "priority":            1,
80                         "state":               arvados.ContainerRequestStateCommitted,
81                 },
82         })
83         log.Print(cr.UUID)
84
85         var logch <-chan string
86         var logstream *logStream
87         defer func() {
88                 if logstream != nil {
89                         logstream.Close()
90                 }
91         }()
92
93         ticker := time.NewTicker(5 * time.Second)
94         defer ticker.Stop()
95
96         lastState := cr.State
97         subscribedUUID := ""
98         for cr.State != arvados.ContainerRequestState(arvados.ContainerRequestStateFinal) {
99                 if logch == nil && cr.ContainerUUID != subscribedUUID {
100                         if logstream != nil {
101                                 logstream.Close()
102                         }
103                         logstream = runner.logStream(cr.ContainerUUID)
104                         logch = logstream.C
105                 }
106                 select {
107                 case msg, ok := <-logch:
108                         if !ok {
109                                 logstream.Close()
110                                 logstream = nil
111                                 logch = nil
112                                 break
113                         }
114                         if msg != "" {
115                                 log.Print(msg)
116                                 continue
117                         }
118                         // empty message indicates an "update" event
119                         // -- fall out of the select and get the
120                         // latest version now, instead of waiting for
121                         // the next timer tick.
122                 case <-ticker.C:
123                 }
124                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
125                 if err != nil {
126                         return "", err
127                 }
128                 if lastState != cr.State {
129                         log.Printf("container state: %s", cr.State)
130                         lastState = cr.State
131                 }
132         }
133
134         var c arvados.Container
135         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
136         if err != nil {
137                 return "", err
138         }
139         if c.ExitCode != 0 {
140                 return "", fmt.Errorf("container exited %d", c.ExitCode)
141         }
142         return cr.OutputUUID, err
143 }
144
var (
	// collectionInPathRe finds an Arvados collection identifier (a
	// portable data hash "<32 hex digits>+<size>" or a 27-character
	// UUID) in a path. Submatch 1 is the optional prefix ending in "/",
	// submatch 2 the identifier, submatch 3 the optional "/..." suffix.
	collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
)
146
147 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
148         if runner.Mounts == nil {
149                 runner.Mounts = make(map[string]map[string]interface{})
150         }
151         for _, path := range paths {
152                 if *path == "" || *path == "-" {
153                         continue
154                 }
155                 m := collectionInPathRe.FindStringSubmatch(*path)
156                 if m == nil {
157                         return fmt.Errorf("cannot find uuid in path: %q", *path)
158                 }
159                 uuid := m[2]
160                 mnt, ok := runner.Mounts["/mnt/"+uuid]
161                 if !ok {
162                         mnt = map[string]interface{}{
163                                 "kind": "collection",
164                                 "uuid": uuid,
165                         }
166                         runner.Mounts["/mnt/"+uuid] = mnt
167                 }
168                 *path = "/mnt/" + uuid + m[3]
169         }
170         return nil
171 }
172
173 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
174         exe, err := ioutil.ReadFile("/proc/self/exe")
175         if err != nil {
176                 return "", err
177         }
178         b2 := blake2b.Sum256(exe)
179         cname := fmt.Sprintf("lightning-%x", b2)
180         var existing arvados.CollectionList
181         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
182                 Limit: 1,
183                 Count: "none",
184                 Filters: []arvados.Filter{
185                         {Attr: "name", Operator: "=", Operand: cname},
186                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
187                 },
188         })
189         if err != nil {
190                 return "", err
191         }
192         if len(existing.Items) > 0 {
193                 uuid := existing.Items[0].UUID
194                 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
195                 return uuid, nil
196         }
197         log.Printf("writing lightning binary to new collection %q", cname)
198         ac, err := arvadosclient.New(runner.Client)
199         if err != nil {
200                 return "", err
201         }
202         kc := keepclient.New(ac)
203         var coll arvados.Collection
204         fs, err := coll.FileSystem(runner.Client, kc)
205         if err != nil {
206                 return "", err
207         }
208         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
209         if err != nil {
210                 return "", err
211         }
212         _, err = f.Write(exe)
213         if err != nil {
214                 return "", err
215         }
216         err = f.Close()
217         if err != nil {
218                 return "", err
219         }
220         mtxt, err := fs.MarshalManifest(".")
221         if err != nil {
222                 return "", err
223         }
224         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
225                 "collection": map[string]interface{}{
226                         "owner_uuid":    runner.ProjectUUID,
227                         "manifest_text": mtxt,
228                         "name":          cname,
229                 },
230         })
231         if err != nil {
232                 return "", err
233         }
234         log.Printf("stored lightning binary in new collection %s", coll.UUID)
235         return coll.UUID, nil
236 }
237
// logStream is a handle on a stream of log messages produced by
// (*arvadosContainerRunner).logStream.
type logStream struct {
	// C delivers the stream's messages and is closed when the
	// stream ends.
	C <-chan string

	// Close asks the stream to stop.
	Close func() error
}
242
243 func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
244         ch := make(chan string)
245         done := make(chan struct{})
246         go func() {
247                 defer close(ch)
248                 var cluster arvados.Cluster
249                 runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
250                 wsURL := cluster.Services.Websocket.ExternalURL
251                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
252                 wsURL.Path = "/websocket"
253                 wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
254                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
255                 if err != nil {
256                         ch <- fmt.Sprintf("websocket error: %s", err)
257                         return
258                 }
259                 w := json.NewEncoder(conn)
260                 go w.Encode(map[string]interface{}{
261                         "method": "subscribe",
262                         "filters": [][]interface{}{
263                                 {"object_uuid", "=", uuid},
264                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
265                         },
266                 })
267                 r := json.NewDecoder(conn)
268                 for {
269                         var msg struct {
270                                 Status     int
271                                 ObjectUUID string `json:"object_uuid"`
272                                 EventType  string `json:"event_type"`
273                                 Properties struct {
274                                         Text string
275                                 }
276                         }
277                         err := r.Decode(&msg)
278                         if err != nil {
279                                 log.Printf("error decoding websocket message: %s", err)
280                                 return
281                         }
282                         if msg.ObjectUUID == uuid {
283                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
284                                         if line != "" {
285                                                 ch <- line
286                                         }
287                                 }
288                                 if msg.EventType == "update" {
289                                         ch <- ""
290                                 }
291                         }
292                         select {
293                         case <-done:
294                                 return
295                         default:
296                         }
297                 }
298         }()
299         return &logStream{
300                 C: ch,
301                 Close: func() error {
302                         close(done)
303                         return nil
304                 },
305         }
306 }