Merge branch '12737-update-rails-api'
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "sort"
12         "strings"
13         "sync"
14         "time"
15 )
16
17 type slurmJob struct {
18         uuid         string
19         wantPriority int64
20         priority     int64 // current slurm priority (incorporates nice value)
21         nice         int64 // current slurm nice value
22 }
23
24 // Squeue implements asynchronous polling monitor of the SLURM queue using the
25 // command 'squeue'.
26 type SqueueChecker struct {
27         Period         time.Duration
28         PrioritySpread int64
29         Slurm          Slurm
30         queue          map[string]*slurmJob
31         startOnce      sync.Once
32         done           chan struct{}
33         sync.Cond
34 }
35
36 // HasUUID checks if a given container UUID is in the slurm queue.
37 // This does not run squeue directly, but instead blocks until woken
38 // up by next successful update of squeue.
39 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
40         sqc.startOnce.Do(sqc.start)
41
42         sqc.L.Lock()
43         defer sqc.L.Unlock()
44
45         // block until next squeue broadcast signaling an update.
46         sqc.Wait()
47         _, exists := sqc.queue[uuid]
48         return exists
49 }
50
51 // SetPriority sets or updates the desired (Arvados) priority for a
52 // container.
53 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
54         sqc.startOnce.Do(sqc.start)
55         sqc.L.Lock()
56         defer sqc.L.Unlock()
57         job, ok := sqc.queue[uuid]
58         if !ok {
59                 // Wait in case the slurm job was just submitted and
60                 // will appear in the next squeue update.
61                 sqc.Wait()
62                 if job, ok = sqc.queue[uuid]; !ok {
63                         return
64                 }
65         }
66         job.wantPriority = want
67 }
68
69 // adjust slurm job nice values as needed to ensure slurm priority
70 // order matches Arvados priority order.
71 func (sqc *SqueueChecker) reniceAll() {
72         sqc.L.Lock()
73         defer sqc.L.Unlock()
74
75         jobs := make([]*slurmJob, 0, len(sqc.queue))
76         for _, j := range sqc.queue {
77                 if j.wantPriority == 0 {
78                         // SLURM job with unknown Arvados priority
79                         // (perhaps it's not an Arvados job)
80                         continue
81                 }
82                 if j.priority == 0 {
83                         // SLURM <= 15.x implements "hold" by setting
84                         // priority to 0. If we include held jobs
85                         // here, we'll end up trying to push other
86                         // jobs below them using negative priority,
87                         // which won't help anything.
88                         continue
89                 }
90                 jobs = append(jobs, j)
91         }
92
93         sort.Slice(jobs, func(i, j int) bool {
94                 return jobs[i].wantPriority > jobs[j].wantPriority
95         })
96         renice := wantNice(jobs, sqc.PrioritySpread)
97         for i, job := range jobs {
98                 if renice[i] == job.nice {
99                         continue
100                 }
101                 sqc.Slurm.Renice(job.uuid, renice[i])
102         }
103 }
104
105 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
106 // after calling Stop.
107 func (sqc *SqueueChecker) Stop() {
108         if sqc.done != nil {
109                 close(sqc.done)
110         }
111 }
112
113 // check gets the names of jobs in the SLURM queue (running and
114 // queued). If it succeeds, it updates sqc.queue and wakes up any
115 // goroutines that are waiting in HasUUID() or All().
116 func (sqc *SqueueChecker) check() {
117         // Mutex between squeue sync and running sbatch or scancel.  This
118         // establishes a sequence so that squeue doesn't run concurrently with
119         // sbatch or scancel; the next update of squeue will occur only after
120         // sbatch or scancel has completed.
121         sqc.L.Lock()
122         defer sqc.L.Unlock()
123
124         cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
125         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
126         cmd.Stdout, cmd.Stderr = stdout, stderr
127         if err := cmd.Run(); err != nil {
128                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
129                 return
130         }
131
132         lines := strings.Split(stdout.String(), "\n")
133         newq := make(map[string]*slurmJob, len(lines))
134         for _, line := range lines {
135                 if line == "" {
136                         continue
137                 }
138                 var uuid, state, reason string
139                 var n, p int64
140                 if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
141                         log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
142                         continue
143                 }
144                 replacing, ok := sqc.queue[uuid]
145                 if !ok {
146                         replacing = &slurmJob{uuid: uuid}
147                 }
148                 replacing.priority = p
149                 replacing.nice = n
150                 newq[uuid] = replacing
151
152                 if state == "PENDING" && reason == "BadConstraints" && p == 0 && replacing.wantPriority > 0 {
153                         // When using SLURM 14.x or 15.x, our queued
154                         // jobs land in this state when "scontrol
155                         // reconfigure" invalidates their feature
156                         // constraints by clearing all node features.
157                         // They stay in this state even after the
158                         // features reappear, until we run "scontrol
159                         // release {jobid}".
160                         //
161                         // "scontrol release" is silent and successful
162                         // regardless of whether the features have
163                         // reappeared, so rather than second-guessing
164                         // whether SLURM is ready, we just keep trying
165                         // this until it works.
166                         sqc.Slurm.Release(uuid)
167                 }
168         }
169         sqc.queue = newq
170         sqc.Broadcast()
171 }
172
173 // Initialize, and start a goroutine to call check() once per
174 // squeue.Period until terminated by calling Stop().
175 func (sqc *SqueueChecker) start() {
176         sqc.L = &sync.Mutex{}
177         sqc.done = make(chan struct{})
178         go func() {
179                 ticker := time.NewTicker(sqc.Period)
180                 for {
181                         select {
182                         case <-sqc.done:
183                                 ticker.Stop()
184                                 return
185                         case <-ticker.C:
186                                 sqc.check()
187                                 sqc.reniceAll()
188                         }
189                 }
190         }()
191 }
192
193 // All waits for the next squeue invocation, and returns all job
194 // names reported by squeue.
195 func (sqc *SqueueChecker) All() []string {
196         sqc.startOnce.Do(sqc.start)
197         sqc.L.Lock()
198         defer sqc.L.Unlock()
199         sqc.Wait()
200         var uuids []string
201         for u := range sqc.queue {
202                 uuids = append(uuids, u)
203         }
204         return uuids
205 }