19986: Log when a container uses nearly max RAM
[arvados.git] / lib / crunchrun / crunchrun.go
index 3def8851ce1ef044d7a917e2a66c49e0a7e51a3b..3607cafaf0149bc9763c9fc5679ec6ebb6d0a2e1 100644 (file)
@@ -50,9 +50,10 @@ var Command = command{}
 // ConfigData contains environment variables and (when needed) cluster
 // configuration, passed from dispatchcloud to crunch-run on stdin.
 type ConfigData struct {
-       Env         map[string]string
-       KeepBuffers int
-       Cluster     *arvados.Cluster
+       Env          map[string]string
+       KeepBuffers  int
+       EC2SpotCheck bool
+       Cluster      *arvados.Cluster
 }
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -319,7 +320,12 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
                        "Block not found error",
                        "Unhandled exception during FUSE operation",
                },
-               ReportFunc: runner.reportArvMountWarning,
+               ReportFunc: func(pattern, text string) {
+                       runner.updateRuntimeStatus(arvadosclient.Dict{
+                               "warning":       "arv-mount: " + pattern,
+                               "warningDetail": text,
+                       })
+               },
        }
        c.Stdout = runner.arvMountLog
        c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
@@ -758,12 +764,16 @@ func (runner *ContainerRunner) startCrunchstat() error {
        }
        runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
-               CID:          runner.executor.CgroupID(),
-               Logger:       log.New(runner.statLogger, "", 0),
                CgroupParent: runner.expectCgroupParent,
                CgroupRoot:   runner.cgroupRoot,
-               PollPeriod:   runner.statInterval,
-               TempDir:      runner.parentTemp,
+               CID:          runner.executor.CgroupID(),
+               Logger:       log.New(runner.statLogger, "", 0),
+               MemThresholds: map[string][]crunchstat.Threshold{
+                       "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+               },
+               PollPeriod:      runner.statInterval,
+               TempDir:         runner.parentTemp,
+               ThresholdLogger: runner.CrunchLog,
        }
        runner.statReporter.Start()
        return nil
@@ -1197,16 +1207,115 @@ func (runner *ContainerRunner) updateLogs() {
        }
 }
 
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
-       var updated arvados.Container
+var spotInterruptionCheckInterval = 5 * time.Second
+var ec2MetadataBaseURL = "http://169.254.169.254"
+
+const ec2TokenTTL = time.Second * 21600
+
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+       type ec2metadata struct {
+               Action string    `json:"action"`
+               Time   time.Time `json:"time"`
+       }
+       runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+       var metadata ec2metadata
+       var token string
+       var tokenExp time.Time
+       check := func() error {
+               ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+               defer cancel()
+               if token == "" || tokenExp.Sub(time.Now()) < time.Minute {
+                       req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+                       if err != nil {
+                               return err
+                       }
+                       req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+                       resp, err := http.DefaultClient.Do(req)
+                       if err != nil {
+                               return err
+                       }
+                       defer resp.Body.Close()
+                       if resp.StatusCode != http.StatusOK {
+                               return fmt.Errorf("%s", resp.Status)
+                       }
+                       newtoken, err := ioutil.ReadAll(resp.Body)
+                       if err != nil {
+                               return err
+                       }
+                       token = strings.TrimSpace(string(newtoken))
+                       tokenExp = time.Now().Add(ec2TokenTTL)
+               }
+               req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+               if err != nil {
+                       return err
+               }
+               req.Header.Set("X-aws-ec2-metadata-token", token)
+               resp, err := http.DefaultClient.Do(req)
+               if err != nil {
+                       return err
+               }
+               defer resp.Body.Close()
+               metadata = ec2metadata{}
+               switch resp.StatusCode {
+               case http.StatusOK:
+                       break
+               case http.StatusNotFound:
+                       // "If Amazon EC2 is not preparing to stop or
+                       // terminate the instance, or if you
+                       // terminated the instance yourself,
+                       // instance-action is not present in the
+                       // instance metadata and you receive an HTTP
+                       // 404 error when you try to retrieve it."
+                       return nil
+               case http.StatusUnauthorized:
+                       token = ""
+                       return fmt.Errorf("%s", resp.Status)
+               default:
+                       return fmt.Errorf("%s", resp.Status)
+               }
+               err = json.NewDecoder(resp.Body).Decode(&metadata)
+               if err != nil {
+                       return err
+               }
+               return nil
+       }
+       failures := 0
+       var lastmetadata ec2metadata
+       for range time.NewTicker(spotInterruptionCheckInterval).C {
+               err := check()
+               if err != nil {
+                       runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+                       failures++
+                       if failures > 5 {
+                               runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
+                               return
+                       }
+                       continue
+               }
+               failures = 0
+               if metadata != lastmetadata {
+                       lastmetadata = metadata
+                       text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+                       runner.CrunchLog.Printf("%s", text)
+                       runner.updateRuntimeStatus(arvadosclient.Dict{
+                               "warning":          "preemption notice",
+                               "warningDetail":    text,
+                               "preemptionNotice": text,
+                       })
+                       if proc, err := os.FindProcess(os.Getpid()); err == nil {
+                               // trigger updateLogs
+                               proc.Signal(syscall.SIGUSR1)
+                       }
+               }
+       }
+}
+
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
        err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
                "container": arvadosclient.Dict{
-                       "runtime_status": arvadosclient.Dict{
-                               "warning":       "arv-mount: " + pattern,
-                               "warningDetail": text,
-                       },
+                       "runtime_status": status,
                },
-       }, &updated)
+       }, nil)
        if err != nil {
                runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
        }
@@ -2011,6 +2120,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                cr.expectCgroupParent = p
        }
 
+       if conf.EC2SpotCheck {
+               go cr.checkSpotInterruptionNotices()
+       }
+
        runerr := cr.Run()
 
        if *memprofile != "" {