Merge branch '14596-no-requeue'
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "sort"
11         "strings"
12         "sync"
13         "time"
14 )
15
16 const slurm15NiceLimit int64 = 10000
17
18 type slurmJob struct {
19         uuid         string
20         wantPriority int64
21         priority     int64 // current slurm priority (incorporates nice value)
22         nice         int64 // current slurm nice value
23         hitNiceLimit bool
24 }
25
26 // Squeue implements asynchronous polling monitor of the SLURM queue using the
27 // command 'squeue'.
28 type SqueueChecker struct {
29         Logger         logger
30         Period         time.Duration
31         PrioritySpread int64
32         Slurm          Slurm
33         queue          map[string]*slurmJob
34         startOnce      sync.Once
35         done           chan struct{}
36         lock           sync.RWMutex
37         notify         sync.Cond
38 }
39
40 // HasUUID checks if a given container UUID is in the slurm queue.
41 // This does not run squeue directly, but instead blocks until woken
42 // up by next successful update of squeue.
43 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
44         sqc.startOnce.Do(sqc.start)
45
46         sqc.lock.RLock()
47         defer sqc.lock.RUnlock()
48
49         // block until next squeue broadcast signaling an update.
50         sqc.notify.Wait()
51         _, exists := sqc.queue[uuid]
52         return exists
53 }
54
55 // SetPriority sets or updates the desired (Arvados) priority for a
56 // container.
57 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
58         sqc.startOnce.Do(sqc.start)
59
60         sqc.lock.RLock()
61         job := sqc.queue[uuid]
62         if job == nil {
63                 // Wait in case the slurm job was just submitted and
64                 // will appear in the next squeue update.
65                 sqc.notify.Wait()
66                 job = sqc.queue[uuid]
67         }
68         needUpdate := job != nil && job.wantPriority != want
69         sqc.lock.RUnlock()
70
71         if needUpdate {
72                 sqc.lock.Lock()
73                 job.wantPriority = want
74                 sqc.lock.Unlock()
75         }
76 }
77
78 // adjust slurm job nice values as needed to ensure slurm priority
79 // order matches Arvados priority order.
80 func (sqc *SqueueChecker) reniceAll() {
81         // This is slow (it shells out to scontrol many times) and no
82         // other goroutines update sqc.queue or any of the job fields
83         // we use here, so we don't acquire a lock.
84         jobs := make([]*slurmJob, 0, len(sqc.queue))
85         for _, j := range sqc.queue {
86                 if j.wantPriority == 0 {
87                         // SLURM job with unknown Arvados priority
88                         // (perhaps it's not an Arvados job)
89                         continue
90                 }
91                 if j.priority <= 2*slurm15NiceLimit {
92                         // SLURM <= 15.x implements "hold" by setting
93                         // priority to 0. If we include held jobs
94                         // here, we'll end up trying to push other
95                         // jobs below them using negative priority,
96                         // which won't help anything.
97                         continue
98                 }
99                 jobs = append(jobs, j)
100         }
101
102         sort.Slice(jobs, func(i, j int) bool {
103                 if jobs[i].wantPriority != jobs[j].wantPriority {
104                         return jobs[i].wantPriority > jobs[j].wantPriority
105                 } else {
106                         // break ties with container uuid --
107                         // otherwise, the ordering would change from
108                         // one interval to the next, and we'd do many
109                         // pointless slurm queue rearrangements.
110                         return jobs[i].uuid > jobs[j].uuid
111                 }
112         })
113         renice := wantNice(jobs, sqc.PrioritySpread)
114         for i, job := range jobs {
115                 niceNew := renice[i]
116                 if job.hitNiceLimit && niceNew > slurm15NiceLimit {
117                         niceNew = slurm15NiceLimit
118                 }
119                 if niceNew == job.nice {
120                         continue
121                 }
122                 err := sqc.Slurm.Renice(job.uuid, niceNew)
123                 if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
124                         sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
125                         job.hitNiceLimit = true
126                 }
127         }
128 }
129
130 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
131 // after calling Stop.
132 func (sqc *SqueueChecker) Stop() {
133         if sqc.done != nil {
134                 close(sqc.done)
135         }
136 }
137
138 // check gets the names of jobs in the SLURM queue (running and
139 // queued). If it succeeds, it updates sqc.queue and wakes up any
140 // goroutines that are waiting in HasUUID() or All().
141 func (sqc *SqueueChecker) check() {
142         cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
143         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
144         cmd.Stdout, cmd.Stderr = stdout, stderr
145         if err := cmd.Run(); err != nil {
146                 sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
147                 return
148         }
149
150         lines := strings.Split(stdout.String(), "\n")
151         newq := make(map[string]*slurmJob, len(lines))
152         for _, line := range lines {
153                 if line == "" {
154                         continue
155                 }
156                 var uuid, state, reason string
157                 var n, p int64
158                 if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
159                         sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
160                         continue
161                 }
162
163                 // No other goroutines write to jobs' priority or nice
164                 // fields, so we can read and write them without
165                 // locks.
166                 replacing, ok := sqc.queue[uuid]
167                 if !ok {
168                         replacing = &slurmJob{uuid: uuid}
169                 }
170                 replacing.priority = p
171                 replacing.nice = n
172                 newq[uuid] = replacing
173
174                 if state == "PENDING" && ((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held") && replacing.wantPriority > 0 {
175                         // When using SLURM 14.x or 15.x, our queued
176                         // jobs land in this state when "scontrol
177                         // reconfigure" invalidates their feature
178                         // constraints by clearing all node features.
179                         // They stay in this state even after the
180                         // features reappear, until we run "scontrol
181                         // release {jobid}". Priority is usually 0 in
182                         // this state, but sometimes (due to a race
183                         // with nice adjustments?) it's a small
184                         // positive value.
185                         //
186                         // "scontrol release" is silent and successful
187                         // regardless of whether the features have
188                         // reappeared, so rather than second-guessing
189                         // whether SLURM is ready, we just keep trying
190                         // this until it works.
191                         //
192                         // "launch failed requeued held" seems to be
193                         // another manifestation of this problem,
194                         // resolved the same way.
195                         sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
196                         sqc.Slurm.Release(uuid)
197                 } else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
198                         sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
199                 }
200         }
201         sqc.lock.Lock()
202         sqc.queue = newq
203         sqc.lock.Unlock()
204         sqc.notify.Broadcast()
205 }
206
207 // Initialize, and start a goroutine to call check() once per
208 // squeue.Period until terminated by calling Stop().
209 func (sqc *SqueueChecker) start() {
210         sqc.notify.L = sqc.lock.RLocker()
211         sqc.done = make(chan struct{})
212         go func() {
213                 ticker := time.NewTicker(sqc.Period)
214                 for {
215                         select {
216                         case <-sqc.done:
217                                 ticker.Stop()
218                                 return
219                         case <-ticker.C:
220                                 sqc.check()
221                                 sqc.reniceAll()
222                                 select {
223                                 case <-ticker.C:
224                                         // If this iteration took
225                                         // longer than sqc.Period,
226                                         // consume the next tick and
227                                         // wait. Otherwise we would
228                                         // starve other goroutines.
229                                 default:
230                                 }
231                         }
232                 }
233         }()
234 }
235
236 // All waits for the next squeue invocation, and returns all job
237 // names reported by squeue.
238 func (sqc *SqueueChecker) All() []string {
239         sqc.startOnce.Do(sqc.start)
240         sqc.lock.RLock()
241         defer sqc.lock.RUnlock()
242         sqc.notify.Wait()
243         var uuids []string
244         for u := range sqc.queue {
245                 uuids = append(uuids, u)
246         }
247         return uuids
248 }