// ConfigData contains environment variables and (when needed) cluster
// configuration, passed from dispatchcloud to crunch-run on stdin.
type ConfigData struct {
- Env map[string]string
- KeepBuffers int
- Cluster *arvados.Cluster
+ Env map[string]string // environment variables passed along to crunch-run
+ KeepBuffers int // NOTE(review): presumably a Keep buffer count; not used in the visible code -- confirm
+ EC2SpotCheck bool // when true, checkSpotInterruptionNotices is started in a goroutine before the run
+ Cluster *arvados.Cluster // cluster configuration; only supplied when needed
}
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
"Block not found error",
"Unhandled exception during FUSE operation",
},
- ReportFunc: runner.reportArvMountWarning,
+ ReportFunc: func(pattern, text string) {
+ runner.updateRuntimeStatus("arv-mount: "+pattern, text)
+ },
}
c.Stdout = runner.arvMountLog
c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
}
}
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
- var updated arvados.Container
+// Settings for checkSpotInterruptionNotices: how often the instance
+// metadata service is polled, and the base URL it is reached at.
+// They are variables rather than constants so that tests can point
+// the checker at a stub server with a shorter polling interval.
+var (
+	spotInterruptionCheckInterval = 5 * time.Second
+	ec2MetadataBaseURL            = "http://169.254.169.254"
+)
+
+// checkSpotInterruptionNotices polls the EC2 instance metadata
+// service at ec2MetadataBaseURL every spotInterruptionCheckInterval.
+// Each time the reported spot instance-action changes, it writes a
+// message to the crunch-run log, records it via updateRuntimeStatus,
+// and sends SIGUSR1 to the current process.
+//
+// The function blocks until it gives up, which happens after more
+// than 3 consecutive failed checks, so it is meant to be run in its
+// own goroutine.
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+	type ec2metadata struct {
+		Action string    `json:"action"`
+		Time   time.Time `json:"time"`
+	}
+	runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+
+	// metadata holds the result of the most recent successful
+	// check; it is the zero value when no action is pending.
+	var metadata ec2metadata
+
+	// check fetches a session token, then the pending instance
+	// action, and stores the result in metadata. Both requests
+	// share a one-minute deadline.
+	check := func() error {
+		ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Minute))
+		defer cancel()
+		req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+		if err != nil {
+			return err
+		}
+		req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", "21600")
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+		if resp.StatusCode != http.StatusOK {
+			return fmt.Errorf("%s", resp.Status)
+		}
+		token, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		req, err = http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+		if err != nil {
+			return err
+		}
+		req.Header.Set("X-aws-ec2-metadata-token", strings.TrimSpace(string(token)))
+		resp, err = http.DefaultClient.Do(req)
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+		metadata = ec2metadata{}
+		switch resp.StatusCode {
+		case http.StatusOK:
+			return json.NewDecoder(resp.Body).Decode(&metadata)
+		case http.StatusNotFound:
+			// "If Amazon EC2 is not preparing to stop or
+			// terminate the instance, or if you
+			// terminated the instance yourself,
+			// instance-action is not present in the
+			// instance metadata and you receive an HTTP
+			// 404 error when you try to retrieve it."
+			return nil
+		default:
+			return fmt.Errorf("%s", resp.Status)
+		}
+	}
+
+	// Stop the ticker on the way out so it is released when we
+	// give up.
+	ticker := time.NewTicker(spotInterruptionCheckInterval)
+	defer ticker.Stop()
+
+	failures := 0
+	var lastmetadata ec2metadata
+	for range ticker.C {
+		if err := check(); err != nil {
+			runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+			failures++
+			if failures > 3 {
+				runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many errors")
+				return
+			}
+			continue
+		}
+		// Only consecutive failures count: a few transient
+		// errors spread over a long-running container should
+		// not disable the check for good.
+		failures = 0
+
+		// Compare times with Equal rather than ==, which also
+		// compares the Location and can differ for the same
+		// instant.
+		if metadata.Action == lastmetadata.Action && metadata.Time.Equal(lastmetadata.Time) {
+			continue
+		}
+		lastmetadata = metadata
+		text := fmt.Sprintf("Cloud provider indicates instance action %q scheduled for time %q", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+		runner.CrunchLog.Printf("%s", text)
+		runner.updateRuntimeStatus("instance interruption", text)
+		if proc, err := os.FindProcess(os.Getpid()); err == nil {
+			// trigger updateLogs
+			proc.Signal(syscall.SIGUSR1)
+		}
+	}
+}
+
+func (runner *ContainerRunner) updateRuntimeStatus(warning, detail string) {
err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
"container": arvadosclient.Dict{
"runtime_status": arvadosclient.Dict{
- "warning": "arv-mount: " + pattern,
- "warningDetail": text,
+ "warning": warning,
+ "warningDetail": detail,
},
},
- }, &updated)
+ }, nil)
if err != nil {
runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
}
cr.expectCgroupParent = p
}
+ if conf.EC2SpotCheck {
+ go cr.checkSpotInterruptionNotices()
+ }
+
runerr := cr.Run()
if *memprofile != "" {
"io"
"io/ioutil"
"log"
+ "net/http"
+ "net/http/httptest"
"os"
"os/exec"
"regexp"
c.Check(ran, Equals, false)
}
+// TestSpotInterruptionNotice runs checkSpotInterruptionNotices
+// against a stub metadata server while a "sleep 3" container runs.
+// The stub fails its very first request, then reports no pending
+// action until the test sets a stop time. The test then checks the
+// crunch-run log for the startup message, the 503 error, and the
+// interruption notice.
+//
+// NOTE(review): sentFailure and noticeTime are shared between the
+// stub's handler goroutine and the test goroutine without
+// synchronization -- confirm this is acceptable under -race.
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+	var sentFailure bool
+	var noticeTime time.Time
+	token := "fake-ec2-metadata-token"
+
+	handler := func(w http.ResponseWriter, r *http.Request) {
+		if !sentFailure {
+			// First request, whichever endpoint it hits.
+			w.WriteHeader(http.StatusServiceUnavailable)
+			sentFailure = true
+			return
+		}
+		switch {
+		case r.URL.Path == "/latest/api/token":
+			fmt.Fprintln(w, token)
+		case r.URL.Path != "/latest/meta-data/spot/instance-action":
+			w.WriteHeader(http.StatusNotFound)
+		case r.Header.Get("X-aws-ec2-metadata-token") != token:
+			w.WriteHeader(http.StatusUnauthorized)
+		case noticeTime.IsZero():
+			// No interruption scheduled yet.
+			w.WriteHeader(http.StatusNotFound)
+		default:
+			fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, noticeTime.Format(time.RFC3339))
+		}
+	}
+	stub := httptest.NewServer(http.HandlerFunc(handler))
+	defer stub.Close()
+
+	// Point the checker at the stub with a short interval, and
+	// put the package-level settings back afterwards.
+	savedInterval, savedURL := spotInterruptionCheckInterval, ec2MetadataBaseURL
+	defer func() {
+		spotInterruptionCheckInterval = savedInterval
+		ec2MetadataBaseURL = savedURL
+	}()
+	spotInterruptionCheckInterval = time.Second / 4
+	ec2MetadataBaseURL = stub.URL
+
+	go s.runner.checkSpotInterruptionNotices()
+	s.fullRunHelper(c, `{
+ "command": ["sleep", "3"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked"
+}`, nil, 0, func() {
+		// Let the checker see "no action" for a while before
+		// the stub starts announcing a stop.
+		time.Sleep(time.Second)
+		noticeTime = time.Now().Add(time.Minute).UTC()
+		time.Sleep(time.Second)
+	})
+
+	for _, pattern := range []string{
+		`(?ms).*Checking for spot interruptions every 250ms using instance metadata at http://.*`,
+		`(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`,
+		`(?ms).*Cloud provider indicates instance action "stop" scheduled for time "` + noticeTime.Format(time.RFC3339) + `".*`,
+	} {
+		c.Check(s.api.Logs["crunch-run"].String(), Matches, pattern)
+	}
+}
+
func (s *TestSuite) TestRunTimeExceeded(c *C) {
s.fullRunHelper(c, `{
"command": ["sleep", "3"],