X-Git-Url: https://git.arvados.org/lightning.git/blobdiff_plain/5571e392bb3ca21e31ddabe21958cb300ca8b731..a6d170a665e39e9d269363617f6cb5acffa7b175:/arvados.go diff --git a/arvados.go b/arvados.go index 0eb99686aa..68f275508d 100644 --- a/arvados.go +++ b/arvados.go @@ -1,3 +1,7 @@ +// Copyright (C) The Lightning Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + package lightning import ( @@ -11,6 +15,7 @@ import ( "net/url" "os" "regexp" + "strconv" "strings" "sync" "time" @@ -173,7 +178,7 @@ reconnect: } client.mtx.Lock() for ch := range client.notifying[msg.ObjectUUID] { - ch <- msg + go func() { ch <- msg }() } client.mtx.Unlock() } @@ -240,7 +245,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e keepCache = 2 } rc := arvados.RuntimeConstraints{ - API: &runner.APIAccess, + API: runner.APIAccess, VCPUs: runner.VCPUs, RAM: runner.RAM, KeepCacheRAM: (1 << 26) * int64(keepCache) * int64(runner.VCPUs), @@ -264,7 +269,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e "priority": runner.Priority, "state": arvados.ContainerRequestStateCommitted, "scheduling_parameters": arvados.SchedulingParameters{ - Preemptible: true, + Preemptible: false, Partitions: []string{}, }, "environment": map[string]string{ @@ -293,14 +298,18 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e lastState := cr.State refreshCR := func() { - err = runner.Client.RequestAndDecode(&cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Minute)) + defer cancel() + err = runner.Client.RequestAndDecodeContext(ctx, &cr, "GET", "arvados/v1/container_requests/"+cr.UUID, nil, nil) if err != nil { fmt.Fprint(os.Stderr, neednewline) + neednewline = "" log.Printf("error getting container request: %s", err) return } if lastState != cr.State { fmt.Fprint(os.Stderr, neednewline) + neednewline = "" log.Printf("container request state: %s", cr.State) lastState = cr.State } @@ -317,7 +326,7 @@ func (runner *arvadosContainerRunner) RunContext(ctx context.Context) (string, e } } - var reCrunchstat = regexp.MustCompile(`mem .* rss`) + var reCrunchstat = regexp.MustCompile(`mem .* (\d+) rss`) waitctr: for cr.State != arvados.ContainerRequestStateFinal { select { @@ -347,9 +356,10 @@ waitctr: } case "crunchstat": for _, line := range strings.Split(msg.Properties.Text, "\n") { - mem := reCrunchstat.FindString(line) - if mem != "" { - fmt.Fprintf(os.Stderr, "%s \r", mem) + m := reCrunchstat.FindStringSubmatch(line) + if m != nil { + rss, _ := strconv.ParseInt(m[1], 10, 64) + fmt.Fprintf(os.Stderr, "%s rss %.3f GB \r", cr.UUID, float64(rss)/1e9) neednewline = "\n" } } @@ -388,16 +398,20 @@ func (runner *arvadosContainerRunner) TranslatePaths(paths ...*string) error { if m == nil { return fmt.Errorf("cannot find uuid in path: %q", *path) } - uuid := m[2] - mnt, ok := runner.Mounts["/mnt/"+uuid] + collID := m[2] + mnt, ok := runner.Mounts["/mnt/"+collID] if !ok { mnt = map[string]interface{}{ "kind": "collection", - "uuid": uuid, } - runner.Mounts["/mnt/"+uuid] = mnt + if len(collID) == 27 { + mnt["uuid"] = collID + } else { + mnt["portable_data_hash"] = collID + } + runner.Mounts["/mnt/"+collID] = mnt } - *path = "/mnt/" + uuid + m[3] + *path = "/mnt/" + collID + m[3] } return nil } @@ -516,6 +530,7 @@ var ( type file interface { io.ReadCloser + io.Seeker Readdir(n int) ([]os.FileInfo, error) } @@ -527,11 +542,8 @@ func open(fnm string) (file, error) { if m == nil { return os.Open(fnm) } - uuid := m[2] - mnt := "/mnt/" + uuid + "/" - if !strings.HasPrefix(fnm, mnt) { - return os.Open(fnm) - } + collectionUUID := m[2] + collectionPath := m[3] siteFSMtx.Lock() defer siteFSMtx.Unlock() @@ -548,9 +560,23 @@ func open(fnm string) (file, error) { keepClient.BlockCache = &keepclient.BlockCache{MaxBlocks: 4} siteFS = arvadosClientFromEnv.SiteFileSystem(keepClient) } else { - keepClient.BlockCache.MaxBlocks++ + keepClient.BlockCache.MaxBlocks += 2 } - log.Infof("reading %q from %s using Arvados client", fnm[len(mnt):], uuid) - return siteFS.Open("by_id/" + uuid + "/" + fnm[len(mnt):]) + log.Infof("reading %q from %s using Arvados client", collectionPath, collectionUUID) + f, err := siteFS.Open("by_id/" + collectionUUID + collectionPath) + if err != nil { + return nil, err + } + return &reduceCacheOnClose{file: f}, nil +} + +type reduceCacheOnClose struct { + file + once sync.Once +} + +func (rc *reduceCacheOnClose) Close() error { + rc.once.Do(func() { keepClient.BlockCache.MaxBlocks -= 2 }) + return rc.file.Close() }