From f6e8d7c2cada1570bac3e98f0712ad8651b8d9fd Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 18 Nov 2021 16:43:33 -0500 Subject: [PATCH] 18298: Use bjobs select[] args, cancel on "no suitable host". Arvados-DCO-1.1-Signed-off-by: Tom Clegg --- lib/config/config.default.yml | 2 +- lib/config/generated_config.go | 2 +- lib/lsf/dispatch.go | 44 +++++++++++++++----- lib/lsf/dispatch_test.go | 74 +++++++++++++++++++++++++++++----- lib/lsf/lsfqueue.go | 8 ++-- 5 files changed, 104 insertions(+), 26 deletions(-) diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml index 411a79650b..7813db4f09 100644 --- a/lib/config/config.default.yml +++ b/lib/config/config.default.yml @@ -1089,7 +1089,7 @@ Clusters: # in /tmp on the compute node each time an Arvados container # runs. Ensure you have something in place to delete old files # from /tmp, or adjust the "-o" and "-e" arguments accordingly. - BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"] + BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"] # Use sudo to switch to this user account when submitting LSF # jobs. diff --git a/lib/config/generated_config.go b/lib/config/generated_config.go index f8553c3eb7..39e4ae00cb 100644 --- a/lib/config/generated_config.go +++ b/lib/config/generated_config.go @@ -1095,7 +1095,7 @@ Clusters: # in /tmp on the compute node each time an Arvados container # runs. Ensure you have something in place to delete old files # from /tmp, or adjust the "-o" and "-e" arguments accordingly. - BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"] + BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"] # Use sudo to switch to this user account when submitting LSF # jobs. diff --git a/lib/lsf/dispatch.go b/lib/lsf/dispatch.go index 6e35b7de92..537d52a072 100644 --- a/lib/lsf/dispatch.go +++ b/lib/lsf/dispatch.go @@ -167,7 +167,7 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain if ctr.State != dispatch.Locked { // already started by prior invocation - } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok { + } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok { disp.logger.Printf("Submitting container %s to LSF", ctr.UUID) cmd := []string{disp.Cluster.Containers.CrunchRunCommand} cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine) @@ -181,16 +181,38 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State) defer disp.logger.Printf("Done monitoring container %s", ctr.UUID) - // If the container disappears from the lsf queue, there is - // no point in waiting for further dispatch updates: just - // clean up and return. 
go func(uuid string) { + cancelled := false for ctx.Err() == nil { - if _, ok := disp.lsfqueue.JobID(uuid); !ok { + qent, ok := disp.lsfqueue.Lookup(uuid) + if !ok { + // If the container disappears from + // the lsf queue, there is no point in + // waiting for further dispatch + // updates: just clean up and return. disp.logger.Printf("container %s job disappeared from LSF queue", uuid) cancel() return } + if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") { + disp.logger.Printf("container %s: %s", uuid, qent.PendReason) + err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{ + "container": map[string]interface{}{ + "runtime_status": map[string]string{ + "error": qent.PendReason, + }, + }, + }, nil) + if err != nil { + disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err) + continue // retry + } + err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled) + if err != nil { + continue // retry (UpdateState() already logged the error) + } + cancelled = true + } } }(ctr.UUID) @@ -236,10 +258,10 @@ func (disp *dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain // from the queue. ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) { - err := disp.lsfcli.Bkill(jobid) + for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) { + err := disp.lsfcli.Bkill(qent.ID) if err != nil { - disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err) + disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err) } <-ticker.C } @@ -262,10 +284,10 @@ func (disp *dispatcher) submit(container arvados.Container, crunchRunCommand []s } func (disp *dispatcher) bkill(ctr arvados.Container) { - if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok { + if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok { disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID) - } else if err := disp.lsfcli.Bkill(jobid); err != nil { - disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err) + } else if err := disp.lsfcli.Bkill(qent.ID); err != nil { + disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err) } } diff --git a/lib/lsf/dispatch_test.go b/lib/lsf/dispatch_test.go index bb3b1b9df6..c044df09f6 100644 --- a/lib/lsf/dispatch_test.go +++ b/lib/lsf/dispatch_test.go @@ -30,7 +30,8 @@ func Test(t *testing.T) { var _ = check.Suite(&suite{}) type suite struct { - disp *dispatcher + disp *dispatcher + crTooBig arvados.ContainerRequest } func (s *suite) TearDownTest(c *check.C) { @@ -47,6 +48,22 @@ func (s *suite) SetUpTest(c *check.C) { s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd { return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false") } + err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{ + "container_request": map[string]interface{}{ + "runtime_constraints": arvados.RuntimeConstraints{ + RAM: 1000000000000, + VCPUs: 1, + }, + "container_image": arvadostest.DockerImage112PDH, + "command": []string{"sleep", "1"}, + "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}}, + "output_path": "/mnt/out", + "state": arvados.ContainerRequestStateCommitted, + "priority": 1, + "container_count_max": 1, + }, + }) + c.Assert(err, check.IsNil) } type lsfstub struct { @@ -83,7 +100,10 @@ func (stub lsfstub) stubCommand(s 
*suite, c *check.C) func(prog string, args ... "-J", arvadostest.LockedContainerUUID, "-n", "4", "-D", "11701MB", - "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"}) + "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]", + "-R", "select[mem>=11701MB]", + "-R", "select[tmp>=0MB]", + "-R", "select[ncpus>=4]"}) mtx.Lock() fakejobq[nextjobid] = args[1] nextjobid++ @@ -93,7 +113,23 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ... "-J", arvadostest.QueuedContainerUUID, "-n", "4", "-D", "11701MB", - "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"}) + "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]", + "-R", "select[mem>=11701MB]", + "-R", "select[tmp>=45777MB]", + "-R", "select[ncpus>=4]"}) + mtx.Lock() + fakejobq[nextjobid] = args[1] + nextjobid++ + mtx.Unlock() + case s.crTooBig.ContainerUUID: + c.Check(args, check.DeepEquals, []string{ + "-J", s.crTooBig.ContainerUUID, + "-n", "1", + "-D", "954187MB", + "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]", + "-R", "select[mem>=954187MB]", + "-R", "select[tmp>=256MB]", + "-R", "select[ncpus>=1]"}) mtx.Lock() fakejobq[nextjobid] = args[1] nextjobid++ @@ -107,11 +143,16 @@ func (stub lsfstub) stubCommand(s *suite, c *check.C) func(prog string, args ... c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"}) var records []map[string]interface{} for jobid, uuid := range fakejobq { + stat, reason := "RUN", "" + if uuid == s.crTooBig.ContainerUUID { + // The real bjobs output includes a trailing ';' here: + stat, reason = "PEND", "There are no suitable hosts for the job;" + } records = append(records, map[string]interface{}{ "JOBID": fmt.Sprintf("%d", jobid), - "STAT": "RUN", + "STAT": stat, "JOB_NAME": uuid, - "PEND_REASON": "", + "PEND_REASON": reason, }) } out, err := json.Marshal(map[string]interface{}{ @@ -151,6 +192,7 @@ func (s *suite) TestSubmit(c *check.C) { sudoUser: s.disp.Cluster.Containers.LSF.BsubSudoUser, }.stubCommand(s, c) s.disp.Start() + deadline := time.Now().Add(20 * time.Second) for range time.NewTicker(time.Second).C { if time.Now().After(deadline) { @@ -158,23 +200,37 @@ func (s *suite) TestSubmit(c *check.C) { break } // "queuedcontainer" should be running - if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok { + if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok { continue } // "lockedcontainer" should be cancelled because it // has priority 0 (no matching container requests) - if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok { + if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok { + continue + } + // "crTooBig" should be cancelled because lsf stub + // reports there is no suitable instance type + if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok { continue } var ctr arvados.Container if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil { c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err) continue - } - if ctr.State != arvados.ContainerStateQueued { + } else if ctr.State != arvados.ContainerStateQueued { c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State) continue } + + if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil { + c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err) + 
continue + } else if ctr.State != arvados.ContainerStateCancelled { + c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State) + continue + } else { + c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;") + } c.Log("reached desired state") break } diff --git a/lib/lsf/lsfqueue.go b/lib/lsf/lsfqueue.go index 971bdd4214..3ed4d0c182 100644 --- a/lib/lsf/lsfqueue.go +++ b/lib/lsf/lsfqueue.go @@ -23,12 +23,12 @@ type lsfqueue struct { latest map[string]bjobsEntry } -// JobID waits for the next queue update (so even a job that was only +// Lookup waits for the next queue update (so even a job that was only // submitted a nanosecond ago will show up) and then returns the LSF -// job ID corresponding to the given container UUID. -func (q *lsfqueue) JobID(uuid string) (string, bool) { +// queue information corresponding to the given container UUID. +func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) { ent, ok := q.getNext()[uuid] - return ent.ID, ok + return ent, ok } // All waits for the next queue update, then returns the names of all -- 2.30.2
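
For reference: the new "-R select[...]" entries added to BsubArgumentsList use the same %-style placeholders as the existing bsub arguments (%U = container UUID, %C = VCPUs, %M = memory in MB, %T = tmp in MB, %%J = a literal %J that LSF itself expands to the job ID). Below is a rough standalone sketch of how those placeholders expand for the "queuedcontainer" case exercised in dispatch_test.go (4 VCPUs, 11701 MB RAM, 45777 MB tmp). It is an illustration only: the strings.Replacer approach, the file name, and the example container UUID are assumptions for this sketch, not the dispatcher's actual substitution code in lib/lsf/dispatch.go.

// bsub_args_sketch.go - hypothetical standalone illustration (not part of
// this patch) of how the %-placeholders in BsubArgumentsList expand.
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Values match the "queuedcontainer" expectation in dispatch_test.go:
	// 4 VCPUs, 11701 MB RAM, 45777 MB tmp. The UUID is an example value.
	repl := strings.NewReplacer(
		"%%", "%", // %%J becomes %J, which LSF itself expands to the job ID
		"%U", "zzzzz-dz642-queuedcontainer",
		"%C", "4",
		"%M", "11701",
		"%T", "45777",
	)
	args := []string{
		"-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err",
		"-J", "%U", "-n", "%C", "-D", "%MMB",
		"-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]",
		"-R", "select[mem>=%MMB]",
		"-R", "select[tmp>=%TMB]",
		"-R", "select[ncpus>=%C]",
	}
	for i, a := range args {
		args[i] = repl.Replace(a)
	}
	// Printed on one line; wrapped here for readability:
	// -o /tmp/crunch-run.%J.out -e /tmp/crunch-run.%J.err
	// -J zzzzz-dz642-queuedcontainer -n 4 -D 11701MB
	// -R rusage[mem=11701MB:tmp=45777MB] span[hosts=1]
	// -R select[mem>=11701MB] -R select[tmp>=45777MB] -R select[ncpus>=4]
	fmt.Println(strings.Join(args, " "))
}

With the select[] requirements in place, LSF reports "There are no suitable hosts for the job" as the pend reason when no host can ever satisfy the job's mem/tmp/ncpus demands; the dispatcher's new monitor loop copies that reason into the container's runtime_status error and cancels the container instead of leaving the LSF job pending indefinitely, which is what the crTooBig test case verifies.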