Less verbose logging.
[lightning.git] / arvados.go
1 package main
2
3 import (
4         "encoding/json"
5         "errors"
6         "fmt"
7         "io/ioutil"
8         "net/url"
9         "os"
10         "regexp"
11         "strings"
12         "time"
13
14         "git.arvados.org/arvados.git/sdk/go/arvados"
15         "git.arvados.org/arvados.git/sdk/go/arvadosclient"
16         "git.arvados.org/arvados.git/sdk/go/keepclient"
17         log "github.com/sirupsen/logrus"
18         "golang.org/x/crypto/blake2b"
19         "golang.org/x/net/websocket"
20 )
21
// arvadosContainerRunner holds the parameters for running a lightning
// command in an Arvados container via a container request (see Run).
type arvadosContainerRunner struct {
	Client      *arvados.Client
	Name        string // name given to the container request
	ProjectUUID string // owner of the container request (and of the uploaded command collection); required by Run
	VCPUs       int    // runtime constraint; Run also scales KeepCacheRAM by this
	RAM         int64  // runtime constraint (presumably bytes, as passed straight to RuntimeConstraints.RAM)
	Prog        string // if empty, run /proc/self/exe
	Args        []string
	// Mounts are extra container mounts keyed by container path (e.g. the
	// collection mounts added by TranslatePaths); Run merges them over its
	// default /mnt/output mount.
	Mounts map[string]map[string]interface{}
}
32
33 func (runner *arvadosContainerRunner) Run() (string, error) {
34         if runner.ProjectUUID == "" {
35                 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
36         }
37
38         mounts := map[string]map[string]interface{}{
39                 "/mnt/output": {
40                         "kind":     "tmp",
41                         "writable": true,
42                         "capacity": 100000000000,
43                 },
44         }
45         for path, mnt := range runner.Mounts {
46                 mounts[path] = mnt
47         }
48
49         prog := runner.Prog
50         if prog == "" {
51                 prog = "/mnt/cmd/lightning"
52                 cmdUUID, err := runner.makeCommandCollection()
53                 if err != nil {
54                         return "", err
55                 }
56                 mounts["/mnt/cmd"] = map[string]interface{}{
57                         "kind": "collection",
58                         "uuid": cmdUUID,
59                 }
60         }
61         command := append([]string{prog}, runner.Args...)
62
63         rc := arvados.RuntimeConstraints{
64                 VCPUs:        runner.VCPUs,
65                 RAM:          runner.RAM,
66                 KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs),
67         }
68         var cr arvados.ContainerRequest
69         err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
70                 "container_request": map[string]interface{}{
71                         "owner_uuid":          runner.ProjectUUID,
72                         "name":                runner.Name,
73                         "container_image":     "lightning-runtime",
74                         "command":             command,
75                         "mounts":              mounts,
76                         "use_existing":        true,
77                         "output_path":         "/mnt/output",
78                         "runtime_constraints": rc,
79                         "priority":            1,
80                         "state":               arvados.ContainerRequestStateCommitted,
81                 },
82         })
83         log.Printf("container request UUID: %s", cr.UUID)
84         log.Printf("container UUID: %s", cr.ContainerUUID)
85
86         var logch <-chan eventMessage
87         var logstream *logStream
88         defer func() {
89                 if logstream != nil {
90                         logstream.Close()
91                 }
92         }()
93
94         ticker := time.NewTicker(5 * time.Second)
95         defer ticker.Stop()
96
97         lastState := cr.State
98         refreshCR := func() {
99                 err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
100                 if err != nil {
101                         log.Printf("error getting container request: %s", err)
102                         return
103                 }
104                 if lastState != cr.State {
105                         log.Printf("container state: %s", cr.State)
106                         lastState = cr.State
107                 }
108         }
109
110         subscribedUUID := ""
111         for cr.State != arvados.ContainerRequestStateFinal {
112                 if logch == nil && cr.ContainerUUID != subscribedUUID {
113                         if logstream != nil {
114                                 logstream.Close()
115                         }
116                         logstream = runner.logStream(cr.ContainerUUID)
117                         logch = logstream.C
118                 }
119                 select {
120                 case msg, ok := <-logch:
121                         if !ok {
122                                 logstream.Close()
123                                 logstream = nil
124                                 logch = nil
125                                 break
126                         }
127                         switch msg.EventType {
128                         case "update":
129                                 refreshCR()
130                         default:
131                                 for _, line := range strings.Split(msg.Properties.Text, "\n") {
132                                         if line != "" {
133                                                 log.Print(line)
134                                         }
135                                 }
136                         }
137                 case <-ticker.C:
138                         refreshCR()
139                 }
140         }
141
142         var c arvados.Container
143         err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
144         if err != nil {
145                 return "", err
146         }
147         if c.ExitCode != 0 {
148                 return "", fmt.Errorf("container exited %d", c.ExitCode)
149         }
150         return cr.OutputUUID, err
151 }
152
// collectionInPathRe finds a collection identifier used as a path
// component. Submatch 1 is the optional leading directories, submatch 2 is
// the identifier (either 32 hex digits + "+" + size, i.e. a portable data
// hash, or an Arvados-style UUID), and submatch 3 is the optional remainder
// of the path inside the collection.
var collectionInPathRe = regexp.MustCompile(
	`^(.*/)?` + // leading directories
		`([0-9a-f]{32}\+[0-9]+` + // portable data hash...
		`|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})` + // ...or UUID
		`(/.*)?$`, // path within the collection
)
154
155 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
156         if runner.Mounts == nil {
157                 runner.Mounts = make(map[string]map[string]interface{})
158         }
159         for _, path := range paths {
160                 if *path == "" || *path == "-" {
161                         continue
162                 }
163                 m := collectionInPathRe.FindStringSubmatch(*path)
164                 if m == nil {
165                         return fmt.Errorf("cannot find uuid in path: %q", *path)
166                 }
167                 uuid := m[2]
168                 mnt, ok := runner.Mounts["/mnt/"+uuid]
169                 if !ok {
170                         mnt = map[string]interface{}{
171                                 "kind": "collection",
172                                 "uuid": uuid,
173                         }
174                         runner.Mounts["/mnt/"+uuid] = mnt
175                 }
176                 *path = "/mnt/" + uuid + m[3]
177         }
178         return nil
179 }
180
181 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
182         exe, err := ioutil.ReadFile("/proc/self/exe")
183         if err != nil {
184                 return "", err
185         }
186         b2 := blake2b.Sum256(exe)
187         cname := fmt.Sprintf("lightning-%x", b2)
188         var existing arvados.CollectionList
189         err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
190                 Limit: 1,
191                 Count: "none",
192                 Filters: []arvados.Filter{
193                         {Attr: "name", Operator: "=", Operand: cname},
194                         {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
195                 },
196         })
197         if err != nil {
198                 return "", err
199         }
200         if len(existing.Items) > 0 {
201                 uuid := existing.Items[0].UUID
202                 log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname)
203                 return uuid, nil
204         }
205         log.Printf("writing lightning binary to new collection %q", cname)
206         ac, err := arvadosclient.New(runner.Client)
207         if err != nil {
208                 return "", err
209         }
210         kc := keepclient.New(ac)
211         var coll arvados.Collection
212         fs, err := coll.FileSystem(runner.Client, kc)
213         if err != nil {
214                 return "", err
215         }
216         f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
217         if err != nil {
218                 return "", err
219         }
220         _, err = f.Write(exe)
221         if err != nil {
222                 return "", err
223         }
224         err = f.Close()
225         if err != nil {
226                 return "", err
227         }
228         mtxt, err := fs.MarshalManifest(".")
229         if err != nil {
230                 return "", err
231         }
232         err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
233                 "collection": map[string]interface{}{
234                         "owner_uuid":    runner.ProjectUUID,
235                         "manifest_text": mtxt,
236                         "name":          cname,
237                 },
238         })
239         if err != nil {
240                 return "", err
241         }
242         log.Printf("stored lightning binary in new collection %s", coll.UUID)
243         return coll.UUID, nil
244 }
245
// eventMessage is one message decoded from the Arvados websocket event
// stream opened by logStream. Fields not listed here are ignored by the
// JSON decoder.
type eventMessage struct {
	// Status is not read by Run; presumably it is the status code in replies
	// such as the response to the subscribe request.
	Status     int
	ObjectUUID string `json:"object_uuid"`
	// EventType is one of the subscribed types: "stderr", "crunch-run" or
	// "update". Run refreshes the container request on "update" and prints
	// Properties.Text for the others.
	EventType  string `json:"event_type"`
	Properties struct {
		// Text is log text, possibly several lines separated by "\n".
		Text string
	}
}
254
// logStream is the handle returned by arvadosContainerRunner.logStream for
// a websocket event subscription running in a background goroutine.
type logStream struct {
	// C delivers received event messages. It is closed when the
	// subscription goroutine exits.
	C     <-chan eventMessage
	// Close signals the subscription goroutine to stop; it returns nil.
	Close func() error
}
259
260 func (runner *arvadosContainerRunner) logStream(uuid string) *logStream {
261         ch := make(chan eventMessage)
262         done := make(chan struct{})
263         go func() {
264                 defer close(ch)
265                 var cluster arvados.Cluster
266                 runner.Client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
267                 wsURL := cluster.Services.Websocket.ExternalURL
268                 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
269                 wsURL.Path = "/websocket"
270                 wsURL.RawQuery = url.Values{"api_token": []string{runner.Client.AuthToken}}.Encode()
271                 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
272                 if err != nil {
273                         log.Printf("websocket error: %s", err)
274                         return
275                 }
276                 w := json.NewEncoder(conn)
277                 go w.Encode(map[string]interface{}{
278                         "method": "subscribe",
279                         "filters": [][]interface{}{
280                                 {"object_uuid", "=", uuid},
281                                 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
282                         },
283                 })
284                 r := json.NewDecoder(conn)
285                 for {
286                         var msg eventMessage
287                         err := r.Decode(&msg)
288                         if err != nil {
289                                 log.Printf("error decoding websocket message: %s", err)
290                                 return
291                         }
292                         ch <- msg
293                         select {
294                         case <-done:
295                                 return
296                         default:
297                         }
298                 }
299         }()
300         return &logStream{
301                 C: ch,
302                 Close: func() error {
303                         close(done)
304                         return nil
305                 },
306         }
307 }