13399: Link to wiki in warning message.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "sort"
12         "strings"
13         "sync"
14         "time"
15 )
16
17 const slurm15NiceLimit int64 = 10000
18
19 type slurmJob struct {
20         uuid         string
21         wantPriority int64
22         priority     int64 // current slurm priority (incorporates nice value)
23         nice         int64 // current slurm nice value
24         hitNiceLimit bool
25 }
26
27 // Squeue implements asynchronous polling monitor of the SLURM queue using the
28 // command 'squeue'.
29 type SqueueChecker struct {
30         Period         time.Duration
31         PrioritySpread int64
32         Slurm          Slurm
33         queue          map[string]*slurmJob
34         startOnce      sync.Once
35         done           chan struct{}
36         sync.Cond
37 }
38
39 // HasUUID checks if a given container UUID is in the slurm queue.
40 // This does not run squeue directly, but instead blocks until woken
41 // up by next successful update of squeue.
42 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
43         sqc.startOnce.Do(sqc.start)
44
45         sqc.L.Lock()
46         defer sqc.L.Unlock()
47
48         // block until next squeue broadcast signaling an update.
49         sqc.Wait()
50         _, exists := sqc.queue[uuid]
51         return exists
52 }
53
54 // SetPriority sets or updates the desired (Arvados) priority for a
55 // container.
56 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
57         sqc.startOnce.Do(sqc.start)
58         sqc.L.Lock()
59         defer sqc.L.Unlock()
60         job, ok := sqc.queue[uuid]
61         if !ok {
62                 // Wait in case the slurm job was just submitted and
63                 // will appear in the next squeue update.
64                 sqc.Wait()
65                 if job, ok = sqc.queue[uuid]; !ok {
66                         return
67                 }
68         }
69         job.wantPriority = want
70 }
71
72 // adjust slurm job nice values as needed to ensure slurm priority
73 // order matches Arvados priority order.
74 func (sqc *SqueueChecker) reniceAll() {
75         sqc.L.Lock()
76         defer sqc.L.Unlock()
77
78         jobs := make([]*slurmJob, 0, len(sqc.queue))
79         for _, j := range sqc.queue {
80                 if j.wantPriority == 0 {
81                         // SLURM job with unknown Arvados priority
82                         // (perhaps it's not an Arvados job)
83                         continue
84                 }
85                 if j.priority == 0 {
86                         // SLURM <= 15.x implements "hold" by setting
87                         // priority to 0. If we include held jobs
88                         // here, we'll end up trying to push other
89                         // jobs below them using negative priority,
90                         // which won't help anything.
91                         continue
92                 }
93                 jobs = append(jobs, j)
94         }
95
96         sort.Slice(jobs, func(i, j int) bool {
97                 if jobs[i].wantPriority != jobs[j].wantPriority {
98                         return jobs[i].wantPriority > jobs[j].wantPriority
99                 } else {
100                         // break ties with container uuid --
101                         // otherwise, the ordering would change from
102                         // one interval to the next, and we'd do many
103                         // pointless slurm queue rearrangements.
104                         return jobs[i].uuid > jobs[j].uuid
105                 }
106         })
107         renice := wantNice(jobs, sqc.PrioritySpread)
108         for i, job := range jobs {
109                 niceNew := renice[i]
110                 if job.hitNiceLimit && niceNew > slurm15NiceLimit {
111                         niceNew = slurm15NiceLimit
112                 }
113                 if niceNew == job.nice {
114                         continue
115                 }
116                 err := sqc.Slurm.Renice(job.uuid, niceNew)
117                 if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
118                         log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
119                         job.hitNiceLimit = true
120                 }
121         }
122 }
123
124 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
125 // after calling Stop.
126 func (sqc *SqueueChecker) Stop() {
127         if sqc.done != nil {
128                 close(sqc.done)
129         }
130 }
131
132 // check gets the names of jobs in the SLURM queue (running and
133 // queued). If it succeeds, it updates sqc.queue and wakes up any
134 // goroutines that are waiting in HasUUID() or All().
135 func (sqc *SqueueChecker) check() {
136         // Mutex between squeue sync and running sbatch or scancel.  This
137         // establishes a sequence so that squeue doesn't run concurrently with
138         // sbatch or scancel; the next update of squeue will occur only after
139         // sbatch or scancel has completed.
140         sqc.L.Lock()
141         defer sqc.L.Unlock()
142
143         cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
144         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
145         cmd.Stdout, cmd.Stderr = stdout, stderr
146         if err := cmd.Run(); err != nil {
147                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
148                 return
149         }
150
151         lines := strings.Split(stdout.String(), "\n")
152         newq := make(map[string]*slurmJob, len(lines))
153         for _, line := range lines {
154                 if line == "" {
155                         continue
156                 }
157                 var uuid, state, reason string
158                 var n, p int64
159                 if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
160                         log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
161                         continue
162                 }
163                 replacing, ok := sqc.queue[uuid]
164                 if !ok {
165                         replacing = &slurmJob{uuid: uuid}
166                 }
167                 replacing.priority = p
168                 replacing.nice = n
169                 newq[uuid] = replacing
170
171                 if state == "PENDING" && ((reason == "BadConstraints" && p <= 2*slurm15NiceLimit) || reason == "launch failed requeued held") && replacing.wantPriority > 0 {
172                         // When using SLURM 14.x or 15.x, our queued
173                         // jobs land in this state when "scontrol
174                         // reconfigure" invalidates their feature
175                         // constraints by clearing all node features.
176                         // They stay in this state even after the
177                         // features reappear, until we run "scontrol
178                         // release {jobid}". Priority is usually 0 in
179                         // this state, but sometimes (due to a race
180                         // with nice adjustments?) it's a small
181                         // positive value.
182                         //
183                         // "scontrol release" is silent and successful
184                         // regardless of whether the features have
185                         // reappeared, so rather than second-guessing
186                         // whether SLURM is ready, we just keep trying
187                         // this until it works.
188                         //
189                         // "launch failed requeued held" seems to be
190                         // another manifestation of this problem,
191                         // resolved the same way.
192                         log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
193                         sqc.Slurm.Release(uuid)
194                 } else if p < 1<<20 && replacing.wantPriority > 0 {
195                         log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
196                 }
197         }
198         sqc.queue = newq
199         sqc.Broadcast()
200 }
201
202 // Initialize, and start a goroutine to call check() once per
203 // squeue.Period until terminated by calling Stop().
204 func (sqc *SqueueChecker) start() {
205         sqc.L = &sync.Mutex{}
206         sqc.done = make(chan struct{})
207         go func() {
208                 ticker := time.NewTicker(sqc.Period)
209                 for {
210                         select {
211                         case <-sqc.done:
212                                 ticker.Stop()
213                                 return
214                         case <-ticker.C:
215                                 sqc.check()
216                                 sqc.reniceAll()
217                         }
218                 }
219         }()
220 }
221
222 // All waits for the next squeue invocation, and returns all job
223 // names reported by squeue.
224 func (sqc *SqueueChecker) All() []string {
225         sqc.startOnce.Do(sqc.start)
226         sqc.L.Lock()
227         defer sqc.L.Unlock()
228         sqc.Wait()
229         var uuids []string
230         for u := range sqc.queue {
231                 uuids = append(uuids, u)
232         }
233         return uuids
234 }