20718: Cleans up by running "go mod tidy -compat=1.17"
[arvados.git] / lib / crunchrun / crunchrun_test.go
index 786f9410a8ac77783edc8e04652960ba56a1217e..aa20104f33f4ecc09c6dea5671253a5ed740fa1a 100644 (file)
@@ -6,6 +6,7 @@ package crunchrun
 
 import (
        "bytes"
+       "context"
        "crypto/md5"
        "encoding/json"
        "errors"
@@ -13,14 +14,20 @@ import (
        "io"
        "io/ioutil"
        "log"
+       "math/rand"
        "net/http"
        "net/http/httptest"
+       "net/http/httputil"
+       "net/url"
        "os"
        "os/exec"
+       "path"
        "regexp"
        "runtime/pprof"
+       "strconv"
        "strings"
        "sync"
+       "sync/atomic"
        "syscall"
        "testing"
        "time"
@@ -31,9 +38,10 @@ import (
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
        "git.arvados.org/arvados.git/sdk/go/manifest"
-       "golang.org/x/net/context"
 
        . "gopkg.in/check.v1"
+       git_client "gopkg.in/src-d/go-git.v4/plumbing/transport/client"
+       git_http "gopkg.in/src-d/go-git.v4/plumbing/transport/http"
 )
 
 // Gocheck boilerplate
@@ -41,6 +49,8 @@ func TestCrunchExec(t *testing.T) {
        TestingT(t)
 }
 
+const logLineStart = `(?m)(.*\n)*\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z `
+
 var _ = Suite(&TestSuite{})
 
 type TestSuite struct {
@@ -410,6 +420,67 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
        return nil, nil
 }
 
+type apiStubServer struct {
+       server    *httptest.Server
+       proxy     *httputil.ReverseProxy
+       intercept func(http.ResponseWriter, *http.Request) bool
+
+       container arvados.Container
+       logs      map[string]string
+}
+
+func apiStub() (*arvados.Client, *apiStubServer) {
+       client := arvados.NewClientFromEnv()
+       apistub := &apiStubServer{}
+       apistub.server = httptest.NewTLSServer(apistub)
+       apistub.proxy = httputil.NewSingleHostReverseProxy(&url.URL{Scheme: "https", Host: client.APIHost})
+       if client.Insecure {
+               apistub.proxy.Transport = arvados.InsecureHTTPClient.Transport
+       }
+       client.APIHost = apistub.server.Listener.Addr().String()
+       return client, apistub
+}
+
+func (apistub *apiStubServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+       if apistub.intercept != nil && apistub.intercept(w, r) {
+               return
+       }
+       if r.Method == "POST" && r.URL.Path == "/arvados/v1/logs" {
+               var body struct {
+                       Log struct {
+                               EventType  string `json:"event_type"`
+                               Properties struct {
+                                       Text string
+                               }
+                       }
+               }
+               json.NewDecoder(r.Body).Decode(&body)
+               apistub.logs[body.Log.EventType] += body.Log.Properties.Text
+               return
+       }
+       if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+hwPDH {
+               json.NewEncoder(w).Encode(arvados.Collection{ManifestText: hwManifest})
+               return
+       }
+       if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+otherPDH {
+               json.NewEncoder(w).Encode(arvados.Collection{ManifestText: otherManifest})
+               return
+       }
+       if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+normalizedWithSubdirsPDH {
+               json.NewEncoder(w).Encode(arvados.Collection{ManifestText: normalizedManifestWithSubdirs})
+               return
+       }
+       if r.Method == "GET" && r.URL.Path == "/arvados/v1/collections/"+denormalizedWithSubdirsPDH {
+               json.NewEncoder(w).Encode(arvados.Collection{ManifestText: denormalizedManifestWithSubdirs})
+               return
+       }
+       if r.Method == "GET" && r.URL.Path == "/arvados/v1/containers/"+apistub.container.UUID {
+               json.NewEncoder(w).Encode(apistub.container)
+               return
+       }
+       apistub.proxy.ServeHTTP(w, r)
+}
+
 func (s *TestSuite) TestLoadImage(c *C) {
        s.runner.Container.ContainerImage = arvadostest.DockerImage112PDH
        s.runner.Container.Mounts = map[string]arvados.Mount{
@@ -681,8 +752,9 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn
                }
                return d, err
        }
+       client, _ := apiStub()
        s.runner.MkArvClient = func(token string) (IArvadosClient, IKeepClient, *arvados.Client, error) {
-               return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, nil, nil
+               return &ArvTestClient{secretMounts: secretMounts}, &s.testContainerKeepClient, client, nil
        }
 
        if extraMounts != nil && len(extraMounts) > 0 {
@@ -777,38 +849,50 @@ func (s *TestSuite) TestRunAlreadyRunning(c *C) {
        c.Check(ran, Equals, false)
 }
 
-func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
-       var failedOnce bool
-       var stoptime time.Time
-       token := "fake-ec2-metadata-token"
-       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-               if !failedOnce {
+func ec2MetadataServerStub(c *C, token *string, failureRate float64, stoptime *atomic.Value) *httptest.Server {
+       failedOnce := false
+       return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               if !failedOnce || rand.Float64() < failureRate {
                        w.WriteHeader(http.StatusServiceUnavailable)
                        failedOnce = true
                        return
                }
                switch r.URL.Path {
                case "/latest/api/token":
-                       fmt.Fprintln(w, token)
+                       fmt.Fprintln(w, *token)
                case "/latest/meta-data/spot/instance-action":
-                       if r.Header.Get("X-aws-ec2-metadata-token") != token {
+                       if r.Header.Get("X-aws-ec2-metadata-token") != *token {
                                w.WriteHeader(http.StatusUnauthorized)
-                       } else if stoptime.IsZero() {
+                       } else if t, _ := stoptime.Load().(time.Time); t.IsZero() {
                                w.WriteHeader(http.StatusNotFound)
                        } else {
-                               fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, stoptime.Format(time.RFC3339))
+                               fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, t.Format(time.RFC3339))
                        }
                default:
                        w.WriteHeader(http.StatusNotFound)
                }
        }))
+}
+
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+       s.testSpotInterruptionNotice(c, 0.1)
+}
+
+func (s *TestSuite) TestSpotInterruptionNoticeNotAvailable(c *C) {
+       s.testSpotInterruptionNotice(c, 1)
+}
+
+func (s *TestSuite) testSpotInterruptionNotice(c *C, failureRate float64) {
+       var stoptime atomic.Value
+       token := "fake-ec2-metadata-token"
+       stub := ec2MetadataServerStub(c, &token, failureRate, &stoptime)
        defer stub.Close()
 
        defer func(i time.Duration, u string) {
                spotInterruptionCheckInterval = i
                ec2MetadataBaseURL = u
        }(spotInterruptionCheckInterval, ec2MetadataBaseURL)
-       spotInterruptionCheckInterval = time.Second / 4
+       spotInterruptionCheckInterval = time.Second / 8
        ec2MetadataBaseURL = stub.URL
 
        go s.runner.checkSpotInterruptionNotices()
@@ -824,13 +908,22 @@ func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
     "state": "Locked"
 }`, nil, func() int {
                time.Sleep(time.Second)
-               stoptime = time.Now().Add(time.Minute).UTC()
+               stoptime.Store(time.Now().Add(time.Minute).UTC())
+               token = "different-fake-ec2-metadata-token"
                time.Sleep(time.Second)
                return 0
        })
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 250ms using instance metadata at http://.*`)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 125ms using instance metadata at http://.*`)
        c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Cloud provider indicates instance action "stop" scheduled for time "`+stoptime.Format(time.RFC3339)+`".*`)
+       if failureRate == 1 {
+               c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Giving up on checking spot interruptions after too many consecutive failures.*`)
+       } else {
+               text := `Cloud provider scheduled instance stop at ` + stoptime.Load().(time.Time).Format(time.RFC3339)
+               c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*`+text+`.*`)
+               c.Check(s.api.CalledWith("container.runtime_status.warning", "preemption notice"), NotNil)
+               c.Check(s.api.CalledWith("container.runtime_status.warningDetail", text), NotNil)
+               c.Check(s.api.CalledWith("container.runtime_status.preemptionNotice", text), NotNil)
+       }
 }
 
 func (s *TestSuite) TestRunTimeExceeded(c *C) {
@@ -962,6 +1055,70 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
        c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
 }
 
+func (s *TestSuite) testLogRSSThresholds(c *C, ram int, expected []int, notExpected int) {
+       s.runner.cgroupRoot = "testdata/fakestat"
+       s.fullRunHelper(c, `{
+               "command": ["true"],
+               "container_image": "`+arvadostest.DockerImage112PDH+`",
+               "cwd": ".",
+               "environment": {},
+               "mounts": {"/tmp": {"kind": "tmp"} },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {"ram": `+strconv.Itoa(ram)+`},
+               "state": "Locked"
+       }`, nil, func() int { return 0 })
+       logs := s.api.Logs["crunch-run"].String()
+       pattern := logLineStart + `Container using over %d%% of memory \(rss 734003200/%d bytes\)`
+       var threshold int
+       for _, threshold = range expected {
+               c.Check(logs, Matches, fmt.Sprintf(pattern, threshold, ram))
+       }
+       if notExpected > threshold {
+               c.Check(logs, Not(Matches), fmt.Sprintf(pattern, notExpected, ram))
+       }
+}
+
+func (s *TestSuite) TestLogNoRSSThresholds(c *C) {
+       s.testLogRSSThresholds(c, 7340032000, []int{}, 90)
+}
+
+func (s *TestSuite) TestLogSomeRSSThresholds(c *C) {
+       onePercentRSS := 7340032
+       s.testLogRSSThresholds(c, 102*onePercentRSS, []int{90, 95}, 99)
+}
+
+func (s *TestSuite) TestLogAllRSSThresholds(c *C) {
+       s.testLogRSSThresholds(c, 734003299, []int{90, 95, 99}, 0)
+}
+
+func (s *TestSuite) TestLogMaximaAfterRun(c *C) {
+       s.runner.cgroupRoot = "testdata/fakestat"
+       s.runner.parentTemp = c.MkDir()
+       s.fullRunHelper(c, `{
+        "command": ["true"],
+        "container_image": "`+arvadostest.DockerImage112PDH+`",
+        "cwd": ".",
+        "environment": {},
+        "mounts": {"/tmp": {"kind": "tmp"} },
+        "output_path": "/tmp",
+        "priority": 1,
+        "runtime_constraints": {"ram": 7340032000},
+        "state": "Locked"
+    }`, nil, func() int { return 0 })
+       logs := s.api.Logs["crunch-run"].String()
+       for _, expected := range []string{
+               `Maximum disk usage was \d+%, \d+/\d+ bytes`,
+               `Maximum container memory cache usage was 73400320 bytes`,
+               `Maximum container memory swap usage was 320 bytes`,
+               `Maximum container memory pgmajfault usage was 20 faults`,
+               `Maximum container memory rss usage was 10%, 734003200/7340032000 bytes`,
+               `Maximum crunch-run memory rss usage was \d+ bytes`,
+       } {
+               c.Check(logs, Matches, logLineStart+expected)
+       }
+}
+
 func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
        var collection_create, container_update arvadosclient.Dict
        s.fullRunHelper(c, `{
@@ -1261,6 +1418,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
        cr := s.runner
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
+       cr.containerClient, _ = apiStub()
        cr.ContainerArvClient = &ArvTestClient{}
        cr.ContainerKeepClient = &KeepTestClient{}
        cr.Container.OutputStorageClasses = []string{"default"}
@@ -1583,7 +1741,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
        {
                i = 0
                cr.ArvMountPoint = ""
-               (*GitMountSuite)(nil).useTestGitServer(c)
+               git_client.InstallProtocol("https", git_http.NewClient(arvados.InsecureHTTPClient))
                cr.token = arvadostest.ActiveToken
                cr.Container.Mounts = make(map[string]arvados.Mount)
                cr.Container.Mounts = map[string]arvados.Mount{
@@ -2260,6 +2418,80 @@ func (s *TestSuite) TestCalculateCost(c *C) {
        c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
 }
 
+func (s *TestSuite) TestSIGUSR2CostUpdate(c *C) {
+       pid := os.Getpid()
+       now := time.Now()
+       pricesJSON, err := json.Marshal([]cloud.InstancePrice{
+               {StartTime: now.Add(-4 * time.Hour), Price: 2.4},
+               {StartTime: now.Add(-2 * time.Hour), Price: 2.6},
+       })
+       c.Assert(err, IsNil)
+
+       os.Setenv("InstanceType", `{"Price":2.2}`)
+       defer os.Unsetenv("InstanceType")
+       defer func(s string) { lockdir = s }(lockdir)
+       lockdir = c.MkDir()
+
+       // We can't use s.api.CalledWith because timing differences will yield
+       // different cost values across runs. getCostUpdate iterates over API
+       // calls until it finds one that sets the cost, then writes that value
+       // to the next index of costUpdates.
+       deadline := now.Add(time.Second)
+       costUpdates := make([]float64, 2)
+       costIndex := 0
+       apiIndex := 0
+       getCostUpdate := func() {
+               for ; time.Now().Before(deadline); time.Sleep(time.Second / 10) {
+                       for apiIndex < len(s.api.Content) {
+                               update := s.api.Content[apiIndex]
+                               apiIndex++
+                               var ok bool
+                               var cost float64
+                               if update, ok = update["container"].(arvadosclient.Dict); !ok {
+                                       continue
+                               }
+                               if cost, ok = update["cost"].(float64); !ok {
+                                       continue
+                               }
+                               c.Logf("API call #%d updates cost to %v", apiIndex-1, cost)
+                               costUpdates[costIndex] = cost
+                               costIndex++
+                               return
+                       }
+               }
+       }
+
+       s.fullRunHelper(c, `{
+               "command": ["true"],
+               "container_image": "`+arvadostest.DockerImage112PDH+`",
+               "cwd": ".",
+               "environment": {},
+               "mounts": {"/tmp": {"kind": "tmp"} },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {},
+               "state": "Locked",
+               "uuid": "zzzzz-dz642-20230320101530a"
+       }`, nil, func() int {
+               s.runner.costStartTime = now.Add(-3 * time.Hour)
+               err := syscall.Kill(pid, syscall.SIGUSR2)
+               c.Check(err, IsNil, Commentf("error sending first SIGUSR2 to runner"))
+               getCostUpdate()
+
+               err = os.WriteFile(path.Join(lockdir, pricesfile), pricesJSON, 0o700)
+               c.Check(err, IsNil, Commentf("error writing JSON prices file"))
+               err = syscall.Kill(pid, syscall.SIGUSR2)
+               c.Check(err, IsNil, Commentf("error sending second SIGUSR2 to runner"))
+               getCostUpdate()
+
+               return 0
+       })
+       // Comparing with format strings makes it easy to ignore minor variations
+       // in cost across runs while keeping diagnostics pretty.
+       c.Check(fmt.Sprintf("%.3f", costUpdates[0]), Equals, "6.600")
+       c.Check(fmt.Sprintf("%.3f", costUpdates[1]), Equals, "7.600")
+}
+
 type FakeProcess struct {
        cmdLine []string
 }