17244: Refactor crunchstat to use cgroup unified/hybrid modes.
[arvados.git] / lib / crunchrun / crunchrun.go
index 3def8851ce1ef044d7a917e2a66c49e0a7e51a3b..04fc6c0d0c88a10935c28f19d09cc7410432d9dd 100644 (file)
@@ -12,6 +12,7 @@ import (
        "flag"
        "fmt"
        "io"
+       "io/fs"
        "io/ioutil"
        "log"
        "net"
@@ -50,9 +51,10 @@ var Command = command{}
 // ConfigData contains environment variables and (when needed) cluster
 // configuration, passed from dispatchcloud to crunch-run on stdin.
 type ConfigData struct {
-       Env         map[string]string
-       KeepBuffers int
-       Cluster     *arvados.Cluster
+       Env          map[string]string
+       KeepBuffers  int
+       EC2SpotCheck bool
+       Cluster      *arvados.Cluster
 }
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -151,20 +153,12 @@ type ContainerRunner struct {
        hoststatLogger   io.WriteCloser
        hoststatReporter *crunchstat.Reporter
        statInterval     time.Duration
-       cgroupRoot       string
-       // What we expect the container's cgroup parent to be.
-       expectCgroupParent string
        // What we tell docker to use as the container's cgroup
-       // parent. Note: Ideally we would use the same field for both
-       // expectCgroupParent and setCgroupParent, and just make it
-       // default to "docker". However, when using docker < 1.10 with
-       // systemd, specifying a non-empty cgroup parent (even the
-       // default value "docker") hits a docker bug
-       // (https://github.com/docker/docker/issues/17126). Using two
-       // separate fields makes it possible to use the "expect cgroup
-       // parent to be X" feature even on sites where the "specify
-       // cgroup parent" feature breaks.
+       // parent.
        setCgroupParent string
+       // Fake root dir where crunchstat.Reporter should read OS
+       // files, for testing.
+       crunchstatFakeFS fs.FS
 
        cStateLock sync.Mutex
        cCancelled bool // StopContainer() invoked
@@ -319,7 +313,12 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
                        "Block not found error",
                        "Unhandled exception during FUSE operation",
                },
-               ReportFunc: runner.reportArvMountWarning,
+               ReportFunc: func(pattern, text string) {
+                       runner.updateRuntimeStatus(arvadosclient.Dict{
+                               "warning":       "arv-mount: " + pattern,
+                               "warningDetail": text,
+                       })
+               },
        }
        c.Stdout = runner.arvMountLog
        c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
@@ -632,7 +631,7 @@ func (runner *ContainerRunner) SetupMounts() (map[string]bindmount, error) {
                        if err != nil {
                                return nil, fmt.Errorf("creating temp dir: %v", err)
                        }
-                       err = gitMount(mnt).extractTree(runner.ContainerArvClient, tmpdir, token)
+                       err = gitMount(mnt).extractTree(runner.containerClient, tmpdir, token)
                        if err != nil {
                                return nil, err
                        }
@@ -728,6 +727,7 @@ func (runner *ContainerRunner) stopHoststat() error {
                return nil
        }
        runner.hoststatReporter.Stop()
+       runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
        err := runner.hoststatLogger.Close()
        if err != nil {
                return fmt.Errorf("error closing hoststat logs: %v", err)
@@ -743,7 +743,7 @@ func (runner *ContainerRunner) startHoststat() error {
        runner.hoststatLogger = NewThrottledLogger(w)
        runner.hoststatReporter = &crunchstat.Reporter{
                Logger:     log.New(runner.hoststatLogger, "", 0),
-               CgroupRoot: runner.cgroupRoot,
+               Pid:        func() int { return 1 },
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
@@ -758,12 +758,15 @@ func (runner *ContainerRunner) startCrunchstat() error {
        }
        runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
-               CID:          runner.executor.CgroupID(),
-               Logger:       log.New(runner.statLogger, "", 0),
-               CgroupParent: runner.expectCgroupParent,
-               CgroupRoot:   runner.cgroupRoot,
-               PollPeriod:   runner.statInterval,
-               TempDir:      runner.parentTemp,
+               Pid:    runner.executor.Pid,
+               FS:     runner.crunchstatFakeFS,
+               Logger: log.New(runner.statLogger, "", 0),
+               MemThresholds: map[string][]crunchstat.Threshold{
+                       "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+               },
+               PollPeriod:      runner.statInterval,
+               TempDir:         runner.parentTemp,
+               ThresholdLogger: runner.CrunchLog,
        }
        runner.statReporter.Start()
        return nil
@@ -1113,6 +1116,7 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
        runner.CrunchLog.Printf("Container exited with status code %d%s", exitcode, extra)
        err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "select":    []string{"uuid"},
                "container": arvadosclient.Dict{"exit_code": exitcode},
        }, nil)
        if err != nil {
@@ -1142,6 +1146,9 @@ func (runner *ContainerRunner) WaitFinish() error {
 
        if runner.statReporter != nil {
                runner.statReporter.Stop()
+               runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+                       "rss": runner.Container.RuntimeConstraints.RAM,
+               })
                err = runner.statLogger.Close()
                if err != nil {
                        runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
@@ -1186,7 +1193,10 @@ func (runner *ContainerRunner) updateLogs() {
                }
 
                err = runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
-                       "container": arvadosclient.Dict{"log": saved.PortableDataHash},
+                       "select": []string{"uuid"},
+                       "container": arvadosclient.Dict{
+                               "log": saved.PortableDataHash,
+                       },
                }, nil)
                if err != nil {
                        runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
@@ -1197,16 +1207,116 @@ func (runner *ContainerRunner) updateLogs() {
        }
 }
 
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
-       var updated arvados.Container
+var spotInterruptionCheckInterval = 5 * time.Second
+var ec2MetadataBaseURL = "http://169.254.169.254"
+
+const ec2TokenTTL = time.Second * 21600
+
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+       type ec2metadata struct {
+               Action string    `json:"action"`
+               Time   time.Time `json:"time"`
+       }
+       runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+       var metadata ec2metadata
+       var token string
+       var tokenExp time.Time
+       check := func() error {
+               ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+               defer cancel()
+               if token == "" || tokenExp.Sub(time.Now()) < time.Minute {
+                       req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+                       if err != nil {
+                               return err
+                       }
+                       req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+                       resp, err := http.DefaultClient.Do(req)
+                       if err != nil {
+                               return err
+                       }
+                       defer resp.Body.Close()
+                       if resp.StatusCode != http.StatusOK {
+                               return fmt.Errorf("%s", resp.Status)
+                       }
+                       newtoken, err := ioutil.ReadAll(resp.Body)
+                       if err != nil {
+                               return err
+                       }
+                       token = strings.TrimSpace(string(newtoken))
+                       tokenExp = time.Now().Add(ec2TokenTTL)
+               }
+               req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+               if err != nil {
+                       return err
+               }
+               req.Header.Set("X-aws-ec2-metadata-token", token)
+               resp, err := http.DefaultClient.Do(req)
+               if err != nil {
+                       return err
+               }
+               defer resp.Body.Close()
+               metadata = ec2metadata{}
+               switch resp.StatusCode {
+               case http.StatusOK:
+                       break
+               case http.StatusNotFound:
+                       // "If Amazon EC2 is not preparing to stop or
+                       // terminate the instance, or if you
+                       // terminated the instance yourself,
+                       // instance-action is not present in the
+                       // instance metadata and you receive an HTTP
+                       // 404 error when you try to retrieve it."
+                       return nil
+               case http.StatusUnauthorized:
+                       token = ""
+                       return fmt.Errorf("%s", resp.Status)
+               default:
+                       return fmt.Errorf("%s", resp.Status)
+               }
+               err = json.NewDecoder(resp.Body).Decode(&metadata)
+               if err != nil {
+                       return err
+               }
+               return nil
+       }
+       failures := 0
+       var lastmetadata ec2metadata
+       for range time.NewTicker(spotInterruptionCheckInterval).C {
+               err := check()
+               if err != nil {
+                       runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+                       failures++
+                       if failures > 5 {
+                               runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
+                               return
+                       }
+                       continue
+               }
+               failures = 0
+               if metadata != lastmetadata {
+                       lastmetadata = metadata
+                       text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+                       runner.CrunchLog.Printf("%s", text)
+                       runner.updateRuntimeStatus(arvadosclient.Dict{
+                               "warning":          "preemption notice",
+                               "warningDetail":    text,
+                               "preemptionNotice": text,
+                       })
+                       if proc, err := os.FindProcess(os.Getpid()); err == nil {
+                               // trigger updateLogs
+                               proc.Signal(syscall.SIGUSR1)
+                       }
+               }
+       }
+}
+
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
        err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "select": []string{"uuid"},
                "container": arvadosclient.Dict{
-                       "runtime_status": arvadosclient.Dict{
-                               "warning":       "arv-mount: " + pattern,
-                               "warningDetail": text,
-                       },
+                       "runtime_status": status,
                },
-       }, &updated)
+       }, nil)
        if err != nil {
                runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
        }
@@ -1219,7 +1329,9 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
                // Output may have been set directly by the container, so
                // refresh the container record to check.
                err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
-                       nil, &runner.Container)
+                       arvadosclient.Dict{
+                               "select": []string{"output"},
+                       }, &runner.Container)
                if err != nil {
                        return err
                }
@@ -1232,7 +1344,6 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
 
        txt, err := (&copier{
                client:        runner.containerClient,
-               arvClient:     runner.ContainerArvClient,
                keepClient:    runner.ContainerKeepClient,
                hostOutputDir: runner.HostOutputDir,
                ctrOutputDir:  runner.Container.OutputPath,
@@ -1258,6 +1369,7 @@ func (runner *ContainerRunner) CaptureOutput(bindmounts map[string]bindmount) er
        var resp arvados.Collection
        err = runner.ContainerArvClient.Create("collections", arvadosclient.Dict{
                "ensure_unique_name": true,
+               "select":             []string{"portable_data_hash"},
                "collection": arvadosclient.Dict{
                        "is_trashed":    true,
                        "name":          "output for " + runner.Container.UUID,
@@ -1384,6 +1496,8 @@ func (runner *ContainerRunner) CommitLogs() error {
        return nil
 }
 
+// Create/update the log collection. Return value has UUID and
+// PortableDataHash fields populated, but others may be blank.
 func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.Collection, err error) {
        runner.logMtx.Lock()
        defer runner.logMtx.Unlock()
@@ -1408,11 +1522,20 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
        if final {
                updates["is_trashed"] = true
        } else {
-               exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+               // We set trash_at so this collection gets
+               // automatically cleaned up eventually.  It used to be
+               // 12 hours but we had a situation where the API
+               // server was down over a weekend but the containers
+               // kept running such that the log collection got
+               // trashed, so now we make it 2 weeks.  refs #20378
+               exp := time.Now().Add(time.Duration(24*14) * time.Hour)
                updates["trash_at"] = exp
                updates["delete_at"] = exp
        }
-       reqBody := arvadosclient.Dict{"collection": updates}
+       reqBody := arvadosclient.Dict{
+               "select":     []string{"uuid", "portable_data_hash"},
+               "collection": updates,
+       }
        var err2 error
        if runner.logUUID == "" {
                reqBody["ensure_unique_name"] = true
@@ -1447,7 +1570,10 @@ func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
        return runner.DispatcherArvClient.Update(
                "containers",
                runner.Container.UUID,
-               arvadosclient.Dict{"container": updates},
+               arvadosclient.Dict{
+                       "select":    []string{"uuid"},
+                       "container": updates,
+               },
                nil,
        )
 }
@@ -1485,7 +1611,10 @@ func (runner *ContainerRunner) UpdateContainerFinal() error {
                update["output"] = *runner.OutputPDH
        }
        update["cost"] = runner.calculateCost(time.Now())
-       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{"container": update}, nil)
+       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+               "select":    []string{"uuid"},
+               "container": update,
+       }, nil)
 }
 
 // IsCancelled returns the value of Cancelled, with goroutine safety.
@@ -1530,11 +1659,7 @@ func (runner *ContainerRunner) Run() (err error) {
        signal.Notify(sigusr2, syscall.SIGUSR2)
        defer signal.Stop(sigusr2)
        runner.loadPrices()
-       go func() {
-               for range sigusr2 {
-                       runner.loadPrices()
-               }
-       }()
+       go runner.handleSIGUSR2(sigusr2)
 
        runner.finalState = "Queued"
 
@@ -1776,9 +1901,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
        log := log.New(stderr, "", 0)
        flags := flag.NewFlagSet(prog, flag.ContinueOnError)
        statInterval := flags.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
-       cgroupRoot := flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
-       cgroupParent := flags.String("cgroup-parent", "docker", "name of container's parent cgroup (ignored if -cgroup-parent-subsystem is used)")
-       cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container")
+       flags.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree (obsolete, ignored)")
+       flags.String("cgroup-parent", "docker", "name of container's parent cgroup (obsolete, ignored)")
+       cgroupParentSubsystem := flags.String("cgroup-parent-subsystem", "", "use current cgroup for given subsystem as parent cgroup for container (cgroups v1 only)")
        caCertsPath := flags.String("ca-certs", "", "Path to TLS root certificates")
        detach := flags.Bool("detach", false, "Detach from parent process and run in the background")
        stdinConfig := flags.Bool("stdin-config", false, "Load config and environment variables from JSON message on stdin")
@@ -1880,7 +2005,9 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                log.Printf("%s: %v", containerUUID, err)
                return 1
        }
-       api.Retries = 8
+       // arvadosclient now interprets Retries=10 to mean
+       // Timeout=10m, retrying with exponential backoff + jitter.
+       api.Retries = 10
 
        kc, err := keepclient.MakeKeepClient(api)
        if err != nil {
@@ -1968,6 +2095,7 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        ContainerUUID: containerUUID,
                        Target:        cr.executor,
                        Log:           cr.CrunchLog,
+                       LogCollection: cr.LogCollection,
                }
                if gwListen == "" {
                        // Direct connection won't work, so we use the
@@ -1978,7 +2106,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        cr.gateway.UpdateTunnelURL = func(url string) {
                                cr.gateway.Address = "tunnel " + url
                                cr.DispatcherArvClient.Update("containers", containerUUID,
-                                       arvadosclient.Dict{"container": arvadosclient.Dict{"gateway_address": cr.gateway.Address}}, nil)
+                                       arvadosclient.Dict{
+                                               "select":    []string{"uuid"},
+                                               "container": arvadosclient.Dict{"gateway_address": cr.gateway.Address},
+                                       }, nil)
                        }
                }
                err = cr.gateway.Start()
@@ -1996,8 +2127,6 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
 
        cr.parentTemp = parentTemp
        cr.statInterval = *statInterval
-       cr.cgroupRoot = *cgroupRoot
-       cr.expectCgroupParent = *cgroupParent
        cr.enableMemoryLimit = *enableMemoryLimit
        cr.enableNetwork = *enableNetwork
        cr.networkMode = *networkMode
@@ -2008,7 +2137,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                        return 1
                }
                cr.setCgroupParent = p
-               cr.expectCgroupParent = p
+       }
+
+       if conf.EC2SpotCheck {
+               go cr.checkSpotInterruptionNotices()
        }
 
        runerr := cr.Run()
@@ -2052,7 +2184,9 @@ func hpcConfData(uuid string, configFile string, stderr io.Writer) ConfigData {
                fmt.Fprintf(stderr, "error setting up arvadosclient: %s\n", err)
                return conf
        }
-       arv.Retries = 8
+       // arvadosclient now interprets Retries=10 to mean
+       // Timeout=10m, retrying with exponential backoff + jitter.
+       arv.Retries = 10
        var ctr arvados.Container
        err = arv.Call("GET", "containers", uuid, "", arvadosclient.Dict{"select": []string{"runtime_constraints"}}, &ctr)
        if err != nil {
@@ -2336,3 +2470,16 @@ func (cr *ContainerRunner) calculateCost(now time.Time) float64 {
 
        return cost
 }
+
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+       for range sigchan {
+               runner.loadPrices()
+               update := arvadosclient.Dict{
+                       "select": []string{"uuid"},
+                       "container": arvadosclient.Dict{
+                               "cost": runner.calculateCost(time.Now()),
+                       },
+               }
+               runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+       }
+}