// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0

// slurm15NiceLimit is the maximum nice value accepted by SLURM
// versions <= 15.x.
const slurm15NiceLimit int64 = 10000

type slurmJob struct {
	uuid         string
	wantPriority int64 // desired (Arvados) priority, 0 if unknown
	priority     int64 // current slurm priority (incorporates nice value)
	nice         int64 // current slurm nice value
	hitNiceLimit bool  // whether renicing already hit slurm15NiceLimit
}

// SqueueChecker implements an asynchronous polling monitor of the
// SLURM queue using the squeue command.
type SqueueChecker struct {
	Logger         logger // logging and Slurm interfaces are defined elsewhere in this package
	Period         time.Duration
	PrioritySpread int64
	Slurm          Slurm
	queue          map[string]*slurmJob
	startOnce      sync.Once
	done           chan struct{}
	lock           sync.RWMutex
	notify         sync.Cond
}
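
// A SqueueChecker is constructed by setting the exported fields
// directly; the monitoring goroutine starts automatically the first
// time HasUUID, SetPriority, or All is called. Illustrative sketch
// only (the field values and variable names shown are assumptions,
// not defaults):
//
//	sqCheck := &SqueueChecker{
//		Logger:         logger,
//		Period:         time.Second,
//		PrioritySpread: 1000,
//		Slurm:          slurmCLI,
//	}
//	defer sqCheck.Stop()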

// HasUUID checks if a given container UUID is in the slurm queue.
// This does not run squeue directly, but instead blocks until woken
// up by the next successful update of squeue.
func (sqc *SqueueChecker) HasUUID(uuid string) bool {
	sqc.startOnce.Do(sqc.start)

	sqc.lock.RLock()
	defer sqc.lock.RUnlock()

	// Block until the next squeue broadcast signaling an update.
	sqc.notify.Wait()
	_, exists := sqc.queue[uuid]
	return exists
}

// SetPriority sets or updates the desired (Arvados) priority for a
// container.
func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
	sqc.startOnce.Do(sqc.start)

	sqc.lock.RLock()
	job := sqc.queue[uuid]
	if job == nil {
		// Wait in case the slurm job was just submitted and
		// will appear in the next squeue update.
		sqc.notify.Wait()
		job = sqc.queue[uuid]
	}
	needUpdate := job != nil && job.wantPriority != want
	sqc.lock.RUnlock()

	if needUpdate {
		sqc.lock.Lock()
		job.wantPriority = want
		sqc.lock.Unlock()
	}
}

// reniceAll adjusts slurm job nice values as needed so that slurm
// priority order matches Arvados priority order.
func (sqc *SqueueChecker) reniceAll() {
	// This is slow (it shells out to scontrol many times) and no
	// other goroutines update sqc.queue or any of the job fields
	// we use here, so we don't acquire a lock.
	jobs := make([]*slurmJob, 0, len(sqc.queue))
	for _, j := range sqc.queue {
		if j.wantPriority == 0 {
			// SLURM job with unknown Arvados priority
			// (perhaps it's not an Arvados job)
			continue
		}
		if j.priority <= 2*slurm15NiceLimit {
			// SLURM <= 15.x implements "hold" by setting
			// priority to 0. If we include held jobs
			// here, we'll end up trying to push other
			// jobs below them using negative priority,
			// which won't help anything.
			continue
		}
		jobs = append(jobs, j)
	}

	sort.Slice(jobs, func(i, j int) bool {
		if jobs[i].wantPriority != jobs[j].wantPriority {
			return jobs[i].wantPriority > jobs[j].wantPriority
		}
		// Break ties with container uuid -- otherwise, the
		// ordering would change from one interval to the
		// next, and we'd do many pointless slurm queue
		// rearrangements.
		return jobs[i].uuid > jobs[j].uuid
	})
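	// wantNice is defined elsewhere in this package; all we rely
	// on here is that it returns one target nice value per entry
	// of jobs, in the same order. As a rough illustration only
	// (assumed numbers, not the actual algorithm): with
	// PrioritySpread=1000 and three jobs already sorted by
	// descending wantPriority, a plausible result is nice values
	// 0, 1000, and 2000, pushing lower-priority jobs further down
	// the slurm queue.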
	renice := wantNice(jobs, sqc.PrioritySpread)
	for i, job := range jobs {
		niceNew := renice[i]
		if job.hitNiceLimit && niceNew > slurm15NiceLimit {
			niceNew = slurm15NiceLimit
		}
		if niceNew == job.nice {
			continue
		}
		err := sqc.Slurm.Renice(job.uuid, niceNew)
		if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
			sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
			job.hitNiceLimit = true
		}
	}
}

// Stop stops the squeue monitoring goroutine. Do not call HasUUID
// after calling Stop.
func (sqc *SqueueChecker) Stop() {
	if sqc.done != nil {
		close(sqc.done)
	}
}

// check gets the names of jobs in the SLURM queue (running and
// queued). If it succeeds, it updates sqc.queue and wakes up any
// goroutines that are waiting in HasUUID() or All().
func (sqc *SqueueChecker) check() {
	cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
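	// For reference, each line produced by the squeue format
	// string above is "job name, nice, priority, state, reason".
	// An illustrative (made-up) example line:
	//
	//	zzzzz-dz642-1234567890abcde 0 4294901234 PENDING Resources
	//
	// The job name is the container UUID, which is what the
	// fmt.Sscan call below relies on.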
	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
	cmd.Stdout, cmd.Stderr = stdout, stderr
	if err := cmd.Run(); err != nil {
		sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
		return
	}

	lines := strings.Split(stdout.String(), "\n")
	newq := make(map[string]*slurmJob, len(lines))
	for _, line := range lines {
		if line == "" {
			continue
		}
		var uuid, state, reason string
		var n, p int64
		if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
			sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
			continue
		}

		// No other goroutines write to jobs' priority or nice
		// fields, so we can read and write them without
		// locks.
		replacing, ok := sqc.queue[uuid]
		if !ok {
			replacing = &slurmJob{uuid: uuid}
		}
		replacing.priority = p
		replacing.nice = n
		newq[uuid] = replacing

		if state == "PENDING" && ((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held") && replacing.wantPriority > 0 {
			// When using SLURM 14.x or 15.x, our queued
			// jobs land in this state when "scontrol
			// reconfigure" invalidates their feature
			// constraints by clearing all node features.
			// They stay in this state even after the
			// features reappear, until we run "scontrol
			// release {jobid}". Priority is usually 0 in
			// this state, but sometimes (due to a race
			// with nice adjustments?) it's a small
			// positive number.
			//
			// "scontrol release" is silent and successful
			// regardless of whether the features have
			// reappeared, so rather than second-guessing
			// whether SLURM is ready, we just keep trying
			// this until it works.
			//
			// "launch failed requeued held" seems to be
			// another manifestation of this problem,
			// resolved the same way.
			sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
			sqc.Slurm.Release(uuid)
		} else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
			sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
		}
	}
	sqc.lock.Lock()
	sqc.queue = newq
	sqc.lock.Unlock()
	sqc.notify.Broadcast()
}

// start initializes sqc and starts a goroutine that calls check()
// and reniceAll() once per sqc.Period until terminated by calling
// Stop().
func (sqc *SqueueChecker) start() {
	sqc.notify.L = sqc.lock.RLocker()
	sqc.done = make(chan struct{})
	go func() {
		ticker := time.NewTicker(sqc.Period)
		for {
			select {
			case <-sqc.done:
				ticker.Stop()
				return
			case <-ticker.C:
				sqc.check()
				sqc.reniceAll()
				select {
				case <-ticker.C:
					// If this iteration took
					// longer than sqc.Period,
					// consume the next tick and
					// wait. Otherwise we would
					// starve other goroutines.
				default:
				}
			}
		}
	}()
}
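
// As a concrete, illustrative example of the tick-draining select in
// start() above: with Period = 10s, if check() plus reniceAll() take
// 12s, the tick that fired at t=10s is discarded, and the next cycle
// starts at the t=20s tick rather than immediately.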

// All waits for the next squeue invocation, and returns all job
// names reported by squeue.
func (sqc *SqueueChecker) All() []string {
	sqc.startOnce.Do(sqc.start)
	sqc.lock.RLock()
	defer sqc.lock.RUnlock()
	sqc.notify.Wait()
	var uuids []string
	for u := range sqc.queue {
		uuids = append(uuids, u)
	}
	return uuids
}
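
// Illustrative use of All (the sqCheck variable and isKnownContainer
// helper are assumptions, not part of this file): a dispatcher can
// reconcile its own records against the slurm queue like this:
//
//	for _, uuid := range sqCheck.All() {
//		if !isKnownContainer(uuid) {
//			sqCheck.Logger.Printf("job %q in the slurm queue is not a known container", uuid)
//		}
//	}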