<div class="releasenotes">
</notextile>
+h2(#main). development main (as of 2021-10-27)
+
+"previous: Upgrading from 2.3.0":#v2_3_0
+
+h3. Dedicated keepstore process for each container
+
+When Arvados runs a container via @arvados-dispatch-cloud@, the @crunch-run@ supervisor process now brings up its own keepstore server to handle I/O for mounted collections, outputs, and logs. With the default configuration, the keepstore process allocates one 64 MiB block buffer per VCPU requested by the container. For most workloads this will increase throughput, reduce total network traffic, and make it possible to run more containers at once without provisioning additional keepstore nodes to handle the I/O load.
+* If you have containers that can effectively handle multiple I/O threads per VCPU, consider increasing the @Containers.LocalKeepBlobBuffersPerVCPU@ value.
+* If you already have a robust permanent keepstore infrastructure, you can set @Containers.LocalKeepBlobBuffersPerVCPU@ to 0 to disable this feature and preserve the previous behavior of sending container I/O traffic to your separately provisioned keepstore servers.
+* This feature is enabled only if no volumes use @AccessViaHosts@, and no writable volumes have underlying @Replication@ less than @Collections.DefaultReplication@. If the feature is configured but cannot be enabled due to an incompatible volume configuration, this will be noted in the @crunch-run.txt@ file in the container log.
+
h2(#v2_3_0). v2.3.0 (2021-10-27)
"previous: Upgrading from 2.2.0":#v2_2_0
# Container runtime: "docker" (default) or "singularity"
RuntimeEngine: docker
+ # When running a container, run a dedicated keepstore process,
+ # using the specified number of 64 MiB memory buffers per
+ # allocated CPU core (VCPUs in the container's runtime
+ # constraints). The dedicated keepstore handles I/O for
+ # collections mounted in the container, as well as saving
+ # container logs.
+ #
+ # A zero value disables this feature.
+ #
+ # In order for this feature to be activated, no volume may use
+ # AccessViaHosts, and no writable volume may have Replication
+ # lower than Collections.DefaultReplication. If these
+ # requirements are not satisfied, the feature is disabled
+ # automatically regardless of the value given here.
+ #
+ # Note that when this configuration is enabled, the entire
+ # cluster configuration file, including the system root token,
+ # is copied to the worker node and held in memory for the
+ # duration of the container.
+ LocalKeepBlobBuffersPerVCPU: 1
+
+ # When running a dedicated keepstore process for a container
+ # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+ # messages to keepstore.txt in the container's log collection.
+ #
+ # These log messages can reveal some volume configuration
+ # details, error messages from the cloud storage provider, etc.,
+ # which are not otherwise visible to users.
+ #
+ # Accepted values:
+ # * "none" -- no keepstore.txt file
+ # * "all" -- all logs, including request and response lines
+ # * "errors" -- all logs except "response" logs with 2xx
+ # response codes and "request" logs
+ LocalKeepLogsToContainerLog: none
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
"Containers.JobsAPI": true,
"Containers.JobsAPI.Enable": true,
"Containers.JobsAPI.GitInternalDir": false,
+ "Containers.LocalKeepBlobBuffersPerVCPU": false,
+ "Containers.LocalKeepLogsToContainerLog": false,
"Containers.Logging": false,
"Containers.LogReuseDecisions": false,
"Containers.LSF": false,
# Container runtime: "docker" (default) or "singularity"
RuntimeEngine: docker
+ # When running a container, run a dedicated keepstore process,
+ # using the specified number of 64 MiB memory buffers per
+ # allocated CPU core (VCPUs in the container's runtime
+ # constraints). The dedicated keepstore handles I/O for
+ # collections mounted in the container, as well as saving
+ # container logs.
+ #
+ # A zero value disables this feature.
+ #
+ # In order for this feature to be activated, no volume may use
+ # AccessViaHosts, and no writable volume may have Replication
+ # lower than Collections.DefaultReplication. If these
+ # requirements are not satisfied, the feature is disabled
+ # automatically regardless of the value given here.
+ #
+ # Note that when this configuration is enabled, the entire
+ # cluster configuration file, including the system root token,
+ # is copied to the worker node and held in memory for the
+ # duration of the container.
+ LocalKeepBlobBuffersPerVCPU: 1
+
+ # When running a dedicated keepstore process for a container
+ # (see LocalKeepBlobBuffersPerVCPU), write keepstore log
+ # messages to keepstore.txt in the container's log collection.
+ #
+ # These log messages can reveal some volume configuration
+ # details, error messages from the cloud storage provider, etc.,
+ # which are not otherwise visible to users.
+ #
+ # Accepted values:
+ # * "none" -- no keepstore.txt file
+ # * "all" -- all logs, including request and response lines
+ # * "errors" -- all logs except "response" logs with 2xx
+ # response codes and "request" logs
+ LocalKeepLogsToContainerLog: none
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
ldr.checkToken(fmt.Sprintf("Clusters.%s.SystemRootToken", id), cc.SystemRootToken),
ldr.checkToken(fmt.Sprintf("Clusters.%s.Collections.BlobSigningKey", id), cc.Collections.BlobSigningKey),
checkKeyConflict(fmt.Sprintf("Clusters.%s.PostgreSQL.Connection", id), cc.PostgreSQL.Connection),
+ ldr.checkEnum("Containers.LocalKeepLogsToContainerLog", cc.Containers.LocalKeepLogsToContainerLog, "none", "all", "errors"),
ldr.checkEmptyKeepstores(cc),
ldr.checkUnlistedKeepstores(cc),
ldr.checkStorageClasses(cc),
return nil
}
+// checkEnum returns nil if value is one of the accepted strings;
+// otherwise it returns an error identifying the offending config
+// key (label) and listing the acceptable values.
+func (ldr *Loader) checkEnum(label, value string, accepted ...string) error {
+ for _, s := range accepted {
+ if s == value {
+ return nil
+ }
+ }
+ return fmt.Errorf("%s: unacceptable value %q: must be one of %q", label, value, accepted)
+}
+
func (ldr *Loader) setImplicitStorageClasses(cfg *arvados.Config) error {
cluster:
for id, cc := range cfg.Clusters {
//
// Stdout and stderr in the child process are sent to the systemd
// journal using the systemd-cat program.
-func Detach(uuid string, prog string, args []string, stdout, stderr io.Writer) int {
- return exitcode(stderr, detach(uuid, prog, args, stdout, stderr))
+func Detach(uuid string, prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ return exitcode(stderr, detach(uuid, prog, args, stdin, stdout))
}
-func detach(uuid string, prog string, args []string, stdout, stderr io.Writer) error {
+func detach(uuid string, prog string, args []string, stdin io.Reader, stdout io.Writer) error {
lockfile, err := func() (*os.File, error) {
// We must hold the dir-level lock between
// opening/creating the lockfile and acquiring LOCK_EX
// invoked as "/path/to/crunch-run"
execargs = append([]string{prog}, execargs...)
}
- execargs = append([]string{
- // Here, if the inner systemd-cat can't exec
- // crunch-run, it writes an error message to stderr,
- // and the outer systemd-cat writes it to the journal
- // where the operator has a chance to discover it. (If
- // we only used one systemd-cat command, it would be
- // up to us to report the error -- but we are going to
- // detach and exit, not wait for something to appear
- // on stderr.) Note these systemd-cat calls don't
- // result in additional processes -- they just connect
- // stderr/stdout to sockets and call exec().
- "systemd-cat", "--identifier=crunch-run",
- "systemd-cat", "--identifier=crunch-run",
- }, execargs...)
+ if _, err := exec.LookPath("systemd-cat"); err == nil {
+ execargs = append([]string{
+ // Here, if the inner systemd-cat can't exec
+ // crunch-run, it writes an error message to
+ // stderr, and the outer systemd-cat writes it
+ // to the journal where the operator has a
+ // chance to discover it. (If we only used one
+ // systemd-cat command, it would be up to us
+ // to report the error -- but we are going to
+ // detach and exit, not wait for something to
+ // appear on stderr.) Note these systemd-cat
+ // calls don't result in additional processes
+ // -- they just connect stderr/stdout to
+ // sockets and call exec().
+ "systemd-cat", "--identifier=crunch-run",
+ "systemd-cat", "--identifier=crunch-run",
+ }, execargs...)
+ }
cmd := exec.Command(execargs[0], execargs[1:]...)
// Child inherits lockfile.
// from parent (sshd) while sending lockfile content to
// caller.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
+ // We need to manage our own OS pipe here to ensure the child
+ // process reads all of our stdin pipe before we return.
+ piper, pipew, err := os.Pipe()
+ if err != nil {
+ return err
+ }
+ defer pipew.Close()
+ cmd.Stdin = piper
err = cmd.Start()
if err != nil {
return fmt.Errorf("exec %s: %s", cmd.Path, err)
}
+ _, err = io.Copy(pipew, stdin)
+ if err != nil {
+ return err
+ }
+ err = pipew.Close()
+ if err != nil {
+ return err
+ }
w := io.MultiWriter(stdout, lockfile)
return json.NewEncoder(w).Encode(procinfo{
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package crunchrun
+
+import (
+ "bytes"
+ "io"
+ "sync"
+)
+
+// bufThenWrite is an io.Writer that accumulates written data in an
+// in-memory buffer until SetWriter is called, after which writes are
+// relayed directly to the designated writer.
+type bufThenWrite struct {
+ buf bytes.Buffer
+ w io.Writer
+ mtx sync.Mutex
+}
+
+// SetWriter directs all future writes to w, first flushing any data
+// already buffered. It returns the error (if any) from copying the
+// buffered data to w.
+func (btw *bufThenWrite) SetWriter(w io.Writer) error {
+ btw.mtx.Lock()
+ defer btw.mtx.Unlock()
+ btw.w = w
+ _, err := io.Copy(w, &btw.buf)
+ return err
+}
+
+// Write writes p to the current writer, or appends it to the
+// internal buffer if SetWriter has not been called yet.
+func (btw *bufThenWrite) Write(p []byte) (int, error) {
+ btw.mtx.Lock()
+ defer btw.mtx.Unlock()
+ if btw.w == nil {
+ btw.w = &btw.buf
+ }
+ return btw.w.Write(p)
+}
import (
"bytes"
+ "context"
"encoding/json"
"errors"
"flag"
"io"
"io/ioutil"
"log"
+ "net"
+ "net/http"
"os"
"os/exec"
"os/signal"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
- "golang.org/x/net/context"
)
type command struct{}
var Command = command{}
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+ Env map[string]string // environment variables to set before running the container
+ KeepBuffers int // number of 64 MiB buffers for a dedicated keepstore process (0 disables it)
+ Cluster *arvados.Cluster // cluster config for the dedicated keepstore; nil when not needed
+}
+
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
type IArvadosClient interface {
Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
finalState string
parentTemp string
+ keepstoreLogger io.WriteCloser
+ keepstoreLogbuf *bufThenWrite
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
hoststatLogger io.WriteCloser
runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
}()
+ if runner.keepstoreLogger != nil {
+ // Flush any buffered logs from our local keepstore
+ // process. Discard anything logged after this point
+ // -- it won't end up in the log collection, so
+ // there's no point writing it to the collectionfs.
+ runner.keepstoreLogbuf.SetWriter(io.Discard)
+ runner.keepstoreLogger.Close()
+ runner.keepstoreLogger = nil
+ }
+
if runner.LogsPDH != nil {
// If we have already assigned something to LogsPDH,
// we must be closing the re-opened log, which won't
// -- it exists only to send logs to other channels.
return nil
}
+
saved, err := runner.saveLogCollection(true)
if err != nil {
return fmt.Errorf("error saving log collection: %s", err)
}
func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+ log := log.New(stderr, "", 0)
flags := flag.NewFlagSet(prog, flag.ContinueOnError)
statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
- stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+ stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
return 1
}
- if *stdinEnv && !ignoreDetachFlag {
- // Load env vars on stdin if asked (but not in a
- // detached child process, in which case stdin is
- // /dev/null).
- err := loadEnv(os.Stdin)
- if err != nil {
- log.Print(err)
- return 1
- }
- }
-
containerUUID := flags.Arg(0)
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+ return Detach(containerUUID, prog, args, os.Stdin, os.Stdout, os.Stderr)
case *kill >= 0:
return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
case *list:
return ListProcesses(os.Stdout, os.Stderr)
}
- if containerUUID == "" {
+ if len(containerUUID) != 27 {
log.Printf("usage: %s [options] UUID", prog)
return 1
}
+ var conf ConfigData
+ if *stdinConfig {
+ err := json.NewDecoder(stdin).Decode(&conf)
+ if err != nil {
+ log.Printf("decode stdin: %s", err)
+ return 1
+ }
+ for k, v := range conf.Env {
+ err = os.Setenv(k, v)
+ if err != nil {
+ log.Printf("setenv(%q): %s", k, err)
+ return 1
+ }
+ }
+ if conf.Cluster != nil {
+ // ClusterID is missing from the JSON
+ // representation, but we need it to generate
+ // a valid config file for keepstore, so we
+ // fill it using the container UUID prefix.
+ conf.Cluster.ClusterID = containerUUID[:5]
+ }
+ }
+
log.Printf("crunch-run %s started", cmd.Version.String())
time.Sleep(*sleep)
arvadosclient.CertFiles = []string{*caCertsPath}
}
+ var keepstoreLogbuf bufThenWrite
+ keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ if keepstore != nil {
+ defer keepstore.Process.Kill()
+ }
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
log.Printf("%s: %v", containerUUID, err)
}
api.Retries = 8
- kc, kcerr := keepclient.MakeKeepClient(api)
- if kcerr != nil {
- log.Printf("%s: %v", containerUUID, kcerr)
+ kc, err := keepclient.MakeKeepClient(api)
+ if err != nil {
+ log.Printf("%s: %v", containerUUID, err)
return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
return 1
}
+ if keepstore == nil {
+ // Log explanation (if any) for why we're not running
+ // a local keepstore.
+ var buf bytes.Buffer
+ keepstoreLogbuf.SetWriter(&buf)
+ if buf.Len() > 0 {
+ cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+ }
+ } else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ keepstoreLogbuf.SetWriter(io.Discard)
+ } else {
+ cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+ logwriter, err := cr.NewLogWriter("keepstore")
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+ var writer io.WriteCloser = cr.keepstoreLogger
+ if logWhat == "errors" {
+ writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+ } else if logWhat != "all" {
+ // should have been caught earlier by
+ // dispatcher's config loader
+ log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+ return 1
+ }
+ err = keepstoreLogbuf.SetWriter(writer)
+ if err != nil {
+ log.Print(err)
+ return 1
+ }
+ cr.keepstoreLogbuf = &keepstoreLogbuf
+ }
+
switch *runtimeEngine {
case "docker":
cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
return 0
}
-func loadEnv(rdr io.Reader) error {
-	buf, err := ioutil.ReadAll(rdr)
+// startLocalKeepstore starts a dedicated keepstore child process for
+// this container, if configData calls for one, and waits for it to
+// report healthy. It returns (nil, nil) when the feature is disabled
+// or an incompatible volume configuration is found (the reason is
+// written to logbuf).
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+ if configData.Cluster == nil || configData.KeepBuffers < 1 {
+ return nil, nil
+ }
+ for uuid, vol := range configData.Cluster.Volumes {
+ if len(vol.AccessViaHosts) > 0 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+ return nil, nil
+ }
+ if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+ return nil, nil
+ }
+ }
+
+ // Rather than have an alternate way to tell keepstore how
+ // many buffers to use when starting it this way, we just
+ // modify the cluster configuration that we feed it on stdin.
+ configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+ // Bind to an ephemeral port to pick a free port number, then
+ // close the listener and hand the port to keepstore.
+ ln, err := net.Listen("tcp", "localhost:0")
+ if err != nil {
+ return nil, err
+ }
+ _, port, err := net.SplitHostPort(ln.Addr().String())
+ if err != nil {
+ ln.Close()
+ return nil, err
+ }
+ ln.Close()
+ url := "http://localhost:" + port
+
+ fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+ var confJSON bytes.Buffer
+ err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+ Clusters: map[string]arvados.Cluster{
+ configData.Cluster.ClusterID: *configData.Cluster,
+ },
+ })
 if err != nil {
-		return fmt.Errorf("read stdin: %s", err)
+ return nil, err
 }
-	var env map[string]string
-	err = json.Unmarshal(buf, &env)
+ cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+ if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+ // If we're a 'go test' process, running
+ // /proc/self/exe would start the test suite in a
+ // child process, which is not what we want.
+ cmd.Path, _ = exec.LookPath("go")
+ cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+ cmd.Env = os.Environ()
+ }
+ cmd.Stdin = &confJSON
+ cmd.Stdout = logbuf
+ cmd.Stderr = logbuf
+ cmd.Env = append(cmd.Env,
+ "GOGC=10",
+ "ARVADOS_SERVICE_INTERNAL_URL="+url)
+ err = cmd.Start()
 if err != nil {
-		return fmt.Errorf("decode stdin: %s", err)
+ return nil, fmt.Errorf("error starting keepstore process: %w", err)
 }
-	for k, v := range env {
-		err = os.Setenv(k, v)
+ // NOTE(review): cmdExited is written by the waiter goroutine and
+ // read below without synchronization -- technically a data race;
+ // consider sync/atomic or a channel.
+ cmdExited := false
+ go func() {
+ cmd.Wait()
+ cmdExited = true
+ }()
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+ defer cancel()
+ poll := time.NewTicker(time.Second / 10)
+ defer poll.Stop()
+ client := http.Client{}
+ for range poll.C {
+ testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
 if err != nil {
-			return fmt.Errorf("setenv(%q): %s", k, err)
+ return nil, err
+ }
+ // Set the header only after the error check above: the
+ // original code set it before checking err, which would
+ // dereference a nil *http.Request if NewRequestWithContext
+ // failed.
+ testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
+ resp, err := client.Do(testReq)
+ if err == nil {
+ resp.Body.Close()
+ if resp.StatusCode == http.StatusOK {
+ break
+ }
+ }
+ if cmdExited {
+ return nil, fmt.Errorf("keepstore child process exited")
+ }
+ if ctx.Err() != nil {
+ return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
 }
 }
-	return nil
+ os.Setenv("ARVADOS_KEEP_SERVICES", url)
+ return cmd, nil
}
import (
"bytes"
+ "encoding/json"
"fmt"
"io"
"io/ioutil"
"os/exec"
"strings"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
. "gopkg.in/check.v1"
)
client *arvados.Client
ac *arvadosclient.ArvadosClient
kc *keepclient.KeepClient
+
+ logCollection arvados.Collection
+ outputCollection arvados.Collection
}
func (s *integrationSuite) SetUpSuite(c *C) {
out, err = exec.Command("arv-keepdocker", "--no-resume", "busybox:uclibc").Output()
imageUUID := strings.TrimSpace(string(out))
c.Logf("image uuid %s", imageUUID)
- c.Assert(err, IsNil)
+ if !c.Check(err, IsNil) {
+ if err, ok := err.(*exec.ExitError); ok {
+ c.Logf("%s", err.Stderr)
+ }
+ c.Fail()
+ }
err = arvados.NewClientFromEnv().RequestAndDecode(&s.image, "GET", "arvados/v1/collections/"+imageUUID, nil, nil)
c.Assert(err, IsNil)
c.Logf("image pdh %s", s.image.PortableDataHash)
}
func (s *integrationSuite) TearDownSuite(c *C) {
+ os.Unsetenv("ARVADOS_KEEP_SERVICES")
if s.client == nil {
// didn't set up
return
}
func (s *integrationSuite) SetUpTest(c *C) {
+ os.Unsetenv("ARVADOS_KEEP_SERVICES")
s.engine = "docker"
s.stdin = bytes.Buffer{}
s.stdout = bytes.Buffer{}
s.stderr = bytes.Buffer{}
+ s.logCollection = arvados.Collection{}
+ s.outputCollection = arvados.Collection{}
s.cr = arvados.ContainerRequest{
Priority: 1,
State: "Committed",
s.testRunTrivialContainer(c)
}
+// TestRunTrivialContainerWithLocalKeepstore runs the trivial
+// container once for each LocalKeepLogsToContainerLog setting and
+// checks whether keepstore.txt appears in the log collection and
+// which request/response lines it contains.
+func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
+ for _, trial := range []struct {
+ logConfig string
+ matchGetReq Checker
+ matchPutReq Checker
+ // NOTE(review): matchStartupMessage is declared and populated
+ // but never asserted below -- confirm whether a check against
+ // the container log (crunch-run.txt) is missing.
+ matchStartupMessage Checker
+ }{
+ {"none", Not(Matches), Not(Matches), Not(Matches)},
+ {"all", Matches, Matches, Matches},
+ {"errors", Not(Matches), Not(Matches), Matches},
+ } {
+ c.Logf("=== testing with Containers.LocalKeepLogsToContainerLog: %q", trial.logConfig)
+ s.SetUpTest(c)
+
+ // Load the real cluster config, then force every volume into a
+ // shape that lets the local-keepstore feature activate.
+ cfg, err := config.NewLoader(nil, ctxlog.TestLogger(c)).Load()
+ c.Assert(err, IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, IsNil)
+ for uuid, volume := range cluster.Volumes {
+ volume.AccessViaHosts = nil
+ volume.Replication = 2
+ cluster.Volumes[uuid] = volume
+ }
+ cluster.Containers.LocalKeepLogsToContainerLog = trial.logConfig
+
+ // Feed the config to crunch-run on stdin, as dispatchcloud would.
+ s.stdin.Reset()
+ err = json.NewEncoder(&s.stdin).Encode(ConfigData{
+ Env: nil,
+ KeepBuffers: 1,
+ Cluster: cluster,
+ })
+ c.Assert(err, IsNil)
+
+ s.engine = "docker"
+ s.testRunTrivialContainer(c)
+
+ // Inspect the resulting log collection for keepstore.txt.
+ fs, err := s.logCollection.FileSystem(s.client, s.kc)
+ c.Assert(err, IsNil)
+ f, err := fs.Open("keepstore.txt")
+ if trial.logConfig == "none" {
+ c.Check(err, NotNil)
+ c.Check(os.IsNotExist(err), Equals, true)
+ } else {
+ c.Assert(err, IsNil)
+ buf, err := ioutil.ReadAll(f)
+ c.Assert(err, IsNil)
+ c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+ c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+ }
+ }
+}
+
func (s *integrationSuite) testRunTrivialContainer(c *C) {
if err := exec.Command("which", s.engine).Run(); err != nil {
c.Skip(fmt.Sprintf("%s: %s", s.engine, err))
}
s.cr.Command = []string{"sh", "-c", "cat /mnt/in/inputfile >/mnt/out/inputfile && cat /mnt/json >/mnt/out/json && ! touch /mnt/in/shouldbereadonly && mkdir /mnt/out/emptydir"}
s.setup(c)
- code := command{}.RunCommand("crunch-run", []string{
+
+ args := []string{
"-runtime-engine=" + s.engine,
"-enable-memory-limit=false",
s.cr.ContainerUUID,
- }, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+ }
+ if s.stdin.Len() > 0 {
+ args = append([]string{"-stdin-config=true"}, args...)
+ }
+ code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
+ c.Logf("\n===== stdout =====\n%s", s.stdout.String())
+ c.Logf("\n===== stderr =====\n%s", s.stderr.String())
c.Check(code, Equals, 0)
err := s.client.RequestAndDecode(&s.cr, "GET", "arvados/v1/container_requests/"+s.cr.UUID, nil, nil)
c.Assert(err, IsNil)
c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
}
}
+ s.logCollection = log
var output arvados.Collection
err = s.client.RequestAndDecode(&output, "GET", "arvados/v1/collections/"+s.cr.OutputUUID, nil, nil)
c.Check(fi.Name(), Equals, ".keep")
}
}
+ s.outputCollection = output
}
import (
"bufio"
"bytes"
+ "encoding/json"
"fmt"
"io"
"log"
loadDuration(&crunchLogUpdatePeriod, "crunchLogUpdatePeriod")
}
+
+// filterKeepstoreErrorsOnly is an io.WriteCloser wrapper that
+// suppresses routine request/response log lines (see check) and
+// passes everything else through to the wrapped WriteCloser.
+// Filtering is line-by-line; an incomplete trailing line is buffered
+// until its newline arrives.
+type filterKeepstoreErrorsOnly struct {
+ io.WriteCloser
+ buf []byte
+}
+
+// Write appends p to the pending buffer, forwards each complete
+// newline-terminated line that passes check() to the underlying
+// WriteCloser, and keeps any partial final line buffered. (A leftover
+// debug log.Printf of the raw data has been removed -- it duplicated
+// every write into the process log.)
+func (f *filterKeepstoreErrorsOnly) Write(p []byte) (int, error) {
+ f.buf = append(f.buf, p...)
+ start := 0
+ for i := len(f.buf) - len(p); i < len(f.buf); i++ {
+ if f.buf[i] == '\n' {
+ if f.check(f.buf[start:i]) {
+ _, err := f.WriteCloser.Write(f.buf[start : i+1])
+ if err != nil {
+ return 0, err
+ }
+ }
+ start = i + 1
+ }
+ }
+ if start > 0 {
+ copy(f.buf, f.buf[start:])
+ f.buf = f.buf[:len(f.buf)-start]
+ }
+ return len(p), nil
+}
+
+// check reports whether a single keepstore log line should be passed
+// through to the container log. Empty lines are dropped; lines that
+// are not JSON objects (or fail to decode) are passed through
+// verbatim; JSON "request" logs and "response" logs with 2xx status
+// codes are suppressed, leaving only error-ish entries.
+func (f *filterKeepstoreErrorsOnly) check(line []byte) bool {
+ if len(line) == 0 {
+ return false
+ }
+ if line[0] != '{' {
+ return true
+ }
+ var m map[string]interface{}
+ err := json.Unmarshal(line, &m)
+ if err != nil {
+ return true
+ }
+ if m["msg"] == "request" {
+ return false
+ }
+ if m["msg"] == "response" {
+ if code, _ := m["respStatusCode"].(float64); code >= 200 && code < 300 {
+ return false
+ }
+ }
+ return true
+}
package crunchrun
import (
+ "bytes"
"fmt"
+ "io"
"strings"
"testing"
"time"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
. "gopkg.in/check.v1"
+ check "gopkg.in/check.v1"
)
type LoggingTestSuite struct {
c.Check(true, Equals, strings.Contains(stderrLog, expected))
c.Check(string(kc.Content), Equals, logtext)
}
+
+type filterSuite struct{}
+
+var _ = Suite(&filterSuite{})
+
+// TestFilterKeepstoreErrorsOnly feeds a mix of partial lines,
+// non-JSON text, and JSON request/response log entries through the
+// filter (split across Write calls to exercise line buffering) and
+// checks that only non-2xx / non-"request" lines come out.
+func (*filterSuite) TestFilterKeepstoreErrorsOnly(c *check.C) {
+ var buf bytes.Buffer
+ f := filterKeepstoreErrorsOnly{WriteCloser: nopCloser{&buf}}
+ for _, s := range []string{
+ "not j",
+ "son\n" + `{"msg":"foo"}` + "\n{}\n" + `{"msg":"request"}` + "\n" + `{"msg":1234}` + "\n\n",
+ "\n[\n",
+ `{"msg":"response","respStatusCode":404,"foo": "bar"}` + "\n",
+ `{"msg":"response","respStatusCode":206}` + "\n",
+ } {
+ f.Write([]byte(s))
+ }
+ c.Check(buf.String(), check.Equals, `not json
+{"msg":"foo"}
+{}
+{"msg":1234}
+[
+{"msg":"response","respStatusCode":404,"foo": "bar"}
+`)
+}
+
+// nopCloser adds a no-op Close method to an io.Writer, so a plain
+// bytes.Buffer can be used where an io.WriteCloser is required.
+type nopCloser struct {
+ io.Writer
+}
+
+func (nopCloser) Close() error { return nil }
needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
needRAM += int64(cc.Containers.ReserveExtraRAM)
+ needRAM += int64(cc.Containers.LocalKeepBlobBuffersPerVCPU * needVCPUs * (1 << 26))
needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
ok := false
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/sirupsen/logrus"
"golang.org/x/crypto/ssh"
ArvMountDeadlockRate float64
ExecuteContainer func(arvados.Container) int
CrashRunningContainer func(arvados.Container)
- ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-env "
+ ExtraCrunchRunArgs string // extra args expected after "crunch-run --detach --stdin-config "
sis *StubInstanceSet
id cloud.InstanceID
fmt.Fprint(stderr, "crunch-run: command not found\n")
return 1
}
- if strings.HasPrefix(command, "crunch-run --detach --stdin-env "+svm.ExtraCrunchRunArgs) {
- var stdinKV map[string]string
- err := json.Unmarshal(stdinData, &stdinKV)
+ if strings.HasPrefix(command, "crunch-run --detach --stdin-config "+svm.ExtraCrunchRunArgs) {
+ var configData crunchrun.ConfigData
+ err := json.Unmarshal(stdinData, &configData)
if err != nil {
fmt.Fprintf(stderr, "unmarshal stdin: %s (stdin was: %q)\n", err, stdinData)
return 1
}
for _, name := range []string{"ARVADOS_API_HOST", "ARVADOS_API_TOKEN"} {
- if stdinKV[name] == "" {
+ if configData.Env[name] == "" {
fmt.Fprintf(stderr, "%s env var missing from stdin %q\n", name, stdinData)
return 1
}
instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
+ cluster: cluster,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
+ cluster *arvados.Cluster
bootProbeCommand string
runnerSource string
imageID cloud.ImageID
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
var less = &lessChecker{&check.CheckerInfo{Name: "less", Params: []string{"obtained", "expected"}}}
-type PoolSuite struct{}
+type PoolSuite struct {
+ logger logrus.FieldLogger
+ testCluster *arvados.Cluster
+}
+
+func (suite *PoolSuite) SetUpTest(c *check.C) {
+ suite.logger = ctxlog.TestLogger(c)
+ cfg, err := config.NewLoader(nil, suite.logger).Load()
+ c.Assert(err, check.IsNil)
+ suite.testCluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+}
func (suite *PoolSuite) TestResumeAfterRestart(c *check.C) {
type1 := test.InstanceType(1)
}
}
- logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
instanceSetID := cloud.InstanceSetID("test-instance-set-id")
- is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
+ is, err := driver.InstanceSet(nil, instanceSetID, nil, suite.logger)
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
}
}
- cluster := &arvados.Cluster{
- Containers: arvados.ContainersConfig{
- CloudVMs: arvados.CloudVMsConfig{
- BootProbeCommand: "true",
- MaxProbesPerSecond: 1000,
- ProbeInterval: arvados.Duration(time.Millisecond * 10),
- SyncInterval: arvados.Duration(time.Millisecond * 10),
- TagKeyPrefix: "testprefix:",
- },
- CrunchRunCommand: "crunch-run-custom",
- },
- InstanceTypes: arvados.InstanceTypeMap{
- type1.Name: type1,
- type2.Name: type2,
- type3.Name: type3,
- },
+ suite.testCluster.Containers.CloudVMs = arvados.CloudVMsConfig{
+ BootProbeCommand: "true",
+ MaxProbesPerSecond: 1000,
+ ProbeInterval: arvados.Duration(time.Millisecond * 10),
+ SyncInterval: arvados.Duration(time.Millisecond * 10),
+ TagKeyPrefix: "testprefix:",
+ }
+ suite.testCluster.Containers.CrunchRunCommand = "crunch-run-custom"
+ suite.testCluster.InstanceTypes = arvados.InstanceTypeMap{
+ type1.Name: type1,
+ type2.Name: type2,
+ type3.Name: type3,
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+ pool := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
}
}
// Wait for the tags to save to the cloud provider
- tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
+ tagKey := suite.testCluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
deadline := time.Now().Add(time.Second)
for !func() bool {
pool.mtx.RLock()
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
+ pool2 := NewPool(suite.logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, suite.testCluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)
}
func (suite *PoolSuite) TestDrain(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
ac := arvados.NewClientFromEnv()
type1 := test.InstanceType(1)
pool := &Pool{
arvClient: ac,
- logger: logger,
+ logger: suite.logger,
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+ cluster: suite.testCluster,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
}
func (suite *PoolSuite) TestNodeCreateThrottle(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
type1 := test.InstanceType(1)
pool := &Pool{
- logger: logger,
+ logger: suite.logger,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
+ cluster: suite.testCluster,
maxConcurrentInstanceCreateOps: 1,
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
- logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
type2 := arvados.InstanceType{Name: "a2m", ProviderType: "a2.medium", VCPUs: 2, RAM: 2 * GiB, Price: .02}
type3 := arvados.InstanceType{Name: "a2l", ProviderType: "a2.large", VCPUs: 4, RAM: 4 * GiB, Price: .04}
pool := &Pool{
- logger: logger,
+ logger: suite.logger,
newExecutor: func(cloud.Instance) Executor { return &stubExecutor{} },
+ cluster: suite.testCluster,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
instanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/crunchrun"
"github.com/sirupsen/logrus"
)
type remoteRunner struct {
uuid string
executor Executor
- envJSON json.RawMessage
+ configJSON json.RawMessage
runnerCmd string
runnerArgs []string
remoteUser string
if err := enc.Encode(wkr.instType); err != nil {
panic(err)
}
- env := map[string]string{
+ var configData crunchrun.ConfigData
+ configData.Env = map[string]string{
"ARVADOS_API_HOST": wkr.wp.arvClient.APIHost,
"ARVADOS_API_TOKEN": wkr.wp.arvClient.AuthToken,
"InstanceType": instJSON.String(),
"GatewayAuthSecret": wkr.wp.gatewayAuthSecret(uuid),
}
if wkr.wp.arvClient.Insecure {
- env["ARVADOS_API_HOST_INSECURE"] = "1"
+ configData.Env["ARVADOS_API_HOST_INSECURE"] = "1"
}
- envJSON, err := json.Marshal(env)
+ if bufs := wkr.wp.cluster.Containers.LocalKeepBlobBuffersPerVCPU; bufs > 0 {
+ configData.Cluster = wkr.wp.cluster
+ configData.KeepBuffers = bufs * wkr.instType.VCPUs
+ }
+ configJSON, err := json.Marshal(configData)
if err != nil {
panic(err)
}
rr := &remoteRunner{
uuid: uuid,
executor: wkr.executor,
- envJSON: envJSON,
+ configJSON: configJSON,
runnerCmd: wkr.wp.runnerCmd,
runnerArgs: wkr.wp.runnerArgs,
remoteUser: wkr.instance.RemoteUser(),
// assume the remote process _might_ have started, at least until it
// probes the worker and finds otherwise.
func (rr *remoteRunner) Start() {
- cmd := rr.runnerCmd + " --detach --stdin-env"
+ cmd := rr.runnerCmd + " --detach --stdin-config"
for _, arg := range rr.runnerArgs {
cmd += " '" + strings.Replace(arg, "'", "'\\''", -1) + "'"
}
if rr.remoteUser != "root" {
cmd = "sudo " + cmd
}
- stdin := bytes.NewBuffer(rr.envJSON)
+ stdin := bytes.NewBuffer(rr.configJSON)
stdout, stderr, err := rr.executor.Execute(nil, cmd, stdin)
if err != nil {
rr.logger.WithField("stdout", string(stdout)).
"time"
"git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud/test"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
var _ = check.Suite(&WorkerSuite{})
-type WorkerSuite struct{}
+type WorkerSuite struct {
+ logger logrus.FieldLogger
+ testCluster *arvados.Cluster
+}
+
+func (suite *WorkerSuite) SetUpTest(c *check.C) {
+ suite.logger = ctxlog.TestLogger(c)
+ cfg, err := config.NewLoader(nil, suite.logger).Load()
+ c.Assert(err, check.IsNil)
+ suite.testCluster, err = cfg.GetCluster("")
+ c.Assert(err, check.IsNil)
+}
func (suite *WorkerSuite) TestProbeAndUpdate(c *check.C) {
- logger := ctxlog.TestLogger(c)
bootTimeout := time.Minute
probeTimeout := time.Second
ac := arvados.NewClientFromEnv()
- is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
+ is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, suite.logger)
c.Assert(err, check.IsNil)
inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
wp := &Pool{
arvClient: ac,
newExecutor: func(cloud.Instance) Executor { return exr },
+ cluster: suite.testCluster,
bootProbeCommand: "bootprobe",
timeoutBooting: bootTimeout,
timeoutProbe: probeTimeout,
exr.response[wp.runnerCmd+" --list"] = trial.respRunDeployed
}
wkr := &worker{
- logger: logger,
+ logger: suite.logger,
executor: exr,
wp: wp,
mtx: &wp.mtx,
SupportedDockerImageFormats StringSet
UsePreemptibleInstances bool
RuntimeEngine string
+ LocalKeepBlobBuffersPerVCPU int
+ LocalKeepLogsToContainerLog string
JobsAPI struct {
Enable string
c.Check(err, check.IsNil)
c.Check(logentries.Items, check.HasLen, 1)
lastLogId := logentries.Items[0].ID
- nextLogId := lastLogId
var logbuf bytes.Buffer
logger := logrus.New()
c.Check(logbuf.String(), check.Matches, `(?ms).*msg="File `+direction+`".*`)
c.Check(logbuf.String(), check.Not(check.Matches), `(?ms).*level=error.*`)
- count := 0
- for ; nextLogId == lastLogId && count < 20; count++ {
- time.Sleep(50 * time.Millisecond)
+ deadline := time.Now().Add(time.Second)
+ for {
+ c.Assert(time.Now().After(deadline), check.Equals, false, check.Commentf("timed out waiting for log entry"))
err = client.RequestAndDecode(&logentries, "GET", "arvados/v1/logs", nil,
arvados.ResourceListParams{
- Filters: []arvados.Filter{arvados.Filter{Attr: "event_type", Operator: "=", Operand: "file_" + direction}},
- Limit: &limit1,
- Order: "created_at desc",
+ Filters: []arvados.Filter{
+ {Attr: "event_type", Operator: "=", Operand: "file_" + direction},
+ {Attr: "object_uuid", Operator: "=", Operand: userUuid},
+ },
+ Limit: &limit1,
+ Order: "created_at desc",
})
- c.Check(err, check.IsNil)
- if len(logentries.Items) > 0 {
- nextLogId = logentries.Items[0].ID
+ c.Assert(err, check.IsNil)
+ if len(logentries.Items) > 0 &&
+ logentries.Items[0].ID > lastLogId &&
+ logentries.Items[0].ObjectUUID == userUuid &&
+ logentries.Items[0].Properties["collection_uuid"] == collectionUuid &&
+ logentries.Items[0].Properties["collection_file_path"] == filepath {
+ break
}
+ c.Logf("logentries.Items: %+v", logentries.Items)
+ time.Sleep(50 * time.Millisecond)
}
- c.Check(count, check.Not(check.Equals), 20)
- c.Check(logentries.Items[0].ObjectUUID, check.Equals, userUuid)
- c.Check(logentries.Items[0].Properties["collection_uuid"], check.Equals, collectionUuid)
- c.Check(logentries.Items[0].Properties["collection_file_path"], check.Equals, filepath)
} else {
c.Check(resp.Result().StatusCode, check.Equals, http.StatusForbidden)
c.Check(logbuf.String(), check.Equals, "")
case strings.Contains(err.Error(), "Not Found"):
// "storage: service returned without a response body (404 Not Found)"
return os.ErrNotExist
+ case strings.Contains(err.Error(), "ErrorCode=BlobNotFound"):
+ // "storage: service returned error: StatusCode=404, ErrorCode=BlobNotFound, ErrorMessage=The specified blob does not exist.\n..."
+ return os.ErrNotExist
default:
return err
}
}
}
-type notifyingResponseRecorder struct {
- *httptest.ResponseRecorder
- closer chan bool
-}
-
-func (r *notifyingResponseRecorder) CloseNotify() <-chan bool {
- return r.closer
-}
-
func (s *HandlerSuite) TestGetHandlerClientDisconnect(c *check.C) {
s.cluster.Collections.BlobSigning = false
c.Assert(s.handler.setup(context.Background(), s.cluster, "", prometheus.NewRegistry(), testServiceURL), check.IsNil)
bufs = newBufferPool(ctxlog.TestLogger(c), 1, BlockSize)
defer bufs.Put(bufs.Get(BlockSize))
- if err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock); err != nil {
- c.Error(err)
- }
-
- resp := ¬ifyingResponseRecorder{
- ResponseRecorder: httptest.NewRecorder(),
- closer: make(chan bool, 1),
- }
- if _, ok := http.ResponseWriter(resp).(http.CloseNotifier); !ok {
- c.Fatal("notifyingResponseRecorder is broken")
- }
- // If anyone asks, the client has disconnected.
- resp.closer <- true
+ err := s.handler.volmgr.AllWritable()[0].Put(context.Background(), TestHash, TestBlock)
+ c.Assert(err, check.IsNil)
+ resp := httptest.NewRecorder()
ok := make(chan struct{})
go func() {
- req, _ := http.NewRequest("GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+ ctx, cancel := context.WithCancel(context.Background())
+ req, _ := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("/%s+%d", TestHash, len(TestBlock)), nil)
+ cancel()
s.handler.ServeHTTP(resp, req)
ok <- struct{}{}
}()
case <-ok:
}
- ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp.ResponseRecorder)
+ ExpectStatusCode(c, "client disconnect", http.StatusServiceUnavailable, resp)
for i, v := range s.handler.volmgr.AllWritable() {
if calls := v.Volume.(*MockVolume).called["GET"]; calls != 0 {
c.Errorf("volume %d got %d calls, expected 0", i, calls)
}
func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
locator := req.URL.Path[1:]
if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
- rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
+ rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
return
}
// isn't here, we can return 404 now instead of waiting for a
// buffer.
- buf, err := getBufferWithContext(ctx, bufs, BlockSize)
+ buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
}
defer bufs.Put(buf)
- size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
+ size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
if err != nil {
code := http.StatusInternalServerError
if err, ok := err.(*KeepError); ok {
resp.Write(buf[:size])
}
-// Return a new context that gets cancelled by resp's CloseNotifier.
-func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
- ctx, cancel := context.WithCancel(parent)
- if cn, ok := resp.(http.CloseNotifier); ok {
- go func(c <-chan bool) {
- select {
- case <-c:
- cancel()
- case <-ctx.Done():
- }
- }(cn.CloseNotify())
- }
- return ctx, cancel
-}
-
// Get a buffer from the pool -- but give up and return a non-nil
// error if ctx ends before we get a buffer.
func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
}
func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
- ctx, cancel := contextForResponse(context.TODO(), resp)
- defer cancel()
-
hash := mux.Vars(req)["hash"]
// Detect as many error conditions as possible before reading
}
}
- buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
+ buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
if err != nil {
http.Error(resp, err.Error(), http.StatusServiceUnavailable)
return
return
}
- result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
+ result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
bufs.Put(buf)
if err != nil {