import (
"bytes"
+ "context"
"crypto/md5"
"encoding/json"
"errors"
"io"
"io/ioutil"
"log"
+ "math/rand"
"net/http"
"net/http/httptest"
"os"
"os/exec"
+ "path"
"regexp"
"runtime/pprof"
+ "strconv"
"strings"
"sync"
+ "sync/atomic"
"syscall"
"testing"
"time"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
"git.arvados.org/arvados.git/sdk/go/manifest"
- "golang.org/x/net/context"
. "gopkg.in/check.v1"
)
TestingT(t)
}
+// logLineStart is a regexp prefix for log assertions. It matches any
+// number of earlier lines, then the timestamp that starts a log line
+// (YYYY-MM-DDTHH:MM:SS, fractional seconds, "Z", and a space). The (?m)
+// flag makes $ match at the end of a line, so a pattern built on this
+// prefix can match a message in the middle of a multi-line log.
+// NOTE(review): this relies on gocheck's Matches checker anchoring the
+// whole pattern; confirm against check.v1 if that ever changes.
+const logLineStart = `(?m)(.*\n)*\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z `
+
+// Register TestSuite with gocheck.
var _ = Suite(&TestSuite{})
type TestSuite struct {
c.Check(ran, Equals, false)
}
-func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
- var failedOnce bool
- var stoptime time.Time
- token := "fake-ec2-metadata-token"
- stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if !failedOnce {
+func ec2MetadataServerStub(c *C, token *string, failureRate float64, stoptime *atomic.Value) *httptest.Server {
+ failedOnce := false
+ return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if !failedOnce || rand.Float64() < failureRate {
w.WriteHeader(http.StatusServiceUnavailable)
failedOnce = true
return
}
switch r.URL.Path {
case "/latest/api/token":
- fmt.Fprintln(w, token)
+ fmt.Fprintln(w, *token)
case "/latest/meta-data/spot/instance-action":
- if r.Header.Get("X-aws-ec2-metadata-token") != token {
+ if r.Header.Get("X-aws-ec2-metadata-token") != *token {
w.WriteHeader(http.StatusUnauthorized)
- } else if stoptime.IsZero() {
+ } else if t, _ := stoptime.Load().(time.Time); t.IsZero() {
w.WriteHeader(http.StatusNotFound)
} else {
- fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, stoptime.Format(time.RFC3339))
+ fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, t.Format(time.RFC3339))
}
default:
w.WriteHeader(http.StatusNotFound)
}
}))
+}
+
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+ s.testSpotInterruptionNotice(c, 0.1)
+}
+
+func (s *TestSuite) TestSpotInterruptionNoticeNotAvailable(c *C) {
+ s.testSpotInterruptionNotice(c, 1)
+}
+
+func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
+ var stoptime atomic.Value
+ token := "fake-ec2-metadata-token"
+ stub := ec2MetadataServerStub(c, &token, failureRate, &stoptime)
defer stub.Close()
defer func(i time.Duration, u string) {
spotInterruptionCheckInterval = i
ec2MetadataBaseURL = u
}(spotInterruptionCheckInterval, ec2MetadataBaseURL)
- spotInterruptionCheckInterval = time.Second / 4
+ spotInterruptionCheckInterval = time.Second / 8
ec2MetadataBaseURL = stub.URL
go s.runner.checkSpotInterruptionNotices()
"state": "Locked"
}`, nil, func() int {
time.Sleep(time.Second)
- stoptime = time.Now().Add(time.Minute).UTC()
+ stoptime.Store(time.Now().Add(time.Minute).UTC())
+ token = "different-fake-ec2-metadata-token"
time.Sleep(time.Second)
return 0
})
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 250ms using instance metadata at http://.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Cloud provider indicates instance action "stop" scheduled for time "`+stoptime.Format(time.RFC3339)+`".*`)
+ if failureRate == 1 {
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
+ } else {
+ text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*`+text+`.*`)
+ c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
+ c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
+ c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
+ }
}
func (s *TestSuite) TestRunTimeExceeded(c *C) {
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
}
+func (s *TestSuite) testLogRSSThresholds(c *C, ram int, expected []int, notExpected int) {
+ s.runner.cgroupRoot = "testdata/fakestat"
+ s.fullRunHelper(c, `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"ram": `+strconv.Itoa(ram)+`},
+ "state": "Locked"
+ }`, nil, func() int { return 0 })
+ logs := s.api.Logs["crunch-run"].String()
+ pattern := logLineStart + `Container using over %d%% of memory \(rss 734003200/%d bytes\)`
+ var threshold int
+ for _, threshold = range expected {
+ c.Check(logs, Matches, fmt.Sprintf(pattern, threshold, ram))
+ }
+ if notExpected > threshold {
+ c.Check(logs, Not(Matches), fmt.Sprintf(pattern, notExpected, ram))
+ }
+}
+
+// TestLogNoRSSThresholds sets the RAM limit to ten times the 734003200-byte
+// RSS seen in the fake stats (10% usage), so the 90% message must not be
+// logged.
+func (s *TestSuite) TestLogNoRSSThresholds(c *C) {
+	ram := 10 * 734003200
+	s.testLogRSSThresholds(c, ram, nil, 90)
+}
+
+func (s *TestSuite) TestLogSomeRSSThresholds(c *C) {
+ onePercentRSS := 7340032
+ s.testLogRSSThresholds(c, 102*onePercentRSS, []int{90, 95}, 99)
+}
+
+// TestLogAllRSSThresholds sets the RAM limit just 99 bytes above the fake
+// RSS, so usage is a hair under 100% and every threshold message should
+// be logged. notExpected is 0, which disables the negative check.
+func (s *TestSuite) TestLogAllRSSThresholds(c *C) {
+	ram := 734003200 + 99
+	s.testLogRSSThresholds(c, ram, []int{90, 95, 99}, 0)
+}
+
+// TestLogMaximaAfterRun runs a trivial container against the fake cgroup
+// statistics in testdata/fakestat. It then checks the crunch-run log for
+// the "Maximum ..." summary lines listed below.
+func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
+	// Each pattern is matched against the log with logLineStart prepended.
+	wantedSummaries := []string{
+		`Maximum disk usage was \d+%, \d+/\d+ bytes`,
+		`Maximum container memory cache usage was 73400320 bytes`,
+		`Maximum container memory swap usage was 320 bytes`,
+		`Maximum container memory pgmajfault usage was 20 faults`,
+		`Maximum container memory rss usage was 10%, 734003200/7340032000 bytes`,
+		`Maximum crunch-run memory rss usage was \d+ bytes`,
+	}
+
+	s.runner.cgroupRoot = "testdata/fakestat"
+	s.runner.parentTemp = c.MkDir()
+	s.fullRunHelper(c, `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"ram": 7340032000},
+ "state": "Locked"
+ }`, nil, func() int { return 0 })
+
+	logs := s.api.Logs["crunch-run"].String()
+	for i := range wantedSummaries {
+		c.Check(logs, Matches, logLineStart+wantedSummaries[i])
+	}
+}
+
func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
var collection_create, container_update arvadosclient.Dict
s.fullRunHelper(c, `{
c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
}
+// TestSIGUSR2CostUpdate checks that sending SIGUSR2 to this process makes
+// the runner report an updated container cost. The first cost is based
+// on the price in the InstanceType environment variable. The second is
+// based on the price history in the prices file under lockdir, once that
+// file has been written.
+func (s *TestSuite) TestSIGUSR2CostUpdate(c *C) {
+	pid := os.Getpid()
+	now := time.Now()
+	pricesJSON, err := json.Marshal([]cloud.InstancePrice{
+		{StartTime: now.Add(-4 * time.Hour), Price: 2.4},
+		{StartTime: now.Add(-2 * time.Hour), Price: 2.6},
+	})
+	c.Assert(err, IsNil)
+
+	os.Setenv("InstanceType", `{"Price":2.2}`)
+	defer os.Unsetenv("InstanceType")
+	defer func(s string) { lockdir = s }(lockdir)
+	lockdir = c.MkDir()
+
+	// We can't use s.api.CalledWith because timing differences will
+	// yield different cost values across runs. Instead, waitForCostUpdate
+	// scans the API calls made since its previous scan until it finds one
+	// that sets the container cost, and returns that cost.
+	//
+	// Each wait gets its own one-second deadline. A single deadline taken
+	// at the top of the test would be shared by container setup in
+	// fullRunHelper and by both waits. A slow setup or a slow first update
+	// could then leave no time for the second one.
+	//
+	// NOTE(review): s.api.Content is read here while the runner may be
+	// appending to it from another goroutine; confirm the stub API client
+	// makes that safe.
+	apiIndex := 0
+	waitForCostUpdate := func(which string) float64 {
+		deadline := time.Now().Add(time.Second)
+		for ; time.Now().Before(deadline); time.Sleep(time.Second / 10) {
+			for apiIndex < len(s.api.Content) {
+				update := s.api.Content[apiIndex]
+				apiIndex++
+				container, ok := update["container"].(arvadosclient.Dict)
+				if !ok {
+					continue
+				}
+				cost, ok := container["cost"].(float64)
+				if !ok {
+					continue
+				}
+				c.Logf("API call #%d updates cost to %v", apiIndex-1, cost)
+				return cost
+			}
+		}
+		c.Errorf("timed out waiting for %s cost update", which)
+		return 0
+	}
+
+	var costUpdates [2]float64
+	s.fullRunHelper(c, `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked",
+ "uuid": "zzzzz-dz642-20230320101530a"
+ }`, nil, func() int {
+		s.runner.costStartTime = now.Add(-3 * time.Hour)
+		err := syscall.Kill(pid, syscall.SIGUSR2)
+		c.Check(err, IsNil, Commentf("error sending first SIGUSR2 to runner"))
+		costUpdates[0] = waitForCostUpdate("first")
+
+		err = os.WriteFile(path.Join(lockdir, pricesfile), pricesJSON, 0o700)
+		c.Check(err, IsNil, Commentf("error writing JSON prices file"))
+		err = syscall.Kill(pid, syscall.SIGUSR2)
+		c.Check(err, IsNil, Commentf("error sending second SIGUSR2 to runner"))
+		costUpdates[1] = waitForCostUpdate("second")
+
+		return 0
+	})
+	// Comparing with format strings makes it easy to ignore minor variations
+	// in cost across runs while keeping diagnostics pretty. The expected
+	// values are consistent with price x hours elapsed since costStartTime
+	// (3h ago): 2.2 x 3 = 6.6, then with the prices file 2.4 x 1 + 2.6 x 2 = 7.6.
+	c.Check(fmt.Sprintf("%.3f", costUpdates[0]), Equals, "6.600")
+	c.Check(fmt.Sprintf("%.3f", costUpdates[1]), Equals, "7.600")
+}
+
type FakeProcess struct {
cmdLine []string
}