Merge branch '12552-wf-priority'
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "sort"
12         "strings"
13         "sync"
14         "time"
15 )
16
17 type slurmJob struct {
18         uuid         string
19         wantPriority int64
20         priority     int64 // current slurm priority (incorporates nice value)
21         nice         int64 // current slurm nice value
22 }
23
24 // Squeue implements asynchronous polling monitor of the SLURM queue using the
25 // command 'squeue'.
26 type SqueueChecker struct {
27         Period         time.Duration
28         PrioritySpread int64
29         Slurm          Slurm
30         queue          map[string]*slurmJob
31         startOnce      sync.Once
32         done           chan struct{}
33         sync.Cond
34 }
35
36 // HasUUID checks if a given container UUID is in the slurm queue.
37 // This does not run squeue directly, but instead blocks until woken
38 // up by next successful update of squeue.
39 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
40         sqc.startOnce.Do(sqc.start)
41
42         sqc.L.Lock()
43         defer sqc.L.Unlock()
44
45         // block until next squeue broadcast signaling an update.
46         sqc.Wait()
47         _, exists := sqc.queue[uuid]
48         return exists
49 }
50
51 // SetPriority sets or updates the desired (Arvados) priority for a
52 // container.
53 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
54         sqc.startOnce.Do(sqc.start)
55         sqc.L.Lock()
56         defer sqc.L.Unlock()
57         job, ok := sqc.queue[uuid]
58         if !ok {
59                 // Wait in case the slurm job was just submitted and
60                 // will appear in the next squeue update.
61                 sqc.Wait()
62                 if job, ok = sqc.queue[uuid]; !ok {
63                         return
64                 }
65         }
66         job.wantPriority = want
67 }
68
69 // adjust slurm job nice values as needed to ensure slurm priority
70 // order matches Arvados priority order.
71 func (sqc *SqueueChecker) reniceAll() {
72         sqc.L.Lock()
73         defer sqc.L.Unlock()
74
75         jobs := make([]*slurmJob, 0, len(sqc.queue))
76         for _, j := range sqc.queue {
77                 if j.wantPriority == 0 {
78                         // SLURM job with unknown Arvados priority
79                         // (perhaps it's not an Arvados job)
80                         continue
81                 }
82                 if j.priority == 0 {
83                         // SLURM <= 15.x implements "hold" by setting
84                         // priority to 0. If we include held jobs
85                         // here, we'll end up trying to push other
86                         // jobs below them using negative priority,
87                         // which won't help anything.
88                         continue
89                 }
90                 jobs = append(jobs, j)
91         }
92
93         sort.Slice(jobs, func(i, j int) bool {
94                 if jobs[i].wantPriority != jobs[j].wantPriority {
95                         return jobs[i].wantPriority > jobs[j].wantPriority
96                 } else {
97                         // break ties with container uuid --
98                         // otherwise, the ordering would change from
99                         // one interval to the next, and we'd do many
100                         // pointless slurm queue rearrangements.
101                         return jobs[i].uuid > jobs[j].uuid
102                 }
103         })
104         renice := wantNice(jobs, sqc.PrioritySpread)
105         for i, job := range jobs {
106                 if renice[i] == job.nice {
107                         continue
108                 }
109                 sqc.Slurm.Renice(job.uuid, renice[i])
110         }
111 }
112
113 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
114 // after calling Stop.
115 func (sqc *SqueueChecker) Stop() {
116         if sqc.done != nil {
117                 close(sqc.done)
118         }
119 }
120
121 // check gets the names of jobs in the SLURM queue (running and
122 // queued). If it succeeds, it updates sqc.queue and wakes up any
123 // goroutines that are waiting in HasUUID() or All().
124 func (sqc *SqueueChecker) check() {
125         // Mutex between squeue sync and running sbatch or scancel.  This
126         // establishes a sequence so that squeue doesn't run concurrently with
127         // sbatch or scancel; the next update of squeue will occur only after
128         // sbatch or scancel has completed.
129         sqc.L.Lock()
130         defer sqc.L.Unlock()
131
132         cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
133         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
134         cmd.Stdout, cmd.Stderr = stdout, stderr
135         if err := cmd.Run(); err != nil {
136                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
137                 return
138         }
139
140         lines := strings.Split(stdout.String(), "\n")
141         newq := make(map[string]*slurmJob, len(lines))
142         for _, line := range lines {
143                 if line == "" {
144                         continue
145                 }
146                 var uuid, state, reason string
147                 var n, p int64
148                 if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
149                         log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
150                         continue
151                 }
152                 replacing, ok := sqc.queue[uuid]
153                 if !ok {
154                         replacing = &slurmJob{uuid: uuid}
155                 }
156                 replacing.priority = p
157                 replacing.nice = n
158                 newq[uuid] = replacing
159
160                 if state == "PENDING" && reason == "BadConstraints" && p == 0 && replacing.wantPriority > 0 {
161                         // When using SLURM 14.x or 15.x, our queued
162                         // jobs land in this state when "scontrol
163                         // reconfigure" invalidates their feature
164                         // constraints by clearing all node features.
165                         // They stay in this state even after the
166                         // features reappear, until we run "scontrol
167                         // release {jobid}".
168                         //
169                         // "scontrol release" is silent and successful
170                         // regardless of whether the features have
171                         // reappeared, so rather than second-guessing
172                         // whether SLURM is ready, we just keep trying
173                         // this until it works.
174                         sqc.Slurm.Release(uuid)
175                 }
176         }
177         sqc.queue = newq
178         sqc.Broadcast()
179 }
180
181 // Initialize, and start a goroutine to call check() once per
182 // squeue.Period until terminated by calling Stop().
183 func (sqc *SqueueChecker) start() {
184         sqc.L = &sync.Mutex{}
185         sqc.done = make(chan struct{})
186         go func() {
187                 ticker := time.NewTicker(sqc.Period)
188                 for {
189                         select {
190                         case <-sqc.done:
191                                 ticker.Stop()
192                                 return
193                         case <-ticker.C:
194                                 sqc.check()
195                                 sqc.reniceAll()
196                         }
197                 }
198         }()
199 }
200
201 // All waits for the next squeue invocation, and returns all job
202 // names reported by squeue.
203 func (sqc *SqueueChecker) All() []string {
204         sqc.startOnce.Do(sqc.start)
205         sqc.L.Lock()
206         defer sqc.L.Unlock()
207         sqc.Wait()
208         var uuids []string
209         for u := range sqc.queue {
210                 uuids = append(uuids, u)
211         }
212         return uuids
213 }