+// ec2MetadataServerStub starts an httptest server that imitates the
+// two EC2 instance metadata endpoints used by these tests.
+//
+// The first request always gets 503 Service Unavailable; each later
+// request gets 503 with probability failureRate. Otherwise:
+//
+//   - /latest/api/token responds with the current value of *token.
+//   - /latest/meta-data/spot/instance-action responds 401 if the
+//     X-aws-ec2-metadata-token header differs from *token, 404 while
+//     stoptime holds no non-zero time.Time, and otherwise a JSON
+//     "stop" action carrying that time in RFC3339 format.
+//   - any other path responds 404.
+//
+// *token and stoptime are re-read on every request, so the caller can
+// change them while the server is running. The c argument is unused.
+// The caller is responsible for closing the returned server.
+func ec2MetadataServerStub(c *C, token *string, failureRate float64, stoptime *atomic.Value) *httptest.Server {
+	// Guarantees at least one 503 even when failureRate is 0.
+	sentFirstFailure := false
+
+	handle := func(w http.ResponseWriter, r *http.Request) {
+		// The random draw is only made once the guaranteed first
+		// failure has been delivered.
+		if !sentFirstFailure || rand.Float64() < failureRate {
+			w.WriteHeader(http.StatusServiceUnavailable)
+			sentFirstFailure = true
+			return
+		}
+
+		if r.URL.Path == "/latest/api/token" {
+			fmt.Fprintln(w, *token)
+			return
+		}
+		if r.URL.Path != "/latest/meta-data/spot/instance-action" {
+			w.WriteHeader(http.StatusNotFound)
+			return
+		}
+
+		// Requests carrying a stale or missing token are rejected.
+		if r.Header.Get("X-aws-ec2-metadata-token") != *token {
+			w.WriteHeader(http.StatusUnauthorized)
+			return
+		}
+
+		// A failed type assertion (nothing stored yet) leaves the
+		// zero time, which is reported as "no notice" via 404.
+		when, _ := stoptime.Load().(time.Time)
+		if when.IsZero() {
+			w.WriteHeader(http.StatusNotFound)
+			return
+		}
+		fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, when.Format(time.RFC3339))
+	}
+
+	return httptest.NewServer(http.HandlerFunc(handle))
+}
+
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+ s.testSpotInterruptionNotice(c, 0.1)
+}
+
+func (s *TestSuite) TestSpotInterruptionNoticeNotAvailable(c *C) {
+ s.testSpotInterruptionNotice(c, 1)
+}
+
+// testSpotInterruptionNotice runs a "sleep 3" container through
+// fullRunHelper while checkSpotInterruptionNotices polls a stub EC2
+// metadata server, then checks the crunch-run log and API calls.
+//
+// failureRate is passed to ec2MetadataServerStub. When it is exactly
+// 1 the test expects the "giving up" log message; otherwise it
+// expects the scheduled stop time to be logged and reported in the
+// container's runtime_status.
+func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
+	var stoptime atomic.Value
+	token := "fake-ec2-metadata-token"
+	stub := ec2MetadataServerStub(c, &token, failureRate, &stoptime)
+	defer stub.Close()
+
+	// Point the package-level settings at the stub with a short
+	// polling interval, and put the previous values back on return.
+	savedInterval, savedBaseURL := spotInterruptionCheckInterval, ec2MetadataBaseURL
+	defer func() {
+		spotInterruptionCheckInterval = savedInterval
+		ec2MetadataBaseURL = savedBaseURL
+	}()
+	spotInterruptionCheckInterval = time.Second / 8
+	ec2MetadataBaseURL = stub.URL
+
+	go s.runner.checkSpotInterruptionNotices()
+
+	// Callback handed to fullRunHelper (presumably run in place of
+	// the container process -- confirm against fullRunHelper). After
+	// one second it publishes a stop time one minute ahead and swaps
+	// the token, so the stub answers 401 to the old token from then
+	// on; it then waits another second before returning 0.
+	//
+	// NOTE(review): token is written here without synchronization
+	// while the stub's handler reads it through the pointer; unlike
+	// stoptime it is not an atomic value -- possible data race.
+	simulateNotice := func() int {
+		time.Sleep(time.Second)
+		noticeAt := time.Now().Add(time.Minute).UTC()
+		stoptime.Store(noticeAt)
+		token = "different-fake-ec2-metadata-token"
+		time.Sleep(time.Second)
+		return 0
+	}
+	s.fullRunHelper(c, `{
+ "command": ["sleep", "3"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked"
+}`, nil, simulateNotice)
+
+	// Re-read the log for every check rather than caching one
+	// snapshot.
+	crunchRunLog := func() string { return s.api.Logs["crunch-run"].String() }
+
+	c.Check(crunchRunLog(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
+	// The stub always fails the first request, so a 503 is logged
+	// regardless of failureRate.
+	c.Check(crunchRunLog(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+
+	if failureRate == 1 {
+		c.Check(crunchRunLog(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
+		return
+	}
+
+	text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
+	c.Check(crunchRunLog(), Matches, `(?ms).*`+text+`.*`)
+	c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
+	c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
+	c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
+}
+