16347: Merge branch 'main'
author     Tom Clegg <tom@curii.com>
           Tue, 19 Oct 2021 13:21:44 +0000 (09:21 -0400)
committer  Tom Clegg <tom@curii.com>
           Tue, 19 Oct 2021 13:21:44 +0000 (09:21 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

19 files changed:
lib/config/config.default.yml
lib/config/export.go
lib/config/generated_config.go
lib/config/load.go
lib/crunchrun/background.go
lib/crunchrun/bufthenwrite.go [new file with mode: 0644]
lib/crunchrun/crunchrun.go
lib/crunchrun/integration_test.go
lib/crunchrun/logging.go
lib/crunchrun/logging_test.go
lib/dispatchcloud/test/stub_driver.go
lib/dispatchcloud/worker/pool.go
lib/dispatchcloud/worker/pool_test.go
lib/dispatchcloud/worker/runner.go
lib/dispatchcloud/worker/worker_test.go
sdk/go/arvados/config.go
services/keepstore/azure_blob_volume.go
services/keepstore/handler_test.go
services/keepstore/handlers.go

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 4e2a0e26d4bb599314e6f3292aeaf6cc1b2fef85..c863bbcbcea8b6df3146e201d52038ed9fa5ee87 100644
@@ -911,6 +911,42 @@ Clusters:
       # Container runtime: "docker" (default) or "singularity"
       RuntimeEngine: docker
 
+      # When running a container, run a dedicated keepstore process,
+      # using the specified number of 64 MiB memory buffers per
+      # allocated CPU core (VCPUs in the container's runtime
+      # constraints). The dedicated keepstore handles I/O for
+      # collections mounted in the container, as well as saving
+      # container logs.
+      #
+      # A zero value disables this feature.
+      #
+      # In order for this feature to be activated, no volume may use
+      # AccessViaHosts, and no writable volume may have Replication
+      # lower than Collections.DefaultReplication. If these
+      # requirements are not satisfied, the feature is disabled
+      # automatically regardless of the value given here.
+      #
+      # Note that when this configuration is enabled, the entire
+      # cluster configuration file, including the system root token,
+      # is copied to the worker node and held in memory for the
+      # duration of the container.
+      LocalKeepBlobBuffersPerVCPU: 1
+
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "response" logs with 2xx
+      #   response codes and "request" logs
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
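For reference, a minimal sketch (not part of this commit) of how the setting above translates into a per-container keepstore memory budget; it mirrors the dispatcher-side arithmetic elsewhere in this branch (runner.go sets KeepBuffers = LocalKeepBlobBuffersPerVCPU * VCPUs, and each buffer holds one 64 MiB block):

package main

import "fmt"

// keepstoreBufferBudget mirrors the arithmetic used in this branch:
// buffers per VCPU times the instance type's VCPUs, 64 MiB each.
func keepstoreBufferBudget(buffersPerVCPU, vcpus int) (buffers int, bytes int64) {
	buffers = buffersPerVCPU * vcpus
	bytes = int64(buffers) * 64 << 20 // 64 MiB per buffer
	return
}

func main() {
	bufs, mem := keepstoreBufferBudget(1, 4)
	fmt.Printf("4 VCPUs: %d buffers, %d MiB reserved for keepstore\n", bufs, mem>>20)
}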
diff --git a/lib/config/export.go b/lib/config/export.go
index 92e2d7b4d522e2b1a07b8a5602f3318f04720645..e36d6e76cae40d17c54217e643bf82341e298b87 100644
@@ -119,6 +119,8 @@ var whitelist = map[string]bool{
        "Containers.JobsAPI":                                  true,
        "Containers.JobsAPI.Enable":                           true,
        "Containers.JobsAPI.GitInternalDir":                   false,
+       "Containers.LocalKeepBlobBuffersPerVCPU":              false,
+       "Containers.LocalKeepLogsToContainerLog":              false,
        "Containers.Logging":                                  false,
        "Containers.LogReuseDecisions":                        false,
        "Containers.LSF":                                      false,
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index 875939a3e191731d33892127f5376ed93c18af78..4742c640587c4cbbda4b477f35b3cfbb2934587b 100644
@@ -917,6 +917,42 @@ Clusters:
       # Container runtime: "docker" (default) or "singularity"
       RuntimeEngine: docker
 
+      # When running a container, run a dedicated keepstore process,
+      # using the specified number of 64 MiB memory buffers per
+      # allocated CPU core (VCPUs in the container's runtime
+      # constraints). The dedicated keepstore handles I/O for
+      # collections mounted in the container, as well as saving
+      # container logs.
+      #
+      # A zero value disables this feature.
+      #
+      # In order for this feature to be activated, no volume may use
+      # AccessViaHosts, and no writable volume may have Replication
+      # lower than Collections.DefaultReplication. If these
+      # requirements are not satisfied, the feature is disabled
+      # automatically regardless of the value given here.
+      #
+      # Note that when this configuration is enabled, the entire
+      # cluster configuration file, including the system root token,
+      # is copied to the worker node and held in memory for the
+      # duration of the container.
+      LocalKeepBlobBuffersPerVCPU: 1
+
+      # When running a dedicated keepstore process for a container
+      # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+      # messages to keepstore.txt in the container's log collection.
+      #
+      # These log messages can reveal some volume configuration
+      # details, error messages from the cloud storage provider, etc.,
+      # which are not otherwise visible to users.
+      #
+      # Accepted values:
+      # * "none" -- no keepstore.txt file
+      # * "all" -- all logs, including request and response lines
+      # * "errors" -- all logs except "response" logs with 2xx
+      #   response codes and "request" logs
+      LocalKeepLogsToContainerLog: none
+
       Logging:
         # When you run the db:delete_old_container_logs task, it will find
         # containers that have been finished for at least this many seconds,
diff --git a/lib/config/load.go b/lib/config/load.go
index 248960beb99f875620dca5ee7738f8575877c57d..956a47b1a4ac2ef992958739d5189eaf5e519ed5 100644
@@ -295,6 +295,7 @@ func (ldr *Loader) Load() (*arvados.Config, error) {
                        ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken),
                        ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey),
                        checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+                       ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
                        ldr.checkEmptyKeepstores(cc),
                        ldr.checkUnlistedKeepstores(cc),
                        ldr.checkStorageClasses(cc),
@@ -338,6 +339,15 @@ func (ldr *Loader) checkToken(label, token string) error {
        return nil
 }
 
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+       for _, s := range accepted {
+               if s == value {
+                       return nil
+               }
+       }
+       return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+}
+
 func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
 cluster:
        for id, cc := range cfg.Clusters {
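As an illustration (a standalone restatement of the same logic, not the Loader method itself), an invalid config value now produces an error like the one printed below:

package main

import "fmt"

// checkEnum restates the loader check added above so the error text
// can be shown in isolation.
func checkEnum(label, value string, accepted ...string) error {
	for _, s := range accepted {
		if s == value {
			return nil
		}
	}
	return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
}

func main() {
	fmt.Println(checkEnum("Containers.LocalKeepLogsToContainerLog", "verbose", "none", "all", "errors"))
	// Containers.LocalKeepLogsToContainerLog: unacceptable value "verbose": must be one of ["none" "all" "errors"]
}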
diff --git a/lib/crunchrun/background.go b/lib/crunchrun/background.go
index 4bb249380fd98b2e340bc4bd3bacb2b78d5f0a47..8a919bc5e2ba3080283d39a9f9784b1eddbec37d 100644
@@ -36,10 +36,10 @@ type procinfo struct {
 //
 // Stdout and stderr in the child process are sent to the systemd
 // journal using the systemd-cat program.
-func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
-       return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
 }
-func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
        lockfile, err := func() (*os.File, error) {
                // We must hold the dir-level lock between
                // opening/creating the lockfile and acquiring LOCK_EX
@@ -77,20 +77,24 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e
                // invoked as "/path/to/crunch-run"
                execargs = append([]string{prog}, execargs...)
        }
-       execargs = append([]string{
-               // Here, if the inner systemd-cat can't exec
-               // crunch-run, it writes an error message to stderr,
-               // and the outer systemd-cat writes it to the journal
-               // where the operator has a chance to discover it. (If
-               // we only used one systemd-cat command, it would be
-               // up to us to report the error -- but we are going to
-               // detach and exit, not wait for something to appear
-               // on stderr.)  Note these systemd-cat calls don't
-               // result in additional processes -- they just connect
-               // stderr/stdout to sockets and call exec().
-               "systemd-cat", "--identifier=crunch-run",
-               "systemd-cat", "--identifier=crunch-run",
-       }, execargs...)
+       if _, err := exec.LookPath("systemd-cat"); err == nil {
+               execargs = append([]string{
+                       // Here, if the inner systemd-cat can't exec
+                       // crunch-run, it writes an error message to
+                       // stderr, and the outer systemd-cat writes it
+                       // to the journal where the operator has a
+                       // chance to discover it. (If we only used one
+                       // systemd-cat command, it would be up to us
+                       // to report the error -- but we are going to
+                       // detach and exit, not wait for something to
+                       // appear on stderr.)  Note these systemd-cat
+                       // calls don't result in additional processes
+                       // -- they just connect stderr/stdout to
+                       // sockets and call exec().
+                       "systemd-cat", "--identifier=crunch-run",
+                       "systemd-cat", "--identifier=crunch-run",
+               }, execargs...)
+       }
 
        cmd := exec.Command(execargs[0], execargs[1:]...)
        // Child inherits lockfile.
@@ -99,10 +103,26 @@ func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) e
        // from parent (sshd) while sending lockfile content to
        // caller.
        cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+       // We use our own OS pipe here so we can copy the entire stdin
+       // payload through to the child before we return.
+       piper, pipew, err := os.Pipe()
+       if err != nil {
+               return err
+       }
+       defer pipew.Close()
+       cmd.Stdin = piper
        err = cmd.Start()
        if err != nil {
                return fmt.Errorf("exec %s: %s", cmd.Path, err)
        }
+       _, err = io.Copy(pipew, stdin)
+       if err != nil {
+               return err
+       }
+       err = pipew.Close()
+       if err != nil {
+               return err
+       }
 
        w := io.MultiWriter(stdout, lockfile)
        return json.NewEncoder(w).Encode(procinfo{
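A minimal standalone sketch (using cat as a stand-in child, not crunch-run itself) of the stdin relay introduced above: the parent copies the whole stdin payload into an os.Pipe wired to the child before returning, then closes the write end so the child sees EOF.

package main

import (
	"io"
	"os"
	"os/exec"
	"strings"
)

func main() {
	payload := strings.NewReader(`{"Env":{"EXAMPLE":"value"}}`) // stand-in for the JSON config on stdin
	cmd := exec.Command("cat")                                  // stand-in for the detached child
	piper, pipew, err := os.Pipe()
	if err != nil {
		panic(err)
	}
	cmd.Stdin = piper
	cmd.Stdout = os.Stdout
	if err := cmd.Start(); err != nil {
		panic(err)
	}
	// Copy returns once the payload has been handed to the pipe; closing
	// the write end delivers EOF to the child, so the parent can exit
	// without truncating the child's stdin.
	if _, err := io.Copy(pipew, payload); err != nil {
		panic(err)
	}
	pipew.Close()
	cmd.Wait()
}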
diff --git a/lib/crunchrun/bufthenwrite.go b/lib/crunchrun/bufthenwrite.go
new file mode 100644
index 0000000..2d1c407
--- /dev/null
+++ b/lib/crunchrun/bufthenwrite.go
@@ -0,0 +1,34 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+       "bytes"
+       "io"
+       "sync"
+)
+
+type bufThenWrite struct {
+       buf bytes.Buffer
+       w   io.Writer
+       mtx sync.Mutex
+}
+
+func (btw *bufThenWrite) SetWriter(w io.Writer) error {
+       btw.mtx.Lock()
+       defer btw.mtx.Unlock()
+       btw.w = w
+       _, err := io.Copy(w, &btw.buf)
+       return err
+}
+
+func (btw *bufThenWrite) Write(p []byte) (int, error) {
+       btw.mtx.Lock()
+       defer btw.mtx.Unlock()
+       if btw.w == nil {
+               btw.w = &btw.buf
+       }
+       return btw.w.Write(p)
+}
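A hypothetical in-package usage sketch (not part of this commit): writes made before a destination exists are buffered, then replayed in order when SetWriter is called.

package crunchrun

import (
	"bytes"
	"fmt"
)

// exampleBufThenWrite is illustrative only: early keepstore log lines
// are buffered until the log-collection writer is available.
func exampleBufThenWrite() (string, error) {
	var logbuf bufThenWrite
	fmt.Fprintln(&logbuf, "keepstore starting") // buffered in memory
	var dest bytes.Buffer
	if err := logbuf.SetWriter(&dest); err != nil { // flushes the buffer to dest
		return "", err
	}
	fmt.Fprintln(&logbuf, "keepstore ready") // written straight through
	return dest.String(), nil                // "keepstore starting\nkeepstore ready\n"
}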
diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 42f143f1cb8fe6bb97bfff2511e9f318f37359af..ba5673f917a54267916dddcb360f1b38cf548238 100644
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
        "bytes"
+       "context"
        "encoding/json"
        "errors"
        "flag"
@@ -13,6 +14,8 @@ import (
        "io"
        "io/ioutil"
        "log"
+       "net"
+       "net/http"
        "os"
        "os/exec"
        "os/signal"
@@ -33,13 +36,20 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
-       "golang.org/x/net/context"
 )
 
 type command struct{}
 
 var Command = command{}
 
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+       Env         map[string]string
+       KeepBuffers int
+       Cluster     *arvados.Cluster
+}
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -127,6 +137,8 @@ type ContainerRunner struct {
        finalState    string
        parentTemp    string
 
+       keepstoreLogger  io.WriteCloser
+       keepstoreLogbuf  *bufThenWrite
        statLogger       io.WriteCloser
        statReporter     *crunchstat.Reporter
        hoststatLogger   io.WriteCloser
@@ -1267,6 +1279,16 @@ func (runner *ContainerRunner) CommitLogs() error {
                runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
        }()
 
+       if runner.keepstoreLogger != nil {
+               // Flush any buffered logs from our local keepstore
+               // process.  Discard anything logged after this point
+               // -- it won't end up in the log collection, so
+               // there's no point writing it to the collectionfs.
+               runner.keepstoreLogbuf.SetWriter(io.Discard)
+               runner.keepstoreLogger.Close()
+               runner.keepstoreLogger = nil
+       }
+
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
                // we must be closing the re-opened log, which won't
@@ -1275,6 +1297,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                // -- it exists only to send logs to other channels.
                return nil
        }
+
        saved, err := runner.saveLogCollection(true)
        if err != nil {
                return fmt.Errorf("error saving log collection: %s", err)
@@ -1637,6 +1660,7 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 }
 
 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+       log := log.New(stderr, "", 0)
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
        cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
@@ -1644,7 +1668,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
-       stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+       stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1674,33 +1698,45 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
-       if *stdinEnv && !ignoreDetachFlag {
-               // Load env vars on stdin if asked (but not in a
-               // detached child process, in which case stdin is
-               // /dev/null).
-               err := loadEnv(os.Stdin)
-               if err != nil {
-                       log.Print(err)
-                       return 1
-               }
-       }
-
        containerUUID := flags.Arg(0)
 
        switch {
        case *detach && !ignoreDetachFlag:
-               return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+               return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
        case *kill >= 0:
                return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
        case *list:
                return ListProcesses(os.Stdout, os.Stderr)
        }
 
-       if containerUUID == "" {
+       if len(containerUUID) != 27 {
                log.Printf("usage: %s [options] UUID", prog)
                return 1
        }
 
+       var conf ConfigData
+       if *stdinConfig {
+               err := json.NewDecoder(stdin).Decode(&conf)
+               if err != nil {
+                       log.Printf("decode stdin: %s", err)
+                       return 1
+               }
+               for k, v := range conf.Env {
+                       err = os.Setenv(k, v)
+                       if err != nil {
+                               log.Printf("setenv(%q): %s", k, err)
+                               return 1
+                       }
+               }
+               if conf.Cluster != nil {
+                       // ClusterID is missing from the JSON
+                       // representation, but we need it to generate
+                       // a valid config file for keepstore, so we
+                       // fill it using the container UUID prefix.
+                       conf.Cluster.ClusterID = containerUUID[:5]
+               }
+       }
+
        log.Printf("crunch-run %s started", cmd.Version.String())
        time.Sleep(*sleep)
 
@@ -1708,6 +1744,16 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
+       var keepstoreLogbuf bufThenWrite
+       keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
+       if err != nil {
+               log.Print(err)
+               return 1
+       }
+       if keepstore != nil {
+               defer keepstore.Process.Kill()
+       }
+
        api, err := arvadosclient.MakeArvadosClient()
        if err != nil {
                log.Printf("%s: %v", containerUUID, err)
@@ -1715,9 +1761,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        }
        api.Retries = 8
 
-       kc, kcerr := keepclient.MakeKeepClient(api)
-       if kcerr != nil {
-               log.Printf("%s: %v", containerUUID, kcerr)
+       kc, err := keepclient.MakeKeepClient(api)
+       if err != nil {
+               log.Printf("%s: %v", containerUUID, err)
                return 1
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
@@ -1729,6 +1775,43 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       if keepstore == nil {
+               // Log explanation (if any) for why we're not running
+               // a local keepstore.
+               var buf bytes.Buffer
+               keepstoreLogbuf.SetWriter(&buf)
+               if buf.Len() > 0 {
+                       cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+               }
+       } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+               cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+               keepstoreLogbuf.SetWriter(io.Discard)
+       } else {
+               cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+               logwriter, err := cr.NewLogWriter("keepstore")
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+               var writer io.WriteCloser = cr.keepstoreLogger
+               if logWhat == "errors" {
+                       writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+               } else if logWhat != "all" {
+                       // should have been caught earlier by
+                       // dispatcher's config loader
+                       log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+                       return 1
+               }
+               err = keepstoreLogbuf.SetWriter(writer)
+               if err != nil {
+                       log.Print(err)
+                       return 1
+               }
+               cr.keepstoreLogbuf = &keepstoreLogbuf
+       }
+
        switch *runtimeEngine {
        case "docker":
                cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
@@ -1816,21 +1899,98 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        return 0
 }
 
-func loadEnv(rdr io.Reader) error {
-       buf, err := ioutil.ReadAll(rdr)
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+       if configData.Cluster == nil || configData.KeepBuffers < 1 {
+               return nil, nil
+       }
+       for uuid, vol := range configData.Cluster.Volumes {
+               if len(vol.AccessViaHosts) > 0 {
+                       fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+                       return nil, nil
+               }
+               if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+                       fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+                       return nil, nil
+               }
+       }
+
+       // Rather than have an alternate way to tell keepstore how
+       // many buffers to use when starting it this way, we just
+       // modify the cluster configuration that we feed it on stdin.
+       configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+       ln, err := net.Listen("tcp", "localhost:0")
+       if err != nil {
+               return nil, err
+       }
+       _, port, err := net.SplitHostPort(ln.Addr().String())
+       if err != nil {
+               ln.Close()
+               return nil, err
+       }
+       ln.Close()
+       url := "http://localhost:" + port
+
+       fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+       var confJSON bytes.Buffer
+       err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+               Clusters: map[string]arvados.Cluster{
+                       configData.Cluster.ClusterID: *configData.Cluster,
+               },
+       })
        if err != nil {
-               return fmt.Errorf("read stdin: %s", err)
+               return nil, err
        }
-       var env map[string]string
-       err = json.Unmarshal(buf, &env)
+       cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+       if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+               // If we're a 'go test' process, running
+               // /proc/self/exe would start the test suite in a
+               // child process, which is not what we want.
+               cmd.Path, _ = exec.LookPath("go")
+               cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+               cmd.Env = os.Environ()
+       }
+       cmd.Stdin = &confJSON
+       cmd.Stdout = logbuf
+       cmd.Stderr = logbuf
+       cmd.Env = append(cmd.Env,
+               "GOGC=10",
+               "ARVADOS_SERVICE_INTERNAL_URL="+url)
+       err = cmd.Start()
        if err != nil {
-               return fmt.Errorf("decode stdin: %s", err)
+               return nil, fmt.Errorf("error starting keepstore process: %w", err)
        }
-       for k, v := range env {
-               err = os.Setenv(k, v)
+       cmdExited := false
+       go func() {
+               cmd.Wait()
+               cmdExited = true
+       }()
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+       defer cancel()
+       poll := time.NewTicker(time.Second / 10)
+       defer poll.Stop()
+       client := http.Client{}
+       for range poll.C {
+               testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
                if err != nil {
-                       return nil, fmt.Errorf("setenv(%q): %s", k, err)
+                       return nil, err
+               }
+               testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
+               resp, err := client.Do(testReq)
+               if err == nil {
+                       resp.Body.Close()
+                       if resp.StatusCode == http.StatusOK {
+                               break
+                       }
+               }
+               if cmdExited {
+                       return nil, fmt.Errorf("keepstore child process exited")
+               }
+               if ctx.Err() != nil {
+                       return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
                }
        }
-       return nil
+       os.Setenv("ARVADOS_KEEP_SERVICES", url)
+       return cmd, nil
 }
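For illustration, a hedged sketch of the kind of message a dispatcher could now pipe to crunch-run --detach --stdin-config. The field names come from the ConfigData struct above; the host, token, and buffer count are placeholders, and a real dispatcher also embeds its cluster config when LocalKeepBlobBuffersPerVCPU > 0.

package crunchrun

import (
	"encoding/json"
	"os"
)

// exampleStdinConfig is a hypothetical sketch of the JSON sent on
// crunch-run's stdin; the values here are placeholders.
func exampleStdinConfig() error {
	conf := ConfigData{
		Env: map[string]string{
			"ARVADOS_API_HOST":  "zzzzz.example.com",
			"ARVADOS_API_TOKEN": "placeholder-token",
		},
		KeepBuffers: 4, // LocalKeepBlobBuffersPerVCPU * VCPUs
		// Cluster is left nil here; with a nil Cluster (or KeepBuffers
		// < 1) startLocalKeepstore is a no-op, as shown above.
	}
	return json.NewEncoder(os.Stdout).Encode(conf)
}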
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index c688248c64fbdc26330144ebeafb3abf8c42edc6..8adddd7053921ee25a18a8eead02af6d7c464c77 100644
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
        "bytes"
+       "encoding/json"
        "fmt"
        "io"
        "io/ioutil"
@@ -13,9 +14,11 @@ import (
        "os/exec"
        "strings"
 
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        . "gopkg.in/check.v1"
 )
@@ -33,6 +36,9 @@ type integrationSuite struct {
        client *arvados.Client
        ac     *arvadosclient.ArvadosClient
        kc     *keepclient.KeepClient
+
+       logCollection    arvados.Collection
+       outputCollection arvados.Collection
 }
 
 func (s *integrationSuite) SetUpSuite(c *C) {
@@ -49,7 +55,12 @@ func (s *integrationSuite) SetUpSuite(c *C) {
        out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
        imageUUID := strings.TrimSpace(string(out))
        c.Logf("image uuid %s", imageUUID)
-       c.Assert(err, IsNil)
+       if !c.Check(err, IsNil) {
+               if err, ok := err.(*exec.ExitError); ok {
+                       c.Logf("%s", err.Stderr)
+               }
+               c.Fail()
+       }
        err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
        c.Assert(err, IsNil)
        c.Logf("image pdh %s", s.image.PortableDataHash)
@@ -92,6 +103,8 @@ func (s *integrationSuite) SetUpTest(c *C) {
        s.stdin = bytes.Buffer{}
        s.stdout = bytes.Buffer{}
        s.stderr = bytes.Buffer{}
+       s.logCollection = arvados.Collection{}
+       s.outputCollection = arvados.Collection{}
        s.cr = arvados.ContainerRequest{
                Priority:       1,
                State:          "Committed",
@@ -150,17 +163,76 @@ func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
        s.testRunTrivialContainer(c)
 }
 
+func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
+       for _, trial := range []struct {
+               logConfig           string
+               matchGetReq         Checker
+               matchPutReq         Checker
+               matchStartupMessage Checker
+       }{
+               {"none", Not(Matches), Not(Matches), Not(Matches)},
+               {"all", Matches, Matches, Matches},
+               {"errors", Not(Matches), Not(Matches), Matches},
+       } {
+               c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
+               s.SetUpTest(c)
+
+               cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+               c.Assert(err, IsNil)
+               cluster, err := cfg.GetCluster("")
+               c.Assert(err, IsNil)
+               for uuid, volume := range cluster.Volumes {
+                       volume.AccessViaHosts = nil
+                       volume.Replication = 2
+                       cluster.Volumes[uuid] = volume
+               }
+               cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
+
+               s.stdin.Reset()
+               err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+                       Env:         nil,
+                       KeepBuffers: 1,
+                       Cluster:     cluster,
+               })
+               c.Assert(err, IsNil)
+
+               s.engine = "docker"
+               s.testRunTrivialContainer(c)
+
+               fs, err := s.logCollection.FileSystem(s.client, s.kc)
+               c.Assert(err, IsNil)
+               f, err := fs.Open("keepstore.txt")
+               if trial.logConfig == "none" {
+                       c.Check(err, NotNil)
+                       c.Check(os.IsNotExist(err), Equals, true)
+               } else {
+                       c.Assert(err, IsNil)
+                       buf, err := ioutil.ReadAll(f)
+                       c.Assert(err, IsNil)
+                       c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+                       c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+               }
+       }
+}
+
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
        if err := exec.Command("which", s.engine).Run(); err != nil {
                c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
        }
        s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
        s.setup(c)
-       code := command{}.RunCommand("crunch-run", []string{
+
+       args := []string{
                "-runtime-engine=" + s.engine,
                "-enable-memory-limit=false",
                s.cr.ContainerUUID,
-       }, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+       }
+       if s.stdin.Len() > 0 {
+               args = append([]string{"-stdin-config=true"}, args...)
+       }
+       code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+       c.Logf("\n===== stdout =====\n%s", s.stdout.String())
+       c.Logf("\n===== stderr =====\n%s", s.stderr.String())
        c.Check(code, Equals, 0)
        err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
        c.Assert(err, IsNil)
@@ -185,6 +257,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
                        c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
                }
        }
+       s.logCollection = log
 
        var output arvados.Collection
        err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
@@ -218,4 +291,5 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
                        c.Check(fi.Name(), Equals, ".keep")
                }
        }
+       s.outputCollection = output
 }
diff --git a/lib/crunchrun/logging.go b/lib/crunchrun/logging.go
index 050894383d757a1e90e79999ef9fa8f2f08b9f01..76a55c4992bbd933085e83282391b3e3b241fb04 100644
@@ -7,6 +7,7 @@ package crunchrun
 import (
        "bufio"
        "bytes"
+       "encoding/json"
        "fmt"
        "io"
        "log"
@@ -404,3 +405,53 @@ func loadLogThrottleParams(clnt IArvadosClient) {
        loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
 
 }
+
+type filterKeepstoreErrorsOnly struct {
+       io.WriteCloser
+       buf []byte
+}
+
+func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
+       f.buf = append(f.buf, p...)
+       start := 0
+       for i := len(f.buf) - len(p); i < len(f.buf); i++ {
+               if f.buf[i] == '\n' {
+                       if f.check(f.buf[start:i]) {
+                               _, err := f.WriteCloser.Write(f.buf[start : i+1])
+                               if err != nil {
+                                       return 0, err
+                               }
+                       }
+                       start = i + 1
+               }
+       }
+       if start > 0 {
+               copy(f.buf, f.buf[start:])
+               f.buf = f.buf[:len(f.buf)-start]
+       }
+       return len(p), nil
+}
+
+func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
+       if len(line) == 0 {
+               return false
+       }
+       if line[0] != '{' {
+               return true
+       }
+       var m map[string]interface{}
+       err := json.Unmarshal(line, &m)
+       if err != nil {
+               return true
+       }
+       if m["msg"] == "request" {
+               return false
+       }
+       if m["msg"] == "response" {
+               if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
+                       return false
+               }
+       }
+       return true
+}
diff --git a/lib/crunchrun/logging_test.go b/lib/crunchrun/logging_test.go
index 55460af379b3338423d9692e9a12dacc103a0591..fdd4f27b7f9af5463517e3658020795c75ceb8d5 100644
@@ -5,7 +5,9 @@
 package crunchrun
 
 import (
+       "bytes"
        "fmt"
+       "io"
        "strings"
        "testing"
        "time"
@@ -13,6 +15,7 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        . "gopkg.in/check.v1"
+       check "gopkg.in/check.v1"
 )
 
 type LoggingTestSuite struct {
@@ -219,3 +222,34 @@ func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string
        c.Check(true, Equals, strings.Contains(stderrLog, expected))
        c.Check(string(kc.Content), Equals, logtext)
 }
+
+type filterSuite struct{}
+
+var _ = Suite(&filterSuite{})
+
+func (*filterSuite) TestFilterKeepstoreErrorsOnly(c *check.C) {
+       var buf bytes.Buffer
+       f := filterKeepstoreErrorsOnly{WriteCloser: nopCloser{&buf}}
+       for _, s := range []string{
+               "not j",
+               "son\n" + `{"msg":"foo"}` + "\n{}\n" + `{"msg":"request"}` + "\n" + `{"msg":1234}` + "\n\n",
+               "\n[\n",
+               `{"msg":"response","respStatusCode":404,"foo": "bar"}` + "\n",
+               `{"msg":"response","respStatusCode":206}` + "\n",
+       } {
+               f.Write([]byte(s))
+       }
+       c.Check(buf.String(), check.Equals, `not json
+{"msg":"foo"}
+{}
+{"msg":1234}
+[
+{"msg":"response","respStatusCode":404,"foo": "bar"}
+`)
+}
+
+type nopCloser struct {
+       io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 1b31a71a264fabf865f981f5f94eab1649847ac4..f57db0f09fdeed0ee9d3fe597ad3215e3d996463 100644
@@ -18,6 +18,7 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/crunchrun"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "github.com/sirupsen/logrus"
        "golang.org/x/crypto/ssh"
@@ -193,7 +194,7 @@ type StubVM struct {
        ArvMountDeadlockRate  float64
        ExecuteContainer      func(arvados.Container) int
        CrashRunningContainer func(arvados.Container)
-       ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-env "
+       ExtraCrunchRunArgs    string // extra args expected after "crunch-run --detach --stdin-config "
 
        sis          *StubInstanceSet
        id           cloud.InstanceID
@@ -252,15 +253,15 @@ func (svm *StubVM) Exec(env map[string]string, command string, stdin io.Reader,
                fmt.Fprint(stderr, "crunch-run: command not found\n")
                return 1
        }
-       if strings.HasPrefix(command, "crunch-run --detach --stdin-env "+svm.ExtraCrunchRunArgs) {
-               var stdinKV map[string]string
-               err := json.Unmarshal(stdinData, &stdinKV)
+       if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+               var configData crunchrun.ConfigData
+               err := json.Unmarshal(stdinData, &configData)
                if err != nil {
                        fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
                        return 1
                }
                for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
-                       if stdinKV[name] == "" {
+                       if configData.Env[name] == "" {
                                fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
                                return 1
                        }
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index a5924cf997f7b4bc9d838134054be58b3bd25127..37e3fa9882ddec9ad9a0623ad62f9b3d75d94433 100644
@@ -103,6 +103,7 @@ func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *promethe
                instanceSetID:                  instanceSetID,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
                newExecutor:                    newExecutor,
+               cluster:                        cluster,
                bootProbeCommand:               cluster.Containers.CloudVMs.BootProbeCommand,
                runnerSource:                   cluster.Containers.CloudVMs.DeployRunnerBinary,
                imageID:                        cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
@@ -144,6 +145,7 @@ type Pool struct {
        instanceSetID                  cloud.InstanceSetID
        instanceSet                    *throttledInstanceSet
        newExecutor                    func(cloud.Instance) Executor
+       cluster                        *arvados.Cluster
        bootProbeCommand               string
        runnerSource                   string
        imageID                        cloud.ImageID
diff --git a/lib/dispatchcloud/worker/pool_test.go b/lib/dispatchcloud/worker/pool_test.go
index 0f5c5ee196d2866269f2bb999292e8a9672c3e47..7b5634605fee5c20b987c06078eb78b0dc6841b6 100644
@@ -10,10 +10,12 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
@@ -31,7 +33,18 @@ func (*lessChecker) Check(params []interface{}, names []string) (result bool, er
 
 var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
 
-type PoolSuite struct{}
+type PoolSuite struct {
+       logger      logrus.FieldLogger
+       testCluster *arvados.Cluster
+}
+
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+       suite.logger = ctxlog.TestLogger(c)
+       cfg, err := config.NewLoader(nil, suite.logger).Load()
+       c.Assert(err, check.IsNil)
+       suite.testCluster, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+}
 
 func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
        type1 := test.InstanceType(1)
@@ -63,10 +76,9 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       logger := ctxlog.TestLogger(c)
        driver := &test.StubDriver{}
        instanceSetID := cloud.InstanceSetID("test-instance-set-id")
-       is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+       is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        newExecutor := func(cloud.Instance) Executor {
@@ -78,25 +90,21 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
 
-       cluster := &arvados.Cluster{
-               Containers: arvados.ContainersConfig{
-                       CloudVMs: arvados.CloudVMsConfig{
-                               BootProbeCommand:   "true",
-                               MaxProbesPerSecond: 1000,
-                               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
-                               SyncInterval:       arvados.Duration(time.Millisecond * 10),
-                               TagKeyPrefix:       "testprefix:",
-                       },
-                       CrunchRunCommand: "crunch-run-custom",
-               },
-               InstanceTypes: arvados.InstanceTypeMap{
-                       type1.Name: type1,
-                       type2.Name: type2,
-                       type3.Name: type3,
-               },
+       suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+               BootProbeCommand:   "true",
+               MaxProbesPerSecond: 1000,
+               ProbeInterval:      arvados.Duration(time.Millisecond * 10),
+               SyncInterval:       arvados.Duration(time.Millisecond * 10),
+               TagKeyPrefix:       "testprefix:",
+       }
+       suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+       suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+               type1.Name: type1,
+               type2.Name: type2,
+               type3.Name: type3,
        }
 
-       pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+       pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
        notify := pool.Subscribe()
        defer pool.Unsubscribe(notify)
        pool.Create(type1)
@@ -111,7 +119,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
                }
        }
        // Wait for the tags to save to the cloud provider
-       tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+       tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
        deadline := time.Now().Add(time.Second)
        for !func() bool {
                pool.mtx.RLock()
@@ -132,7 +140,7 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 
        c.Log("------- starting new pool, waiting to recover state")
 
-       pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+       pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
        notify2 := pool2.Subscribe()
        defer pool2.Unsubscribe(notify2)
        waitForIdle(pool2, notify2)
@@ -148,9 +156,8 @@ func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
 }
 
 func (suite *PoolSuite) TestDrain(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        ac := arvados.NewClientFromEnv()
@@ -158,8 +165,9 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
        type1 := test.InstanceType(1)
        pool := &Pool{
                arvClient:   ac,
-               logger:      logger,
+               logger:      suite.logger,
                newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               cluster:     suite.testCluster,
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
@@ -201,15 +209,15 @@ func (suite *PoolSuite) TestDrain(c *check.C) {
 }
 
 func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        type1 := test.InstanceType(1)
        pool := &Pool{
-               logger:                         logger,
+               logger:                         suite.logger,
                instanceSet:                    &throttledInstanceSet{InstanceSet: instanceSet},
+               cluster:                        suite.testCluster,
                maxConcurrentInstanceCreateOps: 1,
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
@@ -241,17 +249,17 @@ func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
 }
 
 func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        driver := test.StubDriver{HoldCloudOps: true}
-       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+       instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
 
        type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
        type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
        type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
        pool := &Pool{
-               logger:      logger,
+               logger:      suite.logger,
                newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+               cluster:     suite.testCluster,
                instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
                instanceTypes: arvados.InstanceTypeMap{
                        type1.Name: type1,
diff --git a/lib/dispatchcloud/worker/runner.go b/lib/dispatchcloud/worker/runner.go
index 63561874c9c5e570187048922addbbc4e4ece502..29c4b8e0a36a3be2a721e1bc509335817e86842c 100644
@@ -13,6 +13,7 @@ import (
        "syscall"
        "time"
 
+       "git.arvados.org/arvados.git/lib/crunchrun"
        "github.com/sirupsen/logrus"
 )
 
@@ -21,7 +22,7 @@ import (
 type remoteRunner struct {
        uuid          string
        executor      Executor
-       envJSON       json.RawMessage
+       configJSON    json.RawMessage
        runnerCmd     string
        runnerArgs    []string
        remoteUser    string
@@ -47,7 +48,8 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
        if err := enc.Encode(wkr.instType); err != nil {
                panic(err)
        }
-       env := map[string]string{
+       var configData crunchrun.ConfigData
+       configData.Env = map[string]string{
                "ARVADOS_API_HOST":  wkr.wp.arvClient.APIHost,
                "ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
                "InstanceType":      instJSON.String(),
@@ -55,16 +57,20 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
                "GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
        }
        if wkr.wp.arvClient.Insecure {
-               env["ARVADOS_API_HOST_INSECURE"] = "1"
+               configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
        }
-       envJSON, err := json.Marshal(env)
+       if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
+               configData.Cluster = wkr.wp.cluster
+               configData.KeepBuffers = bufs * wkr.instType.VCPUs
+       }
+       configJSON, err := json.Marshal(configData)
        if err != nil {
                panic(err)
        }
        rr := &remoteRunner{
                uuid:          uuid,
                executor:      wkr.executor,
-               envJSON:       envJSON,
+               configJSON:    configJSON,
                runnerCmd:     wkr.wp.runnerCmd,
                runnerArgs:    wkr.wp.runnerArgs,
                remoteUser:    wkr.instance.RemoteUser(),
@@ -84,7 +90,7 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
 // assume the remote process _might_ have started, at least until it
 // probes the worker and finds otherwise.
 func (rr *remoteRunner) Start() {
-       cmd := rr.runnerCmd + " --detach --stdin-env"
+       cmd := rr.runnerCmd + " --detach --stdin-config"
        for _, arg := range rr.runnerArgs {
                cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
        }
@@ -92,7 +98,7 @@ func (rr *remoteRunner) Start() {
        if rr.remoteUser != "root" {
                cmd = "sudo " + cmd
        }
-       stdin := bytes.NewBuffer(rr.envJSON)
+       stdin := bytes.NewBuffer(rr.configJSON)
        stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
        if err != nil {
                rr.logger.WithField("stdout", string(stdout)).
diff --git a/lib/dispatchcloud/worker/worker_test.go b/lib/dispatchcloud/worker/worker_test.go
index 4134788b2e27b00544151b857282d376c57a0ccf..2ee6b7c3622d66a5b85299826b035fa47ce97d26 100644
@@ -14,24 +14,36 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/dispatchcloud/test"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "github.com/prometheus/client_golang/prometheus"
+       "github.com/sirupsen/logrus"
        check "gopkg.in/check.v1"
 )
 
 var _ = check.Suite(&WorkerSuite{})
 
-type WorkerSuite struct{}
+type WorkerSuite struct {
+       logger      logrus.FieldLogger
+       testCluster *arvados.Cluster
+}
+
+func (suite *WorkerSuite) SetUpTest(c *check.C) {
+       suite.logger = ctxlog.TestLogger(c)
+       cfg, err := config.NewLoader(nil, suite.logger).Load()
+       c.Assert(err, check.IsNil)
+       suite.testCluster, err = cfg.GetCluster("")
+       c.Assert(err, check.IsNil)
+}
 
 func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
-       logger := ctxlog.TestLogger(c)
        bootTimeout := time.Minute
        probeTimeout := time.Second
 
        ac := arvados.NewClientFromEnv()
-       is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
+       is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
        c.Assert(err, check.IsNil)
        inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
        c.Assert(err, check.IsNil)
@@ -232,6 +244,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                wp := &Pool{
                        arvClient:        ac,
                        newExecutor:      func(cloud.Instance) Executor { return exr },
+                       cluster:          suite.testCluster,
                        bootProbeCommand: "bootprobe",
                        timeoutBooting:   bootTimeout,
                        timeoutProbe:     probeTimeout,
@@ -249,7 +262,7 @@ func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
                        exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
                }
                wkr := &worker{
-                       logger:   logger,
+                       logger:   suite.logger,
                        executor: exr,
                        wp:       wp,
                        mtx:      &wp.mtx,
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index f1d27b8dcdc5f2b3d79f82273ee38b278b089cb8..e736f79fd7f2bcee24e449596b4950bd279881bc 100644
@@ -435,6 +435,8 @@ type ContainersConfig struct {
        SupportedDockerImageFormats StringSet
        UsePreemptibleInstances     bool
        RuntimeEngine               string
+       LocalKeepBlobBuffersPerVCPU int
+       LocalKeepLogsToContainerLog string
 
        JobsAPI struct {
                Enable         string
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index cf655c2a5a96d9ca92194321ad813fd5712ab523..f9b383e70e5a1d531c414f7c180e1d623e7c55b2 100644
@@ -558,6 +558,9 @@ func (v *AzureBlobVolume) translateError(err error) error {
        case strings.Contains(err.Error(), "Not Found"):
                // "storage: service returned without a response body (404 Not Found)"
                return os.ErrNotExist
+       case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
+               // "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
+               return os.ErrNotExist
        default:
                return err
        }
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index cbb7f38bb14100468710b74f68d28e6513b6f4d7..d545bde0ab9193151b655328d3323f01932a2b19 100644
@@ -1154,15 +1154,6 @@ func (s *HandlerSuite) TestPutHandlerNoBufferleak(c *check.C) {
        }
 }
 
-type notifyingResponseRecorder struct {
-       *httptest.ResponseRecorder
-       closer chan bool
-}
-
-func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
-       return r.closer
-}
-
 func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
        s.cluster.Collections.BlobSigning = false
        c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
@@ -1173,23 +1164,15 @@ func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
        bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize)
        defer bufs.Put(bufs.Get(BlockSize))
 
-       if err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
-               c.Error(err)
-       }
-
-       resp := &notifyingResponseRecorder{
-               ResponseRecorder: httptest.NewRecorder(),
-               closer:           make(chan bool, 1),
-       }
-       if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
-               c.Fatal("notifyingResponseRecorder is broken")
-       }
-       // If anyone asks, the client has disconnected.
-       resp.closer <- true
+       err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock)
+       c.Assert(err, check.IsNil)
 
+       resp := httptest.NewRecorder()
        ok := make(chan struct{})
        go func() {
-               req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+               ctx, cancel := context.WithCancel(context.Background())
+               req, _ := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+               cancel()
                s.handler.ServeHTTP(resp, req)
                ok <- struct{}{}
        }()
@@ -1200,7 +1183,7 @@ func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
        case <-ok:
        }
 
-       ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+       ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp)
        for i, v := range s.handler.volmgr.AllWritable() {
                if calls := v.Volume.(*MockVolume).called["GET"]; calls != 0 {
                        c.Errorf("volume %d got %d calls, expected 0", i, calls)
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 29e7b2ca9c499e0b5eae3461084775ef7af40506..2a90705a56cbf0c1eec78a03dcd5b45463f84851 100644
@@ -112,12 +112,9 @@ func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        locator := req.URL.Path[1:]
        if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
-               rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+               rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
                return
        }
 
@@ -136,14 +133,14 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        // isn't here, we can return 404 now instead of waiting for a
        // buffer.
 
-       buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+       buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
        }
        defer bufs.Put(buf)
 
-       size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+       size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
        if err != nil {
                code := http.StatusInternalServerError
                if err, ok := err.(*KeepError); ok {
@@ -158,21 +155,6 @@ func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
        resp.Write(buf[:size])
 }
 
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
-       ctx, cancel := context.WithCancel(parent)
-       if cn, ok := resp.(http.CloseNotifier); ok {
-               go func(c <-chan bool) {
-                       select {
-                       case <-c:
-                               cancel()
-                       case <-ctx.Done():
-                       }
-               }(cn.CloseNotify())
-       }
-       return ctx, cancel
-}
-
 // Get a buffer from the pool -- but give up and return a non-nil
 // error if ctx ends before we get a buffer.
 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
@@ -223,9 +205,6 @@ func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
 }
 
 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
-       ctx, cancel := contextForResponse(context.TODO(), resp)
-       defer cancel()
-
        hash := mux.Vars(req)["hash"]
 
        // Detect as many error conditions as possible before reading
@@ -262,7 +241,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                }
        }
 
-       buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+       buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
        if err != nil {
                http.Error(resp, err.Error(), http.StatusServiceUnavailable)
                return
@@ -275,7 +254,7 @@ func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
                return
        }
 
-       result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+       result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
        bufs.Put(buf)
 
        if err != nil {
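A minimal standalone sketch (hypothetical handler, not keepstore's router) of the pattern these hunks adopt: the request's own context is cancelled when the client disconnects, so no CloseNotifier plumbing is needed.

package main

import (
	"net/http"
	"time"
)

// slowHandler gives up when the request context is cancelled, e.g.
// because the client disconnected, instead of tying up a buffer.
func slowHandler(w http.ResponseWriter, r *http.Request) {
	select {
	case <-time.After(5 * time.Second):
		w.Write([]byte("done\n"))
	case <-r.Context().Done():
		http.Error(w, r.Context().Err().Error(), http.StatusServiceUnavailable)
	}
}

func main() {
	http.HandleFunc("/slow", slowHandler)
	http.ListenAndServe("localhost:8080", nil)
}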