X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/ca6c0353c236d5fc0f829880e845cbd8489ea9ad..f93ba744038d79211a48105bd691e26e1770cc61:/lib/lsf/dispatch_test.go diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go index 44a1a3d8cb..a99983f34a 100644 --- a/lib/lsf/dispatch_test.go +++ b/lib/lsf/dispatch_test.go @@ -6,6 +6,7 @@ package lsf import ( "context" + "encoding/json" "fmt" "math/rand" "os/exec" @@ -29,7 +30,9 @@ func Test(t *testing.T) { var _ = check.Suite(&suite{}) type suite struct { - disp *dispatcher + disp *dispatcher + crTooBig arvados.ContainerRequest + crCUDARequest arvados.ContainerRequest } func (s *suite) TearDownTest(c *check.C) { @@ -41,11 +44,51 @@ func (s *suite) SetUpTest(c *check.C) { c.Assert(err, check.IsNil) cluster, err := cfg.GetCluster("") c.Assert(err, check.IsNil) - cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second) + cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4) + cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4) s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher) s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd { return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false") } + err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{ + "container_request": map[string]interface{}{ + "runtime_constraints": arvados.RuntimeConstraints{ + RAM: 1000000000000, + VCPUs: 1, + }, + "container_image": arvadostest.DockerImage112PDH, + "command": []string{"sleep", "1"}, + "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}}, + "output_path": "/mnt/out", + "state": arvados.ContainerRequestStateCommitted, + "priority": 1, + "container_count_max": 1, + }, + }) + c.Assert(err, check.IsNil) + + err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{ + "container_request": map[string]interface{}{ + "runtime_constraints": arvados.RuntimeConstraints{ + RAM: 16000000, + VCPUs: 1, + CUDA: arvados.CUDARuntimeConstraints{ + DeviceCount: 1, + DriverVersion: "11.0", + HardwareCapability: "8.0", + }, + }, + "container_image": arvadostest.DockerImage112PDH, + "command": []string{"sleep", "1"}, + "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}}, + "output_path": "/mnt/out", + "state": arvados.ContainerRequestStateCommitted, + "priority": 1, + "container_count_max": 1, + }, + }) + c.Assert(err, check.IsNil) + } type lsfstub struct { @@ -72,18 +115,24 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ... switch prog { case "bsub": defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList - c.Assert(len(args) > len(defaultArgs), check.Equals, true) - c.Check(args[:len(defaultArgs)], check.DeepEquals, defaultArgs) - args = args[len(defaultArgs):] - - c.Check(args[0], check.Equals, "-J") + if args[5] == s.crCUDARequest.ContainerUUID { + c.Assert(len(args), check.Equals, len(defaultArgs)+len(s.disp.Cluster.Containers.LSF.BsubCUDAArguments)) + } else { + c.Assert(len(args), check.Equals, len(defaultArgs)) + } + // %%J must have been rewritten to %J + c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out") + args = args[4:] switch args[1] { case arvadostest.LockedContainerUUID: c.Check(args, check.DeepEquals, []string{ "-J", arvadostest.LockedContainerUUID, "-n", "4", "-D", "11701MB", - "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"}) + "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]", + "-R", "select[mem>=11701MB]", + "-R", "select[tmp>=0MB]", + "-R", "select[ncpus>=4]"}) mtx.Lock() fakejobq[nextjobid] = args[1] nextjobid++ @@ -93,7 +142,37 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ... "-J", arvadostest.QueuedContainerUUID, "-n", "4", "-D", "11701MB", - "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"}) + "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]", + "-R", "select[mem>=11701MB]", + "-R", "select[tmp>=45777MB]", + "-R", "select[ncpus>=4]"}) + mtx.Lock() + fakejobq[nextjobid] = args[1] + nextjobid++ + mtx.Unlock() + case s.crTooBig.ContainerUUID: + c.Check(args, check.DeepEquals, []string{ + "-J", s.crTooBig.ContainerUUID, + "-n", "1", + "-D", "954187MB", + "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]", + "-R", "select[mem>=954187MB]", + "-R", "select[tmp>=256MB]", + "-R", "select[ncpus>=1]"}) + mtx.Lock() + fakejobq[nextjobid] = args[1] + nextjobid++ + mtx.Unlock() + case s.crCUDARequest.ContainerUUID: + c.Check(args, check.DeepEquals, []string{ + "-J", s.crCUDARequest.ContainerUUID, + "-n", "1", + "-D", "528MB", + "-R", "rusage[mem=528MB:tmp=256MB] span[hosts=1]", + "-R", "select[mem>=528MB]", + "-R", "select[tmp>=256MB]", + "-R", "select[ncpus>=1]", + "-gpu", "num=1"}) mtx.Lock() fakejobq[nextjobid] = args[1] nextjobid++ @@ -104,13 +183,31 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ... } return exec.Command("echo", "submitted job") case "bjobs": - c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"}) - out := "" + c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"}) + var records []map[string]interface{} for jobid, uuid := range fakejobq { - out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid) + stat, reason := "RUN", "" + if uuid == s.crTooBig.ContainerUUID { + // The real bjobs output includes a trailing ';' here: + stat, reason = "PEND", "There are no suitable hosts for the job;" + } + records = append(records, map[string]interface{}{ + "JOBID": fmt.Sprintf("%d", jobid), + "STAT": stat, + "JOB_NAME": uuid, + "PEND_REASON": reason, + }) } - c.Logf("bjobs out: %q", out) - return exec.Command("printf", out) + out, err := json.Marshal(map[string]interface{}{ + "COMMAND": "bjobs", + "JOBS": len(fakejobq), + "RECORDS": records, + }) + if err != nil { + panic(err) + } + c.Logf("bjobs out: %s", out) + return exec.Command("printf", string(out)) case "bkill": killid, _ := strconv.Atoi(args[0]) if uuid, ok := fakejobq[killid]; !ok { @@ -138,6 +235,7 @@ func (s *suite) TestSubmit(c *check.C) { sudoUser: s.disp.Cluster.Containers.LSF.BsubSudoUser, }.stubCommand(s, c) s.disp.Start() + deadline := time.Now().Add(20 * time.Second) for range time.NewTicker(time.Second).C { if time.Now().After(deadline) { @@ -145,23 +243,40 @@ func (s *suite) TestSubmit(c *check.C) { break } // "queuedcontainer" should be running - if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok { + if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok { + c.Log("Lookup(queuedcontainer) == false") continue } // "lockedcontainer" should be cancelled because it // has priority 0 (no matching container requests) - if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok { + if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok { + c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent) + continue + } + // "crTooBig" should be cancelled because lsf stub + // reports there is no suitable instance type + if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok { + c.Logf("Lookup(crTooBig) == true, ent = %#v", ent) continue } var ctr arvados.Container if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil { c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err) continue - } - if ctr.State != arvados.ContainerStateQueued { + } else if ctr.State != arvados.ContainerStateQueued { c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State) continue } + + if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil { + c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err) + continue + } else if ctr.State != arvados.ContainerStateCancelled { + c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State) + continue + } else { + c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;") + } c.Log("reached desired state") break }