1 // Copyright (C) The Lightning Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
25 "git.arvados.org/arvados.git/lib/cmd"
26 "git.arvados.org/arvados.git/sdk/go/arvados"
27 "git.arvados.org/arvados.git/sdk/go/arvadosclient"
28 "git.arvados.org/arvados.git/sdk/go/keepclient"
29 "github.com/klauspost/pgzip"
30 log "github.com/sirupsen/logrus"
31 "golang.org/x/crypto/blake2b"
32 "golang.org/x/net/websocket"
// eventMessage is the value that runNotifier decodes from each JSON
// message read from the websocket connection. Only the fields visible
// here are described; other fields may be declared on lines not shown.
35 type eventMessage struct {
// ObjectUUID is decoded from the "object_uuid" key. runNotifier uses
// it to look up the channels subscribed to that object.
37 ObjectUUID string `json:"object_uuid"`
// EventType is decoded from the "event_type" key. RunContext compares
// it against "update".
38 EventType string `json:"event_type"`
// arvadosClient holds the state needed to deliver websocket event
// notifications (Subscribe, Unsubscribe, Close, runNotifier). Only the
// fields visible here are described.
44 type arvadosClient struct {
// notifying maps an object UUID to the channels subscribed to it, with
// a per-channel subscription count. It is nil until the first
// Subscribe call, and is set back to nil by Close.
46 notifying map[string]map[chan<- eventMessage]int
// wantClose is created by Subscribe and closed by Close; runNotifier
// selects on it after each decode attempt.
47 wantClose chan struct{}
// wsconn is the connection on which Subscribe and Unsubscribe send
// their requests; while it is nil those requests are skipped.
48 wsconn *websocket.Conn
52 // Subscribe registers ch to receive events concerning the given
53 // uuid: runNotifier sends each decoded eventMessage to every channel
54 // registered for the message's object UUID. If a {ch, uuid} pair is
55 // subscribed twice, each event is still sent only once, but two
56 // Unsubscribe calls will be needed to stop sending them.
57 func (client *arvadosClient) Subscribe(ch chan<- eventMessage, uuid string) {
59 defer client.mtx.Unlock()
// On the first call (or the first call after Close), set up the
// subscription table and the close signal, and start the background
// notifier goroutine.
60 if client.notifying == nil {
61 client.notifying = map[string]map[chan<- eventMessage]int{}
62 client.wantClose = make(chan struct{})
63 go client.runNotifier()
// chmap holds the per-channel subscription counts for this uuid.
65 chmap := client.notifying[uuid]
67 chmap = map[chan<- eventMessage]int{}
68 client.notifying[uuid] = chmap
// Inspect the existing subscription counts for this uuid. The loop
// body is not shown here; presumably it clears needSub when some
// channel is already subscribed -- TODO confirm.
71 for _, nch := range chmap {
// Send a subscribe request only when one is needed and a websocket
// connection currently exists. The request is written from a separate
// goroutine, so the Encode error is discarded.
// NOTE(review): this event_type list has no "crunchstat" entry, while
// the list sent by runNotifier when it resubscribes does -- confirm
// which list is intended.
78 if needSub && client.wsconn != nil {
79 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
80 "method": "subscribe",
81 "filters": [][]interface{}{
82 {"object_uuid", "=", uuid},
83 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Unsubscribe undoes one Subscribe call for the given {ch, uuid} pair.
// When the pair's subscription count would drop to zero, the visible
// code removes the uuid from the subscription table and, if a
// websocket connection exists, sends an "unsubscribe" request whose
// filters match the ones sent by Subscribe.
89 func (client *arvadosClient) Unsubscribe(ch chan<- eventMessage, uuid string) {
91 defer client.mtx.Unlock()
92 chmap := client.notifying[uuid]
// n is the count that would remain after removing this subscription.
93 if n := chmap[ch] - 1; n == 0 {
// NOTE(review): the lines around this delete are not shown;
// presumably it only runs once no channels remain for uuid -- TODO
// confirm.
96 delete(client.notifying, uuid)
// The request is written from a separate goroutine, so the Encode
// error is discarded.
98 if client.wsconn != nil {
99 go json.NewEncoder(client.wsconn).Encode(map[string]interface{}{
100 "method": "unsubscribe",
101 "filters": [][]interface{}{
102 {"object_uuid", "=", uuid},
103 {"event_type", "in", []string{"stderr", "crunch-run", "update"}},
// Close discards all subscriptions and signals the notifier goroutine
// by closing wantClose. It does nothing if there are no subscriptions
// to discard (notifying is nil), so calling it repeatedly is harmless.
// NOTE(review): nothing in the visible lines closes wsconn, so a
// notifier blocked reading from the connection would only notice
// wantClose after its read returns -- confirm this is acceptable.
112 func (client *arvadosClient) Close() {
114 defer client.mtx.Unlock()
115 if client.notifying != nil {
116 client.notifying = nil
117 close(client.wantClose)
// runNotifier runs in the goroutine started by Subscribe. From the
// visible lines, it fetches the cluster config to find the websocket
// URL, dials it, sends a subscribe request for every uuid currently in
// client.notifying, then decodes incoming messages and passes each one
// to the channels subscribed to its object UUID. After a failed config
// request or a failed dial it logs a warning and sleeps 5 seconds.
// NOTE(review): the loop and retry statements are not visible here --
// confirm how failures lead to a reconnect.
121 func (client *arvadosClient) runNotifier() {
124 var cluster arvados.Cluster
125 err := client.RequestAndDecode(&cluster, "GET", arvados.EndpointConfigGet.Path, nil, nil)
127 log.Warnf("error getting cluster config: %s", err)
128 time.Sleep(5 * time.Second)
// Derive the websocket endpoint from the configured external URL: the
// first "http" in the scheme becomes "ws" (so https becomes wss), and
// the path is set to /websocket.
131 wsURL := cluster.Services.Websocket.ExternalURL
132 wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1)
133 wsURL.Path = "/websocket"
// Capture the URL before the API token is added to the query string,
// so the token does not appear in the log message below.
134 wsURLNoToken := wsURL.String()
135 wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode()
// The controller's external URL is passed as the websocket origin.
136 conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String())
138 log.Warnf("websocket connection error: %s", err)
139 time.Sleep(5 * time.Second)
142 log.Printf("connected to websocket at %s", wsURLNoToken)
// Collect the subscribed uuids so a subscribe request can be sent for
// each of them on the new connection.
146 resubscribe := make([]string, 0, len(client.notifying))
147 for uuid := range client.notifying {
148 resubscribe = append(resubscribe, uuid)
// NOTE(review): Encode errors are ignored here. This event_type list
// includes "crunchstat", unlike the lists sent by Subscribe and
// Unsubscribe -- confirm which list is intended.
153 w := json.NewEncoder(conn)
154 for _, uuid := range resubscribe {
155 w.Encode(map[string]interface{}{
156 "method": "subscribe",
157 "filters": [][]interface{}{
158 {"object_uuid", "=", uuid},
159 {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}},
// Read messages from the connection one JSON value at a time.
165 r := json.NewDecoder(conn)
168 err := r.Decode(&msg)
// Close() closes wantClose; the case body is not shown, presumably it
// ends the goroutine -- TODO confirm.
170 case <-client.wantClose:
174 log.Printf("error decoding websocket message: %s", err)
// Deliver msg to every channel subscribed to this object UUID. Each
// send gets its own goroutine, so a receiver that is not ready does
// not block the notifier.
// NOTE(review): the closure captures the range variable ch. With Go
// versions before 1.22 all goroutines started by this loop share a
// single ch variable, so a message can be sent to the wrong channel;
// passing ch as an argument to the closure would avoid that.
182 for ch := range client.notifying[msg.ObjectUUID] {
183 go func() { ch <- msg }()
// refreshTicker ticks every 5 seconds. RunContext receives from it in
// its wait loop. It is a package-level ticker, so every RunContext
// call receives from the same channel.
191 var refreshTicker = time.NewTicker(5 * time.Second)
// arvadosContainerRunner describes a command to be run as an Arvados
// container request (see RunContext). Only the fields visible here are
// described; RunContext also reads other fields (for example
// ProjectUUID, Args, VCPUs, Priority) declared on lines not shown.
193 type arvadosContainerRunner struct {
// Client is the Arvados API client used for every request.
194 Client *arvados.Client
201 Prog string // if empty, run /proc/self/exe
// Mounts holds additional container mounts keyed by mount path;
// TranslatePaths adds an entry under /mnt/ for each collection it
// finds in a path.
203 Mounts map[string]map[string]interface{}
205 KeepCache int // cache buffers per VCPU (0 for default)
209 func (runner *arvadosContainerRunner) Run() (string, error) {
210 return runner.RunContext(context.Background())
// RunContext submits a container request in the runner's project,
// follows its progress until the request reaches the Final state, and
// returns the UUID of the request's output collection.
//
// While waiting it re-reads the container request, subscribes to
// websocket events for the assigned container, and fetches new data
// from the container's stderr.txt and crunchstat.txt logs; stderr
// lines and an RSS figure taken from crunchstat lines are written to
// os.Stderr.
//
// It returns an error if ProjectUUID is empty, and if the container
// did not reach the Complete state or exited non-zero. Other error
// paths are on lines not shown here.
213 func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) {
214 if runner.ProjectUUID == "" {
215 return "", errors.New("cannot run arvados container: ProjectUUID not provided")
// Start from a built-in mount table, then add the caller's mounts.
218 mounts := map[string]map[string]interface{}{
220 "kind": "collection",
224 for path, mnt := range runner.Mounts {
// The running executable is stored in a collection (see
// makeCommandCollection), mounted at /mnt/cmd, and used as the
// program to run. Presumably this applies only when runner.Prog is
// empty -- the condition is not shown, TODO confirm.
230 prog = "/mnt/cmd/lightning"
231 cmdUUID, err := runner.makeCommandCollection()
235 mounts["/mnt/cmd"] = map[string]interface{}{
236 "kind": "collection",
240 command := append([]string{prog}, runner.Args...)
242 priority := runner.Priority
246 keepCache := runner.KeepCache
250 rc := arvados.RuntimeConstraints{
251 API: runner.APIAccess,
// 1<<26 bytes (64 MiB) per cache buffer, keepCache buffers per VCPU.
254 KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs),
256 outname := &runner.OutputName
// Create the container request in the Committed state.
260 var cr arvados.ContainerRequest
261 err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
262 "container_request": map[string]interface{}{
263 "owner_uuid": runner.ProjectUUID,
265 "container_image": "lightning-runtime",
268 "use_existing": true,
269 "output_path": "/mnt/output",
270 "output_name": outname,
271 "runtime_constraints": rc,
// NOTE(review): this sends runner.Priority, not the local priority
// variable assigned above. If the lines not shown adjust priority
// (e.g. apply a default), that adjustment is not sent -- confirm.
272 "priority": runner.Priority,
273 "state": arvados.ContainerRequestStateCommitted,
274 "scheduling_parameters": arvados.SchedulingParameters{
275 Preemptible: runner.Preemptible,
276 Partitions: []string{},
278 "environment": map[string]string{
// Match the Go scheduler's parallelism to the requested VCPUs.
279 "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs),
281 "container_count_max": 1,
287 log.Printf("container request UUID: %s", cr.UUID)
288 log.Printf("container UUID: %s", cr.ContainerUUID)
// logch receives websocket events for the subscribed container.
290 logch := make(chan eventMessage)
291 client := arvadosClient{Client: runner.Client}
// Drop any remaining subscription. The enclosing statement is not
// shown; presumably this is deferred cleanup -- TODO confirm.
295 if subscribedUUID != "" {
296 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
297 client.Unsubscribe(logch, subscribedUUID)
// logTell records, per log file name, how many bytes have already
// been consumed; it is used as the start of the next Range request.
302 logTell := map[string]int64{}
304 lastState := cr.State
// refreshCR re-reads the container request (with a one-minute
// deadline), logs state changes, and moves the event subscription
// when the request's container UUID changes.
305 refreshCR := func() {
306 ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Minute))
308 err = runner.Client.RequestAndDecodeContext(ctx, &cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil)
310 fmt.Fprint(os.Stderr, neednewline)
312 log.Printf("error getting container request: %s", err)
315 if lastState != cr.State {
316 fmt.Fprint(os.Stderr, neednewline)
318 log.Printf("container request state: %s", cr.State)
321 if subscribedUUID != cr.ContainerUUID {
322 fmt.Fprint(os.Stderr, neednewline)
324 if subscribedUUID != "" {
325 log.Printf("unsubscribe container UUID: %s", subscribedUUID)
326 client.Unsubscribe(logch, subscribedUUID)
328 log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
329 client.Subscribe(logch, cr.ContainerUUID)
330 subscribedUUID = cr.ContainerUUID
// A different container has its own logs, so start reading them from
// offset zero.
331 logTell = map[string]int64{}
// The log polling interval starts at logWaitMin and doubles (see
// below) up to logWaitMax.
335 var logWaitMax = time.Second * 10
336 var logWaitMin = time.Second
337 var logWait = logWaitMin
338 var logWaitDone = time.After(logWait)
// Captures the number that precedes "rss" on a crunchstat "mem" line.
339 var reCrunchstat = regexp.MustCompile(`mem .* (\d+) rss`)
// Wait loop: runs until the container request reaches the Final state.
341 for cr.State != arvados.ContainerRequestStateFinal {
// Update the container request in order to cancel it (see the error
// message below). The triggering condition and the attributes sent
// are not shown; presumably this runs when ctx is done -- TODO
// confirm.
344 err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{
345 "container_request": map[string]interface{}{
350 log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err)
353 case <-refreshTicker.C:
356 if msg.EventType == "update" {
// Fetch whatever has been appended to each log file since the last
// fetch, using an open-ended Range request starting at logTell[fnm].
// NOTE(review): the URL scheme is hard-coded to https, and no
// resp.Body.Close() appears in the lines shown -- confirm the body is
// closed on every path.
361 for _, fnm := range []string{"stderr.txt", "crunchstat.txt"} {
362 req, err := http.NewRequest("GET", "https://"+runner.Client.APIHost+"/arvados/v1/container_requests/"+cr.UUID+"/log/"+cr.ContainerUUID+"/"+fnm, nil)
364 log.Errorf("error preparing log request: %s", err)
367 req.Header.Set("Range", fmt.Sprintf("bytes=%d-", logTell[fnm]))
368 resp, err := runner.Client.Do(req)
370 log.Errorf("error getting log data: %s", err)
// A 404 before anything has been read, or a 416 after something has
// been read, is handled separately from the other error statuses and
// is not logged as an error here (branch body not shown; presumably
// it means there is no new data yet -- TODO confirm).
372 } else if (resp.StatusCode == http.StatusNotFound && logTell[fnm] == 0) ||
373 (resp.StatusCode == http.StatusRequestedRangeNotSatisfiable && logTell[fnm] > 0) {
375 } else if resp.StatusCode >= 300 {
376 log.Errorf("error getting log data: %s", resp.Status)
379 logdata, err := io.ReadAll(resp.Body)
381 log.Errorf("error reading log data: %s", err)
384 if len(logdata) == 0 {
// Consume the data one line at a time: logTell advances by the
// line's length plus its newline.
388 eol := bytes.Index(logdata, []byte{'\n'})
392 line := string(logdata[:eol])
393 logdata = logdata[eol+1:]
394 logTell[fnm] += int64(eol + 1)
399 if fnm == "stderr.txt" {
400 fmt.Fprint(os.Stderr, neednewline)
403 } else if fnm == "crunchstat.txt" {
404 m := reCrunchstat.FindStringSubmatch(line)
// A parse error is ignored, leaving rss at zero. The status line ends
// in "\r" rather than a newline, so the next write starts at the
// beginning of the same terminal line.
406 rss, _ := strconv.ParseInt(m[1], 10, 64)
407 fmt.Fprintf(os.Stderr, "%s rss %.3f GB \r", cr.UUID, float64(rss)/1e9)
// Double the polling interval; the comparison below relates it to
// logWaitMax (the body is not shown, presumably a cap).
416 logWait = logWait * 2
417 if logWait > logWaitMax {
421 logWaitDone = time.After(logWait)
424 fmt.Fprint(os.Stderr, neednewline)
426 if err := ctx.Err(); err != nil {
// Check the container's own final state and exit code.
430 var c arvados.Container
431 err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil)
434 } else if c.State != arvados.ContainerStateComplete {
435 return "", fmt.Errorf("container did not complete: %s", c.State)
436 } else if c.ExitCode != 0 {
437 return "", fmt.Errorf("container exited %d", c.ExitCode)
439 return cr.OutputUUID, err
// collectionInPathRe matches a path that has a collection identifier
// as one whole path component: either 32 hex digits, "+", and a
// decimal number, or three lowercase-alphanumeric groups of 5, 5 and
// 15 characters joined by "-". Submatch 1 is the part of the path
// before the identifier (with its trailing slash), submatch 2 is the
// identifier, and submatch 3 is the rest of the path (with its leading
// slash).
442 var collectionInPathRe = regexp.MustCompile(`^(.*/)?([0-9a-f]{32}\+[0-9]+|[0-9a-z]{5}-[0-9a-z]{5}-[0-9a-z]{15})(/.*)?$`)
// TranslatePaths rewrites each given path in place so that it refers
// to a location under /mnt/ followed by the collection identifier
// found in the path, and records a collection mount for that
// identifier in runner.Mounts. Empty paths and "-" are given special
// treatment (the branch body is not shown; presumably they are left
// unchanged -- TODO confirm). An error is returned for a path in which
// no collection identifier can be found.
444 func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error {
445 if runner.Mounts == nil {
446 runner.Mounts = make(map[string]map[string]interface{})
448 for _, path := range paths {
449 if *path == "" || *path == "-" {
452 m := collectionInPathRe.FindStringSubmatch(*path)
454 return fmt.Errorf("cannot find uuid in path: %q", *path)
// Look up the mount for this collection; a new "collection" mount is
// built below (presumably only when none exists yet -- the condition
// is not shown).
457 mnt, ok := runner.Mounts["/mnt/"+collID]
459 mnt = map[string]interface{}{
460 "kind": "collection",
// 27 is the length of the UUID form matched by collectionInPathRe
// (5+1+5+1+15). The portable_data_hash assignment below presumably
// belongs to the other branch -- TODO confirm.
462 if len(collID) == 27 {
465 mnt["portable_data_hash"] = collID
467 runner.Mounts["/mnt/"+collID] = mnt
// Keep whatever followed the identifier in the original path.
469 *path = "/mnt/" + collID + m[3]
// mtxMakeCommandCollection is held for the whole of
// makeCommandCollection, so only one call runs at a time.
474 var mtxMakeCommandCollection sync.Mutex
// makeCommandCollection returns the UUID of a collection, owned by the
// runner's project, that holds the currently running executable
// (/proc/self/exe) as a file named "lightning".
//
// It first looks for a collection in the project whose name is
// "lightning " plus the build version and whose "blake2b" property
// equals the executable's BLAKE2b-256 hash, and reuses the first one
// found. Otherwise it writes the executable to a new collection and
// returns that collection's UUID.
476 func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) {
477 mtxMakeCommandCollection.Lock()
478 defer mtxMakeCommandCollection.Unlock()
479 exe, err := ioutil.ReadFile("/proc/self/exe")
483 b2 := blake2b.Sum256(exe)
484 cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install"
485 var existing arvados.CollectionList
486 err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{
489 Filters: []arvados.Filter{
490 {Attr: "name", Operator: "=", Operand: cname},
491 {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID},
492 {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)},
// Reuse a matching collection. Only the name and the stored hash
// property were matched; the content itself is not compared.
498 if len(existing.Items) > 0 {
499 coll := existing.Items[0]
500 log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"])
501 return coll.UUID, nil
// No match: write the executable into a new, empty collection
// filesystem and save its manifest as a new collection.
503 log.Printf("writing lightning binary to new collection %q", cname)
504 ac, err := arvadosclient.New(runner.Client)
508 kc := keepclient.New(ac)
509 var coll arvados.Collection
510 fs, err := coll.FileSystem(runner.Client, kc)
514 f, err := fs.OpenFile("lightning", os.O_CREATE|os.O_WRONLY, 0777)
518 _, err = f.Write(exe)
526 mtxt, err := fs.MarshalManifest(".")
530 err = runner.Client.RequestAndDecode(&coll, "POST", "arvados/v1/collections", nil, map[string]interface{}{
531 "collection": map[string]interface{}{
532 "owner_uuid": runner.ProjectUUID,
533 "manifest_text": mtxt,
// Store the hash so that later calls can find this collection.
535 "properties": map[string]interface{}{
536 "blake2b": fmt.Sprintf("%x", b2),
543 log.Printf("stored lightning binary in new collection %s", coll.UUID)
544 return coll.UUID, nil
547 // zopen returns a reader for the given file, using the arvados API
548 // instead of arv-mount/fuse where applicable, and transparently
549 // decompressing the input if fnm ends with ".gz".
// For a ".gz" name the returned reader is a gzipr, so closing it
// closes both the decompressor and the underlying file.
550 func zopen(fnm string) (io.ReadCloser, error) {
// An open error, or a name without the ".gz" suffix, is handled here
// without decompression (the branch body is not shown).
552 if err != nil || !strings.HasSuffix(fnm, ".gz") {
// Decompress through a 4 MiB read buffer.
555 rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024))
560 return gzipr{rdr, f}, nil
563 // gzipr wraps a ReadCloser and a Closer, presenting a single Close()
564 // method that closes both wrapped objects.
// Close closes the wrapped ReadCloser first and then the wrapped
// Closer; both calls are made regardless of the first one's result.
// How e1 and e2 are combined into the return value is on lines not
// shown here.
570 func (gr gzipr) Close() error {
571 e1 := gr.ReadCloser.Close()
572 e2 := gr.Closer.Close()
// arvadosClientFromEnv is created once, at package initialization, by
// arvados.NewClientFromEnv.
580 arvadosClientFromEnv = arvados.NewClientFromEnv()
// keepClient and siteFS are assigned by open when it sets up the
// Arvados client.
581 keepClient *keepclient.KeepClient
582 siteFS arvados.CustomFileSystem
// file is the interface returned by open. Only the Readdir method is
// visible here; other methods may be declared on lines not shown.
586 type file interface {
// Readdir has the same signature as (*os.File).Readdir.
589 Readdir(n int) ([]os.FileInfo, error)
// open opens fnm for reading. When fnm contains a collection
// identifier (see collectionInPathRe), the visible code reads the file
// through the Arvados site filesystem at by_id/<identifier><rest of
// path> and returns it wrapped in a reduceCacheOnClose. The cases
// where ARVADOS_API_HOST is unset or fnm contains no collection
// identifier are handled on lines not shown (presumably by opening the
// local file -- TODO confirm).
592 func open(fnm string) (file, error) {
593 if os.Getenv("ARVADOS_API_HOST") == "" {
596 m := collectionInPathRe.FindStringSubmatch(fnm)
600 collectionUUID := m[2]
601 collectionPath := m[3]
// siteFSMtx is held until open returns, covering both the client
// setup and the block-cache adjustment below.
604 defer siteFSMtx.Unlock()
// Set up the shared keep client and site filesystem (presumably only
// on first use; the condition is not shown).
606 log.Info("setting up Arvados client")
607 ac, err := arvadosclient.New(arvadosClientFromEnv)
611 ac.Client = arvados.DefaultSecureClient
612 keepClient = keepclient.New(ac)
613 // Don't use keepclient's default short timeouts.
614 keepClient.HTTPClient = arvados.DefaultSecureClient
615 keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4}
616 siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient)
// Allow two more cached blocks for this file;
// reduceCacheOnClose.Close subtracts the same amount when the file is
// closed.
618 keepClient.BlockCache.MaxBlocks += 2
621 log.Infof("reading %q from %s using Arvados client", collectionPath, collectionUUID)
622 f, err := siteFS.Open("by_id/" + collectionUUID + collectionPath)
626 return &reduceCacheOnClose{file: f}, nil
// reduceCacheOnClose wraps a file returned by the site filesystem so
// that closing it also reduces keepClient.BlockCache.MaxBlocks by 2
// (see its Close method). open creates it with the wrapped file in the
// "file" field; the field declarations are on lines not shown.
629 type reduceCacheOnClose struct {
634 func (rc *reduceCacheOnClose) Close() error {
635 rc.once.Do(func() { keepClient.BlockCache.MaxBlocks -= 2 })
636 return rc.file.Close()