Merge branch 'master' of git.curoverse.com:arvados into 13076-r-autogen-api
[arvados.git] / services / crunch-dispatch-slurm / squeue.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "bytes"
9         "fmt"
10         "log"
11         "sort"
12         "strings"
13         "sync"
14         "time"
15 )
16
17 type slurmJob struct {
18         uuid         string
19         wantPriority int64
20         priority     int64 // current slurm priority (incorporates nice value)
21         nice         int64 // current slurm nice value
22 }
23
24 // Squeue implements asynchronous polling monitor of the SLURM queue using the
25 // command 'squeue'.
26 type SqueueChecker struct {
27         Period         time.Duration
28         PrioritySpread int64
29         Slurm          Slurm
30         queue          map[string]*slurmJob
31         startOnce      sync.Once
32         done           chan struct{}
33         sync.Cond
34 }
35
36 // HasUUID checks if a given container UUID is in the slurm queue.
37 // This does not run squeue directly, but instead blocks until woken
38 // up by next successful update of squeue.
39 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
40         sqc.startOnce.Do(sqc.start)
41
42         sqc.L.Lock()
43         defer sqc.L.Unlock()
44
45         // block until next squeue broadcast signaling an update.
46         sqc.Wait()
47         _, exists := sqc.queue[uuid]
48         return exists
49 }
50
51 // SetPriority sets or updates the desired (Arvados) priority for a
52 // container.
53 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
54         sqc.startOnce.Do(sqc.start)
55         sqc.L.Lock()
56         defer sqc.L.Unlock()
57         job, ok := sqc.queue[uuid]
58         if !ok {
59                 // Wait in case the slurm job was just submitted and
60                 // will appear in the next squeue update.
61                 sqc.Wait()
62                 if job, ok = sqc.queue[uuid]; !ok {
63                         return
64                 }
65         }
66         job.wantPriority = want
67 }
68
69 // adjust slurm job nice values as needed to ensure slurm priority
70 // order matches Arvados priority order.
71 func (sqc *SqueueChecker) reniceAll() {
72         sqc.L.Lock()
73         defer sqc.L.Unlock()
74
75         jobs := make([]*slurmJob, 0, len(sqc.queue))
76         for _, j := range sqc.queue {
77                 if j.wantPriority == 0 {
78                         // SLURM job with unknown Arvados priority
79                         // (perhaps it's not an Arvados job)
80                         continue
81                 }
82                 jobs = append(jobs, j)
83         }
84
85         sort.Slice(jobs, func(i, j int) bool {
86                 return jobs[i].wantPriority > jobs[j].wantPriority
87         })
88         renice := wantNice(jobs, sqc.PrioritySpread)
89         for i, job := range jobs {
90                 if renice[i] == job.nice {
91                         continue
92                 }
93                 log.Printf("updating slurm priority for %q: nice %d => %d", job.uuid, job.nice, renice[i])
94                 sqc.Slurm.Renice(job.uuid, renice[i])
95         }
96 }
97
98 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
99 // after calling Stop.
100 func (sqc *SqueueChecker) Stop() {
101         if sqc.done != nil {
102                 close(sqc.done)
103         }
104 }
105
106 // check gets the names of jobs in the SLURM queue (running and
107 // queued). If it succeeds, it updates sqc.queue and wakes up any
108 // goroutines that are waiting in HasUUID() or All().
109 func (sqc *SqueueChecker) check() {
110         // Mutex between squeue sync and running sbatch or scancel.  This
111         // establishes a sequence so that squeue doesn't run concurrently with
112         // sbatch or scancel; the next update of squeue will occur only after
113         // sbatch or scancel has completed.
114         sqc.L.Lock()
115         defer sqc.L.Unlock()
116
117         cmd := sqc.Slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
118         stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
119         cmd.Stdout, cmd.Stderr = stdout, stderr
120         if err := cmd.Run(); err != nil {
121                 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
122                 return
123         }
124
125         lines := strings.Split(stdout.String(), "\n")
126         newq := make(map[string]*slurmJob, len(lines))
127         for _, line := range lines {
128                 if line == "" {
129                         continue
130                 }
131                 var uuid string
132                 var n, p int64
133                 if _, err := fmt.Sscan(line, &uuid, &n, &p); err != nil {
134                         log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
135                         continue
136                 }
137                 replacing, ok := sqc.queue[uuid]
138                 if !ok {
139                         replacing = &slurmJob{uuid: uuid}
140                 }
141                 replacing.priority = p
142                 replacing.nice = n
143                 newq[uuid] = replacing
144         }
145         sqc.queue = newq
146         sqc.Broadcast()
147 }
148
149 // Initialize, and start a goroutine to call check() once per
150 // squeue.Period until terminated by calling Stop().
151 func (sqc *SqueueChecker) start() {
152         sqc.L = &sync.Mutex{}
153         sqc.done = make(chan struct{})
154         go func() {
155                 ticker := time.NewTicker(sqc.Period)
156                 for {
157                         select {
158                         case <-sqc.done:
159                                 ticker.Stop()
160                                 return
161                         case <-ticker.C:
162                                 sqc.check()
163                                 sqc.reniceAll()
164                         }
165                 }
166         }()
167 }
168
169 // All waits for the next squeue invocation, and returns all job
170 // names reported by squeue.
171 func (sqc *SqueueChecker) All() []string {
172         sqc.startOnce.Do(sqc.start)
173         sqc.L.Lock()
174         defer sqc.L.Unlock()
175         sqc.Wait()
176         var uuids []string
177         for u := range sqc.queue {
178                 uuids = append(uuids, u)
179         }
180         return uuids
181 }