//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"bytes"
"fmt"
- "log"
"sort"
"strings"
"sync"
hitNiceLimit bool
}
-// Squeue implements asynchronous polling monitor of the SLURM queue using the
-// command 'squeue'.
+// SqueueChecker implements an asynchronous polling monitor of the SLURM
+// queue using the command 'squeue'.
type SqueueChecker struct {
+ Logger logger
Period time.Duration
PrioritySpread int64
Slurm Slurm
// adjust slurm job nice values as needed to ensure slurm priority
// order matches Arvados priority order.
func (sqc *SqueueChecker) reniceAll() {
- sqc.lock.RLock()
- defer sqc.lock.RUnlock()
-
+ // This is slow (it shells out to scontrol many times) and no
+ // other goroutines update sqc.queue or any of the job fields
+ // we use here, so we don't acquire a lock.
jobs := make([]*slurmJob, 0, len(sqc.queue))
for _, j := range sqc.queue {
if j.wantPriority == 0 {
sort.Slice(jobs, func(i, j int) bool {
if jobs[i].wantPriority != jobs[j].wantPriority {
return jobs[i].wantPriority > jobs[j].wantPriority
- } else {
- // break ties with container uuid --
- // otherwise, the ordering would change from
- // one interval to the next, and we'd do many
- // pointless slurm queue rearrangements.
- return jobs[i].uuid > jobs[j].uuid
}
+ // break ties with container uuid --
+ // otherwise, the ordering would change from
+ // one interval to the next, and we'd do many
+ // pointless slurm queue rearrangements.
+ return jobs[i].uuid > jobs[j].uuid
})
renice := wantNice(jobs, sqc.PrioritySpread)
for i, job := range jobs {
}
err := sqc.Slurm.Renice(job.uuid, niceNew)
if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
- log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+ sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
job.hitNiceLimit = true
}
}
// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
- sqc.lock.Lock()
- defer sqc.lock.Unlock()
-
cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
- log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+ sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
return
}
var uuid, state, reason string
var n, p int64
if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
- log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+ sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
continue
}
+
+ // No other goroutines write to jobs' priority or nice
+ // fields, so we can read and write them without
+ // locks.
replacing, ok := sqc.queue[uuid]
if !ok {
replacing = &slurmJob{uuid: uuid}
// "launch failed requeued held" seems to be
// another manifestation of this problem,
// resolved the same way.
- log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
+ sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
sqc.Slurm.Release(uuid)
} else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
- log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
+ sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
}
}
+ sqc.lock.Lock()
sqc.queue = newq
+ sqc.lock.Unlock()
sqc.notify.Broadcast()
}