19961: Detect and log EC2 spot interruption notices.
author Tom Clegg <tom@curii.com>
Tue, 14 Feb 2023 20:53:54 +0000 (15:53 -0500)
committer Tom Clegg <tom@curii.com>
Tue, 14 Feb 2023 20:53:54 +0000 (15:53 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/crunchrun/crunchrun.go
lib/crunchrun/crunchrun_test.go
lib/dispatchcloud/worker/runner.go

index 3def8851ce1ef044d7a917e2a66c49e0a7e51a3b..ea48512181b0134d441e15a4f048afcbb0726fdd 100644 (file)
@@ -50,9 +50,10 @@ var Command = command{}
 // ConfigData contains environment variables and (when needed) cluster
 // configuration, passed from dispatchcloud to crunch-run on stdin.
 type ConfigData struct {
-       Env         map[string]string
-       KeepBuffers int
-       Cluster     *arvados.Cluster
+       Env          map[string]string
+       KeepBuffers  int
+       EC2SpotCheck bool // when true, RunCommand starts checkSpotInterruptionNotices in a goroutine; the dispatcher sets it for preemptible instances under the "ec2" driver
+       Cluster      *arvados.Cluster
 }
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -319,7 +320,9 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
                        "Block not found error",
                        "Unhandled exception during FUSE operation",
                },
-               ReportFunc: runner.reportArvMountWarning,
+               ReportFunc: func(pattern, text string) {
+                       runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+               },
        }
        c.Stdout = runner.arvMountLog
        c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
@@ -1197,16 +1200,99 @@ func (runner *ContainerRunner) updateLogs() {
        }
 }
 
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
-       var updated arvados.Container
+var spotInterruptionCheckInterval = 5 * time.Second // delay between instance metadata polls in checkSpotInterruptionNotices; a variable so tests can shorten it
+var ec2MetadataBaseURL = "http://169.254.169.254" // base URL for instance metadata requests; a variable so tests can point it at a stub server
+
+// checkSpotInterruptionNotices polls the instance metadata service
+// at ec2MetadataBaseURL every spotInterruptionCheckInterval and
+// reports any spot instance-action notice it finds.
+//
+// Each time the reported action or time changes, the notice is
+// written to the crunch-run log, recorded as a runtime_status
+// warning on the container, and SIGUSR1 is sent to the current
+// process to trigger updateLogs.
+//
+// The function blocks until more than 3 consecutive checks have
+// failed, so callers are expected to run it in its own goroutine.
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+       type ec2metadata struct {
+               Action string    `json:"action"`
+               Time   time.Time `json:"time"`
+       }
+       runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+       // check performs one poll: it obtains a metadata session
+       // token with a PUT request, then uses that token to GET the
+       // spot instance-action document. It returns the zero
+       // ec2metadata when no action is scheduled (HTTP 404), and on
+       // any error.
+       check := func() (ec2metadata, error) {
+               var none ec2metadata
+               // A single one-minute deadline covers both requests.
+               ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+               defer cancel()
+               tokenReq, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+               if err != nil {
+                       return none, err
+               }
+               tokenReq.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")
+               tokenResp, err := http.DefaultClient.Do(tokenReq)
+               if err != nil {
+                       return none, err
+               }
+               defer tokenResp.Body.Close()
+               if tokenResp.StatusCode != http.StatusOK {
+                       return none, fmt.Errorf("%s", tokenResp.Status)
+               }
+               token, err := ioutil.ReadAll(tokenResp.Body)
+               if err != nil {
+                       return none, err
+               }
+               actionReq, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+               if err != nil {
+                       return none, err
+               }
+               actionReq.Header.Set("X-aws-ec2-metadata-token", strings.TrimSpace(string(token)))
+               actionResp, err := http.DefaultClient.Do(actionReq)
+               if err != nil {
+                       return none, err
+               }
+               defer actionResp.Body.Close()
+               if actionResp.StatusCode == http.StatusNotFound {
+                       // "If Amazon EC2 is not preparing to stop or
+                       // terminate the instance, or if you
+                       // terminated the instance yourself,
+                       // instance-action is not present in the
+                       // instance metadata and you receive an HTTP
+                       // 404 error when you try to retrieve it."
+                       return none, nil
+               }
+               if actionResp.StatusCode != http.StatusOK {
+                       return none, fmt.Errorf("%s", actionResp.Status)
+               }
+               var metadata ec2metadata
+               if err := json.NewDecoder(actionResp.Body).Decode(&metadata); err != nil {
+                       return none, err
+               }
+               return metadata, nil
+       }
+       ticker := time.NewTicker(spotInterruptionCheckInterval)
+       // Release the ticker when we give up instead of leaving it
+       // running for the rest of the process lifetime.
+       defer ticker.Stop()
+       failures := 0
+       var lastmetadata ec2metadata
+       for range ticker.C {
+               metadata, err := check()
+               if err != nil {
+                       runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+                       failures++
+                       if failures > 3 {
+                               runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many errors")
+                               return
+                       }
+                       continue
+               }
+               // Only consecutive failures count toward giving up:
+               // a few transient errors spread over a long-running
+               // container must not disable monitoring for good.
+               failures = 0
+               // Compare times with Equal rather than ==, so the same
+               // instant decoded with a different *time.Location is
+               // not reported again on every poll.
+               if metadata.Action == lastmetadata.Action && metadata.Time.Equal(lastmetadata.Time) {
+                       continue
+               }
+               lastmetadata = metadata
+               text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+               runner.CrunchLog.Printf("%s", text)
+               runner.updateRuntimeStatus("instance interruption", text)
+               if proc, err := os.FindProcess(os.Getpid()); err == nil {
+                       // trigger updateLogs (best effort: a failure
+                       // to deliver the signal is ignored)
+                       proc.Signal(syscall.SIGUSR1)
+               }
+       }
+}
+
+func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
        err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
                "container": arvadosclient.Dict{
                        "runtime_status": arvadosclient.Dict{
-                               "warning":       "arv-mount: " + pattern,
-                               "warningDetail": text,
+                               "warning":       warning,
+                               "warningDetail": detail,
                        },
                },
-       }, &updated)
+       }, nil)
        if err != nil {
                runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
        }
@@ -2011,6 +2097,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                cr.expectCgroupParent = p
        }
 
+       if conf.EC2SpotCheck {
+               go cr.checkSpotInterruptionNotices()
+       }
+
        runerr := cr.Run()
 
        if *memprofile != "" {
index aaba1c42045e325266534cdf45551ab43199c6f0..a83af23c8a70832eb36f3159863a8674d6beb512 100644 (file)
@@ -13,6 +13,8 @@ import (
        "io"
        "io/ioutil"
        "log"
+       "net/http"
+       "net/http/httptest"
        "os"
        "os/exec"
        "regexp"
@@ -772,6 +774,61 @@ func (s *TestSuite) TestRunAlreadyRunning(c *C) {
        c.Check(ran, Equals, false)
 }
 
+// TestSpotInterruptionNotice runs checkSpotInterruptionNotices
+// against a stub metadata server that fails the first request with
+// 503, then reports "no action" (404) until the container callback
+// schedules a stop, and checks that the startup message, the error,
+// and the interruption notice all appear in the crunch-run log.
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+       var failedOnce bool
+       var stoptime time.Time
+       // noticeIssued is closed after stoptime has been assigned.
+       // The stub handler runs on a different goroutine than the
+       // container callback, so it reads stoptime only after
+       // observing the close; this orders the write before the read
+       // instead of sharing the variable unsynchronized.
+       noticeIssued := make(chan struct{})
+       token := "fake-ec2-metadata-token"
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               if !failedOnce {
+                       w.WriteHeader(http.StatusServiceUnavailable)
+                       failedOnce = true
+                       return
+               }
+               switch r.URL.Path {
+               case "/latest/api/token":
+                       fmt.Fprintln(w, token)
+               case "/latest/meta-data/spot/instance-action":
+                       if r.Header.Get("X-aws-ec2-metadata-token") != token {
+                               w.WriteHeader(http.StatusUnauthorized)
+                               return
+                       }
+                       select {
+                       case <-noticeIssued:
+                               fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, stoptime.Format(time.RFC3339))
+                       default:
+                               // No interruption scheduled yet.
+                               w.WriteHeader(http.StatusNotFound)
+                       }
+               default:
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer stub.Close()
+
+       // Restore the package-level settings when the test ends.
+       defer func(i time.Duration, u string) {
+               spotInterruptionCheckInterval = i
+               ec2MetadataBaseURL = u
+       }(spotInterruptionCheckInterval, ec2MetadataBaseURL)
+       spotInterruptionCheckInterval = time.Second / 4
+       ec2MetadataBaseURL = stub.URL
+
+       // NOTE(review): this goroutine is not stopped by the test; it
+       // exits on its own only after repeated request failures.
+       go s.runner.checkSpotInterruptionNotices()
+       s.fullRunHelper(c, `{
+    "command": ["sleep", "3"],
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {},
+    "state": "Locked"
+}`, nil, 0, func() {
+               // Let a few "no action" polls happen, then schedule
+               // the stop and leave time for it to be noticed.
+               time.Sleep(time.Second)
+               stoptime = time.Now().Add(time.Minute).UTC()
+               // NOTE(review): assumes this callback runs once per
+               // fullRunHelper call; a second close would panic.
+               close(noticeIssued)
+               time.Sleep(time.Second)
+       })
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 250ms using instance metadata at http://.*`)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Cloud provider indicates instance action "stop" scheduled for time "`+stoptime.Format(time.RFC3339)+`".*`)
+}
+
 func (s *TestSuite) TestRunTimeExceeded(c *C) {
        s.fullRunHelper(c, `{
     "command": ["sleep", "3"],
index 29c4b8e0a36a3be2a721e1bc509335817e86842c..ac039272cf9c5b5374c6a9dd60462b0b9e2a684c 100644 (file)
@@ -63,6 +63,9 @@ func newRemoteRunner(uuid string, wkr *worker) *remoteRunner {
                configData.Cluster = wkr.wp.cluster
                configData.KeepBuffers = bufs * wkr.instType.VCPUs
        }
+       if wkr.wp.cluster.Containers.CloudVMs.Driver == "ec2" && wkr.instType.Preemptible {
+               configData.EC2SpotCheck = true
+       }
        configJSON, err := json.Marshal(configData)
        if err != nil {
                panic(err)