Merge branch '19624-priority-doc'
[arvados.git] / lib / crunchrun / crunchrun_test.go
index 62df0032b40800e9b2c2eb59ed4c42e639f9e9a1..aaba1c42045e325266534cdf45551ab43199c6f0 100644 (file)
@@ -12,6 +12,7 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "log"
        "os"
        "os/exec"
        "regexp"
@@ -22,6 +23,8 @@ import (
        "testing"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cloud"
+       "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
        "git.arvados.org/arvados.git/sdk/go/arvadostest"
@@ -50,7 +53,6 @@ type TestSuite struct {
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
-       *brokenNodeHook = ""
        s.client = arvados.NewClientFromEnv()
        s.executor = &stubExecutor{}
        var err error
@@ -129,6 +131,7 @@ func (e *stubExecutor) LoadImage(imageId string, tarball string, container arvad
        return e.loadErr
 }
 func (e *stubExecutor) Runtime() string                 { return "stub" }
+func (e *stubExecutor) Version() string                 { return fmt.Sprintf("stub %s", cmd.Version.String()) }
 func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr }
 func (e *stubExecutor) Start() error                    { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr }
 func (e *stubExecutor) CgroupID() string                { return "cgroupid" }
@@ -137,6 +140,10 @@ func (e *stubExecutor) Close()                          { e.closed = true }
 func (e *stubExecutor) Wait(context.Context) (int, error) {
        return <-e.exit, e.waitErr
 }
+func (e *stubExecutor) InjectCommand(_ context.Context, _, _ string, _ bool, _ []string) (*exec.Cmd, error) {
+       return nil, errors.New("unimplemented") // stubExecutor cannot inject commands
+}
+func (e *stubExecutor) IPAddress() (string, error) { return "", errors.New("unimplemented") } // not supported by stubExecutor
 
 const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"
 
@@ -183,9 +190,10 @@ func (client *ArvTestClient) Create(resourceType string,
 
        if resourceType == "collections" && output != nil {
                mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
+               md5sum := md5.Sum([]byte(mt))
                outmap := output.(*arvados.Collection)
-               outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
-               outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt)))
+               outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5sum, len(mt))
+               outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%015x", md5sum[:7])
        }
 
        return nil
@@ -271,7 +279,7 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
                if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
                        client.WasSetRunning = true
                }
-       } else if resourceType == "collections" {
+       } else if resourceType == "collections" && output != nil {
                mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
                output.(*arvados.Collection).UUID = uuid
                output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
@@ -589,7 +597,7 @@ func (s *TestSuite) TestUpdateContainerRunning(c *C) {
        cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
 
-       err = cr.UpdateContainerRunning()
+       err = cr.UpdateContainerRunning("")
        c.Check(err, IsNil)
 
        c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
@@ -886,7 +894,44 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
        c.Assert(s.api.Logs["crunch-run"], NotNil)
        c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
        c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
-       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container 'zzzzz-zzzzz-zzzzzzzzzzzzzzz' using stub runtime.*`)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
+}
+
+// TestCommitNodeInfoBeforeStart checks that the container is set Running with
+// its log pointing (by PDH) at a collection holding node-info.txt and node.json.
+func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
+       var collectionCreate, containerUpdate arvadosclient.Dict
+       s.fullRunHelper(c, `{
+               "command": ["true"],
+               "container_image": "`+arvadostest.DockerImage112PDH+`",
+               "cwd": ".",
+               "environment": {},
+               "mounts": {"/tmp": {"kind": "tmp"} },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {},
+               "state": "Locked",
+               "uuid": "zzzzz-dz642-202301121543210"
+       }`, nil, 0,
+               func() {
+                       collectionCreate = s.api.CalledWith("ensure_unique_name", true)
+                       containerUpdate = s.api.CalledWith("container.state", "Running")
+               })
+       c.Assert(collectionCreate, NotNil)
+       logCollection := collectionCreate["collection"].(arvadosclient.Dict)
+       c.Check(logCollection["name"], Equals, "logs for zzzzz-dz642-202301121543210")
+       manifestText := logCollection["manifest_text"].(string)
+       // A file size of at least two digits is an easy check that the file isn't empty.
+       c.Check(manifestText, Matches, `\. .+ \d+:\d{2,}:node-info\.txt( .+)?\n`)
+       c.Check(manifestText, Matches, `\. .+ \d+:\d{2,}:node\.json( .+)?\n`)
+
+       c.Assert(containerUpdate, NotNil)
+       // As of Arvados 2.5.0, the container update must specify its log in PDH
+       // format for the API server to propagate it to container requests, which
+       // is what we care about for this test.
+       expectPDH := fmt.Sprintf("%x+%d", md5.Sum([]byte(manifestText)), len(manifestText))
+       c.Check(containerUpdate["container"].(arvadosclient.Dict)["log"], Equals, expectPDH)
 }
 
 func (s *TestSuite) TestContainerRecordLog(c *C) {
@@ -1293,7 +1338,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
-                       "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
+                       "--read-write", "--storage-classes", "default", "--crunchstat-interval=5", "--ram-cache",
                        "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
                c.Check(bindmounts, DeepEquals, map[string]bindmount{
                        "/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true},
@@ -1376,7 +1421,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
                bindmounts, err := cr.SetupMounts()
                c.Check(err, IsNil)
                c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
-                       "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
+                       "--read-write", "--storage-classes", "default", "--crunchstat-interval=5", "--ram-cache",
                        "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
                c.Check(bindmounts, DeepEquals, map[string]bindmount{
                        "/tmp":     {realTemp + "/tmp2", false},
@@ -1690,7 +1735,8 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
                "output_path": "/tmp",
                "priority": 1,
                "runtime_constraints": {},
-               "state": "Locked"
+               "state": "Locked",
+               "uuid": "zzzzz-dz642-202301130848001"
        }`
 
        extraMounts := []string{
@@ -1713,22 +1759,25 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       for _, v := range api.Content {
-               if v["collection"] != nil {
-                       c.Check(v["ensure_unique_name"], Equals, true)
-                       collection := v["collection"].(arvadosclient.Dict)
-                       if strings.Index(collection["name"].(string), "output") == 0 {
-                               manifest := collection["manifest_text"].(string)
-
-                               c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+       output_count := uint(0)
+       for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content {
+               if v["collection"] == nil {
+                       continue
+               }
+               collection := v["collection"].(arvadosclient.Dict)
+               if collection["name"].(string) != "output for zzzzz-dz642-202301130848001" {
+                       continue
+               }
+               c.Check(v["ensure_unique_name"], Equals, true)
+               c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
 ./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:bar 36:18:sub1file2
 ./foo/baz 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 9:18:sub2file2
 ./foo/sub1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
 ./foo/sub1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
 `)
-                       }
-               }
+               output_count++
        }
+       c.Check(output_count, Not(Equals), uint(0))
 }
 
 func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(c *C) {
@@ -1745,7 +1794,8 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                "output_path": "/tmp",
                "priority": 1,
                "runtime_constraints": {},
-               "state": "Locked"
+               "state": "Locked",
+               "uuid": "zzzzz-dz642-202301130848002"
        }`
 
        extraMounts := []string{
@@ -1758,18 +1808,21 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
-       for _, v := range s.api.Content {
-               if v["collection"] != nil {
-                       collection := v["collection"].(arvadosclient.Dict)
-                       if strings.Index(collection["name"].(string), "output") == 0 {
-                               manifest := collection["manifest_text"].(string)
-
-                               c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+       output_count := uint(0)
+       for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content {
+               if v["collection"] == nil {
+                       continue
+               }
+               collection := v["collection"].(arvadosclient.Dict)
+               if collection["name"].(string) != "output for zzzzz-dz642-202301130848002" {
+                       continue
+               }
+               c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
 ./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 10:17:bar
 `)
-                       }
-               }
+               output_count++
        }
+       c.Check(output_count, Not(Equals), uint(0))
 }
 
 func (s *TestSuite) TestOutputError(c *C) {
@@ -1914,8 +1967,8 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
                func() {
                        c.Log("// loadErr = cannot connect")
                        s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
-                       *brokenNodeHook = c.MkDir() + "/broken-node-hook"
-                       err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+                       s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
+                       err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
                        c.Assert(err, IsNil)
                        nextState = "Queued"
                },
@@ -1935,7 +1988,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
 }`, nil, 0, func() {})
                c.Check(s.api.CalledWith("container.state", nextState), NotNil)
                c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
-               if *brokenNodeHook != "" {
+               if s.runner.brokenNodeHook != "" {
                        c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
                        c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
                        c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
@@ -2065,6 +2118,60 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
        c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"), NotNil)
 }
 
+// TestCalculateCost checks cost accounting as the instance price changes.
+func (s *TestSuite) TestCalculateCost(c *C) {
+       defer func(orig string) { lockdir = orig }(lockdir)
+       lockdir = c.MkDir()
+       now := time.Now()
+       cr := s.runner
+       cr.costStartTime = now.Add(-time.Hour)
+       var logbuf bytes.Buffer
+       cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
+
+       // if there's no InstanceType env var, cost is calculated as 0
+       os.Unsetenv("InstanceType")
+       cost := cr.calculateCost(now)
+       c.Check(cost, Equals, 0.0)
+
+       // with InstanceType env var and no price data from loadPrices()
+       // (not run yet, or found nothing), cost is based on InstanceType
+       os.Setenv("InstanceType", `{"Price":1.2}`)
+       defer os.Unsetenv("InstanceType")
+       cost = cr.calculateCost(now)
+       c.Check(cost, Equals, 1.2)
+
+       // first update tells us the spot price was $1/h until 30
+       // minutes ago when it increased to $2/h
+       j, err := json.Marshal([]cloud.InstancePrice{
+               {StartTime: now.Add(-4 * time.Hour), Price: 1.0},
+               {StartTime: now.Add(-time.Hour / 2), Price: 2.0},
+       })
+       c.Assert(err, IsNil)
+       c.Assert(os.WriteFile(lockdir+"/"+pricesfile, j, 0644), IsNil)
+       cr.loadPrices()
+       cost = cr.calculateCost(now)
+       c.Check(cost, Equals, 1.5)
+
+       // next update (via --list + SIGUSR2) tells us the spot price
+       // increased to $3/h 15 minutes ago
+       j, err = json.Marshal([]cloud.InstancePrice{
+               {StartTime: now.Add(-time.Hour / 3), Price: 2.0}, // dup of -time.Hour/2 price
+               {StartTime: now.Add(-time.Hour / 4), Price: 3.0},
+       })
+       c.Assert(err, IsNil)
+       c.Assert(os.WriteFile(lockdir+"/"+pricesfile, j, 0644), IsNil)
+       cr.loadPrices()
+       cost = cr.calculateCost(now)
+       c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+
+       // from costStartTime (1h ago) to 30 minutes ago, the price was $1/h
+       cost = cr.calculateCost(now.Add(-time.Hour / 2))
+       c.Check(cost, Equals, 0.5)
+       c.Logf("%s", logbuf.String())
+       c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`)
+       c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
+}
+
 type FakeProcess struct {
        cmdLine []string
 }