19961: Reuse metadata token within expected TTL, adjust retry logic.
[arvados.git] / lib / crunchrun / crunchrun.go
index 507dfc4dd713da6dfdb0410d7d043726dcca5f76..a9c65cca422922dac2c5a43a658c7c874ff0ce58 100644 (file)
@@ -50,9 +50,10 @@ var Command = command{}
 // ConfigData contains environment variables and (when needed) cluster
 // configuration, passed from dispatchcloud to crunch-run on stdin.
 type ConfigData struct {
-       Env         map[string]string
-       KeepBuffers int
-       Cluster     *arvados.Cluster
+       Env          map[string]string
+       KeepBuffers  int
+       EC2SpotCheck bool
+       Cluster      *arvados.Cluster
 }
 
 // IArvadosClient is the minimal Arvados API methods used by crunch-run.
@@ -319,7 +320,9 @@ func (runner *ContainerRunner) ArvMountCmd(cmdline []string, token string) (c *e
                        "Block not found error",
                        "Unhandled exception during FUSE operation",
                },
-               ReportFunc: runner.reportArvMountWarning,
+               ReportFunc: func(pattern, text string) {
+                       runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+               },
        }
        c.Stdout = runner.arvMountLog
        c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
@@ -1197,16 +1200,114 @@ func (runner *ContainerRunner) updateLogs() {
        }
 }
 
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
-       var updated arvados.Container
+var spotInterruptionCheckInterval = 5 * time.Second // delay between spot interruption checks
+var ec2MetadataBaseURL = "http://169.254.169.254"   // prefix for instance metadata request URLs
+// ec2TokenTTL is the lifetime requested for a metadata token (21600 seconds = 6 hours).
+const ec2TokenTTL = time.Second * 21600
+
+// checkSpotInterruptionNotices polls the instance metadata service at
+// ec2MetadataBaseURL every spotInterruptionCheckInterval, logging each new
+// instance-action notice and reporting it via updateRuntimeStatus. It only
+// returns after more than 5 consecutive failed checks, so run it in a goroutine.
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+       type ec2metadata struct {
+               Action string    `json:"action"`
+               Time   time.Time `json:"time"`
+       }
+       runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+       var metadata ec2metadata
+       var token string
+       var tokenExp time.Time
+       check := func() error {
+               ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+               defer cancel()
+               if token == "" || time.Until(tokenExp) < time.Minute {
+                       req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+                       if err != nil {
+                               return err
+                       }
+                       req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+                       resp, err := http.DefaultClient.Do(req)
+                       if err != nil {
+                               return err
+                       }
+                       defer resp.Body.Close()
+                       if resp.StatusCode != http.StatusOK {
+                               return fmt.Errorf("%s", resp.Status)
+                       }
+                       newtoken, err := ioutil.ReadAll(resp.Body)
+                       if err != nil {
+                               return err
+                       }
+                       token = strings.TrimSpace(string(newtoken))
+                       tokenExp = time.Now().Add(ec2TokenTTL)
+               }
+               req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+               if err != nil {
+                       return err
+               }
+               req.Header.Set("X-aws-ec2-metadata-token", token)
+               resp, err := http.DefaultClient.Do(req)
+               if err != nil {
+                       return err
+               }
+               defer resp.Body.Close()
+               metadata = ec2metadata{}
+               switch resp.StatusCode {
+               case http.StatusOK:
+                       return json.NewDecoder(resp.Body).Decode(&metadata)
+               case http.StatusNotFound:
+                       // "If Amazon EC2 is not preparing to stop or terminate the
+                       // instance, or if you terminated the instance yourself,
+                       // instance-action is not present in the instance metadata and
+                       // you receive an HTTP 404 error when you try to retrieve it."
+                       return nil
+               case http.StatusUnauthorized:
+                       // Forget the rejected token so the next check requests a new one.
+                       token = ""
+                       return fmt.Errorf("%s", resp.Status)
+               default:
+                       return fmt.Errorf("%s", resp.Status)
+               }
+       }
+       failures := 0
+       var lastmetadata ec2metadata
+       ticker := time.NewTicker(spotInterruptionCheckInterval)
+       defer ticker.Stop()
+       for range ticker.C {
+               err := check()
+               if err != nil {
+                       runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+                       if failures++; failures > 5 {
+                               runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
+                               return
+                       }
+                       continue
+               }
+               failures = 0
+               // Compare timestamps with Equal: == on time.Time also compares Location.
+               if metadata.Action != lastmetadata.Action || !metadata.Time.Equal(lastmetadata.Time) {
+                       lastmetadata = metadata
+                       text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+                       runner.CrunchLog.Printf("%s", text)
+                       runner.updateRuntimeStatus("instance interruption", text)
+                       if proc, err := os.FindProcess(os.Getpid()); err == nil {
+                               // trigger updateLogs
+                               proc.Signal(syscall.SIGUSR1)
+                       }
+               }
+       }
+}
+
+func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
        err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
                "container": arvadosclient.Dict{
                        "runtime_status": arvadosclient.Dict{
-                               "warning":       "arv-mount: " + pattern,
-                               "warningDetail": text,
+                               "warning":       warning,
+                               "warningDetail": detail,
                        },
                },
-       }, &updated)
+       }, nil)
        if err != nil {
                runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
        }
@@ -1431,14 +1532,25 @@ func (runner *ContainerRunner) saveLogCollection(final bool) (response arvados.C
 }
 
 // UpdateContainerRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRunning() error {
+func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
        runner.cStateLock.Lock()
        defer runner.cStateLock.Unlock()
        if runner.cCancelled {
                return ErrCancelled
        }
-       return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
-               arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
+       updates := arvadosclient.Dict{
+               "gateway_address": runner.gateway.Address,
+               "state":           "Running",
+       }
+       if logId != "" {
+               updates["log"] = logId
+       }
+       return runner.DispatcherArvClient.Update(
+               "containers",
+               runner.Container.UUID,
+               arvadosclient.Dict{"container": updates},
+               nil,
+       )
 }
 
 // ContainerToken returns the api_token the container (and any
@@ -1636,7 +1748,14 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
 
-       err = runner.UpdateContainerRunning()
+       logCollection, err := runner.saveLogCollection(false)
+       var logId string
+       if err == nil {
+               logId = logCollection.PortableDataHash
+       } else {
+               runner.CrunchLog.Printf("Error committing initial log collection: %v", err)
+       }
+       err = runner.UpdateContainerRunning(logId)
        if err != nil {
                return
        }
@@ -1993,6 +2112,10 @@ func (command) RunCommand(prog string, args []string, stdin io.Reader, stdout, s
                cr.expectCgroupParent = p
        }
 
+       if conf.EC2SpotCheck {
+               go cr.checkSpotInterruptionNotices()
+       }
+
        runerr := cr.Run()
 
        if *memprofile != "" {