# in /tmp on the compute node each time an Arvados container
# runs. Ensure you have something in place to delete old files
# from /tmp, or adjust the "-o" and "-e" arguments accordingly.
- BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]"]
+ BsubArgumentsList: ["-o", "/tmp/crunch-run.%%J.out", "-e", "/tmp/crunch-run.%%J.err", "-J", "%U", "-n", "%C", "-D", "%MMB", "-R", "rusage[mem=%MMB:tmp=%TMB] span[hosts=1]", "-R", "select[mem>=%MMB]", "-R", "select[tmp>=%TMB]", "-R", "select[ncpus>=%C]"]
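# For illustration only (a hedged sketch, not part of the shipped
# config): assuming the %U/%C/%M/%T templates expand to the container
# UUID, VCPU count, memory MB, and tmp MB, and using the values
# exercised by the test suite below (4 VCPUs, 11701 MB RAM, no
# scratch space), the submitted command would look roughly like:
#
#   bsub -o /tmp/crunch-run.%J.out -e /tmp/crunch-run.%J.err \
#        -J <container-uuid> -n 4 -D 11701MB \
#        -R "rusage[mem=11701MB:tmp=0MB] span[hosts=1]" \
#        -R "select[mem>=11701MB]" -R "select[tmp>=0MB]" \
#        -R "select[ncpus>=4]"
#
# (%%J collapses to a literal %J, which bsub itself replaces with
# the LSF job ID.)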
# Use sudo to switch to this user account when submitting LSF
# jobs.
if ctr.State != dispatch.Locked {
// already started by prior invocation
- } else if _, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ } else if _, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
disp.logger.Printf("Submitting container %s to LSF", ctr.UUID)
cmd := []string{disp.Cluster.Containers.CrunchRunCommand}
cmd = append(cmd, "--runtime-engine="+disp.Cluster.Containers.RuntimeEngine)
disp.logger.Printf("Start monitoring container %v in state %q", ctr.UUID, ctr.State)
defer disp.logger.Printf("Done monitoring container %s", ctr.UUID)
- // If the container disappears from the lsf queue, there is
- // no point in waiting for further dispatch updates: just
- // clean up and return.
go func(uuid string) {
+ cancelled := false
for ctx.Err() == nil {
- if _, ok := disp.lsfqueue.JobID(uuid); !ok {
+ qent, ok := disp.lsfqueue.Lookup(uuid)
+ if !ok {
+ // If the container disappears from
+ // the lsf queue, there is no point in
+ // waiting for further dispatch
+ // updates: just clean up and return.
disp.logger.Printf("container %s job disappeared from LSF queue", uuid)
cancel()
return
}
+ if !cancelled && qent.Stat == "PEND" && strings.Contains(qent.PendReason, "There are no suitable hosts for the job") {
+ disp.logger.Printf("container %s: %s", uuid, qent.PendReason)
+ err := disp.arvDispatcher.Arv.Update("containers", uuid, arvadosclient.Dict{
+ "container": map[string]interface{}{
+ "runtime_status": map[string]string{
+ "error": qent.PendReason,
+ },
+ },
+ }, nil)
+ if err != nil {
+ disp.logger.Printf("error setting runtime_status on %s: %s", uuid, err)
+ continue // retry
+ }
+ err = disp.arvDispatcher.UpdateState(uuid, dispatch.Cancelled)
+ if err != nil {
+ continue // retry (UpdateState() already logged the error)
+ }
+ cancelled = true
+ }
}
}(ctr.UUID)
// Try "bkill" every few seconds until the LSF job disappears
// from the queue.
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
- for jobid, ok := disp.lsfqueue.JobID(ctr.UUID); ok; _, ok = disp.lsfqueue.JobID(ctr.UUID) {
- err := disp.lsfcli.Bkill(jobid)
+ for qent, ok := disp.lsfqueue.Lookup(ctr.UUID); ok; _, ok = disp.lsfqueue.Lookup(ctr.UUID) {
+ err := disp.lsfcli.Bkill(qent.ID)
if err != nil {
- disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
}
<-ticker.C
}
}
func (disp *dispatcher) bkill(ctr arvados.Container) {
- if jobid, ok := disp.lsfqueue.JobID(ctr.UUID); !ok {
+ if qent, ok := disp.lsfqueue.Lookup(ctr.UUID); !ok {
disp.logger.Debugf("bkill(%s): redundant, job not in queue", ctr.UUID)
- } else if err := disp.lsfcli.Bkill(jobid); err != nil {
- disp.logger.Warnf("%s: bkill(%d): %s", ctr.UUID, jobid, err)
+ } else if err := disp.lsfcli.Bkill(qent.ID); err != nil {
+ disp.logger.Warnf("%s: bkill(%s): %s", ctr.UUID, qent.ID, err)
}
}
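// For reference, a minimal sketch of the bjobsEntry type assumed by
// the Lookup() calls above. Field names are inferred from the bjobs
// JSON keys used in the test stub below; the real definition may
// differ:
type bjobsEntry struct {
ID         string `json:"JOBID"`
Stat       string `json:"STAT"`
Name       string `json:"JOB_NAME"`
PendReason string `json:"PEND_REASON"`
}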
var _ = check.Suite(&suite{})
type suite struct {
- disp *dispatcher
+ disp *dispatcher
+ crTooBig arvados.ContainerRequest
}
func (s *suite) TearDownTest(c *check.C) {
s.disp.lsfcli.stubCommand = func(string, ...string) *exec.Cmd {
return exec.Command("bash", "-c", "echo >&2 unimplemented stub; false")
}
+ err = arvados.NewClientFromEnv().RequestAndDecode(&s.crTooBig, "POST", "arvados/v1/container_requests", nil, map[string]interface{}{
+ "container_request": map[string]interface{}{
+ "runtime_constraints": arvados.RuntimeConstraints{
+ RAM: 1000000000000,
+ VCPUs: 1,
+ },
+ "container_image": arvadostest.DockerImage112PDH,
+ "command": []string{"sleep", "1"},
+ "mounts": map[string]arvados.Mount{"/mnt/out": {Kind: "tmp", Capacity: 1000}},
+ "output_path": "/mnt/out",
+ "state": arvados.ContainerRequestStateCommitted,
+ "priority": 1,
+ "container_count_max": 1,
+ },
+ })
+ c.Assert(err, check.IsNil)
}
type lsfstub struct {
"-J", arvadostest.LockedContainerUUID,
"-n", "4",
"-D", "11701MB",
- "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]"})
+ "-R", "rusage[mem=11701MB:tmp=0MB] span[hosts=1]",
+ "-R", "select[mem>=11701MB]",
+ "-R", "select[tmp>=0MB]",
+ "-R", "select[ncpus>=4]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
"-J", arvadostest.QueuedContainerUUID,
"-n", "4",
"-D", "11701MB",
- "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]"})
+ "-R", "rusage[mem=11701MB:tmp=45777MB] span[hosts=1]",
+ "-R", "select[mem>=11701MB]",
+ "-R", "select[tmp>=45777MB]",
+ "-R", "select[ncpus>=4]"})
+ mtx.Lock()
+ fakejobq[nextjobid] = args[1]
+ nextjobid++
+ mtx.Unlock()
+ case s.crTooBig.ContainerUUID:
+ c.Check(args, check.DeepEquals, []string{
+ "-J", s.crTooBig.ContainerUUID,
+ "-n", "1",
+ "-D", "954187MB",
+ "-R", "rusage[mem=954187MB:tmp=256MB] span[hosts=1]",
+ "-R", "select[mem>=954187MB]",
+ "-R", "select[tmp>=256MB]",
+ "-R", "select[ncpus>=1]"})
mtx.Lock()
fakejobq[nextjobid] = args[1]
nextjobid++
c.Check(args, check.DeepEquals, []string{"-u", "all", "-o", "jobid stat job_name pend_reason", "-json"})
var records []map[string]interface{}
for jobid, uuid := range fakejobq {
+ stat, reason := "RUN", ""
+ if uuid == s.crTooBig.ContainerUUID {
+ // The real bjobs output includes a trailing ';' here:
+ stat, reason = "PEND", "There are no suitable hosts for the job;"
+ }
records = append(records, map[string]interface{}{
"JOBID": fmt.Sprintf("%d", jobid),
- "STAT": "RUN",
+ "STAT": stat,
"JOB_NAME": uuid,
- "PEND_REASON": "",
+ "PEND_REASON": reason,
})
}
out, err := json.Marshal(map[string]interface{}{
sudoUser: s.disp.Cluster.Containers.LSF.BsubSudoUser,
}.stubCommand(s, c)
s.disp.Start()
+
deadline := time.Now().Add(20 * time.Second)
for range time.NewTicker(time.Second).C {
if time.Now().After(deadline) {
break
}
// "queuedcontainer" should be running
- if _, ok := s.disp.lsfqueue.JobID(arvadostest.QueuedContainerUUID); !ok {
+ if _, ok := s.disp.lsfqueue.Lookup(arvadostest.QueuedContainerUUID); !ok {
continue
}
// "lockedcontainer" should be cancelled because it
// has priority 0 (no matching container requests)
- if _, ok := s.disp.lsfqueue.JobID(arvadostest.LockedContainerUUID); ok {
+ if _, ok := s.disp.lsfqueue.Lookup(arvadostest.LockedContainerUUID); ok {
+ continue
+ }
+ // "crTooBig" should be cancelled because the LSF stub
+ // reports there are no suitable hosts for the job
+ if _, ok := s.disp.lsfqueue.Lookup(s.crTooBig.ContainerUUID); ok {
continue
}
var ctr arvados.Container
if err := s.disp.arvDispatcher.Arv.Get("containers", arvadostest.LockedContainerUUID, nil, &ctr); err != nil {
c.Logf("error getting container state for %s: %s", arvadostest.LockedContainerUUID, err)
continue
- }
- if ctr.State != arvados.ContainerStateQueued {
+ } else if ctr.State != arvados.ContainerStateQueued {
c.Logf("LockedContainer is not in the LSF queue but its arvados record has not been updated to state==Queued (state is %q)", ctr.State)
continue
}
+
+ if err := s.disp.arvDispatcher.Arv.Get("containers", s.crTooBig.ContainerUUID, nil, &ctr); err != nil {
+ c.Logf("error getting container state for %s: %s", s.crTooBig.ContainerUUID, err)
+ continue
+ } else if ctr.State != arvados.ContainerStateCancelled {
+ c.Logf("container %s is not in the LSF queue but its arvados record has not been updated to state==Cancelled (state is %q)", s.crTooBig.ContainerUUID, ctr.State)
+ continue
+ } else {
+ c.Check(ctr.RuntimeStatus["error"], check.Equals, "There are no suitable hosts for the job;")
+ }
c.Log("reached desired state")
break
}
latest map[string]bjobsEntry
}
-// JobID waits for the next queue update (so even a job that was only
+// Lookup waits for the next queue update (so even a job that was only
// submitted a nanosecond ago will show up) and then returns the LSF
-// job ID corresponding to the given container UUID.
-func (q *lsfqueue) JobID(uuid string) (string, bool) {
+// queue information corresponding to the given container UUID.
+func (q *lsfqueue) Lookup(uuid string) (bjobsEntry, bool) {
ent, ok := q.getNext()[uuid]
- return ent.ID, ok
+ return ent, ok
}
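// Hypothetical caller (mirroring the dispatcher code above): block
// until the next bjobs poll, then inspect the entry's status.
//
// if qent, ok := q.Lookup(ctrUUID); ok && qent.Stat == "PEND" {
// logger.Printf("job %s pending: %s", qent.ID, qent.PendReason)
// }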
// All waits for the next queue update, then returns the names of all