// ConfigData contains environment variables and (when needed) cluster
// configuration, passed from dispatchcloud to crunch-run on stdin.
//
// Env holds the environment variables and Cluster the (optional)
// cluster configuration mentioned above.
//
// EC2SpotCheck enables spot interruption polling: when it is true,
// checkSpotInterruptionNotices is started in a background goroutine
// before the container run begins.
//
// NOTE(review): the meaning and units of KeepBuffers are not visible
// in this part of the file -- confirm against its consumer.
type ConfigData struct {
- Env map[string]string
- KeepBuffers int
- Cluster *arvados.Cluster
+ Env map[string]string
+ KeepBuffers int
+ EC2SpotCheck bool
+ Cluster *arvados.Cluster
}
// IArvadosClient is the minimal set of Arvados API methods used by crunch-run.
"Block not found error",
"Unhandled exception during FUSE operation",
},
- ReportFunc: runner.reportArvMountWarning,
+ ReportFunc: func(pattern, text string) {
+ runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+ },
}
c.Stdout = runner.arvMountLog
c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
}
}
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
- var updated arvados.Container
// spotInterruptionCheckInterval is the period of the ticker that
// drives checkSpotInterruptionNotices.
// NOTE(review): presumably a var rather than a const so tests can
// shorten it -- confirm.
+var spotInterruptionCheckInterval = 5 * time.Second
// ec2MetadataBaseURL is the scheme and host prepended to the
// "/latest/api/token" and "/latest/meta-data/spot/instance-action"
// paths requested by checkSpotInterruptionNotices.
+var ec2MetadataBaseURL = "http://169.254.169.254"
+
// ec2TokenTTL (21600 seconds, i.e. 6 hours) is the lifetime requested
// for the metadata token via the X-aws-ec2-metadata-token-ttl-seconds
// header, and the lifetime assumed locally when deciding whether the
// cached token needs to be refreshed.
+const ec2TokenTTL = time.Second * 21600
+
// checkSpotInterruptionNotices polls the instance metadata endpoint
// at ec2MetadataBaseURL every spotInterruptionCheckInterval for a
// spot "instance-action" notice. Whenever the decoded notice differs
// from the previously seen one, it logs the notice to CrunchLog,
// publishes it as a runtime_status warning via updateRuntimeStatus,
// and sends SIGUSR1 to the current process.
//
// The loop blocks its goroutine. It returns only after more than 5
// consecutive failed checks; a successful check resets the failure
// count.
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
// ec2metadata mirrors the JSON body returned by the
// instance-action endpoint.
+ type ec2metadata struct {
+ Action string `json:"action"`
+ Time time.Time `json:"time"`
+ }
+ runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
// metadata, token and tokenExp are shared between the check
// closure and the polling loop below; check overwrites metadata on
// every call that reaches the instance-action request.
+ var metadata ec2metadata
+ var token string
+ var tokenExp time.Time
// check performs one poll: it refreshes the metadata token if
// needed, then fetches instance-action and decodes it into
// metadata. It returns nil both when a notice was decoded and when
// the endpoint answered 404 (no notice).
+ check := func() error {
// A single one-minute deadline covers both the token request and
// the instance-action request.
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+ defer cancel()
// Request a new token when none is cached or when the cached one
// has less than a minute of its assumed lifetime left.
+ if token == "" || tokenExp.Sub(time.Now()) < time.Minute {
+ req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("%s", resp.Status)
+ }
+ newtoken, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+ token = strings.TrimSpace(string(newtoken))
+ tokenExp = time.Now().Add(ec2TokenTTL)
+ }
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+ if err != nil {
+ return err
+ }
+ req.Header.Set("X-aws-ec2-metadata-token", token)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
// Reset to the zero value so that a 404 (or a failed decode) never
// leaves a stale notice behind.
+ metadata = ec2metadata{}
+ switch resp.StatusCode {
+ case http.StatusOK:
+ break
+ case http.StatusNotFound:
+ // "If Amazon EC2 is not preparing to stop or
+ // terminate the instance, or if you
+ // terminated the instance yourself,
+ // instance-action is not present in the
+ // instance metadata and you receive an HTTP
+ // 404 error when you try to retrieve it."
+ return nil
+ case http.StatusUnauthorized:
// Drop the cached token so the next check requests a fresh one.
+ token = ""
+ return fmt.Errorf("%s", resp.Status)
+ default:
+ return fmt.Errorf("%s", resp.Status)
+ }
+ err = json.NewDecoder(resp.Body).Decode(&metadata)
+ if err != nil {
+ return err
+ }
+ return nil
+ }
+ failures := 0
+ var lastmetadata ec2metadata
// NOTE(review): the ticker created here is never stopped, so it
// stays active after the early return below -- consider keeping a
// reference and deferring Stop().
+ for range time.NewTicker(spotInterruptionCheckInterval).C {
+ err := check()
+ if err != nil {
+ runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+ failures++
// Tolerate up to 5 consecutive failures; the next one ends polling
// for the rest of the process lifetime.
+ if failures > 5 {
+ runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
+ return
+ }
+ continue
+ }
+ failures = 0
// Report only when the notice changed since the last report.
// NOTE(review): if a notice was reported earlier and the endpoint
// then answers 404, metadata is the zero value and differs from
// lastmetadata, so a warning with an empty action and zero time is
// published -- confirm whether that is intended or whether this
// should also require metadata.Action != "".
// NOTE(review): this struct comparison applies == to time.Time;
// time.Time.Equal is the usual way to compare instants.
+ if metadata != lastmetadata {
+ lastmetadata = metadata
+ text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+ runner.CrunchLog.Printf("%s", text)
+ runner.updateRuntimeStatus("instance interruption", text)
+ if proc, err := os.FindProcess(os.Getpid()); err == nil {
+ // trigger updateLogs
// NOTE(review): the error returned by Signal is discarded.
+ proc.Signal(syscall.SIGUSR1)
+ }
+ }
+ }
+}
+
// updateRuntimeStatus sets the container's runtime_status "warning"
// and "warningDetail" attributes to the given strings by updating
// the container record through DispatcherArvClient.
//
// It is best-effort: an update error is written to CrunchLog and not
// returned to the caller.
+func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
 err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
 "container": arvadosclient.Dict{
 "runtime_status": arvadosclient.Dict{
- "warning": "arv-mount: " + pattern,
- "warningDetail": text,
+ "warning": warning,
+ "warningDetail": detail,
 },
 },
- }, &updated)
+ }, nil)
 if err != nil {
 runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
 }
}
// UpdateContainerRunning updates the container state to "Running"
// and records the gateway address in the same request. If logId is
// non-empty it is also sent as the container's "log" attribute; an
// empty logId leaves that attribute out of the update.
//
// cStateLock is held for the duration of the call. If the runner has
// already been cancelled (cCancelled), ErrCancelled is returned and
// no update is sent; otherwise the error from the API update is
// returned.
-func (runner *ContainerRunner) UpdateContainerRunning() error {
+func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
 runner.cStateLock.Lock()
 defer runner.cStateLock.Unlock()
 if runner.cCancelled {
 return ErrCancelled
 }
- return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
- arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
+ updates := arvadosclient.Dict{
+ "gateway_address": runner.gateway.Address,
+ "state": "Running",
+ }
// Only include "log" when the caller supplied a value.
+ if logId != "" {
+ updates["log"] = logId
+ }
+ return runner.DispatcherArvClient.Update(
+ "containers",
+ runner.Container.UUID,
+ arvadosclient.Dict{"container": updates},
+ nil,
+ )
}
// ContainerToken returns the api_token the container (and any
return
}
- err = runner.UpdateContainerRunning()
+ logCollection, err := runner.saveLogCollection(false)
+ var logId string
+ if err == nil {
+ logId = logCollection.PortableDataHash
+ } else {
+ runner.CrunchLog.Printf("Error committing initial log collection: %v", err)
+ }
+ err = runner.UpdateContainerRunning(logId)
if err != nil {
return
}
cr.expectCgroupParent = p
}
+ if conf.EC2SpotCheck {
+ go cr.checkSpotInterruptionNotices()
+ }
+
runerr := cr.Run()
if *memprofile != "" {