18298: Use bsub select[] args, cancel on "no suitable host".
author     Tom Clegg <tom@curii.com>
           Thu, 18 Nov 2021 21:43:33 +0000 (16:43 -0500)
committer  Tom Clegg <tom@curii.com>
           Thu, 18 Nov 2021 21:43:33 +0000 (16:43 -0500)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom@curii.com>

lib/config/config.default.yml
lib/config/generated_config.go
lib/lsf/dispatch.go
lib/lsf/dispatch_test.go
lib/lsf/lsfqueue.go

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index 411a79650b3421df477f32e0e96b574d068023ae..7813db4f0988ade30e979ecf9d2f5533759e06dc 100644
@@ -1089,7 +1089,7 @@ Clusters:
         # in /tmp on the compute node each time an Arvados container
         # runs. Ensure you have something in place to delete old files
         # from /tmp, or adjust the "-o" and "-e" arguments accordingly.
-        BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+        BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
 
         # Use sudo to switch to this user account when submitting LSF
         # jobs.
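
The %-placeholders in BsubArgumentsList are expanded per container at
submit time. As a rough illustration only (this is not the dispatcher's
actual substitution code; the placeholder meanings and the fixture values
are inferred from the test expectations below), the template could be
rendered like this in Go:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // Default template from BsubArgumentsList above.
        tmpl := []string{
            "-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err",
            "-J", "%U", "-n", "%C", "-D", "%MMB",
            "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]",
            "-R", "select[mem>=%MMB]",
            "-R", "select[tmp>=%TMB]",
            "-R", "select[ncpus>=%C]",
        }
        // %U=container UUID, %C=VCPUs, %M=memory MB, %T=tmp MB;
        // %% is a literal %. Values match the "queuedcontainer"
        // case asserted in dispatch_test.go below.
        repl := strings.NewReplacer(
            "%%", "%",
            "%U", "zzzzz-dz642-queuedcontainer",
            "%C", "4",
            "%M", "11701",
            "%T", "45777",
        )
        for _, arg := range tmpl {
            fmt.Println(repl.Replace(arg))
        }
    }

This prints, among the other arguments, "select[mem>=11701MB]",
"select[tmp>=45777MB]", and "select[ncpus>=4]", matching the bsub
argument list the test stub checks below.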
diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go
index f8553c3eb758785edb6e023b10e8a9697085e74c..39e4ae00cbcc23d8e4a22bf2e30390e692d67ab4 100644
@@ -1095,7 +1095,7 @@ Clusters:
         # in /tmp on the compute node each time an Arvados container
         # runs. Ensure you have something in place to delete old files
         # from /tmp, or adjust the "-o" and "-e" arguments accordingly.
-        BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+        BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
 
         # Use sudo to switch to this user account when submitting LSF
         # jobs.
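
(The generated_config.go hunk is identical to the one above because that
file embeds a generated copy of config.default.yml.)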
diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go
index 6e35b7de929f8843bdd7bdb848698ca96c62a123..537d52a072d6a503262b1a228c868afc8f28b151 100644
@@ -167,7 +167,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 
        if ctr.State != dispatch.Locked {
                // already started by prior invocation
-       } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+       } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
                disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
                cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
                cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
@@ -181,16 +181,38 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
        defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
 
-       // If the container disappears from the lsf queue, there is
-       // no point in waiting for further dispatch updates: just
-       // clean up and return.
        go func(uuid string) {
+               cancelled := false
                for ctx.Err() == nil {
-                       if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+                       qent, ok := disp.lsfqueue.Lookup(uuid)
+                       if !ok {
+                               // If the container disappears from
+                               // the lsf queue, there is no point in
+                               // waiting for further dispatch
+                               // updates: just clean up and return.
                                disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
                                cancel()
                                return
                        }
+                       if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
+                               disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
+                               err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
+                                       "container": map[string]interface{}{
+                                               "runtime_status": map[string]string{
+                                                       "error": qent.PendReason,
+                                               },
+                                       },
+                               }, nil)
+                               if err != nil {
+                                       disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
+                                       continue // retry
+                               }
+                               err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
+                               if err != nil {
+                                       continue // retry (UpdateState() already logged the error)
+                               }
+                               cancelled = true
+                       }
                }
        }(ctr.UUID)
 
@@ -236,10 +258,10 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
        // from the queue.
        ticker := time.NewTicker(5 * time.Second)
        defer ticker.Stop()
-       for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
-               err := disp.lsfcli.Bkill(jobid)
+       for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
+               err := disp.lsfcli.Bkill(qent.ID)
                if err != nil {
-                       disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+                       disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
                }
                <-ticker.C
        }
@@ -262,10 +284,10 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s
 }
 
 func (disp *dispatcher) bkill(ctr arvados.Container) {
-       if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+       if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
                disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
-       } else if err := disp.lsfcli.Bkill(jobid); err != nil {
-               disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+       } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
+               disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
        }
 }
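
The new cancel path above keys on the PEND_REASON field of bjobs'
customized JSON output (bjobs -u all -o "jobid stat job_name pend_reason"
-json, as the test stub below asserts). Here is a self-contained sketch
of that detection, assuming a record layout shaped like the stub's
output (the field names and the top-level "RECORDS" key follow LSF's
bjobs -json format; the real parsing lives elsewhere in lib/lsf and may
differ):

    package main

    import (
        "encoding/json"
        "fmt"
        "strings"
    )

    // bjobsOutput mirrors the fields the test stub emits below.
    type bjobsOutput struct {
        Records []struct {
            JobID      string `json:"JOBID"`
            Stat       string `json:"STAT"`
            JobName    string `json:"JOB_NAME"`
            PendReason string `json:"PEND_REASON"`
        } `json:"RECORDS"`
    }

    func main() {
        // Hypothetical bjobs output for one unschedulable job
        // (placeholder container UUID).
        raw := `{"RECORDS":[{"JOBID":"1","STAT":"PEND","JOB_NAME":"zzzzz-dz642-xxxxxxxxxxxxxxx","PEND_REASON":"There are no suitable hosts for the job;"}]}`
        var out bjobsOutput
        if err := json.Unmarshal([]byte(raw), &out); err != nil {
            panic(err)
        }
        for _, r := range out.Records {
            if r.Stat == "PEND" && strings.Contains(r.PendReason, "There are no suitable hosts for the job") {
                // dispatch.go records r.PendReason in the container's
                // runtime_status["error"] and cancels the container.
                fmt.Printf("%s: unsatisfiable\n", r.JobName)
            }
        }
    }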
 
diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go
index bb3b1b9df6439de040b651728ce3c730289758c6..c044df09f65d42f5f4aad7903b60e27160d5ec98 100644
@@ -30,7 +30,8 @@ func Test(t *testing.T) {
 var _ = check.Suite(&suite{})
 
 type suite struct {
-       disp *dispatcher
+       disp     *dispatcher
+       crTooBig arvados.ContainerRequest
 }
 
 func (s *suite) TearDownTest(c *check.C) {
@@ -47,6 +48,22 @@ func (s *suite) SetUpTest(c *check.C) {
        s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
                return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
        }
+       err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+               "container_request": map[string]interface{}{
+                       "runtime_constraints": arvados.RuntimeConstraints{
+                               RAM:   1000000000000,
+                               VCPUs: 1,
+                       },
+                       "container_image":     arvadostest.DockerImage112PDH,
+                       "command":             []string{"sleep", "1"},
+                       "mounts":              map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+                       "output_path":         "/mnt/out",
+                       "state":               arvados.ContainerRequestStateCommitted,
+                       "priority":            1,
+                       "container_count_max": 1,
+               },
+       })
+       c.Assert(err, check.IsNil)
 }
 
 type lsfstub struct {
@@ -83,7 +100,10 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                                        "-J", arvadostest.LockedContainerUUID,
                                        "-n", "4",
                                        "-D", "11701MB",
-                                       "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"})
+                                       "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
+                                       "-R", "select[mem>=11701MB]",
+                                       "-R", "select[tmp>=0MB]",
+                                       "-R", "select[ncpus>=4]"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
@@ -93,7 +113,23 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                                        "-J", arvadostest.QueuedContainerUUID,
                                        "-n", "4",
                                        "-D", "11701MB",
-                                       "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"})
+                                       "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
+                                       "-R", "select[mem>=11701MB]",
+                                       "-R", "select[tmp>=45777MB]",
+                                       "-R", "select[ncpus>=4]"})
+                               mtx.Lock()
+                               fakejobq[nextjobid] = args[1]
+                               nextjobid++
+                               mtx.Unlock()
+                       case s.crTooBig.ContainerUUID:
+                               c.Check(args, check.DeepEquals, []string{
+                                       "-J", s.crTooBig.ContainerUUID,
+                                       "-n", "1",
+                                       "-D", "954187MB",
+                                       "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
+                                       "-R", "select[mem>=954187MB]",
+                                       "-R", "select[tmp>=256MB]",
+                                       "-R", "select[ncpus>=1]"})
                                mtx.Lock()
                                fakejobq[nextjobid] = args[1]
                                nextjobid++
@@ -107,11 +143,16 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ...
                        c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
                        var records []map[string]interface{}
                        for jobid, uuid := range fakejobq {
+                               stat, reason := "RUN", ""
+                               if uuid == s.crTooBig.ContainerUUID {
+                                       // The real bjobs output includes a trailing ';' here:
+                                       stat, reason = "PEND", "There are no suitable hosts for the job;"
+                               }
                                records = append(records, map[string]interface{}{
                                        "JOBID":       fmt.Sprintf("%d", jobid),
-                                       "STAT":        "RUN",
+                                       "STAT":        stat,
                                        "JOB_NAME":    uuid,
-                                       "PEND_REASON": "",
+                                       "PEND_REASON": reason,
                                })
                        }
                        out, err := json.Marshal(map[string]interface{}{
@@ -151,6 +192,7 @@ func (s *suite) TestSubmit(c *check.C) {
                sudoUser:  s.disp.Cluster.Containers.LSF.BsubSudoUser,
        }.stubCommand(s, c)
        s.disp.Start()
+
        deadline := time.Now().Add(20 * time.Second)
        for range time.NewTicker(time.Second).C {
                if time.Now().After(deadline) {
@@ -158,23 +200,37 @@ func (s *suite) TestSubmit(c *check.C) {
                        break
                }
                // "queuedcontainer" should be running
-               if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+               if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
                        continue
                }
                // "lockedcontainer" should be cancelled because it
                // has priority 0 (no matching container requests)
-               if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+               if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+                       continue
+               }
+               // "crTooBig" should be cancelled because the lsf stub
+               // reports there are no suitable hosts for it
+               if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
                        continue
                }
                var ctr arvados.Container
                if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
                        c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
                        continue
-               }
-               if ctr.State != arvados.ContainerStateQueued {
+               } else if ctr.State != arvados.ContainerStateQueued {
                        c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
                        continue
                }
+
+               if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
+                       c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
+                       continue
+               } else if ctr.State != arvados.ContainerStateCancelled {
+                       c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
+                       continue
+               } else {
+                       c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
+               }
                c.Log("reached desired state")
                break
        }
diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go
index 971bdd42141cec13cae468fbd5b648edd046c2f6..3ed4d0c1820cfaad1340c1304902a7deabd0fcb7 100644
@@ -23,12 +23,12 @@ type lsfqueue struct {
        latest    map[string]bjobsEntry
 }
 
-// JobID waits for the next queue update (so even a job that was only
+// Lookup waits for the next queue update (so even a job that was only
 // submitted a nanosecond ago will show up) and then returns the LSF
-// job ID corresponding to the given container UUID.
-func (q *lsfqueue) JobID(uuid string) (string, bool) {
+// queue information corresponding to the given container UUID.
+func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) {
        ent, ok := q.getNext()[uuid]
-       return ent.ID, ok
+       return ent, ok
 }
 
 // All waits for the next queue update, then returns the names of all