"fmt"
"io"
"io/ioutil"
+ "log"
"os"
"os/exec"
"regexp"
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/cloud"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
runner *ContainerRunner
executor *stubExecutor
keepmount string
+ keepmountTmp []string
testDispatcherKeepClient KeepTestClient
testContainerKeepClient KeepTestClient
}
func (s *TestSuite) SetUpTest(c *C) {
- *brokenNodeHook = ""
s.client = arvados.NewClientFromEnv()
s.executor = &stubExecutor{}
var err error
}
s.runner.RunArvMount = func(cmd []string, tok string) (*exec.Cmd, error) {
s.runner.ArvMountPoint = s.keepmount
+ for i, opt := range cmd {
+ if opt == "--mount-tmp" {
+ err := os.Mkdir(s.keepmount+"/"+cmd[i+1], 0700)
+ if err != nil {
+ return nil, err
+ }
+ s.keepmountTmp = append(s.keepmountTmp, cmd[i+1])
+ }
+ }
return nil, nil
}
s.keepmount = c.MkDir()
err = os.Mkdir(s.keepmount+"/by_id", 0755)
+ s.keepmountTmp = nil
c.Assert(err, IsNil)
err = os.Mkdir(s.keepmount+"/by_id/"+arvadostest.DockerImage112PDH, 0755)
c.Assert(err, IsNil)
return e.loadErr
}
// Runtime reports the name of the container runtime, which for the
// stub executor is always the literal "stub".
func (e *stubExecutor) Runtime() string {
	const name = "stub"
	return name
}
+// Version returns "stub " followed by the string form of cmd.Version.
+func (e *stubExecutor) Version() string { return "stub " + cmd.Version.String() }
// Create stores spec in e.created, where it can be inspected after
// the run, and returns e.createErr.
func (e *stubExecutor) Create(spec containerSpec) error {
	e.created = spec
	return e.createErr
}
// Start replaces e.exit with a fresh channel buffered for a single
// exit code, launches e.runFunc in its own goroutine, and returns
// e.startErr.
func (e *stubExecutor) Start() error {
	e.exit = make(chan int, 1)
	go e.runFunc()
	return e.startErr
}
// CgroupID returns the fixed placeholder identifier "cgroupid".
func (e *stubExecutor) CgroupID() string {
	const id = "cgroupid"
	return id
}
// Wait blocks until a value can be received from e.exit, then returns
// that value together with e.waitErr. The context parameter is unnamed
// and is never consulted, so cancelling it does not unblock Wait.
func (e *stubExecutor) Wait(context.Context) (int, error) {
return <-e.exit, e.waitErr
}
+// InjectCommand is not supported by the stub executor: it ignores all
+// of its arguments and always returns a nil command together with an
+// "unimplemented" error.
+func (e *stubExecutor) InjectCommand(ctx context.Context, _, _ string, _ bool, _ []string) (*exec.Cmd, error) {
+ return nil, errors.New("unimplemented")
+}
+// IPAddress is not supported by the stub executor: it always returns
+// an empty address and an "unimplemented" error.
+func (e *stubExecutor) IPAddress() (string, error) { return "", errors.New("unimplemented") }
// fakeInputCollectionPDH is a fixed string laid out as 32 hex digits,
// a "+", and a decimal size.
// NOTE(review): presumably a stand-in portable data hash for an input
// collection used by these tests — confirm against its usages.
const fakeInputCollectionPDH = "ffffffffaaaaaaaa88888888eeeeeeee+1234"
if resourceType == "collections" && output != nil {
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
+ md5sum := md5.Sum([]byte(mt))
outmap := output.(*arvados.Collection)
- outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
- outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt)))
+ outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5sum, len(mt))
+ outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%015x", md5sum[:7])
}
return nil
if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
client.WasSetRunning = true
}
- } else if resourceType == "collections" {
+ } else if resourceType == "collections" && output != nil {
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
output.(*arvados.Collection).UUID = uuid
output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
return errors.New("not implemented")
}
+// Snapshot is not supported by FileWrapper: it always returns a nil
+// subtree and a "not implemented" error.
+func (fw FileWrapper) Snapshot() (*arvados.Subtree, error) {
+ return nil, errors.New("not implemented")
+}
+
+// Splice is not supported by FileWrapper: it ignores the given subtree
+// and always returns a "not implemented" error.
+func (fw FileWrapper) Splice(*arvados.Subtree) error {
+ return errors.New("not implemented")
+}
+
func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
if filename == hwImageID+".tar" {
rdr := ioutil.NopCloser(&bytes.Buffer{})
cr, err := NewContainerRunner(s.client, api, kc, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
c.Assert(err, IsNil)
- err = cr.UpdateContainerRunning()
+ err = cr.UpdateContainerRunning("")
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
s.runner.statInterval = 100 * time.Millisecond
s.runner.containerWatchdogInterval = time.Second
- am := &ArvMountCmdLine{}
- s.runner.RunArvMount = am.ArvMountTest
realTemp := c.MkDir()
tempcount := 0
c.Check(s.executor.created.RAM, Equals, int64(1000000))
c.Check(s.executor.created.NetworkMode, Equals, "default")
c.Check(s.executor.created.EnableNetwork, Equals, false)
+ c.Check(s.executor.created.CUDADeviceCount, Equals, 0)
fmt.Fprintln(s.executor.created.Stdout, "hello world")
})
c.Assert(s.api.Logs["crunch-run"], NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container 'zzzzz-zzzzz-zzzzzzzzzzzzzzz' using stub runtime.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
+}
+
+// TestCommitNodeInfoBeforeStart runs a trivial container and, from
+// inside the executor callback, captures the API calls seen so far: the
+// collection create carrying ensure_unique_name and the container
+// update that sets state to Running. It then checks that the log
+// collection already lists non-empty node-info.txt and node.json files
+// and that the Running update references that collection by PDH.
+func (s *TestSuite) TestCommitNodeInfoBeforeStart(c *C) {
+	record := `{
+ "command": ["true"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked",
+ "uuid": "zzzzz-dz642-202301121543210"
+ }`
+
+	var createCall, updateCall arvadosclient.Dict
+	captureCalls := func() {
+		createCall = s.api.CalledWith("ensure_unique_name", true)
+		updateCall = s.api.CalledWith("container.state", "Running")
+	}
+	s.fullRunHelper(c, record, nil, 0, captureCalls)
+
+	c.Assert(createCall, NotNil)
+	logColl := createCall["collection"].(arvadosclient.Dict)
+	c.Check(logColl["name"], Equals, "logs for zzzzz-dz642-202301121543210")
+	manifestText := logColl["manifest_text"].(string)
+	// Requiring a size field of two or more digits is a cheap way to
+	// confirm each file is not empty.
+	c.Check(manifestText, Matches, `\. .+ \d+:\d{2,}:node-info\.txt( .+)?\n`)
+	c.Check(manifestText, Matches, `\. .+ \d+:\d{2,}:node\.json( .+)?\n`)
+
+	c.Assert(updateCall, NotNil)
+	// As of Arvados 2.5.0, the container update must give its log in
+	// PDH format for the API server to propagate it to container
+	// requests, which is the behavior this test cares about.
+	digest := md5.Sum([]byte(manifestText))
+	wantPDH := fmt.Sprintf("%x+%d", digest, len(manifestText))
+	updated := updateCall["container"].(arvadosclient.Dict)
+	c.Check(updated["log"], Equals, wantPDH)
}
func (s *TestSuite) TestContainerRecordLog(c *C) {
c.Check(s.testContainerKeepClient.StorageClasses, DeepEquals, []string{"foo", "bar"})
}
+// TestEnableCUDADeviceCount runs a container whose runtime constraints
+// request cuda.device_count 2 and checks that the spec handed to the
+// executor carries CUDADeviceCount 2.
+func (s *TestSuite) TestEnableCUDADeviceCount(c *C) {
+	record := `{
+ "command": ["pwd"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": "/bin",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"cuda": {"device_count": 2}},
+ "state": "Locked",
+ "output_storage_classes": ["foo", "bar"]
+}`
+	writeOK := func() {
+		fmt.Fprintln(s.executor.created.Stdout, "ok")
+	}
+	s.fullRunHelper(c, record, nil, 0, writeOK)
+
+	gotCount := s.executor.created.CUDADeviceCount
+	c.Check(gotCount, Equals, 2)
+}
+
+// TestEnableCUDAHardwareCapability runs a container whose runtime
+// constraints set only cuda.hardware_capability (no device_count) and
+// checks that the spec handed to the executor keeps CUDADeviceCount 0.
+func (s *TestSuite) TestEnableCUDAHardwareCapability(c *C) {
+	record := `{
+ "command": ["pwd"],
+ "container_image": "`+arvadostest.DockerImage112PDH+`",
+ "cwd": "/bin",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {"cuda": {"hardware_capability": "foo"}},
+ "state": "Locked",
+ "output_storage_classes": ["foo", "bar"]
+}`
+	writeOK := func() {
+		fmt.Fprintln(s.executor.created.Stdout, "ok")
+	}
+	s.fullRunHelper(c, record, nil, 0, writeOK)
+
+	gotCount := s.executor.created.CUDADeviceCount
+	c.Check(gotCount, Equals, 0)
+}
+
func (s *TestSuite) TestStopOnSignal(c *C) {
s.executor.runFunc = func() {
s.executor.created.Stdout.Write([]byte("foo\n"))
bindmounts, err := cr.SetupMounts()
c.Check(err, IsNil)
c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
- "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
+ "--read-write", "--storage-classes", "default", "--crunchstat-interval=5", "--ram-cache",
"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
c.Check(bindmounts, DeepEquals, map[string]bindmount{
"/keepinp": {realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", true},
bindmounts, err := cr.SetupMounts()
c.Check(err, IsNil)
c.Check(am.Cmd, DeepEquals, []string{"arv-mount", "--foreground",
- "--read-write", "--storage-classes", "default", "--crunchstat-interval=5",
+ "--read-write", "--storage-classes", "default", "--crunchstat-interval=5", "--ram-cache",
"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "--disable-event-listening", "--mount-by-id", "by_uuid", realTemp + "/keep1"})
c.Check(bindmounts, DeepEquals, map[string]bindmount{
"/tmp": {realTemp + "/tmp2", false},
})
c.Check(s.api.CalledWith("container.exit_code", 3), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*status code 3\n.*`)
}
func (s *TestSuite) TestFullRunSetOutput(c *C) {
}
s.executor.runFunc = func() {
time.Sleep(time.Second)
- s.executor.exit <- 0
+ s.executor.exit <- 137
}
record := `{
"command": ["sleep", "1"],
c.Assert(err, IsNil)
err = s.runner.Run()
c.Assert(err, IsNil)
- c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(s.api.CalledWith("container.exit_code", 137), NotNil)
c.Check(s.api.CalledWith("container.runtime_status.warning", "arv-mount: Keep write error"), NotNil)
c.Check(s.api.CalledWith("container.runtime_status.warningDetail", "Test: Keep write error: I am a teapot"), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Container exited with status code 137 \(signal 9, SIGKILL\).*`)
}
func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {},
- "state": "Locked"
+ "state": "Locked",
+ "uuid": "zzzzz-dz642-202301130848001"
}`
extraMounts := []string{
c.Check(api.CalledWith("container.exit_code", 0), NotNil)
c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- for _, v := range api.Content {
- if v["collection"] != nil {
- c.Check(v["ensure_unique_name"], Equals, true)
- collection := v["collection"].(arvadosclient.Dict)
- if strings.Index(collection["name"].(string), "output") == 0 {
- manifest := collection["manifest_text"].(string)
-
- c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+ output_count := uint(0)
+ for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content {
+ if v["collection"] == nil {
+ continue
+ }
+ collection := v["collection"].(arvadosclient.Dict)
+ if collection["name"].(string) != "output for zzzzz-dz642-202301130848001" {
+ continue
+ }
+ c.Check(v["ensure_unique_name"], Equals, true)
+ c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:bar 36:18:sub1file2
./foo/baz 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 9:18:sub2file2
./foo/sub1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
./foo/sub1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
`)
- }
- }
+ output_count++
}
+ c.Check(output_count, Not(Equals), uint(0))
}
func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(c *C) {
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {},
- "state": "Locked"
+ "state": "Locked",
+ "uuid": "zzzzz-dz642-202301130848002"
}`
extraMounts := []string{
c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
- for _, v := range s.api.Content {
- if v["collection"] != nil {
- collection := v["collection"].(arvadosclient.Dict)
- if strings.Index(collection["name"].(string), "output") == 0 {
- manifest := collection["manifest_text"].(string)
-
- c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
+ output_count := uint(0)
+ for _, v := range s.runner.ContainerArvClient.(*ArvTestClient).Content {
+ if v["collection"] == nil {
+ continue
+ }
+ collection := v["collection"].(arvadosclient.Dict)
+ if collection["name"].(string) != "output for zzzzz-dz642-202301130848002" {
+ continue
+ }
+ c.Check(collection["manifest_text"].(string), Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 10:17:bar
`)
- }
- }
+ output_count++
}
+ c.Check(output_count, Not(Equals), uint(0))
}
func (s *TestSuite) TestOutputError(c *C) {
func() {
c.Log("// loadErr = cannot connect")
s.executor.loadErr = errors.New("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
- *brokenNodeHook = c.MkDir() + "/broken-node-hook"
- err := ioutil.WriteFile(*brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
+ s.runner.brokenNodeHook = c.MkDir() + "/broken-node-hook"
+ err := ioutil.WriteFile(s.runner.brokenNodeHook, []byte("#!/bin/sh\nexec echo killme\n"), 0700)
c.Assert(err, IsNil)
nextState = "Queued"
},
}`, nil, 0, func() {})
c.Check(s.api.CalledWith("container.state", nextState), NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*unable to run containers.*")
- if *brokenNodeHook != "" {
+ if s.runner.brokenNodeHook != "" {
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*Running broken node hook.*")
c.Check(s.api.Logs["crunch-run"].String(), Matches, "(?ms).*killme.*")
c.Check(s.api.Logs["crunch-run"].String(), Not(Matches), "(?ms).*Writing /var/lock/crunch-run-broken to mark node as broken.*")
c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ""), NotNil)
+
+ // under secret mounts, output dir is a collection, not captured in output
+ helperRecord = `{
+ "command": ["true"],
+ "container_image": "` + arvadostest.DockerImage112PDH + `",
+ "cwd": "/bin",
+ "mounts": {
+ "/tmp": {"kind": "collection", "writable": true}
+ },
+ "secret_mounts": {
+ "/tmp/secret.conf": {"kind": "text", "content": "mypassword"}
+ },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {},
+ "state": "Locked"
+ }`
+
+ s.SetUpTest(c)
+ _, _, realtemp := s.fullRunHelper(c, helperRecord, nil, 0, func() {
+ // secret.conf should be provisioned as a separate
+ // bind mount, i.e., it should not appear in the
+ // (fake) fuse filesystem as viewed from the host.
+ content, err := ioutil.ReadFile(s.runner.HostOutputDir + "/secret.conf")
+ if !c.Check(errors.Is(err, os.ErrNotExist), Equals, true) {
+ c.Logf("secret.conf: content %q, err %#v", content, err)
+ }
+ err = ioutil.WriteFile(s.runner.HostOutputDir+"/.arvados#collection", []byte(`{"manifest_text":". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"}`), 0700)
+ c.Check(err, IsNil)
+ })
+
+ content, err := ioutil.ReadFile(realtemp + "/text1/mountdata.text")
+ c.Check(err, IsNil)
+ c.Check(string(content), Equals, "mypassword")
+ c.Check(s.executor.created.BindMounts["/tmp/secret.conf"], DeepEquals, bindmount{realtemp + "/text1/mountdata.text", true})
+ c.Check(s.api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(s.api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(s.runner.ContainerArvClient.(*ArvTestClient).CalledWith("collection.manifest_text", ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"), NotNil)
+}
+
+// TestCalculateCost exercises cr.calculateCost over one hour of run
+// time (costStartTime is set to an hour before "now") under three
+// sources of price information: no InstanceType environment variable,
+// an InstanceType variable carrying a fixed price, and price histories
+// written to the prices file in lockdir and loaded with cr.loadPrices.
+// It also checks the price-change messages written to the crunch log.
+//
+// The test redirects the package-level lockdir to a scratch directory
+// and modifies the InstanceType environment variable; both are put back
+// to their previous state when the test returns.
+func (s *TestSuite) TestCalculateCost(c *C) {
+	defer func(orig string) { lockdir = orig }(lockdir)
+	lockdir = c.MkDir()
+
+	// Restore whatever InstanceType the process started with, rather
+	// than unconditionally leaving it unset for later tests.
+	if orig, ok := os.LookupEnv("InstanceType"); ok {
+		defer os.Setenv("InstanceType", orig)
+	} else {
+		defer os.Unsetenv("InstanceType")
+	}
+
+	now := time.Now()
+	cr := s.runner
+	cr.costStartTime = now.Add(-time.Hour)
+	var logbuf bytes.Buffer
+	cr.CrunchLog.Immediate = log.New(&logbuf, "", 0)
+	pricesPath := lockdir + "/" + pricesfile
+
+	// if there's no InstanceType env var, cost is calculated as 0
+	c.Assert(os.Unsetenv("InstanceType"), IsNil)
+	cost := cr.calculateCost(now)
+	c.Check(cost, Equals, 0.0)
+
+	// with InstanceType env var and loadPrices() hasn't run (or
+	// hasn't found any data), cost is calculated based on
+	// InstanceType env var
+	c.Assert(os.Setenv("InstanceType", `{"Price":1.2}`), IsNil)
+	cost = cr.calculateCost(now)
+	c.Check(cost, Equals, 1.2)
+
+	// first update tells us the spot price was $1/h until 30
+	// minutes ago when it increased to $2/h
+	j, err := json.Marshal([]cloud.InstancePrice{
+		{StartTime: now.Add(-4 * time.Hour), Price: 1.0},
+		{StartTime: now.Add(-time.Hour / 2), Price: 2.0},
+	})
+	c.Assert(err, IsNil)
+	// Stop here if the prices file cannot be written; otherwise the
+	// failure would show up later as a misleading cost mismatch.
+	c.Assert(os.WriteFile(pricesPath, j, 0777), IsNil)
+	cr.loadPrices()
+	cost = cr.calculateCost(now)
+	c.Check(cost, Equals, 1.5)
+
+	// next update (via --list + SIGUSR2) tells us the spot price
+	// increased to $3/h 15 minutes ago
+	j, err = json.Marshal([]cloud.InstancePrice{
+		{StartTime: now.Add(-time.Hour / 3), Price: 2.0}, // dup of -time.Hour/2 price
+		{StartTime: now.Add(-time.Hour / 4), Price: 3.0},
+	})
+	c.Assert(err, IsNil)
+	c.Assert(os.WriteFile(pricesPath, j, 0777), IsNil)
+	cr.loadPrices()
+	cost = cr.calculateCost(now)
+	c.Check(cost, Equals, 1.0/2+2.0/4+3.0/4)
+
+	// cost up to 30 minutes ago covers only the $1/h half hour
+	cost = cr.calculateCost(now.Add(-time.Hour / 2))
+	c.Check(cost, Equals, 0.5)
+
+	// Each distinct price should be logged once; the duplicate $2
+	// entry in the second update must not be reported again.
+	c.Logf("%s", logbuf.String())
+	c.Check(logbuf.String(), Matches, `(?ms).*Instance price changed to 1\.00 at 20.* changed to 2\.00 .* changed to 3\.00 .*`)
+	c.Check(logbuf.String(), Not(Matches), `(?ms).*changed to 2\.00 .* changed to 2\.00 .*`)
}
type FakeProcess struct {