12552: Add slurm-renice test cases.
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "sort"
12         "strings"
13         "sync"
14         "time"
15 )
16
17 type slurmJob struct {
18         uuid         string
19         wantPriority int64
20         priority     int64 // current slurm priority (incorporates nice value)
21         nice         int64 // current slurm nice value
22 }
23
24 // Squeue implements asynchronous polling monitor of the SLURM queue using the
25 // command 'squeue'.
26 type SqueueChecker struct {
27         Period         time.Duration
28         PrioritySpread int64
29         Slurm          Slurm
30         queue          map[string]*slurmJob
31         startOnce      sync.Once
32         done           chan struct{}
33         sync.Cond
34 }
35
36 // HasUUID checks if a given container UUID is in the slurm queue.
37 // This does not run squeue directly, but instead blocks until woken
38 // up by next successful update of squeue.
39 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
40         sqc.startOnce.Do(sqc.start)
41
42         sqc.L.Lock()
43         defer sqc.L.Unlock()
44
45         // block until next squeue broadcast signaling an update.
46         sqc.Wait()
47         _, exists := sqc.queue[uuid]
48         return exists
49 }
50
51 // SetPriority sets or updates the desired (Arvados) priority for a
52 // container.
53 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
54         sqc.startOnce.Do(sqc.start)
55         sqc.L.Lock()
56         defer sqc.L.Unlock()
57         if _, ok := sqc.queue[uuid]; !ok {
58                 // Wait in case the slurm job was just submitted and
59                 // will appear in the next squeue update.
60                 sqc.Wait()
61                 if _, ok = sqc.queue[uuid]; !ok {
62                         return
63                 }
64         }
65         sqc.queue[uuid].wantPriority = want
66 }
67
68 // adjust slurm job nice values as needed to ensure slurm priority
69 // order matches Arvados priority order.
70 func (sqc *SqueueChecker) reniceAll() {
71         sqc.L.Lock()
72         defer sqc.L.Unlock()
73
74         jobs := make([]*slurmJob, 0, len(sqc.queue))
75         for _, j := range sqc.queue {
76                 if j.wantPriority == 0 {
77                         // SLURM job with unknown Arvados priority
78                         // (perhaps it's not an Arvados job)
79                         continue
80                 }
81                 jobs = append(jobs, j)
82         }
83
84         sort.Slice(jobs, func(i, j int) bool {
85                 return jobs[i].wantPriority > jobs[j].wantPriority
86         })
87         renice := wantNice(jobs, sqc.PrioritySpread)
88         for i, job := range jobs {
89                 if renice[i] == job.nice {
90                         continue
91                 }
92                 log.Printf("updating slurm priority for %q: nice %d => %d", job.uuid, job.nice, renice[i])
93                 sqc.Slurm.Renice(job.uuid, renice[i])
94         }
95 }
96
97 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
98 // after calling Stop.
99 func (sqc *SqueueChecker) Stop() {
100         if sqc.done != nil {
101                 close(sqc.done)
102         }
103 }
104
105 // check gets the names of jobs in the SLURM queue (running and
106 // queued). If it succeeds, it updates sqc.queue and wakes up any
107 // goroutines that are waiting in HasUUID() or All().
108 func (sqc *SqueueChecker) check() {
109         // Mutex between squeue sync and running sbatch or scancel.  This
110         // establishes a sequence so that squeue doesn't run concurrently with
111         // sbatch or scancel; the next update of squeue will occur only after
112         // sbatch or scancel has completed.
113         sqc.L.Lock()
114         defer sqc.L.Unlock()
115
116         cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
117         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
118         cmd.Stdout, cmd.Stderr = stdout, stderr
119         if err := cmd.Run(); err != nil {
120                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
121                 return
122         }
123
124         lines := strings.Split(stdout.String(), "\n")
125         newq := make(map[string]*slurmJob, len(lines))
126         for _, line := range lines {
127                 if line == "" {
128                         continue
129                 }
130                 var uuid string
131                 var n, p int64
132                 if _, err := fmt.Sscan(line, &uuid, &n, &p); err != nil {
133                         log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
134                         continue
135                 }
136                 replacing, ok := sqc.queue[uuid]
137                 if !ok {
138                         replacing = &slurmJob{uuid: uuid}
139                 }
140                 replacing.priority = p
141                 replacing.nice = n
142                 newq[uuid] = replacing
143         }
144         sqc.queue = newq
145         sqc.Broadcast()
146 }
147
148 // Initialize, and start a goroutine to call check() once per
149 // squeue.Period until terminated by calling Stop().
150 func (sqc *SqueueChecker) start() {
151         sqc.L = &sync.Mutex{}
152         sqc.done = make(chan struct{})
153         go func() {
154                 ticker := time.NewTicker(sqc.Period)
155                 for {
156                         select {
157                         case <-sqc.done:
158                                 ticker.Stop()
159                                 return
160                         case <-ticker.C:
161                                 sqc.check()
162                                 sqc.reniceAll()
163                         }
164                 }
165         }()
166 }
167
168 // All waits for the next squeue invocation, and returns all job
169 // names reported by squeue.
170 func (sqc *SqueueChecker) All() []string {
171         sqc.startOnce.Do(sqc.start)
172         sqc.L.Lock()
173         defer sqc.L.Unlock()
174         sqc.Wait()
175         var uuids []string
176         for u := range sqc.queue {
177                 uuids = append(uuids, u)
178         }
179         return uuids
180 }