X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/6fe152024269d838e31bc224adbd518c43cbfee5..09cbdc3074b3f1e69c9c537875146f6da0a6ed8f:/lib/crunchrun/crunchrun.go diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go index b237d9fa59..51e154c0ec 100644 --- a/lib/crunchrun/crunchrun.go +++ b/lib/crunchrun/crunchrun.go @@ -19,6 +19,7 @@ import ( "os" "os/exec" "os/signal" + "os/user" "path" "path/filepath" "regexp" @@ -31,11 +32,14 @@ import ( "time" "git.arvados.org/arvados.git/lib/cmd" + "git.arvados.org/arvados.git/lib/config" "git.arvados.org/arvados.git/lib/crunchstat" "git.arvados.org/arvados.git/sdk/go/arvados" "git.arvados.org/arvados.git/sdk/go/arvadosclient" + "git.arvados.org/arvados.git/sdk/go/ctxlog" "git.arvados.org/arvados.git/sdk/go/keepclient" "git.arvados.org/arvados.git/sdk/go/manifest" + "golang.org/x/sys/unix" ) type command struct{} @@ -136,7 +140,9 @@ type ContainerRunner struct { MkArvClient func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) finalState string parentTemp string + costStartTime time.Time + keepstore *exec.Cmd keepstoreLogger io.WriteCloser keepstoreLogbuf *bufThenWrite statLogger io.WriteCloser @@ -165,6 +171,7 @@ type ContainerRunner struct { enableMemoryLimit bool enableNetwork string // one of "default" or "always" networkMode string // "none", "host", or "" -- passed through to executor + brokenNodeHook string // script to run if node appears to be broken arvMountLog *ThrottledLogger containerWatchdogInterval time.Duration @@ -208,10 +215,9 @@ var errorBlacklist = []string{ "(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*", "(?ms).*grpc: the connection is unavailable.*", } -var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)") func (runner *ContainerRunner) runBrokenNodeHook() { - if *brokenNodeHook == "" { + if runner.brokenNodeHook == "" { path := filepath.Join(lockdir, brokenfile) runner.CrunchLog.Printf("Writing %s to mark node as broken", path) f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700) @@ -221,9 +227,9 @@ func (runner *ContainerRunner) runBrokenNodeHook() { } f.Close() } else { - runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook) + runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook) // run killme script - c := exec.Command(*brokenNodeHook) + c := exec.Command(runner.brokenNodeHook) c.Stdout = runner.CrunchLog c.Stderr = runner.CrunchLog err := c.Run() @@ -418,11 +424,17 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { "--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","), fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())} - if runner.executor.Runtime() == "docker" { + if _, isdocker := runner.executor.(*dockerExecutor); isdocker { arvMountCmd = append(arvMountCmd, "--allow-other") } - if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { + if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 { + keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache") + if err != nil { + return nil, fmt.Errorf("while creating keep cache temp dir: %v", err) + } + arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk)) + } else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 { arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM)) } @@ -452,8 +464,8 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { sort.Strings(binds) for _, bind := range binds { - mnt, ok := runner.Container.Mounts[bind] - if !ok { + mnt, notSecret := runner.Container.Mounts[bind] + if !notSecret { mnt = runner.SecretMounts[bind] } if bind == "stdout" || bind == "stderr" { @@ -522,8 +534,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { } } else { src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount) - arvMountCmd = append(arvMountCmd, "--mount-tmp") - arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount)) + arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount)) tmpcount++ } if mnt.Writable { @@ -583,9 +594,32 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { if err != nil { return nil, fmt.Errorf("writing temp file: %v", err) } - if strings.HasPrefix(bind, runner.Container.OutputPath+"/") { + if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") { + // In most cases, if the container + // specifies a literal file inside the + // output path, we copy it into the + // output directory (either a mounted + // collection or a staging area on the + // host fs). If it's a secret, it will + // be skipped when copying output from + // staging to Keep later. copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]}) } else { + // If a secret is outside OutputPath, + // we bind mount the secret file + // directly just like other mounts. We + // also use this strategy when a + // secret is inside OutputPath but + // OutputPath is a live collection, to + // avoid writing the secret to + // Keep. Attempting to remove a + // bind-mounted secret file from + // inside the container will return a + // "Device or resource busy" error + // that might not be handled well by + // the container, which is why we + // don't use this strategy when + // OutputPath is a staging directory. bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true} } @@ -633,6 +667,9 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) { if err != nil { return nil, fmt.Errorf("while trying to start arv-mount: %v", err) } + if runner.hoststatReporter != nil && runner.ArvMount != nil { + runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid) + } for _, p := range collectionPaths { _, err = os.Stat(p) @@ -706,6 +743,7 @@ func (runner *ContainerRunner) startHoststat() error { PollPeriod: runner.statInterval, } runner.hoststatReporter.Start() + runner.hoststatReporter.ReportPID("crunch-run", os.Getpid()) return nil } @@ -973,6 +1011,7 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st env["ARVADOS_API_TOKEN"] = tok env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST") env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE") + env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES") } workdir := runner.Container.Cwd if workdir == "." { @@ -987,6 +1026,10 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st runner.executorStdout = stdout runner.executorStderr = stderr + if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 { + nvidiaModprobe(runner.CrunchLog) + } + return runner.executor.Create(containerSpec{ Image: imageID, VCPUs: runner.Container.RuntimeConstraints.VCPUs, @@ -1052,6 +1095,26 @@ func (runner *ContainerRunner) WaitFinish() error { } runner.ExitCode = &exitcode + extra := "" + if exitcode&0x80 != 0 { + // Convert raw exit status (0x80 + signal number) to a + // string to log after the code, like " (signal 101)" + // or " (signal 9, killed)" + sig := syscall.WaitStatus(exitcode).Signal() + if name := unix.SignalName(sig); name != "" { + extra = fmt.Sprintf(" (signal %d, %s)", sig, name) + } else { + extra = fmt.Sprintf(" (signal %d)", sig) + } + } + runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra) + err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ + "container": arvadosclient.Dict{"exit_code": exitcode}, + }, nil) + if err != nil { + runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err) + } + var returnErr error if err = runner.executorStdin.Close(); err != nil { err = fmt.Errorf("error closing container stdin: %s", err) @@ -1118,10 +1181,9 @@ func (runner *ContainerRunner) updateLogs() { continue } - var updated arvados.Container err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{ "container": arvadosclient.Dict{"log": saved.PortableDataHash}, - }, &updated) + }, nil) if err != nil { runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err) continue @@ -1399,13 +1461,17 @@ func (runner *ContainerRunner) UpdateContainerFinal() error { if runner.LogsPDH != nil { update["log"] = *runner.LogsPDH } - if runner.finalState == "Complete" { - if runner.ExitCode != nil { - update["exit_code"] = *runner.ExitCode - } - if runner.OutputPDH != nil { - update["output"] = *runner.OutputPDH - } + if runner.ExitCode != nil { + update["exit_code"] = *runner.ExitCode + } else { + update["exit_code"] = nil + } + if runner.finalState == "Complete" && runner.OutputPDH != nil { + update["output"] = *runner.OutputPDH + } + var it arvados.InstanceType + if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 { + update["cost"] = it.Price * time.Now().Sub(runner.costStartTime).Seconds() / time.Hour.Seconds() } return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil) } @@ -1434,7 +1500,12 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err // Run the full container lifecycle. func (runner *ContainerRunner) Run() (err error) { runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String()) - runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime()) + runner.CrunchLog.Printf("%s", currentUserAndGroups()) + v, _ := exec.Command("arv-mount", "--version").CombinedOutput() + runner.CrunchLog.Printf("Using FUSE mount: %s", v) + runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime()) + runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID) + runner.costStartTime = time.Now() hostname, hosterr := os.Hostname() if hosterr != nil { @@ -1509,6 +1580,9 @@ func (runner *ContainerRunner) Run() (err error) { if err != nil { return } + if runner.keepstore != nil { + runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid) + } // set up FUSE mount and binds bindmounts, err = runner.SetupMounts() @@ -1679,6 +1753,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates") detach := flags.Bool("detach", false, "Detach from parent process and run in the background") stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin") + configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)") sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)") kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID") list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes") @@ -1687,7 +1762,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`) memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container") runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity") + brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)") flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.") + version := flags.Bool("version", false, "Write version information to stdout and exit 0.") ignoreDetachFlag := false if len(args) > 0 && args[0] == "-no-detach" { @@ -1703,6 +1780,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok { return code + } else if *version { + fmt.Fprintln(stdout, prog, cmd.Version.String()) + return 0 } else if !*list && flags.NArg() != 1 { fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n") return 2 @@ -1724,6 +1804,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + var keepstoreLogbuf bufThenWrite var conf ConfigData if *stdinConfig { err := json.NewDecoder(stdin).Decode(&conf) @@ -1745,6 +1826,8 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s // fill it using the container UUID prefix. conf.Cluster.ClusterID = containerUUID[:5] } + } else { + conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr)) } log.Printf("crunch-run %s started", cmd.Version.String()) @@ -1754,7 +1837,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s arvadosclient.CertFiles = []string{*caCertsPath} } - var keepstoreLogbuf bufThenWrite keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr)) if err != nil { log.Print(err) @@ -1785,6 +1867,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 1 } + cr.keepstore = keepstore if keepstore == nil { // Log explanation (if any) for why we're not running // a local keepstore. @@ -1840,24 +1923,34 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s } defer cr.executor.Close() + cr.brokenNodeHook = *brokenNodeHook + gwAuthSecret := os.Getenv("GatewayAuthSecret") os.Unsetenv("GatewayAuthSecret") if gwAuthSecret == "" { // not safe to run a gateway service without an auth // secret cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)") - } else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" { - // dispatcher did not tell us which external IP - // address to advertise --> no gateway service - cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)") - } else if de, ok := cr.executor.(*dockerExecutor); ok { + } else { + gwListen := os.Getenv("GatewayAddress") cr.gateway = Gateway{ - Address: gwListen, - AuthSecret: gwAuthSecret, - ContainerUUID: containerUUID, - DockerContainerID: &de.containerID, - Log: cr.CrunchLog, - ContainerIPAddress: dockerContainerIPAddress(&de.containerID), + Address: gwListen, + AuthSecret: gwAuthSecret, + ContainerUUID: containerUUID, + Target: cr.executor, + Log: cr.CrunchLog, + } + if gwListen == "" { + // Direct connection won't work, so we use the + // gateway_address field to indicate the + // internalURL of the controller process that + // has the current tunnel connection. + cr.gateway.ArvadosClient = cr.dispatcherClient + cr.gateway.UpdateTunnelURL = func(url string) { + cr.gateway.Address = "tunnel " + url + cr.DispatcherArvClient.Update("containers", containerUUID, + arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil) + } } err = cr.gateway.Start() if err != nil { @@ -1880,7 +1973,11 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s cr.enableNetwork = *enableNetwork cr.networkMode = *networkMode if *cgroupParentSubsystem != "" { - p := findCgroup(*cgroupParentSubsystem) + p, err := findCgroup(*cgroupParentSubsystem) + if err != nil { + log.Printf("fatal: cgroup parent subsystem: %s", err) + return 1 + } cr.setCgroupParent = p cr.expectCgroupParent = p } @@ -1909,8 +2006,62 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s return 0 } +// Try to load ConfigData in hpc (slurm/lsf) environment. This means +// loading the cluster config from the specified file and (if that +// works) getting the runtime_constraints container field from +// controller to determine # VCPUs so we can calculate KeepBuffers. +func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData { + var conf ConfigData + conf.Cluster = loadClusterConfigFile(configFile, stderr) + if conf.Cluster == nil { + // skip loading the container record -- we won't be + // able to start local keepstore anyway. + return conf + } + arv, err := arvadosclient.MakeArvadosClient() + if err != nil { + fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err) + return conf + } + arv.Retries = 8 + var ctr arvados.Container + err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr) + if err != nil { + fmt.Fprintf(stderr, "error getting container record: %s\n", err) + return conf + } + if ctr.RuntimeConstraints.VCPUs > 0 { + conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU + } + return conf +} + +// Load cluster config file from given path. If an error occurs, log +// the error to stderr and return nil. +func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster { + ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info")) + ldr.Path = path + cfg, err := ldr.Load() + if err != nil { + fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err) + return nil + } + cluster, err := cfg.GetCluster("") + if err != nil { + fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err) + return nil + } + fmt.Fprintf(stderr, "loaded config file %s\n", path) + return cluster +} + func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) { - if configData.Cluster == nil || configData.KeepBuffers < 1 { + if configData.KeepBuffers < 1 { + fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers) + return nil, nil + } + if configData.Cluster == nil { + fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n") return nil, nil } for uuid, vol := range configData.Cluster.Volumes { @@ -1929,7 +2080,8 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er // modify the cluster configuration that we feed it on stdin. configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers - ln, err := net.Listen("tcp", "localhost:0") + localaddr := localKeepstoreAddr() + ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0")) if err != nil { return nil, err } @@ -1939,7 +2091,7 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er return nil, err } ln.Close() - url := "http://localhost:" + port + url := "http://" + net.JoinHostPort(localaddr, port) fmt.Fprintf(logbuf, "starting keepstore on %s\n", url) @@ -2004,3 +2156,70 @@ func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, er os.Setenv("ARVADOS_KEEP_SERVICES", url) return cmd, nil } + +// return current uid, gid, groups in a format suitable for logging: +// "crunch-run process has uid=1234(arvados) gid=1234(arvados) +// groups=1234(arvados),114(fuse)" +func currentUserAndGroups() string { + u, err := user.Current() + if err != nil { + return fmt.Sprintf("error getting current user ID: %s", err) + } + s := fmt.Sprintf("crunch-run process has uid=%s(%s) gid=%s", u.Uid, u.Username, u.Gid) + if g, err := user.LookupGroupId(u.Gid); err == nil { + s += fmt.Sprintf("(%s)", g.Name) + } + s += " groups=" + if gids, err := u.GroupIds(); err == nil { + for i, gid := range gids { + if i > 0 { + s += "," + } + s += gid + if g, err := user.LookupGroupId(gid); err == nil { + s += fmt.Sprintf("(%s)", g.Name) + } + } + } + return s +} + +// Return a suitable local interface address for a local keepstore +// service. Currently this is the numerically lowest non-loopback ipv4 +// address assigned to a local interface that is not in any of the +// link-local/vpn/loopback ranges 169.254/16, 100.64/10, or 127/8. +func localKeepstoreAddr() string { + var ips []net.IP + // Ignore error (proceed with zero IPs) + addrs, _ := processIPs(os.Getpid()) + for addr := range addrs { + ip := net.ParseIP(addr) + if ip == nil { + // invalid + continue + } + if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) || + ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) || + ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) { + // unsuitable + continue + } + ips = append(ips, ip) + } + if len(ips) == 0 { + return "0.0.0.0" + } + sort.Slice(ips, func(ii, jj int) bool { + i, j := ips[ii], ips[jj] + if len(i) != len(j) { + return len(i) < len(j) + } + for x := range i { + if i[x] != j[x] { + return i[x] < j[x] + } + } + return false + }) + return ips[0].String() +}