X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/00b2acd54dd1aa412f6f2bddc24b1bbb31c7ae3f..80c6ada0f2ea8412185572484900bdbbe036ed4e:/arvados.go diff --git a/arvados.go b/arvados.go index ec09637ae9..5d23286fa0 100644 --- a/arvados.go +++ b/arvados.go @@ -1,16 +1,23 @@ +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package lightning import ( + "bufio" + "bytes" "context" "encoding/json" "errors" "fmt" "io" "io/ioutil" + "net/http" "net/url" "os" "regexp" - "runtime" + "strconv" "strings" "sync" "time" @@ -19,6 +26,7 @@ import ( "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" "git.arvados.org/arvados.git/sdk/go/keepclient" + "github.com/klauspost/pgzip" log "github.com/sirupsen/logrus" "golang.org/x/crypto/blake2b" "golang.org/x/net/websocket" @@ -172,7 +180,7 @@ reconnect: } client.mtx.Lock() for ch := range client.notifying[msg.ObjectUUID] { - ch <- msg + go func() { ch <- msg }() } client.mtx.Unlock() } @@ -195,6 +203,7 @@ type arvadosContainerRunner struct { Mounts map[string]map[string]interface{} Priority int KeepCache int // cache buffers per VCPU (0 for default) + Preemptible bool } func (runner *arvadosContainerRunner) Run() (string, error) { @@ -239,7 +248,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e keepCache = 2 } rc := arvados.RuntimeConstraints{ - API: &runner.APIAccess, + API: runner.APIAccess, VCPUs: runner.VCPUs, RAM: runner.RAM, KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs), @@ -262,6 +271,14 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e "runtime_constraints": rc, "priority": runner.Priority, "state": arvados.ContainerRequestStateCommitted, + "scheduling_parameters": arvados.SchedulingParameters{ + Preemptible: runner.Preemptible, + Partitions: []string{}, + }, + "environment": map[string]string{ + "GOMAXPROCS": fmt.Sprintf("%d", rc.VCPUs), + }, + "container_count_max": 1, }, }) if err != nil { @@ -276,22 +293,28 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e subscribedUUID := "" defer func() { if subscribedUUID != "" { + log.Printf("unsubscribe container UUID: %s", subscribedUUID) client.Unsubscribe(logch, subscribedUUID) } }() neednewline := "" + logTell := map[string]int64{} lastState := cr.State refreshCR := func() { - err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Minute)) + defer cancel() + err = runner.Client.RequestAndDecodeContext(ctx, &cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) if err != nil { fmt.Fprint(os.Stderr, neednewline) + neednewline = "" log.Printf("error getting container request: %s", err) return } if lastState != cr.State { fmt.Fprint(os.Stderr, neednewline) + neednewline = "" log.Printf("container request state: %s", cr.State) lastState = cr.State } @@ -299,14 +322,21 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e fmt.Fprint(os.Stderr, neednewline) neednewline = "" if subscribedUUID != "" { + log.Printf("unsubscribe container UUID: %s", subscribedUUID) client.Unsubscribe(logch, subscribedUUID) } + log.Printf("subscribe container UUID: %s", cr.ContainerUUID) client.Subscribe(logch, cr.ContainerUUID) subscribedUUID = cr.ContainerUUID + logTell = map[string]int64{} } } - var reCrunchstat = regexp.MustCompile(`mem .* rss`) + var logWaitMax = time.Second * 10 + var logWaitMin = time.Second + var logWait = logWaitMin + var logWaitDone = time.After(logWait) + var reCrunchstat = regexp.MustCompile(`mem .* (\d+) rss`) waitctr: for cr.State != arvados.ContainerRequestStateFinal { select { @@ -323,26 +353,72 @@ waitctr: case <-refreshTicker.C: refreshCR() case msg := <-logch: - switch msg.EventType { - case "update": + if msg.EventType == "update" { refreshCR() - case "stderr": - for _, line := range strings.Split(msg.Properties.Text, "\n") { - if line != "" { + } + case <-logWaitDone: + any := false + for _, fnm := range []string{"stderr.txt", "crunchstat.txt"} { + req, err := http.NewRequest("GET", "https://"+runner.Client.APIHost+"/arvados/v1/container_requests/"+cr.UUID+"/log/"+cr.ContainerUUID+"/"+fnm, nil) + if err != nil { + log.Errorf("error preparing log request: %s", err) + continue + } + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", logTell[fnm])) + resp, err := runner.Client.Do(req) + if err != nil { + log.Errorf("error getting log data: %s", err) + continue + } else if (resp.StatusCode == http.StatusNotFound && logTell[fnm] == 0) || + (resp.StatusCode == http.StatusRequestedRangeNotSatisfiable && logTell[fnm] > 0) { + continue + } else if resp.StatusCode >= 300 { + log.Errorf("error getting log data: %s", resp.Status) + continue + } + logdata, err := io.ReadAll(resp.Body) + if err != nil { + log.Errorf("error reading log data: %s", err) + continue + } + if len(logdata) == 0 { + continue + } + for { + eol := bytes.Index(logdata, []byte{'\n'}) + if eol < 0 { + break + } + line := string(logdata[:eol]) + logdata = logdata[eol+1:] + logTell[fnm] += int64(eol + 1) + if len(line) == 0 { + continue + } + any = true + if fnm == "stderr.txt" { fmt.Fprint(os.Stderr, neednewline) neednewline = "" log.Print(line) + } else if fnm == "crunchstat.txt" { + m := reCrunchstat.FindStringSubmatch(line) + if m != nil { + rss, _ := strconv.ParseInt(m[1], 10, 64) + fmt.Fprintf(os.Stderr, "%s rss %.3f GB \r", cr.UUID, float64(rss)/1e9) + neednewline = "\n" + } } } - case "crunchstat": - for _, line := range strings.Split(msg.Properties.Text, "\n") { - mem := reCrunchstat.FindString(line) - if mem != "" { - fmt.Fprintf(os.Stderr, "%s \r", mem) - neednewline = "\n" - } + } + if any { + logWait = logWaitMin + } else { + logWait = logWait * 2 + if logWait > logWaitMax { + logWait = logWaitMax } } + logWaitDone = time.After(logWait) } } fmt.Fprint(os.Stderr, neednewline) @@ -377,16 +453,20 @@ func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error { if m == nil { return fmt.Errorf("cannot find uuid in path: %q", *path) } - uuid := m[2] - mnt, ok := runner.Mounts["/mnt/"+uuid] + collID := m[2] + mnt, ok := runner.Mounts["/mnt/"+collID] if !ok { mnt = map[string]interface{}{ "kind": "collection", - "uuid": uuid, } - runner.Mounts["/mnt/"+uuid] = mnt + if len(collID) == 27 { + mnt["uuid"] = collID + } else { + mnt["portable_data_hash"] = collID + } + runner.Mounts["/mnt/"+collID] = mnt } - *path = "/mnt/" + uuid + m[3] + *path = "/mnt/" + collID + m[3] } return nil } @@ -464,13 +544,52 @@ func (runner *arvadosContainerRunner) makeCommandCollection() (string, error) { return coll.UUID, nil } +// zopen returns a reader for the given file, using the arvados API +// instead of arv-mount/fuse where applicable, and transparently +// decompressing the input if fnm ends with ".gz". +func zopen(fnm string) (io.ReadCloser, error) { + f, err := open(fnm) + if err != nil || !strings.HasSuffix(fnm, ".gz") { + return f, err + } + rdr, err := pgzip.NewReader(bufio.NewReaderSize(f, 4*1024*1024)) + if err != nil { + f.Close() + return nil, err + } + return gzipr{rdr, f}, nil +} + +// gzipr wraps a ReadCloser and a Closer, presenting a single Close() +// method that closes both wrapped objects. +type gzipr struct { + io.ReadCloser + io.Closer +} + +func (gr gzipr) Close() error { + e1 := gr.ReadCloser.Close() + e2 := gr.Closer.Close() + if e1 != nil { + return e1 + } + return e2 +} + var ( arvadosClientFromEnv = arvados.NewClientFromEnv() + keepClient *keepclient.KeepClient siteFS arvados.CustomFileSystem siteFSMtx sync.Mutex ) -func open(fnm string) (io.ReadCloser, error) { +type file interface { + io.ReadCloser + io.Seeker + Readdir(n int) ([]os.FileInfo, error) +} + +func open(fnm string) (file, error) { if os.Getenv("ARVADOS_API_HOST") == "" { return os.Open(fnm) } @@ -478,11 +597,8 @@ func open(fnm string) (io.ReadCloser, error) { if m == nil { return os.Open(fnm) } - uuid := m[2] - mnt := "/mnt/" + uuid + "/" - if !strings.HasPrefix(fnm, mnt) { - return os.Open(fnm) - } + collectionUUID := m[2] + collectionPath := m[3] siteFSMtx.Lock() defer siteFSMtx.Unlock() @@ -493,15 +609,29 @@ func open(fnm string) (io.ReadCloser, error) { return nil, err } ac.Client = arvados.DefaultSecureClient - kc := keepclient.New(ac) + keepClient = keepclient.New(ac) // Don't use keepclient's default short timeouts. - kc.HTTPClient = arvados.DefaultSecureClient - // Guess max concurrent readers, hope to avoid cache - // thrashing. - kc.BlockCache = &keepclient.BlockCache{MaxBlocks: runtime.NumCPU() * 3} - siteFS = arvadosClientFromEnv.SiteFileSystem(kc) + keepClient.HTTPClient = arvados.DefaultSecureClient + keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4} + siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient) + } else { + keepClient.BlockCache.MaxBlocks += 2 } - log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid) - return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):]) + log.Infof("reading %q from %s using Arvados client", collectionPath, collectionUUID) + f, err := siteFS.Open("by_id/" + collectionUUID + collectionPath) + if err != nil { + return nil, err + } + return &reduceCacheOnClose{file: f}, nil +} + +type reduceCacheOnClose struct { + file + once sync.Once +} + +func (rc *reduceCacheOnClose) Close() error { + rc.once.Do(func() { keepClient.BlockCache.MaxBlocks -= 2 }) + return rc.file.Close() }