18264: Merge branch 'main' into 18264-cwl-test-running-improvements
author Ward Vandewege <ward@curii.com>
Thu, 28 Oct 2021 17:40:12 +0000 (13:40 -0400)
committer Ward Vandewege <ward@curii.com>
Thu, 28 Oct 2021 17:40:12 +0000 (13:40 -0400)
Arvados-DCO-1.1-Signed-off-by: Ward Vandewege <ward@curii.com>

26 files changed:
doc/admin/upgrading.html.textile.liquid
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/config/load.go
lib/crunchrun/background.go
lib/crunchrun/bufthenwrite.go [new file with mode: 0644]
lib/crunchrun/crunchrun.go
lib/crunchrun/integration_test.go
lib/crunchrun/logging.go
lib/crunchrun/logging_test.go
lib/dispatchcloud/node_size.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/pool_test.go
lib/dispatchcloud/worker/runner.go
lib/dispatchcloud/worker/worker_test.go
sdk/go/arvados/config.go
services/api/app/models/user.rb
services/api/db/migrate/20211027154300_delete_disabled_user_tokens_and_keys.rb [new file with mode: 0644]
services/api/db/structure.sql
services/api/test/integration/users_test.rb
services/keep-web/handler_test.go
services/keepstore/azure_blob_volume.go
services/keepstore/handler_test.go
services/keepstore/handlers.go

diff --git a/doc/admin/upgrading.html.textile.liquid b/doc/admin/upgrading.html.textile.liquid
index 399ec35d2b75f381d0aed546e4d44e2ee10216a2..0aea90bd0ddab04164ee6a668e26c1851ab6e934 100644 (file)
@@ -35,9 +35,20 @@ TODO: extract this information based on git commit messages and generate changel
 <div class="releasenotes">
 </notextile>
 
-h2(#main). development main (as of 2021-09-07)
+h2(#main). development main (as of 2021-10-27)
 
-"Upgrading from 2.2.0":#v2_2_0
+"previous: Upgrading from 2.3.0":#v2_3_0
+
+h3. Dedicated keepstore process for each container
+
+When Arvados runs a container via @arvados-dispatch-cloud@, the @crunch-run@ supervisor process now brings up its own keepstore server to handle I/O for mounted collections, outputs, and logs. With the default configuration, the keepstore process allocates one 64 MiB block buffer per VCPU requested by the container. For most workloads this will increase throughput, reduce total network traffic, and make it possible to run more containers at once without provisioning additional keepstore nodes to handle the I/O load.
+* If you have containers that can effectively handle multiple I/O threads per VCPU, consider increasing the @Containers.LocalKeepBlobBuffersPerVCPU@ value.
+* If you already have a robust permanent keepstore infrastructure, you can set @Containers.LocalKeepBlobBuffersPerVCPU@ to 0 to disable this feature and preserve the previous behavior of sending container I/O traffic to your separately provisioned keepstore servers.
+* This feature is enabled only if no volumes use @AccessViaHosts@, and no volumes have underlying @Replication@ less than @Collections.DefaultReplication@. If the feature is configured but cannot be enabled due to an incompatible volume configuration, this will be noted in the @crunch-run.txt@ file in the container log.
+
+h2(#v2_3_0). v2.3.0 (2021-10-27)
+
+"previous: Upgrading to 2.2.0":#v2_2_0
 
 h3. Ubuntu 18.04 packages for arvados-api-server and arvados-workbench now conflict with ruby-bundler
 
@@ -69,7 +80,7 @@ Typically a docker image collection contains a single @.tar@ file at the top lev
 
 h2(#v2_2_0). v2.2.0 (2021-06-03)
 
-"Upgrading from 2.1.0":#v2_1_0
+"previous: Upgrading to 2.1.0":#v2_1_0
 
 h3. New spelling of S3 credential configs
 
@@ -104,7 +115,7 @@ The ForceLegacyAPI14 configuration option has been removed. In the unlikely even
 
 h2(#v2_1_0). v2.1.0 (2020-10-13)
 
-"Upgrading from 2.0.0":#v2_0_0
+"previous: Upgrading to 2.0.0":#v2_0_0
 
 h3. LoginCluster conflicts with other Login providers
 
@@ -191,7 +202,7 @@ As a side effect of new permission system constraints, "star" links (indicating
 
 h2(#v2_0_0). v2.0.0 (2020-02-07)
 
-"Upgrading from 1.4":#v1_4_1
+"previous: Upgrading to 1.4.1":#v1_4_1
 
 Arvados 2.0 is a major upgrade, with many changes.  Please read these upgrade notes carefully before you begin.
 
@@ -319,7 +330,7 @@ The API server accepts both PUT and PATCH for updates, but they will be normaliz
 
 h2(#v1_4_1). v1.4.1 (2019-09-20)
 
-"Upgrading from 1.4.0":#v1_4_0
+"previous: Upgrading to 1.4.0":#v1_4_0
 
 h3. Centos7 Python 3 dependency upgraded to rh-python36
 
@@ -327,7 +338,7 @@ The Python 3 dependency for Centos7 Arvados packages was upgraded from rh-python
 
 h2(#v1_4_0). v1.4.0 (2019-06-05)
 
-"Upgrading from 1.3.3":#v1_3_3
+"previous: Upgrading to 1.3.3":#v1_3_3
 
 h3. Populating the new file_count and file_size_total columns on the collections table
 
@@ -434,7 +445,7 @@ Arvados is migrating to a centralized configuration file for all components.  Du
 
 h2(#v1_3_3). v1.3.3 (2019-05-14)
 
-"Upgrading from 1.3.0":#v1_3_0
+"previous: Upgrading to 1.3.0":#v1_3_0
 
 This release corrects a potential data loss issue, if you are running Arvados 1.3.0 or 1.3.1 we strongly recommended disabling @keep-balance@ until you can upgrade to 1.3.3 or 1.4.0. With keep-balance disabled, there is no chance of data loss.
 
@@ -442,7 +453,7 @@ We've put together a "wiki page":https://dev.arvados.org/projects/arvados/wiki/R
 
 h2(#v1_3_0). v1.3.0 (2018-12-05)
 
-"Upgrading from 1.2":#v1_2_0
+"previous: Upgrading to 1.2":#v1_2_0
 
 This release includes several database migrations, which will be executed automatically as part of the API server upgrade. On large Arvados installations, these migrations will take a while. We've seen the upgrade take 30 minutes or more on installations with a lot of collections.
 
@@ -456,7 +467,7 @@ There are no special upgrade notes for this release.
 
 h2(#v1_2_0). v1.2.0 (2018-09-05)
 
-"Upgrading from 1.1.2 or 1.1.3":#v1_1_2
+"previous: Upgrading to 1.1.2 or 1.1.3":#v1_1_2
 
 h3. Regenerate Postgres table statistics
 
@@ -488,7 +499,7 @@ Verify your setup by confirming that API calls appear in the controller's logs (
 
 h2(#v1_1_4). v1.1.4 (2018-04-10)
 
-"Upgrading from 1.1.3":#v1_1_3
+"previous: Upgrading to 1.1.3":#v1_1_3
 
 h3. arvados-cwl-runner regressions (2018-04-05)
 
@@ -621,7 +632,7 @@ There are no special upgrade notes for this release.
 
 h2(#v1_1_2). v1.1.2 (2017-12-22)
 
-"Upgrading from 1.1.0 or 1.1.1":#v1_1_0
+"previous: Upgrading to 1.1.0 or 1.1.1":#v1_1_0
 
 h3. The minimum version for Postgres is now 9.4 (2017-12-08)
 
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 3ff5382059033d2987073207d7fd52a5b6969429..97ded6bf6863739e23ed53d365be8db1014a5b35 100644 (file)
@@ -911,6 +911,42 @@ Clusters:
       # Container runtime: "docker" (default) or "singularity"
       RuntimeEngine: docker
 
+      # When running a container, run a dedicated keepstore process,
+      # using the specified number of 64 MiB memory buffers per
+      # allocated CPU core (VCPUs in the container's runtime
+      # constraints). The dedicated keepstore handles I/O for
+      # collections mounted in the container, as well as saving
+      # container logs.
+      #
+      # A zero value disables this feature.
+      #
+      # In order for this feature to be activated, no volume may use
+      # AccessViaHosts, and each writable volume must have Replication
+      # equal to or higher than Collections.DefaultReplication. If these
+      # requirements are not satisfied, the feature is disabled
+      # automatically regardless of the value given here.
+      #
+      # Note that when this configuration is enabled, the entire
+      # cluster configuration file, including the system root token,
+      # is copied to the worker node and held in memory for the
+      # duration of the container.
+      LocalKeepBlobBuffersPerVCPU: 1
+
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "response" logs with 2xx
+      #   response codes and "request" logs
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
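
The activation rule described in the comment above (and in the upgrade note earlier in this commit) can be restated as a small predicate. This is an editorial sketch, not code from the commit; the authoritative check is the one added to startLocalKeepstore in lib/crunchrun/crunchrun.go further down:

package sketch

import "git.arvados.org/arvados.git/sdk/go/arvados"

// localKeepstoreEligible mirrors the activation rule: the per-container
// keepstore is used only when no volume is reached via AccessViaHosts and
// every writable volume has Replication of at least
// Collections.DefaultReplication.
func localKeepstoreEligible(cluster *arvados.Cluster) bool {
	for _, vol := range cluster.Volumes {
		if len(vol.AccessViaHosts) > 0 {
			return false
		}
		if !vol.ReadOnly && vol.Replication < cluster.Collections.DefaultReplication {
			return false
		}
	}
	return true
}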
diff --git a/lib/config/export.go b/lib/config/export.go
index 92e2d7b4d522e2b1a07b8a5602f3318f04720645..e36d6e76cae40d17c54217e643bf82341e298b87 100644 (file)
@@ -119,6 +119,8 @@ var whitelist = map[string]bool{
        "Containers.JobsAPI":                                  true,
        "Containers.JobsAPI.Enable":                           true,
        "Containers.JobsAPI.GitInternalDir":                   false,
+       "Containers.LocalKeepBlobBuffersPerVCPU":              false,
+       "Containers.LocalKeepLogsToContainerLog":              false,
        "Containers.Logging":                                  false,
        "Containers.LogReuseDecisions":                        false,
        "Containers.LSF":                                      false,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 2fe207eff8c500cdb4d71c7c45588449a57d681f..f7849d6142cda6e4353a076a90a561295f7fa034 100644 (file)
@@ -917,6 +917,42 @@ Clusters:
       # Container runtime: "docker" (default) or "singularity"
       RuntimeEngine: docker
 
+      # When running a container, run a dedicated keepstore process,
+      # using the specified number of 64 MiB memory buffers per
+      # allocated CPU core (VCPUs in the container's runtime
+      # constraints). The dedicated keepstore handles I/O for
+      # collections mounted in the container, as well as saving
+      # container logs.
+      #
+      # A zero value disables this feature.
+      #
+      # In order for this feature to be activated, no volume may use
+      # AccessViaHosts, and each writable volume must have Replication
+      # equal to or higher than Collections.DefaultReplication. If these
+      # requirements are not satisfied, the feature is disabled
+      # automatically regardless of the value given here.
+      #
+      # Note that when this configuration is enabled, the entire
+      # cluster configuration file, including the system root token,
+      # is copied to the worker node and held in memory for the
+      # duration of the container.
+      LocalKeepBlobBuffersPerVCPU: 1
+
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "response" logs with 2xx
+      #   response codes and "request" logs
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/config/load.go b/lib/config/load.go
index 248960beb99f875620dca5ee7738f8575877c57d..956a47b1a4ac2ef992958739d5189eaf5e519ed5 100644 (file)
@@ -295,6 +295,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
                        ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken),
                        ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey),
                        checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+                       ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
                        ldr.checkEmptyKeepstores(cc),
                        ldr.checkUnlistedKeepstores(cc),
                        ldr.checkStorageClasses(cc),
@@ -338,6 +339,15 @@ func (ldr *Loader) checkToken(label, token string) error {
        return nil
 }
 
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+       for _, s := range accepted {
+               if s == value {
+                       return nil
+               }
+       }
+       return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+}
+
 func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
 cluster:
        for id, cc := range cfg.Clusters {
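
A brief usage sketch of the checkEnum helper added above (editorial, not part of the commit); the receiver is dropped here because the method does not touch any Loader state:

package main

import "fmt"

// checkEnum mirrors the helper added to lib/config/load.go: it returns nil
// when value is one of the accepted strings, and an error otherwise.
func checkEnum(label, value string, accepted ...string) error {
	for _, s := range accepted {
		if s == value {
			return nil
		}
	}
	return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
}

func main() {
	// Accepted value: prints <nil>.
	fmt.Println(checkEnum("Containers.LocalKeepLogsToContainerLog", "errors", "none", "all", "errors"))
	// Typo: config loading would fail with a descriptive error.
	fmt.Println(checkEnum("Containers.LocalKeepLogsToContainerLog", "error", "none", "all", "errors"))
}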
diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go
index 4bb249380fd98b2e340bc4bd3bacb2b78d5f0a47..8a919bc5e2ba3080283d39a9f9784b1eddbec37d 100644 (file)
@@ -36,10 +36,10 @@ type procinfo struct {
 //
 // Stdout and stderr in the child process are sent to the systemd
 // journal using the systemd-cat program.
-func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
-       return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
 }
-func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
        lockfile, err := func() (*os.File, error) {
                // We must hold the dir-level lock between
                // opening/creating the lockfile and acquiring LOCK_EX
@@ -77,20 +77,24 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e
                // invoked as "/path/to/crunch-run"
                execargs = append([]string{prog}, execargs...)
        }
-       execargs = append([]string{
-               // Here, if the inner systemd-cat can't exec
-               // crunch-run, it writes an error message to stderr,
-               // and the outer systemd-cat writes it to the journal
-               // where the operator has a chance to discover it. (If
-               // we only used one systemd-cat command, it would be
-               // up to us to report the error -- but we are going to
-               // detach and exit, not wait for something to appear
-               // on stderr.)  Note these systemd-cat calls don't
-               // result in additional processes -- they just connect
-               // stderr/stdout to sockets and call exec().
-               "systemd-cat", "--identifier=crunch-run",
-               "systemd-cat", "--identifier=crunch-run",
-       }, execargs...)
+       if _, err := exec.LookPath("systemd-cat"); err == nil {
+               execargs = append([]string{
+                       // Here, if the inner systemd-cat can't exec
+                       // crunch-run, it writes an error message to
+                       // stderr, and the outer systemd-cat writes it
+                       // to the journal where the operator has a
+                       // chance to discover it. (If we only used one
+                       // systemd-cat command, it would be up to us
+                       // to report the error -- but we are going to
+                       // detach and exit, not wait for something to
+                       // appear on stderr.)  Note these systemd-cat
+                       // calls don't result in additional processes
+                       // -- they just connect stderr/stdout to
+                       // sockets and call exec().
+                       "systemd-cat", "--identifier=crunch-run",
+                       "systemd-cat", "--identifier=crunch-run",
+               }, execargs...)
+       }
 
        cmd := exec.Command(execargs[0], execargs[1:]...)
        // Child inherits lockfile.
@@ -99,10 +103,26 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e
        // from parent (sshd) while sending lockfile content to
        // caller.
        cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+       // We need to manage our own OS pipe here to ensure the child
+       // process reads all of our stdin pipe before we return.
+       piper, pipew, err := os.Pipe()
+       if err != nil {
+               return err
+       }
+       defer pipew.Close()
+       cmd.Stdin = piper
        err = cmd.Start()
        if err != nil {
                return fmt.Errorf("exec %s: %s", cmd.Path, err)
        }
+       _, err = io.Copy(pipew, stdin)
+       if err != nil {
+               return err
+       }
+       err = pipew.Close()
+       if err != nil {
+               return err
+       }
 
        w := io.MultiWriter(stdout, lockfile)
        return json.NewEncoder(w).Encode(procinfo{
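
The new stdin plumbing above can be shown in isolation: the parent copies its stdin into an OS pipe connected to the (eventually detached) child and closes the write end, so the child sees EOF even after the parent returns. An editorial sketch using a hypothetical "cat" child in place of crunch-run:

package main

import (
	"io"
	"os"
	"os/exec"
	"strings"
)

func main() {
	// Stand-in for the JSON config that dispatchcloud pipes to crunch-run.
	stdin := strings.NewReader(`{"KeepBuffers":1}` + "\n")

	cmd := exec.Command("cat") // hypothetical child; the real code execs crunch-run (via systemd-cat when available)
	piper, pipew, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	cmd.Stdin = piper
	cmd.Stdout = os.Stdout
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	// Copy everything we were given, then close the write end so the child
	// reads EOF; detach() returns at this point without waiting.
	if _, err := io.Copy(pipew, stdin); err != nil {
		panic(err)
	}
	pipew.Close()
	cmd.Wait() // the sketch waits only so the child's output is visible
}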
diff --git a/lib/crunchrun/bufthenwrite.go b/lib/crunchrun/bufthenwrite.go
new file mode 100644 (file)
index 0000000..2d1c407
--- /dev/null
@@ -0,0 +1,34 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "bytes"
+       "io"
+       "sync"
+)
+
+type bufThenWrite struct {
+       buf bytes.Buffer
+       w   io.Writer
+       mtx sync.Mutex
+}
+
+func (btw *bufThenWrite) SetWriter(w io.Writer) error {
+       btw.mtx.Lock()
+       defer btw.mtx.Unlock()
+       btw.w = w
+       _, err := io.Copy(w, &btw.buf)
+       return err
+}
+
+func (btw *bufThenWrite) Write(p []byte) (int, error) {
+       btw.mtx.Lock()
+       defer btw.mtx.Unlock()
+       if btw.w == nil {
+               btw.w = &btw.buf
+       }
+       return btw.w.Write(p)
+}
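
A short usage sketch (editorial, not part of the commit) of the new bufThenWrite type: writes made before a destination is known are held in the internal buffer, and SetWriter flushes them once the real writer exists:

package crunchrun

import (
	"fmt"
	"os"
)

// exampleBufThenWrite demonstrates the intended call order.
func exampleBufThenWrite() {
	var btw bufThenWrite

	// No destination yet: this line lands in the internal buffer.
	fmt.Fprintln(&btw, "keepstore starting")

	// SetWriter flushes the buffered bytes to os.Stderr and routes all
	// later writes straight through.
	btw.SetWriter(os.Stderr)
	fmt.Fprintln(&btw, "keepstore ready")
}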
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 3036d5555a1c197668f5ff156f5368e02e7c3494..8f3a30203911187c28b71c405a92caac8cab14e5 100644 (file)
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
        "bytes"
+       "context"
        "encoding/json"
        "errors"
        "flag"
@@ -13,6 +14,8 @@ import (
        "io"
        "io/ioutil"
        "log"
+       "net"
+       "net/http"
        "os"
        "os/exec"
        "os/signal"
@@ -33,13 +36,20 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
-       "golang.org/x/net/context"
 )
 
 type command struct{}
 
 var Command = command{}
 
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+       Env         map[string]string
+       KeepBuffers int
+       Cluster     *arvados.Cluster
+}
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -127,6 +137,8 @@ type ContainerRunner struct {
        finalState    string
        parentTemp    string
 
+       keepstoreLogger  io.WriteCloser
+       keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
@@ -1270,6 +1282,16 @@ func (runner *ContainerRunner) CommitLogs() error {
                runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
        }()
 
+       if runner.keepstoreLogger != nil {
+               // Flush any buffered logs from our local keepstore
+               // process.  Discard anything logged after this point
+               // -- it won't end up in the log collection, so
+               // there's no point writing it to the collectionfs.
+               runner.keepstoreLogbuf.SetWriter(io.Discard)
+               runner.keepstoreLogger.Close()
+               runner.keepstoreLogger = nil
+       }
+
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
                // we must be closing the re-opened log, which won't
@@ -1278,6 +1300,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                // -- it exists only to send logs to other channels.
                return nil
        }
+
        saved, err := runner.saveLogCollection(true)
        if err != nil {
                return fmt.Errorf("error saving log collection: %s", err)
@@ -1640,6 +1663,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 }
 
 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       log := log.New(stderr, "", 0)
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
        cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
@@ -1647,7 +1671,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
-       stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+       stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1677,33 +1701,45 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
-       if *stdinEnv && !ignoreDetachFlag {
-               // Load env vars on stdin if asked (but not in a
-               // detached child process, in which case stdin is
-               // /dev/null).
-               err := loadEnv(os.Stdin)
-               if err != nil {
-                       log.Print(err)
-                       return 1
-               }
-       }
-
        containerUUID := flags.Arg(0)
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+               return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
        case *kill >= 0:
                return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
        case *list:
                return ListProcesses(os.Stdout, os.Stderr)
        }
 
-       if containerUUID == "" {
+       if len(containerUUID) != 27 {
                log.Printf("usage: %s [options] UUID", prog)
                return 1
        }
 
+       var conf ConfigData
+       if *stdinConfig {
+               err := json.NewDecoder(stdin).Decode(&conf)
+               if err != nil {
+                       log.Printf("decode stdin: %s", err)
+                       return 1
+               }
+               for k, v := range conf.Env {
+                       err = os.Setenv(k, v)
+                       if err != nil {
+                               log.Printf("setenv(%q): %s", k, err)
+                               return 1
+                       }
+               }
+               if conf.Cluster != nil {
+                       // ClusterID is missing from the JSON
+                       // representation, but we need it to generate
+                       // a valid config file for keepstore, so we
+                       // fill it using the container UUID prefix.
+                       conf.Cluster.ClusterID = containerUUID[:5]
+               }
+       }
+
        log.Printf("crunch-run %s started", cmd.Version.String())
        time.Sleep(*sleep)
 
@@ -1711,6 +1747,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
+       var keepstoreLogbuf bufThenWrite
+       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
+       if err != nil {
+               log.Print(err)
+               return 1
+       }
+       if keepstore != nil {
+               defer keepstore.Process.Kill()
+       }
+
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Printf("%s: %v", containerUUID, err)
@@ -1718,9 +1764,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
        api.Retries = 8
 
-       kc, kcerr := keepclient.MakeKeepClient(api)
-       if kcerr != nil {
-               log.Printf("%s: %v", containerUUID, kcerr)
+       kc, err := keepclient.MakeKeepClient(api)
+       if err != nil {
+               log.Printf("%s: %v", containerUUID, err)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1732,6 +1778,43 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       if keepstore == nil {
+               // Log explanation (if any) for why we're not running
+               // a local keepstore.
+               var buf bytes.Buffer
+               keepstoreLogbuf.SetWriter(&buf)
+               if buf.Len() > 0 {
+                       cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+               }
+       } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+               cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+               keepstoreLogbuf.SetWriter(io.Discard)
+       } else {
+               cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+               logwriter, err := cr.NewLogWriter("keepstore")
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+               var writer io.WriteCloser = cr.keepstoreLogger
+               if logWhat == "errors" {
+                       writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+               } else if logWhat != "all" {
+                       // should have been caught earlier by
+                       // dispatcher's config loader
+                       log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+                       return 1
+               }
+               err = keepstoreLogbuf.SetWriter(writer)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               cr.keepstoreLogbuf = &keepstoreLogbuf
+       }
+
        switch *runtimeEngine {
        case "docker":
                cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
@@ -1819,21 +1902,98 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        return 0
 }
 
-func loadEnv(rdr io.Reader) error {
-       buf, err := ioutil.ReadAll(rdr)
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+       if configData.Cluster == nil || configData.KeepBuffers < 1 {
+               return nil, nil
+       }
+       for uuid, vol := range configData.Cluster.Volumes {
+               if len(vol.AccessViaHosts) > 0 {
+                       fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+                       return nil, nil
+               }
+               if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+                       fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+                       return nil, nil
+               }
+       }
+
+       // Rather than have an alternate way to tell keepstore how
+       // many buffers to use when starting it this way, we just
+       // modify the cluster configuration that we feed it on stdin.
+       configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+       ln, err := net.Listen("tcp", "localhost:0")
+       if err != nil {
+               return nil, err
+       }
+       _, port, err := net.SplitHostPort(ln.Addr().String())
+       if err != nil {
+               ln.Close()
+               return nil, err
+       }
+       ln.Close()
+       url := "http://localhost:" + port
+
+       fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+       var confJSON bytes.Buffer
+       err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+               Clusters: map[string]arvados.Cluster{
+                       configData.Cluster.ClusterID: *configData.Cluster,
+               },
+       })
        if err != nil {
-               return fmt.Errorf("read stdin: %s", err)
+               return nil, err
        }
-       var env map[string]string
-       err = json.Unmarshal(buf, &env)
+       cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+       if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+               // If we're a 'go test' process, running
+               // /proc/self/exe would start the test suite in a
+               // child process, which is not what we want.
+               cmd.Path, _ = exec.LookPath("go")
+               cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+               cmd.Env = os.Environ()
+       }
+       cmd.Stdin = &confJSON
+       cmd.Stdout = logbuf
+       cmd.Stderr = logbuf
+       cmd.Env = append(cmd.Env,
+               "GOGC=10",
+               "ARVADOS_SERVICE_INTERNAL_URL="+url)
+       err = cmd.Start()
        if err != nil {
-               return fmt.Errorf("decode stdin: %s", err)
+               return nil, fmt.Errorf("error starting keepstore process: %w", err)
        }
-       for k, v := range env {
-               err = os.Setenv(k, v)
+       cmdExited := false
+       go func() {
+               cmd.Wait()
+               cmdExited = true
+       }()
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+       defer cancel()
+       poll := time.NewTicker(time.Second / 10)
+       defer poll.Stop()
+       client := http.Client{}
+       for range poll.C {
+               testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
+               testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
                if err != nil {
-                       return fmt.Errorf("setenv(%q): %s", k, err)
+                       return nil, err
+               }
+               resp, err := client.Do(testReq)
+               if err == nil {
+                       resp.Body.Close()
+                       if resp.StatusCode == http.StatusOK {
+                               break
+                       }
+               }
+               if cmdExited {
+                       return nil, fmt.Errorf("keepstore child process exited")
+               }
+               if ctx.Err() != nil {
+                       return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
                }
        }
-       return nil
+       os.Setenv("ARVADOS_KEEP_SERVICES", url)
+       return cmd, nil
 }
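
For reference, the document that "crunch-run --detach --stdin-config" now reads from stdin is simply a JSON-encoded ConfigData value. An editorial sketch with made-up example values (host, token, buffer count):

package crunchrun

import (
	"encoding/json"
	"os"

	"git.arvados.org/arvados.git/sdk/go/arvados"
)

// exampleStdinConfig writes the kind of ConfigData document that the
// dispatcher sends; Cluster is left nil when the local keepstore feature
// is disabled (LocalKeepBlobBuffersPerVCPU == 0).
func exampleStdinConfig(cluster *arvados.Cluster) error {
	conf := ConfigData{
		Env: map[string]string{
			"ARVADOS_API_HOST":  "zzzzz.example.com",      // example value
			"ARVADOS_API_TOKEN": "examplesecrettoken",     // example value
		},
		KeepBuffers: 4, // LocalKeepBlobBuffersPerVCPU * instance VCPUs
		Cluster:     cluster,
	}
	return json.NewEncoder(os.Stdout).Encode(conf)
}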
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index c688248c64fbdc26330144ebeafb3abf8c42edc6..9b797fd8676a5dc5690c15ce7adbf5840488a292 100644 (file)
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "io"
        "io/ioutil"
@@ -13,9 +14,11 @@ import (
        "os/exec"
        "strings"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        . "gopkg.in/check.v1"
 )
@@ -33,6 +36,9 @@ type integrationSuite struct {
        client *arvados.Client
        ac     *arvadosclient.ArvadosClient
        kc     *keepclient.KeepClient
+
+       logCollection    arvados.Collection
+       outputCollection arvados.Collection
 }
 
 func (s *integrationSuite) SetUpSuite(c *C) {
@@ -49,7 +55,12 @@ func (s *integrationSuite) SetUpSuite(c *C) {
        out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
        imageUUID := strings.TrimSpace(string(out))
        c.Logf("image uuid %s", imageUUID)
-       c.Assert(err, IsNil)
+       if !c.Check(err, IsNil) {
+               if err, ok := err.(*exec.ExitError); ok {
+                       c.Logf("%s", err.Stderr)
+               }
+               c.Fail()
+       }
        err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
        c.Assert(err, IsNil)
        c.Logf("image pdh %s", s.image.PortableDataHash)
@@ -79,6 +90,7 @@ func (s *integrationSuite) SetUpSuite(c *C) {
 }
 
 func (s *integrationSuite) TearDownSuite(c *C) {
+       os.Unsetenv("ARVADOS_KEEP_SERVICES")
        if s.client == nil {
                // didn't set up
                return
@@ -88,10 +100,13 @@ func (s *integrationSuite) TearDownSuite(c *C) {
 }
 
 func (s *integrationSuite) SetUpTest(c *C) {
+       os.Unsetenv("ARVADOS_KEEP_SERVICES")
        s.engine = "docker"
        s.stdin = bytes.Buffer{}
        s.stdout = bytes.Buffer{}
        s.stderr = bytes.Buffer{}
+       s.logCollection = arvados.Collection{}
+       s.outputCollection = arvados.Collection{}
        s.cr = arvados.ContainerRequest{
                Priority:       1,
                State:          "Committed",
@@ -150,17 +165,76 @@ func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
        s.testRunTrivialContainer(c)
 }
 
+func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
+       for _, trial := range []struct {
+               logConfig           string
+               matchGetReq         Checker
+               matchPutReq         Checker
+               matchStartupMessage Checker
+       }{
+               {"none", Not(Matches), Not(Matches), Not(Matches)},
+               {"all", Matches, Matches, Matches},
+               {"errors", Not(Matches), Not(Matches), Matches},
+       } {
+               c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
+               s.SetUpTest(c)
+
+               cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+               c.Assert(err, IsNil)
+               cluster, err := cfg.GetCluster("")
+               c.Assert(err, IsNil)
+               for uuid, volume := range cluster.Volumes {
+                       volume.AccessViaHosts = nil
+                       volume.Replication = 2
+                       cluster.Volumes[uuid] = volume
+               }
+               cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
+
+               s.stdin.Reset()
+               err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+                       Env:         nil,
+                       KeepBuffers: 1,
+                       Cluster:     cluster,
+               })
+               c.Assert(err, IsNil)
+
+               s.engine = "docker"
+               s.testRunTrivialContainer(c)
+
+               fs, err := s.logCollection.FileSystem(s.client, s.kc)
+               c.Assert(err, IsNil)
+               f, err := fs.Open("keepstore.txt")
+               if trial.logConfig == "none" {
+                       c.Check(err, NotNil)
+                       c.Check(os.IsNotExist(err), Equals, true)
+               } else {
+                       c.Assert(err, IsNil)
+                       buf, err := ioutil.ReadAll(f)
+                       c.Assert(err, IsNil)
+                       c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+                       c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+               }
+       }
+}
+
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
        if err := exec.Command("which", s.engine).Run(); err != nil {
                c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
        }
        s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
        s.setup(c)
-       code := command{}.RunCommand("crunch-run", []string{
+
+       args := []string{
                "-runtime-engine=" + s.engine,
                "-enable-memory-limit=false",
                s.cr.ContainerUUID,
-       }, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+       }
+       if s.stdin.Len() > 0 {
+               args = append([]string{"-stdin-config=true"}, args...)
+       }
+       code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+       c.Logf("\n===== stdout =====\n%s", s.stdout.String())
+       c.Logf("\n===== stderr =====\n%s", s.stderr.String())
        c.Check(code, Equals, 0)
        err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
        c.Assert(err, IsNil)
@@ -185,6 +259,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
                        c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
                }
        }
+       s.logCollection = log
 
        var output arvados.Collection
        err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
@@ -218,4 +293,5 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
                        c.Check(fi.Name(), Equals, ".keep")
                }
        }
+       s.outputCollection = output
 }
diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go
index 050894383d757a1e90e79999ef9fa8f2f08b9f01..76a55c4992bbd933085e83282391b3e3b241fb04 100644 (file)
@@ -7,6 +7,7 @@ package crunchrun
 import (
        "bufio"
        "bytes"
+       "encoding/json"
        "fmt"
        "io"
        "log"
@@ -404,3 +405,53 @@ func loadLogThrottleParams(clnt IArvadosClient) {
        loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
 }
+
+type filterKeepstoreErrorsOnly struct {
+       io.WriteCloser
+       buf []byte
+}
+
+func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
+       log.Printf("filterKeepstoreErrorsOnly: write %q", p)
+       f.buf = append(f.buf, p...)
+       start := 0
+       for i := len(f.buf) - len(p); i < len(f.buf); i++ {
+               if f.buf[i] == '\n' {
+                       if f.check(f.buf[start:i]) {
+                               _, err := f.WriteCloser.Write(f.buf[start : i+1])
+                               if err != nil {
+                                       return 0, err
+                               }
+                       }
+                       start = i + 1
+               }
+       }
+       if start > 0 {
+               copy(f.buf, f.buf[start:])
+               f.buf = f.buf[:len(f.buf)-start]
+       }
+       return len(p), nil
+}
+
+func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
+       if len(line) == 0 {
+               return false
+       }
+       if line[0] != '{' {
+               return true
+       }
+       var m map[string]interface{}
+       err := json.Unmarshal(line, &m)
+       if err != nil {
+               return true
+       }
+       if m["msg"] == "request" {
+               return false
+       }
+       if m["msg"] == "response" {
+               if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
+                       return false
+               }
+       }
+       return true
+}
diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go
index 55460af379b3338423d9692e9a12dacc103a0591..fdd4f27b7f9af5463517e3658020795c75ceb8d5 100644 (file)
@@ -5,7 +5,9 @@
 package crunchrun
 
 import (
+       "bytes"
        "fmt"
+       "io"
        "strings"
        "testing"
        "time"
@@ -13,6 +15,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        . "gopkg.in/check.v1"
+       check "gopkg.in/check.v1"
 )
 
 type LoggingTestSuite struct {
@@ -219,3 +222,34 @@ func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string
        c.Check(true, Equals, strings.Contains(stderrLog, expected))
        c.Check(string(kc.Content), Equals, logtext)
 }
+
+type filterSuite struct{}
+
+var _ = Suite(&filterSuite{})
+
+func (*filterSuite) TestFilterKeepstoreErrorsOnly(c *check.C) {
+       var buf bytes.Buffer
+       f := filterKeepstoreErrorsOnly{WriteCloser: nopCloser{&buf}}
+       for _, s := range []string{
+               "not j",
+               "son\n" + `{"msg":"foo"}` + "\n{}\n" + `{"msg":"request"}` + "\n" + `{"msg":1234}` + "\n\n",
+               "\n[\n",
+               `{"msg":"response","respStatusCode":404,"foo": "bar"}` + "\n",
+               `{"msg":"response","respStatusCode":206}` + "\n",
+       } {
+               f.Write([]byte(s))
+       }
+       c.Check(buf.String(), check.Equals, `not json
+{"msg":"foo"}
+{}
+{"msg":1234}
+[
+{"msg":"response","respStatusCode":404,"foo": "bar"}
+`)
+}
+
+type nopCloser struct {
+       io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
diff --git a/lib/dispatchcloud/node_size.go b/lib/dispatchcloud/node_size.go
index 7e8ce0bf4206834c80a176a7411ea614c2e1e3f7..1b10826cbb6227a589d0c74217c808c59c220c8b 100644 (file)
@@ -97,6 +97,7 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
 
        needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
        needRAM += int64(cc.Containers.ReserveExtraRAM)
+       needRAM += int64(cc.Containers.LocalKeepBlobBuffersPerVCPU * needVCPUs * (1 << 26))
        needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
 
        ok := false
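
The term added above reserves one 64 MiB (1 << 26 byte) keepstore block buffer per configured buffer-per-VCPU when choosing an instance type. A worked editorial sketch with example values:

package main

import "fmt"

func main() {
	const bufferSize = 1 << 26 // 64 MiB: the fixed keepstore block buffer size

	localKeepBlobBuffersPerVCPU := 1 // cluster configuration (default)
	needVCPUs := 4                   // container's runtime constraints

	extraRAM := localKeepBlobBuffersPerVCPU * needVCPUs * bufferSize
	// Prints 256 MiB: ChooseInstanceType adds this to the container's
	// RAM + KeepCacheRAM + ReserveExtraRAM before picking an instance type.
	fmt.Printf("extra RAM reserved for local keepstore: %d MiB\n", extraRAM>>20)
}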
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 1b31a71a264fabf865f981f5f94eab1649847ac4..f57db0f09fdeed0ee9d3fe597ad3215e3d996463 100644 (file)
@@ -18,6 +18,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
@@ -193,7 +194,7 @@ type StubVM struct {
        ArvMountDeadlockRate  float64
        ExecuteContainer      func(arvados.Container) int
        CrashRunningContainer func(arvados.Container)
-       ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-env "
+       ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-config "
 
        sis          *StubInstanceSet
        id           cloud.InstanceID
@@ -252,15 +253,15 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                fmt.Fprint(stderr, "crunch-run: command not found\n")
                return 1
        }
-       if strings.HasPrefix(command, "crunch-run --detach --stdin-env "+svm.ExtraCrunchRunArgs) {
-               var stdinKV map[string]string
-               err := json.Unmarshal(stdinData, &stdinKV)
+       if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+               var configData crunchrun.ConfigData
+               err := json.Unmarshal(stdinData, &configData)
                if err != nil {
                        fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
                        return 1
                }
                for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
-                       if stdinKV[name] == "" {
+                       if configData.Env[name] == "" {
                                fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
                                return 1
                        }
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index a5924cf997f7b4bc9d838134054be58b3bd25127..37e3fa9882ddec9ad9a0623ad62f9b3d75d94433 100644 (file)
@@ -103,6 +103,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
+               cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
@@ -144,6 +145,7 @@ type Pool struct {
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
+       cluster                        *arvados.Cluster
        bootProbeCommand               string
        runnerSource                   string
        imageID                        cloud.ImageID
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 0f5c5ee196d2866269f2bb999292e8a9672c3e47..7b5634605fee5c20b987c06078eb78b0dc6841b6 100644 (file)
@@ -10,10 +10,12 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
@@ -31,7 +33,18 @@ func (*lessChecker) Check(params []interface{}, names []string) (result bool, er
 
 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
 
-type PoolSuite struct{}
+type PoolSuite struct {
+       logger      logrus.FieldLogger
+       testCluster *arvados.Cluster
+}
+
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+       suite.logger = ctxlog.TestLogger(c)
+       cfg, err := config.NewLoader(nil, suite.logger).Load()
+       c.Assert(err, check.IsNil)
+       suite.testCluster, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+}
 
 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        type1 := test.InstanceType(1)
@@ -63,10 +76,9 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
        instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-       is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+       is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
@@ -78,25 +90,21 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       cluster := &arvados.Cluster{
-               Containers: arvados.ContainersConfig{
-                       CloudVMs: arvados.CloudVMsConfig{
-                               BootProbeCommand:   "true",
-                               MaxProbesPerSecond: 1000,
-                               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-                               SyncInterval:       arvados.Duration(time.Millisecond * 10),
-                               TagKeyPrefix:       "testprefix:",
-                       },
-                       CrunchRunCommand: "crunch-run-custom",
-               },
-               InstanceTypes: arvados.InstanceTypeMap{
-                       type1.Name: type1,
-                       type2.Name: type2,
-                       type3.Name: type3,
-               },
+       suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+               BootProbeCommand:   "true",
+               MaxProbesPerSecond: 1000,
+               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+               SyncInterval:       arvados.Duration(time.Millisecond * 10),
+               TagKeyPrefix:       "testprefix:",
+       }
+       suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+       suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+               type1.Name: type1,
+               type2.Name: type2,
+               type3.Name: type3,
        }
 
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+       pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
        pool.Create(type1)
@@ -111,7 +119,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
        // Wait for the tags to save to the cloud provider
-       tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+       tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
        deadline := time.Now().Add(time.Second)
        for !func() bool {
                pool.mtx.RLock()
@@ -132,7 +140,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
        c.Log("------- starting new pool, waiting to recover state")
 
-       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+       pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
        notify2 := pool2.Subscribe()
        defer pool2.Unsubscribe(notify2)
        waitForIdle(pool2, notify2)
@@ -148,9 +156,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestDrain(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        ac := arvados.NewClientFromEnv()
@@ -158,8 +165,9 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
        type1 := test.InstanceType(1)
        pool := &Pool{
                arvClient:   ac,
-               logger:      logger,
+               logger:      suite.logger,
                newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               cluster:     suite.testCluster,
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
@@ -201,15 +209,15 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
 }
 
 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        type1 := test.InstanceType(1)
        pool := &Pool{
-               logger:                         logger,
+               logger:                         suite.logger,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+               cluster:                        suite.testCluster,
                maxConcurrentInstanceCreateOps: 1,
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
@@ -241,17 +249,17 @@ func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
        type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
        type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
-               logger:      logger,
+               logger:      suite.logger,
                newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               cluster:     suite.testCluster,
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index 63561874c9c5e570187048922addbbc4e4ece502..29c4b8e0a36a3be2a721e1bc509335817e86842c 100644 (file)
@@ -13,6 +13,7 @@ import (
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/crunchrun"
        "github.com/sirupsen/logrus"
 )
 
@@ -21,7 +22,7 @@ import (
 type remoteRunner struct {
        uuid          string
        executor      Executor
-       envJSON       json.RawMessage
+       configJSON    json.RawMessage
        runnerCmd     string
        runnerArgs    []string
        remoteUser    string
@@ -47,7 +48,8 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
        if err := enc.Encode(wkr.instType); err != nil {
                panic(err)
        }
-       env := map[string]string{
+       var configData crunchrun.ConfigData
+       configData.Env = map[string]string{
                "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                "InstanceType":      instJSON.String(),
@@ -55,16 +57,20 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
                "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
        }
        if wkr.wp.arvClient.Insecure {
-               env["ARVADOS_API_HOST_INSECURE"] = "1"
+               configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
        }
-       envJSON, err := json.Marshal(env)
+       if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
+               configData.Cluster = wkr.wp.cluster
+               configData.KeepBuffers = bufs * wkr.instType.VCPUs
+       }
+       configJSON, err := json.Marshal(configData)
        if err != nil {
                panic(err)
        }
        rr := &remoteRunner{
                uuid:          uuid,
                executor:      wkr.executor,
-               envJSON:       envJSON,
+               configJSON:    configJSON,
                runnerCmd:     wkr.wp.runnerCmd,
                runnerArgs:    wkr.wp.runnerArgs,
                remoteUser:    wkr.instance.RemoteUser(),
@@ -84,7 +90,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-       cmd := rr.runnerCmd + " --detach --stdin-env"
+       cmd := rr.runnerCmd + " --detach --stdin-config"
        for _, arg := range rr.runnerArgs {
                cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
        }
@@ -92,7 +98,7 @@ func (rr *remoteRunner) Start() {
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
-       stdin := bytes.NewBuffer(rr.envJSON)
+       stdin := bytes.NewBuffer(rr.configJSON)
        stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
        if err != nil {
                rr.logger.WithField("stdout", string(stdout)).
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
index 4134788b2e27b00544151b857282d376c57a0ccf..2ee6b7c3622d66a5b85299826b035fa47ce97d26 100644 (file)
@@ -14,24 +14,36 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
 var _ = check.Suite(&WorkerSuite{})
 
-type WorkerSuite struct{}
+type WorkerSuite struct {
+       logger      logrus.FieldLogger
+       testCluster *arvados.Cluster
+}
+
+func (suite *WorkerSuite) SetUpTest(c *check.C) {
+       suite.logger = ctxlog.TestLogger(c)
+       cfg, err := config.NewLoader(nil, suite.logger).Load()
+       c.Assert(err, check.IsNil)
+       suite.testCluster, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+}
 
 func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        bootTimeout := time.Minute
        probeTimeout := time.Second
 
        ac := arvados.NewClientFromEnv()
-       is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
+       is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
        inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
        c.Assert(err, check.IsNil)
@@ -232,6 +244,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                wp := &Pool{
                        arvClient:        ac,
                        newExecutor:      func(cloud.Instance) Executor { return exr },
+                       cluster:          suite.testCluster,
                        bootProbeCommand: "bootprobe",
                        timeoutBooting:   bootTimeout,
                        timeoutProbe:     probeTimeout,
@@ -249,7 +262,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
                }
                wkr := &worker{
-                       logger:   logger,
+                       logger:   suite.logger,
                        executor: exr,
                        wp:       wp,
                        mtx:      &wp.mtx,
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f1d27b8dcdc5f2b3d79f82273ee38b278b089cb8..e736f79fd7f2bcee24e449596b4950bd279881bc 100644 (file)
@@ -435,6 +435,8 @@ type ContainersConfig struct {
        SupportedDockerImageFormats StringSet
        UsePreemptibleInstances     bool
        RuntimeEngine               string
+       LocalKeepBlobBuffersPerVCPU int
+       LocalKeepLogsToContainerLog string
 
        JobsAPI struct {
                Enable         string
diff --git a/services/api/app/models/user.rb b/services/api/app/models/user.rb
index 2e862d3ae6ba047584f7650af29fa4a44d1af107..366c03e309ca6f54c0ef5bd2ad9f3aa331c592ea 100644 (file)
@@ -300,6 +300,12 @@ SELECT target_uuid, perm_level
     Link.where(link_class: 'signature',
                      tail_uuid: self.uuid).destroy_all
 
+    # delete tokens for this user
+    ApiClientAuthorization.where(user_id: self.id).destroy_all
+    # delete ssh keys for this user
+    AuthorizedKey.where(owner_uuid: self.uuid).destroy_all
+    AuthorizedKey.where(authorized_user_uuid: self.uuid).destroy_all
+
     # delete user preferences (including profile)
     self.prefs = {}
 
diff --git a/services/api/db/migrate/20211027154300_delete_disabled_user_tokens_and_keys.rb b/services/api/db/migrate/20211027154300_delete_disabled_user_tokens_and_keys.rb
new file mode 100644 (file)
index 0000000..df3db6f
--- /dev/null
@@ -0,0 +1,15 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class DeleteDisabledUserTokensAndKeys < ActiveRecord::Migration[5.2]
+  def up
+    execute "delete from api_client_authorizations where user_id in (select id from users where is_active ='false' and uuid not like '%-tpzed-anonymouspublic' and uuid not like '%-tpzed-000000000000000')"
+    execute "delete from authorized_keys where owner_uuid in (select uuid from users where is_active ='false' and uuid not like '%-tpzed-anonymouspublic' and uuid not like '%-tpzed-000000000000000')"
+    execute "delete from authorized_keys where authorized_user_uuid in (select uuid from users where is_active ='false' and uuid not like '%-tpzed-anonymouspublic' and uuid not like '%-tpzed-000000000000000')"
+  end
+
+  def down
+    # This migration is not reversible.
+  end
+end
diff --git a/services/api/db/structure.sql b/services/api/db/structure.sql
index 2f7748335694310b09de911104a446ce54885093..da9959593c0a28853831d3cc6bd464be28bdbe07 100644 (file)
@@ -3146,6 +3146,7 @@ INSERT INTO "schema_migrations" (version) VALUES
 ('20210108033940'),
 ('20210126183521'),
 ('20210621204455'),
-('20210816191509');
+('20210816191509'),
+('20211027154300');
 
 
diff --git a/services/api/test/integration/users_test.rb b/services/api/test/integration/users_test.rb
index b24ddc5a52c02c495b14f9871763411fa4ccbea8..81168e15b7b43b134036717bbe9f427b9f0de0be 100644 (file)
@@ -198,6 +198,13 @@ class UsersTest < ActionDispatch::IntegrationTest
 
     verify_link_existence created['uuid'], created['email'], true, true, true, true, false
 
+    # create a token
+    token = act_as_system_user do
+      ApiClientAuthorization.create!(user: User.find_by_uuid(created['uuid']), api_client: ApiClient.all.first).api_token
+    end
+
+    assert_equal 1, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'expected token not found'
+
     post "/arvados/v1/users/#{created['uuid']}/unsetup", params: {}, headers: auth(:admin)
 
     assert_response :success
@@ -205,6 +212,7 @@ class UsersTest < ActionDispatch::IntegrationTest
     created2 = json_response
     assert_not_nil created2['uuid'], 'expected uuid for the newly created user'
     assert_equal created['uuid'], created2['uuid'], 'expected uuid not found'
+    assert_equal 0, ApiClientAuthorization.where(user_id: User.find_by_uuid(created['uuid']).id).size, 'token should have been deleted by user unsetup'
 
     verify_link_existence created['uuid'], created['email'], false, false, false, false, false
   end
diff --git a/services/keep-web/handler_test.go b/services/keep-web/handler_test.go
index 55c122b0ff7123ba4eb1c670900a5d3c6b5c4dba..6412789ff9f47c7a2c017a79bf31c41ccd926f1a 100644 (file)
@@ -1202,7 +1202,6 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, h *handler, re
        c.Check(err, check.IsNil)
        c.Check(logentries.Items, check.HasLen, 1)
        lastLogId := logentries.Items[0].ID
-       nextLogId := lastLogId
 
        var logbuf bytes.Buffer
        logger := logrus.New()
@@ -1216,24 +1215,29 @@ func (s *IntegrationSuite) checkUploadDownloadRequest(c *check.C, h *handler, re
                c.Check(logbuf.String(), check.Matches, `(?ms).*msg="File `+direction+`".*`)
                c.Check(logbuf.String(), check.Not(check.Matches), `(?ms).*level=error.*`)
 
-               count := 0
-               for ; nextLogId == lastLogId && count < 20; count++ {
-                       time.Sleep(50 * time.Millisecond)
+               deadline := time.Now().Add(time.Second)
+               for {
+                       c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry"))
                        err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil,
                                arvados.ResourceListParams{
-                                       Filters: []arvados.Filter{arvados.Filter{Attr: "event_type", Operator: "=", Operand: "file_" + direction}},
-                                       Limit:   &limit1,
-                                       Order:   "created_at desc",
+                                       Filters: []arvados.Filter{
+                                               {Attr: "event_type", Operator: "=", Operand: "file_" + direction},
+                                               {Attr: "object_uuid", Operator: "=", Operand: userUuid},
+                                       },
+                                       Limit: &limit1,
+                                       Order: "created_at desc",
                                })
-                       c.Check(err, check.IsNil)
-                       if len(logentries.Items) > 0 {
-                               nextLogId = logentries.Items[0].ID
+                       c.Assert(err, check.IsNil)
+                       if len(logentries.Items) > 0 &&
+                               logentries.Items[0].ID > lastLogId &&
+                               logentries.Items[0].ObjectUUID == userUuid &&
+                               logentries.Items[0].Properties["collection_uuid"] == collectionUuid &&
+                               logentries.Items[0].Properties["collection_file_path"] == filepath {
+                               break
                        }
+                       c.Logf("logentries.Items: %+v", logentries.Items)
+                       time.Sleep(50 * time.Millisecond)
                }
-               c.Check(count, check.Not(check.Equals), 20)
-               c.Check(logentries.Items[0].ObjectUUID, check.Equals, userUuid)
-               c.Check(logentries.Items[0].Properties["collection_uuid"], check.Equals, collectionUuid)
-               c.Check(logentries.Items[0].Properties["collection_file_path"], check.Equals, filepath)
        } else {
                c.Check(resp.Result().StatusCode, check.Equals, http.StatusForbidden)
                c.Check(logbuf.String(), check.Equals, "")
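
The rewritten wait loop above uses an explicit one-second deadline instead of a fixed counter of 20 retries at 50 ms each, and it keeps polling until the newest matching log entry actually has the expected object UUID, collection UUID, and file path, rather than asserting on whatever entry arrived first. The same idea in generic form, as a sketch only; this helper does not exist in the Arvados test suites:

package example

import (
	"time"

	check "gopkg.in/check.v1"
)

// eventually retries f until it returns true, or fails the test once the
// timeout has elapsed.
func eventually(c *check.C, timeout, interval time.Duration, f func() bool) {
	deadline := time.Now().Add(timeout)
	for !f() {
		c.Assert(time.Now().After(deadline), check.Equals, false,
			check.Commentf("condition not met within %v", timeout))
		time.Sleep(interval)
	}
}
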
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index cf655c2a5a96d9ca92194321ad813fd5712ab523..f9b383e70e5a1d531c414f7c180e1d623e7c55b2 100644 (file)
@@ -558,6 +558,9 @@ func (v *AzureBlobVolume) translateError(err error) error {
        case strings.Contains(err.Error(), "Not Found"):
                // "storage: service returned without a response body (404 Not Found)"
                return os.ErrNotExist
+       case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
+               // "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
+               return os.ErrNotExist
        default:
                return err
        }
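
The Azure storage SDK reports a missing blob in more than one message format; the new case above maps the "ErrorCode=BlobNotFound" variant to os.ErrNotExist as well, so callers can keep using os.IsNotExist. A table-driven check along these lines would exercise both variants. This is an illustrative fragment only, meant to sit inside an existing gocheck test in the keepstore package, where v is an *AzureBlobVolume, c is a *check.C, and the errors and os imports are assumed; the real coverage belongs in the volume's test file.

for _, trial := range []struct {
	msg      string
	notFound bool
}{
	{"storage: service returned without a response body (404 Not Found)", true},
	{"storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.", true},
	{"storage: service returned error: StatusCode=500, ErrorCode=InternalError", false},
} {
	err := v.translateError(errors.New(trial.msg))
	c.Check(os.IsNotExist(err), check.Equals, trial.notFound, check.Commentf("%q", trial.msg))
}
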
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index cbb7f38bb14100468710b74f68d28e6513b6f4d7..d545bde0ab9193151b655328d3323f01932a2b19 100644 (file)
@@ -1154,15 +1154,6 @@ func (s *HandlerSuite) TestPutHandlerNoBufferleak(c *check.C) {
        }
 }
 
-type notifyingResponseRecorder struct {
-       *httptest.ResponseRecorder
-       closer chan bool
-}
-
-func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
-       return r.closer
-}
-
 func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
        s.cluster.Collections.BlobSigning = false
        c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
@@ -1173,23 +1164,15 @@ func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
        bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize)
        defer bufs.Put(bufs.Get(BlockSize))
 
-       if err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
-               c.Error(err)
-       }
-
-       resp := &notifyingResponseRecorder{
-               ResponseRecorder: httptest.NewRecorder(),
-               closer:           make(chan bool, 1),
-       }
-       if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
-               c.Fatal("notifyingResponseRecorder is broken")
-       }
-       // If anyone asks, the client has disconnected.
-       resp.closer <- true
+       err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock)
+       c.Assert(err, check.IsNil)
 
+       resp := httptest.NewRecorder()
        ok := make(chan struct{})
        go func() {
-               req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+               ctx, cancel := context.WithCancel(context.Background())
+               req, _ := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+               cancel()
                s.handler.ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
@@ -1200,7 +1183,7 @@ func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
        case <-ok:
        }
 
-       ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+       ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp)
        for i, v := range s.handler.volmgr.AllWritable() {
                if calls := v.Volume.(*MockVolume).called["GET"]; calls != 0 {
                        c.Errorf("volume %d got %d calls, expected 0", i, calls)
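
With the CloseNotifier-based recorder removed, the test above simulates a client that has already gone away by serving a request whose context is canceled before ServeHTTP runs; the handler is expected to notice the dead context, skip the volume reads, and answer 503. The idiom in isolation, as a sketch rather than additional Arvados code:

package example

import (
	"context"
	"net/http"
	"net/http/httptest"
)

// serveWithDisconnectedClient runs handler as if the client hung up before
// the handler started, and returns the recorded response.
func serveWithDisconnectedClient(handler http.Handler, method, target string) *httptest.ResponseRecorder {
	ctx, cancel := context.WithCancel(context.Background())
	req := httptest.NewRequest(method, target, nil).WithContext(ctx)
	cancel() // req.Context().Done() is already closed when ServeHTTP runs
	resp := httptest.NewRecorder()
	handler.ServeHTTP(resp, req)
	return resp
}
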
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 29e7b2ca9c499e0b5eae3461084775ef7af40506..2a90705a56cbf0c1eec78a03dcd5b45463f84851 100644 (file)
@@ -112,12 +112,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        locator := req.URL.Path[1:]
        if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
-               rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+               rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
                return
        }
 
@@ -136,14 +133,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        // isn't here, we can return 404 now instead of waiting for a
        // buffer.
 
-       buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+       buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
        }
        defer bufs.Put(buf)
 
-       size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+       size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
        if err != nil {
                code := http.StatusInternalServerError
                if err, ok := err.(*KeepError); ok {
@@ -158,21 +155,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
-       ctx, cancel := context.WithCancel(parent)
-       if cn, ok := resp.(http.CloseNotifier); ok {
-               go func(c <-chan bool) {
-                       select {
-                       case <-c:
-                               cancel()
-                       case <-ctx.Done():
-                       }
-               }(cn.CloseNotify())
-       }
-       return ctx, cancel
-}
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if ctx ends before we get a buffer.
 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -223,9 +205,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        hash := mux.Vars(req)["hash"]
 
        // Detect as many error conditions as possible before reading
@@ -262,7 +241,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+       buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -275,7 +254,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+       result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
        bufs.Put(buf)
 
        if err != nil {
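
contextForResponse and its http.CloseNotifier plumbing are removed above because net/http already cancels req.Context() when the client connection closes, so handleGET and handlePUT can pass req.Context() straight into getBufferWithContext, GetBlock, and PutBlock and keep the same early-cancellation behavior. A minimal standalone illustration of the pattern, not Arvados code:

package main

import (
	"fmt"
	"log"
	"net/http"
	"time"
)

func main() {
	http.HandleFunc("/slow", func(w http.ResponseWriter, req *http.Request) {
		select {
		case <-time.After(10 * time.Second):
			fmt.Fprintln(w, "finished the slow work")
		case <-req.Context().Done():
			// The server canceled the request context (for example, the
			// client disconnected), so give up early instead of tying up
			// a buffer for the full ten seconds.
			http.Error(w, req.Context().Err().Error(), http.StatusServiceUnavailable)
		}
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}
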