X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/97d0e12a78e9245c8ed29c070ffe0399e5cd6cb4..31779a06b28e21a9409ec7c6310f0871b65d13ff:/lib/crunchrun/crunchrun.go

diff --git a/lib/crunchrun/crunchrun.go b/lib/crunchrun/crunchrun.go
index 23fbc430b4..9add35335f 100644
--- a/lib/crunchrun/crunchrun.go
+++ b/lib/crunchrun/crunchrun.go
@@ -6,16 +6,21 @@ package crunchrun
 import (
 	"bytes"
+	"context"
 	"encoding/json"
 	"errors"
 	"flag"
 	"fmt"
 	"io"
+	"io/fs"
 	"io/ioutil"
 	"log"
+	"net"
+	"net/http"
 	"os"
 	"os/exec"
 	"os/signal"
+	"os/user"
 	"path"
 	"path/filepath"
 	"regexp"
@@ -27,19 +32,31 @@ import (
 	"syscall"
 	"time"
 
+	"git.arvados.org/arvados.git/lib/cloud"
 	"git.arvados.org/arvados.git/lib/cmd"
+	"git.arvados.org/arvados.git/lib/config"
 	"git.arvados.org/arvados.git/lib/crunchstat"
 	"git.arvados.org/arvados.git/sdk/go/arvados"
 	"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+	"git.arvados.org/arvados.git/sdk/go/ctxlog"
 	"git.arvados.org/arvados.git/sdk/go/keepclient"
 	"git.arvados.org/arvados.git/sdk/go/manifest"
-	"golang.org/x/net/context"
+	"golang.org/x/sys/unix"
 )
 
 type command struct{}
 
 var Command = command{}
 
+// ConfigData contains environment variables and (when needed) cluster
+// configuration, passed from dispatchcloud to crunch-run on stdin.
+type ConfigData struct {
+	Env          map[string]string
+	KeepBuffers  int
+	EC2SpotCheck bool
+	Cluster      *arvados.Cluster
+}
+
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
 type IArvadosClient interface {
 	Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@ -66,7 +83,7 @@ type IKeepClient interface {
 // NewLogWriter is a factory function to create a new log writer.
 type NewLogWriter func(name string) (io.WriteCloser, error)
 
-type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
+type RunArvMount func(cmdline []string, tok string) (*exec.Cmd, error)
 
 type MkTempDir func(string, string) (string, error)
 
@@ -77,7 +94,10 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-	executor containerExecutor
+	executor       containerExecutor
+	executorStdin  io.Closer
+	executorStdout io.Closer
+	executorStderr io.Closer
 
 	// Dispatcher client is initialized with the Dispatcher token.
 	// This is a privileged token used to manage container status
@@ -106,8 +126,6 @@ type ContainerRunner struct {
 	ExitCode      *int
 	NewLogWriter  NewLogWriter
 	CrunchLog     *ThrottledLogger
-	Stdout        io.WriteCloser
-	Stderr        io.WriteCloser
 	logUUID       string
 	logMtx        sync.Mutex
 	LogCollection arvados.CollectionFileSystem
@@ -125,26 +143,22 @@ type ContainerRunner struct {
 	MkArvClient   func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error)
 	finalState    string
 	parentTemp    string
+	costStartTime time.Time
 
+	keepstore        *exec.Cmd
+	keepstoreLogger  io.WriteCloser
+	keepstoreLogbuf  *bufThenWrite
 	statLogger       io.WriteCloser
 	statReporter     *crunchstat.Reporter
 	hoststatLogger   io.WriteCloser
 	hoststatReporter *crunchstat.Reporter
 	statInterval     time.Duration
-	cgroupRoot       string
-	// What we expect the container's cgroup parent to be.
-	expectCgroupParent string
 	// What we tell docker to use as the container's cgroup
-	// parent. Note: Ideally we would use the same field for both
-	// expectCgroupParent and setCgroupParent, and just make it
-	// default to "docker". However, when using docker < 1.10 with
-	// systemd, specifying a non-empty cgroup parent (even the
-	// default value "docker") hits a docker bug
-	// (https://github.com/docker/docker/issues/17126). Using two
-	// separate fields makes it possible to use the "expect cgroup
-	// parent to be X" feature even on sites where the "specify
-	// cgroup parent" feature breaks.
+	// parent.
 	setCgroupParent string
+	// Fake root dir where crunchstat.Reporter should read OS
+	// files, for testing.
+	crunchstatFakeFS fs.FS
 
 	cStateLock sync.Mutex
 	cCancelled bool // StopContainer() invoked
@@ -152,11 +166,15 @@ type ContainerRunner struct {
 	enableMemoryLimit bool
 	enableNetwork     string // one of "default" or "always"
 	networkMode       string // "none", "host", or "" -- passed through to executor
+	brokenNodeHook    string // script to run if node appears to be broken
 	arvMountLog       *ThrottledLogger
 
 	containerWatchdogInterval time.Duration
 
 	gateway Gateway
+
+	prices     []cloud.InstancePrice
+	pricesLock sync.Mutex
 }
 
 // setupSignals sets up signal handling to gracefully terminate the
@@ -195,10 +213,9 @@ var errorBlacklist = []string{
 	"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
 	"(?ms).*grpc: the connection is unavailable.*",
 }
-var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
 
 func (runner *ContainerRunner) runBrokenNodeHook() {
-	if *brokenNodeHook == "" {
+	if runner.brokenNodeHook == "" {
 		path := filepath.Join(lockdir, brokenfile)
 		runner.CrunchLog.Printf("Writing %s to mark node as broken", path)
 		f, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR, 0700)
@@ -208,9 +225,9 @@ func (runner *ContainerRunner) runBrokenNodeHook() {
 		}
 		f.Close()
 	} else {
-		runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+		runner.CrunchLog.Printf("Running broken node hook %q", runner.brokenNodeHook)
 		// run killme script
-		c := exec.Command(*brokenNodeHook)
+		c := exec.Command(runner.brokenNodeHook)
 		c.Stdout = runner.CrunchLog
 		c.Stderr = runner.CrunchLog
 		err := c.Run()
@@ -259,23 +276,21 @@ func (runner *ContainerRunner) LoadImage() (string, error) {
 		return "", fmt.Errorf("cannot choose from multiple tar files in image collection: %v", tarfiles)
 	}
 	imageID := tarfiles[0][:len(tarfiles[0])-4]
-	imageFile := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + tarfiles[0]
+	imageTarballPath := runner.ArvMountPoint + "/by_id/" + runner.Container.ContainerImage + "/" + imageID + ".tar"
 	runner.CrunchLog.Printf("Using Docker image id %q", imageID)
-	if !runner.executor.ImageLoaded(imageID) {
-		runner.CrunchLog.Print("Loading Docker image from keep")
-		err = runner.executor.LoadImage(imageFile)
-		if err != nil {
-			return "", err
-		}
-	} else {
-		runner.CrunchLog.Print("Docker image is available")
+	runner.CrunchLog.Print("Loading Docker image from keep")
+	err = runner.executor.LoadImage(imageID, imageTarballPath, runner.Container, runner.ArvMountPoint,
+		runner.containerClient)
+	if err != nil {
+		return "", err
 	}
+
 	return imageID, nil
 }
 
-func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (c *exec.Cmd, err error) {
-	c = exec.Command("arv-mount", arvMountCmd...)
+func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *exec.Cmd, err error) {
+	c = exec.Command(cmdline[0], cmdline[1:]...)
 
 	// Copy our environment, but override ARVADOS_API_TOKEN with
 	// the container auth token.
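
Note: the ConfigData struct introduced above is the JSON message that dispatchcloud writes to crunch-run's stdin (see the new -stdin-config flag in RunCommand further down). A minimal sketch of the sending side -- the function name, env value, and buffer count here are invented for illustration:

    package dispatcher

    import (
    	"bytes"
    	"encoding/json"
    	"os/exec"
    )

    // startCrunchRun feeds a ConfigData-shaped JSON message to a child
    // crunch-run process on stdin. Key names match the struct above;
    // the values are illustrative only.
    func startCrunchRun(containerUUID string) error {
    	conf := map[string]interface{}{
    		"Env":          map[string]string{"ARVADOS_API_HOST": "zzzzz.example.org"},
    		"KeepBuffers":  4,    // >0 enables the local keepstore process
    		"EC2SpotCheck": true, // enables checkSpotInterruptionNotices
    	}
    	buf, err := json.Marshal(conf)
    	if err != nil {
    		return err
    	}
    	cmd := exec.Command("crunch-run", "-stdin-config", containerUUID)
    	cmd.Stdin = bytes.NewReader(buf)
    	return cmd.Run()
    }
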
@@ -292,8 +307,21 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
 		return nil, err
 	}
 	runner.arvMountLog = NewThrottledLogger(w)
+	scanner := logScanner{
+		Patterns: []string{
+			"Keep write error",
+			"Block not found error",
+			"Unhandled exception during FUSE operation",
+		},
+		ReportFunc: func(pattern, text string) {
+			runner.updateRuntimeStatus(arvadosclient.Dict{
+				"warning":       "arv-mount: " + pattern,
+				"warningDetail": text,
+			})
+		},
+	}
 	c.Stdout = runner.arvMountLog
-	c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr)
+	c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
 
 	runner.CrunchLog.Printf("Running %v", c.Args)
 
@@ -393,14 +421,24 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 	pdhOnly := true
 	tmpcount := 0
 	arvMountCmd := []string{
+		"arv-mount",
 		"--foreground",
-		"--allow-other",
 		"--read-write",
 		"--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
 		fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
 
-	if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
-		arvMountCmd = append(arvMountCmd, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
+	if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
+		arvMountCmd = append(arvMountCmd, "--allow-other")
+	}
+
+	if runner.Container.RuntimeConstraints.KeepCacheDisk > 0 {
+		keepcachedir, err := runner.MkTempDir(runner.parentTemp, "keepcache")
+		if err != nil {
+			return nil, fmt.Errorf("while creating keep cache temp dir: %v", err)
+		}
+		arvMountCmd = append(arvMountCmd, "--disk-cache", "--disk-cache-dir", keepcachedir, "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheDisk))
+	} else if runner.Container.RuntimeConstraints.KeepCacheRAM > 0 {
+		arvMountCmd = append(arvMountCmd, "--ram-cache", "--file-cache", fmt.Sprintf("%d", runner.Container.RuntimeConstraints.KeepCacheRAM))
 	}
 
 	collectionPaths := []string{}
@@ -429,8 +467,8 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 
 	sort.Strings(binds)
 	for _, bind := range binds {
-		mnt, ok := runner.Container.Mounts[bind]
-		if !ok {
+		mnt, notSecret := runner.Container.Mounts[bind]
+		if !notSecret {
 			mnt = runner.SecretMounts[bind]
 		}
 		if bind == "stdout" || bind == "stderr" {
@@ -499,8 +537,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 			}
 		} else {
 			src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
-			arvMountCmd = append(arvMountCmd, "--mount-tmp")
-			arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+			arvMountCmd = append(arvMountCmd, "--mount-tmp", fmt.Sprintf("tmp%d", tmpcount))
 			tmpcount++
 		}
 		if mnt.Writable {
@@ -560,9 +597,32 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 			if err != nil {
 				return nil, fmt.Errorf("writing temp file: %v", err)
 			}
-			if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+			if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && (notSecret || runner.Container.Mounts[runner.Container.OutputPath].Kind != "collection") {
+				// In most cases, if the container
+				// specifies a literal file inside the
+				// output path, we copy it into the
+				// output directory (either a mounted
+				// collection or a staging area on the
+				// host fs). If it's a secret, it will
+				// be skipped when copying output from
+				// staging to Keep later.
 				copyFiles = append(copyFiles, copyFile{tmpfn, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
 			} else {
+				// If a secret is outside OutputPath,
+				// we bind mount the secret file
+				// directly just like other mounts. We
+				// also use this strategy when a
+				// secret is inside OutputPath but
+				// OutputPath is a live collection, to
+				// avoid writing the secret to
+				// Keep. Attempting to remove a
+				// bind-mounted secret file from
+				// inside the container will return a
+				// "Device or resource busy" error
+				// that might not be handled well by
+				// the container, which is why we
+				// don't use this strategy when
+				// OutputPath is a staging directory.
 				bindmounts[bind] = bindmount{HostPath: tmpfn, ReadOnly: true}
 			}
@@ -571,7 +631,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 			if err != nil {
 				return nil, fmt.Errorf("creating temp dir: %v", err)
 			}
-			err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
+			err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token)
 			if err != nil {
 				return nil, err
 			}
@@ -594,16 +654,25 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
 	}
 
 	if pdhOnly {
-		arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+		// If we are only mounting collections by pdh, make
+		// sure we don't subscribe to websocket events to
+		// avoid putting undesired load on the API server
+		arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id", "--disable-event-listening")
 	} else {
 		arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
 	}
+	// the by_uuid mount point is used by singularity when writing
+	// out docker images converted to SIF
+	arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_uuid")
 	arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
 
 	runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
 	if err != nil {
 		return nil, fmt.Errorf("while trying to start arv-mount: %v", err)
 	}
+	if runner.hoststatReporter != nil && runner.ArvMount != nil {
+		runner.hoststatReporter.ReportPID("arv-mount", runner.ArvMount.Process.Pid)
+	}
 
 	for _, p := range collectionPaths {
 		_, err = os.Stat(p)
@@ -658,6 +727,7 @@ func (runner *ContainerRunner) stopHoststat() error {
 		return nil
 	}
 	runner.hoststatReporter.Stop()
+	runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
 	err := runner.hoststatLogger.Close()
 	if err != nil {
 		return fmt.Errorf("error closing hoststat logs: %v", err)
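
Note: logScanner, wired into arv-mount's stderr in the ArvMountCmd hunk above, is defined elsewhere in this package. Conceptually it is just an io.Writer that watches for known substrings and calls ReportFunc on the first match. A rough sketch of the idea -- not the real implementation, which also keeps partial-line state between writes:

    package sketch

    import "strings"

    // logScannerSketch reports the first occurrence of any pattern seen
    // in data written to it, then goes quiet.
    type logScannerSketch struct {
    	Patterns   []string
    	ReportFunc func(pattern, text string)
    	done       bool
    }

    func (s *logScannerSketch) Write(p []byte) (int, error) {
    	if !s.done {
    		text := string(p)
    		for _, pat := range s.Patterns {
    			if strings.Contains(text, pat) {
    				s.ReportFunc(pat, strings.TrimSpace(text))
    				s.done = true
    				break
    			}
    		}
    	}
    	return len(p), nil
    }
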
@@ -672,11 +742,20 @@ func (runner *ContainerRunner) startHoststat() error {
 	}
 	runner.hoststatLogger = NewThrottledLogger(w)
 	runner.hoststatReporter = &crunchstat.Reporter{
-		Logger:     log.New(runner.hoststatLogger, "", 0),
-		CgroupRoot: runner.cgroupRoot,
+		Logger: log.New(runner.hoststatLogger, "", 0),
+		// Our own cgroup is the "host" cgroup, in the sense
+		// that it accounts for resource usage outside the
+		// container. It doesn't count _all_ resource usage on
+		// the system.
+		//
+		// TODO?: Use the furthest ancestor of our own cgroup
+		// that has stats available. (Currently crunchstat
+		// does not have that capability.)
+		Pid:        os.Getpid,
 		PollPeriod: runner.statInterval,
 	}
 	runner.hoststatReporter.Start()
+	runner.hoststatReporter.ReportPID("crunch-run", os.Getpid())
 	return nil
 }
 
@@ -687,12 +766,15 @@ func (runner *ContainerRunner) startCrunchstat() error {
 	}
 	runner.statLogger = NewThrottledLogger(w)
 	runner.statReporter = &crunchstat.Reporter{
-		CID:          runner.executor.CgroupID(),
-		Logger:       log.New(runner.statLogger, "", 0),
-		CgroupParent: runner.expectCgroupParent,
-		CgroupRoot:   runner.cgroupRoot,
-		PollPeriod:   runner.statInterval,
-		TempDir:      runner.parentTemp,
+		Pid:    runner.executor.Pid,
+		FS:     runner.crunchstatFakeFS,
+		Logger: log.New(runner.statLogger, "", 0),
+		MemThresholds: map[string][]crunchstat.Threshold{
+			"rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+		},
+		PollPeriod:      runner.statInterval,
+		TempDir:         runner.parentTemp,
+		ThresholdLogger: runner.CrunchLog,
 	}
 	runner.statReporter.Start()
 	return nil
@@ -877,7 +959,7 @@ func (runner *ContainerRunner) getStdoutFile(mntPath string) (*os.File, error) {
 
 // CreateContainer creates the docker container.
 func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
-	var stdin io.ReadCloser
+	var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
 	if mnt, ok := runner.Container.Mounts["stdin"]; ok {
 		switch mnt.Kind {
 		case "collection":
@@ -944,6 +1026,7 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
 		env["ARVADOS_API_TOKEN"] = tok
 		env["ARVADOS_API_HOST"] = os.Getenv("ARVADOS_API_HOST")
 		env["ARVADOS_API_HOST_INSECURE"] = os.Getenv("ARVADOS_API_HOST_INSECURE")
+		env["ARVADOS_KEEP_SERVICES"] = os.Getenv("ARVADOS_KEEP_SERVICES")
 	}
 	workdir := runner.Container.Cwd
 	if workdir == "." {
@@ -954,20 +1037,29 @@ func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[st
 	if !runner.enableMemoryLimit {
 		ram = 0
 	}
+	runner.executorStdin = stdin
+	runner.executorStdout = stdout
+	runner.executorStderr = stderr
+
+	if runner.Container.RuntimeConstraints.CUDA.DeviceCount > 0 {
+		nvidiaModprobe(runner.CrunchLog)
+	}
+
 	return runner.executor.Create(containerSpec{
-		Image:         imageID,
-		VCPUs:         runner.Container.RuntimeConstraints.VCPUs,
-		RAM:           ram,
-		WorkingDir:    workdir,
-		Env:           env,
-		BindMounts:    bindmounts,
-		Command:       runner.Container.Command,
-		EnableNetwork: enableNetwork,
-		NetworkMode:   runner.networkMode,
-		CgroupParent:  runner.setCgroupParent,
-		Stdin:         stdin,
-		Stdout:        stdout,
-		Stderr:        stderr,
+		Image:           imageID,
+		VCPUs:           runner.Container.RuntimeConstraints.VCPUs,
+		RAM:             ram,
+		WorkingDir:      workdir,
+		Env:             env,
+		BindMounts:      bindmounts,
+		Command:         runner.Container.Command,
+		EnableNetwork:   enableNetwork,
+		CUDADeviceCount: runner.Container.RuntimeConstraints.CUDA.DeviceCount,
+		NetworkMode:     runner.networkMode,
+		CgroupParent:    runner.setCgroupParent,
+		Stdin:           stdin,
+		Stdout:          stdout,
+		Stderr:          stderr,
 	})
 }
 
@@ -1018,14 +1110,59 @@ func (runner *ContainerRunner) WaitFinish() error {
 	}
 	runner.ExitCode = &exitcode
 
+	extra := ""
+	if exitcode&0x80 != 0 {
+		// Convert raw exit status (0x80 + signal number) to a
+		// string to log after the code, like " (signal 101)"
+		// or " (signal 9, killed)"
+		sig := syscall.WaitStatus(exitcode).Signal()
+		if name := unix.SignalName(sig); name != "" {
+			extra = fmt.Sprintf(" (signal %d, %s)", sig, name)
+		} else {
+			extra = fmt.Sprintf(" (signal %d)", sig)
+		}
+	}
+	runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
+	err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+		"select":    []string{"uuid"},
+		"container": arvadosclient.Dict{"exit_code": exitcode},
+	}, nil)
+	if err != nil {
+		runner.CrunchLog.Printf("ignoring error updating exit_code: %s", err)
+	}
+
+	var returnErr error
+	if err = runner.executorStdin.Close(); err != nil {
+		err = fmt.Errorf("error closing container stdin: %s", err)
+		runner.CrunchLog.Printf("%s", err)
+		returnErr = err
+	}
+	if err = runner.executorStdout.Close(); err != nil {
+		err = fmt.Errorf("error closing container stdout: %s", err)
+		runner.CrunchLog.Printf("%s", err)
+		if returnErr == nil {
+			returnErr = err
+		}
+	}
+	if err = runner.executorStderr.Close(); err != nil {
+		err = fmt.Errorf("error closing container stderr: %s", err)
+		runner.CrunchLog.Printf("%s", err)
+		if returnErr == nil {
+			returnErr = err
+		}
+	}
+
 	if runner.statReporter != nil {
 		runner.statReporter.Stop()
+		runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+			"rss": runner.Container.RuntimeConstraints.RAM,
+		})
 		err = runner.statLogger.Close()
 		if err != nil {
 			runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
 		}
 	}
-	return nil
+	return returnErr
 }
 
 func (runner *ContainerRunner) updateLogs() {
@@ -1063,10 +1200,12 @@ func (runner *ContainerRunner) updateLogs() {
 			continue
 		}
 
-		var updated arvados.Container
 		err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
-			"container": arvadosclient.Dict{"log": saved.PortableDataHash},
-		}, &updated)
+			"select": []string{"uuid"},
+			"container": arvadosclient.Dict{
+				"log": saved.PortableDataHash,
+			},
+		}, nil)
 		if err != nil {
 			runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
 			continue
 		}
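
Note: the status decoding added in WaitFinish above can be tried in isolation. On Linux, the conventional raw status 137 (0x80|9) decodes to SIGKILL:

    package main

    import (
    	"fmt"
    	"syscall"

    	"golang.org/x/sys/unix"
    )

    func main() {
    	exitcode := 137 // 0x80 + signal number, as described above
    	sig := syscall.WaitStatus(exitcode).Signal()
    	fmt.Printf("(signal %d, %s)\n", sig, unix.SignalName(sig)) // (signal 9, SIGKILL)
    }
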
@@ -1076,6 +1215,121 @@ func (runner *ContainerRunner) updateLogs() {
 	}
 }
 
+var spotInterruptionCheckInterval = 5 * time.Second
+var ec2MetadataBaseURL = "http://169.254.169.254"
+
+const ec2TokenTTL = time.Second * 21600
+
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+	type ec2metadata struct {
+		Action string    `json:"action"`
+		Time   time.Time `json:"time"`
+	}
+	runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+	var metadata ec2metadata
+	var token string
+	var tokenExp time.Time
+	check := func() error {
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+		defer cancel()
+		if token == "" || tokenExp.Sub(time.Now()) < time.Minute {
+			req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+			if err != nil {
+				return err
+			}
+			req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+			resp, err := http.DefaultClient.Do(req)
+			if err != nil {
+				return err
+			}
+			defer resp.Body.Close()
+			if resp.StatusCode != http.StatusOK {
+				return fmt.Errorf("%s", resp.Status)
+			}
+			newtoken, err := ioutil.ReadAll(resp.Body)
+			if err != nil {
+				return err
+			}
+			token = strings.TrimSpace(string(newtoken))
+			tokenExp = time.Now().Add(ec2TokenTTL)
+		}
+		req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+		if err != nil {
+			return err
+		}
+		req.Header.Set("X-aws-ec2-metadata-token", token)
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+		metadata = ec2metadata{}
+		switch resp.StatusCode {
+		case http.StatusOK:
+			break
+		case http.StatusNotFound:
+			// "If Amazon EC2 is not preparing to stop or
+			// terminate the instance, or if you
+			// terminated the instance yourself,
+			// instance-action is not present in the
+			// instance metadata and you receive an HTTP
+			// 404 error when you try to retrieve it."
+			return nil
+		case http.StatusUnauthorized:
+			token = ""
+			return fmt.Errorf("%s", resp.Status)
+		default:
+			return fmt.Errorf("%s", resp.Status)
+		}
+		err = json.NewDecoder(resp.Body).Decode(&metadata)
+		if err != nil {
+			return err
+		}
+		return nil
+	}
+	failures := 0
+	var lastmetadata ec2metadata
+	for range time.NewTicker(spotInterruptionCheckInterval).C {
+		err := check()
+		if err != nil {
+			runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+			failures++
+			if failures > 5 {
+				runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
+				return
+			}
+			continue
+		}
+		failures = 0
+		if metadata != lastmetadata {
+			lastmetadata = metadata
+			text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+			runner.CrunchLog.Printf("%s", text)
+			runner.updateRuntimeStatus(arvadosclient.Dict{
+				"warning":          "preemption notice",
+				"warningDetail":    text,
+				"preemptionNotice": text,
+			})
+			if proc, err := os.FindProcess(os.Getpid()); err == nil {
+				// trigger updateLogs
+				proc.Signal(syscall.SIGUSR1)
+			}
+		}
+	}
+}
+
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
+	err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+		"select": []string{"uuid"},
+		"container": arvadosclient.Dict{
+			"runtime_status": status,
+		},
+	}, nil)
+	if err != nil {
+		runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
+	}
+}
+
 // CaptureOutput saves data from the container's output directory if
 // needed, and updates the container output accordingly.
 func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) error {
@@ -1083,7 +1337,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 	// Output may have been set directly by the container, so
 	// refresh the container record to check.
 	err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
-		nil, &runner.Container)
+		arvadosclient.Dict{
+			"select": []string{"output"},
+		}, &runner.Container)
 	if err != nil {
 		return err
 	}
@@ -1096,7 +1352,6 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 
 	txt, err := (&copier{
 		client:        runner.containerClient,
-		arvClient:     runner.ContainerArvClient,
 		keepClient:    runner.ContainerKeepClient,
 		hostOutputDir: runner.HostOutputDir,
 		ctrOutputDir:  runner.Container.OutputPath,
@@ -1122,6 +1377,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 		var resp arvados.Collection
 		err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
 			"ensure_unique_name": true,
+			"select":             []string{"portable_data_hash"},
 			"collection": arvadosclient.Dict{
 				"is_trashed": true,
 				"name":       "output for " + runner.Container.UUID,
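
Note: because ec2MetadataBaseURL is a package-level variable, the polling loop above can be pointed at a stub server in tests. A sketch of such a stub -- the test name, token string, and payload are invented:

    package crunchrun

    import (
    	"fmt"
    	"net/http"
    	"net/http/httptest"
    	"testing"
    )

    // Hypothetical scaffolding: serve fake IMDSv2 responses so
    // checkSpotInterruptionNotices can be exercised without EC2.
    func TestSpotInterruptionStub(t *testing.T) {
    	stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    		switch {
    		case r.Method == http.MethodPut && r.URL.Path == "/latest/api/token":
    			fmt.Fprint(w, "fake-imds-token")
    		case r.URL.Path == "/latest/meta-data/spot/instance-action":
    			if r.Header.Get("X-aws-ec2-metadata-token") != "fake-imds-token" {
    				w.WriteHeader(http.StatusUnauthorized)
    				return
    			}
    			fmt.Fprint(w, `{"action":"terminate","time":"2024-01-01T00:00:00Z"}`)
    		default:
    			w.WriteHeader(http.StatusNotFound)
    		}
    	}))
    	defer stub.Close()
    	ec2MetadataBaseURL = stub.URL // package variable from the diff above
    	// ... construct a ContainerRunner and run checkSpotInterruptionNotices ...
    }
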
@@ -1146,6 +1402,7 @@ func (runner *ContainerRunner) CleanupDirs() {
 		if umnterr != nil {
 			runner.CrunchLog.Printf("Error unmounting: %v", umnterr)
+			runner.ArvMount.Process.Kill()
 		} else {
 			// If arv-mount --unmount gets stuck for any reason, we
 			// don't want to wait for it forever. Do Wait() in a goroutine
@@ -1176,12 +1433,14 @@ func (runner *ContainerRunner) CleanupDirs() {
 				}
 			}
 		}
+		runner.ArvMount = nil
 	}
 
 	if runner.ArvMountPoint != "" {
 		if rmerr := os.Remove(runner.ArvMountPoint); rmerr != nil {
 			runner.CrunchLog.Printf("While cleaning up arv-mount directory %s: %v", runner.ArvMountPoint, rmerr)
 		}
+		runner.ArvMountPoint = ""
 	}
 
 	if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
@@ -1216,6 +1475,16 @@ func (runner *ContainerRunner) CommitLogs() error {
 		runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
 	}()
 
+	if runner.keepstoreLogger != nil {
+		// Flush any buffered logs from our local keepstore
+		// process. Discard anything logged after this point
+		// -- it won't end up in the log collection, so
+		// there's no point writing it to the collectionfs.
+		runner.keepstoreLogbuf.SetWriter(io.Discard)
+		runner.keepstoreLogger.Close()
+		runner.keepstoreLogger = nil
+	}
+
 	if runner.LogsPDH != nil {
 		// If we have already assigned something to LogsPDH,
 		// we must be closing the re-opened log, which won't
@@ -1224,6 +1493,7 @@ func (runner *ContainerRunner) CommitLogs() error {
 		// -- it exists only to send logs to other channels.
 		return nil
 	}
+
 	saved, err := runner.saveLogCollection(true)
 	if err != nil {
 		return fmt.Errorf("error saving log collection: %s", err)
@@ -1234,6 +1504,8 @@ func (runner *ContainerRunner) CommitLogs() error {
 	return nil
 }
 
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
 	runner.logMtx.Lock()
 	defer runner.logMtx.Unlock()
@@ -1258,11 +1530,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
 	if final {
 		updates["is_trashed"] = true
 	} else {
-		exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+		// We set trash_at so this collection gets
+		// automatically cleaned up eventually. It used to be
+		// 12 hours but we had a situation where the API
+		// server was down over a weekend but the containers
+		// kept running such that the log collection got
+		// trashed, so now we make it 2 weeks. refs #20378
+		exp := time.Now().Add(time.Duration(24*14) * time.Hour)
 		updates["trash_at"] = exp
 		updates["delete_at"] = exp
 	}
-	reqBody := arvadosclient.Dict{"collection": updates}
+	reqBody := arvadosclient.Dict{
+		"select":     []string{"uuid", "portable_data_hash"},
+		"collection": updates,
+	}
 	var err2 error
 	if runner.logUUID == "" {
 		reqBody["ensure_unique_name"] = true
@@ -1281,14 +1562,28 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
 }
 
 // UpdateContainerRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRunning() error {
+func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
 	runner.cStateLock.Lock()
 	defer runner.cStateLock.Unlock()
 	if runner.cCancelled {
 		return ErrCancelled
 	}
-	return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
-		arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
+	updates := arvadosclient.Dict{
+		"gateway_address": runner.gateway.Address,
+		"state":           "Running",
+	}
+	if logId != "" {
+		updates["log"] = logId
+	}
+	return runner.DispatcherArvClient.Update(
+		"containers",
+		runner.Container.UUID,
+		arvadosclient.Dict{
+			"select":    []string{"uuid"},
+			"container": updates,
+		},
+		nil,
+	)
 }
 
 // ContainerToken returns the api_token the container (and any
@@ -1315,15 +1610,19 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
 	if runner.LogsPDH != nil {
 		update["log"] = *runner.LogsPDH
 	}
-	if runner.finalState == "Complete" {
-		if runner.ExitCode != nil {
-			update["exit_code"] = *runner.ExitCode
-		}
-		if runner.OutputPDH != nil {
-			update["output"] = *runner.OutputPDH
-		}
+	if runner.ExitCode != nil {
+		update["exit_code"] = *runner.ExitCode
+	} else {
+		update["exit_code"] = nil
+	}
+	if runner.finalState == "Complete" && runner.OutputPDH != nil {
+		update["output"] = *runner.OutputPDH
 	}
-	return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+	update["cost"] = runner.calculateCost(time.Now())
+	return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+		"select":    []string{"uuid"},
+		"container": update,
+	}, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
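
Note: a pattern that recurs throughout this commit: every Create/Update call now passes a "select" list, so the API server returns only the named fields instead of a full container or collection record -- presumably to keep responses small. The shape is always the same; a fragment with hypothetical values (logPDH is an invented variable):

    reqBody := arvadosclient.Dict{
    	"select":    []string{"uuid"},                    // fields to return
    	"container": arvadosclient.Dict{"log": logPDH},   // fields to write
    }
    // runner.DispatcherArvClient.Update("containers", uuid, reqBody, nil)
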
@@ -1350,7 +1649,12 @@ func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, err
 
 // Run the full container lifecycle.
 func (runner *ContainerRunner) Run() (err error) {
 	runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
-	runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
+	runner.CrunchLog.Printf("%s", currentUserAndGroups())
+	v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+	runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+	runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+	runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
+	runner.costStartTime = time.Now()
 
 	hostname, hosterr := os.Hostname()
 	if hosterr != nil {
@@ -1359,6 +1663,12 @@ func (runner *ContainerRunner) Run() (err error) {
 		runner.CrunchLog.Printf("Executing on host '%s'", hostname)
 	}
 
+	sigusr2 := make(chan os.Signal, 1)
+	signal.Notify(sigusr2, syscall.SIGUSR2)
+	defer signal.Stop(sigusr2)
+	runner.loadPrices()
+	go runner.handleSIGUSR2(sigusr2)
+
 	runner.finalState = "Queued"
 
 	defer func() {
@@ -1416,6 +1726,7 @@ func (runner *ContainerRunner) Run() (err error) {
 		}
 		checkErr("stopHoststat", runner.stopHoststat())
 		checkErr("CommitLogs", runner.CommitLogs())
+		runner.CleanupDirs()
 		checkErr("UpdateContainerFinal", runner.UpdateContainerFinal())
 	}()
 
@@ -1424,6 +1735,9 @@ func (runner *ContainerRunner) Run() (err error) {
 	if err != nil {
 		return
 	}
+	if runner.keepstore != nil {
+		runner.hoststatReporter.ReportPID("keepstore", runner.keepstore.Process.Pid)
+	}
 
 	// set up FUSE mount and binds
 	bindmounts, err = runner.SetupMounts()
@@ -1466,7 +1780,14 @@ func (runner *ContainerRunner) Run() (err error) {
 		return
 	}
 
-	err = runner.UpdateContainerRunning()
+	logCollection, err := runner.saveLogCollection(false)
+	var logId string
+	if err == nil {
+		logId = logCollection.PortableDataHash
+	} else {
+		runner.CrunchLog.Printf("Error committing initial log collection: %v", err)
+	}
+	err = runner.UpdateContainerRunning(logId)
 	if err != nil {
 		return
 	}
@@ -1585,23 +1906,27 @@ func NewContainerRunner(dispatcherClient *arvados.Client,
 }
 
 func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
+	log := log.New(stderr, "", 0)
 	flags := flag.NewFlagSet(prog, flag.ContinueOnError)
 	statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
-	cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
-	cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
-	cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+	flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)")
+	flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)")
+	cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given `subsystem` as parent cgroup for container (subsystem argument is only relevant for cgroups v1; in cgroups v2 / unified mode, any non-empty value means use current cgroup)")
 	caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
 	detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
-	stdinEnv := flags.Bool("stdin-env", false, "Load environment variables from JSON message on stdin")
+	stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
+	configFile := flags.String("config", arvados.DefaultConfigFile, "filename of cluster config file to try loading if -stdin-config=false (default is $ARVADOS_CONFIG)")
 	sleep := flags.Duration("sleep", 0, "Delay before starting (testing use only)")
 	kill := flags.Int("kill", -1, "Send signal to an existing crunch-run process for given UUID")
-	list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes")
+	list := flags.Bool("list", false, "List UUIDs of existing crunch-run processes (and notify them to use price data passed on stdin)")
 	enableMemoryLimit := flags.Bool("enable-memory-limit", true, "tell container runtime to limit container's memory usage")
 	enableNetwork := flags.String("container-enable-networking", "default", "enable networking \"always\" (for all containers) or \"default\" (for containers that request it)")
 	networkMode := flags.String("container-network-mode", "default", `Docker network mode for container (use any argument valid for docker --net)`)
 	memprofile := flags.String("memprofile", "", "write memory profile to `file` after running container")
 	runtimeEngine := flags.String("runtime-engine", "docker", "container runtime: docker or singularity")
+	brokenNodeHook := flags.String("broken-node-hook", "", "script to run if node is detected to be broken (for example, Docker daemon is not running)")
 	flags.Duration("check-containerd", 0, "Ignored. Exists for compatibility with older versions.")
+	version := flags.Bool("version", false, "Write version information to stdout and exit 0.")
 
 	ignoreDetachFlag := false
 	if len(args) > 0 && args[0] == "-no-detach" {
@@ -1615,40 +1940,58 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		ignoreDetachFlag = true
 	}
 
-	if err := flags.Parse(args); err == flag.ErrHelp {
+	if ok, code := cmd.ParseFlags(flags, prog, args, "container-uuid", stderr); !ok {
+		return code
+	} else if *version {
+		fmt.Fprintln(stdout, prog, cmd.Version.String())
 		return 0
-	} else if err != nil {
-		log.Print(err)
-		return 1
-	}
-
-	if *stdinEnv && !ignoreDetachFlag {
-		// Load env vars on stdin if asked (but not in a
-		// detached child process, in which case stdin is
-		// /dev/null).
-		err := loadEnv(os.Stdin)
-		if err != nil {
-			log.Print(err)
-			return 1
-		}
+	} else if !*list && flags.NArg() != 1 {
+		fmt.Fprintf(stderr, "missing required argument: container-uuid (try -help)\n")
+		return 2
 	}
 
 	containerUUID := flags.Arg(0)
 
 	switch {
 	case *detach && !ignoreDetachFlag:
-		return Detach(containerUUID, prog, args, os.Stdout, os.Stderr)
+		return Detach(containerUUID, prog, args, stdin, stdout, stderr)
 	case *kill >= 0:
-		return KillProcess(containerUUID, syscall.Signal(*kill), os.Stdout, os.Stderr)
+		return KillProcess(containerUUID, syscall.Signal(*kill), stdout, stderr)
 	case *list:
-		return ListProcesses(os.Stdout, os.Stderr)
+		return ListProcesses(stdin, stdout, stderr)
 	}
 
-	if containerUUID == "" {
+	if len(containerUUID) != 27 {
 		log.Printf("usage: %s [options] UUID", prog)
 		return 1
 	}
 
+	var keepstoreLogbuf bufThenWrite
+	var conf ConfigData
+	if *stdinConfig {
+		err := json.NewDecoder(stdin).Decode(&conf)
+		if err != nil {
+			log.Printf("decode stdin: %s", err)
+			return 1
+		}
+		for k, v := range conf.Env {
+			err = os.Setenv(k, v)
+			if err != nil {
+				log.Printf("setenv(%q): %s", k, err)
+				return 1
+			}
+		}
+		if conf.Cluster != nil {
+			// ClusterID is missing from the JSON
+			// representation, but we need it to generate
+			// a valid config file for keepstore, so we
			// fill it using the container UUID prefix.
+			conf.Cluster.ClusterID = containerUUID[:5]
+		}
+	} else {
+		conf = hpcConfData(containerUUID, *configFile, io.MultiWriter(&keepstoreLogbuf, stderr))
+	}
+
 	log.Printf("crunch-run %s started", cmd.Version.String())
 	time.Sleep(*sleep)
 
@@ -1656,16 +1999,27 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 		arvadosclient.CertFiles = []string{*caCertsPath}
 	}
 
+	keepstore, err := startLocalKeepstore(conf, io.MultiWriter(&keepstoreLogbuf, stderr))
+	if err != nil {
+		log.Print(err)
+		return 1
+	}
+	if keepstore != nil {
+		defer keepstore.Process.Kill()
+	}
+
 	api, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Printf("%s: %v", containerUUID, err)
 		return 1
 	}
-	api.Retries = 8
+	// arvadosclient now interprets Retries=10 to mean
+	// Timeout=10m, retrying with exponential backoff + jitter.
+	api.Retries = 10
 
-	kc, kcerr := keepclient.MakeKeepClient(api)
-	if kcerr != nil {
-		log.Printf("%s: %v", containerUUID, kcerr)
+	kc, err := keepclient.MakeKeepClient(api)
+	if err != nil {
+		log.Printf("%s: %v", containerUUID, err)
 		return 1
 	}
 	kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
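
Note: bufThenWrite (the type of keepstoreLogbuf above) is defined elsewhere in this package. The idea: writes are buffered until SetWriter supplies a destination, at which point the backlog is flushed and later writes pass straight through. A simplified sketch:

    package sketch

    import (
    	"bytes"
    	"io"
    	"sync"
    )

    type bufThenWriteSketch struct {
    	mtx sync.Mutex
    	buf bytes.Buffer
    	w   io.Writer
    }

    func (b *bufThenWriteSketch) Write(p []byte) (int, error) {
    	b.mtx.Lock()
    	defer b.mtx.Unlock()
    	if b.w == nil {
    		return b.buf.Write(p) // no destination yet: keep a backlog
    	}
    	return b.w.Write(p)
    }

    // SetWriter flushes the backlog to w and redirects future writes there.
    func (b *bufThenWriteSketch) SetWriter(w io.Writer) error {
    	b.mtx.Lock()
    	defer b.mtx.Unlock()
    	if _, err := w.Write(b.buf.Bytes()); err != nil {
    		return err
    	}
    	b.buf.Reset()
    	b.w = w
    	return nil
    }
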
@@ -1677,6 +2031,44 @@
 		return 1
 	}
 
+	cr.keepstore = keepstore
+	if keepstore == nil {
+		// Log explanation (if any) for why we're not running
+		// a local keepstore.
+		var buf bytes.Buffer
+		keepstoreLogbuf.SetWriter(&buf)
+		if buf.Len() > 0 {
+			cr.CrunchLog.Printf("%s", strings.TrimSpace(buf.String()))
+		}
+	} else if logWhat := conf.Cluster.Containers.LocalKeepLogsToContainerLog; logWhat == "none" {
+		cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+		keepstoreLogbuf.SetWriter(io.Discard)
+	} else {
+		cr.CrunchLog.Printf("using local keepstore process (pid %d) at %s, writing logs to keepstore.txt in log collection", keepstore.Process.Pid, os.Getenv("ARVADOS_KEEP_SERVICES"))
+		logwriter, err := cr.NewLogWriter("keepstore")
+		if err != nil {
+			log.Print(err)
+			return 1
+		}
+		cr.keepstoreLogger = NewThrottledLogger(logwriter)
+
+		var writer io.WriteCloser = cr.keepstoreLogger
+		if logWhat == "errors" {
+			writer = &filterKeepstoreErrorsOnly{WriteCloser: writer}
+		} else if logWhat != "all" {
+			// should have been caught earlier by
+			// dispatcher's config loader
+			log.Printf("invalid value for Containers.LocalKeepLogsToContainerLog: %q", logWhat)
+			return 1
+		}
+		err = keepstoreLogbuf.SetWriter(writer)
+		if err != nil {
+			log.Print(err)
+			return 1
+		}
+		cr.keepstoreLogbuf = &keepstoreLogbuf
+	}
+
 	switch *runtimeEngine {
 	case "docker":
 		cr.executor, err = newDockerExecutor(containerUUID, cr.CrunchLog.Printf, cr.containerWatchdogInterval)
@@ -1695,24 +2087,38 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	}
 	defer cr.executor.Close()
 
+	cr.brokenNodeHook = *brokenNodeHook
+
 	gwAuthSecret := os.Getenv("GatewayAuthSecret")
 	os.Unsetenv("GatewayAuthSecret")
 	if gwAuthSecret == "" {
 		// not safe to run a gateway service without an auth
 		// secret
 		cr.CrunchLog.Printf("Not starting a gateway server (GatewayAuthSecret was not provided by dispatcher)")
-	} else if gwListen := os.Getenv("GatewayAddress"); gwListen == "" {
-		// dispatcher did not tell us which external IP
-		// address to advertise --> no gateway service
-		cr.CrunchLog.Printf("Not starting a gateway server (GatewayAddress was not provided by dispatcher)")
-	} else if de, ok := cr.executor.(*dockerExecutor); ok {
+	} else {
+		gwListen := os.Getenv("GatewayAddress")
 		cr.gateway = Gateway{
-			Address:            gwListen,
-			AuthSecret:         gwAuthSecret,
-			ContainerUUID:      containerUUID,
-			DockerContainerID:  &de.containerID,
-			Log:                cr.CrunchLog,
-			ContainerIPAddress: dockerContainerIPAddress(&de.containerID),
+			Address:       gwListen,
+			AuthSecret:    gwAuthSecret,
+			ContainerUUID: containerUUID,
+			Target:        cr.executor,
+			Log:           cr.CrunchLog,
+			LogCollection: cr.LogCollection,
+		}
+		if gwListen == "" {
+			// Direct connection won't work, so we use the
+			// gateway_address field to indicate the
+			// internalURL of the controller process that
+			// has the current tunnel connection.
+			cr.gateway.ArvadosClient = cr.dispatcherClient
+			cr.gateway.UpdateTunnelURL = func(url string) {
+				cr.gateway.Address = "tunnel " + url
+				cr.DispatcherArvClient.Update("containers", containerUUID,
+					arvadosclient.Dict{
+						"select":    []string{"uuid"},
+						"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+					}, nil)
+			}
 		}
 		err = cr.gateway.Start()
 		if err != nil {
@@ -1729,15 +2135,20 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	}
 	cr.parentTemp = parentTemp
 	cr.statInterval = *statInterval
-	cr.cgroupRoot = *cgroupRoot
-	cr.expectCgroupParent = *cgroupParent
 	cr.enableMemoryLimit = *enableMemoryLimit
 	cr.enableNetwork = *enableNetwork
 	cr.networkMode = *networkMode
 	if *cgroupParentSubsystem != "" {
-		p := findCgroup(*cgroupParentSubsystem)
+		p, err := findCgroup(os.DirFS("/"), *cgroupParentSubsystem)
+		if err != nil {
+			log.Printf("fatal: cgroup parent subsystem: %s", err)
+			return 1
+		}
 		cr.setCgroupParent = p
-		cr.expectCgroupParent = p
+	}
+
+	if conf.EC2SpotCheck {
+		go cr.checkSpotInterruptionNotices()
 	}
 
 	runerr := cr.Run()
@@ -1764,21 +2175,319 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 	return 0
 }
 
-func loadEnv(rdr io.Reader) error {
-	buf, err := ioutil.ReadAll(rdr)
+// Try to load ConfigData in hpc (slurm/lsf) environment. This means
+// loading the cluster config from the specified file and (if that
+// works) getting the runtime_constraints container field from
+// controller to determine # VCPUs so we can calculate KeepBuffers.
+func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
+	var conf ConfigData
+	conf.Cluster = loadClusterConfigFile(configFile, stderr)
+	if conf.Cluster == nil {
+		// skip loading the container record -- we won't be
+		// able to start local keepstore anyway.
+		return conf
+	}
+	arv, err := arvadosclient.MakeArvadosClient()
+	if err != nil {
+		fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
+		return conf
+	}
+	// arvadosclient now interprets Retries=10 to mean
+	// Timeout=10m, retrying with exponential backoff + jitter.
+	arv.Retries = 10
+	var ctr arvados.Container
+	err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
+	if err != nil {
+		fmt.Fprintf(stderr, "error getting container record: %s\n", err)
+		return conf
+	}
+	if ctr.RuntimeConstraints.VCPUs > 0 {
+		conf.KeepBuffers = ctr.RuntimeConstraints.VCPUs * conf.Cluster.Containers.LocalKeepBlobBuffersPerVCPU
+	}
+	return conf
+}
+
+// Load cluster config file from given path. If an error occurs, log
+// the error to stderr and return nil.
+func loadClusterConfigFile(path string, stderr io.Writer) *arvados.Cluster {
+	ldr := config.NewLoader(&bytes.Buffer{}, ctxlog.New(stderr, "plain", "info"))
+	ldr.Path = path
+	cfg, err := ldr.Load()
+	if err != nil {
+		fmt.Fprintf(stderr, "could not load config file %s: %s\n", path, err)
+		return nil
+	}
+	cluster, err := cfg.GetCluster("")
+	if err != nil {
+		fmt.Fprintf(stderr, "could not use config file %s: %s\n", path, err)
+		return nil
+	}
+	fmt.Fprintf(stderr, "loaded config file %s\n", path)
+	return cluster
+}
+
+func startLocalKeepstore(configData ConfigData, logbuf io.Writer) (*exec.Cmd, error) {
+	if configData.KeepBuffers < 1 {
+		fmt.Fprintf(logbuf, "not starting a local keepstore process because KeepBuffers=%v in config\n", configData.KeepBuffers)
+		return nil, nil
+	}
+	if configData.Cluster == nil {
+		fmt.Fprint(logbuf, "not starting a local keepstore process because cluster config file was not loaded\n")
+		return nil, nil
+	}
+	for uuid, vol := range configData.Cluster.Volumes {
+		if len(vol.AccessViaHosts) > 0 {
+			fmt.Fprintf(logbuf, "not starting a local keepstore process because a volume (%s) uses AccessViaHosts\n", uuid)
+			return nil, nil
+		}
+		if !vol.ReadOnly && vol.Replication < configData.Cluster.Collections.DefaultReplication {
+			fmt.Fprintf(logbuf, "not starting a local keepstore process because a writable volume (%s) has replication less than Collections.DefaultReplication (%d < %d)\n", uuid, vol.Replication, configData.Cluster.Collections.DefaultReplication)
+			return nil, nil
+		}
+	}
+
+	// Rather than have an alternate way to tell keepstore how
+	// many buffers to use when starting it this way, we just
+	// modify the cluster configuration that we feed it on stdin.
+	configData.Cluster.API.MaxKeepBlobBuffers = configData.KeepBuffers
+
+	localaddr := localKeepstoreAddr()
+	ln, err := net.Listen("tcp", net.JoinHostPort(localaddr, "0"))
+	if err != nil {
+		return nil, err
+	}
+	_, port, err := net.SplitHostPort(ln.Addr().String())
+	if err != nil {
+		ln.Close()
+		return nil, err
+	}
+	ln.Close()
+	url := "http://" + net.JoinHostPort(localaddr, port)
+
+	fmt.Fprintf(logbuf, "starting keepstore on %s\n", url)
+
+	var confJSON bytes.Buffer
+	err = json.NewEncoder(&confJSON).Encode(arvados.Config{
+		Clusters: map[string]arvados.Cluster{
+			configData.Cluster.ClusterID: *configData.Cluster,
+		},
+	})
 	if err != nil {
-		return fmt.Errorf("read stdin: %s", err)
+		return nil, err
 	}
-	var env map[string]string
-	err = json.Unmarshal(buf, &env)
+	cmd := exec.Command("/proc/self/exe", "keepstore", "-config=-")
+	if target, err := os.Readlink(cmd.Path); err == nil && strings.HasSuffix(target, ".test") {
+		// If we're a 'go test' process, running
+		// /proc/self/exe would start the test suite in a
+		// child process, which is not what we want.
+		cmd.Path, _ = exec.LookPath("go")
+		cmd.Args = append([]string{"go", "run", "../../cmd/arvados-server"}, cmd.Args[1:]...)
+		cmd.Env = os.Environ()
+	}
+	cmd.Stdin = &confJSON
+	cmd.Stdout = logbuf
+	cmd.Stderr = logbuf
+	cmd.Env = append(cmd.Env,
+		"GOGC=10",
+		"ARVADOS_SERVICE_INTERNAL_URL="+url)
+	err = cmd.Start()
 	if err != nil {
-		return fmt.Errorf("decode stdin: %s", err)
+		return nil, fmt.Errorf("error starting keepstore process: %w", err)
 	}
-	for k, v := range env {
-		err = os.Setenv(k, v)
+	cmdExited := false
+	go func() {
+		cmd.Wait()
+		cmdExited = true
+	}()
+	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
+	defer cancel()
+	poll := time.NewTicker(time.Second / 10)
+	defer poll.Stop()
+	client := http.Client{}
+	for range poll.C {
+		testReq, err := http.NewRequestWithContext(ctx, "GET", url+"/_health/ping", nil)
+		testReq.Header.Set("Authorization", "Bearer "+configData.Cluster.ManagementToken)
 		if err != nil {
-			return fmt.Errorf("setenv(%q): %s", k, err)
+			return nil, err
+		}
+		resp, err := client.Do(testReq)
+		if err == nil {
+			resp.Body.Close()
+			if resp.StatusCode == http.StatusOK {
+				break
+			}
+		}
+		if cmdExited {
+			return nil, fmt.Errorf("keepstore child process exited")
+		}
+		if ctx.Err() != nil {
+			return nil, fmt.Errorf("timed out waiting for new keepstore process to report healthy")
 		}
 	}
-	return nil
+	os.Setenv("ARVADOS_KEEP_SERVICES", url)
+	return cmd, nil
+}
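
Note: startLocalKeepstore picks its port with a standard idiom: bind to port 0 so the kernel chooses a free port, record it, close the listener, and hand the address to the child process. Nothing reserves the port after Close, so there is a small race window. A standalone sketch of the same idiom:

    package main

    import (
    	"fmt"
    	"net"
    )

    // pickFreePort returns a currently-free TCP port on addr. As in
    // startLocalKeepstore above, another process could grab the port
    // between Close and the eventual bind.
    func pickFreePort(addr string) (string, error) {
    	ln, err := net.Listen("tcp", net.JoinHostPort(addr, "0"))
    	if err != nil {
    		return "", err
    	}
    	defer ln.Close()
    	_, port, err := net.SplitHostPort(ln.Addr().String())
    	return port, err
    }

    func main() {
    	fmt.Println(pickFreePort("127.0.0.1"))
    }
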
+
+// return current uid, gid, groups in a format suitable for logging:
+// "crunch-run process has uid=1234(arvados) gid=1234(arvados)
+// groups=1234(arvados),114(fuse)"
+func currentUserAndGroups() string {
+	u, err := user.Current()
+	if err != nil {
+		return fmt.Sprintf("error getting current user ID: %s", err)
+	}
+	s := fmt.Sprintf("crunch-run process has uid=%s(%s) gid=%s", u.Uid, u.Username, u.Gid)
+	if g, err := user.LookupGroupId(u.Gid); err == nil {
+		s += fmt.Sprintf("(%s)", g.Name)
+	}
+	s += " groups="
+	if gids, err := u.GroupIds(); err == nil {
+		for i, gid := range gids {
+			if i > 0 {
+				s += ","
+			}
+			s += gid
+			if g, err := user.LookupGroupId(gid); err == nil {
+				s += fmt.Sprintf("(%s)", g.Name)
+			}
+		}
+	}
+	return s
+}
+
+// Return a suitable local interface address for a local keepstore
+// service. Currently this is the numerically lowest non-loopback ipv4
+// address assigned to a local interface that is not in any of the
+// link-local/vpn/loopback ranges 169.254/16, 100.64/10, or 127/8.
+func localKeepstoreAddr() string {
+	var ips []net.IP
+	// Ignore error (proceed with zero IPs)
+	addrs, _ := processIPs(os.Getpid())
+	for addr := range addrs {
+		ip := net.ParseIP(addr)
+		if ip == nil {
+			// invalid
+			continue
+		}
+		if ip.Mask(net.CIDRMask(8, 32)).Equal(net.IPv4(127, 0, 0, 0)) ||
+			ip.Mask(net.CIDRMask(10, 32)).Equal(net.IPv4(100, 64, 0, 0)) ||
+			ip.Mask(net.CIDRMask(16, 32)).Equal(net.IPv4(169, 254, 0, 0)) {
+			// unsuitable
+			continue
+		}
+		ips = append(ips, ip)
+	}
+	if len(ips) == 0 {
+		return "0.0.0.0"
+	}
+	sort.Slice(ips, func(ii, jj int) bool {
+		i, j := ips[ii], ips[jj]
+		if len(i) != len(j) {
+			return len(i) < len(j)
+		}
+		for x := range i {
+			if i[x] != j[x] {
+				return i[x] < j[x]
+			}
+		}
+		return false
+	})
+	return ips[0].String()
+}
+
+func (cr *ContainerRunner) loadPrices() {
+	buf, err := os.ReadFile(filepath.Join(lockdir, pricesfile))
+	if err != nil {
+		if !os.IsNotExist(err) {
+			cr.CrunchLog.Printf("loadPrices: read: %s", err)
+		}
+		return
+	}
+	var prices []cloud.InstancePrice
+	err = json.Unmarshal(buf, &prices)
+	if err != nil {
+		cr.CrunchLog.Printf("loadPrices: decode: %s", err)
+		return
+	}
+	cr.pricesLock.Lock()
+	defer cr.pricesLock.Unlock()
+	var lastKnown time.Time
+	if len(cr.prices) > 0 {
+		lastKnown = cr.prices[0].StartTime
+	}
+	cr.prices = cloud.NormalizePriceHistory(append(prices, cr.prices...))
+	for i := len(cr.prices) - 1; i >= 0; i-- {
+		price := cr.prices[i]
+		if price.StartTime.After(lastKnown) {
+			cr.CrunchLog.Printf("Instance price changed to %#.3g at %s", price.Price, price.StartTime.UTC())
+		}
+	}
+}
+
+func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
+	cr.pricesLock.Lock()
+	defer cr.pricesLock.Unlock()
+
+	// First, make a "prices" slice with the real data as far back
+	// as it goes, and (if needed) a "since the beginning of time"
+	// placeholder containing a reasonable guess about what the
+	// price was between cr.costStartTime and the earliest real
+	// data point.
+	prices := cr.prices
+	if len(prices) == 0 {
+		// use price info in InstanceType record initially
+		// provided by cloud dispatcher
+		var p float64
+		var it arvados.InstanceType
+		if j := os.Getenv("InstanceType"); j != "" && json.Unmarshal([]byte(j), &it) == nil && it.Price > 0 {
+			p = it.Price
+		}
+		prices = []cloud.InstancePrice{{Price: p}}
+	} else if prices[len(prices)-1].StartTime.After(cr.costStartTime) {
+		// guess earlier pricing was the same as the earliest
+		// price we know about
+		filler := prices[len(prices)-1]
+		filler.StartTime = time.Time{}
+		prices = append(prices, filler)
+	}
+
+	// Now that our history of price changes goes back at least as
+	// far as cr.costStartTime, add up the costs for each
+	// interval.
+	cost := 0.0
+	spanEnd := now
+	for _, ip := range prices {
+		spanStart := ip.StartTime
+		if spanStart.After(now) {
+			// pricing information from the future -- not
+			// expected from AWS, but possible in
+			// principle, and exercised by tests.
+			continue
+		}
+		last := false
+		if spanStart.Before(cr.costStartTime) {
+			spanStart = cr.costStartTime
+			last = true
+		}
+		cost += ip.Price * spanEnd.Sub(spanStart).Seconds() / 3600
+		if last {
+			break
+		}
+		spanEnd = spanStart
+	}
+
+	return cost
+}
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+	for range sigchan {
+		runner.loadPrices()
+		update := arvadosclient.Dict{
+			"select": []string{"uuid"},
+			"container": arvadosclient.Dict{
+				"cost": runner.calculateCost(time.Now()),
+			},
+		}
+		runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+	}
+}
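
Note: calculateCost walks the normalized, newest-first price history backwards from "now", clipping the oldest span at costStartTime. A standalone sketch of the same interval arithmetic with invented numbers -- an instance launched at $0.40/h and repriced to $0.10/h after one hour costs $0.50 over a two-hour run:

    package main

    import (
    	"fmt"
    	"time"
    )

    type pricePoint struct {
    	StartTime time.Time
    	Price     float64 // $/hour
    }

    // cost mirrors the accumulation loop in calculateCost above.
    func cost(prices []pricePoint, start, now time.Time) float64 {
    	total, spanEnd := 0.0, now
    	for _, p := range prices { // newest first
    		spanStart, last := p.StartTime, false
    		if spanStart.Before(start) {
    			spanStart, last = start, true
    		}
    		total += p.Price * spanEnd.Sub(spanStart).Hours()
    		if last {
    			break
    		}
    		spanEnd = spanStart
    	}
    	return total
    }

    func main() {
    	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
    	prices := []pricePoint{
    		{StartTime: t0.Add(time.Hour), Price: 0.10}, // repriced after 1h
    		{StartTime: t0, Price: 0.40},                // price at launch
    	}
    	fmt.Println(cost(prices, t0, t0.Add(2*time.Hour))) // 0.5
    }
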