18992: Load cluster config file if present in HPC environment.
author    Tom Clegg <tom@curii.com>
          Thu, 14 Apr 2022 14:18:15 +0000 (10:18 -0400)
committer Tom Clegg <tom@curii.com>
          Thu, 14 Apr 2022 14:18:15 +0000 (10:18 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>
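
For context, a hedged usage sketch (not part of the commit message): with this change, an HPC dispatcher that does not pass -stdin-config can point crunch-run at a cluster config file explicitly, or fall back to $ARVADOS_CONFIG when -config is omitted. The config path and container UUID below are illustrative placeholders:

    crunch-run -config /etc/arvados/config.yml <container-uuid>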

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/crunchrun/integration_test.go
sdk/go/ctxlog/log.go

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 219ed3b98d3cac6aa171d063f1ef85cd4fbf0b3f..6ac7a5f0cc0931369ac9bd869b7e1dcda3883f0a 100644
@@ -32,9 +32,11 @@ import (
        "time"
 
        "git.arvados.org/arvados.git/lib/cmd"
+       "git.arvados.org/arvados.git/lib/config"
        "git.arvados.org/arvados.git/lib/crunchstat"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
+       "git.arvados.org/arvados.git/sdk/go/ctxlog"
        "git.arvados.org/arvados.git/sdk/go/keepclient"
        "git.arvados.org/arvados.git/sdk/go/manifest"
        "golang.org/x/sys/unix"
@@ -1722,6 +1724,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
        stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+       configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
        sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
        kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
        list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
@@ -1768,6 +1771,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                return 1
        }
 
+       var keepstoreLogbuf bufThenWrite
        var conf ConfigData
        if *stdinConfig {
                err := json.NewDecoder(stdin).Decode(&conf)
@@ -1789,6 +1793,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        // fill it using the container UUID prefix.
                        conf.Cluster.ClusterID = containerUUID[:5]
                }
+       } else {
+               conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
        }
 
        log.Printf("crunch-run %s started", cmd.Version.String())
@@ -1798,7 +1804,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                arvadosclient.CertFiles = []string{*caCertsPath}
        }
 
-       var keepstoreLogbuf bufThenWrite
        keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
        if err != nil {
                log.Print(err)
@@ -1959,8 +1964,61 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        return 0
 }
 
+// Try to load ConfigData in hpc (slurm/lsf) environment. This means
+// loading the cluster config from the specified file and (if that
+// works) getting the runtime_constraints container field from
+// controller to determine # VCPUs so we can calculate KeepBuffers.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+       var conf ConfigData
+       conf.Cluster = loadClusterConfigFile(configFile, stderr)
+       if conf.Cluster == nil {
+               // skip loading the container record -- we won't be
+               // able to start local keepstore anyway.
+               return conf
+       }
+       arv, err := arvadosclient.MakeArvadosClient()
+       if err != nil {
+               fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+               return conf
+       }
+       arv.Retries = 8
+       var ctr arvados.Container
+       err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+       if err != nil {
+               fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+               return conf
+       }
+       if ctr.RuntimeConstraints.VCPUs > 0 {
+               conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+       }
+       return conf
+}
+
+// Load cluster config file from given path. If an error occurs, log
+// the error to stderr and return nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+       ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+       ldr.Path = path
+       cfg, err := ldr.Load()
+       if err != nil {
+               fmt.Fprintf(stderr, "could not load config file %s: %s", path, err)
+               return nil
+       }
+       cluster, err := cfg.GetCluster("")
+       if err != nil {
+               fmt.Fprintf(stderr, "could not use config file %s: %s", path, err)
+               return nil
+       }
+       return cluster
+}
+
 func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
-       if configData.Cluster == nil || configData.KeepBuffers < 1 {
+       if configData.KeepBuffers < 1 {
+               fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+               return nil, nil
+       }
+       if configData.Cluster == nil {
+               fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
                return nil, nil
        }
        for uuid, vol := range configData.Cluster.Volumes {
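
A minimal sketch (illustration only, not code from this commit) of the buffer arithmetic hpcConfData performs and that startLocalKeepstore now reports on. The concrete numbers (4 VCPUs, 1 buffer per VCPU) are assumed example values, not values taken from this commit:

    package main

    import "fmt"

    func main() {
            // Assumed example values: 4 VCPUs from the container's
            // runtime_constraints, 1 buffer per VCPU from
            // Containers.LocalKeepBlobBuffersPerVCPU in the cluster config.
            vcpus := 4
            buffersPerVCPU := 1
            keepBuffers := vcpus * buffersPerVCPU
            if keepBuffers < 1 {
                    // Mirrors the new startLocalKeepstore behavior: explain
                    // why no local keepstore process will be started.
                    fmt.Println("not starting a local keepstore process because KeepBuffers=0 in config")
                    return
            }
            fmt.Printf("local keepstore would run with %d block buffers\n", keepBuffers)
    }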
diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go
index 62df0032b40800e9b2c2eb59ed4c42e639f9e9a1..1d2c7b09fd0773466f54a0846a5507ad64627623 100644
@@ -50,7 +50,6 @@ type TestSuite struct {
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
-       *brokenNodeHook = ""
        s.client = arvados.NewClientFromEnv()
        s.executor = &stubExecutor{}
        var err error
@@ -1914,8 +1913,8 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
                func() {
                        c.Log("// loadErr = cannot connect")
                        s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
-                       *brokenNodeHook = c.MkDir() + "/broken-node-hook"
-                       err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+                       s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
+                       err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
                        c.Assert(err, IsNil)
                        nextState = "Queued"
                },
@@ -1935,7 +1934,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
 }`, nil, 0, func() {})
                c.Check(s.api.CalledWith("container.state", nextState), NotNil)
                c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-               if *brokenNodeHook != "" {
+               if s.runner.brokenNodeHook != "" {
                        c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
                        c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
                        c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
diff --git a/lib/crunchrun/integration_test.go b/lib/crunchrun/integration_test.go
index 9b797fd8676a5dc5690c15ce7adbf5840488a292..ebd2726d0b8e98842d7a9e4ebe1079aafce20d28 100644
@@ -39,6 +39,7 @@ type integrationSuite struct {
 
        logCollection    arvados.Collection
        outputCollection arvados.Collection
+       logFiles         map[string]string // filename => contents
 }
 
 func (s *integrationSuite) SetUpSuite(c *C) {
@@ -107,6 +108,7 @@ func (s *integrationSuite) SetUpTest(c *C) {
        s.stderr = bytes.Buffer{}
        s.logCollection = arvados.Collection{}
        s.outputCollection = arvados.Collection{}
+       s.logFiles = map[string]string{}
        s.cr = arvados.ContainerRequest{
                Priority:       1,
                State:          "Committed",
@@ -201,20 +203,22 @@ func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
                s.engine = "docker"
                s.testRunTrivialContainer(c)
 
-               fs, err := s.logCollection.FileSystem(s.client, s.kc)
-               c.Assert(err, IsNil)
-               f, err := fs.Open("keepstore.txt")
+               log, logExists := s.logFiles["keepstore.txt"]
                if trial.logConfig == "none" {
-                       c.Check(err, NotNil)
-                       c.Check(os.IsNotExist(err), Equals, true)
+                       c.Check(logExists, Equals, false)
                } else {
-                       c.Assert(err, IsNil)
-                       buf, err := ioutil.ReadAll(f)
-                       c.Assert(err, IsNil)
-                       c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
-                       c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+                       c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+                       c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
                }
        }
+
+       // Check that (1) config is loaded from $ARVADOS_CONFIG when
+       // not provided on stdin and (2) if a local keepstore is not
+       // started, crunch-run.txt explains why not.
+       s.SetUpTest(c)
+       s.stdin.Reset()
+       s.testRunTrivialContainer(c)
+       c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-000000000000000\) uses AccessViaHosts\n.*`)
 }
 
 func (s *integrationSuite) testRunTrivialContainer(c *C) {
@@ -257,6 +261,7 @@ func (s *integrationSuite) testRunTrivialContainer(c *C) {
                        buf, err := ioutil.ReadAll(f)
                        c.Assert(err, IsNil)
                        c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
+                       s.logFiles[fi.Name()] = string(buf)
                }
        }
        s.logCollection = log
diff --git a/sdk/go/ctxlog/log.go b/sdk/go/ctxlog/log.go
index acbb11a3611094be4eed0ce13e5c03ffca9e758b..e888f3151b732bedd4eddc66a590b5f9699a4149 100644
@@ -93,6 +93,11 @@ func setFormat(logger *logrus.Logger, format string) {
                        FullTimestamp:   true,
                        TimestampFormat: rfc3339NanoFixed,
                }
+       case "plain":
+               logger.Formatter = &logrus.TextFormatter{
+                       DisableColors:    true,
+                       DisableTimestamp: true,
+               }
        case "json", "":
                logger.Formatter = &logrus.JSONFormatter{
                        TimestampFormat: rfc3339NanoFixed,
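
For reference, a small hedged sketch of the new "plain" ctxlog format, mirroring the ctxlog.New(stderr, "plain", "info") call that crunch-run's loadClusterConfigFile makes; the log message itself is an arbitrary example:

    package main

    import (
            "os"

            "git.arvados.org/arvados.git/sdk/go/ctxlog"
    )

    func main() {
            // "plain" selects a logrus TextFormatter with colors and
            // timestamps disabled, so config-loader messages reach the
            // caller's stderr as bare text.
            logger := ctxlog.New(os.Stderr, "plain", "info")
            logger.Info("loading cluster config file")
    }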