X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/90483e26cf9f0ced8e5b12586ae3a5e1ec620add..f4aa4dbbefe8b6dd65e3a112642da288774cf951:/services/crunch-dispatch-slurm/squeue.go diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index adb620ea8d..742943f197 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -8,24 +8,28 @@ import ( "bytes" "fmt" "log" + "sort" "strings" "sync" "time" ) -type jobPriority struct { - niceness int - currentPriority int +type slurmJob struct { + uuid string + wantPriority int64 + priority int64 // current slurm priority (incorporates nice value) + nice int64 // current slurm nice value } // Squeue implements asynchronous polling monitor of the SLURM queue using the // command 'squeue'. type SqueueChecker struct { - Period time.Duration - Slurm Slurm - uuids map[string]jobPriority - startOnce sync.Once - done chan struct{} + Period time.Duration + PrioritySpread int64 + Slurm Slurm + queue map[string]*slurmJob + startOnce sync.Once + done chan struct{} sync.Cond } @@ -40,22 +44,69 @@ func (sqc *SqueueChecker) HasUUID(uuid string) bool { // block until next squeue broadcast signaling an update. sqc.Wait() - _, exists := sqc.uuids[uuid] + _, exists := sqc.queue[uuid] return exists } -// GetNiceness returns the niceness of a given uuid, or -1 if it doesn't exist. -func (sqc *SqueueChecker) GetNiceness(uuid string) int { +// SetPriority sets or updates the desired (Arvados) priority for a +// container. +func (sqc *SqueueChecker) SetPriority(uuid string, want int64) { sqc.startOnce.Do(sqc.start) + sqc.L.Lock() + defer sqc.L.Unlock() + job, ok := sqc.queue[uuid] + if !ok { + // Wait in case the slurm job was just submitted and + // will appear in the next squeue update. + sqc.Wait() + if job, ok = sqc.queue[uuid]; !ok { + return + } + } + job.wantPriority = want +} +// adjust slurm job nice values as needed to ensure slurm priority +// order matches Arvados priority order. +func (sqc *SqueueChecker) reniceAll() { sqc.L.Lock() defer sqc.L.Unlock() - n, exists := sqc.uuids[uuid] - if exists { - return n.niceness - } else { - return -1 + jobs := make([]*slurmJob, 0, len(sqc.queue)) + for _, j := range sqc.queue { + if j.wantPriority == 0 { + // SLURM job with unknown Arvados priority + // (perhaps it's not an Arvados job) + continue + } + if j.priority == 0 { + // SLURM <= 15.x implements "hold" by setting + // priority to 0. If we include held jobs + // here, we'll end up trying to push other + // jobs below them using negative priority, + // which won't help anything. + continue + } + jobs = append(jobs, j) + } + + sort.Slice(jobs, func(i, j int) bool { + if jobs[i].wantPriority != jobs[j].wantPriority { + return jobs[i].wantPriority > jobs[j].wantPriority + } else { + // break ties with container uuid -- + // otherwise, the ordering would change from + // one interval to the next, and we'd do many + // pointless slurm queue rearrangements. + return jobs[i].uuid > jobs[j].uuid + } + }) + renice := wantNice(jobs, sqc.PrioritySpread) + for i, job := range jobs { + if renice[i] == job.nice { + continue + } + sqc.Slurm.Renice(job.uuid, renice[i]) } } @@ -68,7 +119,7 @@ func (sqc *SqueueChecker) Stop() { } // check gets the names of jobs in the SLURM queue (running and -// queued). If it succeeds, it updates squeue.uuids and wakes up any +// queued). If it succeeds, it updates sqc.queue and wakes up any // goroutines that are waiting in HasUUID() or All(). func (sqc *SqueueChecker) check() { // Mutex between squeue sync and running sbatch or scancel. This @@ -78,7 +129,7 @@ func (sqc *SqueueChecker) check() { sqc.L.Lock() defer sqc.L.Unlock() - cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"}) + cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"}) stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{} cmd.Stdout, cmd.Stderr = stdout, stderr if err := cmd.Run(); err != nil { @@ -87,16 +138,50 @@ func (sqc *SqueueChecker) check() { } lines := strings.Split(stdout.String(), "\n") - sqc.uuids = make(map[string]jobPriority, len(lines)) + newq := make(map[string]*slurmJob, len(lines)) for _, line := range lines { - var uuid string - var nice int - var prio int - fmt.Sscan(line, &uuid, &nice, &prio) - if uuid != "" { - sqc.uuids[uuid] = jobPriority{nice, prio} + if line == "" { + continue + } + var uuid, state, reason string + var n, p int64 + if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil { + log.Printf("warning: ignoring unparsed line in squeue output: %q", line) + continue + } + replacing, ok := sqc.queue[uuid] + if !ok { + replacing = &slurmJob{uuid: uuid} + } + replacing.priority = p + replacing.nice = n + newq[uuid] = replacing + + if state == "PENDING" && ((reason == "BadConstraints" && p == 0) || reason == "launch failed requeued held") && replacing.wantPriority > 0 { + // When using SLURM 14.x or 15.x, our queued + // jobs land in this state when "scontrol + // reconfigure" invalidates their feature + // constraints by clearing all node features. + // They stay in this state even after the + // features reappear, until we run "scontrol + // release {jobid}". + // + // "scontrol release" is silent and successful + // regardless of whether the features have + // reappeared, so rather than second-guessing + // whether SLURM is ready, we just keep trying + // this until it works. + // + // "launch failed requeued held" seems to be + // another manifestation of this problem, + // resolved the same way. + log.Printf("releasing held job %q", uuid) + sqc.Slurm.Release(uuid) + } else if p < 1<<20 && replacing.wantPriority > 0 { + log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason) } } + sqc.queue = newq sqc.Broadcast() } @@ -114,6 +199,7 @@ func (sqc *SqueueChecker) start() { return case <-ticker.C: sqc.check() + sqc.reniceAll() } } }() @@ -127,8 +213,8 @@ func (sqc *SqueueChecker) All() []string { defer sqc.L.Unlock() sqc.Wait() var uuids []string - for uuid := range sqc.uuids { - uuids = append(uuids, uuid) + for u := range sqc.queue { + uuids = append(uuids, u) } return uuids }