1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
17 type slurmJob struct {
// NOTE(review): this excerpt elides several fields — methods below also
// reference j.uuid and j.wantPriority, so those fields presumably live in
// the elided lines; confirm against the full file.
20 priority int64 // current slurm priority (incorporates nice value)
21 nice int64 // current slurm nice value
24 // Squeue implements asynchronous polling monitor of the SLURM queue using the
26 type SqueueChecker struct {
// queue maps container UUID -> per-job state as of the last successful
// squeue poll. Access appears to be guarded by the embedded lock that
// start() initializes (sqc.L) — confirm in the elided field list.
30 queue map[string]*slurmJob
36 // HasUUID checks if a given container UUID is in the slurm queue.
37 // This does not run squeue directly, but instead blocks until woken
38 // up by next successful update of squeue.
39 func (sqc *SqueueChecker) HasUUID(uuid string) bool {
// Lazily start the polling goroutine on first use.
40 sqc.startOnce.Do(sqc.start)
45 // block until next squeue broadcast signaling an update.
// Presence in the cached queue map is the answer; no live squeue call here.
47 _, exists := sqc.queue[uuid]
51 // SetPriority sets or updates the desired (Arvados) priority for a
53 func (sqc *SqueueChecker) SetPriority(uuid string, want int64) {
// Lazily start the polling goroutine on first use.
54 sqc.startOnce.Do(sqc.start)
57 job, ok := sqc.queue[uuid]
59 // Wait in case the slurm job was just submitted and
60 // will appear in the next squeue update.
// Re-check after waiting; if the job still isn't in the queue, the
// (elided) branch body presumably gives up without recording the priority.
62 if job, ok = sqc.queue[uuid]; !ok {
// Record the desired Arvados priority; reniceAll() uses it to compute
// the slurm nice values needed to match this ordering.
66 job.wantPriority = want
69 // adjust slurm job nice values as needed to ensure slurm priority
70 // order matches Arvados priority order.
71 func (sqc *SqueueChecker) reniceAll() {
// Collect only the jobs whose Arvados priority we know; pre-size to
// avoid reallocation.
75 jobs := make([]*slurmJob, 0, len(sqc.queue))
76 for _, j := range sqc.queue {
77 if j.wantPriority == 0 {
78 // SLURM job with unknown Arvados priority
79 // (perhaps it's not an Arvados job)
83 // SLURM <= 15.x implements "hold" by setting
84 // priority to 0. If we include held jobs
85 // here, we'll end up trying to push other
86 // jobs below them using negative priority,
87 // which won't help anything.
90 jobs = append(jobs, j)
// Sort by descending Arvados priority so wantNice() can assign
// monotonically increasing nice values down the list.
93 sort.Slice(jobs, func(i, j int) bool {
94 if jobs[i].wantPriority != jobs[j].wantPriority {
95 return jobs[i].wantPriority > jobs[j].wantPriority
97 // break ties with container uuid --
98 // otherwise, the ordering would change from
99 // one interval to the next, and we'd do many
100 // pointless slurm queue rearrangements.
101 return jobs[i].uuid > jobs[j].uuid
// wantNice (defined elsewhere) returns the desired nice value for each
// job in this sorted order, spread by sqc.PrioritySpread.
104 renice := wantNice(jobs, sqc.PrioritySpread)
105 for i, job := range jobs {
// Skip the scontrol call when the job already has the desired
// nice value — avoids pointless slurm churn.
106 if renice[i] == job.nice {
109 sqc.Slurm.Renice(job.uuid, renice[i])
113 // Stop stops the squeue monitoring goroutine. Do not call HasUUID
114 // after calling Stop.
115 func (sqc *SqueueChecker) Stop() {
121 // check gets the names of jobs in the SLURM queue (running and
122 // queued). If it succeeds, it updates sqc.queue and wakes up any
123 // goroutines that are waiting in HasUUID() or All().
124 func (sqc *SqueueChecker) check() {
125 // Mutex between squeue sync and running sbatch or scancel. This
126 // establishes a sequence so that squeue doesn't run concurrently with
127 // sbatch or scancel; the next update of squeue will occur only after
128 // sbatch or scancel has completed.
// Format fields: %j job name (container UUID), %y nice, %Q priority,
// %T state, %r reason — matching the Sscan below.
132 cmd := sqc.Slurm.QueueCommand([]string{"--all", "--noheader", "--format=%j %y %Q %T %r"})
133 stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
134 cmd.Stdout, cmd.Stderr = stdout, stderr
135 if err := cmd.Run(); err != nil {
// On failure, log and (presumably, in the elided line) return without
// touching sqc.queue, so stale-but-valid data is kept.
136 log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
// Build a fresh queue map; carrying over existing *slurmJob entries
// preserves their wantPriority across polls.
140 lines := strings.Split(stdout.String(), "\n")
141 newq := make(map[string]*slurmJob, len(lines))
142 for _, line := range lines {
146 var uuid, state, reason string
// n and p (nice and priority) are declared in elided lines above.
148 if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
149 log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
// Reuse the existing entry for this UUID if we have one, so
// wantPriority set via SetPriority() survives the refresh.
152 replacing, ok := sqc.queue[uuid]
154 replacing = &slurmJob{uuid: uuid}
156 replacing.priority = p
158 newq[uuid] = replacing
160 if state == "PENDING" && reason == "BadConstraints" && p == 0 && replacing.wantPriority > 0 {
161 // When using SLURM 14.x or 15.x, our queued
162 // jobs land in this state when "scontrol
163 // reconfigure" invalidates their feature
164 // constraints by clearing all node features.
165 // They stay in this state even after the
166 // features reappear, until we run "scontrol
169 // "scontrol release" is silent and successful
170 // regardless of whether the features have
171 // reappeared, so rather than second-guessing
172 // whether SLURM is ready, we just keep trying
173 // this until it works.
174 sqc.Slurm.Release(uuid)
181 // Initialize, and start a goroutine to call check() once per
182 // squeue.Period until terminated by calling Stop().
183 func (sqc *SqueueChecker) start() {
// Initialize the lock used by the waiters (HasUUID/All/SetPriority)
// and the done channel that Stop() presumably closes to end the loop.
184 sqc.L = &sync.Mutex{}
185 sqc.done = make(chan struct{})
// Poll interval comes from sqc.Period; the polling loop itself is in
// the elided lines below (the ticker should be stopped on exit).
187 ticker := time.NewTicker(sqc.Period)
201 // All waits for the next squeue invocation, and returns all job
202 // names reported by squeue.
203 func (sqc *SqueueChecker) All() []string {
// Lazily start the polling goroutine on first use.
204 sqc.startOnce.Do(sqc.start)
// Snapshot the UUIDs from the cached queue map (uuids is declared in
// an elided line above).
209 for u := range sqc.queue {
210 uuids = append(uuids, u)