# A zero value disables this feature.
#
# In order for this feature to be activated, no volume may use
- # AccessViaHosts, and each volume must have Replication higher
- # than Collections.DefaultReplication. If these requirements are
- # not satisfied, the feature is disabled automatically
- # regardless of the value given here.
+ # AccessViaHosts, and no writable volume may have Replication
+ # lower than Collections.DefaultReplication. If these
+ # requirements are not satisfied, the feature is disabled
+ # automatically regardless of the value given here.
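+ # For example, if any writable volume is configured with
+ # Replication: 1 while Collections.DefaultReplication is 2, the
+ # feature is disabled.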
#
- # Note that when this configuration is enabled, the entire
- # cluster configuration file, including the system root token,
- # is copied to the worker node and held in memory for the
- # duration of the container.
+ # When an HPC dispatcher is in use (see SLURM and LSF sections),
+ # this feature depends on the operator to ensure an up-to-date
+ # cluster configuration file (/etc/arvados/config.yml) is
+ # available on all compute nodes. If it is missing or not
+ # readable by the crunch-run user, the feature will be disabled
+ # automatically. To read it from a different location, add a
+ # "-config=/path/to/config.yml" argument to
+ # CrunchRunArgumentsList above.
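+ # For example (with a hypothetical path):
+ #
+ #   CrunchRunArgumentsList: ["-config=/opt/arvados/config.yml"]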
+ #
+ # When the cloud dispatcher is in use (see CloudVMs section) and
+ # this configuration is enabled, the entire cluster
+ # configuration file, including the system root token, is copied
+ # to the worker node and held in memory for the duration of the
+ # container.
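+ #
+ # The buffer count for the local keepstore process is this value
+ # multiplied by the number of VCPUs requested by the container;
+ # for example, a 4-VCPU container with the default value of 1
+ # gets 4 block buffers.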
LocalKeepBlobBuffersPerVCPU: 1
# When running a dedicated keepstore process for a container
import (
"bytes"
+ "fmt"
"io/ioutil"
- "log"
)
// Return the current process's cgroup for the given subsystem.
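+// For example, given a /proc/self/cgroup line such as
+// "4:cpu,cpuacct:/user.slice" (cgroup v1 format,
+// hierarchy-id:controller-list:path), findCgroup("cpu") returns
+// "/user.slice".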
-func findCgroup(subsystem string) string {
+func findCgroup(subsystem string) (string, error) {
subsys := []byte(subsystem)
cgroups, err := ioutil.ReadFile("/proc/self/cgroup")
if err != nil {
- log.Fatal(err)
+ return "", err
}
for _, line := range bytes.Split(cgroups, []byte("\n")) {
toks := bytes.SplitN(line, []byte(":"), 4)
		if len(toks) < 3 {
			// skip blank or malformed lines
			continue
		}
for _, s := range bytes.Split(toks[1], []byte(",")) {
if bytes.Compare(s, subsys) == 0 {
- return string(toks[2])
+ return string(toks[2]), nil
}
}
}
- log.Fatalf("subsystem %q not found in /proc/self/cgroup", subsystem)
- return ""
+ return "", fmt.Errorf("subsystem %q not found in /proc/self/cgroup", subsystem)
}
func (s *CgroupSuite) TestFindCgroup(c *C) {
for _, s := range []string{"devices", "cpu", "cpuset"} {
- g := findCgroup(s)
- c.Check(g, Not(Equals), "")
+ g, err := findCgroup(s)
+ if c.Check(err, IsNil) {
+ c.Check(g, Not(Equals), "", Commentf("subsys %q", s))
+ }
c.Logf("cgroup(%q) == %q", s, g)
}
}
"time"
"git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/crunchstat"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
"golang.org/x/sys/unix"
enableMemoryLimit bool
enableNetwork string // one of "default" or "always"
networkMode string // "none", "host", or "" -- passed through to executor
+ brokenNodeHook string // script to run if node appears to be broken
arvMountLog *ThrottledLogger
containerWatchdogInterval time.Duration
"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
"(?ms).*grpc: the connection is unavailable.*",
}
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
func (runner *ContainerRunner) runBrokenNodeHook() {
- if *brokenNodeHook == "" {
+ if runner.brokenNodeHook == "" {
path := filepath.Join(lockdir, brokenfile)
runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
}
f.Close()
} else {
- runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
// run killme script
- c := exec.Command(*brokenNodeHook)
+ c := exec.Command(runner.brokenNodeHook)
c.Stdout = runner.CrunchLog
c.Stderr = runner.CrunchLog
err := c.Run()
caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+ configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+ brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
ignoreDetachFlag := false
return 1
}
+ var keepstoreLogbuf bufThenWrite
var conf ConfigData
if *stdinConfig {
err := json.NewDecoder(stdin).Decode(&conf)
// fill it using the container UUID prefix.
conf.Cluster.ClusterID = containerUUID[:5]
}
+ } else {
+ conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
}
log.Printf("crunch-run %s started", cmd.Version.String())
arvadosclient.CertFiles = []string{*caCertsPath}
}
- var keepstoreLogbuf bufThenWrite
keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
if err != nil {
log.Print(err)
}
defer cr.executor.Close()
+ cr.brokenNodeHook = *brokenNodeHook
+
gwAuthSecret := os.Getenv("GatewayAuthSecret")
os.Unsetenv("GatewayAuthSecret")
if gwAuthSecret == "" {
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
if *cgroupParentSubsystem != "" {
- p := findCgroup(*cgroupParentSubsystem)
+ p, err := findCgroup(*cgroupParentSubsystem)
+ if err != nil {
+ log.Printf("fatal: cgroup parent subsystem: %s", err)
+ return 1
+ }
cr.setCgroupParent = p
cr.expectCgroupParent = p
}
return 0
}
+// hpcConfData tries to load ConfigData in an HPC (slurm/lsf)
+// environment. This means loading the cluster config from the
+// specified file and (if that works) getting the
+// runtime_constraints field from the container record via the
+// controller, to determine the number of VCPUs so KeepBuffers can
+// be calculated.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+ var conf ConfigData
+ conf.Cluster = loadClusterConfigFile(configFile, stderr)
+ if conf.Cluster == nil {
+ // skip loading the container record -- we won't be
+ // able to start local keepstore anyway.
+ return conf
+ }
+ arv, err := arvadosclient.MakeArvadosClient()
+ if err != nil {
+ fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+ return conf
+ }
+ arv.Retries = 8
+ var ctr arvados.Container
+ err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+ if err != nil {
+ fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+ return conf
+ }
+ if ctr.RuntimeConstraints.VCPUs > 0 {
+ conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+ }
+ return conf
+}
+
+// loadClusterConfigFile loads the cluster config file from the given
+// path. If an error occurs, it logs the error to the given stderr
+// writer and returns nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+ ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+ ldr.Path = path
+ cfg, err := ldr.Load()
+ if err != nil {
+ fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+ return nil
+ }
+ cluster, err := cfg.GetCluster("")
+ if err != nil {
+ fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+ return nil
+ }
+ fmt.Fprintf(stderr, "loaded config file %s\n", path)
+ return cluster
+}
+
func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
- if configData.Cluster == nil || configData.KeepBuffers < 1 {
+ if configData.KeepBuffers < 1 {
+ fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+ return nil, nil
+ }
+ if configData.Cluster == nil {
+ fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
return nil, nil
}
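+	// Enforce the requirements documented for
+	// LocalKeepBlobBuffersPerVCPU: a volume that uses
+	// AccessViaHosts (or has insufficient Replication) means no
+	// local keepstore.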
for uuid, vol := range configData.Cluster.Volumes {
}
func (s *TestSuite) SetUpTest(c *C) {
- *brokenNodeHook = ""
s.client = arvados.NewClientFromEnv()
s.executor = &stubExecutor{}
var err error
func() {
c.Log("// loadErr = cannot connect")
s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
- *brokenNodeHook = c.MkDir() + "/broken-node-hook"
- err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+ s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
+ err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
c.Assert(err, IsNil)
nextState = "Queued"
},
}`, nil, 0, func() {})
c.Check(s.api.CalledWith("container.state", nextState), NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
- if *brokenNodeHook != "" {
+ if s.runner.brokenNodeHook != "" {
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
stdin bytes.Buffer
stdout bytes.Buffer
stderr bytes.Buffer
+ args []string
cr arvados.ContainerRequest
client *arvados.Client
ac *arvadosclient.ArvadosClient
logCollection arvados.Collection
outputCollection arvados.Collection
+ logFiles map[string]string // filename => contents
}
func (s *integrationSuite) SetUpSuite(c *C) {
func (s *integrationSuite) SetUpTest(c *C) {
os.Unsetenv("ARVADOS_KEEP_SERVICES")
s.engine = "docker"
+ s.args = nil
s.stdin = bytes.Buffer{}
s.stdout = bytes.Buffer{}
s.stderr = bytes.Buffer{}
s.logCollection = arvados.Collection{}
s.outputCollection = arvados.Collection{}
+ s.logFiles = map[string]string{}
s.cr = arvados.ContainerRequest{
Priority: 1,
State: "Committed",
s.engine = "docker"
s.testRunTrivialContainer(c)
- fs, err := s.logCollection.FileSystem(s.client, s.kc)
- c.Assert(err, IsNil)
- f, err := fs.Open("keepstore.txt")
+ log, logExists := s.logFiles["keepstore.txt"]
if trial.logConfig == "none" {
- c.Check(err, NotNil)
- c.Check(os.IsNotExist(err), Equals, true)
+ c.Check(logExists, Equals, false)
} else {
- c.Assert(err, IsNil)
- buf, err := ioutil.ReadAll(f)
- c.Assert(err, IsNil)
- c.Check(string(buf), trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
- c.Check(string(buf), trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
+ c.Check(log, trial.matchGetReq, `(?ms).*"reqMethod":"GET".*`)
+ c.Check(log, trial.matchPutReq, `(?ms).*"reqMethod":"PUT".*,"reqPath":"0e3bcff26d51c895a60ea0d4585e134d".*`)
}
}
+
+ // Check that (1) config is loaded from $ARVADOS_CONFIG when
+ // not provided on stdin and (2) if a local keepstore is not
+ // started, crunch-run.txt explains why not.
+ s.SetUpTest(c)
+ s.stdin.Reset()
+ s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*not starting a local keepstore process because a volume \(zzzzz-nyw5e-000000000000000\) uses AccessViaHosts\n.*`)
+
+ // Check that config read errors are logged
+ s.SetUpTest(c)
+ s.args = []string{"-config", c.MkDir() + "/config-error.yaml"}
+ s.stdin.Reset()
+ s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* no such file or directory\n.*`)
+
+ s.SetUpTest(c)
+ s.args = []string{"-config", c.MkDir() + "/config-unreadable.yaml"}
+ s.stdin.Reset()
+ err := ioutil.WriteFile(s.args[1], []byte{}, 0)
+ c.Check(err, IsNil)
+ s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*could not load config file \Q`+s.args[1]+`\E:.* permission denied\n.*`)
+
+ s.SetUpTest(c)
+ s.stdin.Reset()
+ s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*loaded config file \Q`+os.Getenv("ARVADOS_CONFIG")+`\E\n.*`)
}
func (s *integrationSuite) testRunTrivialContainer(c *C) {
args := []string{
"-runtime-engine=" + s.engine,
"-enable-memory-limit=false",
- s.cr.ContainerUUID,
}
if s.stdin.Len() > 0 {
- args = append([]string{"-stdin-config=true"}, args...)
+ args = append(args, "-stdin-config=true")
}
+ args = append(args, s.args...)
+ args = append(args, s.cr.ContainerUUID)
code := command{}.RunCommand("crunch-run", args, &s.stdin, io.MultiWriter(&s.stdout, os.Stderr), io.MultiWriter(&s.stderr, os.Stderr))
c.Logf("\n===== stdout =====\n%s", s.stdout.String())
c.Logf("\n===== stderr =====\n%s", s.stderr.String())
buf, err := ioutil.ReadAll(f)
c.Assert(err, IsNil)
c.Logf("\n===== %s =====\n%s", fi.Name(), buf)
+ s.logFiles[fi.Name()] = string(buf)
}
}
s.logCollection = log
FullTimestamp: true,
TimestampFormat: rfc3339NanoFixed,
}
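+	// "plain" is like "text" but with no timestamps or colors;
+	// crunch-run uses it (via loadClusterConfigFile in this
+	// change) when copying config-loader messages into the
+	// container log.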
+ case "plain":
+ logger.Formatter = &logrus.TextFormatter{
+ DisableColors: true,
+ DisableTimestamp: true,
+ }
case "json", "":
logger.Formatter = &logrus.JSONFormatter{
TimestampFormat: rfc3339NanoFixed,