X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/458436270ce8fb80d421d55e192236c5ac4a225e..bfe0ea9b824dc057f07355a928ccb64ab68b6c57:/lib/crunchrun/crunchrun_test.go diff --git a/lib/crunchrun/crunchrun_test.go b/lib/crunchrun/crunchrun_test.go index 91a46e10ef..786f9410a8 100644 --- a/lib/crunchrun/crunchrun_test.go +++ b/lib/crunchrun/crunchrun_test.go @@ -12,6 +12,9 @@ import ( "fmt" "io" "io/ioutil" + "log" + "net/http" + "net/http/httptest" "os" "os/exec" "regexp" @@ -120,7 +123,7 @@ type stubExecutor struct { stopErr error stopped bool closed bool - runFunc func() + runFunc func() int exit chan int } @@ -132,10 +135,14 @@ func (e *stubExecutor) LoadImage(imageId string, tarball string, container arvad func (e *stubExecutor) Runtime() string { return "stub" } func (e *stubExecutor) Version() string { return "stub " + cmd.Version.String() } func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr } -func (e *stubExecutor) Start() error { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr } -func (e *stubExecutor) CgroupID() string { return "cgroupid" } -func (e *stubExecutor) Stop() error { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr } -func (e *stubExecutor) Close() { e.closed = true } +func (e *stubExecutor) Start() error { + e.exit = make(chan int, 1) + go func() { e.exit <- e.runFunc() }() + return e.startErr +} +func (e *stubExecutor) CgroupID() string { return "cgroupid" } +func (e *stubExecutor) Stop() error { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr } +func (e *stubExecutor) Close() { e.closed = true } func (e *stubExecutor) Wait(context.Context) (int, error) { return <-e.exit, e.waitErr } @@ -189,9 +196,10 @@ func (client *ArvTestClient) Create(resourceType string, if resourceType == "collections" && output != nil { mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string) + md5sum := md5.Sum([]byte(mt)) outmap := output.(*arvados.Collection) - outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt)) - outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt))) + outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5sum, len(mt)) + outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%015x", md5sum[:7]) } return nil @@ -539,9 +547,9 @@ func dockerLog(fd byte, msg string) []byte { } func (s *TestSuite) TestRunContainer(c *C) { - s.executor.runFunc = func() { + s.executor.runFunc = func() int { fmt.Fprintf(s.executor.created.Stdout, "Hello world\n") - s.executor.exit <- 0 + return 0 } var logs TestLogs @@ -595,7 +603,7 @@ func (s *TestSuite) TestUpdateContainerRunning(c *C) { cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") c.Assert(err, IsNil) - err = cr.UpdateContainerRunning() + err = cr.UpdateContainerRunning("") c.Check(err, IsNil) c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running") @@ -642,7 +650,7 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) { // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full // dress rehearsal of the Run() function, starting from a JSON container record. -func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func()) (*ArvTestClient, *ContainerRunner, string) { +func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn func() int) (*ArvTestClient, *ContainerRunner, string) { err := json.Unmarshal([]byte(record), &s.api.Container) c.Assert(err, IsNil) initialState := s.api.Container.State @@ -656,10 +664,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi c.Assert(err, IsNil) c.Logf("SecretMounts decoded %v json %q", sm, secretMounts) - s.executor.runFunc = func() { - fn() - s.executor.exit <- exitCode - } + s.executor.runFunc = fn s.runner.statInterval = 100 * time.Millisecond s.runner.containerWatchdogInterval = time.Second @@ -730,7 +735,7 @@ func (s *TestSuite) TestFullRunHello(c *C) { "runtime_constraints": {"vcpus":1,"ram":1000000}, "state": "Locked", "output_storage_classes": ["default"] -}`, nil, 0, func() { +}`, nil, func() int { c.Check(s.executor.created.Command, DeepEquals, []string{"echo", "hello world"}) c.Check(s.executor.created.Image, Equals, "sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678") c.Check(s.executor.created.Env, DeepEquals, map[string]string{"foo": "bar", "baz": "waz"}) @@ -740,6 +745,7 @@ func (s *TestSuite) TestFullRunHello(c *C) { c.Check(s.executor.created.EnableNetwork, Equals, false) c.Check(s.executor.created.CUDADeviceCount, Equals, 0) fmt.Fprintln(s.executor.created.Stdout, "hello world") + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -762,14 +768,71 @@ func (s *TestSuite) TestRunAlreadyRunning(c *C) { "runtime_constraints": {}, "scheduling_parameters":{"max_run_time": 1}, "state": "Running" -}`, nil, 2, func() { +}`, nil, func() int { ran = true + return 2 }) c.Check(s.api.CalledWith("container.state", "Cancelled"), IsNil) c.Check(s.api.CalledWith("container.state", "Complete"), IsNil) c.Check(ran, Equals, false) } +func (s *TestSuite) TestSpotInterruptionNotice(c *C) { + var failedOnce bool + var stoptime time.Time + token := "fake-ec2-metadata-token" + stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !failedOnce { + w.WriteHeader(http.StatusServiceUnavailable) + failedOnce = true + return + } + switch r.URL.Path { + case "/latest/api/token": + fmt.Fprintln(w, token) + case "/latest/meta-data/spot/instance-action": + if r.Header.Get("X-aws-ec2-metadata-token") != token { + w.WriteHeader(http.StatusUnauthorized) + } else if stoptime.IsZero() { + w.WriteHeader(http.StatusNotFound) + } else { + fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, stoptime.Format(time.RFC3339)) + } + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer stub.Close() + + defer func(i time.Duration, u string) { + spotInterruptionCheckInterval = i + ec2MetadataBaseURL = u + }(spotInterruptionCheckInterval, ec2MetadataBaseURL) + spotInterruptionCheckInterval = time.Second / 4 + ec2MetadataBaseURL = stub.URL + + go s.runner.checkSpotInterruptionNotices() + s.fullRunHelper(c, `{ + "command": ["sleep", "3"], + "container_image": "`+arvadostest.DockerImage112PDH+`", + "cwd": ".", + "environment": {}, + "mounts": {"/tmp": {"kind": "tmp"} }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {}, + "state": "Locked" +}`, nil, func() int { + time.Sleep(time.Second) + stoptime = time.Now().Add(time.Minute).UTC() + time.Sleep(time.Second) + return 0 + }) + c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 250ms using instance metadata at http://.*`) + c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`) + c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Cloud provider indicates instance action "stop" scheduled for time "`+stoptime.Format(time.RFC3339)+`".*`) +} + func (s *TestSuite) TestRunTimeExceeded(c *C) { s.fullRunHelper(c, `{ "command": ["sleep", "3"], @@ -782,8 +845,9 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) { "runtime_constraints": {}, "scheduling_parameters":{"max_run_time": 1}, "state": "Locked" -}`, nil, 0, func() { +}`, nil, func() int { time.Sleep(3 * time.Second) + return 0 }) c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) @@ -799,8 +863,9 @@ func (s *TestSuite) TestContainerWaitFails(c *C) { "output_path": "/tmp", "priority": 1, "state": "Locked" -}`, nil, 0, func() { +}`, nil, func() int { s.executor.waitErr = errors.New("Container is not running") + return 0 }) c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) @@ -818,8 +883,9 @@ func (s *TestSuite) TestCrunchstat(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" - }`, nil, 0, func() { + }`, nil, func() int { time.Sleep(time.Second) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -851,10 +917,10 @@ func (s *TestSuite) TestNodeInfoLog(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" - }`, nil, 0, - func() { - time.Sleep(time.Second) - }) + }`, nil, func() int { + time.Sleep(time.Second) + return 0 + }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) @@ -885,9 +951,9 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" - }`, nil, 0, - func() { - }) + }`, nil, func() int { + return 0 + }) c.Assert(s.api.Logs["crunch-run"], NotNil) c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`) @@ -896,6 +962,42 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) { c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`) } +func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) { + var collection_create, container_update arvadosclient.Dict + s.fullRunHelper(c, `{ + "command": ["true"], + "container_image": "`+arvadostest.DockerImage112PDH+`", + "cwd": ".", + "environment": {}, + "mounts": {"/tmp": {"kind": "tmp"} }, + "output_path": "/tmp", + "priority": 1, + "runtime_constraints": {}, + "state": "Locked", + "uuid": "zzzzz-dz642-202301121543210" + }`, nil, func() int { + collection_create = s.api.CalledWith("ensure_unique_name", true) + container_update = s.api.CalledWith("container.state", "Running") + return 0 + }) + + c.Assert(collection_create, NotNil) + log_collection := collection_create["collection"].(arvadosclient.Dict) + c.Check(log_collection["name"], Equals, "logs for zzzzz-dz642-202301121543210") + manifest_text := log_collection["manifest_text"].(string) + // We check that the file size is at least two digits as an easy way to + // check the file isn't empty. + c.Check(manifest_text, Matches, `\. .+ \d+:\d{2,}:node-info\.txt( .+)?\n`) + c.Check(manifest_text, Matches, `\. .+ \d+:\d{2,}:node\.json( .+)?\n`) + + c.Assert(container_update, NotNil) + // As of Arvados 2.5.0, the container update must specify its log in PDH + // format for the API server to propagate it to container requests, which + // is what we care about for this test. + expect_pdh := fmt.Sprintf("%x+%d", md5.Sum([]byte(manifest_text)), len(manifest_text)) + c.Check(container_update["container"].(arvadosclient.Dict)["log"], Equals, expect_pdh) +} + func (s *TestSuite) TestContainerRecordLog(c *C) { s.fullRunHelper(c, `{ "command": ["sleep", "1"], @@ -907,9 +1009,10 @@ func (s *TestSuite) TestContainerRecordLog(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" - }`, nil, 0, - func() { + }`, nil, + func() int { time.Sleep(time.Second) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -930,9 +1033,10 @@ func (s *TestSuite) TestFullRunStderr(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 1, func() { +}`, nil, func() int { fmt.Fprintln(s.executor.created.Stdout, "hello") fmt.Fprintln(s.executor.created.Stderr, "world") + return 1 }) final := s.api.CalledWith("container.state", "Complete") @@ -955,8 +1059,9 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 0, func() { +}`, nil, func() int { fmt.Fprintf(s.executor.created.Stdout, "workdir=%q", s.executor.created.WorkingDir) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -976,8 +1081,9 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 0, func() { +}`, nil, func() int { fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -997,8 +1103,9 @@ func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) { "runtime_constraints": {}, "state": "Locked", "output_storage_classes": ["foo", "bar"] -}`, nil, 0, func() { +}`, nil, func() int { fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -1020,8 +1127,9 @@ func (s *TestSuite) TestEnableCUDADeviceCount(c *C) { "runtime_constraints": {"cuda": {"device_count": 2}}, "state": "Locked", "output_storage_classes": ["foo", "bar"] -}`, nil, 0, func() { +}`, nil, func() int { fmt.Fprintln(s.executor.created.Stdout, "ok") + return 0 }) c.Check(s.executor.created.CUDADeviceCount, Equals, 2) } @@ -1038,25 +1146,30 @@ func (s *TestSuite) TestEnableCUDAHardwareCapability(c *C) { "runtime_constraints": {"cuda": {"hardware_capability": "foo"}}, "state": "Locked", "output_storage_classes": ["foo", "bar"] -}`, nil, 0, func() { +}`, nil, func() int { fmt.Fprintln(s.executor.created.Stdout, "ok") + return 0 }) c.Check(s.executor.created.CUDADeviceCount, Equals, 0) } func (s *TestSuite) TestStopOnSignal(c *C) { - s.executor.runFunc = func() { + s.executor.runFunc = func() int { s.executor.created.Stdout.Write([]byte("foo\n")) s.runner.SigChan <- syscall.SIGINT + time.Sleep(10 * time.Second) + return 0 } s.testStopContainer(c) } func (s *TestSuite) TestStopOnArvMountDeath(c *C) { - s.executor.runFunc = func() { + s.executor.runFunc = func() int { s.executor.created.Stdout.Write([]byte("foo\n")) s.runner.ArvMountExit <- nil close(s.runner.ArvMountExit) + time.Sleep(10 * time.Second) + return 0 } s.runner.ArvMountExit = make(chan error) s.testStopContainer(c) @@ -1115,8 +1228,9 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 0, func() { +}`, nil, func() int { fmt.Fprintf(s.executor.created.Stdout, "%v", s.executor.created.Env) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -1527,8 +1641,9 @@ func (s *TestSuite) TestStdout(c *C) { "state": "Locked" }` - s.fullRunHelper(c, helperRecord, nil, 0, func() { + s.fullRunHelper(c, helperRecord, nil, func() int { fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -1537,7 +1652,7 @@ func (s *TestSuite) TestStdout(c *C) { } // Used by the TestStdoutWithWrongPath*() -func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func()) (*ArvTestClient, *ContainerRunner, error) { +func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func() int) (*ArvTestClient, *ContainerRunner, error) { err := json.Unmarshal([]byte(record), &s.api.Container) c.Assert(err, IsNil) s.executor.runFunc = fn @@ -1553,7 +1668,7 @@ func (s *TestSuite) TestStdoutWithWrongPath(c *C) { "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} }, "output_path": "/tmp", "state": "Locked" -}`, func() {}) +}`, func() int { return 0 }) c.Check(err, ErrorMatches, ".*Stdout path does not start with OutputPath.*") } @@ -1562,7 +1677,7 @@ func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) { "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} }, "output_path": "/tmp", "state": "Locked" -}`, func() {}) +}`, func() int { return 0 }) c.Check(err, ErrorMatches, ".*unsupported mount kind 'tmp' for stdout.*") } @@ -1571,7 +1686,7 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) { "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} }, "output_path": "/tmp", "state": "Locked" -}`, func() {}) +}`, func() int { return 0 }) c.Check(err, ErrorMatches, ".*unsupported mount kind 'collection' for stdout.*") } @@ -1586,9 +1701,9 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) { "priority": 1, "runtime_constraints": {"API": true}, "state": "Locked" -}`, nil, 0, func() { +}`, nil, func() int { c.Check(s.executor.created.Env["ARVADOS_API_HOST"], Equals, os.Getenv("ARVADOS_API_HOST")) - s.executor.exit <- 3 + return 3 }) c.Check(s.api.CalledWith("container.exit_code", 3), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) @@ -1608,8 +1723,9 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) { "priority": 1, "runtime_constraints": {"API": true}, "state": "Locked" -}`, nil, 0, func() { +}`, nil, func() int { s.api.Container.Output = arvadostest.DockerImage112PDH + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -1623,9 +1739,9 @@ func (s *TestSuite) TestArvMountRuntimeStatusWarning(c *C) { ioutil.WriteFile(s.runner.ArvMountPoint+"/by_id/README", nil, 0666) return s.runner.ArvMountCmd([]string{"bash", "-c", "echo >&2 Test: Keep write error: I am a teapot; sleep 3"}, "") } - s.executor.runFunc = func() { + s.executor.runFunc = func() int { time.Sleep(time.Second) - s.executor.exit <- 137 + return 137 } record := `{ "command": ["sleep", "1"], @@ -1671,8 +1787,9 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"} - s.fullRunHelper(c, helperRecord, extraMounts, 0, func() { + s.fullRunHelper(c, helperRecord, extraMounts, func() int { fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -1697,7 +1814,8 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) { "output_path": "/tmp", "priority": 1, "runtime_constraints": {}, - "state": "Locked" + "state": "Locked", + "uuid": "zzzzz-dz642-202301130848001" }` extraMounts := []string{ @@ -1706,8 +1824,9 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) { "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt", } - api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() { + api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, func() int { fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) + return 0 }) c.Check(s.executor.created.BindMounts, DeepEquals, map[string]bindmount{ @@ -1720,22 +1839,25 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) { c.Check(api.CalledWith("container.exit_code", 0), NotNil) c.Check(api.CalledWith("container.state", "Complete"), NotNil) - for _, v := range api.Content { - if v["collection"] != nil { - c.Check(v["ensure_unique_name"], Equals, true) - collection := v["collection"].(arvadosclient.Dict) - if strings.Index(collection["name"].(string), "output") == 0 { - manifest := collection["manifest_text"].(string) - - c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out + output_count := uint(0) + for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content { + if v["collection"] == nil { + continue + } + collection := v["collection"].(arvadosclient.Dict) + if collection["name"].(string) != "output for zzzzz-dz642-202301130848001" { + continue + } + c.Check(v["ensure_unique_name"], Equals, true) + c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out ./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:bar 36:18:sub1file2 ./foo/baz 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 9:18:sub2file2 ./foo/sub1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt ./foo/sub1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt `) - } - } + output_count++ } + c.Check(output_count, Not(Equals), uint(0)) } func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(c *C) { @@ -1752,31 +1874,36 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest( "output_path": "/tmp", "priority": 1, "runtime_constraints": {}, - "state": "Locked" + "state": "Locked", + "uuid": "zzzzz-dz642-202301130848002" }` extraMounts := []string{ "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt", } - s.fullRunHelper(c, helperRecord, extraMounts, 0, func() { + s.fullRunHelper(c, helperRecord, extraMounts, func() int { fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) c.Check(s.api.CalledWith("container.state", "Complete"), NotNil) - for _, v := range s.api.Content { - if v["collection"] != nil { - collection := v["collection"].(arvadosclient.Dict) - if strings.Index(collection["name"].(string), "output") == 0 { - manifest := collection["manifest_text"].(string) - - c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out + output_count := uint(0) + for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content { + if v["collection"] == nil { + continue + } + collection := v["collection"].(arvadosclient.Dict) + if collection["name"].(string) != "output for zzzzz-dz642-202301130848002" { + continue + } + c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out ./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 10:17:bar `) - } - } + output_count++ } + c.Check(output_count, Not(Equals), uint(0)) } func (s *TestSuite) TestOutputError(c *C) { @@ -1793,8 +1920,9 @@ func (s *TestSuite) TestOutputError(c *C) { "runtime_constraints": {}, "state": "Locked" }` - s.fullRunHelper(c, helperRecord, nil, 0, func() { + s.fullRunHelper(c, helperRecord, nil, func() int { os.Symlink("/etc/hosts", s.runner.HostOutputDir+"/baz") + return 0 }) c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) @@ -1821,8 +1949,9 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) { "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt", } - api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() { + api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, func() int { fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) + return 0 }) c.Check(api.CalledWith("container.exit_code", 0), NotNil) @@ -1856,8 +1985,9 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) { "state": "Locked" }` - api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func() { + api, _, _ := s.fullRunHelper(c, helperRecord, nil, func() int { fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"]) + return 0 }) c.Check(api.CalledWith("container.exit_code", 0), NotNil) @@ -1887,9 +2017,10 @@ func (s *TestSuite) TestStderrMount(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 1, func() { +}`, nil, func() int { fmt.Fprintln(s.executor.created.Stdout, "hello") fmt.Fprintln(s.executor.created.Stderr, "oops") + return 1 }) final := api.CalledWith("container.state", "Complete") @@ -1939,7 +2070,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 0, func() {}) +}`, nil, func() int { return 0 }) c.Check(s.api.CalledWith("container.state", nextState), NotNil) c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*") if s.runner.brokenNodeHook != "" { @@ -1970,7 +2101,7 @@ func (s *TestSuite) TestBadCommand(c *C) { "priority": 1, "runtime_constraints": {}, "state": "Locked" -}`, nil, 0, func() {}) +}`, nil, func() int { return 0 }) c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil) c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*") } @@ -1993,10 +2124,11 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) { "state": "Locked" }` - s.fullRunHelper(c, helperRecord, nil, 0, func() { + s.fullRunHelper(c, helperRecord, nil, func() int { content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf") c.Check(err, IsNil) c.Check(string(content), Equals, "mypassword") + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -2022,10 +2154,11 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) { }` s.SetUpTest(c) - s.fullRunHelper(c, helperRecord, nil, 0, func() { + s.fullRunHelper(c, helperRecord, nil, func() int { content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf") c.Check(err, IsNil) c.Check(string(content), Equals, "mypassword") + return 0 }) c.Check(s.api.CalledWith("container.exit_code", 0), NotNil) @@ -2051,7 +2184,7 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) { }` s.SetUpTest(c) - _, _, realtemp := s.fullRunHelper(c, helperRecord, nil, 0, func() { + _, _, realtemp := s.fullRunHelper(c, helperRecord, nil, func() int { // secret.conf should be provisioned as a separate // bind mount, i.e., it should not appear in the // (fake) fuse filesystem as viewed from the host. @@ -2061,6 +2194,7 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) { } err = ioutil.WriteFile(s.runner.HostOutputDir+"/.arvados#collection", []byte(`{"manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"}`), 0700) c.Check(err, IsNil) + return 0 }) content, err := ioutil.ReadFile(realtemp + "/text1/mountdata.text") @@ -2076,7 +2210,10 @@ func (s *TestSuite) TestCalculateCost(c *C) { defer func(s string) { lockdir = s }(lockdir) lockdir = c.MkDir() now := time.Now() - cr := ContainerRunner{costStartTime: now.Add(-time.Hour)} + cr := s.runner + cr.costStartTime = now.Add(-time.Hour) + var logbuf bytes.Buffer + cr.CrunchLog.Immediate = log.New(&logbuf, "", 0) // if there's no InstanceType env var, cost is calculated as 0 os.Unsetenv("InstanceType") @@ -2087,6 +2224,7 @@ func (s *TestSuite) TestCalculateCost(c *C) { // hasn't found any data), cost is calculated based on // InstanceType env var os.Setenv("InstanceType", `{"Price":1.2}`) + defer os.Unsetenv("InstanceType") cost = cr.calculateCost(now) c.Check(cost, Equals, 1.2) @@ -2105,6 +2243,7 @@ func (s *TestSuite) TestCalculateCost(c *C) { // next update (via --list + SIGUSR2) tells us the spot price // increased to $3/h 15 minutes ago j, err = json.Marshal([]cloud.InstancePrice{ + {StartTime: now.Add(-time.Hour / 3), Price: 2.0}, // dup of -time.Hour/2 price {StartTime: now.Add(-time.Hour / 4), Price: 3.0}, }) c.Assert(err, IsNil) @@ -2112,6 +2251,13 @@ func (s *TestSuite) TestCalculateCost(c *C) { cr.loadPrices() cost = cr.calculateCost(now) c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4) + + cost = cr.calculateCost(now.Add(-time.Hour / 2)) + c.Check(cost, Equals, 0.5) + + c.Logf("%s", logbuf.String()) + c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`) + c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`) } type FakeProcess struct {