From: Tom Clegg Date: Thu, 14 Apr 2022 14:18:15 +0000 (-0400) Subject: 18992: Load cluster config file if present in HPC environment. X-Git-Tag: 2.5.0~202^2~2 X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/fb181ba27fd354e596d2216786ccee9a537bd0a3 18992: Load cluster config file if present in HPC environment. Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index 219ed3b98d..6ac7a5f0cc 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -32,9 +32,11 @@ import ( "time" "git.arvados.org/arvados.git/lib/cmd" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/crunchstat" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/keepclient" "git.arvados.org/arvados.git/sdk/go/manifest" "golang.org/x/sys/unix" @@ -1722,6 +1724,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates") detach := flags.Bool("detach", false, "Detach from parent process and run in the background") stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin") + configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)") sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes") @@ -1768,6 +1771,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + var keepstoreLogbuf bufThenWrite var conf ConfigData if *stdinConfig { err := json.NewDecoder(stdin).Decode(&conf) @@ -1789,6 +1793,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s // fill it using the container UUID prefix. conf.Cluster.ClusterID = containerUUID[:5] } + } else { + conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr)) } log.Printf("crunch-run %s started", cmd.Version.String()) @@ -1798,7 +1804,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s arvadosclient.CertFiles = []string{*caCertsPath} } - var keepstoreLogbuf bufThenWrite keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr)) if err != nil { log.Print(err) @@ -1959,8 +1964,61 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 0 } +// Try to load ConfigData in hpc (slurm/lsf) environment. This means +// loading the cluster config from the specified file and (if that +// works) getting the runtime_constraints container field from +// controller to determine # VCPUs so we can calculate KeepBuffers. +func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData { + var conf ConfigData + conf.Cluster = loadClusterConfigFile(configFile, stderr) + if conf.Cluster == nil { + // skip loading the container record -- we won't be + // able to start local keepstore anyway. + return conf + } + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err) + return conf + } + arv.Retries = 8 + var ctr arvados.Container + err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr) + if err != nil { + fmt.Fprintf(stderr, "error getting container record: %s\n", err) + return conf + } + if ctr.RuntimeConstraints.VCPUs > 0 { + conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU + } + return conf +} + +// Load cluster config file from given path. If an error occurs, log +// the error to stderr and return nil. +func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster { + ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info")) + ldr.Path = path + cfg, err := ldr.Load() + if err != nil { + fmt.Fprintf(stderr, "could not load config file %s: %s", path, err) + return nil + } + cluster, err := cfg.GetCluster("") + if err != nil { + fmt.Fprintf(stderr, "could not use config file %s: %s", path, err) + return nil + } + return cluster +} + func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) { - if configData.Cluster == nil || configData.KeepBuffers < 1 { + if configData.KeepBuffers < 1 { + fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers) + return nil, nil + } + if configData.Cluster == nil { + fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n") return nil, nil } for uuid, vol := range configData.Cluster.Volumes { diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go index 62df0032b4..1d2c7b09fd 100644 --- a/lib/crunchrun/crunchrun_test.go +++ b/lib/crunchrun/crunchrun_test.go @@ -50,7 +50,6 @@ type TestSuite struct { } func (s *TestSuite) SetUpTest(c *C) { - *brokenNodeHook = "" s.client = arvados.NewClientFromEnv() s.executor = &stubExecutor{} var err error @@ -1914,8 +1913,8 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) { func() { c.Log("// loadErr = cannot connect") s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?") - *brokenNodeHook = c.MkDir() + "/broken-node-hook" - err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700) + s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook" + err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700) c.Assert(err, IsNil) nextState = "Queued" }, @@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) { }`, nil, 0, func() {}) c.Check(s.api.CalledWith("container.state", nextState), NotNil) c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") - if *brokenNodeHook != "" { + if s.runner.brokenNodeHook != "" { c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*") c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*") c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*") diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go index 9b797fd867..ebd2726d0b 100644 --- a/lib/crunchrun/integration_test.go +++ b/lib/crunchrun/integration_test.go @@ -39,6 +39,7 @@ type integrationSuite struct { logCollection arvados.Collection outputCollection arvados.Collection + logFiles map[string]string // filename => contents } func (s *integrationSuite) SetUpSuite(c *C) { @@ -107,6 +108,7 @@ func (s *integrationSuite) SetUpTest(c *C) { s.stderr = bytes.Buffer{} s.logCollection = arvados.Collection{} s.outputCollection = arvados.Collection{} + s.logFiles = map[string]string{} s.cr = arvados.ContainerRequest{ Priority: 1, State: "Committed", @@ -201,20 +203,22 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) { s.engine = "docker" s.testRunTrivialContainer(c) - fs, err := s.logCollection.FileSystem(s.client, s.kc) - c.Assert(err, IsNil) - f, err := fs.Open("keepstore.txt") + log, logExists := s.logFiles["keepstore.txt"] if trial.logConfig == "none" { - c.Check(err, NotNil) - c.Check(os.IsNotExist(err), Equals, true) + c.Check(logExists, Equals, false) } else { - c.Assert(err, IsNil) - buf, err := ioutil.ReadAll(f) - c.Assert(err, IsNil) - c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`) - c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`) + c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`) + c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`) } } + + // Check that (1) config is loaded from $ARVADOS_CONFIG when + // not provided on stdin and (2) if a local keepstore is not + // started, crunch-run.txt explains why not. + s.SetUpTest(c) + s.stdin.Reset() + s.testRunTrivialContainer(c) + c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-000000000000000\) uses AccessViaHosts\n.*`) } func (s *integrationSuite) testRunTrivialContainer(c *C) { @@ -257,6 +261,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) { buf, err := ioutil.ReadAll(f) c.Assert(err, IsNil) c.Logf("\n===== %s =====\n%s", fi.Name(), buf) + s.logFiles[fi.Name()] = string(buf) } } s.logCollection = log diff --git a/sdk/go/ctxlog/log.go b/sdk/go/ctxlog/log.go index acbb11a361..e888f3151b 100644 --- a/sdk/go/ctxlog/log.go +++ b/sdk/go/ctxlog/log.go @@ -93,6 +93,11 @@ func setFormat(logger *logrus.Logger, format string) { FullTimestamp: true, TimestampFormat: rfc3339NanoFixed, } + case "plain": + logger.Formatter = &logrus.TextFormatter{ + DisableColors: true, + DisableTimestamp: true, + } case "json", "": logger.Formatter = &logrus.JSONFormatter{ TimestampFormat: rfc3339NanoFixed,