19961: Merge branch 'main'
[arvados.git] / lib / crunchrun / crunchrun_test.go
index 1f4681e3a7bbcab1545e2f2003bed2d0af9d4897..786f9410a8ac77783edc8e04652960ba56a1217e 100644 (file)
@@ -12,6 +12,9 @@ import (
        "fmt"
        "io"
        "io/ioutil"
+       "log"
+       "net/http"
+       "net/http/httptest"
        "os"
        "os/exec"
        "regexp"
@@ -22,6 +25,7 @@ import (
        "testing"
        "time"
 
+       "git.arvados.org/arvados.git/lib/cloud"
        "git.arvados.org/arvados.git/lib/cmd"
        "git.arvados.org/arvados.git/sdk/go/arvados"
        "git.arvados.org/arvados.git/sdk/go/arvadosclient"
@@ -119,7 +123,7 @@ type stubExecutor struct {
        stopErr     error
        stopped     bool
        closed      bool
-       runFunc     func()
+       runFunc     func() int
        exit        chan int
 }
 
@@ -131,10 +135,14 @@ func (e *stubExecutor) LoadImage(imageId string, tarball string, container arvad
 func (e *stubExecutor) Runtime() string                 { return "stub" }
 func (e *stubExecutor) Version() string                 { return "stub " + cmd.Version.String() }
 func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr }
-func (e *stubExecutor) Start() error                    { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr }
-func (e *stubExecutor) CgroupID() string                { return "cgroupid" }
-func (e *stubExecutor) Stop() error                     { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
-func (e *stubExecutor) Close()                          { e.closed = true }
+func (e *stubExecutor) Start() error {
+       e.exit = make(chan int, 1)
+       go func() { e.exit <- e.runFunc() }()
+       return e.startErr
+}
+func (e *stubExecutor) CgroupID() string { return "cgroupid" }
+func (e *stubExecutor) Stop() error      { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
+func (e *stubExecutor) Close()           { e.closed = true }
 func (e *stubExecutor) Wait(context.Context) (int, error) {
        return <-e.exit, e.waitErr
 }
@@ -539,9 +547,9 @@ func dockerLog(fd byte, msg string) []byte {
 }
 
 func (s *TestSuite) TestRunContainer(c *C) {
-       s.executor.runFunc = func() {
+       s.executor.runFunc = func() int {
                fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
-               s.executor.exit <- 0
+               return 0
        }
 
        var logs TestLogs
@@ -595,7 +603,7 @@ func (s *TestSuite) TestUpdateContainerRunning(c *C) {
        cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
        c.Assert(err, IsNil)
 
-       err = cr.UpdateContainerRunning()
+       err = cr.UpdateContainerRunning("")
        c.Check(err, IsNil)
 
        c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
@@ -642,7 +650,7 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
 
 // Used by the TestFullRun*() tests below to DRY up boilerplate setup and do a
 // full dress rehearsal of the Run() function, starting from a JSON container record.
-func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func()) (*ArvTestClient, *ContainerRunner, string) {
+func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, fn func() int) (*ArvTestClient, *ContainerRunner, string) {
        err := json.Unmarshal([]byte(record), &s.api.Container)
        c.Assert(err, IsNil)
        initialState := s.api.Container.State
@@ -656,10 +664,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
        c.Assert(err, IsNil)
        c.Logf("SecretMounts decoded %v json %q", sm, secretMounts)
 
-       s.executor.runFunc = func() {
-               fn()
-               s.executor.exit <- exitCode
-       }
+       s.executor.runFunc = fn
 
        s.runner.statInterval = 100 * time.Millisecond
        s.runner.containerWatchdogInterval = time.Second
@@ -730,7 +735,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
     "runtime_constraints": {"vcpus":1,"ram":1000000},
     "state": "Locked",
     "output_storage_classes": ["default"]
-}`, nil, 0, func() {
+}`, nil, func() int {
                c.Check(s.executor.created.Command, DeepEquals, []string{"echo", "hello world"})
                c.Check(s.executor.created.Image, Equals, "sha256:d8309758b8fe2c81034ffc8a10c36460b77db7bc5e7b448c4e5b684f9d95a678")
                c.Check(s.executor.created.Env, DeepEquals, map[string]string{"foo": "bar", "baz": "waz"})
@@ -740,6 +745,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
                c.Check(s.executor.created.EnableNetwork, Equals, false)
                c.Check(s.executor.created.CUDADeviceCount, Equals, 0)
                fmt.Fprintln(s.executor.created.Stdout, "hello world")
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -762,14 +768,71 @@ func (s *TestSuite) TestRunAlreadyRunning(c *C) {
     "runtime_constraints": {},
     "scheduling_parameters":{"max_run_time": 1},
     "state": "Running"
-}`, nil, 2, func() {
+}`, nil, func() int {
                ran = true
+               return 2
        })
        c.Check(s.api.CalledWith("container.state", "Cancelled"), IsNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), IsNil)
        c.Check(ran, Equals, false)
 }
 
+// TestSpotInterruptionNotice runs a container while
+// checkSpotInterruptionNotices() polls a stub instance metadata
+// server, and checks that crunch-run logs the polling setup, the
+// initial request failure, and the scheduled "stop" action.
+func (s *TestSuite) TestSpotInterruptionNotice(c *C) {
+       var failedOnce bool
+       var stoptime time.Time
+       token := "fake-ec2-metadata-token"
+       // NOTE(review): failedOnce and stoptime are read by the stub
+       // handler and written/read by the test and runFunc goroutines
+       // without any visible synchronization -- this looks like it
+       // could be reported by the race detector; confirm.
+       stub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+               // The very first request fails with 503, whatever its
+               // path, so the error-logging path gets exercised.
+               if !failedOnce {
+                       w.WriteHeader(http.StatusServiceUnavailable)
+                       failedOnce = true
+                       return
+               }
+               switch r.URL.Path {
+               case "/latest/api/token":
+                       fmt.Fprintln(w, token)
+               case "/latest/meta-data/spot/instance-action":
+                       // Require the token issued above; respond 404
+                       // until the test sets stoptime, then report a
+                       // "stop" action scheduled for that time.
+                       if r.Header.Get("X-aws-ec2-metadata-token") != token {
+                               w.WriteHeader(http.StatusUnauthorized)
+                       } else if stoptime.IsZero() {
+                               w.WriteHeader(http.StatusNotFound)
+                       } else {
+                               fmt.Fprintf(w, `{"action":"stop","time":"%s"}`, stoptime.Format(time.RFC3339))
+                       }
+               default:
+                       w.WriteHeader(http.StatusNotFound)
+               }
+       }))
+       defer stub.Close()
+
+       // Point the checker at the stub server with a short polling
+       // interval, restoring the previous values when the test ends.
+       defer func(i time.Duration, u string) {
+               spotInterruptionCheckInterval = i
+               ec2MetadataBaseURL = u
+       }(spotInterruptionCheckInterval, ec2MetadataBaseURL)
+       spotInterruptionCheckInterval = time.Second / 4
+       ec2MetadataBaseURL = stub.URL
+
+       // NOTE(review): nothing visible here stops this goroutine when
+       // the test returns -- presumably it keeps polling; verify.
+       go s.runner.checkSpotInterruptionNotices()
+       s.fullRunHelper(c, `{
+    "command": ["sleep", "3"],
+    "container_image": "`+arvadostest.DockerImage112PDH+`",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+    "runtime_constraints": {},
+    "state": "Locked"
+}`, nil, func() int {
+               // Run for a second with no notice, then publish a stop
+               // time one minute from now and keep running for another
+               // second (several polling intervals) before exiting.
+               time.Sleep(time.Second)
+               stoptime = time.Now().Add(time.Minute).UTC()
+               time.Sleep(time.Second)
+               return 0
+       })
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Checking for spot interruptions every 250ms using instance metadata at http://.*`)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Error checking spot interruptions: 503 Service Unavailable.*`)
+       c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Cloud provider indicates instance action "stop" scheduled for time "`+stoptime.Format(time.RFC3339)+`".*`)
+}
+
 func (s *TestSuite) TestRunTimeExceeded(c *C) {
        s.fullRunHelper(c, `{
     "command": ["sleep", "3"],
@@ -782,8 +845,9 @@ func (s *TestSuite) TestRunTimeExceeded(c *C) {
     "runtime_constraints": {},
     "scheduling_parameters":{"max_run_time": 1},
     "state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
                time.Sleep(3 * time.Second)
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
@@ -799,8 +863,9 @@ func (s *TestSuite) TestContainerWaitFails(c *C) {
     "output_path": "/tmp",
     "priority": 1,
     "state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
                s.executor.waitErr = errors.New("Container is not running")
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
@@ -818,8 +883,9 @@ func (s *TestSuite) TestCrunchstat(c *C) {
                "priority": 1,
                "runtime_constraints": {},
                "state": "Locked"
-       }`, nil, 0, func() {
+       }`, nil, func() int {
                time.Sleep(time.Second)
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -851,10 +917,10 @@ func (s *TestSuite) TestNodeInfoLog(c *C) {
                "priority": 1,
                "runtime_constraints": {},
                "state": "Locked"
-       }`, nil, 0,
-               func() {
-                       time.Sleep(time.Second)
-               })
+       }`, nil, func() int {
+               time.Sleep(time.Second)
+               return 0
+       })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
@@ -885,9 +951,9 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
                "priority": 1,
                "runtime_constraints": {},
                "state": "Locked"
-       }`, nil, 0,
-               func() {
-               })
+       }`, nil, func() int {
+               return 0
+       })
 
        c.Assert(s.api.Logs["crunch-run"], NotNil)
        c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
@@ -896,6 +962,42 @@ func (s *TestSuite) TestLogVersionAndRuntime(c *C) {
        c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
 }
 
+// TestCommitNodeInfoBeforeStart verifies that, by the time the
+// container's command runs, a log collection holding non-empty
+// node-info.txt and node.json files has already been created, and
+// that the update to state "Running" reported that collection's
+// portable data hash as the container log.
+func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
+       var createCall, runningCall arvadosclient.Dict
+       record := `{
+               "command": ["true"],
+               "container_image": "`+arvadostest.DockerImage112PDH+`",
+               "cwd": ".",
+               "environment": {},
+               "mounts": {"/tmp": {"kind": "tmp"} },
+               "output_path": "/tmp",
+               "priority": 1,
+               "runtime_constraints": {},
+               "state": "Locked",
+               "uuid": "zzzzz-dz642-202301121543210"
+       }`
+       s.fullRunHelper(c, record, nil, func() int {
+               // Capture the API calls recorded so far, i.e. before the
+               // stub container has produced anything.
+               createCall = s.api.CalledWith("ensure_unique_name", true)
+               runningCall = s.api.CalledWith("container.state", "Running")
+               return 0
+       })
+
+       c.Assert(createCall, NotNil)
+       logColl := createCall["collection"].(arvadosclient.Dict)
+       c.Check(logColl["name"], Equals, "logs for zzzzz-dz642-202301121543210")
+       manifestText := logColl["manifest_text"].(string)
+       // Requiring a file size of two or more digits is a cheap way
+       // to make sure neither file is empty.
+       for _, pattern := range []string{
+               `\. .+ \d+:\d{2,}:node-info\.txt( .+)?\n`,
+               `\. .+ \d+:\d{2,}:node\.json( .+)?\n`,
+       } {
+               c.Check(manifestText, Matches, pattern)
+       }
+
+       c.Assert(runningCall, NotNil)
+       // As of Arvados 2.5.0, the container update must give its log
+       // in PDH format for the API server to propagate it to container
+       // requests, which is the behavior this test cares about.
+       sum := md5.Sum([]byte(manifestText))
+       wantPDH := fmt.Sprintf("%x+%d", sum, len(manifestText))
+       c.Check(runningCall["container"].(arvadosclient.Dict)["log"], Equals, wantPDH)
+}
+
 func (s *TestSuite) TestContainerRecordLog(c *C) {
        s.fullRunHelper(c, `{
                "command": ["sleep", "1"],
@@ -907,9 +1009,10 @@ func (s *TestSuite) TestContainerRecordLog(c *C) {
                "priority": 1,
                "runtime_constraints": {},
                "state": "Locked"
-       }`, nil, 0,
-               func() {
+       }`, nil,
+               func() int {
                        time.Sleep(time.Second)
+                       return 0
                })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -930,9 +1033,10 @@ func (s *TestSuite) TestFullRunStderr(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 1, func() {
+}`, nil, func() int {
                fmt.Fprintln(s.executor.created.Stdout, "hello")
                fmt.Fprintln(s.executor.created.Stderr, "world")
+               return 1
        })
 
        final := s.api.CalledWith("container.state", "Complete")
@@ -955,8 +1059,9 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
                fmt.Fprintf(s.executor.created.Stdout, "workdir=%q", s.executor.created.WorkingDir)
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -976,8 +1081,9 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
                fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir)
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -997,8 +1103,9 @@ func (s *TestSuite) TestFullRunSetOutputStorageClasses(c *C) {
     "runtime_constraints": {},
     "state": "Locked",
     "output_storage_classes": ["foo", "bar"]
-}`, nil, 0, func() {
+}`, nil, func() int {
                fmt.Fprintln(s.executor.created.Stdout, s.executor.created.WorkingDir)
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -1020,8 +1127,9 @@ func (s *TestSuite) TestEnableCUDADeviceCount(c *C) {
     "runtime_constraints": {"cuda": {"device_count": 2}},
     "state": "Locked",
     "output_storage_classes": ["foo", "bar"]
-}`, nil, 0, func() {
+}`, nil, func() int {
                fmt.Fprintln(s.executor.created.Stdout, "ok")
+               return 0
        })
        c.Check(s.executor.created.CUDADeviceCount, Equals, 2)
 }
@@ -1038,25 +1146,30 @@ func (s *TestSuite) TestEnableCUDAHardwareCapability(c *C) {
     "runtime_constraints": {"cuda": {"hardware_capability": "foo"}},
     "state": "Locked",
     "output_storage_classes": ["foo", "bar"]
-}`, nil, 0, func() {
+}`, nil, func() int {
                fmt.Fprintln(s.executor.created.Stdout, "ok")
+               return 0
        })
        c.Check(s.executor.created.CUDADeviceCount, Equals, 0)
 }
 
 func (s *TestSuite) TestStopOnSignal(c *C) {
-       s.executor.runFunc = func() {
+       s.executor.runFunc = func() int {
                s.executor.created.Stdout.Write([]byte("foo\n"))
                s.runner.SigChan <- syscall.SIGINT
+               time.Sleep(10 * time.Second)
+               return 0
        }
        s.testStopContainer(c)
 }
 
 func (s *TestSuite) TestStopOnArvMountDeath(c *C) {
-       s.executor.runFunc = func() {
+       s.executor.runFunc = func() int {
                s.executor.created.Stdout.Write([]byte("foo\n"))
                s.runner.ArvMountExit <- nil
                close(s.runner.ArvMountExit)
+               time.Sleep(10 * time.Second)
+               return 0
        }
        s.runner.ArvMountExit = make(chan error)
        s.testStopContainer(c)
@@ -1115,8 +1228,9 @@ func (s *TestSuite) TestFullRunSetEnv(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
                fmt.Fprintf(s.executor.created.Stdout, "%v", s.executor.created.Env)
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -1527,8 +1641,9 @@ func (s *TestSuite) TestStdout(c *C) {
                "state": "Locked"
        }`
 
-       s.fullRunHelper(c, helperRecord, nil, 0, func() {
+       s.fullRunHelper(c, helperRecord, nil, func() int {
                fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -1537,7 +1652,7 @@ func (s *TestSuite) TestStdout(c *C) {
 }
 
 // Used by the TestStdoutWithWrongPath*()
-func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func()) (*ArvTestClient, *ContainerRunner, error) {
+func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func() int) (*ArvTestClient, *ContainerRunner, error) {
        err := json.Unmarshal([]byte(record), &s.api.Container)
        c.Assert(err, IsNil)
        s.executor.runFunc = fn
@@ -1553,7 +1668,7 @@ func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
     "output_path": "/tmp",
     "state": "Locked"
-}`, func() {})
+}`, func() int { return 0 })
        c.Check(err, ErrorMatches, ".*Stdout path does not start with OutputPath.*")
 }
 
@@ -1562,7 +1677,7 @@ func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
     "output_path": "/tmp",
     "state": "Locked"
-}`, func() {})
+}`, func() int { return 0 })
        c.Check(err, ErrorMatches, ".*unsupported mount kind 'tmp' for stdout.*")
 }
 
@@ -1571,7 +1686,7 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
     "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
     "output_path": "/tmp",
     "state": "Locked"
-}`, func() {})
+}`, func() int { return 0 })
        c.Check(err, ErrorMatches, ".*unsupported mount kind 'collection' for stdout.*")
 }
 
@@ -1586,9 +1701,9 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
     "priority": 1,
     "runtime_constraints": {"API": true},
     "state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
                c.Check(s.executor.created.Env["ARVADOS_API_HOST"], Equals, os.Getenv("ARVADOS_API_HOST"))
-               s.executor.exit <- 3
+               return 3
        })
        c.Check(s.api.CalledWith("container.exit_code", 3), NotNil)
        c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
@@ -1608,8 +1723,9 @@ func (s *TestSuite) TestFullRunSetOutput(c *C) {
     "priority": 1,
     "runtime_constraints": {"API": true},
     "state": "Locked"
-}`, nil, 0, func() {
+}`, nil, func() int {
                s.api.Container.Output = arvadostest.DockerImage112PDH
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -1623,9 +1739,9 @@ func (s *TestSuite) TestArvMountRuntimeStatusWarning(c *C) {
                ioutil.WriteFile(s.runner.ArvMountPoint+"/by_id/README", nil, 0666)
                return s.runner.ArvMountCmd([]string{"bash", "-c", "echo >&2 Test: Keep write error: I am a teapot; sleep 3"}, "")
        }
-       s.executor.runFunc = func() {
+       s.executor.runFunc = func() int {
                time.Sleep(time.Second)
-               s.executor.exit <- 137
+               return 137
        }
        record := `{
     "command": ["sleep", "1"],
@@ -1671,8 +1787,9 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C
 
        extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
 
-       s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+       s.fullRunHelper(c, helperRecord, extraMounts, func() int {
                fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -1707,8 +1824,9 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
                "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
        }
 
-       api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+       api, _, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, func() int {
                fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+               return 0
        })
 
        c.Check(s.executor.created.BindMounts, DeepEquals, map[string]bindmount{
@@ -1764,8 +1882,9 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
        }
 
-       s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+       s.fullRunHelper(c, helperRecord, extraMounts, func() int {
                fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -1801,8 +1920,9 @@ func (s *TestSuite) TestOutputError(c *C) {
                "runtime_constraints": {},
                "state": "Locked"
        }`
-       s.fullRunHelper(c, helperRecord, nil, 0, func() {
+       s.fullRunHelper(c, helperRecord, nil, func() int {
                os.Symlink("/etc/hosts", s.runner.HostOutputDir+"/baz")
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
@@ -1829,8 +1949,9 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
                "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
        }
 
-       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func() {
+       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, func() int {
                fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+               return 0
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1864,8 +1985,9 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) {
                "state": "Locked"
        }`
 
-       api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func() {
+       api, _, _ := s.fullRunHelper(c, helperRecord, nil, func() int {
                fmt.Fprintln(s.executor.created.Stdout, s.executor.created.Env["FROBIZ"])
+               return 0
        })
 
        c.Check(api.CalledWith("container.exit_code", 0), NotNil)
@@ -1895,9 +2017,10 @@ func (s *TestSuite) TestStderrMount(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 1, func() {
+}`, nil, func() int {
                fmt.Fprintln(s.executor.created.Stdout, "hello")
                fmt.Fprintln(s.executor.created.Stderr, "oops")
+               return 1
        })
 
        final := api.CalledWith("container.state", "Complete")
@@ -1947,7 +2070,7 @@ func (s *TestSuite) TestFullBrokenDocker(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 0, func() {})
+}`, nil, func() int { return 0 })
                c.Check(s.api.CalledWith("container.state", nextState), NotNil)
                c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
                if s.runner.brokenNodeHook != "" {
@@ -1978,7 +2101,7 @@ func (s *TestSuite) TestBadCommand(c *C) {
     "priority": 1,
     "runtime_constraints": {},
     "state": "Locked"
-}`, nil, 0, func() {})
+}`, nil, func() int { return 0 })
                c.Check(s.api.CalledWith("container.state", "Cancelled"), NotNil)
                c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Possible causes:.*is missing.*")
        }
@@ -2001,10 +2124,11 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
                "state": "Locked"
        }`
 
-       s.fullRunHelper(c, helperRecord, nil, 0, func() {
+       s.fullRunHelper(c, helperRecord, nil, func() int {
                content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
                c.Check(err, IsNil)
                c.Check(string(content), Equals, "mypassword")
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -2030,10 +2154,11 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
        }`
 
        s.SetUpTest(c)
-       s.fullRunHelper(c, helperRecord, nil, 0, func() {
+       s.fullRunHelper(c, helperRecord, nil, func() int {
                content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
                c.Check(err, IsNil)
                c.Check(string(content), Equals, "mypassword")
+               return 0
        })
 
        c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
@@ -2059,7 +2184,7 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
        }`
 
        s.SetUpTest(c)
-       _, _, realtemp := s.fullRunHelper(c, helperRecord, nil, 0, func() {
+       _, _, realtemp := s.fullRunHelper(c, helperRecord, nil, func() int {
                // secret.conf should be provisioned as a separate
                // bind mount, i.e., it should not appear in the
                // (fake) fuse filesystem as viewed from the host.
@@ -2069,6 +2194,7 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
                }
                err = ioutil.WriteFile(s.runner.HostOutputDir+"/.arvados#collection", []byte(`{"manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"}`), 0700)
                c.Check(err, IsNil)
+               return 0
        })
 
        content, err := ioutil.ReadFile(realtemp + "/text1/mountdata.text")
@@ -2080,6 +2206,60 @@ func (s *TestSuite) TestSecretTextMountPoint(c *C) {
        c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"), NotNil)
 }
 
+// TestCalculateCost exercises calculateCost() for a container whose
+// costStartTime is one hour ago: first with no price information at
+// all, then with only the InstanceType env var, and finally with
+// price histories written to the prices file in lockdir and picked
+// up by loadPrices().
+func (s *TestSuite) TestCalculateCost(c *C) {
+       defer func(orig string) { lockdir = orig }(lockdir)
+       lockdir = c.MkDir()
+       now := time.Now()
+       cr := s.runner
+       cr.costStartTime = now.Add(-time.Hour)
+       var logbuf bytes.Buffer
+       cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
+
+       // Put back any pre-existing InstanceType value when done, so
+       // the env changes below don't leak out of this test.
+       if orig, ok := os.LookupEnv("InstanceType"); ok {
+               defer os.Setenv("InstanceType", orig)
+       }
+
+       // if there's no InstanceType env var, cost is calculated as 0
+       os.Unsetenv("InstanceType")
+       cost := cr.calculateCost(now)
+       c.Check(cost, Equals, 0.0)
+
+       // with the InstanceType env var set, and before loadPrices()
+       // has run (or found any data), cost is calculated based on
+       // the InstanceType env var: 1 hour at $1.2/h
+       os.Setenv("InstanceType", `{"Price":1.2}`)
+       defer os.Unsetenv("InstanceType")
+       cost = cr.calculateCost(now)
+       c.Check(cost, Equals, 1.2)
+
+       // first update tells us the spot price was $1/h until 30
+       // minutes ago when it increased to $2/h
+       j, err := json.Marshal([]cloud.InstancePrice{
+               {StartTime: now.Add(-4 * time.Hour), Price: 1.0},
+               {StartTime: now.Add(-time.Hour / 2), Price: 2.0},
+       })
+       c.Assert(err, IsNil)
+       // Fail right here if the fixture can't be written, rather
+       // than later with a misleading cost mismatch.
+       err = os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
+       c.Assert(err, IsNil)
+       cr.loadPrices()
+       cost = cr.calculateCost(now)
+       c.Check(cost, Equals, 1.5)
+
+       // next update (via --list + SIGUSR2) tells us the spot price
+       // increased to $3/h 15 minutes ago
+       j, err = json.Marshal([]cloud.InstancePrice{
+               {StartTime: now.Add(-time.Hour / 3), Price: 2.0}, // dup of -time.Hour/2 price
+               {StartTime: now.Add(-time.Hour / 4), Price: 3.0},
+       })
+       c.Assert(err, IsNil)
+       err = os.WriteFile(lockdir+"/"+pricesfile, j, 0777)
+       c.Assert(err, IsNil)
+       cr.loadPrices()
+       cost = cr.calculateCost(now)
+       // expect 30 minutes at $1/h + 15 minutes at $2/h + 15
+       // minutes at $3/h
+       c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+
+       // cost as of 30 minutes ago covers only the first half hour,
+       // which was billed at $1/h
+       cost = cr.calculateCost(now.Add(-time.Hour / 2))
+       c.Check(cost, Equals, 0.5)
+
+       // each price change should be logged exactly once, including
+       // the $2/h price that appeared in both updates
+       c.Logf("%s", logbuf.String())
+       c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`)
+       c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
+}
+
 type FakeProcess struct {
        cmdLine []string
 }