X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/57c0644d4fdc9c0cb67bba753c966a5e4d80d311..a2058d89dc74e903cc416b3c9ee87a58f79cfd81:/arvados.go diff --git a/arvados.go b/arvados.go index bd4f58a5e5..d4dcf7819e 100644 --- a/arvados.go +++ b/arvados.go @@ -1,9 +1,16 @@ -package main +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package lightning import ( + "bufio" + "context" "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net/url" "os" @@ -12,9 +19,11 @@ import ( "sync" "time" + "git.arvados.org/arvados.git/lib/cmd" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" "golang.org/x/crypto/blake2b" "golang.org/x/net/websocket" @@ -119,6 +128,7 @@ reconnect: wsURL := cluster.Services.Websocket.ExternalURL wsURL.Scheme = strings.Replace(wsURL.Scheme, "http", "ws", 1) wsURL.Path = "/websocket" + wsURLNoToken := wsURL.String() wsURL.RawQuery = url.Values{"api_token": []string{client.AuthToken}}.Encode() conn, err := websocket.Dial(wsURL.String(), "", cluster.Services.Controller.ExternalURL.String()) if err != nil { @@ -126,7 +136,7 @@ reconnect: time.Sleep(5 * time.Second) continue reconnect } - log.Printf("connected to websocket at %s", wsURL) + log.Printf("connected to websocket at %s", wsURLNoToken) client.mtx.Lock() client.wsconn = conn @@ -143,7 +153,7 @@ reconnect: "method": "subscribe", "filters": [][]interface{}{ {"object_uuid", "=", uuid}, - {"event_type", "in", []string{"stderr", "crunch-run", "update"}}, + {"event_type", "in", []string{"stderr", "crunch-run", "crunchstat", "update"}}, }, }) } @@ -175,19 +185,28 @@ reconnect: } } +var refreshTicker = time.NewTicker(5 * time.Second) + type arvadosContainerRunner struct { Client *arvados.Client Name string + OutputName string ProjectUUID string + APIAccess bool VCPUs int RAM int64 Prog string // if empty, run /proc/self/exe Args []string Mounts map[string]map[string]interface{} Priority int + KeepCache int // cache buffers per VCPU (0 for default) } func (runner *arvadosContainerRunner) Run() (string, error) { + return runner.RunContext(context.Background()) +} + +func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, error) { if runner.ProjectUUID == "" { return "", errors.New("cannot run arvados container: ProjectUUID not provided") } @@ -220,10 +239,19 @@ func (runner *arvadosContainerRunner) Run() (string, error) { if priority < 1 { priority = 500 } + keepCache := runner.KeepCache + if keepCache < 1 { + keepCache = 2 + } rc := arvados.RuntimeConstraints{ + API: &runner.APIAccess, VCPUs: runner.VCPUs, RAM: runner.RAM, - KeepCacheRAM: (1 << 26) * 2 * int64(runner.VCPUs), + KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs), + } + outname := &runner.OutputName + if *outname == "" { + outname = nil } var cr arvados.ContainerRequest err := runner.Client.RequestAndDecode(&cr, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{ @@ -235,9 +263,17 @@ func (runner *arvadosContainerRunner) Run() (string, error) { "mounts": mounts, "use_existing": true, "output_path": "/mnt/output", + "output_name": outname, "runtime_constraints": rc, "priority": runner.Priority, "state": arvados.ContainerRequestStateCommitted, + "scheduling_parameters": arvados.SchedulingParameters{ + Preemptible: true, + Partitions: []string{}, + }, + "environment": map[string]string{ + "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs), + }, }, }) if err != nil { @@ -252,50 +288,83 @@ func (runner *arvadosContainerRunner) Run() (string, error) { subscribedUUID := "" defer func() { if subscribedUUID != "" { + log.Printf("unsubscribe container UUID: %s", subscribedUUID) client.Unsubscribe(logch, subscribedUUID) } }() - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() + neednewline := "" lastState := cr.State refreshCR := func() { err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) if err != nil { + fmt.Fprint(os.Stderr, neednewline) log.Printf("error getting container request: %s", err) return } if lastState != cr.State { + fmt.Fprint(os.Stderr, neednewline) log.Printf("container request state: %s", cr.State) lastState = cr.State } if subscribedUUID != cr.ContainerUUID { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" if subscribedUUID != "" { + log.Printf("unsubscribe container UUID: %s", subscribedUUID) client.Unsubscribe(logch, subscribedUUID) } + log.Printf("subscribe container UUID: %s", cr.ContainerUUID) client.Subscribe(logch, cr.ContainerUUID) subscribedUUID = cr.ContainerUUID } } + var reCrunchstat = regexp.MustCompile(`mem .* rss`) +waitctr: for cr.State != arvados.ContainerRequestStateFinal { select { - case <-ticker.C: + case <-ctx.Done(): + err := runner.Client.RequestAndDecode(&cr, "PATCH", "arvados/v1/container_requests/"+cr.UUID, nil, map[string]interface{}{ + "container_request": map[string]interface{}{ + "priority": 0, + }, + }) + if err != nil { + log.Errorf("error while trying to cancel container request %s: %s", cr.UUID, err) + } + break waitctr + case <-refreshTicker.C: refreshCR() case msg := <-logch: switch msg.EventType { case "update": refreshCR() - default: + case "stderr": for _, line := range strings.Split(msg.Properties.Text, "\n") { if line != "" { + fmt.Fprint(os.Stderr, neednewline) + neednewline = "" log.Print(line) } } + case "crunchstat": + for _, line := range strings.Split(msg.Properties.Text, "\n") { + mem := reCrunchstat.FindString(line) + if mem != "" { + fmt.Fprintf(os.Stderr, "%s \r", mem) + neednewline = "\n" + } + } } } } + fmt.Fprint(os.Stderr, neednewline) + + if err := ctx.Err(); err != nil { + return "", err + } var c arvados.Container err = runner.Client.RequestAndDecode(&c, "GET", "arvados/v1/containers/"+cr.ContainerUUID, nil, nil) @@ -337,13 +406,17 @@ func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error { return nil } +var mtxMakeCommandCollection sync.Mutex + func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { + mtxMakeCommandCollection.Lock() + defer mtxMakeCommandCollection.Unlock() exe, err := ioutil.ReadFile("/proc/self/exe") if err != nil { return "", err } b2 := blake2b.Sum256(exe) - cname := fmt.Sprintf("lightning-%x", b2) + cname := "lightning " + cmd.Version.String() // must build with "make", not just "go install" var existing arvados.CollectionList err = runner.Client.RequestAndDecode(&existing, "GET", "arvados/v1/collections", nil, arvados.ListOptions{ Limit: 1, @@ -351,15 +424,16 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { Filters: []arvados.Filter{ {Attr: "name", Operator: "=", Operand: cname}, {Attr: "owner_uuid", Operator: "=", Operand: runner.ProjectUUID}, + {Attr: "properties.blake2b", Operator: "=", Operand: fmt.Sprintf("%x", b2)}, }, }) if err != nil { return "", err } if len(existing.Items) > 0 { - uuid := existing.Items[0].UUID - log.Printf("using lightning binary in existing collection %s (name is %q; did not verify whether content matches)", uuid, cname) - return uuid, nil + coll := existing.Items[0] + log.Printf("using lightning binary in existing collection %s (name is %q, hash is %q; did not verify whether content matches)", coll.UUID, cname, coll.Properties["blake2b"]) + return coll.UUID, nil } log.Printf("writing lightning binary to new collection %q", cname) ac, err := arvadosclient.New(runner.Client) @@ -393,6 +467,9 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { "owner_uuid": runner.ProjectUUID, "manifest_text": mtxt, "name": cname, + "properties": map[string]interface{}{ + "blake2b": fmt.Sprintf("%x", b2), + }, }, }) if err != nil { @@ -401,3 +478,98 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { log.Printf("stored lightning binary in new collection %s", coll.UUID) return coll.UUID, nil } + +// zopen returns a reader for the given file, using the arvados API +// instead of arv-mount/fuse where applicable, and transparently +// decompressing the input if fnm ends with ".gz". +func zopen(fnm string) (io.ReadCloser, error) { + f, err := open(fnm) + if err != nil || !strings.HasSuffix(fnm, ".gz") { + return f, err + } + rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024)) + if err != nil { + f.Close() + return nil, err + } + return gzipr{rdr, f}, nil +} + +// gzipr wraps a ReadCloser and a Closer, presenting a single Close() +// method that closes both wrapped objects. +type gzipr struct { + io.ReadCloser + io.Closer +} + +func (gr gzipr) Close() error { + e1 := gr.ReadCloser.Close() + e2 := gr.Closer.Close() + if e1 != nil { + return e1 + } + return e2 +} + +var ( + arvadosClientFromEnv = arvados.NewClientFromEnv() + keepClient *keepclient.KeepClient + siteFS arvados.CustomFileSystem + siteFSMtx sync.Mutex +) + +type file interface { + io.ReadCloser + io.Seeker + Readdir(n int) ([]os.FileInfo, error) +} + +func open(fnm string) (file, error) { + if os.Getenv("ARVADOS_API_HOST") == "" { + return os.Open(fnm) + } + m := collectionInPathRe.FindStringSubmatch(fnm) + if m == nil { + return os.Open(fnm) + } + uuid := m[2] + mnt := "/mnt/" + uuid + if fnm != mnt && !strings.HasPrefix(fnm, mnt+"/") { + return os.Open(fnm) + } + + siteFSMtx.Lock() + defer siteFSMtx.Unlock() + if siteFS == nil { + log.Info("setting up Arvados client") + ac, err := arvadosclient.New(arvadosClientFromEnv) + if err != nil { + return nil, err + } + ac.Client = arvados.DefaultSecureClient + keepClient = keepclient.New(ac) + // Don't use keepclient's default short timeouts. + keepClient.HTTPClient = arvados.DefaultSecureClient + keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4} + siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient) + } else { + keepClient.BlockCache.MaxBlocks += 2 + } + + log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid) + f, err := siteFS.Open("by_id/" + uuid + fnm[len(mnt):]) + if err != nil { + return nil, err + } + return &reduceCacheOnClose{file: f}, nil +} + +type reduceCacheOnClose struct { + file + once sync.Once +} + +func (rc *reduceCacheOnClose) Close() error { + rc.once.Do(func() { keepClient.BlockCache.MaxBlocks -= 2 }) + return rc.file.Close() +}