projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
15003: Merge branch 'master'
[arvados.git]
/
services
/
crunch-dispatch-slurm
/
squeue.go
diff --git
a/services/crunch-dispatch-slurm/squeue.go
b/services/crunch-dispatch-slurm/squeue.go
index 9514da822b7e748a7498fab0aafb2104371dae65..5aee7e087b2658945b2eebe1f2f309d67c351d16 100644
(file)
--- a/
services/crunch-dispatch-slurm/squeue.go
+++ b/
services/crunch-dispatch-slurm/squeue.go
@@
-7,30
+7,34
@@
package main
import (
"bytes"
"fmt"
import (
"bytes"
"fmt"
- "log"
"sort"
"strings"
"sync"
"time"
)
"sort"
"strings"
"sync"
"time"
)
+const slurm15NiceLimit int64 = 10000
+
type slurmJob struct {
uuid string
wantPriority int64
priority int64 // current slurm priority (incorporates nice value)
nice int64 // current slurm nice value
type slurmJob struct {
uuid string
wantPriority int64
priority int64 // current slurm priority (incorporates nice value)
nice int64 // current slurm nice value
+ hitNiceLimit bool
}
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type SqueueChecker struct {
}
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type SqueueChecker struct {
+ Logger logger
Period time.Duration
PrioritySpread int64
Slurm Slurm
queue map[string]*slurmJob
startOnce sync.Once
done chan struct{}
Period time.Duration
PrioritySpread int64
Slurm Slurm
queue map[string]*slurmJob
startOnce sync.Once
done chan struct{}
- sync.Cond
+ lock sync.RWMutex
+ notify sync.Cond
}
// HasUUID checks if a given container UUID is in the slurm queue.
}
// HasUUID checks if a given container UUID is in the slurm queue.
@@
-39,11
+43,11
@@
type SqueueChecker struct {
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
sqc.startOnce.Do(sqc.start)
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
sqc.startOnce.Do(sqc.start)
- sqc.
L.
Lock()
- defer sqc.
L.
Unlock()
+ sqc.
lock.R
Lock()
+ defer sqc.
lock.R
Unlock()
// block until next squeue broadcast signaling an update.
// block until next squeue broadcast signaling an update.
- sqc.Wait()
+ sqc.
notify.
Wait()
_, exists := sqc.queue[uuid]
return exists
}
_, exists := sqc.queue[uuid]
return exists
}
@@
-52,26
+56,31
@@
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
// container.
func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
sqc.startOnce.Do(sqc.start)
// container.
func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
sqc.startOnce.Do(sqc.start)
- sqc.L.Lock()
-
defer sqc.L.Unl
ock()
- job
, ok
:= sqc.queue[uuid]
- if
!ok
{
+
+
sqc.lock.RL
ock()
+ job := sqc.queue[uuid]
+ if
job == nil
{
// Wait in case the slurm job was just submitted and
// will appear in the next squeue update.
// Wait in case the slurm job was just submitted and
// will appear in the next squeue update.
- sqc.Wait()
- if job, ok = sqc.queue[uuid]; !ok {
- return
- }
+ sqc.notify.Wait()
+ job = sqc.queue[uuid]
+ }
+ needUpdate := job != nil && job.wantPriority != want
+ sqc.lock.RUnlock()
+
+ if needUpdate {
+ sqc.lock.Lock()
+ job.wantPriority = want
+ sqc.lock.Unlock()
}
}
- job.wantPriority = want
}
// adjust slurm job nice values as needed to ensure slurm priority
// order matches Arvados priority order.
func (sqc *SqueueChecker) reniceAll() {
}
// adjust slurm job nice values as needed to ensure slurm priority
// order matches Arvados priority order.
func (sqc *SqueueChecker) reniceAll() {
- sqc.L.Lock()
- defer sqc.L.Unlock()
-
+ // This is slow (it shells out to scontrol many times) and no
+ // other goroutines update sqc.queue or any of the job fields
+ // we use here, so we don't acquire a lock.
jobs := make([]*slurmJob, 0, len(sqc.queue))
for _, j := range sqc.queue {
if j.wantPriority == 0 {
jobs := make([]*slurmJob, 0, len(sqc.queue))
for _, j := range sqc.queue {
if j.wantPriority == 0 {
@@
-79,7
+88,7
@@
func (sqc *SqueueChecker) reniceAll() {
// (perhaps it's not an Arvados job)
continue
}
// (perhaps it's not an Arvados job)
continue
}
- if j.priority
== 0
{
+ if j.priority
<= 2*slurm15NiceLimit
{
// SLURM <= 15.x implements "hold" by setting
// priority to 0. If we include held jobs
// here, we'll end up trying to push other
// SLURM <= 15.x implements "hold" by setting
// priority to 0. If we include held jobs
// here, we'll end up trying to push other
@@
-103,10
+112,18
@@
func (sqc *SqueueChecker) reniceAll() {
})
renice := wantNice(jobs, sqc.PrioritySpread)
for i, job := range jobs {
})
renice := wantNice(jobs, sqc.PrioritySpread)
for i, job := range jobs {
- if renice[i] == job.nice {
+ niceNew := renice[i]
+ if job.hitNiceLimit && niceNew > slurm15NiceLimit {
+ niceNew = slurm15NiceLimit
+ }
+ if niceNew == job.nice {
continue
}
continue
}
- sqc.Slurm.Renice(job.uuid, renice[i])
+ err := sqc.Slurm.Renice(job.uuid, niceNew)
+ if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
+ sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+ job.hitNiceLimit = true
+ }
}
}
}
}
@@
-122,18
+139,11
@@
func (sqc *SqueueChecker) Stop() {
// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
- // Mutex between squeue sync and running sbatch or scancel. This
- // establishes a sequence so that squeue doesn't run concurrently with
- // sbatch or scancel; the next update of squeue will occur only after
- // sbatch or scancel has completed.
- sqc.L.Lock()
- defer sqc.L.Unlock()
-
cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
-
log.Print
f("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+
sqc.Logger.Warn
f("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
return
}
return
}
@@
-146,9
+156,13
@@
func (sqc *SqueueChecker) check() {
var uuid, state, reason string
var n, p int64
if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
var uuid, state, reason string
var n, p int64
if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
-
log.Printf("warning:
ignoring unparsed line in squeue output: %q", line)
+
sqc.Logger.Warnf("
ignoring unparsed line in squeue output: %q", line)
continue
}
continue
}
+
+ // No other goroutines write to jobs' priority or nice
+ // fields, so we can read and write them without
+ // locks.
replacing, ok := sqc.queue[uuid]
if !ok {
replacing = &slurmJob{uuid: uuid}
replacing, ok := sqc.queue[uuid]
if !ok {
replacing = &slurmJob{uuid: uuid}
@@
-157,34
+171,43
@@
func (sqc *SqueueChecker) check() {
replacing.nice = n
newq[uuid] = replacing
replacing.nice = n
newq[uuid] = replacing
- if state == "PENDING" &&
reason == "BadConstraints" && p == 0
&& replacing.wantPriority > 0 {
+ if state == "PENDING" &&
((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held")
&& replacing.wantPriority > 0 {
// When using SLURM 14.x or 15.x, our queued
// jobs land in this state when "scontrol
// reconfigure" invalidates their feature
// constraints by clearing all node features.
// They stay in this state even after the
// features reappear, until we run "scontrol
// When using SLURM 14.x or 15.x, our queued
// jobs land in this state when "scontrol
// reconfigure" invalidates their feature
// constraints by clearing all node features.
// They stay in this state even after the
// features reappear, until we run "scontrol
- // release {jobid}".
+ // release {jobid}". Priority is usually 0 in
+ // this state, but sometimes (due to a race
+ // with nice adjustments?) it's a small
+ // positive value.
//
// "scontrol release" is silent and successful
// regardless of whether the features have
// reappeared, so rather than second-guessing
// whether SLURM is ready, we just keep trying
// this until it works.
//
// "scontrol release" is silent and successful
// regardless of whether the features have
// reappeared, so rather than second-guessing
// whether SLURM is ready, we just keep trying
// this until it works.
- log.Printf("releasing held job %q", uuid)
+ //
+ // "launch failed requeued held" seems to be
+ // another manifestation of this problem,
+ // resolved the same way.
+ sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
sqc.Slurm.Release(uuid)
sqc.Slurm.Release(uuid)
- } else if
p < 1<<20
&& replacing.wantPriority > 0 {
-
log.Printf("warning:
job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
+ } else if
state != "RUNNING" && p <= 2*slurm15NiceLimit
&& replacing.wantPriority > 0 {
+
sqc.Logger.Warnf("
job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
}
}
}
}
+ sqc.lock.Lock()
sqc.queue = newq
sqc.queue = newq
- sqc.Broadcast()
+ sqc.lock.Unlock()
+ sqc.notify.Broadcast()
}
// Initialize, and start a goroutine to call check() once per
// squeue.Period until terminated by calling Stop().
func (sqc *SqueueChecker) start() {
}
// Initialize, and start a goroutine to call check() once per
// squeue.Period until terminated by calling Stop().
func (sqc *SqueueChecker) start() {
- sqc.
L = &sync.Mutex{}
+ sqc.
notify.L = sqc.lock.RLocker()
sqc.done = make(chan struct{})
go func() {
ticker := time.NewTicker(sqc.Period)
sqc.done = make(chan struct{})
go func() {
ticker := time.NewTicker(sqc.Period)
@@
-196,6
+219,15
@@
func (sqc *SqueueChecker) start() {
case <-ticker.C:
sqc.check()
sqc.reniceAll()
case <-ticker.C:
sqc.check()
sqc.reniceAll()
+ select {
+ case <-ticker.C:
+ // If this iteration took
+ // longer than sqc.Period,
+ // consume the next tick and
+ // wait. Otherwise we would
+ // starve other goroutines.
+ default:
+ }
}
}
}()
}
}
}()
@@
-205,9
+237,9
@@
func (sqc *SqueueChecker) start() {
// names reported by squeue.
func (sqc *SqueueChecker) All() []string {
sqc.startOnce.Do(sqc.start)
// names reported by squeue.
func (sqc *SqueueChecker) All() []string {
sqc.startOnce.Do(sqc.start)
- sqc.
L.
Lock()
- defer sqc.
L.
Unlock()
- sqc.Wait()
+ sqc.
lock.R
Lock()
+ defer sqc.
lock.R
Unlock()
+ sqc.
notify.
Wait()
var uuids []string
for u := range sqc.queue {
uuids = append(uuids, u)
var uuids []string
for u := range sqc.queue {
uuids = append(uuids, u)