From: Tom Clegg
Date: Fri, 15 Apr 2022 19:28:21 +0000 (-0400)
Subject: Merge branch '18992-hpc-local-keepstore'
X-Git-Tag: 2.5.0~202
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/f6e8752348958e3bb48c7509a4ff78689f2d64c9?hp=eccbdb78e4dd028621deba2207ee44d3f40d5d9a

Merge branch '18992-hpc-local-keepstore'

closes #18992

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 6512389815..e60880c217 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -969,15 +969,25 @@ Clusters:
       # A zero value disables this feature.
       #
       # In order for this feature to be activated, no volume may use
-      # AccessViaHosts, and each volume must have Replication higher
-      # than Collections.DefaultReplication. If these requirements are
-      # not satisfied, the feature is disabled automatically
-      # regardless of the value given here.
+      # AccessViaHosts, and no writable volume may have Replication
+      # lower than Collections.DefaultReplication. If these
+      # requirements are not satisfied, the feature is disabled
+      # automatically regardless of the value given here.
       #
-      # Note that when this configuration is enabled, the entire
-      # cluster configuration file, including the system root token,
-      # is copied to the worker node and held in memory for the
-      # duration of the container.
+      # When an HPC dispatcher is in use (see SLURM and LSF sections),
+      # this feature depends on the operator to ensure an up-to-date
+      # cluster configuration file (/etc/arvados/config.yml) is
+      # available on all compute nodes. If it is missing or not
+      # readable by the crunch-run user, the feature will be disabled
+      # automatically. To read it from a different location, add a
+      # "-config=/path/to/config.yml" argument to
+      # CrunchRunArgumentsList above.
+      #
+      # When the cloud dispatcher is in use (see CloudVMs section) and
+      # this configuration is enabled, the entire cluster
+      # configuration file, including the system root token, is copied
+      # to the worker node and held in memory for the duration of the
+      # container.
       LocalKeepBlobBuffersPerVCPU: 1
 
       # When running a dedicated keepstore process for a container
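
The activation rule described in the comment above is the one enforced by
the startLocalKeepstore changes in lib/crunchrun/crunchrun.go below. As a
rough standalone sketch -- a hypothetical helper using the arvados.Cluster
and arvados.Volume field names from sdk/go/arvados, not code from this
commit:

    package main

    import "git.arvados.org/arvados.git/sdk/go/arvados"

    // localKeepstoreEligible sketches the rule: buffers configured, no
    // volume reachable via AccessViaHosts, and every writable volume
    // replicated at least as widely as Collections.DefaultReplication.
    func localKeepstoreEligible(cluster *arvados.Cluster) bool {
        if cluster.Containers.LocalKeepBlobBuffersPerVCPU < 1 {
            return false
        }
        for _, vol := range cluster.Volumes {
            if len(vol.AccessViaHosts) > 0 {
                return false
            }
            if !vol.ReadOnly && vol.Replication < cluster.Collections.DefaultReplication {
                return false
            }
        }
        return true
    }
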
diff --git a/lib/crunchrun/cgroup.go b/lib/crunchrun/cgroup.go
index 0b254f5bd7..48ec93b876 100644
--- a/lib/crunchrun/cgroup.go
+++ b/lib/crunchrun/cgroup.go
@@ -6,16 +6,16 @@ package crunchrun
 
 import (
 	"bytes"
+	"fmt"
 	"io/ioutil"
-	"log"
 )
 
 // Return the current process's cgroup for the given subsystem.
-func findCgroup(subsystem string) string {
+func findCgroup(subsystem string) (string, error) {
 	subsys := []byte(subsystem)
 	cgroups, err := ioutil.ReadFile("/proc/self/cgroup")
 	if err != nil {
-		log.Fatal(err)
+		return "", err
 	}
 	for _, line := range bytes.Split(cgroups, []byte("\n")) {
 		toks := bytes.SplitN(line, []byte(":"), 4)
@@ -24,10 +24,9 @@ func findCgroup(subsystem string) string {
 		}
 		for _, s := range bytes.Split(toks[1], []byte(",")) {
 			if bytes.Compare(s, subsys) == 0 {
-				return string(toks[2])
+				return string(toks[2]), nil
 			}
 		}
 	}
-	log.Fatalf("subsystem %q not found in /proc/self/cgroup", subsystem)
-	return ""
+	return "", fmt.Errorf("subsystem %q not found in /proc/self/cgroup", subsystem)
 }
diff --git a/lib/crunchrun/cgroup_test.go b/lib/crunchrun/cgroup_test.go
index b43479a3b4..eb87456d14 100644
--- a/lib/crunchrun/cgroup_test.go
+++ b/lib/crunchrun/cgroup_test.go
@@ -14,8 +14,10 @@ var _ = Suite(&CgroupSuite{})
 
 func (s *CgroupSuite) TestFindCgroup(c *C) {
 	for _, s := range []string{"devices", "cpu", "cpuset"} {
-		g := findCgroup(s)
-		c.Check(g, Not(Equals), "")
+		g, err := findCgroup(s)
+		if c.Check(err, IsNil) {
+			c.Check(g, Not(Equals), "", Commentf("subsys %q", s))
+		}
 		c.Logf("cgroup(%q) == %q", s, g)
 	}
 }
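
findCgroup now reports failure to its caller instead of exiting the whole
process via log.Fatal; the RunCommand change in crunchrun.go below handles
the error and returns exit status 1. For reference, a standalone sketch of
the line format being parsed (the example line and values are illustrative,
not from this commit):

    package main

    import (
        "bytes"
        "fmt"
    )

    func main() {
        // Each /proc/self/cgroup entry has the form
        // "<hierarchy-id>:<subsystems>:<path>", for example:
        line := []byte("4:cpu,cpuacct:/user.slice")
        toks := bytes.SplitN(line, []byte(":"), 4)
        for _, s := range bytes.Split(toks[1], []byte(",")) {
            if bytes.Equal(s, []byte("cpu")) {
                fmt.Println(string(toks[2])) // prints "/user.slice"
            }
        }
    }
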
flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin") + configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)") sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes") @@ -1730,6 +1733,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`) memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container") runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity") + brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)") flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") ignoreDetachFlag := false @@ -1767,6 +1771,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + var keepstoreLogbuf bufThenWrite var conf ConfigData if *stdinConfig { err := json.NewDecoder(stdin).Decode(&conf) @@ -1788,6 +1793,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s // fill it using the container UUID prefix. conf.Cluster.ClusterID = containerUUID[:5] } + } else { + conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr)) } log.Printf("crunch-run %s started", cmd.Version.String()) @@ -1797,7 +1804,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s arvadosclient.CertFiles = []string{*caCertsPath} } - var keepstoreLogbuf bufThenWrite keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr)) if err != nil { log.Print(err) @@ -1883,6 +1889,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } defer cr.executor.Close() + cr.brokenNodeHook = *brokenNodeHook + gwAuthSecret := os.Getenv("GatewayAuthSecret") os.Unsetenv("GatewayAuthSecret") if gwAuthSecret == "" { @@ -1923,7 +1931,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.enableNetwork = *enableNetwork cr.networkMode = *networkMode if *cgroupParentSubsystem != "" { - p := findCgroup(*cgroupParentSubsystem) + p, err := findCgroup(*cgroupParentSubsystem) + if err != nil { + log.Printf("fatal: cgroup parent subsystem: %s", err) + return 1 + } cr.setCgroupParent = p cr.expectCgroupParent = p } @@ -1952,8 +1964,62 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 0 } +// Try to load ConfigData in hpc (slurm/lsf) environment. This means +// loading the cluster config from the specified file and (if that +// works) getting the runtime_constraints container field from +// controller to determine # VCPUs so we can calculate KeepBuffers. +func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData { + var conf ConfigData + conf.Cluster = loadClusterConfigFile(configFile, stderr) + if conf.Cluster == nil { + // skip loading the container record -- we won't be + // able to start local keepstore anyway. 
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 62df0032b4..1d2c7b09fd 100644
--- a/lib/crunchrun/crunchrun_test.go
+++ b/lib/crunchrun/crunchrun_test.go
@@ -50,7 +50,6 @@ type TestSuite struct {
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
-	*brokenNodeHook = ""
 	s.client = arvados.NewClientFromEnv()
 	s.executor = &stubExecutor{}
 	var err error
@@ -1914,8 +1913,8 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
 		func() {
 			c.Log("// loadErr = cannot connect")
 			s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
-			*brokenNodeHook = c.MkDir() + "/broken-node-hook"
-			err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+			s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
+			err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
 			c.Assert(err, IsNil)
 			nextState = "Queued"
 		},
@@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
 	}`, nil, 0, func() {})
 		c.Check(s.api.CalledWith("container.state", nextState), NotNil)
 		c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-		if *brokenNodeHook != "" {
+		if s.runner.brokenNodeHook != "" {
 			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
 			c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
 			c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
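
These test changes track the flag's move from a mutable package-level
variable to the per-runner brokenNodeHook field, so concurrent runners no
longer share state. Dispatcher-side, the hook becomes ordinary argv
configuration; a hypothetical invocation (the hook path is invented for
illustration):

    // Build a crunch-run command line with the new flag; on SLURM/LSF
    // clusters the same argument could go in CrunchRunArgumentsList.
    args := []string{
        "-runtime-engine=docker",
        "-broken-node-hook=/usr/local/sbin/drain-this-node", // hypothetical script
        containerUUID, // the container to run
    }
    cmd := exec.Command("crunch-run", args...)
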
Is the docker daemon running?") - *brokenNodeHook = c.MkDir() + "/broken-node-hook" - err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700) + s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook" + err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700) c.Assert(err, IsNil) nextState = "Queued" }, @@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) { }`, nil, 0, func() {}) c.Check(s.api.CalledWith("container.state", nextState), NotNil) c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") - if *brokenNodeHook != "" { + if s.runner.brokenNodeHook != "" { c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*") c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*") c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go index 9b797fd867..0b139dd97d 100644 --- a/lib/crunchrun/integration_test.go +++ b/lib/crunchrun/integration_test.go @@ -32,6 +32,7 @@ type integrationSuite struct { stdin bytes.Buffer stdout bytes.Buffer stderr bytes.Buffer + args []string cr arvados.ContainerRequest client *arvados.Client ac *arvadosclient.ArvadosClient @@ -39,6 +40,7 @@ type integrationSuite struct { logCollection arvados.Collection outputCollection arvados.Collection + logFiles map[string]string // filename => contents } func (s *integrationSuite) SetUpSuite(c *C) { @@ -102,11 +104,13 @@ func (s *integrationSuite) TearDownSuite(c *C) { func (s *integrationSuite) SetUpTest(c *C) { os.Unsetenv("ARVADOS_KEEP_SERVICES") s.engine = "docker" + s.args = nil s.stdin = bytes.Buffer{} s.stdout = bytes.Buffer{} s.stderr = bytes.Buffer{} s.logCollection = arvados.Collection{} s.outputCollection = arvados.Collection{} + s.logFiles = map[string]string{} s.cr = arvados.ContainerRequest{ Priority: 1, State: "Committed", @@ -201,20 +205,42 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) { s.engine = "docker" s.testRunTrivialContainer(c) - fs, err := s.logCollection.FileSystem(s.client, s.kc) - c.Assert(err, IsNil) - f, err := fs.Open("keepstore.txt") + log, logExists := s.logFiles["keepstore.txt"] if trial.logConfig == "none" { - c.Check(err, NotNil) - c.Check(os.IsNotExist(err), Equals, true) + c.Check(logExists, Equals, false) } else { - c.Assert(err, IsNil) - buf, err := ioutil.ReadAll(f) - c.Assert(err, IsNil) - c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`) - c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`) + c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`) + c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`) } } + + // Check that (1) config is loaded from $ARVADOS_CONFIG when + // not provided on stdin and (2) if a local keepstore is not + // started, crunch-run.txt explains why not. 
diff --git a/sdk/go/ctxlog/log.go b/sdk/go/ctxlog/log.go
index acbb11a361..e888f3151b 100644
--- a/sdk/go/ctxlog/log.go
+++ b/sdk/go/ctxlog/log.go
@@ -93,6 +93,11 @@ func setFormat(logger *logrus.Logger, format string) {
 			FullTimestamp:   true,
 			TimestampFormat: rfc3339NanoFixed,
 		}
+	case "plain":
+		logger.Formatter = &logrus.TextFormatter{
+			DisableColors:    true,
+			DisableTimestamp: true,
+		}
 	case "json", "":
 		logger.Formatter = &logrus.JSONFormatter{
 			TimestampFormat: rfc3339NanoFixed,
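
The new "plain" format exists so loadClusterConfigFile (above) can emit
colorless, timestamp-free loader messages into crunch-run's log streams,
which already carry their own timestamps. A minimal usage sketch:

    package main

    import (
        "os"

        "git.arvados.org/arvados.git/sdk/go/ctxlog"
    )

    func main() {
        // With format "plain", logrus writes bare lines such as:
        //   level=info msg="loaded config file /etc/arvados/config.yml"
        logger := ctxlog.New(os.Stderr, "plain", "info")
        logger.Info("loaded config file /etc/arvados/config.yml")
    }
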