Merge branch 'main' into 18842-arv-mount-disk-config
[arvados.git] / lib / lsf / dispatch_test.go
index 641453e5480ced43609efcb499d5dbff61383cae..a381b25e9d075bc993f6327c579107025a62fe79 100644 (file)
@@ -6,6 +6,7 @@ package lsf
 
 import (
        "context"
+       "encoding/json"
        "fmt"
        "math/rand"
        "os/exec"
@@ -29,7 +30,10 @@ func Test(t *testing.T) {
 var _ = check.Suite(&suite{})
 
 type suite struct {
-       disp *dispatcher
+       disp          *dispatcher
+       crTooBig      arvados.ContainerRequest // created in SetUpTest; TestSubmit expects it never to be submitted to LSF
+       crPending     arvados.ContainerRequest // created in SetUpTest; the bjobs stub reports its job as PEND
+       crCUDARequest arvados.ContainerRequest // created in SetUpTest with a CUDA runtime constraint
 }
 
 func (s *suite) TearDownTest(c *check.C) {
@@ -41,11 +45,76 @@ func (s *suite) SetUpTest(c *check.C) {
        c.Assert(err, check.IsNil)
        cluster, err := cfg.GetCluster("")
        c.Assert(err, check.IsNil)
-       cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second)
+       cluster.Containers.ReserveExtraRAM = 256 << 20 // 256 MiB
+       cluster.Containers.CloudVMs.PollInterval = arvados.Duration(time.Second / 4) // 250ms
+       cluster.Containers.MinRetryPeriod = arvados.Duration(time.Second / 4) // 250ms
+       cluster.InstanceTypes = arvados.InstanceTypeMap{ // the only instance type; crTooBig must not fit it
+               "biggest_available_node": arvados.InstanceType{
+                       RAM:             100 << 30, // 100 GiB
+                       VCPUs:           4,
+                       IncludedScratch: 100 << 30,
+                       Scratch:         100 << 30,
+               }}
        s.disp = newHandler(context.Background(), cluster, arvadostest.Dispatch1Token, prometheus.NewRegistry()).(*dispatcher)
        s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
                return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
        }
+       err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+               "container_request": map[string]interface{}{
+                       "runtime_constraints": arvados.RuntimeConstraints{
+                               RAM:   1000000000000, // ~931 GiB: more than biggest_available_node's 100 GiB
+                               VCPUs: 1,
+                       },
+                       "container_image":     arvadostest.DockerImage112PDH,
+                       "command":             []string{"sleep", "1"},
+                       "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+                       "output_path":         "/mnt/out",
+                       "state":               arvados.ContainerRequestStateCommitted,
+                       "priority":            1,
+                       "container_count_max": 1,
+               },
+       })
+       c.Assert(err, check.IsNil)
+
+       err = arvados.NewClientFromEnv().RequestAndDecode(&s.crPending, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+               "container_request": map[string]interface{}{
+                       "runtime_constraints": arvados.RuntimeConstraints{
+                               RAM:   100000000, // ~95 MiB; the bjobs stub reports this job as PEND
+                               VCPUs: 2,
+                       },
+                       "container_image":     arvadostest.DockerImage112PDH,
+                       "command":             []string{"sleep", "1"},
+                       "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+                       "output_path":         "/mnt/out",
+                       "state":               arvados.ContainerRequestStateCommitted,
+                       "priority":            1,
+                       "container_count_max": 1,
+               },
+       })
+       c.Assert(err, check.IsNil)
+
+       err = arvados.NewClientFromEnv().RequestAndDecode(&s.crCUDARequest, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+               "container_request": map[string]interface{}{
+                       "runtime_constraints": arvados.RuntimeConstraints{
+                               RAM:   16000000,
+                               VCPUs: 1,
+                               CUDA: arvados.CUDARuntimeConstraints{ // the bsub stub expects "-gpu", "num=1" for this request
+                                       DeviceCount:        1,
+                                       DriverVersion:      "11.0",
+                                       HardwareCapability: "8.0",
+                               },
+                       },
+                       "container_image":     arvadostest.DockerImage112PDH,
+                       "command":             []string{"sleep", "1"},
+                       "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+                       "output_path":         "/mnt/out",
+                       "state":               arvados.ContainerRequestStateCommitted,
+                       "priority":            1,
+                       "container_count_max": 1,
+               },
+       })
+       c.Assert(err, check.IsNil)
+
 }
 
 type lsfstub struct {
@@ -72,7 +141,11 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                switch prog {
                case "bsub":
                        defaultArgs := s.disp.Cluster.Containers.LSF.BsubArgumentsList
-                       c.Assert(len(args), check.Equals, len(defaultArgs))
+                       if len(args) > 5 && args[5] == s.crCUDARequest.ContainerUUID { // args[4:6] is "-J", <container UUID>; guard so short args fail the Assert instead of panicking
+                               c.Assert(len(args), check.Equals, len(defaultArgs)+len(s.disp.Cluster.Containers.LSF.BsubCUDAArguments))
+                       } else {
+                               c.Assert(len(args), check.Equals, len(defaultArgs))
+                       }
                        // %%J must have been rewritten to %J
                        c.Check(args[1], check.Equals, "/tmp/crunch-run.%J.out")
                        args = args[4:]
@@ -82,7 +155,10 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                                        "-J", arvadostest.LockedContainerUUID,
                                        "-n", "4",
                                        "-D", "11701MB",
-                                       "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"})
+                                       "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
+                                       "-R", "select[mem>=11701MB]", // same mem/tmp/ncpus values as rusage[] and -n above
+                                       "-R", "select[tmp>=0MB]",
+                                       "-R", "select[ncpus>=4]"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
@@ -92,7 +168,37 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                                        "-J", arvadostest.QueuedContainerUUID,
                                        "-n", "4",
                                        "-D", "11701MB",
-                                       "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"})
+                                       "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
+                                       "-R", "select[mem>=11701MB]",
+                                       "-R", "select[tmp>=45777MB]",
+                                       "-R", "select[ncpus>=4]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       case s.crPending.ContainerUUID: // the bjobs stub below reports this job as PEND
+                               c.Check(args, check.DeepEquals, []string{
+                                       "-J", s.crPending.ContainerUUID,
+                                       "-n", "2",
+                                       "-D", "352MB",
+                                       "-R", "rusage[mem=352MB:tmp=8448MB] span[hosts=1]",
+                                       "-R", "select[mem>=352MB]",
+                                       "-R", "select[tmp>=8448MB]",
+                                       "-R", "select[ncpus>=2]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       case s.crCUDARequest.ContainerUUID: // CUDA request: the extra "-gpu" args are expected after the defaults
+                               c.Check(args, check.DeepEquals, []string{
+                                       "-J", s.crCUDARequest.ContainerUUID,
+                                       "-n", "1",
+                                       "-D", "528MB",
+                                       "-R", "rusage[mem=528MB:tmp=256MB] span[hosts=1]",
+                                       "-R", "select[mem>=528MB]",
+                                       "-R", "select[tmp>=256MB]",
+                                       "-R", "select[ncpus>=1]",
+                                       "-gpu", "num=1"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
@@ -103,13 +209,31 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                        }
                        return exec.Command("echo", "submitted job")
                case "bjobs":
-                       c.Check(args, check.DeepEquals, []string{"-u", "all", "-noheader", "-o", "jobid stat job_name:30"})
-                       out := ""
+                       c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
+                       var records []map[string]interface{}
                        for jobid, uuid := range fakejobq {
-                               out += fmt.Sprintf(`%d %s %s\n`, jobid, "RUN", uuid)
+                               stat, reason := "RUN", ""
+                               if uuid == s.crPending.ContainerUUID {
+                                       // The real bjobs output includes a trailing ';' here:
+                                       stat, reason = "PEND", "There are no suitable hosts for the job;"
+                               }
+                               records = append(records, map[string]interface{}{
+                                       "JOBID":       fmt.Sprintf("%d", jobid),
+                                       "STAT":        stat,
+                                       "JOB_NAME":    uuid,
+                                       "PEND_REASON": reason,
+                               })
+                       }
+                       out, err := json.Marshal(map[string]interface{}{
+                               "COMMAND": "bjobs",
+                               "JOBS":    len(records), // always agrees with RECORDS
+                               "RECORDS": records,
+                       })
+                       if err != nil {
+                               panic(err)
                        }
-                       c.Logf("bjobs out: %q", out)
-                       return exec.Command("printf", out)
+                       c.Logf("bjobs out: %s", out)
+                       return exec.Command("printf", "%s", string(out)) // JSON is an argument, not the format: '%' and '\"' must not be interpreted
                case "bkill":
                        killid, _ := strconv.Atoi(args[0])
                        if uuid, ok := fakejobq[killid]; !ok {
@@ -137,30 +261,53 @@ func (s *suite) TestSubmit(c *check.C) {
                sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
        }.stubCommand(s, c)
        s.disp.Start()
+
        deadline := time.Now().Add(20 * time.Second)
        for range time.NewTicker(time.Second).C {
                if time.Now().After(deadline) {
                        c.Error("timed out")
                        break
                }
+               // "crTooBig" should never be submitted to lsf because
+               // it is bigger than any configured instance type
+               if ent, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
+                       c.Errorf("Lookup(crTooBig) == true, ent = %#v", ent)
+                       break
+               }
                // "queuedcontainer" should be running
-               if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+               if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
+                       c.Log("Lookup(queuedcontainer) == false")
+                       continue
+               }
+               // "crPending" should be pending
+               if _, ok := s.disp.lsfqueue.Lookup(s.crPending.ContainerUUID); !ok {
+                       c.Log("Lookup(crPending) == false")
                        continue
                }
                // "lockedcontainer" should be cancelled because it
                // has priority 0 (no matching container requests)
-               if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+               if ent, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+                       c.Logf("Lookup(lockedcontainer) == true, ent = %#v", ent)
                        continue
                }
                var ctr arvados.Container
                if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
                        c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
                        continue
-               }
-               if ctr.State != arvados.ContainerStateQueued {
+               } else if ctr.State != arvados.ContainerStateQueued {
                        c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
                        continue
                }
+               ctr = arvados.Container{} // don't reuse fields (e.g. RuntimeStatus entries) left over from the LockedContainer Get
+               if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
+                       c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
+                       continue
+               } else if ctr.State != arvados.ContainerStateCancelled {
+                       c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
+                       continue
+               } else {
+                       c.Check(ctr.RuntimeStatus["error"], check.Equals, "constraints not satisfiable by any configured instance type")
+               }
                c.Log("reached desired state")
                break
        }